Swaps increment factory (#453)
Some checks failed
docs_update / run_dbt_jobs (push) Has been cancelled
docs_update / notify-failure (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_decoded_logs_history / notify-failure (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / run_dbt_jobs (push) Has been cancelled
dbt_test_recent_evm / run_dbt_jobs (push) Has been cancelled
dbt_run_streamline_evm_daily_silver / notify-failure (push) Has been cancelled
dbt_test_recent_evm / notify-failure (push) Has been cancelled
dbt_run_scheduled_streamline_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_scores / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_daily / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_streamline_non_core / notify-failure (push) Has been cancelled
dbt_run_scheduled_scores / notify-failure (push) Has been cancelled
dbt_run_scheduled_daily / notify-failure (push) Has been cancelled
dbt_test_scheduled / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_non_core / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_abis / run_dbt_jobs (push) Has been cancelled
dbt_run_evm / run_dbt_jobs (push) Has been cancelled
dbt_run_scheduled_non_core / notify-failure (push) Has been cancelled
dbt_run_scheduled_abis / notify-failure (push) Has been cancelled
dbt_run_evm / notify-failure (push) Has been cancelled

* added

* swaps final

* added

* reorg files

* reorg + rename

---------

Co-authored-by: SAI <sairaj@flipsidecrypto.com>
Co-authored-by: gregoriustanleyy <gstanleytejakusuma@gmail.com>
This commit is contained in:
SAI 2025-07-28 11:10:55 -05:00 committed by GitHub
parent 84ca2eb298
commit 5194f813dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 357 additions and 6 deletions

View File

@ -1,7 +1,6 @@
{{ config (
materialized = 'view',
tags = ['ez', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} }
tags = ['ez', 'scheduled']
) }}
WITH prices AS (

View File

@ -0,0 +1,41 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'event_contract',
tags = ['scheduled_non_core','increment', 'dex']
) }}
WITH pair_data AS (
SELECT
event_data:pairAddress::STRING AS pairAddress,
'A.' || SUBSTR(pairAddress, 3, 99) || '.SwapPair' AS event_contract,
event_data:token0Key::STRING AS token0_contract,
event_data:token1Key::STRING AS token1_contract,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
event_contract = 'A.b063c16cac85dbd1.SwapFactory'
AND event_type = 'PairCreated'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
pairAddress,
event_contract,
token0_contract,
token1_contract,
-- Now we use the event_contract from the CTE which is guaranteed to be the unique pair contract
{{ dbt_utils.generate_surrogate_key(['event_contract']) }} AS increment_deployed_pairs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
pair_data

View File

@ -0,0 +1,43 @@
version: 2
models:
- name: silver__increment_deployed_pairs
description: |-
This table records newly deployed SwapPair contracts on the Flow blockchain, as detected from PairCreated events emitted by the SwapFactory contract.
columns:
- name: pairAddress
description: "The address of the newly created pair contract."
tests:
- not_null
- name: event_contract
description: "The fully qualified contract name for the SwapPair."
tests:
- not_null
- name: token0_contract
description: "The contract address/key for token0 in the pair."
tests:
- not_null
- name: token1_contract
description: "The contract address/key for token1 in the pair."
tests:
- not_null
- name: increment_deployed_pairs_id
description: "A surrogate key for the row, generated from event_contract."
tests:
- not_null
- unique
- name: inserted_timestamp
description: "The timestamp when the row was inserted."
tests:
- not_null
- name: modified_timestamp
description: "The timestamp when the row was last modified."
tests:
- not_null

View File

@ -0,0 +1,117 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
unique_key = 'increment_swaps_id',
tags = ['scheduled_non_core', 'increment', 'dex']
) }}
WITH increment_pairs AS (
-- Get all deployed Increment pairs
SELECT
event_contract AS pair_contract
FROM
{{ ref('silver__increment_deployed_pairs') }}
),
events AS (
SELECT
block_height,
block_timestamp,
tx_id,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
modified_timestamp AS _modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
event_contract IN (SELECT pair_contract FROM increment_pairs)
AND event_type = 'Swap'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swappair_events AS (
SELECT
block_height,
block_timestamp,
tx_id,
event_index,
event_contract AS swap_contract,
NULL AS platform,
CASE
WHEN event_data:amount0In::FLOAT > 0 THEN event_data:amount0In::FLOAT
ELSE event_data:amount1In::FLOAT
END AS token_in_amount,
CASE
WHEN event_data:amount0In::FLOAT > 0 THEN event_data:amount0Type::STRING
ELSE event_data:amount1Type::STRING
END AS token_in_contract,
CASE
WHEN event_data:amount0Out::FLOAT > 0 THEN event_data:amount0Out::FLOAT
ELSE event_data:amount1Out::FLOAT
END AS token_out_amount,
CASE
WHEN event_data:amount0Out::FLOAT > 0 THEN event_data:amount0Type::STRING
ELSE event_data:amount1Type::STRING
END AS token_out_contract,
_modified_timestamp
FROM
events
),
transactions AS (
SELECT
tx_id,
authorizers
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
tx_id IN (SELECT DISTINCT tx_id FROM swappair_events)
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
s.block_height,
s.block_timestamp,
s.tx_id,
ROW_NUMBER() OVER (PARTITION BY s.tx_id ORDER BY s.event_index) - 1 AS swap_index,
s.swap_contract,
s.platform,
t.authorizers[0]::STRING AS trader,
s.token_in_amount,
s.token_in_contract,
s.token_out_amount,
s.token_out_contract,
{{ dbt_utils.generate_surrogate_key(['s.tx_id', 's.event_index']) }} AS increment_swaps_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
swappair_events s
LEFT JOIN
transactions t ON s.tx_id = t.tx_id

View File

@ -0,0 +1,111 @@
version: 2
models:
- name: silver__swaps_factory
description: |-
This table records asset swaps on the Flow blockchain parsed from Swap events emitted by Increment SwapPair contracts (via the SwapFactory contract).
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- swap_index
columns:
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: swap_index
description: "{{ doc('swap_index') }}"
- name: swap_contract
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: platform
description: "The platform or protocol for the swap. (Always NULL for Increment pairs)"
- name: trader
description: "{{ doc('trader') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_in_amount
description: "{{ doc('token_in_amount') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_in_contract
description: "{{ doc('token_in_contract') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_out_amount
description: "{{ doc('token_out_amount') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_out_contract
description: "{{ doc('token_out_contract') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: increment_swaps_id
description: "A surrogate key for the row, generated from tx_id and event_index."
tests:
- not_null
- unique
- name: inserted_timestamp
description: "The timestamp when the row was inserted."
tests:
- not_null
- name: modified_timestamp
description: "The timestamp when the row was last modified."
tests:
- not_null
- name: _invocation_id
description: "The dbt invocation ID for the run that produced this row."

View File

@ -38,6 +38,38 @@ WHERE
)
{% endif %}
),
swaps_from_increment_factory AS (
SELECT
block_height,
block_timestamp,
tx_id,
swap_index,
swap_contract,
NULL AS platform,
trader,
token_in_amount,
token_in_contract,
NULL AS token_in_destination,
token_out_amount,
token_out_contract,
NULL AS token_out_source,
modified_timestamp AS _modified_timestamp,
2 AS _priority -- Priority between aggregator and regular swaps
FROM
{{ ref('silver__increment_swaps') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swaps AS (
SELECT
block_height,
@ -59,10 +91,13 @@ swaps AS (
{{ ref('silver__swaps_s') }}
WHERE
tx_id NOT IN (
SELECT
DISTINCT tx_id
FROM
swaps_from_aggregator
SELECT DISTINCT tx_id
FROM swaps_from_aggregator
UNION ALL
SELECT DISTINCT tx_id
FROM swaps_from_increment_factory
)
{% if is_incremental() %}
@ -80,6 +115,11 @@ swaps_union AS (
FROM
swaps_from_aggregator
UNION ALL
SELECT
*
FROM
swaps_from_increment_factory
UNION ALL
SELECT
*
FROM