Merge pull request #458 from FlipsideCrypto/AN-5979-1-model-cleanup

AN-5979/pt 1/rm lake, migration models. reorg curated into silver
This commit is contained in:
Jack Forgash 2025-04-30 12:31:41 -06:00 committed by GitHub
commit bc1bc886e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
86 changed files with 29 additions and 1910 deletions

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = False,
tags = ['observability', 'deprecated']
tags = ['observability', 'deprecated'],
enabled = false
) }}
-- TODO this can be deprecated. Not a good metric of completeness.
WITH summary_stats AS (

View File

@ -1,7 +0,0 @@
# Near Lake Silver Models
The models in this directory were used to ingest and process the blocks and shards from the Near Lake Indexer.
They are being deprecated as of February 2025 in favor of Streamline 2.
TODO: approximately 30-60 days after the migration completes, these models will be deleted and the associated tables will likely be dropped from Snowflake.

View File

@ -1,18 +0,0 @@
-- Recently-landed receipts that are still missing a tx_hash or a
-- block_timestamp; downstream receipt-map models re-attempt mapping
-- for exactly these rows. Ephemeral: inlined into consumers as a CTE.
{{ config(
    materialized = 'ephemeral',
    tags = ['helper', 'receipt_map', 'deprecated']
) }}
WITH recent_unmapped AS (
    SELECT
        receipt_object_id,
        block_id,
        _partition_by_block_number,
        _inserted_timestamp
    FROM
        {{ target.database }}.silver.streamline_receipts_final
    -- only look back three days; older gaps are handled manually
    WHERE
        _inserted_timestamp >= SYSDATE() - INTERVAL '3 days'
        AND (
            block_timestamp IS NULL
            OR tx_hash IS NULL
        )
)
SELECT
    receipt_object_id,
    block_id,
    _partition_by_block_number,
    _inserted_timestamp
FROM
    recent_unmapped

View File

@ -1,41 +0,0 @@
-- Flattens each receipt's outcome_receipts array into (parent, child) edges.
-- One row per child receipt id produced by a parent receipt's execution
-- outcome; used to recursively walk receipt trees in the tx-hash mapping.
{{ config(
materialized = 'view',
tags = ['helper', 'receipt_map', 'deprecated']
) }}
WITH receipts AS (
SELECT
-- parent receipt id; each flattened array element is a child receipt id
A.receipt_id PARENT,
b.value :: STRING item,
block_id,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_receipts') }} A
-- outer => TRUE keeps parents with an empty outcome_receipts array (item NULL)
JOIN LATERAL FLATTEN(
A.outcome_receipts,
outer => TRUE
) b
{% if var("MANUAL_FIX") %}
WHERE
{{ partition_load_manual('front') }}
{% else %}
-- look back RECEIPT_MAP_LOOKBACK_HOURS worth of partitions (~3000 blocks/hour)
-- from either the oldest retry candidate or the newest loaded partition,
-- whichever is older
WHERE
_partition_by_block_number >= (
SELECT
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
(
SELECT MIN(_partition_by_block_number) AS _partition_by_block_number FROM {{ ref('_retry_range') }}
UNION ALL
SELECT MAX(_partition_by_block_number) AS _partition_by_block_number FROM {{ target.database }}.silver.streamline_receipts_final
)
)
{% endif %}
)
SELECT
*
FROM
receipts

View File

@ -1,76 +0,0 @@
{# Maps every receipt in a transaction's receipt tree to its originating
   tx_hash by recursively walking the (parent, child) receipt edges from
   silver__flatten_receipts, anchored at each transaction's primary
   (first outcome) receipt. #}
{{ config(
    -- fixed typo: key was previously spelled `materalized`, so dbt silently
    -- ignored it and fell back to the project default materialization
    materialized = 'view',
    -- unique_key is a no-op on a view; retained for config compatibility
    unique_key = 'receipt_id',
    tags = ['helper', 'receipt_map', 'deprecated']
) }}
WITH
-- recursively expand each parent receipt into all of its descendants,
-- carrying the original ancestor id down the tree
recursive ancestrytree AS (
    SELECT
        item,
        PARENT
    FROM
        {{ ref('silver__flatten_receipts') }}
    WHERE
        PARENT IS NOT NULL
    UNION ALL
    SELECT
        items.item,
        t.parent
    FROM
        ancestrytree t
        JOIN {{ ref('silver__flatten_receipts') }} items
        ON t.item = items.parent
),
-- transactions within the receipt-map lookback window
txs AS (
    SELECT
        tx_hash,
        outcome_receipts,
        _partition_by_block_number
    FROM
        {{ ref('silver__streamline_transactions') }}
    {% if var("MANUAL_FIX") %}
    WHERE
        {{ partition_load_manual('front') }}
    {% else %}
    -- look back RECEIPT_MAP_LOOKBACK_HOURS of partitions (~3000 blocks/hour)
    WHERE
        _partition_by_block_number >= (
            SELECT
                MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
            FROM
                (
                    SELECT MIN(_partition_by_block_number) AS _partition_by_block_number FROM {{ ref('_retry_range') }}
                    UNION ALL
                    SELECT MAX(_partition_by_block_number) AS _partition_by_block_number FROM {{ target.database }}.silver.streamline_receipts_final
                )
        )
    {% endif %}
),
FINAL AS (
    -- descendant receipts: anchor the tree walk at the tx's first outcome receipt
    SELECT
        tx_hash,
        A.item,
        FALSE is_primary_receipt
    FROM
        ancestrytree A
        JOIN txs b
        ON A.parent = b.outcome_receipts [0] :: STRING
    WHERE
        item IS NOT NULL
    UNION ALL
    -- the primary receipt itself (columns align positionally with the branch above)
    SELECT
        A.tx_hash,
        outcome_receipts [0] :: STRING AS receipt_id,
        TRUE is_primary_receipt
    FROM
        txs A
)
SELECT
    tx_hash,
    item AS receipt_id,
    is_primary_receipt
FROM
    FINAL

View File

@ -1,106 +0,0 @@
-- Deprecated Process
-- Loads raw NEAR block JSON from the streamline external table into a typed
-- incremental silver table, deduplicated on block_id.
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE', '_partition_by_block_number'],
unique_key = 'block_id',
tags = ['load', 'load_blocks', 'deprecated'],
full_refresh = False
) }}
-- raw JSON rows from the external stage, restricted to recent partitions
-- (~3000 blocks per hour of lookback)
WITH external_blocks AS (
SELECT
metadata$filename AS _filename,
VALUE,
_partition_by_block_number
FROM
{{ source(
"streamline",
"blocks"
) }}
WHERE
_partition_by_block_number >= (
SELECT
MAX(_partition_by_block_number) - (3000 * {{ var('STREAMLINE_LOAD_LOOKBACK_HOURS') }})
FROM
{{ this }}
)
),
-- file registration history supplies the true load timestamp for each file
meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name AS _filename
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD(
'hour',
-{{ var('STREAMLINE_LOAD_LOOKBACK_HOURS') }},
SYSDATE()
),
table_name => '{{ source( 'streamline', 'blocks' ) }}'
)
) A
),
-- project the block header JSON into typed columns
blocks AS (
SELECT
e.value :header :height :: NUMBER AS block_id,
TO_TIMESTAMP_NTZ(
e.value :header :timestamp :: STRING
) AS block_timestamp,
e.value :header :hash :: STRING AS block_hash,
e.value :header :prev_hash :: STRING AS prev_hash,
e.value :author :: STRING AS block_author,
e.value :header :gas_price :: NUMBER AS gas_price,
e.value :header :total_supply :: NUMBER AS total_supply,
e.value :header :validator_proposals :: ARRAY AS validator_proposals,
e.value :header :validator_reward :: NUMBER AS validator_reward,
e.value :header :latest_protocol_version :: NUMBER AS latest_protocol_version,
e.value :header :epoch_id :: STRING AS epoch_id,
e.value :header :next_epoch_id :: STRING AS next_epoch_id,
NULL AS tx_count,
-- tx_count is legacy field, deprecate from core view
[] AS events,
-- events does not exist, Figment created this
e.value :chunks :: ARRAY AS chunks,
e.value :header :: OBJECT AS header,
e._partition_by_block_number,
m._inserted_timestamp
FROM
external_blocks e
-- LEFT JOIN: files not yet in registration history get a NULL _inserted_timestamp
LEFT JOIN meta m USING (
_filename
)
{% if not var('MANUAL_FIX') %}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['block_id']
) }} AS streamline_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
blocks
-- keep only the most recently inserted row per block_id
qualify ROW_NUMBER() over (
PARTITION BY block_id
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,73 +0,0 @@
# Column documentation for the deprecated silver__streamline_blocks model.
version: 2
models:
  - name: silver__streamline_blocks
    description: |-
      Parses the blocks JSON files for NEAR.
    columns:
      - name: BLOCK_ID
        description: "{{ doc('block_id')}}"
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp')}}"
      - name: BLOCK_HASH
        description: "{{ doc('block_hash')}}"
      - name: PREV_HASH
        description: "{{ doc('prev_hash')}}"
      - name: BLOCK_AUTHOR
        description: "{{ doc('block_author')}}"
      - name: GAS_PRICE
        description: "{{ doc('gas_price')}}"
      - name: TOTAL_SUPPLY
        description: "{{ doc('total_supply')}}"
      - name: VALIDATOR_PROPOSALS
        description: "{{ doc('validator_proposals')}}"
      - name: VALIDATOR_REWARD
        description: "{{ doc('validator_reward')}}"
      - name: LATEST_PROTOCOL_VERSION
        description: "{{ doc('latest_protocol_version')}}"
      - name: EPOCH_ID
        description: "{{ doc('epoch_id')}}"
      - name: NEXT_EPOCH_ID
        description: "{{ doc('next_epoch_id')}}"
      - name: TX_COUNT
        description: "{{ doc('tx_count')}}"
      - name: EVENTS
        description: "{{ doc('events')}}"
      - name: CHUNKS
        description: "{{ doc('chunks')}}"
      - name: HEADER
        description: "{{ doc('header')}}"
      - name: _PARTITION_BY_BLOCK_NUMBER
        description: "{{ doc('_partition_by_block_number')}}"
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp')}}"
      - name: STREAMLINE_BLOCKS_ID
        description: "{{doc('id')}}"
      - name: INSERTED_TIMESTAMP
        description: "{{doc('inserted_timestamp')}}"
      - name: MODIFIED_TIMESTAMP
        description: "{{doc('modified_timestamp')}}"
      - name: _INVOCATION_ID
        description: "{{doc('invocation_id')}}"

View File

@ -1,127 +0,0 @@
-- Extracts individual action receipts (with execution outcomes) from shard
-- JSON, parses nested failure messages into typed error columns, and
-- deduplicates on receipt_id.
{{ config(
materialized = 'incremental',
incremental_predicates = ['DBT_INTERNAL_DEST._partition_by_block_number >= (select min(_partition_by_block_number) from ' ~ generate_tmp_view_name(this) ~ ')'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'receipt_id',
cluster_by = ['modified_timestamp::date', '_partition_by_block_number'],
tags = ['load', 'load_shards', 'deprecated']
) }}
-- shards that actually contain receipt execution outcomes
WITH shards AS (
SELECT
block_id,
shard_id,
receipt_execution_outcomes,
chunk :header :chunk_hash :: STRING AS chunk_hash,
chunk :header :shard_id :: INT AS shard_number,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_shards') }}
WHERE
ARRAY_SIZE(receipt_execution_outcomes) > 0
{% if var('MANUAL_FIX') %}
AND
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
-- one row per receipt execution outcome within each shard
flatten_receipts AS (
SELECT
concat_ws(
'-',
shard_id,
INDEX
) AS receipt_execution_outcome_id,
block_id,
shard_id,
shard_number,
chunk_hash,
INDEX AS receipt_outcome_execution_index,
VALUE :execution_outcome :: OBJECT AS execution_outcome,
VALUE :receipt :: OBJECT AS receipt,
VALUE :receipt :receipt_id :: STRING AS receipt_id,
VALUE :execution_outcome :id :: STRING AS receipt_outcome_id,
_partition_by_block_number,
_inserted_timestamp
FROM
shards,
LATERAL FLATTEN(
input => receipt_execution_outcomes
)
),
-- TODO review new keys like priority, input_data_ids, output_data_receivers
-- type the receipt fields and unwrap up to three levels of nested failure
-- objects into error_type_0/1/2 and a final error_message
FINAL AS (
SELECT
receipt :receipt_id :: STRING AS receipt_id,
block_id,
shard_id,
shard_number,
receipt_outcome_execution_index AS receipt_index,
chunk_hash,
receipt,
execution_outcome,
-- a receipt succeeded iff its outcome status carries no Failure key
execution_outcome :outcome :status :Failure IS NULL AS receipt_succeeded,
TRY_PARSE_JSON(
execution_outcome :outcome :status :Failure
) AS failure_message,
object_keys(
failure_message
) [0] :: STRING AS error_type_0,
COALESCE(
object_keys(
TRY_PARSE_JSON(
failure_message [error_type_0] :kind
)
) [0] :: STRING,
failure_message [error_type_0] :kind :: STRING
) AS error_type_1,
COALESCE(
object_keys(
TRY_PARSE_JSON(
failure_message [error_type_0] :kind [error_type_1]
)
) [0] :: STRING,
failure_message [error_type_0] :kind [error_type_1] :: STRING
) AS error_type_2,
failure_message [error_type_0] :kind [error_type_1] [error_type_2] :: STRING AS error_message,
execution_outcome :outcome :receipt_ids :: ARRAY AS outcome_receipts,
receipt :predecessor_id :: STRING AS predecessor_id, -- TODO manual backfill of this col required
receipt :receiver_id :: STRING AS receiver_id,
receipt :receipt :Action :signer_id :: STRING AS signer_id,
LOWER(
object_keys(
receipt :receipt
) [0] :: STRING
) AS receipt_type,
_partition_by_block_number,
_inserted_timestamp
FROM
flatten_receipts
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['receipt_id']
) }} AS streamline_receipts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
-- dedupe on receipt_id, preferring rows whose chunk-header shard number
-- matches the shard_id suffix, then the most recently modified row
QUALIFY(row_number() over
(partition by receipt_id
order by shard_number = split(shard_id, '-')[1] :: INT desc, modified_timestamp desc
)) = 1

View File

@ -1,118 +0,0 @@
# Column documentation and tests for the deprecated silver__streamline_receipts model.
version: 2
models:
  - name: silver__streamline_receipts
    description: |-
      Singular receipt objects with the shard id and chunk hash from which it was included.
      These receipts are only action receipts from the execution outcome of the shard.
    columns:
      - name: RECEIPT_ID
        description: "{{ doc('receipt_id')}}"
        tests:
          - not_null
          - unique:
              # known duplicate receipt id excluded from the uniqueness test
              where: receipt_id != 'FA9zcm7WkWxdjkub7WFiKkQdnnQrcEmBht94VFzXfkm1'
      - name: BLOCK_ID
        description: "{{ doc('block_id')}}"
        tests:
          - not_null
      - name: SHARD_ID
        description: "{{ doc('shard_id')}}"
        tests:
          - not_null
      - name: RECEIPT_INDEX
        description: "{{ doc('receipt_index')}}"
        tests:
          - not_null
      - name: CHUNK_HASH
        description: "{{ doc('chunk_hash')}}"
        tests:
          - not_null:
              # known bad partition excluded
              where: _partition_by_block_number != 34690000
      - name: RECEIPT
        description: "{{ doc('receipt')}}"
      - name: EXECUTION_OUTCOME
        description: "{{ doc('execution_outcome')}}"
      - name: RECEIPT_SUCCEEDED
        description: "{{ doc('receipt_succeeded')}}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - BOOLEAN
      - name: FAILURE_MESSAGE
      - name: ERROR_TYPE_0
        description: "{{ doc('error_type_0')}}"
        tests:
          - not_null:
              where: NOT RECEIPT_SUCCEEDED
      - name: ERROR_TYPE_1
        description: "{{ doc('error_type_1')}}"
        tests:
          - not_null:
              where: NOT RECEIPT_SUCCEEDED
      - name: ERROR_TYPE_2
        description: "{{ doc('error_type_2')}}"
        tests:
          - not_null:
              # delegate-action failures legitimately have no third error level
              where: NOT RECEIPT_SUCCEEDED AND ERROR_TYPE_1 NOT IN ('DelegateActionExpired', 'DelegateActionInvalidSignature')
      - name: ERROR_MESSAGE
        description: "{{ doc('error_message')}}"
      - name: OUTCOME_RECEIPTS
        description: "{{ doc('receipt_outcome_id')}}"
        tests:
          - not_null
      - name: PREDECESSOR_ID
        description: "{{ doc('predecessor_id')}}"
      - name: RECEIVER_ID
        description: "{{ doc('receiver_id')}}"
        tests:
          - not_null
      - name: SIGNER_ID
        description: "{{ doc('signer_id')}}"
        tests:
          - not_null
      - name: RECEIPT_TYPE
        description: "{{ doc('receipt_type')}}"
        tests:
          - not_null
      - name: _PARTITION_BY_BLOCK_NUMBER
        description: "{{ doc('_partition_by_block_number')}}"
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp')}}"
      - name: STREAMLINE_RECEIPTS_ID
        description: "{{doc('id')}}"
        tests:
          - not_null
          - unique:
              where: receipt_id != 'FA9zcm7WkWxdjkub7WFiKkQdnnQrcEmBht94VFzXfkm1'
      - name: INSERTED_TIMESTAMP
        description: "{{doc('inserted_timestamp')}}"
      - name: MODIFIED_TIMESTAMP
        description: "{{doc('modified_timestamp')}}"
      - name: _INVOCATION_ID
        description: "{{doc('invocation_id')}}"

View File

@ -1,176 +0,0 @@
-- Joins receipts to their tx_hash (via the recursive receipt-tx mapping) and
-- block_timestamp, retrying rows flagged by _retry_range. Unique on
-- receipt_object_id.
{{ config(
materialized = 'incremental',
incremental_predicates = ["COALESCE(DBT_INTERNAL_DEST.block_timestamp::DATE,'2099-12-31') >= (select min(block_timestamp::DATE) from " ~ generate_tmp_view_name(this) ~ ")"],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'receipt_object_id',
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number', ],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,receipt_id,receiver_id,predecessor_id);",
tags = ['receipt_map', 'deprecated'],
full_refresh = False
) }}
-- receipts recently loaded that still lack tx_hash/block_timestamp (ephemeral)
WITH retry_range AS (
SELECT
*
FROM
{{ ref('_retry_range')}}
),
-- source receipts within the lookback window, plus any explicit retry rows
base_receipts AS (
SELECT
receipt_id,
block_id,
shard_id,
receipt_index,
chunk_hash,
receipt,
execution_outcome,
outcome_receipts,
predecessor_id,
receiver_id,
signer_id,
receipt_type,
receipt_succeeded,
error_type_0,
error_type_1,
error_type_2,
error_message,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_receipts') }} r
{% if var('MANUAL_FIX') %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
-- lookback window: RECEIPT_MAP_LOOKBACK_HOURS (~3000 blocks/hour) behind the
-- older of (oldest retry candidate, newest partition already in this table)
WHERE
_partition_by_block_number >= (
SELECT
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
(
SELECT MIN(_partition_by_block_number) AS _partition_by_block_number FROM retry_range
UNION ALL
SELECT MAX(_partition_by_block_number) AS _partition_by_block_number FROM {{ this }}
)
)
AND (
{% if is_incremental() %}
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
OR
{% endif %}
-- always re-process receipts flagged for retry, regardless of recency
receipt_id IN (
SELECT
DISTINCT receipt_object_id
FROM
retry_range
)
)
{% endif %}
),
-- block timestamps for the same partition window
blocks AS (
SELECT
block_id,
block_timestamp,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_blocks') }}
{% if var('MANUAL_FIX') %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
_partition_by_block_number >= (
SELECT
MIN(_partition_by_block_number) - (3000 * {{ var('RECEIPT_MAP_LOOKBACK_HOURS') }})
FROM
(
SELECT MIN(_partition_by_block_number) AS _partition_by_block_number FROM retry_range
UNION ALL
SELECT MAX(_partition_by_block_number) AS _partition_by_block_number FROM {{ this }}
)
)
{% endif %}
),
-- attach tx_hash via the recursive receipt-to-transaction mapping;
-- INNER JOIN drops receipts whose mapping is not yet available (retried later)
append_tx_hash AS (
SELECT
m.tx_hash,
r.receipt_id,
r.block_id,
r.shard_id,
r.receipt_index,
r.chunk_hash,
r.receipt,
r.execution_outcome,
r.outcome_receipts,
r.predecessor_id,
r.receiver_id,
r.signer_id,
r.receipt_type,
r.receipt_succeeded,
r.error_type_0,
r.error_type_1,
r.error_type_2,
r.error_message,
r._partition_by_block_number,
r._inserted_timestamp
FROM
base_receipts r
INNER JOIN {{ ref('silver__receipt_tx_hash_mapping') }}
m USING (receipt_id)
),
-- attach block_timestamp and surface common outcome fields as columns
FINAL AS (
SELECT
tx_hash,
receipt_id AS receipt_object_id, -- TODO drop this col after manual backfill of just receipt_id
receipt_id,
r.block_id,
b.block_timestamp,
receipt_index,
chunk_hash,
receipt AS receipt_actions,
execution_outcome,
outcome_receipts AS receipt_outcome_id,
predecessor_id, -- TODO manual backfill of this col required
receiver_id,
signer_id,
receipt_type,
execution_outcome :outcome :gas_burnt :: NUMBER AS gas_burnt,
execution_outcome :outcome :status :: variant AS status_value,
execution_outcome :outcome :logs :: ARRAY AS logs,
execution_outcome :proof :: ARRAY AS proof,
execution_outcome :outcome :metadata :: variant AS metadata,
receipt_succeeded,
error_type_0,
error_type_1,
error_type_2,
error_message,
_partition_by_block_number,
_inserted_timestamp
FROM
append_tx_hash r
INNER JOIN blocks b USING (block_id)
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['receipt_object_id']
) }} AS streamline_receipts_final_id, -- todo also will need to update this hash later
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,120 +0,0 @@
# Column documentation and tests for the deprecated silver__streamline_receipts_final model.
version: 2
models:
  - name: silver__streamline_receipts_final
    description: |-
      Singular receipt objects with the shard id and chunk hash from which it was included.
    columns:
      - name: TX_HASH
        description: "{{ doc('tx_hash')}}"
        tests:
          - not_null
      - name: RECEIPT_OBJECT_ID
        description: "{{ doc('receipt_object_id')}}"
        tests:
          - not_null
      - name: RECEIPT_ID
        description: "{{ doc('receipt_id')}}"
      - name: BLOCK_ID
        description: "{{ doc('block_id')}}"
        tests:
          - not_null
      - name: BLOCK_TIMESTAMP
        description: "{{ doc('block_timestamp')}}"
        tests:
          - not_null
      - name: RECEIPT_INDEX
        description: "{{ doc('receipt_index')}}"
      - name: CHUNK_HASH
        description: "{{ doc('chunk_hash')}}"
        tests:
          - not_null:
              # known blocks with missing chunk hashes excluded
              where: "block_id not in (34691244, 34691277)"
      - name: RECEIPT_ACTIONS
        description: "{{ doc('receipt')}}"
      - name: EXECUTION_OUTCOME
        description: "{{ doc('execution_outcome')}}"
      - name: RECEIPT_OUTCOME_ID
        description: "{{ doc('receipt_outcome_id')}}"
      - name: PREDECESSOR_ID
        description: "{{ doc('predecessor_id')}}"
      - name: RECEIVER_ID
        description: "{{ doc('receiver_id')}}"
        tests:
          - not_null
      - name: SIGNER_ID
        description: "{{ doc('signer_id')}}"
        tests:
          - not_null
      - name: RECEIPT_TYPE
        description: "{{ doc('receipt_type')}}"
      - name: GAS_BURNT
        description: "{{ doc('gas_burnt')}}"
      - name: STATUS_VALUE
        description: "{{ doc('status_value')}}"
      - name: LOGS
        description: "{{ doc('logs')}}"
      - name: PROOF
        description: "{{ doc('proof')}}"
      - name: METADATA
        description: "{{ doc('metadata')}}"
      - name: RECEIPT_SUCCEEDED
        description: "{{ doc('receipt_succeeded')}}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - BOOLEAN
      - name: ERROR_TYPE_0
        description: "{{ doc('error_type_0')}}"
      - name: ERROR_TYPE_1
        description: "{{ doc('error_type_1')}}"
      - name: ERROR_TYPE_2
        description: "{{ doc('error_type_2')}}"
      - name: ERROR_MESSAGE
        description: "{{ doc('error_message')}}"
      - name: _PARTITION_BY_BLOCK_NUMBER
        description: "{{ doc('_partition_by_block_number')}}"
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp')}}"
      - name: STREAMLINE_RECEIPTS_FINAL_ID
        description: "{{doc('id')}}"
        tests:
          - not_null
          - unique
      - name: INSERTED_TIMESTAMP
        description: "{{doc('inserted_timestamp')}}"
      - name: MODIFIED_TIMESTAMP
        description: "{{doc('modified_timestamp')}}"
      - name: _INVOCATION_ID
        description: "{{doc('invocation_id')}}"

View File

@ -1,96 +0,0 @@
-- Loads raw NEAR shard JSON from the streamline external table, deriving
-- block_id and shard number from the file path and deduplicating on shard_id.
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
incremental_predicates = ['DBT_INTERNAL_DEST._partition_by_block_number >= (select min(_partition_by_block_number) from ' ~ generate_tmp_view_name(this) ~ ')'],
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['_inserted_timestamp::DATE', '_partition_by_block_number'],
unique_key = 'shard_id',
tags = ['load', 'load_shards', 'deprecated'],
full_refresh = False
) }}
-- raw JSON rows from the external stage, restricted to recent partitions
-- (~3000 blocks per hour of lookback)
WITH external_shards AS (
SELECT
metadata$filename AS _filename,
VALUE,
_partition_by_block_number
FROM
{{ source(
"streamline",
"shards"
) }}
WHERE
_partition_by_block_number >= (
SELECT
MAX(_partition_by_block_number) - (3000 * {{ var('STREAMLINE_LOAD_LOOKBACK_HOURS') }})
FROM
{{ this }}
)
),
-- file registration history supplies the true load timestamp per file
meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name AS _filename
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD(
'hour',
-{{ var('STREAMLINE_LOAD_LOOKBACK_HOURS') }},
SYSDATE()
),
table_name => '{{ source( 'streamline', 'shards' ) }}'
)
) A
),
-- derive identifiers from the file path and type the shard JSON fields
shards AS (
SELECT
e._filename,
-- first path segment of the filename is the block height
SPLIT(
e._filename,
'/'
) [0] :: NUMBER AS block_id,
-- NOTE(review): RIGHT(..., 1) keeps only the last character of the name
-- before the extension; assumes shard numbers are single-digit — confirm
RIGHT(SPLIT(e._filename, '.') [0], 1) :: NUMBER AS _shard_number,
concat_ws(
'-',
block_id :: STRING,
_shard_number :: STRING
) AS shard_id,
e.value :chunk :: VARIANT AS chunk,
e.value :receipt_execution_outcomes :: VARIANT AS receipt_execution_outcomes,
e.value :shard_id :: NUMBER AS shard_number,
e.value :state_changes :: VARIANT AS state_changes,
e._partition_by_block_number,
m._inserted_timestamp
FROM
external_shards e
-- LEFT JOIN: files not yet in registration history get a NULL _inserted_timestamp
LEFT JOIN meta m USING (_filename)
{% if not var('MANUAL_FIX') %}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['shard_id']
) }} AS streamline_shards_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
shards
-- keep only the most recently inserted row per shard_id
qualify ROW_NUMBER() over (
PARTITION BY shard_id
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,70 +0,0 @@
# Column documentation and tests for the deprecated silver__streamline_shards model.
version: 2
models:
  - name: silver__streamline_shards
    description: |-
      Parses the shards JSON files for NEAR.
    columns:
      - name: BLOCK_ID
        description: "{{ doc('block_id')}}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - INTEGER
      - name: SHARD_ID
        description: "{{ doc('shard_id')}}"
        tests:
          - not_null
          - unique
      - name: CHUNK
        description: "{{ doc('chunks')}}"
        tests:
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - VARIANT
                - OBJECT
      - name: RECEIPT_EXECUTION_OUTCOMES
        description: "{{ doc('receipt_execution_outcomes')}}"
      - name: SHARD_NUMBER
        description: "{{ doc('shard_number')}}"
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - INTEGER
      - name: STATE_CHANGES
        description: "{{ doc('state_changes')}}"
        tests:
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - ARRAY
                - VARIANT
      - name: _PARTITION_BY_BLOCK_NUMBER
        description: "{{ doc('_partition_by_block_number')}}"
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp')}}"
      - name: STREAMLINE_SHARDS_ID
        description: "{{doc('id')}}"
      - name: INSERTED_TIMESTAMP
        description: "{{doc('inserted_timestamp')}}"
      - name: MODIFIED_TIMESTAMP
        description: "{{doc('modified_timestamp')}}"
      - name: _INVOCATION_ID
        description: "{{doc('invocation_id')}}"

View File

@ -1,118 +0,0 @@
-- Extracts individual transactions from chunk JSON within each shard and
-- deduplicates on tx_hash.
{{ config(
materialized = 'incremental',
incremental_predicates = ['DBT_INTERNAL_DEST._partition_by_block_number >= (select min(_partition_by_block_number) from ' ~ generate_tmp_view_name(this) ~ ')'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
unique_key = 'tx_hash',
cluster_by = ['modified_timestamp::date', '_partition_by_block_number'],
tags = ['load', 'load_shards', 'deprecated']
) }}
-- shards whose chunk actually contains transactions
WITH chunks AS (
SELECT
block_id,
shard_id,
chunk,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_shards') }}
WHERE
array_size(chunk :transactions :: ARRAY) > 0
{% if var('MANUAL_FIX') %}
AND
{{ partition_load_manual('no_buffer') }}
{% else %}
{% if is_incremental() %}
AND modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
{% endif %}
),
-- one row per transaction within each chunk
flatten_transactions AS (
SELECT
VALUE :transaction :hash :: STRING AS tx_hash,
block_id,
shard_id,
chunk :header :shard_id :: INT AS shard_number,
INDEX AS transactions_index,
chunk :header :chunk_hash :: STRING AS chunk_hash,
VALUE :outcome :execution_outcome :outcome :receipt_ids :: ARRAY AS outcome_receipts,
VALUE AS tx,
_partition_by_block_number,
_inserted_timestamp
FROM
chunks,
LATERAL FLATTEN(
input => chunk :transactions :: ARRAY
)
),
-- surface commonly-used transaction fields as underscore-prefixed columns
txs AS (
SELECT
tx_hash,
block_id,
shard_id,
shard_number,
transactions_index,
chunk_hash,
outcome_receipts,
tx,
tx :transaction :actions :: VARIANT AS _actions,
tx :transaction :hash :: STRING AS _hash,
tx :transaction :nonce :: STRING AS _nonce,
tx :outcome :execution_outcome :: VARIANT AS _outcome,
tx :transaction :public_key :: STRING AS _public_key,
-- placeholder: receipts are attached downstream in transactions_final
[] AS _receipt,
tx :transaction :receiver_id :: STRING AS _receiver_id,
tx :transaction :signature :: STRING AS _signature,
tx :transaction :signer_id :: STRING AS _signer_id,
_partition_by_block_number,
_inserted_timestamp
FROM
flatten_transactions
),
FINAL AS (
SELECT
tx_hash,
block_id,
shard_id,
shard_number,
transactions_index,
chunk_hash,
outcome_receipts,
tx,
_actions,
_hash,
_nonce,
_outcome,
_public_key,
_receipt,
_receiver_id,
_signature,
_signer_id,
_partition_by_block_number,
_inserted_timestamp
FROM
txs
)
SELECT
*,
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
) }} AS streamline_transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL
-- dedupe on tx_hash, preferring rows whose chunk-header shard number matches
-- the shard_id suffix, then the most recently modified row
QUALIFY(row_number() over
(partition by tx_hash
order by shard_number = split(shard_id, '-')[1] :: INT desc, modified_timestamp desc
)) = 1

