This commit is contained in:
Austin 2025-11-10 16:40:55 -05:00
parent 256b321030
commit 9d6a723977
13 changed files with 391 additions and 0 deletions

View File

@ -16,6 +16,8 @@ sources:
- name: tt_projects
- name: tt_metrics
- name: tt_project_metrics
- name: tt_datasets
- name: tt_dataset_values
- name: tokenflow_eth
database: flipside_prod_db
schema: tokenflow_eth

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_query_v2(
model = 'tt_dataset_values',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_FR_query_v2(
model = 'tt_dataset_values',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_query_v2(
model = 'tt_dataset_values',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_FR_query_v2(
model = 'tt_datasets',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,23 @@
{{ config(
materialized = 'incremental',
unique_key = 'dim_datasets_id',
tags = ['tt']
) }}
SELECT
mart_id,
data:name::STRING AS name,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['mart_id']
) }} AS dim_datasets_id
FROM {{ ref('silver__tt_datasets') }}
WHERE 1=1
{% if is_incremental() %}
AND modified_timestamp > (
SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY dim_datasets_id ORDER BY modified_timestamp DESC) = 1

View File

@ -0,0 +1,38 @@
{{ config(
materialized = 'incremental',
unique_key = 'ez_cohort_analysis_id',
tags = ['tt']
) }}
SELECT
data:cohort::string::timestamp AS cohort_timestamp,
data:project_id::string AS project_id,
data:project_name::string AS project_name,
data:initial_cohort_size::int AS initial_cohort_size,
data:market_sector_id::string AS market_sector_id,
data:market_sector_name::string AS market_sector_name,
data:month_0::float AS month_0,
data:month_1::float AS month_1,
data:month_2::float AS month_2,
data:month_3::float AS month_3,
data:month_4::float AS month_4,
data:month_5::float AS month_5,
data:month_6::float AS month_6,
data:month_7::float AS month_7,
data:month_8::float AS month_8,
data:month_9::float AS month_9,
data:month_10::float AS month_10,
data:month_11::float AS month_11,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['cohort_timestamp','project_id']
) }} AS ez_cohort_analysis_id
FROM {{ ref('silver__tt_dataset_values') }}
WHERE mart_id = 'cohort_analysis'
{% if is_incremental() %}
AND modified_timestamp > (
SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY ez_cohort_analysis_id ORDER BY modified_timestamp DESC) = 1

View File

@ -0,0 +1,24 @@
{{ config(
materialized = 'incremental',
unique_key = 'ez_crypto_screener_id',
tags = ['tt']
) }}
SELECT
data:project_id::string AS project_id,
data:project_name::string AS project_name,
data as screener_data,
-- add some columns here, too many to list them all though
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['project_id']
) }} AS ez_crypto_screener_id
FROM {{ ref('silver__tt_dataset_values') }}
WHERE mart_id = 'crypto_screener'
{% if is_incremental() %}
AND modified_timestamp > (
SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY ez_crypto_screener_id ORDER BY modified_timestamp DESC) = 1

View File

@ -0,0 +1,130 @@
{{ config(
materialized = 'incremental',
unique_key = 'ez_stablecoins_id',
tags = ['tt']
) }}
SELECT
ifnull(data:account_creation_timestamp::timestamp, '1970-01-01') AS account_creation_timestamp,
data:chain_id::string AS chain_id,
data:chain_name::string AS chain_name,
data:market_sector_id::string AS market_sector_id,
data:market_sector_name::string AS market_sector_name,
ifnull(data:mints_180d_sum::float, 0) AS mints_180d_sum,
ifnull(data:mints_180d_trend::float, 0) AS mints_180d_trend,
ifnull(data:mints_1d_sum::float, 0) AS mints_1d_sum,
ifnull(data:mints_30d_sum::float, 0) AS mints_30d_sum,
ifnull(data:mints_30d_trend::float, 0) AS mints_30d_trend,
ifnull(data:mints_365d_sum::float, 0) AS mints_365d_sum,
ifnull(data:mints_365d_trend::float, 0) AS mints_365d_trend,
ifnull(data:mints_7d_sum::float, 0) AS mints_7d_sum,
ifnull(data:mints_7d_trend::float, 0) AS mints_7d_trend,
ifnull(data:mints_90d_sum::float, 0) AS mints_90d_sum,
ifnull(data:mints_90d_trend::float, 0) AS mints_90d_trend,
ifnull(data:mints_max_sum::float, 0) AS mints_max_sum,
ifnull(data:outstanding_supply_180d_change::float, 0) AS outstanding_supply_180d_change,
ifnull(data:outstanding_supply_1d_change::float, 0) AS outstanding_supply_1d_change,
ifnull(data:outstanding_supply_30d_change::float, 0) AS outstanding_supply_30d_change,
ifnull(data:outstanding_supply_365d_change::float, 0) AS outstanding_supply_365d_change,
ifnull(data:outstanding_supply_7d_change::float, 0) AS outstanding_supply_7d_change,
ifnull(data:outstanding_supply_90d_change::float, 0) AS outstanding_supply_90d_change,
ifnull(data:outstanding_supply_latest::float, 0) AS outstanding_supply_latest,
ifnull(data:percent_off_peg_180d::float, 0) AS percent_off_peg_180d,
ifnull(data:percent_off_peg_1d::float, 0) AS percent_off_peg_1d,
ifnull(data:percent_off_peg_30d::float, 0) AS percent_off_peg_30d,
ifnull(data:percent_off_peg_365d::float, 0) AS percent_off_peg_365d,
ifnull(data:percent_off_peg_7d::float, 0) AS percent_off_peg_7d,
ifnull(data:percent_off_peg_90d::float, 0) AS percent_off_peg_90d,
ifnull(data:percent_off_peg_latest::float, 0) AS percent_off_peg_latest,
ifnull(data:percent_off_peg_max::float, 0) AS percent_off_peg_max,
ifnull(data:price_180d_change::float, 0) AS price_180d_change,
ifnull(data:price_1d_change::float, 0) AS price_1d_change,
ifnull(data:price_30d_change::float, 0) AS price_30d_change,
ifnull(data:price_365d_change::float, 0) AS price_365d_change,
ifnull(data:price_7d_change::float, 0) AS price_7d_change,
ifnull(data:price_90d_change::float, 0) AS price_90d_change,
ifnull(data:price_latest::float, 0) AS price_latest,
ifnull(data:project_id::string, '') AS project_id,
ifnull(data:project_name::string, '') AS project_name,
ifnull(data:redemptions_180d_sum::float, 0) AS redemptions_180d_sum,
ifnull(data:redemptions_180d_trend::float, 0) AS redemptions_180d_trend,
ifnull(data:redemptions_1d_sum::float, 0) AS redemptions_1d_sum,
ifnull(data:redemptions_30d_sum::float, 0) AS redemptions_30d_sum,
ifnull(data:redemptions_30d_trend::float, 0) AS redemptions_30d_trend,
ifnull(data:redemptions_365d_sum::float, 0) AS redemptions_365d_sum,
ifnull(data:redemptions_365d_trend::float, 0) AS redemptions_365d_trend,
ifnull(data:redemptions_7d_sum::float, 0) AS redemptions_7d_sum,
ifnull(data:redemptions_7d_trend::float, 0) AS redemptions_7d_trend,
ifnull(data:redemptions_90d_sum::float, 0) AS redemptions_90d_sum,
ifnull(data:redemptions_90d_trend::float, 0) AS redemptions_90d_trend,
ifnull(data:redemptions_max_sum::float, 0) AS redemptions_max_sum,
ifnull(data:stablecoin_name::string, '') AS stablecoin_name,
ifnull(data:stablecoin_peg::string, '') AS stablecoin_peg,
ifnull(data:stablecoin_senders_180d_sum::float, 0) AS stablecoin_senders_180d_sum,
ifnull(data:stablecoin_senders_180d_trend::float, 0) AS stablecoin_senders_180d_trend,
ifnull(data:stablecoin_senders_1d_sum::float, 0) AS stablecoin_senders_1d_sum,
ifnull(data:stablecoin_senders_1d_trend::float, 0) AS stablecoin_senders_1d_trend,
ifnull(data:stablecoin_senders_30d_sum::float, 0) AS stablecoin_senders_30d_sum,
ifnull(data:stablecoin_senders_30d_trend::float, 0) AS stablecoin_senders_30d_trend,
ifnull(data:stablecoin_senders_365d_sum::float, 0) AS stablecoin_senders_365d_sum,
ifnull(data:stablecoin_senders_365d_trend::float, 0) AS stablecoin_senders_365d_trend,
ifnull(data:stablecoin_senders_7d_sum::float, 0) AS stablecoin_senders_7d_sum,
ifnull(data:stablecoin_senders_7d_trend::float, 0) AS stablecoin_senders_7d_trend,
ifnull(data:stablecoin_senders_90d_sum::float, 0) AS stablecoin_senders_90d_sum,
ifnull(data:stablecoin_senders_90d_trend::float, 0) AS stablecoin_senders_90d_trend,
ifnull(data:stablecoin_ticker::string, '') AS stablecoin_ticker,
ifnull(data:token_address::string, '') AS token_address,
ifnull(data:tokenholders_180d_change::float, 0) AS tokenholders_180d_change,
ifnull(data:tokenholders_1d_change::float, 0) AS tokenholders_1d_change,
ifnull(data:tokenholders_30d_change::float, 0) AS tokenholders_30d_change,
ifnull(data:tokenholders_365d_change::float, 0) AS tokenholders_365d_change,
ifnull(data:tokenholders_7d_change::float, 0) AS tokenholders_7d_change,
ifnull(data:tokenholders_90d_change::float, 0) AS tokenholders_90d_change,
ifnull(data:tokenholders_latest::float, 0) AS tokenholders_latest,
ifnull(data:transfer_count_180d_sum::float, 0) AS transfer_count_180d_sum,
ifnull(data:transfer_count_180d_trend::float, 0) AS transfer_count_180d_trend,
ifnull(data:transfer_count_1d_sum::float, 0) AS transfer_count_1d_sum,
ifnull(data:transfer_count_1d_trend::float, 0) AS transfer_count_1d_trend,
ifnull(data:transfer_count_30d_sum::float, 0) AS transfer_count_30d_sum,
ifnull(data:transfer_count_30d_trend::float, 0) AS transfer_count_30d_trend,
ifnull(data:transfer_count_365d_sum::float, 0) AS transfer_count_365d_sum,
ifnull(data:transfer_count_365d_trend::float, 0) AS transfer_count_365d_trend,
ifnull(data:transfer_count_7d_sum::float, 0) AS transfer_count_7d_sum,
ifnull(data:transfer_count_7d_trend::float, 0) AS transfer_count_7d_trend,
ifnull(data:transfer_count_90d_sum::float, 0) AS transfer_count_90d_sum,
ifnull(data:transfer_count_90d_trend::float, 0) AS transfer_count_90d_trend,
ifnull(data:transfer_count_max_sum::float, 0) AS transfer_count_max_sum,
ifnull(data:transfer_value_average_180d_change::float, 0) AS transfer_value_average_180d_change,
ifnull(data:transfer_value_average_1d_change::float, 0) AS transfer_value_average_1d_change,
ifnull(data:transfer_value_average_30d_change::float, 0) AS transfer_value_average_30d_change,
ifnull(data:transfer_value_average_365d_change::float, 0) AS transfer_value_average_365d_change,
ifnull(data:transfer_value_average_7d_change::float, 0) AS transfer_value_average_7d_change,
ifnull(data:transfer_value_average_90d_change::float, 0) AS transfer_value_average_90d_change,
ifnull(data:transfer_value_average_latest::float, 0) AS transfer_value_average_latest,
ifnull(data:transfer_volume_180d_sum::float, 0) AS transfer_volume_180d_sum,
ifnull(data:transfer_volume_180d_trend::float, 0) AS transfer_volume_180d_trend,
ifnull(data:transfer_volume_1d_sum::float, 0) AS transfer_volume_1d_sum,
ifnull(data:transfer_volume_1d_trend::float, 0) AS transfer_volume_1d_trend,
ifnull(data:transfer_volume_30d_sum::float, 0) AS transfer_volume_30d_sum,
ifnull(data:transfer_volume_30d_trend::float, 0) AS transfer_volume_30d_trend,
ifnull(data:transfer_volume_365d_sum::float, 0) AS transfer_volume_365d_sum,
ifnull(data:transfer_volume_365d_trend::float, 0) AS transfer_volume_365d_trend,
ifnull(data:transfer_volume_7d_sum::float, 0) AS transfer_volume_7d_sum,
ifnull(data:transfer_volume_7d_trend::float, 0) AS transfer_volume_7d_trend,
ifnull(data:transfer_volume_90d_sum::float, 0) AS transfer_volume_90d_sum,
ifnull(data:transfer_volume_90d_trend::float, 0) AS transfer_volume_90d_trend,
ifnull(data:transfer_volume_max_sum::float, 0) AS transfer_volume_max_sum,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['chain_id', 'token_address']
) }} AS ez_stablecoins_id
FROM {{ ref('silver__tt_dataset_values') }}
WHERE mart_id = 'stablecoins'
{% if is_incremental() %}
AND modified_timestamp > (
SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY ez_stablecoins_id ORDER BY modified_timestamp DESC) = 1

