diff --git a/models/gold/defi/defi__ez_dex_swaps.sql b/models/gold/defi/defi__ez_dex_swaps.sql index 77a1d57..cca9887 100644 --- a/models/gold/defi/defi__ez_dex_swaps.sql +++ b/models/gold/defi/defi__ez_dex_swaps.sql @@ -1,7 +1,6 @@ {{ config ( materialized = 'view', - tags = ['ez', 'scheduled'], - meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} } + tags = ['ez', 'scheduled'] ) }} WITH prices AS ( diff --git a/models/silver/defi/dex/increment/silver__increment_deployed_pairs.sql b/models/silver/defi/dex/increment/silver__increment_deployed_pairs.sql new file mode 100644 index 0000000..7ce5bcc --- /dev/null +++ b/models/silver/defi/dex/increment/silver__increment_deployed_pairs.sql @@ -0,0 +1,41 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'merge', + unique_key = 'event_contract', + tags = ['scheduled_non_core','increment', 'dex'] +) }} + +WITH pair_data AS ( + SELECT + event_data:pairAddress::STRING AS pairAddress, + 'A.' || SUBSTR(pairAddress, 3, 99) || '.SwapPair' AS event_contract, + event_data:token0Key::STRING AS token0_contract, + event_data:token1Key::STRING AS token1_contract, + modified_timestamp + FROM + {{ ref('silver__streamline_events') }} + WHERE + event_contract = 'A.b063c16cac85dbd1.SwapFactory' + AND event_type = 'PairCreated' + + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +) + +SELECT + pairAddress, + event_contract, + token0_contract, + token1_contract, + -- Now we use the event_contract from the CTE which is guaranteed to be the unique pair contract + {{ dbt_utils.generate_surrogate_key(['event_contract']) }} AS increment_deployed_pairs_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + pair_data \ No newline at end of file diff --git a/models/silver/defi/dex/increment/silver__increment_deployed_pairs.yml b/models/silver/defi/dex/increment/silver__increment_deployed_pairs.yml new file mode 100644 index 0000000..5b18635 --- /dev/null +++ b/models/silver/defi/dex/increment/silver__increment_deployed_pairs.yml @@ -0,0 +1,43 @@ +version: 2 + +models: + - name: silver__increment_deployed_pairs + description: |- + This table records newly deployed SwapPair contracts on the Flow blockchain, as detected from PairCreated events emitted by the SwapFactory contract. + + columns: + - name: pairAddress + description: "The address of the newly created pair contract." + tests: + - not_null + + - name: event_contract + description: "The fully qualified contract name for the SwapPair." + tests: + - not_null + + - name: token0_contract + description: "The contract address/key for token0 in the pair." + tests: + - not_null + + - name: token1_contract + description: "The contract address/key for token1 in the pair." + tests: + - not_null + + - name: increment_deployed_pairs_id + description: "A surrogate key for the row, generated from event_contract." + tests: + - not_null + - unique + + - name: inserted_timestamp + description: "The timestamp when the row was inserted." + tests: + - not_null + + - name: modified_timestamp + description: "The timestamp when the row was last modified." + tests: + - not_null \ No newline at end of file diff --git a/models/silver/defi/dex/increment/silver__increment_swaps.sql b/models/silver/defi/dex/increment/silver__increment_swaps.sql new file mode 100644 index 0000000..3cbac11 --- /dev/null +++ b/models/silver/defi/dex/increment/silver__increment_swaps.sql @@ -0,0 +1,117 @@ +{{ config( + materialized = 'incremental', + incremental_strategy = 'delete+insert', + cluster_by = ['block_timestamp::DATE'], + unique_key = 'increment_swaps_id', + tags = ['scheduled_non_core', 'increment', 'dex'] +) }} + +WITH increment_pairs AS ( + -- Get all deployed Increment pairs + SELECT + event_contract AS pair_contract + FROM + {{ ref('silver__increment_deployed_pairs') }} +), + +events AS ( + SELECT + block_height, + block_timestamp, + tx_id, + event_index, + event_contract, + event_type, + event_data, + _inserted_timestamp, + modified_timestamp AS _modified_timestamp + FROM + {{ ref('silver__streamline_events') }} + WHERE + event_contract IN (SELECT pair_contract FROM increment_pairs) + AND event_type = 'Swap' + + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} + +), + +swappair_events AS ( + SELECT + block_height, + block_timestamp, + tx_id, + event_index, + event_contract AS swap_contract, + NULL AS platform, + CASE + WHEN event_data:amount0In::FLOAT > 0 THEN event_data:amount0In::FLOAT + ELSE event_data:amount1In::FLOAT + END AS token_in_amount, + + CASE + WHEN event_data:amount0In::FLOAT > 0 THEN event_data:amount0Type::STRING + ELSE event_data:amount1Type::STRING + END AS token_in_contract, + + CASE + WHEN event_data:amount0Out::FLOAT > 0 THEN event_data:amount0Out::FLOAT + ELSE event_data:amount1Out::FLOAT + END AS token_out_amount, + + CASE + WHEN event_data:amount0Out::FLOAT > 0 THEN event_data:amount0Type::STRING + ELSE event_data:amount1Type::STRING + END AS token_out_contract, + + _modified_timestamp + FROM + events +), + +transactions AS ( + SELECT + tx_id, + authorizers + FROM + {{ ref('silver__streamline_transactions_final') }} + WHERE + tx_id IN (SELECT DISTINCT tx_id FROM swappair_events) + + {% if is_incremental() %} + AND modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) + {% endif %} +) + +SELECT + s.block_height, + s.block_timestamp, + s.tx_id, + ROW_NUMBER() OVER (PARTITION BY s.tx_id ORDER BY s.event_index) - 1 AS swap_index, + s.swap_contract, + s.platform, + t.authorizers[0]::STRING AS trader, + s.token_in_amount, + s.token_in_contract, + s.token_out_amount, + s.token_out_contract, + {{ dbt_utils.generate_surrogate_key(['s.tx_id', 's.event_index']) }} AS increment_swaps_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + swappair_events s +LEFT JOIN + transactions t ON s.tx_id = t.tx_id + \ No newline at end of file diff --git a/models/silver/defi/dex/increment/silver__increment_swaps.yml b/models/silver/defi/dex/increment/silver__increment_swaps.yml new file mode 100644 index 0000000..f46309d --- /dev/null +++ b/models/silver/defi/dex/increment/silver__increment_swaps.yml @@ -0,0 +1,111 @@ +version: 2 + +models: + - name: silver__swaps_factory + description: |- + This table records asset swaps on the Flow blockchain parsed from Swap events emitted by Increment SwapPair contracts (via the SwapFactory contract). + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - tx_id + - swap_index + + columns: + - name: block_height + description: "{{ doc('block_height') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + + - name: block_timestamp + description: "{{ doc('block_timestamp') }}" + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - TIMESTAMP_NTZ + + - name: tx_id + description: "{{ doc('tx_id') }}" + tests: + - not_null + + - name: swap_index + description: "{{ doc('swap_index') }}" + + - name: swap_contract + description: "{{ doc('swap_contract') }}" + tests: + - not_null + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: platform + description: "The platform or protocol for the swap. (Always NULL for Increment pairs)" + + - name: trader + description: "{{ doc('trader') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: token_in_amount + description: "{{ doc('token_in_amount') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + + - name: token_in_contract + description: "{{ doc('token_in_contract') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: token_out_amount + description: "{{ doc('token_out_amount') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - NUMBER + - FLOAT + + - name: token_out_contract + description: "{{ doc('token_out_contract') }}" + tests: + - dbt_expectations.expect_column_values_to_be_in_type_list: + column_type_list: + - STRING + - VARCHAR + + - name: increment_swaps_id + description: "A surrogate key for the row, generated from tx_id and event_index." + tests: + - not_null + - unique + + - name: inserted_timestamp + description: "The timestamp when the row was inserted." + tests: + - not_null + + - name: modified_timestamp + description: "The timestamp when the row was last modified." + tests: + - not_null + + - name: _invocation_id + description: "The dbt invocation ID for the run that produced this row." \ No newline at end of file diff --git a/models/silver/defi/silver__swaps_final.sql b/models/silver/defi/silver__swaps_final.sql index 3a9e893..fad31f9 100644 --- a/models/silver/defi/silver__swaps_final.sql +++ b/models/silver/defi/silver__swaps_final.sql @@ -38,6 +38,38 @@ WHERE ) {% endif %} ), + +swaps_from_increment_factory AS ( + SELECT + block_height, + block_timestamp, + tx_id, + swap_index, + swap_contract, + NULL AS platform, + trader, + token_in_amount, + token_in_contract, + NULL AS token_in_destination, + token_out_amount, + token_out_contract, + NULL AS token_out_source, + modified_timestamp AS _modified_timestamp, + 2 AS _priority -- Priority between aggregator and regular swaps + FROM + {{ ref('silver__increment_swaps') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} +), + swaps AS ( SELECT block_height, @@ -59,10 +91,13 @@ swaps AS ( {{ ref('silver__swaps_s') }} WHERE tx_id NOT IN ( - SELECT - DISTINCT tx_id - FROM - swaps_from_aggregator + SELECT DISTINCT tx_id + FROM swaps_from_aggregator + + UNION ALL + + SELECT DISTINCT tx_id + FROM swaps_from_increment_factory ) {% if is_incremental() %} @@ -80,6 +115,11 @@ swaps_union AS ( FROM swaps_from_aggregator UNION ALL + SELECT + * + FROM + swaps_from_increment_factory + UNION ALL SELECT * FROM