Stream 798/tx batch tuning (#280)

* adds a tx results batch make directive and a bronze make directive

* batch tx_results success

* revert collections bronze scope

* revert tx results history bronze

---------

Co-authored-by: shah <info@shahnewazkhan.ca>
This commit is contained in:
Shah Newaz Khan 2024-03-04 13:33:25 -08:00 committed by GitHub
parent 5734a9b743
commit 0d3128f868
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 109 additions and 21 deletions

View File

@ -83,7 +83,15 @@ tx_history:
# tx_results_history: invokes `dbt run` on the transaction-results history
# streamline model(s) selected with `-m 1+<path>` below, with the vars
# STREAMLINE_INVOKE_STREAMS and STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES set to
# True, the `flow` profile, the target named by $(DBT_TARGET), and profiles
# read from ~/.dbt.
# NOTE(review): two `-m` selectors (mainnet_22 and mainnet_18) appear back to
# back. This looks like the removed and added sides of a diff shown together;
# confirm which selector the Makefile is meant to keep.
tx_results_history:
dbt run \
--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
-m 1+models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_22.sql \
-m 1+models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet_18.sql \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt
# tx_results_batch_history: invokes `dbt run` on the batch transaction-results
# history streamline model for mainnet 18 (selected as `1+<path>`), with the
# vars STREAMLINE_INVOKE_STREAMS and STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES set
# to True, the `flow` profile, and the target named by $(DBT_TARGET).
# Declared phony because the target names a command, not a file it produces;
# without this a file called `tx_results_batch_history` would make the target
# look up to date and silently skip the recipe.
.PHONY: tx_results_batch_history
tx_results_batch_history:
	dbt run \
	--vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
	-m 1+models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql \
	--profile flow \
	--target $(DBT_TARGET) \
	--profiles-dir ~/.dbt
@ -96,4 +104,9 @@ lq_overloads:
--profiles-dir ~/.dbt \
--vars '{"UPDATE_EPHEMERAL_UDFS":True}'
# bronze: invokes `dbt run` on bronze__streamline_transaction_results_history
# with STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES set to True and the target named
# by $(DBT_TARGET).
# Declared phony because the target names a command, not a file it produces;
# a file or directory called `bronze` would otherwise make the target look up
# to date and silently skip the recipe.
# `--profile flow` is passed explicitly so this target resolves its profile
# the same way the tx_results_* targets in this Makefile do.
.PHONY: bronze
bronze:
	dbt run \
	-s bronze__streamline_transaction_results_history \
	--vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
	--profile flow \
	--target $(DBT_TARGET) \
	--profiles-dir ~/.dbt

View File

@ -0,0 +1,66 @@
{{ config (
    materialized = "view",
    post_hook = fsc_utils.if_data_call_function_v2(
        func = 'udf_bulk_grpc_us_east_2',
        target = "{{this.schema}}.{{this.identifier}}",
        params = {
            "node_url":"access-001.mainnet18.nodes.onflow.org:9000",
            "external_table": "transaction_results_batch_mainnet_18",
            "sql_limit": "100",
            "producer_batch_size": "100",
            "worker_batch_size": "100",
            "sql_source": "{{this.identifier}}"
        }
    )
) }}
{#
    Emits one gRPC request object per block in the height range below that
    has a known block id but no completed transaction results yet.
    The range bounds are declared once here and reused by every filter.
#}
{%- set first_block = 31735955 %}
{%- set last_block = 35858810 %}

WITH missing_blocks AS (
    -- Block heights in range, minus those whose transaction results are
    -- already recorded as complete
    SELECT
        block_height
    FROM
        {{ ref("streamline__blocks") }}
    WHERE
        block_height BETWEEN {{ first_block }} AND {{ last_block }}
    EXCEPT
    SELECT
        DISTINCT block_number AS block_height
    FROM
        {{ ref("streamline__complete_get_transaction_results_history") }}
    WHERE
        block_number BETWEEN {{ first_block }} AND {{ last_block }}
),
block_lookup AS (
    -- Block id for each block number in the same range
    SELECT
        data:id::STRING AS block_id,
        block_number
    FROM
        {{ ref("streamline__complete_get_blocks_history") }}
    WHERE
        block_number BETWEEN {{ first_block }} AND {{ last_block }}
)
-- One request per missing block, keyed by that block's id
SELECT
    OBJECT_CONSTRUCT(
        'grpc', 'proto3',
        'method', 'get_transaction_results_by_block_i_d',
        'block_height', missing_blocks.block_height :: INTEGER,
        'method_params', OBJECT_CONSTRUCT(
            'block_id', block_lookup.block_id
        )
    ) AS request
FROM
    missing_blocks
    INNER JOIN block_lookup
    ON missing_blocks.block_height = block_lookup.block_number
ORDER BY
    missing_blocks.block_height ASC

View File

@ -1,8 +1,17 @@
{#
    Model config: materialized as a view, with a post-hook that names
    udf_bulk_grpc_us_east_2 as the function to call for this relation,
    using node access-001.mainnet18.nodes.onflow.org:9000 and external table
    transaction_results_mainnet_18.
    NOTE(review): two post_hook assignments appear below: the inline
    object_construct form (sql_limit 6000000, worker_batch_size 2) and the
    fsc_utils.if_data_call_function_v2 form (sql_limit 25000,
    worker_batch_size 200). This looks like the removed and added sides of a
    diff shown together; confirm that only one is present in the model file.
#}
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_bulk_grpc_us_east_2(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet18.nodes.onflow.org:9000','external_table', 'transaction_results_mainnet_18', 'sql_limit', '6000000', 'producer_batch_size','1000', 'worker_batch_size','2', 'batch_call_limit', {{var('batch_call_limit','1')}}))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'udf_bulk_grpc_us_east_2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"node_url":"access-001.mainnet18.nodes.onflow.org:9000",
"external_table": "transaction_results_mainnet_18",
"sql_limit": "25000",
"producer_batch_size": "1000",
"worker_batch_size": "200",
"sql_source": "{{this.identifier}}"
}
)
) }}

View File

@ -1,16 +1,16 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.0
- package: dbt-labs/dbt_external_tables
version: 0.8.0
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: 3a45311f3e94c8a695443f77aab433a05b894f90
- package: get-select/dbt_snowflake_query_tags
version: 2.3.2
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: bca494102fbd2d621d32746e9a7fe780678044f8
sha1_hash: 82abbdb1baa6b6b88e7d0e40f3ee8aaf2850e052
- package: calogica/dbt_expectations
version: 0.8.0
- package: dbt-labs/dbt_external_tables
version: 0.8.0
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: ec7703e90ad70dfd997abe57717a07400aae2f17
- package: get-select/dbt_snowflake_query_tags
version: 2.3.3
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: bca494102fbd2d621d32746e9a7fe780678044f8
sha1_hash: bb079c5efcbe9e6e5f17498d0c787df276e58f67

View File

@ -6,6 +6,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: "v1.9.3"
revision: "v1.17.2"
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]