View File

@ -1,92 +0,0 @@
# Column documentation and tests for the deprecated silver__streamline_transactions model.
version: 2
models:
  - name: silver__streamline_transactions
    description: |-
      Singular transaction objects with the shard id and chunk hash from which it was included.
    tests:
      - dbt_utils.unique_combination_of_columns:
          combination_of_columns:
            - tx_hash
            - block_id
    columns:
      - name: TX_HASH
        description: "{{ doc('tx_hash')}}"
        tests:
          - not_null
      - name: BLOCK_ID
        description: "{{ doc('block_id')}}"
        tests:
          - not_null
      - name: SHARD_ID
        description: "{{ doc('shard_id')}}"
        tests:
          - not_null
      - name: TRANSACTIONS_INDEX
        description: "{{ doc('receipt_index')}}"
        tests:
          - not_null
      - name: CHUNK_HASH
        description: "{{ doc('chunk_hash')}}"
        tests:
          - not_null
      - name: OUTCOME_RECEIPTS
        description: "{{ doc('outcome')}}"
      - name: TX
        description: "{{ doc('tx')}}"
        tests:
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - OBJECT
                - VARIANT
      - name: _ACTIONS
        description: "{{ doc('actions')}}"
      - name: _HASH
        description: "{{ doc('tx_hash')}}"
      - name: _NONCE
        description: "{{ doc('nonce')}}"
      - name: _OUTCOME
        description: "{{ doc('outcome')}}"
      - name: _PUBLIC_KEY
        description: "{{ doc('public_key')}}"
      - name: _RECEIPT
        description: "{{ doc('receipt')}}"
      - name: _RECEIVER_ID
        description: "{{ doc('tx_receiver')}}"
      - name: _SIGNATURE
        description: "{{ doc('signature')}}"
      - name: _SIGNER_ID
        description: "{{ doc('tx_signer')}}"
      - name: _INSERTED_TIMESTAMP
        description: "{{ doc('_inserted_timestamp')}}"
      - name: STREAMLINE_TRANSACTIONS_ID
        description: "{{doc('id')}}"
        tests:
          - unique
      - name: INSERTED_TIMESTAMP
        description: "{{doc('inserted_timestamp')}}"
      - name: MODIFIED_TIMESTAMP
        description: "{{doc('modified_timestamp')}}"
      - name: _INVOCATION_ID
        description: "{{doc('invocation_id')}}"

