base plus core

Eric Laurello 2025-07-08 14:01:06 -04:00
parent 324781a661
commit 4ee0733868
54 changed files with 2982 additions and 0 deletions

.github/workflows/dbt_docs_update.yml
@@ -0,0 +1,78 @@
name: docs_update
on:
push:
branches:
- "main"
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: checkout docs branch
run: |
git checkout -B docs origin/main
- name: generate dbt docs
run: |
dbt ls -t prod
dbt docs generate -t prod
- name: move files to docs directory
run: |
mkdir -p ./docs
cp target/{catalog.json,manifest.json,index.html} docs/
- name: clean up target directory
run: dbt clean
- name: check for changes
run: git status
- name: stage changed files
run: git add .
- name: commit changed files
run: |
git config user.email "abc@xyz"
git config user.name "github-actions"
git commit -am "Auto-update docs"
- name: push changes to docs
run: |
git push -f --set-upstream origin docs
notify-failure:
needs: [run_dbt_jobs]
if: failure()
uses: ./.github/workflows/slack_notify.yml
secrets:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

.github/workflows/dbt_run_adhoc.yml
@@ -0,0 +1,74 @@
name: dbt_run_adhoc
run-name: dbt_run_adhoc
on:
workflow_dispatch:
branches:
- "main"
inputs:
environment:
type: choice
description: DBT Run Environment
required: true
options:
- dev
- prod
default: dev
warehouse:
type: choice
description: Snowflake warehouse
required: true
options:
- DBT
- DBT_CLOUD
- DBT_EMERGENCY
default: DBT
dbt_command:
type: string
description: 'DBT Run Command'
required: true
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ inputs.warehouse }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_${{ inputs.environment }}
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
${{ inputs.dbt_command }}
notify-failure:
needs: [run_dbt_jobs]
if: failure()
uses: ./.github/workflows/slack_notify.yml
secrets:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

@@ -0,0 +1,44 @@
name: dbt_run_dev_refresh
run-name: dbt_run_dev_refresh
on:
workflow_dispatch:
# schedule:
# - cron: '27 8 * * *'
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run-operation run_sp_create_prod_clone

@@ -0,0 +1,53 @@
name: dbt_run_incremental_core
run-name: dbt_run_incremental_core
on:
workflow_dispatch:
branches:
- "main"
# schedule:
# - cron: "35 3 * * *"
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s "sui_models,tag:core"
notify-failure:
needs: [run_dbt_jobs]
if: failure()
uses: ./.github/workflows/slack_notify.yml
secrets:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

@@ -0,0 +1,46 @@
name: dbt_run_streamline_realtime
run-name: dbt_run_streamline_realtime
on:
workflow_dispatch:
branches:
- "main"
# schedule:
# - cron: '*/15 * * * *'
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m "sui_models,tag:streamline_realtime" --vars '{STREAMLINE_INVOKE_STREAMS: True}'

.github/workflows/slack_notify.yml
@@ -0,0 +1,27 @@
name: Slack Notification
on:
workflow_call:
secrets:
SLACK_WEBHOOK_URL:
required: true
jobs:
notify:
runs-on: ubuntu-latest
environment: workflow_prod
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install dependencies
run: pip install requests
- name: Send Slack notification
run: python python/slack_alert.py
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }}

.gitignore
@@ -0,0 +1,21 @@
target/
dbt_modules/
# newer versions of dbt use this directory instead of dbt_modules for test dependencies
dbt_packages/
logs/
.venv/
.python-version
dbt-env/
venv/
package-lock.yml
# Visual Studio Code files
*/.vscode
*.code-workspace
.history/
**/.DS_Store
.vscode/
.env
.DS_Store
.user.yml

README.md
@@ -0,0 +1,74 @@
## Profile Set Up
#### Use the following within profiles.yml
```yml
sui:
target: dev
outputs:
dev:
type: snowflake
account: <ACCOUNT>
role: <ROLE>
user: <USERNAME>
password: <PASSWORD>
region: <REGION>
database: sui_DEV
warehouse: <WAREHOUSE>
schema: silver
threads: 4
client_session_keep_alive: False
query_tag: <TAG>
```
### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
## Applying Model Tags
### Database / Schema level tags
Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add or modify tags, call the appropriate tag-set function within the macro.
```jinja
{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
{{ set_schema_tag_value('SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}
```
### Model tags
To add or update a model's Snowflake tags, add or modify the `meta` model property under `config`. Only table-level tags are supported via dbt at this time.
```jinja
{{ config(
...,
meta={
'database_tags':{
'table': {
'PURPOSE': 'SOME_PURPOSE'
}
}
},
...
) }}
```
By default, model tags are not pushed to Snowflake on each load. You can push a tag update for a model by specifying the `UPDATE_SNOWFLAKE_TAGS` project variable during a run.
```shell
dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":True}' -s models/core/core__fact_swaps.sql
```
### Querying for existing tags on a model in Snowflake
```sql
select *
from table(sui.information_schema.tag_references('sui.core.fact_blocks', 'table'));
```

dbt_project.yml
@@ -0,0 +1,102 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "sui_models"
version: "1.0.0"
config-version: 2
require-dbt-version: ">=1.8.0"
# This setting configures which "profile" dbt uses for this project.
profile: "sui"
# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]
target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
- "target"
- "dbt_modules"
- "dbt_packages"
# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
+copy_grants: true
+persist_docs:
relation: true
columns: true
+on_schema_change: "append_new_columns"
livequery_models:
deploy:
core:
materialized: ephemeral
data_tests:
sui_models: # replace with the name of the chain
+store_failures: true # all tests
+where: "modified_timestamp::DATE > dateadd(hour, -{{ var('TEST_HOURS_THRESHOLD', 36) }}, sysdate())"
on-run-start:
- '{{create_sps()}}'
- '{{create_udfs()}}'
on-run-end:
- '{{ apply_meta_as_tags(results) }}'
dispatch:
- macro_namespace: dbt
search_order:
- sui_models
- dbt_snowflake_query_tags
- dbt
query-comment:
comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
append: true # Snowflake removes prefixed comments.
vars:
"dbt_date:time_zone": GMT
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
STREAMLINE_INVOKE_STREAMS: True
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RETRY_UNKNOWN: False
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: False
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
config:
dev:
API_INTEGRATION: AWS_SUI_API_STG_V2
EXTERNAL_FUNCTION_URI: azbc07ki8d.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_SUI_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_SUI_API_PROD_V2
EXTERNAL_FUNCTION_URI: nqj8j7ln67.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_SUI_API
- INTERNAL_DEV
- DBT_CLOUD_SUI

docs/.gitkeep

macros/.gitkeep

macros/create_sps.sql
@@ -0,0 +1,19 @@
{% macro create_sps() %}
{% if target.database == 'SUI' %}
CREATE schema IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{% endmacro %}
{% macro enable_search_optimization(
schema_name,
table_name,
condition = ''
) %}
{% if target.database == 'SUI' %}
ALTER TABLE
{{ schema_name }}.{{ table_name }}
ADD
search optimization {{ condition }}
{% endif %}
{% endmacro %}
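The `enable_search_optimization` macro is parameterized so individual models can opt in (for example from a post-hook). A minimal sketch of what a call renders to, where the schema, table, and condition are illustrative assumptions:

```sql
-- {{ enable_search_optimization('core', 'fact_transactions', 'ON EQUALITY(tx_digest)') }}
-- renders (only when target.database == 'SUI') to:
ALTER TABLE
    core.fact_transactions
ADD
    search optimization ON EQUALITY(tx_digest)
```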

macros/create_udfs.sql
@@ -0,0 +1,8 @@
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
{{ create_udf_bulk_rest_api_v2() }};
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

@@ -0,0 +1,33 @@
{% macro generate_schema_name(
custom_schema_name = none,
node = none
) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[0] | trim }}
{%- endmacro %}
{% macro generate_alias_name(
custom_alias_name = none,
node = none
) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{% if split_name | length < 2 %}
{{ split_name[0] | trim }}
{% else %}
{{ split_name[1] | trim }}
{% endif %}
{%- endmacro %}
{% macro generate_tmp_view_name(model_name) -%}
{% set node_name = model_name.name %}
{% set split_name = node_name.split('__') %}
{{ target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name[1] ~ '__dbt_tmp' | trim }}
{%- endmacro %}
{% macro generate_view_name(model_name) -%}
{% set node_name = model_name.name %}
{% set split_name = node_name.split('__') %}
{{ target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name[1] | trim }}
{%- endmacro %}
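Taken together, these overrides make a model's file name encode its destination: the text before the double underscore becomes the schema, and the text after it becomes the relation name. A minimal sketch with a hypothetical model file:

```sql
-- models/core/core__fact_example.sql (hypothetical)
-- With the overrides above, this builds <target.database>.core.fact_example
-- rather than dbt's default of a core__fact_example relation in the target schema.
SELECT
    1 AS example_column
```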

@@ -0,0 +1,11 @@
{% macro set_query_tag() -%}
{% set new_json = {"repo":project_name, "object":this.table, "profile":target.profile_name, "env":target.name, "existing_tag":get_current_query_tag() } %}
{% set new_query_tag = tojson(new_json) | as_text %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}
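Note that the macro returns the pre-existing tag: dbt's Snowflake adapter calls `set_query_tag()` before materializing a model and passes the returned value to `unset_query_tag()` afterwards, so the session's original tag is restored once the model finishes building.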

macros/dbt/get_merge.sql
@@ -0,0 +1,30 @@
-- incremental_strategy="merge"
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set merge_sql = fsc_utils.get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
-- incremental_strategy="delete+insert"
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set predicate_override = "" %}
-- get the min value of column
{% if incremental_predicates[0] == "min_value_predicate" %}
{% set min_column_name = incremental_predicates[1] %}
{% set query %}
select min({{ min_column_name }}) from {{ source }}
{% endset %}
{% set min_block = run_query(query).columns[0][0] %}
{% if min_block is not none %}
{% set predicate_override %}
round({{ target }}.{{ min_column_name }},-5) >= round({{ min_block }},-5)
{% endset %}
{% else %}
{% set predicate_override = "1=1" %}
{% endif %}
{% endif %}
{% set predicates = [predicate_override] + incremental_predicates[2:] if predicate_override else incremental_predicates %}
-- standard delete+insert from here
{% set merge_sql = dbt.get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
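For reference, a model opts into the custom predicate by passing `min_value_predicate` as the first element of `incremental_predicates`, with the column to bound on as the second. A hedged sketch (the column choice here is an assumption):

```sql
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'delete+insert',
    unique_key = 'checkpoint_number',
    -- element 0 selects the custom predicate, element 1 names the column whose
    -- batch MIN() bounds the delete; any further elements pass through unchanged
    incremental_predicates = ['min_value_predicate', 'checkpoint_number']
) }}

SELECT
    checkpoint_number,
    block_timestamp
FROM
    {{ ref('silver__checkpoints') }}
```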

@@ -0,0 +1,4 @@
{% macro dbt_snowflake_get_tmp_relation_type(strategy, unique_key, language) %}
-- always table
{{ return('table') }}
{% endmacro %}

@@ -0,0 +1,10 @@
{% macro run_sp_create_prod_clone() %}
{% set clone_query %}
call sui._internal.create_prod_clone(
'sui',
'sui_dev',
'internal_dev'
);
{% endset %}
{% do run_query(clone_query) %}
{% endmacro %}

@@ -0,0 +1,51 @@
{% macro sp_create_prod_clone(target_schema) -%}
create or replace procedure {{ target_schema }}.create_prod_clone(source_db_name string, destination_db_name string, role_name string)
returns boolean
language javascript
execute as caller
as
$$
snowflake.execute({sqlText: `BEGIN TRANSACTION;`});
try {
snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */
snowflake.execute({sqlText: `GRANT USAGE ON DATABASE ${DESTINATION_DB_NAME} TO ROLE AWS_LAMBDA_SUI_API`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`});
snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
snowflake.execute({sqlText: `GRANT USAGE ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE AWS_LAMBDA_SUI_API;`});
snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`})
var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`});
while (existing_tags.next()) {
var schema = existing_tags.getColumnValue(4);
var tag_name = existing_tags.getColumnValue(2)
snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
}
snowflake.execute({sqlText: `COMMIT;`});
} catch (err) {
snowflake.execute({sqlText: `ROLLBACK;`});
throw(err);
}
return true
$$
{%- endmacro %}
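For context, this is the procedure that the `run_sp_create_prod_clone` macro (and, through it, the dev-refresh workflow's `dbt run-operation run_sp_create_prod_clone` step) submits:

```sql
call sui._internal.create_prod_clone(
    'sui',
    'sui_dev',
    'internal_dev'
);
```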

@@ -0,0 +1,69 @@
{% macro streamline_external_table_query(
model,
partition_function
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
{% endmacro %}
{% macro streamline_external_table_query_fr(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
b._inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
{% endmacro %}
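The two variants differ only in scope: `streamline_external_table_query` reads the last three days of file registration history and backs the incremental bronze views, while `streamline_external_table_query_fr` enumerates every registered file and backs the `_FR` views that the silver models fall back to on full refreshes.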

@@ -0,0 +1,10 @@
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY {% if target.database == 'SUI' -%}
api_integration = aws_sui_api_prod_v2 AS 'https://nqj8j7ln67.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
api_integration = aws_sui_api_stg_v2 AS 'https://azbc07ki8d.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}
{% endmacro %}

@@ -0,0 +1,6 @@
{% macro add_database_or_schema_tags() %}
{{ set_database_tag_value(
'BLOCKCHAIN_NAME',
'SUI'
) }}
{% endmacro %}

@@ -0,0 +1,127 @@
{% macro apply_meta_as_tags(results) %}
{% if var("UPDATE_SNOWFLAKE_TAGS") %}
{{ log('apply_meta_as_tags', info=False) }}
{{ log(results, info=False) }}
{% if execute %}
{%- set tags_by_schema = {} -%}
{% for res in results -%}
{% if res.node.meta.database_tags %}
{%- set model_database = res.node.database -%}
{%- set model_schema = res.node.schema -%}
{%- set model_schema_full = model_database+'.'+model_schema -%}
{%- set model_alias = res.node.alias -%}
{% if model_schema_full not in tags_by_schema.keys() %}
{{ log('need to fetch tags for schema '+model_schema_full, info=False) }}
{%- call statement('main', fetch_result=True) -%}
show tags in {{model_database}}.{{model_schema}}
{%- endcall -%}
{%- set _ = tags_by_schema.update({model_schema_full: load_result('main')['table'].columns.get('name').values()|list}) -%}
{{ log('Added tags to cache', info=False) }}
{% else %}
{{ log('already have tag info for schema', info=False) }}
{% endif %}
{%- set current_tags_in_schema = tags_by_schema[model_schema_full] -%}
{{ log('current_tags_in_schema:', info=False) }}
{{ log(current_tags_in_schema, info=False) }}
{{ log("========== Processing tags for "+model_schema_full+"."+model_alias+" ==========", info=False) }}
{% set line -%}
node: {{ res.node.unique_id }}; status: {{ res.status }} (message: {{ res.message }})
node full: {{ res.node}}
meta: {{ res.node.meta}}
materialized: {{ res.node.config.materialized }}
{%- endset %}
{{ log(line, info=False) }}
{%- call statement('main', fetch_result=True) -%}
select LEVEL,UPPER(TAG_NAME) as TAG_NAME,TAG_VALUE from table(information_schema.tag_references_all_columns('{{model_schema}}.{{model_alias}}', 'table'))
{%- endcall -%}
{%- set existing_tags_for_table = load_result('main')['data'] -%}
{{ log('Existing tags for table:', info=False) }}
{{ log(existing_tags_for_table, info=False) }}
{{ log('--', info=False) }}
{% for table_tag in res.node.meta.database_tags.table %}
{{ create_tag_if_missing(current_tags_in_schema,table_tag|upper) }}
{% set desired_tag_value = res.node.meta.database_tags.table[table_tag] %}
{{set_table_tag_value_if_different(model_schema,model_alias,table_tag,desired_tag_value,existing_tags_for_table)}}
{% endfor %}
{{ log("========== Finished processing tags for "+model_alias+" ==========", info=False) }}
{% endif %}
{% endfor %}
{% endif %}
{% endif %}
{% endmacro %}
{% macro create_tag_if_missing(all_tag_names,table_tag) %}
{% if table_tag not in all_tag_names %}
{{ log('Creating missing tag '+table_tag, info=False) }}
{%- call statement('main', fetch_result=True) -%}
create tag if not exists silver.{{table_tag}}
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% else %}
{{ log('Tag already exists: '+table_tag, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_table_tag_value_if_different(model_schema,table_name,tag_name,desired_tag_value,existing_tags) %}
{{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at table level', info=False) }}
{%- set existing_tag_for_table = existing_tags|selectattr('0','equalto','TABLE')|selectattr('1','equalto',tag_name|upper)|list -%}
{{ log('Filtered tags for table:', info=False) }}
{{ log(existing_tag_for_table[0], info=False) }}
{% if existing_tag_for_table|length > 0 and existing_tag_for_table[0][2]==desired_tag_value %}
{{ log('Correct tag value already exists', info=False) }}
{% else %}
{{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
{%- call statement('main', fetch_result=True) -%}
alter table {{model_schema}}.{{table_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_column_tag_value_if_different(table_name,column_name,tag_name,desired_tag_value,existing_tags) %}
{{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at column level', info=False) }}
{%- set existing_tag_for_column = existing_tags|selectattr('0','equalto','COLUMN')|selectattr('1','equalto',tag_name|upper)|list -%}
{{ log('Filtered tags for column:', info=False) }}
{{ log(existing_tag_for_column[0], info=False) }}
{% if existing_tag_for_column|length > 0 and existing_tag_for_column[0][2]==desired_tag_value %}
{{ log('Correct tag value already exists', info=False) }}
{% else %}
{{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
{%- call statement('main', fetch_result=True) -%}
alter table {{table_name}} modify column {{column_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
{%- endcall -%}
{{ log(load_result('main').data, info=False) }}
{% endif %}
{% endmacro %}
{% macro set_database_tag_value(tag_name,tag_value) %}
{% set query %}
create tag if not exists silver.{{tag_name}}
{% endset %}
{% do run_query(query) %}
{% set query %}
alter database {{target.database}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
{% endset %}
{% do run_query(query) %}
{% endmacro %}
{% macro set_schema_tag_value(target_schema,tag_name,tag_value) %}
{% set query %}
create tag if not exists silver.{{tag_name}}
{% endset %}
{% do run_query(query) %}
{% set query %}
alter schema {{target.database}}.{{target_schema}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
{% endset %}
{% do run_query(query) %}
{% endmacro %}

@@ -0,0 +1,29 @@
{% test compare_model_subset(model, compare_model, compare_columns, model_condition) %}
{% set compare_cols_csv = compare_columns | join(', ') %}
with a as (
select {{compare_cols_csv}} from {{ model }}
{{ model_condition }}
),
b as (
select {{compare_cols_csv}} from {{ compare_model }}
),
a_minus_b as (
select * from a
except
select * from b
),
b_minus_a as (
select * from b
except
select * from a
),
unioned as (
select 'in_actual_not_in_expected' as which_diff, a_minus_b.* from a_minus_b
union all
select 'in_expected_not_in_actual' as which_diff, b_minus_a.* from b_minus_a
)
select * from unioned
{% endtest %}

@@ -0,0 +1,37 @@
{% macro sequence_gaps(
table,
partition_by,
column
) %}
{%- set partition_sql = partition_by | join(", ") -%}
{%- set previous_column = "prev_" ~ column -%}
WITH source AS (
SELECT
{{ partition_sql + "," if partition_sql }}
{{ column }},
LAG(
{{ column }},
1
) over (
{{ "PARTITION BY " ~ partition_sql if partition_sql }}
ORDER BY
{{ column }} ASC
) AS {{ previous_column }}
FROM
{{ table }}
WHERE
block_timestamp::date <= current_date - 1
)
SELECT
{{ partition_sql + "," if partition_sql }}
{{ previous_column }},
{{ column }},
{{ column }} - {{ previous_column }}
- 1 AS gap
FROM
source
WHERE
{{ column }} - {{ previous_column }} <> 1
ORDER BY
gap DESC
{% endmacro %}
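A singular test built on this macro might look like the following sketch (the test file name is hypothetical; note the macro assumes the target table has a `block_timestamp` column):

```sql
-- tests/sequence_gaps__fact_checkpoints.sql (hypothetical)
-- Returns a row, and therefore fails, for every skipped checkpoint_number.
{{ sequence_gaps(
    ref('core__fact_checkpoints'),
    [],
    'checkpoint_number'
) }}
```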

@@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = 'checkpoints',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

@@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_fr(
model = 'checkpoints',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

@@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

@@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_fr(
model = 'transactions',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)"
) }}

@@ -0,0 +1,59 @@
{% docs __overview__ %}
# Welcome to the Flipside Crypto SUI Models Documentation
## **What does this documentation cover?**
The documentation included here details the design of the SUI blockchain tables and views available via [Flipside Crypto](https://flipsidecrypto.xyz/). For more information on how these models are built, please see [the GitHub repository](https://github.com/flipsideCrypto/sui-models/).
## **How do I use these docs?**
The easiest way to navigate this documentation is to use the Quick Links below. These links will take you to the documentation for each table, which contains a description, a list of the columns, and other helpful information.
If you are experienced with dbt docs, feel free to use the sidebar to navigate the documentation, as well as explore the relationships between tables and the logic building them.
There is more information on how to use dbt docs in the last section of this document.
## **Quick Links to Table Documentation**
**Click on the links below to jump to the documentation for each schema.**
### Core Fact Tables (`sui`.`CORE`.`<table_name>`)
- [dim_labels](#!/model/model.sui_models.core__dim_labels)
The SUI models are built using three layers of SQL models: **bronze, silver, and gold (or core/defi/nft).**
- Bronze: Data is loaded in from the source as a view
- Silver: All necessary parsing, filtering, de-duping, and other transformations are done here
- Gold (or core/defi/nft): Final views and tables that are available publicly
The dimension tables are sourced from a variety of on-chain and off-chain sources.
Convenience views (denoted ez_) are a combination of different fact and dimension tables. These views are built to make it easier to query the data.
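For illustration, a hedged sketch of what such a convenience view could look like, assuming `dim_labels` carries `address` and `label` columns (the join key is an assumption):

```sql
-- hypothetical ez_ style view: decorate transactions with address labels
SELECT
    t.tx_digest,
    t.tx_sender,
    l.label
FROM sui.core.fact_transactions t
LEFT JOIN sui.core.dim_labels l
    ON l.address = t.tx_sender
```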
## **Using dbt docs**
### Navigation
You can use the ```Project``` and ```Database``` navigation tabs on the left side of the window to explore the models in the project.
### Database Tab
This view shows relations (tables and views) grouped into database schemas. Note that ephemeral models are *not* shown in this interface, as they do not exist in the database.
### Graph Exploration
You can click the blue icon on the bottom-right corner of the page to view the lineage graph of your models.
On model pages, you'll see the immediate parents and children of the model you're exploring. By clicking the Expand button at the top-right of this lineage pane, you'll be able to see all of the models that are used to build, or are built from, the model you're exploring.
Once expanded, you'll be able to use the ```--models``` and ```--exclude``` model selection syntax to filter the models in the graph. For more information on model selection, check out the [dbt docs](https://docs.getdbt.com/docs/model-selection-syntax).
Note that you can also right-click on models to interactively filter and explore the graph.
### **More information**
- [Flipside](https://flipsidecrypto.xyz/)
- [Github](https://github.com/FlipsideCrypto/sui-models)
- [What is dbt?](https://docs.getdbt.com/docs/introduction)
{% enddocs %}

@@ -0,0 +1,58 @@
{{ config (
materialized = "incremental",
unique_key = "fact_transaction_balance_changes_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
WITH base AS (
SELECT
A.checkpoint_number,
A.block_timestamp,
A.tx_digest,
A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind,
A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender,
A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version,
CASE
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE
ELSE TRUE
END AS tx_succeeded,
b.index AS balance_change_index,
b.value AS bc_value,
bc_value :"amount" :: bigint AS amount,
bc_value :"coinType" :: STRING AS coin_type,
bc_value :"owner" :"AddressOwner" :: STRING AS owner
FROM
{{ ref('silver__transactions') }} A,
LATERAL FLATTEN(
A.transaction_json :"balanceChanges"
) b
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
balance_change_index,
coin_type,
amount,
owner,
{{ dbt_utils.generate_surrogate_key(['tx_digest','balance_change_index']) }} AS fact_transaction_balance_changes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
base

@@ -0,0 +1,68 @@
{{ config (
materialized = "incremental",
unique_key = "fact_changes_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
WITH base AS (
SELECT
A.checkpoint_number,
A.block_timestamp,
A.tx_digest,
A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind,
A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender,
A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version,
CASE
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE
ELSE TRUE
END AS tx_succeeded,
b.index AS change_index,
b.value AS change_value,
change_value :"type" :: STRING AS TYPE,
change_value :"sender" :: STRING AS sender,
change_value :"digest" :: STRING AS digest,
change_value :"objectId" :: STRING AS object_id,
change_value :"objectType" :: STRING AS object_type,
change_value :"version" :BIGINT AS version,
change_value :"previousVersion" :BIGINT AS previous_version,
change_value :"owner" :"ObjectOwner" :: STRING AS object_owner,
FROM
{{ ref('silver__transactions') }} A,
LATERAL FLATTEN(
A.transaction_json :"objectChanges"
) b
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
change_index,
TYPE,
sender,
digest,
object_id,
object_type,
version,
previous_version,
object_owner,
{{ dbt_utils.generate_surrogate_key(['tx_digest','change_index']) }} AS fact_changes_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
base

@@ -0,0 +1,34 @@
{{ config (
materialized = "incremental",
unique_key = "checkpoint_number",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
SELECT
checkpoint_number,
block_timestamp,
checkpoint_json :"epoch" :: INT AS epoch,
checkpoint_json :"digest" :: STRING AS checkpoint_digest,
checkpoint_json :"previousDigest" :: STRING AS previous_digest,
checkpoint_json :"networkTotalTransactions" :: bigint AS network_total_transactions,
checkpoint_json :"validatorSignature" :: STRING AS validator_signature,
ARRAY_SIZE(
checkpoint_json :"transactions"
) AS tx_count,
checkpoint_json :"transactions" AS transactions_array,
{{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__checkpoints') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}

@@ -0,0 +1,77 @@
{{ config (
materialized = "incremental",
unique_key = "fact_events_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
WITH base AS (
SELECT
A.checkpoint_number,
A.block_timestamp,
A.tx_digest,
A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind,
A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender,
A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version,
CASE
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE
ELSE TRUE
END AS tx_succeeded,
b.value AS event_value,
event_value :"id" :"eventSeq" :: STRING AS event_index,
event_value :"packageId" :: STRING AS package_id,
event_value :"transactionModule" :: STRING AS transaction_module,
event_value :"sender" :: STRING AS sender,
event_value :"type" :: STRING AS TYPE,
event_value :"parsedJson" AS parsed_json
FROM
{{ ref('silver__transactions') }} A,
LATERAL FLATTEN(
A.transaction_json :"events"
) b
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
event_index,
TYPE,
SPLIT_PART(
TYPE,
'::',
1
) AS event_address,
SPLIT_PART(
TYPE,
'::',
2
) AS event_module,
SPLIT_PART(
TYPE,
'::',
3
) AS event_resource,
package_id,
transaction_module,
sender,
parsed_json,
{{ dbt_utils.generate_surrogate_key(['tx_digest','event_index']) }} AS fact_events_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
base

@@ -0,0 +1,74 @@
{{ config (
materialized = "incremental",
unique_key = "tx_digest",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
WITH base AS (
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind,
transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender,
transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version,
CASE
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE
ELSE TRUE
END AS tx_succeeded,
transaction_json :"effects" :"status" :"error" :: STRING AS tx_error,
{# transaction_json :"transaction" :txSignatures AS tx_signatures, #}
transaction_json :"effects": "dependencies" AS tx_dependencies,
{# transaction_json :"effects": "gasObject" :"reference" :"digest" :: STRING AS gas_digest, #}
transaction_json :"effects": "gasUsed" :"computationCost" :: bigint AS gas_used_computation_cost,
transaction_json :"effects": "gasUsed" :"nonRefundableStorageFee" :: bigint AS gas_used_non_refundable_storage_fee,
transaction_json :"effects": "gasUsed" :"storageCost" :: bigint AS gas_used_storage_cost,
transaction_json :"effects": "gasUsed" :"storageRebate" :: bigint AS gas_used_storage_rebate,
transaction_json :"transaction" :"data" :"gasData" :"budget" :: bigint AS gas_budget,
transaction_json :"transaction" :"data" :"gasData" :"owner" :: STRING AS gas_owner,
transaction_json :"transaction" :"data" :"gasData" :"price" :: bigint AS gas_price,
{# transaction_json :"transaction" :"data" :"gasData" :"payment" AS gas_payments, #}
(
gas_used_computation_cost + gas_used_storage_cost - gas_used_storage_rebate
) / pow(
10,
9
) AS tx_fee
FROM
{{ ref('silver__transactions') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_fee,
tx_succeeded,
tx_error,
tx_dependencies,
gas_used_computation_cost,
gas_used_non_refundable_storage_fee,
gas_used_storage_cost,
gas_used_storage_rebate,
gas_price,
gas_budget,
gas_owner,
{{ dbt_utils.generate_surrogate_key(['tx_digest']) }} AS fact_transaction_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
base

@@ -0,0 +1,70 @@
{{ config (
materialized = "incremental",
unique_key = "fact_transaction_inputs_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
WITH base AS (
SELECT
A.checkpoint_number,
A.block_timestamp,
A.tx_digest,
A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind,
A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender,
A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version,
CASE
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE
ELSE TRUE
END AS tx_succeeded,
b.index AS input_index,
b.value AS input_value,
input_value :"initialSharedVersion" :: STRING AS initial_shared_version,
input_value :"mutable" :: BOOLEAN AS mutable,
input_value :"objectId" :: STRING AS object_id,
input_value :"objectType" :: STRING AS object_type,
input_value :"type" :: STRING AS TYPE,
input_value :"version" :: bigint AS version,
input_value :"digest" :: STRING AS digest,
input_value :"value" :: STRING AS VALUE,
input_value :"valueType" :: STRING AS value_type
FROM
{{ ref('silver__transactions') }} A,
LATERAL FLATTEN(
A.transaction_json :"transaction" :"data" :"transaction": "inputs"
) b
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
input_index,
TYPE,
version,
object_id,
object_type,
digest,
VALUE,
value_type,
initial_shared_version,
mutable,
{{ dbt_utils.generate_surrogate_key(['tx_digest','input_index']) }} AS fact_transaction_inputs_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
base

@@ -0,0 +1,58 @@
{{ config (
materialized = "incremental",
unique_key = "fact_transactions_id",
cluster_by = ['block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['gold','core']
) }}
WITH base AS (
SELECT
A.checkpoint_number,
A.block_timestamp,
A.tx_digest,
A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind,
A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender,
A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version,
CASE
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE
ELSE TRUE
END AS tx_succeeded,
b.index AS payload_index,
C.key AS payload_type,
C.value AS payload_details
FROM
{{ ref('silver__transactions') }} A,
LATERAL FLATTEN(
A.transaction_json :"transaction" :"data" :"transaction": "transactions"
) b,
LATERAL FLATTEN(
b.value
) C
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp
FROM
{{ this }})
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
tx_digest,
tx_kind,
tx_sender,
message_version,
tx_succeeded,
payload_index,
payload_type,
payload_details,
{{ dbt_utils.generate_surrogate_key(['tx_digest','payload_index']) }} AS fact_transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
base

@@ -0,0 +1,72 @@
version: 2
models:
- name: core__fact_checkpoints
description: "Contains information about checkpoints in the Sui blockchain"
columns:
- name: checkpoint_number
description: "Unique identifier for the checkpoint within the blockchain."
data_type: integer
- name: block_timestamp
description: "Timestamp when the checkpoint was created."
data_type: timestamp
- name: epoch
description: "Epoch number in which the checkpoint was included."
data_type: integer
- name: checkpoint_digest
description: "The digest identified for the checkpoint"
data_type: string
- name: previous_digest
description: "Digest of the previous checkpoint for chain continuity."
data_type: string
- name: network_total_transactions
description: "Cumulative total of transactions on the network up to this checkpoint."
data_type: bigint
- name: validator_signature
description: "Signature of the validator for this block."
data_type: string
- name: tx_count
description: "Total number of transactions included in this checkpoint."
data_type: bigint
- name: transactions_array
description: "The array of transactions included in this checkpoint."
data_type: variant
- name: FACT_CHECKPOINTS_ID
description: "Surrogate key for the checkpoint fact table, generated from the checkpoint number and block timestamp."
data_type: text
- name: INSERTED_TIMESTAMP
description: "Timestamp when the record was inserted into the database."
data_type: timestamp
- name: MODIFIED_TIMESTAMP
description: "Timestamp when the record was last modified."
data_type: timestamp
config:
contract:
enforced: true
tests:
- dbt_utils.recency:
datepart: hour
field: block_timestamp
interval: 12
severity: error
tags: ['test_recency']
- dbt_utils.sequential_values:
column_name: checkpoint_number
interval: 1
config:
severity: error
error_if: ">1"
tags: ['test_recency']

@@ -0,0 +1,49 @@
-- depends_on: {{ ref('bronze__checkpoints') }}
{{ config (
materialized = "incremental",
unique_key = "checkpoint_number",
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['silver','core']
) }}
WITH bronze_checks AS (
SELECT
DATA :"result" :"sequenceNumber" :: bigint AS checkpoint_number,
TO_TIMESTAMP(
DATA :"result" :"timestampMs"
) AS block_timestamp,
partition_key,
DATA :result AS checkpoint_json,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__checkpoints') }}
WHERE
_inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__checkpoints_FR') }}
{% endif %}
)
SELECT
checkpoint_number,
block_timestamp,
partition_key,
checkpoint_json,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS checkpoints_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_checks qualify ROW_NUMBER() over (
PARTITION BY checkpoint_number
ORDER BY
_inserted_timestamp DESC
) = 1

@@ -0,0 +1,54 @@
-- depends_on: {{ ref('bronze__transactions') }}
{{ config (
materialized = "incremental",
unique_key = "tx_digest",
cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
tags = ['silver','core']
) }}
WITH bronze_txs AS (
SELECT
DATA :"checkpoint" :: bigint AS checkpoint_number,
DATA :"digest" :: STRING AS tx_digest,
TO_TIMESTAMP(
DATA :"timestampMs"
) AS block_timestamp,
partition_key,
DATA AS transaction_json,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
WHERE
DATA :error IS NULL
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__transactions_FR') }}
WHERE
DATA :error IS NULL
{% endif %}
)
SELECT
checkpoint_number,
tx_digest,
block_timestamp,
partition_key,
transaction_json,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(['checkpoint_number','tx_digest']) }} AS transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
bronze_txs qualify ROW_NUMBER() over (
PARTITION BY tx_digest
ORDER BY
_inserted_timestamp DESC
) = 1

@@ -0,0 +1,826 @@
version: 2
models:
- name: silver__account_states
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: account
data_type: VARCHAR
- name: account_states_id
data_type: VARCHAR
- name: account_status
data_type: VARCHAR
- name: balance
data_type: NUMBER
- name: code_boc
data_type: BINARY
- name: code_hash
data_type: VARCHAR
- name: data_boc
data_type: BINARY
- name: data_hash
data_type: VARCHAR
- name: frozen_hash
data_type: VARCHAR
- name: hash
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: last_trans_hash
data_type: VARCHAR
- name: last_trans_lt
data_type: NUMBER
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: timestamp
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__balances_history
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: lt
data_type: NUMBER
- name: address
data_type: VARCHAR
- name: amount
data_type: NUMBER
- name: asset
data_type: VARCHAR
- name: balances_history_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: mintless_claimed
data_type: BOOLEAN
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: timestamp
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__blocks
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: version
data_type: NUMBER
- name: created_by
data_type: VARCHAR
- name: end_lt
data_type: NUMBER
- name: want_merge
data_type: BOOLEAN
- name: gen_utime
data_type: NUMBER
- name: tx_count
data_type: NUMBER
- name: global_id
data_type: NUMBER
- name: root_hash
data_type: VARCHAR
- name: key_block
data_type: BOOLEAN
- name: mc_block_seqno
data_type: NUMBER
- name: vert_seqno_incr
data_type: BOOLEAN
- name: validator_list_hash_short
data_type: NUMBER
- name: after_merge
data_type: BOOLEAN
- name: want_split
data_type: BOOLEAN
- name: after_split
data_type: BOOLEAN
- name: master_ref_seqno
data_type: NUMBER
- name: mc_block_workchain
data_type: NUMBER
- name: file_hash
data_type: VARCHAR
- name: prev_key_block_seqno
data_type: NUMBER
- name: shard
data_type: NUMBER
- name: seqno
data_type: NUMBER
- name: vert_seqno
data_type: NUMBER
- name: flags
data_type: NUMBER
- name: rand_seed
data_type: VARCHAR
- name: gen_catchain_seqno
data_type: NUMBER
- name: min_ref_mc_seqno
data_type: NUMBER
- name: start_lt
data_type: NUMBER
- name: mc_block_shard
data_type: NUMBER
- name: before_split
data_type: BOOLEAN
- name: workchain
data_type: NUMBER
- name: blocks_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__dex_pools
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: reserves_right
data_type: NUMBER
- name: dex_pools_id
data_type: VARCHAR
- name: discovered_at
data_type: NUMBER
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: is_liquid
data_type: BOOLEAN
- name: jetsui_left
data_type: VARCHAR
- name: jetsui_right
data_type: VARCHAR
- name: last_updated
data_type: NUMBER
- name: lp_fee
data_type: NUMBER
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: pool
data_type: VARCHAR
- name: project
data_type: VARCHAR
- name: protocol_fee
data_type: NUMBER
- name: referral_fee
data_type: NUMBER
- name: reserves_left
data_type: NUMBER
- name: total_supply
data_type: NUMBER
- name: tvl_sui
data_type: NUMBER
- name: tvl_usd
data_type: NUMBER
- name: version
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__dex_trades
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: volume_sui
data_type: NUMBER
- name: amount_bought_raw
data_type: NUMBER
- name: amount_sold_raw
data_type: NUMBER
- name: dex_trades_id
data_type: VARCHAR
- name: event_time
data_type: NUMBER
- name: event_type
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: platform_tag
data_type: VARCHAR
- name: pool_address
data_type: VARCHAR
- name: project
data_type: VARCHAR
- name: project_type
data_type: VARCHAR
- name: query_id
data_type: NUMBER
- name: referral_address
data_type: VARCHAR
- name: router_address
data_type: VARCHAR
- name: token_bought_address
data_type: VARCHAR
- name: token_sold_address
data_type: VARCHAR
- name: trace_id
data_type: VARCHAR
- name: trader_address
data_type: VARCHAR
- name: tx_hash
data_type: VARCHAR
- name: version
data_type: NUMBER
- name: volume_usd
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__jetsui_events
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: tx_hash
data_type: VARCHAR
- name: amount
data_type: NUMBER
- name: comment
data_type: VARCHAR
- name: destination
data_type: VARCHAR
- name: forward_sui_amount
data_type: NUMBER
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: jetsui_events_id
data_type: VARCHAR
- name: jetsui_master
data_type: VARCHAR
- name: jetsui_wallet
data_type: VARCHAR
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: query_id
data_type: NUMBER
- name: trace_id
data_type: VARCHAR
- name: source
data_type: VARCHAR
- name: tx_aborted
data_type: BOOLEAN
- name: tx_lt
data_type: NUMBER
- name: type
data_type: VARCHAR
- name: utime
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__jetsui_metadata
config:
contract:
enforced: true
columns:
- name: adding_date
data_type: DATE
- name: suiapi_image_url
data_type: VARCHAR
- name: adding_at
data_type: NUMBER
- name: address
data_type: VARCHAR
- name: admin_address
data_type: VARCHAR
- name: code_hash
data_type: VARCHAR
- name: decimals
data_type: NUMBER
- name: description
data_type: VARCHAR
- name: image
data_type: VARCHAR
- name: image_data
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: jetsui_content_onchain
data_type: VARCHAR
- name: jetsui_metadata_id
data_type: VARCHAR
- name: jetsui_wallet_code_hash
data_type: VARCHAR
- name: metadata_status
data_type: NUMBER
- name: mintable
data_type: BOOLEAN
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: name
data_type: VARCHAR
- name: sources
data_type: VARIANT
- name: symbol
data_type: VARCHAR
- name: update_time_metadata
data_type: NUMBER
- name: update_time_onchain
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__messages_with_data
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: body_boc
data_type: BINARY
- name: body_hash
data_type: VARCHAR
- name: bounce
data_type: BOOLEAN
- name: bounced
data_type: BOOLEAN
- name: comment
data_type: VARCHAR
- name: created_at
data_type: NUMBER
- name: created_lt
data_type: NUMBER
- name: destination
data_type: VARCHAR
- name: direction
data_type: VARCHAR
- name: fwd_fee
data_type: NUMBER
- name: ihr_disabled
data_type: BOOLEAN
- name: ihr_fee
data_type: NUMBER
- name: import_fee
data_type: NUMBER
- name: init_state_boc
data_type: BINARY
- name: init_state_hash
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: messages_with_data_id
data_type: VARCHAR
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: msg_hash
data_type: VARCHAR
- name: opcode
data_type: NUMBER
- name: source
data_type: VARCHAR
- name: trace_id
data_type: VARCHAR
- name: tx_hash
data_type: VARCHAR
- name: tx_lt
data_type: NUMBER
- name: tx_now
data_type: NUMBER
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: _value
data_type: NUMBER
- name: silver__nft_events
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: sale_price
data_type: NUMBER
- name: royalty_address
data_type: VARCHAR
- name: payment_asset
data_type: VARCHAR
- name: marketplace_fee_address
data_type: VARCHAR
- name: owner_address
data_type: VARCHAR
- name: collection_address
data_type: VARCHAR
- name: content_onchain
data_type: TEXT
- name: trace_id
data_type: VARCHAR
- name: sale_contract
data_type: VARCHAR
- name: forward_amount
data_type: NUMBER
- name: nft_item_index
data_type: TEXT
- name: query_id
data_type: NUMBER
- name: is_init
data_type: BOOLEAN
- name: timestamp
data_type: NUMBER
- name: nft_item_address
data_type: VARCHAR
- name: custom_payload
data_type: BINARY
- name: comment
data_type: VARCHAR
- name: sale_end_time
data_type: NUMBER
- name: sale_type
data_type: VARCHAR
- name: auction_max_bid
data_type: NUMBER
- name: auction_min_bid
data_type: NUMBER
- name: marketplace_address
data_type: VARCHAR
- name: forward_payload
data_type: BINARY
- name: royalty_amount
data_type: NUMBER
- name: auction_min_step
data_type: NUMBER
- name: type
data_type: VARCHAR
- name: prev_owner
data_type: VARCHAR
- name: tx_hash
data_type: VARCHAR
- name: marketplace_fee
data_type: NUMBER
- name: lt
data_type: NUMBER
- name: nft_events_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__nft_items
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: collection_address
data_type: VARCHAR
- name: is_init
data_type: BOOLEAN
- name: lt
data_type: NUMBER
- name: timestamp
data_type: NUMBER
- name: address
data_type: VARCHAR
- name: owner_address
data_type: VARCHAR
- name: index
data_type: TEXT
- name: content_onchain
data_type: TEXT
- name: nft_items_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__nft_metadata
config:
contract:
enforced: true
columns:
- name: adding_date
data_type: DATE
- name: description
data_type: VARCHAR
- name: image
data_type: VARCHAR
- name: metadata_status
data_type: NUMBER
- name: parent_address
data_type: VARCHAR
- name: update_time_metadata
data_type: NUMBER
- name: adding_at
data_type: NUMBER
- name: update_time_onchain
data_type: NUMBER
- name: address
data_type: VARCHAR
- name: suiapi_image_url
data_type: VARCHAR
- name: content_onchain
data_type: VARCHAR
- name: type
data_type: VARCHAR
- name: attributes
data_type: VARCHAR
- name: name
data_type: VARCHAR
- name: sources
data_type: VARIANT
- name: image_data
data_type: VARCHAR
- name: nft_metadata_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__nft_sales
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: is_canceled
data_type: BOOLEAN
- name: marketplace_fee_address
data_type: VARCHAR
- name: end_time
data_type: NUMBER
- name: is_complete
data_type: BOOLEAN
- name: last_member
data_type: VARCHAR
- name: marketplace_address
data_type: VARCHAR
- name: royalty_amount
data_type: NUMBER
- name: created_at
data_type: NUMBER
- name: nft_address
data_type: VARCHAR
- name: marketplace_fee
data_type: NUMBER
- name: asset
data_type: VARCHAR
- name: price
data_type: NUMBER
- name: nft_owner_address
data_type: VARCHAR
- name: address
data_type: VARCHAR
- name: min_bid
data_type: NUMBER
- name: timestamp
data_type: NUMBER
- name: royalty_address
data_type: VARCHAR
- name: min_step
data_type: NUMBER
- name: max_bid
data_type: NUMBER
- name: last_bid_at
data_type: NUMBER
- name: lt
data_type: NUMBER
- name: type
data_type: VARCHAR
- name: nft_sales_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__nft_transfers
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: trace_id
data_type: VARCHAR
- name: tx_now
data_type: NUMBER
- name: custom_payload
data_type: BINARY
- name: new_owner
data_type: VARCHAR
- name: forward_payload
data_type: BINARY
- name: comment
data_type: VARCHAR
- name: old_owner
data_type: VARCHAR
- name: tx_aborted
data_type: BOOLEAN
- name: query_id
data_type: NUMBER
- name: tx_hash
data_type: VARCHAR
- name: tx_lt
data_type: NUMBER
- name: response_destination
data_type: VARCHAR
- name: nft_collection_address
data_type: VARCHAR
- name: forward_amount
data_type: NUMBER
- name: nft_item_address
data_type: VARCHAR
- name: nft_item_index
data_type: VARCHAR
- name: nft_transfers_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR
- name: silver__transactions
config:
contract:
enforced: true
columns:
- name: block_date
data_type: DATE
- name: compute_success
data_type: BOOLEAN
- name: total_fees
data_type: NUMBER
- name: orig_status
data_type: VARCHAR
- name: compute_skipped
data_type: BOOLEAN
- name: compute_gas_fees
data_type: NUMBER
- name: action_result_code
data_type: NUMBER
- name: destroyed
data_type: BOOLEAN
- name: action_success
data_type: BOOLEAN
- name: compute_msg_state_used
data_type: BOOLEAN
- name: is_tock
data_type: BOOLEAN
- name: account_state_hash_after
data_type: VARCHAR
- name: action_spec_actions
data_type: NUMBER
- name: descr
data_type: VARCHAR
- name: account_state_balance_before
data_type: NUMBER
- name: hash
data_type: VARCHAR
- name: action_result_arg
data_type: NUMBER
- name: aborted
data_type: BOOLEAN
- name: mc_block_seqno
data_type: NUMBER
- name: compute_account_activated
data_type: BOOLEAN
- name: action_skipped_actions
data_type: NUMBER
- name: now
data_type: NUMBER
- name: credit_due_fees_collected
data_type: NUMBER
- name: block_shard
data_type: NUMBER
- name: end_status
data_type: VARCHAR
- name: credit_first
data_type: BOOLEAN
- name: prev_trans_hash
data_type: VARCHAR
- name: block_workchain
data_type: NUMBER
- name: account
data_type: VARCHAR
- name: compute_vm_steps
data_type: NUMBER
- name: storage_fees_collected
data_type: NUMBER
- name: compute_exit_arg
data_type: NUMBER
- name: action_valid
data_type: BOOLEAN
- name: action_status_change
data_type: VARCHAR
- name: installed
data_type: BOOLEAN
- name: prev_trans_lt
data_type: NUMBER
- name: compute_gas_credit
data_type: NUMBER
- name: compute_gas_limit
data_type: NUMBER
- name: skipped_reason
data_type: VARCHAR
- name: action_total_fwd_fees
data_type: NUMBER
- name: account_state_code_hash_before
data_type: VARCHAR
- name: account_state_hash_before
data_type: VARCHAR
- name: compute_exit_code
data_type: NUMBER
- name: trace_id
data_type: VARCHAR
- name: block_seqno
data_type: NUMBER
- name: storage_status_change
data_type: VARCHAR
- name: lt
data_type: NUMBER
- name: compute_mode
data_type: NUMBER
- name: credit
data_type: NUMBER
- name: storage_fees_due
data_type: NUMBER
- name: compute_gas_used
data_type: NUMBER
- name: account_state_code_hash_after
data_type: VARCHAR
- name: action_total_action_fees
data_type: NUMBER
- name: compute_vm_init_state_hash
data_type: VARCHAR
- name: account_state_balance_after
data_type: NUMBER
- name: action_tot_actions
data_type: NUMBER
- name: compute_vm_final_state_hash
data_type: VARCHAR
- name: action_no_funds
data_type: BOOLEAN
- name: transactions_id
data_type: VARCHAR
- name: inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: modified_timestamp
data_type: TIMESTAMP_NTZ
- name: _inserted_timestamp
data_type: TIMESTAMP_NTZ
- name: _invocation_id
data_type: VARCHAR

23
models/sources.yml Normal file
View File

@ -0,0 +1,23 @@
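# Source databases and schemas flip between prod and dev objects based on the
# target database, so the same model code resolves correctly in both environments.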
version: 2
sources:
- name: bronze_streamline
database: streamline
schema: "{{ 'sui' if target.database == 'SUI' else 'sui_dev' }}"
tables:
- name: checkpoints
- name: transactions
- name: crosschain
database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}"
schema: core
tables:
- name: address_tags
- name: dim_dates
- name: crosschain_silver
database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}"
schema: silver
tables:
- name: number_sequence
- name: complete_native_prices
- name: labels_combined

View File

@ -0,0 +1,12 @@
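-- Ephemeral lookback helper (currently disabled via enabled = false): returns the
-- lowest checkpoint number seen in the hour starting 72 hours ago, falling back to 0
-- when that window is empty. The streamline transaction view references it in a
-- commented-out block as a reprocessing floor.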
{{ config(
materialized = 'ephemeral',
enabled = false
) }}
SELECT
COALESCE(MIN(checkpoint_number), 0) AS checkpoint_number
FROM
{{ ref("core__fact_checkpoints") }}
WHERE
block_timestamp >= DATEADD('hour', -72, DATE_TRUNC('HOUR', SYSDATE()))
AND block_timestamp < DATEADD('hour', -71, DATE_TRUNC('HOUR', SYSDATE()))

View File

@ -0,0 +1,48 @@
-- depends_on: {{ ref('bronze__checkpoints') }}
-- depends_on: {{ ref('bronze__checkpoints_FR') }}
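-- Parses raw checkpoint JSON into typed columns. Incremental runs read
-- bronze__checkpoints; full refreshes fall back to bronze__checkpoints_FR. The
-- QUALIFY keeps only the most recently inserted record per checkpoint_number.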
{{ config (
materialized = "incremental",
unique_key = ['checkpoint_number'],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "ROUND(checkpoint_number, -5)",
tags = ['streamline_realtime']
) }}
SELECT
DATA :"result": "sequenceNumber" :: bigint AS checkpoint_number,
TO_TIMESTAMP(
DATA :"result" :"timestampMs"
) AS block_timestamp,
DATA :"result": "transactions" AS transactions_array,
ARRAY_SIZE(
DATA :"result": "transactions"
) AS tx_count,
partition_key,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__checkpoints') }}
{% else %}
{{ ref('bronze__checkpoints_FR') }}
{% endif %}
WHERE
DATA :error IS NULL
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY checkpoint_number
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,51 @@
-- depends_on: {{ ref('bronze__transactions') }}
-- depends_on: {{ ref('bronze__transactions_FR') }}
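-- Typed index of transaction digests: maps each tx_digest to its checkpoint number
-- and block timestamp, with the same incremental/full-reload source switch and
-- latest-record QUALIFY as the checkpoints model, plus search optimization on
-- tx_digest via the post_hook.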
{{ config (
materialized = "incremental",
unique_key = ['tx_digest'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "block_timestamp::DATE",
tags = ['streamline_realtime'],
post_hook = enable_search_optimization(
'{{this.schema}}',
'{{this.identifier}}',
'ON EQUALITY(tx_digest)'
),
) }}
SELECT
DATA :"checkpoint" :: bigint AS checkpoint_number,
DATA :"digest" :: STRING AS tx_digest,
TO_TIMESTAMP(
DATA :"timestampMs"
) AS block_timestamp,
partition_key,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__transactions') }}
{% else %}
{{ ref('bronze__transactions_FR') }}
{% endif %}
WHERE
DATA :error IS NULL
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY tx_digest
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,56 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"checkpoints",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"order_by_column": "checkpoint_number DESC" }
),
tags = ['streamline_realtime']
) }}
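-- Work queue: checkpoint numbers already indexed but missing from the complete
-- table (set difference via EXCEPT). Each row becomes a sui_getCheckpoint JSON-RPC
-- request, and the post_hook hands the batch to streamline.udf_bulk_rest_api_v2.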
WITH checks AS (
SELECT
checkpoint_number
FROM
{{ ref("streamline__checkpoints") }}
EXCEPT
SELECT
checkpoint_number
FROM
{{ ref("streamline__checkpoints_complete") }}
)
SELECT
checkpoint_number,
ROUND(
checkpoint_number,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'id',
checkpoint_number,
'method',
'sui_getCheckpoint',
'params',
ARRAY_CONSTRUCT(
checkpoint_number :: STRING
)
),
'Vault/prod/sui/quicknode/mainnet'
) AS request
FROM
checks

View File

@ -0,0 +1,112 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"transactions",
"sql_limit" :"100000",
"producer_batch_size" :"100000",
"worker_batch_size" :"50000",
"sql_source" :"{{this.identifier}}",
"exploded_key" :'["result"]',
"order_by_column": "checkpoint_number DESC" }
),
tags = ['streamline_realtime']
) }}
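-- Fetches transaction detail for digests not yet in the complete table. Digests are
-- grouped within each checkpoint by FLOOR(tx_index / 50), so each
-- sui_multiGetTransactionBlocks call carries an array of up to 50 digests.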
WITH {# last_3_days AS (
SELECT
sequence_number
FROM
{{ ref("_sequence_lookback") }}
),
#}
txs AS (
SELECT
A.tx_digest,
A.tx_index,
A.checkpoint_number,
A.block_timestamp
FROM
{{ ref("streamline__transactions") }} A
LEFT JOIN {{ ref("streamline__transactions_complete") }}
b
ON A.tx_digest = b.tx_digest
AND A.block_timestamp :: DATE = b.block_timestamp :: DATE
WHERE
b.tx_digest IS NULL {# AND sequence_number >= (
SELECT
sequence_number
FROM
last_3_days
) #}
),
tx_grouped AS (
SELECT
checkpoint_number,
block_timestamp,
FLOOR(
tx_index / 50
) grp,
ARRAY_AGG(
tx_digest
) AS tx_param,
COUNT(1) AS tx_count_in_request
FROM
txs
GROUP BY
checkpoint_number,
block_timestamp,
grp
)
SELECT
checkpoint_number,
tx_count_in_request,
to_char(
block_timestamp,
'YYYY_MM_DD_HH_MI_SS_FF3'
) AS block_timestamp,
ROUND(
checkpoint_number,
-4
) :: INT AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'id',
checkpoint_number,
'method',
'sui_multiGetTransactionBlocks',
'params',
ARRAY_CONSTRUCT(
tx_param,
OBJECT_CONSTRUCT(
'showInput',
TRUE,
'showRawInput',
FALSE,
'showEffects',
TRUE,
'showEvents',
TRUE,
'showRawEffects',
FALSE,
'showObjectChanges',
TRUE,
'showBalanceChanges',
TRUE
)
)
),
'Vault/prod/sui/quicknode/mainnet'
) AS request
FROM
tx_grouped

View File

@ -0,0 +1,27 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
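-- Chainhead probe: a single livequery call to sui_getLatestCheckpointSequenceNumber,
-- exposing the current tip as checkpoint_number.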
SELECT
{{ target.database }}.live.udf_api(
'POST',
'{Service}/{Authentication}',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'fsc-quantum-state',
'livequery'
),
OBJECT_CONSTRUCT(
'jsonrpc',
'2.0',
'id',
1,
'method',
'sui_getLatestCheckpointSequenceNumber',
'params',
ARRAY_CONSTRUCT()
),
'Vault/prod/sui/quicknode/mainnet'
) :data: "result" :: INT AS checkpoint_number

View File

@ -0,0 +1,20 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
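-- Expected-checkpoint spine: every integer from the hardcoded starting checkpoint
-- (96605300) up to the current chainhead, generated from the crosschain
-- number_sequence table.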
SELECT
_id AS checkpoint_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id >= 96605300
AND _id <= (
SELECT
MAX(checkpoint_number)
FROM
{{ ref('streamline__chainhead') }}
)

View File

@ -0,0 +1,31 @@
{{ config (
materialized = "incremental",
unique_key = ['tx_digest'],
incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"],
merge_exclude_columns = ["inserted_timestamp"],
cluster_by = "block_timestamp::DATE",
tags = ['streamline_realtime']
) }}
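-- Explodes each completed checkpoint's transactions_array via LATERAL FLATTEN,
-- yielding one row per (checkpoint_number, tx_index, tx_digest). Incremental runs
-- only rescan checkpoints modified since the last run of this model.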
SELECT
checkpoint_number,
block_timestamp,
b.index AS tx_index,
b.value :: STRING AS tx_digest,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{{ ref("streamline__checkpoints_complete") }},
LATERAL FLATTEN(
transactions_array
) b
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
COALESCE(MAX(modified_timestamp), '1970-01-01' :: DATE) modified_timestamp
FROM
{{ this }})
{% endif %}

7
packages.yml Normal file
View File

@ -0,0 +1,7 @@
packages:
- git: https://github.com/FlipsideCrypto/fsc-utils.git
revision: v1.37.0
- package: get-select/dbt_snowflake_query_tags
version: [">=2.0.0", "<3.0.0"]
- package: dbt-labs/dbt_utils
version: 1.0.0

29
profiles.yml Normal file
View File

@ -0,0 +1,29 @@
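# Both targets read every connection detail from environment variables; dev and
# prod differ only in which values the CI environment injects.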
sui:
target: dev
outputs:
dev:
type: snowflake
account: "{{ env_var('ACCOUNT') }}"
user: "{{ env_var('USER') }}"
password: "{{ env_var('PASSWORD') }}"
role: "{{ env_var('ROLE') }}"
schema: "{{ env_var('SCHEMA') }}"
region: "{{ env_var('REGION') }}"
database: "{{ env_var('DATABASE') }}"
warehouse: "{{ env_var('WAREHOUSE') }}"
threads: 8
client_session_keep_alive: False
prod:
type: snowflake
account: "{{ env_var('ACCOUNT') }}"
user: "{{ env_var('USER') }}"
password: "{{ env_var('PASSWORD') }}"
role: "{{ env_var('ROLE') }}"
schema: "{{ env_var('SCHEMA') }}"
region: "{{ env_var('REGION') }}"
database: "{{ env_var('DATABASE') }}"
warehouse: "{{ env_var('WAREHOUSE') }}"
threads: 8
client_session_keep_alive: False
config:
send_anonymous_usage_stats: False

74
python/slack_alert.py Normal file
View File

@ -0,0 +1,74 @@
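# Posts a minimal failure notification to a Slack incoming webhook, linking back
# to the GitHub Actions run that triggered it.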
import requests
import os
import sys
def create_message():
"""Creates a simple failure notification message with repo, workflow name, and URL"""
# Get GitHub environment variables
repository = os.environ.get('GITHUB_REPOSITORY', 'Unknown repository')
repo_name = repository.split('/')[-1] if '/' in repository else repository
workflow_name = os.environ.get('GITHUB_WORKFLOW', 'Unknown workflow')
run_id = os.environ.get('GITHUB_RUN_ID', '')
server_url = os.environ.get('GITHUB_SERVER_URL', 'https://github.com')
# Build the workflow URL
workflow_url = f"{server_url}/{repository}/actions/runs/{run_id}"
message_body = {
"text": f"Failure in {repo_name}",
"attachments": [
{
"color": "#f44336", # Red color for failures
"fields": [
{
"title": "Repository",
"value": repository,
"short": True
},
{
"title": "Workflow",
"value": workflow_name,
"short": True
}
],
"actions": [
{
"type": "butsui",
"text": "View Workflow Run",
"style": "primary",
"url": workflow_url
}
],
"footer": "GitHub Actions"
}
]
}
return message_body
def send_alert(webhook_url):
"""Sends a failure notification to Slack"""
message = create_message()
try:
response = requests.post(webhook_url, json=message, timeout=10)
if response.status_code == 200:
print("Successfully sent Slack notification")
else:
print(f"Failed to send Slack notification: {response.status_code} {response.text}")
sys.exit(1)
except Exception as e:
print(f"Error sending Slack notification: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
webhook_url = os.environ.get("SLACK_WEBHOOK_URL")
if not webhook_url:
print("ERROR: SLACK_WEBHOOK_URL environment variable is required")
sys.exit(1)
send_alert(webhook_url)

2
requirements.txt Normal file
View File

@ -0,0 +1,2 @@
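# Pin dbt to the 1.9 minor series so core and the Snowflake adapter stay in lockstep.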
dbt-core>=1.9,<1.10
dbt-snowflake>=1.9,<1.10