From 1cfa0d3b5e16b928d12403750a7b6fd30e9fb08d Mon Sep 17 00:00:00 2001 From: Ryan-Loofy <63126328+Ryan-Loofy@users.noreply.github.com> Date: Tue, 29 Aug 2023 12:26:28 -0400 Subject: [PATCH] Sei txs, blocks, tx count pipeline (#2) * Sei txs, blocks, tx count pipeline * Streamline sei pipelines - XXX will be deleted with PROD deployment after approvals - Temp file will be deleted after UDFs are created * Add requirement and update block_number to block_id * Switch to block number * Remove old sources * Update loads and run times * Add prod integration --- ...run_streamline_blocks_txcount_realtime.yml | 45 +++++ ...t_run_streamline_transactions_realtime.yml | 45 +++++ README.md | 14 +- dbt_project.yml | 7 +- macros/create_sps.sql | 10 +- macros/create_udfs.sql | 12 ++ macros/streamline/api_integrations.sql | 21 ++ macros/streamline/get_base_table_udft.sql | 24 +++ macros/streamline/models.sql | 179 ++++++++++++++++++ macros/streamline/streamline_udfs.sql | 20 ++ macros/utils.sql | 78 ++++++++ .../core/bronze__streamline_FR_blocks.sql | 11 ++ .../bronze__streamline_FR_transactions.sql | 11 ++ .../core/bronze__streamline_FR_txcount.sql | 11 ++ .../bronze/core/bronze__streamline_blocks.sql | 11 ++ .../core/bronze__streamline_transactions.sql | 11 ++ .../core/bronze__streamline_txcount.sql | 11 ++ models/silver/silver__temp.sql | 7 + .../silver/streamline/_max_block_by_date.sql | 27 +++ .../complete/streamline__complete_blocks.sql | 31 +++ .../streamline__complete_tx_search.sql | 31 +++ .../complete/streamline__complete_txcount.sql | 31 +++ .../realtime/streamline__blocks_realtime.sql | 35 ++++ .../streamline__tx_search_realtime.sql | 83 ++++++++ .../realtime/streamline__txcount_realtime.sql | 42 ++++ .../streamline/core/streamline__blocks.sql | 17 ++ models/sources.yml | 12 +- requirements.txt | 1 + 28 files changed, 826 insertions(+), 12 deletions(-) create mode 100644 .github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml create mode 100644 .github/workflows/dbt_run_streamline_transactions_realtime.yml create mode 100644 macros/streamline/api_integrations.sql create mode 100644 macros/streamline/get_base_table_udft.sql create mode 100644 macros/streamline/models.sql create mode 100644 macros/streamline/streamline_udfs.sql create mode 100644 macros/utils.sql create mode 100644 models/bronze/core/bronze__streamline_FR_blocks.sql create mode 100644 models/bronze/core/bronze__streamline_FR_transactions.sql create mode 100644 models/bronze/core/bronze__streamline_FR_txcount.sql create mode 100644 models/bronze/core/bronze__streamline_blocks.sql create mode 100644 models/bronze/core/bronze__streamline_transactions.sql create mode 100644 models/bronze/core/bronze__streamline_txcount.sql create mode 100644 models/silver/silver__temp.sql create mode 100644 models/silver/streamline/_max_block_by_date.sql create mode 100644 models/silver/streamline/core/complete/streamline__complete_blocks.sql create mode 100644 models/silver/streamline/core/complete/streamline__complete_tx_search.sql create mode 100644 models/silver/streamline/core/complete/streamline__complete_txcount.sql create mode 100644 models/silver/streamline/core/realtime/streamline__blocks_realtime.sql create mode 100644 models/silver/streamline/core/realtime/streamline__tx_search_realtime.sql create mode 100644 models/silver/streamline/core/realtime/streamline__txcount_realtime.sql create mode 100644 models/silver/streamline/core/streamline__blocks.sql create mode 100644 requirements.txt diff --git a/.github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml b/.github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml new file mode 100644 index 0000000..c5bb48b --- /dev/null +++ b/.github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml @@ -0,0 +1,45 @@ +name: dbt_run_streamline_blocks_txcount_realtime +run-name: dbt_run_streamline_blocks_txcount_realtime + +on: + workflow_dispatch: + schedule: + # Runs "at minute 25 and 55, every hour" (see https://crontab.guru) + - cron: '25,55 * * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod_backfill + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/realtime/streamline__txcount_realtime.sql 1+models/silver/streamline/core/realtime/streamline__blocks_realtime.sql diff --git a/.github/workflows/dbt_run_streamline_transactions_realtime.yml b/.github/workflows/dbt_run_streamline_transactions_realtime.yml new file mode 100644 index 0000000..b4b66d1 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_transactions_realtime.yml @@ -0,0 +1,45 @@ +name: dbt_run_streamline_transactions_realtime +run-name: dbt_run_streamline_transactions_realtime + +on: + workflow_dispatch: + schedule: + # Runs "at minute 10 and 40, every hour" (see https://crontab.guru) + - cron: '10,40 * * * *' + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod_backfill + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/silver/streamline/core/realtime/streamline__tx_search_realtime.sql diff --git a/README.md b/README.md index 7d6c479..86e9f2b 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ ## Profile Set Up -#### Use the following within profiles.yml +#### Use the following within profiles.yml ---- ```yml @@ -22,6 +22,18 @@ sei: query_tag: ``` +### Variables + +To control the creation of UDF or SP macros with dbt run: +* UPDATE_UDFS_AND_SPS +When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal +When False, none of the on-run-start macros are executed on model run + +Default values are False + +* Usage: +dbt run --var '{"UPDATE_UDFS_AND_SPS":True}' -m ... + ### Resources: - Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) - Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers diff --git a/dbt_project.yml b/dbt_project.yml index b7c6f2b..b9683d8 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -41,8 +41,11 @@ models: vars: "dbt_date:time_zone": GMT - "UPDATE_SNOWFLAKE_TAGS": TRUE - OBSERV_FULL_TEST: FALSE + UPDATE_SNOWFLAKE_TAGS: TRUE + STREAMLINE_INVOKE_STREAMS: False + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False + UPDATE_UDFS_AND_SPS: False + WAIT: 0 tests: +store_failures: true # all tests diff --git a/macros/create_sps.sql b/macros/create_sps.sql index 55d399b..d730f4e 100644 --- a/macros/create_sps.sql +++ b/macros/create_sps.sql @@ -1,6 +1,8 @@ {% macro create_sps() %} - {% if target.database == 'SEI' %} - CREATE SCHEMA IF NOT EXISTS _internal; - {{ sp_create_prod_clone('_internal') }}; + {% if var("UPDATE_UDFS_AND_SPS") %} + {% if target.database == 'SEI' %} + CREATE schema IF NOT EXISTS _internal; + {{ sp_create_prod_clone('_internal') }}; + {% endif %} {% endif %} -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 56fe0ff..3427691 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -1,2 +1,14 @@ {% macro create_udfs() %} + {% if var("UPDATE_UDFS_AND_SPS") %} + {% set sql %} + CREATE schema if NOT EXISTS silver; +{{ create_udtf_get_base_table( + schema = "streamline" + ) }} + {{ create_udf_get_chainhead() }} + {{ create_udf_bulk_json_rpc() }} + + {% endset %} + {% do run_query(sql) %} + {% endif %} {% endmacro %} diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql new file mode 100644 index 0000000..bf39a99 --- /dev/null +++ b/macros/streamline/api_integrations.sql @@ -0,0 +1,21 @@ +{% macro create_aws_sei_api() %} + {{ log( + "Creating integration for target:" ~ target + ) }} + + {% if target.name == "prod" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_sei_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/sei-api-prod-rolesnowflakeudfsAF733095-CZPTGU57XEA0' api_allowed_prefixes = ( + 'https://v19hb3dk4k.execute-api.us-east-1.amazonaws.com/prod/' + ) enabled = TRUE; +{% endset %} + {% do run_query(sql) %} + {% elif target.name == "dev" %} + {% set sql %} + CREATE api integration IF NOT EXISTS aws_sei_api_dev api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::490041342817:role/sei-api-dev-rolesnowflakeudfsAF733095-4GCHXFFK8LJ7' api_allowed_prefixes = ( + 'https://u1hda5gxml.execute-api.us-east-1.amazonaws.com/dev/' + ) enabled = TRUE; +{% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} diff --git a/macros/streamline/get_base_table_udft.sql b/macros/streamline/get_base_table_udft.sql new file mode 100644 index 0000000..a488d14 --- /dev/null +++ b/macros/streamline/get_base_table_udft.sql @@ -0,0 +1,24 @@ +{% macro create_udtf_get_base_table(schema) %} +create or replace function {{ schema }}.udtf_get_base_table(max_height integer) +returns table (height number) +as +$$ + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) + ) +select + id as height +from + base +where + id <= max_height +$$ +; + +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..a0a55a2 --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,179 @@ +{% macro decode_logs_history( + start, + stop + ) %} + WITH look_back AS ( + SELECT + block_number + FROM + {{ ref("_max_block_by_date") }} + qualify ROW_NUMBER() over ( + ORDER BY + block_number DESC + ) = 1 + ) +SELECT + l.block_number, + l._log_id, + A.abi AS abi, + OBJECT_CONSTRUCT( + 'topics', + l.topics, + 'data', + l.data, + 'address', + l.contract_address + ) AS DATA +FROM + {{ ref("silver__logs") }} + l + INNER JOIN {{ ref("silver__complete_event_abis") }} A + ON A.parent_contract_address = l.contract_address + AND A.event_signature = l.topics [0] :: STRING + AND l.block_number BETWEEN A.start_block + AND A.end_block +WHERE + ( + l.block_number BETWEEN {{ start }} + AND {{ stop }} + ) + AND l.block_number <= ( + SELECT + block_number + FROM + look_back + ) + AND _log_id NOT IN ( + SELECT + _log_id + FROM + {{ ref("streamline__complete_decode_logs") }} + WHERE + ( + block_number BETWEEN {{ start }} + AND {{ stop }} + ) + AND block_number <= ( + SELECT + block_number + FROM + look_back + ) + ) +{% endmacro %} + +{% macro streamline_external_table_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -7, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} + WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} + +{% macro streamline_external_table_FR_query( + model, + partition_function, + partition_name, + unique_key + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS {{ partition_name }} + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + {{ unique_key }}, + DATA, + _inserted_timestamp, + MD5( + CAST( + COALESCE(CAST({{ unique_key }} AS text), '' :: STRING) AS text + ) + ) AS id, + s.{{ partition_name }}, + s.value AS VALUE +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.{{ partition_name }} = s.{{ partition_name }} +WHERE + b.{{ partition_name }} = s.{{ partition_name }} + AND ( + DATA :error :code IS NULL + OR DATA :error :code NOT IN ( + '-32000', + '-32001', + '-32002', + '-32003', + '-32004', + '-32005', + '-32006', + '-32007', + '-32008', + '-32009', + '-32010' + ) + ) +{% endmacro %} diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..29fb97d --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,20 @@ +{% macro create_udf_get_chainhead() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_get_chainhead() returns variant api_integration = + {% if target.name == "prod" %} + aws_sei_api AS 'https://v19hb3dk4k.execute-api.us-east-1.amazonaws.com/prod/get_chainhead' + {% else %} + aws_sei_api_dev AS 'https://u1hda5gxml.execute-api.us-east-1.amazonaws.com/dev/get_chainhead' + {%- endif %}; +{% endmacro %} + +{% macro create_udf_bulk_json_rpc() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_json_rpc( + json variant + ) returns text api_integration = {% if target.name == "prod" %} + aws_sei_api AS 'https://v19hb3dk4k.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_json_rpc' + {% else %} + aws_sei_api_dev AS 'https://u1hda5gxml.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc' + {%- endif %}; +{% endmacro %} \ No newline at end of file diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..0b800b8 --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,78 @@ +{% macro if_data_call_function( + func, + target + ) %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target, + True + ) }} + {% endif %} + SELECT + {{ func }} + WHERE + EXISTS( + SELECT + 1 + FROM + {{ target }} + LIMIT + 1 + ) + {% else %} + {% if execute %} + {{ log( + "Running macro `if_data_call_function`: NOOP", + False + ) }} + {% endif %} + SELECT + NULL + {% endif %} +{% endmacro %} + +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/models/bronze/core/bronze__streamline_FR_blocks.sql b/models/bronze/core/bronze__streamline_FR_blocks.sql new file mode 100644 index 0000000..a430319 --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_transactions.sql b/models/bronze/core/bronze__streamline_FR_transactions.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_FR_txcount.sql b/models/bronze/core/bronze__streamline_FR_txcount.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_txcount.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_blocks.sql b/models/bronze/core/bronze__streamline_blocks.sql new file mode 100644 index 0000000..22b0c51 --- /dev/null +++ b/models/bronze/core/bronze__streamline_blocks.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_transactions.sql b/models/bronze/core/bronze__streamline_transactions.sql new file mode 100644 index 0000000..da8e375 --- /dev/null +++ b/models/bronze/core/bronze__streamline_transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_txcount.sql b/models/bronze/core/bronze__streamline_txcount.sql new file mode 100644 index 0000000..da8e375 --- /dev/null +++ b/models/bronze/core/bronze__streamline_txcount.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/silver/silver__temp.sql b/models/silver/silver__temp.sql new file mode 100644 index 0000000..71738da --- /dev/null +++ b/models/silver/silver__temp.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + 1 AS temp \ No newline at end of file diff --git a/models/silver/streamline/_max_block_by_date.sql b/models/silver/streamline/_max_block_by_date.sql new file mode 100644 index 0000000..85113c9 --- /dev/null +++ b/models/silver/streamline/_max_block_by_date.sql @@ -0,0 +1,27 @@ +{{ config ( + materialized = "ephemeral", + unique_key = "block_id", +) }} + +WITH base AS ( + + SELECT + block_timestamp :: DATE AS block_date, + MAX(block_id) as block_number + FROM + {{ ref("silver__blocks") }} + GROUP BY + block_timestamp :: DATE +) +SELECT + block_date, + block_number +FROM + base +WHERE + block_date <> ( + SELECT + MAX(block_date) + FROM + base + ) \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_blocks.sql b/models/silver/streamline/core/complete/streamline__complete_blocks.sql new file mode 100644 index 0000000..f73a1e9 --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_blocks.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_blocks') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_blocks') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_tx_search.sql b/models/silver/streamline/core/complete/streamline__complete_tx_search.sql new file mode 100644 index 0000000..6b71d22 --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_tx_search.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_transactions') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_transactions') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/complete/streamline__complete_txcount.sql b/models/silver/streamline/core/complete/streamline__complete_txcount.sql new file mode 100644 index 0000000..2214e5e --- /dev/null +++ b/models/silver/streamline/core/complete/streamline__complete_txcount.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_txcount') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_txcount') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_txcount') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 \ No newline at end of file diff --git a/models/silver/streamline/core/realtime/streamline__blocks_realtime.sql b/models/silver/streamline/core/realtime/streamline__blocks_realtime.sql new file mode 100644 index 0000000..aa5ffbb --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__blocks_realtime.sql @@ -0,0 +1,35 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'blocks', 'sql_limit', {{var('sql_limit','300000')}}, 'producer_batch_size', {{var('producer_batch_size','300000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'batch_call_limit', {{var('batch_call_limit','100')}}, 'call_type', 'batch'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_blocks") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "block", "params":["', + block_number :: STRING, + '"],"id":"', + block_number :: STRING, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/realtime/streamline__tx_search_realtime.sql b/models/silver/streamline/core/realtime/streamline__tx_search_realtime.sql new file mode 100644 index 0000000..44be0fe --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__tx_search_realtime.sql @@ -0,0 +1,83 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'transactions', 'sql_limit', {{var('sql_limit','6000000')}}, 'producer_batch_size', {{var('producer_batch_size','3000000')}}, 'worker_batch_size', {{var('worker_batch_size','15000')}}, 'batch_call_limit', {{var('batch_call_limit','10')}}, 'exploded_key', '[\"result\", \"txs\"]', 'call_type', 'batch'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref("streamline__complete_txcount") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_tx_search") }} +), +transactions_counts_by_block AS ( + SELECT + tc.block_number, + tc.data :: INTEGER AS txcount + FROM + {{ ref("bronze__streamline_txcount") }} + tc + INNER JOIN blocks b + ON tc.block_number = b.block_number +), +numbers AS ( + -- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion. + SELECT + 1 AS n + UNION ALL + SELECT + n + 1 + FROM + numbers + WHERE + n < ( + SELECT + CEIL(MAX(txcount) / 100.0) + FROM + transactions_counts_by_block) + ), + blocks_with_page_numbers AS ( + SELECT + tt.block_number AS block_number, + n.n AS page_number + FROM + transactions_counts_by_block tt + JOIN numbers n + ON n.n <= CASE + WHEN tt.txcount % 100 = 0 THEN tt.txcount / 100 + ELSE FLOOR( + tt.txcount / 100 + ) + 1 + END + ) + SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "tx_search", "params":["', + 'tx.height=', + block_number :: STRING, + '",', + TRUE, + ',"', + page_number :: STRING, + '",', + '"100",', + '"asc"', + '],"id":"', + block_number :: STRING, + '"}' + ) + ) AS request + FROM + blocks_with_page_numbers + ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/realtime/streamline__txcount_realtime.sql b/models/silver/streamline/core/realtime/streamline__txcount_realtime.sql new file mode 100644 index 0000000..0d4ff10 --- /dev/null +++ b/models/silver/streamline/core/realtime/streamline__txcount_realtime.sql @@ -0,0 +1,42 @@ +{{ config ( + materialized = "view", + post_hook = if_data_call_function( + func = "{{this.schema}}.udf_bulk_json_rpc(object_construct('sql_source', '{{this.identifier}}', 'external_table', 'txcount', 'sql_limit', {{var('sql_limit','300000')}}, 'producer_batch_size', {{var('producer_batch_size','300000')}}, 'worker_batch_size', {{var('worker_batch_size','50000')}}, 'batch_call_limit', {{var('batch_call_limit','100')}}, 'exploded_key', '[\"result\", \"total_count\"]', 'call_type', 'batch'))", + target = "{{this.schema}}.{{this.identifier}}" + ) +) }} + +WITH blocks AS ( + + SELECT + block_number + FROM + {{ ref("streamline__blocks") }} + EXCEPT + SELECT + block_number + FROM + {{ ref("streamline__complete_txcount") }} +) +SELECT + PARSE_JSON( + CONCAT( + '{"jsonrpc": "2.0",', + '"method": "tx_search", "params":["', + 'tx.height=', + block_number :: STRING, + '",', + TRUE, + ',', + '"1",', + '"1",', + '"asc"', + '],"id":"', + block_number :: STRING, + '"}' + ) + ) AS request +FROM + blocks +ORDER BY + block_number ASC diff --git a/models/silver/streamline/core/streamline__blocks.sql b/models/silver/streamline/core/streamline__blocks.sql new file mode 100644 index 0000000..da743e6 --- /dev/null +++ b/models/silver/streamline/core/streamline__blocks.sql @@ -0,0 +1,17 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + + +{% if execute %} +{% set height = run_query('SELECT streamline.udf_get_chainhead()') %} +{% set block_height = height.columns[0].values()[0] %} +{% else %} +{% set block_height = 0 %} +{% endif %} + +SELECT + height as block_number +FROM + TABLE(streamline.udtf_get_base_table({{block_height}})) \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml index 3e562e4..edb36ec 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -18,7 +18,7 @@ sources: - name: lq_txs_101 - name: bronze_api schema: bronze_api - tables: + tables: - name: blockchain - name: crosschain_silver database: "{{ 'crosschain' if target.database == 'SEI' else 'crosschain_dev' }}" @@ -29,12 +29,14 @@ sources: - name: hourly_prices_coin_market_cap - name: hourly_prices_coin_gecko - name: number_sequence - - name: streamline + - name: bronze_streamline database: streamline - schema: "{{ 'SEI' if target.database == 'SEI' else 'SEI_DEV' }}" + schema: | + {{ "SEI_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "SEI" }} tables: - name: blocks - - name: tx_search + - name: transactions + - name: txcount - name: bronze schema: bronze tables: @@ -44,4 +46,4 @@ sources: schema: silver tables: - name: link_events - - name: transfers \ No newline at end of file + - name: transfers diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ec44b06 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +dbt-snowflake>=1.4,<1.5 \ No newline at end of file