diff --git a/.github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml b/.github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml new file mode 100644 index 0000000..548e647 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_blocks_txcount_realtime.yml @@ -0,0 +1,31 @@ +name: dbt_run_streamline_blocks_tx_realtime +run-name: dbt_run_streamline_blocks_tx_realtime + +on: + workflow_dispatch: + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + called_workflow_template: + uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main + with: + dbt_command: | + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/core/realtime/streamline__blocks_tx_realtime.sql; + + environment: workflow_prod + warehouse: ${{ vars.WAREHOUSE }} + secrets: inherit diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv index 8ccd9a7..9d9dadf 100644 --- a/data/github_actions__workflows.csv +++ b/data/github_actions__workflows.csv @@ -1 +1,2 @@ workflow_name,workflow_schedule +dbt_run_streamline_blocks_tx_realtime, "*/15 * * * *" diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index ecd02b5..e3d05f6 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -7,6 +7,9 @@ {% do run_query(sql) %} {% if target.database != "APTOS_COMMUNITY_DEV" %} {% set sql %} + {{ create_udtf_get_base_table( + schema = "streamline" + ) }} {{ create_udf_bulk_json_rpc() }} {{ create_udf_bulk_rest_api() }} diff --git a/macros/streamline/get_base_table_udft.sql b/macros/streamline/get_base_table_udft.sql new file mode 100644 index 0000000..a488d14 --- /dev/null +++ b/macros/streamline/get_base_table_udft.sql @@ -0,0 +1,24 @@ +{% macro create_udtf_get_base_table(schema) %} +create or replace function {{ schema }}.udtf_get_base_table(max_height integer) +returns table (height number) +as +$$ + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) + ) +select + id as height +from + base +where + id <= max_height +$$ +; + +{% endmacro %} \ No newline at end of file diff --git a/models/bronze/core/bronze__streamline_FR_blocks_tx.sql b/models/bronze/core/bronze__streamline_FR_blocks_tx.sql new file mode 100644 index 0000000..796b55c --- /dev/null +++ b/models/bronze/core/bronze__streamline_FR_blocks_tx.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_FR_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/bronze/core/bronze__streamline_blocks_tx.sql b/models/bronze/core/bronze__streamline_blocks_tx.sql new file mode 100644 index 0000000..da8e375 --- /dev/null +++ b/models/bronze/core/bronze__streamline_blocks_tx.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = 'view' +) }} + +{% set model = this.identifier.split("_") [-1] %} +{{ streamline_external_table_query( + model, + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )", + partition_name = "_partition_by_block_id", + unique_key = "block_number" +) }} diff --git a/models/sources.yml b/models/sources.yml index 680f7ac..0389e90 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -8,6 +8,11 @@ sources: - name: dim_date_hours - name: address_tags - name: dim_dates + - name: crosschain_silver + database: "{{ 'crosschain' if target.database == 'aptos' else 'crosschain_dev' }}" + schema: silver + tables: + - name: number_sequence - name: aptos_bronze database: aptos schema: bronze diff --git a/models/streamline/core/complete/streamline__blocks_tx_complete.sql b/models/streamline/core/complete/streamline__blocks_tx_complete.sql new file mode 100644 index 0000000..2b17a7b --- /dev/null +++ b/models/streamline/core/complete/streamline__blocks_tx_complete.sql @@ -0,0 +1,31 @@ +-- depends_on: {{ ref('bronze__streamline_blocks_tx') }} +{{ config ( + materialized = "incremental", + unique_key = "id", + cluster_by = "ROUND(block_number, -3)", + merge_update_columns = ["id"], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)" +) }} + +SELECT + id, + block_number, + _inserted_timestamp +FROM + +{% if is_incremental() %} +{{ ref('bronze__streamline_blocks_tx') }} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) _inserted_timestamp + FROM + {{ this }} + ) +{% else %} + {{ ref('bronze__streamline_FR_blocks_tx') }} +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY id +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/streamline/streamline__blocks.sql b/models/streamline/streamline__blocks.sql new file mode 100644 index 0000000..6470780 --- /dev/null +++ b/models/streamline/streamline__blocks.sql @@ -0,0 +1,36 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +{% if execute %} + WITH chainhead AS ( + + SELECT + {{ target.database }}.live.udf_api( + 'GET', + 'https://twilight-silent-gas.aptos-mainnet.quiknode.pro/f64d711fb5881ce64cf18a31f796885050178031/v1', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ),{} + ) :data :block_height :: INT AS block_height + ) +SELECT + _id AS block_number +FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} +WHERE + _id <= ( + SELECT + block_height + FROM + chainhead + ) +{% else %} +SELECT + 0 AS block_number +{% endif %}