This commit is contained in:
Eric Laurello 2025-04-17 16:42:07 -04:00
parent bafb9fa6d9
commit 3b55818d91
12 changed files with 187 additions and 260 deletions

View File

@ -94,6 +94,12 @@ vars:
ROLES:
- AWS_LAMBDA_APTOS_API
- INTERNAL_DEV
dev-2xl:
API_INTEGRATION: aws_aptos_api_stg_v2
EXTERNAL_FUNCTION_URI: 9v6g64rv1e.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_APTOS_API
- INTERNAL_DEV
prod:
API_INTEGRATION: aws_aptos_api_prod_v2
EXTERNAL_FUNCTION_URI: mxus3semvi.execute-api.us-east-1.amazonaws.com/prod/

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'incremental',
full_refresh = false,
tags = ['noncore']
) }}
{% if is_incremental() %}
{% set last_version = get_last_transaction_version_created_coin_info() %}
{% endif %}
WITH params AS (
SELECT
'query MyQuery { coin_infos( limit: 100 order_by: {transaction_version_created: asc}' ||
{% if is_incremental() %}
'where: {transaction_version_created: {_gte: "' || {{ last_version }} || '"}} ' ||
{% endif %}
') { coin_type coin_type_hash creator_address decimals name symbol transaction_created_timestamp transaction_version_created }} ' AS query
),
res AS (
SELECT
live.udf_api(
'post',
'https://indexer.mainnet.aptoslabs.com/v1/graphql',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'query',
query,
'variables',{}
)
) AS res,
query,
SYSDATE() AS _inserted_timestamp
FROM
params
)
SELECT
{# res, #}
query,
C.value :coin_type :: STRING AS coin_type,
C.value :coin_type_hash :: STRING AS coin_type_hash,
C.value :creator_address :: STRING AS creator_address,
C.value :decimals :: INT AS decimals,
C.value :name :: STRING AS NAME,
C.value :symbol :: STRING AS symbol,
C.value :transaction_created_timestamp :: datetime AS transaction_created_timestamp,
C.value :transaction_version_created :: INT AS transaction_version_created,
_inserted_timestamp
FROM
res,
LATERAL FLATTEN(
input => res :data :data :coin_infos
) C

View File

@ -0,0 +1,44 @@
{{ config(
materialized = 'incremental',
unique_key = ['store_address'],
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp","block_timestamp_first","block_number_first"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(store_address);",
tags = ['core', 'full_test']
) }}
SELECT
block_timestamp AS block_timestamp_first,
block_number AS block_number_first,
address AS store_address,
change_data :metadata :inner :: STRING AS metadata_address,
CASE
WHEN change_data :metadata :inner :: STRING = '0x357b0b74bc833e95a115ad22604854d6b0fca151cecd94111770e5d6ffc9dc2b' THEN TRUE
ELSE FALSE
END AS is_usdt,
{{ dbt_utils.generate_surrogate_key(
['store_address']
) }} AS fungiblestore_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__changes') }}
WHERE
change_module = 'fungible_asset'
AND change_resource = 'FungibleStore'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY address
ORDER BY
block_number
) = 1

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: silver__fungiblestore_usdt
- name: silver__fungiblestore_metadata
description: |
Identifies and tracks fungible stores that contain USDT tokens.
This model filters and processes fungible asset stores, specifically identifying those
@ -9,23 +9,14 @@ models:
The model processes data from the last 30 days and maintains incremental updates.
columns:
- name: block_timestamp
description: Timestamp of the block when the store was created/updated
- name: block_timestamp_first
description: Block timestamp of the first time the store metadata was posted to the chain
tests:
- not_null
- name: block_date
description: Date of the block when the store was created/updated
tests:
- not_null
- name: block_number
description: Block number of the store creation/update
tests:
- not_null
- name: tx_hash
description: Transaction hash of the store creation/update
- name: block_number_first
description: Block number of the first time the store metadata was posted to the chain
tests:
- not_null
@ -46,8 +37,4 @@ models:
- not_null
- accepted_values:
values: [true, false]
- name: _inserted_timestamp
description: Timestamp when the record was inserted into the table
tests:
- not_null

View File

