From e307841e9171958150bf26f72efe723574221813 Mon Sep 17 00:00:00 2001 From: Austin Date: Tue, 17 Jun 2025 12:03:47 -0400 Subject: [PATCH] 2.0 --- ...treamline_realtime_blocks_transactions.yml | 2 +- ...=> dbt_run_streamline_realtime_step_2.yml} | 6 +-- .github/workflows/dbt_test_daily.yml | 1 - .github/workflows/dbt_test_tasks.yml | 51 ------------------ .github/workflows/dbt_test_weekly.yml | 1 - data/github_actions__workflows.csv | 4 +- models/bronze/core/bronze__receipts.sql | 40 ++++++++++++++ models/bronze/core/bronze__receipts_fr.sql | 28 ++++++++++ models/bronze/core/bronze__receipts_fr_v2.sql | 40 ++++++++++++++ models/bronze/core/bronze__traces.sql | 40 ++++++++++++++ models/bronze/core/bronze__traces_fr.sql | 28 ++++++++++ models/bronze/core/bronze__traces_fr_v2.sql | 40 ++++++++++++++ models/silver/core/silver__blocks.sql | 4 +- models/silver/core/silver__transactions.sql | 2 +- .../complete/streamline__complete_blocks.sql | 9 +++- .../complete/streamline__complete_traces.sql | 9 +++- .../streamline__complete_transactions.sql | 10 ++-- .../streamline__complete_tx_receipts.sql | 10 +++- ...treamline__blocks_transactions_history.sql | 48 +++++++++++++++++ .../streamline__receipts_by_hash_history.sql | 53 +++++++++++++++++++ .../streamline__traces_by_hash_history.sql | 52 ++++++++++++++++++ .../history/streamline__traces_history.sql | 40 -------------- ...reamline__blocks_transactions_realtime.sql | 27 ++++++++-- .../streamline__receipts_by_hash_realtime.sql | 33 +++++++++--- .../streamline__traces_by_hash_realtime.sql | 33 +++++++++--- .../silver/streamline/streamline__blocks.sql | 2 +- .../streamline/streamline__get_chainhead.sql | 2 +- 27 files changed, 483 insertions(+), 132 deletions(-) rename .github/workflows/{dbt_run_streamline_realtime_tx_receipts.yml => dbt_run_streamline_realtime_step_2.yml} (88%) delete mode 100644 .github/workflows/dbt_test_tasks.yml create mode 100644 models/silver/streamline/history/streamline__blocks_transactions_history.sql create mode 100644 models/silver/streamline/history/streamline__receipts_by_hash_history.sql create mode 100644 models/silver/streamline/history/streamline__traces_by_hash_history.sql delete mode 100644 models/silver/streamline/history/streamline__traces_history.sql diff --git a/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml b/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml index d9b25c4..b805df5 100644 --- a/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml +++ b/.github/workflows/dbt_run_streamline_realtime_blocks_transactions.yml @@ -45,7 +45,7 @@ jobs: - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/streamline__blocks_realtime.sql 1+models/silver/streamline/realtime/streamline__transactions_realtime.sql + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_realtime - name: Store logs uses: actions/upload-artifact@v4 diff --git a/.github/workflows/dbt_run_streamline_realtime_tx_receipts.yml b/.github/workflows/dbt_run_streamline_realtime_step_2.yml similarity index 88% rename from .github/workflows/dbt_run_streamline_realtime_tx_receipts.yml rename to .github/workflows/dbt_run_streamline_realtime_step_2.yml index e9ac3f7..2716fbf 100644 --- a/.github/workflows/dbt_run_streamline_realtime_tx_receipts.yml +++ b/.github/workflows/dbt_run_streamline_realtime_step_2.yml @@ -1,5 +1,5 @@ -name: dbt_run_streamline_realtime_tx_receipts -run-name: dbt_run_streamline_realtime_tx_receipts +name: dbt_run_streamline_realtime_step_2 +run-name: dbt_run_streamline_realtime_step_2 on: workflow_dispatch: @@ -45,7 +45,7 @@ jobs: - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/realtime/streamline__tx_receipts_realtime.sql + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m tag:streamline_core_evm_realtime_step_2 - name: Store logs uses: actions/upload-artifact@v4 diff --git a/.github/workflows/dbt_test_daily.yml b/.github/workflows/dbt_test_daily.yml index 96befdc..617975b 100644 --- a/.github/workflows/dbt_test_daily.yml +++ b/.github/workflows/dbt_test_daily.yml @@ -46,7 +46,6 @@ jobs: - name: Run DBT Jobs run: | dbt test --exclude tag:full_test --models tag:recent_test - continue-on-error: true notify-failure: needs: [run_dbt_jobs] diff --git a/.github/workflows/dbt_test_tasks.yml b/.github/workflows/dbt_test_tasks.yml deleted file mode 100644 index bcfc912..0000000 --- a/.github/workflows/dbt_test_tasks.yml +++ /dev/null @@ -1,51 +0,0 @@ -name: dbt_test_tasks - -on: - workflow_call: - -env: - USE_VARS: "${{ vars.USE_VARS }}" - DBT_VERSION: "${{ vars.DBT_VERSION }}" - DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - SCHEMA: "${{ vars.SCHEMA }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - -concurrency: - group: ${{ github.workflow }} - cancel-in-progress: false - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v4 - with: - python-version: "3.10" - cache: "pip" - - - name: install dependencies - run: | - pip install -r requirements.txt - dbt deps - - - name: Run DBT Jobs - run: | - dbt test -m models/github_actions/github_actions__current_task_status.sql - - notify-failure: - needs: [run_dbt_jobs] - if: failure() - uses: ./.github/workflows/slack_notify.yml - secrets: - EVM_SLACK_WEBHOOK_URL: ${{ secrets.EVM_SLACK_WEBHOOK_URL }} \ No newline at end of file diff --git a/.github/workflows/dbt_test_weekly.yml b/.github/workflows/dbt_test_weekly.yml index 5af2efe..f69436b 100644 --- a/.github/workflows/dbt_test_weekly.yml +++ b/.github/workflows/dbt_test_weekly.yml @@ -46,7 +46,6 @@ jobs: - name: Run DBT Jobs run: | dbt test --exclude tag:recent_test --models tag:full_test - continue-on-error: true notify-failure: needs: [run_dbt_jobs] diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index f77a3a3..77032aa 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -1,5 +1,5 @@ workflow_name,workflow_schedule dbt_run_streamline_realtime_blocks_transactions,"12,42 * * * *" -dbt_run_streamline_realtime_tx_receipts,"17,47 * * * *" +dbt_run_streamline_realtime_step_2,"20,52 * * * *" dbt_run_scheduled,"25,55 * * * *" -dbt_test_tasks,"28,58 * * * *" +dbt_test_tasks,"28,58 * * * *" \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts.sql b/models/bronze/core/bronze__receipts.sql index e69de29..bf8c3ef 100644 --- a/models/bronze/core/bronze__receipts.sql +++ b/models/bronze/core/bronze__receipts.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "receipts_by_hash") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash + FROM + {{ source( "bronze_streamline", "receipts_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts_fr.sql b/models/bronze/core/bronze__receipts_fr.sql index e69de29..703c3fd 100644 --- a/models/bronze/core/bronze__receipts_fr.sql +++ b/models/bronze/core/bronze__receipts_fr.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__receipts_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__receipts_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__receipts_fr_v2.sql b/models/bronze/core/bronze__receipts_fr_v2.sql index e69de29..6a16443 100644 --- a/models/bronze/core/bronze__receipts_fr_v2.sql +++ b/models/bronze/core/bronze__receipts_fr_v2.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "receipts_by_hash") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash +FROM + {{ source( "bronze_streamline", "receipts_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__traces.sql b/models/bronze/core/bronze__traces.sql index e69de29..dc0bca8 100644 --- a/models/bronze/core/bronze__traces.sql +++ b/models/bronze/core/bronze__traces.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER) AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "traces_by_hash") }}' + ) + ) A +) + SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash + FROM + {{ source( "bronze_streamline", "traces_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/bronze/core/bronze__traces_fr.sql b/models/bronze/core/bronze__traces_fr.sql index e69de29..b420e1c 100644 --- a/models/bronze/core/bronze__traces_fr.sql +++ b/models/bronze/core/bronze__traces_fr.sql @@ -0,0 +1,28 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + +SELECT + partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__traces_fr_v2') }} +UNION ALL +SELECT + _partition_by_block_id AS partition_key, + block_number, + tx_hash, + VALUE, + DATA, + metadata, + file_name, + _inserted_timestamp +FROM + {{ ref('bronze__traces_fr_v1') }} \ No newline at end of file diff --git a/models/bronze/core/bronze__traces_fr_v2.sql b/models/bronze/core/bronze__traces_fr_v2.sql index e69de29..5ecc5eb 100644 --- a/models/bronze/core/bronze__traces_fr_v2.sql +++ b/models/bronze/core/bronze__traces_fr_v2.sql @@ -0,0 +1,40 @@ +{{ config ( + materialized = 'view', + tags = ['bronze','core','streamline_v1','phase_1'] +) }} + + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER ) AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "traces_by_hash") }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + b._inserted_timestamp, + COALESCE( + s.value :"BLOCK_NUMBER" :: STRING, + s.value :"block_number" :: STRING, + s.metadata :request :"data" :id :: STRING, + PARSE_JSON( + s.metadata :request :"data" + ) :id :: STRING + ) :: INT AS block_number, + s.value: "TX_HASH" :: STRING AS tx_hash +FROM + {{ source( "bronze_streamline", "traces_by_hash") }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + AND DATA IS NOT NULL \ No newline at end of file diff --git a/models/silver/core/silver__blocks.sql b/models/silver/core/silver__blocks.sql index 301667d..3931375 100644 --- a/models/silver/core/silver__blocks.sql +++ b/models/silver/core/silver__blocks.sql @@ -4,7 +4,7 @@ incremental_strategy = 'delete+insert', unique_key = "block_number", cluster_by = "block_timestamp::date", - tags = ['core'] + tags = ['core','streamline_core_evm_realtime'] ) }} SELECT @@ -49,7 +49,7 @@ SELECT DATA :result :logsBloom :: STRING AS logs_bloom, DATA :result :stateRoot :: STRING AS state_root, DATA :result :transactionsRoot :: STRING AS transactions_root, - _partition_by_block_id, + partition_key as _partition_by_block_id, _inserted_timestamp, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 972c06a..104ccd1 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -4,7 +4,7 @@ incremental_strategy = 'delete+insert', unique_key = "tx_id", cluster_by = "block_timestamp::date, _inserted_timestamp::date", - tags = ['core'] + tags = ['core','streamline_core_evm_realtime'] ) }} WITH base AS ( diff --git a/models/silver/streamline/complete/streamline__complete_blocks.sql b/models/silver/streamline/complete/streamline__complete_blocks.sql index 31cc0c4..ef189ee 100644 --- a/models/silver/streamline/complete/streamline__complete_blocks.sql +++ b/models/silver/streamline/complete/streamline__complete_blocks.sql @@ -3,11 +3,16 @@ materialized = "incremental", unique_key = "id", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS id, block_number, _inserted_timestamp, DATA :result :transactions AS transactions diff --git a/models/silver/streamline/complete/streamline__complete_traces.sql b/models/silver/streamline/complete/streamline__complete_traces.sql index b1f87ea..6a5c6b7 100644 --- a/models/silver/streamline/complete/streamline__complete_traces.sql +++ b/models/silver/streamline/complete/streamline__complete_traces.sql @@ -3,11 +3,16 @@ materialized = "incremental", unique_key = "id", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime_step_2'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(tx_hash, '')) AS text), '' :: STRING) AS text + ) + ) AS id, block_number, tx_hash, _inserted_timestamp diff --git a/models/silver/streamline/complete/streamline__complete_transactions.sql b/models/silver/streamline/complete/streamline__complete_transactions.sql index 05b8770..95a4dd4 100644 --- a/models/silver/streamline/complete/streamline__complete_transactions.sql +++ b/models/silver/streamline/complete/streamline__complete_transactions.sql @@ -3,13 +3,17 @@ materialized = "incremental", unique_key = "id", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS id, block_number, - tx_hash, _inserted_timestamp FROM diff --git a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql index 0b63985..6205e4f 100644 --- a/models/silver/streamline/complete/streamline__complete_tx_receipts.sql +++ b/models/silver/streamline/complete/streamline__complete_tx_receipts.sql @@ -2,11 +2,17 @@ {{ config ( materialized = "incremental", unique_key = "id", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_core_evm_realtime_step_2'] ) }} SELECT - id, + MD5( + CAST( + COALESCE(CAST(CONCAT(block_number, '_-_', COALESCE(tx_hash, '')) AS text), '' :: STRING) AS text + ) + ) AS id, + block_number, tx_hash, _inserted_timestamp FROM diff --git a/models/silver/streamline/history/streamline__blocks_transactions_history.sql b/models/silver/streamline/history/streamline__blocks_transactions_history.sql new file mode 100644 index 0000000..1126f26 --- /dev/null +++ b/models/silver/streamline/history/streamline__blocks_transactions_history.sql @@ -0,0 +1,48 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'blocks_transactions', + "sql_limit" :"120000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", + "sql_source" :'{{this.identifier}}', + "exploded_key": tojson(['result', 'result.transactions']) } + ), + tags = ['streamline_core_evm_history'] +) }} + +with blocks as ( + select + block_number + from {{ ref('streamline__blocks') }} + except + select + block_number + from {{ ref('streamline__complete_blocks') }} + inner join {{ ref('streamline__complete_transactions') }} using (block_number) +) +SELECT + block_number, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getBlockByNumber', + 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request +from blocks + +order by block_number desc + +limit 120000 \ No newline at end of file diff --git a/models/silver/streamline/history/streamline__receipts_by_hash_history.sql b/models/silver/streamline/history/streamline__receipts_by_hash_history.sql new file mode 100644 index 0000000..d57701e --- /dev/null +++ b/models/silver/streamline/history/streamline__receipts_by_hash_history.sql @@ -0,0 +1,53 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :'receipts_by_hash', + "sql_limit" :"120000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", + "sql_source" :'{{this.identifier}}' } + ), + tags = ['streamline_core_evm_history'] +) }} + +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_tx_receipts') }} +) + +SELECT + block_number, + tx_hash, + ROUND( + block_number, + -3 + ) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'eth_getTransactionReceipt', + 'params', ARRAY_CONSTRUCT(tx_hash) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request +from txs + +order by block_number desc + +limit 120000 \ No newline at end of file diff --git a/models/silver/streamline/history/streamline__traces_by_hash_history.sql b/models/silver/streamline/history/streamline__traces_by_hash_history.sql new file mode 100644 index 0000000..b3c71f3 --- /dev/null +++ b/models/silver/streamline/history/streamline__traces_by_hash_history.sql @@ -0,0 +1,52 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ + "external_table" :'traces_by_hash', + "sql_limit" :"120000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", + "sql_source" :'{{this.identifier}}' + } + ), + tags = ['streamline_core_evm_history'] +) }} + +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_traces') }} +) + +SELECT + block_number, + tx_hash, + ROUND(block_number, -3) AS partition_key, + live.udf_api( + 'POST', + '{URL}', + OBJECT_CONSTRUCT( + 'Content-Type', 'application/json', + 'fsc-quantum-state', 'streamline' + ), + OBJECT_CONSTRUCT( + 'id', block_number, + 'jsonrpc', '2.0', + 'method', 'debug_traceTransaction', + 'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')) + ), + 'Vault/prod/evm/aurora/mainnet' + ) AS request +from txs + +order by block_number desc + +limit 120000 \ No newline at end of file diff --git a/models/silver/streamline/history/streamline__traces_history.sql b/models/silver/streamline/history/streamline__traces_history.sql deleted file mode 100644 index 4909cdf..0000000 --- a/models/silver/streamline/history/streamline__traces_history.sql +++ /dev/null @@ -1,40 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'producer_batch_size',20000, 'producer_limit_size',5000000, 'worker_batch_size',200))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH tbl AS ( - - SELECT - block_number, - tx_hash - FROM - {{ ref("silver__transactions") }} - WHERE - block_number IS NOT NULL - AND tx_hash IS NOT NULL - EXCEPT - SELECT - block_number, - tx_hash - FROM - {{ ref("streamline__complete_traces") }} - WHERE - block_number IS NOT NULL - AND tx_hash IS NOT NULL -) -SELECT - block_number, - 'debug_traceTransaction' AS method, - CONCAT( - tx_hash, - '_-_', - '{"tracer": "callTracer","timeout": "30s"}' - ) AS params -FROM - tbl -ORDER BY - block_number ASC diff --git a/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql index 8ceb652..dacc989 100644 --- a/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql +++ b/models/silver/streamline/realtime/streamline__blocks_transactions_realtime.sql @@ -4,17 +4,29 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :'blocks_transactions', - "sql_limit" :"30000", - "producer_batch_size" :"30000", - "worker_batch_size" :"10000", + "sql_limit" :"12000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", "sql_source" :'{{this.identifier}}', "exploded_key": tojson(['result', 'result.transactions']) } ), tags = ['streamline_core_evm_realtime'] ) }} +with blocks as ( + select + block_number + from {{ ref('streamline__blocks') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) + except + select + block_number + from {{ ref('streamline__complete_blocks') }} + inner join {{ ref('streamline__complete_transactions') }} using (block_number) + where block_number >= (select block_number from {{ ref('_block_lookback') }}) +) SELECT - 150815373 as block_number, + block_number, ROUND(block_number, -3) AS partition_key, live.udf_api( 'POST', @@ -30,4 +42,9 @@ SELECT 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE) ), 'Vault/prod/evm/aurora/mainnet' - ) AS request \ No newline at end of file + ) AS request +from blocks + +order by block_number asc + +limit 12000 \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql index b92808c..f2a4eaa 100644 --- a/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql +++ b/models/silver/streamline/realtime/streamline__receipts_by_hash_realtime.sql @@ -4,17 +4,31 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :'receipts_by_hash', - "sql_limit" :"30000", - "producer_batch_size" :"30000", - "worker_batch_size" :"10000", + "sql_limit" :"12000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", "sql_source" :'{{this.identifier}}' } ), - tags = ['streamline','core','realtime','phase_1'] + tags = ['streamline_core_evm_realtime_step_2'] ) }} +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_tx_receipts') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) +) + SELECT - 150949168 as block_number, - '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + block_number, + tx_hash, ROUND( block_number, -3 @@ -33,4 +47,9 @@ SELECT 'params', ARRAY_CONSTRUCT(tx_hash) ), 'Vault/prod/evm/aurora/mainnet' - ) AS request \ No newline at end of file + ) AS request +from txs + +order by block_number asc + +limit 12000 \ No newline at end of file diff --git a/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql b/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql index 0306455..bbdf617 100644 --- a/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql +++ b/models/silver/streamline/realtime/streamline__traces_by_hash_realtime.sql @@ -5,18 +5,32 @@ target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :'traces_by_hash', - "sql_limit" :"30000", - "producer_batch_size" :"30000", - "worker_batch_size" :"10000", + "sql_limit" :"12000", + "producer_batch_size" :"12000", + "worker_batch_size" :"4000", "sql_source" :'{{this.identifier}}' } ), - tags = ['streamline_core_evm_realtime'] + tags = ['streamline_core_evm_realtime_step_2'] ) }} +with txs as ( + select + block_number, + tx_hash + from {{ ref('silver__transactions') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) + except + select + block_number, + tx_hash + from {{ ref('streamline__complete_traces') }} + where block_number >= (select block_number from {{ ref('_block_lookback') }}) +) + SELECT - 150949168 as block_number, - '0x967586085f70584ab5e5886ba9c7d1b2f227f554ed594a491978e4a2110bfdd7' as tx_hash, + block_number, + tx_hash, ROUND(block_number, -3) AS partition_key, live.udf_api( 'POST', @@ -32,4 +46,9 @@ SELECT 'params', ARRAY_CONSTRUCT(tx_hash, OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '120s')) ), 'Vault/prod/evm/aurora/mainnet' - ) AS request \ No newline at end of file + ) AS request +from txs + +order by block_number asc + +limit 12000 \ No newline at end of file diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/streamline__blocks.sql index 2445bdd..5eb7a8f 100644 --- a/models/silver/streamline/streamline__blocks.sql +++ b/models/silver/streamline/streamline__blocks.sql @@ -1,6 +1,6 @@ {{ config ( materialized = "view", - tags = ['streamline','core','chainhead','phase_1'] + tags = ['streamline_core_evm_realtime'] ) }} SELECT diff --git a/models/silver/streamline/streamline__get_chainhead.sql b/models/silver/streamline/streamline__get_chainhead.sql index 3965979..9bfdc74 100644 --- a/models/silver/streamline/streamline__get_chainhead.sql +++ b/models/silver/streamline/streamline__get_chainhead.sql @@ -1,6 +1,6 @@ {{ config ( materialized = 'table', - tags = ['streamline','core','chainhead','phase_1'] + tags = ['streamline_core_evm_realtime'] ) }} SELECT