diff --git a/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_v4_amm.sql b/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_v4_amm.sql index af2b7e9d..224bed51 100644 --- a/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_v4_amm.sql +++ b/models/silver/swaps/raydium/silver__swaps_intermediate_raydium_v4_amm.sql @@ -10,47 +10,58 @@ {% if execute %} {% set base_query %} - CREATE - OR REPLACE temporary TABLE silver.swaps_intermediate_raydium_v4_amm__intermediate_tmp AS + CREATE OR REPLACE TEMPORARY TABLE silver.swaps_intermediate_raydium_v4_amm__intermediate_tmp AS + WITH distinct_entities AS ( + SELECT DISTINCT + tx_id + FROM + {{ ref('silver__decoded_instructions_combined') }} + WHERE + program_id = '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8' + AND event_type IN ( + 'swapBaseIn', + 'swapBaseOut' + ) - SELECT - block_timestamp, - block_id, - tx_id, - signers, - succeeded, - INDEX, - inner_index, - program_id, - event_type, - decoded_instruction, - _inserted_timestamp - FROM - {{ ref('silver__decoded_instructions_combined') }} - WHERE - program_id = '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8' - AND event_type IN ( - 'swapBaseIn', - 'swapBaseOut' + {% if is_incremental() %} + AND _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '1 hour' + FROM + {{ this }} + ) + {% else %} + AND _inserted_timestamp :: DATE >= '2024-05-14' + {% endif %} ) - -{% if is_incremental() %} -AND _inserted_timestamp >= ( - SELECT - MAX(_inserted_timestamp) - INTERVAL '1 hour' - FROM - {{ this }} -) -{% else %} - AND _inserted_timestamp :: DATE >= '2024-05-14' -{% endif %} - -{% endset %} -{% do run_query(base_query) %} -{% set between_stmts = fsc_utils.dynamic_range_predicate( - "silver.swaps_intermediate_raydium_v4_amm__intermediate_tmp", - "block_timestamp::date" -) %} + /* need to re-select all decoded instructions from all tx_ids in incremental subset + in order for the window function to output accurate values */ + SELECT + d.block_timestamp, + d.block_id, + d.tx_id, + d.signers, + d.succeeded, + d.INDEX, + d.inner_index, + d.program_id, + d.event_type, + d.decoded_instruction, + d._inserted_timestamp + FROM + {{ ref('silver__decoded_instructions_combined') }} d + JOIN + distinct_entities + USING(tx_id) + WHERE + program_id = '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8' + AND event_type IN ( + 'swapBaseIn', + 'swapBaseOut' + ) + {% endset %} + {% do run_query(base_query) %} + {% set between_stmts = fsc_utils.dynamic_range_predicate("silver.swaps_intermediate_raydium_v4_amm__intermediate_tmp","block_timestamp::date") %} {% endif %} WITH base AS (