From 1da00ea9c4a8b4c8b5d60a8da09b8ef912ecd88a Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Wed, 8 Jan 2025 17:43:19 -0500 Subject: [PATCH] still very slow --- macros/streamline/models.sql | 2 +- .../streamline__block_txs_complete.sql | 69 +++++++++++++------ 2 files changed, 48 insertions(+), 23 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 4cdcbb4..5718d58 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -13,7 +13,7 @@ WITH meta AS ( FROM TABLE( information_schema.external_table_file_registration_history( - start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()), + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), table_name => '{{ source( "bronze_streamline", model) }}') ) A ) diff --git a/models/streamline/core/complete/streamline__block_txs_complete.sql b/models/streamline/core/complete/streamline__block_txs_complete.sql index a9123d3..6fc6a9d 100644 --- a/models/streamline/core/complete/streamline__block_txs_complete.sql +++ b/models/streamline/core/complete/streamline__block_txs_complete.sql @@ -1,6 +1,5 @@ -- depends_on: {{ ref('bronze__transactions') }} -- depends_on: {{ ref('bronze__FR_transactions') }} - {{ config ( materialized = "incremental", unique_key = 'block_id', @@ -8,35 +7,61 @@ cluster_by = "ROUND(block_id, -5)", ) }} +{% if execute %} + +{% if is_incremental() %} +{% set max_pk_query %} + +SELECT + COALESCE(MAX(partition_key), 0) - 1000000 AS partition_key +FROM + {{ this }} + + {% endset %} + {% set max_pk = run_query(max_pk_query) [0] [0] %} + {% set max_inserted_query %} +SELECT + MAX(_inserted_timestamp) AS _inserted_timestamp +FROM + {{ this }} + + {% endset %} + {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} +{% endif %} +{% endif %} + +{% set base_query %} +CREATE +OR REPLACE temporary TABLE streamline.blocks_txs__intermediate_tmp AS SELECT block_id, error, partition_key, _inserted_timestamp, - sysdate() AS inserted_timestamp, - sysdate() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id, FROM + {% if is_incremental() %} - {{ ref('bronze__transactions') }} +{{ ref('bronze__transactions') }} WHERE - _inserted_timestamp >= ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }} - ) - AND partition_key > ( - SELECT - COALESCE( - MAX(partition_key), - 0 - ) - 1000000 - FROM - {{ this }} - ) + _inserted_timestamp >= '{{ max_inserted_timestamp }}' + AND partition_key >= {{ max_pk }} {% else %} {{ ref('bronze__FR_transactions') }} {% endif %} -QUALIFY - row_number() OVER (PARTITION BY block_id ORDER BY _inserted_timestamp DESC) = 1 \ No newline at end of file + +{% endset %} +{% do run_query(base_query) %} +SELECT + block_id, + error, + partition_key, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id, +FROM + streamline.blocks_txs__intermediate_tmp qualify ROW_NUMBER() over ( + PARTITION BY block_id + ORDER BY + _inserted_timestamp DESC + ) = 1