add dim_tokens add merge exclude

This commit is contained in:
Eric Laurello 2025-07-08 14:59:12 -04:00
parent 4ee0733868
commit 043b68d428
14 changed files with 152 additions and 6 deletions

View File

@ -0,0 +1,73 @@
{{ config (
materialized = 'incremental',
unique_key = "coin_type",
merge_exclude_columns = ["inserted_timestamp"],
full_refresh = false,
tags = ['silver','core']
) }}
WITH coins AS (
SELECT
coin_type
FROM
{{ ref('silver__coin_types') }}
{% if is_incremental() %}
EXCEPT
SELECT
coin_type
FROM
{{ this }}
WHERE
decimals IS NOT NULL --rerun if decimals is null and inserted_timestamp is within the last 7 days (if the token still doesnt have decimals after 7 day then we will stop trying)
OR (
decimals IS NULL
AND inserted_timestamp < CURRENT_DATE -7
)
{% endif %}
LIMIT
100
), lq AS (
SELECT
coin_type,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'id',
1,
'method',
'suix_getCoinMetadata',
'params',
ARRAY_CONSTRUCT(
coin_type
)
),
'Vault/prod/sui/quicknode/mainnet'
) :data: "result" AS DATA
FROM
coins
)
SELECT
coin_type,
DATA :decimals :: INT AS decimals,
DATA :description :: STRING AS description,
DATA :iconUrl :: STRING AS icon_url,
DATA :name :: STRING AS NAME,
DATA :symbol :: STRING AS symbol,
DATA :id :: STRING AS id,
{{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS coin_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
lq

View File

@ -0,0 +1,31 @@
{{ config (
materialized = "incremental",
unique_key = "coin_type",
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}
SELECT
coin_type,
decimals,
symbol,
NAME,
description,
icon_url,
id,
{{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS coin_types_id,
{{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS dim_tokens_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('bronze_api__coin_metadata') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp) AS modified_timestamp
FROM
{{ this }}
)
{% endif %}

View File

@ -3,6 +3,7 @@
unique_key = "fact_transaction_balance_changes_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -3,6 +3,7 @@
unique_key = "fact_changes_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -3,6 +3,7 @@
unique_key = "checkpoint_number",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -3,6 +3,7 @@
unique_key = "fact_events_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -3,6 +3,7 @@
unique_key = "tx_digest",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -3,6 +3,7 @@
unique_key = "fact_transaction_inputs_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -3,6 +3,7 @@
unique_key = "fact_transactions_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['gold','core']
) }}

View File

@ -4,6 +4,7 @@
unique_key = "checkpoint_number",
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['silver','core']
) }}

View File

@ -0,0 +1,33 @@
-- depends_on: {{ ref('bronze__transactions') }}
{{ config (
materialized = "incremental",
unique_key = "coin_type",
merge_exclude_columns = ["inserted_timestamp"],
tags = ['silver','core']
) }}
WITH coins AS (
SELECT
DISTINCT coin_type
FROM
{{ ref('core__fact_balance_changes') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
coin_type,
{{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS coin_types_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
coins

View File

@ -4,6 +4,7 @@
unique_key = "tx_digest",
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['silver','core']
) }}

View File

@ -4,9 +4,9 @@
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"checkpoints",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_limit" :"150000",
"producer_batch_size" :"150000",
"worker_batch_size" :"75000",
"sql_source" :"{{this.identifier}}",
"order_by_column": "checkpoint_number DESC" }
),

View File

@ -4,9 +4,9 @@
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_limit" :"150000",
"producer_batch_size" :"150000",
"worker_batch_size" :"75000",
"sql_source" :"{{this.identifier}}",
'exploded_key': '["result"]',
"order_by_column": "checkpoint_number DESC" }