diff --git a/models/streamline/complete/streamline__blocks_tx_complete.sql b/models/streamline/complete/streamline__blocks_tx_complete.sql new file mode 100644 index 0000000..804cedd --- /dev/null +++ b/models/streamline/complete/streamline__blocks_tx_complete.sql @@ -0,0 +1,41 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} +-- depends_on: {{ ref('bronze__blocks_tx') }} + +SELECT + DATA :block_height :: INT AS block_number, + DATA :block_timestamp :: bigint AS block_timestamp, + DATA :first_version :: bigint AS first_version, + DATA :last_version :: bigint AS last_version, + last_version - first_version + 1 AS tx_count_from_versions, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS blocks_tx_complete_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__blocks_tx') }} +WHERE + inserted_timestamp >= ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__blocks_tx_FR') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY block_number +ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/streamline/complete/streamline__transactions_complete.sql b/models/streamline/complete/streamline__transactions_complete.sql new file mode 100644 index 0000000..176646a --- /dev/null +++ b/models/streamline/complete/streamline__transactions_complete.sql @@ -0,0 +1,42 @@ +{{ config ( + materialized = "incremental", + incremental_strategy = 'merge', + unique_key = ['block_number','multiplier_no'], + cluster_by = "ROUND(block_number, -3)", + merge_exclude_columns = ["inserted_timestamp"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" +) }} +-- depends_on: {{ ref('bronze__transactions') }} + +SELECT + A.value :BLOCK_NUMBER :: INT AS block_number, + A.value :MULTIPLIER :: INT AS multiplier_no, + {{ dbt_utils.generate_surrogate_key( + ['block_number','multiplier_no'] + ) }} AS transactions_complete_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__transactions') }} +{% else %} + {{ ref('bronze__transactions_FR') }} +{% endif %} + +A + +{% if is_incremental() %} +WHERE + A.inserted_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: DATE) + FROM + {{ this }}) + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY block_number + ORDER BY + A.inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql new file mode 100644 index 0000000..eb59fee --- /dev/null +++ b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql @@ -0,0 +1,49 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"blocks_tx", + "sql_limit" :"10000", + "producer_batch_size" :"5000", + "worker_batch_size" :"5000", + "sql_source" :"{{this.identifier}}", + "order_by_column": "block_number" } + ), + tags = ['streamline_core_realtime'] +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref('streamline__blocks') }} + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__blocks_tx_complete') }} +) +SELECT + block_number, + ROUND( + block_number, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/v1/blocks/by_height/' || block_number || '?with_transactions=false', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'User-Agent', + 'Flipside_Crypto/0.1' + ), + PARSE_JSON('{}'), + 'Vault/prod/movement/mainnet' + ) AS request +FROM + blocks +ORDER BY + block_number diff --git a/models/streamline/silver/realtime/streamline__transactions_realtime.sql b/models/streamline/silver/realtime/streamline__transactions_realtime.sql new file mode 100644 index 0000000..5fc62a5 --- /dev/null +++ b/models/streamline/silver/realtime/streamline__transactions_realtime.sql @@ -0,0 +1,111 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"transactions", + "sql_limit" :"10000", + "producer_batch_size" :"5000", + "worker_batch_size" :"5000", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["data"]), + "order_by_column": "block_number" } + ), + tags = ['streamline_core_realtime'] +) }} + +WITH blocks AS ( + + SELECT + A.block_number, + tx_count_from_versions AS tx_count, + first_version, + last_version, + block_timestamp + FROM + {{ ref('streamline__blocks_tx_complete') }} A + WHERE + block_number <> 0 +), +numbers AS ( + SELECT + 1 AS n + UNION ALL + SELECT + n + 1 + FROM + numbers + WHERE + n < ( + SELECT + CEIL(MAX(tx_count) / 100.0) + FROM + blocks) + ), + blocks_with_page_numbers AS ( + SELECT + tt.block_number, + n.n - 1 AS multiplier, + first_version, + last_version, + tx_count, + block_timestamp + FROM + blocks tt + JOIN numbers n + ON n.n <= CASE + WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100 + ELSE FLOOR( + tt.tx_count / 100 + ) + 1 + END + ), + WORK AS ( + SELECT + A.block_number, + block_timestamp, + first_version, + last_version, + first_version +( + 100 * multiplier + ) AS tx_version, + multiplier, + LEAST ( + tx_count - 100 * multiplier, + 100 + ) AS lim, + tx_count + FROM + blocks_with_page_numbers A + LEFT JOIN {{ ref('streamline__transactions_complete') }} + b + ON A.block_number = b.block_number + AND multiplier = b.multiplier_no + WHERE + b.block_number IS NULL + ) + SELECT + block_number, + block_timestamp, + first_version, + last_version, + tx_version, + multiplier, + ROUND( + block_number, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/v1/transactions?start=' || tx_version || '&limit=' || lim, + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'User-Agent', + 'Flipside_Crypto/0.1' + ), + PARSE_JSON('{}'), + 'Vault/prod/movement/mainnet' + ) AS request + FROM + WORK diff --git a/models/streamline/silver/streamline__blocks.sql b/models/streamline/silver/streamline__blocks.sql new file mode 100644 index 0000000..897d060 --- /dev/null +++ b/models/streamline/silver/streamline__blocks.sql @@ -0,0 +1,22 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + 0 AS block_number +UNION ALL +SELECT + _id AS block_number +FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} +WHERE + _id <= ( + SELECT + MAX(block_number) + FROM + {{ ref('streamline__chainhead') }} + ) diff --git a/package-lock.yml b/package-lock.yml index fc976d2..dc06b63 100644 --- a/package-lock.yml +++ b/package-lock.yml @@ -1,16 +1,16 @@ packages: -- package: calogica/dbt_expectations - version: 0.8.5 -- package: dbt-labs/dbt_utils - version: 1.0.0 -- git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107 -- package: get-select/dbt_snowflake_query_tags - version: 2.5.0 -- package: Snowflake-Labs/dbt_constraints - version: 0.6.3 -- package: calogica/dbt_date - version: 0.7.2 -- git: https://github.com/FlipsideCrypto/livequery-models.git - revision: b024188be4e9c6bc00ed77797ebdc92d351d620e -sha1_hash: d3219b9c206b5988189dcdafae0ec22ca9b4056c + - package: calogica/dbt_expectations + version: 0.8.5 + - package: dbt-labs/dbt_utils + version: 1.0.0 + - git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: d3cf679e079f0cf06142de9386f215e55fe26b3b + - package: get-select/dbt_snowflake_query_tags + version: 2.5.0 + - package: Snowflake-Labs/dbt_constraints + version: 1.0.4 + - package: calogica/dbt_date + version: 0.7.2 + - git: https://github.com/FlipsideCrypto/livequery-models.git + revision: b024188be4e9c6bc00ed77797ebdc92d351d620e +sha1_hash: f14e55a0ab40f81e4341c5413a5b3d6e566ef058