Update blast model to release 1.2 (#35)

* updated the blast model to use release 1.2

* removed user.ymal

* chainhead history and package temp

* v1.22

* udf name

* package upgrade, config in dbt_project

* chainhead

* updates for quantum and standardization

* remove limits

* removed the 2nd param in the receipts model

* if data_call_function v2

* block number column adj

* removed param

---------

Co-authored-by: drethereum <trevor.wenokur@gmail.com>
Co-authored-by: drethereum <71602799+drethereum@users.noreply.github.com>
This commit is contained in:
xiuy001 2024-05-02 14:21:55 -04:00 committed by GitHub
parent 388e0d8d0e
commit 5335c7bc05
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 400 additions and 253 deletions

View File

@ -67,7 +67,22 @@ vars:
HEAL_MODEL: False
HEAL_CURATED_MODEL: []
START_GHA_TASKS: False
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
#### STREAMLINE 2.0 BEGIN ####
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
["INTERNAL_DEV"]
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_BLAST_API_DEV
EXTERNAL_FUNCTION_URI: y9d0tuavh6.execute-api.us-east-1.amazonaws.com/stg/
prod:
API_INTEGRATION: AWS_BLAST_API_PROD
EXTERNAL_FUNCTION_URI: 42gzudc5si.execute-api.us-east-1.amazonaws.com/prod/
#### STREAMLINE 2.0 END ####

View File

@ -6,7 +6,7 @@
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_udf_rest_api() }}
{{ create_udf_bulk_rest_api_v2() }}
{{ create_aws_blast_api() }}
{{ create_udf_bulk_decode_logs() }}

View File

@ -115,3 +115,72 @@ WHERE
)
)
{% endmacro %}
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}

View File

