diff --git a/.github/workflows/dbt_run_streamline_blocks.yml b/.github/workflows/dbt_run_streamline_blocks_tx_counts.yml similarity index 87% rename from .github/workflows/dbt_run_streamline_blocks.yml rename to .github/workflows/dbt_run_streamline_blocks_tx_counts.yml index ded871a..3697ed2 100644 --- a/.github/workflows/dbt_run_streamline_blocks.yml +++ b/.github/workflows/dbt_run_streamline_blocks_tx_counts.yml @@ -43,5 +43,5 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__blocks_realtime.sql || true + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/realtime/streamline__blocks_realtime.sql 1+models/streamline/realtime/streamline__tx_counts_realtime.sql || true workstream report --exit-nonzero \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_transactions.yml b/.github/workflows/dbt_run_streamline_transactions.yml index 332322f..c17d38f 100644 --- a/.github/workflows/dbt_run_streamline_transactions.yml +++ b/.github/workflows/dbt_run_streamline_transactions.yml @@ -43,5 +43,5 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/streamline__txs_realtime.sql || true + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/realtime/streamline__transactions_realtime.sql || true workstream report --exit-nonzero diff --git a/.github/workflows/dbt_run_streamline_validators.yml b/.github/workflows/dbt_run_streamline_validators.yml deleted file mode 100644 index d2dc64e..0000000 --- a/.github/workflows/dbt_run_streamline_validators.yml +++ /dev/null @@ -1,49 +0,0 @@ -name: dbt_run_streamline_validators -run-name: dbt_run_streamline_validators - -on: - push: - branches: - # - main - - turn-off-dev-turn-on-prod - schedule: - # Runs "every 6 hours" (see https://crontab.guru) - - cron: '0 0,12,23 * * *' - -env: - USE_VARS: "${{ vars.USE_VARS }}" - DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" - DBT_VERSION: "${{ vars.DBT_VERSION }}" - ACCOUNT: "${{ vars.ACCOUNT }}" - ROLE: "${{ vars.ROLE }}" - USER: "${{ vars.USER }}" - PASSWORD: "${{ secrets.PASSWORD }}" - REGION: "${{ vars.REGION }}" - DATABASE: "${{ vars.DATABASE }}" - WAREHOUSE: "${{ vars.WAREHOUSE }}" - SCHEMA: "${{ vars.SCHEMA }}" - -concurrency: - group: ${{ github.workflow }} - -jobs: - run_dbt_jobs: - runs-on: ubuntu-latest - environment: - name: workflow_prod - - steps: - - uses: actions/checkout@v3 - - - uses: actions/setup-python@v4 - with: - python-version: "${{ vars.PYTHON_VERSION }}" - cache: "pip" - - - name: install dependencies - run: | - pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click - dbt deps - - name: Run DBT Jobs - run: | - dbt run --full-refresh -x -m 1+models/streamline/streamline__validators_realtime.sql diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql index f65c082..4ec0449 100644 --- a/macros/streamline/streamline_udfs.sql +++ b/macros/streamline/streamline_udfs.sql @@ -36,16 +36,6 @@ {%- endif %}; {% endmacro %} -{% macro create_udf_bulk_json_rpc() %} - CREATE - OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_json_rpc( - json variant - ) returns text api_integration = {% if target.name == "prod" %} - aws_axelar_api AS '' - {% else %} - aws_axelar_api_dev AS 'https://q8knm7tyk5.execute-api.us-east-1.amazonaws.com/dev/udf_bulk_json_rpc' - {%- endif %}; -{% endmacro %} {% macro create_udf_rest_api() %} CREATE diff --git a/models/bronze/bronze__blocks.sql b/models/bronze/core/bronze__blocks.sql similarity index 87% rename from models/bronze/bronze__blocks.sql rename to models/bronze/core/bronze__blocks.sql index 4a83d1e..a49d6d8 100644 --- a/models/bronze/bronze__blocks.sql +++ b/models/bronze/core/bronze__blocks.sql @@ -10,7 +10,7 @@ SELECT VALUE, _partition_by_block_id, - block_number AS block_id, + DATA :result :block :header :height :: INT AS block_id, metadata, DATA, TO_TIMESTAMP( @@ -36,6 +36,6 @@ WHERE ) {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY block_number +qualify(ROW_NUMBER() over (PARTITION BY block_id ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/bronze/bronze__transactions.sql b/models/bronze/core/bronze__transactions.sql similarity index 82% rename from models/bronze/bronze__transactions.sql rename to models/bronze/core/bronze__transactions.sql index ff6ca7c..27c4d57 100644 --- a/models/bronze/bronze__transactions.sql +++ b/models/bronze/core/bronze__transactions.sql @@ -10,9 +10,12 @@ SELECT VALUE, _partition_by_block_id, - block_number AS block_id, + DATA :height :: INT AS block_id, REPLACE( - metadata :request :params [0], + COALESCE( + metadata :request :data :params [0], + metadata :request :params [0] + ), 'tx.height=' ) :: INT AS block_id_requested, metadata, @@ -21,7 +24,7 @@ SELECT DATA :tx_result AS tx_result, file_name, TO_TIMESTAMP( - m._inserted_timestamp + _inserted_timestamp ) AS _inserted_timestamp FROM @@ -31,8 +34,6 @@ FROM {{ ref('bronze__streamline_FR_transactions') }} {% endif %} -m - {% if is_incremental() %} WHERE _inserted_timestamp >= ( diff --git a/models/bronze/streamline/bronze__streamline_FR_blocks.sql b/models/bronze/streamline/bronze__streamline_FR_blocks.sql index d066fc5..cb56d4d 100644 --- a/models/bronze/streamline/bronze__streamline_FR_blocks.sql +++ b/models/bronze/streamline/bronze__streamline_FR_blocks.sql @@ -1,9 +1,27 @@ {{ config ( materialized = 'view' ) }} -{{ streamline_external_table_FR_query( - "blocks", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", - partition_name = "_partition_by_block_id", - unique_key = "block_number" -) }} + +SELECT + partition_key, + DATA, + _INSERTED_TIMESTAMP, + id, + metadata, + file_name, + _PARTITION_BY_BLOCK_ID, + VALUE +FROM + {{ ref('bronze__streamline_FR_transactions_v2') }} +UNION ALL +SELECT + block_number, + DATA, + _INSERTED_TIMESTAMP, + id, + metadata, + file_name, + _PARTITION_BY_BLOCK_ID, + VALUE +FROM + {{ ref('bronze__streamline_FR_transactions_v1') }} diff --git a/models/streamline/streamline__blocks_history_FR.sql b/models/bronze/streamline/bronze__streamline_FR_blocks_v1.sql similarity index 81% rename from models/streamline/streamline__blocks_history_FR.sql rename to models/bronze/streamline/bronze__streamline_FR_blocks_v1.sql index 4a96db7..d066fc5 100644 --- a/models/streamline/streamline__blocks_history_FR.sql +++ b/models/bronze/streamline/bronze__streamline_FR_blocks_v1.sql @@ -2,8 +2,8 @@ materialized = 'view' ) }} {{ streamline_external_table_FR_query( - model = "blocks", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + "blocks", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", partition_name = "_partition_by_block_id", unique_key = "block_number" ) }} diff --git a/models/streamline/streamline__txs_history_FR.sql b/models/bronze/streamline/bronze__streamline_FR_blocks_v2.sql similarity index 68% rename from models/streamline/streamline__txs_history_FR.sql rename to models/bronze/streamline/bronze__streamline_FR_blocks_v2.sql index 0abc617..4fc7e31 100644 --- a/models/streamline/streamline__txs_history_FR.sql +++ b/models/bronze/streamline/bronze__streamline_FR_blocks_v2.sql @@ -2,8 +2,8 @@ materialized = 'view' ) }} {{ streamline_external_table_FR_query( - model = "txs_details", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + "blocks_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", partition_name = "_partition_by_block_id", - unique_key = "block_number" + unique_key = "partition_key" ) }} diff --git a/models/bronze/streamline/bronze__streamline_FR_transactions.sql b/models/bronze/streamline/bronze__streamline_FR_transactions.sql index a031a07..bdee4e2 100644 --- a/models/bronze/streamline/bronze__streamline_FR_transactions.sql +++ b/models/bronze/streamline/bronze__streamline_FR_transactions.sql @@ -2,8 +2,8 @@ materialized = 'view' ) }} {{ streamline_external_table_FR_query( - "txs", + "txs_v2", partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", partition_name = "_partition_by_block_id", - unique_key = "block_number" + unique_key = "partition_key" ) }} diff --git a/models/streamline/streamline__validators_history_FR.sql b/models/bronze/streamline/bronze__streamline_FR_transactions_v1.sql similarity index 80% rename from models/streamline/streamline__validators_history_FR.sql rename to models/bronze/streamline/bronze__streamline_FR_transactions_v1.sql index 62fe1ab..06a8d7e 100644 --- a/models/streamline/streamline__validators_history_FR.sql +++ b/models/bronze/streamline/bronze__streamline_FR_transactions_v1.sql @@ -2,8 +2,8 @@ materialized = 'view' ) }} {{ streamline_external_table_FR_query( - model = "validators", - partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + "txs_details", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", partition_name = "_partition_by_block_id", unique_key = "block_number" ) }} diff --git a/models/bronze/streamline/bronze__streamline_FR_transactions_v2.sql b/models/bronze/streamline/bronze__streamline_FR_transactions_v2.sql new file mode 100644 index 0000000..bdee4e2 --- /dev/null +++ b/models/bronze/streamline/bronze__streamline_FR_transactions_v2.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query( + "txs_v2", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "partition_key" +) }} diff --git a/models/streamline/complete/streamline__complete_blocks.sql b/models/streamline/complete/streamline__complete_blocks.sql index 8a203eb..22f1c86 100644 --- a/models/streamline/complete/streamline__complete_blocks.sql +++ b/models/streamline/complete/streamline__complete_blocks.sql @@ -1,23 +1,34 @@ -- depends_on: {{ ref('bronze__streamline_blocks') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = 'block_number', cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" ) }} SELECT - id, - block_number, - _inserted_timestamp + DATA :result :block :header :height :: INT AS block_number, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM - {{ ref('bronze__streamline_blocks') }} + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks') }} WHERE _inserted_timestamp >= ( SELECT COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP FROM - {{ this }}) qualify(ROW_NUMBER() over (PARTITION BY id + {{ this }}) + {% else %} + {{ ref('bronze__streamline_FR_blocks') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY block_number ORDER BY _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/complete/streamline__complete_transactions.sql b/models/streamline/complete/streamline__complete_transactions.sql index 2625bf2..2a1521e 100644 --- a/models/streamline/complete/streamline__complete_transactions.sql +++ b/models/streamline/complete/streamline__complete_transactions.sql @@ -1,17 +1,24 @@ -- depends_on: {{ ref('bronze__streamline_transactions') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = 'complete_transactions_id', cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" ) }} SELECT - id, - block_number, - metadata :request [2] AS page, - _inserted_timestamp + DATA :height :: INT AS block_number, + COALESCE( + metadata :request :data :params [2], + metadata :request :params [2] + ) :: INT AS page, + {{ dbt_utils.generate_surrogate_key( + ['block_number','page'] + ) }} AS complete_transactions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM {% if is_incremental() %} @@ -22,11 +29,10 @@ WHERE COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP FROM {{ this }}) - ) -{% else %} - {{ ref('bronze__streamline_FR_transactions') }} -{% endif %} + {% else %} + {{ ref('bronze__streamline_FR_transactions') }} + {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 + qualify(ROW_NUMBER() over (PARTITION BY complete_transactions_id + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/complete/streamline__complete_tx_counts.sql b/models/streamline/complete/streamline__complete_tx_counts.sql index c5bb30d..3d847b9 100644 --- a/models/streamline/complete/streamline__complete_tx_counts.sql +++ b/models/streamline/complete/streamline__complete_tx_counts.sql @@ -1,32 +1,35 @@ -- depends_on: {{ ref('bronze__streamline_transactions') }} {{ config ( materialized = "incremental", - unique_key = "id", + unique_key = "block_number", cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"], - post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)" ) }} SELECT - id, - block_number, - DATA :: INTEGER AS tx_count, - _inserted_timestamp + DATA :height :: INT AS block_number, + DATA :: INT AS tx_count, + {{ dbt_utils.generate_surrogate_key( + ['block_number'] + ) }} AS complete_tx_counts_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + _inserted_timestamp, + '{{ invocation_id }}' AS _invocation_id FROM + {{ ref('bronze__streamline_tx_counts') }} {% if is_incremental() %} -{{ ref('bronze__streamline_tx_counts') }} WHERE _inserted_timestamp >= ( SELECT COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP FROM {{ this }}) - ) -{% else %} - {{ ref('bronze__streamline_FR_tx_counts') }} -{% endif %} + {% else %} + {{ ref('bronze__streamline_FR_tx_counts') }} + {% endif %} -qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 + qualify(ROW_NUMBER() over (PARTITION BY block_number + ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/realtime/streamline__blocks_realtime.sql b/models/streamline/realtime/streamline__blocks_realtime.sql index e119bc7..d18e7aa 100644 --- a/models/streamline/realtime/streamline__blocks_realtime.sql +++ b/models/streamline/realtime/streamline__blocks_realtime.sql @@ -5,7 +5,7 @@ target = "{{this.schema}}.{{this.identifier}}" ) ) }} - +-- depends_on: {{ ref('streamline__complete_blocks') }} WITH blocks AS ( SELECT @@ -20,10 +20,6 @@ SELECT FROM {{ ref("streamline__complete_blocks") }} {% endif %} -ORDER BY - 1 DESC -LIMIT - 1000 ) SELECT ROUND( @@ -49,7 +45,7 @@ SELECT block_number :: STRING ) ), - 'vault/stg/axelar/node/mainnet' + 'vault/prod/axelar/node/mainnet' ) AS request FROM blocks diff --git a/models/streamline/realtime/streamline__transactions_realtime.sql b/models/streamline/realtime/streamline__transactions_realtime.sql index fb16278..b797bb6 100644 --- a/models/streamline/realtime/streamline__transactions_realtime.sql +++ b/models/streamline/realtime/streamline__transactions_realtime.sql @@ -5,57 +5,87 @@ target = "{{this.schema}}.{{this.identifier}}" ) ) }} - +-- depends_on: {{ ref('streamline__complete_transactions') }} +-- depends_on: {{ ref('streamline__complete_tx_counts') }} WITH blocks AS ( SELECT - block_number + A.block_number, + tx_count FROM - {{ ref("streamline__blocks") }} + {{ ref("streamline__complete_tx_counts") }} A {% if is_incremental() %} -EXCEPT -SELECT - block_number -FROM - {{ ref("streamline__complete_transactions") }} +LEFT JOIN {{ ref("streamline__complete_transactions") }} +b +ON A.block_number = b.block_number +WHERE + b.block_number IS NULL {% endif %} -ORDER BY - 1 DESC -) -SELECT - ROUND( - block_number, - -3 - ) AS partition_key, - live.udf_api( - 'POST', - '{service}/{Authentication}', - OBJECT_CONSTRUCT( - 'Content-Type', - 'application/json' +LIMIT + 10 +), numbers AS ( + -- Recursive CTE to generate numbers. We'll use the maximum txcount value to limit our recursion. + SELECT + 1 AS n + UNION ALL + SELECT + n + 1 + FROM + numbers + WHERE + n < ( + SELECT + CEIL(MAX(tx_count) / 100.0) + FROM + blocks) ), - OBJECT_CONSTRUCT( - 'id', + blocks_with_page_numbers AS ( + SELECT + tt.block_number AS block_number, + n.n AS page_number + FROM + blocks tt + JOIN numbers n + ON n.n <= CASE + WHEN tt.tx_count % 100 = 0 THEN tt.tx_count / 100 + ELSE FLOOR( + tt.tx_count / 100 + ) + 1 + END + ) + SELECT + ROUND( block_number, - 'jsonrpc', - '2.0', - 'method', - 'tx_search', - 'params', - ARRAY_CONSTRUCT( - 'tx.height=' || block_number :: STRING, - TRUE, - '1', - --replace with page - '100', - 'asc', - FALSE - ) - ), - 'vault/stg/axelar/node/mainnet' - ) AS request -FROM - blocks -ORDER BY - block_number + -3 + ) AS partition_key, + live.udf_api( + 'POST', + '{service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'id', + block_number, + 'jsonrpc', + '2.0', + 'method', + 'tx_search', + 'params', + ARRAY_CONSTRUCT( + 'tx.height=' || block_number :: STRING, + TRUE, + page_number :: STRING, + '100', + 'asc', + FALSE + ) + ), + 'vault/prod/axelar/node/mainnet' + ) AS request + FROM + blocks_with_page_numbers + ORDER BY + block_number diff --git a/models/streamline/realtime/streamline__tx_counts_realtime.sql b/models/streamline/realtime/streamline__tx_counts_realtime.sql index c1564ec..af46d56 100644 --- a/models/streamline/realtime/streamline__tx_counts_realtime.sql +++ b/models/streamline/realtime/streamline__tx_counts_realtime.sql @@ -5,7 +5,7 @@ target = "{{this.schema}}.{{this.identifier}}" ) ) }} - +-- depends_on: {{ ref('streamline__complete_tx_counts') }} WITH blocks AS ( SELECT @@ -20,10 +20,6 @@ SELECT FROM {{ ref("streamline__complete_tx_counts") }} {% endif %} -ORDER BY - 1 DESC -LIMIT - 100 ) SELECT ROUND( @@ -54,7 +50,7 @@ SELECT FALSE ) ), - 'vault/stg/axelar/node/mainnet' + 'vault/prod/axelar/node/mainnet' ) AS request FROM blocks diff --git a/models/streamline/streamline__blocks.sql b/models/streamline/streamline__blocks.sql index 421a2d0..264c8f1 100644 --- a/models/streamline/streamline__blocks.sql +++ b/models/streamline/streamline__blocks.sql @@ -4,7 +4,7 @@ ) }} {% if execute %} - {% set height = run_query("SELECT streamline.udf_get_chainhead()") %} + {% set height = run_query("SELECT live.udf_api( 'POST', '{service}/{Authentication}', OBJECT_CONSTRUCT( 'Content-Type', 'application/json' ), OBJECT_CONSTRUCT( 'id', 0, 'jsonrpc', '2.0', 'method', 'status', 'params', [] ), 'vault/stg/axelar/node/mainnet' ):data:result:sync_info:latest_block_height::INT as block") %} {% set block_height = height.columns [0].values() [0] %} {% else %} {% set block_height = 0 %} diff --git a/models/streamline/streamline__blocks_realtime_bak.sql b/models/streamline/streamline__blocks_realtime_bak.sql deleted file mode 100644 index db8dfdc..0000000 --- a/models/streamline/streamline__blocks_realtime_bak.sql +++ /dev/null @@ -1,28 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_get_blocks(object_construct('sql_source', '{{this.identifier}}'))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -{% if execute %} - {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} - {% set block_height = height.columns [0].values() [0] %} -{% else %} - {% set block_height = 0 %} -{% endif %} - -SELECT - height AS block_number -FROM - TABLE(streamline.udtf_get_base_table({{ block_height }})) -EXCEPT -SELECT - block_number -FROM - {{ ref( - "streamline__blocks_history" - ) }} -ORDER BY - 1 ASC diff --git a/models/streamline/streamline__txs_realtime.sql b/models/streamline/streamline__txs_realtime.sql deleted file mode 100644 index b856c9a..0000000 --- a/models/streamline/streamline__txs_realtime.sql +++ /dev/null @@ -1,29 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_get_txs(object_construct('sql_source', '{{this.identifier}}'))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -WITH last_3_days AS ( - - SELECT - block_number - FROM - {{ ref("_block_lookback") }} -) -SELECT - A.block_number, - A.tx_count -FROM - {{ ref("streamline__blocks_history") }} A {# LEFT JOIN {{ ref("streamline__txs_history") }} - b - ON A.block_number = b.block_number #} -WHERE - block_number IN ( - 11449803, - 11449804 - ) -ORDER BY - 1 ASC diff --git a/models/streamline/streamline__validators_history.sql b/models/streamline/streamline__validators_history.sql deleted file mode 100644 index b59c1fb..0000000 --- a/models/streamline/streamline__validators_history.sql +++ /dev/null @@ -1,54 +0,0 @@ -{{ config ( - materialized = "incremental", - unique_key = "id", - cluster_by = "ROUND(block_number, -3)", - merge_update_columns = ["id"] -) }} - -WITH meta AS ( - - SELECT - last_modified, - file_name - FROM - TABLE( - information_schema.external_table_files( - table_name => '{{ source( "bronze", "validators") }}' - ) - ) A -) - -{% if is_incremental() %}, -max_date AS ( - SELECT - COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP - FROM - {{ this }}) - {% endif %} - SELECT - {{ dbt_utils.generate_surrogate_key( - ['block_number'] - ) }} AS id, - block_number, - last_modified AS _inserted_timestamp - FROM - {{ source( - "bronze", - "validators" - ) }} - JOIN meta b - ON b.file_name = metadata$filename - -{% if is_incremental() %} -WHERE - b.last_modified > ( - SELECT - max_INSERTED_TIMESTAMP - FROM - max_date - ) -{% endif %} - -qualify(ROW_NUMBER() over (PARTITION BY id -ORDER BY - _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/streamline__validators_realtime.sql b/models/streamline/streamline__validators_realtime.sql deleted file mode 100644 index 3854b63..0000000 --- a/models/streamline/streamline__validators_realtime.sql +++ /dev/null @@ -1,28 +0,0 @@ -{{ config ( - materialized = "view", - post_hook = if_data_call_function( - func = "{{this.schema}}.udf_bulk_get_validators(object_construct('sql_source', '{{this.identifier}}'))", - target = "{{this.schema}}.{{this.identifier}}" - ) -) }} - -{% if execute %} - {% set height = run_query('SELECT streamline.udf_get_chainhead()') %} - {% set block_height = height.columns [0].values() [0] %} -{% else %} - {% set block_height = 0 %} -{% endif %} - -SELECT - height AS block_number -FROM - TABLE(streamline.udtf_get_base_table({{ block_height }})) -EXCEPT -SELECT - block_number -FROM - {{ ref( - "streamline__validators_history" - ) }} -ORDER BY - 1 ASC