@ -0,0 +1,38 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_hash','change_index'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(store_address);",
tags = ['core','full_test']
) }}
SELECT
block_timestamp,
block_number,
version,
tx_hash,
change_index,
address AS store_address,
change_data :owner :: STRING AS owner_address,
{{ dbt_utils.generate_surrogate_key(
['tx_hash','change_index']
) }} AS fungiblestore_owners_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__changes') }}
WHERE
change_address = '0x1'
AND change_module = 'object'
AND change_resource = 'ObjectCore'
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -1,7 +1,7 @@
version: 2
models:
- name: silver__store_owners
- name: silver__fungiblestore_owners
description: |
Tracks ownership of stores over time, capturing the latest owner for each store address.
This model is primarily used for fungible asset tracking, including USDT.

View File

@ -1,45 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = ['store_address'],
incremental_strategy = 'merge',
tags = ['core', 'fungible_assets', 'usdt']
) }}
-- This model identifies fungible stores that contain USDT tokens
-- Uses direct filtering with QUALIFY for better performance
SELECT
block_timestamp,
block_timestamp::DATE AS block_date,
block_number,
{{ dbt_utils.generate_surrogate_key( ['tx_hash'] ) }} AS transactions_id,
address AS store_address,
change_data:metadata:inner::STRING AS metadata_address,
{{ dbt_utils.generate_surrogate_key( ['tx_hash'] ) }} AS transactions_id,
-- Flag USDT stores for easy filtering
CASE
WHEN change_data:metadata:inner::STRING = '0x357b0b74bc833e95a115ad22604854d6b0fca151cecd94111770e5d6ffc9dc2b'
THEN TRUE
ELSE FALSE
END AS is_usdt,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('silver__changes') }}
WHERE
change_module = 'fungible_asset'
AND change_resource = 'FungibleStore'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
QUALIFY
ROW_NUMBER() OVER (PARTITION BY address ORDER BY block_number DESC) = 1
AND is_usdt = TRUE -- Only include USDT stores

View File

@ -1,33 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = ['tx_hash','change_index'],
incremental_strategy = 'merge',
tags = ['core','full_test']
) }}
SELECT
block_timestamp,
block_timestamp::DATE AS block_date,
block_number,
tx_hash,
change_index,
address AS store_address,
change_data:owner::STRING AS owner_address,
_inserted_timestamp
FROM
{{ ref('silver__changes') }}
WHERE
change_address = '0x1'
AND change_module = 'object'
AND change_resource = 'ObjectCore'
{% if is_incremental() %}
-- Use _inserted_timestamp for incremental logic
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
QUALIFY
ROW_NUMBER() OVER (PARTITION BY address ORDER BY block_number DESC) = 1

View File

