chainhead, blocks, bronze complete

This commit is contained in:
Eric Laurello 2023-11-14 10:18:51 -05:00
parent 25e98c928d
commit 9d873f9451
9 changed files with 153 additions and 0 deletions

View File

@ -0,0 +1,31 @@
name: dbt_run_streamline_blocks_tx_realtime
run-name: dbt_run_streamline_blocks_tx_realtime
on:
workflow_dispatch:
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/streamline/core/realtime/streamline__blocks_tx_realtime.sql;
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1 +1,2 @@
workflow_name,workflow_schedule
dbt_run_streamline_blocks_tx_realtime, "*/15 * * * *"

1 workflow_name workflow_schedule
2 dbt_run_streamline_blocks_tx_realtime */15 * * * *

View File

@ -7,6 +7,9 @@
{% do run_query(sql) %}
{% if target.database != "APTOS_COMMUNITY_DEV" %}
{% set sql %}
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{{ create_udf_bulk_json_rpc() }}
{{ create_udf_bulk_rest_api() }}

View File

@ -0,0 +1,24 @@
{% macro create_udtf_get_base_table(schema) %}
create or replace function {{ schema }}.udtf_get_base_table(max_height integer)
returns table (height number)
as
$$
with base as (
select
row_number() over (
order by
seq4()
) as id
from
table(generator(rowcount => 100000000))
)
select
id as height
from
base
where
id <= max_height
$$
;
{% endmacro %}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_FR_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -0,0 +1,11 @@
{{ config (
materialized = 'view'
) }}
{% set model = this.identifier.split("_") [-1] %}
{{ streamline_external_table_query(
model,
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )",
partition_name = "_partition_by_block_id",
unique_key = "block_number"
) }}

View File

@ -8,6 +8,11 @@ sources:
- name: dim_date_hours
- name: address_tags
- name: dim_dates
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'aptos' else 'crosschain_dev' }}"
schema: silver
tables:
- name: number_sequence
- name: aptos_bronze
database: aptos
schema: bronze

View File

@ -0,0 +1,31 @@
-- depends_on: {{ ref('bronze__streamline_blocks_tx') }}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(id)"
) }}
SELECT
id,
block_number,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__streamline_blocks_tx') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% else %}
{{ ref('bronze__streamline_FR_blocks_tx') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,36 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
{% if execute %}
WITH chainhead AS (
SELECT
{{ target.database }}.live.udf_api(
'GET',
'https://twilight-silent-gas.aptos-mainnet.quiknode.pro/f64d711fb5881ce64cf18a31f796885050178031/v1',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),{}
) :data :block_height :: INT AS block_height
)
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id <= (
SELECT
block_height
FROM
chainhead
)
{% else %}
SELECT
0 AS block_number
{% endif %}