View File

@ -1,261 +0,0 @@
-- Assembles the final transaction record: joins transactions to their blocks
-- and aggregated receipts, rebuilds the legacy `tx` object, and computes
-- gas/fee totals and tx success status.
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'tx_hash',
cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE', '_partition_by_block_number'],
tags = ['receipt_map', 'deprecated']
) }}
-- transactions within the incremental load window
WITH int_txs AS (
SELECT
block_id,
tx_hash,
shard_id,
transactions_index,
chunk_hash,
outcome_receipts,
_actions,
_hash,
_nonce,
_outcome,
_public_key,
_receiver_id,
_signature,
_signer_id,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_transactions') }}
{% if var('MANUAL_FIX') %}
WHERE
{{ partition_load_manual('no_buffer') }}
{% else %}
WHERE
{{ partition_incremental_load(
8000,
6000,
0
) }}
{% endif %}
),
-- mapped receipts within the same window
int_receipts AS (
SELECT
block_id,
block_timestamp,
tx_hash,
receipt_object_id,
execution_outcome,
receipt_succeeded,
gas_burnt,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_receipts_final') }}
{% if var('MANUAL_FIX') %}
WHERE
{{ partition_load_manual('end') }}
{% else %}
WHERE
{{ partition_incremental_load(
8000,
6000,
0
) }}
{% endif %}
),
-- blocks covering the transaction partitions being processed
int_blocks AS (
SELECT
block_id,
block_hash,
block_timestamp,
_partition_by_block_number,
_inserted_timestamp
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
_partition_by_block_number >= (
SELECT MIN(_partition_by_block_number) FROM int_txs
)
),
-- collect each tx's receipt execution outcomes into a time-ordered array
receipt_array AS (
SELECT
tx_hash,
ARRAY_AGG(execution_outcome) within GROUP (
ORDER BY
block_timestamp
) AS receipt
FROM
int_receipts
GROUP BY
1
),
-- rebuild the legacy `tx` object expected by the downstream gas/fee logic
base_transactions AS (
SELECT
t.tx_hash,
t.block_id,
b.block_hash,
b.block_timestamp,
t.shard_id,
transactions_index,
t.chunk_hash,
outcome_receipts,
OBJECT_CONSTRUCT(
'actions',
_actions, -- does not need to exist as col (in receipts) but fields used in gas calc!
'hash',
_hash, -- exists as col
'nonce',
_nonce, -- exists as col
'outcome',
_outcome, -- does not need to exist as col (in receipts) but fields used in gas calc!
'public_key',
_public_key, -- does not exist as col
'receipt',
r.receipt, -- does not need to exist as col
'receiver_id',
_receiver_id, -- exists as col
'signature',
_signature, -- exists as col
'signer_id',
_signer_id -- exists as col
) AS tx, -- TODO dropping this object
_partition_by_block_number,
t._inserted_timestamp
FROM
int_txs t
INNER JOIN receipt_array r USING (tx_hash)
INNER JOIN int_blocks b USING (block_id)
),
-- TODO review the below section in a subsequent PR. ~3y old at this point.
-- it is calculating the gaas and fees. Largely accurate, but may have found an inaccuracy:
-- Dojw9TnLbTLTxeJAuGtiSTZGdruCAmdu1KsLgjLHwkkb incorrect tx fees / gas burnt?
{# The following steps were copied directly from legacy tx model to replicate columns #}
-- total gas explicitly attached via FunctionCall actions, per tx
actions AS (
SELECT
tx_hash,
SUM(
VALUE :FunctionCall :gas :: NUMBER
) AS attached_gas
FROM
base_transactions,
LATERAL FLATTEN(
input => tx :actions
)
GROUP BY
1
),
-- flatten the tx object back into typed columns
transactions AS (
SELECT
block_id,
tx :outcome :block_hash :: STRING AS block_hash,
tx_hash,
block_timestamp,
tx :nonce :: NUMBER AS nonce,
tx :signature :: STRING AS signature,
tx :receiver_id :: STRING AS tx_receiver,
tx :signer_id :: STRING AS tx_signer,
tx,
tx :outcome :outcome :gas_burnt :: NUMBER AS transaction_gas_burnt,
tx :outcome :outcome :tokens_burnt :: NUMBER AS transaction_tokens_burnt,
_partition_by_block_number,
_inserted_timestamp
FROM
base_transactions
),
-- sum receipt-level gas and tokens burnt per tx (zero-token rows excluded)
gas_burnt AS (
SELECT
tx_hash,
SUM(gas_burnt) AS receipt_gas_burnt,
SUM(execution_outcome :outcome :tokens_burnt :: NUMBER) AS receipt_tokens_burnt
FROM
int_receipts
WHERE
execution_outcome :outcome: tokens_burnt :: NUMBER != 0
GROUP BY
1
),
-- a tx succeeds iff its last receipt (by block order) succeeded
determine_tx_status AS (
SELECT
DISTINCT tx_hash,
LAST_VALUE(
receipt_succeeded
) over (
PARTITION BY tx_hash
ORDER BY
block_id ASC
) AS tx_succeeded
FROM
int_receipts
),
FINAL AS (
SELECT
t.block_id,
t.block_hash,
t.tx_hash,
t.block_timestamp,
t.nonce,
t.signature,
t.tx_receiver,
t.tx_signer,
t.tx,
-- total gas/fee = transaction-level + summed receipt-level amounts
t.transaction_gas_burnt + g.receipt_gas_burnt AS gas_used,
t.transaction_tokens_burnt + g.receipt_tokens_burnt AS transaction_fee,
-- fall back to gas_used when no FunctionCall action attached gas explicitly
COALESCE(
actions.attached_gas,
gas_used
) AS attached_gas,
s.tx_succeeded,
IFF (
tx_succeeded,
'Success',
'Fail'
) AS tx_status, -- DEPRECATE TX_STATUS IN GOLD
t._partition_by_block_number,
t._inserted_timestamp
FROM
transactions AS t
INNER JOIN determine_tx_status s
ON t.tx_hash = s.tx_hash
INNER JOIN actions
ON t.tx_hash = actions.tx_hash
INNER JOIN gas_burnt g
ON t.tx_hash = g.tx_hash
)
SELECT
tx_hash,
block_id,
block_hash,
block_timestamp,
nonce,
signature,
tx_receiver,
tx_signer,
tx,
gas_used,
transaction_fee,
attached_gas,
tx_succeeded,
tx_status,
_partition_by_block_number,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['tx_hash']
) }} AS streamline_transactions_final_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
FINAL

