Updates with best practices and some formatting; testing is the last thing to complete.

Mike Stepanovic 2025-03-04 10:26:38 -07:00
parent ea56bbbd34
commit 05585ff7dd
35 changed files with 660 additions and 164 deletions

.github/workflows/dbt_docs_update.yml (new file)
@@ -0,0 +1,71 @@
name: docs_update
on:
  push:
    branches:
      - "main"
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: checkout docs branch
        run: |
          git checkout -B docs origin/main
      - name: generate dbt docs
        run: |
          dbt ls -t prod
          dbt docs generate --no-compile -t prod
      - name: move files to docs directory
        run: |
          mkdir -p ./docs
          cp target/{catalog.json,manifest.json,index.html} docs/
      - name: clean up target directory
        run: dbt clean
      - name: check for changes
        run: git status
      - name: stage changed files
        run: git add .
      - name: commit changed files
        run: |
          git config user.email "abc@xyz"
          git config user.name "github-actions"
          git commit -am "Auto-update docs"
      - name: push changes to docs
        run: |
          git push -f --set-upstream origin docs
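
Note: the env block above only does something if profiles.yml reads these variables back with env_var(). A minimal sketch of a matching Snowflake profile (the profile name movement_models and the account/region layout are assumptions, not part of this commit):

movement_models:
  target: prod
  outputs:
    prod:
      type: snowflake
      # each field is populated from the workflow's exported env vars
      account: "{{ env_var('ACCOUNT') }}.{{ env_var('REGION') }}"
      user: "{{ env_var('USER') }}"
      password: "{{ env_var('PASSWORD') }}"
      role: "{{ env_var('ROLE') }}"
      database: "{{ env_var('DATABASE') }}"
      warehouse: "{{ env_var('WAREHOUSE') }}"
      schema: "{{ env_var('SCHEMA') }}"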

.github/workflows/dbt_run_adhoc.yml (new file)
@@ -0,0 +1,67 @@
name: dbt_run_adhoc
run-name: dbt_run_adhoc
on:
  workflow_dispatch:
    branches:
      - "main"
    inputs:
      environment:
        type: choice
        description: DBT Run Environment
        required: true
        options:
          - dev
          - prod
        default: dev
      warehouse:
        type: choice
        description: Snowflake warehouse
        required: true
        options:
          - DBT
          - DBT_CLOUD
          - DBT_EMERGENCY
        default: DBT
      dbt_command:
        type: string
        description: 'DBT Run Command'
        required: true
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  # note: input names are case-sensitive; the declared input is "warehouse"
  WAREHOUSE: "${{ inputs.warehouse }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_${{ inputs.environment }}
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          ${{ inputs.dbt_command }}
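
For reference, a dispatch-only workflow like this can be triggered from the GitHub CLI; a usage sketch (the selector passed to dbt_command is just an example):

gh workflow run dbt_run_adhoc.yml \
  -f environment=dev \
  -f warehouse=DBT \
  -f dbt_command='dbt run -m movement_models,tag:core'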

.github/workflows/dbt_run_core.yml (new file)
@@ -0,0 +1,46 @@
name: dbt_run_incremental_core
run-name: dbt_run_incremental_core
on:
  workflow_dispatch:
  # schedule:
  #   # Runs "at minute 10, every hour" (see https://crontab.guru)
  #   - cron: '10 * * * *'
env:
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "movement_models,tag:core"
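
The selector movement_models,tag:core uses dbt's intersection syntax: only models that are both in the movement_models package and tagged core are run. Models opt in through their config block, as in the model diffs further down:

{{ config(
    tags = ['core','full_test']
) }}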

.github/workflows/dbt_run_daily.yml (new file)
@@ -0,0 +1,46 @@
name: dbt_run_daily
run-name: dbt_run_daily
on:
  workflow_dispatch:
  schedule:
    # once daily at 2:15 AM UTC
    - cron: "15 2 * * *"
env:
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "movement_models,tag:scheduled_daily"

.github/workflows/dbt_run_dev_refresh.yml (new file)
@@ -0,0 +1,44 @@
name: dbt_run_dev_refresh
run-name: dbt_run_dev_refresh
on:
  workflow_dispatch:
  schedule:
    - cron: '27 8 * * *'
env:
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run-operation run_sp_create_prod_clone
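
dbt run-operation invokes a project macro by name; the macro body is not part of this diff. A minimal sketch of the shape such a macro usually takes (the procedure name admin.create_prod_clone is a placeholder, not from this repo):

{% macro run_sp_create_prod_clone() %}
    {# hypothetical: call a Snowflake stored procedure that rebuilds the dev clone from prod #}
    {% do run_query("CALL admin.create_prod_clone()") %}
{% endmacro %}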

.github/workflows/dbt_test_daily.yml (new file)
@@ -0,0 +1,48 @@
name: dbt_test_daily
run-name: dbt_test_daily
on:
  workflow_dispatch:
  schedule:
    - cron: '0 4 * * *'
env:
  DBT_PROFILES_DIR: ./
  USE_VARS: "${{ vars.USE_VARS }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
  SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}"
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt test -m "movement_models,models/silver" "movement_models,models/gold"
        continue-on-error: true
      - name: Log test results
        run: |
          python python/dbt_test_alert.py
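
continue-on-error: true keeps the job green even when tests fail, which lets the alert step run but hides failures from the workflow status. An alternative sketch that keeps the failure visible while still alerting (an option, not what this commit does):

      - name: Run DBT Jobs
        run: |
          dbt test -m "movement_models,models/silver" "movement_models,models/gold"
      - name: Log test results
        # always() runs the alert even after the test step fails
        if: ${{ always() }}
        run: python python/dbt_test_alert.py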

.github/workflows/dbt_test_monthly.yml (new file)
@@ -0,0 +1,48 @@
name: dbt_test_monthly
run-name: dbt_test_monthly
on:
  workflow_dispatch:
  schedule:
    - cron: '0 4 1 * *'
env:
  DBT_PROFILES_DIR: ./
  USE_VARS: "${{ vars.USE_VARS }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
  SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}"
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt test -m "movement_models,models/silver" "movement_models,models/gold"
        continue-on-error: true
      - name: Log test results
        run: |
          python python/dbt_test_alert.py

.github/workflows/dbt_test_recent.yml (new file)
@@ -0,0 +1,54 @@
name: dbt_test_recent
run-name: dbt_test_recent
on:
  workflow_dispatch:
  schedule:
    # run recency tests 2 hours after each BQ to SF export
    - cron: "25 3,6,10,14,18,22 * * *"
env:
  USE_VARS: "${{ vars.USE_VARS }}"
  DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
  DBT_VERSION: "${{ vars.DBT_VERSION }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
  SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}"
concurrency:
  group: ${{ github.workflow }}
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "${{ vars.PYTHON_VERSION }}"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt run -m "movement_models,tag:recent_test"
          dbt test -m "movement_models,tag:recent_test"
        continue-on-error: true
      - name: Log test results
        run: |
          python python/dbt_test_alert.py
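
The recent_test tag pairs a model refresh with freshness checks shortly after each export lands. A sketch of how such a check is typically declared in a schema.yml (dbt_utils.recency is a standard test, but the model name and six-hour threshold here are assumptions):

models:
  - name: core__fact_blocks
    tests:
      - dbt_utils.recency:
          datepart: hour
          field: block_timestamp
          interval: 6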

.github/workflows/dbt_test_weekly.yml (new file)
@@ -0,0 +1,48 @@
name: dbt_test_weekly
run-name: dbt_test_weekly
on:
  workflow_dispatch:
  schedule:
    - cron: '0 4 * * 1'
env:
  DBT_PROFILES_DIR: ./
  USE_VARS: "${{ vars.USE_VARS }}"
  ACCOUNT: "${{ vars.ACCOUNT }}"
  ROLE: "${{ vars.ROLE }}"
  USER: "${{ vars.USER }}"
  PASSWORD: "${{ secrets.PASSWORD }}"
  REGION: "${{ vars.REGION }}"
  DATABASE: "${{ vars.DATABASE }}"
  WAREHOUSE: "${{ vars.WAREHOUSE }}"
  SCHEMA: "${{ vars.SCHEMA }}"
  SLACK_WEBHOOK_URL: "${{ secrets.SLACK_WEBHOOK_URL }}"
jobs:
  run_dbt_jobs:
    runs-on: ubuntu-latest
    environment:
      name: workflow_prod
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10.x"
          cache: "pip"
      - name: install dependencies
        run: |
          pip install -r requirements.txt
          dbt deps
      - name: Run DBT Jobs
        run: |
          dbt test -m "movement_models,models/silver" "movement_models,models/gold"
        continue-on-error: true
      - name: Log test results
        run: |
          python python/dbt_test_alert.py

[macro file; path not captured]
@@ -3,31 +3,31 @@
) %}
WITH block_base AS (
    SELECT
-        block_id,
+        block_number,
        tx_count
    FROM
        {{ ref('silver__blocks') }}
),
model_name AS (
    SELECT
-        block_id,
+        block_number,
        COUNT(
            DISTINCT tx_id
        ) AS model_tx_count
    FROM
        {{ model }}
    GROUP BY
-        block_id
+        block_number
)
SELECT
-    block_base.block_id,
+    block_base.block_number,
    tx_count,
-    model_name.block_id,
+    model_name.block_number,
    model_tx_count
FROM
    block_base
    LEFT JOIN model_name
-        ON block_base.block_id = model_name.block_id
+        ON block_base.block_number = model_name.block_number
WHERE
    tx_count <> model_tx_count
{% endmacro %}
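
This macro returns one row per block where a model's distinct tx count disagrees with silver__blocks, so it slots naturally into a singular dbt test, which passes only when the query returns zero rows. A sketch (the file and macro names are hypothetical; the macro's real name falls outside this hunk):

-- tests/fact_events_tx_count.sql (hypothetical singular test)
{{ compare_tx_counts(ref('core__fact_events')) }}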

core__fact_blocks.sql
@@ -1,8 +1,9 @@
{{ config(
    materialized = 'incremental',
-    unique_key = "block_number",
+    unique_key = ['fact_blocks_id'],
    incremental_strategy = 'merge',
-    merge_exclude_columns = ["inserted_timestamp"],
+    incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
+    merge_exclude_columns = ['inserted_timestamp'],
    cluster_by = ['block_timestamp::DATE'],
    tags = ['core','full_test']
) }}
@@ -14,9 +15,11 @@ SELECT
    first_version,
    last_version,
    tx_count_from_versions AS tx_count,
-    blocks_id AS fact_blocks_id,
-    inserted_timestamp,
-    modified_timestamp
+    {{ dbt_utils.generate_surrogate_key(['block_number']) }} AS fact_blocks_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref(
        'silver__blocks'
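
For reference, dbt_utils.generate_surrogate_key(['block_number']) compiles to an md5 over the coalesced, cast input, roughly (a sketch of dbt_utils' documented behavior on Snowflake):

md5(cast(coalesce(cast(block_number as varchar), '_dbt_utils_surrogate_key_null_') as varchar))

so fact_blocks_id is deterministic across runs, unlike the SYSDATE() audit columns beside it.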

[schema yml for core__fact_blocks]
@@ -4,7 +4,7 @@ models:
    description: '{{ doc("core__fact_blocks") }}'
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        description: '{{ doc("block_number") }}'
      - name: BLOCK_TIMESTAMP
        description: '{{ doc("block_timestamp") }}'

core__fact_changes.sql
@@ -2,10 +2,10 @@
    materialized = 'incremental',
    unique_key = ['tx_hash','change_index'],
    incremental_strategy = 'merge',
-    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
+    incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
    merge_exclude_columns = ["inserted_timestamp"],
    cluster_by = ['block_timestamp::DATE','modified_timestamp::DATE'],
-    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash, change_type,inner_change_type,change_address,change_module,change_resource,payload_function);",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash,change_type,inner_change_type,change_address,change_module,change_resource,payload_function);",
    tags = ['core','full_test']
) }}
@@ -29,9 +29,10 @@ SELECT
    key,
    VALUE,
    state_key_hash,
-    changes_id AS fact_changes_id,
-    inserted_timestamp,
-    modified_timestamp
+    {{ dbt_utils.generate_surrogate_key(['tx_hash','change_index']) }} AS fact_changes_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref(
        'silver__changes'
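
incremental_predicates are extra conditions dbt appends to the ON clause of the MERGE it generates, so Snowflake can prune the target by date instead of scanning it. dynamic_range_predicate (assumed here to be the helper from Flipside's fsc_utils package) fills in the range from the incoming batch; the effect is roughly:

MERGE INTO {{ this }} AS tgt
USING <incremental batch> AS src
    ON tgt.tx_hash = src.tx_hash
    AND tgt.change_index = src.change_index
    -- injected by incremental_predicates:
    AND tgt.block_timestamp::DATE BETWEEN <min batch date> AND <max batch date>
WHEN MATCHED THEN UPDATE ...
WHEN NOT MATCHED THEN INSERT ...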

[schema yml for core__fact_changes]
@@ -4,7 +4,7 @@ models:
    description: '{{ doc("core__fact_changes") }}'
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        description: '{{ doc("block_number") }}'
      - name: BLOCK_TIMESTAMP
        description: '{{ doc("block_timestamp") }}'

core__fact_events.sql
@@ -2,10 +2,10 @@
    materialized = 'incremental',
    unique_key = ['tx_hash','event_index'],
    incremental_strategy = 'merge',
-    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
-    merge_exclude_columns = ["inserted_timestamp"],
+    incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
+    merge_exclude_columns = ['inserted_timestamp'],
    cluster_by = ['block_timestamp::DATE'],
-    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash, event_type,event_address,event_module,event_resource,payload_function);",
+    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash,event_type,event_address,event_module,event_resource,payload_function);",
    tags = ['core','full_test']
) }}
@@ -26,9 +26,10 @@ SELECT
    account_address,
    creation_number,
    sequence_number,
-    events_id AS fact_events_id,
-    inserted_timestamp,
-    modified_timestamp
+    {{ dbt_utils.generate_surrogate_key(['tx_hash','event_index']) }} AS fact_events_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref(
        'silver__events'

[schema yml for core__fact_events]
@@ -4,7 +4,7 @@ models:
    description: '{{ doc("core__fact_events") }}'
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        description: '{{ doc("block_number") }}'
      - name: BLOCK_TIMESTAMP
        description: '{{ doc("block_timestamp") }}'

[schema yml for core__fact_transactions]
@@ -4,7 +4,7 @@ models:
    description: '{{ doc("core__fact_transactions") }}'
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        description: '{{ doc("block_number") }}'
      - name: BLOCK_TIMESTAMP
        description: '{{ doc("block_timestamp") }}'

core__fact_transactions.sql
@@ -2,8 +2,8 @@
    materialized = 'incremental',
    unique_key = ['tx_hash','block_timestamp::DATE'],
    incremental_strategy = 'merge',
-    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
-    merge_exclude_columns = ["inserted_timestamp"],
+    incremental_predicates = ["dynamic_range_predicate","block_timestamp::DATE"],
+    merge_exclude_columns = ['inserted_timestamp'],
    cluster_by = ['block_timestamp::DATE'],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(version,tx_hash);",
    tags = ['core','full_test']
[schema yml for core__fact_transactions_block_metadata]
@@ -4,7 +4,7 @@ models:
    description: '{{ doc("core__fact_transactions_block_metadata") }}'
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        description: '{{ doc("block_number") }}'
      - name: BLOCK_TIMESTAMP
        description: '{{ doc("block_timestamp") }}'

[schema yml for core__fact_transactions_state_checkpoint]
@@ -4,7 +4,7 @@ models:
    description: '{{ doc("core__fact_transactions_state_checkpoint") }}'
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        description: '{{ doc("block_number") }}'
      - name: BLOCK_TIMESTAMP
        description: '{{ doc("block_timestamp") }}'

[schema yml; file path not captured]
@@ -4,10 +4,10 @@ models:
    tests:
      - fsc_utils.sequence_gaps:
-          column_name: BLOCK_NUMBER
+          column_name: block_number
           where: BLOCK_TIMESTAMP < CURRENT_DATE - 1
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        tests:
          - unique
          - not_null

[schema yml; file path not captured]
@@ -4,12 +4,12 @@ models:
    tests:
      - fsc_utils.sequence_gaps:
-          column_name: BLOCK_NUMBER
+          column_name: block_number
           config:
             severity: error
             error_if: ">100"
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        tests:
          - unique
          - not_null

[schema yml; file path not captured]
@@ -8,7 +8,7 @@ models:
           where: BLOCK_TIMESTAMP < CURRENT_DATE - 1
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:

[schema yml; file path not captured]
@@ -10,7 +10,7 @@ models:
             error_if: ">100"
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        tests:
          - not_null
      - name: BLOCK_TIMESTAMP

[model sql; file path not captured]
@@ -89,7 +89,7 @@ blocks AS (
    ) over (
        ORDER BY
            l.block_number ASC
-    ) AS prev_BLOCK_NUMBER
+    ) AS prev_block_number
FROM
    {{ ref("core__fact_blocks") }}
    l

[model sql; file path not captured]
@@ -20,7 +20,7 @@ WITH summary_stats AS (
{% if is_incremental() %}
AND (
-    block_number >= (
+        >= (
        SELECT
            MIN(block_number)
        FROM

silver__blocks.sql
@@ -1,27 +1,63 @@
{{ config(
    materialized = 'incremental',
-    unique_key = "block_number",
+    unique_key = ['block_number'],
    incremental_strategy = 'merge',
-    merge_exclude_columns = ["inserted_timestamp"],
+    merge_exclude_columns = ['inserted_timestamp'],
    cluster_by = ['modified_timestamp::DATE'],
    tags = ['core','full_test']
) }}
-- depends_on: {{ ref('bronze__blocks_tx') }}
-SELECT
-    VALUE,
-    DATA :block_height :: INT AS block_number,
-    DATA :block_hash :: STRING AS block_hash,
-    DATA :block_timestamp :: bigint AS block_timestamp_num,
-    TO_TIMESTAMP(
-        block_timestamp_num :: STRING
-    ) AS block_timestamp,
-    DATA :first_version :: bigint AS first_version,
-    DATA :last_version :: bigint AS last_version,
-    ARRAY_SIZE(
-        DATA :transactions
-    ) AS tx_count_from_transactions_array,
-    last_version - first_version + 1 AS tx_count_from_versions,
+WITH base AS (
+    SELECT
+        VALUE,
+        DATA :block_height :: INT AS block_number,
+        DATA :block_hash :: STRING AS block_hash,
+        DATA :block_timestamp :: bigint AS block_timestamp_num,
+        TO_TIMESTAMP(
+            block_timestamp_num :: STRING
+        ) AS block_timestamp,
+        DATA :first_version :: bigint AS first_version,
+        DATA :last_version :: bigint AS last_version,
+        ARRAY_SIZE(
+            DATA :transactions
+        ) AS tx_count_from_transactions_array,
+        last_version - first_version + 1 AS tx_count_from_versions
+    FROM
+        {% if is_incremental() %}
+        {{ ref('bronze__blocks_tx') }}
+        WHERE
+            inserted_timestamp >= (
+                SELECT
+                    MAX(
+                        DATEADD(
+                            'minute',
+                            -5,
+                            modified_timestamp
+                        )
+                    )
+                FROM
+                    {{ this }}
+            )
+            AND block_number > 0
+        {% else %}
+        {{ ref('bronze__blocks_tx_FR') }}
+        {% endif %}
+        qualify(ROW_NUMBER() over(PARTITION BY block_number
+        ORDER BY
+            inserted_timestamp DESC)) = 1
+)
+SELECT
+    block_number,
+    block_hash,
+    block_timestamp_num,
+    block_timestamp,
+    first_version,
+    last_version,
+    tx_count_from_transactions_array,
+    tx_count_from_versions,
    {{ dbt_utils.generate_surrogate_key(
        ['block_number']
    ) }} AS blocks_id,
@@ -29,27 +65,4 @@
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
-    {% if is_incremental() %}
-    {{ ref('bronze__blocks_tx') }}
-    WHERE
-        inserted_timestamp >= (
-            SELECT
-                MAX(
-                    DATEADD(
-                        'minute',
-                        -5,
-                        modified_timestamp
-                    )
-                )
-            FROM
-                {{ this }}
-        )
-        AND block_number > 0
-    {% else %}
-    {{ ref('bronze__blocks_tx_FR') }}
-    {% endif %}
-    qualify(ROW_NUMBER() over(PARTITION BY block_number
-    ORDER BY
-        inserted_timestamp DESC)) = 1
+    base

[schema yml for silver__blocks]
@@ -7,7 +7,7 @@ models:
          column_name: BLOCKS_ID
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
        tests:
          - not_null
      - name: BLOCK_TIMESTAMP_NUM

silver__changes.sql
@@ -1,6 +1,11 @@
{{ config(
-    materialized = 'view',
-    tags = ['core']
+    materialized = 'incremental',
+    unique_key = ['tx_hash', 'change_index'],
+    incremental_strategy = 'merge',
+    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
+    merge_exclude_columns = ['inserted_timestamp'],
+    cluster_by = ['modified_timestamp::DATE'],
+    tags = ['core', 'full_test']
) }}
SELECT

[schema yml for silver__changes]
@@ -3,7 +3,7 @@ models:
  - name: silver__changes
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
      - name: BLOCK_TIMESTAMP
      - name: TX_HASH
      - name: VERSION

silver__events.sql
@@ -1,6 +1,11 @@
{{ config(
-    materialized = 'view',
-    tags = ['core']
+    materialized = 'incremental',
+    unique_key = ['tx_hash','event_index'],
+    incremental_strategy = 'merge',
+    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
+    merge_exclude_columns = ['inserted_timestamp'],
+    cluster_by = ['modified_timestamp::DATE'],
+    tags = ['core', 'full_test']
) }}
SELECT

[schema yml for silver__events]
@@ -3,7 +3,7 @@ models:
  - name: silver__events
    columns:
-      - name: BLOCK_NUMBER
+      - name: block_number
      - name: BLOCK_TIMESTAMP
      - name: TX_HASH
      - name: VERSION

silver__transactions.sql
@@ -1,16 +1,17 @@
{{ config(
-    materialized = 'incremental',
-    unique_key = ['tx_hash','block_timestamp::DATE'],
-    incremental_strategy = 'merge',
-    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
-    merge_exclude_columns = ["inserted_timestamp"],
-    cluster_by = ['modified_timestamp::DATE','tx_type'],
-    tags = ['core','full_test']
+    materialized = 'incremental',
+    unique_key = ['tx_hash', 'block_timestamp::DATE'],
+    incremental_strategy = 'merge',
+    incremental_predicates = ["dynamic_range_predicate", "block_timestamp::DATE"],
+    merge_exclude_columns = ['inserted_timestamp'],
+    cluster_by = ['modified_timestamp::DATE'],
+    tags = ['core', 'full_test']
) }}
-- depends_on: {{ ref('bronze__blocks_tx') }}
-- depends_on: {{ ref('bronze__transactions') }}
WITH from_blocks AS (
    SELECT
        TO_TIMESTAMP(
            b.value :timestamp :: STRING
@@ -21,76 +22,71 @@ WITH from_blocks AS (
        b.value AS DATA,
        inserted_timestamp AS file_last_updated
    FROM
        {% if is_incremental() %}
        {{ ref('bronze__blocks_tx') }}
        {% else %}
        {{ ref('bronze__blocks_tx_FR') }}
        {% endif %}
        A,
        LATERAL FLATTEN (DATA :transactions) b
    {% if is_incremental() %}
    WHERE
        A.inserted_timestamp >= (
            SELECT
                DATEADD('minute', -15, MAX(modified_timestamp))
            FROM
                {{ this }})
    {% endif %}
),
from_transactions AS (
    SELECT
        TO_TIMESTAMP(
            b.value :timestamp :: STRING
        ) AS block_timestamp,
        b.value :hash :: STRING AS tx_hash,
        b.value :version :: INT AS version,
        b.value :type :: STRING AS tx_type,
        b.value AS DATA,
        inserted_timestamp AS file_last_updated
    FROM
        {% if is_incremental() %}
        {{ ref('bronze__transactions') }}
        {% else %}
        {{ ref('bronze__transactions_FR') }}
        {% endif %}
        A,
        LATERAL FLATTEN(A.data) b
    {% if is_incremental() %}
    WHERE
        A.inserted_timestamp >= (
            SELECT
                DATEADD('minute', -15, MAX(modified_timestamp))
            FROM
                {{ this }})
    {% endif %}
),
combo AS (
    SELECT
        block_timestamp,
        tx_hash,
        version,
        tx_type,
        DATA,
        file_last_updated
    FROM
        from_blocks
    UNION ALL
    SELECT
        block_timestamp,
        tx_hash,
        version,
        tx_type,
        DATA,
        file_last_updated
    FROM
        from_transactions A
)
SELECT
    COALESCE(
        block_timestamp,

[schema yml; file path not captured]
@@ -13,14 +13,14 @@ models:
              column_type_list:
                - TIMESTAMP_LTZ
                - TIMESTAMP_NTZ
-      - name: BLOCK_NUMBER_MIN
+      - name: block_number_MIN
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list:
              column_type_list:
                - NUMBER
                - FLOAT
-      - name: BLOCK_NUMBER_MAX
+      - name: block_number_MAX
        tests:
          - not_null
          - dbt_expectations.expect_column_values_to_be_in_type_list: