Stream 803/throughput tune mn 18 (#283)

* add var read for invoke streams

* WIP throughput tune

* added mn 19 batch model

* remove transient tx-results-batch bronze table

* add concurrent_requests

---------

Co-authored-by: shah <info@shahnewazkhan.ca>
Shah Newaz Khan 2024-03-08 14:50:04 -08:00 committed by GitHub
parent 0d3128f868
commit 3d493da486
3 changed files with 75 additions and 6 deletions

Makefile

@@ -3,6 +3,7 @@ SHELL := /bin/bash
# set default target
DBT_TARGET ?= dev
AWS_LAMBDA_ROLE ?= aws_lambda_flow_api_dev
+ INVOKE_STREAMS ?= True
dbt-console:
docker-compose run dbt_console
@@ -90,8 +91,8 @@ tx_results_history:
tx_results_batch_history:
dbt run \
- --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
- -m 1+models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql \
+ --vars '{"STREAMLINE_INVOKE_STREAMS": $(INVOKE_STREAMS), "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \
+ -m 1+models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_19.sql \
--profile flow \
--target $(DBT_TARGET) \
--profiles-dir ~/.dbt
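
With the new INVOKE_STREAMS variable, the backfill target can be run with or without invoking the streams. A usage sketch (the False case is an assumption based on the variable name, e.g. to run the model without firing the post-hook):

# default: INVOKE_STREAMS is True, streams are invoked
make tx_results_batch_history DBT_TARGET=dev

# override to skip stream invocation
make tx_results_batch_history INVOKE_STREAMS=False DBT_TARGET=dev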

models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_18.sql

@@ -6,10 +6,11 @@
params = {
"node_url":"access-001.mainnet18.nodes.onflow.org:9000",
"external_table": "transaction_results_batch_mainnet_18",
"sql_limit": "100",
"producer_batch_size": "100",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}"
"sql_limit": "72000",
"producer_batch_size": "8000",
"worker_batch_size": "1000",
"sql_source": "{{this.identifier}}",
"concurrent_requests": "750"
}
)
) }}
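
For context on the new numbers (illustrative, not part of the commit): assuming sql_limit is the number of requests pulled per run, producer_batch_size the requests handed to each producer batch, and worker_batch_size the requests per worker call, the tuning breaks down as:

72000 / 8000 = 9 producer batches per run
8000 / 1000 = 8 worker batches per producer batch
concurrent_requests = 750 -> presumed ceiling on simultaneous gRPC calls in flight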

models/silver/streamline/core/history/transaction_results/batch/streamline__get_batch_transaction_results_history_mainnet_19.sql

@@ -0,0 +1,67 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'udf_bulk_grpc_us_east_2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"node_url":"access-001.mainnet19.nodes.onflow.org:9000",
"external_table": "transaction_results_batch_mainnet_19",
"sql_limit": "188000",
"producer_batch_size": "14000",
"worker_batch_size": "100",
"sql_source": "{{this.identifier}}",
"concurrent_requests": "750"
}
)
) }}
WITH blocks AS (
-- CTE to identify blocks that don't have tx_results ingested for mainnet 19
SELECT
block_height
FROM
{{ ref("streamline__blocks") }}
WHERE
block_height BETWEEN 35858811
AND 40171633
EXCEPT
SELECT
DISTINCT block_number AS block_height
FROM
{{ ref("streamline__complete_get_transaction_results_history") }}
WHERE
block_number BETWEEN 35858811
AND 40171633
),
block_ids AS (
-- CTE to generate the block_ids for the missing block transactions
SELECT
data:id::STRING AS block_id,
block_number
FROM
{{ ref("streamline__complete_get_blocks_history")}}
WHERE
block_number BETWEEN 35858811
AND 40171633
)
-- Generate the requests based on the missing block transactions
SELECT
OBJECT_CONSTRUCT(
'grpc',
'proto3',
'method',
'get_transaction_results_by_block_i_d',
'block_height',
block_height :: INTEGER,
'method_params',
OBJECT_CONSTRUCT(
'block_id',
block_id
)
) AS request
FROM
blocks
JOIN
block_ids ON blocks.block_height = block_ids.block_number
ORDER BY
block_height ASC
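
A sanity-check sketch (not part of the commit) for verifying the backfill: it counts how many block heights in the mainnet 19 range still lack transaction results, using the same refs and range as the model above.

SELECT
    COUNT(*) AS missing_blocks
FROM (
    SELECT block_height
    FROM {{ ref("streamline__blocks") }}
    WHERE block_height BETWEEN 35858811 AND 40171633
    EXCEPT
    SELECT DISTINCT block_number AS block_height
    FROM {{ ref("streamline__complete_get_transaction_results_history") }}
    WHERE block_number BETWEEN 35858811 AND 40171633
) missing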