diff --git a/.github/workflows/dbt_run_overflowed_receipts.yml b/.github/workflows/dbt_run_overflowed_receipts.yml
new file mode 100644
index 0000000..db3f0d8
--- /dev/null
+++ b/.github/workflows/dbt_run_overflowed_receipts.yml
@@ -0,0 +1,46 @@
+name: dbt_run_overflowed_receipts
+run-name: dbt_run_overflowed_receipts
+
+# NOTE(review): workflow_dispatch takes no `branches` filter -- GitHub ignores
+# it; the run executes from whatever ref is selected at dispatch time.
+on:
+  workflow_dispatch:
+
+env:
+  DBT_PROFILES_DIR: ./
+
+  ACCOUNT: "${{ vars.ACCOUNT }}"
+  ROLE: "${{ vars.ROLE }}"
+  USER: "${{ vars.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ vars.REGION }}"
+  DATABASE: "${{ vars.DATABASE }}"
+  WAREHOUSE: "${{ vars.WAREHOUSE }}"
+  SCHEMA: "${{ vars.SCHEMA }}"
+
+concurrency:
+  group: ${{ github.workflow }}
+
+
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+          cache: "pip"
+
+      - name: install dependencies
+        run: |
+          pip install -r requirements.txt
+          dbt deps
+      - name: Run DBT Jobs
+        run: |
+          dbt run -m "base_models,tag:overflowed_receipts" --vars '{"OVERFLOWED_RECEIPTS":True}'
diff --git a/dbt_project.yml b/dbt_project.yml
index 1d16ae5..54919bb 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -67,6 +67,7 @@ vars:
   UPDATE_SNOWFLAKE_TAGS: True
   OBSERV_FULL_TEST: False
   OVERFLOWED_TRACES: False
+  OVERFLOWED_RECEIPTS: False
   WAIT: 0
   HEAL_MODEL: False
   HEAL_MODELS: []
