diff --git a/Makefile b/Makefile index 31e4388..a111bc2 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ SHELL := /bin/bash # set default target DBT_TARGET ?= sbx -AWS_LAMBDA_ROLE ?= aws_lambda_flow_api_sbx +AWS_LAMBDA_ROLE ?= aws_lambda_flow_api_dev dbt-console: docker-compose run dbt_console @@ -42,8 +42,40 @@ streamline: sl-flow-api udfs grant-streamline-privileges streamline_bronze streamline_bronze: dbt run \ - --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}' \ + --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ -m 1+models/silver/streamline/bronze \ --profiles-dir ~/.dbt \ --target $(DBT_TARGET) \ - --profile flow \ No newline at end of file + --profile flow + +blocks_history: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql \ + --profile flow \ + --target $(DBT_TARGET) \ + --profiles-dir ~/.dbt + +collections_history: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql \ + --profile flow \ + --target $(DBT_TARGET) \ + --profiles-dir ~/.dbt + +tx_history: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql \ + --profile flow \ + --target $(DBT_TARGET) \ + --profiles-dir ~/.dbt + +tx_results_history: + dbt run \ + --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + -m 1+models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql \ + --profile flow \ + --target $(DBT_TARGET) \ + --profiles-dir ~/.dbt \ No newline at end of file diff --git a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql index 9ada7ae..65244a0 100644 --- a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql +++ b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql @@ -6,39 +6,42 @@ ) ) }} -WITH blocks AS ( - +-- CTE to get all block_heights and their associated collection_ids from the complete_get_blocks table +WITH block_collections AS ( SELECT - block_height + cb.block_number AS block_height, + collection_guarantee.value:collection_id AS collection_id FROM - {{ ref("streamline__blocks") }} - EXCEPT - SELECT - block_number as block_height - FROM - {{ ref("streamline__complete_get_collections") }} + {{ ref("streamline__complete_get_blocks") }} cb, + LATERAL FLATTEN(input => cb.data:collection_guarantees) AS collection_guarantee ), -collections AS ( +-- CTE to identify collections that haven't been ingested yet +collections_to_ingest AS ( SELECT - block_number as block_height, - data + bc.block_height, + bc.collection_id FROM - {{ ref('streamline__complete_get_blocks') }} - JOIN blocks ON blocks.block_height = block_number + block_collections bc + LEFT JOIN + {{ ref("streamline__complete_get_collections") }} c + ON bc.block_height = c.block_number + AND bc.collection_id = c.id + WHERE + c.id IS NULL ) + +-- Generate the requests based on the missing collections SELECT OBJECT_CONSTRUCT( 'grpc', 'proto3', 'method', 'get_collection_by_i_d', 'block_height', block_height::INTEGER, - 'method_params', OBJECT_CONSTRUCT('id', collection_guarantee.value:collection_id) + 'method_params', OBJECT_CONSTRUCT('id', collection_id) ) AS request - FROM - collections, - LATERAL FLATTEN(input => data:collection_guarantees) AS collection_guarantee + collections_to_ingest WHERE block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range ORDER BY - block_height ASC + block_height ASC \ No newline at end of file diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql index 2d9a8dd..5bfcabc 100644 --- a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql @@ -6,39 +6,39 @@ ) ) }} -WITH blocks AS ( - +-- CTE to get all transaction_ids from the complete_get_collections table +WITH collection_transactions AS ( SELECT - block_height + block_number AS block_height, + transaction.value::STRING AS transaction_id FROM - {{ ref("streamline__blocks") }} - EXCEPT - SELECT - block_number as block_height - FROM - {{ ref("streamline__complete_get_transaction_results") }} + {{ ref("streamline__complete_get_collections") }} cc, + LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction ), -tx AS ( +-- CTE to identify transaction_results that haven't been ingested yet +transaction_results_to_ingest AS ( SELECT - block_number as block_height, - data + ct.block_height, + ct.transaction_id FROM - {{ ref('streamline__complete_get_collections') }} - JOIN blocks ON blocks.block_height = block_number + collection_transactions ct + LEFT JOIN + {{ ref("streamline__complete_get_transaction_results") }} tr ON ct.transaction_id = tr.id + WHERE + tr.id IS NULL ) + +-- Generate the requests column based on the missing transactions SELECT OBJECT_CONSTRUCT( 'grpc', 'proto3', 'method', 'get_transaction_result', 'block_height', block_height::INTEGER, - 'transaction_id', transaction_id.value::string, - 'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string) + 'transaction_id', transaction_id::STRING, + 'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING) ) AS request FROM - tx, - LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id -WHERE - block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range + transaction_results_to_ingest ORDER BY - block_height ASC + block_height ASC \ No newline at end of file diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql index f8643ec..b215bae 100644 --- a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql @@ -6,39 +6,40 @@ ) ) }} -WITH blocks AS ( - +-- CTE to get all transaction_ids from the complete_get_collections table +WITH collection_transactions AS ( SELECT - block_height + block_number AS block_height, + transaction.value::STRING AS transaction_id FROM - {{ ref("streamline__blocks") }} - EXCEPT - SELECT - block_number as block_height - FROM - {{ ref("streamline__complete_get_transactions") }} + {{ ref("streamline__complete_get_collections") }} cc, + LATERAL FLATTEN(input => cc.data:transaction_ids) AS transaction ), -tx AS ( +-- CTE to identify transactions that haven't been ingested yet +transactions_to_ingest AS ( SELECT - block_number as block_height, - data + ct.block_height, + ct.transaction_id FROM - {{ ref('streamline__complete_get_collections') }} - JOIN blocks ON blocks.block_height = block_number + collection_transactions ct + LEFT JOIN + {{ ref("streamline__complete_get_transactions") }} t ON ct.transaction_id = t.id + WHERE + t.id IS NULL ) + +-- Generate the requests based on the missing transactions SELECT OBJECT_CONSTRUCT( 'grpc', 'proto3', 'method', 'get_transaction', 'block_height', block_height::INTEGER, - 'transaction_id', transaction_id.value::string, - 'method_params', OBJECT_CONSTRUCT('id', transaction_id.value::string) + 'transaction_id', transaction_id::STRING, + 'method_params', OBJECT_CONSTRUCT('id', transaction_id::STRING) ) AS request FROM - tx, - LATERAL FLATTEN(input => TRY_PARSE_JSON(data):transaction_ids) AS transaction_id -WHERE - block_height BETWEEN 47169687 AND 55114466 -- Mainnet22 block range + transactions_to_ingest ORDER BY block_height ASC +