diff --git a/macros/main_package/bronze/streamline_external_table_queries.sql b/macros/main_package/bronze/streamline_external_table_queries.sql index 01750c2..d64484c 100644 --- a/macros/main_package/bronze/streamline_external_table_queries.sql +++ b/macros/main_package/bronze/streamline_external_table_queries.sql @@ -30,7 +30,7 @@ b.partition_key = s.partition_key {% endmacro %} -{% macro streamline_external_table_FR_query_v2( +{% macro streamline_external_table_query_fr( source_name, partition_function="CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)" ) %} diff --git a/models/main_package/core/bronze/streamline/bronze__tx_counts.sql b/models/main_package/core/bronze/streamline/bronze__tx_count.sql similarity index 77% rename from models/main_package/core/bronze/streamline/bronze__tx_counts.sql rename to models/main_package/core/bronze/streamline/bronze__tx_count.sql index fd8d9ba..213f222 100644 --- a/models/main_package/core/bronze/streamline/bronze__tx_counts.sql +++ b/models/main_package/core/bronze/streamline/bronze__tx_count.sql @@ -4,5 +4,5 @@ ) }} {{ streamline_external_table_query( - source_name = 'tx_counts' + source_name = 'tx_count' ) }} \ No newline at end of file diff --git a/models/main_package/core/bronze/streamline/full_refresh/bronze__tx_counts_fr.sql b/models/main_package/core/bronze/streamline/full_refresh/bronze__tx_count_fr.sql similarity index 78% rename from models/main_package/core/bronze/streamline/full_refresh/bronze__tx_counts_fr.sql rename to models/main_package/core/bronze/streamline/full_refresh/bronze__tx_count_fr.sql index e1ce8d9..d2cf938 100644 --- a/models/main_package/core/bronze/streamline/full_refresh/bronze__tx_counts_fr.sql +++ b/models/main_package/core/bronze/streamline/full_refresh/bronze__tx_count_fr.sql @@ -4,5 +4,5 @@ ) }} {{ streamline_external_table_query_fr( - source_name = 'tx_counts' + source_name = 'tx_count' ) }} \ No newline at end of file diff --git a/models/main_package/core/streamline/complete/streamline__blocks_complete.sql b/models/main_package/core/streamline/complete/streamline__blocks_complete.sql index c13b18e..f36759e 100644 --- a/models/main_package/core/streamline/complete/streamline__blocks_complete.sql +++ b/models/main_package/core/streamline/complete/streamline__blocks_complete.sql @@ -16,7 +16,8 @@ SELECT DATA :result :block :header :time :: TIMESTAMP AS block_timestamp, ARRAY_SIZE( DATA :result :block :data :txs - ) tx_count {{ dbt_utils.generate_surrogate_key( + ) tx_count, + {{ dbt_utils.generate_surrogate_key( ['block_id'] ) }} AS complete_blocks_id, SYSDATE() AS inserted_timestamp, diff --git a/models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql b/models/main_package/core/streamline/complete/streamline__tx_count_complete.sql similarity index 87% rename from models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql rename to models/main_package/core/streamline/complete/streamline__tx_count_complete.sql index 4ee6650..c035a22 100644 --- a/models/main_package/core/streamline/complete/streamline__tx_counts_complete.sql +++ b/models/main_package/core/streamline/complete/streamline__tx_count_complete.sql @@ -1,7 +1,7 @@ {# Get variables #} {% set vars = return_vars() %} --- depends_on: {{ ref('bronze__tx_counts') }} +-- depends_on: {{ ref('bronze__tx_count') }} {{ config ( materialized = "incremental", @@ -26,7 +26,7 @@ SELECT FROM {% if is_incremental() %} -{{ ref('bronze__tx_counts') }} +{{ ref('bronze__tx_count') }} WHERE inserted_timestamp >= ( SELECT @@ -34,9 +34,8 @@ WHERE FROM {{ this }} ) - AND block_id NOT IN (21208991) {% else %} - {{ ref('bronze__tx_counts_fr') }} + {{ ref('bronze__tx_count_fr') }} {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_id diff --git a/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql b/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql index ea5872b..121d885 100644 --- a/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql +++ b/models/main_package/core/streamline/realtime/streamline__blocks_realtime.sql @@ -53,14 +53,14 @@ LIMIT {{ vars.MAIN_SL_BLOCKS_REALTIME_SQL_LIMIT }} 'producer_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_PRODUCER_BATCH_SIZE, 'worker_batch_size': vars.MAIN_SL_BLOCKS_REALTIME_WORKER_BATCH_SIZE, 'async_concurrent_requests': vars.MAIN_SL_BLOCKS_REALTIME_ASYNC_CONCURRENT_REQUESTS, - 'sql_source': "{{this.identifier}}", + 'sql_source' : this.identifier "order_by_column": "block_id" } %} {% set function_call_sql %} {{ fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', - target = '{{this.schema}}.{{this.identifier}}', + target = this.schema ~ '.' ~ this.identifier, params = params ) }} {% endset %} diff --git a/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql b/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql index cb86803..9ef575c 100644 --- a/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql +++ b/models/main_package/core/streamline/realtime/streamline__transactions_realtime.sql @@ -14,6 +14,7 @@ WITH blocks AS ( SELECT A.block_id, + A.block_timestamp, tx_count FROM {{ ref('streamline__blocks_complete') }} A @@ -39,6 +40,7 @@ numbers AS ( blocks_with_page_numbers AS ( SELECT tt.block_id :: INT AS block_id, + tt.block_timestamp, n.n AS page_number FROM blocks tt @@ -52,6 +54,7 @@ numbers AS ( EXCEPT SELECT block_id, + null as block_timestamp, -- placeholder for now... page_number FROM {{ ref('streamline__transactions_complete') }} @@ -98,7 +101,7 @@ LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }} 'producer_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_PRODUCER_BATCH_SIZE, 'worker_batch_size': vars.MAIN_SL_TRANSACTIONS_REALTIME_WORKER_BATCH_SIZE, 'async_concurrent_requests': vars.MAIN_SL_TRANSACTIONS_REALTIME_ASYNC_CONCURRENT_REQUESTS, - 'sql_source': "{{this.identifier}}", + 'sql_source' : this.identifier 'exploded_key': '["result.txs"]', "order_by_column": "block_id_requested" } %} @@ -106,7 +109,7 @@ LIMIT {{ vars.MAIN_SL_TRANSACTIONS_REALTIME_SQL_LIMIT }} {% set function_call_sql %} {{ fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', - target = '{{this.schema}}.{{this.identifier}}', + target = this.schema ~ '.' ~ this.identifier, params = params ) }} {% endset %} diff --git a/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql b/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql index b935709..5a3f1b4 100644 --- a/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql +++ b/models/main_package/core/streamline/realtime/streamline__tx_counts_realtime.sql @@ -20,7 +20,7 @@ WITH blocks AS ( SELECT block_id FROM - {{ ref('streamline__tx_counts_complete') }} + {{ ref('streamline__tx_count_complete') }} ), {# retry AS ( SELECT @@ -80,13 +80,13 @@ ORDER BY 'producer_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_PRODUCER_BATCH_SIZE, 'worker_batch_size': vars.MAIN_SL_TX_COUNTS_REALTIME_WORKER_BATCH_SIZE, 'async_concurrent_requests': vars.MAIN_SL_TX_COUNTS_REALTIME_ASYNC_CONCURRENT_REQUESTS, - 'sql_source' :"{{this.identifier}}" + 'sql_source' : this.identifier } %} {% set function_call_sql %} {{ fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', - target = '{{this.schema}}.{{this.identifier}}', + target = this.schema ~ '.' ~ this.identifier, params = params ) }} {% endset %}