From 687259c871fef172f40bc567be94390e978ab6e2 Mon Sep 17 00:00:00 2001 From: sam Date: Thu, 23 Jan 2025 19:55:11 +0800 Subject: [PATCH] add --- .github/workflows/dbt_run_traces_fix.yml | 50 -- models/gold/core/core__fact_traces.sql | 484 ++++++++++++++++- models/gold/core/core__fact_traces.yml | 62 ++- ...ver_observability__traces_completeness.sql | 2 +- models/silver/abis/silver__proxies.sql | 6 +- .../abis/silver__user_verified_abis.sql | 2 +- .../silver/core/silver__created_contracts.sql | 4 +- models/silver/core/silver__fact_traces2.sql | 486 ------------------ .../silver/core/silver__native_transfers.sql | 24 +- .../core/silver__relevant_contracts.sql | 6 +- models/silver/core/silver__traces.sql | 359 ++----------- models/silver/core/silver__traces2.sql | 15 - models/silver/core/silver__traces_old.sql | 424 +++++++++++++++ .../tests/traces/test_silver__traces_full.sql | 2 +- .../traces/test_silver__traces_recent.sql | 2 +- .../traces2_fix/silver__fact_traces2_fix.sql | 13 +- .../silver/core/retry/_missing_traces.sql | 4 +- 17 files changed, 1017 insertions(+), 928 deletions(-) delete mode 100644 .github/workflows/dbt_run_traces_fix.yml delete mode 100644 models/silver/core/silver__fact_traces2.sql delete mode 100644 models/silver/core/silver__traces2.sql create mode 100644 models/silver/core/silver__traces_old.sql diff --git a/.github/workflows/dbt_run_traces_fix.yml b/.github/workflows/dbt_run_traces_fix.yml deleted file mode 100644 index 7d076cb..0000000 --- a/.github/workflows/dbt_run_traces_fix.yml +++ /dev/null @@ -1,50 +0,0 @@ -name: dbt_run_traces_fix -run-name: dbt_run_traces_fix - -on: - workflow_dispatch: - inputs: - use_xl_env: - description: "Use the 2xl environment" - type: boolean - schedule: - # every 15 minutes (see https://crontab.guru) - - cron: "*/15 * * * *" - -env: - DBT_PROFILES_DIR: ./ - - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: ${{ github.event_name == 'workflow_dispatch' && inputs.use_xl_env && 'workflow_prod_2xl' || 'workflow_prod_backfill' }} - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: install dependencies - run: | - pip install -r requirements.txt - dbt deps - - - name: run traces fix model - run: | - dbt run -m "blast_models,tag:traces_fix" diff --git a/models/gold/core/core__fact_traces.sql b/models/gold/core/core__fact_traces.sql index eeca29e..1447823 100644 --- a/models/gold/core/core__fact_traces.sql +++ b/models/gold/core/core__fact_traces.sql @@ -1,32 +1,486 @@ -{{ config( - materialized = 'view', - persist_docs ={ "relation": true, - "columns": true } +{{ config ( + materialized = "incremental", + incremental_strategy = 'delete+insert', + unique_key = "block_number", + incremental_predicates = [fsc_evm.standard_predicate()], + cluster_by = "block_timestamp::date", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", + tags = ['non_realtime','core'], + full_refresh = false ) }} +WITH silver_traces AS ( + + SELECT + block_number, + tx_position, + trace_address, + parent_trace_address, + trace_address_array, + trace_json, + traces_id, + 'regular' AS source + FROM + {{ ref('silver__traces') }} + WHERE + 1 = 1 + +{% if is_incremental() and not full_reload_mode %} +AND modified_timestamp > ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} +) {% elif is_incremental() and full_reload_mode %} +AND block_number BETWEEN ( + SELECT + MAX( + block_number + ) + FROM + {{ this }} +) +AND ( + SELECT + MAX( + block_number + ) + 1000000 + FROM + {{ this }} +) +{% else %} + AND block_number <= 3000000 +{% endif %} +), +sub_traces AS ( + SELECT + block_number, + tx_position, + parent_trace_address, + COUNT(*) AS sub_traces + FROM + silver_traces + GROUP BY + block_number, + tx_position, + parent_trace_address +), +trace_index_array AS ( + SELECT + block_number, + tx_position, + trace_address, + ARRAY_AGG(flat_value) AS number_array + FROM + ( + SELECT + block_number, + tx_position, + trace_address, + IFF( + VALUE :: STRING = 'ORIGIN', + -1, + VALUE :: INT + ) AS flat_value + FROM + silver_traces, + LATERAL FLATTEN ( + input => trace_address_array + ) + ) + GROUP BY + block_number, + tx_position, + trace_address +), +trace_index_sub_traces AS ( + SELECT + b.block_number, + b.tx_position, + b.trace_address, + IFNULL( + sub_traces, + 0 + ) AS sub_traces, + number_array, + ROW_NUMBER() over ( + PARTITION BY b.block_number, + b.tx_position + ORDER BY + number_array ASC + ) - 1 AS trace_index, + b.trace_json, + b.traces_id, + b.source + FROM + silver_traces b + LEFT JOIN sub_traces s + ON b.block_number = s.block_number + AND b.tx_position = s.tx_position + AND b.trace_address = s.parent_trace_address + JOIN trace_index_array n + ON b.block_number = n.block_number + AND b.tx_position = n.tx_position + AND b.trace_address = n.trace_address +), +errored_traces AS ( + SELECT + block_number, + tx_position, + trace_address, + trace_json + FROM + trace_index_sub_traces + WHERE + trace_json :error :: STRING IS NOT NULL +), +error_logic AS ( + SELECT + b0.block_number, + b0.tx_position, + b0.trace_address, + b0.trace_json :error :: STRING AS error, + b1.trace_json :error :: STRING AS any_error, + b2.trace_json :error :: STRING AS origin_error + FROM + trace_index_sub_traces b0 + LEFT JOIN errored_traces b1 + ON b0.block_number = b1.block_number + AND b0.tx_position = b1.tx_position + AND b0.trace_address RLIKE CONCAT( + '^', + b1.trace_address, + '(_[0-9]+)*$' + ) + LEFT JOIN errored_traces b2 + ON b0.block_number = b2.block_number + AND b0.tx_position = b2.tx_position + AND b2.trace_address = 'ORIGIN' +), +aggregated_errors AS ( + SELECT + block_number, + tx_position, + trace_address, + error, + IFF(MAX(any_error) IS NULL + AND error IS NULL + AND origin_error IS NULL, TRUE, FALSE) AS trace_succeeded + FROM + error_logic + GROUP BY + block_number, + tx_position, + trace_address, + error, + origin_error), + json_traces AS ( + SELECT + block_number, + tx_position, + trace_address, + sub_traces, + number_array, + trace_index, + trace_json AS DATA, + trace_succeeded, + trace_json :error :: STRING AS error_reason, + trace_json :revertReason :: STRING AS revert_reason, + trace_json :from :: STRING AS from_address, + trace_json :to :: STRING AS to_address, + IFNULL( + trace_json :value :: STRING, + '0x0' + ) AS value_hex, + IFNULL( + utils.udf_hex_to_int( + trace_json :value :: STRING + ), + '0' + ) AS value_precise_raw, + utils.udf_decimal_adjust( + value_precise_raw, + 18 + ) AS value_precise, + value_precise :: FLOAT AS VALUE, + utils.udf_hex_to_int( + trace_json :gas :: STRING + ) :: INT AS gas, + utils.udf_hex_to_int( + trace_json :gasUsed :: STRING + ) :: INT AS gas_used, + trace_json :input :: STRING AS input, + trace_json :output :: STRING AS output, + trace_json :type :: STRING AS TYPE, + concat_ws( + '_', + TYPE, + trace_address + ) AS identifier, + IFF( + trace_succeeded, + 'SUCCESS', + 'FAIL' + ) AS trace_status, + traces_id + FROM + trace_index_sub_traces + JOIN aggregated_errors USING ( + block_number, + tx_position, + trace_address + ) + ), + incremental_traces AS ( + SELECT + f.block_number, + t.tx_hash, + t.block_timestamp, + t.origin_function_signature, + t.from_address AS origin_from_address, + t.to_address AS origin_to_address, + t.tx_status, + f.tx_position, + f.trace_index, + f.from_address AS from_address, + f.to_address AS to_address, + f.value_hex, + f.value_precise_raw, + f.value_precise, + f.value, + f.gas, + f.gas_used, + f.input, + f.output, + f.type, + f.identifier, + f.sub_traces, + f.error_reason, + f.revert_reason, + f.trace_status, + f.data, + f.traces_id, + f.trace_succeeded, + f.trace_address, + IFF( + t.tx_status = 'SUCCESS', + TRUE, + FALSE + ) AS tx_succeeded + FROM + json_traces f + LEFT OUTER JOIN {{ ref('silver__transactions') }} + t + ON f.tx_position = t.position + AND f.block_number = t.block_number + +{% if is_incremental() and not full_reload_mode %} +AND t.modified_timestamp >= ( + SELECT + DATEADD('hour', -24, MAX(modified_timestamp)) + FROM + {{ this }}) + {% endif %} +) + +{% if is_incremental() %}, +overflow_blocks AS ( + SELECT + DISTINCT block_number + FROM + silver_traces + WHERE + source = 'overflow' +), +heal_missing_data AS ( + SELECT + t.block_number, + txs.tx_hash, + txs.block_timestamp, + txs.origin_function_signature, + txs.from_address AS origin_from_address, + txs.to_address AS origin_to_address, + txs.tx_status, + t.tx_position, + t.trace_index, + t.from_address, + t.to_address, + t.value_hex, + t.value_precise_raw, + t.value_precise, + t.value, + t.gas, + t.gas_used, + t.input, + t.output, + t.type, + t.identifier, + t.sub_traces, + t.error_reason, + t.revert_reason, + t.trace_status, + t.data, + t.fact_traces_id AS traces_id, + t.trace_succeeded, + t.trace_address, + IFF( + txs.tx_status = 'SUCCESS', + TRUE, + FALSE + ) AS tx_succeeded + FROM + {{ this }} + t + JOIN {{ ref('silver__transactions') }} + txs + ON t.tx_position = txs.position + AND t.block_number = txs.block_number + WHERE + t.tx_hash IS NULL + OR t.block_timestamp IS NULL + OR t.tx_status IS NULL +) +{% endif %}, +all_traces AS ( + SELECT + block_number, + tx_hash, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + tx_status, + tx_position, + trace_index, + from_address, + to_address, + value_hex, + value_precise_raw, + value_precise, + VALUE, + gas, + gas_used, + input, + output, + TYPE, + identifier, + sub_traces, + error_reason, + revert_reason, + trace_status, + DATA, + trace_succeeded, + trace_address, + tx_succeeded + FROM + incremental_traces + +{% if is_incremental() %} +UNION ALL SELECT - tx_hash, block_number, + tx_hash, block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + tx_status, + tx_position, + trace_index, from_address, to_address, - eth_value AS VALUE, - eth_value_precise_raw AS value_precise_raw, - eth_value_precise AS value_precise, + value_hex, + value_precise_raw, + value_precise, + VALUE, gas, gas_used, input, output, TYPE, identifier, + sub_traces, + error_reason, + revert_reason, + trace_status, + DATA, + trace_succeeded, + trace_address, + tx_succeeded +FROM + heal_missing_data +UNION ALL +SELECT + block_number, + tx_hash, + block_timestamp, + origin_function_signature, + origin_from_address, + origin_to_address, + tx_status, + tx_position, + trace_index, + from_address, + to_address, + value_hex, + value_precise_raw, + value_precise, + VALUE, + gas, + gas_used, + input, + output, + TYPE, + identifier, + sub_traces, + error_reason, + revert_reason, + trace_status, + DATA, + trace_succeeded, + trace_address, + tx_succeeded +FROM + {{ this }} + JOIN overflow_blocks USING (block_number) +{% endif %} +) +SELECT + block_number, + block_timestamp, + tx_hash, + tx_position, + trace_index, + from_address, + to_address, + input, + output, + TYPE, + trace_address, + sub_traces, + VALUE, + value_precise_raw, + value_precise, + value_hex, + gas, + gas_used, + origin_from_address, + origin_to_address, + origin_function_signature, + trace_succeeded, + error_reason, + revert_reason, + tx_succeeded, + identifier, DATA, tx_status, - sub_traces, trace_status, - error_reason, - trace_index, - traces_id AS fact_traces_id, - inserted_timestamp, - modified_timestamp + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'trace_index'] + ) }} AS fact_traces_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp FROM - {{ ref('silver__traces') }} + all_traces qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index +ORDER BY + modified_timestamp DESC, block_timestamp DESC nulls last)) = 1 diff --git a/models/gold/core/core__fact_traces.yml b/models/gold/core/core__fact_traces.yml index 9504e4a..ba8a2e1 100644 --- a/models/gold/core/core__fact_traces.yml +++ b/models/gold/core/core__fact_traces.yml @@ -5,15 +5,33 @@ models: columns: - name: BLOCK_NUMBER - description: '{{ doc("blast_traces_block_no") }}' + description: '{{ doc("blast_traces_block_no") }}' - name: BLOCK_TIMESTAMP description: '{{ doc("blast_traces_blocktime") }}' - name: TX_HASH - description: '{{ doc("blast_traces_tx_hash") }}' + description: '{{ doc("blast_traces_tx_hash") }}' + - name: TX_POSITION + description: The position of the transaction within the block. + - name: TRACE_INDEX + description: The index of the trace within the transaction. - name: FROM_ADDRESS description: '{{ doc("blast_traces_from") }}' - name: TO_ADDRESS description: '{{ doc("blast_traces_to") }}' + - name: INPUT + description: '{{ doc("blast_traces_input") }}' + - name: OUTPUT + description: '{{ doc("blast_traces_output") }}' + - name: TYPE + description: '{{ doc("blast_traces_type") }}' + - name: TRACE_ADDRESS + description: This field represents the position of the trace within the transaction. + - name: SUB_TRACES + description: '{{ doc("blast_traces_sub") }}' + - name: IDENTIFIER + description: '{{ doc("blast_traces_identifier") }}' + - name: DATA + description: '{{ doc("blast_traces_call_data") }}' - name: VALUE description: '{{ doc("blast_traces_value") }}' - name: VALUE_PRECISE_RAW @@ -24,33 +42,27 @@ models: description: '{{ doc("blast_traces_gas") }}' - name: GAS_USED description: '{{ doc("blast_traces_gas_used") }}' - - name: INPUT - description: '{{ doc("blast_traces_input") }}' - - name: OUTPUT - description: '{{ doc("blast_traces_output") }}' - - name: TYPE - description: '{{ doc("blast_traces_type") }}' - - name: IDENTIFIER - description: '{{ doc("blast_traces_identifier") }}' - - name: DATA - description: '{{ doc("blast_traces_call_data") }}' - - name: TX_STATUS - description: '{{ doc("blast_tx_status") }}' - - name: SUB_TRACES - description: '{{ doc("blast_traces_sub") }}' + - name: ORIGIN_FROM_ADDRESS + description: The from address at the transaction level. + - name: ORIGIN_TO_ADDRESS + description: The to address at the transaction level. + - name: ORIGIN_FUNCTION_SIGNATURE + description: The function signature at the transaction level. - name: TRACE_STATUS description: The status of the trace, either `SUCCESS` or `FAIL` + - name: TRACE_SUCCEEDED + description: Whether the trace succeeded or failed - name: ERROR_REASON description: The reason for the trace failure, if any. - - name: TRACE_INDEX - description: The index of the trace within the transaction. + - name: REVERT_REASON + description: The reason for the trace revert, if any. + - name: TX_STATUS + description: '{{ doc("blast_tx_status") }}' + - name: TX_SUCCEEDED + description: Whether the transaction succeeded or failed - name: FACT_TRACES_ID - description: '{{ doc("pk") }}' + description: '{{ doc("pk") }}' - name: INSERTED_TIMESTAMP - description: '{{ doc("inserted_timestamp") }}' + description: '{{ doc("inserted_timestamp") }}' - name: MODIFIED_TIMESTAMP - description: '{{ doc("modified_timestamp") }}' - - - - \ No newline at end of file + description: '{{ doc("modified_timestamp") }}' diff --git a/models/silver/_observability/silver_observability__traces_completeness.sql b/models/silver/_observability/silver_observability__traces_completeness.sql index a4ca260..932611c 100644 --- a/models/silver/_observability/silver_observability__traces_completeness.sql +++ b/models/silver/_observability/silver_observability__traces_completeness.sql @@ -81,7 +81,7 @@ broken_blocks AS ( FROM {{ ref("silver__transactions") }} tx - LEFT JOIN {{ ref("silver__traces") }} + LEFT JOIN {{ ref("core__fact_traces") }} tr USING ( block_number, tx_hash diff --git a/models/silver/abis/silver__proxies.sql b/models/silver/abis/silver__proxies.sql index 409c1ab..3cceeaf 100644 --- a/models/silver/abis/silver__proxies.sql +++ b/models/silver/abis/silver__proxies.sql @@ -11,9 +11,9 @@ WITH base AS ( from_address, to_address, MIN(block_number) AS start_block, - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(modified_timestamp) AS _inserted_timestamp FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} WHERE TYPE = 'DELEGATECALL' AND trace_status = 'SUCCESS' @@ -21,7 +21,7 @@ WITH base AS ( AND from_address != to_address -- exclude self-calls {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT MAX(_inserted_timestamp) - INTERVAL '24 hours' FROM diff --git a/models/silver/abis/silver__user_verified_abis.sql b/models/silver/abis/silver__user_verified_abis.sql index c03a123..45dce7d 100644 --- a/models/silver/abis/silver__user_verified_abis.sql +++ b/models/silver/abis/silver__user_verified_abis.sql @@ -473,7 +473,7 @@ valid_traces AS ( SELECT base_address FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} JOIN function_mapping ON function_sig = LEFT( input, diff --git a/models/silver/core/silver__created_contracts.sql b/models/silver/core/silver__created_contracts.sql index 35bc74d..bbc1f30 100644 --- a/models/silver/core/silver__created_contracts.sql +++ b/models/silver/core/silver__created_contracts.sql @@ -13,7 +13,7 @@ SELECT to_address AS created_contract_address, from_address AS creator_address, input AS created_contract_input, - _inserted_timestamp, + modified_timestamp AS _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['to_address'] ) }} AS created_contracts_id, @@ -21,7 +21,7 @@ SELECT SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} WHERE TYPE ILIKE 'create%' AND to_address IS NOT NULL diff --git a/models/silver/core/silver__fact_traces2.sql b/models/silver/core/silver__fact_traces2.sql deleted file mode 100644 index 6911d1e..0000000 --- a/models/silver/core/silver__fact_traces2.sql +++ /dev/null @@ -1,486 +0,0 @@ -{{ config ( - materialized = "incremental", - incremental_strategy = 'delete+insert', - unique_key = "block_number", - incremental_predicates = [fsc_evm.standard_predicate()], - cluster_by = "block_timestamp::date", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", - tags = ['non_realtime','core'], - full_refresh = false -) }} - -WITH silver_traces AS ( - - SELECT - block_number, - tx_position, - trace_address, - parent_trace_address, - trace_address_array, - trace_json, - traces_id, - 'regular' AS source - FROM - {{ ref('silver__traces2') }} - WHERE - 1 = 1 - -{% if is_incremental() and not full_reload_mode %} -AND modified_timestamp > ( - SELECT - MAX(modified_timestamp) - FROM - {{ this }} -) {% elif is_incremental() and full_reload_mode %} -AND block_number BETWEEN ( - SELECT - MAX( - block_number - ) - FROM - {{ this }} -) -AND ( - SELECT - MAX( - block_number - ) + 1000000 - FROM - {{ this }} -) -{% else %} - AND block_number <= 3000000 -{% endif %} -), -sub_traces AS ( - SELECT - block_number, - tx_position, - parent_trace_address, - COUNT(*) AS sub_traces - FROM - silver_traces - GROUP BY - block_number, - tx_position, - parent_trace_address -), -trace_index_array AS ( - SELECT - block_number, - tx_position, - trace_address, - ARRAY_AGG(flat_value) AS number_array - FROM - ( - SELECT - block_number, - tx_position, - trace_address, - IFF( - VALUE :: STRING = 'ORIGIN', - -1, - VALUE :: INT - ) AS flat_value - FROM - silver_traces, - LATERAL FLATTEN ( - input => trace_address_array - ) - ) - GROUP BY - block_number, - tx_position, - trace_address -), -trace_index_sub_traces AS ( - SELECT - b.block_number, - b.tx_position, - b.trace_address, - IFNULL( - sub_traces, - 0 - ) AS sub_traces, - number_array, - ROW_NUMBER() over ( - PARTITION BY b.block_number, - b.tx_position - ORDER BY - number_array ASC - ) - 1 AS trace_index, - b.trace_json, - b.traces_id, - b.source - FROM - silver_traces b - LEFT JOIN sub_traces s - ON b.block_number = s.block_number - AND b.tx_position = s.tx_position - AND b.trace_address = s.parent_trace_address - JOIN trace_index_array n - ON b.block_number = n.block_number - AND b.tx_position = n.tx_position - AND b.trace_address = n.trace_address -), -errored_traces AS ( - SELECT - block_number, - tx_position, - trace_address, - trace_json - FROM - trace_index_sub_traces - WHERE - trace_json :error :: STRING IS NOT NULL -), -error_logic AS ( - SELECT - b0.block_number, - b0.tx_position, - b0.trace_address, - b0.trace_json :error :: STRING AS error, - b1.trace_json :error :: STRING AS any_error, - b2.trace_json :error :: STRING AS origin_error - FROM - trace_index_sub_traces b0 - LEFT JOIN errored_traces b1 - ON b0.block_number = b1.block_number - AND b0.tx_position = b1.tx_position - AND b0.trace_address RLIKE CONCAT( - '^', - b1.trace_address, - '(_[0-9]+)*$' - ) - LEFT JOIN errored_traces b2 - ON b0.block_number = b2.block_number - AND b0.tx_position = b2.tx_position - AND b2.trace_address = 'ORIGIN' -), -aggregated_errors AS ( - SELECT - block_number, - tx_position, - trace_address, - error, - IFF(MAX(any_error) IS NULL - AND error IS NULL - AND origin_error IS NULL, TRUE, FALSE) AS trace_succeeded - FROM - error_logic - GROUP BY - block_number, - tx_position, - trace_address, - error, - origin_error), - json_traces AS ( - SELECT - block_number, - tx_position, - trace_address, - sub_traces, - number_array, - trace_index, - trace_json AS DATA, - trace_succeeded, - trace_json :error :: STRING AS error_reason, - trace_json :revertReason :: STRING AS revert_reason, - trace_json :from :: STRING AS from_address, - trace_json :to :: STRING AS to_address, - IFNULL( - trace_json :value :: STRING, - '0x0' - ) AS value_hex, - IFNULL( - utils.udf_hex_to_int( - trace_json :value :: STRING - ), - '0' - ) AS value_precise_raw, - utils.udf_decimal_adjust( - value_precise_raw, - 18 - ) AS value_precise, - value_precise :: FLOAT AS VALUE, - utils.udf_hex_to_int( - trace_json :gas :: STRING - ) :: INT AS gas, - utils.udf_hex_to_int( - trace_json :gasUsed :: STRING - ) :: INT AS gas_used, - trace_json :input :: STRING AS input, - trace_json :output :: STRING AS output, - trace_json :type :: STRING AS TYPE, - concat_ws( - '_', - TYPE, - trace_address - ) AS identifier, - IFF( - trace_succeeded, - 'SUCCESS', - 'FAIL' - ) AS trace_status, - traces_id - FROM - trace_index_sub_traces - JOIN aggregated_errors USING ( - block_number, - tx_position, - trace_address - ) - ), - incremental_traces AS ( - SELECT - f.block_number, - t.tx_hash, - t.block_timestamp, - t.origin_function_signature, - t.from_address AS origin_from_address, - t.to_address AS origin_to_address, - t.tx_status, - f.tx_position, - f.trace_index, - f.from_address AS from_address, - f.to_address AS to_address, - f.value_hex, - f.value_precise_raw, - f.value_precise, - f.value, - f.gas, - f.gas_used, - f.input, - f.output, - f.type, - f.identifier, - f.sub_traces, - f.error_reason, - f.revert_reason, - f.trace_status, - f.data, - f.traces_id, - f.trace_succeeded, - f.trace_address, - IFF( - t.tx_status = 'SUCCESS', - TRUE, - FALSE - ) AS tx_succeeded - FROM - json_traces f - LEFT OUTER JOIN {{ ref('silver__transactions') }} - t - ON f.tx_position = t.position - AND f.block_number = t.block_number - -{% if is_incremental() and not full_reload_mode %} -AND t.modified_timestamp >= ( - SELECT - DATEADD('hour', -24, MAX(modified_timestamp)) - FROM - {{ this }}) - {% endif %} -) - -{% if is_incremental() %}, -overflow_blocks AS ( - SELECT - DISTINCT block_number - FROM - silver_traces - WHERE - source = 'overflow' -), -heal_missing_data AS ( - SELECT - t.block_number, - txs.tx_hash, - txs.block_timestamp, - txs.origin_function_signature, - txs.from_address AS origin_from_address, - txs.to_address AS origin_to_address, - txs.tx_status, - t.tx_position, - t.trace_index, - t.from_address, - t.to_address, - t.value_hex, - t.value_precise_raw, - t.value_precise, - t.value, - t.gas, - t.gas_used, - t.input, - t.output, - t.type, - t.identifier, - t.sub_traces, - t.error_reason, - t.revert_reason, - t.trace_status, - t.data, - t.fact_traces_id AS traces_id, - t.trace_succeeded, - t.trace_address, - IFF( - txs.tx_status = 'SUCCESS', - TRUE, - FALSE - ) AS tx_succeeded - FROM - {{ this }} - t - JOIN {{ ref('silver__transactions') }} - txs - ON t.tx_position = txs.position - AND t.block_number = txs.block_number - WHERE - t.tx_hash IS NULL - OR t.block_timestamp IS NULL - OR t.tx_status IS NULL -) -{% endif %}, -all_traces AS ( - SELECT - block_number, - tx_hash, - block_timestamp, - origin_function_signature, - origin_from_address, - origin_to_address, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - value_hex, - value_precise_raw, - value_precise, - VALUE, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - revert_reason, - trace_status, - DATA, - trace_succeeded, - trace_address, - tx_succeeded - FROM - incremental_traces - -{% if is_incremental() %} -UNION ALL -SELECT - block_number, - tx_hash, - block_timestamp, - origin_function_signature, - origin_from_address, - origin_to_address, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - value_hex, - value_precise_raw, - value_precise, - VALUE, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - revert_reason, - trace_status, - DATA, - trace_succeeded, - trace_address, - tx_succeeded -FROM - heal_missing_data -UNION ALL -SELECT - block_number, - tx_hash, - block_timestamp, - origin_function_signature, - origin_from_address, - origin_to_address, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - value_hex, - value_precise_raw, - value_precise, - VALUE, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - revert_reason, - trace_status, - DATA, - trace_succeeded, - trace_address, - tx_succeeded -FROM - {{ this }} - JOIN overflow_blocks USING (block_number) -{% endif %} -) -SELECT - block_number, - block_timestamp, - tx_hash, - tx_position, - trace_index, - from_address, - to_address, - input, - output, - TYPE, - trace_address, - sub_traces, - VALUE, - value_precise_raw, - value_precise, - value_hex, - gas, - gas_used, - origin_from_address, - origin_to_address, - origin_function_signature, - trace_succeeded, - error_reason, - revert_reason, - tx_succeeded, - identifier, - DATA, - tx_status, - trace_status, - {{ dbt_utils.generate_surrogate_key( - ['tx_hash', 'trace_index'] - ) }} AS fact_traces_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp -FROM - all_traces qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index -ORDER BY - modified_timestamp DESC, block_timestamp DESC nulls last)) = 1 diff --git a/models/silver/core/silver__native_transfers.sql b/models/silver/core/silver__native_transfers.sql index e19d464..d96ec33 100644 --- a/models/silver/core/silver__native_transfers.sql +++ b/models/silver/core/silver__native_transfers.sql @@ -16,15 +16,24 @@ WITH eth_base AS ( identifier, from_address, to_address, - eth_value, - _call_id, - _inserted_timestamp, - eth_value_precise_raw, - eth_value_precise, + VALUE AS eth_value, + concat_ws( + '-', + block_number, + tx_position, + CONCAT( + TYPE, + '_', + trace_address + ) + ) AS _call_id, + modified_timestamp AS _inserted_timestamp, + value_precise_raw AS eth_value_precise_raw, + value_precise AS eth_value_precise, tx_position, trace_index FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} WHERE eth_value > 0 AND tx_status = 'SUCCESS' @@ -98,7 +107,8 @@ SELECT '{{ invocation_id }}' AS _invocation_id FROM eth_base A - LEFT JOIN {{ ref('silver__complete_token_prices') }} p + LEFT JOIN {{ ref('silver__complete_token_prices') }} + p ON DATE_TRUNC( 'hour', A.block_timestamp diff --git a/models/silver/core/silver__relevant_contracts.sql b/models/silver/core/silver__relevant_contracts.sql index 85b5b2b..23aefb7 100644 --- a/models/silver/core/silver__relevant_contracts.sql +++ b/models/silver/core/silver__relevant_contracts.sql @@ -31,10 +31,10 @@ function_calls AS ( SELECT to_address AS contract_address, COUNT(*) AS function_call_count, - MAX(_inserted_timestamp) AS max_inserted_timestamp_traces, + MAX(modified_timestamp) AS max_inserted_timestamp_traces, MAX(block_number) AS latest_call_block FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} WHERE tx_status = 'SUCCESS' AND trace_status = 'SUCCESS' @@ -43,7 +43,7 @@ function_calls AS ( AND input <> '0x' {% if is_incremental() %} -AND _inserted_timestamp > ( +AND modified_timestamp > ( SELECT MAX(max_inserted_timestamp_traces) FROM diff --git a/models/silver/core/silver__traces.sql b/models/silver/core/silver__traces.sql index 3074dcf..9042f5f 100644 --- a/models/silver/core/silver__traces.sql +++ b/models/silver/core/silver__traces.sql @@ -3,28 +3,28 @@ materialized = "incremental", incremental_strategy = 'delete+insert', unique_key = "block_number", - cluster_by = "block_timestamp::date, _inserted_timestamp::date", + cluster_by = ['modified_timestamp::DATE','partition_key'], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", tags = ['non_realtime'], full_refresh = false ) }} - +{# {{ fsc_evm.silver_traces_v1( +full_reload_start_block = 3000000, +full_reload_blocks = 1000000, +use_partition_key = TRUE +) }} +#} WITH bronze_traces AS ( SELECT - COALESCE( - VALUE :BLOCK_NUMBER :: INT, - metadata :request :"data" :id :: INT, - PARSE_JSON( - metadata :request :"data" - ) :id :: INT - ) AS block_number, + block_number, + partition_key, VALUE :array_index :: INT AS tx_position, DATA :result AS full_traces, _inserted_timestamp FROM -{% if is_incremental() %} +{% if is_incremental() and not full_reload_mode %} {{ ref('bronze__streamline_traces') }} WHERE _inserted_timestamp >= ( @@ -33,12 +33,25 @@ WHERE FROM {{ this }} ) - AND DATA :result IS NOT NULL + AND DATA :result IS NOT NULL {% elif is_incremental() and full_reload_mode %} + {{ ref('bronze__streamline_fr_traces') }} +WHERE + partition_key BETWEEN ( + SELECT + MAX(partition_key) - 100000 + FROM + {{ this }} + ) + AND ( + SELECT + MAX(partition_key) + 1000000 + FROM + {{ this }} + ) {% else %} {{ ref('bronze__streamline_fr_traces') }} WHERE - _partition_by_block_id <= 2000000 - AND DATA :result IS NOT NULL + partition_key <= 3000000 {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position @@ -49,6 +62,7 @@ flatten_traces AS ( SELECT block_number, tx_position, + partition_key, IFF( path IN ( 'result', @@ -62,6 +76,7 @@ flatten_traces AS ( 'result.output', 'result.error', 'result.revertReason', + 'result.time', 'gasUsed', 'gas', 'type', @@ -71,6 +86,7 @@ flatten_traces AS ( 'input', 'error', 'output', + 'time', 'revertReason' ), 'ORIGIN', @@ -97,7 +113,7 @@ flatten_traces AS ( SPLIT( trace_address, '_' - ) AS str_array + ) AS trace_address_array FROM bronze_traces txs, TABLE( @@ -115,310 +131,35 @@ flatten_traces AS ( GROUP BY block_number, tx_position, + partition_key, trace_address, _inserted_timestamp -), -sub_traces AS ( - SELECT - block_number, - tx_position, - parent_trace_address, - COUNT(*) AS sub_traces - FROM - flatten_traces - GROUP BY - block_number, - tx_position, - parent_trace_address -), -num_array AS ( - SELECT - block_number, - tx_position, - trace_address, - ARRAY_AGG(flat_value) AS num_array - FROM - ( - SELECT - block_number, - tx_position, - trace_address, - IFF( - VALUE :: STRING = 'ORIGIN', - -1, - VALUE :: INT - ) AS flat_value - FROM - flatten_traces, - LATERAL FLATTEN ( - input => str_array - ) - ) - GROUP BY - block_number, - tx_position, - trace_address -), -cleaned_traces AS ( - SELECT - b.block_number, - b.tx_position, - b.trace_address, - IFNULL( - sub_traces, - 0 - ) AS sub_traces, - num_array, - ROW_NUMBER() over ( - PARTITION BY b.block_number, - b.tx_position - ORDER BY - num_array ASC - ) - 1 AS trace_index, - trace_json, - b._inserted_timestamp - FROM - flatten_traces b - LEFT JOIN sub_traces s - ON b.block_number = s.block_number - AND b.tx_position = s.tx_position - AND b.trace_address = s.parent_trace_address - JOIN num_array n - ON b.block_number = n.block_number - AND b.tx_position = n.tx_position - AND b.trace_address = n.trace_address -), -final_traces AS ( - SELECT - tx_position, - trace_index, - block_number, - trace_address, - trace_json :error :: STRING AS error_reason, - trace_json :from :: STRING AS from_address, - trace_json :to :: STRING AS to_address, - IFNULL( - utils.udf_hex_to_int( - trace_json :value :: STRING - ), - '0' - ) AS eth_value_precise_raw, - utils.udf_decimal_adjust( - eth_value_precise_raw, - 18 - ) AS eth_value_precise, - eth_value_precise :: FLOAT AS eth_value, - utils.udf_hex_to_int( - trace_json :gas :: STRING - ) :: INT AS gas, - utils.udf_hex_to_int( - trace_json :gasUsed :: STRING - ) :: INT AS gas_used, - trace_json :input :: STRING AS input, - trace_json :output :: STRING AS output, - trace_json :type :: STRING AS TYPE, - concat_ws( - '_', - TYPE, - trace_address - ) AS identifier, - concat_ws( - '-', - block_number, - tx_position, - identifier - ) AS _call_id, - _inserted_timestamp, - trace_json AS DATA, - sub_traces - FROM - cleaned_traces -), -new_records AS ( - SELECT - f.block_number, - t.tx_hash, - t.block_timestamp, - t.tx_status, - f.tx_position, - f.trace_index, - f.from_address, - f.to_address, - f.eth_value_precise_raw, - f.eth_value_precise, - f.eth_value, - f.gas, - f.gas_used, - f.input, - f.output, - f.type, - f.identifier, - f.sub_traces, - f.error_reason, - IFF( - f.error_reason IS NULL, - 'SUCCESS', - 'FAIL' - ) AS trace_status, - f.data, - IFF( - t.tx_hash IS NULL - OR t.block_timestamp IS NULL - OR t.tx_status IS NULL, - TRUE, - FALSE - ) AS is_pending, - f._call_id, - f._inserted_timestamp - FROM - final_traces f - LEFT OUTER JOIN {{ ref('silver__transactions') }} - t - ON f.tx_position = t.position - AND f.block_number = t.block_number - -{% if is_incremental() %} -AND t._INSERTED_TIMESTAMP >= ( - SELECT - DATEADD('hour', -24, MAX(_inserted_timestamp)) - FROM - {{ this }}) - {% endif %} -) - -{% if is_incremental() %}, -missing_data AS ( - SELECT - t.block_number, - txs.tx_hash, - txs.block_timestamp, - txs.tx_status, - t.tx_position, - t.trace_index, - t.from_address, - t.to_address, - t.eth_value_precise_raw, - t.eth_value_precise, - t.eth_value, - t.gas, - t.gas_used, - t.input, - t.output, - t.type, - t.identifier, - t.sub_traces, - t.error_reason, - t.trace_status, - t.data, - FALSE AS is_pending, - t._call_id, - GREATEST( - t._inserted_timestamp, - txs._inserted_timestamp - ) AS _inserted_timestamp - FROM - {{ this }} - t - INNER JOIN {{ ref('silver__transactions') }} - txs - ON t.tx_position = txs.position - AND t.block_number = txs.block_number - WHERE - t.is_pending -) -{% endif %}, -FINAL AS ( - SELECT - block_number, - tx_hash, - block_timestamp, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - eth_value_precise_raw, - eth_value_precise, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - trace_status, - DATA, - is_pending, - _call_id, - _inserted_timestamp - FROM - new_records - -{% if is_incremental() %} -UNION -SELECT - block_number, - tx_hash, - block_timestamp, - tx_status, - tx_position, - trace_index, - from_address, - to_address, - eth_value_precise_raw, - eth_value_precise, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - trace_status, - DATA, - is_pending, - _call_id, - _inserted_timestamp -FROM - missing_data -{% endif %} ) SELECT block_number, - tx_hash, - block_timestamp, - tx_status, tx_position, - trace_index, - from_address, - to_address, - eth_value_precise, - eth_value, - gas, - gas_used, - input, - output, - TYPE, - identifier, - sub_traces, - error_reason, - trace_status, - DATA, - is_pending, - _call_id, + trace_address, + parent_trace_address, + trace_address_array, + trace_json, + partition_key, _inserted_timestamp, - eth_value_precise_raw, - {{ dbt_utils.generate_surrogate_key( - ['tx_hash', 'trace_index'] - ) }} AS traces_id, + concat_ws( + '-', + CAST( + block_number AS VARCHAR + ), + CAST( + tx_position AS VARCHAR + ), + CAST( + trace_address AS VARCHAR + ) + ) AS traces_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, '{{ invocation_id }}' AS _invocation_id FROM - FINAL qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index + flatten_traces qualify(ROW_NUMBER() over(PARTITION BY traces_id ORDER BY - _inserted_timestamp DESC, is_pending ASC)) = 1 + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/core/silver__traces2.sql b/models/silver/core/silver__traces2.sql deleted file mode 100644 index 553e5b6..0000000 --- a/models/silver/core/silver__traces2.sql +++ /dev/null @@ -1,15 +0,0 @@ --- depends_on: {{ ref('bronze__streamline_traces') }} -{{ config ( - materialized = "incremental", - incremental_strategy = 'delete+insert', - unique_key = "block_number", - cluster_by = ['modified_timestamp::DATE','partition_key'], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", - tags = ['non_realtime'], - full_refresh = false -) }} -{{ fsc_evm.silver_traces_v1( - full_reload_start_block = 3000000, - full_reload_blocks = 1000000, - use_partition_key = true -) }} diff --git a/models/silver/core/silver__traces_old.sql b/models/silver/core/silver__traces_old.sql new file mode 100644 index 0000000..3074dcf --- /dev/null +++ b/models/silver/core/silver__traces_old.sql @@ -0,0 +1,424 @@ +-- depends_on: {{ ref('bronze__streamline_traces') }} +{{ config ( + materialized = "incremental", + incremental_strategy = 'delete+insert', + unique_key = "block_number", + cluster_by = "block_timestamp::date, _inserted_timestamp::date", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", + tags = ['non_realtime'], + full_refresh = false +) }} + +WITH bronze_traces AS ( + + SELECT + COALESCE( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT, + PARSE_JSON( + metadata :request :"data" + ) :id :: INT + ) AS block_number, + VALUE :array_index :: INT AS tx_position, + DATA :result AS full_traces, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_traces') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) + AND DATA :result IS NOT NULL +{% else %} + {{ ref('bronze__streamline_fr_traces') }} +WHERE + _partition_by_block_id <= 2000000 + AND DATA :result IS NOT NULL +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position +ORDER BY + _inserted_timestamp DESC)) = 1 +), +flatten_traces AS ( + SELECT + block_number, + tx_position, + IFF( + path IN ( + 'result', + 'result.value', + 'result.type', + 'result.to', + 'result.input', + 'result.gasUsed', + 'result.gas', + 'result.from', + 'result.output', + 'result.error', + 'result.revertReason', + 'gasUsed', + 'gas', + 'type', + 'to', + 'from', + 'value', + 'input', + 'error', + 'output', + 'revertReason' + ), + 'ORIGIN', + REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') + ) AS trace_address, + _inserted_timestamp, + OBJECT_AGG( + key, + VALUE + ) AS trace_json, + CASE + WHEN trace_address = 'ORIGIN' THEN NULL + WHEN POSITION( + '_' IN trace_address + ) = 0 THEN 'ORIGIN' + ELSE REGEXP_REPLACE( + trace_address, + '_[0-9]+$', + '', + 1, + 1 + ) + END AS parent_trace_address, + SPLIT( + trace_address, + '_' + ) AS str_array + FROM + bronze_traces txs, + TABLE( + FLATTEN( + input => PARSE_JSON( + txs.full_traces + ), + recursive => TRUE + ) + ) f + WHERE + f.index IS NULL + AND f.key != 'calls' + AND f.path != 'result' + GROUP BY + block_number, + tx_position, + trace_address, + _inserted_timestamp +), +sub_traces AS ( + SELECT + block_number, + tx_position, + parent_trace_address, + COUNT(*) AS sub_traces + FROM + flatten_traces + GROUP BY + block_number, + tx_position, + parent_trace_address +), +num_array AS ( + SELECT + block_number, + tx_position, + trace_address, + ARRAY_AGG(flat_value) AS num_array + FROM + ( + SELECT + block_number, + tx_position, + trace_address, + IFF( + VALUE :: STRING = 'ORIGIN', + -1, + VALUE :: INT + ) AS flat_value + FROM + flatten_traces, + LATERAL FLATTEN ( + input => str_array + ) + ) + GROUP BY + block_number, + tx_position, + trace_address +), +cleaned_traces AS ( + SELECT + b.block_number, + b.tx_position, + b.trace_address, + IFNULL( + sub_traces, + 0 + ) AS sub_traces, + num_array, + ROW_NUMBER() over ( + PARTITION BY b.block_number, + b.tx_position + ORDER BY + num_array ASC + ) - 1 AS trace_index, + trace_json, + b._inserted_timestamp + FROM + flatten_traces b + LEFT JOIN sub_traces s + ON b.block_number = s.block_number + AND b.tx_position = s.tx_position + AND b.trace_address = s.parent_trace_address + JOIN num_array n + ON b.block_number = n.block_number + AND b.tx_position = n.tx_position + AND b.trace_address = n.trace_address +), +final_traces AS ( + SELECT + tx_position, + trace_index, + block_number, + trace_address, + trace_json :error :: STRING AS error_reason, + trace_json :from :: STRING AS from_address, + trace_json :to :: STRING AS to_address, + IFNULL( + utils.udf_hex_to_int( + trace_json :value :: STRING + ), + '0' + ) AS eth_value_precise_raw, + utils.udf_decimal_adjust( + eth_value_precise_raw, + 18 + ) AS eth_value_precise, + eth_value_precise :: FLOAT AS eth_value, + utils.udf_hex_to_int( + trace_json :gas :: STRING + ) :: INT AS gas, + utils.udf_hex_to_int( + trace_json :gasUsed :: STRING + ) :: INT AS gas_used, + trace_json :input :: STRING AS input, + trace_json :output :: STRING AS output, + trace_json :type :: STRING AS TYPE, + concat_ws( + '_', + TYPE, + trace_address + ) AS identifier, + concat_ws( + '-', + block_number, + tx_position, + identifier + ) AS _call_id, + _inserted_timestamp, + trace_json AS DATA, + sub_traces + FROM + cleaned_traces +), +new_records AS ( + SELECT + f.block_number, + t.tx_hash, + t.block_timestamp, + t.tx_status, + f.tx_position, + f.trace_index, + f.from_address, + f.to_address, + f.eth_value_precise_raw, + f.eth_value_precise, + f.eth_value, + f.gas, + f.gas_used, + f.input, + f.output, + f.type, + f.identifier, + f.sub_traces, + f.error_reason, + IFF( + f.error_reason IS NULL, + 'SUCCESS', + 'FAIL' + ) AS trace_status, + f.data, + IFF( + t.tx_hash IS NULL + OR t.block_timestamp IS NULL + OR t.tx_status IS NULL, + TRUE, + FALSE + ) AS is_pending, + f._call_id, + f._inserted_timestamp + FROM + final_traces f + LEFT OUTER JOIN {{ ref('silver__transactions') }} + t + ON f.tx_position = t.position + AND f.block_number = t.block_number + +{% if is_incremental() %} +AND t._INSERTED_TIMESTAMP >= ( + SELECT + DATEADD('hour', -24, MAX(_inserted_timestamp)) + FROM + {{ this }}) + {% endif %} +) + +{% if is_incremental() %}, +missing_data AS ( + SELECT + t.block_number, + txs.tx_hash, + txs.block_timestamp, + txs.tx_status, + t.tx_position, + t.trace_index, + t.from_address, + t.to_address, + t.eth_value_precise_raw, + t.eth_value_precise, + t.eth_value, + t.gas, + t.gas_used, + t.input, + t.output, + t.type, + t.identifier, + t.sub_traces, + t.error_reason, + t.trace_status, + t.data, + FALSE AS is_pending, + t._call_id, + GREATEST( + t._inserted_timestamp, + txs._inserted_timestamp + ) AS _inserted_timestamp + FROM + {{ this }} + t + INNER JOIN {{ ref('silver__transactions') }} + txs + ON t.tx_position = txs.position + AND t.block_number = txs.block_number + WHERE + t.is_pending +) +{% endif %}, +FINAL AS ( + SELECT + block_number, + tx_hash, + block_timestamp, + tx_status, + tx_position, + trace_index, + from_address, + to_address, + eth_value_precise_raw, + eth_value_precise, + eth_value, + gas, + gas_used, + input, + output, + TYPE, + identifier, + sub_traces, + error_reason, + trace_status, + DATA, + is_pending, + _call_id, + _inserted_timestamp + FROM + new_records + +{% if is_incremental() %} +UNION +SELECT + block_number, + tx_hash, + block_timestamp, + tx_status, + tx_position, + trace_index, + from_address, + to_address, + eth_value_precise_raw, + eth_value_precise, + eth_value, + gas, + gas_used, + input, + output, + TYPE, + identifier, + sub_traces, + error_reason, + trace_status, + DATA, + is_pending, + _call_id, + _inserted_timestamp +FROM + missing_data +{% endif %} +) +SELECT + block_number, + tx_hash, + block_timestamp, + tx_status, + tx_position, + trace_index, + from_address, + to_address, + eth_value_precise, + eth_value, + gas, + gas_used, + input, + output, + TYPE, + identifier, + sub_traces, + error_reason, + trace_status, + DATA, + is_pending, + _call_id, + _inserted_timestamp, + eth_value_precise_raw, + {{ dbt_utils.generate_surrogate_key( + ['tx_hash', 'trace_index'] + ) }} AS traces_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL qualify(ROW_NUMBER() over(PARTITION BY block_number, tx_position, trace_index +ORDER BY + _inserted_timestamp DESC, is_pending ASC)) = 1 diff --git a/models/silver/core/tests/traces/test_silver__traces_full.sql b/models/silver/core/tests/traces/test_silver__traces_full.sql index be52e71..8d9a458 100644 --- a/models/silver/core/tests/traces/test_silver__traces_full.sql +++ b/models/silver/core/tests/traces/test_silver__traces_full.sql @@ -6,4 +6,4 @@ SELECT * FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} diff --git a/models/silver/core/tests/traces/test_silver__traces_recent.sql b/models/silver/core/tests/traces/test_silver__traces_recent.sql index 1dc3e76..027b038 100644 --- a/models/silver/core/tests/traces/test_silver__traces_recent.sql +++ b/models/silver/core/tests/traces/test_silver__traces_recent.sql @@ -13,7 +13,7 @@ WITH last_3_days AS ( SELECT * FROM - {{ ref('silver__traces') }} + {{ ref('core__fact_traces') }} WHERE block_number >= ( SELECT diff --git a/models/silver/core/traces2_fix/silver__fact_traces2_fix.sql b/models/silver/core/traces2_fix/silver__fact_traces2_fix.sql index 98397ee..a27b47e 100644 --- a/models/silver/core/traces2_fix/silver__fact_traces2_fix.sql +++ b/models/silver/core/traces2_fix/silver__fact_traces2_fix.sql @@ -1,12 +1,11 @@ -{{ config ( - materialized = "incremental", - incremental_strategy = 'delete+insert', - unique_key = ["block_number", "tx_position", "trace_address"], - tags = ['traces_fix'] +{# {{ config ( +materialized = "incremental", +incremental_strategy = 'delete+insert', +unique_key = ["block_number", "tx_position", "trace_address"], +tags = ['traces_fix'] ) }} {% set batch_query %} - SELECT MAX(next_batch_id) AS next_batch_id FROM @@ -161,4 +160,4 @@ aggregated_errors AS ( prod_trace_succeeded FROM batch - CROSS JOIN final_errors + CROSS JOIN final_errors #} diff --git a/models/streamline/silver/core/retry/_missing_traces.sql b/models/streamline/silver/core/retry/_missing_traces.sql index 8513fd6..f4ea89d 100644 --- a/models/streamline/silver/core/retry/_missing_traces.sql +++ b/models/streamline/silver/core/retry/_missing_traces.sql @@ -14,7 +14,7 @@ SELECT FROM {{ ref("silver__transactions") }} tx - LEFT JOIN {{ ref("silver__traces") }} + LEFT JOIN {{ ref("core__fact_traces") }} tr ON tx.block_number = tr.block_number AND tx.tx_hash = tr.tx_hash @@ -28,4 +28,4 @@ WHERE lookback ) AND tr.block_timestamp >= DATEADD('hour', -84, SYSDATE()) - AND tr.block_timestamp IS NOT NULL \ No newline at end of file + AND tr.block_timestamp IS NOT NULL