From d5577d9602ea430a18fcb4cac5174dc438dea8df Mon Sep 17 00:00:00 2001 From: xiuy001 <97179309+xiuy001@users.noreply.github.com> Date: Thu, 11 May 2023 20:32:32 -0400 Subject: [PATCH] An 3060 get streamline in dbt (#3) * updated * update * added all * updated * updated create udfs * updated api_integrations --- .github/workflows/dbt_run_streamline.yml | 48 ----- .../workflows/dbt_run_streamline_history.yml | 44 +++++ .../workflows/dbt_run_streamline_realtime.yml | 44 +++++ .../dbt_run_streamline_validators.yml | 48 ----- dbt_project.yml | 7 +- macros/create_sps.sql | 2 +- macros/create_udfs.sql | 7 + macros/streamline/api_integrations.sql | 11 ++ macros/streamline/get_base_table_udtf.sql | 24 +++ macros/streamline/models.sql | 171 ++++++++++++++++++ macros/streamline/streamline_udfs.sql | 40 ++++ macros/utils.sql | 43 +++++ .../bronze__streamline_FR_eth_blocks.sql | 10 + ...bronze__streamline_FR_eth_transactions.sql | 10 + ...ronze__streamline_FR_tendermint_blocks.sql | 10 + ..._streamline_FR_tendermint_transactions.sql | 10 + ...e__streamline_FR_tendermint_validators.sql | 10 + .../bronze__streamline_eth_blocks.sql | 12 ++ .../bronze__streamline_eth_transactions.sql | 12 ++ .../bronze__streamline_tendermint_blocks.sql | 12 ++ ...ze__streamline_tendermint_transactions.sql | 12 ++ ...onze__streamline_tendermint_validators.sql | 12 ++ models/sources.yml | 11 ++ .../streamline__complete_eth_blocks.sql | 30 +++ .../streamline__complete_eth_transactions.sql | 30 +++ ...streamline__complete_tendermint_blocks.sql | 30 +++ ...line__complete_tendermint_transactions.sql | 30 +++ ...amline__complete_tendermint_validators.sql | 30 +++ .../streamline__eth_blocks_realtime.sql | 62 +++++++ .../streamline__eth_transactions_realtime.sql | 62 +++++++ ...streamline__tendermint_blocks_realtime.sql | 52 ++++++ ...line__tendermint_transactions_realtime.sql | 50 +++++ ...amline__tendermint_validators_realtime.sql | 50 +++++ models/streamline/streamline__blocks.sql | 21 +++ 34 files changed, 959 insertions(+), 98 deletions(-) delete mode 100644 .github/workflows/dbt_run_streamline.yml create mode 100644 .github/workflows/dbt_run_streamline_history.yml create mode 100644 .github/workflows/dbt_run_streamline_realtime.yml delete mode 100644 .github/workflows/dbt_run_streamline_validators.yml create mode 100644 macros/streamline/api_integrations.sql create mode 100644 macros/streamline/get_base_table_udtf.sql create mode 100644 macros/streamline/models.sql create mode 100644 macros/streamline/streamline_udfs.sql create mode 100644 models/bronze/streamline/bronze__streamline_FR_eth_blocks.sql create mode 100644 models/bronze/streamline/bronze__streamline_FR_eth_transactions.sql create mode 100644 models/bronze/streamline/bronze__streamline_FR_tendermint_blocks.sql create mode 100644 models/bronze/streamline/bronze__streamline_FR_tendermint_transactions.sql create mode 100644 models/bronze/streamline/bronze__streamline_FR_tendermint_validators.sql create mode 100644 models/bronze/streamline/bronze__streamline_eth_blocks.sql create mode 100644 models/bronze/streamline/bronze__streamline_eth_transactions.sql create mode 100644 models/bronze/streamline/bronze__streamline_tendermint_blocks.sql create mode 100644 models/bronze/streamline/bronze__streamline_tendermint_transactions.sql create mode 100644 models/bronze/streamline/bronze__streamline_tendermint_validators.sql create mode 100644 models/streamline/complete/streamline__complete_eth_blocks.sql create mode 100644 models/streamline/complete/streamline__complete_eth_transactions.sql create mode 100644 models/streamline/complete/streamline__complete_tendermint_blocks.sql create mode 100644 models/streamline/complete/streamline__complete_tendermint_transactions.sql create mode 100644 models/streamline/complete/streamline__complete_tendermint_validators.sql create mode 100644 models/streamline/realtime/streamline__eth_blocks_realtime.sql create mode 100644 models/streamline/realtime/streamline__eth_transactions_realtime.sql create mode 100644 models/streamline/realtime/streamline__tendermint_blocks_realtime.sql create mode 100644 models/streamline/realtime/streamline__tendermint_transactions_realtime.sql create mode 100644 models/streamline/realtime/streamline__tendermint_validators_realtime.sql create mode 100644 models/streamline/streamline__blocks.sql diff --git a/.github/workflows/dbt_run_streamline.yml b/.github/workflows/dbt_run_streamline.yml deleted file mode 100644 index 3e12928..0000000 --- a/.github/workflows/dbt_run_streamline.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: dbt_run_streamline -run-name: dbt_run_streamline - -on: - push: - branches: - - main - - turn-off-dev-turn-on-prod - schedule: - # Runs "every 6 hours" (see https://crontab.guru) - - cron: '0 */6 * * *' - -env: - DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}" - - ACCOUNT: "${{ secrets.ACCOUNT }}" - ROLE: "${{ secrets.ROLE }}" - USER: "${{ secrets.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ secrets.REGION }}" - DATABASE: "${{ secrets.DATABASE }}" - WAREHOUSE: "${{ secrets.WAREHOUSE }}" - SCHEMA: "${{ secrets.SCHEMA }}" - - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v1 - with: - python-version: "3.7.x" - - - name: install dependencies - run: | - pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click - dbt deps - - name: Run DBT Jobs - run: | - dbt run -m ./models/streamline/* - - - diff --git a/.github/workflows/dbt_run_streamline_history.yml b/.github/workflows/dbt_run_streamline_history.yml new file mode 100644 index 0000000..5b3254b --- /dev/null +++ b/.github/workflows/dbt_run_streamline_history.yml @@ -0,0 +1,44 @@ +name: dbt_run_streamline_history +run-name: dbt_run_streamline_history + +on: + workflow_dispatch: + schedule: + # Runs "every 6 hours" (see https://crontab.guru) + - cron: '0 1-23/6 * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True, "STREAMLINE_RUN_HISTORY":True}' -m 1+models/streamline/realtime \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml new file mode 100644 index 0000000..f7cf2bc --- /dev/null +++ b/.github/workflows/dbt_run_streamline_realtime.yml @@ -0,0 +1,44 @@ +name: dbt_run_streamline_realtime +run-name: dbt_run_streamline_realtime + +on: + workflow_dispatch: + schedule: + # Runs "every 1 hour at min 40" (see https://crontab.guru) + - cron: '40 */1 * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod_backfill + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v1 + with: + python-version: "3.7.x" + + - name: install dependencies + run: | + pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click + dbt deps + - name: Run DBT Jobs + run: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/realtime \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_validators.yml b/.github/workflows/dbt_run_streamline_validators.yml deleted file mode 100644 index ec08c55..0000000 --- a/.github/workflows/dbt_run_streamline_validators.yml +++ /dev/null @@ -1,48 +0,0 @@ -name: dbt_run_streamline_validators -run-name: dbt_run_streamline_validators - -on: - push: - branches: - - main - - turn-off-dev-turn-on-prod - schedule: - # Runs "every 6 hours" (see https://crontab.guru) - - cron: '0 0,12,23 * * *' - -env: - DBT_PROFILES_DIR: "${{ secrets.DBT_PROFILES_DIR }}" - - ACCOUNT: "${{ secrets.ACCOUNT }}" - ROLE: "${{ secrets.ROLE }}" - USER: "${{ secrets.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ secrets.REGION }}" - DATABASE: "${{ secrets.DATABASE }}" - WAREHOUSE: "${{ secrets.WAREHOUSE }}" - SCHEMA: "${{ secrets.SCHEMA }}" - - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v1 - with: - python-version: "3.7.x" - - - name: install dependencies - run: | - pip3 install dbt-snowflake==${{ secrets.DBT_VERSION }} cli_passthrough requests click - dbt deps - - name: Run DBT Jobs - run: | - dbt run --full-refresh -x -m 1+models/streamline/streamline__validators_realtime.sql - - - diff --git a/dbt_project.yml b/dbt_project.yml index 0f63641..3b9ebd4 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -41,7 +41,12 @@ models: vars: "dbt_date:time_zone": GMT - "UPDATE_SNOWFLAKE_TAGS": TRUE + STREAMLINE_INVOKE_STREAMS: False + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False + UPDATE_UDFS_AND_SPS: False + STREAMLINE_RUN_HISTORY: False + UPDATE_SNOWFLAKE_TAGS: True + WAIT: 0 tests: +store_failures: true # all tests diff --git a/macros/create_sps.sql b/macros/create_sps.sql index b077b9e..04ae357 100644 --- a/macros/create_sps.sql +++ b/macros/create_sps.sql @@ -1,5 +1,5 @@ {% macro create_sps() %} - {% if target.database == 'evmos' %} + {% if target.database == 'EVMOS' %} CREATE SCHEMA IF NOT EXISTS _internal; {{ sp_create_prod_clone('_internal') }}; {% endif %} diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 56fe0ff..1f1e9bb 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -1,2 +1,9 @@ {% macro create_udfs() %} + {{ create_udtf_get_base_table( + schema = "streamline" + ) }} + {{ create_udf_get_chainhead() }} + {{ create_udf_json_rpc() }} + {{ create_udf_get_tendermint_transactions() }} + {{ create_udf_get_tendermint_validators() }} {% endmacro %} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql new file mode 100644 index 0000000..1c374b3 --- /dev/null +++ b/macros/streamline/api_integrations.sql @@ -0,0 +1,11 @@ +{% macro create_aws_ethereum_api() %} + {% if target.name == "prod" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_evmos_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/snowflake-api-evmos' api_allowed_prefixes = ( + 'https://55h4rahr50.execute-api.us-east-1.amazonaws.com/dev/', + 'https://n0reh6ugbf.execute-api.us-east-1.amazonaws.com/prod/' + ) enabled = TRUE; +{% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql new file mode 100644 index 0000000..a488d14 --- /dev/null +++ b/macros/streamline/get_base_table_udtf.sql @@ -0,0 +1,24 @@ +{% macro create_udtf_get_base_table(schema) %} +create or replace function {{ schema }}.udtf_get_base_table(max_height integer) +returns table (height number) +as +$$ + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) + ) +select + id as height +from + base +where + id <= max_height +$$ +; + +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..02cf854 --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,171 @@ + +{% macro decode_logs_history( + start, + stop + ) %} + WITH look_back AS ( + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 1 + ) +SELECT + l.block_number, + l._log_id, + abi.data AS abi, + l.data +FROM + {{ ref("streamline__decode_logs") }} + l + INNER JOIN {{ ref("silver__abis") }} + abi + ON l.abi_address = abi.contract_address +WHERE + ( + l.block_number BETWEEN {{ start }} + AND {{ stop }} + ) + AND l.block_number <= ( + SELECT + block_number + FROM + look_back + ) + AND _log_id NOT IN ( + SELECT + _log_id + FROM + {{ ref("streamline__complete_decode_logs") }} + WHERE + ( + block_number BETWEEN {{ start }} + AND {{ stop }} + ) + AND block_number <= ( + SELECT + block_number + FROM + look_back + ) + ) +{% endmacro %} + +{% macro streamline_external_table_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} + WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} + +{% macro streamline_external_table_FR_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} +WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..823d8b9 --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,40 @@ +{% macro create_udf_get_chainhead() %} + CREATE EXTERNAL FUNCTION IF NOT EXISTS streamline.udf_get_chainhead() returns variant api_integration = aws_evmos_api AS {% if target.name == "prod" %} + 'https://n0reh6ugbf.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' + {% else %} + 'https://55h4rahr50.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_json_rpc() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_json_rpc( + json OBJECT + ) returns ARRAY api_integration = aws_evmos_api AS {% if target.name == "prod" %} + 'https://n0reh6ugbf.execute-api.us-east-1.amazonaws.com/prod/bulk_get_json_rpc' + {% else %} + 'https://55h4rahr50.execute-api.us-east-1.amazonaws.com/dev/bulk_get_json_rpc' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_get_tendermint_transactions() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.bulk_get_tendermint_transactions( + json OBJECT + ) returns ARRAY api_integration = aws_evmos_api AS {% if target.name == "prod" %} + 'https://n0reh6ugbf.execute-api.us-east-1.amazonaws.com/prod/bulk_get_tendermint_transactions' + {% else %} + 'https://55h4rahr50.execute-api.us-east-1.amazonaws.com/dev/bulk_get_tendermint_transactions' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_get_tendermint_validators() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.bulk_get_tendermint_validators( + json OBJECT + ) returns ARRAY api_integration = aws_evmos_api AS {% if target.name == "prod" %} + 'https://n0reh6ugbf.execute-api.us-east-1.amazonaws.com/prod/bulk_get_tendermint_validators' + {% else %} + 'https://55h4rahr50.execute-api.us-east-1.amazonaws.com/dev/bulk_get_tendermint_validators' + {%- endif %}; +{% endmacro %} \ No newline at end of file diff --git a/macros/utils.sql b/macros/utils.sql index 85549f1..0b800b8 100644 --- a/macros/utils.sql +++ b/macros/utils.sql @@ -33,3 +33,46 @@ NULL {% endif %} {% endmacro %} + +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/models/bronze/streamline/bronze__streamline_FR_eth_blocks.sql b/models/bronze/streamline/bronze__streamline_FR_eth_blocks.sql new file mode 100644 index 0000000..c190e9e --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_eth_blocks.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + "eth_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_FR_eth_transactions.sql b/models/bronze/streamline/bronze__streamline_FR_eth_transactions.sql new file mode 100644 index 0000000..8c54a36 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_eth_transactions.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + "eth_transactions", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_FR_tendermint_blocks.sql b/models/bronze/streamline/bronze__streamline_FR_tendermint_blocks.sql new file mode 100644 index 0000000..c199622 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_tendermint_blocks.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + "tendermint_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_FR_tendermint_transactions.sql b/models/bronze/streamline/bronze__streamline_FR_tendermint_transactions.sql new file mode 100644 index 0000000..85ea40f --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_tendermint_transactions.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + "tendermint_transactions", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_FR_tendermint_validators.sql b/models/bronze/streamline/bronze__streamline_FR_tendermint_validators.sql new file mode 100644 index 0000000..a186098 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_tendermint_validators.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = 'view' +) }} + +{{ streamline_external_table_FR_query( + "tendermint_validators", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_eth_blocks.sql b/models/bronze/streamline/bronze__streamline_eth_blocks.sql new file mode 100644 index 0000000..9de31ba --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_eth_blocks.sql @@ -0,0 +1,12 @@ + +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + "eth_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_eth_transactions.sql b/models/bronze/streamline/bronze__streamline_eth_transactions.sql new file mode 100644 index 0000000..927fd0c --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_eth_transactions.sql @@ -0,0 +1,12 @@ + +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + "eth_transactions", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_tendermint_blocks.sql b/models/bronze/streamline/bronze__streamline_tendermint_blocks.sql new file mode 100644 index 0000000..e21f1f7 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_tendermint_blocks.sql @@ -0,0 +1,12 @@ + +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + "tendermint_blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_tendermint_transactions.sql b/models/bronze/streamline/bronze__streamline_tendermint_transactions.sql new file mode 100644 index 0000000..0a66bd1 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_tendermint_transactions.sql @@ -0,0 +1,12 @@ + +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + "tendermint_transactions", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/bronze/streamline/bronze__streamline_tendermint_validators.sql b/models/bronze/streamline/bronze__streamline_tendermint_validators.sql new file mode 100644 index 0000000..b19ed78 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_tendermint_validators.sql @@ -0,0 +1,12 @@ + +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + "tendermint_validators", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index cce853d..04155ed 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -7,7 +7,18 @@ sources: tables: - name: dim_date_hours - name: address_tags + - name: address_labels - name: dim_dates + - name: bronze_streamline + database: streamline + schema: | + {{ "EVMOS_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "EVMOS" }} + tables: + - name: eth_blocks + - name: eth_transactions + - name: tendermint_blocks + - name: tendermint_transactions + - name: tendermint_validators - name: address_labels - name: bronze database: evmos diff --git a/models/streamline/complete/streamline__complete_eth_blocks.sql b/models/streamline/complete/streamline__complete_eth_blocks.sql new file mode 100644 index 0000000..8b25bf3 --- /dev/null +++ b/models/streamline/complete/streamline__complete_eth_blocks.sql @@ -0,0 +1,30 @@ +-- depends_on: {{ ref('bronze__streamline_eth_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_eth_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_eth_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/complete/streamline__complete_eth_transactions.sql b/models/streamline/complete/streamline__complete_eth_transactions.sql new file mode 100644 index 0000000..95b14f4 --- /dev/null +++ b/models/streamline/complete/streamline__complete_eth_transactions.sql @@ -0,0 +1,30 @@ +-- depends_on: {{ ref('bronze__streamline_eth_transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_eth_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_eth_transactions') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/streamline/complete/streamline__complete_tendermint_blocks.sql b/models/streamline/complete/streamline__complete_tendermint_blocks.sql new file mode 100644 index 0000000..f7cc942 --- /dev/null +++ b/models/streamline/complete/streamline__complete_tendermint_blocks.sql @@ -0,0 +1,30 @@ +-- depends_on: {{ ref('bronze__streamline_tendermint_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_tendermint_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_tendermint_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/streamline/complete/streamline__complete_tendermint_transactions.sql b/models/streamline/complete/streamline__complete_tendermint_transactions.sql new file mode 100644 index 0000000..02d4bde --- /dev/null +++ b/models/streamline/complete/streamline__complete_tendermint_transactions.sql @@ -0,0 +1,30 @@ +-- depends_on: {{ ref('bronze__streamline_tendermint_transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_tendermint_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_tendermint_transactions') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/streamline/complete/streamline__complete_tendermint_validators.sql b/models/streamline/complete/streamline__complete_tendermint_validators.sql new file mode 100644 index 0000000..b0697ba --- /dev/null +++ b/models/streamline/complete/streamline__complete_tendermint_validators.sql @@ -0,0 +1,30 @@ +-- depends_on: {{ ref('bronze__streamline_tendermint_validators') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_tendermint_validators') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_tendermint_validators') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/streamline/realtime/streamline__eth_blocks_realtime.sql b/models/streamline/realtime/streamline__eth_blocks_realtime.sql new file mode 100644 index 0000000..6daf73f --- /dev/null +++ b/models/streamline/realtime/streamline__eth_blocks_realtime.sql @@ -0,0 +1,62 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('url_route','eth_rpc', 'sql_source', '{{this.identifier}}', 'external_table', 'eth_blocks', 'method', 'eth_getBlockByNumber', 'producer_batch_size',1000, 'producer_limit_size', 1000000, 'worker_batch_size',100, 'producer_batch_chunks_size', 1000))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number, + block_number_hex + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex + FROM + {{ ref("streamline__complete_eth_blocks") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) +SELECT + block_number, + 'eth_getBlockByNumber' AS method, + CONCAT( + block_number_hex, + '_-_', + 'false' + ) AS params +FROM + tbl diff --git a/models/streamline/realtime/streamline__eth_transactions_realtime.sql b/models/streamline/realtime/streamline__eth_transactions_realtime.sql new file mode 100644 index 0000000..60843c2 --- /dev/null +++ b/models/streamline/realtime/streamline__eth_transactions_realtime.sql @@ -0,0 +1,62 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'eth_transactions', 'exploded_key','[\"result\", \"transactions\"]', 'method', 'eth_getBlockByNumber', 'producer_batch_size',1000, 'producer_limit_size', 1000000, 'worker_batch_size',100))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number, + block_number_hex + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex + FROM + {{ ref("streamline__complete_eth_transactions") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) +SELECT + block_number, + 'eth_getBlockByNumber' AS method, + CONCAT( + block_number_hex, + '_-_', + 'true' + ) AS params +FROM + tbl diff --git a/models/streamline/realtime/streamline__tendermint_blocks_realtime.sql b/models/streamline/realtime/streamline__tendermint_blocks_realtime.sql new file mode 100644 index 0000000..cf3b392 --- /dev/null +++ b/models/streamline/realtime/streamline__tendermint_blocks_realtime.sql @@ -0,0 +1,52 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_json_rpc(object_construct('url_route','tendermint_rpc', 'sql_source', '{{this.identifier}}', 'external_table', 'tendermint_blocks', 'method', 'eth_getBlockByNumber', 'producer_batch_size',1000, 'producer_limit_size', 1000000, 'worker_batch_size',100, 'producer_batch_chunks_size', 1000))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_tendermint_blocks") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) +SELECT + block_number, + 'block' AS method, + block_number :: STRING AS params +FROM + tbl diff --git a/models/streamline/realtime/streamline__tendermint_transactions_realtime.sql b/models/streamline/realtime/streamline__tendermint_transactions_realtime.sql new file mode 100644 index 0000000..174b520 --- /dev/null +++ b/models/streamline/realtime/streamline__tendermint_transactions_realtime.sql @@ -0,0 +1,50 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.bulk_get_tendermint_transactions(object_construct('sql_source', '{{this.identifier}}'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_tendermint_transactions") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) +SELECT + block_number +FROM + tbl diff --git a/models/streamline/realtime/streamline__tendermint_validators_realtime.sql b/models/streamline/realtime/streamline__tendermint_validators_realtime.sql new file mode 100644 index 0000000..9f53896 --- /dev/null +++ b/models/streamline/realtime/streamline__tendermint_validators_realtime.sql @@ -0,0 +1,50 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.bulk_get_tendermint_validators(object_construct('sql_source', '{{this.identifier}}'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH last_3_days AS ({% if var('STREAMLINE_RUN_HISTORY') %} + + SELECT + 0 AS block_number + {% else %} + SELECT + MAX(block_number) - 100000 AS block_number --aprox 3 days + FROM + {{ ref("streamline__blocks") }} + {% endif %}), + tbl AS ( + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + WHERE + ( + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) + AND block_number IS NOT NULL + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_tendermint_validators") }} + WHERE + block_number >= ( + SELECT + block_number + FROM + last_3_days + ) + ) +SELECT + block_number +FROM + tbl diff --git a/models/streamline/streamline__blocks.sql b/models/streamline/streamline__blocks.sql new file mode 100644 index 0000000..d43cee8 --- /dev/null +++ b/models/streamline/streamline__blocks.sql @@ -0,0 +1,21 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +{% if execute %} + {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} + {% set block_height = height.columns [0].values() [0] %} +{% else %} + {% set block_height = 0 %} +{% endif %} + +SELECT + height AS block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex +FROM + TABLE(streamline.udtf_get_base_table({{ block_height }}))