ez token transfers plus verified flags (#15)

* ez token transfer and all that goes into that

* fix UK and partition

* inc filter
This commit is contained in:
eric-laurello 2025-07-25 14:44:22 -04:00 committed by GitHub
parent 4bc91f5040
commit 9658462cfc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 390 additions and 1 deletions

View File

@ -21,6 +21,7 @@ SELECT
{{ unique_key }},
{{ other_cols }},
value,
file_name,
_inserted_timestamp,
s.{{ partition_name }}
FROM
@ -61,6 +62,7 @@ SELECT
{{ unique_key }},
{{ other_cols }},
value,
file_name,
_inserted_timestamp,
s.{{ partition_name }}
FROM

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "token_transfers_raw",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "metadata",
other_cols = "partition_id"
) }}

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "token_transfers_raw",
partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')",
partition_name = "partition_gte_id",
unique_key = "metadata",
other_cols = "partition_id"
) }}

View File

@ -15,6 +15,7 @@ SELECT
is_deprecated,
provider,
source,
is_verified,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,

View File

@ -18,6 +18,7 @@ SELECT
is_deprecated,
provider,
source,
is_verified,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,

View File

@ -0,0 +1,5 @@
{% docs prices_is_verified %}
A flag indicating if the asset has been verified by the Flipside team.
{% enddocs %}

View File

@ -0,0 +1,7 @@
{% docs core__ez_token_transfers %} The token transfers raw table contains the SEP-41 compliant event stream from the token transfer processor. This table's purpose is to track the token value movement on the stellar network in the form of transfer, mint, burn, clawback, and fee events.
transfer, mint, burn, and clawback events are emitted at the operation grain. fee events are emitted at the transaction grain because there is no individual fee per operation.
fee events can be negative in the event of a refund. The final fee paid (intial fee + refund) will always be positive. More information about fee refunds can be found here.
Note that the events within this table are a subset of the events in the history_contract_events table. {% enddocs %}

View File

@ -0,0 +1,76 @@
-- depends_on: {{ ref('silver__transactions') }}
{{ config(
materialized = 'incremental',
unique_key = ["ez_token_transfers_id"],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['block_timestamp::DATE','closed_at::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(transaction_hash,operation_id,from_address,to_address,contract_id,event_topic);",
tags = ['scheduled_core']
) }}
SELECT
transaction_hash,
ledger_sequence,
operation_id,
closed_at AS block_timestamp,
from_address,
to_address,
amount,
amount_raw,
amount * COALESCE(
b2.price,
b.price
) AS amount_usd,
A.asset,
A.asset_code,
A.asset_issuer,
A.asset_type,
COALESCE(
b.is_verified,
b2.is_verified,
FALSE
) AS token_is_verified,
contract_id,
event_topic,
to_muxed,
to_muxed_id,
transaction_id,
closed_at,
batch_id,
batch_run_date,
batch_insert_ts,
token_transfers_id AS ez_token_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__token_transfers') }} A
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
b
ON DATE_TRUNC(
'hour',
A.closed_at
) = b.hour
AND A.asset_issuer = b.asset_issuer
AND A.asset_code = b.asset_code
LEFT JOIN {{ ref('price__ez_prices_hourly') }}
b2
ON DATE_TRUNC(
'hour',
A.closed_at
) = b2.hour
AND A.asset_type = 'native'
AND b2.is_native
LEFT JOIN {{ ref('core__dim_assets') }} C
ON A.asset_issuer = C.asset_issuer
AND A.asset_code = C.asset_code
{% if is_incremental() %}
WHERE
A.modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,108 @@
version: 2
models:
- name: core__ez_token_transfers
description: "{{ doc('core__ez_token_transfers') }}"
columns:
- name: transaction_hash
description: A hex-encoded SHA-256 hash of this transaction's XDR-encoded form.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: transaction_id
description: A unique identifier for this transaction.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: operation_id
description: A unique identifier for this transaction.
- name: event_topic
description: The action type applied to the token.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: from
description: The source address for the token transfer event amount.
- name: to
description: The destination address for the token transfer event amount.
- name: asset
description: ID field for the asset code/issuer pair. Its created by concatenating the asset code, ':' and asset_issuer fields.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: asset_type
description: The identifier for type of asset code, can be an alphanumeric with 4 characters, 12 characters or the native asset to the network, XLM.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: asset_code
description: The 4 or 12 character code representation of the asset on the network.
- name: asset_issuer
description: The account address of the original asset issuer that created the asset.
- name: amount
description: The normalized float amount of the asset. Raw amount of asset divided by 0.0000001.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: amount_raw
description: The raw stroop amount of the asset.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: token_is_verified
description: A flag indicating if the asset has been verified by the Flipside team.
- name: contract_id
description: Soroban contract id.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: ledger_sequence
description: The sequence number of this ledger. It represents the order of the ledger within the Stellar blockchain. Each ledger has a unique sequence number that increments with every new ledger, ensuring that ledgers are processed in the correct order.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: closed_at
description: Timestamp in UTC when this ledger closed and committed to the network. Ledgers are expected to close ~every 5 seconds.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: to_muxed
description: The multiplexed strkey representation of the `to` address.
- name: to_muxed_id
description: The multiplexed ID used to generate the multiplexed strkey representation of the `to` address.
- name: batch_id
description: String representation of the run id for a given DAG in Airflow. Takes the form of "scheduled__<batch_end_date>-<dag_alias>". Batch ids are unique to the batch and help with monitoring and rerun capabilities.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: batch_run_date
description: The start date for the batch interval. When taken with the date in the batch_id, the date represents the interval of ledgers processed. The batch run date can be seen as a proxy of closed_at for a ledger.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}
- name: batch_insert_ts
description: The timestamp in UTC when a batch of records was inserted into the database. This field can help identify if a batch executed in real time or as part of a backfill. The timestamp should not be used during ad hoc analysis and is useful for data engineering purposes.
tests:
- not_null:
where: batch_insert_ts > current_date - {{ var('test_days_threshold', 3) }}

