From a57fdd80d331ca378c89d309e3e6869619bfa5a8 Mon Sep 17 00:00:00 2001
From: tarikceric
Date: Wed, 8 Oct 2025 15:33:08 -0700
Subject: [PATCH] temp backfill tables

---
Review notes (below the "---" marker, so not part of the commit message):
- The silver models read the bronze views through ref() instead of the
  hard-coded sui_dev.bronze.* relations.
- Arrays built from FLATTEN are aggregated WITHIN GROUP (ORDER BY index).
- Pure transaction inputs carry their raw value instead of a hard-coded
  sample value / type.
- The blob ids on the "index" lines are carried over from the original
  patch and no longer match the new file contents.

 .../bronze/bronze__checkpoints_backfill.sql   |  17 +
 .../bronze/bronze__transactions_backfill.sql  |  15 +
 .../core/silver__checkpoints_backfill.sql     |  67 ++++
 .../core/silver__transactions_backfill.sql    | 333 ++++++++++++++++++
 4 files changed, 432 insertions(+)
 create mode 100644 models/bronze/bronze__checkpoints_backfill.sql
 create mode 100644 models/bronze/bronze__transactions_backfill.sql
 create mode 100644 models/silver/core/silver__checkpoints_backfill.sql
 create mode 100644 models/silver/core/silver__transactions_backfill.sql

