diff --git a/models/streamline/external/snag/transaction_entries/silver_api__transaction_entries.sql b/models/streamline/external/snag/transaction_entries/silver_api__transaction_entries.sql index ab93a2b..833d34e 100644 --- a/models/streamline/external/snag/transaction_entries/silver_api__transaction_entries.sql +++ b/models/streamline/external/snag/transaction_entries/silver_api__transaction_entries.sql @@ -9,44 +9,48 @@ tags = ['rewards_points_spend', 'streamline_non_core'] ) }} -{% if var('STOREFRONT_INITIAL_RUN', false) %} - -SELECT - partition_key, - NULL AS entry_count, - NULL AS starting_after, - NULL AS api_limit, - NULL AS first_entry_id, - NULL AS last_entry_id, - TRY_PARSE_JSON(DATA) :createdAt :: timestamp_ntz AS created_at, - TRY_PARSE_JSON(DATA) :id :: STRING AS entry_id, - index_in_batch :: INTEGER AS INDEX, - TRY_PARSE_JSON(DATA) AS DATA, - _inserted_timestamp :: timestamp_ntz AS _inserted_timestamp, - {{ dbt_utils.generate_surrogate_key( - ['entry_id', 'partition_key'] - ) }} AS transaction_entries_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id -FROM - {{ source('flow_seeds', 'transaction_entries') }} - -{% else %} -WITH bronze AS ( +{% if var( + 'STOREFRONT_INITIAL_RUN', + false + ) %} SELECT partition_key, - DATA, - VALUE :STARTING_AFTER :: STRING AS starting_after, - VALUE :API_LIMIT :: INTEGER AS api_limit, - ARRAY_SIZE( - DATA :data :: ARRAY - ) AS entry_count, - DATA :data [0] :id :: STRING AS first_entry_id, - DATA :data [entry_count - 1] :id :: STRING AS last_entry_id, - _inserted_timestamp + NULL AS entry_count, + NULL AS starting_after, + NULL AS api_limit, + NULL AS first_entry_id, + NULL AS last_entry_id, + TRY_PARSE_JSON(DATA) :createdAt :: timestamp_ntz AS created_at, + TRY_PARSE_JSON(DATA) :id :: STRING AS entry_id, + index_in_batch :: INTEGER AS INDEX, + TRY_PARSE_JSON(DATA) AS DATA, + _inserted_timestamp :: timestamp_ntz AS _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key( + ['entry_id', 'partition_key'] + ) }} AS transaction_entries_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM + {{ source( + 'flow_seeds', + 'transaction_entries' + ) }} + {% else %} + WITH bronze AS ( + SELECT + partition_key, + DATA, + VALUE :STARTING_AFTER :: STRING AS starting_after, + VALUE :API_LIMIT :: INTEGER AS api_limit, + ARRAY_SIZE( + DATA :data :: ARRAY + ) AS entry_count, + DATA :data [0] :id :: STRING AS first_entry_id, + DATA :data [entry_count - 1] :id :: STRING AS last_entry_id, + _inserted_timestamp + FROM {% if is_incremental() %} {{ ref('bronze_api__transaction_entries') }} @@ -83,6 +87,9 @@ FROM bronze, LATERAL FLATTEN( input => DATA :data :: ARRAY - ) - + ) qualify ROW_NUMBER() over ( + PARTITION BY transaction_entries_id + ORDER BY + _inserted_timestamp DESC + ) = 1 {% endif %}