View File

@ -1,98 +0,0 @@
version: 2
models:
- name: silver__streamline_transactions_final
description: |-
Singular transaction objects, including the shard id and chunk hash of the chunk in which each transaction was included.
columns:
- name: TX_HASH
description: "{{ doc('tx_hash')}}"
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: BLOCK_HASH
description: "{{ doc('block_hash')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: _PARTITION_BY_BLOCK_NUMBER
description: "{{ doc('_partition_by_block_number')}}"
- name: NONCE
description: "{{ doc('nonce')}}"
- name: SIGNATURE
description: "{{ doc('signature')}}"
- name: TX_RECEIVER
description: "{{ doc('tx_receiver')}}"
- name: TX_SIGNER
description: "{{ doc('tx_signer')}}"
- name: TX
description: "{{ doc('tx')}}"
- name: TX_SUCCEEDED
description: "{{ doc('tx_succeeded')}}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: TX_STATUS
description: "{{ doc('tx_status')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: GAS_USED
description: "{{ doc('gas_used')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: ATTACHED_GAS
description: "{{ doc('attached_gas')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: TRANSACTION_FEE
description: "{{ doc('transaction_fee')}}"
tests:
- not_null:
where: _inserted_timestamp <= current_timestamp - interval '1 hour'
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp')}}"
- name: STREAMLINE_TRANSACTIONS_FINAL_ID
description: "{{doc('id')}}"
tests:
- unique
- name: INSERTED_TIMESTAMP
description: "{{doc('inserted_timestamp')}}"
- name: MODIFIED_TIMESTAMP
description: "{{doc('modified_timestamp')}}"
- name: _INVOCATION_ID
description: "{{doc('invocation_id')}}"

