hotfix with qualify
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled

This commit is contained in:
Eric Laurello 2025-10-21 08:32:40 -04:00
parent 59fbf4d0ec
commit 81ced7bc61

View File

@ -9,44 +9,48 @@
tags = ['rewards_points_spend', 'streamline_non_core']
) }}
{% if var('STOREFRONT_INITIAL_RUN', false) %}
SELECT
partition_key,
NULL AS entry_count,
NULL AS starting_after,
NULL AS api_limit,
NULL AS first_entry_id,
NULL AS last_entry_id,
TRY_PARSE_JSON(DATA) :createdAt :: timestamp_ntz AS created_at,
TRY_PARSE_JSON(DATA) :id :: STRING AS entry_id,
index_in_batch :: INTEGER AS INDEX,
TRY_PARSE_JSON(DATA) AS DATA,
_inserted_timestamp :: timestamp_ntz AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['entry_id', 'partition_key']
) }} AS transaction_entries_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ source('flow_seeds', 'transaction_entries') }}
{% else %}
WITH bronze AS (
{% if var(
'STOREFRONT_INITIAL_RUN',
false
) %}
SELECT
partition_key,
DATA,
VALUE :STARTING_AFTER :: STRING AS starting_after,
VALUE :API_LIMIT :: INTEGER AS api_limit,
ARRAY_SIZE(
DATA :data :: ARRAY
) AS entry_count,
DATA :data [0] :id :: STRING AS first_entry_id,
DATA :data [entry_count - 1] :id :: STRING AS last_entry_id,
_inserted_timestamp
NULL AS entry_count,
NULL AS starting_after,
NULL AS api_limit,
NULL AS first_entry_id,
NULL AS last_entry_id,
TRY_PARSE_JSON(DATA) :createdAt :: timestamp_ntz AS created_at,
TRY_PARSE_JSON(DATA) :id :: STRING AS entry_id,
index_in_batch :: INTEGER AS INDEX,
TRY_PARSE_JSON(DATA) AS DATA,
_inserted_timestamp :: timestamp_ntz AS _inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['entry_id', 'partition_key']
) }} AS transaction_entries_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ source(
'flow_seeds',
'transaction_entries'
) }}
{% else %}
WITH bronze AS (
SELECT
partition_key,
DATA,
VALUE :STARTING_AFTER :: STRING AS starting_after,
VALUE :API_LIMIT :: INTEGER AS api_limit,
ARRAY_SIZE(
DATA :data :: ARRAY
) AS entry_count,
DATA :data [0] :id :: STRING AS first_entry_id,
DATA :data [entry_count - 1] :id :: STRING AS last_entry_id,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze_api__transaction_entries') }}
@ -83,6 +87,9 @@ FROM
bronze,
LATERAL FLATTEN(
input => DATA :data :: ARRAY
)
) qualify ROW_NUMBER() over (
PARTITION BY transaction_entries_id
ORDER BY
_inserted_timestamp DESC
) = 1
{% endif %}