fix clustering, spelling, custom recency test

This commit is contained in:
Eric Laurello 2024-07-17 13:12:53 -04:00
parent 0af7f0f06a
commit b19d800f98
15 changed files with 220 additions and 221 deletions

View File

@ -0,0 +1,48 @@
{% test recency_where(model, field, datepart, interval, where, ignore_time_component=False, group_by_columns = []) %}
{{ return(adapter.dispatch('test_recency', 'dbt_utils')(model, field, datepart, interval, ignore_time_component, group_by_columns, where)) }}
{% endtest %}
{% macro default__test_recency(model, field, datepart, interval, ignore_time_component, group_by_columns, where) %}
{% set threshold = 'cast(' ~ dbt.dateadd(datepart, interval * -1, dbt.current_timestamp()) ~ ' as ' ~ ('date' if ignore_time_component else dbt.type_timestamp()) ~ ')' %}
{% if group_by_columns|length() > 0 %}
{% set select_gb_cols = group_by_columns|join(' ,') + ', ' %}
{% set groupby_gb_cols = 'group by ' + group_by_columns|join(',') %}
{% endif %}
{% if where|length() > 0 %}
{% set where_clause_val = where %}
{% else %}
{% set where_clause_val = '1=1' %}
{% endif %}
with recency as (
select
{{ select_gb_cols }}
{% if ignore_time_component %}
cast(max({{ field }}) as date) as most_recent
{%- else %}
max({{ field }}) as most_recent
{%- endif %}
from {{ model }}
where {{ where_clause_val }}
{{ groupby_gb_cols }}
)
select
{{ select_gb_cols }}
most_recent,
{{ threshold }} as threshold
from recency
where most_recent < {{ threshold }}
{% endmacro %}

View File

@ -7,6 +7,12 @@ models:
combination_of_columns:
- TX_HASH
- EVENT_INDEX
- recency_where:
field: BLOCK_TIMESTAMP
datepart: day
interval: 1
group_by_columns: [Platform]
where: "PLATFORM <> 'tsunami'"
columns:
- name: BLOCK_NUMBER
description: '{{ doc("block_number") }}'
@ -16,9 +22,6 @@ models:
description: '{{ doc("block_timestamp") }}'
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- name: TX_HASH
description: '{{ doc("tx_hash") }}'
tests:

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_aires_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore'],
enabled = false
) }}
@ -34,9 +34,9 @@ WITH evnts AS (
AND block_timestamp :: DATE >= '2024-06-01'
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_animeswap_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -50,15 +42,6 @@ evnts AS (
event_address = '0x16fe2df00ea7dde4a63409201f7f4e536bde7bb7335526a35d05111e68aa322c'
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -89,7 +72,6 @@ SELECT
) }} AS dex_swaps_animeswap_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -97,3 +79,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_aux_exchange_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -50,15 +42,6 @@ evnts AS (
event_address = '0xbd35135844473187163ca197ca93b2ab014370587bb0ed3befff9e902d6bb541'
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -77,7 +60,6 @@ SELECT
) }} AS dex_swaps_aux_exchange_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -85,3 +67,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_batswap_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -19,8 +19,7 @@ WITH evnts AS (
event_address,
event_resource,
event_data,
event_type,
_inserted_timestamp
event_type
FROM
{{ ref(
'silver__events'
@ -35,9 +34,9 @@ WITH evnts AS (
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -72,7 +71,6 @@ SELECT
) }} AS dex_swaps_batswap_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_cellana_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -50,15 +42,6 @@ evnts AS (
event_address = '0x4bf51972879e3b95c4781a5cdcb9e1ee24ef483e7d22f2d903626f126df62bd1'
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -78,7 +61,6 @@ SELECT
) }} AS dex_swaps_cellana_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -86,3 +68,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_cetus_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -50,15 +42,6 @@ evnts AS (
event_address = '0xec42a352cc65eca17a9fa85d0fc602295897ed6b8b8af6a6c79ef490eb8f9eba'
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
pre_final AS (
SELECT
@ -117,14 +100,26 @@ pre_final AS (
CASE
WHEN a_out = 0 THEN b_out
ELSE a_out
END AS amount_out_unadj,
_inserted_timestamp
END AS amount_out_unadj
FROM
evnts A
JOIN tx b USING(
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -143,7 +138,6 @@ SELECT
) }} AS dex_swaps_cetus_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final

View File

@ -4,7 +4,7 @@
) }}
{% set models = [
('anieswap', ref('silver__dex_swaps_animeswap')),
('animeswap', ref('silver__dex_swaps_animeswap')),
('auxexchange', ref('silver__dex_swaps_auxexchange')),
('batswap', ref('silver__dex_swaps_batswap')),
('cellana', ref('silver__dex_swaps_cellana')),
@ -36,7 +36,6 @@ SELECT
dex_swaps_animeswap_id AS dex_swaps_combined_id,
inserted_timestamp,
modified_timestamp,
_inserted_timestamp,
_invocation_id
FROM
({% for models in models %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_hippo_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -53,15 +45,6 @@ evnts AS (
)
AND event_resource ILIKE 'SwapStepEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -105,7 +88,6 @@ SELECT
) }} AS dex_swaps_hippo_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -113,3 +95,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_liquidswap_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -53,15 +45,6 @@ evnts AS (
)
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -92,7 +75,6 @@ SELECT
) }} AS dex_swaps_liquidswap_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -100,3 +82,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_pancake_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -50,15 +42,6 @@ evnts AS (
event_address = '0xc7efb4076dbe143cbcd98cfaaa929ecfc8f299203dfff63b95ccb6bfe19850fa'
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -89,7 +72,6 @@ SELECT
) }} AS dex_swaps_pancake_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -97,3 +79,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_sushi_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -50,15 +42,6 @@ evnts AS (
event_address = '0x31a6675cbe84365bf2b0cbce617ece6c47023ef70826533bde5203d32171dc3c'
AND event_resource ILIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -89,7 +72,6 @@ SELECT
) }} AS dex_swaps_sushi_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -97,3 +79,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,7 +3,7 @@
unique_key = "dex_swaps_thala_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
cluster_by = ['modified_timestamp::DATE'],
tags = ['noncore']
) }}
@ -12,22 +12,14 @@ WITH tx AS (
SELECT
tx_hash,
block_timestamp,
sender
sender,
modified_timestamp
FROM
{{ ref(
'silver__transactions'
) }}
WHERE
success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
evnts AS (
SELECT
@ -41,7 +33,7 @@ evnts AS (
event_resource,
event_data,
event_type,
_inserted_timestamp
modified_timestamp
FROM
{{ ref(
'silver__events'
@ -53,15 +45,6 @@ evnts AS (
)
AND event_resource LIKE 'SwapEvent%'
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_number,
@ -88,7 +71,6 @@ SELECT
) }} AS dex_swaps_thala_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A
@ -96,3 +78,16 @@ FROM
tx_hash,
block_timestamp
)
{% if is_incremental() %}
WHERE
GREATEST(
A.modified_timestamp,
b.modified_timestamp
) >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -3,8 +3,7 @@
unique_key = "dex_swaps_tsunami_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
tags = ['noncore']
cluster_by = ['modified_timestamp::DATE'],
) }}
WITH evnts AS (
@ -19,8 +18,7 @@ WITH evnts AS (
event_address,
event_resource,
event_data,
event_type,
_inserted_timestamp
event_type
FROM
{{ ref(
'silver__events'
@ -31,9 +29,9 @@ WITH evnts AS (
AND success
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
@ -76,7 +74,6 @@ SELECT
) }} AS dex_swaps_tsunami_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
evnts A