From 63ab57f8442ab4be28f91c96663d0dab7ea8f337 Mon Sep 17 00:00:00 2001 From: desmond-hui <97470747+desmond-hui@users.noreply.github.com> Date: Thu, 26 Oct 2023 07:02:53 -0700 Subject: [PATCH] An 4065/incremental load tuning (#215) * modify how dbt performs merges by default * require at least dbt 1.4 * adjust cluster keys, add incremental pred * ensure query is being pruned on timestamp * add cluster key --- dbt_project.yml | 2 + .../silver/core/silver__streamline_events.sql | 1 + ...silver__streamline_transaction_results.sql | 3 +- .../silver__streamline_transactions_final.sql | 104 ++++++++++-------- 4 files changed, 64 insertions(+), 46 deletions(-) diff --git a/dbt_project.yml b/dbt_project.yml index 765b86b..e4a95a4 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -5,6 +5,8 @@ name: "flow_models" version: "1.2.0" config-version: 2 +require-dbt-version: ">=1.4.0" + # This setting configures which "profile" dbt uses for this project. profile: "flow" diff --git a/models/silver/core/silver__streamline_events.sql b/models/silver/core/silver__streamline_events.sql index 6b53dce..24f8a44 100644 --- a/models/silver/core/silver__streamline_events.sql +++ b/models/silver/core/silver__streamline_events.sql @@ -1,6 +1,7 @@ {{ config( materialized = 'incremental', unique_key = 'event_id', + cluster_by = "_inserted_timestamp::date", tags = ['core', 'streamline_scheduled', 'scheduled'] ) }} diff --git a/models/silver/core/silver__streamline_transaction_results.sql b/models/silver/core/silver__streamline_transaction_results.sql index 511475d..dd5a161 100644 --- a/models/silver/core/silver__streamline_transaction_results.sql +++ b/models/silver/core/silver__streamline_transaction_results.sql @@ -1,8 +1,9 @@ -- depends_on: {{ ref('bronze__streamline_transaction_results') }} {{ config( materialized = 'incremental', + incremental_predicates = ['DBT_INTERNAL_DEST.block_number >= (select min(block_number) from ' ~ generate_tmp_view_name(this) ~ ')'], unique_key = "tx_id", - cluster_by = "_inserted_timestamp::date", + cluster_by = ["block_number","_inserted_timestamp::date"], tags = ['streamline_load', 'core'] ) }} diff --git a/models/silver/core/silver__streamline_transactions_final.sql b/models/silver/core/silver__streamline_transactions_final.sql index bc8de8e..efe0d36 100644 --- a/models/silver/core/silver__streamline_transactions_final.sql +++ b/models/silver/core/silver__streamline_transactions_final.sql @@ -1,3 +1,4 @@ +-- depends_on: {{ ref('silver__streamline_transactions') }} {{ config( materialized = 'incremental', unique_key = "tx_id", @@ -5,42 +6,67 @@ tags = ['core', 'streamline_scheduled', 'scheduled'] ) }} -WITH retry_tx_ids AS ( +{% if execute %} + {% set query = """ + CREATE OR REPLACE TEMPORARY TABLE silver.streamline_transactions_final_intermediate_tmp AS + WITH retry_tx_ids AS ( + SELECT + tx_id, + block_height + FROM """ ~ this ~ """ + WHERE + _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + AND ( + block_timestamp IS NULL + OR pending_result_response + ) + ) + SELECT + * + FROM + """ ~ ref('silver__streamline_transactions') + %} + {% set incr = "" %} + {% if is_incremental() %} + {% set incr = """ + WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + """ ~ this ~ """ + ) + OR -- re-run record if block comes in later than tx records + ( + _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' + AND + tx_id IN ( + SELECT + tx_id + FROM + retry_tx_ids + ) + ) + """ %} + {% endif %} - SELECT - tx_id, - block_height - FROM - {{ this }} - WHERE - _inserted_timestamp >= SYSDATE() - INTERVAL '3 days' - AND ( - block_timestamp IS NULL - OR pending_result_response - ) -), -txs AS ( + {% set run = run_query(query ~ incr) %} +{% endif %} + +/* + Do this because snowflake does not do well with dynamic query pruning. + This will set a "static" timestamp value which will always enable query pruning if the timestamp is a cluster key + Coalesce in case there are 0 txs returned by the temp table +*/ +{% if execute %} +{% set min_time = run_query("select coalesce(min(_inserted_timestamp),current_timestamp()) from silver.streamline_transactions_final_intermediate_tmp").columns[0].values()[0] %} +{% endif %} + +WITH txs AS ( SELECT * FROM - {{ ref('silver__streamline_transactions') }} - -{% if is_incremental() %} -WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} - ) - OR -- re-run record if block comes in later than tx records - tx_id IN ( - SELECT - tx_id - FROM - retry_tx_ids - ) -{% endif %} + silver.streamline_transactions_final_intermediate_tmp ), tx_results AS ( SELECT @@ -50,19 +76,7 @@ tx_results AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) _inserted_timestamp - FROM - {{ this }} - ) - OR -- re-run record if block comes in later than tx records - tx_id IN ( - SELECT - tx_id - FROM - retry_tx_ids - ) + _inserted_timestamp >= '{{ min_time }}' {% endif %} ), blocks AS (