chunks realtime, complete, silver; update custom UDF

Jack Forgash 2025-02-04 13:53:24 -07:00
parent e2ef3af72d
commit d89e6358b1
11 changed files with 364 additions and 28 deletions

View File

@@ -0,0 +1,18 @@
{% macro create_udf_extract_hash_array() %}
{% set sql %}
create or replace function {{ target.database }}.STREAMLINE.UDF_EXTRACT_HASH_ARRAY(object_array variant, object_key string)
returns ARRAY
language python
runtime_version = '3.9'
handler = 'extract_hash_array'
AS
$$
def extract_hash_array(object_array, object_key):
    # Return the value at object_key for each object in the array;
    # fall back to an empty array on malformed input or a missing key.
    try:
        return [obj[object_key] for obj in object_array]
    except Exception:
        return []
$$
{% endset %}
{% do run_query(sql) %}
{% endmacro %}
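A minimal usage sketch of the UDF (the literal input below is illustrative, not from the source data):

SELECT {{ target.database }}.STREAMLINE.UDF_EXTRACT_HASH_ARRAY(
PARSE_JSON('[{"chunk_hash": "abc"}, {"chunk_hash": "def"}]'),
'chunk_hash'
) AS hashes;
-- returns ['abc', 'def']; malformed input or a missing key returns []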

View File

@@ -0,0 +1,10 @@
-- TODO - v2 naming?
{{ config (
materialized = 'view',
tags = ['streamline_helper']
) }}
{{ streamline_external_table_FR_query_v2(
model = "chunks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

View File

@@ -0,0 +1,10 @@
{{ config (
materialized = 'view',
tags = ['streamline_helper']
) }}
{{ streamline_external_table_query_v2(
model = "chunks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}
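Both bronze views derive their partition from the external file name. A worked illustration (the path below is hypothetical; only the epoch-prefix convention is implied by the epoch-based partition_key set later in this commit):

SELECT CAST(SPLIT_PART(SPLIT_PART('streamline/chunks_v2/1738700000_1.json', '/', 3), '_', 1) AS INTEGER) AS partition_key;
-- SPLIT_PART(file_name, '/', 3)  -> '1738700000_1.json'
-- SPLIT_PART(..., '_', 1)        -> '1738700000'
-- CAST(... AS INTEGER)           -> 1738700000 (the epoch-second partition key)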

View File

@@ -1,39 +1,44 @@
-- depends_on: {{ ref('bronze__blocks') }}
-- depends_on: {{ ref('bronze__FR_blocks') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "block_hash",
cluster_by = ['modified_timestamp::DATE','partition_key'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_hash)",
tags = ['scheduled_core']
) }}
WITH bronze_blocks AS (
SELECT
VALUE :BLOCK_NUMBER :: INT AS block_number,
DATA :header :hash :: STRING AS block_hash,
DATA :header :timestamp :: timestamp_ntz AS block_timestamp,
partition_key,
DATA :: variant AS block_json,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }})
AND DATA IS NOT NULL
{% else %}
{{ ref('bronze__FR_blocks') }}
WHERE
DATA IS NOT NULL
{% endif %}
)
SELECT
block_number,
block_hash,
block_timestamp,
partition_key,
block_json,
_inserted_timestamp,
@@ -41,6 +46,11 @@ SELECT
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_blocks
qualify ROW_NUMBER() over (
PARTITION BY block_hash
ORDER BY
_inserted_timestamp DESC
) = 1
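The qualify clause above keeps only the most recently ingested row per block_hash. A minimal, self-contained illustration with hypothetical values:

SELECT *
FROM (VALUES ('abc', '2025-02-01' :: TIMESTAMP), ('abc', '2025-02-03' :: TIMESTAMP)) AS t (block_hash, _inserted_timestamp)
QUALIFY ROW_NUMBER() OVER (PARTITION BY block_hash ORDER BY _inserted_timestamp DESC) = 1;
-- returns only the 2025-02-03 row, the latest ingest for that hash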

View File

@@ -0,0 +1,56 @@
-- depends_on: {{ ref('bronze__chunks') }}
-- depends_on: {{ ref('bronze__FR_chunks') }}
{{ config (
materialized = "incremental",
incremental_strategy = 'merge',
unique_key = "chunk_hash",
cluster_by = ['modified_timestamp::DATE','partition_key'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash)",
tags = ['scheduled_core']
) }}
WITH bronze_chunks AS (
SELECT
VALUE :BLOCK_NUMBER :: INT AS block_number,
DATA :header :shard_id :: INT AS shard_id,
DATA :header :chunk_hash :: STRING AS chunk_hash,
partition_key,
DATA :: variant AS chunk_json,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__chunks') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }})
AND DATA IS NOT NULL
{% else %}
{{ ref('bronze__FR_chunks') }}
WHERE
DATA IS NOT NULL
{% endif %}
)
SELECT
block_number,
shard_id,
chunk_hash,
partition_key,
chunk_json,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['chunk_hash']) }} AS chunks_v2_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_chunks
qualify ROW_NUMBER() over (
PARTITION BY chunk_hash
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@@ -12,6 +12,15 @@
SELECT
VALUE :BLOCK_NUMBER :: INT AS block_number,
DATA :header :hash :: STRING AS block_hash,
DATA :header :chunks_included :: INT AS chunks_expected,
ARRAY_SIZE(
DATA :chunks :: ARRAY
) AS chunks_included,
{{ target.database }}.streamline.udf_extract_hash_array(
DATA :chunks :: ARRAY,
'chunk_hash'
) AS chunk_ids,
-- TODO: array_size(chunk_ids) = chunks_included AS array_is_complete?
partition_key,
_inserted_timestamp,
DATA :header :hash :: STRING AS complete_blocks_id,
@@ -32,8 +41,11 @@ WHERE
),
'1900-01-01' :: timestamp_ntz
)
AND DATA IS NOT NULL
{% else %}
{{ ref('bronze__FR_blocks') }}
WHERE
DATA IS NOT NULL
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number
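The TODO comment above sketches a completeness flag for this model. A possible expression, not part of this commit (Snowflake permits referencing earlier column aliases in the same SELECT):

-- hypothetical completeness check following the inline TODO:
ARRAY_SIZE(chunk_ids) = chunks_included AS array_is_complete,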

View File

@@ -0,0 +1,59 @@
-- depends_on: {{ ref('bronze__chunks') }}
-- depends_on: {{ ref('bronze__FR_chunks') }}
{{ config (
materialized = "incremental",
unique_key = "chunk_hash",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash)",
tags = ['streamline_complete']
) }}
SELECT
VALUE :BLOCK_NUMBER :: INT AS block_number,
VALUE :CHUNK_HASH :: STRING AS chunk_hash,
DATA :header :shard_id :: INT AS shard_id,
ARRAY_SIZE(
DATA :receipts :: ARRAY
) AS receipts_count,
ARRAY_SIZE(
DATA :transactions :: ARRAY
) AS transactions_count,
{{ target.database }}.streamline.udf_extract_hash_array(
DATA :receipts :: ARRAY,
'receipt_id'
) AS receipt_ids,
{{ target.database }}.streamline.udf_extract_hash_array(
DATA :transactions :: ARRAY,
'hash'
) AS transaction_ids,
partition_key,
_inserted_timestamp,
DATA :header :chunk_hash :: STRING AS complete_chunks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__chunks') }}
WHERE
_inserted_timestamp >= COALESCE(
(
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
),
'1900-01-01' :: timestamp_ntz
)
AND DATA IS NOT NULL
{% else %}
{{ ref('bronze__FR_chunks') }}
WHERE
DATA IS NOT NULL
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY chunk_hash
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@@ -1,3 +1,5 @@
-- depends_on: {{ ref('bronze__blocks') }}
-- depends_on: {{ ref('bronze__FR_blocks') }}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
@@ -14,7 +16,8 @@
),
tags = ['streamline_realtime']
) }}
-- Note: roughly 3,000 blocks per hour (~70k/day).
-- Batch sizing is WIP.
WITH last_3_days AS (
SELECT
@@ -38,6 +41,8 @@ tbl AS (
)
AND block_number IS NOT NULL
EXCEPT
-- TODO: there may be skipped block heights! Use hash / parent hash instead,
-- or will a skipped block height return a unique response that I can log?
SELECT
block_number
FROM
@@ -63,16 +68,23 @@ SELECT
'POST',
'{Service}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'method',
'block',
'id',
'Flipside/getBlock/0.1',
'params',
OBJECT_CONSTRUCT(
'block_id',
block_number
)
),
'Vault/prod/near/quicknode/mainnet'
) AS request
FROM
tbl
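For reference, the nested OBJECT_CONSTRUCT above produces a standard NEAR JSON-RPC block request. A standalone sketch for a hypothetical height:

SELECT OBJECT_CONSTRUCT(
'jsonrpc', '2.0',
'method', 'block',
'id', 'Flipside/getBlock/0.1',
'params', OBJECT_CONSTRUCT('block_id', 12345)
) AS body;
-- {"id": "Flipside/getBlock/0.1", "jsonrpc": "2.0", "method": "block", "params": {"block_id": 12345}}
-- (Snowflake renders object keys alphabetically)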

View File

@@ -0,0 +1,91 @@
-- depends_on: {{ ref('bronze__chunks') }}
-- depends_on: {{ ref('bronze__FR_chunks') }}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "chunks_v2",
"sql_limit": "18000",
"producer_batch_size": "6000",
"worker_batch_size": "6000",
"sql_source": "{{this.identifier}}",
"order_by_column": "block_number DESC"
}
),
tags = ['streamline_realtime']
) }}
-- Note: roughly 3,000 blocks per hour (~70k/day) * 6 chunks per block (for now).
-- Batch sizing is WIP.
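-- Sanity check on the estimate above (arithmetic only; the cadence itself is the author's estimate):
-- 3,000 blocks/hour * 6 chunks/block = 18,000 chunks/hour, which lines up with
-- sql_limit = 18000 in the config -- roughly one hour of chunks per run.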
WITH last_3_days AS (
SELECT
ZEROIFNULL(block_number) AS block_number
FROM
{{ ref("_block_lookback") }}
),
tbl AS (
SELECT
block_number,
chunk_hash
FROM
{{ ref('streamline__chunks') }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND chunk_hash IS NOT NULL
EXCEPT
SELECT
block_number,
chunk_hash
FROM
{{ ref('streamline__chunks_complete') }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
AND chunk_hash IS NOT NULL
)
SELECT
block_number,
chunk_hash,
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'method',
'chunk',
'id',
'Flipside/getChunk/0.1',
'params',
OBJECT_CONSTRUCT(
'chunk_id',
chunk_hash
)
),
'Vault/prod/near/quicknode/mainnet'
) AS request
FROM
tbl

View File

@@ -0,0 +1,58 @@
{{ config (
materialized = "incremental",
unique_key = "chunk_hash",
cluster_by = "ROUND(block_number, -3)",
merge_exclude_columns = ["inserted_timestamp"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash, block_number)",
tags = ['streamline_helper']
) }}
WITH blocks_complete AS (
SELECT
block_number,
block_hash,
chunk_ids,
_inserted_timestamp
FROM
{{ ref('streamline__blocks_complete') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}
),
flatten_chunk_ids AS (
SELECT
block_number,
block_hash,
VALUE :: STRING AS chunk_hash,
INDEX AS chunk_index,
_inserted_timestamp
FROM
blocks_complete,
LATERAL FLATTEN(
input => chunk_ids
)
)
SELECT
block_number,
block_hash,
chunk_hash,
chunk_index,
_inserted_timestamp,
chunk_hash AS chunks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
flatten_chunk_ids
qualify(ROW_NUMBER() over (PARTITION BY chunk_hash
ORDER BY
_inserted_timestamp DESC)) = 1
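The flatten_chunk_ids CTE explodes each block's chunk-hash array into one row per chunk. A minimal illustration of Snowflake's LATERAL FLATTEN with a hypothetical two-element array:

SELECT
'block_a' AS block_hash,
f.VALUE :: STRING AS chunk_hash,
f.INDEX AS chunk_index
FROM (SELECT PARSE_JSON('["hash_1", "hash_2"]') AS chunk_ids),
LATERAL FLATTEN(input => chunk_ids) f;
-- yields ('block_a', 'hash_1', 0) and ('block_a', 'hash_2', 1)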