diff --git a/.github/workflows/dbt_run_overflowed_traces.yml b/.github/workflows/dbt_run_overflowed_traces.yml
new file mode 100644
index 0000000..30add8f
--- /dev/null
+++ b/.github/workflows/dbt_run_overflowed_traces.yml
@@ -0,0 +1,46 @@
+# Dispatch-only workflow: installs the project, then runs the polygon_models
+# models tagged overflowed_traces with the OVERFLOWED_TRACES var set to True.
+name: dbt_run_overflowed_traces
+run-name: dbt_run_overflowed_traces
+
+on:
+  workflow_dispatch:
+    branches:
+      - "main"
+
+env:
+  DBT_PROFILES_DIR: ./
+
+  ACCOUNT: "${{ vars.ACCOUNT }}"
+  ROLE: "${{ vars.ROLE }}"
+  USER: "${{ vars.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ vars.REGION }}"
+  DATABASE: "${{ vars.DATABASE }}"
+  WAREHOUSE: "${{ vars.WAREHOUSE }}"
+  SCHEMA: "${{ vars.SCHEMA }}"
+
+concurrency:
+  group: ${{ github.workflow }} # one concurrency group per workflow name
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+          cache: "pip"
+
+      - name: install dependencies
+        run: |
+          pip install -r requirements.txt
+          dbt deps
+      - name: Run DBT Jobs
+        run: |
+          dbt run -m "polygon_models,tag:overflowed_traces" --vars '{"OVERFLOWED_TRACES":True}'
\ No newline at end of file
diff --git a/dbt_project.yml b/dbt_project.yml
index 7964817..c376551 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -64,6 +64,8 @@ vars:
   UPDATE_SNOWFLAKE_TAGS: True
   WAIT: 0
   OBSERV_FULL_TEST: False
+  OVERFLOWED_RECEIPTS: False
+  OVERFLOWED_TRACES: False # overridden to True by the dbt_run_overflowed_traces workflow
   BALANCES_START: 0
   BALANCES_END: 15000000
   HEAL_MODEL: False
