diff --git a/models/streamline/complete/streamline__blocks_tx_complete.sql b/models/streamline/complete/streamline__blocks_tx_complete.sql index d602314..804cedd 100644 --- a/models/streamline/complete/streamline__blocks_tx_complete.sql +++ b/models/streamline/complete/streamline__blocks_tx_complete.sql @@ -10,10 +10,10 @@ SELECT DATA :block_height :: INT AS block_number, - ARRAY_SIZE( - DATA :transactions - ) AS tx_count_from_transactions_array, - DATA :last_version :: bigint - DATA :first_version :: bigint + 1 AS tx_count_from_versions, + DATA :block_timestamp :: bigint AS block_timestamp, + DATA :first_version :: bigint AS first_version, + DATA :last_version :: bigint AS last_version, + last_version - first_version + 1 AS tx_count_from_versions, {{ dbt_utils.generate_surrogate_key( ['block_number'] ) }} AS blocks_tx_complete_id, diff --git a/models/streamline/complete/streamline__transactions_complete.sql b/models/streamline/complete/streamline__transactions_complete.sql index 7e5a5ca..176646a 100644 --- a/models/streamline/complete/streamline__transactions_complete.sql +++ b/models/streamline/complete/streamline__transactions_complete.sql @@ -1,7 +1,7 @@ {{ config ( materialized = "incremental", incremental_strategy = 'merge', - unique_key = "transactions_complete_id", + unique_key = ['block_number','multiplier_no'], cluster_by = "ROUND(block_number, -3)", merge_exclude_columns = ["inserted_timestamp"], post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" @@ -27,10 +27,6 @@ FROM {% endif %} A -JOIN {{ ref('silver__blocks') }} -b -ON DATA [0] :version :: INT BETWEEN b.first_version -AND b.last_version {% if is_incremental() %} WHERE diff --git a/models/streamline/silver/_max_block_by_date.sql b/models/streamline/silver/_max_block_by_date.sql deleted file mode 100644 index 7609b67..0000000 --- a/models/streamline/silver/_max_block_by_date.sql +++ /dev/null @@ -1,27 +0,0 @@ -{{ config ( - materialized = "ephemeral", - unique_key = "block_number", -) }} - -WITH base AS ( - - SELECT - block_timestamp :: DATE AS block_date, - MAX(block_number) block_number - FROM - {{ ref("silver__blocks") }} - GROUP BY - block_timestamp :: DATE -) -SELECT - block_date, - block_number -FROM - base -WHERE - block_date <> ( - SELECT - MAX(block_date) - FROM - base - ) diff --git a/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql index fec7374..eb59fee 100644 --- a/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql +++ b/models/streamline/silver/realtime/streamline__blocks_tx_realtime.sql @@ -4,11 +4,11 @@ func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :"blocks_tx", - "sql_limit" :"8000", - "producer_batch_size" :"50", - "worker_batch_size" :"400", + "sql_limit" :"10000", + "producer_batch_size" :"5000", + "worker_batch_size" :"5000", "sql_source" :"{{this.identifier}}", - "order_by_column": "block_number"} + "order_by_column": "block_number" } ), tags = ['streamline_core_realtime'] ) }} @@ -19,8 +19,6 @@ WITH blocks AS ( block_number FROM {{ ref('streamline__blocks') }} - WHERE - block_number != 0 EXCEPT SELECT block_number @@ -35,7 +33,7 @@ SELECT ) :: INT AS partition_key, {{ target.database }}.live.udf_api( 'GET', - '{Service}/v1/blocks/by_height/' || block_number || '?with_transactions=true', + '{Service}/v1/blocks/by_height/' || block_number || '?with_transactions=false', OBJECT_CONSTRUCT( 'Content-Type', 'application/json', diff --git a/models/streamline/silver/realtime/streamline__transactions_realtime.sql b/models/streamline/silver/realtime/streamline__transactions_realtime.sql index df46c03..cbc9892 100644 --- a/models/streamline/silver/realtime/streamline__transactions_realtime.sql +++ b/models/streamline/silver/realtime/streamline__transactions_realtime.sql @@ -3,26 +3,29 @@ post_hook = fsc_utils.if_data_call_function_v2( func = 'streamline.udf_bulk_rest_api_v2', target = "{{this.schema}}.{{this.identifier}}", - params ={ - "external_table" :"transactions", - "sql_limit" :"1000", - "producer_batch_size" :"50", - "worker_batch_size" :"400", - "sql_source" :"{{this.identifier}}", - "exploded_key": tojson([]), - "order_by_column": "block_number" - } + params ={ "external_table" :"transactions", + "sql_limit" :"10000", + "producer_batch_size" :"5000", + "worker_batch_size" :"5000", + "sql_source" :"{{this.identifier}}", + "exploded_key": tojson(["result"]), + "order_by_column": "block_number" } ), tags = ['streamline_core_realtime'] ) }} WITH blocks AS ( + SELECT A.block_number, tx_count_from_versions AS tx_count, - first_version AS version_start + first_version, + last_version, + block_timestamp FROM - {{ ref('silver__blocks') }} A + {{ ref('streamline__blocks_tx_complete') }} A + WHERE + block_number <> 0 ), numbers AS ( SELECT @@ -41,10 +44,12 @@ numbers AS ( ), blocks_with_page_numbers AS ( SELECT - tt.block_number :: INT AS block_number, + tt.block_number, n.n - 1 AS multiplier, - version_start, - tx_count + first_version, + last_version, + tx_count, + block_timestamp FROM blocks tt JOIN numbers n @@ -58,10 +63,18 @@ numbers AS ( WORK AS ( SELECT A.block_number, - version_start +( + block_timestamp, + first_version, + last_version, + first_version +( 100 * multiplier ) AS tx_version, - multiplier + multiplier, + LEAST ( + tx_count - 100 * multiplier, + 100 + ) AS lim, + tx_count FROM blocks_with_page_numbers A LEFT JOIN {{ ref('streamline__transactions_complete') }} @@ -72,14 +85,19 @@ numbers AS ( b.block_number IS NULL ) SELECT + block_number, + block_timestamp, + first_version, + last_version, tx_version, + multiplier, ROUND( - tx_version, + block_number, -4 ) :: INT AS partition_key, {{ target.database }}.live.udf_api( 'GET', - '{Service}/v1/transactions?start=' || tx_version || '&limit=100', + '{Service}/v1/transactions?start=' || tx_version || '&limit=' || lim, OBJECT_CONSTRUCT( 'Content-Type', 'application/json', @@ -88,10 +106,6 @@ numbers AS ( ), PARSE_JSON('{}'), 'Vault/prod/movement/mainnet' - ) AS request, - block_number, - multiplier + ) AS request FROM WORK - ORDER BY - block_number diff --git a/models/streamline/silver/streamline__blocks.sql b/models/streamline/silver/streamline__blocks.sql index 3923098..897d060 100644 --- a/models/streamline/silver/streamline__blocks.sql +++ b/models/streamline/silver/streamline__blocks.sql @@ -3,6 +3,9 @@ tags = ['streamline_view'] ) }} +SELECT + 0 AS block_number +UNION ALL SELECT _id AS block_number FROM diff --git a/package-lock.yml b/package-lock.yml index fc976d2..dc06b63 100644 --- a/package-lock.yml +++ b/package-lock.yml @@ -1,16 +1,16 @@ packages: -- package: calogica/dbt_expectations - version: 0.8.5 -- package: dbt-labs/dbt_utils - version: 1.0.0 -- git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107 -- package: get-select/dbt_snowflake_query_tags - version: 2.5.0 -- package: Snowflake-Labs/dbt_constraints - version: 0.6.3 -- package: calogica/dbt_date - version: 0.7.2 -- git: https://github.com/FlipsideCrypto/livequery-models.git - revision: b024188be4e9c6bc00ed77797ebdc92d351d620e -sha1_hash: d3219b9c206b5988189dcdafae0ec22ca9b4056c + - package: calogica/dbt_expectations + version: 0.8.5 + - package: dbt-labs/dbt_utils + version: 1.0.0 + - git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: d3cf679e079f0cf06142de9386f215e55fe26b3b + - package: get-select/dbt_snowflake_query_tags + version: 2.5.0 + - package: Snowflake-Labs/dbt_constraints + version: 1.0.4 + - package: calogica/dbt_date + version: 0.7.2 + - git: https://github.com/FlipsideCrypto/livequery-models.git + revision: b024188be4e9c6bc00ed77797ebdc92d351d620e +sha1_hash: f14e55a0ab40f81e4341c5413a5b3d6e566ef058