diff --git a/models/bronze/backfill/bronze__checkpoints_backfill.sql b/models/bronze/backfill/bronze__checkpoints_backfill.sql new file mode 100644 index 0000000..8838021 --- /dev/null +++ b/models/bronze/backfill/bronze__checkpoints_backfill.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + epoch, + DATA +FROM + {{ source( + 'bronze_streamline', + 'checkpoints_backfill' + ) }} +WHERE + epoch <= 629 + AND VALUE: sequence_number :: INT < 96605300 diff --git a/models/bronze/backfill/bronze__events_backfill.sql b/models/bronze/backfill/bronze__events_backfill.sql new file mode 100644 index 0000000..7ce9fef --- /dev/null +++ b/models/bronze/backfill/bronze__events_backfill.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + epoch, + DATA +FROM + {{ source( + 'bronze_streamline', + 'events' + ) }} +WHERE + epoch <= 629 + AND VALUE: checkpoint :: INT < 96605300 diff --git a/models/bronze/backfill/bronze__transactions_backfill.sql b/models/bronze/backfill/bronze__transactions_backfill.sql new file mode 100644 index 0000000..2e801da --- /dev/null +++ b/models/bronze/backfill/bronze__transactions_backfill.sql @@ -0,0 +1,16 @@ +{{ config ( + materialized = 'view' +) }} + +SELECT + VALUE, + epoch, + DATA +FROM + {{ source( + 'bronze_streamline', + 'transactions_backfill' + ) }} +WHERE + epoch <= 629 + AND VALUE: checkpoint :: INT < 96605300 diff --git a/models/gold/core/core__fact_checkpoints.sql b/models/gold/core/core__fact_checkpoints.sql index eb8e2c8..1ae1b46 100644 --- a/models/gold/core/core__fact_checkpoints.sql +++ b/models/gold/core/core__fact_checkpoints.sql @@ -33,3 +33,23 @@ WHERE FROM {{ this }}) {% endif %} + +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + epoch, + checkpoint_digest, + previous_digest, + network_total_transactions, + validator_signature, + tx_count, + ARRAY_CONSTRUCT() AS transactions_array, + {{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp + FROM + {{ ref('silver__checkpoints_backfill') }} + {% endif %} diff --git a/models/gold/core/core__fact_events.sql b/models/gold/core/core__fact_events.sql index 2ad80d4..b943df9 100644 --- a/models/gold/core/core__fact_events.sql +++ b/models/gold/core/core__fact_events.sql @@ -41,7 +41,34 @@ WHERE FROM {{ this }}) {% endif %} - ) + +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + NULL AS event_value, + event_index, + TYPE, + package_id, + transaction_module, + sender, + parsed_json + FROM + {{ ref('silver__events_backfill') }} A + JOIN {{ ref('silver__transactions_backfill') }} + b USING ( + checkpoint_number, + tx_digest + ) + {% endif %} +) SELECT checkpoint_number, block_timestamp, diff --git a/models/gold/core/core__fact_transaction_blocks.sql b/models/gold/core/core__fact_transaction_blocks.sql index 31338b5..f8b67df 100644 --- a/models/gold/core/core__fact_transaction_blocks.sql +++ b/models/gold/core/core__fact_transaction_blocks.sql @@ -49,7 +49,32 @@ WHERE FROM {{ this }}) {% endif %} - ) + +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + tx_error, + tx_dependencies, + gas_used_computation_fee, + gas_used_non_refundable_storage_fee, + gas_used_storage_fee, + gas_used_storage_rebate, + gas_budget, + gas_owner, + gas_price, + tx_fee + FROM + {{ ref('silver__transaction_blocks_backfill') }} + {% endif %} +) SELECT checkpoint_number, block_timestamp, diff --git a/models/gold/core/core__fact_transactions.sql b/models/gold/core/core__fact_transactions.sql index 4759178..4abba78 100644 --- a/models/gold/core/core__fact_transactions.sql +++ b/models/gold/core/core__fact_transactions.sql @@ -40,7 +40,25 @@ WHERE FROM {{ this }}) {% endif %} - ) + +{% if is_incremental() %} +{% else %} + UNION ALL + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + payload_index, + payload_type, + payload_details + FROM + {{ ref('silver__transactions_backfill') }} + {% endif %} +) SELECT checkpoint_number, block_timestamp, diff --git a/models/silver/core/backfill/silver__checkpoints_backfill.sql b/models/silver/core/backfill/silver__checkpoints_backfill.sql new file mode 100644 index 0000000..4776955 --- /dev/null +++ b/models/silver/core/backfill/silver__checkpoints_backfill.sql @@ -0,0 +1,26 @@ +{{ config ( + materialized = "table" +) }} + +WITH parsed_checkpoint_data AS ( + + SELECT + PARSE_JSON(VALUE) AS value_json + FROM + {{ ref('bronze__checkpoints_backfill') }} +) +SELECT + value_json :sequence_number :: NUMBER AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json :epoch :: STRING AS epoch, + value_json :checkpoint_digest :: STRING AS checkpoint_digest, + value_json :previous_checkpoint_digest :: STRING previous_digest, + value_json :network_total_transaction :: STRING network_total_transactions, + value_json :validator_signature :: STRING AS validator_signature, + value_json :total_transaction_blocks :: INT AS tx_count, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + parsed_checkpoint_data diff --git a/models/silver/core/backfill/silver__events_backfill.sql b/models/silver/core/backfill/silver__events_backfill.sql new file mode 100644 index 0000000..0d7924e --- /dev/null +++ b/models/silver/core/backfill/silver__events_backfill.sql @@ -0,0 +1,32 @@ +{{ config ( + materialized = "table" +) }} + +WITH base AS ( + + SELECT + PARSE_JSON( + A.value + ) AS value_json, + PARSE_JSON( + A.value :event_json + ) AS event_json + FROM + {{ ref('bronze__events_backfill') }} A +) +SELECT + value_json :checkpoint :: INT AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json: transaction_digest :: STRING AS tx_digest, + value_json :event_index :: INT AS event_index, + value_json :"package" :: STRING AS package_id, + value_json :"module" :: STRING AS transaction_module, + value_json :"sender" :: STRING AS sender, + value_json :"event_type" :: STRING AS TYPE, + event_json AS parsed_json, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + base diff --git a/models/silver/core/backfill/silver__transaction_blocks_backfill.sql b/models/silver/core/backfill/silver__transaction_blocks_backfill.sql new file mode 100644 index 0000000..457cef2 --- /dev/null +++ b/models/silver/core/backfill/silver__transaction_blocks_backfill.sql @@ -0,0 +1,53 @@ +{{ config ( + materialized = "table" +) }} + +WITH base AS ( + + SELECT + PARSE_JSON(VALUE) AS value_json, + PARSE_JSON( + VALUE :transaction_json + ) AS transaction_json, + PARSE_JSON( + VALUE :effects_json + ) AS efx_json + FROM + {{ ref('bronze__transactions_backfill') }} +) +SELECT + value_json :checkpoint :: NUMBER AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json: transaction_digest :: STRING AS tx_digest, + value_json: "transaction_kind" :: STRING AS tx_kind, + value_json :"sender" :: STRING AS tx_sender, + LOWER( + object_keys( + transaction_json :"data" [0] :"intent_message" :value + ) [0] + ) :: STRING AS message_version, + CASE + WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE + ELSE FALSE + END AS tx_succeeded, + efx_json :"V2" :"status" :"Failure" :"error" :: STRING AS tx_error, + efx_json: "V2" :"dependencies" AS tx_dependencies, + value_json :"computation_cost" :: bigint AS gas_used_computation_fee, + value_json: "non_refundable_storage_fee" :: bigint AS gas_used_non_refundable_storage_fee, + value_json: "storage_cost" :: bigint AS gas_used_storage_fee, + value_json: "storage_rebate" :: bigint AS gas_used_storage_rebate, + value_json :"gas_budget" :: bigint AS gas_budget, + value_json :"gas_owner" :: STRING AS gas_owner, + value_json :"gas_price" :: bigint AS gas_price, + ( + gas_used_computation_fee + gas_used_storage_fee - gas_used_storage_rebate + ) / pow( + 10, + 9 + ) AS tx_fee, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + base diff --git a/models/silver/core/backfill/silver__transactions_backfill.sql b/models/silver/core/backfill/silver__transactions_backfill.sql new file mode 100644 index 0000000..c0f806e --- /dev/null +++ b/models/silver/core/backfill/silver__transactions_backfill.sql @@ -0,0 +1,52 @@ +{{ config ( + materialized = "table" +) }} + +WITH base AS ( + + SELECT + PARSE_JSON( + A.value + ) AS value_json, + PARSE_JSON( + A.value :transaction_json + ) AS transaction_json, + PARSE_JSON( + A.value :effects_json + ) AS efx_json, + b.index AS payload_index, + C.key AS payload_type, + C.value AS payload_details + FROM + {{ ref('bronze__transactions_backfill') }} A, + LATERAL FLATTEN( + transaction_json :data [0] :intent_message :value :"V1" :kind :ProgrammableTransaction :commands + ) b, + LATERAL FLATTEN( + b.value + ) C +) +SELECT + value_json :checkpoint :: NUMBER AS checkpoint_number, + TO_TIMESTAMP( + value_json :timestamp_ms :: NUMBER / 1000 + ) AS block_timestamp, + value_json: transaction_digest :: STRING AS tx_digest, + value_json: "transaction_kind" :: STRING AS tx_kind, + value_json :"sender" :: STRING AS tx_sender, + LOWER( + object_keys( + transaction_json :"data" [0] :"intent_message" :value + ) [0] + ) :: STRING AS message_version, + CASE + WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE + ELSE FALSE + END AS tx_succeeded, + payload_index, + payload_type, + payload_details, + SYSDATE() AS inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + base diff --git a/models/sources.yml b/models/sources.yml index fd150b0..5b5dd19 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -7,6 +7,9 @@ sources: tables: - name: checkpoints - name: transactions + - name: checkpoints_backfill + - name: transactions_backfill + - name: events - name: crosschain database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}"