@ -7,22 +7,19 @@
tags = ['core','full_test']
) }}
-- USDT Token Metadata Address: 0x357b0b74bc833e95a115ad22604854d6b0fca151cecd94111770e5d6ffc9dc2b
WITH events AS (
SELECT
block_number,
version,
success,
block_timestamp,
block_timestamp :: DATE AS block_date,
tx_hash,
event_index,
event_resource,
event_data :amount :: bigint AS amount,
-- Extract store address from event data if available, otherwise use account_address
COALESCE(event_data :store :: STRING, account_address) AS account_address,
creation_number,
account_address,
event_data :store :: STRING AS store_address,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
@ -30,40 +27,38 @@ WITH events AS (
{{ ref('silver__events') }}
WHERE
event_module = 'fungible_asset'
AND event_resource IN ('WithdrawEvent', 'DepositEvent', 'Withdraw', 'Deposit')
AND event_resource IN (
'WithdrawEvent',
'DepositEvent',
'Withdraw',
'Deposit'
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
AND modified_timestamp >= (
SELECT
MAX(_inserted_timestamp)
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
chnges AS (
owners AS (
SELECT
block_timestamp :: DATE AS block_date,
tx_hash,
change_index,
change_data:metadata:inner::string AS token_address
version,
block_timestamp,
store_address,
owner_address
FROM
{{ ref('silver__changes') }}
WHERE
change_module = 'fungible_asset'
AND change_data:metadata:inner::string = '0x357b0b74bc833e95a115ad22604854d6b0fca151cecd94111770e5d6ffc9dc2b'
{% if is_incremental() %}
AND _inserted_timestamp >= (
{{ ref('silver__fungiblestore_owners') }}
),
md AS (
SELECT
MAX(_inserted_timestamp)
store_address,
metadata_address
FROM
{{ this }}
{{ ref('silver__fungiblestore_metadata') }}
)
{% endif %}
qualify(ROW_NUMBER() over(PARTITION BY tx_hash ORDER BY change_index DESC) = 1)
)
SELECT
e.block_number,
e.block_timestamp,
@ -74,8 +69,11 @@ SELECT
e.creation_number,
e.event_resource AS transfer_event,
e.account_address,
e.store_address,
o.owner_address,
md.metadata_address,
e.amount,
c.token_address,
C.token_address,
{{ dbt_utils.generate_surrogate_key(
['e.tx_hash','e.event_index']
) }} AS transfers_id,
@ -84,9 +82,10 @@ SELECT
e._inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
events e
INNER JOIN chnges c
ON e.block_date = c.block_date
AND e.tx_hash = c.tx_hash
WHERE
e.event_resource IN ('DepositEvent', 'Deposit','WithdrawEvent', 'Withdraw')
events e asof
JOIN owners o match_condition(
e.version >= o.version
)
ON e.store_address = o.store_address
LEFT JOIN md m
ON e.store_address = m.store_address

View File

@ -1,6 +1,6 @@
version: 2
models:
- name: silver__transfers_usdt
- name: silver__transfers_fungible
tests:
- dbt_constraints.primary_key:
column_name: TRANSFERS_ID

View File

@ -1,60 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = ['store_address'],
incremental_strategy = 'merge',
tags = ['core', 'usdt', 'ownership']
) }}
-- This model joins USDT fungible stores with their owners
-- Connects object ownership with USDT tokens for comprehensive tracking
WITH store_owners AS (
SELECT
block_timestamp,
block_number,
tx_hash,
store_address,
owner_address
FROM
{{ ref('silver__store_owners') }}
),
fungible_stores AS (
SELECT
block_timestamp,
block_number,
tx_hash,
store_address,
metadata_address,
is_usdt
FROM
{{ ref('silver__fungiblestore_usdt') }}
WHERE
is_usdt = TRUE
)
SELECT
o.block_timestamp AS ownership_timestamp,
o.block_timestamp::DATE AS ownership_date,
o.block_number AS ownership_block,
o.tx_hash AS ownership_tx_hash, -- Transaction that last changed ownership
o.store_address,
o.owner_address,
f.metadata_address,
f.block_number AS store_metadata_block,
f.tx_hash AS store_creation_tx_hash, -- Transaction that created/updated the store
f.is_usdt,
CURRENT_TIMESTAMP() AS _inserted_timestamp
FROM
store_owners o
JOIN
fungible_stores f ON o.store_address = f.store_address
{% if is_incremental() %}
WHERE
o.block_timestamp >= (
SELECT
MAX(ownership_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -1,67 +0,0 @@
version: 2
models:
- name: silver__usdt_fungible_ownership
description: |
Joins USDT fungible stores with their owners, connecting object ownership with USDT tokens
for comprehensive tracking. This model provides a complete view of USDT token ownership
by linking store metadata with owner information.
columns:
- name: ownership_timestamp
description: Timestamp when the ownership was last changed
tests:
- not_null
- name: ownership_date
description: Date when the ownership was last changed
tests:
- not_null
- name: ownership_block
description: Block number of the last ownership change
tests:
- not_null
- name: ownership_tx_hash
description: Transaction hash of the last ownership change
tests:
- not_null
- name: store_address
description: Address of the USDT store
tests:
- not_null
- unique
- name: owner_address
description: Current owner of the USDT store
tests:
- not_null
- name: metadata_address
description: Metadata address of the USDT token
tests:
- not_null
- name: store_metadata_block
description: Block number when the store metadata was created/updated
tests:
- not_null
- name: store_creation_tx_hash
description: Transaction hash that created/updated the store
tests:
- not_null
- name: is_usdt
description: Flag indicating if the store contains USDT tokens
tests:
- not_null
- accepted_values:
values: [true]
- name: _inserted_timestamp
description: Timestamp when the record was inserted into the table
tests:
- not_null