flow-models/models/silver/defi/silver__swaps_aggregator.sql
Jack Forgash 45a68debed
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_daily_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_moments_metadata / run_dbt_jobs (push) Has been cancelled
dbt_observability_models / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / run_dbt_jobs (push) Has been cancelled
dbt_run_evm_decoded_logs / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_transactions / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_transaction_results / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_external_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_external_points_balances_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_realtime / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_blocks / run_dbt_jobs (push) Has been cancelled
dbt_run_evm / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_collections / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_daily_realtime / notify-failure (push) Has been cancelled
dbt_run_moments_metadata / notify-failure (push) Has been cancelled
dbt_observability_models / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / notify-failure (push) Has been cancelled
dbt_run_evm_decoded_logs / notify-failure (push) Has been cancelled
dbt_run_streamline_transactions / notify-failure (push) Has been cancelled
dbt_run_streamline_transaction_results / notify-failure (push) Has been cancelled
dbt_run_streamline_external_realtime / notify-failure (push) Has been cancelled
dbt_run_streamline_external_points_balances_realtime / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_realtime / notify-failure (push) Has been cancelled
dbt_run_scheduled / notify-failure (push) Has been cancelled
dbt_run_streamline_blocks / notify-failure (push) Has been cancelled
dbt_run_evm / notify-failure (push) Has been cancelled
dbt_run_scheduled_non_core / notify-failure (push) Has been cancelled
dbt_run_streamline_collections / notify-failure (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / notify-failure (push) Has been cancelled
AN-6231/modernize incremental predicates (#442)
* upd incr predicate on silver core models

* upd rest of the models

* rm predicate from d+i model
2025-08-06 12:40:17 -04:00

106 lines
2.6 KiB
SQL

{# Incremental merge model keyed on swaps_aggregator_id. inserted_timestamp is left out of merge updates, so rows that are merged again keep their original insert time. #}
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'merge',
    unique_key = 'swaps_aggregator_id',
    merge_exclude_columns = ["inserted_timestamp"],
    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
    cluster_by = ['inserted_timestamp::DATE'],
    tags = ['scheduled_non_core']
) }}
WITH events AS (
    -- Source events for this model. On incremental runs only rows whose
    -- modified timestamp is at or after the target table's latest
    -- modified_timestamp are read.
    SELECT
        block_height,
        block_timestamp,
        tx_id,
        event_index,
        event_contract,
        event_type,
        event_data,
        _inserted_timestamp,
        modified_timestamp AS _modified_timestamp
    FROM
        {{ ref('silver__streamline_events') }}

{% if is_incremental() %}
    WHERE
        _modified_timestamp >= (
            SELECT
                -- MAX() over an empty target table is NULL, and a comparison
                -- with NULL rejects every row. Fall back to the epoch so a
                -- target that exists but holds no rows still gets loaded.
                COALESCE(
                    MAX(modified_timestamp),
                    '1970-01-01' :: TIMESTAMP_NTZ
                )
            FROM
                {{ this }}
        )
{% endif %}
),
parse_swap_log AS (
    -- One row per PoolSwapInAggregator event emitted by the LogEntry contract,
    -- with the swap fields pulled out of the event_data payload.
    SELECT
        block_height,
        block_timestamp,
        tx_id,
        event_type,
        -- Zero-based position of the swap among the filtered swap events of
        -- its transaction, following event_index order.
        ROW_NUMBER() OVER (PARTITION BY tx_id ORDER BY event_index) - 1 AS swap_index,
        event_data :poolAddress :: STRING AS pool_address,
        event_data :poolSource :: STRING AS pool_source,
        event_data :tokenInAmount :: FLOAT AS token_in_amount,
        event_data :tokenInKey :: STRING AS token_in_contract,
        event_data :tokenOutAmount :: FLOAT AS token_out_amount,
        event_data :tokenOutKey :: STRING AS token_out_contract
    FROM
        events
    WHERE
        -- NOTE(review): 67100587 is presumably the first block carrying these
        -- events; confirm before changing it.
        block_height >= 67100587
        AND event_type = 'PoolSwapInAggregator'
        AND event_contract = 'A.e876e00638d54e75.LogEntry'
),
transactions AS (
    -- Transaction rows for the swaps being processed; joined downstream to
    -- supply the authorizers of each swap's transaction.
    --
    -- This CTE is the lookup side of a LEFT JOIN, so it must not be narrowed
    -- by its own modified-timestamp window: a swap event that was modified
    -- after the last run while its transaction row was modified before it
    -- would lose its match and be merged with a NULL trader. The scan is
    -- bounded by the block range of the swaps in this run instead.
    SELECT
        tx_id,
        authorizers,
        modified_timestamp AS _modified_timestamp
    FROM
        {{ ref('silver__streamline_transactions_final') }}
    WHERE
        block_height >= 67100587
        -- NOTE(review): assumes a transaction and its events share the same
        -- block_height; confirm against the upstream models.
        AND block_height >= (
            SELECT
                MIN(block_height)
            FROM
                parse_swap_log
        )
        AND tx_id IN (
            SELECT
                tx_id
            FROM
                parse_swap_log
        )
)
-- Final output: one row per aggregator swap, enriched with the first
-- authorizer of the transaction as the trader. The LEFT JOIN keeps swaps whose
-- transaction row is not found; trader is NULL for those.
SELECT
    s.block_height,
    s.block_timestamp,
    s.tx_id,
    s.swap_index,
    s.pool_address,
    s.pool_source,
    t.authorizers [0] :: STRING AS trader,
    s.token_in_amount,
    s.token_in_contract,
    s.token_out_amount,
    s.token_out_contract,
    -- Merge key: hash of the transaction id and the swap's position within it.
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','swap_index']
    ) }} AS swaps_aggregator_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS invocation_id
FROM
    parse_swap_log AS s
    LEFT JOIN transactions AS t USING (tx_id)