tags and schedule

This commit is contained in:
Eric Laurello 2023-11-10 17:05:24 -05:00
parent 0011695904
commit d65f30d038
54 changed files with 128 additions and 47 deletions

View File

@ -29,7 +29,6 @@ jobs:
dbt_command: |
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
dbt run -s "osmosis_models,tag:balances" "osmosis_models,tag:daily"
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1,5 +1,5 @@
name: dbt_run_incremental
run-name: dbt_run_incremental
name: dbt_run_incremental_core
run-name: dbt_run_incremental_core
on:
workflow_dispatch:
@ -28,8 +28,7 @@ jobs:
with:
dbt_command: |
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
dbt run -s "osmosis_models,./models" --exclude models/silver/_observability models/streamline tag:balances tag:daily
dbt run-operation stage_external_sources --vars "ext_full_refresh: true"
dbt run -s "osmosis_models,tag:core"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -0,0 +1,34 @@
name: dbt_run_incremental_non_core
run-name: dbt_run_incremental_non_core
on:
workflow_dispatch:
# schedule:
# - cron: '25,55 * * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run -s "osmosis_models,tag:noncore"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1,5 +1,6 @@
workflow_name,workflow_schedule
dbt_run_incremental,"25,55 * * * *"
dbt_run_incremental_core,"25,55 * * * *"
dbt_run_incremental_non_core,"33 * * * *"
dbt_run_streamline_blocks_txcount_realtime,"0,30 * * * *"
dbt_run_streamline_transactions_realtime,"8,38 * * * *"
dbt_test_tasks,"0,30 * * * *"

1 workflow_name workflow_schedule
2 dbt_run_incremental dbt_run_incremental_core 25,55 * * * *
3 dbt_run_incremental_non_core 33 * * * *
4 dbt_run_streamline_blocks_txcount_realtime 0,30 * * * *
5 dbt_run_streamline_transactions_realtime 8,38 * * * *
6 dbt_test_tasks 0,30 * * * *

View File

@ -3,7 +3,8 @@
incremental_strategy = 'delete+insert',
unique_key = 'id',
cluster_by = ['_inserted_timestamp::date','block_timestamp::date'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__streamline_transactions') }}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'table'
materialized = 'table',
tags = ['daily']
) }}
WITH base AS (

View File

@ -3,6 +3,7 @@
unique_key = "block_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH base AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ['chain_id','block_id'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__streamline_blocks') }}

View File

@ -3,7 +3,8 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
SELECT

View File

@ -3,7 +3,8 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
WITH b AS (

View File

@ -3,7 +3,8 @@
unique_key = "tx_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_id)"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_id)",
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__streamline_transactions') }}
WITH sl AS (

View File

@ -3,7 +3,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
tags = ['core']
) }}
{% if execute %}

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
tags = ['core']
) }}
WITH b AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'table'
materialized = 'table',
tags = ['daily']
) }}
SELECT

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'table'
materialized = 'table',
tags = ['daily']
) }}
SELECT

View File

@ -2,7 +2,7 @@
materialized = 'incremental',
unique_key = "tx_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
cluster_by = ['block_timestamp::DATE']
) }}
WITH

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE']
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH all_staked AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::DATE']
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH count_ids AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["pool_id","block_id"],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp']
cluster_by = ['block_timestamp'],
tags = ['noncore']
) }}
SELECT

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["block_date","pool_id","currency"],
incremental_strategy = 'merge',
cluster_by = ['block_date']
cluster_by = ['block_date'],
tags = ['noncore']
) }}
WITH last_block_of_day AS (

View File

@ -2,6 +2,7 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
tags = ['noncore']
) }}
WITH base_atts AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["pool_id","block_hour"],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp']
cluster_by = ['block_timestamp'],
tags = ['noncore']
) }}
WITH pool_token_prices AS (

View File

@ -3,6 +3,7 @@
unique_key = "CONCAT_WS('-', tx_id, _body_index)",
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["currency","block_date"],
incremental_strategy = 'merge',
cluster_by = ['block_date']
cluster_by = ['block_date'],
tags = ['noncore']
) }}
WITH last_block_of_day AS (

View File

@ -3,6 +3,7 @@
unique_key = ["tx_id","msg_index"],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH bhour AS (

View File

@ -3,6 +3,7 @@
unique_key = "tx_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -3,6 +3,7 @@
unique_key = "tx_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH base_tx AS (

View File

@ -2,6 +2,7 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp'],
tags = ['noncore']
) }}
WITH all_staked AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
SELECT

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH super AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["tx_id","msg_group","msg_sub_group"],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH txs AS (

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = 'proposal_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['noncore']
) }}
WITH base AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'table'
materialized = 'table',
tags = ['daily']
) }}
SELECT
@ -18,20 +19,19 @@ SELECT
VALUE :min_self_delegation :: NUMBER AS min_self_delegation,
VALUE :rank :: NUMBER AS RANK,
VALUE :uptime :missed_blocks :: NUMBER AS missed_blocks,
VALUE AS raw_metadata,
VALUE AS raw_metadata,
concat_ws(
'-',
address,
creator,
blockchain
) AS _unique_key,
'-',
address,
creator,
blockchain
) AS _unique_key,
VALUE :update_time :: TIMESTAMP AS _inserted_timestamp
FROM
{{ source(
'bronze_streamline',
'validator_metadata_api'
) }}
qualify(ROW_NUMBER() over(PARTITION BY blockchain, creator, address
ORDER BY
_inserted_timestamp DESC)) = 1

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["tx_id","msg_index"],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','action']
cluster_by = ['block_timestamp::DATE','action'],
tags = ['noncore']
) }}
WITH mars_contracts AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["token_address","pool_id","block_id"],
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::DATE']
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH top_pools AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ["token_address","block_id"],
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::DATE']
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['noncore']
) }}
WITH osmo_price AS (

View File

@ -3,6 +3,7 @@
unique_key = ["block_hour", "currency"],
incremental_strategy = 'merge',
cluster_by = ['block_hour::DATE'],
tags = ['noncore']
) }}
WITH swaps AS (

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['recorded_hour::DATE'],
tags = ['noncore']
) }}
WITH date_hours AS (

View File

@ -3,6 +3,7 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['recorded_hour::DATE'],
tags = ['noncore']
) }}
WITH date_hours AS (