diff --git a/.github/workflows/dbt_run_streamline_evm_realtime.yml b/.github/workflows/dbt_run_streamline_evm_realtime.yml index 0011651..41e0027 100644 --- a/.github/workflows/dbt_run_streamline_evm_realtime.yml +++ b/.github/workflows/dbt_run_streamline_evm_realtime.yml @@ -43,4 +43,4 @@ jobs: - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -s models/evm/streamline/core/realtime models/evm/streamline/core/complete + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -s models/evm/streamline/core/realtime models/evm/streamline/core/complete "flow_models,tag:streamline_realtime_evm" diff --git a/models/evm/streamline/core/history/streamline__get_evm_blocks_history.sql b/models/evm/streamline/core/history/streamline__get_evm_blocks_history.sql new file mode 100644 index 0000000..c527b35 --- /dev/null +++ b/models/evm/streamline/core/history/streamline__get_evm_blocks_history.sql @@ -0,0 +1,61 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"evm_blocks", + "sql_limit" :"100000", + "producer_batch_size" :"5000", + "worker_batch_size" :"1000", + "sql_source" :"{{this.identifier}}" } + ), + tags = ['streamline_history_evm'] +) }} + +WITH tbl AS ( + SELECT + block_number + FROM + {{ ref('streamline__evm_blocks') }} + WHERE + block_number IS NOT NULL + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__complete_get_evm_blocks') }} +) +SELECT + block_number, + DATE_PART(epoch_second, SYSDATE()) :: STRING AS request_timestamp, + '{{ invocation_id }}' AS _invocation_id, + ROUND( + block_number, + -3 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{Service}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'id', + block_number, + 'jsonrpc', + '2.0', + 'method', + 'eth_getBlockByNumber', + 'params', + ARRAY_CONSTRUCT( + utils.udf_int_to_hex(block_number), + TRUE -- Include transactions + ) + ), + 'Vault/{{ target.name }}/flow/evm/mainnet' + ) AS request +FROM + tbl +ORDER BY + block_number DESC diff --git a/models/evm/streamline/core/history/streamline__get_evm_receipts_history.sql b/models/evm/streamline/core/history/streamline__get_evm_receipts_history.sql new file mode 100644 index 0000000..654ba0d --- /dev/null +++ b/models/evm/streamline/core/history/streamline__get_evm_receipts_history.sql @@ -0,0 +1,66 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"evm_receipts", + "sql_limit" :"100000", + "producer_batch_size" :"5000", + "worker_batch_size" :"1000", + "sql_source" :"{{this.identifier}}" } + ), + tags = ['streamline_history_evm'] +) }} + +WITH tbl AS ( + + SELECT + block_number + FROM + {{ ref('streamline__evm_blocks') }} + WHERE block_number IS NOT NULL + EXCEPT + SELECT + block_number + FROM + {{ ref('streamline__complete_get_evm_receipts') }} +), +ready_blocks AS ( + SELECT + block_number + FROM + tbl +) +SELECT + block_number, + DATE_PART(epoch_second, SYSDATE())::STRING AS request_timestamp, + '{{ invocation_id }}' AS _invocation_id, + ROUND( + block_number, + -3 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{Service}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'id', + block_number, + 'jsonrpc', + '2.0', + 'method', + 'eth_getBlockReceipts', + 'params', + ARRAY_CONSTRUCT( + utils.udf_int_to_hex(block_number) + ) + ), + 'Vault/{{ target.name }}/flow/evm/mainnet' + ) AS request +FROM + ready_blocks +ORDER BY + block_number DESC diff --git a/models/evm/streamline/core/streamline__evm_blocks.sql b/models/evm/streamline/core/streamline__evm_blocks.sql index 5bf1df6..b0d8638 100644 --- a/models/evm/streamline/core/streamline__evm_blocks.sql +++ b/models/evm/streamline/core/streamline__evm_blocks.sql @@ -1,6 +1,6 @@ {{ config( materialized = "view", - tags = ['streamline_realtime_evm'] + tags = ['streamline_realtime_evm', 'streamline_history_evm'] ) }} {% if execute %}