abi upgrad (#259)

This commit is contained in:
Austin 2024-03-06 15:21:22 -05:00 committed by GitHub
parent d5291ae73f
commit 6c443f5fd1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 668 additions and 191 deletions

View File

@ -6,20 +6,27 @@ WITH retry AS (
SELECT
contract_address,
MAX(block_number) AS block_number,
COUNT(*) AS events
GREATEST(
latest_call_block,
latest_event_block
) AS block_number,
total_interaction_count
FROM
{{ ref("silver__logs") }}
l
{{ ref("silver__relevant_contracts") }}
r
LEFT JOIN {{ source(
'optimism_silver',
'verified_abis'
) }}
v USING (contract_address)
WHERE
l.block_timestamp >= CURRENT_DATE - INTERVAL '30 days' -- recent activity
r.total_interaction_count >= 250 -- high interaction count
AND GREATEST(
max_inserted_timestamp_logs,
max_inserted_timestamp_traces
) >= CURRENT_DATE - INTERVAL '30 days' -- recent activity
AND v.contract_address IS NULL -- no verified abi
AND l.contract_address NOT IN (
AND r.contract_address NOT IN (
SELECT
contract_address
FROM
@ -31,10 +38,9 @@ WITH retry AS (
_inserted_timestamp >= CURRENT_DATE - INTERVAL '30 days' -- this won't let us retry the same contract within 30 days
AND abi_data :data :result :: STRING <> 'Max rate limit reached'
)
GROUP BY
contract_address
ORDER BY
events DESC
total_interaction_count DESC
LIMIT
25
), FINAL AS (

View File

@ -10,7 +10,9 @@ WITH base AS (
SELECT
contract_address
FROM
{{ ref('silver__relevant_abi_contracts') }}
{{ ref('silver__relevant_contracts') }}
WHERE
total_interaction_count >= 100
{% if is_incremental() %}
EXCEPT

View File

@ -9,19 +9,22 @@ WITH base AS (
SELECT
contract_address,
latest_block
latest_event_block AS latest_block
FROM
{{ ref('silver__relevant_contracts') }}
WHERE
total_event_count >= 25
{% if is_incremental() %}
WHERE
contract_address NOT IN (
SELECT
contract_address
FROM
{{ this }}
)
AND contract_address NOT IN (
SELECT
contract_address
FROM
{{ this }}
)
{% endif %}
ORDER BY
total_event_count DESC
LIMIT
500
), function_sigs AS (

View File

@ -6,18 +6,67 @@
tags = ['abis']
) }}
WITH proxies AS (
WITH new_abis AS (
SELECT
created_block,
proxy_created_block,
contract_address,
proxy_address,
start_block,
_id,
_inserted_timestamp
DISTINCT contract_address
FROM
{{ ref('silver__flat_event_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '12 hours'
FROM
{{ this }}
)
{% endif %}
),
proxies AS (
SELECT
p0.created_block,
p0.proxy_created_block,
p0.contract_address,
p0.proxy_address,
p0.start_block,
p0._id,
p0._inserted_timestamp
FROM
{{ ref('silver__proxies') }}
p0
JOIN new_abis na0
ON p0.contract_address = na0.contract_address
UNION
SELECT
p1.created_block,
p1.proxy_created_block,
p1.contract_address,
p1.proxy_address,
p1.start_block,
p1._id,
p1._inserted_timestamp
FROM
{{ ref('silver__proxies') }}
p1
JOIN new_abis na1
ON p1.proxy_address = na1.contract_address
),
all_relevant_contracts AS (
SELECT
DISTINCT contract_address
FROM
proxies
UNION
SELECT
DISTINCT proxy_address AS contract_address
FROM
proxies
UNION
SELECT
contract_address
FROM
new_abis
),
flat_abis AS (
SELECT
@ -32,27 +81,7 @@ flat_abis AS (
_inserted_timestamp
FROM
{{ ref('silver__flat_event_abis') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) - INTERVAL '24 hours'
FROM
{{ this }}
)
OR contract_address IN (
SELECT
DISTINCT contract_address AS contract_address
FROM
proxies
UNION ALL
SELECT
DISTINCT proxy_address AS contract_address
FROM
proxies
)
{% endif %}
JOIN all_relevant_contracts USING (contract_address)
),
base AS (
SELECT
@ -152,8 +181,8 @@ new_records AS (
ORDER BY
priority ASC,
_inserted_timestamp DESC,
proxy_created_block DESC NULLS LAST,
proxy_inserted_timestamp DESC NULLS LAST
proxy_created_block DESC nulls last,
proxy_inserted_timestamp DESC nulls last
) = 1
)
SELECT
@ -184,4 +213,4 @@ FROM
start_block
ORDER BY
_inserted_timestamp DESC
) = 1
) = 1

View File

@ -1,29 +0,0 @@
{{ config(
materialized = 'table',
unique_key = "contract_address",
tags = ['abis']
) }}
WITH base AS (
SELECT
contract_address
FROM
{{ ref('silver__relevant_contracts') }}
),
proxies AS (
SELECT
proxy_address
FROM
{{ ref('silver__proxies') }}
JOIN base USING (contract_address)
)
SELECT
contract_address
FROM
base
UNION
SELECT
proxy_address AS contract_address
FROM
proxies

View File

@ -1,7 +0,0 @@
version: 2
models:
- name: silver__relevant_abi_contracts
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS

View File

@ -9,6 +9,7 @@ WITH base AS (
SELECT
contract_address,
abi,
PARSE_JSON(abi) AS DATA,
SHA2(PARSE_JSON(abi)) AS abi_hash,
discord_username,
_inserted_timestamp
@ -44,32 +45,369 @@ ORDER BY
_inserted_timestamp ASC
LIMIT
10
), contracts AS (
), flat_event_abi AS (
SELECT
contract_address,
_inserted_timestamp,
DATA,
VALUE :inputs AS inputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :anonymous :: BOOLEAN AS anonymous,
VALUE :name :: STRING AS NAME
FROM
base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'event' qualify ROW_NUMBER() over (
PARTITION BY contract_address,
NAME,
inputs
ORDER BY
LENGTH(inputs)
) = 1
),
event_types AS (
SELECT
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS event_type
FROM
flat_event_abi,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
contract_address,
_inserted_timestamp,
inputs,
anonymous,
NAME
),
apply_event_udfs AS (
SELECT
contract_address,
NAME AS event_name,
PARSE_JSON(
OBJECT_CONSTRUCT(
'anonymous',
anonymous,
'inputs',
inputs,
'name',
NAME,
'type',
'event'
) :: STRING
) AS abi,
utils.udf_evm_text_signature(abi) AS simple_event_name,
utils.udf_keccak256(simple_event_name) AS event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
event_types
),
final_flat_event_abis AS (
SELECT
contract_address,
event_name,
abi,
simple_event_name,
event_signature,
NAME,
inputs,
event_type,
_inserted_timestamp
FROM
apply_event_udfs
),
flat_function_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
VALUE :inputs AS inputs,
VALUE :outputs AS outputs,
VALUE :payable :: BOOLEAN AS payable,
VALUE :stateMutability :: STRING AS stateMutability,
VALUE :type :: STRING AS TYPE,
VALUE :name :: STRING AS NAME
FROM
base,
LATERAL FLATTEN (
input => DATA
)
WHERE
TYPE = 'function'
),
udf_function_abis AS (
SELECT
*,
PARSE_JSON(
object_construct_keep_null(
'inputs',
IFNULL(
inputs,
[]
),
'outputs',
IFNULL(
outputs,
[]
),
'name',
NAME,
'type',
'function'
) :: STRING
) AS abi,
utils.udf_evm_text_signature(abi) AS simple_function_name,
utils.udf_keccak256(simple_function_name) AS function_signature
FROM
flat_function_abis qualify ROW_NUMBER() over (
PARTITION BY contract_address,
function_signature
ORDER BY
_inserted_timestamp DESC
) = 1
),
flat_inputs AS (
SELECT
contract_address,
inputs,
NAME,
simple_function_name,
function_signature,
ARRAY_AGG(
VALUE :type :: STRING
) AS inputs_type
FROM
udf_function_abis,
LATERAL FLATTEN (
input => inputs
)
GROUP BY
ALL
),
fill_missing_input_names AS (
SELECT
contract_address,
NAME,
inputs_type,
simple_function_name,
function_signature,
VALUE :internalType :: STRING AS internalType,
VALUE :type :: STRING AS TYPE,
CASE
WHEN VALUE :name :: STRING = '' THEN CONCAT('input_', ROW_NUMBER() over (PARTITION BY contract_address, function_signature
ORDER BY
INDEX ASC) :: STRING)
ELSE VALUE :name :: STRING
END AS name_fixed,
inputs,
INDEX,
VALUE :components AS components
FROM
flat_inputs,
LATERAL FLATTEN (
input => inputs
)
),
final_flat_inputs AS (
SELECT
contract_address,
NAME,
inputs_type,
simple_function_name,
function_signature,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'internalType',
internalType,
'name',
name_fixed,
'type',
TYPE,
'components',
components
)
) within GROUP (
ORDER BY
INDEX
) AS inputs
FROM
fill_missing_input_names
GROUP BY
ALL
),
flat_outputs AS (
SELECT
contract_address,
outputs,
simple_function_name,
function_signature,
NAME,
ARRAY_AGG(
VALUE :type :: STRING
) AS outputs_type
FROM
udf_function_abis,
LATERAL FLATTEN (
input => outputs
)
GROUP BY
ALL
),
fill_missing_output_names AS (
SELECT
contract_address,
NAME,
outputs_type,
simple_function_name,
function_signature,
VALUE :internalType :: STRING AS internalType,
VALUE :type :: STRING AS TYPE,
CASE
WHEN VALUE :name :: STRING = '' THEN CONCAT('output_', ROW_NUMBER() over (PARTITION BY contract_address, function_signature
ORDER BY
INDEX ASC) :: STRING)
ELSE VALUE :name :: STRING
END AS name_fixed,
outputs,
INDEX,
VALUE :components AS components
FROM
flat_outputs,
LATERAL FLATTEN (
input => outputs
)
),
final_flat_outputs AS (
SELECT
contract_address,
NAME,
outputs_type,
simple_function_name,
function_signature,
ARRAY_AGG(
OBJECT_CONSTRUCT(
'internalType',
internalType,
'name',
name_fixed,
'type',
TYPE,
'components',
components
)
) within GROUP (
ORDER BY
INDEX
) AS outputs
FROM
fill_missing_output_names
GROUP BY
ALL
),
all_contracts AS (
SELECT
A.contract_address,
A.name AS function_name,
i.inputs,
o.outputs,
i.inputs_type,
o.outputs_type,
A._inserted_timestamp,
A.function_signature,
A.simple_function_name
FROM
udf_function_abis A
LEFT JOIN final_flat_inputs i
ON A.contract_address = i.contract_address
AND A.function_signature = i.function_signature
LEFT JOIN final_flat_outputs o
ON A.contract_address = o.contract_address
AND A.function_signature = o.function_signature
),
apply_function_udfs AS (
SELECT
contract_address,
function_name,
PARSE_JSON(
object_construct_keep_null(
'inputs',
IFNULL(
inputs,
[]
),
'outputs',
IFNULL(
outputs,
[]
),
'name',
function_name,
'type',
'function'
) :: STRING
) AS abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp
FROM
all_contracts
),
final_function_abis AS (
SELECT
contract_address,
function_name,
abi,
simple_function_name,
function_signature,
inputs,
outputs,
inputs_type,
outputs_type,
_inserted_timestamp
FROM
apply_function_udfs
),
new_abis AS (
SELECT
DISTINCT contract_address
FROM
base
),
contracts AS (
SELECT
contract_address
FROM
{{ ref('silver__proxies') }}
WHERE
contract_address IN (
SELECT
contract_address
FROM
base
)
JOIN new_abis USING (contract_address)
),
proxies AS (
SELECT
proxy_address,
contract_address
p.proxy_address,
p.contract_address
FROM
{{ ref('silver__proxies') }}
WHERE
proxy_address IN (
SELECT
contract_address
FROM
base
)
p
JOIN new_abis n
ON p.proxy_address = n.contract_address
),
final_groupings AS (
SELECT
@ -114,84 +452,99 @@ identified_addresses AS (
FROM
final_groupings
),
logs AS (
function_mapping AS (
SELECT
l.block_number,
l.contract_address,
OBJECT_CONSTRUCT(
'topics',
l.topics,
'data',
l.data,
'address',
l.contract_address
) AS logs_data,
b.abi,
base_address AS abi_address
ia.base_address,
ia.contract_address,
LEFT(
function_signature,
10
) AS function_sig
FROM
{{ ref('silver__logs') }}
l
JOIN identified_addresses C USING (contract_address)
JOIN base b
ON b.contract_address = C.base_address
identified_addresses ia
JOIN final_function_abis ffa
ON ia.base_address = ffa.contract_address
),
recent_logs AS (
valid_traces AS (
SELECT
block_number,
contract_address,
logs_data,
abi,
abi_address
DISTINCT base_address
FROM
logs qualify(ROW_NUMBER() over(PARTITION BY abi_address
ORDER BY
block_number DESC)) BETWEEN 1
AND 500
),
decoded_logs AS (
SELECT
*,
ethereum.streamline.udf_decode(PARSE_JSON(abi), logs_data) AS decoded_output,
decoded_output [0] :decoded :: BOOLEAN AS decoded,
CASE
WHEN decoded THEN 1
ELSE 0
END AS successful_row
FROM
recent_logs
),
successful_abis AS (
SELECT
abi_address,
SUM(successful_row) AS successful_rows,
COUNT(*) AS total_rows,
successful_rows / total_rows AS success_rate
FROM
decoded_logs
GROUP BY
abi_address
)
SELECT
contract_address,
abi,
discord_username,
_inserted_timestamp,
abi_hash,
CONCAT(
contract_address,
'-',
abi_hash
) AS id
FROM
base
WHERE
contract_address IN (
SELECT
abi_address
FROM
successful_abis
WHERE
success_rate > 0.75
) qualify(ROW_NUMBER() over(PARTITION BY contract_address
ORDER BY
_inserted_timestamp DESC)) = 1
(
SELECT
base_address
FROM
{{ ref('silver__traces') }}
JOIN function_mapping
ON function_sig = LEFT(
input,
10
)
AND IFF(
TYPE = 'DELEGATECALL',
from_address,
to_address
) = contract_address
WHERE
block_timestamp > DATEADD('month', -12, SYSDATE())
LIMIT
50000)
), event_mapping AS (
SELECT
ia.base_address,
ia.contract_address,
event_signature
FROM
identified_addresses ia
JOIN final_flat_event_abis fea
ON ia.base_address = fea.contract_address
),
valid_logs AS (
SELECT
DISTINCT base_address
FROM
(
SELECT
base_address
FROM
{{ ref('silver__logs') }}
l
JOIN event_mapping ia
ON ia.contract_address = l.contract_address
AND event_signature = topics [0] :: STRING
WHERE
block_timestamp > DATEADD('month', -12, SYSDATE())
LIMIT
50000)
), all_valid_addresses AS (
SELECT
base_address
FROM
valid_traces
UNION
SELECT
base_address
FROM
valid_logs
)
SELECT
contract_address,
abi,
discord_username,
_inserted_timestamp,
abi_hash,
CONCAT(
contract_address,
'-',
abi_hash
) AS id
FROM
base
WHERE
contract_address IN (
SELECT
base_address
FROM
all_valid_addresses
) qualify(ROW_NUMBER() over(PARTITION BY contract_address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,18 +1,138 @@
{{ config(
materialized = 'table',
materialized = 'incremental',
unique_key = "contract_address",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(contract_address)",
tags = ['non_realtime']
) }}
SELECT
contract_address,
'optimism' AS blockchain,
COUNT(*) AS transfers,
MAX(block_number) AS latest_block
FROM
{{ ref('silver__logs') }}
WITH emitted_events AS (
SELECT
contract_address,
COUNT(*) AS event_count,
MAX(_inserted_timestamp) AS max_inserted_timestamp_logs,
MAX(block_number) AS latest_event_block
FROM
{{ ref('silver__logs') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp > (
SELECT
MAX(max_inserted_timestamp_logs)
FROM
{{ this }}
)
{% endif %}
GROUP BY
1,
2
HAVING
COUNT(*) > 25
contract_address
),
function_calls AS (
SELECT
IFF(
TYPE = 'DELEGATECALL',
from_address,
to_address
) AS contract_address,
COUNT(*) AS function_call_count,
MAX(_inserted_timestamp) AS max_inserted_timestamp_traces,
MAX(block_number) AS latest_call_block
FROM
{{ ref('silver__traces') }}
WHERE
tx_status = 'SUCCESS'
AND trace_status = 'SUCCESS'
AND to_address IS NOT NULL
AND input IS NOT NULL
AND input <> '0x'
{% if is_incremental() %}
AND _inserted_timestamp > (
SELECT
MAX(max_inserted_timestamp_traces)
FROM
{{ this }}
)
{% endif %}
GROUP BY
1
),
active_contracts AS (
SELECT
contract_address
FROM
emitted_events
UNION
SELECT
contract_address
FROM
function_calls
),
previous_totals AS (
{% if is_incremental() %}
SELECT
contract_address, total_event_count, total_call_count, max_inserted_timestamp_logs, latest_event_block, max_inserted_timestamp_traces, latest_call_block
FROM
{{ this }}
{% else %}
SELECT
NULL AS contract_address, 0 AS total_event_count, 0 AS total_call_count, '1970-01-01 00:00:00' AS max_inserted_timestamp_logs, 0 AS latest_event_block, '1970-01-01 00:00:00' AS max_inserted_timestamp_traces, 0 AS latest_call_block
{% endif %})
SELECT
C.contract_address,
COALESCE(
p.total_event_count,
0
) + COALESCE(
e.event_count,
0
) AS total_event_count,
COALESCE(
p.total_call_count,
0
) + COALESCE(
f.function_call_count,
0
) AS total_call_count,
COALESCE(
p.total_event_count,
0
) + COALESCE(
e.event_count,
0
) + COALESCE(
p.total_call_count,
0
) + COALESCE(
f.function_call_count,
0
) AS total_interaction_count,
COALESCE(
e.max_inserted_timestamp_logs,
p.max_inserted_timestamp_logs,
'1970-01-01 00:00:00'
) AS max_inserted_timestamp_logs,
COALESCE(
f.max_inserted_timestamp_traces,
p.max_inserted_timestamp_traces,
'1970-01-01 00:00:00'
) AS max_inserted_timestamp_traces,
COALESCE(
e.latest_event_block,
p.latest_event_block,
0
) AS latest_event_block,
COALESCE(
f.latest_call_block,
p.latest_call_block,
0
) AS latest_call_block
FROM
active_contracts C
LEFT JOIN emitted_events e
ON C.contract_address = e.contract_address
LEFT JOIN function_calls f
ON C.contract_address = f.contract_address
LEFT JOIN previous_totals p
ON C.contract_address = p.contract_address