rename to block_id, large batch testing for partial backfill

Jack Forgash 2025-02-07 09:06:06 -07:00
parent 1ae4a6095b
commit 90eaa0fe91
14 changed files with 125 additions and 76 deletions

View File

@ -12,7 +12,7 @@
WITH bronze_blocks AS (
SELECT
-VALUE :BLOCK_NUMBER :: INT AS block_number,
+VALUE :BLOCK_ID :: INT AS block_id,
DATA :header :hash :: STRING AS block_hash,
DATA :header :timestamp :: timestamp_ntz AS block_timestamp,
partition_key,
@ -36,7 +36,7 @@ WHERE
{% endif %}
)
SELECT
-block_number,
+block_id,
block_hash,
block_timestamp,
partition_key,
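
These bronze views lean on Snowflake's VARIANT path syntax: VALUE :BLOCK_ID :: INT walks into the BLOCK_ID field of the VALUE column and casts it to an integer. A minimal sketch of the pattern in isolation, with a hypothetical table name standing in for the real external table:

-- Sketch only: my_bronze_source is a stand-in for the external table,
-- which exposes VALUE and DATA as VARIANT columns.
SELECT
    VALUE :BLOCK_ID :: INT AS block_id,              -- top-level field, cast to INT
    DATA :header :hash :: STRING AS block_hash,      -- nested path, two levels deep
    DATA :header :timestamp :: TIMESTAMP_NTZ AS block_timestamp
FROM my_bronze_source;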

View File

@ -12,7 +12,7 @@
WITH bronze_chunks AS (
SELECT
-VALUE :BLOCK_NUMBER :: INT AS block_number,
+VALUE :BLOCK_ID :: INT AS block_id,
DATA :header :shard_id :: INT AS shard_id,
DATA :header :chunk_hash :: STRING AS chunk_hash,
partition_key,
@ -36,7 +36,7 @@ WHERE
{% endif %}
)
SELECT
-block_number,
+block_id,
shard_id,
chunk_hash,
partition_key,

View File

@ -12,7 +12,7 @@
WITH bronze_transactions AS (
SELECT
-VALUE :BLOCK_NUMBER :: INT AS block_number,
+VALUE :BLOCK_ID :: INT AS block_id,
DATA :transaction :hash :: STRING AS tx_hash,
DATA :transaction :signer_id :: STRING AS signer_id,
partition_key,
@ -36,7 +36,7 @@ WHERE
{% endif %}
)
SELECT
-block_number,
+block_id,
tx_hash,
signer_id,
partition_key,

View File

@ -3,7 +3,7 @@
) }}
SELECT
-MIN(block_id) AS block_number
+MIN(block_id) AS block_id
FROM
{{ ref("core__fact_blocks") }}
WHERE

View File

@ -2,15 +2,15 @@
-- depends_on: {{ ref('bronze__FR_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
unique_key = "block_id",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_id)",
tags = ['streamline_complete']
) }}
SELECT
-VALUE :BLOCK_NUMBER :: INT AS block_number,
+VALUE :BLOCK_ID :: INT AS block_id,
DATA :header :hash :: STRING AS block_hash,
DATA :header :chunks_included :: INT AS chunks_expected,
ARRAY_SIZE(
@ -48,6 +48,6 @@ WHERE
DATA IS NOT NULL
{% endif %}
-qualify(ROW_NUMBER() over (PARTITION BY block_number
+qualify(ROW_NUMBER() over (PARTITION BY block_id
ORDER BY
_inserted_timestamp DESC)) = 1
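
The qualify(ROW_NUMBER() ...) = 1 clause is the standard Snowflake dedup idiom: when the same block arrives more than once, only the most recently ingested copy survives the merge. The pattern in isolation, with a hypothetical input name:

-- Keep exactly one row per block_id, preferring the newest load.
SELECT *
FROM raw_loads  -- stand-in for the source being deduped above
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY block_id                -- one winner per block
    ORDER BY _inserted_timestamp DESC    -- the newest _inserted_timestamp wins
) = 1;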

View File

@ -3,14 +3,14 @@
{{ config (
materialized = "incremental",
unique_key = "chunk_hash",
cluster_by = "ROUND(block_number, -3)",
cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash)",
tags = ['streamline_complete']
) }}
SELECT
-VALUE :BLOCK_NUMBER :: INT AS block_number,
+VALUE :BLOCK_ID :: INT AS block_id,
VALUE :CHUNK_HASH :: STRING AS chunk_hash,
DATA :header :shard_id :: INT AS shard_id,
ARRAY_SIZE(
@ -19,10 +19,6 @@ SELECT
ARRAY_SIZE(
DATA :transactions :: ARRAY
) AS transactions_count,
-{{ target.database }}.streamline.udf_extract_hash_array(
-DATA :receipts :: ARRAY,
-'receipt_id'
-) AS receipt_ids,
{{ target.database }}.streamline.udf_extract_hash_array(
DATA :transactions :: ARRAY,
'hash'
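
udf_extract_hash_array is an in-house UDF; its call shape suggests it collects one named field from every element of a VARIANT array. A hedged sketch of equivalent logic in plain Snowflake SQL (the UDF's real implementation may differ), with a hypothetical source table:

-- Roughly what udf_extract_hash_array(DATA :transactions :: ARRAY, 'hash') returns:
SELECT
    c.chunk_hash,
    ARRAY_AGG(f.value :hash :: STRING) WITHIN GROUP (ORDER BY f.index) AS tx_hashes
FROM chunks_raw c,  -- stand-in source with a DATA variant column
    LATERAL FLATTEN(input => c.DATA :transactions) f
GROUP BY c.chunk_hash;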

View File

@ -3,14 +3,14 @@
{{ config (
materialized = "incremental",
unique_key = "tx_hash",
-cluster_by = "ROUND(block_number, -3)",
+cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash)",
tags = ['streamline_complete']
) }}
SELECT
-VALUE :BLOCK_NUMBER :: INT AS block_number,
+VALUE :BLOCK_ID :: INT AS block_id,
VALUE :TX_HASH :: STRING AS tx_hash,
DATA :transaction :signer_id :: STRING AS signer_id,
partition_key,

View File

@ -7,50 +7,59 @@
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "blocks_v2",
"sql_limit": "3000",
"producer_batch_size": "3000",
"worker_batch_size": "3000",
"sql_limit": "490000",
"producer_batch_size": "70000",
"worker_batch_size": "35000",
"sql_source": "{{this.identifier}}",
"order_by_column": "block_number DESC"
"order_by_column": "block_id DESC"
}
),
tags = ['streamline_realtime']
) }}
-- Note, roughly 3,000 blocks per hour (~70k/day).
-- batch sizing is WIP
-WITH last_3_days AS (
+WITH
+{% if var('STREAMLINE_PARTIAL_BACKFILL', false) %}
+last_3_days AS (
+SELECT
+120960000 as block_id
+),
+{% else %}
+last_3_days AS (
SELECT
-ZEROIFNULL(block_number) AS block_number
+ZEROIFNULL(block_id) AS block_id
FROM
{{ ref("_block_lookback") }}
),
+{% endif %}
tbl AS (
SELECT
-block_number
+block_id
FROM
{{ ref('streamline__blocks') }}
WHERE
(
-block_number >= (
+block_id >= (
SELECT
-block_number
+block_id
FROM
last_3_days
)
)
-AND block_number IS NOT NULL
+AND block_id IS NOT NULL
EXCEPT
-- TODO there may be skipped block heights! use hash / parent hash instead
-- or will a skipped block height return a unique response that i can log
SELECT
-block_number
+block_id
FROM
{{ ref('streamline__blocks_complete') }}
WHERE
-block_number >= (
+block_id >= (
SELECT
-block_number
+block_id
FROM
last_3_days
)
@ -62,7 +71,7 @@ tbl AS (
AND block_hash IS NOT NULL
)
SELECT
-block_number,
+block_id,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
@ -77,11 +86,11 @@ SELECT
'method',
'block',
'id',
-'Flipside/getBlock/0.1',
+'Flipside/getBlock/' || block_id :: STRING,
'params',
OBJECT_CONSTRUCT(
'block_id',
block_number
block_id
)
),
'Vault/prod/near/quicknode/mainnet'
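
The STREAMLINE_PARTIAL_BACKFILL var swaps the rolling 3-day lookback for a fixed floor at block 120,960,000, so a backfill run re-requests every block at or above that height that is missing from the complete table. Assuming the var is passed the usual dbt way, a backfill invocation would look something like:

dbt run --select tag:streamline_realtime --vars '{"STREAMLINE_PARTIAL_BACKFILL": true}'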

View File

@ -7,35 +7,43 @@
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "chunks_v2",
"sql_limit": "18000",
"producer_batch_size": "6000",
"worker_batch_size": "6000",
"sql_limit": "210000",
"producer_batch_size": "70000",
"worker_batch_size": "35000",
"sql_source": "{{this.identifier}}",
"order_by_column": "block_number DESC"
"order_by_column": "block_id DESC"
}
),
tags = ['streamline_realtime']
) }}
-- Note, roughly 3,000 blocks per hour (~70k/day) * 6 chunks per block (for now)
-- batch sizing is WIP
-WITH last_3_days AS (
+WITH
+{% if var('STREAMLINE_PARTIAL_BACKFILL', false) %}
+last_3_days AS (
+SELECT
+120960000 as block_id
+),
+{% else %}
+last_3_days AS (
SELECT
-ZEROIFNULL(block_number) AS block_number
+ZEROIFNULL(block_id) AS block_id
FROM
{{ ref("_block_lookback") }}
),
+{% endif %}
tbl AS (
SELECT
-block_number,
+block_id,
chunk_hash
FROM
{{ ref('streamline__chunks') }}
WHERE
(
-block_number >= (
+block_id >= (
SELECT
-block_number
+block_id
FROM
last_3_days
)
@ -43,14 +51,14 @@ tbl AS (
AND chunk_hash IS NOT NULL
EXCEPT
SELECT
-block_number,
+block_id,
chunk_hash
FROM
{{ ref('streamline__chunks_complete') }}
WHERE
-block_number >= (
+block_id >= (
SELECT
-block_number
+block_id
FROM
last_3_days
)
@ -62,7 +70,7 @@ tbl AS (
AND chunk_hash IS NOT NULL
)
SELECT
-block_number,
+block_id,
chunk_hash,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.live.udf_api(
@ -78,7 +86,7 @@ SELECT
'method',
'chunk',
'id',
-'Flipside/getChunk/0.1',
+'Flipside/getChunk/' || chunk_hash :: STRING,
'params',
OBJECT_CONSTRUCT(
'chunk_id',

View File

@ -7,11 +7,11 @@
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "transactions_v2",
"sql_limit": "100000",
"producer_batch_size": "10000",
"worker_batch_size": "10000",
"sql_limit": "1500000",
"producer_batch_size": "500000",
"worker_batch_size": "250000",
"sql_source": "{{this.identifier}}",
"order_by_column": "block_number DESC"
"order_by_column": "block_id DESC"
}
),
tags = ['streamline_realtime']
@ -23,25 +23,33 @@
-- TODO reminder that we are waiting for FINAL status so retry on some will be required
-WITH last_3_days AS (
+WITH
+{% if var('STREAMLINE_PARTIAL_BACKFILL', false) %}
+last_3_days AS (
+SELECT
+120960000 as block_id
+),
+{% else %}
+last_3_days AS (
SELECT
-ZEROIFNULL(block_number) AS block_number
+ZEROIFNULL(block_id) AS block_id
FROM
{{ ref("_block_lookback") }}
),
+{% endif %}
tbl AS (
SELECT
-block_number,
+block_id,
tx_hash,
signer_id
FROM
{{ ref('streamline__transactions') }}
WHERE
(
-block_number >= (
+block_id >= (
SELECT
-block_number
+block_id
FROM
last_3_days
)
@ -50,15 +58,15 @@ tbl AS (
AND signer_id IS NOT NULL
EXCEPT
SELECT
-block_number,
+block_id,
tx_hash,
signer_id
FROM
{{ ref('streamline__transactions_complete') }}
WHERE
-block_number >= (
+block_id >= (
SELECT
-block_number
+block_id
FROM
last_3_days
)
@ -71,7 +79,7 @@ tbl AS (
AND signer_id IS NOT NULL
)
SELECT
-block_number,
+block_id,
tx_hash,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.live.udf_api(
@ -87,7 +95,7 @@ SELECT
'method',
'EXPERIMENTAL_tx_status',
'id',
-'Flipside/getTransactionWithStatus/0.1',
+'Flipside/getTransactionWithStatus/' || tx_hash :: STRING,
'params',
OBJECT_CONSTRUCT(
'tx_hash',
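
Replacing the fixed JSON-RPC id ('Flipside/getTransactionWithStatus/0.1') with one that embeds the key means each response carries the tx_hash it was requested for, which makes responses self-identifying when large batches land out of order. A sketch of the body OBJECT_CONSTRUCT produces, showing only the fields visible in this diff and a hypothetical hash:

-- Hypothetical tx_hash 'abc123' for illustration:
SELECT OBJECT_CONSTRUCT(
    'method', 'EXPERIMENTAL_tx_status',
    'id', 'Flipside/getTransactionWithStatus/' || 'abc123',
    'params', OBJECT_CONSTRUCT('tx_hash', 'abc123')
) AS body;
-- roughly: {"id": "Flipside/getTransactionWithStatus/abc123", "method": "EXPERIMENTAL_tx_status", "params": {"tx_hash": "abc123"}}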

View File

@ -4,14 +4,14 @@
) }}
{% if execute %}
{% set height = run_query("SELECT streamline.udf_get_chainhead()") %}
{% set height = run_query("SELECT block_id from streamline.get_chainhead") %}
{% set block_number = height.columns [0].values() [0] %}
{% else %}
{% set block_number = 0 %}
{% endif %}
SELECT
-_id AS block_number
+_id AS block_id
FROM
{{ source(
'crosschain_silver',
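
run_query executes at compile time and hands back a table object, so height.columns[0].values()[0] plucks the single scalar out of the one-row result. The general shape of the pattern, with hypothetical names:

{% if execute %}
    {# run_query returns an agate table; take row 1 of column 1 #}
    {% set result = run_query("SELECT MAX(block_id) FROM some_table") %}
    {% set chainhead = result.columns[0].values()[0] %}
{% else %}
    {% set chainhead = 0 %}
{% endif %}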

View File

@ -1,16 +1,16 @@
{{ config (
materialized = "incremental",
unique_key = "chunk_hash",
-cluster_by = "ROUND(block_number, -3)",
+cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
-post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash, block_number)",
+post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash, block_id)",
tags = ['streamline_helper']
) }}
WITH blocks_complete AS (
SELECT
-block_number,
+block_id,
block_hash,
chunk_ids,
_inserted_timestamp
@ -29,7 +29,7 @@ WHERE
),
flatten_chunk_ids AS (
SELECT
-block_number,
+block_id,
block_hash,
VALUE :: STRING AS chunk_hash,
INDEX AS chunk_index,
@ -41,7 +41,7 @@ flatten_chunk_ids AS (
)
)
SELECT
-block_number,
+block_id,
block_hash,
chunk_hash,
chunk_index,
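
The flatten step relies on Snowflake's LATERAL FLATTEN, which emits one row per array element and exposes VALUE (the element) and INDEX (its 0-based position); that is exactly what VALUE :: STRING AS chunk_hash and INDEX AS chunk_index read. In isolation:

-- One output row per chunk id in the chunk_ids array.
SELECT
    b.block_id,
    f.value :: STRING AS chunk_hash,  -- the array element itself
    f.index AS chunk_index            -- its position within chunk_ids
FROM blocks_complete b,               -- the CTE defined above
    LATERAL FLATTEN(input => b.chunk_ids) f;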

View File

@ -1,16 +1,16 @@
{{ config (
materialized = "incremental",
unique_key = "tx_hash",
-cluster_by = "ROUND(block_number, -3)",
+cluster_by = "ROUND(block_id, -3)",
merge_exclude_columns = ["inserted_timestamp"],
-post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash, block_number)",
+post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash, block_id)",
tags = ['streamline_helper']
) }}
WITH chunks_complete AS (
SELECT
-block_number,
+block_id,
chunk_hash,
shard_id,
transaction_ids,
@ -31,7 +31,7 @@ WHERE
),
flatten_tx_ids AS (
SELECT
-block_number,
+block_id,
chunk_hash,
shard_id,
VALUE :: STRING AS tx_hash,
@ -45,7 +45,7 @@ flatten_tx_ids AS (
)
)
SELECT
-block_number,
+block_id,
chunk_hash,
shard_id,
tx_hash,

View File

@ -0,0 +1,28 @@
{{ config(
materialized = 'view',
tags = ['streamline_helper']
) }}
SELECT
{{ target.database }}.live.udf_api(
'POST',
'{Service}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'id',
'dontcare',
'method',
'status',
'params',
OBJECT_CONSTRUCT(
'finality',
'final'
)
),
'Vault/prod/near/quicknode/mainnet'
) :data :result :sync_info :latest_block_height :: INT AS block_id
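
This new view replaces the udf_get_chainhead() UDF: it issues a NEAR JSON-RPC status call with finality 'final' and reads latest_block_height out of sync_info. Downstream callers query it like a table, as the updated lookback model above already does:

SELECT block_id FROM streamline.get_chainhead;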