diff --git a/.github/workflows/dbt_run_adhoc.yml b/.github/workflows/dbt_run_adhoc.yml
index b116e3e..b6a0e0b 100644
--- a/.github/workflows/dbt_run_adhoc.yml
+++ b/.github/workflows/dbt_run_adhoc.yml
@@ -48,9 +48,6 @@ jobs:
     runs-on: ubuntu-latest
     environment:
       name: workflow_${{ inputs.environment }}
-    strategy:
-      matrix:
-        command: ${{fromJson(inputs.dbt_command)}}
 
     steps:
       - uses: actions/checkout@v3
@@ -65,4 +62,4 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          ${{ matrix.command }}
\ No newline at end of file
+          ${{ inputs.dbt_command }}
\ No newline at end of file
diff --git a/.github/workflows/dbt_run_streamline_history.yml b/.github/workflows/dbt_run_streamline_history.yml
new file mode 100644
index 0000000..65626af
--- /dev/null
+++ b/.github/workflows/dbt_run_streamline_history.yml
@@ -0,0 +1,47 @@
+name: dbt_run_streamline_history
+run-name: dbt_run_streamline_history
+
+on:
+  workflow_dispatch:
+  schedule:
+    # Runs "every 6 hours" (see https://crontab.guru)
+    - cron: '0 1-23/6 * * *'
+
+env:
+  DBT_PROFILES_DIR: ./
+
+  ACCOUNT: "${{ vars.ACCOUNT }}"
+  ROLE: "${{ vars.ROLE }}"
+  USER: "${{ vars.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ vars.REGION }}"
+  DATABASE: "${{ vars.DATABASE }}"
+  WAREHOUSE: "${{ vars.WAREHOUSE }}"
+  SCHEMA: "${{ vars.SCHEMA }}"
+
+concurrency:
+  group: ${{ github.workflow }}
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v1
+        with:
+          python-version: "3.7.x"
+
+      - name: install dependencies
+        run: |
+          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
+          dbt deps
+      - name: Run DBT Jobs
+        run: |
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m 1+models/silver/streamline/streamline__blocks_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m 1+models/silver/streamline/streamline__transactions_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m 1+models/silver/streamline/streamline__receipts_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m 1+models/silver/streamline/streamline__traces_realtime.sql
\ No newline at end of file
diff --git a/.github/workflows/dbt_run_streamline_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml
new file mode 100644
index 0000000..22f5f21
--- /dev/null
+++ b/.github/workflows/dbt_run_streamline_realtime.yml
@@ -0,0 +1,47 @@
+name: dbt_run_streamline_realtime
+run-name: dbt_run_streamline_realtime
+
+on:
+  workflow_dispatch:
+  schedule:
+    # Runs "every 2 hours" (see https://crontab.guru)
+    - cron: '0 1-23/2 * * *'
+
+env:
+  DBT_PROFILES_DIR: ./
+
+  ACCOUNT: "${{ vars.ACCOUNT }}"
+  ROLE: "${{ vars.ROLE }}"
+  USER: "${{ vars.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ vars.REGION }}"
+  DATABASE: "${{ vars.DATABASE }}"
+  WAREHOUSE: "${{ vars.WAREHOUSE }}"
+  SCHEMA: "${{ vars.SCHEMA }}"
+
+concurrency:
+  group: ${{ github.workflow }}
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v1
+        with:
+          python-version: "3.7.x"
+
+      - name: install dependencies
+        run: |
+          pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
+          dbt deps
+      - name: Run DBT Jobs
+        run: |
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__blocks_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__transactions_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__receipts_realtime.sql
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__traces_realtime.sql
\ No newline at end of file
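Note on the two schedules above: both workflows run the same four realtime selections; the history variant additionally passes STREAMLINE_RUN_HISTORY so the models backfill from block 0 instead of the trailing ~100k blocks. Values supplied via --vars override the defaults declared in dbt_project.yml (changed below). A minimal sketch of how a model reads these flags; the file name example__var_gate.sql is hypothetical and not part of this diff:

-- models/example__var_gate.sql (hypothetical illustration)
-- var() falls back to the dbt_project.yml default unless the workflow
-- overrides it, e.g. --vars '{"STREAMLINE_INVOKE_STREAMS":True}'
{% if var("STREAMLINE_INVOKE_STREAMS") %}
SELECT 'invoke streams' AS mode
{% else %}
SELECT 'noop' AS mode
{% endif %}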
diff --git a/.gitignore b/.gitignore
index ea7e08f..28c99ed 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,4 +14,6 @@ logs/
 .history/
 **/.DS_Store
 .vscode/
-dbt-env/
\ No newline at end of file
+dbt-env/
+.env
+.user.yml
\ No newline at end of file
diff --git a/dbt_project.yml b/dbt_project.yml
index b70dd46..209f8c1 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -43,5 +43,8 @@ on-run-end:
 
 vars:
   "dbt_date:time_zone": GMT
-  UPDATE_UDFS_AND_SPS: False
+  STREAMLINE_INVOKE_STREAMS: False
+  STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
+  STREAMLINE_RUN_HISTORY: False
+  UPDATE_UDFS_AND_SPS: True
   UPDATE_SNOWFLAKE_TAGS: True
\ No newline at end of file
diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql
index 916f09f..cfaeb1b 100644
--- a/macros/create_udfs.sql
+++ b/macros/create_udfs.sql
@@ -2,10 +2,15 @@
 {% if var("UPDATE_UDFS_AND_SPS") %}
 {% set sql %}
 CREATE schema if NOT EXISTS silver;
-    {{ create_js_hex_to_int() }};
-    {{ create_udf_hex_to_int(
+{{ create_js_hex_to_int() }};
+{{ create_udf_hex_to_int(
         schema = "public"
     ) }}
+    {{ create_udtf_get_base_table(
+        schema = "streamline"
+    ) }}
+    {{ create_udf_get_chainhead() }}
+    {{ create_udf_json_rpc() }}
 {% endset %}
 {% do run_query(sql) %}
 
diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql
new file mode 100644
index 0000000..97c0617
--- /dev/null
+++ b/macros/streamline/api_integrations.sql
@@ -0,0 +1,11 @@
+{% macro create_aws_bsc_api() %}
+    {% if target.name == "prod" %}
+        {% set sql %}
+        CREATE api integration IF NOT EXISTS aws_bsc_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-bsc' api_allowed_prefixes = (
+            'https://2ltt1xstoc.execute-api.us-east-1.amazonaws.com/prod/',
+            'https://qqy8pvhork.execute-api.us-east-1.amazonaws.com/dev/'
+        ) enabled = TRUE;
+{% endset %}
+        {% do run_query(sql) %}
+    {% endif %}
+{% endmacro %}
diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql
new file mode 100644
index 0000000..c4ee9ab
--- /dev/null
+++ b/macros/streamline/get_base_table_udtf.sql
@@ -0,0 +1,22 @@
+{% macro create_udtf_get_base_table(schema) %}
+    CREATE
+    OR REPLACE FUNCTION {{ schema }}.udtf_get_base_table(
+        max_height INTEGER
+    ) returns TABLE (
+        height NUMBER
+    ) AS $$ WITH base AS (
+        SELECT
+            ROW_NUMBER() over (
+                ORDER BY
+                    SEQ4()
+            ) AS id
+        FROM
+            TABLE(GENERATOR(rowcount => 100000000))
+    )
+SELECT
+    id AS height
+FROM
+    base
+WHERE
+    id <= max_height $$;
+{% endmacro %}
diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql
new file mode 100644
index 0000000..b32ec11
--- /dev/null
+++ b/macros/streamline/streamline_udfs.sql
@@ -0,0 +1,17 @@
+{% macro create_udf_get_chainhead() %}
+    CREATE EXTERNAL FUNCTION IF NOT EXISTS streamline.udf_get_chainhead() returns variant api_integration = aws_bsc_api AS {% if target.name == "prod" %}
+    'https://2ltt1xstoc.execute-api.us-east-1.amazonaws.com/prod/get_chainhead'
+{% else %}
+    'https://qqy8pvhork.execute-api.us-east-1.amazonaws.com/dev/get_chainhead'
+{%- endif %};
+{% endmacro %}
+
+{% macro create_udf_json_rpc() %}
+    CREATE EXTERNAL FUNCTION IF NOT EXISTS streamline.udf_json_rpc(
+        json OBJECT
+    ) returns ARRAY api_integration = aws_bsc_api AS {% if target.name == "prod" %}
+    'https://2ltt1xstoc.execute-api.us-east-1.amazonaws.com/prod/bulk_get_json_rpc'
+{% else %}
+    'https://qqy8pvhork.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc'
+{%- endif %};
+{% endmacro %}
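Once create_udfs.sql has been applied with UPDATE_UDFS_AND_SPS enabled, the new objects can be smoke-tested directly in Snowflake. A sketch, assuming the streamline schema exists and the aws_bsc_api integration is live; the height 30000000 is an arbitrary example value:

-- enumerate candidate heights from the generator-backed UDTF
SELECT height
FROM TABLE(streamline.udtf_get_base_table(30000000))
ORDER BY height DESC
LIMIT 5;

-- external function proxied through the aws_bsc_api integration
SELECT streamline.udf_get_chainhead();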
diff --git a/macros/utils.sql b/macros/utils.sql
new file mode 100644
index 0000000..09c82fd
--- /dev/null
+++ b/macros/utils.sql
@@ -0,0 +1,78 @@
+{% macro if_data_call_function(
+        func,
+        target
+    ) %}
+    {% if var(
+            "STREAMLINE_INVOKE_STREAMS"
+        ) %}
+        {% if execute %}
+            {{ log(
+                "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target,
+                True
+            ) }}
+        {% endif %}
+    SELECT
+        {{ func }}
+    WHERE
+        EXISTS(
+            SELECT
+                1
+            FROM
+                {{ target }}
+            LIMIT
+                1
+        )
+    {% else %}
+        {% if execute %}
+            {{ log(
+                "Running macro `if_data_call_function`: NOOP",
+                False
+            ) }}
+        {% endif %}
+    SELECT
+        NULL
+    {% endif %}
+{% endmacro %}
+
+{% macro if_data_call_wait() %}
+    {% if var(
+            "STREAMLINE_INVOKE_STREAMS"
+        ) %}
+        {% set query %}
+    SELECT
+        1
+    WHERE
+        EXISTS(
+            SELECT
+                1
+            FROM
+                {{ model.schema ~ "." ~ model.alias }}
+            LIMIT
+                1
+        ) {% endset %}
+        {% if execute %}
+            {% set results = run_query(
+                query
+            ) %}
+            {% if results %}
+                {{ log(
+                    "Waiting...",
+                    info = True
+                ) }}
+
+                {% set wait_query %}
+    SELECT
+        system$wait(
+            {{ var(
+                "WAIT",
+                600
+            ) }}
+        ) {% endset %}
+                {% do run_query(wait_query) %}
+            {% else %}
+    SELECT
+        NULL;
+            {% endif %}
+        {% endif %}
+    {% endif %}
+{% endmacro %}
diff --git a/models/silver/streamline/_max_block_by_date.sql b/models/silver/streamline/_max_block_by_date.sql
new file mode 100644
index 0000000..7609b67
--- /dev/null
+++ b/models/silver/streamline/_max_block_by_date.sql
@@ -0,0 +1,27 @@
+{{ config (
+    materialized = "ephemeral",
+    unique_key = "block_number",
+) }}
+
+WITH base AS (
+
+    SELECT
+        block_timestamp :: DATE AS block_date,
+        MAX(block_number) block_number
+    FROM
+        {{ ref("silver__blocks") }}
+    GROUP BY
+        block_timestamp :: DATE
+)
+SELECT
+    block_date,
+    block_number
+FROM
+    base
+WHERE
+    block_date <> (
+        SELECT
+            MAX(block_date)
+        FROM
+            base
+    )
diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/streamline__blocks.sql
new file mode 100644
index 0000000..bad8e4d
--- /dev/null
+++ b/models/silver/streamline/streamline__blocks.sql
@@ -0,0 +1,21 @@
+{{ config (
+    materialized = "view",
+    tags = ['streamline_view']
+) }}
+
+{% if execute %}
+    {% set height = run_query('SELECT streamline.udf_get_chainhead()') %}
+    {% set block_height = height.columns [0].values() [0] %}
+{% else %}
+    {% set block_height = 0 %}
+{% endif %}
+
+SELECT
+    height AS block_number,
+    REPLACE(
+        concat_ws('', '0x', to_char(height, 'XXXXXXXX')),
+        ' ',
+        ''
+    ) AS block_number_hex
+FROM
+    TABLE(streamline.udtf_get_base_table({{ block_height }}))
diff --git a/models/silver/streamline/streamline__blocks_realtime.sql b/models/silver/streamline/streamline__blocks_realtime.sql
new file mode 100644
index 0000000..cded0f1
--- /dev/null
+++ b/models/silver/streamline/streamline__blocks_realtime.sql
@@ -0,0 +1,63 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'exploded_key','data', 'method', 'eth_getBlockByNumber', 'producer_batch_size',10000, 'producer_limit_size', 1000000, 'worker_batch_size',1000, 'producer_batch_chunks_size', 10000))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    )
+) }}
+
+WITH last_3_days AS (
+    {% if var('STREAMLINE_RUN_HISTORY') %}
+    SELECT
+        0 AS block_number
+    {% else %}
+    SELECT
+        MAX(block_number) - 100000 AS block_number -- approx 3 days
+    FROM
+        {{ ref("streamline__blocks") }}
+    {% endif %}
+),
+tbl AS (
+    SELECT
+        block_number,
+        block_number_hex
+    FROM
+        {{ ref("streamline__blocks") }}
+    WHERE
+        (
+            block_number >= (
+                SELECT
+                    block_number
+                FROM
+                    last_3_days
+            )
+        )
+        AND block_number IS NOT NULL
+    EXCEPT
+    SELECT
+        block_number,
+        REPLACE(
+            concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')),
+            ' ',
+            ''
+        ) AS block_number_hex
+    FROM
+        {{ ref("streamline__complete_blocks") }}
+    WHERE
+        block_number >= (
+            SELECT
+                block_number
+            FROM
+                last_3_days
+        )
+)
+SELECT
+    block_number,
+    'eth_getBlockByNumber' AS method,
+    CONCAT(
+        block_number_hex,
+        '_-_',
+        'false'
+    ) AS params
+FROM
+    tbl
\ No newline at end of file
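For orientation: with STREAMLINE_INVOKE_STREAMS set, the if_data_call_function post-hook on streamline__blocks_realtime compiles to roughly the following (a hand-expanded sketch; the silver schema stands in for whatever {{ this.schema }} resolves to):

SELECT
    silver.udf_json_rpc(
        object_construct(
            'sql_source', 'streamline__blocks_realtime',
            'external_table', 'blocks',
            'exploded_key', 'data',
            'method', 'eth_getBlockByNumber',
            'producer_batch_size', 10000,
            'producer_limit_size', 1000000,
            'worker_batch_size', 1000,
            'producer_batch_chunks_size', 10000
        )
    )
WHERE
    EXISTS(
        SELECT 1 FROM silver.streamline__blocks_realtime LIMIT 1
    );

The EXISTS guard means the external function (and the fetch fan-out it triggers) is only invoked when the view actually contains missing blocks.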
diff --git a/models/silver/streamline/streamline__complete_blocks.sql b/models/silver/streamline/streamline__complete_blocks.sql
new file mode 100644
index 0000000..6b03ca8
--- /dev/null
+++ b/models/silver/streamline/streamline__complete_blocks.sql
@@ -0,0 +1,83 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = "id",
+    cluster_by = "ROUND(block_number, -3)",
+    merge_update_columns = ["id"],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
+) }}
+
+WITH meta AS (
+
+    SELECT
+        registered_on,
+        last_modified,
+        LEAST(
+            last_modified,
+            registered_on
+        ) AS _inserted_timestamp,
+        file_name
+    FROM
+        TABLE(
+            information_schema.external_table_files(
+                table_name => '{{ source( "bronze_streamline", "blocks") }}'
+            )
+        ) A
+
+{% if is_incremental() %}
+WHERE
+    LEAST(
+        registered_on,
+        last_modified
+    ) >= (
+        SELECT
+            COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
+        FROM
+            {{ this }})
+    ),
+    partitions AS (
+        SELECT
+            DISTINCT CAST(
+                SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER
+            ) AS _partition_by_block_number
+        FROM
+            meta
+    )
+{% else %}
+)
+{% endif %}
+SELECT
+    {{ dbt_utils.surrogate_key(
+        ['block_number']
+    ) }} AS id,
+    block_number,
+    last_modified AS _inserted_timestamp
+FROM
+    {{ source(
+        "bronze_streamline",
+        "blocks"
+    ) }}
+    s
+    JOIN meta b
+    ON b.file_name = metadata$filename
+
+{% if is_incremental() %}
+JOIN partitions p
+ON p._partition_by_block_number = s._partition_by_block_id
+{% endif %}
+WHERE
+    DATA :error :code IS NULL
+    OR DATA :error :code NOT IN (
+        '-32000',
+        '-32001',
+        '-32002',
+        '-32003',
+        '-32004',
+        '-32005',
+        '-32006',
+        '-32007',
+        '-32008',
+        '-32009',
+        '-32010'
+    ) qualify(ROW_NUMBER() over (PARTITION BY id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
diff --git a/models/silver/streamline/streamline__complete_receipts.sql b/models/silver/streamline/streamline__complete_receipts.sql
new file mode 100644
index 0000000..4b646e6
--- /dev/null
+++ b/models/silver/streamline/streamline__complete_receipts.sql
@@ -0,0 +1,83 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = "id",
+    cluster_by = "ROUND(block_number, -3)",
+    merge_update_columns = ["id"],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
+) }}
+
+WITH meta AS (
+
+    SELECT
+        registered_on,
+        last_modified,
+        LEAST(
+            last_modified,
+            registered_on
+        ) AS _inserted_timestamp,
+        file_name
+    FROM
+        TABLE(
+            information_schema.external_table_files(
+                table_name => '{{ source( "bronze_streamline", "receipts") }}'
+            )
+        ) A
+
+{% if is_incremental() %}
+WHERE
+    LEAST(
+        registered_on,
+        last_modified
+    ) >= (
+        SELECT
+            COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
+        FROM
+            {{ this }})
+    ),
+    partitions AS (
+        SELECT
+            DISTINCT CAST(
+                SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER
+            ) AS _partition_by_block_number
+        FROM
+            meta
+    )
+{% else %}
+)
+{% endif %}
+SELECT
+    {{ dbt_utils.surrogate_key(
+        ['block_number']
+    ) }} AS id,
+    block_number,
+    last_modified AS _inserted_timestamp
+FROM
+    {{ source(
+        "bronze_streamline",
+        "receipts"
+    ) }}
+    s
+    JOIN meta b
+    ON b.file_name = metadata$filename
+
+{% if is_incremental() %}
+JOIN partitions p
+ON p._partition_by_block_number = s._partition_by_block_id
+{% endif %}
+WHERE
+    DATA :error :code IS NULL
+    OR DATA :error :code NOT IN (
+        '-32000',
+        '-32001',
+        '-32002',
+        '-32003',
+        '-32004',
+        '-32005',
+        '-32006',
+        '-32007',
+        '-32008',
+        '-32009',
+        '-32010'
+    ) qualify(ROW_NUMBER() over (PARTITION BY id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
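The trailing QUALIFY in each streamline__complete_* model is what makes re-landed files idempotent: when the same block id appears in several external files, only the row from the most recently loaded file survives the merge. The pattern in isolation, with illustrative relation and column names:

SELECT
    id,
    block_number,
    _inserted_timestamp
FROM staged_rows  -- hypothetical input relation
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY id
    ORDER BY _inserted_timestamp DESC
) = 1;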
diff --git a/models/silver/streamline/streamline__complete_traces.sql b/models/silver/streamline/streamline__complete_traces.sql
new file mode 100644
index 0000000..d125b93
--- /dev/null
+++ b/models/silver/streamline/streamline__complete_traces.sql
@@ -0,0 +1,83 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = "id",
+    cluster_by = "ROUND(block_number, -3)",
+    merge_update_columns = ["id"],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
+) }}
+
+WITH meta AS (
+
+    SELECT
+        registered_on,
+        last_modified,
+        LEAST(
+            last_modified,
+            registered_on
+        ) AS _inserted_timestamp,
+        file_name
+    FROM
+        TABLE(
+            information_schema.external_table_files(
+                table_name => '{{ source( "bronze_streamline", "traces") }}'
+            )
+        ) A
+
+{% if is_incremental() %}
+WHERE
+    LEAST(
+        registered_on,
+        last_modified
+    ) >= (
+        SELECT
+            COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
+        FROM
+            {{ this }})
+    ),
+    partitions AS (
+        SELECT
+            DISTINCT CAST(
+                SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER
+            ) AS _partition_by_block_number
+        FROM
+            meta
+    )
+{% else %}
+)
+{% endif %}
+SELECT
+    {{ dbt_utils.surrogate_key(
+        ['block_number']
+    ) }} AS id,
+    block_number,
+    last_modified AS _inserted_timestamp
+FROM
+    {{ source(
+        "bronze_streamline",
+        "traces"
+    ) }}
+    s
+    JOIN meta b
+    ON b.file_name = metadata$filename
+
+{% if is_incremental() %}
+JOIN partitions p
+ON p._partition_by_block_number = s._partition_by_block_id
+{% endif %}
+WHERE
+    DATA :error :code IS NULL
+    OR DATA :error :code NOT IN (
+        '-32000',
+        '-32001',
+        '-32002',
+        '-32003',
+        '-32004',
+        '-32005',
+        '-32006',
+        '-32007',
+        '-32008',
+        '-32009',
+        '-32010'
+    ) qualify(ROW_NUMBER() over (PARTITION BY id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
diff --git a/models/silver/streamline/streamline__complete_transactions.sql b/models/silver/streamline/streamline__complete_transactions.sql
new file mode 100644
index 0000000..793334f
--- /dev/null
+++ b/models/silver/streamline/streamline__complete_transactions.sql
@@ -0,0 +1,83 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = "id",
+    cluster_by = "ROUND(block_number, -3)",
+    merge_update_columns = ["id"],
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
+) }}
+
+WITH meta AS (
+
+    SELECT
+        registered_on,
+        last_modified,
+        LEAST(
+            last_modified,
+            registered_on
+        ) AS _inserted_timestamp,
+        file_name
+    FROM
+        TABLE(
+            information_schema.external_table_files(
+                table_name => '{{ source( "bronze_streamline", "transactions") }}'
+            )
+        ) A
+
+{% if is_incremental() %}
+WHERE
+    LEAST(
+        registered_on,
+        last_modified
+    ) >= (
+        SELECT
+            COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
+        FROM
+            {{ this }})
+    ),
+    partitions AS (
+        SELECT
+            DISTINCT CAST(
+                SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER
+            ) AS _partition_by_block_number
+        FROM
+            meta
+    )
+{% else %}
+)
+{% endif %}
+SELECT
+    {{ dbt_utils.surrogate_key(
+        ['block_number']
+    ) }} AS id,
+    block_number,
+    last_modified AS _inserted_timestamp
+FROM
+    {{ source(
+        "bronze_streamline",
+        "transactions"
+    ) }}
+    s
+    JOIN meta b
+    ON b.file_name = metadata$filename
+
+{% if is_incremental() %}
+JOIN partitions p
+ON p._partition_by_block_number = s._partition_by_block_id
+{% endif %}
+WHERE
+    DATA :error :code IS NULL
+    OR DATA :error :code NOT IN (
+        '-32000',
+        '-32001',
+        '-32002',
+        '-32003',
+        '-32004',
+        '-32005',
+        '-32006',
+        '-32007',
+        '-32008',
+        '-32009',
+        '-32010'
+    ) qualify(ROW_NUMBER() over (PARTITION BY id
+ORDER BY
+    _inserted_timestamp DESC)) = 1
\ No newline at end of file
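The incremental-only join on _partition_by_block_number exists to prune external-table partitions: a block id is recovered from the fourth path segment of each staged file name. A sketch of that extraction, assuming file names shaped like a/b/c/<block_id>_<suffix>.json; the sample path below is illustrative, not the real bucket layout:

SELECT CAST(
    SPLIT_PART(SPLIT_PART('prod/streamline/blocks/25000000_000.json', '/', 4), '_', 1)
    AS INTEGER
) AS _partition_by_block_number;
-- returns 25000000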
+
+    SELECT
+        registered_on,
+        last_modified,
+        LEAST(
+            last_modified,
+            registered_on
+        ) AS _inserted_timestamp,
+        file_name
diff --git a/models/silver/streamline/streamline__receipts_realtime.sql b/models/silver/streamline/streamline__receipts_realtime.sql
new file mode 100644
index 0000000..d474702
--- /dev/null
+++ b/models/silver/streamline/streamline__receipts_realtime.sql
@@ -0,0 +1,59 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'receipts', 'exploded_key','data', 'method', 'eth_getBlockReceipts', 'producer_batch_size',1000, 'producer_limit_size', 1000000, 'worker_batch_size',100, 'producer_batch_chunks_size', 1000))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    )
+) }}
+
+WITH last_3_days AS (
+    {% if var('STREAMLINE_RUN_HISTORY') %}
+    SELECT
+        0 AS block_number
+    {% else %}
+    SELECT
+        MAX(block_number) - 100000 AS block_number -- approx 3 days
+    FROM
+        {{ ref("streamline__blocks") }}
+    {% endif %}
+),
+tbl AS (
+    SELECT
+        block_number,
+        block_number_hex
+    FROM
+        {{ ref("streamline__blocks") }}
+    WHERE
+        (
+            block_number >= (
+                SELECT
+                    block_number
+                FROM
+                    last_3_days
+            )
+        )
+        AND block_number IS NOT NULL
+    EXCEPT
+    SELECT
+        block_number,
+        REPLACE(
+            concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')),
+            ' ',
+            ''
+        ) AS block_number_hex
+    FROM
+        {{ ref("streamline__complete_receipts") }}
+    WHERE
+        block_number >= (
+            SELECT
+                block_number
+            FROM
+                last_3_days
+        )
+)
+SELECT
+    block_number,
+    'eth_getBlockReceipts' AS method,
+    block_number_hex AS params
+FROM
+    tbl
\ No newline at end of file
diff --git a/models/silver/streamline/streamline__traces_realtime.sql b/models/silver/streamline/streamline__traces_realtime.sql
new file mode 100644
index 0000000..385af1f
--- /dev/null
+++ b/models/silver/streamline/streamline__traces_realtime.sql
@@ -0,0 +1,59 @@
+{{ config (
+    materialized = "view",
+    post_hook = if_data_call_function(
+        func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'traces', 'exploded_key','data', 'method', 'trace_block', 'producer_batch_size',100, 'producer_limit_size', 1000000, 'worker_batch_size',10, 'producer_batch_chunks_size', 100))",
+        target = "{{this.schema}}.{{this.identifier}}"
+    )
+) }}
+
+WITH last_3_days AS (
+    {% if var('STREAMLINE_RUN_HISTORY') %}
+    SELECT
+        0 AS block_number
+    {% else %}
+    SELECT
+        MAX(block_number) - 100000 AS block_number -- approx 3 days
+    FROM
+        {{ ref("streamline__blocks") }}
+    {% endif %}
+),
+tbl AS (
+    SELECT
+        block_number,
+        block_number_hex
+    FROM
+        {{ ref("streamline__blocks") }}
+    WHERE
+        (
+            block_number >= (
+                SELECT
+                    block_number
+                FROM
+                    last_3_days
+            )
+        )
+        AND block_number IS NOT NULL
+    EXCEPT
+    SELECT
+        block_number,
+        REPLACE(
+            concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')),
+            ' ',
+            ''
+        ) AS block_number_hex
+    FROM
+        {{ ref("streamline__complete_traces") }}
+    WHERE
+        block_number >= (
+            SELECT
+                block_number
+            FROM
+                last_3_days
+        )
+)
+SELECT
+    block_number,
+    'trace_block' AS method,
+    block_number_hex AS params
+FROM
+    tbl
\ No newline at end of file
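All four *_realtime models share the same gap-detection shape: candidate blocks from streamline__blocks minus whatever has already landed in the matching streamline__complete_* table, computed with EXCEPT and bounded by the last_3_days watermark. Distilled, with illustrative relation names:

-- blocks we should have
SELECT block_number FROM candidate_blocks
EXCEPT
-- blocks already confirmed complete
SELECT block_number FROM completed_blocks;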
"{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ( + {% if var('STREAMLINE_RUN_HISTORY')%} + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %} +), +tbl AS ( + SELECT + block_number, + block_number_hex + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex + FROM + {{ ref("streamline__complete_traces") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) +) +SELECT + block_number, + 'trace_block' AS method, + block_number_hex AS params +FROM + tbl \ No newline at end of file diff --git a/models/silver/streamline/streamline__transactions_realtime.sql b/models/silver/streamline/streamline__transactions_realtime.sql new file mode 100644 index 0000000..6872b30 --- /dev/null +++ b/models/silver/streamline/streamline__transactions_realtime.sql @@ -0,0 +1,63 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'exploded_key','data_-_result_-_transactions', 'method', 'eth_getBlockByNumber', 'producer_batch_size',1000, 'producer_limit_size', 1000000, 'worker_batch_size',100, 'producer_batch_chunks_size', 1000))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ( + {% if var('STREAMLINE_RUN_HISTORY')%} + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %} +), +tbl AS ( + SELECT + block_number, + block_number_hex + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex + FROM + {{ ref("streamline__complete_transactions") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) +) +SELECT + block_number, + 'eth_getBlockByNumber' AS method, + CONCAT( + block_number_hex, + '_-_', + 'true' + ) AS params +FROM + tbl \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index d7c88de..5b875e9 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -42,4 +42,13 @@ sources: database: crosschain schema: bronze_public tables: - - name: user_abis \ No newline at end of file + - name: user_abis + - name: bronze_streamline + database: streamline + schema: | + {{ "BSC_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "BSC" }} + tables: + - name: blocks + - name: transactions + - name: receipts + - name: traces \ No newline at end of file