From 05da02b5feb44b7e0ea62d4091e684b5e892c628 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Tue, 16 Sep 2025 20:32:27 -0600 Subject: [PATCH] AN-6550/flow testnet v2 (#483) * flow testnet v2 * set batch sizes * add QN node_url to sl testnet model * upd node_url in testnet rt models * add gha workflows for testnet * testnet gold models * upd tag on gold * upd tests * upd tests and set min to 280mm * upd test - rm null --------- Co-authored-by: shah --- .../workflows/dbt_run_scheduled_testnet.yml | 53 +++++ .../dbt_run_streamline_testnet_blocks.yml | 53 +++++ ...dbt_run_streamline_testnet_collections.yml | 53 +++++ ...streamline_testnet_transaction_results.yml | 53 +++++ ...bt_run_streamline_testnet_transactions.yml | 53 +++++ Makefile | 8 + .../bronze__streamline_fr_testnet_blocks.sql | 34 +-- ...nze__streamline_fr_testnet_collections.sql | 9 +- ...eamline_fr_testnet_transaction_results.sql | 9 +- ...ze__streamline_fr_testnet_transactions.sql | 9 +- .../bronze__streamline_testnet_blocks.sql | 35 +--- ...bronze__streamline_testnet_collections.sql | 9 +- ...streamline_testnet_transaction_results.sql | 9 +- ...ronze__streamline_testnet_transactions.sql | 10 +- models/gold/testnet/testnet__fact_blocks.sql | 23 +++ models/gold/testnet/testnet__fact_blocks.yml | 89 ++++++++ models/gold/testnet/testnet__fact_events.sql | 23 +++ models/gold/testnet/testnet__fact_events.yml | 87 ++++++++ .../testnet/testnet__fact_transactions.sql | 37 ++++ .../testnet/testnet__fact_transactions.yml | 127 ++++++++++++ .../silver/testnet/silver__testnet_blocks.sql | 144 +++++++++++++ .../silver/testnet/silver__testnet_blocks.yml | 55 +++++ .../testnet/silver__testnet_collections.sql | 44 ++++ .../testnet/silver__testnet_collections.yml | 37 ++++ .../silver/testnet/silver__testnet_events.sql | 143 +++++++++++++ .../silver/testnet/silver__testnet_events.yml | 95 +++++++++ .../silver__testnet_transaction_results.sql | 46 +++++ 
.../silver__testnet_transaction_results.yml | 43 ++++ .../testnet/silver__testnet_transactions.sql | 51 +++++ .../testnet/silver__testnet_transactions.yml | 58 ++++++ .../silver__testnet_transactions_final.sql | 195 ++++++++++++++++++ .../silver__testnet_transactions_final.yml | 140 +++++++++++++ models/sources.yml | 4 + ...treamline__get_testnet_blocks_realtime.sql | 10 +- ...line__get_testnet_collections_realtime.sql | 8 +- ...t_testnet_transaction_results_realtime.sql | 8 +- ...ine__get_testnet_transactions_realtime.sql | 8 +- .../core/streamline__testnet_blocks.sql | 2 +- 38 files changed, 1759 insertions(+), 115 deletions(-) create mode 100644 .github/workflows/dbt_run_scheduled_testnet.yml create mode 100644 .github/workflows/dbt_run_streamline_testnet_blocks.yml create mode 100644 .github/workflows/dbt_run_streamline_testnet_collections.yml create mode 100644 .github/workflows/dbt_run_streamline_testnet_transaction_results.yml create mode 100644 .github/workflows/dbt_run_streamline_testnet_transactions.yml create mode 100644 models/gold/testnet/testnet__fact_blocks.sql create mode 100644 models/gold/testnet/testnet__fact_blocks.yml create mode 100644 models/gold/testnet/testnet__fact_events.sql create mode 100644 models/gold/testnet/testnet__fact_events.yml create mode 100644 models/gold/testnet/testnet__fact_transactions.sql create mode 100644 models/gold/testnet/testnet__fact_transactions.yml create mode 100644 models/silver/testnet/silver__testnet_blocks.sql create mode 100644 models/silver/testnet/silver__testnet_blocks.yml create mode 100644 models/silver/testnet/silver__testnet_collections.sql create mode 100644 models/silver/testnet/silver__testnet_collections.yml create mode 100644 models/silver/testnet/silver__testnet_events.sql create mode 100644 models/silver/testnet/silver__testnet_events.yml create mode 100644 models/silver/testnet/silver__testnet_transaction_results.sql create mode 100644 
models/silver/testnet/silver__testnet_transaction_results.yml create mode 100644 models/silver/testnet/silver__testnet_transactions.sql create mode 100644 models/silver/testnet/silver__testnet_transactions.yml create mode 100644 models/silver/testnet/silver__testnet_transactions_final.sql create mode 100644 models/silver/testnet/silver__testnet_transactions_final.yml diff --git a/.github/workflows/dbt_run_scheduled_testnet.yml b/.github/workflows/dbt_run_scheduled_testnet.yml new file mode 100644 index 0000000..f9ab20a --- /dev/null +++ b/.github/workflows/dbt_run_scheduled_testnet.yml @@ -0,0 +1,53 @@ +name: dbt_run_scheduled_testnet +run-name: dbt_run_scheduled_testnet + +on: + workflow_dispatch: + schedule: + # Every hour at minute 40 + - cron: "40 * * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Update evm tables + run: | + dbt run -s tag:testnet + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/dbt_run_streamline_testnet_blocks.yml b/.github/workflows/dbt_run_streamline_testnet_blocks.yml new file mode 100644 index 0000000..d30584e --- /dev/null +++ b/.github/workflows/dbt_run_streamline_testnet_blocks.yml @@ -0,0 +1,53 @@ +name: 
dbt_run_streamline_testnet_blocks +run-name: dbt_run_streamline_testnet_blocks + +on: + workflow_dispatch: + schedule: + # Hourly at minute 0 + - cron: "0 * * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 2+streamline__get_testnet_blocks_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/dbt_run_streamline_testnet_collections.yml b/.github/workflows/dbt_run_streamline_testnet_collections.yml new file mode 100644 index 0000000..dc45574 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_testnet_collections.yml @@ -0,0 +1,53 @@ +name: dbt_run_streamline_testnet_collections +run-name: dbt_run_streamline_testnet_collections + +on: + workflow_dispatch: + schedule: + # Hourly at minute 8 + - cron: "8 * * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + 
+concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 2+streamline__get_testnet_collections_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/dbt_run_streamline_testnet_transaction_results.yml b/.github/workflows/dbt_run_streamline_testnet_transaction_results.yml new file mode 100644 index 0000000..395af0a --- /dev/null +++ b/.github/workflows/dbt_run_streamline_testnet_transaction_results.yml @@ -0,0 +1,53 @@ +name: dbt_run_streamline_testnet_transaction_results +run-name: dbt_run_streamline_testnet_transaction_results + +on: + workflow_dispatch: + schedule: + # Hourly at minute 18 + - cron: "18 * * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 2+streamline__get_testnet_transaction_results_realtime --vars 
'{"STREAMLINE_INVOKE_STREAMS": True, "producer_batch_size": 60000, "worker_batch_size": 2000}' + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/dbt_run_streamline_testnet_transactions.yml b/.github/workflows/dbt_run_streamline_testnet_transactions.yml new file mode 100644 index 0000000..f820549 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_testnet_transactions.yml @@ -0,0 +1,53 @@ +name: dbt_run_streamline_testnet_transactions +run-name: dbt_run_streamline_testnet_transactions + +on: + workflow_dispatch: + schedule: + # Hourly at minute 18 + - cron: "18 * * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 2+streamline__get_testnet_transactions_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/Makefile b/Makefile index ae3c150..750ba92 100644 --- a/Makefile +++ b/Makefile @@ -119,3 +119,11 @@ bronze: --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ --profiles-dir ~/.dbt \ --target $(DBT_TARGET) + +blocks_testnet: + dbt 
run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql \ + --profile flow \ + --target $(DBT_TARGET) \ + --profiles-dir ~/.dbt \ No newline at end of file diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql index 3a15aca..2e13d57 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql @@ -2,33 +2,7 @@ materialized = 'view' ) }} -WITH meta AS ( - SELECT - registered_on AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}' - ) - ) A -) -SELECT - block_number, - DATA, - _inserted_timestamp, - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS _fsc_id, - s._partition_by_block_id, - s.value AS VALUE -FROM - {{ source("bronze_streamline", "testnet_blocks") }} s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id -WHERE - b._partition_by_block_id = s._partition_by_block_id +{{ streamline_external_table_FR_query_v2( + model = 'testnet_blocks_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql index e192cab..a9b00ac 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql +++ 
b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-2:] | join('_') %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_FR_query_v2( + model = 'testnet_collections_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql index 7fbacfa..8596055 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql @@ -2,12 +2,9 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-3:] | join('_') %} -{{ streamline_external_table_FR_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_FR_query_v2( + model = 'testnet_transaction_results_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql index e192cab..e508c7e 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-2:] | join('_') %} -{{ streamline_external_table_FR_query( - 
model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_FR_query_v2( + model = 'testnet_transactions_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql index 24c26fa..636839e 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql @@ -2,34 +2,7 @@ materialized = 'view' ) }} -WITH meta AS ( - SELECT - last_modified AS _inserted_timestamp, - file_name, - CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id - FROM - TABLE( - information_schema.external_table_file_registration_history( - start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), - table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}') - ) A - ) - SELECT - block_number, - DATA, - _inserted_timestamp, - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS _fsc_id, - s._partition_by_block_id, - s.value AS VALUE - FROM - {{ source("bronze_streamline","testnet_blocks") }} s - JOIN meta b - ON b.file_name = metadata$filename - AND b._partition_by_block_id = s._partition_by_block_id - WHERE - b._partition_by_block_id = s._partition_by_block_id - +{{ streamline_external_table_query_v2( + model = "testnet_blocks_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql index 545031e..bc2ff44 100644 --- 
a/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-2:] | join('_') %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_query_v2( + model = 'testnet_collections_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql index bd31738..bccb2ae 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql @@ -2,10 +2,7 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-3:] | join('_') %} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" +{{ streamline_external_table_query_v2( + model = 'testnet_transaction_results_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql index 545031e..16b3c5e 100644 --- a/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql @@ -2,10 +2,8 @@ materialized = 'view' ) }} -{% set model = this.identifier.split("_")[-2:] | join('_') 
%} -{{ streamline_external_table_query( - model, - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", - partition_name = "_partition_by_block_id", - unique_key = "id" + +{{ streamline_external_table_query_v2( + model = 'testnet_transactions_v2', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" ) }} diff --git a/models/gold/testnet/testnet__fact_blocks.sql b/models/gold/testnet/testnet__fact_blocks.sql new file mode 100644 index 0000000..4ab966a --- /dev/null +++ b/models/gold/testnet/testnet__fact_blocks.sql @@ -0,0 +1,23 @@ +{{ config( + materialized = 'view', + tags = ['testnet'] +) }} + +SELECT + block_height :: INT AS block_height, + block_timestamp, + 'testnet' AS network, + network_version, + 'flow' AS chain_id, + tx_count, + id, + parent_id, + COALESCE ( + blocks_id, + {{ dbt_utils.generate_surrogate_key(['block_height']) }} + ) AS fact_blocks_id, + _inserted_timestamp, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__testnet_blocks') }} diff --git a/models/gold/testnet/testnet__fact_blocks.yml b/models/gold/testnet/testnet__fact_blocks.yml new file mode 100644 index 0000000..f308738 --- /dev/null +++ b/models/gold/testnet/testnet__fact_blocks.yml @@ -0,0 +1,89 @@ +version: 2 + +models: + - name: testnet__fact_blocks + description: "{{ doc('core__fact_blocks') }}" + tests: + - sequence_gaps: + column_name: block_height + where: BLOCK_TIMESTAMP::DATE < CURRENT_DATE + severity: warn + + columns: + - name: BLOCK_HEIGHT + description: "{{ doc('block_height') }}" + tests: + - not_null + - unique + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - 
TIMESTAMP_NTZ + + - name: NETWORK + description: "{{ doc('network') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: NETWORK_VERSION + description: "{{ doc('network_version') }}" + + - name: CHAIN_ID + description: "{{ doc('chain_id') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: TX_COUNT + description: "{{ doc('tx_count') }}" + tests: + - not_null: + where: inserted_timestamp <= SYSDATE() - interval '12 hours' + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: ID + description: "{{ doc('id') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: PARENT_ID + description: "{{ doc('parent_id') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: FACT_BLOCKS_ID + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" diff --git a/models/gold/testnet/testnet__fact_events.sql b/models/gold/testnet/testnet__fact_events.sql new file mode 100644 index 0000000..bffca8a --- /dev/null +++ b/models/gold/testnet/testnet__fact_events.sql @@ -0,0 +1,23 @@ +{{ config( + materialized = 'view', + tags = ['testnet'] +) }} + +SELECT + tx_id, + block_timestamp, + block_height :: INT AS block_height, + tx_succeeded, + event_index, + event_contract, + event_type, + event_data, + COALESCE ( + streamline_event_id, + {{ dbt_utils.generate_surrogate_key(['tx_id']) }} + ) AS fact_events_id, + _inserted_timestamp, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__testnet_events') }} diff --git 
a/models/gold/testnet/testnet__fact_events.yml b/models/gold/testnet/testnet__fact_events.yml new file mode 100644 index 0000000..5503ea5 --- /dev/null +++ b/models/gold/testnet/testnet__fact_events.yml @@ -0,0 +1,87 @@ +version: 2 + +models: + - name: testnet__fact_events + description: "{{ doc('core__fact_events') }}" + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - tx_id + - event_index + + columns: + - name: TX_ID + description: "{{ doc('tx_id') }}" + tests: + - not_null + + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + where: block_height >= 280000000 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + + - name: BLOCK_HEIGHT + description: "{{ doc('block_height') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + + - name: TX_SUCCEEDED + description: "{{ doc('tx_succeeded') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - BOOLEAN + + - name: EVENT_INDEX + description: "{{ doc('event_index') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: EVENT_CONTRACT + description: "{{ doc('event_contract') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: EVENT_TYPE + description: "{{ doc('event_type') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: EVENT_DATA + description: "{{ doc('event_attributes') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - OBJECT + + - name: FACT_EVENTS_ID + description: 
"{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" diff --git a/models/gold/testnet/testnet__fact_transactions.sql b/models/gold/testnet/testnet__fact_transactions.sql new file mode 100644 index 0000000..fcf8164 --- /dev/null +++ b/models/gold/testnet/testnet__fact_transactions.sql @@ -0,0 +1,37 @@ +{{ config( + materialized = 'view', + tags = ['testnet'] +) }} + +SELECT + tx_id, + block_timestamp, + block_height :: INT AS block_height, + 'flow' AS chain_id, + proposer, + payer, + authorizers, + count_authorizers, + gas_limit, + script, + arguments, + OBJECT_CONSTRUCT( + 'error', + error_message, + 'events', + events, + 'status', + status + ) AS transaction_result, + tx_succeeded, + error_message AS error_msg, + COALESCE ( + streamline_transaction_id, + {{ dbt_utils.generate_surrogate_key(['tx_id']) }} + ) AS fact_transactions_id, + inserted_timestamp, + modified_timestamp +FROM + {{ ref('silver__testnet_transactions_final') }} +WHERE + NOT pending_result_response diff --git a/models/gold/testnet/testnet__fact_transactions.yml b/models/gold/testnet/testnet__fact_transactions.yml new file mode 100644 index 0000000..e8808c4 --- /dev/null +++ b/models/gold/testnet/testnet__fact_transactions.yml @@ -0,0 +1,127 @@ +version: 2 + +models: + - name: testnet__fact_transactions + description: "{{ doc('core__fact_transactions') }}" + + columns: + - name: TX_ID + description: "{{ doc('tx_id') }}" + tests: + - not_null + - unique + + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + where: block_height >= 280000000 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + + - name: BLOCK_HEIGHT + description: "{{ doc('block_height') }}" + tests: + - not_null + - 
dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + + - name: CHAIN_ID + description: "{{ doc('chain_id') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: TX_INDEX + description: "{{ doc('tx_index') }}" + + - name: PROPOSER + description: "{{ doc('proposer') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: PAYER + description: "{{ doc('payer') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: AUTHORIZERS + description: "{{ doc('authorizers') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - ARRAY + + - name: COUNT_AUTHORIZERS + description: "{{ doc('count_authorizers') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: GAS_LIMIT + description: "{{ doc('gas_limit') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + + - name: SCRIPT + description: "{{ doc('script') }}" + + - name: ARGUMENTS + description: "{{ doc('arguments') }}" + + - name: TRANSACTION_RESULT + description: "{{ doc('transaction_result') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - VARIANT + - OBJECT + + - name: TX_SUCCEEDED + description: "{{ doc('tx_succeeded') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - BOOLEAN + + - name: ERROR_MSG + description: "{{ doc('error_msg') }}" + tests: + - not_null: + where: not TX_SUCCEEDED + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: FACT_TRANSACTIONS_ID + 
description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" diff --git a/models/silver/testnet/silver__testnet_blocks.sql b/models/silver/testnet/silver__testnet_blocks.sql new file mode 100644 index 0000000..bec53f3 --- /dev/null +++ b/models/silver/testnet/silver__testnet_blocks.sql @@ -0,0 +1,144 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_blocks') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_blocks') }} + +{{ config( + materialized = 'incremental', + unique_key = "block_number", + incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate", "_partition_by_block_id"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "block_timestamp::date", + tags = ['testnet'] +) }} + +WITH + +{% if is_incremental() %} +tx_count_lookback AS ( + -- lookback to ensure tx count is correct + + SELECT + block_height + FROM + {{ this }} + WHERE + block_height >= {{ var( + 'STREAMLINE_START_BLOCK' + ) }} -- TODO, remove AFTER backfill is complete + -- limit to 3 day lookback for performance + AND _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + AND ( + tx_count IS NULL + OR collection_count != collection_count_agg + ) +), +{% endif %} + +streamline_blocks AS ( + SELECT + block_number, + DATA: height :: STRING AS block_height, + DATA: id :: STRING AS block_id, + DATA :timestamp :: timestamp_ntz AS block_timestamp, + ARRAY_SIZE( + DATA :collection_guarantees :: ARRAY + ) AS collection_count, + DATA: parent_id :: STRING AS parent_id, + DATA: signatures :: ARRAY AS signatures, + DATA: collection_guarantees :: ARRAY AS collection_guarantees, + DATA: block_seals :: ARRAY AS block_seals, + _partition_by_block_id, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) 
_inserted_timestamp + FROM + {{ this }} + ) + OR block_height IN ( + SELECT + block_height + FROM + tx_count_lookback + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + _inserted_timestamp DESC)) = 1 +), +collections AS ( + SELECT + * + FROM + {{ ref('silver__testnet_collections') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR block_number IN ( + SELECT + block_height + FROM + tx_count_lookback + ) +{% endif %} + +), +tx_count AS ( + SELECT + block_number AS block_height, + SUM(tx_count) AS tx_count, + COUNT(1) AS collection_count, + MIN(_inserted_timestamp) AS _inserted_timestamp + FROM + collections + GROUP BY + 1 +), +FINAL AS ( + SELECT + b.block_number, + b.block_height, + NULL AS network_version, + b.block_id AS id, + b.block_timestamp, + b.collection_count, + IFF( + b.collection_count = 0, + b.collection_count, + C.tx_count + ) AS tx_count, + b.parent_id, + b.signatures, + b.collection_guarantees, + b.block_seals, + C.collection_count AS collection_count_agg, + b._partition_by_block_id, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id, + b._inserted_timestamp + FROM + streamline_blocks b + LEFT JOIN tx_count C USING (block_height) +) +SELECT + * +FROM + FINAL diff --git a/models/silver/testnet/silver__testnet_blocks.yml b/models/silver/testnet/silver__testnet_blocks.yml new file mode 100644 index 0000000..9436346 --- /dev/null +++ b/models/silver/testnet/silver__testnet_blocks.yml @@ -0,0 +1,55 @@ +version: 2 + +models: + - name: silver__testnet_blocks + description: -| + Initial table for the gRPC blocks response, loading data into Snowflake from the external AWS table. 
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: BLOCK_HEIGHT + description: "{{ doc('block_height') }}" + + - name: ID + description: "{{ doc('id') }}" + + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + + - name: COLLECTION_COUNT + description: "{{ doc('collection_count') }}" + + - name: PARENT_ID + description: "{{ doc('parent_id') }}" + + - name: SIGNATURES + description: "{{ doc('signatures') }}" + + - name: COLLECTION_GUARANTEES + description: "{{ doc('collection_guarantees') }}" + + - name: BLOCK_SEALS + description: "{{ doc('block_seals') }}" + + - name: COLLECTION_COUNT_AGG + description: "{{ doc('collection_count_agg') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + + - name: BLOCKS_ID + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" + + - name: _INVOCATION_ID + description: "{{ doc('invocation_id') }}" diff --git a/models/silver/testnet/silver__testnet_collections.sql b/models/silver/testnet/silver__testnet_collections.sql new file mode 100644 index 0000000..225f9fd --- /dev/null +++ b/models/silver/testnet/silver__testnet_collections.sql @@ -0,0 +1,44 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_collections') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_collections') }} +{{ config( + materialized = 'incremental', + unique_key = "collection_id", + incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate", "block_number"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['_inserted_timestamp :: DATE', 'block_number'], + tags = ['testnet'] +) }} + +SELECT + block_number, + DATA: id :: STRING AS collection_id, + ARRAY_SIZE( + DATA :transaction_ids :: ARRAY + 
) AS tx_count, + DATA: transaction_ids :: ARRAY AS transaction_ids, + _partition_by_block_id, + {{ dbt_utils.generate_surrogate_key( + ['collection_id'] + ) }} AS streamline_collection_id, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_collections') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_collections') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY collection_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/testnet/silver__testnet_collections.yml b/models/silver/testnet/silver__testnet_collections.yml new file mode 100644 index 0000000..72d7538 --- /dev/null +++ b/models/silver/testnet/silver__testnet_collections.yml @@ -0,0 +1,37 @@ +version: 2 + +models: + - name: silver__testnet_collections + description: |- + Initial table for the gRPC collections response, loading data into Snowflake from the external AWS table.
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: COLLECTION_ID + description: "{{ doc('collection_id') }}" + + - name: TX_COUNT + description: "{{ doc('tx_count') }}" + + - name: TRANSACTION_IDS + description: "{{ doc('transaction_ids') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + + - name: streamline_collection_id + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" + + - name: _INVOCATION_ID + description: "{{ doc('invocation_id') }}" diff --git a/models/silver/testnet/silver__testnet_events.sql b/models/silver/testnet/silver__testnet_events.sql new file mode 100644 index 0000000..a10ad51 --- /dev/null +++ b/models/silver/testnet/silver__testnet_events.sql @@ -0,0 +1,143 @@ +{{ config( + materialized = 'incremental', + unique_key = 'event_id', + incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "block_timestamp::date", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,event_id,event_contract,event_type);", + tags = ['testnet'] +) }} + +WITH transactions AS ( + + SELECT + * + FROM + {{ ref('silver__testnet_transactions_final') }} + WHERE + NOT pending_result_response + +{% if is_incremental() %} +AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} +) +{% endif %} +), +flatten_events AS ( + SELECT + block_height, + block_timestamp, + tx_id, + tx_succeeded, + events_count, + VALUE :: variant AS event_data_full, + VALUE :event_index :: INT AS event_index, + concat_ws( + '-', + tx_id, + event_index + ) AS event_id, + VALUE :payload :: STRING AS payload, + 
TRY_PARSE_JSON(utils.udf_hex_to_string(payload)) AS decoded_payload, + VALUE :type :: STRING AS event_type_id, + VALUE :values :: variant AS event_values, + COALESCE( + SUBSTR( + VALUE :type :: STRING, + 0, + LENGTH( + VALUE :type :: STRING + ) - LENGTH(SPLIT(VALUE :type :: STRING, '.') [3]) - 1 + ), + -- if null, then flow. + SPLIT( + VALUE :type :: STRING, + '.' + ) [0] + ) AS event_contract, + COALESCE( + SPLIT( + VALUE :type :: STRING, + '.' + ) [3], + -- if null, then flow. + SPLIT( + VALUE :type :: STRING, + '.' + ) [1] + ) :: STRING AS event_type, + _inserted_timestamp, + _partition_by_block_id + FROM + transactions t, + LATERAL FLATTEN( + input => events + ) e + QUALIFY ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY _inserted_timestamp DESC) = 1 + +), +attributes AS ( + SELECT + event_id, + OBJECT_AGG( + data_key, + IFF(IS_ARRAY(TRY_PARSE_JSON(data_value)) OR IS_OBJECT(TRY_PARSE_JSON(data_value)), PARSE_JSON(data_value)::VARIANT, data_value::VARIANT) + ) AS event_data + FROM + ( + SELECT + event_id, + VALUE :name :: variant AS data_key, + COALESCE( + VALUE :value :value :fields, + VALUE :value :value :staticType, + VALUE :value :value :value :value :: STRING, + VALUE :value :value :value :: STRING, + VALUE :value :value :: STRING, + 'null' + ) AS data_value + FROM + flatten_events, + LATERAL FLATTEN ( + COALESCE( + decoded_payload :value :fields :: variant, + event_values :value :fields :: variant + ) + ) + ) + GROUP BY + 1 + ), + FINAL AS ( + SELECT + e.tx_id, + e.block_height, + e.block_timestamp, + e.event_id, + e.event_index, + e.events_count, + e.payload, + e.event_contract, + e.event_type, + A.event_data, + e.tx_succeeded, + e._inserted_timestamp, + e._partition_by_block_id, + {{ dbt_utils.generate_surrogate_key( + ['event_id'] + ) }} AS streamline_event_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id + FROM + flatten_events e + LEFT JOIN attributes A USING (event_id) + ) + SELECT 
+ * + FROM + FINAL diff --git a/models/silver/testnet/silver__testnet_events.yml b/models/silver/testnet/silver__testnet_events.yml new file mode 100644 index 0000000..e1ab5cd --- /dev/null +++ b/models/silver/testnet/silver__testnet_events.yml @@ -0,0 +1,95 @@ +version: 2 + +models: + - name: silver__testnet_events + description: |- + This table records events from each transaction on the FLOW testnet blockchain. + tests: + - dbt_utils.recency: + datepart: minutes + field: block_timestamp + interval: 360 + - dbt_utils.recency: + datepart: hours + field: _inserted_timestamp + interval: 6 + + columns: + - name: tx_id + description: "{{ doc('tx_id') }}" + tests: + - not_null + + - name: block_height + description: "{{ doc('block_height') }}" + tests: + - not_null + + - name: block_timestamp + description: "{{ doc('block_timestamp') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: TIMESTAMP_NTZ + + - name: event_id + description: "{{ doc('event_id') }}" + tests: + - not_null + - unique + + - name: event_index + description: "{{ doc('event_index') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: NUMBER + + - name: EVENTS_COUNT + description: "{{ doc('event_count') }}" + + - name: payload + description: "{{ doc('payload') }}" + tests: + - not_null + + - name: event_contract + description: "{{ doc('event_contract') }}" + tests: + - not_null + + - name: event_type + description: "{{ doc('event_type') }}" + tests: + - not_null + + - name: event_data + description: "{{ doc('event_attributes') }}" + tests: + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: OBJECT + + - name: tx_succeeded + description: "{{ doc('tx_succeeded') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: BOOLEAN + + - name: _inserted_timestamp + description: "{{ doc('_inserted_timestamp') }}" + + - name: _partition_by_block_id + description:
"{{ doc('_partition_by_block_id') }}" + + - name: streamline_event_id + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" + + - name: _INVOCATION_ID + description: "{{ doc('invocation_id') }}" diff --git a/models/silver/testnet/silver__testnet_transaction_results.sql b/models/silver/testnet/silver__testnet_transaction_results.sql new file mode 100644 index 0000000..abc0de8 --- /dev/null +++ b/models/silver/testnet/silver__testnet_transaction_results.sql @@ -0,0 +1,46 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_transaction_results') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_transaction_results') }} +{{ config( + materialized = 'incremental', + incremental_predicates = ["dynamic_range_predicate", "_partition_by_block_id"], + unique_key = "tx_id", + incremental_strategy = 'merge', + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ["block_number","_inserted_timestamp::date"], + tags = ['testnet'] +) }} + +SELECT + block_number, + id AS tx_id, + DATA :error_message :: STRING AS error_message, + DATA :events :: ARRAY AS events, + DATA :status :: INT AS status, + DATA :status_code :: INT AS status_code, + _partition_by_block_id, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['tx_id'] + ) }} AS tx_results_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_transaction_results') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) + +{% else %} + {{ ref('bronze__streamline_fr_testnet_transaction_results') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY tx_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git 
a/models/silver/testnet/silver__testnet_transaction_results.yml b/models/silver/testnet/silver__testnet_transaction_results.yml new file mode 100644 index 0000000..27c380e --- /dev/null +++ b/models/silver/testnet/silver__testnet_transaction_results.yml @@ -0,0 +1,43 @@ +version: 2 + +models: + - name: silver__testnet_transaction_results + description: |- + Initial table for the gRPC transaction results response, loading data into Snowflake from the external AWS table. + + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: TX_ID + description: "{{ doc('tx_id') }}" + + - name: ERROR_MESSAGE + description: "{{ doc('error_message') }}" + + - name: EVENTS + description: "{{ doc('events') }}" + + - name: STATUS + description: "{{ doc('status') }}" + + - name: STATUS_CODE + description: "{{ doc('status_code') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + + - name: tx_results_id + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" + + - name: _INVOCATION_ID + description: "{{ doc('invocation_id') }}" diff --git a/models/silver/testnet/silver__testnet_transactions.sql b/models/silver/testnet/silver__testnet_transactions.sql new file mode 100644 index 0000000..b3e4547 --- /dev/null +++ b/models/silver/testnet/silver__testnet_transactions.sql @@ -0,0 +1,51 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_transactions') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_transactions') }} +{{ config( + materialized = 'incremental', + unique_key = "tx_id", + incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate", "_partition_by_block_id"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "_inserted_timestamp::date", + 
tags = ['testnet'] +) }} + +SELECT + block_number, + DATA: reference_block_id :: STRING AS block_id, + id AS tx_id, + DATA: gas_limit :: NUMBER AS gas_limit, + DATA: payer :: STRING AS payer, + DATA: arguments :: ARRAY AS arguments, + DATA: authorizers :: ARRAY AS authorizers, + DATA: envelope_signatures :: ARRAY AS envelope_signatures, + DATA: payload_signatures :: ARRAY AS payload_signatures, + DATA: proposal_key :: variant AS proposal_key, + DATA: script :: STRING AS script, + {{ dbt_utils.generate_surrogate_key( + ['tx_id'] + ) }} AS streamline_tx_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id, + _partition_by_block_id, + _inserted_timestamp +FROM + + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_transactions') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY tx_id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/testnet/silver__testnet_transactions.yml b/models/silver/testnet/silver__testnet_transactions.yml new file mode 100644 index 0000000..d289312 --- /dev/null +++ b/models/silver/testnet/silver__testnet_transactions.yml @@ -0,0 +1,58 @@ +version: 2 + +models: + - name: silver__testnet_transactions + description: |- + Initial table for the gRPC transactions response, loading data into Snowflake from the external AWS table.
+ + columns: + - name: BLOCK_NUMBER + description: "{{ doc('block_number') }}" + + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + + - name: TX_ID + description: "{{ doc('tx_id') }}" + + - name: GAS_LIMIT + description: "{{ doc('gas_limit') }}" + + - name: PAYER + description: "{{ doc('payer') }}" + + - name: ARGUMENTS + description: "{{ doc('arguments') }}" + + - name: AUTHORIZERS + description: "{{ doc('authorizers') }}" + + - name: ENVELOPE_SIGNATURES + description: "{{ doc('envelope_signatures') }}" + + - name: PAYLOAD_SIGNATURES + description: "{{ doc('payload_signatures') }}" + + - name: PROPOSAL_KEY + description: "{{ doc('proposal_key') }}" + + - name: SCRIPT + description: "{{ doc('script') }}" + + - name: _PARTITION_BY_BLOCK_ID + description: "{{ doc('_partition_by_block_id') }}" + + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + + - name: streamline_tx_id + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" + + - name: _INVOCATION_ID + description: "{{ doc('invocation_id') }}" diff --git a/models/silver/testnet/silver__testnet_transactions_final.sql b/models/silver/testnet/silver__testnet_transactions_final.sql new file mode 100644 index 0000000..eb51b0a --- /dev/null +++ b/models/silver/testnet/silver__testnet_transactions_final.sql @@ -0,0 +1,195 @@ +-- depends_on: {{ ref('silver__testnet_transactions') }} +{{ config( + materialized = 'incremental', + unique_key = "tx_id", + incremental_strategy = 'merge', + incremental_predicates = ["dynamic_range_predicate", "_partition_by_block_id"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "block_timestamp::date", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,proposer,payer,authorizers);", + tags = ['testnet'] +) }} + +{% if execute %} + {% if is_incremental() %} + {% set 
query = """ + CREATE OR REPLACE TEMPORARY TABLE silver.testnet_transactions_final_intermediate_tmp AS + WITH retry_tx_ids AS ( + SELECT + tx_id, + block_height + FROM """ ~ this ~ """ + WHERE + modified_timestamp >= SYSDATE() - INTERVAL '""" ~ var('RETRY_WINDOW', 3) ~ """ days' + AND ( + block_timestamp IS NULL + OR pending_result_response + ) + ) + SELECT + * + FROM + """ ~ ref('silver__testnet_transactions') ~ """ + WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) modified_timestamp + FROM + """ ~ this ~ """ + ) + OR -- re-run record if block comes in later than tx records + ( + modified_timestamp >= SYSDATE() - INTERVAL '""" ~ var('RETRY_WINDOW', 3) ~ """ days' + AND + tx_id IN ( + SELECT + tx_id + FROM + retry_tx_ids + ) + ) + """ %} + {% else %} + {% set query = """ + CREATE OR REPLACE TEMPORARY TABLE silver.testnet_transactions_final_intermediate_tmp AS + SELECT + * + FROM + """ ~ ref('silver__testnet_transactions') ~ """ + """ %} + {% endif %} + + {% set run = run_query(query) %} +{% endif %} +/* + Do this because snowflake does not do well with dynamic query pruning. 
+ This will set a "static" timestamp value which will always enable query pruning if the timestamp is a cluster key + Coalesce in case there are 0 txs returned by the temp table +*/ +{% if execute %} + {% set min_time = run_query("select coalesce(min(modified_timestamp),sysdate()) from silver.testnet_transactions_final_intermediate_tmp").columns [0].values() [0] %} +{% endif %} + +WITH txs AS ( + + SELECT + * + FROM + silver.testnet_transactions_final_intermediate_tmp +), +tx_results AS ( + SELECT + * + FROM + {{ ref('silver__testnet_transaction_results') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= SYSDATE() - INTERVAL '{{ var('RETRY_WINDOW', 3) }} days' + AND tx_id IN ( + SELECT + DISTINCT tx_id + FROM + silver.testnet_transactions_final_intermediate_tmp + ) +{% endif %} + +), +blocks AS ( + SELECT + * + FROM + {{ ref('silver__testnet_blocks') }} + + +{% if is_incremental() %} +WHERE + modified_timestamp >= SYSDATE() - INTERVAL '{{ var('RETRY_WINDOW', 3) }} days' + AND block_number IN ( + SELECT + DISTINCT block_number + FROM + silver.testnet_transactions_final_intermediate_tmp + ) +{% endif %} + +), +FINAL AS ( + SELECT + COALESCE( + t.tx_id, + tr.tx_id + ) AS tx_id, + tr.status IS NULL AS pending_result_response, + t.block_number, + b.block_timestamp, + t.gas_limit, + CONCAT( + '0x', + payer + ) AS payer, + t.arguments, + {{ target.database }}.silver.udf_address_array_adj( + t.authorizers + ) AS authorizers, + ARRAY_SIZE( + t.authorizers + ) AS count_authorizers, + t.envelope_signatures, + t.payload_signatures, + t.proposal_key, + CONCAT( + '0x', + t.proposal_key: address :: STRING + ) AS proposer, + t.script, + tr.error_message, + tr.events, + ARRAY_SIZE( + tr.events + ) AS events_count, + tr.status, + tr.status_code, + GREATEST( + [b._inserted_timestamp], + [tr._inserted_timestamp], + [t._inserted_timestamp] + ) [0] :: timestamp_ntz AS _inserted_timestamp, + t._partition_by_block_id + FROM + txs t + LEFT JOIN tx_results tr USING (tx_id) + 
LEFT JOIN blocks b + ON t.block_number = b.block_number +) +SELECT + tx_id, + pending_result_response, + block_timestamp, + block_number AS block_height, + gas_limit, + payer, + arguments, + authorizers, + count_authorizers, + envelope_signatures, + payload_signatures, + proposal_key, + proposer, + script, + events, + events_count, + status, + status_code, + error_message, + NOT status_code :: BOOLEAN AS tx_succeeded, + _inserted_timestamp, + _partition_by_block_id, + {{ dbt_utils.generate_surrogate_key( + ['tx_id'] + ) }} AS streamline_transaction_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL diff --git a/models/silver/testnet/silver__testnet_transactions_final.yml b/models/silver/testnet/silver__testnet_transactions_final.yml new file mode 100644 index 0000000..3baad53 --- /dev/null +++ b/models/silver/testnet/silver__testnet_transactions_final.yml @@ -0,0 +1,140 @@ +version: 2 + +models: + - name: silver__testnet_transactions_final + description: |- + This table records all the transactions of the FLOW testnet blockchain. 
+ tests: + - dbt_utils.recency: + datepart: minutes + field: block_timestamp + interval: 360 + - dbt_utils.recency: + datepart: hours + field: _inserted_timestamp + interval: 6 + + columns: + - name: tx_id + description: "{{ doc('tx_id') }}" + tests: + - not_null + - unique + + - name: pending_result_response + description: "{{ doc('pending_result_response') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: BOOLEAN + - dbt_expectations.expect_column_values_to_be_in_set: + value_set: [true, false] + row_condition: "date_trunc('day', block_timestamp) <= SYSDATE() - interval '1 day' AND block_height >= 280000000" + config: + severity: error + error_if: ">50" + + - name: block_timestamp + description: "{{ doc('block_timestamp') }}" + tests: + - not_null: + where: block_height >= {{ var('STREAMLINE_START_BLOCK' )}} + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: TIMESTAMP_NTZ + + - name: block_height + description: "{{ doc('block_height') }}" + tests: + - not_null + + - name: gas_limit + description: "{{ doc('gas_limit') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: NUMBER + + - name: payer + description: "{{ doc('payer') }}" + tests: + - not_null + + - name: arguments + description: "{{ doc('arguments') }}" + + - name: authorizers + description: "{{ doc('authorizers') }}" + + - name: count_authorizers + description: "{{ doc('count_authorizers') }}" + + - name: envelope_signatures + description: "{{ doc('envelope_signatures') }}" + + - name: payload_signatures + description: "{{ doc('payload_signatures') }}" + + - name: proposal_key + description: "{{ doc('proposal_key') }}" + + - name: proposer + description: "{{ doc('proposer') }}" + + - name: script + description: "{{ doc('script') }}" + + - name: events + description: "{{ doc('events') }}" + tests: + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: ARRAY + + - name: 
EVENTS_COUNT + description: "{{ doc('event_count') }}" + + - name: status + description: "{{ doc('status') }}" + tests: + - not_null: + where: not pending_result_response + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: NUMBER + + - name: status_code + description: "{{ doc('status_code') }}" + tests: + - not_null: + where: not pending_result_response + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: NUMBER + + - name: error_message + description: "{{ doc('error_message') }}" + + - name: tx_succeeded + description: "{{ doc('tx_succeeded') }}" + tests: + - not_null: + where: not pending_result_response + - dbt_expectations.expect_column_values_to_be_of_type: + column_type: BOOLEAN + + - name: _inserted_timestamp + description: "{{ doc('_inserted_timestamp') }}" + tests: + - not_null + + - name: _partition_by_block_id + description: "{{ doc('_partition_by_block_id') }}" + + - name: streamline_transaction_id + description: "{{ doc('pk_id') }}" + + - name: INSERTED_TIMESTAMP + description: "{{ doc('inserted_timestamp') }}" + + - name: MODIFIED_TIMESTAMP + description: "{{ doc('modified_timestamp') }}" + + - name: _INVOCATION_ID + description: "{{ doc('invocation_id') }}" diff --git a/models/sources.yml b/models/sources.yml index 49cb564..90c8535 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -112,9 +112,13 @@ sources: - name: TRANSACTION_RESULTS_MAINNET_21 - name: TRANSACTION_RESULTS_MAINNET_22 - name: testnet_blocks + - name: testnet_blocks_v2 - name: testnet_collections + - name: testnet_collections_v2 - name: testnet_transactions + - name: testnet_transactions_v2 - name: testnet_transaction_results + - name: testnet_transaction_results_v2 - name: evm_blocks - name: evm_receipts - name: evm_traces diff --git a/models/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql b/models/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql index e144a8d..6efb5c7 100644 ---
a/models/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql +++ b/models/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql @@ -1,14 +1,14 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','late-multi-patina.flow-testnet.quiknode.pro:8999','external_table', 'testnet_blocks_v2', 'sql_limit', {{var('sql_limit','15000')}}, 'producer_batch_size', {{var('producer_batch_size','5000')}}, 'worker_batch_size', {{var('worker_batch_size','500')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime_testnet'] ) }} -WITH post_crescendo AS ( +WITH min_block_height AS ( SELECT - 185000000 AS block_height + 280000000 AS block_height ), tbl AS ( SELECT @@ -21,7 +21,7 @@ WITH post_crescendo AS ( SELECT block_height FROM - post_crescendo + min_block_height ) ) AND block_height IS NOT NULL @@ -36,7 +36,7 @@ WITH post_crescendo AS ( SELECT block_height FROM - post_crescendo + min_block_height ) ) AND block_height IS NOT NULL diff --git a/models/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql b/models/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql index b9eb8ae..91ae94e 100644 --- a/models/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql +++ 
b/models/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql @@ -1,16 +1,16 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','late-multi-patina.flow-testnet.quiknode.pro:8999','external_table', 'testnet_collections_v2', 'sql_limit', {{var('sql_limit','25000')}}, 'producer_batch_size', {{var('producer_batch_size','5000')}}, 'worker_batch_size', {{var('worker_batch_size','500')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime_testnet'] ) }} WITH -post_crescendo AS ( +min_block_height AS ( SELECT - 185000000 AS block_height + 280000000 AS block_height ), -- CTE to get targeted block_heights and their associated collection_ids from the complete_get_blocks table block_collections AS ( @@ -28,7 +28,7 @@ post_crescendo AS ( SELECT block_height FROM - post_crescendo + min_block_height ) ), -- CTE to identify collections that haven't been ingested yet diff --git a/models/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql b/models/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql index 94e02ab..64d15f6 100644 --- a/models/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql +++ b/models/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql @@ -1,15 +1,15 @@ {{ config ( 
materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','late-multi-patina.flow-testnet.quiknode.pro:8999','external_table', 'testnet_transaction_results_v2', 'sql_limit', {{var('sql_limit','25000')}}, 'producer_batch_size', {{var('producer_batch_size','5000')}}, 'worker_batch_size', {{var('worker_batch_size','500')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime_testnet'] ) }} -WITH post_crescendo AS ( +WITH min_block_height AS ( SELECT - 185000000 AS block_height + 280000000 AS block_height ), collection_transactions AS ( SELECT @@ -26,7 +26,7 @@ WITH post_crescendo AS ( SELECT block_height FROM - post_crescendo + min_block_height ) ), -- CTE to identify transactions that haven't been ingested yet diff --git a/models/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql b/models/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql index e5a7956..a297a98 100644 --- a/models/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql +++ b/models/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql @@ -1,15 +1,15 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', 
'{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_v2(object_construct('sql_source', '{{this.identifier}}','node_url','late-multi-patina.flow-testnet.quiknode.pro:8999','external_table', 'testnet_transactions_v2', 'sql_limit', {{var('sql_limit','50000')}}, 'producer_batch_size', {{var('producer_batch_size','5000')}}, 'worker_batch_size', {{var('worker_batch_size','500')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ), tags = ['streamline_realtime_testnet'] ) }} -WITH post_crescendo AS ( +WITH min_block_height AS ( SELECT - 185000000 AS block_height + 280000000 AS block_height ), collection_transactions AS ( SELECT @@ -26,7 +26,7 @@ WITH post_crescendo AS ( SELECT block_height FROM - post_crescendo + min_block_height ) ), -- CTE to identify transactions that haven't been ingested yet diff --git a/models/streamline/core/streamline__testnet_blocks.sql b/models/streamline/core/streamline__testnet_blocks.sql index aa0540b..46a6dc7 100644 --- a/models/streamline/core/streamline__testnet_blocks.sql +++ b/models/streamline/core/streamline__testnet_blocks.sql @@ -16,4 +16,4 @@ SELECT FROM TABLE(streamline.udtf_get_base_table({{block_height}})) WHERE - block_height > 185000000 + block_height > 280000000