Merge pull request #18 from FlipsideCrypto/an-6563/sui_backfill_data

An 6563/sui backfill data
This commit is contained in:
eric-laurello 2025-10-14 09:53:36 -04:00 committed by GitHub
commit 47e4bcdf2c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 307 additions and 3 deletions

View File

@ -0,0 +1,16 @@
{{ config (
materialized = 'view'
) }}
SELECT
VALUE,
epoch,
DATA
FROM
{{ source(
'bronze_streamline',
'checkpoints_backfill'
) }}
WHERE
epoch <= 629
AND VALUE: sequence_number :: INT < 96605300

View File

@ -0,0 +1,16 @@
{{ config (
materialized = 'view'
) }}
SELECT
VALUE,
epoch,
DATA
FROM
{{ source(
'bronze_streamline',
'events'
) }}
WHERE
epoch <= 629
AND VALUE: checkpoint :: INT < 96605300

View File

@ -0,0 +1,16 @@
{{ config (
materialized = 'view'
) }}
SELECT
VALUE,
epoch,
DATA
FROM
{{ source(
'bronze_streamline',
'transactions_backfill'
) }}
WHERE
epoch <= 629
AND VALUE: checkpoint :: INT < 96605300

View File

@ -33,3 +33,23 @@ WHERE
FROM
{{ this }})
{% endif %}
{% if is_incremental() %}
{% else %}
UNION ALL
SELECT
checkpoint_number,
block_timestamp,
epoch,
checkpoint_digest,
previous_digest,
network_total_transactions,
validator_signature,
tx_count,
ARRAY_CONSTRUCT() AS transactions_array,
{{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__checkpoints_backfill') }}
{% endif %}

View File

@ -41,7 +41,34 @@ WHERE
FROM
{{ this }})
{% endif %}
)
{% if is_incremental() %}
{% else %}
UNION ALL
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
NULL AS event_value,
event_index,
TYPE,
package_id,
transaction_module,
sender,
parsed_json
FROM
{{ ref('silver__events_backfill') }} A
JOIN {{ ref('silver__transactions_backfill') }}
b USING (
checkpoint_number,
tx_digest
)
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,

View File

@ -49,7 +49,32 @@ WHERE
FROM
{{ this }})
{% endif %}
)
{% if is_incremental() %}
{% else %}
UNION ALL
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
tx_error,
tx_dependencies,
gas_used_computation_fee,
gas_used_non_refundable_storage_fee,
gas_used_storage_fee,
gas_used_storage_rebate,
gas_budget,
gas_owner,
gas_price,
tx_fee
FROM
{{ ref('silver__transaction_blocks_backfill') }}
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,

View File

@ -40,7 +40,25 @@ WHERE
FROM
{{ this }})
{% endif %}
)
{% if is_incremental() %}
{% else %}
UNION ALL
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
payload_index,
payload_type,
payload_details
FROM
{{ ref('silver__transactions_backfill') }}
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,

View File

@ -0,0 +1,26 @@
{{ config (
materialized = "table"
) }}
WITH parsed_checkpoint_data AS (
SELECT
PARSE_JSON(VALUE) AS value_json
FROM
{{ ref('bronze__checkpoints_backfill') }}
)
SELECT
value_json :sequence_number :: NUMBER AS checkpoint_number,
TO_TIMESTAMP(
value_json :timestamp_ms :: NUMBER / 1000
) AS block_timestamp,
value_json :epoch :: STRING AS epoch,
value_json :checkpoint_digest :: STRING AS checkpoint_digest,
value_json :previous_checkpoint_digest :: STRING previous_digest,
value_json :network_total_transaction :: STRING network_total_transactions,
value_json :validator_signature :: STRING AS validator_signature,
value_json :total_transaction_blocks :: INT AS tx_count,
SYSDATE() AS inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
parsed_checkpoint_data

View File