View File

@ -1,34 +0,0 @@
{{ config(
    materialized = 'ephemeral'
) }}
-- Ephemeral migration helper: re-shapes legacy Near Lake block rows into the
-- Streamline 2 column set consumed by the archive-migration branch of the
-- blocks final model. Deprecated along with the lake models; safe to delete
-- once the NEAR_MIGRATE_ARCHIVE backfill window closes.
SELECT
    block_id,
    block_timestamp,
    block_hash,
    prev_hash,
    block_author,
    chunks AS chunks_json,
    header AS header_json,
    _partition_by_block_number,
    {{ dbt_utils.generate_surrogate_key(
        ['block_id']
    ) }} AS blocks_final_id,
    -- Prefer the explicit audit timestamps; legacy lake rows may only carry
    -- the older load-tracking columns, so fall back through them.
    COALESCE(
        inserted_timestamp,
        _inserted_timestamp,
        _load_timestamp
    ) AS inserted_timestamp,
    COALESCE(
        modified_timestamp,
        _inserted_timestamp,
        _load_timestamp
    ) AS modified_timestamp,
    _invocation_id
FROM
    {{ ref('silver__streamline_blocks') }}
-- Default the var to False so compilation does not fail when it is unset,
-- matching var('NEAR_MIGRATE_ARCHIVE', False) usage in the downstream models.
{% if var("NEAR_MIGRATE_ARCHIVE", False) %}
WHERE
    {{ partition_load_manual('no_buffer') }}
{% endif %}

