diff --git a/macros/streamline/create_udf_extract_hash_array.sql b/macros/streamline/create_udf_extract_hash_array.sql
new file mode 100644
index 0000000..3e9440c
--- /dev/null
+++ b/macros/streamline/create_udf_extract_hash_array.sql
@@ -0,0 +1,19 @@
+{% macro create_udf_extract_hash_array() %}
+{% set sql %}
+create or replace function {{ target.database }}.STREAMLINE.UDF_EXTRACT_HASH_ARRAY(object_array variant, object_key string)
+returns ARRAY
+language python
+runtime_version = '3.9'
+handler = 'extract_hash_array'
+AS
+$$
+def extract_hash_array(object_array, object_key):
+    # Pull object_key from every element; fall back to [] when the input is not a list of objects
+    try:
+        return [obj[object_key] for obj in object_array]
+    except (KeyError, TypeError):
+        return []
+$$
+{% endset %}
+{% do run_query(sql) %}
+{% endmacro %}
diff --git a/macros/udfs/create_udf_get_chainhead.sql b/macros/streamline/create_udf_get_chainhead.sql
similarity index 100%
rename from macros/udfs/create_udf_get_chainhead.sql
rename to macros/streamline/create_udf_get_chainhead.sql
diff --git a/models/bronze/bronze__FR_chunks.sql b/models/bronze/bronze__FR_chunks.sql
new file mode 100644
index 0000000..b86d576
--- /dev/null
+++ b/models/bronze/bronze__FR_chunks.sql
@@ -0,0 +1,10 @@
+-- TODO: revisit v2 naming
+{{ config (
+    materialized = 'view',
+    tags = ['streamline_helper']
+) }}
+
+{{ streamline_external_table_FR_query_v2(
+    model = "chunks_v2",
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
+) }}
diff --git a/models/bronze/bronze__chunks.sql b/models/bronze/bronze__chunks.sql
new file mode 100644
index 0000000..c811071
--- /dev/null
+++ b/models/bronze/bronze__chunks.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view',
+    tags = ['streamline_helper']
+) }}
+
+{{ streamline_external_table_query_v2(
+    model = "chunks_v2",
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
+) }}
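
For reference, the UDF created above maps an array of objects to the array of values behind a single key, returning an empty array on malformed input. A minimal usage sketch, assuming the macro has been run so the function exists under the target database's STREAMLINE schema:

    SELECT
        STREAMLINE.UDF_EXTRACT_HASH_ARRAY(
            PARSE_JSON('[{"chunk_hash": "abc"}, {"chunk_hash": "def"}]'),
            'chunk_hash'
        ) AS chunk_ids;
    -- expected: ["abc", "def"]; a NULL or non-array input returns []
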
diff --git a/models/silver/core/silver__blocks_v2.sql b/models/silver/core/silver__blocks_v2.sql
index b3c8ac5..5c3ed65 100644
--- a/models/silver/core/silver__blocks_v2.sql
+++ b/models/silver/core/silver__blocks_v2.sql
@@ -1,39 +1,44 @@
 -- depends_on: {{ ref('bronze__blocks') }}
 -- depends_on: {{ ref('bronze__FR_blocks') }}
-
 {{ config (
     materialized = "incremental",
     incremental_strategy = 'merge',
     unique_key = "block_hash",
     cluster_by = ['modified_timestamp::DATE','partition_key'],
-    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_hash)",
     tags = ['scheduled_core']
 ) }}
 
 WITH bronze_blocks AS (
-    SELECT
+
+    SELECT
         VALUE :BLOCK_NUMBER :: INT AS block_number,
         DATA :header :hash :: STRING AS block_hash,
+        DATA :header :timestamp :: timestamp_ntz AS block_timestamp,
         partition_key,
-        DATA :: VARIANT AS block_json,
+        DATA :: variant AS block_json,
         _inserted_timestamp
-    FROM
-    {% if is_incremental() %}
-    {{ ref('bronze__blocks') }}
-    WHERE _inserted_timestamp >= (
-        SELECT
-            COALESCE(MAX(_inserted_timestamp), '1900-01-01'::TIMESTAMP) AS _inserted_timestamp
-        FROM {{ this }}
-    ) AND DATA IS NOT NULL
-    {% else %}
-    {{ ref('bronze__FR_blocks') }}
-    WHERE DATA IS NOT NULL
-    {% endif %}
-)
+    FROM
 
-SELECT
+{% if is_incremental() %}
+{{ ref('bronze__blocks') }}
+WHERE
+    _inserted_timestamp >= (
+        SELECT
+            COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp
+        FROM
+            {{ this }})
+        AND DATA IS NOT NULL
+    {% else %}
+        {{ ref('bronze__FR_blocks') }}
+        WHERE
+            DATA IS NOT NULL
+    {% endif %}
+    )
+SELECT
     block_number,
     block_hash,
+    block_timestamp,
     partition_key,
     block_json,
     _inserted_timestamp,
@@ -41,6 +46,11 @@ SELECT
     SYSDATE() AS inserted_timestamp,
     SYSDATE() AS modified_timestamp,
     '{{ invocation_id }}' AS _invocation_id
-FROM bronze_blocks
+FROM
+    bronze_blocks
 
-QUALIFY ROW_NUMBER() OVER (PARTITION BY block_hash ORDER BY _inserted_timestamp DESC) = 1
+qualify ROW_NUMBER() over (
+    PARTITION BY block_hash
+    ORDER BY
+        _inserted_timestamp DESC
+    ) = 1
diff --git a/models/silver/core/silver__chunks_v2.sql b/models/silver/core/silver__chunks_v2.sql
new file mode 100644
index 0000000..f9cf98d
--- /dev/null
+++ b/models/silver/core/silver__chunks_v2.sql
@@ -0,0 +1,56 @@
+-- depends_on: {{ ref('bronze__chunks') }}
+-- depends_on: {{ ref('bronze__FR_chunks') }}
+{{ config (
+    materialized = "incremental",
+    incremental_strategy = 'merge',
+    unique_key = "chunk_hash",
+    cluster_by = ['modified_timestamp::DATE','partition_key'],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash)",
+    tags = ['scheduled_core']
+) }}
+
+WITH bronze_chunks AS (
+
+    SELECT
+        VALUE :BLOCK_NUMBER :: INT AS block_number,
+        DATA :header :shard_id :: INT AS shard_id,
+        DATA :header :chunk_hash :: STRING AS chunk_hash,
+        partition_key,
+        DATA :: variant AS chunk_json,
+        _inserted_timestamp
+    FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__chunks') }}
+WHERE
+    _inserted_timestamp >= (
+        SELECT
+            COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp
+        FROM
+            {{ this }})
+        AND DATA IS NOT NULL
+    {% else %}
+        {{ ref('bronze__FR_chunks') }}
+        WHERE
+            DATA IS NOT NULL
+    {% endif %}
+    )
+SELECT
+    block_number,
+    shard_id,
+    chunk_hash,
+    partition_key,
+    chunk_json,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(['chunk_hash']) }} AS chunks_v2_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    bronze_chunks
+
+qualify ROW_NUMBER() over (
+    PARTITION BY chunk_hash
+    ORDER BY
+        _inserted_timestamp DESC
+    ) = 1
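
Both silver models rely on the trailing QUALIFY to keep exactly one row per hash when replayed bronze records arrive. An ad hoc uniqueness check along these lines (not part of the diff; relation name assumed from the model file, or resolve it via ref() in a dbt analysis) can confirm the dedup holds:

    SELECT
        chunk_hash,
        COUNT(*) AS record_count
    FROM silver.chunks_v2
    GROUP BY chunk_hash
    HAVING COUNT(*) > 1;  -- expect zero rows
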
diff --git a/models/streamline/core/complete/streamline__blocks_complete.sql b/models/streamline/core/complete/streamline__blocks_complete.sql
index c6d2a65..aea69d7 100644
--- a/models/streamline/core/complete/streamline__blocks_complete.sql
+++ b/models/streamline/core/complete/streamline__blocks_complete.sql
@@ -12,6 +12,15 @@
 SELECT
     VALUE :BLOCK_NUMBER :: INT AS block_number,
     DATA :header :hash :: STRING AS block_hash,
+    DATA :header :chunks_included :: INT AS chunks_expected,
+    ARRAY_SIZE(
+        DATA :chunks :: ARRAY
+    ) AS chunks_included,
+    {{ target.database }}.streamline.udf_extract_hash_array(
+        DATA :chunks :: ARRAY,
+        'chunk_hash'
+    ) AS chunk_ids,
+    -- TODO: consider adding ARRAY_SIZE(chunk_ids) = chunks_included AS array_is_complete
     partition_key,
     _inserted_timestamp,
     DATA :header :hash :: STRING AS complete_blocks_id,
@@ -32,8 +41,11 @@ WHERE
         ),
         '1900-01-01' :: timestamp_ntz
     )
+    AND DATA IS NOT NULL
 {% else %}
     {{ ref('bronze__FR_blocks') }}
+WHERE
+    DATA IS NOT NULL
 {% endif %}
 
 qualify(ROW_NUMBER() over (PARTITION BY block_number
diff --git a/models/streamline/core/complete/streamline__chunks_complete.sql b/models/streamline/core/complete/streamline__chunks_complete.sql
new file mode 100644
index 0000000..23eeb73
--- /dev/null
+++ b/models/streamline/core/complete/streamline__chunks_complete.sql
@@ -0,0 +1,59 @@
+-- depends_on: {{ ref('bronze__chunks') }}
+-- depends_on: {{ ref('bronze__FR_chunks') }}
+{{ config (
+    materialized = "incremental",
+    unique_key = "chunk_hash",
+    cluster_by = "ROUND(block_number, -3)",
+    merge_exclude_columns = ["inserted_timestamp"],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash)",
+    tags = ['streamline_complete']
+) }}
+
+SELECT
+    VALUE :BLOCK_NUMBER :: INT AS block_number,
+    VALUE :CHUNK_HASH :: STRING AS chunk_hash,
+    DATA :header :shard_id :: INT AS shard_id,
+    ARRAY_SIZE(
+        DATA :receipts :: ARRAY
+    ) AS receipts_count,
+    ARRAY_SIZE(
+        DATA :transactions :: ARRAY
+    ) AS transactions_count,
+    {{ target.database }}.streamline.udf_extract_hash_array(
+        DATA :receipts :: ARRAY,
+        'receipt_id'
+    ) AS receipt_ids,
+    {{ target.database }}.streamline.udf_extract_hash_array(
+        DATA :transactions :: ARRAY,
+        'hash'
+    ) AS transaction_ids,
+    partition_key,
+    _inserted_timestamp,
+    DATA :header :chunk_hash :: STRING AS complete_chunks_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__chunks') }}
+WHERE
+    _inserted_timestamp >= COALESCE(
+        (
+            SELECT
+                MAX(_inserted_timestamp) _inserted_timestamp
+            FROM
+                {{ this }}
+        ),
+        '1900-01-01' :: timestamp_ntz
+    )
+    AND DATA IS NOT NULL
+{% else %}
+    {{ ref('bronze__FR_chunks') }}
+WHERE
+    DATA IS NOT NULL
+{% endif %}
+
+qualify(ROW_NUMBER() over (PARTITION BY chunk_hash
+ORDER BY
+    _inserted_timestamp DESC)) = 1
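
The array_is_complete idea left as a TODO in streamline__blocks_complete can already be exercised as an ad hoc query. A sketch of what it would flag, using the columns defined above (resolve the relation name to the model's actual table, e.g. via ref() in a dbt analysis):

    SELECT
        block_number,
        chunks_expected,
        chunks_included,
        ARRAY_SIZE(chunk_ids) AS chunk_ids_count
    FROM streamline__blocks_complete
    WHERE ARRAY_SIZE(chunk_ids) <> chunks_included
        OR chunks_included <> chunks_expected;  -- rows here indicate incomplete chunk extraction
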
diff --git a/models/streamline/core/realtime/streamline__blocks_realtime.sql b/models/streamline/core/realtime/streamline__blocks_realtime.sql
index 46d96ac..3feef30 100644
--- a/models/streamline/core/realtime/streamline__blocks_realtime.sql
+++ b/models/streamline/core/realtime/streamline__blocks_realtime.sql
@@ -1,3 +1,5 @@
+-- depends_on: {{ ref('bronze__blocks') }}
+-- depends_on: {{ ref('bronze__FR_blocks') }}
 {{ config (
     materialized = "view",
     post_hook = fsc_utils.if_data_call_function_v2(
@@ -14,7 +16,8 @@
     ),
     tags = ['streamline_realtime']
 ) }}
-
+-- Note: roughly 3,000 blocks per hour (~70k/day).
+-- Batch sizing is still a work in progress.
 WITH last_3_days AS (
 
     SELECT
@@ -38,6 +41,8 @@ tbl AS (
     )
     AND block_number IS NOT NULL
 EXCEPT
+    -- TODO: block heights can be skipped, so consider walking block_hash / prev_hash instead,
+    -- or confirm that a skipped height returns a distinguishable response that can be logged.
 SELECT
     block_number
 FROM
@@ -63,16 +68,23 @@ SELECT
     'POST',
     '{Service}',
     OBJECT_CONSTRUCT(
-        'Content-Type', 'application/json'
+        'Content-Type',
+        'application/json'
     ),
     OBJECT_CONSTRUCT(
-        'jsonrpc', '2.0',
-        'method', 'block',
-        'id', 'Flipside/getBlock/0.1',
-        'params', OBJECT_CONSTRUCT(
-            'block_id', block_number
+        'jsonrpc',
+        '2.0',
+        'method',
+        'block',
+        'id',
+        'Flipside/getBlock/0.1',
+        'params',
+        OBJECT_CONSTRUCT(
+            'block_id',
+            block_number
        )
     ),
     'Vault/prod/near/quicknode/mainnet'
 ) AS request
-FROM tbl
+FROM
+    tbl
diff --git a/models/streamline/core/realtime/streamline__chunks_realtime.sql b/models/streamline/core/realtime/streamline__chunks_realtime.sql
new file mode 100644
index 0000000..ea19e68
--- /dev/null
+++ b/models/streamline/core/realtime/streamline__chunks_realtime.sql
@@ -0,0 +1,91 @@
+-- depends_on: {{ ref('bronze__chunks') }}
+-- depends_on: {{ ref('bronze__FR_chunks') }}
+{{ config (
+    materialized = "view",
+    post_hook = fsc_utils.if_data_call_function_v2(
+        func = 'streamline.udf_bulk_rest_api_v2',
+        target = "{{this.schema}}.{{this.identifier}}",
+        params = {
+            "external_table": "chunks_v2",
+            "sql_limit": "18000",
+            "producer_batch_size": "6000",
+            "worker_batch_size": "6000",
+            "sql_source": "{{this.identifier}}",
+            "order_by_column": "block_number DESC"
+        }
+    ),
+    tags = ['streamline_realtime']
+) }}
+-- Note: roughly 3,000 blocks per hour (~70k/day) * 6 chunks per block (for now).
+-- Batch sizing is still a work in progress.
+WITH last_3_days AS (
+
+    SELECT
+        ZEROIFNULL(block_number) AS block_number
+    FROM
+        {{ ref("_block_lookback") }}
+),
+tbl AS (
+    SELECT
+        block_number,
+        chunk_hash
+    FROM
+        {{ ref('streamline__chunks') }}
+    WHERE
+        (
+            block_number >= (
+                SELECT
+                    block_number
+                FROM
+                    last_3_days
+            )
+        )
+        AND chunk_hash IS NOT NULL
+    EXCEPT
+    SELECT
+        block_number,
+        chunk_hash
+    FROM
+        {{ ref('streamline__chunks_complete') }}
+    WHERE
+        block_number >= (
+            SELECT
+                block_number
+            FROM
+                last_3_days
+        )
+        AND _inserted_timestamp >= DATEADD(
+            'day',
+            -4,
+            SYSDATE()
+        )
+        AND chunk_hash IS NOT NULL
+)
+SELECT
+    block_number,
+    chunk_hash,
+    DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
+    {{ target.database }}.live.udf_api(
+        'POST',
+        '{Service}',
+        OBJECT_CONSTRUCT(
+            'Content-Type',
+            'application/json'
+        ),
+        OBJECT_CONSTRUCT(
+            'jsonrpc',
+            '2.0',
+            'method',
+            'chunk',
+            'id',
+            'Flipside/getChunk/0.1',
+            'params',
+            OBJECT_CONSTRUCT(
+                'chunk_id',
+                chunk_hash
+            )
+        ),
+        'Vault/prod/near/quicknode/mainnet'
+    ) AS request
+FROM
+    tbl
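
On the skipped-height TODO above: because the realtime view derives requests from block_number ranges, gaps at skipped heights are indistinguishable from missing data. One possible hash-walk check, offered as a sketch rather than a tested replacement (it assumes the header's prev_hash survives into block_json in silver__blocks_v2, and that the relation name resolves as written):

    SELECT
        child.block_number,
        child.block_json :header :prev_hash :: STRING AS missing_parent_hash
    FROM silver.blocks_v2 child
    LEFT JOIN silver.blocks_v2 parent
        ON parent.block_hash = child.block_json :header :prev_hash :: STRING
    WHERE parent.block_hash IS NULL;  -- genesis aside, these parents still need fetching
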
diff --git a/models/streamline/streamline__chunks.sql b/models/streamline/streamline__chunks.sql
new file mode 100644
index 0000000..1db6c53
--- /dev/null
+++ b/models/streamline/streamline__chunks.sql
@@ -0,0 +1,58 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = "chunk_hash",
+    cluster_by = "ROUND(block_number, -3)",
+    merge_exclude_columns = ["inserted_timestamp"],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(chunk_hash, block_number)",
+    tags = ['streamline_helper']
+) }}
+
+WITH blocks_complete AS (
+
+    SELECT
+        block_number,
+        block_hash,
+        chunk_ids,
+        _inserted_timestamp
+    FROM
+        {{ ref('streamline__blocks_complete') }}
+
+{% if is_incremental() %}
+WHERE
+    modified_timestamp >= (
+        SELECT
+            MAX(modified_timestamp)
+        FROM
+            {{ this }}
+    )
+{% endif %}
+),
+flatten_chunk_ids AS (
+    SELECT
+        block_number,
+        block_hash,
+        VALUE :: STRING AS chunk_hash,
+        INDEX AS chunk_index,
+        _inserted_timestamp
+    FROM
+        blocks_complete,
+        LATERAL FLATTEN(
+            input => chunk_ids
+        )
+)
+SELECT
+    block_number,
+    block_hash,
+    chunk_hash,
+    chunk_index,
+    _inserted_timestamp,
+    chunk_hash AS chunks_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    flatten_chunk_ids
+
+qualify(ROW_NUMBER() over (PARTITION BY chunk_hash
+ORDER BY
+    _inserted_timestamp DESC)) = 1
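
Finally, streamline__chunks drives the chunk requests from the chunk_ids flattened out of blocks_complete, so a coverage query mirroring the realtime EXCEPT shows the current fetch backlog. An ad hoc sketch (not part of the diff; resolve both relation names to the models' actual tables):

    SELECT COUNT(*) AS chunks_pending
    FROM streamline__chunks s
    LEFT JOIN streamline__chunks_complete c
        ON c.chunk_hash = s.chunk_hash
    WHERE c.chunk_hash IS NULL;  -- chunks identified in blocks but not yet ingested
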