add market sector and market sector metadata pipelines

This commit is contained in:
mattromano 2025-11-10 16:02:17 -08:00
parent 9d6a723977
commit 7097228cc4
11 changed files with 235 additions and 0 deletions

View File

@ -18,6 +18,8 @@ sources:
- name: tt_project_metrics
- name: tt_datasets
- name: tt_dataset_values
- name: tt_market_sectors
- name: tt_market_sector_metadata
- name: tokenflow_eth
database: flipside_prod_db
schema: tokenflow_eth

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_query_v2(
model = 'tt_market_sector_metadata',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_FR_query_v2(
model = 'tt_market_sector_metadata',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_query_v2(
model = 'tt_market_sectors',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['tt_streamline_bronze']
) }}
{{ streamline_external_table_FR_query_v2(
model = 'tt_market_sectors',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)",
partition_name = "partition_key"
) }}

View File

@ -0,0 +1,30 @@
{{ config(
materialized = 'incremental',
unique_key = 'dim_market_sector_metadata_id',
tags = ['tt']
) }}
SELECT
data:market_sector_id::STRING AS market_sector_id,
data:name::STRING AS market_sector_name,
data:description:how::STRING AS description_how,
data:description:what::STRING AS description_what,
project.value:project_id::STRING AS project_id,
project.value:name::STRING AS project_name,
project.value:url::STRING AS project_url,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['market_sector_id', 'project_id']
) }} AS dim_market_sector_metadata_id
FROM {{ ref('silver__tt_market_sector_metadata') }},
LATERAL FLATTEN(input => data:projects) AS project
WHERE 1=1
{% if is_incremental() %}
AND modified_timestamp > (
SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY dim_market_sector_metadata_id ORDER BY modified_timestamp DESC) = 1

View File

@ -0,0 +1,25 @@
{{ config(
materialized = 'incremental',
unique_key = 'dim_market_sectors_id',
tags = ['tt']
) }}
SELECT
data:id::STRING AS market_sector_id,
data:name::STRING AS market_sector_name,
data:url::STRING AS market_sector_url,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['market_sector_id']
) }} AS dim_market_sectors_id
FROM {{ ref('silver__tt_market_sectors') }}
WHERE 1=1
{% if is_incremental() %}
AND modified_timestamp > (
SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }}
)
{% endif %}
QUALIFY ROW_NUMBER() OVER (PARTITION BY dim_market_sectors_id ORDER BY modified_timestamp DESC) = 1

View File

@ -0,0 +1,35 @@
-- depends_on: {{ ref('bronze__tt_market_sector_metadata') }}
{{ config(
materialized = 'incremental',
unique_key = 'tt_market_sector_metadata_id',
cluster_by = ['run_date'],
tags = ['tt']
) }}
SELECT
value:"RUN_TIMESTAMP"::INT AS run_timestamp,
to_timestamp(run_timestamp)::date AS run_date,
partition_key,
DATA,
DATA:market_sector_id::STRING AS market_sector_id,
_inserted_timestamp,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['market_sector_id','run_timestamp']
) }} AS tt_market_sector_metadata_id
FROM
{% if is_incremental() %}
{{ ref('bronze__tt_market_sector_metadata') }}
WHERE _inserted_timestamp > (
SELECT coalesce(max(_inserted_timestamp), '2000-01-01') FROM {{ this }}
)
{% else %}
{{ ref('bronze__tt_market_sector_metadata_FR') }}
{% endif %}
QUALIFY(
ROW_NUMBER() OVER (PARTITION BY tt_market_sector_metadata_id ORDER BY _inserted_timestamp DESC)
) = 1

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__tt_market_sectors') }}
{{ config(
materialized = 'incremental',
unique_key = 'tt_market_sectors_id',
cluster_by = ['run_date'],
tags = ['tt']
) }}
SELECT
value:"RUN_TIMESTAMP"::INT AS run_timestamp,
to_timestamp(run_timestamp)::date AS run_date,
partition_key,
DATA,
DATA:id::STRING AS market_sector_id,
_inserted_timestamp,
sysdate() as inserted_timestamp,
sysdate() as modified_timestamp,
'{{ invocation_id }}' as _invocation_id,
{{ dbt_utils.generate_surrogate_key(
['market_sector_id','run_timestamp']
) }} AS tt_market_sectors_id
FROM
{% if is_incremental() %}
{{ ref('bronze__tt_market_sectors') }}
WHERE _inserted_timestamp > (
SELECT coalesce(max(_inserted_timestamp), '2000-01-01') FROM {{ this }}
)
{% else %}
{{ ref('bronze__tt_market_sectors_FR') }}
{% endif %}
QUALIFY(
ROW_NUMBER() OVER (PARTITION BY tt_market_sectors_id ORDER BY _inserted_timestamp DESC)
) = 1

View File

@ -0,0 +1,39 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"tt_market_sector_metadata",
"sql_limit" :"24",
"producer_batch_size" :"1",
"worker_batch_size" :"1",
"async_concurrent_requests" :"1",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(['data'])
}
),
tags = ['tt_streamline_realtime']
) }}
WITH relevant_market_sectors AS (
SELECT
market_sector_id
FROM {{ ref('token_terminal__dim_market_sectors') }}
WHERE market_sector_id IS NOT NULL
)
SELECT
market_sector_id,
date_part('epoch_second', sysdate()) as run_timestamp,
date_part('epoch_second', sysdate()::DATE) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'https://api.tokenterminal.com/v2/market-sectors/' || market_sector_id,
OBJECT_CONSTRUCT(
'Authorization', 'Bearer {api_key}',
'fsc-quantum-state', 'streamline'
),
{},
'Vault/prod/external/token_terminal'
) AS request
FROM relevant_market_sectors

View File

@ -0,0 +1,30 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"tt_market_sectors",
"sql_limit" :"1",
"producer_batch_size" :"1",
"worker_batch_size" :"1",
"async_concurrent_requests" :"1",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(['data'])
}
),
tags = ['tt_streamline_realtime']
) }}
SELECT
date_part('epoch_second', sysdate()) as run_timestamp,
date_part('epoch_second', sysdate()::DATE) AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'https://api.tokenterminal.com/v2/market-sectors',
OBJECT_CONSTRUCT(
'Authorization', 'Bearer {api_key}',
'fsc-quantum-state', 'streamline'
),
{},
'Vault/prod/external/token_terminal'
) AS request