View File

@ -1,79 +0,0 @@
{{ config(
    materialized = 'ephemeral'
) }}
-- Ephemeral migration helper: re-shapes legacy Near Lake receipt rows into
-- the Streamline 2 receipt column set and attaches the parent transaction's
-- success flag. Deprecated along with the lake models; safe to delete once
-- the NEAR_MIGRATE_ARCHIVE backfill window closes.
WITH lake_receipts_final AS (
    SELECT
        chunk_hash,
        block_id,
        block_timestamp,
        tx_hash,
        -- Older lake rows only populate the legacy receipt_object_id column.
        COALESCE(
            receipt_id,
            receipt_object_id
        ) AS receipt_id,
        COALESCE(
            predecessor_id,
            receipt_actions :predecessor_id :: STRING
        ) AS predecessor_id,
        receiver_id,
        receipt_actions AS receipt_json,
        execution_outcome AS outcome_json,
        receipt_succeeded,
        _partition_by_block_number,
        {{ dbt_utils.generate_surrogate_key(
            ['COALESCE(receipt_id, receipt_object_id)']
        ) }} AS receipts_final_id,
        -- Prefer the explicit audit timestamps; legacy rows may only carry
        -- the older load-tracking columns, so fall back through them.
        COALESCE(
            inserted_timestamp,
            _inserted_timestamp,
            _load_timestamp
        ) AS inserted_timestamp,
        COALESCE(
            modified_timestamp,
            _inserted_timestamp,
            _load_timestamp
        ) AS modified_timestamp,
        _invocation_id
    FROM
        {{ ref('silver__streamline_receipts_final') }}
    -- Default the var to False so compilation does not fail when it is unset,
    -- matching var('NEAR_MIGRATE_ARCHIVE', False) usage in downstream models.
    {% if var("NEAR_MIGRATE_ARCHIVE", False) %}
    WHERE
        {{ partition_load_manual('no_buffer') }}
    {% endif %}
),
lake_transactions_final AS (
    SELECT
        tx_hash,
        tx_succeeded
    FROM
        {{ ref('silver__streamline_transactions_final') }}
    {% if var("NEAR_MIGRATE_ARCHIVE", False) %}
    WHERE
        {{ partition_load_manual('front') }}
    {% endif %}
)
SELECT
    chunk_hash,
    block_id,
    block_timestamp,
    r.tx_hash,
    receipt_id,
    predecessor_id,
    receiver_id,
    receipt_json,
    outcome_json,
    -- NULL when the parent transaction is outside the transactions lookback.
    tx_succeeded,
    receipt_succeeded,
    _partition_by_block_number,
    receipts_final_id,
    inserted_timestamp,
    modified_timestamp,
    _invocation_id
