From 1e17c6378bd3537c20d559c75cfdfd3cdeee54ff Mon Sep 17 00:00:00 2001 From: shah Date: Fri, 11 Aug 2023 10:01:31 -0700 Subject: [PATCH] added tx & tx_result backfill models --- Makefile | 19 +++++++- dbt_project.yml | 1 + macros/streamline/utils.sql | 29 ------------- models/silver/streamline/README.md | 26 ++++++++--- ...bronze__streamline_transaction_results.sql | 11 +++++ .../streamline__complete_get_blocks.sql | 3 +- .../streamline__complete_get_collections.sql | 1 - ...line__complete_get_transaction_results.sql | 34 +++++++++++++++ .../streamline__complete_get_transactions.sql | 2 +- ...reamline__get_blocks_history_mainnet22.sql | 9 +++- ...ine__get_collections_history_mainnet22.sql | 8 +++- ..._transaction_results_history_mainnet22.sql | 43 +++++++++++++++++++ ...ne__get_transactions_history_mainnet22.sql | 20 +++------ models/sources.yml | 1 + 14 files changed, 149 insertions(+), 58 deletions(-) delete mode 100644 macros/streamline/utils.sql create mode 100644 models/silver/streamline/bronze/core/bronze__streamline_transaction_results.sql create mode 100644 models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql create mode 100644 models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql diff --git a/Makefile b/Makefile index 69bffcd..2b56731 100644 --- a/Makefile +++ b/Makefile @@ -22,6 +22,15 @@ udfs: --target $(DBT_TARGET) \ --profiles-dir ~/.dbt/ +complete: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/core/complete \ + --profile flow \ + --target $(DBT_TARGET) \ + --profiles-dir ~/.dbt + + grant-streamline-privileges: dbt run-operation grant_streamline_privileges \ --profile flow \ @@ -29,4 +38,12 @@ grant-streamline-privileges: --profiles-dir ~/.dbt/ \ --args '{role: $(AWS_LAMBDA_ROLE)}' -undo_clone_purge: sl-flow-api udfs grant-streamline-privileges \ No newline at end 
of file +undo_clone_purge: sl-flow-api udfs grant-streamline-privileges + +streamline_bronze: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/bronze \ + --profiles-dir ~/.dbt \ + --target $(DBT_TARGET) \ + --profile flow \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml index 084cbb7..8fa27af 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -56,3 +56,4 @@ vars: "dbt_date:time_zone": GMT UPDATE_SNOWFLAKE_TAGS: True UPDATE_UDFS_AND_SPS: False + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True diff --git a/macros/streamline/utils.sql b/macros/streamline/utils.sql deleted file mode 100644 index b66b028..0000000 --- a/macros/streamline/utils.sql +++ /dev/null @@ -1,29 +0,0 @@ -{% macro generate_blocks_grpc_request(block_height) %} - PARSE_JSON( - CONCAT( - '{"grpc": "proto3",', - '"method": "get_block_by_height",', - '"block_height":"', - block_height :: INTEGER, - '",', - '"method_params": {"height":', - block_height :: INTEGER, - '}}' - ) - ) -{% endmacro %} - -{% macro generate_collections_grpc_request(block_height, collection_guarantee) %} - PARSE_JSON( - CONCAT( - '{"grpc": "proto3",', - '"method": "get_collection_by_i_d",', - '"block_height":"', - block_height :: INTEGER, - '",', - '"method_params": {"id":"', - collection_guarantee.value:collection_id, - '"}}' - ) - ) -{% endmacro %} \ No newline at end of file diff --git a/models/silver/streamline/README.md b/models/silver/streamline/README.md index 6f7ce85..269f29c 100644 --- a/models/silver/streamline/README.md +++ b/models/silver/streamline/README.md @@ -35,17 +35,29 @@ dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTER dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt # dev 
complete_get_transactions -dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_transactionss.sql --profile flow --target dev --profiles-dir ~/.dbt - --- - +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_transactions.sql --profile flow --target dev --profiles-dir ~/.dbt # dev get_blocks_history -dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history.sql --profile flow --target dev --profiles-dir ~/.dbt +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt + +# dev get_collections_history +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt # dev get_transactions_history dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt -# dev complete_get_collections -dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' -m 1+models/silver/streamline/core/complete/streamline__complete_get_collections.sql --profile flow --target dev --profiles-dir ~/.dbt +# dev get_transaction_results_history +dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": 
True}' -m 1+models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql --profile flow --target dev --profiles-dir ~/.dbt + +# dev bronze__streamline_blocks.sql +dbt run --select bronze__streamline_blocks.sql --profiles-dir ~/.dbt --target dev --profile flow --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' +``` + + +```zsh +DBT_TARGET=dev \ +AWS_LAMBDA_ROLE=AWS_LAMBDA_FLOW_API_DEV \ +make undo_clone_purge + +DBT_TARGET=dev make streamline_bronze ``` \ No newline at end of file diff --git a/models/silver/streamline/bronze/core/bronze__streamline_transaction_results.sql b/models/silver/streamline/bronze/core/bronze__streamline_transaction_results.sql new file mode 100644 index 0000000..24e013c --- /dev/null +++ b/models/silver/streamline/bronze/core/bronze__streamline_transaction_results.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_")[-2:] | join('_') %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql index 868c558..10c3e2b 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql @@ -1,4 +1,3 @@ --- depends_on: {{ ref('bronze__streamline_blocks') }} {{ config ( materialized = "incremental", unique_key = "id", @@ -29,6 +28,6 @@ WHERE {{ ref('bronze__streamline_blocks') }} -- TODO: change to bronze__streamline_FR_blocks {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id +qualify(ROW_NUMBER() over (PARTITION BY id ORDER BY _inserted_timestamp DESC)) = 1 \ No newline at end of file 
diff --git a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql index dd3d86a..9eea5a2 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql @@ -1,4 +1,3 @@ --- depends_on: {{ ref('bronze__streamline_collections') }} {{ config ( materialized = "incremental", unique_key = "id", diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql new file mode 100644 index 0000000..a1da3dc --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql @@ -0,0 +1,34 @@ +-- depends_on: {{ ref('bronze__streamline_transaction_results') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + data, + block_number, + _partition_by_block_id, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transaction_results') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) + +{% else %} + {{ ref('bronze__streamline_transaction_results') }} -- TODO: change to bronze__streamline_FR_transaction_results +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql index 9ee00ad..9d3c58d 100644 ---
a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql @@ -29,6 +29,6 @@ WHERE {{ ref('bronze__streamline_transactions') }} -- TODO: change to bronze__streamline_FR_transactions {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id +qualify(ROW_NUMBER() over (PARTITION BY id ORDER BY _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql index ea1704e..cf1e1cc 100644 --- a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql +++ b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','100')}}, 'worker_batch_size', {{var('worker_batch_size','10')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} @@ -19,7 +19,12 @@ WITH blocks AS ( {{ ref("streamline__complete_get_blocks") }} ) SELECT - {{ 
generate_blocks_grpc_request(block_height) }} AS request + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_block_by_height', + 'block_height', block_height, + 'method_params', OBJECT_CONSTRUCT('height', block_height) + ) AS request FROM blocks WHERE diff --git a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql index baf883b..6e766e6 100644 --- a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql +++ b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql @@ -1,4 +1,3 @@ --- depends_on: {{ ref('streamline__get_blocks_history_mainnet22') }} {{ config ( materialized = "view", post_hook = if_data_call_function( @@ -29,7 +28,12 @@ collections AS ( JOIN blocks ON blocks.block_height = block_number ) SELECT - {{ generate_collections_grpc_request(block_height, collection_guarantee) }} AS request + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_collection_by_i_d', + 'block_height', block_height::INTEGER, + 'method_params', OBJECT_CONSTRUCT('id', collection_guarantee.value:collection_id) + ) AS request FROM collections, LATERAL FLATTEN(input => data:collection_guarantees) AS collection_guarantee diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql new file mode 100644 index 0000000..c7c4540 --- /dev/null +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql @@ -0,0 +1,43 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 
'get_transaction_result','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_height + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number as block_height + FROM + {{ ref("streamline__complete_get_transaction_results") }} +), +tx AS ( + + SELECT + block_number as block_height, + data + FROM + {{ ref('streamline__complete_get_collections') }} + JOIN blocks ON blocks.block_height = block_number +) +SELECT + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_transaction_result', + 'block_height', block_height::INTEGER, + 'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string) + ) AS request +FROM + tx, + LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id +WHERE + block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range +ORDER BY + block_height ASC diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql index c4a3b37..92ec818 100644 --- a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql @@ -24,25 +24,19 @@ tx AS ( block_number as block_height, data FROM - {{ ref('bronze__streamline_collections') }} + {{ ref('streamline__complete_get_collections') }} JOIN blocks ON blocks.block_height = block_number ) SELECT - PARSE_JSON( - CONCAT( - '{"grpc": "proto3",', - '"method": "get_collection_by_i_d",', - '"block_height":"', - 
block_height :: INTEGER, - '",', - '"method_params": {"id":"', - transaction_ids, - '"}}' - ) + OBJECT_CONSTRUCT( + 'grpc', 'proto3', + 'method', 'get_transaction', + 'block_height', block_height::INTEGER, + 'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string) ) AS request FROM tx, - LATERAL FLATTEN(input => data:transaction_ids) AS transaction_ids + LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id WHERE block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range ORDER BY diff --git a/models/sources.yml b/models/sources.yml index 29a6262..8768e4b 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -30,6 +30,7 @@ sources: - name: blocks - name: collections - name: transactions + - name: transaction_results - name: crosschain_v2 database: crosschain