diff --git a/models/bronze/overflow/bronze__overflowed_traces.sql b/models/bronze/overflow/bronze__overflowed_traces.sql
new file mode 100644
index 0000000..d10f981
--- /dev/null
+++ b/models/bronze/overflow/bronze__overflowed_traces.sql
@@ -0,0 +1,88 @@
+{#
+    Re-reads the trace responses flagged in bronze__potential_overflowed_traces
+    and keeps only values that are not objects, arrays or JSON nulls. The loop
+    renders one SELECT per row_no from 1 through 10 and joins them with
+    UNION ALL, so one build covers at most the first ten candidate rows.
+    NOTE(review): what utils.udf_detect_overflowed_responses and
+    utils.udtf_flatten_overflowed_responses read from file_url is not visible here.
+#}
+{{ config (
+    materialized = "view",
+    tags = ['overflowed_traces']
+) }}
+
+{% for item in range(
+        1,
+        11
+    ) %}
+
+    SELECT
+        o.file_name,
+        f.block_number,
+        f.index_vals,
+        f.path,
+        f.key,
+        f.value_
+    FROM
+        (
+            SELECT
+                file_name,
+                file_url,
+                index_cols,
+                [overflowed_block, overflowed_tx] AS index_vals
+            FROM
+                (
+                    SELECT
+                        block_number,
+                        POSITION,
+                        file_name,
+                        file_url,
+                        index_cols,
+                        VALUE [0] AS overflowed_block,
+                        VALUE [1] AS overflowed_tx,
+                        block_number = overflowed_block
+                        AND POSITION = overflowed_tx AS missing
+                    FROM
+                        (
+                            SELECT
+                                block_number,
+                                POSITION,
+                                file_name,
+                                file_url,
+                                index_cols,
+                                utils.udf_detect_overflowed_responses(
+                                    file_url,
+                                    index_cols
+                                ) AS index_vals
+                            FROM
+                                {{ ref("bronze__potential_overflowed_traces") }}
+                            WHERE
+                                row_no = {{ item }}
+                        ),
+                        LATERAL FLATTEN (
+                            input => index_vals
+                        )
+                )
+            WHERE
+                missing = TRUE -- the detected pair equals this row's block_number and POSITION
+        ) o,
+        TABLE(
+            utils.udtf_flatten_overflowed_responses(
+                o.file_url,
+                o.index_cols,
+                [o.index_vals]
+            )
+        ) f
+    WHERE
+        NOT IS_OBJECT(
+            f.value_
+        )
+        AND NOT IS_ARRAY(
+            f.value_
+        )
+        AND NOT IS_NULL_VALUE(
+            f.value_
+        ) {% if not loop.last %}
+        UNION ALL
+        {% endif %}
+    {% endfor %}
diff --git a/models/bronze/overflow/bronze__potential_overflowed_traces.sql b/models/bronze/overflow/bronze__potential_overflowed_traces.sql
new file mode 100644
index 0000000..620976f
--- /dev/null
+++ b/models/bronze/overflow/bronze__potential_overflowed_traces.sql
@@ -0,0 +1,145 @@
+{#
+    Candidate transactions consumed by bronze__overflowed_traces: every
+    transaction in the hard-coded block list that has no matching row in
+    silver__traces, with a scoped URL built from file_name. row_no numbers the
+    candidates by block_number, then POSITION.
+#}
+{{ config (
+    materialized = "view",
+    tags = ['overflowed_traces']
+) }}
+
+WITH impacted_blocks AS (
+
+    SELECT
+        51073769 AS block_number
+    UNION
+    SELECT
+        50888353
+    UNION
+    SELECT
+        51103282
+    UNION
+    SELECT
+        50888044
+    UNION
+    SELECT
+        51105228
+    UNION
+    SELECT
+        51073794
+    UNION
+    SELECT
+        51102645
+    UNION
+    SELECT
+        50020883
+    UNION
+    SELECT
+        51073411
+    UNION
+    SELECT
+        51073884
+    UNION
+    SELECT
+        51104249
+    UNION
+    SELECT
+        51073379
+    UNION
+    SELECT
+        51105014
+    UNION
+    SELECT
+        51073239
+    UNION
+    SELECT
+        51096591
+    UNION
+    SELECT
+        50890486
+    UNION
+    SELECT
+        51055050
+    UNION
+    SELECT
+        50020889
+    UNION
+    SELECT
+        51103946
+    UNION
+    SELECT
+        51068789
+    UNION
+    SELECT
+        50891139
+    UNION
+    SELECT
+        51073292
+    UNION
+    SELECT
+        51073493
+    UNION
+    SELECT
+        50891095 {# remove after filling
+    SELECT
+        VALUE :: INT AS block_number
+    FROM
+        (
+            SELECT
+                blocks_impacted_array
+            FROM
+                {{ ref("silver_observability__traces_completeness") }}
+            ORDER BY
+                test_timestamp DESC
+            LIMIT
+                1
+        ), LATERAL FLATTEN (
+            input => blocks_impacted_array
+        ) #}
+),
+all_txs AS (
+    SELECT
+        t.block_number,
+        t.position,
+        t.tx_hash
+    FROM
+        {{ ref("silver__transactions") }}
+        t
+        JOIN impacted_blocks USING (block_number)
+),
+missing_txs AS (
+    SELECT
+        DISTINCT block_number,
+        POSITION,
+        file_name
+    FROM
+        all_txs
+        LEFT JOIN {{ ref("silver__traces") }}
+        tr USING (
+            block_number,
+            tx_hash
+        )
+        JOIN {{ ref("streamline__complete_debug_traceBlockByNumber") }} USING (block_number)
+    WHERE
+        tr.tx_hash IS NULL -- anti-join: transaction has no row in silver__traces
+)
+SELECT
+    block_number,
+    POSITION,
+    file_name,
+    build_scoped_file_url(
+        @streamline.bronze.external_tables,
+        file_name
+    ) AS file_url,
+    ['block_number', 'array_index'] AS index_cols,
+    ROW_NUMBER() over (
+        ORDER BY
+            block_number ASC,
+            POSITION ASC
+    ) AS row_no
+FROM
+    missing_txs
+ORDER BY
+    block_number ASC,
+    POSITION ASC
diff --git a/models/silver/core/overflow/formatted_views/silver__overflowed_traces.sql b/models/silver/core/overflow/formatted_views/silver__overflowed_traces.sql
new file mode 100644
index 0000000..7345141
--- /dev/null
+++ b/models/silver/core/overflow/formatted_views/silver__overflowed_traces.sql
@@ -0,0 +1,190 @@
+{#
+    Pivots the key/value rows of bronze__overflowed_traces into one row per
+    block_number, tx_position and trace_address, then derives trace_index,
+    sub_traces and the typed trace columns from the aggregated JSON.
+#}
+{{ config (
+    materialized = 'view',
+    tags = ['overflowed_traces']
+) }}
+
+WITH bronze_overflowed_traces AS (
+
+    SELECT
+        block_number :: INT AS block_number,
+        index_vals [1] :: INT AS tx_position,
+        IFF(
+            path IN (
+                'result',
+                'result.value',
+                'result.type',
+                'result.to',
+                'result.input',
+                'result.gasUsed',
+                'result.gas',
+                'result.from',
+                'result.output',
+                'result.error',
+                'result.revertReason',
+                'gasUsed',
+                'gas',
+                'type',
+                'to',
+                'from',
+                'value',
+                'input',
+                'error',
+                'output',
+                'revertReason',
+                'txHash',
+                'result.txHash'
+            ),
+            'ORIGIN',
+            REGEXP_REPLACE(REGEXP_REPLACE(path, '[^0-9]+', '_'), '^_|_$', '') -- digit runs of path joined by '_'
+        ) AS trace_address,
+        SYSDATE() :: timestamp_ltz AS _inserted_timestamp,
+        OBJECT_AGG(
+            key,
+            value_
+        ) AS trace_json,
+        CASE
+            WHEN trace_address = 'ORIGIN' THEN NULL
+            WHEN POSITION(
+                '_' IN trace_address
+            ) = 0 THEN 'ORIGIN'
+            ELSE REGEXP_REPLACE(
+                trace_address,
+                '_[0-9]+$',
+                '',
+                1,
+                1
+            )
+        END AS parent_trace_address,
+        SPLIT(
+            trace_address,
+            '_'
+        ) AS str_array
+    FROM
+        {{ ref("bronze__overflowed_traces") }}
+    GROUP BY
+        block_number,
+        tx_position,
+        trace_address,
+        _inserted_timestamp
+),
+sub_traces AS (
+    SELECT
+        block_number,
+        tx_position,
+        parent_trace_address,
+        COUNT(*) AS sub_traces
+    FROM
+        bronze_overflowed_traces
+    GROUP BY
+        block_number,
+        tx_position,
+        parent_trace_address
+),
+num_array AS (
+    SELECT
+        block_number,
+        tx_position,
+        trace_address,
+        -- Snowflake leaves ARRAY_AGG element order undefined unless WITHIN GROUP is given
+        ARRAY_AGG(flat_value) WITHIN GROUP (ORDER BY flat_index ASC) AS num_array
+    FROM
+        (
+            SELECT
+                block_number,
+                tx_position,
+                trace_address,
+                INDEX AS flat_index,
+                IFF(
+                    VALUE :: STRING = 'ORIGIN',
+                    -1,
+                    VALUE :: INT
+                ) AS flat_value
+            FROM
+                bronze_overflowed_traces,
+                LATERAL FLATTEN (
+                    input => str_array
+                )
+        )
+    GROUP BY
+        block_number,
+        tx_position,
+        trace_address
+),
+cleaned_traces AS (
+    SELECT
+        b.block_number,
+        b.tx_position,
+        b.trace_address,
+        IFNULL(
+            sub_traces,
+            0
+        ) AS sub_traces,
+        num_array,
+        ROW_NUMBER() over (
+            PARTITION BY b.block_number,
+            b.tx_position
+            ORDER BY
+                num_array ASC
+        ) - 1 AS trace_index,
+        trace_json,
+        b._inserted_timestamp
+    FROM
+        bronze_overflowed_traces b
+        LEFT JOIN sub_traces s
+        ON b.block_number = s.block_number
+        AND b.tx_position = s.tx_position
+        AND b.trace_address = s.parent_trace_address
+        JOIN num_array n
+        ON b.block_number = n.block_number
+        AND b.tx_position = n.tx_position
+        AND b.trace_address = n.trace_address
+)
+SELECT
+    tx_position,
+    trace_index,
+    block_number,
+    trace_address,
+    trace_json :error :: STRING AS error_reason,
+    trace_json :from :: STRING AS from_address,
+    trace_json :to :: STRING AS to_address,
+    IFNULL(
+        utils.udf_hex_to_int(
+            trace_json :value :: STRING
+        ),
+        '0'
+    ) AS matic_value_precise_raw,
+    utils.udf_decimal_adjust(
+        matic_value_precise_raw,
+        18
+    ) AS matic_value_precise,
+    matic_value_precise :: FLOAT AS matic_value,
+    utils.udf_hex_to_int(
+        trace_json :gas :: STRING
+    ) :: INT AS gas,
+    utils.udf_hex_to_int(
+        trace_json :gasUsed :: STRING
+    ) :: INT AS gas_used,
+    trace_json :input :: STRING AS input,
+    trace_json :output :: STRING AS output,
+    trace_json :type :: STRING AS TYPE,
+    concat_ws(
+        '_',
+        TYPE,
+        trace_address
+    ) AS identifier,
+    concat_ws(
+        '-',
+        block_number,
+        tx_position,
+        identifier
+    ) AS _call_id,
+    _inserted_timestamp,
+    trace_json AS DATA,
+    sub_traces
+FROM
+    cleaned_traces
diff --git a/models/silver/core/overflow/run_models/silver__run_overflowed_traces.sql b/models/silver/core/overflow/run_models/silver__run_overflowed_traces.sql
new file mode 100644
index 0000000..d3b00e0
--- /dev/null
+++ b/models/silver/core/overflow/run_models/silver__run_overflowed_traces.sql
@@ -0,0 +1,58 @@
+{#
+    Records one row per run stating whether the dbt_run_overflowed_traces
+    workflow was dispatched. The dispatch call sits behind a
+    blocks_impacted_count > 0 filter on the latest traces-completeness test of
+    the last five days; when no row passes, the status is 'skipped'.
+#}
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'test_timestamp',
+    tags = ['observability']
+) }}
+
+WITH base AS (
+
+    SELECT
+        blocks_impacted_count
+    FROM
+        {{ ref('silver_observability__traces_completeness') }}
+    WHERE
+        test_timestamp > DATEADD('day', -5, CURRENT_TIMESTAMP())
+    ORDER BY
+        test_timestamp DESC
+    LIMIT
+        1), run_model AS (
+            SELECT
+                blocks_impacted_count,
+                github_actions.workflow_dispatches(
+                    'FlipsideCrypto',
+                    'polygon-models',
+                    'dbt_run_overflowed_traces.yml',
+                    NULL
+                ) AS run_overflow_models
+            FROM
+                base
+            WHERE
+                blocks_impacted_count > 0
+        )
+    SELECT
+        dummy,
+        COALESCE(
+            blocks_impacted_count,
+            0
+        ) AS blocks_impacted_count,
+        COALESCE(
+            run_overflow_models,
+            OBJECT_CONSTRUCT(
+                'status',
+                'skipped'
+            )
+        ) AS run_overflow_models,
+        SYSDATE() AS test_timestamp
+    FROM
+        (
+            SELECT
+                1 AS dummy -- single anchor row so the LEFT JOIN always yields one output row
+        )
+        LEFT JOIN run_model
+        ON 1 = 1
diff --git a/models/silver/core/silver__traces.sql b/models/silver/core/silver__traces.sql
index 2ec293e..e719e96 100644
--- a/models/silver/core/silver__traces.sql
+++ b/models/silver/core/silver__traces.sql
@@ -1,4 +1,5 @@
 -- depends_on: {{ ref('bronze__streamline_traces') }}
+{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_TRACES') else target.warehouse %}
 {{ config (
     materialized = "incremental",
     incremental_strategy = 'delete+insert',
@@ -6,7 +7,8 @@
     cluster_by = "block_timestamp::date, _inserted_timestamp::date",
     post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
     full_refresh = false,
-    tags = ['non_realtime']
+    tags = ['non_realtime', 'overflowed_traces'],
+    snowflake_warehouse = warehouse
 ) }}
 
 WITH bronze_traces AS (
@@ -321,6 +323,59 @@ missing_data AS (
         t.is_pending
 )
 {% endif %},
+
+{% if is_incremental() and var(
+    'OVERFLOWED_TRACES',
+) %}
+overflowed_traces AS ( -- overflow rows matched to their transaction on block_number and position
+    SELECT
+        t.block_number,
+        txs.tx_hash,
+        txs.block_timestamp,
+        txs.tx_status,
+        t.tx_position,
+        t.trace_index,
+        t.from_address,
+        t.to_address,
+        t.matic_value_precise_raw,
+        t.matic_value_precise,
+        t.matic_value,
+        t.gas,
+        t.gas_used,
+        t.input,
+        t.output,
+        t.type,
+        t.identifier,
+        t.sub_traces,
+        t.error_reason,
+        IFF(
+            t.error_reason IS NULL,
+            'SUCCESS',
+            'FAIL'
+        ) AS trace_status,
+        t.data,
+        IFF(
+            txs.tx_hash IS NULL
+            OR txs.block_timestamp IS NULL
+            OR txs.tx_status IS NULL,
+            TRUE,
+            FALSE
+        ) AS is_pending,
+        t._call_id,
+        txs._inserted_timestamp AS _inserted_timestamp
+    FROM
+        {{ source(
+            'polygon_silver',
+            'overflowed_traces'
+        ) }}
+        t
+        LEFT JOIN {{ ref('silver__transactions') }}
+        txs
+        ON t.tx_position = txs.position
+        AND t.block_number = txs.block_number
+),
+{% endif %}
+
 FINAL AS (
     SELECT
         block_number,
@@ -379,6 +434,83 @@ SELECT
     _inserted_timestamp
 FROM
     missing_data
+UNION ALL
+SELECT
+    block_number,
+    tx_hash,
+    block_timestamp,
+    tx_status,
+    tx_position,
+    trace_index,
+    from_address,
+    to_address,
+    matic_value_precise_raw,
+    matic_value_precise,
+    matic_value,
+    gas,
+    gas_used,
+    input,
+    output,
+    TYPE,
+    identifier,
+    sub_traces,
+    error_reason,
+    trace_status,
+    DATA,
+    is_pending,
+    _call_id,
+    _inserted_timestamp
+FROM
+    {{ this }}
+    INNER JOIN ( -- existing rows of every block touched by missing_data or overflowed_traces
+        SELECT
+            DISTINCT block_number
+        FROM
+            missing_data
+
+{% if is_incremental() and var(
+    'OVERFLOWED_TRACES',
+) %}
+UNION
+SELECT
+    DISTINCT block_number
+FROM
+    overflowed_traces
+{% endif %}
+) USING (block_number)
+{% endif %}
+
+{% if is_incremental() and var(
+    'OVERFLOWED_TRACES',
+) %}
+UNION ALL
+SELECT
+    block_number,
+    tx_hash,
+    block_timestamp,
+    tx_status,
+    tx_position,
+    trace_index,
+    from_address,
+    to_address,
+    matic_value_precise_raw,
+    matic_value_precise,
+    matic_value,
+    gas,
+    gas_used,
+    input,
+    output,
+    TYPE,
+    identifier,
+    sub_traces,
+    error_reason,
+    trace_status,
+    DATA,
+    is_pending,
+    _call_id,
+    _inserted_timestamp
+FROM
+    overflowed_traces
 {% endif %}
 )
 SELECT
diff --git a/models/sources.yml b/models/sources.yml
index 324025a..8513f09 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -59,6 +59,7 @@ sources:
     schema: silver
     tables:
       - name: verified_abis
+      - name: overflowed_traces
   - name: polygon_bronze_api
     database: polygon
     schema: bronze_api