From 0dee1c27e9e76809aac38056e14c898cd8b41720 Mon Sep 17 00:00:00 2001 From: WHYTEWYLL <49686519+WHYTEWYLL@users.noreply.github.com> Date: Thu, 14 Mar 2024 15:38:49 -0600 Subject: [PATCH] adding batch methods for chain and cronjob (#284) Finetune --- .github/workflows/dbt_run_history.yml | 37 ++++----- Makefile | 8 ++ ...transaction_results_history_mainnet_17.sql | 67 ++++++++++++++++ ...transaction_results_history_mainnet_18.sql | 12 +-- ...transaction_results_history_mainnet_19.sql | 4 +- ...transaction_results_history_mainnet_22.sql | 67 ++++++++++++++++ ...transaction_results_history_mainnet_14.sql | 2 +- ...transaction_results_history_mainnet_15.sql | 2 +- ...transaction_results_history_mainnet_16.sql | 2 +- ...ine__get_batch_transactions_mainnet_18.sql | 76 +++++++++++++++++++ ...ine__get_batch_transactions_mainnet_19.sql | 76 +++++++++++++++++++ ...e__get_transactions_history_mainnet_18.sql | 2 +- ...e__get_transactions_history_mainnet_19.sql | 2 +- 13 files changed, 323 insertions(+), 34 deletions(-) create mode 100644 models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_17.sql create mode 100644 models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_22.sql create mode 100644 models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_18.sql create mode 100644 models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_19.sql diff --git a/.github/workflows/dbt_run_history.yml b/.github/workflows/dbt_run_history.yml index 2ff9053..0b9dfc7 100644 --- a/.github/workflows/dbt_run_history.yml +++ b/.github/workflows/dbt_run_history.yml @@ -4,8 +4,8 @@ run-name: dbt_run_history on: workflow_dispatch: schedule: - # Runs once per day at 00:00 UTC - - cron: "0 0 * * *" + # Runs every 20 minutes + - cron: "*/20 * * * *" env: USE_VARS: "${{ vars.USE_VARS 
}}" @@ -42,22 +42,17 @@ jobs: - name: Run DBT Jobs run: > - dbt run -s - 2+streamline__get_transactions_history_mainnet_18 - 2+streamline__get_transactions_history_mainnet_19 - 2+streamline__get_transaction_results_history_mainnet_14 - 2+streamline__get_transaction_results_history_mainnet_15 - 2+streamline__get_transaction_results_history_mainnet_17 - 2+streamline__get_transaction_results_history_mainnet_16 - 2+streamline__get_transaction_results_history_mainnet_18 - 2+streamline__get_transaction_results_history_mainnet_19 - 2+streamline__get_transaction_results_history_mainnet_22 - --vars '{"STREAMLINE_INVOKE_STREAMS": True}' - - - name: Store logs - uses: actions/upload-artifact@v3 - with: - name: dbt-logs - path: | - logs - target + dbt run -s + 2+streamline__get_batch_transactions_mainnet_18 + 2+streamline__get_batch_transactions_mainnet_19 + 2+streamline__get_transaction_results_history_mainnet_14 + 2+streamline__get_transaction_results_history_mainnet_15 + 2+streamline__get_transaction_results_history_mainnet_16 + 2+streamline__get_batch_transaction_results_history_mainnet_17 + 2+streamline__get_batch_transaction_results_history_mainnet_18 + 2+streamline__get_batch_transaction_results_history_mainnet_19 + 2+streamline__get_batch_transaction_results_history_mainnet_22 + --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + environment: workflow_prod + warehouse: ${{ vars.WAREHOUSE }} + secrets: inherit \ No newline at end of file diff --git a/Makefile b/Makefile index 2e92f1b..07a366e 100644 --- a/Makefile +++ b/Makefile @@ -97,6 +97,14 @@ tx_results_batch_history: --target $(DBT_TARGET) \ --profiles-dir ~/.dbt +tx_batch_history: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS": $(INVOKE_STREAMS), "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_18.sql \ + --profile flow \ + --target dev \ + --profiles-dir ~/.dbt + lq_overloads: dbt run \ -s 
models/deploy/core/ \ diff --git a/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_17.sql b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_17.sql new file mode 100644 index 0000000..4bbcfa1 --- /dev/null +++ b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_17.sql @@ -0,0 +1,67 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'udf_bulk_grpc', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "node_url":"access-001.mainnet17.nodes.onflow.org:9000", + "external_table": "transaction_results_mainnet_17", + "sql_limit": "188000", + "producer_batch_size": "14000", + "worker_batch_size": "100", + "sql_source": "{{this.identifier}}", + "concurrent_requests": "800" + } + ) +) }} + +WITH blocks AS ( +-- CTE to identify blocks that don't have tx_results ingested for mainnet 17 + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + WHERE + block_height BETWEEN 27341470 + AND 31735954 + EXCEPT + SELECT + distinct block_number AS block_height + FROM + {{ ref("streamline__complete_get_transaction_results_history") }} + WHERE + block_number BETWEEN 27341470 + AND 31735954 +), +block_ids AS ( +-- CTE to generate the block_ids for the missing block transactions + SELECT + data:id::STRING as block_id, + block_number + FROM + {{ ref("streamline__complete_get_blocks_history")}} + WHERE + block_number BETWEEN 27341470 + AND 31735954 +) +-- Generate the requests based on the missing block transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transaction_results_by_block_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'block_id', + block_id + ) + ) AS request +FROM + blocks +JOIN + block_ids on blocks.block_height =
block_ids.block_number +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql index 0bf16db..ed3dba5 100644 --- a/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql +++ b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql @@ -1,16 +1,16 @@ {{ config ( materialized = "view", post_hook = fsc_utils.if_data_call_function_v2( - func = 'udf_bulk_grpc_us_east_2', + func = 'udf_bulk_grpc', target = "{{this.schema}}.{{this.identifier}}", params = { "node_url":"access-001.mainnet18.nodes.onflow.org:9000", - "external_table": "transaction_results_batch_mainnet_18", - "sql_limit": "72000", - "producer_batch_size": "8000", - "worker_batch_size": "1000", + "external_table": "transaction_results_mainnet_18", + "sql_limit": "188000", + "producer_batch_size": "14000", + "worker_batch_size": "100", "sql_source": "{{this.identifier}}", - "concurrent_requests": "750" + "concurrent_requests": "770" } ) ) }} diff --git a/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_19.sql b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_19.sql index 97e1773..87d3564 100644 --- a/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_19.sql +++ b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_19.sql @@ -1,11 +1,11 @@ {{ config ( materialized = "view", post_hook = fsc_utils.if_data_call_function_v2( - func = 
'udf_bulk_grpc_us_east_2', + func = 'udf_bulk_grpc', target = "{{this.schema}}.{{this.identifier}}", params = { "node_url":"access-001.mainnet19.nodes.onflow.org:9000", - "external_table": "transaction_results_batch_mainnet_19", + "external_table": "transaction_results_mainnet_19", "sql_limit": "188000", "producer_batch_size": "14000", "worker_batch_size": "100", diff --git a/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_22.sql b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_22.sql new file mode 100644 index 0000000..5f635ab --- /dev/null +++ b/models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_22.sql @@ -0,0 +1,67 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'udf_bulk_grpc', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "node_url":"access-001.mainnet22.nodes.onflow.org:9000", + "external_table": "transaction_results_mainnet_22", + "sql_limit": "188000", + "producer_batch_size": "14000", + "worker_batch_size": "100", + "sql_source": "{{this.identifier}}", + "concurrent_requests": "800" + } + ) +) }} + +WITH blocks AS ( +-- CTE to identify blocks that don't have tx_results ingested for mainnet 22 + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + WHERE + block_height BETWEEN 47169687 + AND 55114466 + EXCEPT + SELECT + distinct block_number AS block_height + FROM + {{ ref("streamline__complete_get_transaction_results_history") }} + WHERE + block_number BETWEEN 47169687 + AND 55114466 +), +block_ids AS ( +-- CTE to generate the block_ids for the missing block transactions + SELECT + data:id::STRING as block_id, + block_number + FROM + {{ ref("streamline__complete_get_blocks_history")}} + WHERE + block_number BETWEEN 47169687 + AND 55114466 +) +-- Generate the
requests based on the missing block transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transaction_results_by_block_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'block_id', + block_id + ) + ) AS request +FROM + blocks +JOIN + block_ids on blocks.block_height = block_ids.block_number +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_14.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_14.sql index 1b12faf..0f04612 100644 --- a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_14.sql +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_14.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet14.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_14', 'sql_limit', '6000000', 'producer_batch_size','1000', 'worker_batch_size','2', 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet14.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_14', 'sql_limit', '500000', 'producer_batch_size','300000', 'worker_batch_size','100', 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_15.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_15.sql index 
3363527..715c564 100644 --- a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_15.sql +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_15.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet15.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_15','sql_limit', '6000000', 'producer_batch_size','1000', 'worker_batch_size','2', 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet15.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_15','sql_limit', '500000', 'producer_batch_size','400000', 'worker_batch_size','100', 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_16.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_16.sql index 826a0bc..f9586a4 100644 --- a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_16.sql +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_16.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet16.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_16', 'sql_limit', '6000000', 'producer_batch_size','1000', 'worker_batch_size','2', 
'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet16.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_16', 'sql_limit', '500000', 'producer_batch_size','200000', 'worker_batch_size','100', 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_18.sql b/models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_18.sql new file mode 100644 index 0000000..e2a1893 --- /dev/null +++ b/models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_18.sql @@ -0,0 +1,76 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'udf_bulk_grpc', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "node_url":"access-001.mainnet18.nodes.onflow.org:9000", + "external_table": "transactions_mainnet_18", + "sql_limit": "188000", + "producer_batch_size": "14000", + "worker_batch_size": "100", + "sql_source": "{{this.identifier}}", + "concurrent_requests": "850" + } + ) +) }} + +WITH collection_transactions AS ( + + SELECT + block_number AS block_height, + TRANSACTION.value :: STRING AS transaction_id + FROM + {{ ref('streamline__complete_get_collections_history') }} + cch, + LATERAL FLATTEN( + input => cch.data :transaction_ids + ) AS TRANSACTION + WHERE + block_height BETWEEN 31735955 + AND 35858810 +), +-- CTE to identify transactions that haven't been ingested yet +blocks AS ( + SELECT + distinct(block_height) + FROM + collection_transactions ct + LEFT JOIN {{ ref("streamline__complete_get_transactions_history") }} + t + ON ct.transaction_id = t.id + WHERE + t.id IS NULL +), +block_ids AS ( +-- CTE to generate the block_ids for the 
missing block transactions + SELECT + data:id::STRING as block_id, + block_number + FROM + {{ ref("streamline__complete_get_blocks_history")}} + WHERE + block_number BETWEEN 31735955 + AND 35858810 +) +-- Generate the requests based on the missing block transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transactions_by_block_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'block_id', + block_id + ) + ) AS request +FROM + blocks +JOIN + block_ids on blocks.block_height = block_ids.block_number +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_19.sql b/models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_19.sql new file mode 100644 index 0000000..78a391f --- /dev/null +++ b/models/silver/streamline/core/history/transactions/batch/streamline__get_batch_transactions_mainnet_19.sql @@ -0,0 +1,76 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'udf_bulk_grpc', + target = "{{this.schema}}.{{this.identifier}}", + params = { + "node_url":"access-001.mainnet19.nodes.onflow.org:9000", + "external_table": "transactions_mainnet_19", + "sql_limit": "188000", + "producer_batch_size": "14000", + "worker_batch_size": "100", + "sql_source": "{{this.identifier}}", + "concurrent_requests": "800" + } + ) +) }} + +WITH collection_transactions AS ( + + SELECT + block_number AS block_height, + TRANSACTION.value :: STRING AS transaction_id + FROM + {{ ref('streamline__complete_get_collections_history') }} + cch, + LATERAL FLATTEN( + input => cch.data :transaction_ids + ) AS TRANSACTION + WHERE + block_height BETWEEN 35858811 + AND 40171633 +), +-- CTE to identify transactions that haven't been ingested yet +blocks AS ( + SELECT + distinct(block_height) + FROM + collection_transactions ct + LEFT JOIN {{ 
ref("streamline__complete_get_transactions_history") }} + t + ON ct.transaction_id = t.id + WHERE + t.id IS NULL +), +block_ids AS ( +-- CTE to generate the block_ids for the missing block transactions + SELECT + data:id::STRING as block_id, + block_number + FROM + {{ ref("streamline__complete_get_blocks_history")}} + WHERE + block_number BETWEEN 35858811 + AND 40171633 +) +-- Generate the requests based on the missing block transactions +SELECT + OBJECT_CONSTRUCT( + 'grpc', + 'proto3', + 'method', + 'get_transactions_by_block_i_d', + 'block_height', + block_height :: INTEGER, + 'method_params', + OBJECT_CONSTRUCT( + 'block_id', + block_id + ) + ) AS request +FROM + blocks +JOIN + block_ids on blocks.block_height = block_ids.block_number +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_18.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_18.sql index 8238285..e3d2e99 100644 --- a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_18.sql +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_18.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet18.nodes.onflow.org:9000','external_table', 'transactions_mainnet_18', 'sql_limit', '6000000', 'producer_batch_size','1000', 'worker_batch_size', '10', 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet18.nodes.onflow.org:9000','external_table', 'transactions_mainnet_18', 'sql_limit', '6000000', 'producer_batch_size','10000', 'worker_batch_size', '1250', 'batch_call_limit', 
{{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_19.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_19.sql index 5906d1d..50db0bb 100644 --- a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_19.sql +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet_19.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet19.nodes.onflow.org:9000','external_table', 'transactions_mainnet_19', 'sql_limit', '6000000', 'producer_batch_size','1000', 'worker_batch_size', '10', 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet19.nodes.onflow.org:9000','external_table', 'transactions_mainnet_19', 'sql_limit', '6000000', 'producer_batch_size','10000', 'worker_batch_size', '1250', 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }}