diff --git a/.github/workflows/dbt_run_batch_backfill.yml b/.github/workflows/dbt_run_batch_backfill.yml index 902160df..e91b6e73 100644 --- a/.github/workflows/dbt_run_batch_backfill.yml +++ b/.github/workflows/dbt_run_batch_backfill.yml @@ -1,11 +1,11 @@ -name: dbt_run_scheduled_batch_backfill -run-name: dbt_run_scheduled_batch_backfill +name: dbt_run_batch_backfill +run-name: dbt_run_batch_backfill on: workflow_dispatch: schedule: - # Runs every 7 mins, adjust to appropriate schedule as needed - - cron: '*/7 * * * *' + # Runs every 10 mins, adjust to appropriate schedule as needed + - cron: '*/10 * * * *' env: DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" @@ -16,7 +16,7 @@ env: PASSWORD: "${{ secrets.PASSWORD }}" REGION: "${{ vars.REGION }}" DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: DBT_EMERGENCY + WAREHOUSE: "${{ vars.WAREHOUSE }}" SCHEMA: "${{ vars.SCHEMA }}" DBT_IS_BATCH_LOAD: true @@ -43,4 +43,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -s models/silver/rewards/silver__rewards_rent.sql + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS": True}' -s silver__backfill_transactions_index streamline__block_txs_index diff --git a/models/bronze/streamline/bronze__streamline_FR_block_tx_index_backfill.sql b/models/bronze/streamline/bronze__streamline_FR_block_tx_index_backfill.sql new file mode 100644 index 00000000..84b77d4c --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_block_tx_index_backfill.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "block_txs_index_backfill" %} +{{ streamline_external_table_FR_query_v2( + model, + partition_function = "to_date(split_part(split_part(file_name, '/', -2), '_result', 1), 'YYYY_MM_DD')", + partition_name = "_partition_by_created_date", + unique_key = "block_id", + other_cols="" +) }} diff --git a/models/bronze/streamline/bronze__streamline_block_tx_index_backfill.sql b/models/bronze/streamline/bronze__streamline_block_tx_index_backfill.sql new file mode 100644 index 00000000..09f9eeb6 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_block_tx_index_backfill.sql @@ -0,0 +1,12 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = "block_txs_index_backfill" %} +{{ streamline_external_table_query_v2( + model, + partition_function = "to_date(split_part(split_part(file_name, '/', -2), '_result', 1), 'YYYY_MM_DD')", + partition_name = "_partition_by_created_date", + unique_key = "block_id", + other_cols="" +) }} diff --git a/models/silver/backfill/silver__backfill_transactions_index.sql b/models/silver/backfill/silver__backfill_transactions_index.sql new file mode 100644 index 00000000..03e6b6ac --- /dev/null +++ b/models/silver/backfill/silver__backfill_transactions_index.sql @@ -0,0 +1,42 @@ +-- depends_on: {{ ref('bronze__streamline_block_tx_index_backfill') }} + +{{ + config( + materialized="incremental", + cluster_by = ['block_timestamp::date','block_id'], + tags=['tx_index_backfill'] + ) +}} + +{% if execute %} + {% if is_incremental() %} + {% set max_partition_query %} + SELECT max(_partition_by_created_date) FROM {{ this }} + {% endset %} + {% set max_partition = run_query(max_partition_query)[0][0] %} + {% endif %} +{% endif %} + +SELECT + block_id, + to_timestamp_ntz(value:"result.blockTime"::int) AS block_timestamp, + data::string as tx_id, + value:array_index::int as tx_index, + _partition_by_created_date, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['tx_id']) }} AS transactions_id, + sysdate() AS inserted_timestamp, + sysdate() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {% if is_incremental() %} + {{ ref('bronze__streamline_block_tx_index_backfill') }} + {% else %} + {{ ref('bronze__streamline_FR_block_tx_index_backfill') }} + {% endif %} +WHERE + data IS NOT NULL +{% if is_incremental() %} + AND _partition_by_created_date >= '{{ max_partition }}' + AND _inserted_timestamp > (SELECT max(_inserted_timestamp) FROM {{ this }}) +{% endif %} \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index 8e2ce560..bfb4ddc6 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -69,6 +69,7 @@ sources: - name: decoded_logs_2 - name: block_txs_2 - name: helius_blocks + - name: block_txs_index_backfill - name: bronze_api schema: bronze_api tables: diff --git a/models/streamline/core/realtime/streamline__block_txs_index.sql b/models/streamline/core/realtime/streamline__block_txs_index.sql new file mode 100644 index 00000000..979f532b --- /dev/null +++ b/models/streamline/core/realtime/streamline__block_txs_index.sql @@ -0,0 +1,67 @@ +{{ config( + materialized = 'view', + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"block_txs_index_backfill", + "sql_limit" :"37500", + "producer_batch_size" :"37500", + "worker_batch_size" :"750", + "sql_source" :"{{this.identifier}}", + "order_by_column": "block_id DESC", + "exploded_key": tojson(["result.signatures"]), + "include_top_level_json": tojson(["result.blockTime"]), + } + ) +) }} + +WITH block_ids AS ( + SELECT + b.block_id + FROM + {{ ref('silver__blocks') }} b + WHERE + -- all blocks after this should have tx id filled in already so start the backfill here + b.block_id <= 307868470 + EXCEPT + SELECT DISTINCT + block_id + FROM + {{ ref('silver__backfill_transactions_index') }} +) +SELECT + block_id, + replace(current_date::string,'-','_') AS partition_key, -- Issue with streamline handling `-` in partition key so changing to `_` + {{ target.database }}.live.udf_api( + 'POST', + '{Service}?apikey={Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'id', + block_id, + 'jsonrpc', + '2.0', + 'method', + 'getBlock', + 'params', + ARRAY_CONSTRUCT( + block_id, + OBJECT_CONSTRUCT( + 'encoding', + 'jsonParsed', + 'rewards', + False, + 'transactionDetails', + 'signatures', + 'maxSupportedTransactionVersion', + 0 + ) + ) + ), + 'Vault/prod/solana/ankr/mainnet' + ) AS request +FROM + block_ids \ No newline at end of file