From 9658462cfc1f03000d47a1e7353392283f48308c Mon Sep 17 00:00:00 2001 From: eric-laurello <102970824+eric-laurello@users.noreply.github.com> Date: Fri, 25 Jul 2025 14:44:22 -0400 Subject: [PATCH] ez token transfers plus verified flags (#15) * ez token transfer and all that goes into that * fix UK and partition * inc filter --- macros/streamline/models.sql | 2 + .../bronze/core/bronze__token_transfers.sql | 10 ++ .../core/bronze__token_transfers_FR.sql | 10 ++ .../bronze__complete_token_asset_metadata.sql | 1 + .../prices/bronze__complete_token_prices.sql | 1 + models/descriptions/prices_is_verified.md | 5 + .../tables/core__ez_token_transfers.md | 7 + models/gold/core/core__ez_token_transfers.sql | 76 ++++++++++ models/gold/core/core__ez_token_transfers.yml | 108 ++++++++++++++ .../gold/prices/price__ez_asset_metadata.sql | 5 + .../gold/prices/price__ez_asset_metadata.yml | 2 + .../gold/prices/price__ez_prices_hourly.sql | 5 + .../gold/prices/price__ez_prices_hourly.yml | 2 + .../silver/core/silver__token_transfers.sql | 138 ++++++++++++++++++ .../silver/core/silver__token_transfers.yml | 11 ++ .../silver__complete_token_asset_metadata.sql | 2 + .../prices/silver__complete_token_prices.sql | 2 + models/sources.yml | 4 +- 18 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 models/bronze/core/bronze__token_transfers.sql create mode 100644 models/bronze/core/bronze__token_transfers_FR.sql create mode 100644 models/descriptions/prices_is_verified.md create mode 100644 models/descriptions/tables/core__ez_token_transfers.md create mode 100644 models/gold/core/core__ez_token_transfers.sql create mode 100644 models/gold/core/core__ez_token_transfers.yml create mode 100644 models/silver/core/silver__token_transfers.sql create mode 100644 models/silver/core/silver__token_transfers.yml diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 2819e7d..4eecfa8 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql 
@@ -21,6 +21,7 @@ SELECT {{ unique_key }}, {{ other_cols }}, value, + file_name, _inserted_timestamp, s.{{ partition_name }} FROM @@ -61,6 +62,7 @@ SELECT {{ unique_key }}, {{ other_cols }}, value, + file_name, _inserted_timestamp, s.{{ partition_name }} FROM
diff --git a/models/bronze/core/bronze__token_transfers.sql b/models/bronze/core/bronze__token_transfers.sql
new file mode 100644
index 0000000..5e94f4a
--- /dev/null
+++ b/models/bronze/core/bronze__token_transfers.sql
@@ -0,0 +1,10 @@
+{{ config (
+    materialized = 'view'
+) }} {# bronze layer: materialized as a view #}
+{{ streamline_external_table_query_v2(
+    model = "token_transfers_raw",
+    partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
+    partition_name = "partition_gte_id",
+    unique_key = "metadata",
+    other_cols = "partition_id"
+) }} {# partition_function: takes the text after the last '=' in file_name, then its last '/' segment, and parses the first 8 characters as a YYYYMMDD date (NULL when that fails) #}
diff --git a/models/bronze/core/bronze__token_transfers_FR.sql b/models/bronze/core/bronze__token_transfers_FR.sql
new file mode 100644
index 0000000..a5f22fe
--- /dev/null
+++ b/models/bronze/core/bronze__token_transfers_FR.sql
@@ -0,0 +1,10 @@
+{{ config (
+    materialized = 'view'
+) }} {# bronze layer: materialized as a view; same arguments as bronze__token_transfers but built with the _FR macro #}
+{{ streamline_external_table_FR_query_v2(
+    model = "token_transfers_raw",
+    partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
+    partition_name = "partition_gte_id",
+    unique_key = "metadata",
+    other_cols = "partition_id"
+) }} {# silver__token_transfers reads this view on non-incremental runs and the non-FR view on incremental runs #}
diff --git a/models/bronze/prices/bronze__complete_token_asset_metadata.sql b/models/bronze/prices/bronze__complete_token_asset_metadata.sql index c4d6ff0..3df0d65 100644 --- a/models/bronze/prices/bronze__complete_token_asset_metadata.sql +++ b/models/bronze/prices/bronze__complete_token_asset_metadata.sql @@ -15,6 +15,7 @@ SELECT is_deprecated, provider, source, + is_verified, _inserted_timestamp, inserted_timestamp, modified_timestamp, diff --git a/models/bronze/prices/bronze__complete_token_prices.sql b/models/bronze/prices/bronze__complete_token_prices.sql index 807a83f..807f068 
100644 --- a/models/bronze/prices/bronze__complete_token_prices.sql +++ b/models/bronze/prices/bronze__complete_token_prices.sql @@ -18,6 +18,7 @@ SELECT is_deprecated, provider, source, + is_verified, _inserted_timestamp, inserted_timestamp, modified_timestamp, diff --git a/models/descriptions/prices_is_verified.md b/models/descriptions/prices_is_verified.md new file mode 100644 index 0000000..8bfa93c --- /dev/null +++ b/models/descriptions/prices_is_verified.md @@ -0,0 +1,5 @@ +{% docs prices_is_verified %} + +A flag indicating if the asset has been verified by the Flipside team. + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/tables/core__ez_token_transfers.md b/models/descriptions/tables/core__ez_token_transfers.md new file mode 100644 index 0000000..61aa274 --- /dev/null +++ b/models/descriptions/tables/core__ez_token_transfers.md @@ -0,0 +1,7 @@ +{% docs core__ez_token_transfers %} The token transfers raw table contains the SEP-41 compliant event stream from the token transfer processor. This table's purpose is to track the token value movement on the Stellar network in the form of transfer, mint, burn, clawback, and fee events. + +transfer, mint, burn, and clawback events are emitted at the operation grain. fee events are emitted at the transaction grain because there is no individual fee per operation. + +fee events can be negative in the event of a refund. The final fee paid (initial fee + refund) will always be positive. More information about fee refunds can be found in the Stellar developer documentation. + +Note that the events within this table are a subset of the events in the history_contract_events table. 
{% enddocs %} \ No newline at end of file
diff --git a/models/gold/core/core__ez_token_transfers.sql b/models/gold/core/core__ez_token_transfers.sql
new file mode 100644
index 0000000..487f048
--- /dev/null
+++ b/models/gold/core/core__ez_token_transfers.sql
@@ -0,0 +1,76 @@
+-- depends_on: {{ ref('silver__transactions') }}
+{{ config(
+    materialized = 'incremental',
+    unique_key = ["ez_token_transfers_id"],
+    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
+    merge_exclude_columns = ["inserted_timestamp"],
+    cluster_by = ['block_timestamp::DATE','closed_at::DATE'],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(transaction_hash,operation_id,from_address,to_address,contract_id,event_topic);",
+    tags = ['scheduled_core']
+) }}
+
+SELECT
+    A.transaction_hash,
+    A.ledger_sequence,
+    A.operation_id,
+    A.closed_at AS block_timestamp,
+    A.from_address,
+    A.to_address,
+    A.amount,
+    A.amount_raw,
+    -- amount_usd is NULL when neither price join finds a row for the transfer's hour
+    A.amount * COALESCE(
+        b2.price,
+        b.price
+    ) AS amount_usd,
+    A.asset,
+    A.asset_code,
+    A.asset_issuer,
+    A.asset_type,
+    COALESCE(
+        b.is_verified,
+        b2.is_verified,
+        FALSE
+    ) AS token_is_verified,
+    A.contract_id,
+    A.event_topic,
+    A.to_muxed,
+    A.to_muxed_id,
+    A.transaction_id,
+    A.closed_at,
+    A.batch_id,
+    A.batch_run_date,
+    A.batch_insert_ts,
+    A.token_transfers_id AS ez_token_transfers_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp
+FROM
+    {{ ref('silver__token_transfers') }} A
+    -- b: hourly price row matched on asset issuer + asset code
+    LEFT JOIN {{ ref('price__ez_prices_hourly') }}
+    b
+    ON DATE_TRUNC(
+        'hour',
+        A.closed_at
+    ) = b.hour
+    AND A.asset_issuer = b.asset_issuer
+    AND A.asset_code = b.asset_code
+    -- b2: hourly price row flagged is_native, joined only when the transfer's asset_type is 'native'
+    LEFT JOIN {{ ref('price__ez_prices_hourly') }}
+    b2
+    ON DATE_TRUNC(
+        'hour',
+        A.closed_at
+    ) = b2.hour
+    AND A.asset_type = 'native'
+    AND b2.is_native
+
+{% if is_incremental() %}
+WHERE
+    A.modified_timestamp >= (
+        SELECT
+            MAX(modified_timestamp)
+        FROM
+            {{ this }}
+    )
+{% endif %}
diff --git a/models/gold/core/core__ez_token_transfers.yml b/models/gold/core/core__ez_token_transfers.yml new file mode 100644 index 0000000..e3119fd --- /dev/null +++ b/models/gold/core/core__ez_token_transfers.yml @@ -0,0 +1,108 @@ +version: 2 + +models: + - name: core__ez_token_transfers + description: "{{ doc('core__ez_token_transfers') }}" + columns: + + - name: transaction_hash + description: A hex-encoded SHA-256 hash of this transaction's XDR-encoded form. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: transaction_id + description: A unique identifier for this transaction. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: operation_id + description: A unique identifier for this operation. + + - name: event_topic + description: The action type applied to the token. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: from_address + description: The source address for the token transfer event amount. + + - name: to_address + description: The destination address for the token transfer event amount. + + - name: asset + description: ID field for the asset code/issuer pair. It is created by concatenating the asset code, ':' and asset_issuer fields. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: asset_type + description: The identifier for type of asset code, can be an alphanumeric with 4 characters, 12 characters or the native asset to the network, XLM. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: asset_code + description: The 4 or 12 character code representation of the asset on the network. + + - name: asset_issuer + description: The account address of the original asset issuer that created the asset. 
+ + - name: amount + description: The normalized float amount of the asset. Raw amount of asset multiplied by 0.0000001 (one stroop is 0.0000001 of the asset). + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: amount_raw + description: The raw stroop amount of the asset. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: token_is_verified + description: A flag indicating if the asset has been verified by the Flipside team. + + - name: contract_id + description: Soroban contract id. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: ledger_sequence + description: The sequence number of this ledger. It represents the order of the ledger within the Stellar blockchain. Each ledger has a unique sequence number that increments with every new ledger, ensuring that ledgers are processed in the correct order. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: closed_at + description: Timestamp in UTC when this ledger closed and committed to the network. Ledgers are expected to close ~every 5 seconds. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: to_muxed + description: The multiplexed strkey representation of the `to` address. + + - name: to_muxed_id + description: The multiplexed ID used to generate the multiplexed strkey representation of the `to` address. + + - name: batch_id + description: String representation of the run id for a given DAG in Airflow. Takes the form of "scheduled__-". Batch ids are unique to the batch and help with monitoring and rerun capabilities. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: batch_run_date + description: The start date for the batch interval. 
When taken with the date in the batch_id, the date represents the interval of ledgers processed. The batch run date can be seen as a proxy of closed_at for a ledger. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} + + - name: batch_insert_ts + description: The timestamp in UTC when a batch of records was inserted into the database. This field can help identify if a batch executed in real time or as part of a backfill. The timestamp should not be used during ad hoc analysis and is useful for data engineering purposes. + tests: + - not_null: + where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }} \ No newline at end of file
diff --git a/models/gold/prices/price__ez_asset_metadata.sql b/models/gold/prices/price__ez_asset_metadata.sql
index 0002ca0..7e1175f 100644
--- a/models/gold/prices/price__ez_asset_metadata.sql
+++ b/models/gold/prices/price__ez_asset_metadata.sql
@@ -17,6 +17,10 @@ SELECT
     decimals,
     blockchain,
     FALSE AS is_native,
+    COALESCE(
+        is_verified,
+        FALSE
+    ) AS is_verified,
     is_deprecated,
     {{ dbt_utils.generate_surrogate_key(['complete_token_asset_metadata_id']) }} AS ez_asset_metadata_id,
     SYSDATE() AS inserted_timestamp,
@@ -44,6 +48,7 @@ WHERE
     blockchain,
     TRUE AS is_native,
+    TRUE AS is_verified, -- same slot as in the first SELECT (is_native, is_verified, is_deprecated); NOTE(review): assumes the two SELECTs are combined by position (UNION ALL) - confirm
     is_deprecated,
     {{ dbt_utils.generate_surrogate_key(['complete_native_asset_metadata_id']) }} AS ez_asset_metadata_id,
     SYSDATE() AS inserted_timestamp,
     SYSDATE() AS modified_timestamp
diff --git a/models/gold/prices/price__ez_asset_metadata.yml b/models/gold/prices/price__ez_asset_metadata.yml index f663849..dc6b629 100644 --- a/models/gold/prices/price__ez_asset_metadata.yml +++ b/models/gold/prices/price__ez_asset_metadata.yml @@ -24,6 +24,8 @@ models: description: '{{ doc("prices_is_native") }}' - name: IS_DEPRECATED description: '{{ doc("prices_is_deprecated") }}' + - name: IS_VERIFIED + description: '{{ doc("prices_is_verified") }}' - name: EZ_ASSET_METADATA_ID description: '{{ 
doc("pk") }}' - name: INSERTED_TIMESTAMP diff --git a/models/gold/prices/price__ez_prices_hourly.sql b/models/gold/prices/price__ez_prices_hourly.sql index dc00c5d..9ee41f4 100644 --- a/models/gold/prices/price__ez_prices_hourly.sql +++ b/models/gold/prices/price__ez_prices_hourly.sql @@ -18,6 +18,10 @@ SELECT decimals, blockchain, FALSE AS is_native, + COALESCE( + is_verified, + FALSE + ) AS is_verified, is_imputed, is_deprecated, {{ dbt_utils.generate_surrogate_key(['complete_token_prices_id']) }} AS ez_prices_hourly_id, @@ -45,6 +49,7 @@ WHERE decimals, blockchain, TRUE AS is_native, + TRUE AS is_verified, is_imputed, is_deprecated, {{ dbt_utils.generate_surrogate_key(['complete_native_prices_id']) }} AS ez_prices_hourly_id, diff --git a/models/gold/prices/price__ez_prices_hourly.yml b/models/gold/prices/price__ez_prices_hourly.yml index fe81d51..1b35f47 100644 --- a/models/gold/prices/price__ez_prices_hourly.yml +++ b/models/gold/prices/price__ez_prices_hourly.yml @@ -30,6 +30,8 @@ models: description: '{{ doc("prices_is_imputed") }}' - name: IS_DEPRECATED description: '{{ doc("prices_is_deprecated") }}' + - name: IS_VERIFIED + description: '{{ doc("prices_is_verified") }}' - name: EZ_PRICES_HOURLY_ID description: '{{ doc("pk") }}' - name: INSERTED_TIMESTAMP
diff --git a/models/silver/core/silver__token_transfers.sql b/models/silver/core/silver__token_transfers.sql
new file mode 100644
index 0000000..71a894f
--- /dev/null
+++ b/models/silver/core/silver__token_transfers.sql
@@ -0,0 +1,138 @@
+-- depends_on: {{ ref('bronze__token_transfers') }}
+{{ config(
+    materialized = 'incremental',
+    unique_key = "token_transfers_id",
+    incremental_predicates = ["dynamic_range_predicate", "partition_id::date"],
+    merge_exclude_columns = ["inserted_timestamp"],
+    cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'],
+    tags = ['scheduled_core'],
+) }}
+
+{% if execute %} {# the lookup below is only issued when execute is true #}
+
+{% if is_incremental() %}
+{% set max_is_query %}
+
+SELECT
+    MAX(_inserted_timestamp) AS _inserted_timestamp,
+    MAX(partition_gte_id) AS partition__gte_id
+FROM
+    {{ this }}
+
+    {% endset %}
+    {% set result = run_query(max_is_query) %}
+    {% set max_is = result [0] [0] %} {# newest _inserted_timestamp already stored in this model #}
+    {% set max_part = result [0] [1] %} {# newest partition_gte_id already stored in this model #}
+{% endif %}
+{% endif %}
+
+WITH pre_final AS ( -- flattens the bronze VALUE variant into typed columns and de-duplicates
+    SELECT
+        partition_id,
+        partition_gte_id,
+        VALUE :amount :: FLOAT AS amount,
+        VALUE :amount_raw :: STRING AS amount_raw,
+        VALUE :asset :: STRING AS asset,
+        VALUE :asset_code :: STRING AS asset_code,
+        VALUE :asset_issuer :: STRING AS asset_issuer,
+        VALUE :asset_type :: STRING AS asset_type,
+        TO_TIMESTAMP(
+            VALUE :closed_at :: INT,
+            6 -- scale 6: the integer is interpreted as microseconds since the epoch
+        ) AS closed_at,
+        VALUE :contract_id :: STRING AS contract_id,
+        VALUE :event_topic :: STRING AS event_topic,
+        VALUE :from :: STRING AS from_address,
+        VALUE :ledger_sequence :: bigint AS ledger_sequence,
+        VALUE :operation_id :: bigint AS operation_id,
+        VALUE :to :: STRING AS to_address,
+        VALUE :to_muxed :: STRING AS to_muxed,
+        VALUE :to_muxed_id :: STRING AS to_muxed_id,
+        VALUE :transaction_hash :: STRING AS transaction_hash,
+        VALUE :transaction_id :: bigint AS transaction_id,
+        VALUE :batch_id :: STRING AS batch_id,
+        TO_TIMESTAMP(
+            VALUE :batch_run_date :: INT,
+            6
+        ) AS batch_run_date,
+        TO_TIMESTAMP(
+            VALUE :batch_insert_ts :: INT,
+            6
+        ) AS batch_insert_ts,
+        _inserted_timestamp,
+        ROW_NUMBER() over(
+            PARTITION BY transaction_hash,
+            COALESCE(
+                operation_id,
+                0
+            ),
+            to_address,
+            from_address,
+            asset,
+            amount_raw,
+            event_topic,
+            file_name
+            ORDER BY
+                _inserted_timestamp DESC
+        ) AS artificial_uk -- numbers rows that share all key columns within one file_name; feeds the surrogate key
+    FROM
+
+{% if is_incremental() %} {# incremental runs read the regular bronze view, other runs read the _FR view #}
+{{ ref('bronze__token_transfers') }}
+{% else %}
+    {{ ref('bronze__token_transfers_FR') }}
+{% endif %}
+
+{% if is_incremental() %}
+WHERE
+    partition_gte_id >= '{{ max_part }}' -- only partitions at or after the stored maximum
+    AND _inserted_timestamp > '{{ max_is }}' -- and only rows newer than the stored maximum
+{% endif %}
+
+qualify DENSE_RANK() over(
+    PARTITION BY transaction_hash,
+    COALESCE(
+        operation_id,
+        0
+    ),
+    to_address,
+    from_address,
+    asset,
+    amount_raw,
+    event_topic
+    ORDER BY
+        _inserted_timestamp DESC -- NOTE(review): rows from different files that tie on _inserted_timestamp all rank 1 and each get artificial_uk = 1 - confirm this cannot occur
+) = 1 -- keep every row carrying the newest _inserted_timestamp for the key
+)
+SELECT
+    partition_id,
+    partition_gte_id,
+    amount,
+    amount_raw,
+    asset,
+    asset_code,
+    asset_issuer,
+    asset_type,
+    closed_at,
+    contract_id,
+    event_topic,
+    from_address,
+    ledger_sequence,
+    operation_id,
+    to_address,
+    to_muxed,
+    to_muxed_id,
+    transaction_hash,
+    transaction_id,
+    batch_id,
+    batch_run_date,
+    batch_insert_ts,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(
+        ['transaction_hash', 'operation_id', 'to_address', 'from_address', 'asset', 'amount_raw','event_topic','artificial_uk']
+    ) }} AS token_transfers_id, -- merge key: the dedup key columns plus artificial_uk
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    pre_final
diff --git a/models/silver/core/silver__token_transfers.yml b/models/silver/core/silver__token_transfers.yml new file mode 100644 index 0000000..cda3dd7 --- /dev/null +++ b/models/silver/core/silver__token_transfers.yml @@ -0,0 +1,11 @@ +version: 2 +models: + - name: silver__token_transfers + columns: + - name: token_transfers_id + description: "{{ doc('id') }}" + tests: + - not_null: + where: modified_timestamp > current_date - {{ var('test_days_threshold', 3) }} + - unique: + where: modified_timestamp > current_date - {{ var('test_days_threshold', 3) }} diff --git a/models/silver/prices/silver__complete_token_asset_metadata.sql b/models/silver/prices/silver__complete_token_asset_metadata.sql index 9da28ba..b678ceb 100644 --- a/models/silver/prices/silver__complete_token_asset_metadata.sql +++ b/models/silver/prices/silver__complete_token_asset_metadata.sql @@ -39,6 +39,7 @@ WITH providers AS ( is_deprecated, provider, source, + is_verified, _inserted_timestamp FROM {{ ref( @@ -79,6 +80,7 @@ SELECT A.is_deprecated, A.provider, A.source, + A.is_verified, A._inserted_timestamp, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/prices/silver__complete_token_prices.sql 
b/models/silver/prices/silver__complete_token_prices.sql index cb8411d..b52daec 100644 --- a/models/silver/prices/silver__complete_token_prices.sql +++ b/models/silver/prices/silver__complete_token_prices.sql @@ -43,6 +43,7 @@ WITH providers AS ( is_deprecated, provider, source, + is_verified, _inserted_timestamp FROM {{ ref( @@ -87,6 +88,7 @@ SELECT A.is_deprecated, A.provider, A.source, + A.is_verified, A._inserted_timestamp, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/sources.yml b/models/sources.yml index c889cdf..49b51a4 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -3,7 +3,8 @@ version: 2 sources: - name: bronze_streamline database: streamline - schema: "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}" + schema: stellar + # "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}" tables: - name: accounts - name: contract_data @@ -16,6 +17,7 @@ sources: - name: liquidity_pools - name: streamline_ledgers - name: trust_lines + - name: token_transfers_raw - name: crosschain database: "{{ 'crosschain' if target.database == 'STELLAR' else 'crosschain_dev' }}" schema: core