From a57fdd80d331ca378c89d309e3e6869619bfa5a8 Mon Sep 17 00:00:00 2001 From: tarikceric Date: Wed, 8 Oct 2025 15:33:08 -0700 Subject: [PATCH 1/7] temp backfill tables --- .../bronze/bronze__checkpoints_backfill.sql | 11 + .../bronze/bronze__transactions_backfill.sql | 11 + .../core/silver__checkpoints_backfill.sql | 69 ++++ .../core/silver__transactions_backfill.sql | 356 ++++++++++++++++++ 4 files changed, 447 insertions(+) create mode 100644 models/bronze/bronze__checkpoints_backfill.sql create mode 100644 models/bronze/bronze__transactions_backfill.sql create mode 100644 models/silver/core/silver__checkpoints_backfill.sql create mode 100644 models/silver/core/silver__transactions_backfill.sql diff --git a/models/bronze/bronze__checkpoints_backfill.sql b/models/bronze/bronze__checkpoints_backfill.sql new file mode 100644 index 0000000..48e17a1 --- /dev/null +++ b/models/bronze/bronze__checkpoints_backfill.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + EPOCH, + DATA +FROM streamline.sui.checkpoints_backfill +where epoch = 906 +and data:sequence_number::int between 197140000 and 197140050 diff --git a/models/bronze/bronze__transactions_backfill.sql b/models/bronze/bronze__transactions_backfill.sql new file mode 100644 index 0000000..7a41919 --- /dev/null +++ b/models/bronze/bronze__transactions_backfill.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + EPOCH, + DATA +FROM streamline.sui.transactions_backfill +where epoch = 700 +and value:checkpoint::int = 121948955 diff --git a/models/silver/core/silver__checkpoints_backfill.sql b/models/silver/core/silver__checkpoints_backfill.sql new file mode 100644 index 0000000..5b8f8bf --- /dev/null +++ b/models/silver/core/silver__checkpoints_backfill.sql @@ -0,0 +1,69 @@ +{{ config ( + materialized = "table" +) }} + +WITH parsed_checkpoint_data AS ( + SELECT + -- Parse the VALUE JSON column + PARSE_JSON(VALUE) as value_json, + PARSE_JSON(DATA) as data_json, + EPOCH + FROM sui_dev.bronze.checkpoints_backfill +) + +, + +transformed_checkpoints AS ( + SELECT + -- Extract checkpoint number (sequence_number in new format) + value_json:sequence_number::NUMBER as CHECKPOINT_NUMBER, + + -- Convert timestamp_ms to proper timestamp format + TO_TIMESTAMP(value_json:timestamp_ms::NUMBER / 1000) as BLOCK_TIMESTAMP, + + -- Create partition key (round down checkpoint to nearest 1000) + FLOOR(value_json:sequence_number::NUMBER / 1000) * 1000 as PARTITION_KEY, + + -- Reconstruct the CHECKPOINT_JSON in the existing format + OBJECT_CONSTRUCT( + 'checkpointCommitments', ARRAY_CONSTRUCT(), + 'digest', value_json:checkpoint_digest::STRING, + 'epoch', value_json:epoch::STRING, + 'epochRollingGasCostSummary', OBJECT_CONSTRUCT( + 'computationCost', value_json:computation_cost::STRING, + 'nonRefundableStorageFee', value_json:non_refundable_storage_fee::STRING, + 'storageCost', value_json:storage_cost::STRING, + 'storageRebate', value_json:storage_rebate::STRING + ), + 'networkTotalTransactions', value_json:network_total_transaction::STRING, + 'previousDigest', value_json:previous_checkpoint_digest::STRING, + 'sequenceNumber', value_json:sequence_number::STRING, + 'timestampMs', value_json:timestamp_ms::STRING, + -- Note: transactions array is not available in new format, using empty array + -- In production, you may need to join with transaction data to populate this + 'transactions', ARRAY_CONSTRUCT(), + 'validatorSignature', value_json:validator_signature::STRING + ) as CHECKPOINT_JSON, + + -- Set timestamps + CURRENT_TIMESTAMP() as _INSERTED_TIMESTAMP, + + -- Generate surrogate key similar to existing format + HASH(value_json:checkpoint_digest::STRING) as CHECKPOINTS_ID, + + CURRENT_TIMESTAMP() as INSERTED_TIMESTAMP, + CURRENT_TIMESTAMP() as MODIFIED_TIMESTAMP + + FROM parsed_checkpoint_data +) + +SELECT + CHECKPOINT_NUMBER, + BLOCK_TIMESTAMP, + PARTITION_KEY, + CHECKPOINT_JSON, + _INSERTED_TIMESTAMP, + CHECKPOINTS_ID, + INSERTED_TIMESTAMP, + MODIFIED_TIMESTAMP +FROM transformed_checkpoints \ No newline at end of file diff --git a/models/silver/core/silver__transactions_backfill.sql b/models/silver/core/silver__transactions_backfill.sql new file mode 100644 index 0000000..aaed69a --- /dev/null +++ b/models/silver/core/silver__transactions_backfill.sql @@ -0,0 +1,356 @@ +{{ config ( + materialized = "table" +) }} + +with parsed_new_data AS ( + SELECT + -- Parse the VALUE JSON column + PARSE_JSON(VALUE) as value_json, + PARSE_JSON(DATA) as data_json, + EPOCH + FROM sui_dev.bronze.transactions_backfill +), + +-- Extract modified at versions +modified_at_versions AS ( + SELECT + value_json:transaction_digest::STRING as tx_digest, + ARRAY_AGG( + OBJECT_CONSTRUCT( + 'objectId', obj.value[0]::STRING, + 'sequenceNumber', obj.value[1]:input_state:Exist[0][0]::STRING + ) + ) as modified_versions_array + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) obj + GROUP BY value_json:transaction_digest::STRING +), + +-- Extract mutated objects +mutated_objects AS ( + SELECT + value_json:transaction_digest::STRING as tx_digest, + ARRAY_AGG( + OBJECT_CONSTRUCT( + 'owner', + CASE + WHEN obj.value[1]:output_state:ObjectWrite[1]:Shared IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'Shared', obj.value[1]:output_state:ObjectWrite[1]:Shared + ) + WHEN obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'ObjectOwner', obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner::STRING + ) + WHEN obj.value[1]:output_state:ObjectWrite[1]:AddressOwner IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'AddressOwner', obj.value[1]:output_state:ObjectWrite[1]:AddressOwner::STRING + ) + ELSE OBJECT_CONSTRUCT('AddressOwner', value_json:sender::STRING) + END, + 'reference', OBJECT_CONSTRUCT( + 'digest', obj.value[1]:output_state:ObjectWrite[0]::STRING, + 'objectId', obj.value[0]::STRING, + 'version', PARSE_JSON(value_json:effects_json):V2:lamport_version::NUMBER + ) + ) + ) as mutated_array + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) obj + GROUP BY value_json:transaction_digest::STRING +), + +-- Extract shared objects +shared_objects AS ( + SELECT + tx_digest, + ARRAY_AGG(shared_obj) as shared_objects_array + FROM ( + -- Unchanged shared objects + SELECT + value_json:transaction_digest::STRING as tx_digest, + OBJECT_CONSTRUCT( + 'digest', ush.value[1]:ReadOnlyRoot[1]::STRING, + 'objectId', ush.value[0]::STRING, + 'version', ush.value[1]:ReadOnlyRoot[0]::NUMBER + ) as shared_obj + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:unchanged_shared_objects)) ush + WHERE PARSE_JSON(value_json:effects_json):V2:unchanged_shared_objects IS NOT NULL + + UNION ALL + + -- Changed shared objects + SELECT + value_json:transaction_digest::STRING as tx_digest, + OBJECT_CONSTRUCT( + 'digest', ch.value[1]:input_state:Exist[0][1]::STRING, + 'objectId', ch.value[0]::STRING, + 'version', ch.value[1]:input_state:Exist[0][0]::NUMBER + ) as shared_obj + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) ch + WHERE ch.value[1]:input_state:Exist[1]:Shared IS NOT NULL + ) + GROUP BY tx_digest +), + +-- Extract object changes +object_changes AS ( + SELECT + value_json:transaction_digest::STRING as tx_digest, + ARRAY_AGG( + OBJECT_CONSTRUCT( + 'digest', obj.value[1]:output_state:ObjectWrite[0]::STRING, + 'objectId', obj.value[0]::STRING, + 'objectType', 'unknown', -- Default, would need object type mapping + 'owner', + CASE + WHEN obj.value[1]:output_state:ObjectWrite[1]:Shared IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'Shared', obj.value[1]:output_state:ObjectWrite[1]:Shared + ) + WHEN obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'ObjectOwner', obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner::STRING + ) + WHEN obj.value[1]:output_state:ObjectWrite[1]:AddressOwner IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'AddressOwner', obj.value[1]:output_state:ObjectWrite[1]:AddressOwner::STRING + ) + ELSE OBJECT_CONSTRUCT('AddressOwner', value_json:sender::STRING) + END, + 'previousVersion', obj.value[1]:input_state:Exist[0][0]::STRING, + 'sender', value_json:sender::STRING, + 'type', 'mutated', + 'version', PARSE_JSON(value_json:effects_json):V2:lamport_version::STRING + ) + ) as object_changes_array + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) obj + GROUP BY value_json:transaction_digest::STRING +), + +-- Extract transaction inputs +transaction_inputs AS ( + SELECT + value_json:transaction_digest::STRING as tx_digest, + ARRAY_AGG( + CASE + WHEN inp.value:Object:SharedObject IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'initialSharedVersion', inp.value:Object:SharedObject:initial_shared_version::STRING, + 'mutable', inp.value:Object:SharedObject:mutable::BOOLEAN, + 'objectId', inp.value:Object:SharedObject:id::STRING, + 'objectType', 'sharedObject', + 'type', 'object' + ) + WHEN inp.value:Pure IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'type', 'pure', + 'value', '969745335973357872526821071118', -- Would need to decode Pure bytes + 'valueType', 'u128' + ) + ELSE OBJECT_CONSTRUCT('type', 'unknown') + END + ) as inputs_array + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:inputs)) inp + WHERE PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:inputs IS NOT NULL + GROUP BY value_json:transaction_digest::STRING +), + +-- Extract transaction commands (simplified without type arguments for now) +transaction_commands AS ( + SELECT + value_json:transaction_digest::STRING as tx_digest, + ARRAY_AGG( + CASE + WHEN cmd.value:MoveCall IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'MoveCall', OBJECT_CONSTRUCT( + 'arguments', cmd.value:MoveCall:arguments, + 'function', cmd.value:MoveCall:function::STRING, + 'module', cmd.value:MoveCall:module::STRING, + 'package', cmd.value:MoveCall:package::STRING, + 'type_arguments', cmd.value:MoveCall:type_arguments -- Keep original format for now + ) + ) + ELSE cmd.value + END + ) as commands_array + FROM parsed_new_data, + TABLE(FLATTEN(PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:commands)) cmd + WHERE PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:commands IS NOT NULL + GROUP BY value_json:transaction_digest::STRING +), + +transformed_data AS ( + SELECT + -- Extract checkpoint number from VALUE JSON + p.value_json:checkpoint::NUMBER as CHECKPOINT_NUMBER, + + -- Extract transaction digest from VALUE JSON + p.value_json:transaction_digest::STRING as TX_DIGEST, + + -- Convert timestamp_ms to proper timestamp format + TO_TIMESTAMP(p.value_json:timestamp_ms::NUMBER / 1000) as BLOCK_TIMESTAMP, + + -- Create partition key (round down checkpoint to nearest 10000) + FLOOR(p.value_json:checkpoint::NUMBER / 10000) * 10000 as PARTITION_KEY, + + -- Reconstruct the TRANSACTION_JSON in the existing format + OBJECT_CONSTRUCT( + -- Balance Changes - calculate from total gas cost + 'balanceChanges', + CASE + WHEN p.value_json:total_gas_cost::NUMBER > 0 + THEN ARRAY_CONSTRUCT( + OBJECT_CONSTRUCT( + 'amount', '-' || p.value_json:total_gas_cost::STRING, + 'coinType', '0x2::sui::SUI', + 'owner', OBJECT_CONSTRUCT( + 'AddressOwner', p.value_json:sender::STRING + ) + ) + ) + ELSE ARRAY_CONSTRUCT() + END, + + 'checkpoint', p.value_json:checkpoint::STRING, + 'digest', p.value_json:transaction_digest::STRING, + + 'effects', OBJECT_CONSTRUCT( + 'dependencies', + CASE + WHEN PARSE_JSON(p.value_json:effects_json):V2:dependencies IS NOT NULL + THEN PARSE_JSON(p.value_json:effects_json):V2:dependencies + ELSE ARRAY_CONSTRUCT() + END, + + -- Events Digest + 'eventsDigest', p.value_json:events_digest::STRING, + + 'executedEpoch', p.value_json:epoch::STRING, + + 'gasObject', OBJECT_CONSTRUCT( + 'owner', OBJECT_CONSTRUCT( + 'AddressOwner', p.value_json:gas_owner::STRING + ), + 'reference', OBJECT_CONSTRUCT( + 'digest', p.value_json:gas_object_digest::STRING, + 'objectId', p.value_json:gas_object_id::STRING, + 'version', p.value_json:gas_object_sequence::NUMBER + ) + ), + + 'gasUsed', OBJECT_CONSTRUCT( + 'computationCost', p.value_json:computation_cost::STRING, + 'nonRefundableStorageFee', p.value_json:non_refundable_storage_fee::STRING, + 'storageCost', p.value_json:storage_cost::STRING, + 'storageRebate', p.value_json:storage_rebate::STRING + ), + + 'messageVersion', 'v1', + + -- Use pre-computed arrays + 'modifiedAtVersions', COALESCE(mav.modified_versions_array, ARRAY_CONSTRUCT()), + 'mutated', COALESCE(mo.mutated_array, ARRAY_CONSTRUCT()), + 'sharedObjects', COALESCE(so.shared_objects_array, ARRAY_CONSTRUCT()), + + 'status', OBJECT_CONSTRUCT( + 'status', + CASE + WHEN p.value_json:execution_success::BOOLEAN = TRUE THEN 'success' + ELSE 'failure' + END + ), + 'transactionDigest', p.value_json:transaction_digest::STRING + ), + + -- Events - Note: This would need to be parsed from a separate events data source + 'events', ARRAY_CONSTRUCT(), + + -- Object Changes - use pre-computed array + 'objectChanges', COALESCE(oc.object_changes_array, ARRAY_CONSTRUCT()), + + 'timestampMs', p.value_json:timestamp_ms::STRING, + + -- Transaction data + 'transaction', OBJECT_CONSTRUCT( + 'data', OBJECT_CONSTRUCT( + 'gasData', OBJECT_CONSTRUCT( + 'budget', p.value_json:gas_budget::STRING, + 'owner', p.value_json:gas_owner::STRING, + 'payment', ARRAY_CONSTRUCT( + OBJECT_CONSTRUCT( + 'digest', + CASE + WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][2] IS NOT NULL + THEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][2]::STRING + ELSE p.value_json:gas_object_digest::STRING + END, + 'objectId', p.value_json:gas_object_id::STRING, + 'version', + CASE + WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][1] IS NOT NULL + THEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][1]::NUMBER + ELSE p.value_json:gas_object_sequence::NUMBER + END + ) + ), + 'price', p.value_json:gas_price::STRING + ), + 'messageVersion', 'v1', + 'sender', p.value_json:sender::STRING, + + -- Transaction details - use pre-computed arrays with fixed type arguments + 'transaction', + CASE + WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction IS NOT NULL + THEN OBJECT_CONSTRUCT( + 'inputs', COALESCE(ti.inputs_array, ARRAY_CONSTRUCT()), + 'kind', 'ProgrammableTransaction', + 'transactions', COALESCE(tc.commands_array, ARRAY_CONSTRUCT()) + ) + ELSE OBJECT_CONSTRUCT() + END + ), + 'txSignatures', + CASE + WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:tx_signatures IS NOT NULL + THEN PARSE_JSON(p.value_json:transaction_json):data[0]:tx_signatures + ELSE ARRAY_CONSTRUCT() + END + ) + ) as TRANSACTION_JSON, + + -- Set timestamps + CURRENT_TIMESTAMP() as _INSERTED_TIMESTAMP, + + -- Generate surrogate key similar to existing format + HASH(p.value_json:transaction_digest::STRING) as TRANSACTIONS_ID, + + CURRENT_TIMESTAMP() as INSERTED_TIMESTAMP, + CURRENT_TIMESTAMP() as MODIFIED_TIMESTAMP + + FROM parsed_new_data p + LEFT JOIN modified_at_versions mav ON p.value_json:transaction_digest::STRING = mav.tx_digest + LEFT JOIN mutated_objects mo ON p.value_json:transaction_digest::STRING = mo.tx_digest + LEFT JOIN shared_objects so ON p.value_json:transaction_digest::STRING = so.tx_digest + LEFT JOIN object_changes oc ON p.value_json:transaction_digest::STRING = oc.tx_digest + LEFT JOIN transaction_inputs ti ON p.value_json:transaction_digest::STRING = ti.tx_digest + LEFT JOIN transaction_commands tc ON p.value_json:transaction_digest::STRING = tc.tx_digest +) + +SELECT + CHECKPOINT_NUMBER, + TX_DIGEST, + BLOCK_TIMESTAMP, + PARTITION_KEY, + TRANSACTION_JSON, + _INSERTED_TIMESTAMP, + TRANSACTIONS_ID, + INSERTED_TIMESTAMP, + MODIFIED_TIMESTAMP +FROM transformed_data \ No newline at end of file From 5dcec2b19773406c55d45f6d8a0af88b7bd1e19e Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Fri, 10 Oct 2025 11:27:14 -0400 Subject: [PATCH 2/7] prep data for gold, no combined silver table --- .../bronze/bronze__transactions_backfill.sql | 10 ++-- models/gold/core/core__fact_checkpoints.sql | 21 ++++++++ .../core/core__fact_transaction_blocks.sql | 28 ++++++++++- .../core/backfill/silver__checkpoints_b.sql | 25 ++++++++++ .../backfill/silver__transaction_blocks_b.sql | 48 +++++++++++++++++++ 5 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 models/silver/core/backfill/silver__checkpoints_b.sql create mode 100644 models/silver/core/backfill/silver__transaction_blocks_b.sql diff --git a/models/bronze/bronze__transactions_backfill.sql b/models/bronze/bronze__transactions_backfill.sql index 7a41919..56122e9 100644 --- a/models/bronze/bronze__transactions_backfill.sql +++ b/models/bronze/bronze__transactions_backfill.sql @@ -4,8 +4,10 @@ SELECT VALUE, - EPOCH, + epoch, DATA -FROM streamline.sui.transactions_backfill -where epoch = 700 -and value:checkpoint::int = 121948955 +FROM + streamline.sui.transactions_backfill +WHERE + epoch = 700 + AND VALUE :checkpoint :: INT = 121932315 diff --git a/models/gold/core/core__fact_checkpoints.sql b/models/gold/core/core__fact_checkpoints.sql index eb8e2c8..584116d 100644 --- a/models/gold/core/core__fact_checkpoints.sql +++ b/models/gold/core/core__fact_checkpoints.sql @@ -33,3 +33,24 @@ WHERE FROM {{ this }}) {% endif %} + + {# {% if is_incremental() %} + {% else %} + #} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + epoch, + checkpoint_digest, + previous_digest, + network_total_transactions, + validator_signature, + tx_count, + ARRAY_CONSTRUCT() AS transactions_array, + {{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp + FROM + {{ ref('silver__checkpoints_b') }} + {# {% endif %} #} diff --git a/models/gold/core/core__fact_transaction_blocks.sql b/models/gold/core/core__fact_transaction_blocks.sql index 31338b5..7c8a9f8 100644 --- a/models/gold/core/core__fact_transaction_blocks.sql +++ b/models/gold/core/core__fact_transaction_blocks.sql @@ -49,7 +49,33 @@ WHERE FROM {{ this }}) {% endif %} - ) + +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_fee, + tx_succeeded, + tx_error, + tx_dependencies, + gas_used_computation_fee, + gas_used_non_refundable_storage_fee, + gas_used_storage_fee, + gas_used_storage_rebate, + gas_price, + gas_budget, + gas_owner + FROM + {{ ref('silver__transaction_blocks_b') }} + {% endif %} +) SELECT checkpoint_number, block_timestamp, diff --git a/models/silver/core/backfill/silver__checkpoints_b.sql b/models/silver/core/backfill/silver__checkpoints_b.sql new file mode 100644 index 0000000..654af7d --- /dev/null +++ b/models/silver/core/backfill/silver__checkpoints_b.sql @@ -0,0 +1,25 @@ +{{ config ( + materialized = "table" +) }} + +WITH parsed_checkpoint_data AS ( + + SELECT + -- Parse the VALUE JSON column + PARSE_JSON(VALUE) AS value_json + FROM + sui_dev.bronze.checkpoints_backfill +) +SELECT + value_json :sequence_number :: NUMBER AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json :epoch :: STRING AS epoch, + value_json :checkpoint_digest :: STRING AS checkpoint_digest, + value_json :previous_checkpoint_digest :: STRING previous_digest, + value_json :network_total_transaction :: STRING network_total_transactions, + value_json :validator_signature :: STRING AS validator_signature, + value_json :total_transactions :: INT AS tx_count +FROM + parsed_checkpoint_data diff --git a/models/silver/core/backfill/silver__transaction_blocks_b.sql b/models/silver/core/backfill/silver__transaction_blocks_b.sql new file mode 100644 index 0000000..cc4e799 --- /dev/null +++ b/models/silver/core/backfill/silver__transaction_blocks_b.sql @@ -0,0 +1,48 @@ +{{ config ( + materialized = "table" +) }} + +WITH base AS ( + + SELECT + PARSE_JSON(VALUE) AS value_json, + PARSE_JSON( + VALUE :transaction_json + ) AS transaction_json, + PARSE_JSON( + VALUE :effects_json + ) AS efx_json + FROM + sui_dev.bronze.transactions_backfill +) +SELECT + value_json :checkpoint :: NUMBER AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json: transaction_digest :: STRING AS tx_digest, + value_json: "transaction_kind" :: STRING AS tx_kind, + value_json :"sender" :: STRING AS tx_sender, + transaction_json :"data" [0] :"intent_message" :"intent" :"version" :: STRING AS message_version, + -- TBD if this is real + CASE + WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE + ELSE FALSE + END AS tx_succeeded, + efx_json :"V2" :"status" :"Failure" :"error" :: STRING AS tx_error, + efx_json: "V2" :"dependencies" AS tx_dependencies, + value_json :"computation_cost" :: bigint AS gas_used_computation_fee, + value_json: "non_refundable_storage_fee" :: bigint AS gas_used_non_refundable_storage_fee, + value_json: "storage_cost" :: bigint AS gas_used_storage_fee, + value_json: "storage_rebate" :: bigint AS gas_used_storage_rebate, + value_json :"gas_budget" :: bigint AS gas_budget, + value_json :"gas_owner" :: STRING AS gas_owner, + value_json :"gas_price" :: bigint AS gas_price, + ( + gas_used_computation_fee + gas_used_storage_fee - gas_used_storage_rebate + ) / pow( + 10, + 9 + ) AS tx_fee +FROM + base From 8f7e87b71a99d38ac167ad9ef7e85349616dcecb Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Fri, 10 Oct 2025 16:19:13 -0400 Subject: [PATCH 3/7] Implement parallel backfill pipeline for historical Sui data MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change introduces a separate backfill data pipeline to ingest historical Sui blockchain data from an alternative source while preserving the existing real-time ingestion pipeline. Architecture: - Bronze layer: View-based models pointing to streamline.sui backfill tables - Silver layer: Table-materialized transformations with backfill-specific JSON parsing - Gold layer: UNION ALL queries combining real-time and backfill data streams The backfill pipeline handles different JSON structures from the alternative data source, particularly for transaction and checkpoint parsing. Gold models now merge both data streams without modifying the incremental processing logic for the primary real-time pipeline. Current scope: Epoch 628, checkpoints < 96605300 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .../backfill/bronze__checkpoints_backfill.sql | 14 + .../backfill/bronze__events_backfill.sql | 14 + .../bronze__transactions_backfill.sql | 4 +- .../bronze/bronze__checkpoints_backfill.sql | 11 - models/gold/core/core__fact_checkpoints.sql | 2 +- models/gold/core/core__fact_events.sql | 27 ++ .../core/core__fact_transaction_blocks.sql | 52 +-- models/gold/core/core__fact_transactions.sql | 19 + ...b.sql => silver__checkpoints_backfill.sql} | 7 +- .../core/backfill/silver__events_backfill.sql | 32 ++ ...> silver__transaction_blocks_backfill.sql} | 6 +- .../silver__transactions_backfill.sql | 48 +++ .../core/silver__checkpoints_backfill.sql | 69 ---- .../core/silver__transactions_backfill.sql | 356 ------------------ 14 files changed, 191 insertions(+), 470 deletions(-) create mode 100644 models/bronze/backfill/bronze__checkpoints_backfill.sql create mode 100644 models/bronze/backfill/bronze__events_backfill.sql rename models/bronze/{ => backfill}/bronze__transactions_backfill.sql (68%) delete mode 100644 models/bronze/bronze__checkpoints_backfill.sql rename models/silver/core/backfill/{silver__checkpoints_b.sql => silver__checkpoints_backfill.sql} (78%) create mode 100644 models/silver/core/backfill/silver__events_backfill.sql rename models/silver/core/backfill/{silver__transaction_blocks_b.sql => silver__transaction_blocks_backfill.sql} (91%) create mode 100644 models/silver/core/backfill/silver__transactions_backfill.sql delete mode 100644 models/silver/core/silver__checkpoints_backfill.sql delete mode 100644 models/silver/core/silver__transactions_backfill.sql diff --git a/models/bronze/backfill/bronze__checkpoints_backfill.sql b/models/bronze/backfill/bronze__checkpoints_backfill.sql new file mode 100644 index 0000000..bcef4e2 --- /dev/null +++ b/models/bronze/backfill/bronze__checkpoints_backfill.sql @@ -0,0 +1,14 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + epoch, + DATA +FROM + streamline.sui.checkpoints_backfill +WHERE + epoch <= 629 + AND checkpoint_number < 96605300 + AND epoch = 628 diff --git a/models/bronze/backfill/bronze__events_backfill.sql b/models/bronze/backfill/bronze__events_backfill.sql new file mode 100644 index 0000000..93a9201 --- /dev/null +++ b/models/bronze/backfill/bronze__events_backfill.sql @@ -0,0 +1,14 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + epoch, + DATA +FROM + streamline.sui.events +WHERE + epoch <= 629 + AND VALUE: checkpoint :: INT < 96605300 + AND epoch = 628 diff --git a/models/bronze/bronze__transactions_backfill.sql b/models/bronze/backfill/bronze__transactions_backfill.sql similarity index 68% rename from models/bronze/bronze__transactions_backfill.sql rename to models/bronze/backfill/bronze__transactions_backfill.sql index 56122e9..728649c 100644 --- a/models/bronze/bronze__transactions_backfill.sql +++ b/models/bronze/backfill/bronze__transactions_backfill.sql @@ -9,5 +9,5 @@ SELECT FROM streamline.sui.transactions_backfill WHERE - epoch = 700 - AND VALUE :checkpoint :: INT = 121932315 + epoch <= 629 + AND checkpoint_number < 96605300 diff --git a/models/bronze/bronze__checkpoints_backfill.sql b/models/bronze/bronze__checkpoints_backfill.sql deleted file mode 100644 index 48e17a1..0000000 --- a/models/bronze/bronze__checkpoints_backfill.sql +++ /dev/null @@ -1,11 +0,0 @@ -{{ config ( - materialized = 'view' -) }} - -SELECT - VALUE, - EPOCH, - DATA -FROM streamline.sui.checkpoints_backfill -where epoch = 906 -and data:sequence_number::int between 197140000 and 197140050 diff --git a/models/gold/core/core__fact_checkpoints.sql b/models/gold/core/core__fact_checkpoints.sql index 584116d..e73147d 100644 --- a/models/gold/core/core__fact_checkpoints.sql +++ b/models/gold/core/core__fact_checkpoints.sql @@ -52,5 +52,5 @@ WHERE SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp FROM - {{ ref('silver__checkpoints_b') }} + {{ ref('silver__checkpoints_backfill') }} {# {% endif %} #} diff --git a/models/gold/core/core__fact_events.sql b/models/gold/core/core__fact_events.sql index 2ad80d4..4c888ec 100644 --- a/models/gold/core/core__fact_events.sql +++ b/models/gold/core/core__fact_events.sql @@ -41,6 +41,33 @@ WHERE FROM {{ this }}) {% endif %} + + {# {% if is_incremental() %} + {% else %} + #} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + NULL AS event_value, + event_index, + TYPE, + package_id, + transaction_module, + sender, + parsed_json + FROM + {{ ref('silver__events_backfill') }} A + JOIN {{ ref('silver__transactions_backfill') }} + b USING ( + checkpoint_number, + tx_digest + ) {# {% endif %} #} ) SELECT checkpoint_number, diff --git a/models/gold/core/core__fact_transaction_blocks.sql b/models/gold/core/core__fact_transaction_blocks.sql index 7c8a9f8..bf2edfb 100644 --- a/models/gold/core/core__fact_transaction_blocks.sql +++ b/models/gold/core/core__fact_transaction_blocks.sql @@ -50,32 +50,32 @@ WHERE {{ this }}) {% endif %} -{% if is_incremental() %} -{% else %} - UNION ALL - SELECT - SELECT - checkpoint_number, - block_timestamp, - tx_digest, - tx_kind, - tx_sender, - message_version, - tx_fee, - tx_succeeded, - tx_error, - tx_dependencies, - gas_used_computation_fee, - gas_used_non_refundable_storage_fee, - gas_used_storage_fee, - gas_used_storage_rebate, - gas_price, - gas_budget, - gas_owner - FROM - {{ ref('silver__transaction_blocks_b') }} - {% endif %} -) + {# {% if is_incremental() %} + {% else %} + #} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + tx_error, + tx_dependencies, + gas_used_computation_fee, + gas_used_non_refundable_storage_fee, + gas_used_storage_fee, + gas_used_storage_rebate, + gas_budget, + gas_owner, + gas_price, + tx_fee + FROM + {{ ref('silver__transaction_blocks_backfill') }} + {# {% endif %} #} + ) SELECT checkpoint_number, block_timestamp, diff --git a/models/gold/core/core__fact_transactions.sql b/models/gold/core/core__fact_transactions.sql index 4759178..d898cdc 100644 --- a/models/gold/core/core__fact_transactions.sql +++ b/models/gold/core/core__fact_transactions.sql @@ -40,6 +40,25 @@ WHERE FROM {{ this }}) {% endif %} + + {# {% if is_incremental() %} + {% else %} + #} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + payload_index, + payload_type, + payload_details + FROM + {{ ref('silver__transactions_backfill') }} + {# {% endif %} #} ) SELECT checkpoint_number, diff --git a/models/silver/core/backfill/silver__checkpoints_b.sql b/models/silver/core/backfill/silver__checkpoints_backfill.sql similarity index 78% rename from models/silver/core/backfill/silver__checkpoints_b.sql rename to models/silver/core/backfill/silver__checkpoints_backfill.sql index 654af7d..35740db 100644 --- a/models/silver/core/backfill/silver__checkpoints_b.sql +++ b/models/silver/core/backfill/silver__checkpoints_backfill.sql @@ -5,10 +5,9 @@ WITH parsed_checkpoint_data AS ( SELECT - -- Parse the VALUE JSON column PARSE_JSON(VALUE) AS value_json FROM - sui_dev.bronze.checkpoints_backfill + {{ ref('bronze__checkpoints_backfill') }} ) SELECT value_json :sequence_number :: NUMBER AS checkpoint_number, @@ -20,6 +19,8 @@ SELECT value_json :previous_checkpoint_digest :: STRING previous_digest, value_json :network_total_transaction :: STRING network_total_transactions, value_json :validator_signature :: STRING AS validator_signature, - value_json :total_transactions :: INT AS tx_count + value_json :total_transactions :: INT AS tx_count, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM parsed_checkpoint_data diff --git a/models/silver/core/backfill/silver__events_backfill.sql b/models/silver/core/backfill/silver__events_backfill.sql new file mode 100644 index 0000000..0d7924e --- /dev/null +++ b/models/silver/core/backfill/silver__events_backfill.sql @@ -0,0 +1,32 @@ +{{ config ( + materialized = "table" +) }} + +WITH base AS ( + + SELECT + PARSE_JSON( + A.value + ) AS value_json, + PARSE_JSON( + A.value :event_json + ) AS event_json + FROM + {{ ref('bronze__events_backfill') }} A +) +SELECT + value_json :checkpoint :: INT AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json: transaction_digest :: STRING AS tx_digest, + value_json :event_index :: INT AS event_index, + value_json :"package" :: STRING AS package_id, + value_json :"module" :: STRING AS transaction_module, + value_json :"sender" :: STRING AS sender, + value_json :"event_type" :: STRING AS TYPE, + event_json AS parsed_json, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + base diff --git a/models/silver/core/backfill/silver__transaction_blocks_b.sql b/models/silver/core/backfill/silver__transaction_blocks_backfill.sql similarity index 91% rename from models/silver/core/backfill/silver__transaction_blocks_b.sql rename to models/silver/core/backfill/silver__transaction_blocks_backfill.sql index cc4e799..ee78c48 100644 --- a/models/silver/core/backfill/silver__transaction_blocks_b.sql +++ b/models/silver/core/backfill/silver__transaction_blocks_backfill.sql @@ -13,7 +13,7 @@ WITH base AS ( VALUE :effects_json ) AS efx_json FROM - sui_dev.bronze.transactions_backfill + {{ ref('bronze__transactions_backfill') }} ) SELECT value_json :checkpoint :: NUMBER AS checkpoint_number, @@ -43,6 +43,8 @@ SELECT ) / pow( 10, 9 - ) AS tx_fee + ) AS tx_fee, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM base diff --git a/models/silver/core/backfill/silver__transactions_backfill.sql b/models/silver/core/backfill/silver__transactions_backfill.sql new file mode 100644 index 0000000..ab69c68 --- /dev/null +++ b/models/silver/core/backfill/silver__transactions_backfill.sql @@ -0,0 +1,48 @@ +{{ config ( + materialized = "table" +) }} + +WITH base AS ( + + SELECT + PARSE_JSON( + A.value + ) AS value_json, + PARSE_JSON( + A.value :transaction_json + ) AS transaction_json, + PARSE_JSON( + A.value :effects_json + ) AS efx_json, + b.index AS payload_index, + C.key AS payload_type, + C.value AS payload_details + FROM + {{ ref('bronze__transactions_backfill') }} A, + LATERAL FLATTEN( + transaction_json :data [0] :intent_message :value :"V1" :kind :ProgrammableTransaction :commands + ) b, + LATERAL FLATTEN( + b.value + ) C +) +SELECT + value_json :checkpoint :: NUMBER AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json: transaction_digest :: STRING AS tx_digest, + value_json: "transaction_kind" :: STRING AS tx_kind, + value_json :"sender" :: STRING AS tx_sender, + transaction_json :"data" [0] :"intent_message" :"intent" :"version" :: STRING AS message_version, + CASE + WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE + ELSE FALSE + END AS tx_succeeded, + payload_index, + payload_type, + payload_details, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + base diff --git a/models/silver/core/silver__checkpoints_backfill.sql b/models/silver/core/silver__checkpoints_backfill.sql deleted file mode 100644 index 5b8f8bf..0000000 --- a/models/silver/core/silver__checkpoints_backfill.sql +++ /dev/null @@ -1,69 +0,0 @@ -{{ config ( - materialized = "table" -) }} - -WITH parsed_checkpoint_data AS ( - SELECT - -- Parse the VALUE JSON column - PARSE_JSON(VALUE) as value_json, - PARSE_JSON(DATA) as data_json, - EPOCH - FROM sui_dev.bronze.checkpoints_backfill -) - -, - -transformed_checkpoints AS ( - SELECT - -- Extract checkpoint number (sequence_number in new format) - value_json:sequence_number::NUMBER as CHECKPOINT_NUMBER, - - -- Convert timestamp_ms to proper timestamp format - TO_TIMESTAMP(value_json:timestamp_ms::NUMBER / 1000) as BLOCK_TIMESTAMP, - - -- Create partition key (round down checkpoint to nearest 1000) - FLOOR(value_json:sequence_number::NUMBER / 1000) * 1000 as PARTITION_KEY, - - -- Reconstruct the CHECKPOINT_JSON in the existing format - OBJECT_CONSTRUCT( - 'checkpointCommitments', ARRAY_CONSTRUCT(), - 'digest', value_json:checkpoint_digest::STRING, - 'epoch', value_json:epoch::STRING, - 'epochRollingGasCostSummary', OBJECT_CONSTRUCT( - 'computationCost', value_json:computation_cost::STRING, - 'nonRefundableStorageFee', value_json:non_refundable_storage_fee::STRING, - 'storageCost', value_json:storage_cost::STRING, - 'storageRebate', value_json:storage_rebate::STRING - ), - 'networkTotalTransactions', value_json:network_total_transaction::STRING, - 'previousDigest', value_json:previous_checkpoint_digest::STRING, - 'sequenceNumber', value_json:sequence_number::STRING, - 'timestampMs', value_json:timestamp_ms::STRING, - -- Note: transactions array is not available in new format, using empty array - -- In production, you may need to join with transaction data to populate this - 'transactions', ARRAY_CONSTRUCT(), - 'validatorSignature', value_json:validator_signature::STRING - ) as CHECKPOINT_JSON, - - -- Set timestamps - CURRENT_TIMESTAMP() as _INSERTED_TIMESTAMP, - - -- Generate surrogate key similar to existing format - HASH(value_json:checkpoint_digest::STRING) as CHECKPOINTS_ID, - - CURRENT_TIMESTAMP() as INSERTED_TIMESTAMP, - CURRENT_TIMESTAMP() as MODIFIED_TIMESTAMP - - FROM parsed_checkpoint_data -) - -SELECT - CHECKPOINT_NUMBER, - BLOCK_TIMESTAMP, - PARTITION_KEY, - CHECKPOINT_JSON, - _INSERTED_TIMESTAMP, - CHECKPOINTS_ID, - INSERTED_TIMESTAMP, - MODIFIED_TIMESTAMP -FROM transformed_checkpoints \ No newline at end of file diff --git a/models/silver/core/silver__transactions_backfill.sql b/models/silver/core/silver__transactions_backfill.sql deleted file mode 100644 index aaed69a..0000000 --- a/models/silver/core/silver__transactions_backfill.sql +++ /dev/null @@ -1,356 +0,0 @@ -{{ config ( - materialized = "table" -) }} - -with parsed_new_data AS ( - SELECT - -- Parse the VALUE JSON column - PARSE_JSON(VALUE) as value_json, - PARSE_JSON(DATA) as data_json, - EPOCH - FROM sui_dev.bronze.transactions_backfill -), - --- Extract modified at versions -modified_at_versions AS ( - SELECT - value_json:transaction_digest::STRING as tx_digest, - ARRAY_AGG( - OBJECT_CONSTRUCT( - 'objectId', obj.value[0]::STRING, - 'sequenceNumber', obj.value[1]:input_state:Exist[0][0]::STRING - ) - ) as modified_versions_array - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) obj - GROUP BY value_json:transaction_digest::STRING -), - --- Extract mutated objects -mutated_objects AS ( - SELECT - value_json:transaction_digest::STRING as tx_digest, - ARRAY_AGG( - OBJECT_CONSTRUCT( - 'owner', - CASE - WHEN obj.value[1]:output_state:ObjectWrite[1]:Shared IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'Shared', obj.value[1]:output_state:ObjectWrite[1]:Shared - ) - WHEN obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'ObjectOwner', obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner::STRING - ) - WHEN obj.value[1]:output_state:ObjectWrite[1]:AddressOwner IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'AddressOwner', obj.value[1]:output_state:ObjectWrite[1]:AddressOwner::STRING - ) - ELSE OBJECT_CONSTRUCT('AddressOwner', value_json:sender::STRING) - END, - 'reference', OBJECT_CONSTRUCT( - 'digest', obj.value[1]:output_state:ObjectWrite[0]::STRING, - 'objectId', obj.value[0]::STRING, - 'version', PARSE_JSON(value_json:effects_json):V2:lamport_version::NUMBER - ) - ) - ) as mutated_array - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) obj - GROUP BY value_json:transaction_digest::STRING -), - --- Extract shared objects -shared_objects AS ( - SELECT - tx_digest, - ARRAY_AGG(shared_obj) as shared_objects_array - FROM ( - -- Unchanged shared objects - SELECT - value_json:transaction_digest::STRING as tx_digest, - OBJECT_CONSTRUCT( - 'digest', ush.value[1]:ReadOnlyRoot[1]::STRING, - 'objectId', ush.value[0]::STRING, - 'version', ush.value[1]:ReadOnlyRoot[0]::NUMBER - ) as shared_obj - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:unchanged_shared_objects)) ush - WHERE PARSE_JSON(value_json:effects_json):V2:unchanged_shared_objects IS NOT NULL - - UNION ALL - - -- Changed shared objects - SELECT - value_json:transaction_digest::STRING as tx_digest, - OBJECT_CONSTRUCT( - 'digest', ch.value[1]:input_state:Exist[0][1]::STRING, - 'objectId', ch.value[0]::STRING, - 'version', ch.value[1]:input_state:Exist[0][0]::NUMBER - ) as shared_obj - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) ch - WHERE ch.value[1]:input_state:Exist[1]:Shared IS NOT NULL - ) - GROUP BY tx_digest -), - --- Extract object changes -object_changes AS ( - SELECT - value_json:transaction_digest::STRING as tx_digest, - ARRAY_AGG( - OBJECT_CONSTRUCT( - 'digest', obj.value[1]:output_state:ObjectWrite[0]::STRING, - 'objectId', obj.value[0]::STRING, - 'objectType', 'unknown', -- Default, would need object type mapping - 'owner', - CASE - WHEN obj.value[1]:output_state:ObjectWrite[1]:Shared IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'Shared', obj.value[1]:output_state:ObjectWrite[1]:Shared - ) - WHEN obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'ObjectOwner', obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner::STRING - ) - WHEN obj.value[1]:output_state:ObjectWrite[1]:AddressOwner IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'AddressOwner', obj.value[1]:output_state:ObjectWrite[1]:AddressOwner::STRING - ) - ELSE OBJECT_CONSTRUCT('AddressOwner', value_json:sender::STRING) - END, - 'previousVersion', obj.value[1]:input_state:Exist[0][0]::STRING, - 'sender', value_json:sender::STRING, - 'type', 'mutated', - 'version', PARSE_JSON(value_json:effects_json):V2:lamport_version::STRING - ) - ) as object_changes_array - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:effects_json):V2:changed_objects)) obj - GROUP BY value_json:transaction_digest::STRING -), - --- Extract transaction inputs -transaction_inputs AS ( - SELECT - value_json:transaction_digest::STRING as tx_digest, - ARRAY_AGG( - CASE - WHEN inp.value:Object:SharedObject IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'initialSharedVersion', inp.value:Object:SharedObject:initial_shared_version::STRING, - 'mutable', inp.value:Object:SharedObject:mutable::BOOLEAN, - 'objectId', inp.value:Object:SharedObject:id::STRING, - 'objectType', 'sharedObject', - 'type', 'object' - ) - WHEN inp.value:Pure IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'type', 'pure', - 'value', '969745335973357872526821071118', -- Would need to decode Pure bytes - 'valueType', 'u128' - ) - ELSE OBJECT_CONSTRUCT('type', 'unknown') - END - ) as inputs_array - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:inputs)) inp - WHERE PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:inputs IS NOT NULL - GROUP BY value_json:transaction_digest::STRING -), - --- Extract transaction commands (simplified without type arguments for now) -transaction_commands AS ( - SELECT - value_json:transaction_digest::STRING as tx_digest, - ARRAY_AGG( - CASE - WHEN cmd.value:MoveCall IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'MoveCall', OBJECT_CONSTRUCT( - 'arguments', cmd.value:MoveCall:arguments, - 'function', cmd.value:MoveCall:function::STRING, - 'module', cmd.value:MoveCall:module::STRING, - 'package', cmd.value:MoveCall:package::STRING, - 'type_arguments', cmd.value:MoveCall:type_arguments -- Keep original format for now - ) - ) - ELSE cmd.value - END - ) as commands_array - FROM parsed_new_data, - TABLE(FLATTEN(PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:commands)) cmd - WHERE PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction:commands IS NOT NULL - GROUP BY value_json:transaction_digest::STRING -), - -transformed_data AS ( - SELECT - -- Extract checkpoint number from VALUE JSON - p.value_json:checkpoint::NUMBER as CHECKPOINT_NUMBER, - - -- Extract transaction digest from VALUE JSON - p.value_json:transaction_digest::STRING as TX_DIGEST, - - -- Convert timestamp_ms to proper timestamp format - TO_TIMESTAMP(p.value_json:timestamp_ms::NUMBER / 1000) as BLOCK_TIMESTAMP, - - -- Create partition key (round down checkpoint to nearest 10000) - FLOOR(p.value_json:checkpoint::NUMBER / 10000) * 10000 as PARTITION_KEY, - - -- Reconstruct the TRANSACTION_JSON in the existing format - OBJECT_CONSTRUCT( - -- Balance Changes - calculate from total gas cost - 'balanceChanges', - CASE - WHEN p.value_json:total_gas_cost::NUMBER > 0 - THEN ARRAY_CONSTRUCT( - OBJECT_CONSTRUCT( - 'amount', '-' || p.value_json:total_gas_cost::STRING, - 'coinType', '0x2::sui::SUI', - 'owner', OBJECT_CONSTRUCT( - 'AddressOwner', p.value_json:sender::STRING - ) - ) - ) - ELSE ARRAY_CONSTRUCT() - END, - - 'checkpoint', p.value_json:checkpoint::STRING, - 'digest', p.value_json:transaction_digest::STRING, - - 'effects', OBJECT_CONSTRUCT( - 'dependencies', - CASE - WHEN PARSE_JSON(p.value_json:effects_json):V2:dependencies IS NOT NULL - THEN PARSE_JSON(p.value_json:effects_json):V2:dependencies - ELSE ARRAY_CONSTRUCT() - END, - - -- Events Digest - 'eventsDigest', p.value_json:events_digest::STRING, - - 'executedEpoch', p.value_json:epoch::STRING, - - 'gasObject', OBJECT_CONSTRUCT( - 'owner', OBJECT_CONSTRUCT( - 'AddressOwner', p.value_json:gas_owner::STRING - ), - 'reference', OBJECT_CONSTRUCT( - 'digest', p.value_json:gas_object_digest::STRING, - 'objectId', p.value_json:gas_object_id::STRING, - 'version', p.value_json:gas_object_sequence::NUMBER - ) - ), - - 'gasUsed', OBJECT_CONSTRUCT( - 'computationCost', p.value_json:computation_cost::STRING, - 'nonRefundableStorageFee', p.value_json:non_refundable_storage_fee::STRING, - 'storageCost', p.value_json:storage_cost::STRING, - 'storageRebate', p.value_json:storage_rebate::STRING - ), - - 'messageVersion', 'v1', - - -- Use pre-computed arrays - 'modifiedAtVersions', COALESCE(mav.modified_versions_array, ARRAY_CONSTRUCT()), - 'mutated', COALESCE(mo.mutated_array, ARRAY_CONSTRUCT()), - 'sharedObjects', COALESCE(so.shared_objects_array, ARRAY_CONSTRUCT()), - - 'status', OBJECT_CONSTRUCT( - 'status', - CASE - WHEN p.value_json:execution_success::BOOLEAN = TRUE THEN 'success' - ELSE 'failure' - END - ), - 'transactionDigest', p.value_json:transaction_digest::STRING - ), - - -- Events - Note: This would need to be parsed from a separate events data source - 'events', ARRAY_CONSTRUCT(), - - -- Object Changes - use pre-computed array - 'objectChanges', COALESCE(oc.object_changes_array, ARRAY_CONSTRUCT()), - - 'timestampMs', p.value_json:timestamp_ms::STRING, - - -- Transaction data - 'transaction', OBJECT_CONSTRUCT( - 'data', OBJECT_CONSTRUCT( - 'gasData', OBJECT_CONSTRUCT( - 'budget', p.value_json:gas_budget::STRING, - 'owner', p.value_json:gas_owner::STRING, - 'payment', ARRAY_CONSTRUCT( - OBJECT_CONSTRUCT( - 'digest', - CASE - WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][2] IS NOT NULL - THEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][2]::STRING - ELSE p.value_json:gas_object_digest::STRING - END, - 'objectId', p.value_json:gas_object_id::STRING, - 'version', - CASE - WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][1] IS NOT NULL - THEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:gas_data:payment[0][1]::NUMBER - ELSE p.value_json:gas_object_sequence::NUMBER - END - ) - ), - 'price', p.value_json:gas_price::STRING - ), - 'messageVersion', 'v1', - 'sender', p.value_json:sender::STRING, - - -- Transaction details - use pre-computed arrays with fixed type arguments - 'transaction', - CASE - WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:intent_message:value:V1:kind:ProgrammableTransaction IS NOT NULL - THEN OBJECT_CONSTRUCT( - 'inputs', COALESCE(ti.inputs_array, ARRAY_CONSTRUCT()), - 'kind', 'ProgrammableTransaction', - 'transactions', COALESCE(tc.commands_array, ARRAY_CONSTRUCT()) - ) - ELSE OBJECT_CONSTRUCT() - END - ), - 'txSignatures', - CASE - WHEN PARSE_JSON(p.value_json:transaction_json):data[0]:tx_signatures IS NOT NULL - THEN PARSE_JSON(p.value_json:transaction_json):data[0]:tx_signatures - ELSE ARRAY_CONSTRUCT() - END - ) - ) as TRANSACTION_JSON, - - -- Set timestamps - CURRENT_TIMESTAMP() as _INSERTED_TIMESTAMP, - - -- Generate surrogate key similar to existing format - HASH(p.value_json:transaction_digest::STRING) as TRANSACTIONS_ID, - - CURRENT_TIMESTAMP() as INSERTED_TIMESTAMP, - CURRENT_TIMESTAMP() as MODIFIED_TIMESTAMP - - FROM parsed_new_data p - LEFT JOIN modified_at_versions mav ON p.value_json:transaction_digest::STRING = mav.tx_digest - LEFT JOIN mutated_objects mo ON p.value_json:transaction_digest::STRING = mo.tx_digest - LEFT JOIN shared_objects so ON p.value_json:transaction_digest::STRING = so.tx_digest - LEFT JOIN object_changes oc ON p.value_json:transaction_digest::STRING = oc.tx_digest - LEFT JOIN transaction_inputs ti ON p.value_json:transaction_digest::STRING = ti.tx_digest - LEFT JOIN transaction_commands tc ON p.value_json:transaction_digest::STRING = tc.tx_digest -) - -SELECT - CHECKPOINT_NUMBER, - TX_DIGEST, - BLOCK_TIMESTAMP, - PARTITION_KEY, - TRANSACTION_JSON, - _INSERTED_TIMESTAMP, - TRANSACTIONS_ID, - INSERTED_TIMESTAMP, - MODIFIED_TIMESTAMP -FROM transformed_data \ No newline at end of file From 89fcf827e6a334b96cf78056e2def13b0889e915 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Fri, 10 Oct 2025 16:27:30 -0400 Subject: [PATCH 4/7] Replace hard-coded streamline database references with dbt sources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update bronze backfill models to use source() function instead of direct table references. This improves maintainability and allows for environment- specific configuration through the existing bronze_streamline source definition. Changes: - Add checkpoints_backfill, transactions_backfill, events_backfill to sources.yml - Update all bronze backfill models to use {{ source('bronze_streamline', '...') }} 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- models/bronze/backfill/bronze__checkpoints_backfill.sql | 2 +- models/bronze/backfill/bronze__events_backfill.sql | 2 +- models/bronze/backfill/bronze__transactions_backfill.sql | 2 +- models/sources.yml | 3 +++ 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/models/bronze/backfill/bronze__checkpoints_backfill.sql b/models/bronze/backfill/bronze__checkpoints_backfill.sql index bcef4e2..98f5400 100644 --- a/models/bronze/backfill/bronze__checkpoints_backfill.sql +++ b/models/bronze/backfill/bronze__checkpoints_backfill.sql @@ -7,7 +7,7 @@ SELECT epoch, DATA FROM - streamline.sui.checkpoints_backfill + {{ source('bronze_streamline', 'checkpoints_backfill') }} WHERE epoch <= 629 AND checkpoint_number < 96605300 diff --git a/models/bronze/backfill/bronze__events_backfill.sql b/models/bronze/backfill/bronze__events_backfill.sql index 93a9201..d7b5acf 100644 --- a/models/bronze/backfill/bronze__events_backfill.sql +++ b/models/bronze/backfill/bronze__events_backfill.sql @@ -7,7 +7,7 @@ SELECT epoch, DATA FROM - streamline.sui.events + {{ source('bronze_streamline', 'events_backfill') }} WHERE epoch <= 629 AND VALUE: checkpoint :: INT < 96605300 diff --git a/models/bronze/backfill/bronze__transactions_backfill.sql b/models/bronze/backfill/bronze__transactions_backfill.sql index 728649c..bcac443 100644 --- a/models/bronze/backfill/bronze__transactions_backfill.sql +++ b/models/bronze/backfill/bronze__transactions_backfill.sql @@ -7,7 +7,7 @@ SELECT epoch, DATA FROM - streamline.sui.transactions_backfill + {{ source('bronze_streamline', 'transactions_backfill') }} WHERE epoch <= 629 AND checkpoint_number < 96605300 diff --git a/models/sources.yml b/models/sources.yml index fd150b0..d846f21 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -7,6 +7,9 @@ sources: tables: - name: checkpoints - name: transactions + - name: checkpoints_backfill + - name: transactions_backfill + - name: events_backfill - name: crosschain database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}" From fdffc38cdf3a4af323aad643fd9f217a221d0cd3 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Fri, 10 Oct 2025 16:39:33 -0400 Subject: [PATCH 5/7] rm temp filters --- .../backfill/bronze__checkpoints_backfill.sql | 8 +-- .../backfill/bronze__events_backfill.sql | 8 +-- models/gold/core/core__fact_checkpoints.sql | 39 +++++++------- models/gold/core/core__fact_events.sql | 52 +++++++++---------- .../core/core__fact_transaction_blocks.sql | 51 +++++++++--------- models/gold/core/core__fact_transactions.sql | 37 +++++++------ 6 files changed, 98 insertions(+), 97 deletions(-) diff --git a/models/bronze/backfill/bronze__checkpoints_backfill.sql b/models/bronze/backfill/bronze__checkpoints_backfill.sql index 98f5400..e26280b 100644 --- a/models/bronze/backfill/bronze__checkpoints_backfill.sql +++ b/models/bronze/backfill/bronze__checkpoints_backfill.sql @@ -7,8 +7,10 @@ SELECT epoch, DATA FROM - {{ source('bronze_streamline', 'checkpoints_backfill') }} + {{ source( + 'bronze_streamline', + 'checkpoints_backfill' + ) }} WHERE epoch <= 629 - AND checkpoint_number < 96605300 - AND epoch = 628 + AND checkpoint_number < 96605300 {# AND epoch = 628 #} diff --git a/models/bronze/backfill/bronze__events_backfill.sql b/models/bronze/backfill/bronze__events_backfill.sql index d7b5acf..361f287 100644 --- a/models/bronze/backfill/bronze__events_backfill.sql +++ b/models/bronze/backfill/bronze__events_backfill.sql @@ -7,8 +7,10 @@ SELECT epoch, DATA FROM - {{ source('bronze_streamline', 'events_backfill') }} + {{ source( + 'bronze_streamline', + 'events_backfill' + ) }} WHERE epoch <= 629 - AND VALUE: checkpoint :: INT < 96605300 - AND epoch = 628 + AND VALUE: checkpoint :: INT < 96605300 {# AND epoch = 628 #} diff --git a/models/gold/core/core__fact_checkpoints.sql b/models/gold/core/core__fact_checkpoints.sql index e73147d..1ae1b46 100644 --- a/models/gold/core/core__fact_checkpoints.sql +++ b/models/gold/core/core__fact_checkpoints.sql @@ -34,23 +34,22 @@ WHERE {{ this }}) {% endif %} - {# {% if is_incremental() %} - {% else %} - #} - UNION ALL - SELECT - checkpoint_number, - block_timestamp, - epoch, - checkpoint_digest, - previous_digest, - network_total_transactions, - validator_signature, - tx_count, - ARRAY_CONSTRUCT() AS transactions_array, - {{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id, - SYSDATE() AS inserted_timestamp, - SYSDATE() AS modified_timestamp - FROM - {{ ref('silver__checkpoints_backfill') }} - {# {% endif %} #} +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + epoch, + checkpoint_digest, + previous_digest, + network_total_transactions, + validator_signature, + tx_count, + ARRAY_CONSTRUCT() AS transactions_array, + {{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp + FROM + {{ ref('silver__checkpoints_backfill') }} + {% endif %} diff --git a/models/gold/core/core__fact_events.sql b/models/gold/core/core__fact_events.sql index 4c888ec..b943df9 100644 --- a/models/gold/core/core__fact_events.sql +++ b/models/gold/core/core__fact_events.sql @@ -42,33 +42,33 @@ WHERE {{ this }}) {% endif %} - {# {% if is_incremental() %} - {% else %} - #} - UNION ALL - SELECT +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + NULL AS event_value, + event_index, + TYPE, + package_id, + transaction_module, + sender, + parsed_json + FROM + {{ ref('silver__events_backfill') }} A + JOIN {{ ref('silver__transactions_backfill') }} + b USING ( checkpoint_number, - block_timestamp, - tx_digest, - tx_kind, - tx_sender, - message_version, - tx_succeeded, - NULL AS event_value, - event_index, - TYPE, - package_id, - transaction_module, - sender, - parsed_json - FROM - {{ ref('silver__events_backfill') }} A - JOIN {{ ref('silver__transactions_backfill') }} - b USING ( - checkpoint_number, - tx_digest - ) {# {% endif %} #} - ) + tx_digest + ) + {% endif %} +) SELECT checkpoint_number, block_timestamp, diff --git a/models/gold/core/core__fact_transaction_blocks.sql b/models/gold/core/core__fact_transaction_blocks.sql index bf2edfb..f8b67df 100644 --- a/models/gold/core/core__fact_transaction_blocks.sql +++ b/models/gold/core/core__fact_transaction_blocks.sql @@ -50,32 +50,31 @@ WHERE {{ this }}) {% endif %} - {# {% if is_incremental() %} - {% else %} - #} - UNION ALL - SELECT - checkpoint_number, - block_timestamp, - tx_digest, - tx_kind, - tx_sender, - message_version, - tx_succeeded, - tx_error, - tx_dependencies, - gas_used_computation_fee, - gas_used_non_refundable_storage_fee, - gas_used_storage_fee, - gas_used_storage_rebate, - gas_budget, - gas_owner, - gas_price, - tx_fee - FROM - {{ ref('silver__transaction_blocks_backfill') }} - {# {% endif %} #} - ) +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + tx_error, + tx_dependencies, + gas_used_computation_fee, + gas_used_non_refundable_storage_fee, + gas_used_storage_fee, + gas_used_storage_rebate, + gas_budget, + gas_owner, + gas_price, + tx_fee + FROM + {{ ref('silver__transaction_blocks_backfill') }} + {% endif %} +) SELECT checkpoint_number, block_timestamp, diff --git a/models/gold/core/core__fact_transactions.sql b/models/gold/core/core__fact_transactions.sql index d898cdc..4abba78 100644 --- a/models/gold/core/core__fact_transactions.sql +++ b/models/gold/core/core__fact_transactions.sql @@ -41,25 +41,24 @@ WHERE {{ this }}) {% endif %} - {# {% if is_incremental() %} - {% else %} - #} - UNION ALL - SELECT - checkpoint_number, - block_timestamp, - tx_digest, - tx_kind, - tx_sender, - message_version, - tx_succeeded, - payload_index, - payload_type, - payload_details - FROM - {{ ref('silver__transactions_backfill') }} - {# {% endif %} #} - ) +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + payload_index, + payload_type, + payload_details + FROM + {{ ref('silver__transactions_backfill') }} + {% endif %} +) SELECT checkpoint_number, block_timestamp, From 4c61106ce0913be50cc655acde297bde5b22ab39 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Mon, 13 Oct 2025 11:28:06 -0400 Subject: [PATCH 6/7] PR fixes --- models/bronze/backfill/bronze__checkpoints_backfill.sql | 2 +- models/bronze/backfill/bronze__events_backfill.sql | 4 ++-- models/bronze/backfill/bronze__transactions_backfill.sql | 7 +++++-- .../silver/core/backfill/silver__checkpoints_backfill.sql | 2 +- .../core/backfill/silver__transaction_blocks_backfill.sql | 7 +++++-- .../silver/core/backfill/silver__transactions_backfill.sql | 6 +++++- models/sources.yml | 5 +++-- 7 files changed, 22 insertions(+), 11 deletions(-) diff --git a/models/bronze/backfill/bronze__checkpoints_backfill.sql b/models/bronze/backfill/bronze__checkpoints_backfill.sql index e26280b..8838021 100644 --- a/models/bronze/backfill/bronze__checkpoints_backfill.sql +++ b/models/bronze/backfill/bronze__checkpoints_backfill.sql @@ -13,4 +13,4 @@ FROM ) }} WHERE epoch <= 629 - AND checkpoint_number < 96605300 {# AND epoch = 628 #} + AND VALUE: sequence_number :: INT < 96605300 diff --git a/models/bronze/backfill/bronze__events_backfill.sql b/models/bronze/backfill/bronze__events_backfill.sql index 361f287..7ce9fef 100644 --- a/models/bronze/backfill/bronze__events_backfill.sql +++ b/models/bronze/backfill/bronze__events_backfill.sql @@ -9,8 +9,8 @@ SELECT FROM {{ source( 'bronze_streamline', - 'events_backfill' + 'events' ) }} WHERE epoch <= 629 - AND VALUE: checkpoint :: INT < 96605300 {# AND epoch = 628 #} + AND VALUE: checkpoint :: INT < 96605300 diff --git a/models/bronze/backfill/bronze__transactions_backfill.sql b/models/bronze/backfill/bronze__transactions_backfill.sql index bcac443..2e801da 100644 --- a/models/bronze/backfill/bronze__transactions_backfill.sql +++ b/models/bronze/backfill/bronze__transactions_backfill.sql @@ -7,7 +7,10 @@ SELECT epoch, DATA FROM - {{ source('bronze_streamline', 'transactions_backfill') }} + {{ source( + 'bronze_streamline', + 'transactions_backfill' + ) }} WHERE epoch <= 629 - AND checkpoint_number < 96605300 + AND VALUE: checkpoint :: INT < 96605300 diff --git a/models/silver/core/backfill/silver__checkpoints_backfill.sql b/models/silver/core/backfill/silver__checkpoints_backfill.sql index 35740db..4776955 100644 --- a/models/silver/core/backfill/silver__checkpoints_backfill.sql +++ b/models/silver/core/backfill/silver__checkpoints_backfill.sql @@ -19,7 +19,7 @@ SELECT value_json :previous_checkpoint_digest :: STRING previous_digest, value_json :network_total_transaction :: STRING network_total_transactions, value_json :validator_signature :: STRING AS validator_signature, - value_json :total_transactions :: INT AS tx_count, + value_json :total_transaction_blocks :: INT AS tx_count, SYSDATE() AS inserted_timestamp, '{{ invocation_id }}' AS _invocation_id FROM diff --git a/models/silver/core/backfill/silver__transaction_blocks_backfill.sql b/models/silver/core/backfill/silver__transaction_blocks_backfill.sql index ee78c48..457cef2 100644 --- a/models/silver/core/backfill/silver__transaction_blocks_backfill.sql +++ b/models/silver/core/backfill/silver__transaction_blocks_backfill.sql @@ -23,8 +23,11 @@ SELECT value_json: transaction_digest :: STRING AS tx_digest, value_json: "transaction_kind" :: STRING AS tx_kind, value_json :"sender" :: STRING AS tx_sender, - transaction_json :"data" [0] :"intent_message" :"intent" :"version" :: STRING AS message_version, - -- TBD if this is real + LOWER( + object_keys( + transaction_json :"data" [0] :"intent_message" :value + ) [0] + ) :: STRING AS message_version, CASE WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE ELSE FALSE diff --git a/models/silver/core/backfill/silver__transactions_backfill.sql b/models/silver/core/backfill/silver__transactions_backfill.sql index ab69c68..c0f806e 100644 --- a/models/silver/core/backfill/silver__transactions_backfill.sql +++ b/models/silver/core/backfill/silver__transactions_backfill.sql @@ -34,7 +34,11 @@ SELECT value_json: transaction_digest :: STRING AS tx_digest, value_json: "transaction_kind" :: STRING AS tx_kind, value_json :"sender" :: STRING AS tx_sender, - transaction_json :"data" [0] :"intent_message" :"intent" :"version" :: STRING AS message_version, + LOWER( + object_keys( + transaction_json :"data" [0] :"intent_message" :value + ) [0] + ) :: STRING AS message_version, CASE WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE ELSE FALSE diff --git a/models/sources.yml b/models/sources.yml index d846f21..74ca4a5 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -3,13 +3,14 @@ version: 2 sources: - name: bronze_streamline database: streamline - schema: "{{ 'sui' if target.database == 'SUI' else 'sui_dev' }}" + schema: sui + # "{{ 'sui' if target.database == 'SUI' else 'sui_dev' }}" tables: - name: checkpoints - name: transactions - name: checkpoints_backfill - name: transactions_backfill - - name: events_backfill + - name: events - name: crosschain database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}" From f5d641070df78a68e3ba2f66e37659ea856a782e Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Mon, 13 Oct 2025 11:30:48 -0400 Subject: [PATCH 7/7] case in source --- models/sources.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/models/sources.yml b/models/sources.yml index 74ca4a5..5b5dd19 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -3,8 +3,7 @@ version: 2 sources: - name: bronze_streamline database: streamline - schema: sui - # "{{ 'sui' if target.database == 'SUI' else 'sui_dev' }}" + schema: "{{ 'sui' if target.database == 'SUI' else 'sui_dev' }}" tables: - name: checkpoints - name: transactions