@ -1,6 +1,6 @@
{% macro create_udf_rest_api() %}
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_rest_api(
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY api_integration =
{% if target.name == "prod" %}

View File

@ -10,50 +10,53 @@
SELECT
DATA,
block_number,
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
utils.udf_hex_to_int(
DATA :result :baseFeePerGas :: STRING
DATA :result :baseFeePerGas :: STRING
) :: INT AS base_fee_per_gas,
utils.udf_hex_to_int(
DATA :result :difficulty :: STRING
DATA :result :difficulty :: STRING
) :: INT AS difficulty,
DATA :result :extraData :: STRING AS extra_data,
DATA :result :extraData :: STRING AS extra_data,
utils.udf_hex_to_int(
DATA :result :gasLimit :: STRING
DATA :result :gasLimit :: STRING
) :: INT AS gas_limit,
utils.udf_hex_to_int(
DATA :result :gasUsed :: STRING
DATA :result :gasUsed :: STRING
) :: INT AS gas_used,
DATA :result :hash :: STRING AS HASH,
DATA :result :logsBloom :: STRING AS logs_bloom,
DATA :result :miner :: STRING AS miner,
DATA :result :mixHash :: STRING AS mixHash,
DATA :result :hash :: STRING AS HASH,
DATA :result :logsBloom :: STRING AS logs_bloom,
DATA :result :miner :: STRING AS miner,
DATA :result :mixHash :: STRING AS mixHash,
utils.udf_hex_to_int(
DATA :result :nonce :: STRING
DATA :result :nonce :: STRING
) :: INT AS nonce,
utils.udf_hex_to_int(
DATA :result :number :: STRING
DATA :result :number :: STRING
) :: INT AS NUMBER,
DATA :result :parentHash :: STRING AS parent_hash,
DATA :result :receiptsRoot :: STRING AS receipts_root,
DATA :result :sha3Uncles :: STRING AS sha3_uncles,
DATA :result :parentHash :: STRING AS parent_hash,
DATA :result :receiptsRoot :: STRING AS receipts_root,
DATA :result :sha3Uncles :: STRING AS sha3_uncles,
utils.udf_hex_to_int(
DATA :result :size :: STRING
DATA :result :size :: STRING
) :: INT AS SIZE,
DATA :result :stateRoot :: STRING AS state_root,
DATA :result :stateRoot :: STRING AS state_root,
utils.udf_hex_to_int(
DATA :result :timestamp :: STRING
DATA :result :timestamp :: STRING
) :: TIMESTAMP AS block_timestamp,
utils.udf_hex_to_int(
DATA :result :totalDifficulty :: STRING
DATA :result :totalDifficulty :: STRING
) :: INT AS total_difficulty,
ARRAY_SIZE(
DATA :result :transactions
DATA :result :transactions
) AS tx_count,
DATA :result :transactionsRoot :: STRING AS transactions_root,
DATA :result :uncles AS uncles,
DATA :result :withdrawals AS withdrawals,
DATA :result :withdrawalsRoot :: STRING AS withdrawals_root,
DATA :result :transactionsRoot :: STRING AS transactions_root,
DATA :result :uncles AS uncles,
DATA :result :withdrawals AS withdrawals,
DATA :result :withdrawalsRoot :: STRING AS withdrawals_root,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['block_number']

View File

@ -10,7 +10,10 @@
WITH base AS (
SELECT
block_number,
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
DATA :result :hash :: STRING AS block_hash,
DATA :result :transactions txs,
_inserted_timestamp

View File

@ -12,7 +12,10 @@
WITH base AS (
SELECT
block_number,
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
DATA,
_inserted_timestamp
FROM

View File

@ -12,7 +12,10 @@
WITH bronze_traces AS (
SELECT
block_number,
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
VALUE :array_index :: INT AS tx_position,
DATA :result AS full_traces,
_inserted_timestamp
@ -32,8 +35,7 @@ WHERE
{{ ref('bronze__streamline_FR_traces') }}
WHERE
_partition_by_block_id <= 2000000
AND
DATA :result IS NOT NULL
AND DATA :result IS NOT NULL
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY block_number, tx_position

View File

@ -12,7 +12,10 @@
WITH base AS (
SELECT
block_number,
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
DATA,
_inserted_timestamp
FROM

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_FR_query_v2(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_FR_query_v2(
model = "confirm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_FR_query_v2(
model = "receipts",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_FR_query_v2(
model = "traces",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_FR_query(
{{ streamline_external_table_FR_query_v2(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_query(
{{ streamline_external_table_query_v2(
model = "blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_query(
{{ streamline_external_table_query_v2(
model = "confirm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_query(
{{ streamline_external_table_query_v2(
model = "receipts",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_query(
{{ streamline_external_table_query_v2(
model = "traces",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,9 +2,7 @@
materialized = 'view'
) }}
{{ streamline_external_table_query(
{{ streamline_external_table_query_v2(
model = "transactions",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "partition_key"
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,16 +1,24 @@
-- depends_on: {{ ref('bronze__streamline_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "id",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
@ -26,6 +34,6 @@ WHERE
{{ ref('bronze__streamline_FR_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,15 +1,23 @@
-- depends_on: {{ ref('bronze__streamline_confirm_blocks') }}
{{ config (
materialized = "incremental",
unique_key = "id",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_confirmed_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
@ -24,6 +32,6 @@ WHERE
{{ ref('bronze__streamline_FR_confirm_blocks') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,16 +1,24 @@
-- depends_on: {{ ref('bronze__streamline_receipts') }}
{{ config (
materialized = "incremental",
unique_key = "id",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_receipts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
@ -26,6 +34,6 @@ WHERE
{{ ref('bronze__streamline_FR_receipts') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,16 +1,24 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
{{ config (
materialized = "incremental",
unique_key = "id",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_traces_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
@ -26,6 +34,6 @@ WHERE
{{ ref('bronze__streamline_FR_traces') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,16 +1,24 @@
-- depends_on: {{ ref('bronze__streamline_transactions') }}
{{ config (
materialized = "incremental",
unique_key = "id",
unique_key = "block_number",
cluster_by = "ROUND(block_number, -3)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)",
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)",
tags = ['streamline_core_complete']
) }}
SELECT
id,
block_number,
_inserted_timestamp
IFNULL(
VALUE :BLOCK_NUMBER :: INT,
metadata :request :"data" :id :: INT
) AS block_number,
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS complete_transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
_inserted_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
@ -26,6 +34,6 @@ WHERE
{{ ref('bronze__streamline_FR_transactions') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -1,8 +1,13 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"blocks",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_core_history']
) }}
@ -16,9 +21,6 @@ WITH last_3_days AS (
),
blocks AS (
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
@ -31,7 +33,6 @@ blocks AS (
)
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
@ -44,29 +45,28 @@ blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)),
'vault/prod/blast/mainnet'
) AS request
FROM
blocks

View File

@ -1,8 +1,14 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"receipts",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key": "[\"result\"]" }
),
tags = ['streamline_core_history']
) }}
@ -40,29 +46,28 @@ blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockReceipts',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))),
'vault/prod/blast/mainnet'
) AS request
FROM
blocks

View File

@ -1,8 +1,14 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"traces",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key": "[\"result\"]" }
),
tags = ['streamline_core_history']
) }}
@ -40,30 +46,29 @@ blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'debug_traceBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '30s'))
) :: STRING
),
'vault/prod/blast/mainnet'
) AS request
FROM
blocks

View File

@ -1,8 +1,14 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key": "[\"result\", \"transactions\"]" }
),
tags = ['streamline_core_history']
) }}
@ -16,9 +22,6 @@ WITH last_3_days AS (
),
blocks AS (
SELECT
{{ dbt_utils.generate_surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
@ -31,7 +34,6 @@ blocks AS (
)
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_transactions") }}
@ -44,29 +46,28 @@ blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)),
'vault/prod/blast/mainnet'
) AS request
FROM
blocks

View File

@ -1,8 +1,13 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"blocks",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_core_realtime']
) }}
@ -16,11 +21,6 @@ WITH last_3_days AS (
),
to_do AS (
SELECT
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
@ -36,7 +36,6 @@ to_do AS (
AND block_number IS NOT NULL
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
@ -54,29 +53,28 @@ to_do AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)),
'vault/prod/blast/mainnet'
) AS request
FROM
to_do

View File

@ -1,8 +1,13 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'confirm_blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"confirm_blocks",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_core_realtime']
) }}
@ -69,29 +74,28 @@ tbl AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)),
'vault/prod/blast/mainnet'
) AS request
FROM
tbl

View File

@ -1,8 +1,14 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"receipts",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key": "[\"result\"]" }
),
tags = ['streamline_core_realtime']
) }}
@ -74,29 +80,28 @@ ready_blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockReceipts',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number))),
'vault/prod/blast/mainnet'
) AS request
FROM
ready_blocks

View File

@ -1,8 +1,14 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','[\"result\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"traces",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key": "[\"result\"]" }
),
tags = ['streamline_core_realtime']
) }}
@ -69,30 +75,29 @@ ready_blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'debug_traceBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), OBJECT_CONSTRUCT('tracer', 'callTracer', 'timeout', '30s'))
) :: STRING
),
'vault/prod/blast/mainnet'
) AS request
FROM
ready_blocks

View File

@ -1,8 +1,14 @@
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_rest_api(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','[\"result\", \"transactions\"]', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'sm_secret_name','prod/blast/mainnet'))",
target = "{{this.schema}}.{{this.identifier}}"
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key": "[\"result\", \"transactions\"]" }
),
tags = ['streamline_core_realtime']
) }}
@ -16,11 +22,6 @@ WITH last_3_days AS (
),
to_do AS (
SELECT
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
@ -36,7 +37,6 @@ to_do AS (
AND block_number IS NOT NULL
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_transactions") }}
@ -55,17 +55,11 @@ to_do AS (
),
ready_blocks AS (
SELECT
id,
block_number
FROM
to_do
UNION
SELECT
MD5(
CAST(
COALESCE(CAST(block_number AS text), '' :: STRING) AS text
)
) AS id,
block_number
FROM
(
@ -81,29 +75,28 @@ ready_blocks AS (
)
)
SELECT
block_number AS partition_key,
OBJECT_CONSTRUCT(
'method',
block_number,
ROUND(
block_number,
-3
) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'url',
'{service}/{Authentication}',
'headers',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
'params',
PARSE_JSON('{}'),
'data',
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)) :: STRING
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), TRUE)),
'vault/prod/blast/mainnet'
) AS request
FROM
ready_blocks

View File

@ -4,12 +4,25 @@
) }}
SELECT
live.udf_api(
{{ target.database }}.live.udf_api(
'POST',
'{service}/{Authentication}',{},{ 'method' :'eth_blockNumber',
'params' :[],
'id' :1,
'jsonrpc' :'2.0' },
'{service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(
'id',
1,
'jsonrpc',
'2.0',
'method',
'eth_blockNumber',
'params',
[]
),
'vault/prod/blast/mainnet'
) AS resp,
utils.udf_hex_to_int(

View File

@ -6,6 +6,6 @@ packages:
- package: dbt-labs/dbt_utils
version: 1.0.0
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.23.0
revision: "v1.23.0"
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]