View File

@ -0,0 +1,36 @@
-- depends_on: {{ ref('bronze__tt_dataset_values') }}
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = ['mart_id','run_timestamp'],
cluster_by = ['run_date'],
tags = ['tt']
) }}
SELECT
value:"RUN_TIMESTAMP"::INT AS run_timestamp,
value:"MART_ID"::STRING AS mart_id,
value:array_index::integer AS array_index,
to_timestamp(run_timestamp)::date AS run_date,
partition_key,
DATA,
_inserted_timestamp,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['mart_id','run_timestamp','array_index']
) }} AS tt_dataset_values_id
from
{% if is_incremental() %}
{{ ref('bronze__tt_dataset_values') }}
where _inserted_timestamp > (
select coalesce(max(_inserted_timestamp), '2000-01-01') from {{ this }}
)
{% else %}
{{ ref('bronze__tt_dataset_values_FR') }}
{% endif %}
QUALIFY(
ROW_NUMBER() OVER (PARTITION BY tt_dataset_values_id ORDER BY _inserted_timestamp DESC)
) = 1

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__tt_datasets') }}
{{ config(
materialized = 'incremental',
unique_key = 'tt_datasets_id',
cluster_by = ['run_date'],
tags = ['tt']
) }}
SELECT
value:"RUN_TIMESTAMP"::INT AS run_timestamp,
to_timestamp(run_timestamp)::date AS run_date,
partition_key,
DATA,
DATA:mart_id::STRING AS mart_id,
_inserted_timestamp,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['mart_id','run_timestamp']
) }} AS tt_datasets_id
from
{% if is_incremental() %}
{{ ref('bronze__tt_datasets') }}
where _inserted_timestamp > (
select coalesce(max(_inserted_timestamp), '2000-01-01') from {{ this }}
)
{% else %}
{{ ref('bronze__tt_datasets_FR') }}
{% endif %}
QUALIFY(
ROW_NUMBER() OVER (PARTITION BY tt_datasets_id ORDER BY _inserted_timestamp DESC)
) = 1

