diff --git a/.github/workflows/dbt_docs_update.yml b/.github/workflows/dbt_docs_update.yml new file mode 100644 index 0000000..0c45f01 --- /dev/null +++ b/.github/workflows/dbt_docs_update.yml @@ -0,0 +1,78 @@ +name: docs_update + +on: + push: + branches: + - "main" + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + + - name: checkout docs branch + run: | + git checkout -B docs origin/main + - name: generate dbt docs + run: | + dbt ls -t prod + dbt docs generate -t prod + + - name: move files to docs directory + run: | + mkdir -p ./docs + cp target/{catalog.json,manifest.json,index.html} docs/ + - name: clean up target directory + run: dbt clean + + - name: check for changes + run: git status + + - name: stage changed files + run: git add . + + - name: commit changed files + run: | + git config user.email "abc@xyz" + git config user.name "github-actions" + git commit -am "Auto-update docs" + - name: push changes to docs + run: | + git push -f --set-upstream origin docs + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} \ No newline at end of file diff --git a/.github/workflows/dbt_run_adhoc.yml b/.github/workflows/dbt_run_adhoc.yml new file mode 100644 index 0000000..986a193 --- /dev/null +++ b/.github/workflows/dbt_run_adhoc.yml @@ -0,0 +1,74 @@ +name: dbt_run_adhoc +run-name: dbt_run_adhoc + +on: + workflow_dispatch: + branches: + - "main" + inputs: + environment: + type: choice + description: DBT Run Environment + required: true + options: + - dev + - prod + default: dev + warehouse: + type: choice + description: Snowflake warehouse + required: true + options: + - DBT + - DBT_CLOUD + - DBT_EMERGENCY + default: DBT + dbt_command: + type: string + description: 'DBT Run Command' + required: true + +env: + USE_VARS: "${{ vars.USE_VARS }}" + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + DBT_VERSION: "${{ vars.DBT_VERSION }}" + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ inputs.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_${{ inputs.environment }} + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + ${{ inputs.dbt_command }} + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ 
secrets.SLACK_WEBHOOK_URL }} \ No newline at end of file diff --git a/.github/workflows/dbt_run_dev_refresh.yml b/.github/workflows/dbt_run_dev_refresh.yml new file mode 100644 index 0000000..32db163 --- /dev/null +++ b/.github/workflows/dbt_run_dev_refresh.yml @@ -0,0 +1,44 @@ +name: dbt_run_dev_refresh +run-name: dbt_run_dev_refresh + +on: + workflow_dispatch: +# schedule: +# - cron: '27 8 * * *' + +env: + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run-operation run_sp_create_prod_clone diff --git a/.github/workflows/dbt_run_incremental_core.yml b/.github/workflows/dbt_run_incremental_core.yml new file mode 100644 index 0000000..83fa569 --- /dev/null +++ b/.github/workflows/dbt_run_incremental_core.yml @@ -0,0 +1,53 @@ +name: dbt_run_incremental_core +run-name: dbt_run_incremental_core + +on: + workflow_dispatch: + branches: + - "main" + # schedule: + # - cron: "35 3 * * *" + +env: + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -s "sui_models,tag:core" + + notify-failure: + needs: [run_dbt_jobs] + if: failure() + uses: ./.github/workflows/slack_notify.yml + secrets: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} diff --git a/.github/workflows/dbt_run_streamline_realtime.yml b/.github/workflows/dbt_run_streamline_realtime.yml new file mode 100644 index 0000000..19e22e9 --- /dev/null +++ b/.github/workflows/dbt_run_streamline_realtime.yml @@ -0,0 +1,46 @@ +name: dbt_run_streamline_realtime +run-name: dbt_run_streamline_realtime + +on: + workflow_dispatch: + branches: + - "main" + # schedule: + # - cron: '*/15 * * * *' + +env: + DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}" + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "${{ vars.PYTHON_VERSION }}" + cache: "pip" + + - name: install dependencies + run: | + pip 
install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m "sui_models,tag:streamline_realtime" --vars '{STREAMLINE_INVOKE_STREAMS: True}' diff --git a/.github/workflows/slack_notify.yml b/.github/workflows/slack_notify.yml new file mode 100644 index 0000000..e4afc12 --- /dev/null +++ b/.github/workflows/slack_notify.yml @@ -0,0 +1,27 @@ +name: Slack Notification +on: + workflow_call: + secrets: + SLACK_WEBHOOK_URL: + required: true + +jobs: + notify: + runs-on: ubuntu-latest + environment: workflow_prod + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: "3.10" + + - name: Install dependencies + run: pip install requests + + - name: Send Slack notification + run: python python/slack_alert.py + env: + SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK_URL }} \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..261bec5 --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +target/ +dbt_modules/ +# newer versions of dbt use this directory instead of dbt_modules for test dependencies +dbt_packages/ +logs/ + +.venv/ +.python-version +dbt-env/ +venv/ +package-lock.yml + +# Visual Studio Code files +*/.vscode +*.code-workspace +.history/ +**/.DS_Store +.vscode/ +.env +.DS_Store +.user.yml \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..32ca413 --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +## Profile Set Up + +#### Use the following within profiles.yml +---- + +```yml +sui: + target: dev + outputs: + dev: + type: snowflake + account: + role: + user: + password: + region: + database: sui_DEV + warehouse: + schema: silver + threads: 4 + client_session_keep_alive: False + query_tag: +``` + +### Resources: +- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) +- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers +- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support +- Find [dbt events](https://events.getdbt.com) near you +- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices + +## Applying Model Tags + +### Database / Schema level tags + +Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add/modify tags call the appropriate tag set function within the macro. + +``` +{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }} +{{ set_schema_tag_value('SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }} +``` + +### Model tags + +To add/update a model's snowflake tags, add/modify the `meta` model property under `config`. Only table level tags are supported at this time via DBT. + +``` +{{ config( + ..., + meta={ + 'database_tags':{ + 'table': { + 'PURPOSE': 'SOME_PURPOSE' + } + } + }, + ... +) }} +``` + +By default, model tags are not pushed to snowflake on each load. You can push a tag update for a model by specifying the `UPDATE_SNOWFLAKE_TAGS` project variable during a run.
+ +``` +dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":True}' -s models/core/core__fact_swaps.sql +``` + +### Querying for existing tags on a model in snowflake + +``` +select * +from table(sui.information_schema.tag_references('sui.core.fact_blocks', 'table')); +``` \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml new file mode 100644 index 0000000..aa6bec5 --- /dev/null +++ b/dbt_project.yml @@ -0,0 +1,102 @@ +# Name your project! Project names should contain only lowercase characters +# and underscores. A good package name should reflect your organization's +# name or the intended use of these models +name: "sui_models" +version: "1.0.0" +config-version: 2 + +require-dbt-version: ">=1.8.0" + +# This setting configures which "profile" dbt uses for this project. +profile: "sui" + +# These configurations specify where dbt should look for different types of files. +# The `model-paths` config, for example, states that models in this project can be +# found in the "models/" directory. You probably won't need to change these! +model-paths: ["models"] +analysis-paths: ["analysis"] +test-paths: ["tests"] +seed-paths: ["data"] +macro-paths: ["macros"] +snapshot-paths: ["snapshots"] + +target-path: "target" # directory which will store compiled SQL files +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_modules" + - "dbt_packages" + +# Configuring models +# Full documentation: https://docs.getdbt.com/docs/configuring-models + +# In this example config, we tell dbt to build all models in the example/ directory +# as tables. These settings can be overridden in the individual model files +# using the `{{ config(...) }}` macro. +models: + +copy_grants: true + +persist_docs: + relation: true + columns: true + +on_schema_change: "append_new_columns" + livequery_models: + deploy: + core: + materialized: ephemeral + + +data_tests: + sui_models: # replace with the name of the chain + +store_failures: true # all tests + +where: "modified_timestamp::DATE > dateadd(hour, -{{ var('TEST_HOURS_THRESHOLD', 36) }}, sysdate())" + +on-run-start: + - '{{create_sps()}}' + - '{{create_udfs()}}' + +on-run-end: + - '{{ apply_meta_as_tags(results) }}' + +dispatch: + - macro_namespace: dbt + search_order: + - sui-models + - dbt_snowflake_query_tags + - dbt + +query-comment: + comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}' + append: true # Snowflake removes prefixed comments. 
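+# Editorial note (assumption about intent): the API_INTEGRATION and EXTERNAL_FUNCTION_URI
+# vars below are resolved per dbt target via var("config"): running with `-t prod` picks the
+# values under config.prod, and any target without its own block falls back to config.dev.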
+ +vars: + "dbt_date:time_zone": GMT + OBSERV_FULL_TEST: False + START_GHA_TASKS: False + STREAMLINE_INVOKE_STREAMS: True + STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True + STREAMLINE_RUN_HISTORY: False + STREAMLINE_RETRY_UNKNOWN: False + UPDATE_SNOWFLAKE_TAGS: True + UPDATE_UDFS_AND_SPS: False + + + API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}' + EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}' + ROLES: | + ["INTERNAL_DEV"] + + + config: + dev: + API_INTEGRATION: AWS_SUI_API_STG_V2 + EXTERNAL_FUNCTION_URI: azbc07ki8d.execute-api.us-east-1.amazonaws.com/stg/ + ROLES: + - AWS_LAMBDA_SUI_API + - INTERNAL_DEV + + prod: + API_INTEGRATION: AWS_SUI_API_PROD_V2 + EXTERNAL_FUNCTION_URI: nqj8j7ln67.execute-api.us-east-1.amazonaws.com/prod/ + ROLES: + - AWS_LAMBDA_SUI_API + - INTERNAL_DEV + - DBT_CLOUD_SUI \ No newline at end of file diff --git a/docs/.gitkeep b/docs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/macros/.gitkeep b/macros/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/macros/create_sps.sql b/macros/create_sps.sql new file mode 100644 index 0000000..681fb25 --- /dev/null +++ b/macros/create_sps.sql @@ -0,0 +1,19 @@ +{% macro create_sps() %} + {% if target.database == 'SUI' %} + CREATE schema IF NOT EXISTS _internal; +{{ sp_create_prod_clone('_internal') }}; + {% endif %} +{% endmacro %} + +{% macro enable_search_optimization( + schema_name, + table_name, + condition = '' + ) %} + {% if target.database == 'SUI' %} + ALTER TABLE + {{ schema_name }}.{{ table_name }} + ADD + search optimization {{ condition }} + {% endif %} +{% endmacro %} diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql new file mode 100644 index 0000000..29fb073 --- /dev/null +++ b/macros/create_udfs.sql @@ -0,0 +1,8 @@ +{% macro create_udfs() %} + {% if var("UPDATE_UDFS_AND_SPS") %} + {% set sql %} + {{ create_udf_bulk_rest_api_v2() }}; +{% endset %} + {% do run_query(sql) %} + {% endif %} +{% endmacro %} diff --git a/macros/custom_naming_macros.sql b/macros/custom_naming_macros.sql new file mode 100644 index 0000000..53fe8c7 --- /dev/null +++ b/macros/custom_naming_macros.sql @@ -0,0 +1,33 @@ +{% macro generate_schema_name( + custom_schema_name = none, + node = none + ) -%} + {% set node_name = node.name %} + {% set split_name = node_name.split('__') %} + {{ split_name [0] | trim }} +{%- endmacro %} + +{% macro generate_alias_name( + custom_alias_name = none, + node = none + ) -%} + {% set node_name = node.name %} + {% set split_name = node_name.split('__') %} + {% if split_name | length < 2 %} + {{ split_name [0] | trim }} + {% else %} + {{ split_name [1] | trim }} + {% endif %} +{%- endmacro %} + +{% macro generate_tmp_view_name(model_name) -%} + {% set node_name = model_name.name %} + {% set split_name = node_name.split('__') %} + {{ target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name [1] ~ '__dbt_tmp' | trim }} +{%- endmacro %} + +{% macro generate_view_name(model_name) -%} + {% set node_name = model_name.name %} + {% set split_name = node_name.split('__') %} + {{ target.database ~ '.' ~ split_name[0] ~ '.' 
~ split_name [1] | trim }} +{%- endmacro %} \ No newline at end of file diff --git a/macros/custom_query_tag.sql b/macros/custom_query_tag.sql new file mode 100644 index 0000000..809e1bb --- /dev/null +++ b/macros/custom_query_tag.sql @@ -0,0 +1,11 @@ +{% macro set_query_tag() -%} + {% set new_json = {"repo":project_name, "object":this.table, "profile":target.profile_name, "env":target.name, "existing_tag":get_current_query_tag() } %} +{% set new_query_tag = tojson(new_json) | as_text %} + {% if new_query_tag %} + {% set original_query_tag = get_current_query_tag() %} + {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }} + {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %} + {{ return(original_query_tag)}} + {% endif %} + {{ return(none)}} +{% endmacro %} \ No newline at end of file diff --git a/macros/dbt/get_merge.sql b/macros/dbt/get_merge.sql new file mode 100644 index 0000000..7563d88 --- /dev/null +++ b/macros/dbt/get_merge.sql @@ -0,0 +1,30 @@ +-- incremental_strategy="merge" +{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%} + {% set merge_sql = fsc_utils.get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} + {{ return(merge_sql) }} +{% endmacro %} + +-- incremental_strategy="delete+insert" +{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%} + {% set predicate_override = "" %} + -- get the min value of column + {% if incremental_predicates[0] == "min_value_predicate" %} + {% set min_column_name = incremental_predicates[1] %} + {% set query %} + select min({{ min_column_name }}) from {{ source }} + {% endset %} + {% set min_block = run_query(query).columns[0][0] %} + + {% if min_block is not none %} + {% set predicate_override %} + round({{ target }}.{{ min_column_name }},-5) >= round({{ min_block }},-5) + {% endset %} + {% else %} + {% set predicate_override = "1=1" %} + {% endif %} + {% endif %} + {% set predicates = [predicate_override] + incremental_predicates[2:] if predicate_override else incremental_predicates %} + -- standard delete+insert from here + {% set merge_sql = dbt.get_delete_insert_merge_sql(target, source, unique_key, dest_columns, predicates) %} + {{ return(merge_sql) }} +{% endmacro %} \ No newline at end of file diff --git a/macros/dbt/get_tmp_relation_type.sql b/macros/dbt/get_tmp_relation_type.sql new file mode 100644 index 0000000..e7c2d77 --- /dev/null +++ b/macros/dbt/get_tmp_relation_type.sql @@ -0,0 +1,4 @@ +{% macro dbt_snowflake_get_tmp_relation_type(strategy, unique_key, language) %} + -- always table + {{ return('table') }} +{% endmacro %} \ No newline at end of file diff --git a/macros/run_sp_create_prod_clone.sql b/macros/run_sp_create_prod_clone.sql new file mode 100644 index 0000000..d09beab --- /dev/null +++ b/macros/run_sp_create_prod_clone.sql @@ -0,0 +1,10 @@ +{% macro run_sp_create_prod_clone() %} + {% set clone_query %} + call sui._internal.create_prod_clone( + 'sui', + 'sui_dev', + 'internal_dev' + ); +{% endset %} + {% do run_query(clone_query) %} +{% endmacro %} diff --git a/macros/sp_create_prod_clone.sql b/macros/sp_create_prod_clone.sql new file mode 100644 index 0000000..4263b05 --- /dev/null +++ b/macros/sp_create_prod_clone.sql @@ -0,0 +1,51 @@ +{% macro sp_create_prod_clone(target_schema) -%} + +create or replace procedure {{ target_schema }}.create_prod_clone(source_db_name string, destination_db_name 
string, role_name string) +returns boolean +language javascript +execute as caller +as +$$ + snowflake.execute({sqlText: `BEGIN TRANSACTION;`}); + try { + snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`}); + snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */ + snowflake.execute({sqlText: `GRANT USAGE ON DATABASE ${DESTINATION_DB_NAME} TO ROLE AWS_LAMBDA_SUI_API`}); + + snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`}); + snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`}); + snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`}); + snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`}); + snowflake.execute({sqlText: `REVOKE OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} FROM ROLE DBT_CLOUD_SUI;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT USAGE ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE AWS_LAMBDA_SUI_API;`}); + + snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}) + + var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`}); + while (existing_tags.next()) { + var schema = existing_tags.getColumnValue(4); + var tag_name = existing_tags.getColumnValue(2) + snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + } + + snowflake.execute({sqlText: `COMMIT;`}); + } catch (err) { + snowflake.execute({sqlText: `ROLLBACK;`}); + throw(err); + } + + return true +$$ + +{%- endmacro %} \ No newline at end of file diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 
100644 index 0000000..af757b9 --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,69 @@ +{% macro streamline_external_table_query( + model, + partition_function +) %} +WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A +) +SELECT + s.*, + b.file_name, + b._inserted_timestamp +FROM + {{ source( + "bronze_streamline", + model + ) }} + s +JOIN + meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + +{% endmacro %} + +{% macro streamline_external_table_query_fr( + model, + partition_function +) %} +WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A +) +SELECT + s.*, + b.file_name, + b._inserted_timestamp +FROM + {{ source( + "bronze_streamline", + model + ) }} + s +JOIN + meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..86af443 --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,10 @@ +{% macro create_udf_bulk_rest_api_v2() %} + CREATE + OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2( + json OBJECT + ) returns ARRAY {% if target.database == 'SUI' -%} + api_integration = aws_sui_api_prod_v2 AS 'https://nqj8j7ln67.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api' + {% else %} + api_integration = aws_sui_api_stg_v2 AS 'https://azbc07ki8d.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api' + {%- endif %} +{% endmacro %} diff --git a/macros/tags/add_database_or_schema_tags.sql b/macros/tags/add_database_or_schema_tags.sql new file mode 100644 index 0000000..cbf581a --- /dev/null +++ b/macros/tags/add_database_or_schema_tags.sql @@ -0,0 +1,6 @@ +{% macro add_database_or_schema_tags() %} + {{ set_database_tag_value( + 'BLOCKCHAIN_NAME', + 'SUI' + ) }} +{% endmacro %} diff --git a/macros/tags/snowflake_tagging.sql b/macros/tags/snowflake_tagging.sql new file mode 100644 index 0000000..bc25e69 --- /dev/null +++ b/macros/tags/snowflake_tagging.sql @@ -0,0 +1,127 @@ +{% macro apply_meta_as_tags(results) %} + {% if var("UPDATE_SNOWFLAKE_TAGS") %} + {{ log('apply_meta_as_tags', info=False) }} + {{ log(results, info=False) }} + {% if execute %} + + {%- set tags_by_schema = {} -%} + {% for res in results -%} + {% if res.node.meta.database_tags %} + + {%- set model_database = res.node.database -%} + {%- set model_schema = res.node.schema -%} + {%- set model_schema_full = model_database+'.'+model_schema -%} + {%- set model_alias = res.node.alias -%} + + {% if model_schema_full not in tags_by_schema.keys() %} + {{ log('need to fetch tags for schema '+model_schema_full, info=False) }} + {%- call statement('main', fetch_result=True) -%} + show tags in {{model_database}}.{{model_schema}} + {%- endcall -%} + {%- set _ = tags_by_schema.update({model_schema_full: load_result('main')['table'].columns.get('name').values()|list}) -%} + {{ log('Added tags to cache', info=False) }} 
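+                    {# Editorial note: the `show tags` result for this schema is cached in
+                       tags_by_schema above, so later models materialized in the same schema
+                       reuse the cached tag-name list instead of re-running `show tags`. #}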
+ {% else %} + {{ log('already have tag info for schema', info=False) }} + {% endif %} + + {%- set current_tags_in_schema = tags_by_schema[model_schema_full] -%} + {{ log('current_tags_in_schema:', info=False) }} + {{ log(current_tags_in_schema, info=False) }} + {{ log("========== Processing tags for "+model_schema_full+"."+model_alias+" ==========", info=False) }} + + {% set line -%} + node: {{ res.node.unique_id }}; status: {{ res.status }} (message: {{ res.message }}) + node full: {{ res.node}} + meta: {{ res.node.meta}} + materialized: {{ res.node.config.materialized }} + {%- endset %} + {{ log(line, info=False) }} + + {%- call statement('main', fetch_result=True) -%} + select LEVEL,UPPER(TAG_NAME) as TAG_NAME,TAG_VALUE from table(information_schema.tag_references_all_columns('{{model_schema}}.{{model_alias}}', 'table')) + {%- endcall -%} + {%- set existing_tags_for_table = load_result('main')['data'] -%} + {{ log('Existing tags for table:', info=False) }} + {{ log(existing_tags_for_table, info=False) }} + + {{ log('--', info=False) }} + {% for table_tag in res.node.meta.database_tags.table %} + + {{ create_tag_if_missing(current_tags_in_schema,table_tag|upper) }} + {% set desired_tag_value = res.node.meta.database_tags.table[table_tag] %} + + {{set_table_tag_value_if_different(model_schema,model_alias,table_tag,desired_tag_value,existing_tags_for_table)}} + {% endfor %} + {{ log("========== Finished processing tags for "+model_alias+" ==========", info=False) }} + {% endif %} + {% endfor %} + {% endif %} + {% endif %} +{% endmacro %} + + +{% macro create_tag_if_missing(all_tag_names,table_tag) %} + {% if table_tag not in all_tag_names %} + {{ log('Creating missing tag '+table_tag, info=False) }} + {%- call statement('main', fetch_result=True) -%} + create tag if not exists silver.{{table_tag}} + {%- endcall -%} + {{ log(load_result('main').data, info=False) }} + {% else %} + {{ log('Tag already exists: '+table_tag, info=False) }} + {% endif %} +{% endmacro %} + +{% macro set_table_tag_value_if_different(model_schema,table_name,tag_name,desired_tag_value,existing_tags) %} + {{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at table level', info=False) }} + {%- set existing_tag_for_table = existing_tags|selectattr('0','equalto','TABLE')|selectattr('1','equalto',tag_name|upper)|list -%} + {{ log('Filtered tags for table:', info=False) }} + {{ log(existing_tag_for_table[0], info=False) }} + {% if existing_tag_for_table|length > 0 and existing_tag_for_table[0][2]==desired_tag_value %} + {{ log('Correct tag value already exists', info=False) }} + {% else %} + {{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }} + {%- call statement('main', fetch_result=True) -%} + alter table {{model_schema}}.{{table_name}} set tag {{tag_name}} = '{{desired_tag_value}}' + {%- endcall -%} + {{ log(load_result('main').data, info=False) }} + {% endif %} +{% endmacro %} + +{% macro set_column_tag_value_if_different(table_name,column_name,tag_name,desired_tag_value,existing_tags) %} + {{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at column level', info=False) }} + {%- set existing_tag_for_column = existing_tags|selectattr('0','equalto','COLUMN')|selectattr('1','equalto',tag_name|upper)|list -%} + {{ log('Filtered tags for column:', info=False) }} + {{ log(existing_tag_for_column[0], info=False) }} + {% if existing_tag_for_column|length > 0 and existing_tag_for_column[0][2]==desired_tag_value %} + {{ log('Correct tag value already exists', 
info=False) }} + {% else %} + {{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }} + {%- call statement('main', fetch_result=True) -%} + alter table {{table_name}} modify column {{column_name}} set tag {{tag_name}} = '{{desired_tag_value}}' + {%- endcall -%} + {{ log(load_result('main').data, info=False) }} + {% endif %} +{% endmacro %} + +{% macro set_database_tag_value(tag_name,tag_value) %} + {% set query %} + create tag if not exists silver.{{tag_name}} + {% endset %} + {% do run_query(query) %} + {% set query %} + alter database {{target.database}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}' + {% endset %} + {% do run_query(query) %} +{% endmacro %} + +{% macro set_schema_tag_value(target_schema,tag_name,tag_value) %} + {% set query %} + create tag if not exists silver.{{tag_name}} + {% endset %} + {% do run_query(query) %} + {% set query %} + alter schema {{target.database}}.{{target_schema}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}' + {% endset %} + {% do run_query(query) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/tests/compare_model_subset.sql b/macros/tests/compare_model_subset.sql new file mode 100644 index 0000000..18aa624 --- /dev/null +++ b/macros/tests/compare_model_subset.sql @@ -0,0 +1,29 @@ +{% test compare_model_subset(model, compare_model, compare_columns, model_condition) %} + +{% set compare_cols_csv = compare_columns | join(', ') %} + +with a as ( + select {{compare_cols_csv}} from {{ model }} + {{ model_condition }} +), +b as ( + select {{compare_cols_csv}} from {{ compare_model }} +), +a_minus_b as ( + select * from a + except + select * from b +), +b_minus_a as ( + select * from b + except + select * from a +), +unioned as ( + select 'in_actual_not_in_expected' as which_diff, a_minus_b.* from a_minus_b + union all + select 'in_expected_not_in_actual' as which_diff, b_minus_a.* from b_minus_a +) +select * from unioned + +{% endtest %} \ No newline at end of file diff --git a/macros/tests/sequence_gaps.sql b/macros/tests/sequence_gaps.sql new file mode 100644 index 0000000..84a9aa9 --- /dev/null +++ b/macros/tests/sequence_gaps.sql @@ -0,0 +1,37 @@ +{% macro sequence_gaps( + table, + partition_by, + column + ) %} + {%- set partition_sql = partition_by | join(", ") -%} + {%- set previous_column = "prev_" ~ column -%} + WITH source AS ( + SELECT + {{ partition_sql + "," if partition_sql }} + {{ column }}, + LAG( + {{ column }}, + 1 + ) over ( + {{ "PARTITION BY " ~ partition_sql if partition_sql }} + ORDER BY + {{ column }} ASC + ) AS {{ previous_column }} + FROM + {{ table }} + WHERE + block_timestamp::date <= current_date - 1 + ) +SELECT + {{ partition_sql + "," if partition_sql }} + {{ previous_column }}, + {{ column }}, + {{ column }} - {{ previous_column }} + - 1 AS gap +FROM + source +WHERE + {{ column }} - {{ previous_column }} <> 1 +ORDER BY + gap DESC +{% endmacro %} diff --git a/models/bronze/bronze__checkpoints.sql b/models/bronze/bronze__checkpoints.sql new file mode 100644 index 0000000..ef8f779 --- /dev/null +++ b/models/bronze/bronze__checkpoints.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query( + model = 'checkpoints', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" +) }} diff --git a/models/bronze/bronze__checkpoints_FR.sql b/models/bronze/bronze__checkpoints_FR.sql new file mode 100644 index 0000000..8cef026 --- /dev/null +++ 
b/models/bronze/bronze__checkpoints_FR.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_fr( + model = 'checkpoints', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" +) }} diff --git a/models/bronze/bronze__transactions.sql b/models/bronze/bronze__transactions.sql new file mode 100644 index 0000000..c9aa1a8 --- /dev/null +++ b/models/bronze/bronze__transactions.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query( + model = 'transactions', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" +) }} diff --git a/models/bronze/bronze__transactions_FR.sql b/models/bronze/bronze__transactions_FR.sql new file mode 100644 index 0000000..68fc5dd --- /dev/null +++ b/models/bronze/bronze__transactions_FR.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_fr( + model = 'transactions', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)" +) }} diff --git a/models/descriptions/__overview__.md b/models/descriptions/__overview__.md new file mode 100644 index 0000000..c1d257c --- /dev/null +++ b/models/descriptions/__overview__.md @@ -0,0 +1,59 @@ +{% docs __overview__ %} + +# Welcome to the Flipside Crypto SUI Models Documentation + +## **What does this documentation cover?** +The documentation included here details the design of the SUI blockchain tables and views available via [Flipside Crypto.](https://flipsidecrypto.xyz/) For more information on how these models are built, please see [the github repository.](https://github.com/flipsideCrypto/sui-models/) + +## **How do I use these docs?** +The easiest way to navigate this documentation is to use the Quick Links below. These links will take you to the documentation for each table, which contains a description, a list of the columns, and other helpful information. + +If you are experienced with dbt docs, feel free to use the sidebar to navigate the documentation, as well as explore the relationships between tables and the logic building them. + +There is more information on how to use dbt docs in the last section of this document. + +## **Quick Links to Table Documentation** + +**Click on the links below to jump to the documentation for each schema.** + +### Core Fact Tables (`sui`.`CORE`.``) +- [dim_labels](#!/model/model.sui_models.core__dim_labels) + + + +The SUI models are built using three layers of SQL models: **bronze, silver, and gold (or core/defi/nft).** + +- Bronze: Data is loaded in from the source as a view +- Silver: All necessary parsing, filtering, de-duping, and other transformations are done here +- Gold (or core/defi/nft): Final views and tables that are available publicly + +The dimension tables are sourced from a variety of on-chain and off-chain sources. + +Convenience views (denoted ez_) are a combination of different fact and dimension tables. These views are built to make it easier to query the data. + +## **Using dbt docs** +### Navigation + +You can use the ```Project``` and ```Database``` navigation tabs on the left side of the window to explore the models in the project. + +### Database Tab + +This view shows relations (tables and views) grouped into database schemas. Note that ephemeral models are *not* shown in this interface, as they do not exist in the database. 
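+
+As a rough illustration of why ephemeral models have no relation in the database: dbt inlines an ephemeral model as a common table expression (CTE) into every model that `ref()`s it at compile time. The model name below is made up for this example and does not exist in this project.
+
+```sql
+-- Hypothetical compiled output (illustrative only): a model referenced via
+-- ref('my_ephemeral_model') and materialized as ephemeral is injected as a CTE
+-- rather than being created as a table or view.
+WITH __dbt__cte__my_ephemeral_model AS (
+    SELECT 1 AS example_column
+)
+SELECT example_column
+FROM __dbt__cte__my_ephemeral_model
+```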
+ +### Graph Exploration + +You can click the blue icon on the bottom-right corner of the page to view the lineage graph of your models. + +On model pages, you'll see the immediate parents and children of the model you're exploring. By clicking the Expand button at the top-right of this lineage pane, you'll be able to see all of the models that are used to build, or are built from, the model you're exploring. + +Once expanded, you'll be able to use the ```--models``` and ```--exclude``` model selection syntax to filter the models in the graph. For more information on model selection, check out the [dbt docs](https://docs.getdbt.com/docs/model-selection-syntax). + +Note that you can also right-click on models to interactively filter and explore the graph. + + +### **More information** +- [Flipside](https://flipsidecrypto.xyz/) +- [Github](https://github.com/FlipsideCrypto/sui-models) +- [What is dbt?](https://docs.getdbt.com/docs/introduction) + +{% enddocs %} \ No newline at end of file diff --git a/models/gold/core/core__fact_balance_changes.sql b/models/gold/core/core__fact_balance_changes.sql new file mode 100644 index 0000000..85b0846 --- /dev/null +++ b/models/gold/core/core__fact_balance_changes.sql @@ -0,0 +1,58 @@ +{{ config ( + materialized = "incremental", + unique_key = "fact_transaction_balance_changes_id", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +WITH base AS ( + + SELECT + A.checkpoint_number, + A.block_timestamp, + A.tx_digest, + A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind, + A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender, + A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version, + CASE + WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE + ELSE TRUE + END AS tx_succeeded, + b.index AS balance_change_index, + b.value AS bc_value, + bc_value :"amount" :: bigint AS amount, + bc_value :"coinType" :: STRING AS coin_type, + bc_value :"owner" :"AddressOwner" :: STRING AS owner + FROM + {{ ref('silver__transactions') }} A, + LATERAL FLATTEN( + A.transaction_json :"balanceChanges" + ) b + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + balance_change_index, + coin_type, + amount, + owner, + {{ dbt_utils.generate_surrogate_key(['tx_digest','balance_change_index']) }} AS fact_transaction_balance_changes_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + base diff --git a/models/gold/core/core__fact_changes.sql b/models/gold/core/core__fact_changes.sql new file mode 100644 index 0000000..9801dee --- /dev/null +++ b/models/gold/core/core__fact_changes.sql @@ -0,0 +1,68 @@ +{{ config ( + materialized = "incremental", + unique_key = "fact_changes_id", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +WITH base AS ( + + SELECT + A.checkpoint_number, + A.block_timestamp, + A.tx_digest, + A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind, + A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender, +
A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version, + CASE + WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE + ELSE TRUE + END AS tx_succeeded, + b.index AS change_index, + b.value AS change_value, + change_value :"type" :: STRING AS TYPE, + change_value :"sender" :: STRING AS sender, + change_value :"digest" :: STRING AS digest, + change_value :"objectId" :: STRING AS object_id, + change_value :"objectType" :: STRING AS object_type, + change_value :"version" :BIGINT AS version, + change_value :"previousVersion" :BIGINT AS previous_version, + change_value :"owner" :"ObjectOwner" :: STRING AS object_owner, + FROM + {{ ref('silver__transactions') }} A, + LATERAL FLATTEN( + A.transaction_json :"objectChanges" + ) b + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + change_index, + TYPE, + sender, + digest, + object_id, + object_type, + version, + previous_version, + object_owner, + {{ dbt_utils.generate_surrogate_key(['tx_digest','change_index']) }} AS fact_changes_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + base diff --git a/models/gold/core/core__fact_checkpoints.sql b/models/gold/core/core__fact_checkpoints.sql new file mode 100644 index 0000000..33db5fe --- /dev/null +++ b/models/gold/core/core__fact_checkpoints.sql @@ -0,0 +1,34 @@ +{{ config ( + materialized = "incremental", + unique_key = "checkpoint_number", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +SELECT + checkpoint_number, + block_timestamp, + checkpoint_json :"epoch" :: INT AS epoch, + checkpoint_json :"digest" :: STRING AS checkpoint_digest, + checkpoint_json :"previousDigest" :: STRING AS previous_digest, + checkpoint_json :"networkTotalTransactions" :: bigint AS network_total_transactions, + checkpoint_json :"validatorSignature" :: STRING AS validator_signature, + ARRAY_SIZE( + checkpoint_json :"transactions" + ) AS tx_count, + checkpoint_json :"transactions" AS transactions_array, + {{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS fact_checkpoints_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + {{ ref('silver__checkpoints') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} diff --git a/models/gold/core/core__fact_events.sql b/models/gold/core/core__fact_events.sql new file mode 100644 index 0000000..6bbe328 --- /dev/null +++ b/models/gold/core/core__fact_events.sql @@ -0,0 +1,77 @@ +{{ config ( + materialized = "incremental", + unique_key = "fact_events_id", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +WITH base AS ( + + SELECT + A.checkpoint_number, + A.block_timestamp, + A.tx_digest, + A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind, + A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender, + A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version, + CASE + 
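+            -- Editorial note: tx_succeeded below is TRUE whenever effects.status.status
+            -- is anything other than the literal 'failure'.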
WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE + ELSE TRUE + END AS tx_succeeded, + b.value AS event_value, + event_value :"id" :"eventSeq" :: STRING AS event_index, + event_value :"packageId" :: STRING AS package_id, + event_value :"transactionModule" :: STRING AS transaction_module, + event_value :"sender" :: STRING AS sender, + event_value :"type" :: STRING AS TYPE, + event_value :"parsedJson" AS parsed_json + FROM + {{ ref('silver__transactions') }} A, + LATERAL FLATTEN( + A.transaction_json :"events" + ) b + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + event_index, + TYPE, + SPLIT_PART( + TYPE, + '::', + 1 + ) AS event_address, + SPLIT_PART( + TYPE, + '::', + 2 + ) AS event_module, + SPLIT_PART( + TYPE, + '::', + 3 + ) AS event_resource, + package_id, + transaction_module, + sender, + parsed_json, + {{ dbt_utils.generate_surrogate_key(['tx_digest','event_index']) }} AS fact_events_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + base diff --git a/models/gold/core/core__fact_transaction_blocks.sql b/models/gold/core/core__fact_transaction_blocks.sql new file mode 100644 index 0000000..2060764 --- /dev/null +++ b/models/gold/core/core__fact_transaction_blocks.sql @@ -0,0 +1,74 @@ +{{ config ( + materialized = "incremental", + unique_key = "tx_digest", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +WITH base AS ( + + SELECT + checkpoint_number, + block_timestamp, + tx_digest, + transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind, + transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender, + transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version, + CASE + WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE + ELSE TRUE + END AS tx_succeeded, + transaction_json :"effects" :"status" :"error" :: STRING AS tx_error, + {# transaction_json :"transaction" :txSignatures AS tx_signatures, #} + transaction_json :"effects": "dependencies" AS tx_dependencies, + {# transaction_json :"effects": "gasObject" :"reference" :"digest" :: STRING AS gas_digest, #} + transaction_json :"effects": "gasUsed" :"computationCost" :: bigint AS gas_used_computation_cost, + transaction_json :"effects": "gasUsed" :"nonRefundableStorageFee" :: bigint AS gas_used_non_refundable_storage_fee, + transaction_json :"effects": "gasUsed" :"storageCost" :: bigint AS gas_used_storage_cost, + transaction_json :"effects": "gasUsed" :"storageRebate" :: bigint AS gas_used_storage_rebate, + transaction_json :"transaction" :"data" :"gasData" :"budget" :: bigint AS gas_budget, + transaction_json :"transaction" :"data" :"gasData" :"owner" :: STRING AS gas_owner, + transaction_json :"transaction" :"data" :"gasData" :"price" :: bigint AS gas_price, + {# transaction_json :"transaction" :"data" :"gasData" :"payment" AS gas_payments, #} + ( + gas_used_computation_cost + gas_used_storage_cost - gas_used_storage_rebate + ) / pow( + 10, + 9 + ) AS tx_fee + FROM + {{ ref('silver__transactions') }} + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' 
:: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_fee, + tx_succeeded, + tx_error, + tx_dependencies, + gas_used_computation_cost, + gas_used_non_refundable_storage_fee, + gas_used_storage_cost, + gas_used_storage_rebate, + gas_price, + gas_budget, + gas_owner, + {{ dbt_utils.generate_surrogate_key(['tx_digest']) }} AS fact_transaction_blocks_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + base diff --git a/models/gold/core/core__fact_transaction_inputs.sql b/models/gold/core/core__fact_transaction_inputs.sql new file mode 100644 index 0000000..41f8eff --- /dev/null +++ b/models/gold/core/core__fact_transaction_inputs.sql @@ -0,0 +1,70 @@ +{{ config ( + materialized = "incremental", + unique_key = "fact_transaction_inputs_id", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +WITH base AS ( + + SELECT + A.checkpoint_number, + A.block_timestamp, + A.tx_digest, + A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind, + A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender, + A.transaction_json :"transaction" :"data" :"messageVersion" :: STRING AS message_version, + CASE + WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE + ELSE TRUE + END AS tx_succeeded, + b.index AS input_index, + b.value AS input_value, + input_value :"initialSharedVersion" :: STRING AS initial_shared_version, + input_value :"mutable" :: BOOLEAN AS mutable, + input_value :"objectId" :: STRING AS object_id, + input_value :"objectType" :: STRING AS object_type, + input_value :"type" :: STRING AS TYPE, + input_value :"version" :: bigint AS version, + input_value :"digest" :: STRING AS digest, + input_value :"value" :: STRING AS VALUE, + input_value :"valueType" :: STRING AS value_type + FROM + {{ ref('silver__transactions') }} A, + LATERAL FLATTEN( + A.transaction_json :"transaction" :"data" :"transaction": "inputs" + ) b + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + input_index, + TYPE, + version, + object_id, + object_type, + digest, + VALUE, + value_type, + initial_shared_version, + mutable, + {{ dbt_utils.generate_surrogate_key(['tx_digest','input_index']) }} AS fact_transaction_inputs_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + base diff --git a/models/gold/core/core__fact_transactions.sql b/models/gold/core/core__fact_transactions.sql new file mode 100644 index 0000000..b260e18 --- /dev/null +++ b/models/gold/core/core__fact_transactions.sql @@ -0,0 +1,58 @@ +{{ config ( + materialized = "incremental", + unique_key = "fact_transactions_id", + cluster_by = ['block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['gold','core'] +) }} + +WITH base AS ( + + SELECT + A.checkpoint_number, + A.block_timestamp, + A.tx_digest, + A.transaction_json :"transaction" :"data" :"transaction" :"kind" :: STRING AS tx_kind, + A.transaction_json :"transaction" :"data" :"sender" :: STRING AS tx_sender, + A.transaction_json 
:"transaction" :"data" :"messageVersion" :: STRING AS message_version, + CASE + WHEN transaction_json :"effects" :"status" :"status" = 'failure' THEN FALSE + ELSE TRUE + END AS tx_succeeded, + b.index AS payload_index, + C.key AS payload_type, + C.value AS payload_details + FROM + {{ ref('silver__transactions') }} A, + LATERAL FLATTEN( + A.transaction_json :"transaction" :"data" :"transaction": "transactions" + ) b, + LATERAL FLATTEN( + b.value + ) C + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + COALESCE(MAX(modified_timestamp), '1970-01-01' :: TIMESTAMP) AS modified_timestamp + FROM + {{ this }}) + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + tx_digest, + tx_kind, + tx_sender, + message_version, + tx_succeeded, + payload_index, + payload_type, + payload_details, + {{ dbt_utils.generate_surrogate_key(['tx_digest','payload_index']) }} AS fact_transactions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp +FROM + base diff --git a/models/gold/core/gold_core.yml b/models/gold/core/gold_core.yml new file mode 100644 index 0000000..43da7f3 --- /dev/null +++ b/models/gold/core/gold_core.yml @@ -0,0 +1,72 @@ +version: 2 + +models: + - name: core__fact_checkpoints + description: "Contains information about checkpoints in the Sui blockchain" + columns: + - name: checkpoint_number + description: "Unique identifier for the checkpoint within the blockchain." + data_type: integer + + - name: block_timestamp + description: "Timestamp when the checkpoint was created." + data_type: timestamp + + - name: epoch + description: "Epoch number in which the checkpoint was included." + data_type: integer + + - name: checkpoint_digest + description: "The digest identified for the checkpoint" + data_type: string + + - name: previous_digest + description: "Digest of the previous checkpoint for chain continuity." + data_type: string + + - name: network_total_transactions + description: "Cumulative total of transactions on the network up to this checkpoint." + data_type: bigint + + - name: validator_signature + description: "Signature of the validator for this block." + data_type: string + + - name: tx_count + description: "Total number of transactions included in this checkpoint." + data_type: bigint + + - name: transactions_array + description: "The array of transactions included in this checkpoint." + data_type: variant + + - name: FACT_CHECKPOINTS_ID + description: "Surrogate key for the checkpoint fact table, generated from the checkpoint number and block timestamp." + data_type: text + + - name: INSERTED_TIMESTAMP + description: "Timestamp when the record was inserted into the database." + data_type: timestamp + + - name: MODIFIED_TIMESTAMP + description: "Timestamp when the record was last modified." 
+ data_type: timestamp + + + config: + contract: + enforced: true + tests: + - dbt_utils.recency: + datepart: hour + field: block_timestamp + interval: 12 + severity: error + tags: ['test_recency'] + - dbt_utils.sequential_values: + column_name: checkpoint_number + interval: 1 + config: + severity: error + error_if: ">1" + tags: ['test_recency'] \ No newline at end of file diff --git a/models/silver/core/silver__checkpoints.sql b/models/silver/core/silver__checkpoints.sql new file mode 100644 index 0000000..89529f9 --- /dev/null +++ b/models/silver/core/silver__checkpoints.sql @@ -0,0 +1,49 @@ +-- depends_on: {{ ref('bronze__checkpoints') }} +{{ config ( + materialized = "incremental", + unique_key = "checkpoint_number", + cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['silver','core'] +) }} + +WITH bronze_checks AS ( + + SELECT + DATA :"result" :"sequenceNumber" :: bigint AS checkpoint_number, + TO_TIMESTAMP( + DATA :"result" :"timestampMs" + ) AS block_timestamp, + partition_key, + DATA :result AS checkpoint_json, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze__checkpoints') }} +WHERE + _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze__checkpoints_FR') }} + {% endif %} + ) +SELECT + checkpoint_number, + block_timestamp, + partition_key, + checkpoint_json, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['checkpoint_number']) }} AS checkpoints_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + bronze_checks qualify ROW_NUMBER() over ( + PARTITION BY checkpoint_number + ORDER BY + _inserted_timestamp DESC + ) = 1 diff --git a/models/silver/core/silver__transactions.sql b/models/silver/core/silver__transactions.sql new file mode 100644 index 0000000..22bdd2e --- /dev/null +++ b/models/silver/core/silver__transactions.sql @@ -0,0 +1,54 @@ +-- depends_on: {{ ref('bronze__transactions') }} +{{ config ( + materialized = "incremental", + unique_key = "tx_digest", + cluster_by = ['modified_timestamp::DATE','block_timestamp::DATE'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + tags = ['silver','core'] +) }} + +WITH bronze_txs AS ( + + SELECT + DATA :"checkpoint" :: bigint AS checkpoint_number, + DATA :"digest" :: STRING AS tx_digest, + TO_TIMESTAMP( + DATA :"timestampMs" + ) AS block_timestamp, + partition_key, + DATA AS transaction_json, + _inserted_timestamp + FROM + +{% if is_incremental() %} +{{ ref('bronze__transactions') }} +WHERE + DATA :error IS NULL + AND _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_inserted_timestamp), '1900-01-01' :: TIMESTAMP) AS _inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze__transactions_FR') }} + WHERE + DATA :error IS NULL + {% endif %} + ) +SELECT + checkpoint_number, + tx_digest, + block_timestamp, + partition_key, + transaction_json, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['checkpoint_number','tx_digest']) }} AS transactions_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + bronze_txs qualify ROW_NUMBER() over ( + PARTITION BY tx_digest + ORDER BY + _inserted_timestamp DESC + ) = 1 diff --git a/models/silver/core/silver_core.yml 
b/models/silver/core/silver_core.yml new file mode 100644 index 0000000..506010f --- /dev/null +++ b/models/silver/core/silver_core.yml @@ -0,0 +1,826 @@ +version: 2 + +models: + - name: silver__account_states + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: account + data_type: VARCHAR + - name: account_states_id + data_type: VARCHAR + - name: account_status + data_type: VARCHAR + - name: balance + data_type: NUMBER + - name: code_boc + data_type: BINARY + - name: code_hash + data_type: VARCHAR + - name: data_boc + data_type: BINARY + - name: data_hash + data_type: VARCHAR + - name: frozen_hash + data_type: VARCHAR + - name: hash + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: last_trans_hash + data_type: VARCHAR + - name: last_trans_lt + data_type: NUMBER + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: timestamp + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__balances_history + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: lt + data_type: NUMBER + - name: address + data_type: VARCHAR + - name: amount + data_type: NUMBER + - name: asset + data_type: VARCHAR + - name: balances_history_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: mintless_claimed + data_type: BOOLEAN + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: timestamp + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__blocks + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: version + data_type: NUMBER + - name: created_by + data_type: VARCHAR + - name: end_lt + data_type: NUMBER + - name: want_merge + data_type: BOOLEAN + - name: gen_utime + data_type: NUMBER + - name: tx_count + data_type: NUMBER + - name: global_id + data_type: NUMBER + - name: root_hash + data_type: VARCHAR + - name: key_block + data_type: BOOLEAN + - name: mc_block_seqno + data_type: NUMBER + - name: vert_seqno_incr + data_type: BOOLEAN + - name: validator_list_hash_short + data_type: NUMBER + - name: after_merge + data_type: BOOLEAN + - name: want_split + data_type: BOOLEAN + - name: after_split + data_type: BOOLEAN + - name: master_ref_seqno + data_type: NUMBER + - name: mc_block_workchain + data_type: NUMBER + - name: file_hash + data_type: VARCHAR + - name: prev_key_block_seqno + data_type: NUMBER + - name: shard + data_type: NUMBER + - name: seqno + data_type: NUMBER + - name: vert_seqno + data_type: NUMBER + - name: flags + data_type: NUMBER + - name: rand_seed + data_type: VARCHAR + - name: gen_catchain_seqno + data_type: NUMBER + - name: min_ref_mc_seqno + data_type: NUMBER + - name: start_lt + data_type: NUMBER + - name: mc_block_shard + data_type: NUMBER + - name: before_split + data_type: BOOLEAN + - name: workchain + data_type: NUMBER + - name: blocks_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__dex_pools + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: reserves_right + data_type: NUMBER + - name: dex_pools_id + data_type: VARCHAR + - name: 
discovered_at + data_type: NUMBER + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: is_liquid + data_type: BOOLEAN + - name: jetsui_left + data_type: VARCHAR + - name: jetsui_right + data_type: VARCHAR + - name: last_updated + data_type: NUMBER + - name: lp_fee + data_type: NUMBER + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: pool + data_type: VARCHAR + - name: project + data_type: VARCHAR + - name: protocol_fee + data_type: NUMBER + - name: referral_fee + data_type: NUMBER + - name: reserves_left + data_type: NUMBER + - name: total_supply + data_type: NUMBER + - name: tvl_sui + data_type: NUMBER + - name: tvl_usd + data_type: NUMBER + - name: version + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__dex_trades + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: volume_sui + data_type: NUMBER + - name: amount_bought_raw + data_type: NUMBER + - name: amount_sold_raw + data_type: NUMBER + - name: dex_trades_id + data_type: VARCHAR + - name: event_time + data_type: NUMBER + - name: event_type + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: platform_tag + data_type: VARCHAR + - name: pool_address + data_type: VARCHAR + - name: project + data_type: VARCHAR + - name: project_type + data_type: VARCHAR + - name: query_id + data_type: NUMBER + - name: referral_address + data_type: VARCHAR + - name: router_address + data_type: VARCHAR + - name: token_bought_address + data_type: VARCHAR + - name: token_sold_address + data_type: VARCHAR + - name: trace_id + data_type: VARCHAR + - name: trader_address + data_type: VARCHAR + - name: tx_hash + data_type: VARCHAR + - name: version + data_type: NUMBER + - name: volume_usd + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__jetsui_events + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: tx_hash + data_type: VARCHAR + - name: amount + data_type: NUMBER + - name: comment + data_type: VARCHAR + - name: destination + data_type: VARCHAR + - name: forward_sui_amount + data_type: NUMBER + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: jetsui_events_id + data_type: VARCHAR + - name: jetsui_master + data_type: VARCHAR + - name: jetsui_wallet + data_type: VARCHAR + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: query_id + data_type: NUMBER + - name: trace_id + data_type: VARCHAR + - name: source + data_type: VARCHAR + - name: tx_aborted + data_type: BOOLEAN + - name: tx_lt + data_type: NUMBER + - name: type + data_type: VARCHAR + - name: utime + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__jetsui_metadata + config: + contract: + enforced: true + columns: + - name: adding_date + data_type: DATE + - name: suiapi_image_url + data_type: VARCHAR + - name: adding_at + data_type: NUMBER + - name: address + data_type: VARCHAR + - name: admin_address + data_type: VARCHAR + - name: code_hash + data_type: VARCHAR + - name: decimals + data_type: NUMBER + - name: description + data_type: VARCHAR + - name: image + data_type: VARCHAR + - name: image_data + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: 
jetsui_content_onchain + data_type: VARCHAR + - name: jetsui_metadata_id + data_type: VARCHAR + - name: jetsui_wallet_code_hash + data_type: VARCHAR + - name: metadata_status + data_type: NUMBER + - name: mintable + data_type: BOOLEAN + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: name + data_type: VARCHAR + - name: sources + data_type: VARIANT + - name: symbol + data_type: VARCHAR + - name: update_time_metadata + data_type: NUMBER + - name: update_time_onchain + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__messages_with_data + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: body_boc + data_type: BINARY + - name: body_hash + data_type: VARCHAR + - name: bounce + data_type: BOOLEAN + - name: bounced + data_type: BOOLEAN + - name: comment + data_type: VARCHAR + - name: created_at + data_type: NUMBER + - name: created_lt + data_type: NUMBER + - name: destination + data_type: VARCHAR + - name: direction + data_type: VARCHAR + - name: fwd_fee + data_type: NUMBER + - name: ihr_disabled + data_type: BOOLEAN + - name: ihr_fee + data_type: NUMBER + - name: import_fee + data_type: NUMBER + - name: init_state_boc + data_type: BINARY + - name: init_state_hash + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: messages_with_data_id + data_type: VARCHAR + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: msg_hash + data_type: VARCHAR + - name: opcode + data_type: NUMBER + - name: source + data_type: VARCHAR + - name: trace_id + data_type: VARCHAR + - name: tx_hash + data_type: VARCHAR + - name: tx_lt + data_type: NUMBER + - name: tx_now + data_type: NUMBER + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + - name: _value + data_type: NUMBER + + - name: silver__nft_events + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: sale_price + data_type: NUMBER + - name: royalty_address + data_type: VARCHAR + - name: payment_asset + data_type: VARCHAR + - name: marketplace_fee_address + data_type: VARCHAR + - name: owner_address + data_type: VARCHAR + - name: collection_address + data_type: VARCHAR + - name: content_onchain + data_type: TEXT + - name: trace_id + data_type: VARCHAR + - name: sale_contract + data_type: VARCHAR + - name: forward_amount + data_type: NUMBER + - name: nft_item_index + data_type: TEXT + - name: query_id + data_type: NUMBER + - name: is_init + data_type: BOOLEAN + - name: timestamp + data_type: NUMBER + - name: nft_item_address + data_type: VARCHAR + - name: custom_payload + data_type: BINARY + - name: comment + data_type: VARCHAR + - name: sale_end_time + data_type: NUMBER + - name: sale_type + data_type: VARCHAR + - name: auction_max_bid + data_type: NUMBER + - name: auction_min_bid + data_type: NUMBER + - name: marketplace_address + data_type: VARCHAR + - name: forward_payload + data_type: BINARY + - name: royalty_amount + data_type: NUMBER + - name: auction_min_step + data_type: NUMBER + - name: type + data_type: VARCHAR + - name: prev_owner + data_type: VARCHAR + - name: tx_hash + data_type: VARCHAR + - name: marketplace_fee + data_type: NUMBER + - name: lt + data_type: NUMBER + - name: nft_events_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: 
TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__nft_items + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: collection_address + data_type: VARCHAR + - name: is_init + data_type: BOOLEAN + - name: lt + data_type: NUMBER + - name: timestamp + data_type: NUMBER + - name: address + data_type: VARCHAR + - name: owner_address + data_type: VARCHAR + - name: index + data_type: TEXT + - name: content_onchain + data_type: TEXT + - name: nft_items_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__nft_metadata + config: + contract: + enforced: true + columns: + - name: adding_date + data_type: DATE + - name: description + data_type: VARCHAR + - name: image + data_type: VARCHAR + - name: metadata_status + data_type: NUMBER + - name: parent_address + data_type: VARCHAR + - name: update_time_metadata + data_type: NUMBER + - name: adding_at + data_type: NUMBER + - name: update_time_onchain + data_type: NUMBER + - name: address + data_type: VARCHAR + - name: suiapi_image_url + data_type: VARCHAR + - name: content_onchain + data_type: VARCHAR + - name: type + data_type: VARCHAR + - name: attributes + data_type: VARCHAR + - name: name + data_type: VARCHAR + - name: sources + data_type: VARIANT + - name: image_data + data_type: VARCHAR + - name: nft_metadata_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__nft_sales + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: is_canceled + data_type: BOOLEAN + - name: marketplace_fee_address + data_type: VARCHAR + - name: end_time + data_type: NUMBER + - name: is_complete + data_type: BOOLEAN + - name: last_member + data_type: VARCHAR + - name: marketplace_address + data_type: VARCHAR + - name: royalty_amount + data_type: NUMBER + - name: created_at + data_type: NUMBER + - name: nft_address + data_type: VARCHAR + - name: marketplace_fee + data_type: NUMBER + - name: asset + data_type: VARCHAR + - name: price + data_type: NUMBER + - name: nft_owner_address + data_type: VARCHAR + - name: address + data_type: VARCHAR + - name: min_bid + data_type: NUMBER + - name: timestamp + data_type: NUMBER + - name: royalty_address + data_type: VARCHAR + - name: min_step + data_type: NUMBER + - name: max_bid + data_type: NUMBER + - name: last_bid_at + data_type: NUMBER + - name: lt + data_type: NUMBER + - name: type + data_type: VARCHAR + - name: nft_sales_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__nft_transfers + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: trace_id + data_type: VARCHAR + - name: tx_now + data_type: NUMBER + - name: custom_payload + data_type: BINARY + - name: new_owner + data_type: VARCHAR + - name: forward_payload + data_type: BINARY + - name: comment + data_type: VARCHAR + - name: old_owner + data_type: VARCHAR + - name: tx_aborted + data_type: BOOLEAN + - name: 
query_id + data_type: NUMBER + - name: tx_hash + data_type: VARCHAR + - name: tx_lt + data_type: NUMBER + - name: response_destination + data_type: VARCHAR + - name: nft_collection_address + data_type: VARCHAR + - name: forward_amount + data_type: NUMBER + - name: nft_item_address + data_type: VARCHAR + - name: nft_item_index + data_type: VARCHAR + - name: nft_transfers_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR + + - name: silver__transactions + config: + contract: + enforced: true + columns: + - name: block_date + data_type: DATE + - name: compute_success + data_type: BOOLEAN + - name: total_fees + data_type: NUMBER + - name: orig_status + data_type: VARCHAR + - name: compute_skipped + data_type: BOOLEAN + - name: compute_gas_fees + data_type: NUMBER + - name: action_result_code + data_type: NUMBER + - name: destroyed + data_type: BOOLEAN + - name: action_success + data_type: BOOLEAN + - name: compute_msg_state_used + data_type: BOOLEAN + - name: is_tock + data_type: BOOLEAN + - name: account_state_hash_after + data_type: VARCHAR + - name: action_spec_actions + data_type: NUMBER + - name: descr + data_type: VARCHAR + - name: account_state_balance_before + data_type: NUMBER + - name: hash + data_type: VARCHAR + - name: action_result_arg + data_type: NUMBER + - name: aborted + data_type: BOOLEAN + - name: mc_block_seqno + data_type: NUMBER + - name: compute_account_activated + data_type: BOOLEAN + - name: action_skipped_actions + data_type: NUMBER + - name: now + data_type: NUMBER + - name: credit_due_fees_collected + data_type: NUMBER + - name: block_shard + data_type: NUMBER + - name: end_status + data_type: VARCHAR + - name: credit_first + data_type: BOOLEAN + - name: prev_trans_hash + data_type: VARCHAR + - name: block_workchain + data_type: NUMBER + - name: account + data_type: VARCHAR + - name: compute_vm_steps + data_type: NUMBER + - name: storage_fees_collected + data_type: NUMBER + - name: compute_exit_arg + data_type: NUMBER + - name: action_valid + data_type: BOOLEAN + - name: action_status_change + data_type: VARCHAR + - name: installed + data_type: BOOLEAN + - name: prev_trans_lt + data_type: NUMBER + - name: compute_gas_credit + data_type: NUMBER + - name: compute_gas_limit + data_type: NUMBER + - name: skipped_reason + data_type: VARCHAR + - name: action_total_fwd_fees + data_type: NUMBER + - name: account_state_code_hash_before + data_type: VARCHAR + - name: account_state_hash_before + data_type: VARCHAR + - name: compute_exit_code + data_type: NUMBER + - name: trace_id + data_type: VARCHAR + - name: block_seqno + data_type: NUMBER + - name: storage_status_change + data_type: VARCHAR + - name: lt + data_type: NUMBER + - name: compute_mode + data_type: NUMBER + - name: credit + data_type: NUMBER + - name: storage_fees_due + data_type: NUMBER + - name: compute_gas_used + data_type: NUMBER + - name: account_state_code_hash_after + data_type: VARCHAR + - name: action_total_action_fees + data_type: NUMBER + - name: compute_vm_init_state_hash + data_type: VARCHAR + - name: account_state_balance_after + data_type: NUMBER + - name: action_tot_actions + data_type: NUMBER + - name: compute_vm_final_state_hash + data_type: VARCHAR + - name: action_no_funds + data_type: BOOLEAN + - name: transactions_id + data_type: VARCHAR + - name: inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: 
modified_timestamp + data_type: TIMESTAMP_NTZ + - name: _inserted_timestamp + data_type: TIMESTAMP_NTZ + - name: _invocation_id + data_type: VARCHAR diff --git a/models/sources.yml b/models/sources.yml new file mode 100644 index 0000000..4e420d4 --- /dev/null +++ b/models/sources.yml @@ -0,0 +1,23 @@ +version: 2 + +sources: + - name: bronze_streamline + database: streamline + schema: "{{ 'sui' if target.database == 'SUI' else 'sui_dev' }}" + tables: + - name: checkpoints + - name: transactions + + - name: crosschain + database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}" + schema: core + tables: + - name: address_tags + - name: dim_dates + - name: crosschain_silver + database: "{{ 'crosschain' if target.database == 'SUI' else 'crosschain_dev' }}" + schema: silver + tables: + - name: number_sequence + - name: complete_native_prices + - name: labels_combined \ No newline at end of file diff --git a/models/streamline/core/_sequence_lookback.sql b/models/streamline/core/_sequence_lookback.sql new file mode 100644 index 0000000..4bcb30e --- /dev/null +++ b/models/streamline/core/_sequence_lookback.sql @@ -0,0 +1,12 @@ +{{ config( + materialized = 'ephemeral', + enabled = false +) }} + +SELECT + COALESCE(MIN(checkpoint_number), 0) AS checkpoint_number +FROM + {{ ref("core__fact_checkpoints") }} +WHERE + block_timestamp >= DATEADD('hour', -72, DATE_TRUNC('HOUR', SYSDATE())) + AND block_timestamp < DATEADD('hour', -71, DATE_TRUNC('HOUR', SYSDATE())) diff --git a/models/streamline/core/complete/streamline__checkpoints_complete.sql b/models/streamline/core/complete/streamline__checkpoints_complete.sql new file mode 100644 index 0000000..572ba9a --- /dev/null +++ b/models/streamline/core/complete/streamline__checkpoints_complete.sql @@ -0,0 +1,48 @@ +-- depends_on: {{ ref('bronze__checkpoints') }} +-- depends_on: {{ ref('bronze__checkpoints_FR') }} +{{ config ( + materialized = "incremental", + unique_key = ['checkpoint_number'], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "ROUND(checkpoint_number, -5)", + tags = ['streamline_realtime'] +) }} + +SELECT + DATA :"result": "sequenceNumber" :: bigint AS checkpoint_number, + TO_TIMESTAMP( + DATA :"result" :"timestampMs" + ) AS block_timestamp, + DATA :"result": "transactions" AS transactions_array, + ARRAY_SIZE( + DATA :"result": "transactions" + ) AS tx_count, + partition_key, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id, +FROM + +{% if is_incremental() %} +{{ ref('bronze__checkpoints') }} +{% else %} + {{ ref('bronze__checkpoints_FR') }} +{% endif %} +WHERE + DATA :error IS NULL + +{% if is_incremental() %} +AND _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) + {% endif %} + + qualify ROW_NUMBER() over ( + PARTITION BY checkpoint_number + ORDER BY + _inserted_timestamp DESC + ) = 1 diff --git a/models/streamline/core/complete/streamline__transactions_complete.sql b/models/streamline/core/complete/streamline__transactions_complete.sql new file mode 100644 index 0000000..dece040 --- /dev/null +++ b/models/streamline/core/complete/streamline__transactions_complete.sql @@ -0,0 +1,51 @@ +-- depends_on: {{ ref('bronze__transactions') }} +-- depends_on: {{ ref('bronze__transactions_FR') }} +{{ config ( + materialized = "incremental", + unique_key = ['tx_digest'], + incremental_predicates = ["dynamic_range_predicate",
"block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "block_timestamp::DATE", + tags = ['streamline_realtime'], + post_hook = enable_search_optimization( + '{{this.schema}}', + '{{this.identifier}}', + 'ON EQUALITY(tx_digest)' + ), +) }} + +SELECT + DATA :"checkpoint" :: bigint AS checkpoint_number, + DATA :"digest" :: STRING AS tx_digest, + TO_TIMESTAMP( + DATA :"timestampMs" + ) AS block_timestamp, + partition_key, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id, +FROM + +{% if is_incremental() %} +{{ ref('bronze__transactions') }} +{% else %} + {{ ref('bronze__transactions_FR') }} +{% endif %} +WHERE + DATA :error IS NULL + +{% if is_incremental() %} +AND _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) + {% endif %} + + qualify ROW_NUMBER() over ( + PARTITION BY tx_digest + ORDER BY + _inserted_timestamp DESC + ) = 1 diff --git a/models/streamline/core/realtime/streamline__checkpoints_realtime.sql b/models/streamline/core/realtime/streamline__checkpoints_realtime.sql new file mode 100644 index 0000000..bd5cb80 --- /dev/null +++ b/models/streamline/core/realtime/streamline__checkpoints_realtime.sql @@ -0,0 +1,56 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"checkpoints", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + "order_by_column": "checkpoint_number DESC" } + ), + tags = ['streamline_realtime'] +) }} + +WITH checks AS ( + + SELECT + checkpoint_number + FROM + {{ ref("streamline__checkpoints") }} + EXCEPT + SELECT + checkpoint_number + FROM + {{ ref("streamline__checkpoints_complete") }} +) +SELECT + checkpoint_number, + ROUND( + checkpoint_number, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'jsonrpc', + '2.0', + 'id', + checkpoint_number, + 'method', + 'sui_getCheckpoint', + 'params', + ARRAY_CONSTRUCT( + checkpoint_number :: STRING + ) + ), + 'Vault/prod/sui/quicknode/mainnet' + ) AS request +FROM + checks diff --git a/models/streamline/core/realtime/streamline__transactions_realtime.sql b/models/streamline/core/realtime/streamline__transactions_realtime.sql new file mode 100644 index 0000000..8725549 --- /dev/null +++ b/models/streamline/core/realtime/streamline__transactions_realtime.sql @@ -0,0 +1,112 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"transactions", + "sql_limit" :"100000", + "producer_batch_size" :"100000", + "worker_batch_size" :"50000", + "sql_source" :"{{this.identifier}}", + 'exploded_key': '["result"]', + "order_by_column": "checkpoint_number DESC" } + ), + tags = ['streamline_realtime'] +) }} + +WITH {# last_3_days AS ( + +SELECT + sequence_number +FROM + {{ ref("_sequence_lookback") }} +), +#} +txs AS ( + SELECT + A.tx_digest, + A.tx_index, + A.checkpoint_number, + A.block_timestamp + FROM + {{ ref("streamline__transactions") }} A + LEFT JOIN {{ 
ref("streamline__transactions_complete") }} + b + ON A.tx_digest = b.tx_digest + AND A.block_timestamp :: DATE = b.block_timestamp :: DATE + WHERE + b.tx_digest IS NULL {# AND sequence_number >= ( + SELECT + sequence_number + FROM + last_3_days +) #} +), +tx_grouped AS ( + SELECT + checkpoint_number, + block_timestamp, + FLOOR( + tx_index / 50 + ) grp, + ARRAY_AGG( + tx_digest + ) AS tx_param, + COUNT(1) AS tx_count_in_request + FROM + txs + GROUP BY + checkpoint_number, + block_timestamp, + grp +) +SELECT + checkpoint_number, + tx_count_in_request, + to_char( + block_timestamp, + 'YYYY_MM_DD_HH_MI_SS_FF3' + ) AS block_timestamp, + ROUND( + checkpoint_number, + -4 + ) :: INT AS partition_key, + {{ target.database }}.live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ), + OBJECT_CONSTRUCT( + 'jsonrpc', + '2.0', + 'id', + checkpoint_number, + 'method', + 'sui_multiGetTransactionBlocks', + 'params', + ARRAY_CONSTRUCT( + tx_param, + OBJECT_CONSTRUCT( + 'showInput', + TRUE, + 'showRawInput', + FALSE, + 'showEffects', + TRUE, + 'showEvents', + TRUE, + 'showRawEffects', + FALSE, + 'showObjectChanges', + TRUE, + 'showBalanceChanges', + TRUE + ) + ) + ), + 'Vault/prod/sui/quicknode/mainnet' + ) AS request +FROM + tx_grouped diff --git a/models/streamline/core/streamline__chainhead.sql b/models/streamline/core/streamline__chainhead.sql new file mode 100644 index 0000000..5f12898 --- /dev/null +++ b/models/streamline/core/streamline__chainhead.sql @@ -0,0 +1,27 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + {{ target.database }}.live.udf_api( + 'POST', + '{Service}/{Authentication}', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json', + 'fsc-quantum-state', + 'livequery' + ), + OBJECT_CONSTRUCT( + 'jsonrpc', + '2.0', + 'id', + 1, + 'method', + 'sui_getLatestCheckpointSequenceNumber', + 'params', + ARRAY_CONSTRUCT() + ), + 'Vault/prod/sui/quicknode/mainnet' + ) :data: "result" :: INT AS checkpoint_number diff --git a/models/streamline/core/streamline__checkpoints.sql b/models/streamline/core/streamline__checkpoints.sql new file mode 100644 index 0000000..6007599 --- /dev/null +++ b/models/streamline/core/streamline__checkpoints.sql @@ -0,0 +1,20 @@ +{{ config ( + materialized = "view", + tags = ['streamline_view'] +) }} + +SELECT + _id AS checkpoint_number +FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} +WHERE + _id >= 96605300 + AND _id <= ( + SELECT + MAX(checkpoint_number) + FROM + {{ ref('streamline__chainhead') }} + ) diff --git a/models/streamline/core/streamline__transactions.sql b/models/streamline/core/streamline__transactions.sql new file mode 100644 index 0000000..fb11140 --- /dev/null +++ b/models/streamline/core/streamline__transactions.sql @@ -0,0 +1,31 @@ +{{ config ( + materialized = "incremental", + unique_key = ['tx_digest'], + incremental_predicates = ["dynamic_range_predicate", "block_timestamp::date"], + merge_exclude_columns = ["inserted_timestamp"], + cluster_by = "block_timestamp::DATE", + tags = ['streamline_realtime'] +) }} + +SELECT + checkpoint_number, + block_timestamp, + b.index AS tx_index, + b.value :: STRING AS tx_digest, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id, +FROM + {{ ref("streamline__checkpoints_complete") }}, + LATERAL FLATTEN( + transactions_array + ) b + +{% if is_incremental() %} +WHERE + modified_timestamp >= ( + SELECT + 
COALESCE(MAX(modified_timestamp), '1970-01-01' :: DATE) modified_timestamp + FROM + {{ this }}) + {% endif %} diff --git a/packages.yml b/packages.yml new file mode 100644 index 0000000..da4652b --- /dev/null +++ b/packages.yml @@ -0,0 +1,7 @@ +packages: + - git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: v1.37.0 + - package: get-select/dbt_snowflake_query_tags + version: [">=2.0.0", "<3.0.0"] + - package: dbt-labs/dbt_utils + version: 1.0.0 \ No newline at end of file diff --git a/profiles.yml b/profiles.yml new file mode 100644 index 0000000..908684a --- /dev/null +++ b/profiles.yml @@ -0,0 +1,29 @@ +sui: + target: dev + outputs: + dev: + type: snowflake + account: "{{ env_var('ACCOUNT') }}" + user: "{{ env_var('USER') }}" + password: "{{ env_var('PASSWORD') }}" + role: "{{ env_var('ROLE') }}" + schema: "{{ env_var('SCHEMA') }}" + region: "{{ env_var('REGION') }}" + database: "{{ env_var('DATABASE') }}" + warehouse: "{{ env_var('WAREHOUSE') }}" + threads: 8 + client_session_keep_alive: False + prod: + type: snowflake + account: "{{ env_var('ACCOUNT') }}" + user: "{{ env_var('USER') }}" + password: "{{ env_var('PASSWORD') }}" + role: "{{ env_var('ROLE') }}" + schema: "{{ env_var('SCHEMA') }}" + region: "{{ env_var('REGION') }}" + database: "{{ env_var('DATABASE') }}" + warehouse: "{{ env_var('WAREHOUSE') }}" + threads: 8 + client_session_keep_alive: False + config: + send_anonymous_usage_stats: False \ No newline at end of file diff --git a/python/slack_alert.py b/python/slack_alert.py new file mode 100644 index 0000000..2bb3b35 --- /dev/null +++ b/python/slack_alert.py @@ -0,0 +1,74 @@ +import requests +import os +import sys + +def create_message(): + """Creates a simple failure notification message with repo, workflow name, and URL""" + + # Get GitHub environment variables + repository = os.environ.get('GITHUB_REPOSITORY', 'Unknown repository') + repo_name = repository.split('/')[-1] if '/' in repository else repository + workflow_name = os.environ.get('GITHUB_WORKFLOW', 'Unknown workflow') + run_id = os.environ.get('GITHUB_RUN_ID', '') + server_url = os.environ.get('GITHUB_SERVER_URL', 'https://github.com') + + # Build the workflow URL + workflow_url = f"{server_url}/{repository}/actions/runs/{run_id}" + + message_body = { + "text": f"Failure in {repo_name}", + "attachments": [ + { + "color": "#f44336", # Red color for failures + "fields": [ + { + "title": "Repository", + "value": repository, + "short": True + }, + { + "title": "Workflow", + "value": workflow_name, + "short": True + } + ], + "actions": [ + { + "type": "button", + "text": "View Workflow Run", + "style": "primary", + "url": workflow_url + } + ], + "footer": "GitHub Actions" + } + ] + } + + return message_body + +def send_alert(webhook_url): + """Sends a failure notification to Slack""" + + message = create_message() + + try: + response = requests.post(webhook_url, json=message) + + if response.status_code == 200: + print("Successfully sent Slack notification") + else: + print(f"Failed to send Slack notification: {response.status_code} {response.text}") + sys.exit(1) + except Exception as e: + print(f"Error sending Slack notification: {str(e)}") + sys.exit(1) + +if __name__ == '__main__': + webhook_url = os.environ.get("SLACK_WEBHOOK_URL") + + if not webhook_url: + print("ERROR: SLACK_WEBHOOK_URL environment variable is required") + sys.exit(1) + + send_alert(webhook_url) \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0a98417 --- /dev/null
+++ b/requirements.txt @@ -0,0 +1,2 @@ +dbt-core>=1.9,<1.10 +dbt-snowflake>=1.9,<1.10 \ No newline at end of file