base + SL

Eric Laurello 2025-02-27 14:11:46 -05:00
parent 459361dbc4
commit c9d4034a9d
44 changed files with 1402 additions and 1 deletion

.github/workflows/dbt_docs_update.yml vendored Normal file

@@ -0,0 +1,71 @@
name: docs_update
on:
push:
branches:
- "main"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: checkout docs branch
run: |
git checkout -B docs origin/main
- name: generate dbt docs
run: |
dbt ls -t prod
dbt docs generate --no-compile -t prod
- name: move files to docs directory
run: |
mkdir -p ./docs
cp target/{catalog.json,manifest.json,index.html} docs/
- name: clean up target directory
run: dbt clean
- name: check for changes
run: git status
- name: stage changed files
run: git add .
- name: commit changed files
run: |
git config user.email "abc@xyz"
git config user.name "github-actions"
git commit -am "Auto-update docs"
- name: push changes to docs
run: |
git push -f --set-upstream origin docs

.github/workflows/dbt_run_adhoc.yml vendored Normal file

@@ -0,0 +1,67 @@
name: dbt_run_adhoc
run-name: dbt_run_adhoc
on:
workflow_dispatch:
branches:
- "main"
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: string
description: 'DBT Run Command'
required: true
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ inputs.warehouse }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
${{ inputs.dbt_command }}

.github/workflows/dbt_run_dev_refresh.yml vendored Normal file

@@ -0,0 +1,44 @@
name: dbt_run_dev_refresh
run-name: dbt_run_dev_refresh
on:
workflow_dispatch:
schedule:
- cron: '27 8 * * *'
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run-operation run_sp_create_prod_clone

.github/workflows/dbt_run_incremental_core.yml vendored Normal file

@@ -0,0 +1,45 @@
name: dbt_run_incremental_core
run-name: dbt_run_incremental_core
# on:
# workflow_dispatch:
# branches:
# - "main"
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s "ton_models,tag:scheduled_core"

.github/workflows/dbt_run_streamline_realtime.yml vendored Normal file

@@ -0,0 +1,46 @@
name: dbt_run_streamline_realtime
run-name: dbt_run_streamline_realtime
on:
workflow_dispatch:
branches:
- "main"
schedule:
- cron: '27 8 * * *'
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "ton_models,tag:streamline_realtime" --vars '{STREAMLINE_INVOKE_STREAMS: True}'

.gitignore vendored Normal file

@@ -0,0 +1,20 @@
target/
dbt_modules/
# newer versions of dbt use this directory instead of dbt_modules for test dependencies
dbt_packages/
logs/
.venv/
.python-version
dbt-env/
venv/
# Visual Studio Code files
*/.vscode
*.code-workspace
.history/
**/.DS_Store
.vscode/
.env
.DS_Store
.user.yml

README.md

@@ -1 +1,74 @@
# ton-models
## Profile Set Up
### Use the following within `profiles.yml`
----
```yml
ton:
target: dev
outputs:
dev:
type: snowflake
account: <ACCOUNT>
role: <ROLE>
user: <USERNAME>
password: <PASSWORD>
region: <REGION>
database: ton_DEV
warehouse: <WAREHOUSE>
schema: silver
threads: 4
client_session_keep_alive: False
query_tag: <TAG>
```
### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
## Applying Model Tags
### Database / Schema level tags
Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add or modify tags, call the appropriate tag-set function within the macro.
```
{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
{{ set_schema_tag_value('SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}
```
### Model tags
To add or update a model's Snowflake tags, add or modify the `meta` model property under `config`. Only table-level tags are supported via dbt at this time.
```
{{ config(
...,
meta={
'database_tags':{
'table': {
'PURPOSE': 'SOME_PURPOSE'
}
}
},
...
) }}
```
By default, model tags are not pushed to Snowflake on each load. You can push a tag update for a model by specifying the `UPDATE_SNOWFLAKE_TAGS` project variable during a run.
```
dbt run --vars '{"UPDATE_SNOWFLAKE_TAGS": True}' -s models/core/core__fact_swaps.sql
```
### Querying for existing tags on a model in Snowflake
```
select *
from table(ton.information_schema.tag_references('ton.core.fact_blocks', 'table'));
```

dbt_project.yml Normal file

@@ -0,0 +1,101 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "ton_models"
version: "1.0.0"
config-version: 2
require-dbt-version: ">=1.8.0"
# This setting configures which "profile" dbt uses for this project.
profile: "ton"
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_modules"
- "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
+copy_grants: true
+persist_docs:
relation: true
columns: true
+on_schema_change: "append_new_columns"
livequery_models:
deploy:
core:
materialized: ephemeral
data_tests:
ton_models: # replace with the name of the chain
+store_failures: true # all tests
on-run-start:
- '{{create_sps()}}'
- '{{create_udfs()}}'
on-run-end:
- '{{ apply_meta_as_tags(results) }}'
dispatch:
- macro_namespace: dbt
search_order:
- ton_models
- dbt_snowflake_query_tags
- dbt
query-comment:
comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
append: true # Snowflake removes prefixed comments.
vars:
"dbt_date:time_zone": GMT
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
STREAMLINE_INVOKE_STREAMS: true
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RETRY_UNKNOWN: False
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: True
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
config:
dev:
API_INTEGRATION: AWS_TON_API_STG_V2
EXTERNAL_FUNCTION_URI: f1nw4eppf9.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_TON_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_TON_API_PROD_V2
EXTERNAL_FUNCTION_URI: e2rz7s6i8j.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_TON_API
- INTERNAL_DEV
- DBT_CLOUD_TON

docs/.gitkeep Normal file

macros/.gitkeep Normal file

macros/create_sps.sql Normal file

@@ -0,0 +1,12 @@
{% macro create_sps() %}
{% if target.database == 'TON' %}
CREATE SCHEMA IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{% endmacro %}
{% macro enable_search_optimization(schema_name, table_name, condition = '') %}
{% if target.database == 'TON' %}
ALTER TABLE {{ schema_name }}.{{ table_name }} ADD SEARCH OPTIMIZATION {{ condition }}
{% endif %}
{% endmacro %}
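Note that both macros are gated on `target.database == 'TON'`, so they only take effect in prod. A minimal sketch of wiring `enable_search_optimization` into a model as a post-hook, assuming a hypothetical `silver.fact_blocks` table and `block_hash` column (neither is defined by this file):

```sql
-- hypothetical usage; schema, table, and column names are illustrative
{{ config(
    materialized = 'incremental',
    post_hook = "{{ enable_search_optimization('silver', 'fact_blocks', 'ON EQUALITY(block_hash)') }}"
) }}
```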

macros/create_udfs.sql Normal file

@@ -0,0 +1,8 @@
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
{{ create_udf_bulk_rest_api_v2() }};
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
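Since `create_udfs` is invoked from the `on-run-start` hook in `dbt_project.yml` above and is gated on `UPDATE_UDFS_AND_SPS`, UDF deployment can be toggled per run, for example (the model selector here is illustrative):

```
dbt run --vars '{"UPDATE_UDFS_AND_SPS": False}' -s models/core/core__fact_swaps.sql
```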

@@ -0,0 +1,33 @@
{% macro generate_schema_name(
custom_schema_name = none,
node = none
) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[0] | trim }}
{%- endmacro %}
{% macro generate_alias_name(
custom_alias_name = none,
node = none
) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{% if split_name | length < 2 %}
{{ split_name[0] | trim }}
{% else %}
{{ split_name[1] | trim }}
{% endif %}
{%- endmacro %}
{% macro generate_tmp_view_name(model_name) -%}
{% set node_name = model_name.name %}
{% set split_name = node_name.split('__') %}
{{ (target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name[1] ~ '__dbt_tmp') | trim }}
{%- endmacro %}
{% macro generate_view_name(model_name) -%}
{% set node_name = model_name.name %}
{% set split_name = node_name.split('__') %}
{{ (target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name[1]) | trim }}
{%- endmacro %}
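These overrides split model file names on the double underscore to derive schema and alias. A worked example (names illustrative):

```sql
-- a model file named core__fact_blocks.sql resolves to:
--   generate_schema_name -> core
--   generate_alias_name  -> fact_blocks
-- i.e. the relation <target.database>.core.fact_blocks;
-- a name without '__' uses the full name for both schema and alias
```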

@@ -0,0 +1,11 @@
{% macro set_query_tag() -%}
{% set new_json = {"repo":project_name, "object":this.table, "profile":target.profile_name, "env":target.name, "existing_tag":get_current_query_tag() } %}
{% set new_query_tag = tojson(new_json) | as_text %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}
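For reference, the session tag this sets is a JSON payload of the following shape (values illustrative):

```sql
-- alter session set query_tag = '{"repo": "ton_models", "object": "fact_blocks",
--                                 "profile": "ton", "env": "dev", "existing_tag": ""}'
```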

macros/dbt/get_merge.sql Normal file

@@ -0,0 +1,30 @@
-- incremental_strategy="merge"
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set merge_sql = fsc_utils.get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
-- incremental_strategy="delete+insert"
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set predicate_override = "" %}
-- get the min value of the predicate column in the source
{% if incremental_predicates and incremental_predicates[0] == "min_value_predicate" %}
{% set min_column_name = incremental_predicates[1] %}
{% set query %}
select min({{ min_column_name }}) from {{ source }}
{% endset %}
{% set min_block = run_query(query).columns[0][0] %}
{% if min_block is not none %}
{% set predicate_override %}
round({{ target }}.{{ min_column_name }},-5) >= round({{ min_block }},-5)
{% endset %}
{% else %}
{% set predicate_override = "1=1" %}
{% endif %}
{% endif %}
{% set predicates = [predicate_override] + incremental_predicates[2:] if predicate_override else incremental_predicates %}
-- standard delete+insert from here
{% set merge_sql = dbt.get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
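A hypothetical model config exercising the `min_value_predicate` convention above: the first list element selects the behavior and the second names the column whose minimum bounds the delete (names illustrative):

```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'sequence_number',
    incremental_predicates = ['min_value_predicate', 'sequence_number']
) }}
```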

@@ -0,0 +1,4 @@
{% macro dbt_snowflake_get_tmp_relation_type(strategy, unique_key, language) %}
-- always table
{{ return('table') }}
{% endmacro %}

@@ -0,0 +1,10 @@
{% macro run_sp_create_prod_clone() %}
{% set clone_query %}
call ton._internal.create_prod_clone(
'ton',
'ton_dev',
'internal_dev'
);
{% endset %}
{% do run_query(clone_query) %}
{% endmacro %}

@@ -0,0 +1,51 @@
{% macro sp_create_prod_clone(target_schema) -%}
create or replace procedure {{ target_schema }}.create_prod_clone(source_db_name string, destination_db_name string, role_name string)
returns boolean
language javascript
execute as caller
as
$$
snowflake.execute({sqlText: `BEGIN TRANSACTION;`});
try {
snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */
snowflake.execute({sqlText: `GRANT USAGE ON DATABASE ${DESTINATION_DB_NAME} TO ROLE AWS_LAMBDA_TON_API`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_TON;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_TON;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_TON;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_TON;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_TON;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT USAGE ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE AWS_LAMBDA_TON_API;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`});
while (existing_tags.next()) {
var schema = existing_tags.getColumnValue(4);
var tag_name = existing_tags.getColumnValue(2)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
}
snowflake.execute({sqlText: `COMMIT;`});
} catch (err) {
snowflake.execute({sqlText: `ROLLBACK;`});
throw(err);
}
return true
$$
{%- endmacro %}

@@ -0,0 +1,83 @@
{% macro streamline_external_table_query_v2(
model,
partition_function,
partition_name,
other_cols
) %}
WITH meta AS (
SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
data:error:code IS NULL
)
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function,
partition_name,
other_cols
) %}
WITH meta AS (
SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
data:error:code IS NULL
)
{% endmacro %}

@@ -0,0 +1,10 @@
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY {% if target.database == 'TON' -%}
api_integration = aws_ton_api_prod AS 'https://e2rz7s6i8j.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
api_integration = aws_ton_api_stg_v2 AS 'https://f1nw4eppf9.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}
{% endmacro %}

@@ -0,0 +1,6 @@
{% macro add_database_or_schema_tags() %}
{{ set_database_tag_value(
'BLOCKCHAIN_NAME',
'TON'
) }}
{% endmacro %}

@@ -0,0 +1,127 @@
{% macro apply_meta_as_tags(results) %}
{% if var("UPDATE_SNOWFLAKE_TAGS") %}
{{ log('apply_meta_as_tags', info=False) }}
{{ log(results, info=False) }}
{% if execute %}
{%- set tags_by_schema = {} -%}
{% for res in results -%}
{% if res.node.meta.database_tags %}
{%- set model_database = res.node.database -%}
{%- set model_schema = res.node.schema -%}
{%- set model_schema_full = model_database+'.'+model_schema -%}
{%- set model_alias = res.node.alias -%}
{% if model_schema_full not in tags_by_schema.keys() %}
{{ log('need to fetch tags for schema '+model_schema_full, info=False) }}
{%- call statement('main', fetch_result=True) -%}
show tags in {{model_database}}.{{model_schema}}
{%- endcall -%}
{%- set _ = tags_by_schema.update({model_schema_full: load_result('main')['table'].columns.get('name').values()|list}) -%}
{{ log('Added tags to cache', info=False) }}
{% else %}
{{ log('already have tag info for schema', info=False) }}
{% endif %}
{%- set current_tags_in_schema = tags_by_schema[model_schema_full] -%}
{{ log('current_tags_in_schema:', info=False) }}
{{ log(current_tags_in_schema, info=False) }}
{{ log("========== Processing tags for "+model_schema_full+"."+model_alias+" ==========", info=False) }}
{% set line -%}
node: {{ res.node.unique_id }}; status: {{ res.status }} (message: {{ res.message }})
node full: {{ res.node}}
meta: {{ res.node.meta}}
materialized: {{ res.node.config.materialized }}
{%- endset %}
{{ log(line, info=False) }}
{%- call statement('main', fetch_result=True) -%}
select LEVEL,UPPER(TAG_NAME) as TAG_NAME,TAG_VALUE from table(information_schema.tag_references_all_columns('{{model_schema}}.{{model_alias}}', 'table'))
{%- endcall -%}
{%- set existing_tags_for_table = load_result('main')['data'] -%}
{{ log('Existing tags for table:', info=False) }}
{{ log(existing_tags_for_table, info=False) }}
{{ log('--', info=False) }}
{% for table_tag in res.node.meta.database_tags.table %}
{{ create_tag_if_missing(current_tags_in_schema,table_tag|upper) }}
{% set desired_tag_value = res.node.meta.database_tags.table[table_tag] %}
{{set_table_tag_value_if_different(model_schema,model_alias,table_tag,desired_tag_value,existing_tags_for_table)}}
{% endfor %}
{{ log("========== Finished processing tags for "+model_alias+" ==========", info=False) }}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}
{% endmacro %}
{% macro create_tag_if_missing(all_tag_names,table_tag) %}
{% if table_tag not in all_tag_names %}
{{ log('Creating missing tag '+table_tag, info=False) }}
{%- call statement('main', fetch_result=True) -%}
create tag if not exists silver.{{table_tag}}
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% else %}
{{ log('Tag already exists: '+table_tag, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_table_tag_value_if_different(model_schema,table_name,tag_name,desired_tag_value,existing_tags) %}
{{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at table level', info=False) }}
{%- set existing_tag_for_table = existing_tags|selectattr('0','equalto','TABLE')|selectattr('1','equalto',tag_name|upper)|list -%}
{{ log('Filtered tags for table:', info=False) }}
{{ log(existing_tag_for_table, info=False) }}
{% if existing_tag_for_table|length > 0 and existing_tag_for_table[0][2]==desired_tag_value %}
{{ log('Correct tag value already exists', info=False) }}
{% else %}
{{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
{%- call statement('main', fetch_result=True) -%}
alter table {{model_schema}}.{{table_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_column_tag_value_if_different(table_name,column_name,tag_name,desired_tag_value,existing_tags) %}
{{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at column level', info=False) }}
{%- set existing_tag_for_column = existing_tags|selectattr('0','equalto','COLUMN')|selectattr('1','equalto',tag_name|upper)|list -%}
{{ log('Filtered tags for column:', info=False) }}
{{ log(existing_tag_for_column, info=False) }}
{% if existing_tag_for_column|length > 0 and existing_tag_for_column[0][2]==desired_tag_value %}
{{ log('Correct tag value already exists', info=False) }}
{% else %}
{{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
{%- call statement('main', fetch_result=True) -%}
alter table {{table_name}} modify column {{column_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_database_tag_value(tag_name,tag_value) %}
{% set query %}
create tag if not exists silver.{{tag_name}}
{% endset %}
{% do run_query(query) %}
{% set query %}
alter database {{target.database}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
{% endset %}
{% do run_query(query) %}
{% endmacro %}
{% macro set_schema_tag_value(target_schema,tag_name,tag_value) %}
{% set query %}
create tag if not exists silver.{{tag_name}}
{% endset %}
{% do run_query(query) %}
{% set query %}
alter schema {{target.database}}.{{target_schema}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
{% endset %}
{% do run_query(query) %}
{% endmacro %}

@@ -0,0 +1,29 @@
{% test compare_model_subset(model, compare_model, compare_columns, model_condition) %}
{% set compare_cols_csv = compare_columns | join(', ') %}
with a as (
select {{compare_cols_csv}} from {{ model }}
{{ model_condition }}
),
b as (
select {{compare_cols_csv}} from {{ compare_model }}
),
a_minus_b as (
select * from a
except
select * from b
),
b_minus_a as (
select * from b
except
select * from a
),
unioned as (
select 'in_actual_not_in_expected' as which_diff, a_minus_b.* from a_minus_b
union all
select 'in_expected_not_in_actual' as which_diff, b_minus_a.* from b_minus_a
)
select * from unioned
{% endtest %}
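A hypothetical `schema.yml` entry wiring this test to a model; the model, column, and condition values are illustrative:

```yml
models:
  - name: silver__blocks
    data_tests:
      - compare_model_subset:
          compare_model: ref('silver__blocks_legacy')
          compare_columns: ['sequence_number', 'shard', 'workchain']
          model_condition: "where sequence_number > 0"
```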

@@ -0,0 +1,37 @@
{% macro sequence_gaps(
table,
partition_by,
column
) %}
{%- set partition_sql = partition_by | join(", ") -%}
{%- set previous_column = "prev_" ~ column -%}
WITH source AS (
SELECT
{{ partition_sql + "," if partition_sql }}
{{ column }},
LAG(
{{ column }},
1
) over (
{{ "PARTITION BY " ~ partition_sql if partition_sql }}
ORDER BY
{{ column }} ASC
) AS {{ previous_column }}
FROM
{{ table }}
WHERE
block_timestamp::date <= current_date - 1
)
SELECT
{{ partition_sql + "," if partition_sql }}
{{ previous_column }},
{{ column }},
{{ column }} - {{ previous_column }}
- 1 AS gap
FROM
source
WHERE
{{ column }} - {{ previous_column }} <> 1
ORDER BY
gap DESC
{% endmacro %}
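A hypothetical invocation checking for gaps in block sequence numbers; note the macro assumes the target table has a `block_timestamp` column (names illustrative):

```sql
{{ sequence_gaps(
    table = ref('silver__blocks'),
    partition_by = ['workchain', 'shard'],
    column = 'sequence_number'
) }}
```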

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = 'blocks',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:SEQUENCE_NUMBER::BIGINT as SEQUENCE_NUMBER, value:SHARD::STRING as SHARD, value:WORKCHAIN::INT as WORKCHAIN"
) }}

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = 'blocks',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:SEQUENCE_NUMBER::BIGINT as SEQUENCE_NUMBER, value:SHARD::STRING as SHARD, value:WORKCHAIN::INT as WORKCHAIN"
) }}

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = 'shards',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:SEQUENCE_NUMBER::BIGINT as SEQUENCE_NUMBER"
) }}

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = 'shards',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:SEQUENCE_NUMBER::BIGINT as SEQUENCE_NUMBER"
) }}

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:SEQUENCE_NUMBER::BIGINT as SEQUENCE_NUMBER, value:SHARD::STRING as SHARD, value:WORKCHAIN::INT as WORKCHAIN"
) }}

@@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "value:SEQUENCE_NUMBER::BIGINT as SEQUENCE_NUMBER, value:SHARD::STRING as SHARD, value:WORKCHAIN::INT as WORKCHAIN"
) }}

models/sources.yml Normal file

@@ -0,0 +1,24 @@
version: 2
sources:
- name: bronze_streamline
database: streamline
schema: "{{ 'ton' if target.database == 'TON' else 'ton_dev' }}"
tables:
- name: shards
- name: blocks
- name: transactions
- name: crosschain
database: "{{ 'crosschain' if target.database == 'TON' else 'crosschain_dev' }}"
schema: core
tables:
- name: address_tags
- name: dim_dates
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'TON' else 'crosschain_dev' }}"
schema: silver
tables:
- name: number_sequence
- name: complete_native_prices
- name: labels_combined

@@ -0,0 +1,46 @@
-- depends_on: {{ ref('bronze__blocks') }}
-- depends_on: {{ ref('bronze__blocks_FR') }}
{{ config (
materialized = "incremental",
unique_key = ['sequence_number','shard','workchain'],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "ROUND(sequence_number, -5)",
tags = ['streamline_realtime']
) }}
SELECT
sequence_number,
shard,
workchain,
partition_key,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id,
FROM
{% if is_incremental() %}
{{ ref('bronze__blocks') }}
{% else %}
{{ ref('bronze__blocks_FR') }}
{% endif %}
WHERE
DATA :ok = TRUE
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
AND DATA IS NOT NULL
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY sequence_number,
shard,
workchain
ORDER BY
_inserted_timestamp DESC
) = 1

@@ -0,0 +1,42 @@
-- depends_on: {{ ref('bronze__shards') }}
-- depends_on: {{ ref('bronze__shards_FR') }}
{{ config (
materialized = "incremental",
unique_key = 'sequence_number',
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "ROUND(sequence_number, -5)",
tags = ['streamline_realtime']
) }}
SELECT
sequence_number AS sequence_number,
partition_key,
DATA :result :shards AS shards,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id,
FROM
{% if is_incremental() %}
{{ ref('bronze__shards') }}
{% else %}
{{ ref('bronze__shards_FR') }}
{% endif %}
WHERE
DATA :ok = TRUE
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
AND DATA IS NOT NULL
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY sequence_number
ORDER BY
_inserted_timestamp DESC
) = 1

@@ -0,0 +1,47 @@
-- depends_on: {{ ref('bronze__transactions') }}
-- depends_on: {{ ref('bronze__transactions_FR') }}
{{ config (
materialized = "incremental",
unique_key = ['sequence_number','shard','workchain'],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "ROUND(sequence_number, -5)",
tags = ['streamline_realtime']
) }}
SELECT
sequence_number,
shard,
workchain,
partition_key,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id,
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_FR') }}
{% endif %}
WHERE
VALUE :"result.incomplete" = FALSE
AND VALUE :ok = TRUE
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
AND DATA IS NOT NULL
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY sequence_number,
shard,
workchain
ORDER BY
_inserted_timestamp DESC
) = 1

@@ -0,0 +1,51 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"blocks",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"order_by_column": "sequence_number DESC" }
),
tags = ['streamline_realtime']
) }}
WITH shards AS (
SELECT
sequence_number,
shard,
workchain
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
sequence_number,
shard,
workchain
FROM
{{ ref("streamline__blocks_complete") }}
)
SELECT
sequence_number,
shard,
workchain,
ROUND(
sequence_number,
-5
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/getBlockHeader?workchain=' || workchain || '&shard=' || shard || '&seqno=' || sequence_number,
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(),
'Vault/prod/ton/quicknode/mainnet'
) AS request
FROM
shards

@@ -0,0 +1,45 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"shards",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"order_by_column": "sequence_number DESC" }
),
tags = ['streamline_realtime']
) }}
WITH sequences AS (
SELECT
sequence_number
FROM
{{ ref("streamline__sequences") }}
EXCEPT
SELECT
sequence_number
FROM
{{ ref("streamline__shards_complete") }}
)
SELECT
sequence_number,
ROUND(
sequence_number,
-5
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/shards?seqno=' || sequence_number,
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(),
'Vault/prod/ton/quicknode/mainnet'
) AS request
FROM
sequences

@@ -0,0 +1,54 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"100000",
"producer_batch_size" :"20000",
"worker_batch_size" :"5000",
"exploded_key": tojson(["result.transactions"]),
"include_top_level_json": tojson(["result.incomplete","ok"]),
"sql_source" :"{{this.identifier}}",
"order_by_column": "sequence_number DESC" }
),
tags = ['streamline_realtime']
) }}
WITH shards AS (
SELECT
sequence_number,
shard,
workchain
FROM
{{ ref("streamline__blocks") }}
EXCEPT
SELECT
sequence_number,
shard,
workchain
FROM
{{ ref("streamline__transactions_complete") }}
)
SELECT
sequence_number,
shard,
workchain,
ROUND(
sequence_number,
-5
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/getBlockTransactionsExt?workchain=' || workchain || '&shard=' || shard || '&seqno=' || sequence_number || '&count=256',
--256 appears to be the max number of transactions per block that QN supports - TBD whether there are blocks with > 256 transactions
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(),
'Vault/prod/ton/quicknode/mainnet'
) AS request
FROM
shards

@@ -0,0 +1,21 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
sequence_number AS masterchain_sequence_number,
VALUE :seqno AS sequence_number,
VALUE :shard :: STRING AS shard,
VALUE :workchain AS workchain
FROM
{{ ref('streamline__shards_complete') }},
LATERAL FLATTEN(shards)
UNION ALL
SELECT
sequence_number AS masterchain_sequence_number,
sequence_number AS sequence_number,
'8000000000000000' AS shard,
-1 AS workchain
FROM
{{ ref('streamline__sequences') }}

@@ -0,0 +1,18 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
{{ target.database }}.live.udf_api(
'GET',
'{Service}/{Authentication}/getMasterchainInfo',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(),
'Vault/prod/ton/quicknode/mainnet'
) :data: "result" :"last" :"seqno" AS sequence_number

@@ -0,0 +1,22 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
SELECT
0 AS sequence_number
UNION
SELECT
_id AS sequence_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id <= (
SELECT
MAX(sequence_number)
FROM
{{ ref('streamline__chainhead') }}
)

package-lock.yml Normal file

@@ -0,0 +1,16 @@
packages:
- package: calogica/dbt_expectations
version: 0.8.5
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: d3cf679e079f0cf06142de9386f215e55fe26b3b
- package: get-select/dbt_snowflake_query_tags
version: 2.5.0
- package: dbt-labs/dbt_external_tables
version: 0.8.2
- package: dbt-labs/dbt_utils
version: 1.0.0
- package: calogica/dbt_date
version: 0.7.2
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: 733d4f1fb94f4356106bab2f9af580d0898b3b50

packages.yml Normal file

@@ -0,0 +1,11 @@
packages:
- package: calogica/dbt_expectations
version: [">=0.4.0", "<0.9.0"]
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.32.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]
- package: dbt-labs/dbt_external_tables
version: 0.8.2
- package: dbt-labs/dbt_utils
version: 1.0.0

profiles.yml Normal file

@@ -0,0 +1,29 @@
ton:
target: dev
outputs:
dev:
type: snowflake
account: "{{ env_var('ACCOUNT') }}"
user: "{{ env_var('USER') }}"
password: "{{ env_var('PASSWORD') }}"
role: "{{ env_var('ROLE') }}"
schema: "{{ env_var('SCHEMA') }}"
region: "{{ env_var('REGION') }}"
database: "{{ env_var('DATABASE') }}"
warehouse: "{{ env_var('WAREHOUSE') }}"
threads: 8
client_session_keep_alive: False
prod:
type: snowflake
account: "{{ env_var('ACCOUNT') }}"
user: "{{ env_var('USER') }}"
password: "{{ env_var('PASSWORD') }}"
role: "{{ env_var('ROLE') }}"
schema: "{{ env_var('SCHEMA') }}"
region: "{{ env_var('REGION') }}"
database: "{{ env_var('DATABASE') }}"
warehouse: "{{ env_var('WAREHOUSE') }}"
threads: 8
client_session_keep_alive: False
config:
send_anonymous_usage_stats: False

requirements.txt Normal file

@@ -0,0 +1,3 @@
dbt-core>=1.8,<1.9
dbt-snowflake>=1.8,<1.9
protobuf==4.25.3