diff --git a/models/bronze/bronze__checkpoints_backfill.sql b/models/bronze/bronze__checkpoints_backfill.sql
new file mode 100644
index 0000000..48e17a1
--- /dev/null
+++ b/models/bronze/bronze__checkpoints_backfill.sql
@@ -0,0 +1,17 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+-- Temporary backfill view over streamline.sui.checkpoints_backfill.
+-- NOTE(review): the epoch / sequence_number filter pins a small hard-coded
+-- sample; widen or parameterize it before running a real backfill.
+-- NOTE(review): this filter reads sequence_number from DATA, while
+-- silver__checkpoints_backfill reads it from VALUE -- confirm both carry it.
+SELECT
+    VALUE,
+    EPOCH,
+    DATA
+FROM streamline.sui.checkpoints_backfill
+WHERE
+    EPOCH = 906
+    AND DATA:sequence_number::INT BETWEEN 197140000 AND 197140050
diff --git a/models/bronze/bronze__transactions_backfill.sql b/models/bronze/bronze__transactions_backfill.sql
new file mode 100644
index 0000000..7a41919
--- /dev/null
+++ b/models/bronze/bronze__transactions_backfill.sql
@@ -0,0 +1,15 @@
+{{ config (
+    materialized = 'view'
+) }}
+
+-- Temporary backfill view over streamline.sui.transactions_backfill.
+-- NOTE(review): the epoch / checkpoint filter pins a single hard-coded
+-- checkpoint as a sample; widen or parameterize it before a real backfill.
+SELECT
+    VALUE,
+    EPOCH,
+    DATA
+FROM streamline.sui.transactions_backfill
+WHERE
+    EPOCH = 700
+    AND VALUE:checkpoint::INT = 121948955
diff --git a/models/silver/core/silver__checkpoints_backfill.sql b/models/silver/core/silver__checkpoints_backfill.sql
new file mode 100644
index 0000000..5b8f8bf
--- /dev/null
+++ b/models/silver/core/silver__checkpoints_backfill.sql
@@ -0,0 +1,67 @@
+{{ config (
+    materialized = "table"
+) }}
+
+-- Temporary backfill model: reshapes checkpoint rows from
+-- bronze__checkpoints_backfill into the CHECKPOINT_JSON layout built below.
+WITH parsed_checkpoint_data AS (
+    -- ref() instead of a hard-coded sui_dev relation: dbt resolves the
+    -- target database and builds the bronze view before this table.
+    SELECT
+        PARSE_JSON(VALUE) AS value_json
+    FROM {{ ref('bronze__checkpoints_backfill') }}
+),
+
+transformed_checkpoints AS (
+    SELECT
+        -- The backfill format calls the checkpoint number sequence_number
+        value_json:sequence_number::NUMBER AS CHECKPOINT_NUMBER,
+
+        -- timestamp_ms is divided by 1000 to get seconds for TO_TIMESTAMP
+        TO_TIMESTAMP(value_json:timestamp_ms::NUMBER / 1000) AS BLOCK_TIMESTAMP,
+
+        -- Partition key: checkpoint rounded down to the nearest 1000
+        FLOOR(value_json:sequence_number::NUMBER / 1000) * 1000 AS PARTITION_KEY,
+
+        -- Rebuild CHECKPOINT_JSON using the key names of the existing format.
+        -- OBJECT_CONSTRUCT leaves out any key whose value is NULL.
+        OBJECT_CONSTRUCT(
+            'checkpointCommitments', ARRAY_CONSTRUCT(),
+            'digest', value_json:checkpoint_digest::STRING,
+            'epoch', value_json:epoch::STRING,
+            'epochRollingGasCostSummary', OBJECT_CONSTRUCT(
+                'computationCost', value_json:computation_cost::STRING,
+                'nonRefundableStorageFee', value_json:non_refundable_storage_fee::STRING,
+                'storageCost', value_json:storage_cost::STRING,
+                'storageRebate', value_json:storage_rebate::STRING
+            ),
+            'networkTotalTransactions', value_json:network_total_transaction::STRING,
+            'previousDigest', value_json:previous_checkpoint_digest::STRING,
+            'sequenceNumber', value_json:sequence_number::STRING,
+            'timestampMs', value_json:timestamp_ms::STRING,
+            -- The backfill format has no transactions array, so it stays
+            -- empty; join transaction data if it must be populated.
+            'transactions', ARRAY_CONSTRUCT(),
+            'validatorSignature', value_json:validator_signature::STRING
+        ) AS CHECKPOINT_JSON,
+
+        CURRENT_TIMESTAMP() AS _INSERTED_TIMESTAMP,
+
+        -- Surrogate key derived from the checkpoint digest
+        HASH(value_json:checkpoint_digest::STRING) AS CHECKPOINTS_ID,
+
+        CURRENT_TIMESTAMP() AS INSERTED_TIMESTAMP,
+        CURRENT_TIMESTAMP() AS MODIFIED_TIMESTAMP
+    FROM parsed_checkpoint_data
+)
+
+SELECT
+    CHECKPOINT_NUMBER,
+    BLOCK_TIMESTAMP,
+    PARTITION_KEY,
+    CHECKPOINT_JSON,
+    _INSERTED_TIMESTAMP,
+    CHECKPOINTS_ID,
+    INSERTED_TIMESTAMP,
+    MODIFIED_TIMESTAMP
+FROM transformed_checkpoints
diff --git a/models/silver/core/silver__transactions_backfill.sql b/models/silver/core/silver__transactions_backfill.sql
new file mode 100644
index 0000000..aaed69a
--- /dev/null
+++ b/models/silver/core/silver__transactions_backfill.sql
@@ -0,0 +1,333 @@
+{{ config (
+    materialized = "table"
+) }}
+
+-- Temporary backfill model: reshapes transaction rows from
+-- bronze__transactions_backfill into the TRANSACTION_JSON layout built below.
+-- OBJECT_CONSTRUCT leaves out any key whose value is NULL, so a rebuilt
+-- object can lack a key when the backfill row has no value for it.
+WITH raw_transactions AS (
+    -- ref() instead of a hard-coded sui_dev relation: dbt resolves the
+    -- target database and builds the bronze view before this table.
+    SELECT
+        PARSE_JSON(VALUE) AS value_json
+    FROM {{ ref('bronze__transactions_backfill') }}
+),
+
+-- Parse the embedded effects / transaction JSON once per row so every CTE
+-- below reads the same parsed documents.
+parsed_new_data AS (
+    SELECT
+        value_json,
+        value_json:transaction_digest::STRING AS tx_digest,
+        value_json:sender::STRING AS sender,
+        PARSE_JSON(value_json:effects_json):V2 AS effects_v2,
+        PARSE_JSON(value_json:transaction_json):data[0]:intent_message:value:V1 AS tx_v1,
+        PARSE_JSON(value_json:transaction_json):data[0]:tx_signatures AS tx_signatures
+    FROM raw_transactions
+),
+
+-- One row per entry of effects V2 changed_objects, carrying the pieces that
+-- the modifiedAtVersions, mutated, objectChanges and sharedObjects arrays share.
+changed_objects AS (
+    SELECT
+        p.tx_digest,
+        p.sender,
+        p.effects_v2:lamport_version AS lamport_version,
+        obj.index AS obj_index,
+        obj.value[0]::STRING AS object_id,
+        obj.value[1]:input_state:Exist AS input_exist,
+        obj.value[1]:output_state:ObjectWrite AS object_write,
+        -- Owner of the written object; falls back to the sender when the
+        -- write carries none of the three owner kinds checked here.
+        CASE
+            WHEN obj.value[1]:output_state:ObjectWrite[1]:Shared IS NOT NULL
+                THEN OBJECT_CONSTRUCT(
+                    'Shared', obj.value[1]:output_state:ObjectWrite[1]:Shared
+                )
+            WHEN obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner IS NOT NULL
+                THEN OBJECT_CONSTRUCT(
+                    'ObjectOwner', obj.value[1]:output_state:ObjectWrite[1]:ObjectOwner::STRING
+                )
+            WHEN obj.value[1]:output_state:ObjectWrite[1]:AddressOwner IS NOT NULL
+                THEN OBJECT_CONSTRUCT(
+                    'AddressOwner', obj.value[1]:output_state:ObjectWrite[1]:AddressOwner::STRING
+                )
+            ELSE OBJECT_CONSTRUCT('AddressOwner', p.sender)
+        END AS object_owner
+    FROM parsed_new_data p,
+        LATERAL FLATTEN(input => p.effects_v2:changed_objects) obj
+),
+
+-- Per-transaction arrays built from changed_objects. WITHIN GROUP keeps the
+-- entries in changed_objects order; a bare ARRAY_AGG has no guaranteed order.
+changed_object_arrays AS (
+    SELECT
+        tx_digest,
+        ARRAY_AGG(
+            OBJECT_CONSTRUCT(
+                'objectId', object_id,
+                'sequenceNumber', input_exist[0][0]::STRING
+            )
+        ) WITHIN GROUP (ORDER BY obj_index) AS modified_versions_array,
+        ARRAY_AGG(
+            OBJECT_CONSTRUCT(
+                'owner', object_owner,
+                'reference', OBJECT_CONSTRUCT(
+                    'digest', object_write[0]::STRING,
+                    'objectId', object_id,
+                    'version', lamport_version::NUMBER
+                )
+            )
+        ) WITHIN GROUP (ORDER BY obj_index) AS mutated_array,
+        ARRAY_AGG(
+            OBJECT_CONSTRUCT(
+                'digest', object_write[0]::STRING,
+                'objectId', object_id,
+                -- Placeholder: a real value would need an object type mapping
+                'objectType', 'unknown',
+                'owner', object_owner,
+                'previousVersion', input_exist[0][0]::STRING,
+                'sender', sender,
+                -- NOTE(review): every changed_objects entry is labelled 'mutated'
+                'type', 'mutated',
+                'version', lamport_version::STRING
+            )
+        ) WITHIN GROUP (ORDER BY obj_index) AS object_changes_array
+    FROM changed_objects
+    GROUP BY tx_digest
+),
+
+-- Shared objects: unchanged ones (ReadOnlyRoot) first, then changed objects
+-- whose input state was Shared.
+shared_objects AS (
+    SELECT
+        tx_digest,
+        ARRAY_AGG(shared_obj) WITHIN GROUP (ORDER BY source_rank, obj_index) AS shared_objects_array
+    FROM (
+        SELECT
+            p.tx_digest,
+            1 AS source_rank,
+            ush.index AS obj_index,
+            OBJECT_CONSTRUCT(
+                'digest', ush.value[1]:ReadOnlyRoot[1]::STRING,
+                'objectId', ush.value[0]::STRING,
+                'version', ush.value[1]:ReadOnlyRoot[0]::NUMBER
+            ) AS shared_obj
+        FROM parsed_new_data p,
+            LATERAL FLATTEN(input => p.effects_v2:unchanged_shared_objects) ush
+
+        UNION ALL
+
+        SELECT
+            tx_digest,
+            2 AS source_rank,
+            obj_index,
+            OBJECT_CONSTRUCT(
+                'digest', input_exist[0][1]::STRING,
+                'objectId', object_id,
+                'version', input_exist[0][0]::NUMBER
+            ) AS shared_obj
+        FROM changed_objects
+        WHERE input_exist[1]:Shared IS NOT NULL
+    ) AS shared_union
+    GROUP BY tx_digest
+),
+
+-- Inputs of a ProgrammableTransaction, aggregated in source order
+-- (inp.index) so array positions match the original inputs list.
+transaction_inputs AS (
+    SELECT
+        p.tx_digest,
+        ARRAY_AGG(
+            CASE
+                WHEN inp.value:Object:SharedObject IS NOT NULL
+                    THEN OBJECT_CONSTRUCT(
+                        'initialSharedVersion', inp.value:Object:SharedObject:initial_shared_version::STRING,
+                        'mutable', inp.value:Object:SharedObject:mutable::BOOLEAN,
+                        'objectId', inp.value:Object:SharedObject:id::STRING,
+                        'objectType', 'sharedObject',
+                        'type', 'object'
+                    )
+                WHEN inp.value:Pure IS NOT NULL
+                    -- Pure payloads are not decoded here: pass the raw value
+                    -- through rather than a hard-coded sample value / type.
+                    THEN OBJECT_CONSTRUCT(
+                        'type', 'pure',
+                        'value', inp.value:Pure
+                    )
+                ELSE OBJECT_CONSTRUCT('type', 'unknown')
+            END
+        ) WITHIN GROUP (ORDER BY inp.index) AS inputs_array
+    FROM parsed_new_data p,
+        LATERAL FLATTEN(input => p.tx_v1:kind:ProgrammableTransaction:inputs) inp
+    GROUP BY p.tx_digest
+),
+
+-- Commands of a ProgrammableTransaction, aggregated in source order
+-- (cmd.index). MoveCall is rebuilt key by key; other commands pass through.
+transaction_commands AS (
+    SELECT
+        p.tx_digest,
+        ARRAY_AGG(
+            CASE
+                WHEN cmd.value:MoveCall IS NOT NULL
+                    THEN OBJECT_CONSTRUCT(
+                        'MoveCall', OBJECT_CONSTRUCT(
+                            'arguments', cmd.value:MoveCall:arguments,
+                            'function', cmd.value:MoveCall:function::STRING,
+                            'module', cmd.value:MoveCall:module::STRING,
+                            'package', cmd.value:MoveCall:package::STRING,
+                            -- type_arguments keep their source format
+                            'type_arguments', cmd.value:MoveCall:type_arguments
+                        )
+                    )
+                ELSE cmd.value
+            END
+        ) WITHIN GROUP (ORDER BY cmd.index) AS commands_array
+    FROM parsed_new_data p,
+        LATERAL FLATTEN(input => p.tx_v1:kind:ProgrammableTransaction:commands) cmd
+    GROUP BY p.tx_digest
+),
+
+transformed_data AS (
+    SELECT
+        p.value_json:checkpoint::NUMBER AS CHECKPOINT_NUMBER,
+        p.tx_digest AS TX_DIGEST,
+
+        -- timestamp_ms is divided by 1000 to get seconds for TO_TIMESTAMP
+        TO_TIMESTAMP(p.value_json:timestamp_ms::NUMBER / 1000) AS BLOCK_TIMESTAMP,
+
+        -- Partition key: checkpoint rounded down to the nearest 10000
+        FLOOR(p.value_json:checkpoint::NUMBER / 10000) * 10000 AS PARTITION_KEY,
+
+        -- Rebuild TRANSACTION_JSON using the key names of the existing format
+        OBJECT_CONSTRUCT(
+            -- Only the sender's gas charge is represented; no other balance
+            -- changes are derived from the backfill row.
+            'balanceChanges',
+            CASE
+                WHEN p.value_json:total_gas_cost::NUMBER > 0
+                    THEN ARRAY_CONSTRUCT(
+                        OBJECT_CONSTRUCT(
+                            'amount', '-' || p.value_json:total_gas_cost::STRING,
+                            'coinType', '0x2::sui::SUI',
+                            'owner', OBJECT_CONSTRUCT('AddressOwner', p.sender)
+                        )
+                    )
+                ELSE ARRAY_CONSTRUCT()
+            END,
+
+            'checkpoint', p.value_json:checkpoint::STRING,
+            'digest', p.tx_digest,
+
+            'effects', OBJECT_CONSTRUCT(
+                'dependencies', COALESCE(p.effects_v2:dependencies, ARRAY_CONSTRUCT()),
+                'eventsDigest', p.value_json:events_digest::STRING,
+                'executedEpoch', p.value_json:epoch::STRING,
+                'gasObject', OBJECT_CONSTRUCT(
+                    'owner', OBJECT_CONSTRUCT(
+                        'AddressOwner', p.value_json:gas_owner::STRING
+                    ),
+                    'reference', OBJECT_CONSTRUCT(
+                        'digest', p.value_json:gas_object_digest::STRING,
+                        'objectId', p.value_json:gas_object_id::STRING,
+                        'version', p.value_json:gas_object_sequence::NUMBER
+                    )
+                ),
+                'gasUsed', OBJECT_CONSTRUCT(
+                    'computationCost', p.value_json:computation_cost::STRING,
+                    'nonRefundableStorageFee', p.value_json:non_refundable_storage_fee::STRING,
+                    'storageCost', p.value_json:storage_cost::STRING,
+                    'storageRebate', p.value_json:storage_rebate::STRING
+                ),
+                'messageVersion', 'v1',
+                'modifiedAtVersions', COALESCE(coa.modified_versions_array, ARRAY_CONSTRUCT()),
+                'mutated', COALESCE(coa.mutated_array, ARRAY_CONSTRUCT()),
+                'sharedObjects', COALESCE(so.shared_objects_array, ARRAY_CONSTRUCT()),
+                -- A missing execution_success is reported as 'failure'
+                'status', OBJECT_CONSTRUCT(
+                    'status',
+                    CASE
+                        WHEN p.value_json:execution_success::BOOLEAN THEN 'success'
+                        ELSE 'failure'
+                    END
+                ),
+                'transactionDigest', p.tx_digest
+            ),
+
+            -- Events are not in the backfill row; they need a separate source
+            'events', ARRAY_CONSTRUCT(),
+
+            'objectChanges', COALESCE(coa.object_changes_array, ARRAY_CONSTRUCT()),
+
+            'timestampMs', p.value_json:timestamp_ms::STRING,
+
+            'transaction', OBJECT_CONSTRUCT(
+                'data', OBJECT_CONSTRUCT(
+                    'gasData', OBJECT_CONSTRUCT(
+                        'budget', p.value_json:gas_budget::STRING,
+                        'owner', p.value_json:gas_owner::STRING,
+                        -- Prefer the first payment entry of the transaction
+                        -- JSON; fall back to the gas_object_* fields.
+                        'payment', ARRAY_CONSTRUCT(
+                            OBJECT_CONSTRUCT(
+                                'digest', COALESCE(
+                                    p.tx_v1:gas_data:payment[0][2]::STRING,
+                                    p.value_json:gas_object_digest::STRING
+                                ),
+                                'objectId', p.value_json:gas_object_id::STRING,
+                                'version', COALESCE(
+                                    p.tx_v1:gas_data:payment[0][1]::NUMBER,
+                                    p.value_json:gas_object_sequence::NUMBER
+                                )
+                            )
+                        ),
+                        'price', p.value_json:gas_price::STRING
+                    ),
+                    'messageVersion', 'v1',
+                    'sender', p.sender,
+                    'transaction',
+                    CASE
+                        WHEN p.tx_v1:kind:ProgrammableTransaction IS NOT NULL
+                            THEN OBJECT_CONSTRUCT(
+                                'inputs', COALESCE(ti.inputs_array, ARRAY_CONSTRUCT()),
+                                'kind', 'ProgrammableTransaction',
+                                'transactions', COALESCE(tc.commands_array, ARRAY_CONSTRUCT())
+                            )
+                        ELSE OBJECT_CONSTRUCT()
+                    END
+                ),
+                'txSignatures', COALESCE(p.tx_signatures, ARRAY_CONSTRUCT())
+            )
+        ) AS TRANSACTION_JSON,
+
+        CURRENT_TIMESTAMP() AS _INSERTED_TIMESTAMP,
+
+        -- Surrogate key derived from the transaction digest
+        HASH(p.tx_digest) AS TRANSACTIONS_ID,
+
+        CURRENT_TIMESTAMP() AS INSERTED_TIMESTAMP,
+        CURRENT_TIMESTAMP() AS MODIFIED_TIMESTAMP
+    FROM parsed_new_data p
+    LEFT JOIN changed_object_arrays coa
+        ON p.tx_digest = coa.tx_digest
+    LEFT JOIN shared_objects so
+        ON p.tx_digest = so.tx_digest
+    LEFT JOIN transaction_inputs ti
+        ON p.tx_digest = ti.tx_digest
+    LEFT JOIN transaction_commands tc
+        ON p.tx_digest = tc.tx_digest
+)
+
+SELECT
+    CHECKPOINT_NUMBER,
+    TX_DIGEST,
+    BLOCK_TIMESTAMP,
+    PARTITION_KEY,
+    TRANSACTION_JSON,
+    _INSERTED_TIMESTAMP,
+    TRANSACTIONS_ID,
+    INSERTED_TIMESTAMP,
+    MODIFIED_TIMESTAMP
+FROM transformed_data