From f5435bf17c81a22e7ed41f5023d4be077b3455fc Mon Sep 17 00:00:00 2001 From: Austin <93135983+austinFlipside@users.noreply.github.com> Date: Thu, 4 Sep 2025 20:08:26 -0400 Subject: [PATCH] abi model (#137) * abi model * remove legacy model * tests * key * tests * updates --- .github/workflows/dbt_test_evm_recent.yml | 2 +- .../streamline_external_table_queries.sql | 175 ++++++++++++++++++ .../api_udf/bronze_evm_api__contract_abis.sql | 58 ------ .../api_udf/bronze_evm_api__contract_abis.yml | 22 --- ...t_gold_evm__ez_decoded_event_logs_full.yml | 4 +- models/evm/silver/abis/silver_evm__abis.sql | 6 +- .../silver/abis/silver_evm__verified_abis.sql | 19 +- .../silver/abis/silver_evm__verified_abis.yml | 13 +- .../core/abis/bronze__contract_abis.sql | 11 ++ .../core/abis/bronze__contract_abis_fr.sql | 11 ++ .../streamline/silver/abis/_retry_abis.sql | 79 ++++++++ .../streamline__complete_contract_abis.sql | 39 ++++ .../streamline__contract_abis_realtime.sql | 81 ++++++++ models/sources.yml | 14 +- 14 files changed, 439 insertions(+), 95 deletions(-) create mode 100644 macros/streamline/streamline_external_table_queries.sql delete mode 100644 models/evm/bronze/api_udf/bronze_evm_api__contract_abis.sql delete mode 100644 models/evm/bronze/api_udf/bronze_evm_api__contract_abis.yml create mode 100644 models/evm/streamline/bronze/core/abis/bronze__contract_abis.sql create mode 100644 models/evm/streamline/bronze/core/abis/bronze__contract_abis_fr.sql create mode 100644 models/evm/streamline/silver/abis/_retry_abis.sql create mode 100644 models/evm/streamline/silver/abis/streamline__complete_contract_abis.sql create mode 100644 models/evm/streamline/silver/abis/streamline__contract_abis_realtime.sql diff --git a/.github/workflows/dbt_test_evm_recent.yml b/.github/workflows/dbt_test_evm_recent.yml index c6acccd..9318b69 100644 --- a/.github/workflows/dbt_test_evm_recent.yml +++ b/.github/workflows/dbt_test_evm_recent.yml @@ -45,7 +45,7 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt test -m models/bronze/core "sei_models,tag:recent_evm_test" "sei_models,tag:curated" + dbt test -m models/bronze/core "sei_models,tag:recent_evm_test" "sei_models,tag:curated" "sei_models,tag:abis" notify-failure: needs: [run_dbt_jobs] diff --git a/macros/streamline/streamline_external_table_queries.sql b/macros/streamline/streamline_external_table_queries.sql new file mode 100644 index 0000000..8dce929 --- /dev/null +++ b/macros/streamline/streamline_external_table_queries.sql @@ -0,0 +1,175 @@ +{% macro streamline_external_table_query_v3( + source_name, + source_version='', + partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + error_code=false, + balances=false, + block_number=true, + tx_hash=false, + contract_address=false, + data_not_null=true + ) %} + + {% if source_version != '' %} + {% set source_version = '_' ~ source_version.lower() %} + {% endif %} + + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}') + ) A + ) + SELECT + s.*, + b.file_name, + b._inserted_timestamp + + {% if balances %}, --for balances + r.block_timestamp :: TIMESTAMP AS block_timestamp + {% endif %} + + {% if block_number %}, --for streamline 2.0+ + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number + {% endif %} + + {% if contract_address %}, --for contract_abis + COALESCE( + VALUE :"CONTRACT_ADDRESS", + VALUE :"contract_address" + ) :: STRING AS contract_address + {% endif %} + + {% if tx_hash %}, --for receipts_by_hash + s.value :"TX_HASH" :: STRING AS tx_hash + {% endif %} + FROM + {{ source( + "bronze_streamline", + source_name ~ source_version + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + + {% if balances %} + JOIN {{ ref('_block_ranges') }} + r + ON r.block_number = COALESCE( + s.value :"BLOCK_NUMBER" :: INT, + s.value :"block_number" :: INT + ) + {% endif %} + WHERE + b.partition_key = s.partition_key + {% if data_not_null %} + {% if error_code %} + AND DATA :error :code IS NULL + {% else %} + AND (DATA :error IS NULL OR DATA :error :: STRING IS NULL) + {% endif %} + AND DATA IS NOT NULL + {% endif %} +{% endmacro %} + +{% macro streamline_external_table_query_fr_v3( + source_name, + source_version='', + partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_join_key='partition_key', + error_code=false, + balances=false, + block_number=true, + tx_hash=false, + contract_address=false, + data_not_null=true + ) %} + + {% if source_version != '' %} + {% set source_version = '_' ~ source_version.lower() %} + {% endif %} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", source_name ~ source_version) }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp + + {% if balances %}, --for balances + r.block_timestamp :: TIMESTAMP AS block_timestamp +{% endif %} + +{% if block_number %}, --for streamline 2.0+ + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number +{% endif %} + +{% if contract_address %}, --for contract_abis + COALESCE( + VALUE :"CONTRACT_ADDRESS", + VALUE :"contract_address" + ) :: STRING AS contract_address +{% endif %} + +{% if tx_hash %}, --for receipts_by_hash + s.value :"TX_HASH" :: STRING AS tx_hash +{% endif %} +FROM + {{ source( + "bronze_streamline", + source_name ~ source_version + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.{{ partition_join_key }} + + {% if balances %} + JOIN {{ ref('_block_ranges') }} + r + ON r.block_number = COALESCE( + s.value :"BLOCK_NUMBER" :: INT, + s.value :"block_number" :: INT + ) + {% endif %} +WHERE + b.partition_key = s.{{ partition_join_key }} + {% if data_not_null %} + {% if error_code %} + AND DATA :error :code IS NULL + {% else %} + AND (DATA :error IS NULL OR DATA :error :: STRING IS NULL) + {% endif %} + AND DATA IS NOT NULL + {% endif %} +{% endmacro %} diff --git a/models/evm/bronze/api_udf/bronze_evm_api__contract_abis.sql b/models/evm/bronze/api_udf/bronze_evm_api__contract_abis.sql deleted file mode 100644 index da3b913..0000000 --- a/models/evm/bronze/api_udf/bronze_evm_api__contract_abis.sql +++ /dev/null @@ -1,58 +0,0 @@ -{{ config( - materialized = 'incremental', - unique_key = "contract_address", - full_refresh = false, - tags = ['noncore'] -) }} - -WITH base AS ( - - SELECT - contract_address - FROM - {{ ref('silver_evm__relevant_contracts') }} - WHERE - total_interaction_count >= 1000 - -{% if is_incremental() %} -and contract_address not in ( -SELECT - contract_address -FROM - {{ this }} - WHERE - abi_data :data :result :: STRING <> 'Max rate limit reached' -) -{% endif %} -ORDER BY - total_interaction_count DESC -LIMIT - 50 -), row_nos AS ( - SELECT - contract_address, - ROW_NUMBER() over ( - ORDER BY - contract_address - ) AS row_no - FROM - base -), -batched AS ({% for item in range(51) %} -SELECT - rn.contract_address, CONCAT('https://seitrace.com/pacific-1/api/v2/smart-contracts/', contract_address) AS url, IFNULL(live.udf_api(url) :data :abi, ARRAY_CONSTRUCT('ABI unavailable')) AS abi_data, SYSDATE() AS _inserted_timestamp -FROM - row_nos rn -WHERE - row_no = {{ item }} - - {% if not loop.last %} - UNION ALL - {% endif %} -{% endfor %}) -SELECT - contract_address, - abi_data, - _inserted_timestamp -FROM - batched diff --git a/models/evm/bronze/api_udf/bronze_evm_api__contract_abis.yml b/models/evm/bronze/api_udf/bronze_evm_api__contract_abis.yml deleted file mode 100644 index 7913879..0000000 --- a/models/evm/bronze/api_udf/bronze_evm_api__contract_abis.yml +++ /dev/null @@ -1,22 +0,0 @@ -version: 2 -models: - - name: bronze_evm_api__contract_abis - - columns: - - name: _INSERTED_TIMESTAMP - tests: - - not_null - - dbt_expectations.expect_row_values_to_have_recent_data: - datepart: day - interval: 1 - - dbt_expectations.expect_column_values_to_be_in_type_list: - column_type_list: - - TIMESTAMP_NTZ - - name: CONTRACT_ADDRESS - tests: - - not_null - - dbt_expectations.expect_column_values_to_be_in_type_list: - column_type_list: - - VARCHAR - - dbt_expectations.expect_column_values_to_match_regex: - regex: "^(0x)[0-9a-fA-F]{40}$" \ No newline at end of file diff --git a/models/evm/gold/tests/decoded_logs/test_gold_evm__ez_decoded_event_logs_full.yml b/models/evm/gold/tests/decoded_logs/test_gold_evm__ez_decoded_event_logs_full.yml index b1125e0..c057a08 100644 --- a/models/evm/gold/tests/decoded_logs/test_gold_evm__ez_decoded_event_logs_full.yml +++ b/models/evm/gold/tests/decoded_logs/test_gold_evm__ez_decoded_event_logs_full.yml @@ -6,9 +6,9 @@ models: combination_of_columns: - EZ_DECODED_EVENT_LOGS_ID - decoded_logs_exist: - fact_logs_model: ref('test_silver_evm__logs_full') + fact_logs_model: ref('test_gold__fact_event_logs_full') - find_missing_decoded_logs: - fact_logs_model: ref('test_silver_evm__logs_full') + fact_logs_model: ref('test_gold__fact_event_logs_full') columns: - name: BLOCK_NUMBER diff --git a/models/evm/silver/abis/silver_evm__abis.sql b/models/evm/silver/abis/silver_evm__abis.sql index c999af4..d9edc4f 100644 --- a/models/evm/silver/abis/silver_evm__abis.sql +++ b/models/evm/silver/abis/silver_evm__abis.sql @@ -2,7 +2,7 @@ materialized = "incremental", unique_key = "contract_address", merge_exclude_columns = ["inserted_timestamp"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(contract_address,abi_hash,bytecode), SUBSTRING(contract_address,abi_hash,bytecode)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(contract_address,abi_hash,bytecode)", tags = ['abis'] ) }} @@ -33,7 +33,7 @@ verified_abis AS ( FROM {{ ref('silver_evm__verified_abis') }} WHERE - abi_source = 'seitrace' + abi_source = 'etherscan' {% if is_incremental() %} AND _inserted_timestamp >= ( @@ -44,7 +44,7 @@ AND _inserted_timestamp >= ( FROM {{ this }} WHERE - abi_source = 'seitrace' + abi_source = 'etherscan' ) {% endif %} ), diff --git a/models/evm/silver/abis/silver_evm__verified_abis.sql b/models/evm/silver/abis/silver_evm__verified_abis.sql index c3f5fd0..1f12347 100644 --- a/models/evm/silver/abis/silver_evm__verified_abis.sql +++ b/models/evm/silver/abis/silver_evm__verified_abis.sql @@ -1,3 +1,4 @@ +-- depends_on: {{ ref('bronze__contract_abis') }} {{ config( materialized = 'incremental', unique_key = "contract_address", @@ -9,16 +10,20 @@ WITH base AS ( SELECT contract_address, PARSE_JSON( - abi_data + b.data:result ) AS DATA, _inserted_timestamp FROM - {{ ref('bronze_evm_api__contract_abis') }} + {% if is_incremental() %} + {{ ref('bronze__contract_abis') }} b + {% else %} + {{ ref('bronze__contract_abis_fr') }} b + {% endif %} WHERE - abi_data [0] :: STRING <> 'ABI unavailable' + b.data:message::string = 'OK' {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND b._inserted_timestamp >= ( SELECT COALESCE( MAX( @@ -31,12 +36,12 @@ AND _inserted_timestamp >= ( ) {% endif %} ), -sei_trace_abis AS ( +etherscan_abis AS ( SELECT contract_address, DATA, _inserted_timestamp, - 'seitrace' AS abi_source + 'etherscan' AS abi_source FROM base ), @@ -83,7 +88,7 @@ all_abis AS ( NULL AS discord_username, SHA2(DATA) AS abi_hash FROM - sei_trace_abis + etherscan_abis UNION SELECT contract_address, diff --git a/models/evm/silver/abis/silver_evm__verified_abis.yml b/models/evm/silver/abis/silver_evm__verified_abis.yml index f990c7d..a7c8141 100644 --- a/models/evm/silver/abis/silver_evm__verified_abis.yml +++ b/models/evm/silver/abis/silver_evm__verified_abis.yml @@ -4,4 +4,15 @@ models: tests: - dbt_utils.unique_combination_of_columns: combination_of_columns: - - CONTRACT_ADDRESS \ No newline at end of file + - CONTRACT_ADDRESS + columns: + - name: _INSERTED_TIMESTAMP + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_LTZ + - TIMESTAMP_NTZ + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 \ No newline at end of file diff --git a/models/evm/streamline/bronze/core/abis/bronze__contract_abis.sql b/models/evm/streamline/bronze/core/abis/bronze__contract_abis.sql new file mode 100644 index 0000000..11a8c79 --- /dev/null +++ b/models/evm/streamline/bronze/core/abis/bronze__contract_abis.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view', + tags = ['noncore'] +) }} + +{# Main query starts here #} +{{ streamline_external_table_query_v3( + source_name = 'contract_abis', + block_number = false, + contract_address = true +) }} \ No newline at end of file diff --git a/models/evm/streamline/bronze/core/abis/bronze__contract_abis_fr.sql b/models/evm/streamline/bronze/core/abis/bronze__contract_abis_fr.sql new file mode 100644 index 0000000..7fb87d2 --- /dev/null +++ b/models/evm/streamline/bronze/core/abis/bronze__contract_abis_fr.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view', + tags = ['noncore'] +) }} + +{# Main query starts here #} +{{ streamline_external_table_query_fr_v3( + source_name = 'contract_abis', + block_number = false, + contract_address = true +) }} \ No newline at end of file diff --git a/models/evm/streamline/silver/abis/_retry_abis.sql b/models/evm/streamline/silver/abis/_retry_abis.sql new file mode 100644 index 0000000..01a79f2 --- /dev/null +++ b/models/evm/streamline/silver/abis/_retry_abis.sql @@ -0,0 +1,79 @@ +{# Log configuration details #} +{{ log_model_details() }} +{{ config ( + materialized = "ephemeral" +) }} + +WITH retry AS ( + + SELECT + r.contract_address, + GREATEST( + latest_call_block, + latest_event_block + ) AS block_number, + total_interaction_count + FROM + {{ ref("silver_evm__relevant_contracts") }} + r + LEFT JOIN {{ source( + 'abis_silver', + 'verified_abis' + ) }} + v USING (contract_address) + LEFT JOIN {{ source( + 'complete_streamline', + 'complete_contract_abis' + ) }} C + ON r.contract_address = C.contract_address + AND C._inserted_timestamp >= CURRENT_DATE - INTERVAL '30 days' -- avoid retrying the same contract within the last 30 days + WHERE + r.total_interaction_count >= 2000 -- high interaction count + AND GREATEST( + max_inserted_timestamp_logs, + max_inserted_timestamp_traces + ) >= CURRENT_DATE - INTERVAL '30 days' -- recent activity + AND v.contract_address IS NULL -- no verified abi + AND C.contract_address IS NULL + ORDER BY + total_interaction_count DESC + LIMIT + 5 +), FINAL AS ( + SELECT + implementation_contract AS contract_address, + start_block AS block_number + FROM + {{ ref("silver_evm__proxies") }} + p + JOIN retry r USING (contract_address) + LEFT JOIN {{ source( + 'abis_silver', + 'verified_abis' + ) }} + v + ON v.contract_address = p.implementation_contract + LEFT JOIN {{ source( + 'complete_streamline', + 'complete_contract_abis' + ) }} C + ON p.implementation_contract = C.contract_address + AND C._inserted_timestamp >= CURRENT_DATE - INTERVAL '30 days' -- avoid retrying the same contract within the last 30 days + WHERE + v.contract_address IS NULL + AND C.contract_address IS NULL + UNION ALL + SELECT + contract_address, + block_number + FROM + retry +) +SELECT + * +FROM + FINAL qualify ROW_NUMBER() over ( + PARTITION BY contract_address + ORDER BY + block_number DESC + ) = 1 diff --git a/models/evm/streamline/silver/abis/streamline__complete_contract_abis.sql b/models/evm/streamline/silver/abis/streamline__complete_contract_abis.sql new file mode 100644 index 0000000..a376f30 --- /dev/null +++ b/models/evm/streamline/silver/abis/streamline__complete_contract_abis.sql @@ -0,0 +1,39 @@ +-- depends on: {{ ref('bronze__contract_abis') }} +{{ config ( + materialized = 'incremental', + unique_key = 'complete_contract_abis_id', + cluster_by = 'partition_key', + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(complete_contract_abis_id, contract_address)", + tags = ['noncore'] +) }} + +SELECT + partition_key, + contract_address, + file_name, + {{ dbt_utils.generate_surrogate_key( + ['contract_address'] + ) }} AS complete_contract_abis_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__contract_abis') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE (MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) + FROM + {{ this }}) + and data:result::string not like 'Max calls per%' + {% else %} + {{ ref('bronze__contract_abis_fr') }} + where data:result::string not like 'Max calls per%' + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY complete_contract_abis_id + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/evm/streamline/silver/abis/streamline__contract_abis_realtime.sql b/models/evm/streamline/silver/abis/streamline__contract_abis_realtime.sql new file mode 100644 index 0000000..b15e982 --- /dev/null +++ b/models/evm/streamline/silver/abis/streamline__contract_abis_realtime.sql @@ -0,0 +1,81 @@ +{{ config ( + materialized = "view", + tags = ['noncore'] +) }} + +WITH recent_relevant_contracts AS ( + + SELECT + contract_address, + total_interaction_count, + GREATEST( + max_inserted_timestamp_logs, + max_inserted_timestamp_traces + ) max_inserted_timestamp + FROM + {{ ref('silver_evm__relevant_contracts') }} C + LEFT JOIN {{ ref("streamline__complete_contract_abis") }} + s USING (contract_address) + WHERE + s.contract_address IS NULL + AND total_interaction_count > 1000 + ORDER BY + total_interaction_count DESC + LIMIT + 750 +), all_contracts AS ( + SELECT + contract_address + FROM + recent_relevant_contracts + +{% if is_incremental() %} +UNION +SELECT + contract_address +FROM + {{ ref('_retry_abis') }} +{% endif %} +) +SELECT + contract_address, + DATE_PART('EPOCH_SECONDS', sysdate()::date) :: INT AS partition_key, + live.udf_api( + 'GET', + CONCAT( + 'https://api.etherscan.io/v2/api?chainid=1329&module=contract&action=getabi&address=', + contract_address, + '&apikey={KEY}' + ), + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'Vault/prod/sei/etherscan' + ) AS request +FROM + all_contracts + +{# Streamline Function Call #} +{% if execute %} + {% set params = { + "external_table" :"contract_abis", + "sql_limit" : 1000, + "producer_batch_size" : 10, + "worker_batch_size" : 10, + "async_concurrent_requests" : 1, + "sql_source" : 'contract_abis_realtime' + } %} + + {% set function_call_sql %} + {{ fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = this.schema ~ "." ~ this.identifier, + params = params + ) }} + {% endset %} + + {% do run_query(function_call_sql) %} + {{ log("Streamline function call: " ~ function_call_sql, info=true) }} +{% endif %} \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index c307700..1064af6 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -55,6 +55,7 @@ sources: - name: sei_addresses - name: evm_blocks_v2 - name: evm_transactions_v2 + - name: contract_abis - name: bronze schema: bronze tables: @@ -101,4 +102,15 @@ sources: - name: evm_known_event_sigs - name: evm_wrapped_assets - name: dates - - name: scoring_activity_categories \ No newline at end of file + - name: scoring_activity_categories + - name: abis_silver + database: "{{ target.database }}" + schema: silver_evm + tables: + - name: verified_abis + - name: complete_event_abis + - name: complete_streamline + database: "{{ target.database }}" + schema: streamline + tables: + - name: complete_contract_abis \ No newline at end of file