diff --git a/.github/workflows/dbt_run_streamline_blocks_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml similarity index 79% rename from .github/workflows/dbt_run_streamline_blocks_realtime.yml rename to .github/workflows/dbt_run_streamline_realtime.yml index f1da471..fea58c3 100644 --- a/.github/workflows/dbt_run_streamline_blocks_realtime.yml +++ b/.github/workflows/dbt_run_streamline_realtime.yml @@ -1,11 +1,11 @@ -name: dbt_run_streamline_blocks_realtime -run-name: dbt_run_streamline_blocks_realtime +name: dbt_run_streamline_realtime +run-name: dbt_run_streamline_realtime on: workflow_dispatch: schedule: - # Runs "every 2 hours" (see https://crontab.guru) - - cron: '0 1-23/2 * * *' + # Runs "at minute 5 and 35, every hour" (see https://crontab.guru) + - cron: '5,35 * * * *' env: DBT_PROFILES_DIR: ./ @@ -41,4 +41,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__blocks_realtime.sql + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/realtime diff --git a/.github/workflows/dbt_run_streamline_receipts_realtime.yml b/.github/workflows/dbt_run_streamline_receipts_realtime.yml deleted file mode 100644 index 09ceec1..0000000 --- a/.github/workflows/dbt_run_streamline_receipts_realtime.yml +++ /dev/null @@ -1,44 +0,0 @@ -name: dbt_run_streamline_receipts_realtime -run-name: dbt_run_streamline_receipts_realtime - -on: - workflow_dispatch: - schedule: - # Runs "every 3 hours" (see https://crontab.guru) - - cron: '0 1-23/3 * * *' - -env: - DBT_PROFILES_DIR: ./ - - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v1 - with: - python-version: "3.7.x" - - - name: install dependencies - run: | - pip3 install dbt-snowflake~=${{ vars.DBT_VERSION }} cli_passthrough requests click - dbt deps - - name: Run DBT Jobs - run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__eth_getTransactionReceipt_realtime.sql diff --git a/.github/workflows/dbt_run_streamline_traces_realtime.yml b/.github/workflows/dbt_run_streamline_traces_realtime.yml deleted file mode 100644 index 74970f4..0000000 --- a/.github/workflows/dbt_run_streamline_traces_realtime.yml +++ /dev/null @@ -1,44 +0,0 @@ -name: dbt_run_streamline_traces_realtime -run-name: dbt_run_streamline_traces_realtime - -on: - workflow_dispatch: - schedule: - # Runs "every 1 hours" (see https://crontab.guru) - - cron: '0 */1 * * *' - -env: - DBT_PROFILES_DIR: ./ - - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v1 - with: - python-version: "3.7.x" - - - name: install dependencies - run: | - pip3 install dbt-snowflake~=${{ vars.DBT_VERSION }} cli_passthrough requests click - dbt deps - - name: Run DBT Jobs - run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__debug_traceTransaction_realtime.sql diff --git a/.github/workflows/dbt_run_streamline_transactions_realtime.yml b/.github/workflows/dbt_run_streamline_transactions_realtime.yml deleted file mode 100644 index ae79683..0000000 --- a/.github/workflows/dbt_run_streamline_transactions_realtime.yml +++ /dev/null @@ -1,44 +0,0 @@ -name: dbt_run_streamline_transactions_realtime -run-name: dbt_run_streamline_transactions_realtime - -on: - workflow_dispatch: - schedule: - # Runs "every 2 hours" (see https://crontab.guru) - - cron: '0 */2 * * *' - -env: - DBT_PROFILES_DIR: ./ - - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v1 - with: - python-version: "3.7.x" - - - name: install dependencies - run: | - pip3 install dbt-snowflake~=${{ vars.DBT_VERSION }} cli_passthrough requests click - dbt deps - - name: Run DBT Jobs - run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/streamline__transactions_realtime.sql diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 922598c..c91e37d 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -12,7 +12,10 @@ {% set sql %} {{ create_udf_get_chainhead() }} {{ create_udf_bulk_json_rpc() }} - {{ create_udf_bulk_json_rpc_block_id() }} + {{ create_udf_bulk_get_traces() }} + {{ create_udf_decode_array_string() }} + {{ create_udf_decode_array_object() }} + {{ create_udf_bulk_decode_logs() }} {% endset %} {% do run_query(sql) %} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql index c245a9b..3ff4458 100644 --- a/macros/streamline/api_integrations.sql +++ b/macros/streamline/api_integrations.sql @@ -1,9 +1,19 @@ {% macro create_aws_base_api() %} + {{ log( + "Creating integration for target:" ~ target + ) }} + {% if target.name == "prod" %} {% set sql %} - CREATE api integration IF NOT EXISTS aws_base_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-base' api_allowed_prefixes = ( - 'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/', - 'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/' + CREATE api integration IF NOT EXISTS aws_base_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/base-api-prod-rolesnowflakeudfsAF733095-FFKP94OAGPXW' api_allowed_prefixes = ( + 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/' + ) enabled = TRUE; +{% endset %} + {% do run_query(sql) %} + {% elif target.name == "dev" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_base_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/base-api-dev-rolesnowflakeudfsAF733095-I354FW5312ZX' api_allowed_prefixes = ( + 'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/' ) enabled = TRUE; {% endset %} {% do run_query(sql) %} diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..b26d9c8 --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,115 @@ +{% macro streamline_external_table_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + last_modified AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} + WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} + +{% macro streamline_external_table_FR_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} +WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index 494434a..d2a2755 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -1,9 +1,10 @@ {% macro create_udf_get_chainhead() %} - CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead( - ) returns variant api_integration = aws_base_api AS {% if target.name == "prod" %} - 'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = + {% if target.name == "prod" %} + aws_base_api AS 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' {% else %} - 'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' + aws_base_api_dev AS 'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' {%- endif %}; {% endmacro %} @@ -11,20 +12,56 @@ CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_json_rpc( json variant - ) returns text api_integration = aws_base_api AS {% if target.name == "prod" %} - 'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_json_rpc' + ) returns text api_integration = {% if target.name == "prod" %} + aws_base_api AS 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_json_rpc' {% else %} - 'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc' + aws_base_api_dev AS 'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc' {%- endif %}; {% endmacro %} -{% macro create_udf_bulk_json_rpc_block_id() %} +{% macro create_udf_bulk_get_traces() %} CREATE - OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_json_rpc_block_id( + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_traces( json variant - ) returns text api_integration = aws_base_api AS {% if target.name == "prod" %} - 'https://avaxk4phkl.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_json_rpc_block_id' + ) returns text api_integration = {% if target.name == "prod" %} + aws_base_api AS 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_get_traces' {% else %} - 'https://k9b03inxm4.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc_block_id' + aws_base_api_dev AS 'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_get_traces' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_decode_array_string() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_decode( + abi ARRAY, + DATA STRING + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_base_api AS 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/decode_function' + {% else %} + aws_base_api_dev AS 'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/decode_function' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_decode_array_object() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_decode( + abi ARRAY, + DATA OBJECT + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_base_api AS 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/decode_log' + {% else %} + aws_base_api_dev AS 'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/decode_log' + {%- endif %}; +{% endmacro %} + + +{% macro create_udf_bulk_decode_logs() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs( + json OBJECT + ) returns ARRAY api_integration = {% if target.name == "prod" %} + aws_base_api AS 'https://u27qk1trpc.execute-api.us-east-1.amazonaws.com/prod/bulk_decode_logs' + {% else %} + aws_base_api_dev AS'https://rijt3fsk7b.execute-api.us-east-1.amazonaws.com/dev/bulk_decode_logs' {%- endif %}; {% endmacro %} diff --git a/models/bronze/core/bronze__streamline_FR_blocks.sql b/models/bronze/core/bronze__streamline_FR_blocks.sql new file mode 100644 index 0000000..a430319 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_confirm_blocks.sql b/models/bronze/core/bronze__streamline_FR_confirm_blocks.sql new file mode 100644 index 0000000..ae612bf --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_confirm_blocks.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query( + model = "confirm_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_receipts.sql b/models/bronze/core/bronze__streamline_FR_receipts.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_receipts.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_traces.sql b/models/bronze/core/bronze__streamline_FR_traces.sql new file mode 100644 index 0000000..9c994cc --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_traces.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + model = "debug_traceBlockByNumber", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_transactions.sql b/models/bronze/core/bronze__streamline_FR_transactions.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_blocks.sql b/models/bronze/core/bronze__streamline_blocks.sql new file mode 100644 index 0000000..22b0c51 --- /dev/null +++ b/models/bronze/core/bronze__streamline_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_confirm_blocks.sql b/models/bronze/core/bronze__streamline_confirm_blocks.sql new file mode 100644 index 0000000..af45189 --- /dev/null +++ b/models/bronze/core/bronze__streamline_confirm_blocks.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query( + model = "confirm_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_receipts.sql b/models/bronze/core/bronze__streamline_receipts.sql new file mode 100644 index 0000000..7fad1ea --- /dev/null +++ b/models/bronze/core/bronze__streamline_receipts.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_traces.sql b/models/bronze/core/bronze__streamline_traces.sql new file mode 100644 index 0000000..1423eb9 --- /dev/null +++ b/models/bronze/core/bronze__streamline_traces.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_query( + model = "debug_traceBlockByNumber", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_transactions.sql b/models/bronze/core/bronze__streamline_transactions.sql new file mode 100644 index 0000000..da8e375 --- /dev/null +++ b/models/bronze/core/bronze__streamline_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/base_goerli/methods/silver_goerli__blocks_method.sql b/models/silver/base_goerli/methods/silver_goerli__blocks_method.sql index 8d10ec8..e3b3936 100644 --- a/models/silver/base_goerli/methods/silver_goerli__blocks_method.sql +++ b/models/silver/base_goerli/methods/silver_goerli__blocks_method.sql @@ -14,7 +14,7 @@ WITH meta AS ( FROM TABLE( information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "blocks") }}' + table_name => '{{ source( "bronze_streamline", "goerli_blocks") }}' ) ) A @@ -48,7 +48,7 @@ base AS ( FROM {{ source( "bronze_streamline", - "blocks" + "goerli_blocks" ) }} t JOIN meta b diff --git a/models/silver/base_goerli/methods/silver_goerli__receipts_method.sql b/models/silver/base_goerli/methods/silver_goerli__receipts_method.sql index 9b4f2dd..47b90fb 100644 --- a/models/silver/base_goerli/methods/silver_goerli__receipts_method.sql +++ b/models/silver/base_goerli/methods/silver_goerli__receipts_method.sql @@ -13,7 +13,7 @@ WITH meta AS ( FROM TABLE( information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "eth_getTransactionReceipt") }}' + table_name => '{{ source( "bronze_streamline", "goerli_eth_getTransactionReceipt") }}' ) ) A @@ -47,7 +47,7 @@ base AS ( FROM {{ source( "bronze_streamline", - "eth_getTransactionReceipt" + "goerli_eth_getTransactionReceipt" ) }} t JOIN meta b @@ -66,7 +66,7 @@ base AS ( '-32008', '-32009', '-32010' - ) + ) OR response :: STRING IS NOT NULL ) @@ -93,5 +93,5 @@ SELECT _inserted_timestamp FROM base -QUALIFY ROW_NUMBER() OVER (PARTITION BY tx_hash +QUALIFY ROW_NUMBER() OVER (PARTITION BY tx_hash ORDER BY _inserted_timestamp DESC) = 1 \ No newline at end of file diff --git a/models/silver/base_goerli/methods/silver_goerli__traces_method.sql b/models/silver/base_goerli/methods/silver_goerli__traces_method.sql index 7236987..b30f711 100644 --- a/models/silver/base_goerli/methods/silver_goerli__traces_method.sql +++ b/models/silver/base_goerli/methods/silver_goerli__traces_method.sql @@ -14,7 +14,7 @@ WITH meta AS ( FROM TABLE( information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "debug_traceTransaction") }}' + table_name => '{{ source( "bronze_streamline", "goerli_debug_traceTransaction") }}' ) ) A @@ -53,7 +53,7 @@ SELECT FROM {{ source( "bronze_streamline", - "debug_traceTransaction" + "goerli_debug_traceTransaction" ) }} t JOIN meta b diff --git a/models/silver/base_goerli/methods/silver_goerli__tx_method.sql b/models/silver/base_goerli/methods/silver_goerli__tx_method.sql index 84c85a2..4435332 100644 --- a/models/silver/base_goerli/methods/silver_goerli__tx_method.sql +++ b/models/silver/base_goerli/methods/silver_goerli__tx_method.sql @@ -14,7 +14,7 @@ WITH meta AS ( FROM TABLE( information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "transactions") }}' + table_name => '{{ source( "bronze_streamline", "goerli_transactions") }}' ) ) A @@ -49,7 +49,7 @@ base AS ( FROM {{ source( "bronze_streamline", - "transactions" + "goerli_transactions" ) }} t JOIN meta b diff --git a/models/silver/silver__temp.sql b/models/silver/silver__temp.sql new file mode 100644 index 0000000..1f4d4c4 --- /dev/null +++ b/models/silver/silver__temp.sql @@ -0,0 +1,8 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + 1 AS temp + diff --git a/models/silver/streamline/core/complete/streamline__complete_confirmed_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_confirmed_blocks.sql new file mode 100644 index 0000000..bf635cd --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_confirmed_blocks.sql @@ -0,0 +1,28 @@ +-- depends_on: {{ ref('bronze__streamline_confirm_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_confirm_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1970-01-01' :: TIMESTAMP) _inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze__streamline_FR_confirm_blocks') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY id + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/core/complete/streamline__complete_debug_traceBlockByNumber.sql b/models/silver/streamline/core/complete/streamline__complete_debug_traceBlockByNumber.sql new file mode 100644 index 0000000..8df59cd --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_debug_traceBlockByNumber.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_traces') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_traces') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_traces') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_qn_getBlockWithReceipts.sql b/models/silver/streamline/core/complete/streamline__complete_qn_getBlockWithReceipts.sql new file mode 100644 index 0000000..f73a1e9 --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_qn_getBlockWithReceipts.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/realtime/streamline__confirm_blocks_realtime.sql b/models/silver/streamline/core/realtime/streamline__confirm_blocks_realtime.sql new file mode 100644 index 0000000..ceed6a3 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__confirm_blocks_realtime.sql @@ -0,0 +1,45 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'confirm_blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +with tbl AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_confirmed_blocks") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "eth_getBlockByNumber", "params":["', + REPLACE( + concat_ws( + '', + '0x', + to_char( + block_number :: INTEGER, + 'XXXXXXXX' + ) + ), + ' ', + '' + ), + '", false],"id":"', + block_number :: INTEGER, + '"}' + ) + ) AS request +FROM + tbl +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/realtime/streamline__debug_traceBlockByNumber_realtime.sql b/models/silver/streamline/core/realtime/streamline__debug_traceBlockByNumber_realtime.sql new file mode 100644 index 0000000..56f3834 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__debug_traceBlockByNumber_realtime.sql @@ -0,0 +1,47 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_get_traces(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'debug_traceBlockByNumber', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'batch_call_limit', {{var('batch_call_limit','1')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + + +with blocks AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_debug_traceBlockByNumber") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "debug_traceBlockByNumber", "params":["', + REPLACE( + concat_ws( + '', + '0x', + to_char( + block_number :: INTEGER, + 'XXXXXXXX' + ) + ), + ' ', + '' + ), + '",{"tracer": "callTracer", "timeout": "30s"}', + '],"id":"', + block_number :: INTEGER, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/realtime/streamline__qn_getBlockWithReceipts_realtime.sql b/models/silver/streamline/core/realtime/streamline__qn_getBlockWithReceipts_realtime.sql new file mode 100644 index 0000000..7983147 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__qn_getBlockWithReceipts_realtime.sql @@ -0,0 +1,45 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'qn_getBlockWithReceipts', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','100000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +with blocks AS ( + SELECT + block_number :: STRING AS block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number :: STRING + FROM + {{ ref("streamline__complete_qn_getBlockWithReceipts") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "qn_getBlockWithReceipts", "params":["', + REPLACE( + concat_ws( + '', + '0x', + to_char( + block_number :: INTEGER, + 'XXXXXXXX' + ) + ), + ' ', + '' + ), + '"],"id":"', + block_number :: INTEGER, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_number ASC diff --git a/models/silver/streamline/streamline__blocks.sql b/models/silver/streamline/core/streamline__blocks.sql similarity index 100% rename from models/silver/streamline/streamline__blocks.sql rename to models/silver/streamline/core/streamline__blocks.sql diff --git a/models/silver/streamline/streamline__blocks_realtime.sql b/models/silver/streamline/streamline__blocks_realtime.sql deleted file mode 100644 index 9fa6d20..0000000 --- a/models/silver/streamline/streamline__blocks_realtime.sql +++ /dev/null @@ -1,39 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','100000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'batch_call_limit', {{var('batch_call_limit','1000')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH blocks AS ( - - SELECT - block_number :: STRING AS block_number - FROM - {{ ref("streamline__blocks") }} - WHERE - block_number > 1000000 - EXCEPT - SELECT - block_number :: STRING - FROM - {{ ref("streamline__complete_blocks") }} - WHERE - block_number > 1000000 -) -SELECT - PARSE_JSON( - CONCAT( - '{"jsonrpc": "2.0",', - '"method": "eth_getBlockByNumber", "params":[', - block_number :: STRING, - ',', - FALSE :: BOOLEAN, - '],"id":', - block_number :: STRING, - '}' - ) - ) AS request -FROM - blocks diff --git a/models/silver/streamline/streamline__complete_blocks.sql b/models/silver/streamline/streamline__complete_blocks.sql deleted file mode 100644 index 09033b5..0000000 --- a/models/silver/streamline/streamline__complete_blocks.sql +++ /dev/null @@ -1,79 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" -) }} - -WITH meta AS ( - - SELECT - registered_on, - last_modified, - file_name - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "blocks") }}' - ) - ) A - -{% if is_incremental() %} -WHERE - LEAST( - registered_on, - last_modified - ) >= ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - ), - partitions AS ( - SELECT - CAST( - SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER - ) AS _partition_by_block_number - FROM - meta - ) -{% else %} -) -{% endif %} -SELECT - MD5( - CAST(COALESCE(CAST(block_number AS text), '') AS text) - ) AS id, - block_number, - registered_on AS _inserted_timestamp -FROM - {{ source( - "bronze_streamline", - "blocks" - ) }} - t - JOIN meta b - ON b.file_name = metadata$filename - -{% if is_incremental() %} -JOIN partitions p -ON p._partition_by_block_number = t._partition_by_block_id -{% endif %} -WHERE - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/streamline__complete_debug_traceTransaction.sql b/models/silver/streamline/streamline__complete_debug_traceTransaction.sql deleted file mode 100644 index fa1271d..0000000 --- a/models/silver/streamline/streamline__complete_debug_traceTransaction.sql +++ /dev/null @@ -1,80 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" -) }} - -WITH meta AS ( - - SELECT - registered_on, - last_modified, - file_name - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "debug_traceTransaction") }}' - ) - ) A - -{% if is_incremental() %} -WHERE - LEAST( - registered_on, - last_modified - ) >= ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - ), - partitions AS ( - SELECT - CAST( - SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER - ) AS _partition_by_block_number - FROM - meta - ) -{% else %} -) -{% endif %} -SELECT - split(data:id :: STRING, '-')[1] :: STRING as tx_hash, - block_number, - MD5( - CAST(COALESCE(CAST(block_number AS text), '') AS text) || CAST(COALESCE(CAST(tx_hash AS text), '') AS text) - ) AS id, - registered_on AS _inserted_timestamp -FROM - {{ source( - "bronze_streamline", - "debug_traceTransaction" - ) }} - t - JOIN meta b - ON b.file_name = metadata$filename - -{% if is_incremental() %} -JOIN partitions p -ON p._partition_by_block_number = t._partition_by_block_id -{% endif %} -WHERE - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/streamline__complete_eth_getTransactionReceipt.sql b/models/silver/streamline/streamline__complete_eth_getTransactionReceipt.sql deleted file mode 100644 index 32c3013..0000000 --- a/models/silver/streamline/streamline__complete_eth_getTransactionReceipt.sql +++ /dev/null @@ -1,80 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" -) }} - -WITH meta AS ( - - SELECT - registered_on, - last_modified, - file_name - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "eth_getTransactionReceipt") }}' - ) - ) A - -{% if is_incremental() %} -WHERE - LEAST( - registered_on, - last_modified - ) >= ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - ), - partitions AS ( - SELECT - CAST( - SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER - ) AS _partition_by_block_number - FROM - meta - ) -{% else %} -) -{% endif %} -SELECT - split(data:id :: STRING, '-')[1] :: STRING as tx_hash, - block_number, - MD5( - CAST(COALESCE(CAST(block_number AS text), '') AS text) || CAST(COALESCE(CAST(tx_hash AS text), '') AS text) - ) AS id, - registered_on AS _inserted_timestamp -FROM - {{ source( - "bronze_streamline", - "eth_getTransactionReceipt" - ) }} - t - JOIN meta b - ON b.file_name = metadata$filename - -{% if is_incremental() %} -JOIN partitions p -ON p._partition_by_block_number = t._partition_by_block_id -{% endif %} -WHERE - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/streamline__complete_transactions.sql b/models/silver/streamline/streamline__complete_transactions.sql deleted file mode 100644 index 6c22a65..0000000 --- a/models/silver/streamline/streamline__complete_transactions.sql +++ /dev/null @@ -1,79 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" -) }} - -WITH meta AS ( - - SELECT - registered_on, - last_modified, - file_name - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze_streamline", "transactions") }}' - ) - ) A - -{% if is_incremental() %} -WHERE - LEAST( - registered_on, - last_modified - ) >= ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - ), - partitions AS ( - SELECT - CAST( - SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER - ) AS _partition_by_block_number - FROM - meta - ) -{% else %} -) -{% endif %} -SELECT - MD5( - CAST(COALESCE(CAST(block_number AS text), '') AS text) - ) AS id, - block_number, - registered_on AS _inserted_timestamp -FROM - {{ source( - "bronze_streamline", - "transactions" - ) }} - t - JOIN meta b - ON b.file_name = metadata$filename - -{% if is_incremental() %} -JOIN partitions p -ON p._partition_by_block_number = t._partition_by_block_id -{% endif %} -WHERE - DATA :error :code IS NULL - OR DATA :error :code NOT IN ( - '-32000', - '-32001', - '-32002', - '-32003', - '-32004', - '-32005', - '-32006', - '-32007', - '-32008', - '-32009', - '-32010' - ) qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/silver/streamline/streamline__debug_traceTransaction_realtime.sql b/models/silver/streamline/streamline__debug_traceTransaction_realtime.sql deleted file mode 100644 index 94730df..0000000 --- a/models/silver/streamline/streamline__debug_traceTransaction_realtime.sql +++ /dev/null @@ -1,42 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_json_rpc_block_id(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'debug_traceTransaction', 'sql_limit', {{var('sql_limit','400000')}}, 'producer_batch_size', {{var('producer_batch_size','40000')}}, 'worker_batch_size', {{var('worker_batch_size','20000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH transactions AS ( - - SELECT - tx_hash :: STRING as tx_hash, - block_number :: STRING AS block_number - FROM - {{ ref("streamline__transactions") }} - WHERE - block_number > 1000000 - EXCEPT - SELECT - tx_hash :: STRING, - block_number :: STRING - FROM - {{ ref("streamline__complete_debug_traceTransaction") }} - WHERE - block_number > 1000000 -) -SELECT - PARSE_JSON( - CONCAT( - '{"jsonrpc": "2.0",', - '"method": "debug_traceTransaction", "params":["', - tx_hash :: STRING, - '",{"tracer": "callTracer"}', - '],"id":"', - block_number :: STRING, - '-', - tx_hash :: STRING, - '"}' - ) - ) AS request -FROM - transactions diff --git a/models/silver/streamline/streamline__eth_getTransactionReceipt_realtime.sql b/models/silver/streamline/streamline__eth_getTransactionReceipt_realtime.sql deleted file mode 100644 index 2535c62..0000000 --- a/models/silver/streamline/streamline__eth_getTransactionReceipt_realtime.sql +++ /dev/null @@ -1,41 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_json_rpc_block_id(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'eth_getTransactionReceipt', 'sql_limit', {{var('sql_limit','200000')}}, 'producer_batch_size', {{var('producer_batch_size','20000')}}, 'worker_batch_size', {{var('worker_batch_size','10000')}}, 'batch_call_limit', {{var('batch_call_limit','50')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH transactions AS ( - - SELECT - tx_hash :: STRING as tx_hash, - block_number :: STRING AS block_number - FROM - {{ ref("streamline__transactions") }} - WHERE - block_number > 1000000 - EXCEPT - SELECT - tx_hash :: STRING, - block_number :: STRING - FROM - {{ ref("streamline__complete_eth_getTransactionReceipt") }} - WHERE - block_number > 1000000 -) -SELECT - PARSE_JSON( - CONCAT( - '{"jsonrpc": "2.0",', - '"method": "eth_getTransactionReceipt", "params":["', - tx_hash :: STRING, - '"],"id":"', - block_number :: STRING, - '-', - tx_hash :: STRING, - '"}' - ) - ) AS request -FROM - transactions diff --git a/models/silver/streamline/streamline__transactions.sql b/models/silver/streamline/streamline__transactions.sql deleted file mode 100644 index 33d04ea..0000000 --- a/models/silver/streamline/streamline__transactions.sql +++ /dev/null @@ -1,10 +0,0 @@ -{{ config ( - materialized = "view", - tags = ['streamline_view'] -) }} - -SELECT - tx_hash :: STRING as tx_hash, - block_number -FROM - {{ ref('silver_goerli__tx_method') }} diff --git a/models/silver/streamline/streamline__transactions_realtime.sql b/models/silver/streamline/streamline__transactions_realtime.sql deleted file mode 100644 index 15241b6..0000000 --- a/models/silver/streamline/streamline__transactions_realtime.sql +++ /dev/null @@ -1,39 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'sql_limit', {{var('sql_limit','100000')}}, 'producer_batch_size', {{var('producer_batch_size','50000')}}, 'worker_batch_size', {{var('worker_batch_size','25000')}}, 'batch_call_limit', {{var('batch_call_limit','500')}}))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH blocks AS ( - - SELECT - block_number :: STRING AS block_number - FROM - {{ ref("streamline__blocks") }} - WHERE - block_number > 1000000 - EXCEPT - SELECT - block_number :: STRING - FROM - {{ ref("streamline__complete_transactions") }} - WHERE - block_number > 1000000 -) -SELECT - PARSE_JSON( - CONCAT( - '{"jsonrpc": "2.0",', - '"method": "eth_getBlockByNumber", "params":[', - block_number :: STRING, - ',', - TRUE :: BOOLEAN, - '],"id":', - block_number :: STRING, - '}' - ) - ) AS request -FROM - blocks diff --git a/models/sources.yml b/models/sources.yml index 08f0393..9544b03 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -11,10 +11,16 @@ sources: schema: | {{ "BASE_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "BASE" }} tables: + - name: receipts - name: blocks - name: transactions - - name: debug_traceTransaction - - name: eth_getTransactionReceipt + - name: debug_traceBlockByNumber + - name: decoded_logs + - name: confirm_blocks + - name: goerli_blocks + - name: goerli_transactions + - name: goerli_debug_traceTransaction + - name: goerli_eth_getTransactionReceipt - name: udfs_streamline database: udfs schema: streamline