An 4065/incremental load tuning (#215)

* modify how dbt performs merges by default

* require at least dbt 1.4

* adjust cluster keys, add incremental predicate

* ensure query is being pruned on timestamp

* add cluster key
This commit is contained in:
desmond-hui 2023-10-26 07:02:53 -07:00 committed by GitHub
parent 45a07a1da8
commit 63ab57f844
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 46 deletions

View File

@ -5,6 +5,8 @@ name: "flow_models"
version: "1.2.0"
config-version: 2
require-dbt-version: ">=1.4.0"
# This setting configures which "profile" dbt uses for this project.
profile: "flow"

View File

@ -1,6 +1,7 @@
-- Model configuration: built incrementally with event_id as the unique key,
-- clustered by the date part of _inserted_timestamp.
{{ config(
materialized = 'incremental',
unique_key = 'event_id',
cluster_by = "_inserted_timestamp::date",
tags = ['core', 'streamline_scheduled', 'scheduled']
) }}

View File

@ -1,8 +1,9 @@
-- depends_on: {{ ref('bronze__streamline_transaction_results') }}
-- Model configuration: built incrementally with tx_id as the unique key.
-- incremental_predicates adds a filter on the merge target (aliased DBT_INTERNAL_DEST)
-- so only target rows with block_number >= the minimum block_number of the incoming
-- batch are considered by the merge.
-- NOTE(review): generate_tmp_view_name(this) presumably resolves to the temp view
-- holding the incoming batch; the macro is defined elsewhere, confirm.
{{ config(
materialized = 'incremental',
incremental_predicates = ['DBT_INTERNAL_DEST.block_number >= (select min(block_number) from ' ~ generate_tmp_view_name(this) ~ ')'],
unique_key = "tx_id",
cluster_by = "_inserted_timestamp::date",
cluster_by = ["block_number","_inserted_timestamp::date"],
tags = ['streamline_load', 'core']
) }}

View File

@ -1,3 +1,4 @@
-- depends_on: {{ ref('silver__streamline_transactions') }}
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
@ -5,42 +6,67 @@
tags = ['core', 'streamline_scheduled', 'scheduled']
) }}
WITH retry_tx_ids AS (
{% if execute %}
{% set query = """
CREATE OR REPLACE TEMPORARY TABLE silver.streamline_transactions_final_intermediate_tmp AS
WITH retry_tx_ids AS (
SELECT
tx_id,
block_height
FROM """ ~ this ~ """
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
AND (
block_timestamp IS NULL
OR pending_result_response
)
)
SELECT
*
FROM
""" ~ ref('silver__streamline_transactions')
%}
{% set incr = "" %}
{% if is_incremental() %}
{% set incr = """
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
""" ~ this ~ """
)
OR -- re-run record if block comes in later than tx records
(
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
AND
tx_id IN (
SELECT
tx_id
FROM
retry_tx_ids
)
)
""" %}
{% endif %}
SELECT
tx_id,
block_height
FROM
{{ this }}
WHERE
_inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
AND (
block_timestamp IS NULL
OR pending_result_response
)
),
txs AS (
{% set run = run_query(query ~ incr) %}
{% endif %}
/*
Snowflake does not handle dynamic query pruning well, so the minimum
_inserted_timestamp is looked up first and used as a "static" value.
A static timestamp always enables query pruning when the timestamp is a cluster key.
The COALESCE covers the case where the temp table returns 0 txs.
*/
{% if execute %}
{% set min_time = run_query("select coalesce(min(_inserted_timestamp),current_timestamp()) from silver.streamline_transactions_final_intermediate_tmp").columns[0].values()[0] %}
{% endif %}
WITH txs AS (
SELECT
*
FROM
{{ ref('silver__streamline_transactions') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
OR -- re-run record if block comes in later than tx records
tx_id IN (
SELECT
tx_id
FROM
retry_tx_ids
)
{% endif %}
silver.streamline_transactions_final_intermediate_tmp
),
tx_results AS (
SELECT
@ -50,19 +76,7 @@ tx_results AS (
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
OR -- re-run record if block comes in later than tx records
tx_id IN (
SELECT
tx_id
FROM
retry_tx_ids
)
_inserted_timestamp >= '{{ min_time }}'
{% endif %}
),
blocks AS (