blocks + txs (#132)

* blocks + txs

* limit
This commit is contained in:
Austin 2025-09-03 10:29:18 -04:00 committed by GitHub
parent 34233235df
commit 16e7cb7df0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 56 additions and 178 deletions

View File

@ -2,6 +2,6 @@
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "evm_blocks",
model = "evm_blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -2,6 +2,6 @@
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_query(
model = "evm_transactions",
model = "evm_transactions_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "evm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}
select * from {{ ref('bronze_evm__streamline_fr_blocks_v1') }}
union all
select * from {{ ref('bronze_evm__streamline_fr_blocks_v2') }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "evm_blocks",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = "evm_blocks_v2",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,7 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = 'evm_transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}
select * from {{ ref('bronze_evm__streamline_fr_transactions_v1') }}
union all
select * from {{ ref('bronze_evm__streamline_fr_transactions_v2') }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = 'evm_transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,8 @@
{{ config (
materialized = 'view'
) }}
{{ fsc_evm.streamline_external_table_fr_query(
model = 'evm_transactions_v2',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}

View File

@ -1,61 +0,0 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_blocks",
"sql_limit" :"25000",
"producer_batch_size" :"100000",
"worker_batch_size" :"10000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_core_evm_history']
) }}
WITH to_do AS (
SELECT
block_number
FROM
{{ ref("streamline__evm_blocks") }}
WHERE block_number IS NOT NULL
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_evm_blocks") }}
),
ready_blocks AS (
SELECT
block_number
FROM
to_do
)
SELECT
block_number,
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)),
'Vault/prod/sei/quicknode/mainnet'
) AS request
FROM
ready_blocks
ORDER BY
block_number asc
limit 25000

View File

@ -3,12 +3,13 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_transactions",
"sql_limit" :"25000",
"producer_batch_size" :"100000",
params ={ "external_table" :"evm_blocks_transactions",
"sql_limit" :"30000",
"producer_batch_size" :"30000",
"worker_batch_size" :"10000",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["result.transactions"]) }
"exploded_key": tojson(['result', 'result.transactions'])
}
),
tags = ['streamline_core_evm_history']
) }}
@ -18,12 +19,12 @@ WITH to_do AS (
block_number
FROM
{{ ref("streamline__evm_blocks") }}
WHERE block_number IS NOT NULL
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_evm_transactions") }}
{{ ref("streamline__complete_evm_blocks") }} b
INNER JOIN {{ ref("streamline__complete_evm_transactions") }} t USING (block_number)
),
ready_blocks AS (
SELECT
@ -58,5 +59,4 @@ SELECT
FROM
ready_blocks
ORDER BY
block_number asc
limit 25000
block_number ASC

View File

@ -1,89 +0,0 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_blocks",
"sql_limit" :"30000",
"producer_batch_size" :"30000",
"worker_batch_size" :"10000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_core_evm_realtime']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_evm_block_lookback") }}
),
to_do AS (
SELECT
block_number
FROM
{{ ref("streamline__evm_blocks") }}
WHERE
(
block_number >= (
SELECT
block_number
FROM
last_3_days
)
)
AND block_number IS NOT NULL
EXCEPT
SELECT
block_number
FROM
{{ ref("streamline__complete_evm_blocks") }}
WHERE
block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
),
ready_blocks AS (
SELECT
block_number
FROM
to_do {# add retry here #}
)
SELECT
block_number,
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number :: STRING,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(utils.udf_int_to_hex(block_number), FALSE)),
'Vault/prod/sei/quicknode/mainnet'
) AS request
FROM
ready_blocks
ORDER BY
block_number DESC
limit 25000

View File

@ -3,12 +3,13 @@
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_transactions",
params ={ "external_table" :"evm_blocks_transactions",
"sql_limit" :"30000",
"producer_batch_size" :"30000",
"worker_batch_size" :"10000",
"sql_source" :"{{this.identifier}}",
"exploded_key": tojson(["result.transactions"]) }
"exploded_key": tojson(['result', 'result.transactions'])
}
),
tags = ['streamline_core_evm_realtime']
) }}
@ -39,19 +40,15 @@ to_do AS (
SELECT
block_number
FROM
{{ ref("streamline__complete_evm_transactions") }}
{{ ref("streamline__complete_evm_blocks") }} b
INNER JOIN {{ ref("streamline__complete_evm_transactions") }} t USING (block_number)
WHERE
block_number >= (
b.block_number >= (
SELECT
block_number
FROM
last_3_days
)
AND _inserted_timestamp >= DATEADD(
'day',
-4,
SYSDATE()
)
),
ready_blocks AS (
SELECT
@ -97,4 +94,4 @@ SELECT
ready_blocks
ORDER BY
block_number DESC
limit 25000
limit 30000

View File

@ -53,6 +53,8 @@ sources:
- name: txcount_v2
- name: evm_decoded_logs
- name: sei_addresses
- name: evm_blocks_v2
- name: evm_transactions_v2
- name: bronze
schema: bronze
tables: