* tags

* receipts
This commit is contained in:
Austin 2025-02-03 12:31:38 -05:00 committed by GitHub
parent e5f778c474
commit a0c10c0e71
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 129 additions and 2 deletions

View File

@ -43,4 +43,4 @@ jobs:
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -s models/evm/streamline/core/realtime models/evm/streamline/core/complete
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -s models/evm/streamline/core/realtime models/evm/streamline/core/complete "flow_models,tag:streamline_realtime_evm"

View File

@ -0,0 +1,61 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_blocks",
"sql_limit" :"100000",
"producer_batch_size" :"5000",
"worker_batch_size" :"1000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_history_evm']
) }}
WITH tbl AS (
SELECT
block_number
FROM
{{ ref('streamline__evm_blocks') }}
WHERE
block_number IS NOT NULL
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__complete_get_evm_blocks') }}
)
SELECT
block_number,
DATE_PART(epoch_second, SYSDATE()) :: STRING AS request_timestamp,
'{{ invocation_id }}' AS _invocation_id,
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockByNumber',
'params',
ARRAY_CONSTRUCT(
utils.udf_int_to_hex(block_number),
TRUE -- Include transactions
)
),
'Vault/{{ target.name }}/flow/evm/mainnet'
) AS request
FROM
tbl
ORDER BY
block_number DESC

View File

@ -0,0 +1,66 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"evm_receipts",
"sql_limit" :"100000",
"producer_batch_size" :"5000",
"worker_batch_size" :"1000",
"sql_source" :"{{this.identifier}}" }
),
tags = ['streamline_history_evm']
) }}
WITH tbl AS (
SELECT
block_number
FROM
{{ ref('streamline__evm_blocks') }}
WHERE block_number IS NOT NULL
EXCEPT
SELECT
block_number
FROM
{{ ref('streamline__complete_get_evm_receipts') }}
),
ready_blocks AS (
SELECT
block_number
FROM
tbl
)
SELECT
block_number,
DATE_PART(epoch_second, SYSDATE())::STRING AS request_timestamp,
'{{ invocation_id }}' AS _invocation_id,
ROUND(
block_number,
-3
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'id',
block_number,
'jsonrpc',
'2.0',
'method',
'eth_getBlockReceipts',
'params',
ARRAY_CONSTRUCT(
utils.udf_int_to_hex(block_number)
)
),
'Vault/{{ target.name }}/flow/evm/mainnet'
) AS request
FROM
ready_blocks
ORDER BY
block_number DESC

View File

@ -1,6 +1,6 @@
{{ config(
materialized = "view",
tags = ['streamline_realtime_evm']
tags = ['streamline_realtime_evm', 'streamline_history_evm']
) }}
{% if execute %}