blocks migration + final

This commit is contained in:
Jack Forgash 2025-02-19 16:25:13 -07:00
parent 3e59c5031a
commit 925c66040c
11 changed files with 127 additions and 29 deletions

View File

@ -6,7 +6,7 @@
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE', '_partition_by_block_number'],
unique_key = 'block_id',
tags = ['load', 'load_blocks','scheduled_core'],
tags = ['load', 'load_blocks','scheduled_core', 'deprecated_lake_archive'],
full_refresh = False
) }}

View File

@ -5,7 +5,7 @@
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'receipt_id',
cluster_by = ['modified_timestamp::date', '_partition_by_block_number'],
tags = ['load', 'load_shards','scheduled_core']
tags = ['load', 'load_shards','scheduled_core', 'deprecated_lake_archive']
) }}
WITH shards AS (

View File

@ -6,7 +6,7 @@
unique_key = 'receipt_object_id',
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number', ],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,receiver_id,predecessor_id);",
tags = ['receipt_map','scheduled_core'],
tags = ['receipt_map','scheduled_core', 'deprecated_lake_archive'],
full_refresh = False
) }}

View File

@ -5,7 +5,7 @@
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['_inserted_timestamp::DATE', '_partition_by_block_number'],
unique_key = 'shard_id',
tags = ['load', 'load_shards','scheduled_core'],
tags = ['load', 'load_shards','scheduled_core', 'deprecated_lake_archive'],
full_refresh = False
) }}

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
unique_key = 'tx_hash',
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number'],
tags = ['receipt_map','scheduled_core']
tags = ['receipt_map','scheduled_core', 'deprecated_lake_archive']
) }}
WITH int_txs AS (

View File

@ -0,0 +1,24 @@
{#
    Ephemeral staging model for the blocks archive migration.

    Selects block rows from silver__streamline_blocks, renaming
    chunks -> chunks_json and header -> header_json; every other column is
    passed through under its existing name.

    When the BATCH_MIGRATE var is truthy, rows are restricted by the predicate
    rendered by partition_load_manual('no_buffer'); otherwise all rows are read.
#}
{{ config(
    materialized = 'ephemeral'
) }}

SELECT
    block_id,
    block_timestamp,
    block_hash,
    prev_hash,
    block_author,
    chunks AS chunks_json,
    header AS header_json,
    _partition_by_block_number,
    streamline_blocks_id,
    inserted_timestamp,
    modified_timestamp,
    _invocation_id
FROM
    {{ ref('silver__streamline_blocks') }}

{# Default to False so the model still compiles when BATCH_MIGRATE is not supplied. #}
{% if var("BATCH_MIGRATE", False) %}
WHERE
    {{ partition_load_manual('no_buffer') }}
{% endif %}

View File

@ -1,17 +1,27 @@
{{
config(
'materialized' = 'ephemeral'
)
}}
{# Keyword-argument names in a Jinja call must be bare identifiers; a quoted name does not parse. #}
{{ config(
    materialized = 'ephemeral'
) }}
SELECT
chunk_hash,
block_id,
block_timestamp,
tx_hash,
COALESCE(receipt_id, receipt_object_id) AS receipt_id,
COALESCE(
receipt_id,
receipt_object_id
) AS receipt_id,
receipt_actions AS receipt_json,
execution_outcome AS outcome_json,
_partition_by_block_number
_partition_by_block_number,
streamline_receipts_final_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
FROM
{{ ref('silver__streamline_receipts_final') }}
{% if var("BATCH_MIGRATE") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% endif %}

View File

@ -1,7 +1,7 @@
{{ config(
materialized = 'ephemeral'
) }}
-- likely need to add batch logic for the migration
WITH lake_transactions_final AS (
SELECT
@ -13,9 +13,18 @@ WITH lake_transactions_final AS (
gas_used,
transaction_fee,
attached_gas,
_partition_by_block_number
_partition_by_block_number,
streamline_transactions_final_id,
inserted_timestamp,
modified_timestamp,
_invocation_id
FROM
{{ ref('silver__streamline_transactions_final') }}
{% if var("BATCH_MIGRATE") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% endif %}
),
lake_transactions_int AS (
SELECT
@ -28,6 +37,11 @@ lake_transactions_int AS (
_partition_by_block_number
FROM
{{ ref('silver__streamline_transactions') }}
{% if var("BATCH_MIGRATE") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% endif %}
),
transaction_archive AS (
SELECT

View File

@ -0,0 +1,54 @@
{#
    Final blocks model, merged incrementally on block_id
    (inserted_timestamp is excluded from merge updates).

    The source is chosen at compile time by the NEAR_MIGRATE_ARCHIVE var
    (default False):
      - truthy: copy rows from silver__streamline_blocks, keeping their
        inserted_timestamp and reusing streamline_blocks_id as blocks_final_id.
      - otherwise: extract the block fields from block_json in
        silver__blocks_v2 and derive blocks_final_id from block_id.

    Both branches emit the same column names in the same order.

    NOTE(review): the receipts and transactions final models read from a
    _migrate_* ephemeral model instead of the streamline table directly;
    confirm whether this model should do the same.
    NOTE(review): neither branch has an is_incremental() filter, so each run
    reads the whole source relation - confirm that is intended.
#}
{{ config(
    materialized = 'incremental',
    incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
    incremental_strategy = 'merge',
    merge_exclude_columns = ['inserted_timestamp'],
    unique_key = 'block_id',
    cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number'],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(block_id);",
    tags = ['scheduled_core']
) }}

{% if var('NEAR_MIGRATE_ARCHIVE', False) %}

SELECT
    block_id,
    block_timestamp,
    block_hash,
    prev_hash,
    block_author,
    -- silver__streamline_blocks names these columns chunks / header,
    -- so they are aliased here to match the non-migration branch.
    chunks AS chunks_json,
    header AS header_json,
    _partition_by_block_number,
    streamline_blocks_id AS blocks_final_id,
    inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('silver__streamline_blocks') }}

{% else %}

WITH blocks AS (
    SELECT
        block_id,
        block_timestamp,
        block_hash,
        block_json :prev_hash :: STRING AS prev_hash,
        block_json :author :: STRING AS block_author,
        block_json :chunks :: ARRAY AS chunks_json,
        block_json :header :: OBJECT AS header_json,
        partition_key AS _partition_by_block_number
    FROM
        {{ ref('silver__blocks_v2') }}
)
SELECT
    block_id,
    block_timestamp,
    block_hash,
    prev_hash,
    block_author,
    chunks_json,
    header_json,
    _partition_by_block_number,
    {{ dbt_utils.generate_surrogate_key(
        ['block_id']
    ) }} AS blocks_final_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    blocks

{% endif %}

View File

@ -20,13 +20,13 @@
receipt_id,
receipt_json,
outcome_json,
_partition_by_block_number
_partition_by_block_number,
streamline_receipts_final_id AS receipts_final_id,
inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_receipts') }}
{% if var("BATCH_MIGRATE") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% endif %}
{% else %}
@ -59,7 +59,8 @@ flatten_receipts AS (
chunk_hash,
tx_hash,
tx_succeeded,
VALUE :: variant AS receipt_json
VALUE :: variant AS receipt_json,
_partition_by_block_number
FROM
txs_with_receipts,
LATERAL FLATTEN(
@ -88,7 +89,8 @@ receipts_full AS (
r.receipt_id,
receipt_json,
outcome_json,
tx_succeeded
tx_succeeded,
_partition_by_block_number
FROM
flatten_receipts r
LEFT JOIN flatten_receipt_outcomes ro

View File

@ -30,18 +30,12 @@
transaction_fee,
attached_gas,
_partition_by_block_number,
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
) }} AS transactions_final_id,
SYSDATE() AS inserted_timestamp,
streamline_transactions_final_id AS transactions_final_id,
inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_txs') }}
{% if var("BATCH_MIGRATE") %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% endif %}
{% else %}