View File

@ -17,6 +17,10 @@ SELECT
decimals,
blockchain,
FALSE AS is_native,
COALESCE(
is_verified,
FALSE
) AS is_verified,
is_deprecated,
{{ dbt_utils.generate_surrogate_key(['complete_token_asset_metadata_id']) }} AS ez_asset_metadata_id,
SYSDATE() AS inserted_timestamp,
@ -44,6 +48,7 @@ WHERE
blockchain,
TRUE AS is_native,
is_deprecated,
TRUE AS is_verified,
{{ dbt_utils.generate_surrogate_key(['complete_native_asset_metadata_id']) }} AS ez_asset_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp

View File

@ -24,6 +24,8 @@ models:
description: '{{ doc("prices_is_native") }}'
- name: IS_DEPRECATED
description: '{{ doc("prices_is_deprecated") }}'
- name: IS_VERIFIED
description: '{{ doc("prices_is_verified") }}'
- name: EZ_ASSET_METADATA_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP

View File

@ -18,6 +18,10 @@ SELECT
decimals,
blockchain,
FALSE AS is_native,
COALESCE(
is_verified,
FALSE
) AS is_verified,
is_imputed,
is_deprecated,
{{ dbt_utils.generate_surrogate_key(['complete_token_prices_id']) }} AS ez_prices_hourly_id,
@ -45,6 +49,7 @@ WHERE
decimals,
blockchain,
TRUE AS is_native,
TRUE AS is_verified,
is_imputed,
is_deprecated,
{{ dbt_utils.generate_surrogate_key(['complete_native_prices_id']) }} AS ez_prices_hourly_id,

View File

@ -30,6 +30,8 @@ models:
description: '{{ doc("prices_is_imputed") }}'
- name: IS_DEPRECATED
description: '{{ doc("prices_is_deprecated") }}'
- name: IS_VERIFIED
description: '{{ doc("prices_is_verified") }}'
- name: EZ_PRICES_HOURLY_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP

View File