diff --git a/models/bronze/overflow/bronze__overflowed_logs.sql b/models/bronze/overflow/bronze__overflowed_logs.sql
new file mode 100644
index 0000000..42f2283
--- /dev/null
+++ b/models/bronze/overflow/bronze__overflowed_logs.sql
@@ -0,0 +1,80 @@
+{{ config (
+    materialized = "view",
+    tags = ['overflowed_receipts']
+) }}
+
+{% for item in range(
+        1,
+        21
+    ) %}
+
+    SELECT
+        o.file_name,
+        f.block_number,
+        f.index_vals,
+        f.path,
+        f.key,
+        f.value_
+    FROM
+        (
+            SELECT
+                file_name,
+                file_url,
+                index_cols,
+                [overflowed_block, overflowed_tx] AS index_vals
+            FROM
+                (
+                    SELECT
+                        block_number,
+                        POSITION,
+                        file_name,
+                        file_url,
+                        index_cols,
+                        VALUE [0] AS overflowed_block,
+                        VALUE [1] AS overflowed_tx,
+                        block_number = overflowed_block
+                        AND POSITION = overflowed_tx AS missing
+                    FROM
+                        (
+                            SELECT
+                                block_number,
+                                POSITION,
+                                file_name,
+                                file_url,
+                                index_cols,
+                                utils.udf_detect_overflowed_responses(
+                                    file_url,
+                                    index_cols
+                                ) AS index_vals
+                            FROM
+                                {{ ref("bronze__potential_overflowed_logs") }}
+                            WHERE
+                                row_no = {{ item }}
+                        ),
+                        LATERAL FLATTEN (
+                            input => index_vals
+                        )
+                )
+            WHERE
+                missing = TRUE
+        ) o,
+        TABLE(
+            utils.udtf_flatten_overflowed_responses(
+                o.file_url,
+                o.index_cols,
+                [o.index_vals]
+            )
+        ) f
+    WHERE
+        NOT IS_OBJECT(
+            f.value_
+        )
+        AND NOT IS_ARRAY(
+            f.value_
+        )
+        AND NOT IS_NULL_VALUE(
+            f.value_
+        ) {% if not loop.last %}
+        UNION ALL
+    {% endif %}
+{% endfor %}
diff --git a/models/bronze/overflow/bronze__overflowed_receipts.sql b/models/bronze/overflow/bronze__overflowed_receipts.sql
new file mode 100644
index 0000000..0d221b0
--- /dev/null
+++ b/models/bronze/overflow/bronze__overflowed_receipts.sql
@@ -0,0 +1,80 @@
+{{ config (
+    materialized = "view",
+    tags = ['overflowed_receipts']
+) }}
+
+{% for item in range(
+        1,
+        21
+    ) %}
+
+    SELECT
+        o.file_name,
+        f.block_number,
+        f.index_vals,
+        f.path,
+        f.key,
+        f.value_
+    FROM
+        (
+            SELECT
+                file_name,
+                file_url,
+                index_cols,
+                [overflowed_block, overflowed_tx] AS index_vals
+            FROM
+                (
+                    SELECT
+                        block_number,
+                        POSITION,
+                        file_name,
+                        file_url,
+                        index_cols,
+                        VALUE [0] AS overflowed_block,
+                        VALUE [1] AS overflowed_tx,
+                        block_number = overflowed_block
+                        AND POSITION = overflowed_tx AS missing
+                    FROM
+                        (
+                            SELECT
+                                block_number,
+                                POSITION,
+                                file_name,
+                                file_url,
+                                index_cols,
+                                utils.udf_detect_overflowed_responses(
+                                    file_url,
+                                    index_cols
+                                ) AS index_vals
+                            FROM
+                                {{ ref("bronze__potential_overflowed_receipts") }}
+                            WHERE
+                                row_no = {{ item }}
+                        ),
+                        LATERAL FLATTEN (
+                            input => index_vals
+ ) + ) + WHERE + missing = TRUE + ) o, + TABLE( + utils.udtf_flatten_overflowed_responses( + o.file_url, + o.index_cols, + [o.index_vals] + ) + ) f + WHERE + NOT IS_OBJECT( + f.value_ + ) + AND NOT IS_ARRAY( + f.value_ + ) + AND NOT IS_NULL_VALUE( + f.value_ + ) {% if not loop.last %} + UNION ALL + {% endif %} + {% endfor %} diff --git a/models/bronze/overflow/bronze__potential_overflowed_logs.sql b/models/bronze/overflow/bronze__potential_overflowed_logs.sql new file mode 100644 index 0000000..8185da1 --- /dev/null +++ b/models/bronze/overflow/bronze__potential_overflowed_logs.sql @@ -0,0 +1,45 @@ +{{ config ( + materialized = "view", + tags = ['overflowed_receipts'] +) }} + +WITH missing_txs AS ( + + SELECT + r.block_number, + r.position, + r.tx_hash, + cr.file_name + FROM + {{ ref("silver__receipts") }} + r + JOIN {{ ref("streamline__receipts_complete") }} + cr + ON r.block_number = cr.block_number + LEFT JOIN {{ ref("silver__logs") }} + l + ON r.block_number = l.block_number + AND r.tx_hash = l.tx_hash + WHERE + l.tx_hash IS NULL +) +SELECT + block_number, + POSITION, + tx_hash, + file_name, + build_scoped_file_url( + @streamline.bronze.external_tables, + file_name + ) AS file_url, + ['block_number', 'array_index'] AS index_cols, + ROW_NUMBER() over ( + ORDER BY + block_number ASC, + POSITION ASC + ) AS row_no +FROM + missing_txs +ORDER BY + block_number ASC, + POSITION ASC \ No newline at end of file diff --git a/models/bronze/overflow/bronze__potential_overflowed_receipts.sql b/models/bronze/overflow/bronze__potential_overflowed_receipts.sql new file mode 100644 index 0000000..08bd5b9 --- /dev/null +++ b/models/bronze/overflow/bronze__potential_overflowed_receipts.sql @@ -0,0 +1,68 @@ +{{ config ( + materialized = "view", + tags = ['overflowed_receipts'] +) }} + +WITH impacted_blocks AS ( + + SELECT + VALUE :: INT AS block_number + FROM + ( + SELECT + blocks_impacted_array + FROM + {{ ref("silver_observability__receipts_completeness") }} + ORDER BY + 
test_timestamp DESC + LIMIT + 1 + ), LATERAL FLATTEN ( + input => blocks_impacted_array + ) +), +all_txs AS ( + SELECT + t.block_number, + t.position, + t.tx_hash + FROM + {{ ref("silver__transactions") }} + t + JOIN impacted_blocks USING (block_number) +), +missing_txs AS ( + SELECT + DISTINCT block_number, + POSITION, + file_name + FROM + all_txs + LEFT JOIN {{ ref("silver__receipts") }} + tr USING ( + block_number, + tx_hash + ) + JOIN {{ ref("streamline__receipts_complete") }} USING (block_number) + WHERE + tr.tx_hash IS NULL +) +SELECT + block_number, + POSITION, + file_name, + build_scoped_file_url( + @streamline.bronze.external_tables, + file_name + ) AS file_url, + ['block_number', 'array_index'] AS index_cols, + ROW_NUMBER() over ( + ORDER BY + block_number ASC, + POSITION ASC + ) AS row_no +FROM + missing_txs +ORDER BY + block_number ASC, + POSITION ASC diff --git a/models/silver/_observability/silver_observability__logs_completeness.sql b/models/silver/_observability/silver_observability__logs_completeness.sql index 452478d..19dc74c 100644 --- a/models/silver/_observability/silver_observability__logs_completeness.sql +++ b/models/silver/_observability/silver_observability__logs_completeness.sql @@ -49,7 +49,7 @@ AND ( LATERAL FLATTEN( input => blocks_impacted_array ) - HAVING block_number NOT IN (23411110, 23635928,23635927) + ) ) {% if var('OBSERV_FULL_TEST') %} OR block_number >= 0 diff --git a/models/silver/_observability/silver_observability__receipts_completeness.sql b/models/silver/_observability/silver_observability__receipts_completeness.sql index 638e073..f9fb8aa 100644 --- a/models/silver/_observability/silver_observability__receipts_completeness.sql +++ b/models/silver/_observability/silver_observability__receipts_completeness.sql @@ -49,7 +49,7 @@ AND ( LATERAL FLATTEN( input => blocks_impacted_array ) - HAVING block_number NOT IN (23411110, 23635928,23635927) + ) ) {% if var('OBSERV_FULL_TEST') %} OR block_number >= 0 diff --git 
a/models/silver/core/silver__logs.sql b/models/silver/core/silver__logs.sql index 781a667..64cbc7a 100644 --- a/models/silver/core/silver__logs.sql +++ b/models/silver/core/silver__logs.sql @@ -1,11 +1,13 @@ +{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_RECEIPTS') else target.warehouse %} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = "block_number", cluster_by = "block_timestamp::date, _inserted_timestamp::date", post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION", - tags = ['core','non_realtime'], - full_refresh = false + tags = ['core','non_realtime','overflowed_receipts'], + full_refresh = false, + snowflake_warehouse = warehouse ) }} WITH base AS ( @@ -135,6 +137,78 @@ missing_data AS ( t.is_pending ) {% endif %}, +{% if is_incremental() and var( + 'OVERFLOWED_RECEIPTS', +) %} +overflowed_logs AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + origin_from_address, + origin_to_address, + origin_function_signature, + tx_status, + contract_address, + block_hash, + DATA, + event_index, + event_removed, + topics, + _inserted_timestamp, + CONCAT( + tx_hash :: STRING, + '-', + event_index :: STRING + ) AS _log_id, + CASE + WHEN block_timestamp IS NULL + OR origin_function_signature IS NULL THEN TRUE + ELSE FALSE + END AS is_pending + FROM + {{ source( + 'base_silver', + 'overflowed_logs' + ) }} + -- source works around circular dependency + LEFT JOIN {{ ref('silver__transactions') }} + txs USING ( + block_number, + tx_hash + ) +), +existing_blocks AS ( + SELECT + block_number, + block_timestamp, + tx_hash, + origin_from_address, + origin_to_address, + origin_function_signature, + tx_status, + contract_address, + block_hash, + DATA, + event_index, + event_removed, + topics, + _inserted_timestamp, + _log_id, + is_pending + FROM + {{ this }} + JOIN ( + SELECT + DISTINCT block_number + FROM + overflowed_logs + ) USING ( + block_number + ) +), +{% endif %} + FINAL AS ( SELECT block_number, @@ 
-144,7 +218,6 @@ FINAL AS ( origin_to_address, origin_function_signature, tx_status, - {#tx_position,#} -- new contract_address, block_hash, DATA, @@ -167,7 +240,6 @@ SELECT origin_to_address, origin_function_signature, tx_status, - {#tx_position,#} -- new contract_address, block_hash, DATA, @@ -180,6 +252,51 @@ SELECT FROM missing_data {% endif %} + +{% if is_incremental() and var( + 'OVERFLOWED_RECEIPTS', +) %} +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + origin_from_address, + origin_to_address, + origin_function_signature, + tx_status, + contract_address, + block_hash, + DATA, + event_index, + event_removed, + topics, + _inserted_timestamp, + _log_id, + is_pending +FROM + overflowed_logs +UNION ALL +SELECT + block_number, + block_timestamp, + tx_hash, + origin_from_address, + origin_to_address, + origin_function_signature, + tx_status, + contract_address, + block_hash, + DATA, + event_index, + event_removed, + topics, + _inserted_timestamp, + _log_id, + is_pending +FROM + existing_blocks +{% endif %} ) SELECT *, diff --git a/models/silver/core/silver__receipts.sql b/models/silver/core/silver__receipts.sql index 80f73db..2c01f24 100644 --- a/models/silver/core/silver__receipts.sql +++ b/models/silver/core/silver__receipts.sql @@ -1,12 +1,14 @@ -- depends_on: {{ ref('bronze__receipts') }} +{% set warehouse = 'DBT_SNOWPARK' if var('OVERFLOWED_RECEIPTS') else target.warehouse %} {{ config( materialized = 'incremental', incremental_strategy = 'delete+insert', unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(block_hash, tx_hash, from_address, to_address)", - tags = ['core','non_realtime'], - full_refresh = false + tags = ['core','non_realtime','overflowed_receipts'], + full_refresh = false, + snowflake_warehouse = warehouse ) }} WITH base AS ( @@ -119,10 +121,175 @@ FINAL AS ( FROM base ) +{% if is_incremental() and var( + 'OVERFLOWED_RECEIPTS', +) %}, 
+overflowed_receipts AS (
+    SELECT
+        block_number,
+        block_hash,
+        blockNumber,
+        cumulative_gas_used,
+        effective_gas_price,
+        from_address,
+        gas_used,
+        [] :: variant AS logs,
+        logs_bloom,
+        status,
+        tx_success,
+        tx_status,
+        to_address1,
+        to_address,
+        tx_hash,
+        POSITION,
+        TYPE
+    FROM
+        {{ source(
+            'base_silver',
+            'overflowed_receipts'
+        ) }}
+        -- source works around circular dependency
+),
+existing_blocks AS (
+    SELECT
+        block_number,
+        block_hash,
+        blockNumber,
+        cumulative_gas_used,
+        effective_gas_price,
+        from_address,
+        gas_used,
+        logs,
+        logs_bloom,
+        status,
+        tx_success,
+        tx_status,
+        to_address1,
+        to_address,
+        tx_hash,
+        POSITION,
+        TYPE,
+        _inserted_timestamp
+    FROM
+        {{ this }}
+        INNER JOIN (
+            SELECT
+                DISTINCT block_number
+            FROM
+                overflowed_receipts
+        ) USING(block_number)
+),
+final_overflowed AS (
+    SELECT
+        block_number,
+        block_hash,
+        blockNumber,
+        cumulative_gas_used,
+        effective_gas_price,
+        from_address,
+        gas_used,
+        logs,
+        logs_bloom,
+        status,
+        tx_success,
+        tx_status,
+        to_address1,
+        to_address,
+        tx_hash,
+        POSITION,
+        TYPE,
+        _inserted_timestamp,
+        FALSE AS overflowed
+    FROM
+        FINAL
+    UNION ALL
+    SELECT
+        block_number,
+        block_hash,
+        blockNumber,
+        cumulative_gas_used,
+        effective_gas_price,
+        from_address,
+        gas_used,
+        logs,
+        logs_bloom,
+        status,
+        tx_success,
+        tx_status,
+        to_address1,
+        to_address,
+        tx_hash,
+        POSITION,
+        TYPE,
+        _inserted_timestamp,
+        FALSE AS overflowed
+    FROM
+        existing_blocks
+    UNION ALL
+    SELECT
+        block_number,
+        block_hash,
+        blockNumber,
+        cumulative_gas_used,
+        effective_gas_price,
+        from_address,
+        gas_used,
+        logs,
+        logs_bloom,
+        status,
+        tx_success,
+        tx_status,
+        to_address1,
+        to_address,
+        tx_hash,
+        POSITION,
+        TYPE,
+        _inserted_timestamp,
+        TRUE AS overflowed
+    FROM
+        overflowed_receipts
+        INNER JOIN (
+            SELECT
+                block_number,
+                MAX(_inserted_timestamp) AS _inserted_timestamp
+            FROM
+                existing_blocks
+            GROUP BY
+                block_number
+        ) USING(
+            block_number
+        )
+)
+{% endif %}
 SELECT
-    *
+    block_number,
+    block_hash,
+    blockNumber,
+    cumulative_gas_used,
+    effective_gas_price,
+    from_address,
+    gas_used,
+    logs,
+    logs_bloom,
+    status,
+    tx_success,
+    tx_status,
+    to_address1,
+    to_address,
+    tx_hash,
+    POSITION,
+    TYPE,
+    _inserted_timestamp,
+    -- FIX(review): FINAL (else branch) has no `overflowed` column -- an
+    -- unconditional `overflowed` here breaks every normal incremental run.
+    {% if is_incremental() and var('OVERFLOWED_RECEIPTS') %} overflowed {% else %} FALSE AS overflowed {% endif %}
 FROM
+
+{% if is_incremental() and var(
+    'OVERFLOWED_RECEIPTS',
+) %}
+final_overflowed
+{% else %}
 FINAL
+{% endif %}
 WHERE
     tx_hash IS NOT NULL
     AND POSITION IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY block_number, POSITION
diff --git a/models/silver/overflow/run_models/silver__run_overflowed_receipts.sql b/models/silver/overflow/run_models/silver__run_overflowed_receipts.sql
new file mode 100644
index 0000000..1b3ee46
--- /dev/null
+++ b/models/silver/overflow/run_models/silver__run_overflowed_receipts.sql
@@ -0,0 +1,52 @@
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'test_timestamp',
+    tags = ['observability']
+) }}
+
+WITH base AS (
+
+    SELECT
+        blocks_impacted_count
+    FROM
+        {{ ref('silver_observability__receipts_completeness') }}
+    WHERE
+        test_timestamp > DATEADD('day', -5, CURRENT_TIMESTAMP())
+    ORDER BY
+        test_timestamp DESC
+    LIMIT
+        1), run_model AS (
+    SELECT
+        blocks_impacted_count,
+        github_actions.workflow_dispatches(
+            'FlipsideCrypto',
+            -- FIX(review): was 'bsc-models' (copy-paste from the BSC repo);
+            -- the workflow added by this change lives in base-models.
+            'base-models',
+            'dbt_run_overflowed_receipts.yml',
+            NULL
+        ) AS run_overflow_models
+    FROM
+        base
+    WHERE
+        blocks_impacted_count > 0
+)
+SELECT
+    dummy,
+    COALESCE(
+        blocks_impacted_count,
+        0
+    ) AS blocks_impacted_count,
+    COALESCE(
+        run_overflow_models,
+        OBJECT_CONSTRUCT(
+            'status',
+            'skipped'
+        )
+    ) AS run_overflow_models,
+    SYSDATE() AS test_timestamp
+FROM
+    (
+        SELECT
+            1 AS dummy
+    )
+    LEFT JOIN run_model
+    ON 1 = 1
diff --git a/models/silver/overflow/silver__overflowed_receipts.sql b/models/silver/overflow/silver__overflowed_receipts.sql
new file mode 100644
index 0000000..aa0dde8
--- /dev/null
+++ b/models/silver/overflow/silver__overflowed_receipts.sql
@@ -0,0 +1,66 @@
+{{ config (
+    materialized = 'view',
+    tags = ['overflowed_receipts']
+) }}
+
+WITH base AS (
+
+    SELECT
+        block_number,
+        index_vals [1] :: INT AS tx_position,
+        OBJECT_AGG(
+            key,
+            value_
+        ) AS DATA
+    FROM
+        {{ ref("bronze__overflowed_receipts") }}
+    WHERE
+        path NOT LIKE '%[%'
+    GROUP BY
+        ALL
+)
+SELECT
+    block_number,
+    DATA :blockHash :: STRING AS block_hash,
+    utils.udf_hex_to_int(
+        DATA :blockNumber :: STRING
+    ) :: INT AS blockNumber,
+    utils.udf_hex_to_int(
+        DATA :cumulativeGasUsed :: STRING
+    ) :: INT AS cumulative_gas_used,
+    utils.udf_hex_to_int(
+        DATA :effectiveGasPrice :: STRING
+    ) :: INT / pow(
+        10,
+        9
+    ) AS effective_gas_price,
+    DATA :from :: STRING AS from_address,
+    utils.udf_hex_to_int(
+        DATA :gasUsed :: STRING
+    ) :: INT AS gas_used,
+    DATA :logsBloom :: STRING AS logs_bloom,
+    utils.udf_hex_to_int(
+        DATA :status :: STRING
+    ) :: INT AS status,
+    CASE
+        WHEN status = 1 THEN TRUE
+        ELSE FALSE
+    END AS tx_success,
+    CASE
+        WHEN status = 1 THEN 'SUCCESS'
+        ELSE 'FAIL'
+    END AS tx_status,
+    DATA :to :: STRING AS to_address1,
+    CASE
+        WHEN to_address1 = '' THEN NULL
+        ELSE to_address1
+    END AS to_address,
+    DATA :transactionHash :: STRING AS tx_hash,
+    utils.udf_hex_to_int(
+        DATA :transactionIndex :: STRING
+    ) :: INT AS POSITION,
+    utils.udf_hex_to_int(
+        DATA :type :: STRING
+    ) :: INT AS TYPE
+FROM
+    base
diff --git a/models/sources.yml b/models/sources.yml
index 997e13c..834ba0b 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -69,8 +69,11 @@ sources:
     database: "{{ 'base' if target.database == 'BASE' else 'base_dev' }}"
     schema: silver
     tables:
-      - name: overflowed_traces
       - name: verified_abis
+      - name: overflowed_traces
+      - name: overflowed_traces2
+      - name: overflowed_receipts
+      - name: overflowed_logs
   - name: base_gold
     database: "{{ 'base' if target.database == 'BASE' else 'base_dev' }}"
     schema: core