View File

@ -0,0 +1,38 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"tt_dataset_values",
"sql_limit" :"100",
"producer_batch_size" :"1",
"worker_batch_size" :"1",
"async_concurrent_requests" :"1",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(['data'])
}
),
tags = ['tt_streamline_realtime']
) }}
with relevant_datasets as (
select
mart_id
from {{ ref('token_terminal__dim_datasets') }}
)
SELECT
mart_id,
date_part('epoch_second', sysdate()) as run_timestamp,
date_part('epoch_second', sysdate()::DATE) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'https://api.tokenterminal.com/v2/datasets/' || mart_id,
OBJECT_CONSTRUCT(
'Authorization', 'Bearer {api_key}',
'fsc-quantum-state', 'streamline'
),
{},
'Vault/prod/external/token_terminal'
) AS request
from relevant_datasets

View File

@ -0,0 +1,30 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"tt_datasets",
"sql_limit" :"1",
"producer_batch_size" :"1",
"worker_batch_size" :"1",
"async_concurrent_requests" :"1",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(['data'])
}
),
tags = ['tt_streamline_realtime']
) }}
SELECT
date_part('epoch_second', sysdate()) as run_timestamp,
date_part('epoch_second', sysdate()::DATE) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'https://api.tokenterminal.com/v2/datasets',
OBJECT_CONSTRUCT(
'Authorization', 'Bearer {api_key}',
'fsc-quantum-state', 'streamline'
),
{},
'Vault/prod/external/token_terminal'
) AS request