FROM
    lake_receipts_final r
    LEFT JOIN lake_transactions_final tx
    ON r.tx_hash = tx.tx_hash

View File

@ -1,86 +0,0 @@
{{ config(
    materialized = 'ephemeral'
) }}
-- Ephemeral migration helper: joins the legacy Near Lake transaction rollup
-- with the raw per-shard transaction objects to produce the Streamline 2
-- transaction column set. Deprecated along with the lake models; safe to
-- delete once the NEAR_MIGRATE_ARCHIVE backfill window closes.
WITH lake_transactions_final AS (
    SELECT
        block_id,
        block_timestamp,
        tx_hash,
        tx_signer,
        tx_receiver,
        tx_succeeded,
        gas_used,
        transaction_fee,
        attached_gas,
        _partition_by_block_number,
        {{ dbt_utils.generate_surrogate_key(
            ['tx_hash']
        ) }} AS transactions_final_id,
        -- Prefer the explicit audit timestamps; legacy rows may only carry
        -- the older load-tracking columns, so fall back through them.
        COALESCE(
            inserted_timestamp,
            _inserted_timestamp,
            _load_timestamp
        ) AS inserted_timestamp,
        COALESCE(
            modified_timestamp,
            _inserted_timestamp,
            _load_timestamp
        ) AS modified_timestamp,
        _invocation_id
    FROM
        {{ ref('silver__streamline_transactions_final') }}
    -- Default the var to False so compilation does not fail when it is unset,
    -- matching var('NEAR_MIGRATE_ARCHIVE', False) usage in downstream models.
    {% if var("NEAR_MIGRATE_ARCHIVE", False) %}
    WHERE
        {{ partition_load_manual('no_buffer') }}
    {% endif %}
),
lake_transactions_int AS (
    SELECT
        tx_hash,
        block_id,
        shard_number,
        chunk_hash,
        tx :transaction :: variant AS transaction_json,
        tx :outcome :execution_outcome :: variant AS outcome_json,
        _partition_by_block_number
    FROM
        {{ ref('silver__streamline_transactions') }}
    {% if var("NEAR_MIGRATE_ARCHIVE", False) %}
    WHERE
        {{ partition_load_manual('no_buffer') }}
    {% endif %}
),
transaction_archive AS (
    SELECT
        i.chunk_hash,
        i.shard_number AS shard_id,
        f.block_id,
        f.block_timestamp,
        f.tx_hash,
        f.tx_signer,
        f.tx_receiver,
        i.transaction_json,
        i.outcome_json,
        f.tx_succeeded,
        f.gas_used,
        f.transaction_fee,
        f.attached_gas,
        f._partition_by_block_number,
        f.transactions_final_id,
        f.inserted_timestamp,
        f.modified_timestamp,
        f._invocation_id
    FROM
        lake_transactions_final f
        LEFT JOIN lake_transactions_int i
        ON f.tx_hash = i.tx_hash
        AND f._partition_by_block_number = i._partition_by_block_number
)
-- Explicit column list (instead of SELECT *) so downstream contract breaks
-- loudly if the CTE's shape ever changes.
SELECT
    chunk_hash,
    shard_id,
    block_id,
    block_timestamp,
    tx_hash,
    tx_signer,
    tx_receiver,
    transaction_json,
    outcome_json,
    tx_succeeded,
    gas_used,
    transaction_fee,
    attached_gas,
    _partition_by_block_number,
    transactions_final_id,
    inserted_timestamp,
    modified_timestamp,
    _invocation_id
