workflows, table renames, simplification

This commit is contained in:
Eric Laurello 2025-03-14 11:55:07 -04:00
parent 31f8e047c1
commit 038f6506de
10 changed files with 149 additions and 165 deletions

View File

@ -44,4 +44,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m tag:defillama tag:deepnftvalue tag:core tag:blast tag:polymarket --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+
dbt run -m external_models,tag:defillama external_models,tag:deepnftvalue external_models,tag:core external_models,tag:blast external_models,tag:polymarket external_models,tag:bitquery --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+

View File

@ -0,0 +1,47 @@
name: dbt_run_streamline_daily
run-name: dbt_run_streamline_daily
on:
workflow_dispatch:
schedule:
# Runs "at 08:00 UTC" every day (see https://crontab.guru)
- cron: '30 7 * * *'
env:
DBT_PROFILES_DIR: ${{ vars.DBT_PROFILES_DIR }}
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/bitquery/streamline/streamline__bitquery_realtime.sql

View File

@ -53,7 +53,7 @@ vars:
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RETRY_UNKNOWN: False
UPDATE_SNOWFLAKE_TAGS: True

View File

@ -5,5 +5,5 @@
model = 'bitquery',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
) }}

View File

@ -5,13 +5,30 @@
tags = ['bitquery']
) }}
WITH ripple AS (
WITH base AS(
SELECT
A.blockchain,
A.metric,
A.date_Day AS as_of_date,
b.value :countBigInt AS active_users,
COALESCE(
REGEXP_SUBSTR(
A.data,
'"countBigInt"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
),
REGEXP_SUBSTR(
A.data,
'"senders"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
)
) active_users,
A._inserted_timestamp
FROM
@ -21,67 +38,20 @@ WITH ripple AS (
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A,
LATERAL FLATTEN(
A.data :data :ripple :transactions
) b
A
WHERE
A.data :errors IS NULL
AND A.metric = 'active_users'
AND A.blockchain = 'ripple'
AND active_users IS NOT NULL
{% if is_incremental() %}
AND _inserted_timestamp :: DATE > (
AND _inserted_timestamp > (
SELECT
MAX(_inserted_timestamp) :: DATE
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
hedera AS (
SELECT
A.blockchain,
A.metric,
A.date_Day AS as_of_date,
b.value :countBigInt AS active_users,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery_FR') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A,
LATERAL FLATTEN(
A.data :data :hedera :transactions
) b
WHERE
A.data :errors IS NULL
AND A.metric = 'active_users'
AND A.blockchain = 'hedera'
{% if is_incremental() %}
AND _inserted_timestamp :: DATE > (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
ua AS (
SELECT
*
FROM
ripple
UNION ALL
SELECT
*
FROM
hedera
)
SELECT
blockchain,
@ -96,7 +66,7 @@ SELECT
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ua qualify ROW_NUMBER() over (
base qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric,
as_of_date

View File

@ -1,6 +1,6 @@
version: 2
models:
- name: silver__bitquery_active_users
- name: bitquery__active_users
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:

View File

@ -0,0 +1,72 @@
-- depends_on: {{ ref('bronze__bitquery') }}
{{ config(
materialized = 'incremental',
unique_key = ['blockchain', 'metric', 'block_date'],
tags = ['bitquery']
) }}
WITH base AS (
SELECT
A.blockchain,
A.metric,
REGEXP_SUBSTR(
A.data,
'"date"\\s*:\\s*\\{\\s*"date"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
) AS block_date,
REGEXP_SUBSTR(
A.data,
'"countBigInt"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
) AS tx_count,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A
WHERE
A.data :errors IS NULL
AND A.metric = 'tx_count'
AND tx_count IS NOT NULL
{% if is_incremental() %}
AND _inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
blockchain,
metric,
block_date,
tx_count,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['blockchain','metric','block_date']
) }} AS accounts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
base qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric,
block_date
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,6 +1,6 @@
version: 2
models:
- name: silver__bitquery_tx_count
- name: bitquery__tx_count
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:

View File

@ -1,105 +0,0 @@
-- depends_on: {{ ref('bronze__bitquery') }}
{{ config(
materialized = 'incremental',
unique_key = ['blockchain', 'metric', 'block_date'],
tags = ['bitquery']
) }}
WITH ripple AS (
SELECT
A.blockchain,
A.metric,
b.value :date :date :: DATE AS block_date,
b.value :countBigInt AS tx_count,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A,
LATERAL FLATTEN(
A.data :data :ripple :transactions
) b
WHERE
A.data :errors IS NULL
AND A.metric = 'tx_count'
AND A.blockchain = 'ripple'
{% if is_incremental() %}
AND _inserted_timestamp :: DATE > (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
hedera AS (
SELECT
A.blockchain,
A.metric,
b.value :date :date :: DATE AS block_date,
b.value :count AS tx_count,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery_FR') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A,
LATERAL FLATTEN(
A.data :data :hedera :transactions
) b
WHERE
A.data :errors IS NULL
AND A.metric = 'tx_count'
AND A.blockchain = 'hedera'
{% if is_incremental() %}
AND _inserted_timestamp :: DATE > (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
ua AS (
SELECT
*
FROM
ripple
UNION ALL
SELECT
*
FROM
hedera
)
SELECT
blockchain,
metric,
block_date,
tx_count,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['blockchain','metric','block_date']
) }} AS accounts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ua qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric,
block_date
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -50,7 +50,7 @@ WITH metrics AS (
SELECT
'hedera' AS blockchain,
'active_users' AS metric,
'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt(uniq: payer_account) } } }' AS query_text,
'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account) } } }' AS query_text,
'distinct counts of payer accounts over the last 30 days' AS description
UNION ALL
SELECT