abi model (#137)

* abi model

* remove legacy model

* tests

* key

* tests

* updates
This commit is contained in:
Austin 2025-09-04 20:08:26 -04:00 committed by GitHub
parent 42d32503ef
commit f5435bf17c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 439 additions and 95 deletions

View File

@ -45,7 +45,7 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt test -m models/bronze/core "sei_models,tag:recent_evm_test" "sei_models,tag:curated"
dbt test -m models/bronze/core "sei_models,tag:recent_evm_test" "sei_models,tag:curated" "sei_models,tag:abis"
notify-failure:
needs: [run_dbt_jobs]

View File

@ -0,0 +1,175 @@
{% macro streamline_external_table_query_v3(
source_name,
source_version='',
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
error_code=false,
balances=false,
block_number=true,
tx_hash=false,
contract_address=false,
data_not_null=true
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}')
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %}, --for balances
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %}, --for streamline 2.0+
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
{% endif %}
{% if contract_address %}, --for contract_abis
COALESCE(
VALUE :"CONTRACT_ADDRESS",
VALUE :"contract_address"
) :: STRING AS contract_address
{% endif %}
{% if tx_hash %}, --for receipts_by_hash
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.partition_key
{% if data_not_null %}
{% if error_code %}
AND DATA :error :code IS NULL
{% else %}
AND (DATA :error IS NULL OR DATA :error :: STRING IS NULL)
{% endif %}
AND DATA IS NOT NULL
{% endif %}
{% endmacro %}
{% macro streamline_external_table_query_fr_v3(
source_name,
source_version='',
partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_join_key='partition_key',
error_code=false,
balances=false,
block_number=true,
tx_hash=false,
contract_address=false,
data_not_null=true
) %}
{% if source_version != '' %}
{% set source_version = '_' ~ source_version.lower() %}
{% endif %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
{% if balances %}, --for balances
r.block_timestamp :: TIMESTAMP AS block_timestamp
{% endif %}
{% if block_number %}, --for streamline 2.0+
COALESCE(
s.value :"BLOCK_NUMBER" :: STRING,
s.value :"block_number" :: STRING,
s.metadata :request :"data" :id :: STRING,
PARSE_JSON(
s.metadata :request :"data"
) :id :: STRING
) :: INT AS block_number
{% endif %}
{% if contract_address %}, --for contract_abis
COALESCE(
VALUE :"CONTRACT_ADDRESS",
VALUE :"contract_address"
) :: STRING AS contract_address
{% endif %}
{% if tx_hash %}, --for receipts_by_hash
s.value :"TX_HASH" :: STRING AS tx_hash
{% endif %}
FROM
{{ source(
"bronze_streamline",
source_name ~ source_version
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.{{ partition_join_key }}
{% if balances %}
JOIN {{ ref('_block_ranges') }}
r
ON r.block_number = COALESCE(
s.value :"BLOCK_NUMBER" :: INT,
s.value :"block_number" :: INT
)
{% endif %}
WHERE
b.partition_key = s.{{ partition_join_key }}
{% if data_not_null %}
{% if error_code %}
AND DATA :error :code IS NULL
{% else %}
AND (DATA :error IS NULL OR DATA :error :: STRING IS NULL)
{% endif %}
AND DATA IS NOT NULL
{% endif %}
{% endmacro %}

View File

@ -1,58 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
full_refresh = false,
tags = ['noncore']
) }}
WITH base AS (
SELECT
contract_address
FROM
{{ ref('silver_evm__relevant_contracts') }}
WHERE
total_interaction_count >= 1000
{% if is_incremental() %}
and contract_address not in (
SELECT
contract_address
FROM
{{ this }}
WHERE
abi_data :data :result :: STRING <> 'Max rate limit reached'
)
{% endif %}
ORDER BY
total_interaction_count DESC
LIMIT
50
), row_nos AS (
SELECT
contract_address,
ROW_NUMBER() over (
ORDER BY
contract_address
) AS row_no
FROM
base
),
batched AS ({% for item in range(51) %}
SELECT
rn.contract_address, CONCAT('https://seitrace.com/pacific-1/api/v2/smart-contracts/', contract_address) AS url, IFNULL(live.udf_api(url) :data :abi, ARRAY_CONSTRUCT('ABI unavailable')) AS abi_data, SYSDATE() AS _inserted_timestamp
FROM
row_nos rn
WHERE
row_no = {{ item }}
{% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %})
SELECT
contract_address,
abi_data,
_inserted_timestamp
FROM
batched

View File

@ -1,22 +0,0 @@
version: 2
models:
- name: bronze_evm_api__contract_abis
columns:
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: CONTRACT_ADDRESS
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR
- dbt_expectations.expect_column_values_to_match_regex:
regex: "^(0x)[0-9a-fA-F]{40}$"

View File

@ -6,9 +6,9 @@ models:
combination_of_columns:
- EZ_DECODED_EVENT_LOGS_ID
- decoded_logs_exist:
fact_logs_model: ref('test_silver_evm__logs_full')
fact_logs_model: ref('test_gold__fact_event_logs_full')
- find_missing_decoded_logs:
fact_logs_model: ref('test_silver_evm__logs_full')
fact_logs_model: ref('test_gold__fact_event_logs_full')
columns:
- name: BLOCK_NUMBER

View File

@ -2,7 +2,7 @@
materialized = "incremental",
unique_key = "contract_address",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(contract_address,abi_hash,bytecode), SUBSTRING(contract_address,abi_hash,bytecode)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(contract_address,abi_hash,bytecode)",
tags = ['abis']
) }}
@ -33,7 +33,7 @@ verified_abis AS (
FROM
{{ ref('silver_evm__verified_abis') }}
WHERE
abi_source = 'seitrace'
abi_source = 'etherscan'
{% if is_incremental() %}
AND _inserted_timestamp >= (
@ -44,7 +44,7 @@ AND _inserted_timestamp >= (
FROM
{{ this }}
WHERE
abi_source = 'seitrace'
abi_source = 'etherscan'
)
{% endif %}
),

View File

@ -1,3 +1,4 @@
-- depends_on: {{ ref('bronze__contract_abis') }}
{{ config(
materialized = 'incremental',
unique_key = "contract_address",
@ -9,16 +10,20 @@ WITH base AS (
SELECT
contract_address,
PARSE_JSON(
abi_data
b.data:result
) AS DATA,
_inserted_timestamp
FROM
{{ ref('bronze_evm_api__contract_abis') }}
{% if is_incremental() %}
{{ ref('bronze__contract_abis') }} b
{% else %}
{{ ref('bronze__contract_abis_fr') }} b
{% endif %}
WHERE
abi_data [0] :: STRING <> 'ABI unavailable'
b.data:message::string = 'OK'
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND b._inserted_timestamp >= (
SELECT
COALESCE(
MAX(
@ -31,12 +36,12 @@ AND _inserted_timestamp >= (
)
{% endif %}
),
sei_trace_abis AS (
etherscan_abis AS (
SELECT
contract_address,
DATA,
_inserted_timestamp,
'seitrace' AS abi_source
'etherscan' AS abi_source
FROM
base
),
@ -83,7 +88,7 @@ all_abis AS (
NULL AS discord_username,
SHA2(DATA) AS abi_hash
FROM
sei_trace_abis
etherscan_abis
UNION
SELECT
contract_address,

View File

@ -4,4 +4,15 @@ models:
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- CONTRACT_ADDRESS
- CONTRACT_ADDRESS
columns:
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_LTZ
- TIMESTAMP_NTZ
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_v3(
source_name = 'contract_abis',
block_number = false,
contract_address = true
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
{# Main query starts here #}
{{ streamline_external_table_query_fr_v3(
source_name = 'contract_abis',
block_number = false,
contract_address = true
) }}

View File

@ -0,0 +1,79 @@
{# Log configuration details #}
{{ log_model_details() }}
{{ config (
materialized = "ephemeral"
) }}
WITH retry AS (
SELECT
r.contract_address,
GREATEST(
latest_call_block,
latest_event_block
) AS block_number,
total_interaction_count
FROM
{{ ref("silver_evm__relevant_contracts") }}
r
LEFT JOIN {{ source(
'abis_silver',
'verified_abis'
) }}
v USING (contract_address)
LEFT JOIN {{ source(
'complete_streamline',
'complete_contract_abis'
) }} C
ON r.contract_address = C.contract_address
AND C._inserted_timestamp >= CURRENT_DATE - INTERVAL '30 days' -- avoid retrying the same contract within the last 30 days
WHERE
r.total_interaction_count >= 2000 -- high interaction count
AND GREATEST(
max_inserted_timestamp_logs,
max_inserted_timestamp_traces
) >= CURRENT_DATE - INTERVAL '30 days' -- recent activity
AND v.contract_address IS NULL -- no verified abi
AND C.contract_address IS NULL
ORDER BY
total_interaction_count DESC
LIMIT
5
), FINAL AS (
SELECT
implementation_contract AS contract_address,
start_block AS block_number
FROM
{{ ref("silver_evm__proxies") }}
p
JOIN retry r USING (contract_address)
LEFT JOIN {{ source(
'abis_silver',
'verified_abis'
) }}
v
ON v.contract_address = p.implementation_contract
LEFT JOIN {{ source(
'complete_streamline',
'complete_contract_abis'
) }} C
ON p.implementation_contract = C.contract_address
AND C._inserted_timestamp >= CURRENT_DATE - INTERVAL '30 days' -- avoid retrying the same contract within the last 30 days
WHERE
v.contract_address IS NULL
AND C.contract_address IS NULL
UNION ALL
SELECT
contract_address,
block_number
FROM
retry
)
SELECT
*
FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY contract_address
ORDER BY
block_number DESC
) = 1

View File

@ -0,0 +1,39 @@
-- depends on: {{ ref('bronze__contract_abis') }}
{{ config (
materialized = 'incremental',
unique_key = 'complete_contract_abis_id',
cluster_by = 'partition_key',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(complete_contract_abis_id, contract_address)",
tags = ['noncore']
) }}
SELECT
partition_key,
contract_address,
file_name,
{{ dbt_utils.generate_surrogate_key(
['contract_address']
) }} AS complete_contract_abis_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__contract_abis') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE (MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP)
FROM
{{ this }})
and data:result::string not like 'Max calls per%'
{% else %}
{{ ref('bronze__contract_abis_fr') }}
where data:result::string not like 'Max calls per%'
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY complete_contract_abis_id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,81 @@
{{ config (
materialized = "view",
tags = ['noncore']
) }}
WITH recent_relevant_contracts AS (
SELECT
contract_address,
total_interaction_count,
GREATEST(
max_inserted_timestamp_logs,
max_inserted_timestamp_traces
) max_inserted_timestamp
FROM
{{ ref('silver_evm__relevant_contracts') }} C
LEFT JOIN {{ ref("streamline__complete_contract_abis") }}
s USING (contract_address)
WHERE
s.contract_address IS NULL
AND total_interaction_count > 1000
ORDER BY
total_interaction_count DESC
LIMIT
750
), all_contracts AS (
SELECT
contract_address
FROM
recent_relevant_contracts
{% if is_incremental() %}
UNION
SELECT
contract_address
FROM
{{ ref('_retry_abis') }}
{% endif %}
)
SELECT
contract_address,
DATE_PART('EPOCH_SECONDS', sysdate()::date) :: INT AS partition_key,
live.udf_api(
'GET',
CONCAT(
'https://api.etherscan.io/v2/api?chainid=1329&module=contract&action=getabi&address=',
contract_address,
'&apikey={KEY}'
),
OBJECT_CONSTRUCT(
'Content-Type', 'application/json',
'fsc-quantum-state', 'streamline'
),
{},
'Vault/prod/sei/etherscan'
) AS request
FROM
all_contracts
{# Streamline Function Call #}
{% if execute %}
{% set params = {
"external_table" :"contract_abis",
"sql_limit" : 1000,
"producer_batch_size" : 10,
"worker_batch_size" : 10,
"async_concurrent_requests" : 1,
"sql_source" : 'contract_abis_realtime'
} %}
{% set function_call_sql %}
{{ fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = this.schema ~ "." ~ this.identifier,
params = params
) }}
{% endset %}
{% do run_query(function_call_sql) %}
{{ log("Streamline function call: " ~ function_call_sql, info=true) }}
{% endif %}

View File

@ -55,6 +55,7 @@ sources:
- name: sei_addresses
- name: evm_blocks_v2
- name: evm_transactions_v2
- name: contract_abis
- name: bronze
schema: bronze
tables:
@ -101,4 +102,15 @@ sources:
- name: evm_known_event_sigs
- name: evm_wrapped_assets
- name: dates
- name: scoring_activity_categories
- name: scoring_activity_categories
- name: abis_silver
database: "{{ target.database }}"
schema: silver_evm
tables:
- name: verified_abis
- name: complete_event_abis
- name: complete_streamline
database: "{{ target.database }}"
schema: streamline
tables:
- name: complete_contract_abis