From 1bb324b1bf2bf86f030fee0bac142a1d9315bb7d Mon Sep 17 00:00:00 2001 From: tarikceric <46071768+tarikceric@users.noreply.github.com> Date: Mon, 28 Apr 2025 14:50:26 -0700 Subject: [PATCH] add updated orca token swap (#834) --- ...er__swaps_intermediate_orca_token_swap.sql | 192 ++++++++++++++++++ ...er__swaps_intermediate_orca_token_swap.yml | 89 ++++++++ 2 files changed, 281 insertions(+) create mode 100644 models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.sql create mode 100644 models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.yml diff --git a/models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.sql b/models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.sql new file mode 100644 index 00000000..311c81c1 --- /dev/null +++ b/models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.sql @@ -0,0 +1,192 @@ +-- depends_on: {{ ref('silver__decoded_instructions_combined') }} +{{ config( + materialized = 'incremental', + unique_key = ['swaps_intermediate_orca_token_swap_id'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'], + post_hook = enable_search_optimization( + '{{this.schema}}', + '{{this.identifier}}', + 'ON EQUALITY(tx_id, swapper, from_mint, to_mint)' + ), + tags = ['scheduled_non_core','scheduled_non_core_hourly'] +) }} + +{% set batch_backfill_size = var('batch_backfill_size', 0) %} +{% set batch_backfill = False if batch_backfill_size == 0 else True %} + +{% if execute %} + {% set base_query %} + CREATE OR REPLACE TEMPORARY TABLE silver.swaps_intermediate_orca_token_swap__intermediate_tmp AS + WITH distinct_entities AS ( + SELECT DISTINCT + tx_id, + block_timestamp + FROM + {{ ref('silver__decoded_instructions_combined') }} + WHERE + program_id in ('9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP', 'DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1') + AND event_type in ('swap') + AND succeeded + {% if is_incremental() and not batch_backfill %} + AND _inserted_timestamp >= ( + SELECT + max(_inserted_timestamp) - INTERVAL '1 hour' + FROM + {{ this }} + ) + {% elif batch_backfill %} + {% set max_block_ts_query %} + SELECT max(_inserted_timestamp)::date FROM {{ this }} + {% endset %} + {% set max_block_ts = run_query(max_block_ts_query)[0][0] %} + {% set end_date = max_block_ts + modules.datetime.timedelta(days=batch_backfill_size) %} + AND _inserted_timestamp::date BETWEEN '{{ max_block_ts }}' AND '{{ end_date }}' + {% else %} + AND _inserted_timestamp::DATE BETWEEN '2024-12-23' AND '2025-01-01' + {% endif %} + ) + + SELECT + d.block_timestamp, + d.block_id, + d.tx_id, + d.signers, + d.succeeded, + d.index, + d.inner_index, + d.program_id, + d.event_type, + d.decoded_instruction, + d._inserted_timestamp + FROM + {{ ref('silver__decoded_instructions_combined') }} d + JOIN + distinct_entities + USING(tx_id) + WHERE + program_id in ('9W959DqEETiGZocYWCQPaJ6sBmUzgfxXfqGeTEdp3aQP', 'DjVE6JNiYqPL2QXyCUUh8rNjHrbz9hXHNYt99MQ59qw1') + AND event_type in ('swap') + AND d.block_timestamp >= ( + SELECT + min(block_timestamp) + FROM + distinct_entities + ) + {% endset %} + {% do run_query(base_query) %} + {% set between_stmts = fsc_utils.dynamic_range_predicate("silver.swaps_intermediate_orca_token_swap__intermediate_tmp","block_timestamp::date") %} +{% endif %} + +WITH base AS ( + SELECT + * + FROM + silver.swaps_intermediate_orca_token_swap__intermediate_tmp +), + +decoded AS ( + SELECT + block_timestamp, + block_id, + tx_id, + index, + inner_index, + COALESCE(LEAD(inner_index) OVER (PARTITION BY tx_id, index + ORDER BY inner_index) -1, 999999 + ) AS inner_index_end, + program_id, + silver.udf_get_account_pubkey_by_name('userTransferAuthority', decoded_instruction:accounts) as swapper, + silver.udf_get_account_pubkey_by_name('source', decoded_instruction:accounts) as source_token_account, + null as source_mint, + null as destination_mint, + silver.udf_get_account_pubkey_by_name('destination', decoded_instruction:accounts) as destination_token_account, + silver.udf_get_account_pubkey_by_name('swapDestination', decoded_instruction:accounts) as program_destination_token_account, + silver.udf_get_account_pubkey_by_name('swapSource', decoded_instruction:accounts) as program_source_token_account, + _inserted_timestamp + FROM + base +), + +transfers AS ( + SELECT + tr.*, + coalesce(split_part(index::text, '.', 1)::INT, index::INT) AS index_1, + nullif(split_part(index::text, '.', 2), '')::INT AS inner_index_1 + FROM + {{ ref('silver__transfers') }} AS tr + INNER JOIN ( + SELECT + DISTINCT tx_id, + block_timestamp :: DATE AS block_date + FROM + decoded + ) AS d + ON d.block_date = tr.block_timestamp :: DATE + AND d.tx_id = tr.tx_id + WHERE + tr.succeeded + AND {{ between_stmts }} +), + +pre_final AS ( + SELECT + A.block_id, + A.block_timestamp, + A.program_id, + A.tx_id, + A.index, + A.inner_index, + A.inner_index_end, + C.succeeded, + A.swapper, + b.amount AS from_amt, + b.mint AS from_mint, + C.amount AS to_amt, + C.mint AS to_mint, + A._inserted_timestamp + FROM + decoded A + LEFT JOIN transfers b + ON A.tx_id = b.tx_id + AND A.source_token_account = b.source_token_account + AND A.program_source_token_account = b.dest_token_account + AND A.index = b.index_1 + AND ( + (b.inner_index_1 BETWEEN A.inner_index AND A.inner_index_end) + OR A.inner_index IS NULL + ) + LEFT JOIN transfers C + ON A.tx_id = C.tx_id + AND A.destination_token_account = C.dest_token_account + AND A.program_destination_token_account = C.source_token_account + AND A.index = C.index_1 + AND ( + (C.inner_index_1 BETWEEN A.inner_index AND A.inner_index_end) + OR A.inner_index IS NULL + ) + QUALIFY ROW_NUMBER() over (PARTITION BY A.tx_id, A.index, A.inner_INDEX ORDER BY inner_index) = 1 +) + +SELECT + block_id, + block_timestamp, + program_id, + tx_id, + succeeded, + row_number() OVER (PARTITION BY tx_id ORDER BY index, inner_index) AS swap_index, + index, + inner_index, + swapper, + from_amt, + from_mint, + to_amt, + to_mint, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['tx_id','swap_index','program_id']) }} AS swaps_intermediate_orca_token_swap_id, + sysdate() AS inserted_timestamp, + sysdate() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + pre_final diff --git a/models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.yml b/models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.yml new file mode 100644 index 00000000..94b140fb --- /dev/null +++ b/models/silver/swaps/orca/token_swap/silver__swaps_intermediate_orca_token_swap.yml @@ -0,0 +1,89 @@ +version: 2 +models: + - name: silver__swaps_intermediate_orca_token_swap + data_tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - TX_ID + - SWAP_INDEX + - PROGRAM_ID + where: block_timestamp::date > current_date - 7 + recent_date_filter: &recent_date_filter + config: + where: _inserted_timestamp >= current_date - 7 + columns: + - name: BLOCK_TIMESTAMP + description: "{{ doc('block_timestamp') }}" + data_tests: + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - not_null: *recent_date_filter + - name: BLOCK_ID + description: "{{ doc('block_id') }}" + data_tests: + - not_null: *recent_date_filter + - name: TX_ID + description: "{{ doc('tx_id') }}" + data_tests: + - not_null: *recent_date_filter + - name: SUCCEEDED + description: "{{ doc('tx_succeeded') }}" + data_tests: + - not_null: *recent_date_filter + - name: INDEX + description: "{{ doc('index') }}" + data_tests: + - not_null: *recent_date_filter + - name: INNER_INDEX + description: "{{ doc('inner_index') }}" + - name: PROGRAM_ID + description: "{{ doc('program_id') }}" + data_tests: + - not_null: *recent_date_filter + - name: SWAPPER + description: "{{ doc('swaps_swapper') }}" + data_tests: + - not_null: *recent_date_filter + - name: FROM_AMT + description: "{{ doc('swaps_from_amt') }}" + data_tests: + - not_null: *recent_date_filter + - name: FROM_MINT + description: "{{ doc('swaps_from_mint') }}" + data_tests: + - not_null: *recent_date_filter + - name: TO_AMT + description: "{{ doc('swaps_to_amt') }}" + data_tests: + - not_null: *recent_date_filter + - name: TO_MINT + description: "{{ doc('swaps_to_mint') }}" + data_tests: + - not_null: *recent_date_filter + - name: SWAP_INDEX + description: "{{ doc('swaps_swap_index') }}" + data_tests: + - not_null: *recent_date_filter + - name: _INSERTED_TIMESTAMP + description: "{{ doc('_inserted_timestamp') }}" + data_tests: + - not_null + - name: SWAPS_INTERMEDIATE_ORCA_TOKEN_SWAP_ID + description: '{{ doc("pk") }}' + data_tests: + - unique: *recent_date_filter + - name: INSERTED_TIMESTAMP + description: '{{ doc("inserted_timestamp") }}' + data_tests: + - not_null: *recent_date_filter + - name: MODIFIED_TIMESTAMP + description: '{{ doc("modified_timestamp") }}' + data_tests: + - not_null: *recent_date_filter + - name: _INVOCATION_ID + description: '{{ doc("_invocation_id") }}' + data_tests: + - not_null: + name: test_silver__not_null_swaps_intermediate_orca_token_swap__invocation_id + <<: *recent_date_filter \ No newline at end of file