diff --git a/.github/workflows/dbt_run_streamline_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml new file mode 100644 index 0000000..f242366 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_realtime.yml @@ -0,0 +1,44 @@ +name: dbt_run_streamline_realtime +run-name: dbt_run_streamline_realtime + +on: + workflow_dispatch: + schedule: + # Runs "at minute 25 and 55, every hour" (see https://crontab.guru) + - cron: '25,55 * * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake~=${{ vars.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/realtime diff --git a/.user.yml b/.user.yml new file mode 100644 index 0000000..40bbba7 --- /dev/null +++ b/.user.yml @@ -0,0 +1 @@ +id: 95f7b44b-3137-49d7-8ed2-1034b9c4d4af diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index e0b9497..71deba6 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -1,10 +1,24 @@ -{% macro create_udfs() %} - {% if var("UPDATE_UDFS_AND_SPS") %} - {% set sql %} - CREATE schema if NOT EXISTS silver; +{% macro create_udfs() %} + {% if var("UPDATE_UDFS_AND_SPS") %} + {% set sql %} + CREATE schema if NOT EXISTS silver; + {{ create_udtf_get_base_table( + schema = "streamline" + ) }} - {% endset %} - {% do run_query(sql) %} - {{- fsc_utils.create_udfs() -}} - {% endif %} + {% endset %} + {% do run_query(sql) %} + {% if target.database != "GNOSIS_COMMUNITY_DEV" %} + {% set sql %} + {{ create_udf_get_chainhead() }} + {{ create_udf_bulk_json_rpc() }} + {{ create_udf_decode_array_string() }} + {{ create_udf_decode_array_object() }} + {{ create_udf_bulk_decode_logs() }} + + {% endset %} + {% do run_query(sql) %} + {% endif %} + {{- fsc_utils.create_udfs() -}} + {% endif %} {% endmacro %} \ No newline at end of file diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql new file mode 100644 index 0000000..e47a9cb --- /dev/null +++ b/macros/streamline/api_integrations.sql @@ -0,0 +1,21 @@ +{% macro create_aws_gnosis_api() %} + {{ log( + "Creating integration for target:" ~ target + ) }} + + {% if target.name == "prod" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_gnosis_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/gnosis-api-prod-rolesnowflakeudfsAF733095-1S5YHE2BPGRAE' api_allowed_prefixes = ( + 'https://abjc1ljs1d.execute-api.us-east-1.amazonaws.com/prod/' + ) enabled = TRUE; +{% endset %} + {% do run_query(sql) %} + {% elif target.name == "dev" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_gnosis_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/gnosis-api-dev-rolesnowflakeudfsAF733095-KZU75JVH7RGG' api_allowed_prefixes = ( + 'https://fp4z9mbqa3.execute-api.us-east-1.amazonaws.com/dev/' + ) enabled = TRUE; +{% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql new file mode 100644 index 0000000..a488d14 --- /dev/null +++ b/macros/streamline/get_base_table_udtf.sql @@ -0,0 +1,24 @@ +{% macro create_udtf_get_base_table(schema) %} +create or replace function {{ schema }}.udtf_get_base_table(max_height integer) +returns table (height number) +as +$$ + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) + ) +select + id as height +from + base +where + id <= max_height +$$ +; + +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..398d10c --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,56 @@ +{% macro create_udf_get_chainhead() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = + {% if target.name == "prod" %} + aws_gnosis_api AS 'https://abjc1ljs1d.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' + {% else %} + aws_gnosis_api_dev AS 'https://fp4z9mbqa3.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_bulk_json_rpc() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_json_rpc( + json variant + ) returns text api_integration = {% if target.name == "prod" %} + aws_gnosis_api AS 'https://abjc1ljs1d.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_json_rpc' + {% else %} + aws_gnosis_api_dev AS 'https://fp4z9mbqa3.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_decode_array_string() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_decode( + abi ARRAY, + DATA STRING + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_gnosis_api AS 'https://abjc1ljs1d.execute-api.us-east-1.amazonaws.com/prod/decode_function' + {% else %} + aws_gnosis_api_dev AS 'https://fp4z9mbqa3.execute-api.us-east-1.amazonaws.com/dev/decode_function' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_decode_array_object() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_decode( + abi ARRAY, + DATA OBJECT + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_gnosis_api AS 'https://abjc1ljs1d.execute-api.us-east-1.amazonaws.com/prod/decode_log' + {% else %} + aws_gnosis_api_dev AS 'https://fp4z9mbqa3.execute-api.us-east-1.amazonaws.com/dev/decode_log' + {%- endif %}; +{% endmacro %} + + +{% macro create_udf_bulk_decode_logs() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs( + json OBJECT + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_gnosis_api AS 'https://abjc1ljs1d.execute-api.us-east-1.amazonaws.com/prod/bulk_decode_logs' + {% else %} + aws_gnosis_api_dev AS'https://fp4z9mbqa3.execute-api.us-east-1.amazonaws.com/dev/bulk_decode_logs' + {%- endif %}; +{% endmacro %} diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..0b800b8 --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,78 @@ +{% macro if_data_call_function( + func, + target + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, + True + ) }} + {% endif %} + SELECT + {{ func }} + WHERE + EXISTS( + SELECT + 1 + FROM + {{ target }} + LIMIT + 1 + ) + {% else %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: NOOP", + False + ) }} + {% endif %} + SELECT + NULL + {% endif %} +{% endmacro %} + +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/models/bronze/core/bronze__streamline_FR_blocks.sql b/models/bronze/core/bronze__streamline_FR_blocks.sql new file mode 100644 index 0000000..a430319 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_confirm_blocks.sql b/models/bronze/core/bronze__streamline_FR_confirm_blocks.sql new file mode 100644 index 0000000..ae612bf --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_confirm_blocks.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query( + model = "confirm_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_receipts.sql b/models/bronze/core/bronze__streamline_FR_receipts.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_receipts.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_traces.sql b/models/bronze/core/bronze__streamline_FR_traces.sql new file mode 100644 index 0000000..9c994cc --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_traces.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + model = "debug_traceBlockByNumber", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_transactions.sql b/models/bronze/core/bronze__streamline_FR_transactions.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_blocks.sql b/models/bronze/core/bronze__streamline_blocks.sql new file mode 100644 index 0000000..22b0c51 --- /dev/null +++ b/models/bronze/core/bronze__streamline_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_confirm_blocks.sql b/models/bronze/core/bronze__streamline_confirm_blocks.sql new file mode 100644 index 0000000..af45189 --- /dev/null +++ b/models/bronze/core/bronze__streamline_confirm_blocks.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query( + model = "confirm_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_receipts.sql b/models/bronze/core/bronze__streamline_receipts.sql new file mode 100644 index 0000000..7fad1ea --- /dev/null +++ b/models/bronze/core/bronze__streamline_receipts.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_traces.sql b/models/bronze/core/bronze__streamline_traces.sql new file mode 100644 index 0000000..1423eb9 --- /dev/null +++ b/models/bronze/core/bronze__streamline_traces.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_query( + model = "debug_traceBlockByNumber", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_transactions.sql b/models/bronze/core/bronze__streamline_transactions.sql new file mode 100644 index 0000000..da8e375 --- /dev/null +++ b/models/bronze/core/bronze__streamline_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/silver__temp.sql b/models/silver/silver__temp.sql new file mode 100644 index 0000000..0b4afbd --- /dev/null +++ b/models/silver/silver__temp.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + 1 AS temp diff --git a/models/silver/streamline/_max_block_by_hour.sql b/models/silver/streamline/_max_block_by_hour.sql new file mode 100644 index 0000000..80e3d97 --- /dev/null +++ b/models/silver/streamline/_max_block_by_hour.sql @@ -0,0 +1,37 @@ +{{ config ( + materialized = "ephemeral" +) }} + +WITH base AS ( + + SELECT + DATE_TRUNC( + 'hour', + block_timestamp + ) AS block_hour, + MAX(block_number) block_number + FROM + {{ ref("silver__blocks") }} + WHERE + block_timestamp > DATEADD( + 'day', + -5, + CURRENT_DATE + ) + GROUP BY + 1 +) +SELECT + block_hour, + block_number +FROM + base +WHERE + block_hour <> ( + SELECT + MAX( + block_hour + ) + FROM + base + ) diff --git a/models/silver/streamline/core/complete/streamline__complete_confirmed_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_confirmed_blocks.sql new file mode 100644 index 0000000..bf635cd --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_confirmed_blocks.sql @@ -0,0 +1,28 @@ +-- depends_on: {{ ref('bronze__streamline_confirm_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_confirm_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze__streamline_FR_confirm_blocks') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY id + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/core/complete/streamline__complete_debug_traceBlockByNumber.sql b/models/silver/streamline/core/complete/streamline__complete_debug_traceBlockByNumber.sql new file mode 100644 index 0000000..8df59cd --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_debug_traceBlockByNumber.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_traces') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_traces') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_traces') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_qn_getBlockWithReceipts.sql b/models/silver/streamline/core/complete/streamline__complete_qn_getBlockWithReceipts.sql new file mode 100644 index 0000000..f73a1e9 --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_qn_getBlockWithReceipts.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/realtime/streamline__confirm_blocks_realtime.sql b/models/silver/streamline/core/realtime/streamline__confirm_blocks_realtime.sql new file mode 100644 index 0000000..a046ac2 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__confirm_blocks_realtime.sql @@ -0,0 +1,45 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'confirm_blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','25000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}, 'call_type', 'batch'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +with tbl AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_confirmed_blocks") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "eth_getBlockByNumber", "params":["', + REPLACE( + concat_ws( + '', + '0x', + to_char( + block_number :: INTEGER, + 'XXXXXXXX' + ) + ), + ' ', + '' + ), + '", false],"id":"', + block_number :: INTEGER, + '"}' + ) + ) AS request +FROM + tbl +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/realtime/streamline__debug_traceBlockByNumber_realtime.sql b/models/silver/streamline/core/realtime/streamline__debug_traceBlockByNumber_realtime.sql new file mode 100644 index 0000000..a96150d --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__debug_traceBlockByNumber_realtime.sql @@ -0,0 +1,47 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'debug_traceBlockByNumber', 'sql_limit', {{var('sql_limit','60000')}}, 'producer_batch_size', {{var('producer_batch_size','60000')}}, 'worker_batch_size', {{var('worker_batch_size','60000')}}, 'call_type', 'rest', 'exploded_key','[\"result\"]'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_debug_traceBlockByNumber") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "debug_traceBlockByNumber", "params":["', + REPLACE( + concat_ws( + '', + '0x', + to_char( + block_number :: INTEGER, + 'XXXXXXXX' + ) + ), + ' ', + '' + ), + '",{"tracer": "callTracer", "timeout": "30s"}', + '],"id":"', + block_number :: INTEGER, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/realtime/streamline__qn_getBlockWithReceipts_realtime.sql b/models/silver/streamline/core/realtime/streamline__qn_getBlockWithReceipts_realtime.sql new file mode 100644 index 0000000..a9c623f --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__qn_getBlockWithReceipts_realtime.sql @@ -0,0 +1,46 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'qn_getBlockWithReceipts', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','25000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}, 'call_type', 'batch'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_number :: STRING AS block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number :: STRING + FROM + {{ ref("streamline__complete_qn_getBlockWithReceipts") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "qn_getBlockWithReceipts", "params":["', + REPLACE( + concat_ws( + '', + '0x', + to_char( + block_number :: INTEGER, + 'XXXXXXXX' + ) + ), + ' ', + '' + ), + '"],"id":"', + block_number :: INTEGER, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/streamline__blocks.sql b/models/silver/streamline/core/streamline__blocks.sql new file mode 100644 index 0000000..da743e6 --- /dev/null +++ b/models/silver/streamline/core/streamline__blocks.sql @@ -0,0 +1,17 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + + +{% if execute %} +{% set height = run_query('SELECT streamline.udf_get_chainhead()') %} +{% set block_height = height.columns[0].values()[0] %} +{% else %} +{% set block_height = 0 %} +{% endif %} + +SELECT + height as block_number +FROM + TABLE(streamline.udtf_get_base_table({{block_height}})) \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index 2bde52f..f9e11d2 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -13,15 +13,15 @@ sources: tables: - name: address_labels - name: ez_hourly_prices - - name: ethereum + - name: ethereum database: ethereum - schema: core - tables: + schema: core + tables: - name: fact_hourly_token_prices - name: streamline_crosschain database: streamline schema: crosschain - tables: + tables: - name: node_mapping - name: crosschain_silver database: crosschain @@ -34,4 +34,16 @@ sources: database: crosschain schema: bronze_public tables: - - name: user_abis \ No newline at end of file + - name: user_abis + - name: bronze_streamline + database: streamline + schema: | + {{ "GNOSIS_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "GNOSIS" }} + tables: + - name: receipts + - name: blocks + - name: transactions + - name: debug_traceBlockByNumber + - name: decoded_logs + - name: confirm_blocks + \ No newline at end of file