diff --git a/Makefile b/Makefile index e5fcb53..31e4388 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,7 @@ streamline: sl-flow-api udfs grant-streamline-privileges streamline_bronze streamline_bronze: dbt run \ - --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": True}' \ + --vars '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}' \ -m 1+models/silver/streamline/bronze \ --profiles-dir ~/.dbt \ --target $(DBT_TARGET) \ diff --git a/macros/streamline/get_base_table_udft.sql b/macros/streamline/get_base_table_udft.sql index 57cc6f6..911f251 100644 --- a/macros/streamline/get_base_table_udft.sql +++ b/macros/streamline/get_base_table_udft.sql @@ -3,17 +3,17 @@ create or replace function {{ schema }}.udtf_get_base_table(max_height integer) returns table (height number) as $$ -with base as ( - select - row_number() over ( - order by - seq4() - ) as height - from - table(generator(rowcount => 100000000)) -) + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) -- July 2023 Flow Chain head is at 57M + ) select - height + id as height from base where diff --git a/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql b/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql index f7cb438..4f0fbd0 100644 --- a/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql +++ b/models/silver/streamline/bronze/core/bronze__streamline_blocks.sql @@ -26,8 +26,7 @@ WITH meta AS ( s._partition_by_block_id, s.value AS VALUE FROM - streamline.FLOW_DEV.blocks - s + {{ source("bronze_streamline","blocks") }} s JOIN meta b ON b.file_name = metadata$filename AND b._partition_by_block_id = s._partition_by_block_id diff --git a/models/silver/streamline/bronze/core/bronze__streamline_FR_blocks.sql b/models/silver/streamline/bronze/core/bronze__streamline_fr_blocks.sql similarity index 94% rename from models/silver/streamline/bronze/core/bronze__streamline_FR_blocks.sql rename to models/silver/streamline/bronze/core/bronze__streamline_fr_blocks.sql index 2e0b778..b1af0e3 100644 --- a/models/silver/streamline/bronze/core/bronze__streamline_FR_blocks.sql +++ b/models/silver/streamline/bronze/core/bronze__streamline_fr_blocks.sql @@ -26,8 +26,7 @@ SELECT s._partition_by_block_id, s.value AS VALUE FROM - streamline.FLOW_DEV.blocks - s + {{ source("bronze_streamline","blocks") }} s JOIN meta b ON b.file_name = metadata$filename AND b._partition_by_block_id = s._partition_by_block_id diff --git a/models/silver/streamline/bronze/core/bronze__streamline_FR_collections.sql b/models/silver/streamline/bronze/core/bronze__streamline_fr_collections.sql similarity index 100% rename from models/silver/streamline/bronze/core/bronze__streamline_FR_collections.sql rename to models/silver/streamline/bronze/core/bronze__streamline_fr_collections.sql diff --git a/models/silver/streamline/bronze/core/bronze__streamline_FR_transaction_results.sql b/models/silver/streamline/bronze/core/bronze__streamline_fr_transaction_results.sql similarity index 100% rename from models/silver/streamline/bronze/core/bronze__streamline_FR_transaction_results.sql rename to models/silver/streamline/bronze/core/bronze__streamline_fr_transaction_results.sql diff --git a/models/silver/streamline/bronze/core/bronze__streamline_FR_transactions.sql b/models/silver/streamline/bronze/core/bronze__streamline_fr_transactions.sql similarity index 100% rename from models/silver/streamline/bronze/core/bronze__streamline_FR_transactions.sql rename to models/silver/streamline/bronze/core/bronze__streamline_fr_transactions.sql diff --git a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql index 71c6919..4c57a37 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_blocks.sql @@ -18,11 +18,14 @@ FROM {% if is_incremental() %} {{ ref('bronze__streamline_blocks') }} WHERE - _inserted_timestamp >= ( - SELECT + _inserted_timestamp >= COALESCE( + ( + SELECT MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} + FROM + {{ this }} + ), + '1900-01-01'::timestamp ) {% else %} {{ ref('bronze__streamline_fr_blocks') }} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql index 760d8e8..4fc55c2 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_collections.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_collections.sql @@ -18,17 +18,19 @@ FROM {% if is_incremental() %} {{ ref('bronze__streamline_collections') }} WHERE - _inserted_timestamp >= ( - SELECT + _inserted_timestamp >= COALESCE( + ( + SELECT MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} + FROM + {{ this }} + ), + '1900-01-01'::timestamp ) - {% else %} {{ ref('bronze__streamline_fr_collections') }} {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY _partition_by_block_id +qualify(ROW_NUMBER() over (PARTITION BY id ORDER BY _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql index c6b1462..b6fd819 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transaction_results.sql @@ -18,13 +18,15 @@ FROM {% if is_incremental() %} {{ ref('bronze__streamline_transaction_results') }} WHERE - _inserted_timestamp >= ( - SELECT + _inserted_timestamp >= COALESCE( + ( + SELECT MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} + FROM + {{ this }} + ), + '1900-01-01'::timestamp ) - {% else %} {{ ref('bronze__streamline_fr_transaction_results') }} {% endif %} diff --git a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql index f26b23e..381c7e9 100644 --- a/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql +++ b/models/silver/streamline/core/complete/streamline__complete_get_transactions.sql @@ -18,13 +18,15 @@ FROM {% if is_incremental() %} {{ ref('bronze__streamline_transactions') }} WHERE - _inserted_timestamp >= ( - SELECT + _inserted_timestamp >= COALESCE( + ( + SELECT MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} + FROM + {{ this }} + ), + '1900-01-01'::timestamp ) - {% else %} {{ ref('bronze__streamline_fr_transactions') }} {% endif %} diff --git a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql index cf1e1cc..ac51a9c 100644 --- a/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql +++ b/models/silver/streamline/core/history/blocks/streamline__get_blocks_history_mainnet22.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_block_by_height','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','100')}}, 'worker_batch_size', {{var('worker_batch_size','10')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'blocks', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql index 8e62ab8..9ada7ae 100644 --- a/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql +++ b/models/silver/streamline/core/history/collections/streamline__get_collections_history_mainnet22.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_collection_by_i_d','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'collections', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','1000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql index a67ce22..2d9a8dd 100644 --- a/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql +++ b/models/silver/streamline/core/history/transaction_results/streamline__get_transaction_results_history_mainnet22.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction_result','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transaction_results', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }} diff --git a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql index 32afa70..f8643ec 100644 --- a/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql +++ b/models/silver/streamline/core/history/transactions/streamline__get_transactions_history_mainnet22.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "view", post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','method', 'get_transaction','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','10000')}}, 'producer_batch_size', {{var('producer_batch_size','10000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + func = "{{this.schema}}.udf_bulk_grpc(object_construct('sql_source', '{{this.identifier}}','node_url','access-001.mainnet22.nodes.onflow.org:9000','external_table', 'transactions', 'sql_limit', {{var('sql_limit','500000')}}, 'producer_batch_size', {{var('producer_batch_size','30000')}}, 'worker_batch_size', {{var('worker_batch_size','3000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", target = "{{this.schema}}.{{this.identifier}}" ) ) }}