diff --git a/dbt_project.yml b/dbt_project.yml index 373b852..004fbf5 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -67,7 +67,22 @@ vars: HEAL_MODEL: False HEAL_CURATED_MODEL: [] START_GHA_TASKS: False - API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' + + #### STREAMLINE 2.0 BEGIN #### + + API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' ROLES: | - ["INTERNAL_DEV"] \ No newline at end of file + ["INTERNAL_DEV"] + + config: + # The keys correspond to dbt profiles and are case sensitive + dev: + API_INTEGRATION: AWS_BLAST_API_DEV + EXTERNAL_FUNCTION_URI: y9d0tuavh6.execute-api.us-east-1.amazonaws.com/stg/ + + prod: + API_INTEGRATION: AWS_BLAST_API_PROD + EXTERNAL_FUNCTION_URI: 42gzudc5si.execute-api.us-east-1.amazonaws.com/prod/ + + #### STREAMLINE 2.0 END #### \ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index efe9eaf..45ac8db 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -6,7 +6,7 @@ {{ create_udtf_get_base_table( schema = "streamline" ) }} - {{ create_udf_rest_api() }} + {{ create_udf_bulk_rest_api_v2() }} {{ create_aws_blast_api() }} {{ create_udf_bulk_decode_logs() }} diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index e2d2444..c45ce7f 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -115,3 +115,72 @@ WHERE ) ) {% endmacro %} + +{% macro streamline_external_table_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + s.*, + b.file_name, + _inserted_timestamp + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL + +{% endmacro %} + +{% macro streamline_external_table_FR_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + _inserted_timestamp +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index 9ed17c4..418e9db 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -1,6 +1,6 @@ -{% macro create_udf_rest_api() %} +{% macro create_udf_bulk_rest_api_v2() %} CREATE - OR REPLACE EXTERNAL FUNCTION streamline.udf_rest_api( + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2( json OBJECT ) returns ARRAY api_integration = {% if target.name == "prod" %} diff --git a/models/silver/core/silver__blocks.sql b/models/silver/core/silver__blocks.sql index 2d2d161..dfb81cf 100644 --- a/models/silver/core/silver__blocks.sql +++ b/models/silver/core/silver__blocks.sql @@ -10,50 +10,53 @@ SELECT DATA, - block_number, + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, utils.udf_hex_to_int( - DATA :result :baseFeePerGas :: STRING + DATA :result :baseFeePerGas :: STRING ) :: INT AS base_fee_per_gas, utils.udf_hex_to_int( - DATA :result :difficulty :: STRING + DATA :result :difficulty :: STRING ) :: INT AS difficulty, - DATA :result :extraData :: STRING AS extra_data, + DATA :result :extraData :: STRING AS extra_data, utils.udf_hex_to_int( - DATA :result :gasLimit :: STRING + DATA :result :gasLimit :: STRING ) :: INT AS gas_limit, utils.udf_hex_to_int( - DATA :result :gasUsed :: STRING + DATA :result :gasUsed :: STRING ) :: INT AS gas_used, - DATA :result :hash :: STRING AS HASH, - DATA :result :logsBloom :: STRING AS logs_bloom, - DATA :result :miner :: STRING AS miner, - DATA :result :mixHash :: STRING AS mixHash, + DATA :result :hash :: STRING AS HASH, + DATA :result :logsBloom :: STRING AS logs_bloom, + DATA :result :miner :: STRING AS miner, + DATA :result :mixHash :: STRING AS mixHash, utils.udf_hex_to_int( - DATA :result :nonce :: STRING + DATA :result :nonce :: STRING ) :: INT AS nonce, utils.udf_hex_to_int( - DATA :result :number :: STRING + DATA :result :number :: STRING ) :: INT AS NUMBER, - DATA :result :parentHash :: STRING AS parent_hash, - DATA :result :receiptsRoot :: STRING AS receipts_root, - DATA :result :sha3Uncles :: STRING AS sha3_uncles, + DATA :result :parentHash :: STRING AS parent_hash, + DATA :result :receiptsRoot :: STRING AS receipts_root, + DATA :result :sha3Uncles :: STRING AS sha3_uncles, utils.udf_hex_to_int( - DATA :result :size :: STRING + DATA :result :size :: STRING ) :: INT AS SIZE, - DATA :result :stateRoot :: STRING AS state_root, + DATA :result :stateRoot :: STRING AS state_root, utils.udf_hex_to_int( - DATA :result :timestamp :: STRING + DATA :result :timestamp :: STRING ) :: TIMESTAMP AS block_timestamp, utils.udf_hex_to_int( - DATA :result :totalDifficulty :: STRING + DATA :result :totalDifficulty :: STRING ) :: INT AS total_difficulty, ARRAY_SIZE( - DATA :result :transactions + DATA :result :transactions ) AS tx_count, - DATA :result :transactionsRoot :: STRING AS transactions_root, - DATA :result :uncles AS uncles, - DATA :result :withdrawals AS withdrawals, - DATA :result :withdrawalsRoot :: STRING AS withdrawals_root, + DATA :result :transactionsRoot :: STRING AS transactions_root, + DATA :result :uncles AS uncles, + DATA :result :withdrawals AS withdrawals, + DATA :result :withdrawalsRoot :: STRING AS withdrawals_root, _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['block_number'] diff --git a/models/silver/core/silver__confirmed_blocks.sql b/models/silver/core/silver__confirmed_blocks.sql index e81d27a..034ecc0 100644 --- a/models/silver/core/silver__confirmed_blocks.sql +++ b/models/silver/core/silver__confirmed_blocks.sql @@ -10,7 +10,10 @@ WITH base AS ( SELECT - block_number, + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, DATA :result :hash :: STRING AS block_hash, DATA :result :transactions txs, _inserted_timestamp diff --git a/models/silver/core/silver__receipts.sql b/models/silver/core/silver__receipts.sql index 37cd3c2..19f0342 100644 --- a/models/silver/core/silver__receipts.sql +++ b/models/silver/core/silver__receipts.sql @@ -12,7 +12,10 @@ WITH base AS ( SELECT - block_number, + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, DATA, _inserted_timestamp FROM diff --git a/models/silver/core/silver__traces.sql b/models/silver/core/silver__traces.sql index 9d44492..5729345 100644 --- a/models/silver/core/silver__traces.sql +++ b/models/silver/core/silver__traces.sql @@ -12,7 +12,10 @@ WITH bronze_traces AS ( SELECT - block_number, + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, VALUE :array_index :: INT AS tx_position, DATA :result AS full_traces, _inserted_timestamp @@ -32,8 +35,7 @@ WHERE {{ ref('bronze__streamline_FR_traces') }} WHERE _partition_by_block_id <= 2000000 - AND - DATA :result IS NOT NULL + AND DATA :result IS NOT NULL {% endif %} qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql index f5b488e..dd40526 100644 --- a/models/silver/core/silver__transactions.sql +++ b/models/silver/core/silver__transactions.sql @@ -12,7 +12,10 @@ WITH base AS ( SELECT - block_number, + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, DATA, _inserted_timestamp FROM diff --git a/models/streamline/bronze/core/bronze__streamline_FR_blocks.sql b/models/streamline/bronze/core/bronze__streamline_FR_blocks.sql index aa947ea..11a1606 100644 --- a/models/streamline/bronze/core/bronze__streamline_FR_blocks.sql +++ b/models/streamline/bronze/core/bronze__streamline_FR_blocks.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_FR_query( +{{ streamline_external_table_FR_query_v2( model = "blocks", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_FR_confirm_blocks.sql b/models/streamline/bronze/core/bronze__streamline_FR_confirm_blocks.sql index 6869e21..699c0f8 100644 --- a/models/streamline/bronze/core/bronze__streamline_FR_confirm_blocks.sql +++ b/models/streamline/bronze/core/bronze__streamline_FR_confirm_blocks.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_FR_query( +{{ streamline_external_table_FR_query_v2( model = "confirm_blocks", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_FR_receipts.sql b/models/streamline/bronze/core/bronze__streamline_FR_receipts.sql index f8f2380..580c448 100644 --- a/models/streamline/bronze/core/bronze__streamline_FR_receipts.sql +++ b/models/streamline/bronze/core/bronze__streamline_FR_receipts.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_FR_query( +{{ streamline_external_table_FR_query_v2( model = "receipts", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_FR_traces.sql b/models/streamline/bronze/core/bronze__streamline_FR_traces.sql index f5ddf8d..d634f2f 100644 --- a/models/streamline/bronze/core/bronze__streamline_FR_traces.sql +++ b/models/streamline/bronze/core/bronze__streamline_FR_traces.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_FR_query( +{{ streamline_external_table_FR_query_v2( model = "traces", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_FR_transactions.sql b/models/streamline/bronze/core/bronze__streamline_FR_transactions.sql index 6b3e1b1..a991b6d 100644 --- a/models/streamline/bronze/core/bronze__streamline_FR_transactions.sql +++ b/models/streamline/bronze/core/bronze__streamline_FR_transactions.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_FR_query( +{{ streamline_external_table_FR_query_v2( model = 'transactions', - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_blocks.sql b/models/streamline/bronze/core/bronze__streamline_blocks.sql index 8fdeb66..1651e04 100644 --- a/models/streamline/bronze/core/bronze__streamline_blocks.sql +++ b/models/streamline/bronze/core/bronze__streamline_blocks.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_query( +{{ streamline_external_table_query_v2( model = "blocks", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} \ No newline at end of file diff --git a/models/streamline/bronze/core/bronze__streamline_confirm_blocks.sql b/models/streamline/bronze/core/bronze__streamline_confirm_blocks.sql index 55f9e46..63aff87 100644 --- a/models/streamline/bronze/core/bronze__streamline_confirm_blocks.sql +++ b/models/streamline/bronze/core/bronze__streamline_confirm_blocks.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_query( +{{ streamline_external_table_query_v2( model = "confirm_blocks", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_receipts.sql b/models/streamline/bronze/core/bronze__streamline_receipts.sql index 81e4aab..58e45de 100644 --- a/models/streamline/bronze/core/bronze__streamline_receipts.sql +++ b/models/streamline/bronze/core/bronze__streamline_receipts.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_query( +{{ streamline_external_table_query_v2( model = "receipts", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_traces.sql b/models/streamline/bronze/core/bronze__streamline_traces.sql index 5a08e03..01f265f 100644 --- a/models/streamline/bronze/core/bronze__streamline_traces.sql +++ b/models/streamline/bronze/core/bronze__streamline_traces.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_query( +{{ streamline_external_table_query_v2( model = "traces", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/bronze/core/bronze__streamline_transactions.sql b/models/streamline/bronze/core/bronze__streamline_transactions.sql index ad4d781..c318987 100644 --- a/models/streamline/bronze/core/bronze__streamline_transactions.sql +++ b/models/streamline/bronze/core/bronze__streamline_transactions.sql @@ -2,9 +2,7 @@ materialized = 'view' ) }} -{{ streamline_external_table_query( +{{ streamline_external_table_query_v2( model = "transactions", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "partition_key" + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )" ) }} diff --git a/models/streamline/silver/core/complete/streamline__complete_blocks.sql b/models/streamline/silver/core/complete/streamline__complete_blocks.sql index 97f5c03..eb1f855 100644 --- a/models/streamline/silver/core/complete/streamline__complete_blocks.sql +++ b/models/streamline/silver/core/complete/streamline__complete_blocks.sql @@ -1,16 +1,24 @@ -- depends_on: {{ ref('bronze__streamline_blocks') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", tags = ['streamline_core_complete'] ) }} SELECT - id, - block_number, - _inserted_timestamp + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {% if is_incremental() %} @@ -26,6 +34,6 @@ WHERE {{ ref('bronze__streamline_FR_blocks') }} {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY id +qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/core/complete/streamline__complete_confirmed_blocks.sql b/models/streamline/silver/core/complete/streamline__complete_confirmed_blocks.sql index 4432bd1..a892fd2 100644 --- a/models/streamline/silver/core/complete/streamline__complete_confirmed_blocks.sql +++ b/models/streamline/silver/core/complete/streamline__complete_confirmed_blocks.sql @@ -1,15 +1,23 @@ -- depends_on: {{ ref('bronze__streamline_confirm_blocks') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", tags = ['streamline_core_complete'] ) }} SELECT - id, - block_number, - _inserted_timestamp + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_confirmed_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {% if is_incremental() %} @@ -24,6 +32,6 @@ WHERE {{ ref('bronze__streamline_FR_confirm_blocks') }} {% endif %} - qualify(ROW_NUMBER() over (PARTITION BY id + qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/core/complete/streamline__complete_receipts.sql b/models/streamline/silver/core/complete/streamline__complete_receipts.sql index 7068503..5aec039 100644 --- a/models/streamline/silver/core/complete/streamline__complete_receipts.sql +++ b/models/streamline/silver/core/complete/streamline__complete_receipts.sql @@ -1,16 +1,24 @@ -- depends_on: {{ ref('bronze__streamline_receipts') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", tags = ['streamline_core_complete'] ) }} SELECT - id, - block_number, - _inserted_timestamp + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_receipts_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {% if is_incremental() %} @@ -26,6 +34,6 @@ WHERE {{ ref('bronze__streamline_FR_receipts') }} {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY id +qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/core/complete/streamline__complete_traces.sql b/models/streamline/silver/core/complete/streamline__complete_traces.sql index aaca35c..5dd3848 100644 --- a/models/streamline/silver/core/complete/streamline__complete_traces.sql +++ b/models/streamline/silver/core/complete/streamline__complete_traces.sql @@ -1,16 +1,24 @@ -- depends_on: {{ ref('bronze__streamline_traces') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", tags = ['streamline_core_complete'] ) }} SELECT - id, - block_number, - _inserted_timestamp + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_traces_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {% if is_incremental() %} @@ -26,6 +34,6 @@ WHERE {{ ref('bronze__streamline_FR_traces') }} {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY id +qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/core/complete/streamline__complete_transactions.sql b/models/streamline/silver/core/complete/streamline__complete_transactions.sql index 15b7173..82645b3 100644 --- a/models/streamline/silver/core/complete/streamline__complete_transactions.sql +++ b/models/streamline/silver/core/complete/streamline__complete_transactions.sql @@ -1,16 +1,24 @@ -- depends_on: {{ ref('bronze__streamline_transactions') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)", tags = ['streamline_core_complete'] ) }} SELECT - id, - block_number, - _inserted_timestamp + IFNULL( + VALUE :BLOCK_NUMBER :: INT, + metadata :request :"data" :id :: INT + ) AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_transactions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {% if is_incremental() %} @@ -26,6 +34,6 @@ WHERE {{ ref('bronze__streamline_FR_transactions') }} {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY id +qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/silver/core/history/streamline__blocks_history.sql b/models/streamline/silver/core/history/streamline__blocks_history.sql index c4b1d3f..182893e 100644 --- a/models/streamline/silver/core/history/streamline__blocks_history.sql +++ b/models/streamline/silver/core/history/streamline__blocks_history.sql @@ -1,8 +1,13 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"blocks", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}" } ), tags = ['streamline_core_history'] ) }} @@ -16,9 +21,6 @@ WITH last_3_days AS ( ), blocks AS ( SELECT - {{ dbt_utils.generate_surrogate_key( - ['block_number'] - ) }} AS id, block_number FROM {{ ref("streamline__blocks") }} @@ -31,7 +33,6 @@ blocks AS ( ) EXCEPT SELECT - id, block_number FROM {{ ref("streamline__complete_blocks") }} @@ -44,29 +45,28 @@ blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockByNumber', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)), + 'vault/prod/blast/mainnet' ) AS request FROM blocks diff --git a/models/streamline/silver/core/history/streamline__receipts_history.sql b/models/streamline/silver/core/history/streamline__receipts_history.sql index c470ed1..a1ca76a 100644 --- a/models/streamline/silver/core/history/streamline__receipts_history.sql +++ b/models/streamline/silver/core/history/streamline__receipts_history.sql @@ -1,8 +1,14 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"receipts", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "exploded_key": "[\"result\"]" } ), tags = ['streamline_core_history'] ) }} @@ -40,29 +46,28 @@ blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockReceipts', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))), + 'vault/prod/blast/mainnet' ) AS request FROM blocks diff --git a/models/streamline/silver/core/history/streamline__traces_history.sql b/models/streamline/silver/core/history/streamline__traces_history.sql index 7285e89..78898e4 100644 --- a/models/streamline/silver/core/history/streamline__traces_history.sql +++ b/models/streamline/silver/core/history/streamline__traces_history.sql @@ -1,8 +1,14 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"traces", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "exploded_key": "[\"result\"]" } ), tags = ['streamline_core_history'] ) }} @@ -40,30 +46,29 @@ blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'debug_traceBlockByNumber', 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '30s')) - ) :: STRING + ), + 'vault/prod/blast/mainnet' ) AS request FROM blocks diff --git a/models/streamline/silver/core/history/streamline__transactions_history.sql b/models/streamline/silver/core/history/streamline__transactions_history.sql index 47a07c5..9b33383 100644 --- a/models/streamline/silver/core/history/streamline__transactions_history.sql +++ b/models/streamline/silver/core/history/streamline__transactions_history.sql @@ -1,8 +1,14 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"transactions", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "exploded_key": "[\"result\", \"transactions\"]" } ), tags = ['streamline_core_history'] ) }} @@ -16,9 +22,6 @@ WITH last_3_days AS ( ), blocks AS ( SELECT - {{ dbt_utils.generate_surrogate_key( - ['block_number'] - ) }} AS id, block_number FROM {{ ref("streamline__blocks") }} @@ -31,7 +34,6 @@ blocks AS ( ) EXCEPT SELECT - id, block_number FROM {{ ref("streamline__complete_transactions") }} @@ -44,29 +46,28 @@ blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockByNumber', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)), + 'vault/prod/blast/mainnet' ) AS request FROM blocks diff --git a/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql b/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql index 4757aa3..0f17a0d 100644 --- a/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql +++ b/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql @@ -1,8 +1,13 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"blocks", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}" } ), tags = ['streamline_core_realtime'] ) }} @@ -16,11 +21,6 @@ WITH last_3_days AS ( ), to_do AS ( SELECT - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS id, block_number FROM {{ ref("streamline__blocks") }} @@ -36,7 +36,6 @@ to_do AS ( AND block_number IS NOT NULL EXCEPT SELECT - id, block_number FROM {{ ref("streamline__complete_blocks") }} @@ -54,29 +53,28 @@ to_do AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockByNumber', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)), + 'vault/prod/blast/mainnet' ) AS request FROM to_do diff --git a/models/streamline/silver/core/realtime/streamline__confirm_blocks_realtime.sql b/models/streamline/silver/core/realtime/streamline__confirm_blocks_realtime.sql index e1a0f89..e41b698 100644 --- a/models/streamline/silver/core/realtime/streamline__confirm_blocks_realtime.sql +++ b/models/streamline/silver/core/realtime/streamline__confirm_blocks_realtime.sql @@ -1,8 +1,13 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'confirm_blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"confirm_blocks", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}" } ), tags = ['streamline_core_realtime'] ) }} @@ -69,29 +74,28 @@ tbl AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockByNumber', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)), + 'vault/prod/blast/mainnet' ) AS request FROM tbl diff --git a/models/streamline/silver/core/realtime/streamline__receipts_realtime.sql b/models/streamline/silver/core/realtime/streamline__receipts_realtime.sql index 7ce1419..3814d56 100644 --- a/models/streamline/silver/core/realtime/streamline__receipts_realtime.sql +++ b/models/streamline/silver/core/realtime/streamline__receipts_realtime.sql @@ -1,8 +1,14 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"receipts", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "exploded_key": "[\"result\"]" } ), tags = ['streamline_core_realtime'] ) }} @@ -74,29 +80,28 @@ ready_blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockReceipts', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))), + 'vault/prod/blast/mainnet' ) AS request FROM ready_blocks diff --git a/models/streamline/silver/core/realtime/streamline__traces_realtime.sql b/models/streamline/silver/core/realtime/streamline__traces_realtime.sql index d626b65..8006434 100644 --- a/models/streamline/silver/core/realtime/streamline__traces_realtime.sql +++ b/models/streamline/silver/core/realtime/streamline__traces_realtime.sql @@ -1,8 +1,14 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"traces", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "exploded_key": "[\"result\"]" } ), tags = ['streamline_core_realtime'] ) }} @@ -69,30 +75,29 @@ ready_blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'debug_traceBlockByNumber', 'params', ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '30s')) - ) :: STRING + ), + 'vault/prod/blast/mainnet' ) AS request FROM ready_blocks diff --git a/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql b/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql index a52153c..472b684 100644 --- a/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql +++ b/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql @@ -1,8 +1,14 @@ {{ config ( materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))", - target = "{{this.schema}}.{{this.identifier}}" + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"transactions", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "exploded_key": "[\"result\", \"transactions\"]" } ), tags = ['streamline_core_realtime'] ) }} @@ -16,11 +22,6 @@ WITH last_3_days AS ( ), to_do AS ( SELECT - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS id, block_number FROM {{ ref("streamline__blocks") }} @@ -36,7 +37,6 @@ to_do AS ( AND block_number IS NOT NULL EXCEPT SELECT - id, block_number FROM {{ ref("streamline__complete_transactions") }} @@ -55,17 +55,11 @@ to_do AS ( ), ready_blocks AS ( SELECT - id, block_number FROM to_do UNION SELECT - MD5( - CAST( - COALESCE(CAST(block_number AS text), '' :: STRING) AS text - ) - ) AS id, block_number FROM ( @@ -81,29 +75,28 @@ ready_blocks AS ( ) ) SELECT - block_number AS partition_key, - OBJECT_CONSTRUCT( - 'method', + block_number, + ROUND( + block_number, + -3 + ) AS partition_key, + {{ target.database }}.live.udf_api( 'POST', - 'url', '{service}/{Authentication}', - 'headers', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), - 'params', - PARSE_JSON('{}'), - 'data', OBJECT_CONSTRUCT( 'id', - block_number :: STRING, + block_number, 'jsonrpc', '2.0', 'method', 'eth_getBlockByNumber', 'params', - ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)) :: STRING + ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)), + 'vault/prod/blast/mainnet' ) AS request FROM ready_blocks diff --git a/models/streamline/silver/core/streamline__get_chainhead.sql b/models/streamline/silver/core/streamline__get_chainhead.sql index b231aab..7be0901 100644 --- a/models/streamline/silver/core/streamline__get_chainhead.sql +++ b/models/streamline/silver/core/streamline__get_chainhead.sql @@ -4,12 +4,25 @@ ) }} SELECT - live.udf_api( + {{ target.database }}.live.udf_api( 'POST', - '{service}/{Authentication}',{},{ 'method' :'eth_blockNumber', - 'params' :[], - 'id' :1, - 'jsonrpc' :'2.0' }, + '{service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'fsc-quantum-state', + 'livequery' + ), + OBJECT_CONSTRUCT( + 'id', + 1, + 'jsonrpc', + '2.0', + 'method', + 'eth_blockNumber', + 'params', + [] + ), 'vault/prod/blast/mainnet' ) AS resp, utils.udf_hex_to_int( diff --git a/packages.yml b/packages.yml index 6b10e79..1736389 100644 --- a/packages.yml +++ b/packages.yml @@ -6,6 +6,6 @@ packages: - package: dbt-labs/dbt_utils version: 1.0.0 - git: https://github.com/FlipsideCrypto/fsc-utils.git - revision: v1.23.0 + revision: "v1.23.0" - package: get-select/dbt_snowflake_query_tags version: [">=2.0.0", "<3.0.0"] \ No newline at end of file