* turbo

* workflows
Austin 2023-04-25 13:37:45 -04:00, committed by GitHub
parent 9a1b6a2989
commit 86fbb2acfc
18 changed files with 205 additions and 25 deletions

@@ -4,8 +4,8 @@ run-name: dbt_run_scheduled_abi
on:
  workflow_dispatch:
  schedule:
    # Runs at every 30th minute (see https://crontab.guru)
    - cron: '30 * * * *'
    # Runs at minute 20 past every 4th hour (see https://crontab.guru)
    - cron: '20 */4 * * *'
env:
  DBT_PROFILES_DIR: ./

@@ -3,9 +3,8 @@ run-name: dbt_run_streamline_history
on:
  workflow_dispatch:
  schedule:
    # Runs at minute 30 past every 2nd hour (see https://crontab.guru)
    - cron: '30 */2 * * *'
    branches:
      - "main"
env:
  DBT_PROFILES_DIR: ./

@@ -4,8 +4,8 @@ run-name: dbt_run_scheduled_streamline_incremental
on:
  workflow_dispatch:
  schedule:
    # Runs every 6 hours (see https://crontab.guru)
    - cron: '0 */6 * * *'
    # Runs at minute 15 of every hour (see https://crontab.guru)
    - cron: '15 * * * *'
env:
  DBT_PROFILES_DIR: ./
@@ -26,7 +26,7 @@ jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod_2xl
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3

@@ -4,8 +4,8 @@ run-name: dbt_run_streamline_realtime
on:
  workflow_dispatch:
  schedule:
    # Runs every hour (see https://crontab.guru)
    - cron: '0 * * * *'
    # Runs at every 30th minute (see https://crontab.guru)
    - cron: '*/30 * * * *'
env:
  DBT_PROFILES_DIR: ./

@@ -0,0 +1,44 @@
{% macro get_merge_sql(
target,
source,
unique_key,
dest_columns,
incremental_predicates
) -%}
{% set predicate_override = "" %}
{% if incremental_predicates [0] == "dynamic_range" %}
-- run some queries to dynamically determine the min + max of this 'input_column' in the new data
{% set input_column = incremental_predicates [1] %}
{% set get_limits_query %}
SELECT
MIN(
{{ input_column }}
) AS lower_limit,
MAX(
{{ input_column }}
) AS upper_limit
FROM
{{ source }}
{% endset %}
{% set limits = run_query(get_limits_query) [0] %}
{% set lower_limit,
upper_limit = limits [0],
limits [1] %}
-- use those calculated min + max values to limit 'target' scan, to only the days with new data
{% set predicate_override %}
dbt_internal_dest.{{ input_column }} BETWEEN '{{ lower_limit }}'
AND '{{ upper_limit }}' {% endset %}
{% endif %}
{% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
-- standard merge from here
{% set merge_sql = dbt.get_merge_sql(
target,
source,
unique_key,
dest_columns,
predicates
) %}
{{ return(merge_sql) }}
{% endmacro %}
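
For reference, a model opts in by passing the "dynamic_range" marker and a column through incremental_predicates in its config; the model changes later in this commit do exactly that. A minimal sketch mirroring those configs (model and column names taken from the diffs below):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = ['block_number', 'event_index'],
    incremental_predicates = ["dynamic_range", "block_timestamp::date"]
) }}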

@@ -0,0 +1,8 @@
{% macro dbt_snowflake_get_tmp_relation_type(
strategy,
unique_key,
language
) %}
-- always table
{{ return('table') }}
{% endmacro %}
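
For background: dbt-snowflake ships a dbt_snowflake_get_tmp_relation_type macro that decides whether the incremental temp relation is built as a view or a table (weighing the strategy, language, and, if set, the tmp_relation_type model config). This override forces a table everywhere, so the staged rows are materialized once rather than re-evaluated as a view downstream. A hedged sketch of the per-model alternative (not part of this commit; assumes the installed dbt-snowflake version supports the config):

{{ config(
    materialized = 'incremental',
    tmp_relation_type = 'table' -- assumed per-model config; this commit uses a global macro override instead
) }}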

macros/lookback.sql (new file)

@@ -0,0 +1,12 @@
{% macro lookback() %}
{% if execute and is_incremental() %}
{% set query %}
SELECT
MAX(_inserted_timestamp) :: DATE - 1
FROM
{{ this }};
{% endset %}
{% set last_week = run_query(query).columns [0] [0] %}
{% do return(last_week) %}
{% endif %}
{% endmacro %}
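
The model changes below use this as the lower bound of their incremental filters, for example:

{% if is_incremental() %}
WHERE
    _inserted_timestamp >= '{{ lookback() }}'
{% endif %}

Since the macro returns the target's latest _inserted_timestamp cast to a DATE minus one day, each run reprocesses roughly a one-day overlap instead of scanning the whole source. (The variable name last_week is a slight misnomer; the value is one day back, not seven.)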

@@ -1,8 +1,8 @@
{{ config(
materialized = 'incremental',
unique_key = "tx_hash",
incremental_strategy = 'delete+insert',
unique_key = ['block_number', 'event_index'],
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
incremental_predicates = ["dynamic_range", "block_timestamp::date"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = False
) }}
@@ -86,6 +86,11 @@ new_records AS (
block_number,
tx_hash
)
{% if is_incremental() %}
WHERE
txs._INSERTED_TIMESTAMP >= '{{ lookback() }}'
{% endif %}
)
{% if is_incremental() %},

@@ -3,6 +3,7 @@
materialized = 'incremental',
unique_key = "tx_hash",
cluster_by = "ROUND(block_number, -3)",
incremental_predicates = ["dynamic_range", "block_number"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(tx_hash)",
full_refresh = False
) }}

@@ -1,9 +1,9 @@
-- depends_on: {{ ref('bronze__streamline_traces') }}
{{ config (
materialized = "incremental",
unique_key = "concat(block_number, '-', tx_position)",
incremental_strategy = 'delete+insert',
unique_key = ['block_number', 'tx_position', 'trace_index'],
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
incremental_predicates = ["dynamic_range", "block_timestamp::date"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = False
) }}

@@ -3,6 +3,7 @@
materialized = 'incremental',
unique_key = "tx_hash",
cluster_by = "block_timestamp::date, _inserted_timestamp::date",
incremental_predicates = ["dynamic_range", "block_timestamp::date"],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
full_refresh = False
) }}
@@ -118,9 +119,16 @@ new_records AS (
AND A.data :hash :: STRING = r.tx_hash
LEFT OUTER JOIN {{ ref('silver__blocks2') }}
b
ON A.block_number = b.block_number qualify(ROW_NUMBER() over (PARTITION BY A.block_number, A.data :hash :: STRING
ORDER BY
A._inserted_timestamp DESC)) = 1
ON A.block_number = b.block_number
{% if is_incremental() %}
WHERE
r._INSERTED_TIMESTAMP >= '{{ lookback() }}'
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY A.block_number, A.data :hash :: STRING
ORDER BY
A._inserted_timestamp DESC)) = 1
)
{% if is_incremental() %},

@@ -0,0 +1,23 @@
{{ config(
materialized = 'incremental',
unique_key = "block_number"
) }}
SELECT
block_number,
MIN(_inserted_timestamp) AS _inserted_timestamp,
COUNT(*) AS tx_count
FROM
{{ ref('silver__transactions2') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp) _inserted_timestamp
FROM
{{ this }}
)
{% endif %}
GROUP BY
block_number

@@ -3,9 +3,25 @@
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__blocks2') }}
WHERE
_inserted_timestamp :: DATE >= CURRENT_DATE() - 1
block_number >= (
SELECT
block_number
FROM
last_3_days
)
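
The same filter swap repeats for logs, receipts, traces, and transactions below: rather than keying on _inserted_timestamp :: DATE >= CURRENT_DATE() - 1, each test derives a block_number floor from the third-newest row of _max_block_by_date (presumably one row per day, putting the floor about three days back) and scans everything at or above it.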

@@ -3,9 +3,25 @@
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__logs2') }}
WHERE
_inserted_timestamp :: DATE >= CURRENT_DATE() - 1
block_number >= (
SELECT
block_number
FROM
last_3_days
)

@@ -3,9 +3,25 @@
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__receipts') }}
WHERE
_inserted_timestamp :: DATE >= CURRENT_DATE() - 1
block_number >= (
SELECT
block_number
FROM
last_3_days
)

@@ -3,9 +3,25 @@
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__traces2') }}
WHERE
_inserted_timestamp :: DATE >= CURRENT_DATE() - 1
block_number >= (
SELECT
block_number
FROM
last_3_days
)

@@ -3,9 +3,25 @@
tags = ['recent_test']
) }}
WITH last_3_days AS (
SELECT
block_number
FROM
{{ ref("_max_block_by_date") }}
qualify ROW_NUMBER() over (
ORDER BY
block_number DESC
) = 3
)
SELECT
*
FROM
{{ ref('silver__transactions2') }}
WHERE
_inserted_timestamp :: DATE >= CURRENT_DATE() - 1
block_number >= (
SELECT
block_number
FROM
last_3_days
)

@@ -1,7 +1,7 @@
packages:
  - package: calogica/dbt_expectations
    version: 0.8.0
    version: 0.8.2
  - package: dbt-labs/dbt_external_tables
    version: 0.8.0
    version: 0.8.2
  - package: dbt-labs/dbt_utils
    version: 0.9.2
    version: 1.0.0
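
One caveat on the dbt_utils bump: 0.9.2 to 1.0.0 crosses a major version with breaking changes; notably, dbt_utils.surrogate_key was renamed to dbt_utils.generate_surrogate_key, so any call sites in the project would need updating along these lines (illustrative columns):

{{ dbt_utils.generate_surrogate_key(['block_number', 'event_index']) }}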