From 9127e6de86737cdd0b25b270d1e46526ce650aea Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Tue, 25 Jun 2024 12:12:51 -0600 Subject: [PATCH] AN-4852/Testnet Integration (#326) * source, bronze, complete, rt views * complete testnet_ dependency * upd sources.yml * upd models and bronze * upd models.sql * upd model in bronze views * adding chainhead_testnet * add jobs * fix: hour * upd api integration * various fixes. useast2, gha workflows, bad refs, etc * update udtf_get_base_table --------- Co-authored-by: WHYTEWYLL --- .../dbt_run_streamline_blocks_testnet.yml | 46 +++++++++++++ ...dbt_run_streamline_collections_testnet.yml | 46 +++++++++++++ ...streamline_transaction_results_testnet.yml | 46 +++++++++++++ ...bt_run_streamline_transactions_testnet.yml | 46 +++++++++++++ macros/create_udfs.sql | 3 +- macros/streamline/api_integrations.sql | 2 + macros/streamline/get_base_table_udft.sql | 2 +- macros/streamline/streamline_udfs.sql | 15 +++++ .../bronze__streamline_fr_testnet_blocks.sql | 34 ++++++++++ ...nze__streamline_fr_testnet_collections.sql | 11 ++++ ...eamline_fr_testnet_transaction_results.sql | 13 ++++ ...ze__streamline_fr_testnet_transactions.sql | 11 ++++ .../bronze__streamline_testnet_blocks.sql | 35 ++++++++++ ...bronze__streamline_testnet_collections.sql | 11 ++++ ...streamline_testnet_transaction_results.sql | 11 ++++ ...ronze__streamline_testnet_transactions.sql | 11 ++++ ...treamline__complete_get_testnet_blocks.sql | 41 ++++++++++++ ...line__complete_get_testnet_collections.sql | 39 +++++++++++ ...mplete_get_testnet_transaction_results.sql | 41 ++++++++++++ ...ine__complete_get_testnet_transactions.sql | 38 +++++++++++ ...treamline__get_testnet_blocks_realtime.sql | 61 ++++++++++++++++++ ...line__get_testnet_collections_realtime.sql | 64 +++++++++++++++++++ ...t_testnet_transaction_results_realtime.sql | 64 +++++++++++++++++++ ...ine__get_testnet_transactions_realtime.sql | 64 +++++++++++++++++++ 
.../core/streamline__testnet_blocks.sql | 19 ++++++ models/sources.yml | 5 +- 26 files changed, 775 insertions(+), 4 deletions(-) create mode 100644 .github/workflows/dbt_run_streamline_blocks_testnet.yml create mode 100644 .github/workflows/dbt_run_streamline_collections_testnet.yml create mode 100644 .github/workflows/dbt_run_streamline_transaction_results_testnet.yml create mode 100644 .github/workflows/dbt_run_streamline_transactions_testnet.yml create mode 100644 models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql create mode 100644 models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql create mode 100644 models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_blocks.sql create mode 100644 models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_collections.sql create mode 100644 models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transaction_results.sql create mode 100644 models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transactions.sql create mode 100644 models/silver/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql create mode 100644 models/silver/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql create mode 100644 
models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql create mode 100644 models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql create mode 100644 models/silver/streamline/core/streamline__testnet_blocks.sql diff --git a/.github/workflows/dbt_run_streamline_blocks_testnet.yml b/.github/workflows/dbt_run_streamline_blocks_testnet.yml new file mode 100644 index 0000000..2c97938 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_blocks_testnet.yml @@ -0,0 +1,46 @@ +name: dbt_run_streamline_blocks_testnet +run-name: dbt_run_streamline_blocks_testnet + +on: + workflow_dispatch: + schedule: + # 1x/hour schedule = At hour 8, 21, 36, 51 every day (see https://crontab.guru) + - cron: "7,27,47 10,20 * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 2+streamline__get_testnet_blocks_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' diff --git a/.github/workflows/dbt_run_streamline_collections_testnet.yml b/.github/workflows/dbt_run_streamline_collections_testnet.yml new file mode 100644 index 0000000..bc166fd --- /dev/null +++ b/.github/workflows/dbt_run_streamline_collections_testnet.yml @@ -0,0 +1,46 @@ +name: dbt_run_streamline_collections_testnet +run-name: 
dbt_run_streamline_collections_testnet + +on: + workflow_dispatch: + schedule: + # 1x/hour schedule = At hour 8, 21, 36, 51 every day (see https://crontab.guru) + - cron: "11,31,51 10,20 * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 2+streamline__get_testnet_collections_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' diff --git a/.github/workflows/dbt_run_streamline_transaction_results_testnet.yml b/.github/workflows/dbt_run_streamline_transaction_results_testnet.yml new file mode 100644 index 0000000..240d47d --- /dev/null +++ b/.github/workflows/dbt_run_streamline_transaction_results_testnet.yml @@ -0,0 +1,46 @@ +name: dbt_run_streamline_transaction_results_testnet +run-name: dbt_run_streamline_transaction_results_testnet + +on: + workflow_dispatch: + schedule: + # 1x/hour schedule = At hour 8, 21, 36, 51 every day (see https://crontab.guru) + - cron: "15,35,55 10,20 * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ 
github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 1+streamline__get_testnet_transaction_results_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' diff --git a/.github/workflows/dbt_run_streamline_transactions_testnet.yml b/.github/workflows/dbt_run_streamline_transactions_testnet.yml new file mode 100644 index 0000000..37ab93e --- /dev/null +++ b/.github/workflows/dbt_run_streamline_transactions_testnet.yml @@ -0,0 +1,46 @@ +name: dbt_run_streamline_transactions_testnet +run-name: dbt_run_streamline_transactions_testnet + +on: + workflow_dispatch: + schedule: + # At minutes 15, 35 and 55 past hours 10 and 20 UTC every day (see https://crontab.guru) + - cron: "15,35,55 10,20 * * *" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: Run DBT Realtime + run: | + dbt run -s 1+streamline__get_testnet_transactions_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 43cdf67..2644b0f 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql
@@ -1,12 +1,12 @@ {% macro create_udfs() %} {% if var("UPDATE_UDFS_AND_SPS") %} - {% if target.database != "FLOW_COMMUNITY_DEV" %} {% set sql %} {{ create_udtf_get_base_table( schema = "streamline" ) }} {{ create_udf_get_chainhead() }} + {{ create_udf_get_chainhead_testnet() }} {{ create_udf_bulk_grpc() }} {{ run_create_udf_array_disjunctive_union() }} @@ -15,6 +15,5 @@ {% endset %} {% do run_query(sql) %} {{- fsc_utils.create_udfs() -}} - {% endif %} {% endif %} {% endmacro %} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql index 8faa3ea..9f87973 100644 --- a/macros/streamline/api_integrations.sql +++ b/macros/streamline/api_integrations.sql @@ -23,6 +23,7 @@ {% endset %} {% do run_query(sql) %} + {% elif target.name == "dev" %} {{ log("Generating api integration for target:" ~ target.name, info=True) }} {% set sql %} @@ -45,6 +46,7 @@ ) enabled = TRUE; {% endset %} {% do run_query(sql) %} + {% elif target.name == "sbx" %} {{ log("Generating api integration for target:" ~ target.name, info=True) }} {% set sql %} diff --git a/macros/streamline/get_base_table_udft.sql b/macros/streamline/get_base_table_udft.sql index 911f251..b54b3d3 100644 --- a/macros/streamline/get_base_table_udft.sql +++ b/macros/streamline/get_base_table_udft.sql @@ -10,7 +10,7 @@ $$ seq4() ) as id from - table(generator(rowcount => 100000000)) -- July 2023 Flow Chain head is at 57M + table(generator(rowcount => 1000000000)) ) select id as height diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index e149c77..63f0bc1 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -13,6 +13,21 @@ {%- endif %}; {% endmacro %} +{% macro create_udf_get_chainhead_testnet() %} + {{ log("Creating udf get_chainhead_testnet for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }} + {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} + CREATE + OR 
REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead_testnet() returns variant api_integration = + {% if target.name == "prod" %} + aws_flow_api_prod AS 'https://quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/get_chainhead_testnet' + {% elif target.name == "dev" %} + aws_flow_api_dev_2 AS 'https://ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/get_chainhead_testnet' + {% elif target.name == "sbx" %} + {{ log("Creating sbx get_chainhead_testnet", info=True) }} + aws_flow_api_sbx AS 'https://bc5ejedoq8.execute-api.us-east-1.amazonaws.com/sbx/get_chainhead_testnet' + {%- endif %}; +{% endmacro %} + {% macro create_udf_bulk_grpc() %} {{ log("Creating udf udf_bulk_grpc for target:" ~ target.name ~ ", schema: " ~ target.schema, info=True) }} {{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql new file mode 100644 index 0000000..3a15aca --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_blocks.sql @@ -0,0 +1,34 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}' + ) + ) A +) +SELECT + block_number, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE +FROM + {{ source("bronze_streamline", "testnet_blocks") }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id +WHERE + b._partition_by_block_id = s._partition_by_block_id diff --git 
a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql new file mode 100644 index 0000000..e192cab --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_collections.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql new file mode 100644 index 0000000..7fbacfa --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transaction_results.sql @@ -0,0 +1,13 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-3:] | join('_') %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} + + diff --git a/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql new file mode 100644 index 0000000..e192cab --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_fr_testnet_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git 
a/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql new file mode 100644 index 0000000..24c26fa --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_blocks.sql @@ -0,0 +1,35 @@ +{{ config ( + materialized = 'view' +) }} + +WITH meta AS ( + SELECT + last_modified AS _inserted_timestamp, + file_name, + CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS _partition_by_block_id + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", "testnet_blocks") }}') + ) A + ) + SELECT + block_number, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST(block_number AS text), '' :: STRING) AS text + ) + ) AS _fsc_id, + s._partition_by_block_id, + s.value AS VALUE + FROM + {{ source("bronze_streamline","testnet_blocks") }} s + JOIN meta b + ON b.file_name = metadata$filename + AND b._partition_by_block_id = s._partition_by_block_id + WHERE + b._partition_by_block_id = s._partition_by_block_id + diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql new file mode 100644 index 0000000..545031e --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_collections.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql new 
file mode 100644 index 0000000..bd31738 --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_transaction_results.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-3:] | join('_') %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql b/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql new file mode 100644 index 0000000..545031e --- /dev/null +++ b/models/bronze/streamline/testnet/bronze__streamline_testnet_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "id" +) }} diff --git a/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_blocks.sql b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_blocks.sql new file mode 100644 index 0000000..0c8fbcd --- /dev/null +++ b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_blocks.sql @@ -0,0 +1,41 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_blocks') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["block_number"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", + tags = ['streamline_complete_testnet'] +) }} + +SELECT + DATA, + block_number, + _partition_by_block_id, + 
_inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_blocks') }} +WHERE + _inserted_timestamp >= COALESCE( + ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ), + '1900-01-01' :: timestamp_ntz + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_blocks') }} +WHERE + TRUE +{% endif %} +AND NOT ( + DATA :status :: INT != 2 +) +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_collections.sql b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_collections.sql new file mode 100644 index 0000000..5657cd4 --- /dev/null +++ b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_collections.sql @@ -0,0 +1,39 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_collections') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_collections') }} + +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_complete_testnet'] +) }} + +SELECT + id, + data, + block_number, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_collections') }} +WHERE + _inserted_timestamp >= COALESCE( + ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ), + '1900-01-01'::timestamp_ntz + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_collections') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transaction_results.sql 
b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transaction_results.sql new file mode 100644 index 0000000..7c9fd49 --- /dev/null +++ b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transaction_results.sql @@ -0,0 +1,41 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_transaction_results') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_transaction_results') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_complete_testnet'] +) }} + +SELECT + id, + DATA, + block_number, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_transaction_results') }} +WHERE + _inserted_timestamp >= COALESCE( + ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ), + '1900-01-01' :: timestamp_ntz + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_transaction_results') }} +WHERE + TRUE +{% endif %} +AND NOT ( + DATA :status :: INT < 4 +) qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transactions.sql b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transactions.sql new file mode 100644 index 0000000..30cbd77 --- /dev/null +++ b/models/silver/streamline/core/complete_testnet/streamline__complete_get_testnet_transactions.sql @@ -0,0 +1,38 @@ +-- depends_on: {{ ref('bronze__streamline_testnet_transactions') }} +-- depends_on: {{ ref('bronze__streamline_fr_testnet_transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} 
ADD SEARCH OPTIMIZATION on equality(id)", + tags = ['streamline_complete_testnet'] +) }} + +SELECT + id, + data, + block_number, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_testnet_transactions') }} +WHERE + _inserted_timestamp >= COALESCE( + ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ), + '1900-01-01'::timestamp_ntz + ) +{% else %} + {{ ref('bronze__streamline_fr_testnet_transactions') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql new file mode 100644 index 0000000..e144a8d --- /dev/null +++ b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_blocks_realtime.sql @@ -0,0 +1,61 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_realtime_testnet'] +) }} +WITH post_crescendo AS ( + SELECT + 185000000 AS block_height +), + tbl AS ( + SELECT + block_height + FROM + {{ ref('streamline__testnet_blocks') }} + WHERE + ( + block_height >= ( + SELECT + block_height + FROM + post_crescendo + ) + ) + AND block_height IS NOT NULL + EXCEPT + SELECT + block_number AS block_height + FROM + {{ ref('streamline__complete_get_testnet_blocks') }} + WHERE + ( + block_height >= ( + SELECT + block_height + FROM + 
post_crescendo + ) + ) + AND block_height IS NOT NULL + ) +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_block_by_height', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'height', + block_height + ) + ) AS request +FROM + tbl +ORDER BY + block_height DESC diff --git a/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql new file mode 100644 index 0000000..b9eb8ae --- /dev/null +++ b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_collections_realtime.sql @@ -0,0 +1,64 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_realtime_testnet'] +) }} + +WITH +post_crescendo AS ( + SELECT + 185000000 AS block_height +), + -- CTE to get targeted block_heights and their associated collection_ids from the complete_get_blocks table + block_collections AS ( + SELECT + cb.block_number AS block_height, + collection_guarantee.value :collection_id AS collection_id + FROM + {{ ref("streamline__complete_get_testnet_blocks") }} + cb, + LATERAL FLATTEN( + input => cb.data :collection_guarantees + ) AS collection_guarantee + WHERE + block_height >= ( + SELECT + block_height + FROM + post_crescendo + ) + ), + -- CTE to identify collections that haven't been ingested yet + collections_to_ingest AS ( + SELECT + bc.block_height, + bc.collection_id + FROM + 
block_collections bc + LEFT JOIN {{ ref("streamline__complete_get_testnet_collections") }} C + ON bc.block_height = C.block_number + AND bc.collection_id = C.id + WHERE + C.id IS NULL + ) +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_collection_by_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + collection_id + ) + ) AS request +FROM + collections_to_ingest +ORDER BY + block_height DESC diff --git a/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql new file mode 100644 index 0000000..94e02ab --- /dev/null +++ b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transaction_results_realtime.sql @@ -0,0 +1,64 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_realtime_testnet'] +) }} + +WITH post_crescendo AS ( + SELECT + 185000000 AS block_height +), + collection_transactions AS ( + SELECT + block_number AS block_height, + TRANSACTION.value :: STRING AS transaction_id + FROM + {{ ref("streamline__complete_get_testnet_collections") }} + cc, + LATERAL FLATTEN( + input => cc.data :transaction_ids + ) AS TRANSACTION + WHERE + block_height >= ( + SELECT + block_height + FROM + post_crescendo + ) + ), + -- CTE to identify transactions that haven't been ingested yet + transactions_to_ingest AS ( + SELECT + 
ct.block_height, + ct.transaction_id + FROM + collection_transactions ct + LEFT JOIN {{ ref("streamline__complete_get_testnet_transaction_results") }} + t + ON ct.transaction_id = t.id + WHERE + t.id IS NULL + ) -- Generate the requests based on the missing transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transaction_result', + 'block_height', + block_height :: INTEGER, + 'transaction_id', + transaction_id :: STRING, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + transaction_id :: STRING + ) + ) AS request +FROM + transactions_to_ingest +ORDER BY + block_height DESC diff --git a/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql new file mode 100644 index 0000000..e5a7956 --- /dev/null +++ b/models/silver/streamline/core/realtime_testnet/streamline__get_testnet_transactions_realtime.sql @@ -0,0 +1,64 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access.devnet.nodes.onflow.org:9000','external_table', 'testnet_transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ), + tags = ['streamline_realtime_testnet'] +) }} + +WITH post_crescendo AS ( + SELECT + 185000000 AS block_height +), + collection_transactions AS ( + SELECT + block_number AS block_height, + TRANSACTION.value :: STRING AS transaction_id + FROM + {{ ref("streamline__complete_get_testnet_collections") }} + cc, + LATERAL FLATTEN( + input => cc.data :transaction_ids + ) AS TRANSACTION + WHERE + block_height >= ( + SELECT + block_height + FROM + post_crescendo + 
) + ), + -- CTE to identify transactions that haven't been ingested yet + transactions_to_ingest AS ( + SELECT + ct.block_height, + ct.transaction_id + FROM + collection_transactions ct + LEFT JOIN {{ ref("streamline__complete_get_testnet_transactions") }} + t + ON ct.transaction_id = t.id + WHERE + t.id IS NULL + ) -- Generate the requests based on the missing transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transaction', + 'block_height', + block_height :: INTEGER, + 'transaction_id', + transaction_id :: STRING, + 'method_params', + OBJECT_CONSTRUCT( + 'id', + transaction_id :: STRING + ) + ) AS request +FROM + transactions_to_ingest +ORDER BY + block_height DESC diff --git a/models/silver/streamline/core/streamline__testnet_blocks.sql b/models/silver/streamline/core/streamline__testnet_blocks.sql new file mode 100644 index 0000000..aa0540b --- /dev/null +++ b/models/silver/streamline/core/streamline__testnet_blocks.sql @@ -0,0 +1,19 @@ +{{ config ( + materialized = "view", + tags = ['streamline_realtime_testnet'] +) }} + +{% if execute %} + +{% set height = run_query('SELECT streamline.udf_get_chainhead_testnet()') %} +{% set block_height = height.columns[0].values()[0] %} +{% else %} +{% set block_height = 0 %} +{% endif %} + +SELECT + height as block_height +FROM + TABLE(streamline.udtf_get_base_table({{block_height}})) +WHERE + block_height > 185000000 diff --git a/models/sources.yml b/models/sources.yml index b1efea0..7b8ae35 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -118,7 +118,10 @@ sources: - name: TRANSACTION_RESULTS_MAINNET_20 - name: TRANSACTION_RESULTS_MAINNET_21 - name: TRANSACTION_RESULTS_MAINNET_22 - + - name: testnet_blocks + - name: testnet_collections + - name: testnet_transactions + - name: testnet_transaction_results - name: crosschain_silver database: crosschain schema: silver