@ -0,0 +1,138 @@
-- depends_on: {{ ref('bronze__token_transfers') }}
{{ config(
materialized = 'incremental',
unique_key = "token_transfers_id",
incremental_predicates = ["dynamic_range_predicate", "partition_id::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'],
tags = ['scheduled_core'],
) }}
{% if execute %}
{% if is_incremental() %}
{% set max_is_query %}
SELECT
MAX(_inserted_timestamp) AS _inserted_timestamp,
MAX(partition_gte_id) AS partition__gte_id
FROM
{{ this }}
{% endset %}
{% set result = run_query(max_is_query) %}
{% set max_is = result [0] [0] %}
{% set max_part = result [0] [1] %}
{% endif %}
{% endif %}
WITH pre_final AS (
SELECT
partition_id,
partition_gte_id,
VALUE :amount :: FLOAT AS amount,
VALUE :amount_raw :: STRING AS amount_raw,
VALUE :asset :: STRING AS asset,
VALUE :asset_code :: STRING AS asset_code,
VALUE :asset_issuer :: STRING AS asset_issuer,
VALUE :asset_type :: STRING AS asset_type,
TO_TIMESTAMP(
VALUE :closed_at :: INT,
6
) AS closed_at,
VALUE :contract_id :: STRING AS contract_id,
VALUE :event_topic :: STRING AS event_topic,
VALUE :from :: STRING AS from_address,
VALUE :ledger_sequence :: bigint AS ledger_sequence,
VALUE :operation_id :: bigint AS operation_id,
VALUE :to :: STRING AS to_address,
VALUE :to_muxed :: STRING AS to_muxed,
VALUE :to_muxed_id :: STRING AS to_muxed_id,
VALUE :transaction_hash :: STRING AS transaction_hash,
VALUE :transaction_id :: bigint AS transaction_id,
VALUE :batch_id :: STRING AS batch_id,
TO_TIMESTAMP(
VALUE :batch_run_date :: INT,
6
) AS batch_run_date,
TO_TIMESTAMP(
VALUE :batch_insert_ts :: INT,
6
) AS batch_insert_ts,
_inserted_timestamp,
ROW_NUMBER() over(
PARTITION BY transaction_hash,
COALESCE(
operation_id,
0
),
to_address,
from_address,
asset,
amount_raw,
event_topic,
file_name
ORDER BY
_inserted_timestamp DESC
) AS artificial_uk
FROM
{% if is_incremental() %}
{{ ref('bronze__token_transfers') }}
{% else %}
{{ ref('bronze__token_transfers_FR') }}
{% endif %}
{% if is_incremental() %}
WHERE
partition_gte_id >= '{{ max_part }}'
AND _inserted_timestamp > '{{ max_is }}'
{% endif %}
qualify DENSE_RANK() over(
PARTITION BY transaction_hash,
COALESCE(
operation_id,
0
),
to_address,
from_address,
asset,
amount_raw,
event_topic
ORDER BY
_inserted_timestamp DESC
) = 1
)
SELECT
partition_id,
partition_gte_id,
amount,
amount_raw,
asset,
asset_code,
asset_issuer,
asset_type,
closed_at,
contract_id,
event_topic,
from_address,
ledger_sequence,
operation_id,
to_address,
to_muxed,
to_muxed_id,
transaction_hash,
transaction_id,
batch_id,
batch_run_date,
batch_insert_ts,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['transaction_hash', 'operation_id', 'to_address', 'from_address', 'asset', 'amount_raw','event_topic','artificial_uk']
) }} AS token_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
pre_final

View File

@ -0,0 +1,11 @@
version: 2
models:
- name: silver__token_transfers
columns:
- name: token_transfers_id
description: "{{ doc('id') }}"
tests:
- not_null:
where: modified_timestamp > current_date - {{ var('test_days_threshold', 3) }}
- unique:
where: modified_timestamp > current_date - {{ var('test_days_threshold', 3) }}

View File

@ -39,6 +39,7 @@ WITH providers AS (
is_deprecated,
provider,
source,
is_verified,
_inserted_timestamp
FROM
{{ ref(
@ -79,6 +80,7 @@ SELECT
A.is_deprecated,
A.provider,
A.source,
A.is_verified,
A._inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -43,6 +43,7 @@ WITH providers AS (
is_deprecated,
provider,
source,
is_verified,
_inserted_timestamp
FROM
{{ ref(
@ -87,6 +88,7 @@ SELECT
A.is_deprecated,
A.provider,
A.source,
A.is_verified,
A._inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,

View File

@ -3,7 +3,8 @@ version: 2
sources:
- name: bronze_streamline
database: streamline
schema: "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}"
schema: stellar
# "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}"
tables:
- name: accounts
- name: contract_data
@ -16,6 +17,7 @@ sources:
- name: liquidity_pools
- name: streamline_ledgers
- name: trust_lines
- name: token_transfers_raw
- name: crosschain
database: "{{ 'crosschain' if target.database == 'STELLAR' else 'crosschain_dev' }}"
schema: core