From 7097228cc45a859b4a85f4eb3abbdf1eb7c48e2c Mon Sep 17 00:00:00 2001 From: mattromano Date: Mon, 10 Nov 2025 16:02:17 -0800 Subject: [PATCH] add market sector and market sector metadata pipelines --- models/sources.yml | 2 + .../bronze__tt_market_sector_metadata.sql | 10 +++++ .../bronze__tt_market_sector_metadata_FR.sql | 10 +++++ .../bronze/bronze__tt_market_sectors.sql | 10 +++++ .../bronze/bronze__tt_market_sectors_FR.sql | 10 +++++ ...n_terminal__dim_market_sector_metadata.sql | 30 ++++++++++++++ .../token_terminal__dim_market_sectors.sql | 25 ++++++++++++ .../silver__tt_market_sector_metadata.sql | 35 +++++++++++++++++ .../silver/silver__tt_market_sectors.sql | 34 ++++++++++++++++ .../streamline__tt_market_sector_metadata.sql | 39 +++++++++++++++++++ .../streamline__tt_market_sectors.sql | 30 ++++++++++++++ 11 files changed, 235 insertions(+) create mode 100644 models/token_terminal/bronze/bronze__tt_market_sector_metadata.sql create mode 100644 models/token_terminal/bronze/bronze__tt_market_sector_metadata_FR.sql create mode 100644 models/token_terminal/bronze/bronze__tt_market_sectors.sql create mode 100644 models/token_terminal/bronze/bronze__tt_market_sectors_FR.sql create mode 100644 models/token_terminal/gold/token_terminal__dim_market_sector_metadata.sql create mode 100644 models/token_terminal/gold/token_terminal__dim_market_sectors.sql create mode 100644 models/token_terminal/silver/silver__tt_market_sector_metadata.sql create mode 100644 models/token_terminal/silver/silver__tt_market_sectors.sql create mode 100644 models/token_terminal/streamline/realtime/streamline__tt_market_sector_metadata.sql create mode 100644 models/token_terminal/streamline/realtime/streamline__tt_market_sectors.sql diff --git a/models/sources.yml b/models/sources.yml index 0322645..7e35c20 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -18,6 +18,8 @@ sources: - name: tt_project_metrics - name: tt_datasets - name: tt_dataset_values + - name: tt_market_sectors + - name: tt_market_sector_metadata - name: tokenflow_eth database: flipside_prod_db schema: tokenflow_eth diff --git a/models/token_terminal/bronze/bronze__tt_market_sector_metadata.sql b/models/token_terminal/bronze/bronze__tt_market_sector_metadata.sql new file mode 100644 index 0000000..6492907 --- /dev/null +++ b/models/token_terminal/bronze/bronze__tt_market_sector_metadata.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['tt_streamline_bronze'] +) }} +{{ streamline_external_table_query_v2( + model = 'tt_market_sector_metadata', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_name = "partition_key" +) }} + diff --git a/models/token_terminal/bronze/bronze__tt_market_sector_metadata_FR.sql b/models/token_terminal/bronze/bronze__tt_market_sector_metadata_FR.sql new file mode 100644 index 0000000..7b7b107 --- /dev/null +++ b/models/token_terminal/bronze/bronze__tt_market_sector_metadata_FR.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['tt_streamline_bronze'] +) }} +{{ streamline_external_table_FR_query_v2( + model = 'tt_market_sector_metadata', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_name = "partition_key" +) }} + diff --git a/models/token_terminal/bronze/bronze__tt_market_sectors.sql b/models/token_terminal/bronze/bronze__tt_market_sectors.sql new file mode 100644 index 0000000..3102f03 --- /dev/null +++ b/models/token_terminal/bronze/bronze__tt_market_sectors.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['tt_streamline_bronze'] +) }} +{{ streamline_external_table_query_v2( + model = 'tt_market_sectors', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_name = "partition_key" +) }} + diff --git a/models/token_terminal/bronze/bronze__tt_market_sectors_FR.sql b/models/token_terminal/bronze/bronze__tt_market_sectors_FR.sql new file mode 100644 index 0000000..1ef8513 --- /dev/null +++ b/models/token_terminal/bronze/bronze__tt_market_sectors_FR.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view', + tags = ['tt_streamline_bronze'] +) }} +{{ streamline_external_table_FR_query_v2( + model = 'tt_market_sectors', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER)", + partition_name = "partition_key" +) }} + diff --git a/models/token_terminal/gold/token_terminal__dim_market_sector_metadata.sql b/models/token_terminal/gold/token_terminal__dim_market_sector_metadata.sql new file mode 100644 index 0000000..2f32e4d --- /dev/null +++ b/models/token_terminal/gold/token_terminal__dim_market_sector_metadata.sql @@ -0,0 +1,30 @@ +{{ config( + materialized = 'incremental', + unique_key = 'dim_market_sector_metadata_id', + tags = ['tt'] +) }} + +SELECT + data:market_sector_id::STRING AS market_sector_id, + data:name::STRING AS market_sector_name, + data:description:how::STRING AS description_how, + data:description:what::STRING AS description_what, + project.value:project_id::STRING AS project_id, + project.value:name::STRING AS project_name, + project.value:url::STRING AS project_url, + sysdate() as inserted_timestamp, + sysdate() as modified_timestamp, + '{{ invocation_id }}' as _invocation_id, + {{ dbt_utils.generate_surrogate_key( + ['market_sector_id', 'project_id'] + ) }} AS dim_market_sector_metadata_id +FROM {{ ref('silver__tt_market_sector_metadata') }}, + LATERAL FLATTEN(input => data:projects) AS project +WHERE 1=1 +{% if is_incremental() %} +AND modified_timestamp > ( + SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }} +) +{% endif %} +QUALIFY ROW_NUMBER() OVER (PARTITION BY dim_market_sector_metadata_id ORDER BY modified_timestamp DESC) = 1 + diff --git a/models/token_terminal/gold/token_terminal__dim_market_sectors.sql b/models/token_terminal/gold/token_terminal__dim_market_sectors.sql new file mode 100644 index 0000000..38023fb --- /dev/null +++ b/models/token_terminal/gold/token_terminal__dim_market_sectors.sql @@ -0,0 +1,25 @@ +{{ config( + materialized = 'incremental', + unique_key = 'dim_market_sectors_id', + tags = ['tt'] +) }} + +SELECT + data:id::STRING AS market_sector_id, + data:name::STRING AS market_sector_name, + data:url::STRING AS market_sector_url, + sysdate() as inserted_timestamp, + sysdate() as modified_timestamp, + '{{ invocation_id }}' as _invocation_id, + {{ dbt_utils.generate_surrogate_key( + ['market_sector_id'] + ) }} AS dim_market_sectors_id +FROM {{ ref('silver__tt_market_sectors') }} +WHERE 1=1 +{% if is_incremental() %} +AND modified_timestamp > ( + SELECT coalesce(MAX(modified_timestamp), '2000-01-01') FROM {{ this }} +) +{% endif %} +QUALIFY ROW_NUMBER() OVER (PARTITION BY dim_market_sectors_id ORDER BY modified_timestamp DESC) = 1 + diff --git a/models/token_terminal/silver/silver__tt_market_sector_metadata.sql b/models/token_terminal/silver/silver__tt_market_sector_metadata.sql new file mode 100644 index 0000000..69ee053 --- /dev/null +++ b/models/token_terminal/silver/silver__tt_market_sector_metadata.sql @@ -0,0 +1,35 @@ +-- depends_on: {{ ref('bronze__tt_market_sector_metadata') }} +{{ config( + materialized = 'incremental', + unique_key = 'tt_market_sector_metadata_id', + cluster_by = ['run_date'], + tags = ['tt'] +) }} + +SELECT + value:"RUN_TIMESTAMP"::INT AS run_timestamp, + to_timestamp(run_timestamp)::date AS run_date, + partition_key, + DATA, + DATA:market_sector_id::STRING AS market_sector_id, + _inserted_timestamp, + sysdate() as inserted_timestamp, + sysdate() as modified_timestamp, + '{{ invocation_id }}' as _invocation_id, + {{ dbt_utils.generate_surrogate_key( + ['market_sector_id','run_timestamp'] + ) }} AS tt_market_sector_metadata_id +FROM + {% if is_incremental() %} + {{ ref('bronze__tt_market_sector_metadata') }} + WHERE _inserted_timestamp > ( + SELECT coalesce(max(_inserted_timestamp), '2000-01-01') FROM {{ this }} + ) + {% else %} + {{ ref('bronze__tt_market_sector_metadata_FR') }} + {% endif %} + +QUALIFY( + ROW_NUMBER() OVER (PARTITION BY tt_market_sector_metadata_id ORDER BY _inserted_timestamp DESC) +) = 1 + diff --git a/models/token_terminal/silver/silver__tt_market_sectors.sql b/models/token_terminal/silver/silver__tt_market_sectors.sql new file mode 100644 index 0000000..aadab1b --- /dev/null +++ b/models/token_terminal/silver/silver__tt_market_sectors.sql @@ -0,0 +1,34 @@ +-- depends_on: {{ ref('bronze__tt_market_sectors') }} +{{ config( + materialized = 'incremental', + unique_key = 'tt_market_sectors_id', + cluster_by = ['run_date'], + tags = ['tt'] +) }} + +SELECT + value:"RUN_TIMESTAMP"::INT AS run_timestamp, + to_timestamp(run_timestamp)::date AS run_date, + partition_key, + DATA, + DATA:id::STRING AS market_sector_id, + _inserted_timestamp, + sysdate() as inserted_timestamp, + sysdate() as modified_timestamp, + '{{ invocation_id }}' as _invocation_id, + {{ dbt_utils.generate_surrogate_key( + ['market_sector_id','run_timestamp'] + ) }} AS tt_market_sectors_id +FROM + {% if is_incremental() %} + {{ ref('bronze__tt_market_sectors') }} + WHERE _inserted_timestamp > ( + SELECT coalesce(max(_inserted_timestamp), '2000-01-01') FROM {{ this }} + ) + {% else %} + {{ ref('bronze__tt_market_sectors_FR') }} + {% endif %} + +QUALIFY( + ROW_NUMBER() OVER (PARTITION BY tt_market_sectors_id ORDER BY _inserted_timestamp DESC) +) = 1 \ No newline at end of file diff --git a/models/token_terminal/streamline/realtime/streamline__tt_market_sector_metadata.sql b/models/token_terminal/streamline/realtime/streamline__tt_market_sector_metadata.sql new file mode 100644 index 0000000..20e9fb5 --- /dev/null +++ b/models/token_terminal/streamline/realtime/streamline__tt_market_sector_metadata.sql @@ -0,0 +1,39 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"tt_market_sector_metadata", + "sql_limit" :"24", + "producer_batch_size" :"1", + "worker_batch_size" :"1", + "async_concurrent_requests" :"1", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(['data']) + } + ), + tags = ['tt_streamline_realtime'] +) }} + +WITH relevant_market_sectors AS ( + SELECT + market_sector_id + FROM {{ ref('token_terminal__dim_market_sectors') }} + WHERE market_sector_id IS NOT NULL +) + +SELECT + market_sector_id, + date_part('epoch_second', sysdate()) as run_timestamp, + date_part('epoch_second', sysdate()::DATE) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + 'https://api.tokenterminal.com/v2/market-sectors/' || market_sector_id, + OBJECT_CONSTRUCT( + 'Authorization', 'Bearer {api_key}', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'Vault/prod/external/token_terminal' + ) AS request +FROM relevant_market_sectors \ No newline at end of file diff --git a/models/token_terminal/streamline/realtime/streamline__tt_market_sectors.sql b/models/token_terminal/streamline/realtime/streamline__tt_market_sectors.sql new file mode 100644 index 0000000..4f5cb7a --- /dev/null +++ b/models/token_terminal/streamline/realtime/streamline__tt_market_sectors.sql @@ -0,0 +1,30 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"tt_market_sectors", + "sql_limit" :"1", + "producer_batch_size" :"1", + "worker_batch_size" :"1", + "async_concurrent_requests" :"1", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(['data']) + } + ), + tags = ['tt_streamline_realtime'] +) }} + +SELECT + date_part('epoch_second', sysdate()) as run_timestamp, + date_part('epoch_second', sysdate()::DATE) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + 'https://api.tokenterminal.com/v2/market-sectors', + OBJECT_CONSTRUCT( + 'Authorization', 'Bearer {api_key}', + 'fsc-quantum-state', 'streamline' + ), + {}, + 'Vault/prod/external/token_terminal' + ) AS request \ No newline at end of file