flow-models/models/silver/defi/silver__swaps_final.sql
SAI 5194f813dd
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / run_dbt_jobs (push) Has been cancelled
dbt_test_recent_evm / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / notify-failure (push) Has been cancelled
dbt_test_recent_evm / notify-failure (push) Has been cancelled
dbt_run_scheduled_streamline_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_scores / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_daily / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_streamline_non_core / notify-failure (push) Has been cancelled
dbt_run_scheduled_scores / notify-failure (push) Has been cancelled
dbt_run_scheduled_daily / notify-failure (push) Has been cancelled
dbt_test_scheduled / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_abis / run_dbt_jobs (push) Has been cancelled
dbt_run_evm / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_non_core / notify-failure (push) Has been cancelled
dbt_run_scheduled_abis / notify-failure (push) Has been cancelled
dbt_run_evm / notify-failure (push) Has been cancelled
Swaps increment factory (#453)
* added

* swaps final

* added

* reorg files

* reorg + rename

---------

Co-authored-by: SAI <sairaj@flipsidecrypto.com>
Co-authored-by: gregoriustanleyy <gstanleytejakusuma@gmail.com>
2025-07-28 23:10:55 +07:00

141 lines
3.1 KiB
SQL

{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
unique_key = 'swaps_final_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_id,trader);",
tags = ['scheduled_non_core']
) }}
WITH swaps_from_aggregator AS (
SELECT
block_height,
block_timestamp,
tx_id,
swap_index,
pool_address AS swap_contract,
pool_source AS platform,
trader,
token_in_amount,
token_in_contract,
NULL AS token_in_destination,
token_out_amount,
token_out_contract,
NULL AS token_out_source,
modified_timestamp AS _modified_timestamp,
0 AS _priority
FROM
{{ ref('silver__swaps_aggregator') }}
{% if is_incremental() %}
WHERE
_modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swaps_from_increment_factory AS (
SELECT
block_height,
block_timestamp,
tx_id,
swap_index,
swap_contract,
NULL AS platform,
trader,
token_in_amount,
token_in_contract,
NULL AS token_in_destination,
token_out_amount,
token_out_contract,
NULL AS token_out_source,
modified_timestamp AS _modified_timestamp,
2 AS _priority -- Priority between aggregator and regular swaps
FROM
{{ ref('silver__increment_swaps') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swaps AS (
SELECT
block_height,
block_timestamp,
tx_id,
swap_index,
swap_contract,
NULL AS platform,
trader,
token_in_amount,
token_in_contract,
token_in_destination,
token_out_amount,
token_out_contract,
token_out_source,
modified_timestamp AS _modified_timestamp,
1 AS _priority
FROM
{{ ref('silver__swaps_s') }}
WHERE
tx_id NOT IN (
SELECT DISTINCT tx_id
FROM swaps_from_aggregator
UNION ALL
SELECT DISTINCT tx_id
FROM swaps_from_increment_factory
)
{% if is_incremental() %}
AND _modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swaps_union AS (
SELECT
*
FROM
swaps_from_aggregator
UNION ALL
SELECT
*
FROM
swaps_from_increment_factory
UNION ALL
SELECT
*
FROM
swaps
) {# Note - curr prices pipeline does not include token address data, making the join difficult and
inaccurate.NEW prices models DO have this so will
ADD
price fields WITH may RELEASE.#}
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_id', 'swap_index']
) }} AS swaps_final_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
swaps_union