FROM
    transaction_archive

View File

@ -10,29 +10,6 @@
full_refresh = false
) }}
{% if var('NEAR_MIGRATE_ARCHIVE', False) %}
{% if execute %}
{% do log('Migrating blocks ' ~ var('RANGE_START') ~ ' to ' ~ var('RANGE_END'), info=True) %}
{% do log('Invocation ID: ' ~ invocation_id, info=True) %}
{% endif %}
SELECT
block_id,
block_timestamp,
block_hash,
prev_hash,
block_author,
chunks_json,
header_json,
_partition_by_block_number,
blocks_final_id,
inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_blocks') }}
{% else %}
WITH blocks AS (
SELECT
block_id,
@ -66,5 +43,3 @@ SELECT
'{{ invocation_id }}' AS _invocation_id
FROM
blocks
{% endif %}

View File

@ -10,64 +10,37 @@
full_refresh = false
) }}
{% if var('NEAR_MIGRATE_ARCHIVE', False) %}
{% if execute %}
{% do log('Migrating receipts ' ~ var('RANGE_START') ~ ' to ' ~ var('RANGE_END'), info=True) %}
{% do log('Invocation ID: ' ~ invocation_id, info=True) %}
{% endif %}
{% if execute and not var("MANUAL_FIX") %}
{% if is_incremental() %}
{% set max_mod_query %}
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01') modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query) [0] [0] %}
{% do log('max_mod: ' ~ max_mod, info=True) %}
{% set min_block_date_query %}
SELECT
MIN(origin_block_timestamp :: DATE) block_timestamp
FROM
{{ ref('silver__transactions_v2') }}
WHERE
modified_timestamp >= '{{max_mod}}'
{% endset %}
SELECT
chunk_hash,
block_id,
block_timestamp,
tx_hash,
receipt_id,
predecessor_id,
receiver_id,
receipt_json,
outcome_json,
tx_succeeded,
receipt_succeeded,
_partition_by_block_number,
receipts_final_id,
inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_receipts') }}
{% else %}
{% if execute and not var("MANUAL_FIX") %}
{% if is_incremental() %}
{% set max_mod_query %}
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01') modified_timestamp
FROM
{{ this }}
{% endset %}
{% set max_mod = run_query(max_mod_query) [0] [0] %}
{% do log('max_mod: ' ~ max_mod, info=True) %}
{% set min_block_date_query %}
SELECT
MIN(origin_block_timestamp :: DATE) block_timestamp
FROM
{{ ref('silver__transactions_v2') }}
WHERE
modified_timestamp >= '{{max_mod}}'
{% endset %}
{% set min_bd = run_query(min_block_date_query) [0] [0] %}
{% set min_bd = run_query(min_block_date_query) [0] [0] %}
{% do log('min_bd: ' ~ min_bd, info=True) %}
{% if not min_bd or min_bd == 'None' %}
{% set min_bd = '2099-01-01' %}
{% do log('min_bd: ' ~ min_bd, info=True) %}
{% if not min_bd or min_bd == 'None' %}
{% set min_bd = '2099-01-01' %}
{% do log('min_bd: ' ~ min_bd, info=True) %}
{% endif %}
{% do log('min_block_date: ' ~ min_bd, info=True) %}
{% endif %}
{% do log('min_block_date: ' ~ min_bd, info=True) %}
{% endif %}
{% endif %}
WITH txs_with_receipts AS (
SELECT
@ -246,5 +219,3 @@ FROM
FINAL
qualify(row_number() over (partition by receipt_id order by modified_timestamp desc) = 1)
{% endif %}

View File

@ -10,36 +10,6 @@
full_refresh = false
) }}
{% if var('NEAR_MIGRATE_ARCHIVE', False) %}
{% if execute %}
{% do log('Migrating transactions ' ~ var('RANGE_START') ~ ' to ' ~ var('RANGE_END'), info=True) %}
{% do log('Invocation ID: ' ~ invocation_id, info=True) %}
{% endif %}
SELECT
chunk_hash,
block_id,
block_timestamp,
tx_hash,
tx_receiver,
tx_signer,
transaction_json,
outcome_json,
OBJECT_CONSTRUCT() AS status_json,
tx_succeeded,
gas_used,
transaction_fee,
attached_gas,
_partition_by_block_number,
transactions_final_id,
inserted_timestamp,
modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref('_migrate_txs') }}
{% else %}
WITH txs_with_receipts AS (
SELECT
chunk_hash,
@ -142,5 +112,3 @@ SELECT
'{{ invocation_id }}' AS _invocation_id
FROM
transactions_final
{% endif %}