From 043b68d42860dce5143927107d29561630fcc878 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Tue, 8 Jul 2025 14:59:12 -0400 Subject: [PATCH] add dim_tokens add merge exclude --- models/bronze/bronze_api__coin_metadata.sql | 73 +++++++++++++++++++ models/gold/core/core__dim_tokens.sql | 31 ++++++++ .../gold/core/core__fact_balance_changes.sql | 1 + models/gold/core/core__fact_changes.sql | 1 + models/gold/core/core__fact_checkpoints.sql | 1 + models/gold/core/core__fact_events.sql | 1 + .../core/core__fact_transaction_blocks.sql | 1 + .../core/core__fact_transaction_inputs.sql | 1 + models/gold/core/core__fact_transactions.sql | 1 + models/silver/core/silver__checkpoints.sql | 1 + models/silver/core/silver__coin_types.sql | 33 +++++++++ models/silver/core/silver__transactions.sql | 1 + .../streamline__checkpoints_realtime.sql | 6 +- .../streamline__transactions_realtime.sql | 6 +- 14 files changed, 152 insertions(+), 6 deletions(-) create mode 100644 models/bronze/bronze_api__coin_metadata.sql create mode 100644 models/gold/core/core__dim_tokens.sql create mode 100644 models/silver/core/silver__coin_types.sql diff --git a/models/bronze/bronze_api__coin_metadata.sql b/models/bronze/bronze_api__coin_metadata.sql new file mode 100644 index 0000000..46fa28a --- /dev/null +++ b/models/bronze/bronze_api__coin_metadata.sql @@ -0,0 +1,73 @@ +{{ config ( + materialized = 'incremental', + unique_key = "coin_type", + merge_exclude_columns = ["inserted_timestamp"], + full_refresh = false, + tags = ['silver','core'] +) }} + +WITH coins AS ( + + SELECT + coin_type + FROM + {{ ref('silver__coin_types') }} + +{% if is_incremental() %} +EXCEPT +SELECT + coin_type +FROM + {{ this }} +WHERE + decimals IS NOT NULL --rerun if decimals is null and inserted_timestamp is within the last 7 days (if the token still doesnt have decimals after 7 day then we will stop trying) + OR ( + decimals IS NULL + AND inserted_timestamp < CURRENT_DATE -7 + ) +{% endif %} +LIMIT + 100 +), lq AS ( + SELECT + coin_type, + {{ target.database }}.live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'fsc-quantum-state', + 'livequery' + ), + OBJECT_CONSTRUCT( + 'jsonrpc', + '2.0', + 'id', + 1, + 'method', + 'suix_getCoinMetadata', + 'params', + ARRAY_CONSTRUCT( + coin_type + ) + ), + 'Vault/prod/sui/quicknode/mainnet' + ) :data: "result" AS DATA + FROM + coins +) +SELECT + coin_type, + DATA :decimals :: INT AS decimals, + DATA :description :: STRING AS description, + DATA :iconUrl :: STRING AS icon_url, + DATA :name :: STRING AS NAME, + DATA :symbol :: STRING AS symbol, + DATA :id :: STRING AS id, + {{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS coin_metadata_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + lq diff --git a/models/gold/core/core__dim_tokens.sql b/models/gold/core/core__dim_tokens.sql new file mode 100644 index 0000000..472f9b5 --- /dev/null +++ b/models/gold/core/core__dim_tokens.sql @@ -0,0 +1,31 @@ +{{ config ( + materialized = "incremental", + unique_key = "coin_type", + merge_exclude_columns = ["inserted_timestamp"], + tags = ['gold','core'] +) }} + +SELECT + coin_type, + decimals, + symbol, + NAME, + description, + icon_url, + id, + {{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS coin_types_id, + {{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS dim_tokens_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + {{ ref('bronze_api__coin_metadata') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) AS modified_timestamp + FROM + {{ this }} + ) +{% endif %} diff --git a/models/gold/core/core__fact_balance_changes.sql b/models/gold/core/core__fact_balance_changes.sql index 85b0846..80946ee 100644 --- a/models/gold/core/core__fact_balance_changes.sql +++ b/models/gold/core/core__fact_balance_changes.sql @@ -3,6 +3,7 @@ unique_key = "fact_transaction_balance_changes_id", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/gold/core/core__fact_changes.sql b/models/gold/core/core__fact_changes.sql index 9801dee..ed3a7c3 100644 --- a/models/gold/core/core__fact_changes.sql +++ b/models/gold/core/core__fact_changes.sql @@ -3,6 +3,7 @@ unique_key = "fact_changes_id", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/gold/core/core__fact_checkpoints.sql b/models/gold/core/core__fact_checkpoints.sql index 33db5fe..eb8e2c8 100644 --- a/models/gold/core/core__fact_checkpoints.sql +++ b/models/gold/core/core__fact_checkpoints.sql @@ -3,6 +3,7 @@ unique_key = "checkpoint_number", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/gold/core/core__fact_events.sql b/models/gold/core/core__fact_events.sql index 6bbe328..d300ee6 100644 --- a/models/gold/core/core__fact_events.sql +++ b/models/gold/core/core__fact_events.sql @@ -3,6 +3,7 @@ unique_key = "fact_events_id", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/gold/core/core__fact_transaction_blocks.sql b/models/gold/core/core__fact_transaction_blocks.sql index 2060764..8e8dca0 100644 --- a/models/gold/core/core__fact_transaction_blocks.sql +++ b/models/gold/core/core__fact_transaction_blocks.sql @@ -3,6 +3,7 @@ unique_key = "tx_digest", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/gold/core/core__fact_transaction_inputs.sql b/models/gold/core/core__fact_transaction_inputs.sql index 41f8eff..03a0ad2 100644 --- a/models/gold/core/core__fact_transaction_inputs.sql +++ b/models/gold/core/core__fact_transaction_inputs.sql @@ -3,6 +3,7 @@ unique_key = "fact_transaction_inputs_id", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/gold/core/core__fact_transactions.sql b/models/gold/core/core__fact_transactions.sql index b260e18..4759178 100644 --- a/models/gold/core/core__fact_transactions.sql +++ b/models/gold/core/core__fact_transactions.sql @@ -3,6 +3,7 @@ unique_key = "fact_transactions_id", cluster_by = ['block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['gold','core'] ) }} diff --git a/models/silver/core/silver__checkpoints.sql b/models/silver/core/silver__checkpoints.sql index 89529f9..31352df 100644 --- a/models/silver/core/silver__checkpoints.sql +++ b/models/silver/core/silver__checkpoints.sql @@ -4,6 +4,7 @@ unique_key = "checkpoint_number", cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['silver','core'] ) }} diff --git a/models/silver/core/silver__coin_types.sql b/models/silver/core/silver__coin_types.sql new file mode 100644 index 0000000..a1886ac --- /dev/null +++ b/models/silver/core/silver__coin_types.sql @@ -0,0 +1,33 @@ +-- depends_on: {{ ref('bronze__transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "coin_type", + merge_exclude_columns = ["inserted_timestamp"], + tags = ['silver','core'] +) }} + +WITH coins AS ( + + SELECT + DISTINCT coin_type + FROM + {{ ref('core__fact_balance_changes') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} +) +SELECT + coin_type, + {{ dbt_utils.generate_surrogate_key(['coin_type']) }} AS coin_types_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + coins diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index 22bdd2e..016cf76 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -4,6 +4,7 @@ unique_key = "tx_digest", cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'], incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], tags = ['silver','core'] ) }} diff --git a/models/streamline/core/realtime/streamline__checkpoints_realtime.sql b/models/streamline/core/realtime/streamline__checkpoints_realtime.sql index bd5cb80..7ab0b58 100644 --- a/models/streamline/core/realtime/streamline__checkpoints_realtime.sql +++ b/models/streamline/core/realtime/streamline__checkpoints_realtime.sql @@ -4,9 +4,9 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :"checkpoints", - "sql_limit" :"100000", - "producer_batch_size" :"100000", - "worker_batch_size" :"50000", + "sql_limit" :"150000", + "producer_batch_size" :"150000", + "worker_batch_size" :"75000", "sql_source" :"{{this.identifier}}", "order_by_column": "checkpoint_number DESC" } ), diff --git a/models/streamline/core/realtime/streamline__transactions_realtime.sql b/models/streamline/core/realtime/streamline__transactions_realtime.sql index 8725549..d5f0d85 100644 --- a/models/streamline/core/realtime/streamline__transactions_realtime.sql +++ b/models/streamline/core/realtime/streamline__transactions_realtime.sql @@ -4,9 +4,9 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :"transactions", - "sql_limit" :"100000", - "producer_batch_size" :"100000", - "worker_batch_size" :"50000", + "sql_limit" :"150000", + "producer_batch_size" :"150000", + "worker_batch_size" :"75000", "sql_source" :"{{this.identifier}}", 'exploded_key': '["result"]', "order_by_column": "checkpoint_number DESC" }