@ -0,0 +1,32 @@
{{ config (
materialized = "table"
) }}
WITH base AS (
SELECT
PARSE_JSON(
A.value
) AS value_json,
PARSE_JSON(
A.value :event_json
) AS event_json
FROM
{{ ref('bronze__events_backfill') }} A
)
SELECT
value_json :checkpoint :: INT AS checkpoint_number,
TO_TIMESTAMP(
value_json :timestamp_ms :: NUMBER / 1000
) AS block_timestamp,
value_json: transaction_digest :: STRING AS tx_digest,
value_json :event_index :: INT AS event_index,
value_json :"package" :: STRING AS package_id,
value_json :"module" :: STRING AS transaction_module,
value_json :"sender" :: STRING AS sender,
value_json :"event_type" :: STRING AS TYPE,
event_json AS parsed_json,
SYSDATE() AS inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base

View File

@ -0,0 +1,53 @@
{{ config (
materialized = "table"
) }}
WITH base AS (
SELECT
PARSE_JSON(VALUE) AS value_json,
PARSE_JSON(
VALUE :transaction_json
) AS transaction_json,
PARSE_JSON(
VALUE :effects_json
) AS efx_json
FROM
{{ ref('bronze__transactions_backfill') }}
)
SELECT
value_json :checkpoint :: NUMBER AS checkpoint_number,
TO_TIMESTAMP(
value_json :timestamp_ms :: NUMBER / 1000
) AS block_timestamp,
value_json: transaction_digest :: STRING AS tx_digest,
value_json: "transaction_kind" :: STRING AS tx_kind,
value_json :"sender" :: STRING AS tx_sender,
LOWER(
object_keys(
transaction_json :"data" [0] :"intent_message" :value
) [0]
) :: STRING AS message_version,
CASE
WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE
ELSE FALSE
END AS tx_succeeded,
efx_json :"V2" :"status" :"Failure" :"error" :: STRING AS tx_error,
efx_json: "V2" :"dependencies" AS tx_dependencies,
value_json :"computation_cost" :: bigint AS gas_used_computation_fee,
value_json: "non_refundable_storage_fee" :: bigint AS gas_used_non_refundable_storage_fee,
value_json: "storage_cost" :: bigint AS gas_used_storage_fee,
value_json: "storage_rebate" :: bigint AS gas_used_storage_rebate,
value_json :"gas_budget" :: bigint AS gas_budget,
value_json :"gas_owner" :: STRING AS gas_owner,
value_json :"gas_price" :: bigint AS gas_price,
(
gas_used_computation_fee + gas_used_storage_fee - gas_used_storage_rebate
) / pow(
10,
9
) AS tx_fee,
SYSDATE() AS inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base

View File

@ -0,0 +1,52 @@
{{ config (
materialized = "table"
) }}
WITH base AS (
SELECT
PARSE_JSON(
A.value
) AS value_json,
PARSE_JSON(
A.value :transaction_json
) AS transaction_json,
PARSE_JSON(
A.value :effects_json
) AS efx_json,
b.index AS payload_index,
C.key AS payload_type,
C.value AS payload_details
FROM
{{ ref('bronze__transactions_backfill') }} A,
LATERAL FLATTEN(
transaction_json :data [0] :intent_message :value :"V1" :kind :ProgrammableTransaction :commands
) b,
LATERAL FLATTEN(
b.value
) C
)
SELECT
value_json :checkpoint :: NUMBER AS checkpoint_number,
TO_TIMESTAMP(
value_json :timestamp_ms :: NUMBER / 1000
) AS block_timestamp,
value_json: transaction_digest :: STRING AS tx_digest,
value_json: "transaction_kind" :: STRING AS tx_kind,
value_json :"sender" :: STRING AS tx_sender,
LOWER(
object_keys(
transaction_json :"data" [0] :"intent_message" :value
) [0]
) :: STRING AS message_version,
CASE
WHEN efx_json :"V2" :"status" = 'Success' THEN TRUE
ELSE FALSE
END AS tx_succeeded,
payload_index,
payload_type,
payload_details,
SYSDATE() AS inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base

View File

@ -7,6 +7,9 @@ sources:
tables:
- name: checkpoints
- name: transactions
- name: checkpoints_backfill
- name: transactions_backfill
- name: events
- name: crosschain
database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}"