From b19d800f98ff9311352aa3bc8ea875fb0f491867 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Wed, 17 Jul 2024 13:12:53 -0400 Subject: [PATCH] fix clustering, spelling, custom recency test --- macros/tests/recency_where.sql | 48 +++++++++++++++++++ models/gold/defi/defi__fact_dex_swaps.yml | 9 ++-- .../defi/dex/silver__dex_swaps_aires.sql | 6 +-- .../defi/dex/silver__dex_swaps_animeswap.sql | 39 +++++++-------- .../dex/silver__dex_swaps_auxexchange.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_batswap.sql | 10 ++-- .../defi/dex/silver__dex_swaps_cellana.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_cetus.sql | 42 +++++++--------- .../defi/dex/silver__dex_swaps_combined.sql | 3 +- .../defi/dex/silver__dex_swaps_hippo.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_liquidswap.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_pancake.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_sushi.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_thala.sql | 39 +++++++-------- .../defi/dex/silver__dex_swaps_tsunami.sql | 11 ++--- 15 files changed, 220 insertions(+), 221 deletions(-) create mode 100644 macros/tests/recency_where.sql diff --git a/macros/tests/recency_where.sql b/macros/tests/recency_where.sql new file mode 100644 index 0000000..95204a6 --- /dev/null +++ b/macros/tests/recency_where.sql @@ -0,0 +1,48 @@ +{% test recency_where(model, field, datepart, interval, where, ignore_time_component=False, group_by_columns = []) %} + {{ return(adapter.dispatch('test_recency', 'dbt_utils')(model, field, datepart, interval, ignore_time_component, group_by_columns, where)) }} +{% endtest %} + +{% macro default__test_recency(model, field, datepart, interval, ignore_time_component, group_by_columns, where) %} + +{% set threshold = 'cast(' ~ dbt.dateadd(datepart, interval * -1, dbt.current_timestamp()) ~ ' as ' ~ ('date' if ignore_time_component else dbt.type_timestamp()) ~ ')' %} + +{% if group_by_columns|length() > 0 %} + {% set select_gb_cols = group_by_columns|join(' ,') + ', ' %} + {% set groupby_gb_cols = 'group by ' + group_by_columns|join(',') %} +{% endif %} + +{% if where|length() > 0 %} + {% set where_clause_val = where %} +{% else %} + {% set where_clause_val = '1=1' %} +{% endif %} + + +with recency as ( + + select + + {{ select_gb_cols }} + {% if ignore_time_component %} + cast(max({{ field }}) as date) as most_recent + {%- else %} + max({{ field }}) as most_recent + {%- endif %} + + from {{ model }} + where {{ where_clause_val }} + + {{ groupby_gb_cols }} + +) + +select + + {{ select_gb_cols }} + most_recent, + {{ threshold }} as threshold + +from recency +where most_recent < {{ threshold }} + +{% endmacro %} \ No newline at end of file diff --git a/models/gold/defi/defi__fact_dex_swaps.yml b/models/gold/defi/defi__fact_dex_swaps.yml index 2877fad..a480510 100644 --- a/models/gold/defi/defi__fact_dex_swaps.yml +++ b/models/gold/defi/defi__fact_dex_swaps.yml @@ -7,6 +7,12 @@ models: combination_of_columns: - TX_HASH - EVENT_INDEX + - recency_where: + field: BLOCK_TIMESTAMP + datepart: day + interval: 1 + group_by_columns: [Platform] + where: "PLATFORM <> 'tsunami'" columns: - name: BLOCK_NUMBER description: '{{ doc("block_number") }}' @@ -16,9 +22,6 @@ models: description: '{{ doc("block_timestamp") }}' tests: - not_null - - dbt_expectations.expect_row_values_to_have_recent_data: - datepart: day - interval: 1 - name: TX_HASH description: '{{ doc("tx_hash") }}' tests: diff --git a/models/silver/defi/dex/silver__dex_swaps_aires.sql b/models/silver/defi/dex/silver__dex_swaps_aires.sql index ed0a712..9a35d09 100644 --- a/models/silver/defi/dex/silver__dex_swaps_aires.sql +++ b/models/silver/defi/dex/silver__dex_swaps_aires.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_aires_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'], enabled = false ) }} @@ -34,9 +34,9 @@ WITH evnts AS ( AND block_timestamp :: DATE >= '2024-06-01' {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) diff --git a/models/silver/defi/dex/silver__dex_swaps_animeswap.sql b/models/silver/defi/dex/silver__dex_swaps_animeswap.sql index 1a28a2d..a55aa17 100644 --- a/models/silver/defi/dex/silver__dex_swaps_animeswap.sql +++ b/models/silver/defi/dex/silver__dex_swaps_animeswap.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_animeswap_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -50,15 +42,6 @@ evnts AS ( event_address = '0x16fe2df00ea7dde4a63409201f7f4e536bde7bb7335526a35d05111e68aa322c' AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -89,7 +72,6 @@ SELECT ) }} AS dex_swaps_animeswap_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -97,3 +79,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_auxexchange.sql b/models/silver/defi/dex/silver__dex_swaps_auxexchange.sql index 7a9299e..10c8e2f 100644 --- a/models/silver/defi/dex/silver__dex_swaps_auxexchange.sql +++ b/models/silver/defi/dex/silver__dex_swaps_auxexchange.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_aux_exchange_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -50,15 +42,6 @@ evnts AS ( event_address = '0xbd35135844473187163ca197ca93b2ab014370587bb0ed3befff9e902d6bb541' AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -77,7 +60,6 @@ SELECT ) }} AS dex_swaps_aux_exchange_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -85,3 +67,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_batswap.sql b/models/silver/defi/dex/silver__dex_swaps_batswap.sql index 0e57707..a8d0151 100644 --- a/models/silver/defi/dex/silver__dex_swaps_batswap.sql +++ b/models/silver/defi/dex/silver__dex_swaps_batswap.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_batswap_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -19,8 +19,7 @@ WITH evnts AS ( event_address, event_resource, event_data, - event_type, - _inserted_timestamp + event_type FROM {{ ref( 'silver__events' @@ -35,9 +34,9 @@ WITH evnts AS ( AND success {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -72,7 +71,6 @@ SELECT ) }} AS dex_swaps_batswap_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A diff --git a/models/silver/defi/dex/silver__dex_swaps_cellana.sql b/models/silver/defi/dex/silver__dex_swaps_cellana.sql index c6a4b0d..da57773 100644 --- a/models/silver/defi/dex/silver__dex_swaps_cellana.sql +++ b/models/silver/defi/dex/silver__dex_swaps_cellana.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_cellana_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -50,15 +42,6 @@ evnts AS ( event_address = '0x4bf51972879e3b95c4781a5cdcb9e1ee24ef483e7d22f2d903626f126df62bd1' AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -78,7 +61,6 @@ SELECT ) }} AS dex_swaps_cellana_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -86,3 +68,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_cetus.sql b/models/silver/defi/dex/silver__dex_swaps_cetus.sql index f0b98d9..2353a42 100644 --- a/models/silver/defi/dex/silver__dex_swaps_cetus.sql +++ b/models/silver/defi/dex/silver__dex_swaps_cetus.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_cetus_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -50,15 +42,6 @@ evnts AS ( event_address = '0xec42a352cc65eca17a9fa85d0fc602295897ed6b8b8af6a6c79ef490eb8f9eba' AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), pre_final AS ( SELECT @@ -117,14 +100,26 @@ pre_final AS ( CASE WHEN a_out = 0 THEN b_out ELSE a_out - END AS amount_out_unadj, - _inserted_timestamp + END AS amount_out_unadj FROM evnts A JOIN tx b USING( tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} ) SELECT block_number, @@ -143,7 +138,6 @@ SELECT ) }} AS dex_swaps_cetus_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM pre_final diff --git a/models/silver/defi/dex/silver__dex_swaps_combined.sql b/models/silver/defi/dex/silver__dex_swaps_combined.sql index bf6ec1d..e764634 100644 --- a/models/silver/defi/dex/silver__dex_swaps_combined.sql +++ b/models/silver/defi/dex/silver__dex_swaps_combined.sql @@ -4,7 +4,7 @@ ) }} {% set models = [ - ('anieswap', ref('silver__dex_swaps_animeswap')), + ('animeswap', ref('silver__dex_swaps_animeswap')), ('auxexchange', ref('silver__dex_swaps_auxexchange')), ('batswap', ref('silver__dex_swaps_batswap')), ('cellana', ref('silver__dex_swaps_cellana')), @@ -36,7 +36,6 @@ SELECT dex_swaps_animeswap_id AS dex_swaps_combined_id, inserted_timestamp, modified_timestamp, - _inserted_timestamp, _invocation_id FROM ({% for models in models %} diff --git a/models/silver/defi/dex/silver__dex_swaps_hippo.sql b/models/silver/defi/dex/silver__dex_swaps_hippo.sql index 8f8adc4..9955d78 100644 --- a/models/silver/defi/dex/silver__dex_swaps_hippo.sql +++ b/models/silver/defi/dex/silver__dex_swaps_hippo.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_hippo_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -53,15 +45,6 @@ evnts AS ( ) AND event_resource ILIKE 'SwapStepEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -105,7 +88,6 @@ SELECT ) }} AS dex_swaps_hippo_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -113,3 +95,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_liquidswap.sql b/models/silver/defi/dex/silver__dex_swaps_liquidswap.sql index 685dc8b..2fb6bf6 100644 --- a/models/silver/defi/dex/silver__dex_swaps_liquidswap.sql +++ b/models/silver/defi/dex/silver__dex_swaps_liquidswap.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_liquidswap_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -53,15 +45,6 @@ evnts AS ( ) AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -92,7 +75,6 @@ SELECT ) }} AS dex_swaps_liquidswap_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -100,3 +82,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_pancake.sql b/models/silver/defi/dex/silver__dex_swaps_pancake.sql index 2ad26e5..8fd6eaf 100644 --- a/models/silver/defi/dex/silver__dex_swaps_pancake.sql +++ b/models/silver/defi/dex/silver__dex_swaps_pancake.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_pancake_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -50,15 +42,6 @@ evnts AS ( event_address = '0xc7efb4076dbe143cbcd98cfaaa929ecfc8f299203dfff63b95ccb6bfe19850fa' AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -89,7 +72,6 @@ SELECT ) }} AS dex_swaps_pancake_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -97,3 +79,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_sushi.sql b/models/silver/defi/dex/silver__dex_swaps_sushi.sql index c3e56ff..d4486c9 100644 --- a/models/silver/defi/dex/silver__dex_swaps_sushi.sql +++ b/models/silver/defi/dex/silver__dex_swaps_sushi.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_sushi_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -50,15 +42,6 @@ evnts AS ( event_address = '0x31a6675cbe84365bf2b0cbce617ece6c47023ef70826533bde5203d32171dc3c' AND event_resource ILIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -89,7 +72,6 @@ SELECT ) }} AS dex_swaps_sushi_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -97,3 +79,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_thala.sql b/models/silver/defi/dex/silver__dex_swaps_thala.sql index c9b08c0..54e81c2 100644 --- a/models/silver/defi/dex/silver__dex_swaps_thala.sql +++ b/models/silver/defi/dex/silver__dex_swaps_thala.sql @@ -3,7 +3,7 @@ unique_key = "dex_swaps_thala_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], + cluster_by = ['modified_timestamp::DATE'], tags = ['noncore'] ) }} @@ -12,22 +12,14 @@ WITH tx AS ( SELECT tx_hash, block_timestamp, - sender + sender, + modified_timestamp FROM {{ ref( 'silver__transactions' ) }} WHERE success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ), evnts AS ( SELECT @@ -41,7 +33,7 @@ evnts AS ( event_resource, event_data, event_type, - _inserted_timestamp + modified_timestamp FROM {{ ref( 'silver__events' @@ -53,15 +45,6 @@ evnts AS ( ) AND event_resource LIKE 'SwapEvent%' AND success - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - FROM - {{ this }} -) -{% endif %} ) SELECT block_number, @@ -88,7 +71,6 @@ SELECT ) }} AS dex_swaps_thala_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A @@ -96,3 +78,16 @@ FROM tx_hash, block_timestamp ) + +{% if is_incremental() %} +WHERE + GREATEST( + A.modified_timestamp, + b.modified_timestamp + ) >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} diff --git a/models/silver/defi/dex/silver__dex_swaps_tsunami.sql b/models/silver/defi/dex/silver__dex_swaps_tsunami.sql index d3b1a21..74de949 100644 --- a/models/silver/defi/dex/silver__dex_swaps_tsunami.sql +++ b/models/silver/defi/dex/silver__dex_swaps_tsunami.sql @@ -3,8 +3,7 @@ unique_key = "dex_swaps_tsunami_id", incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], - cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'], - tags = ['noncore'] + cluster_by = ['modified_timestamp::DATE'], ) }} WITH evnts AS ( @@ -19,8 +18,7 @@ WITH evnts AS ( event_address, event_resource, event_data, - event_type, - _inserted_timestamp + event_type FROM {{ ref( 'silver__events' @@ -31,9 +29,9 @@ WITH evnts AS ( AND success {% if is_incremental() %} -AND _inserted_timestamp >= ( +AND modified_timestamp >= ( SELECT - MAX(_inserted_timestamp) + MAX(modified_timestamp) FROM {{ this }} ) @@ -76,7 +74,6 @@ SELECT ) }} AS dex_swaps_tsunami_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - _inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM evnts A