diff --git a/.github/workflows/dbt_run_adhoc.yml b/.github/workflows/dbt_run_adhoc.yml new file mode 100644 index 0000000..757ff1c --- /dev/null +++ b/.github/workflows/dbt_run_adhoc.yml @@ -0,0 +1,66 @@ +name: dbt_run_adhoc +run-name: dbt_run_adhoc + +on: + workflow_dispatch: + branches: + - "main" + inputs: + environment: + type: choice + description: DBT Run Environment + required: true + options: + - dev + - prod + default: dev + warehouse: + type: choice + description: Snowflake warehouse + required: true + options: + - DBT + - DBT_CLOUD + - DBT_EMERGENCY + default: DBT + dbt_command: + type: string + description: 'DBT Run Command' + required: true + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ inputs.warehouse }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_${{ inputs.environment }} + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + ${{ inputs.dbt_command }} \ No newline at end of file diff --git a/.github/workflows/dbt_run_livequery_chainhead.yml b/.github/workflows/dbt_run_livequery_chainhead.yml new file mode 100644 index 0000000..158a144 --- /dev/null +++ b/.github/workflows/dbt_run_livequery_chainhead.yml @@ -0,0 +1,44 @@ +name: dbt_run_livequery_chainhead +run-name: dbt_run_livequery_chainhead + +on: + workflow_dispatch: + branches: + - "main" + +env: + DBT_PROFILES_DIR: ./ + + ACCOUNT: "${{ vars.ACCOUNT }}" + ROLE: "${{ vars.ROLE }}" + USER: "${{ vars.USER }}" + PASSWORD: "${{ secrets.PASSWORD }}" + REGION: "${{ vars.REGION }}" + DATABASE: "${{ vars.DATABASE }}" + WAREHOUSE: "${{ vars.WAREHOUSE }}" + SCHEMA: "${{ vars.SCHEMA }}" + +concurrency: + group: ${{ github.workflow }} + +jobs: + run_dbt_jobs: + runs-on: ubuntu-latest + environment: + name: workflow_prod + + steps: + - uses: actions/checkout@v3 + + - uses: actions/setup-python@v4 + with: + python-version: "3.10" + cache: "pip" + + - name: install dependencies + run: | + pip install -r requirements.txt + dbt deps + - name: Run DBT Jobs + run: | + dbt run -m "berachain_models,tag:streamline_core_complete" \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d99e9bd --- /dev/null +++ b/.gitignore @@ -0,0 +1,18 @@ + +target/ +dbt_modules/ +# newer versions of dbt use this directory instead of dbt_modules for test dependencies +dbt_packages/ +logs/ + +.venv/ +.python-version + +# Visual Studio Code files +*/.vscode +*.code-workspace +.history/ +**/.DS_Store +.vscode/ +.env +dbt-env/ \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..5c797d2 --- /dev/null +++ b/README.md @@ -0,0 +1,109 @@ +## Profile Set Up + +#### Use the following within profiles.yml +---- + +```yml +berachain: + target: dev + outputs: + dev: + type: snowflake + account: + role: + user: + password: + region: + database: BERACHAIN_DEV + warehouse: + schema: silver + threads: 12 + client_session_keep_alive: False + query_tag: + prod: + type: snowflake + account: + role: + user: + password: + region: + database: BERACHAIN + warehouse: + schema: silver + threads: 12 + 
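+      # client_session_keep_alive keeps the Snowflake session alive through long idle gaps; False is fine for typical runs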
client_session_keep_alive: False
+      query_tag:
+```
+
+### Variables
+
+To control the creation of UDF or SP macros with dbt run:
+* UPDATE_UDFS_AND_SPS
+When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run, as normal
+When False, none of the on-run-start macros are executed on model run
+
+The default value is False
+
+* Usage:
+dbt run --vars '{"UPDATE_UDFS_AND_SPS":True}' -m ...
+
+To reload records in a curated complete table without a full-refresh, such as `silver_bridge.complete_bridge_activity`:
+* HEAL_CURATED_MODEL
+Default is an empty array []
+When an item is included in the var array, incremental logic is skipped for the matching CTE / code block
+When an item is not included in the var array, or does not match the item specified in the model, incremental logic applies as normal
+Example set up: `{% if is_incremental() and 'axelar' not in var('HEAL_CURATED_MODEL') %}`
+
+* Usage:
+Single CTE: dbt run --vars '{"HEAL_CURATED_MODEL":"axelar"}' -m ...
+Multiple CTEs: dbt run --vars '{"HEAL_CURATED_MODEL":["axelar","across","celer_cbridge"]}' -m ...
+
+### Resources:
+- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
+- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
+- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
+- Find [dbt events](https://events.getdbt.com) near you
+- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices
+
+## Applying Model Tags
+
+### Database / Schema level tags
+
+Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add or modify tags, call the appropriate tag set function within the macro.
+
+```
+{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
+{{ set_schema_tag_value('SOME_SCHEMA','SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}
+```
+
+### Model tags
+
+To add or update a model's Snowflake tags, add or modify the `meta` model property under `config`. Only table-level tags are supported via dbt at this time.
+
+```
+{{ config(
+    ...,
+    meta={
+        'database_tags':{
+            'table': {
+                'PURPOSE': 'SOME_PURPOSE'
+            }
+        }
+    },
+    ...
+) }}
+```
+
+By default, model tags are pushed to Snowflake on each load. You can disable this by setting the `UPDATE_SNOWFLAKE_TAGS` project variable to `False` during a run.
+
+```
+dbt run --vars '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql
+```
+
+### Querying for existing tags on a model in Snowflake
+
+```
+select *
+from table(berachain.information_schema.tag_references('berachain.core.fact_blocks', 'table'));
+```
\ No newline at end of file
diff --git a/analysis/.gitkeep b/analysis/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/data/.gitkeep b/data/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/data/github_actions__workflows.csv b/data/github_actions__workflows.csv
new file mode 100644
index 0000000..6128a16
--- /dev/null
+++ b/data/github_actions__workflows.csv
@@ -0,0 +1,2 @@
+workflow_name,workflow_schedule
+dbt_run_streamline_chainhead,"3,33 * * * *"
\ No newline at end of file
diff --git a/dbt_project.yml b/dbt_project.yml
new file mode 100644
index 0000000..4be2791
--- /dev/null
+++ b/dbt_project.yml
@@ -0,0 +1,69 @@
+# Name your project! Project names should contain only lowercase characters
+# and underscores.
+# A good package name should reflect your organization's
+# name or the intended use of these models.
+name: "berachain_models"
+version: "1.0.0"
+config-version: 2
+
+# This setting configures which "profile" dbt uses for this project.
+profile: "berachain"
+
+# These configurations specify where dbt should look for different types of files.
+# The `source-paths` config, for example, states that models in this project can be
+# found in the "models/" directory. You probably won't need to change these!
+model-paths: ["models"]
+analysis-paths: ["analysis"]
+test-paths: ["tests"]
+seed-paths: ["data"]
+macro-paths: ["macros"]
+snapshot-paths: ["snapshots"]
+
+target-path: "target" # directory which will store compiled SQL files
+clean-targets: # directories to be removed by `dbt clean`
+  - "target"
+  - "dbt_modules"
+  - "dbt_packages"
+
+tests:
+  +store_failures: true # all tests
+
+on-run-start:
+  - "{{ create_sps() }}"
+  - "{{ create_udfs() }}"
+
+on-run-end:
+  - '{{ apply_meta_as_tags(results) }}'
+
+dispatch:
+  - macro_namespace: dbt
+    search_order:
+      - berachain_models
+      - dbt_snowflake_query_tags
+      - dbt
+
+query-comment:
+  comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
+  append: true # Snowflake removes prefixed comments.
+
+# Configuring models
+# Full documentation: https://docs.getdbt.com/docs/configuring-models
+
+models:
+  +copy_grants: true
+  +on_schema_change: "append_new_columns"
+
+# In this example config, we tell dbt to build all models in the example/ directory
+# as tables. These settings can be overridden in the individual model files
+# using the `{{ config(...) }}` macro.
+
+vars:
+  "dbt_date:time_zone": GMT
+  STREAMLINE_INVOKE_STREAMS: False
+  STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
+  UPDATE_UDFS_AND_SPS: False
+  UPDATE_SNOWFLAKE_TAGS: True
+  OBSERV_FULL_TEST: False
+  WAIT: 0
+  HEAL_MODEL: False
+  HEAL_CURATED_MODEL: []
+  START_GHA_TASKS: False
\ No newline at end of file
diff --git a/macros/.gitkeep b/macros/.gitkeep
new file mode 100644
index 0000000..e69de29
diff --git a/macros/create_sps.sql b/macros/create_sps.sql
new file mode 100644
index 0000000..0c993c4
--- /dev/null
+++ b/macros/create_sps.sql
@@ -0,0 +1,8 @@
+{% macro create_sps() %}
+    {% if var("UPDATE_UDFS_AND_SPS") %}
+        {% if target.database == 'BERACHAIN' %}
+            CREATE schema IF NOT EXISTS _internal;
+            {{ sp_create_prod_clone('_internal') }};
+        {% endif %}
+    {% endif %}
+{% endmacro %}
diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql
new file mode 100644
index 0000000..bff35e9
--- /dev/null
+++ b/macros/create_udfs.sql
@@ -0,0 +1,14 @@
+{% macro create_udfs() %}
+    {% if var("UPDATE_UDFS_AND_SPS") %}
+        {% set sql %}
+        CREATE schema if NOT EXISTS silver;
+
+        {{ create_udtf_get_base_table(
+            schema = "streamline"
+        ) }}
+
+        {% endset %}
+        {% do run_query(sql) %}
+        {{- fsc_utils.create_udfs() -}}
+    {% endif %}
+{% endmacro %}
diff --git a/macros/custom_naming_macros.sql b/macros/custom_naming_macros.sql
new file mode 100644
index 0000000..0f4a72c
--- /dev/null
+++ b/macros/custom_naming_macros.sql
@@ -0,0 +1,11 @@
+{% macro generate_schema_name(custom_schema_name=none, node=none) -%}
+    {% set node_name = node.name %}
+    {% set split_name = node_name.split('__') %}
+    {{ split_name[0] | trim }}
+{%- endmacro %}
+
+{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
+    {% set node_name = node.name %}
+    {% set split_name = node_name.split('__') %}
+    {{ split_name[1] | trim }}
+{%- endmacro %}
diff --git a/macros/dbt/get_merge_sql.sql b/macros/dbt/get_merge_sql.sql
new file mode 100644
index 0000000..8fefc01
--- /dev/null
+++ b/macros/dbt/get_merge_sql.sql
@@ -0,0 +1,44 @@
+{% macro get_merge_sql(
+        target,
+        source,
+        unique_key,
+        dest_columns,
+        incremental_predicates
+    ) -%}
+    {% set predicate_override = "" %}
+    {% if incremental_predicates[0] == "dynamic_range" %}
+        -- run some queries to dynamically determine the min + max of this 'input_column' in the new data
+        {% set input_column = incremental_predicates[1] %}
+        {% set get_limits_query %}
+        SELECT
+            MIN(
+                {{ input_column }}
+            ) AS lower_limit,
+            MAX(
+                {{ input_column }}
+            ) AS upper_limit
+        FROM
+            {{ source }}
+        {% endset %}
+        {% set limits = run_query(get_limits_query)[0] %}
+        {% set lower_limit, upper_limit = limits[0], limits[1] %}
+        -- use those calculated min + max values to limit the 'target' scan, to only the days with new data
+        {% set predicate_override %}
+        dbt_internal_dest.{{ input_column }} BETWEEN '{{ lower_limit }}'
+        AND '{{ upper_limit }}' {% endset %}
+    {% endif %}
+
+    {% set predicates = [predicate_override] if predicate_override else incremental_predicates %}
+    -- standard merge from here
+    {% set merge_sql = dbt.get_merge_sql(
+        target,
+        source,
+        unique_key,
+        dest_columns,
+        predicates
+    ) %}
+    {{ return(merge_sql) }}
+{% endmacro %}
diff --git a/macros/dbt/get_tmp_relation_type.sql b/macros/dbt/get_tmp_relation_type.sql
new file mode 100644
index 0000000..3bb7438
--- /dev/null
+++ b/macros/dbt/get_tmp_relation_type.sql
@@ -0,0 +1,8 @@
+{% macro dbt_snowflake_get_tmp_relation_type(
+        strategy,
+        unique_key,
+        language
+    ) %}
+    -- always table
+    {{ return('table') }}
+{% endmacro %}
diff --git a/macros/run_sp_create_prod_clone.sql b/macros/run_sp_create_prod_clone.sql
new file mode 100644
index 0000000..a98ac92
--- /dev/null
+++ b/macros/run_sp_create_prod_clone.sql
@@ -0,0 +1,10 @@
+{% macro run_sp_create_prod_clone() %}
+    {% set clone_query %}
+    call berachain._internal.create_prod_clone(
+        'berachain',
+        'berachain_dev',
+        'internal_dev'
+    );
+{% endset %}
+    {% do run_query(clone_query) %}
+{% endmacro %}
diff --git a/macros/sp_create_prod_clone.sql b/macros/sp_create_prod_clone.sql
new file mode 100644
index 0000000..20ee897
--- /dev/null
+++ b/macros/sp_create_prod_clone.sql
@@ -0,0 +1,44 @@
+{% macro sp_create_prod_clone(target_schema) -%}
+
+create or replace procedure {{ target_schema }}.create_prod_clone(source_db_name string, destination_db_name string, role_name string)
+returns boolean
+language javascript
+execute as caller
+as
+$$
+    snowflake.execute({sqlText: `BEGIN TRANSACTION;`});
+    try {
+        snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
+        snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */
+
+        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
+        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
+        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
+        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
+        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
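+        /* the ALL grants transfer ownership of objects that already exist in the clone; the FUTURE grants below cover objects created in it afterwards */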
snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`}); + + snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}) + + var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`}); + while (existing_tags.next()) { + var schema = existing_tags.getColumnValue(4); + var tag_name = existing_tags.getColumnValue(2) + snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`}); + } + + snowflake.execute({sqlText: `COMMIT;`}); + } catch (err) { + snowflake.execute({sqlText: `ROLLBACK;`}); + throw(err); + } + + return true +$$ + +{%- endmacro %} \ No newline at end of file diff --git a/macros/streamline/get_base_table_udtf.sql b/macros/streamline/get_base_table_udtf.sql new file mode 100644 index 0000000..a488d14 --- /dev/null +++ b/macros/streamline/get_base_table_udtf.sql @@ -0,0 +1,24 @@ +{% macro create_udtf_get_base_table(schema) %} +create or replace function {{ schema }}.udtf_get_base_table(max_height integer) +returns table (height number) +as +$$ + with base as ( + select + row_number() over ( + order by + seq4() + ) as id + from + table(generator(rowcount => 100000000)) + ) +select + id as height +from + base +where + id <= max_height +$$ +; + +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..62dbf9d --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,67 @@ +{% macro streamline_external_table_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + job_created_time AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze_streamline", model) }}') + ) A + ) + SELECT + s.*, + b.file_name, + _inserted_timestamp + FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} + +{% macro streamline_external_table_FR_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + registered_on AS _inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze_streamline", model) }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + _inserted_timestamp +FROM + {{ source( + "bronze_streamline", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key 
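+        -- the partition_key match is repeated in the WHERE clause below; keeping it in both places can help Snowflake prune external table partitions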
+WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} diff --git a/macros/tags/add_database_or_schema_tags.sql b/macros/tags/add_database_or_schema_tags.sql new file mode 100644 index 0000000..f0bf38b --- /dev/null +++ b/macros/tags/add_database_or_schema_tags.sql @@ -0,0 +1,4 @@ +{% macro add_database_or_schema_tags() %} + {{ set_database_tag_value('BLOCKCHAIN_NAME','BERACHAIN') }} + {{ set_database_tag_value('BLOCKCHAIN_TYPE','EVM') }} +{% endmacro %} \ No newline at end of file diff --git a/macros/tags/snowflake_tagging.sql b/macros/tags/snowflake_tagging.sql new file mode 100644 index 0000000..bc25e69 --- /dev/null +++ b/macros/tags/snowflake_tagging.sql @@ -0,0 +1,127 @@ +{% macro apply_meta_as_tags(results) %} + {% if var("UPDATE_SNOWFLAKE_TAGS") %} + {{ log('apply_meta_as_tags', info=False) }} + {{ log(results, info=False) }} + {% if execute %} + + {%- set tags_by_schema = {} -%} + {% for res in results -%} + {% if res.node.meta.database_tags %} + + {%- set model_database = res.node.database -%} + {%- set model_schema = res.node.schema -%} + {%- set model_schema_full = model_database+'.'+model_schema -%} + {%- set model_alias = res.node.alias -%} + + {% if model_schema_full not in tags_by_schema.keys() %} + {{ log('need to fetch tags for schema '+model_schema_full, info=False) }} + {%- call statement('main', fetch_result=True) -%} + show tags in {{model_database}}.{{model_schema}} + {%- endcall -%} + {%- set _ = tags_by_schema.update({model_schema_full: load_result('main')['table'].columns.get('name').values()|list}) -%} + {{ log('Added tags to cache', info=False) }} + {% else %} + {{ log('already have tag info for schema', info=False) }} + {% endif %} + + {%- set current_tags_in_schema = tags_by_schema[model_schema_full] -%} + {{ log('current_tags_in_schema:', info=False) }} + {{ log(current_tags_in_schema, info=False) }} + {{ log("========== Processing tags for "+model_schema_full+"."+model_alias+" ==========", info=False) }} + + {% set line -%} + node: {{ res.node.unique_id }}; status: {{ res.status }} (message: {{ res.message }}) + node full: {{ res.node}} + meta: {{ res.node.meta}} + materialized: {{ res.node.config.materialized }} + {%- endset %} + {{ log(line, info=False) }} + + {%- call statement('main', fetch_result=True) -%} + select LEVEL,UPPER(TAG_NAME) as TAG_NAME,TAG_VALUE from table(information_schema.tag_references_all_columns('{{model_schema}}.{{model_alias}}', 'table')) + {%- endcall -%} + {%- set existing_tags_for_table = load_result('main')['data'] -%} + {{ log('Existing tags for table:', info=False) }} + {{ log(existing_tags_for_table, info=False) }} + + {{ log('--', info=False) }} + {% for table_tag in res.node.meta.database_tags.table %} + + {{ create_tag_if_missing(current_tags_in_schema,table_tag|upper) }} + {% set desired_tag_value = res.node.meta.database_tags.table[table_tag] %} + + {{set_table_tag_value_if_different(model_schema,model_alias,table_tag,desired_tag_value,existing_tags_for_table)}} + {% endfor %} + {{ log("========== Finished processing tags for "+model_alias+" ==========", info=False) }} + {% endif %} + {% endfor %} + {% endif %} + {% endif %} +{% endmacro %} + + +{% macro create_tag_if_missing(all_tag_names,table_tag) %} + {% if table_tag not in all_tag_names %} + {{ log('Creating missing tag '+table_tag, info=False) }} + {%- call statement('main', fetch_result=True) -%} + create tag if not exists silver.{{table_tag}} + {%- endcall -%} + {{ log(load_result('main').data, info=False) }} + {% else %} + {{ 
log('Tag already exists: '+table_tag, info=False) }} + {% endif %} +{% endmacro %} + +{% macro set_table_tag_value_if_different(model_schema,table_name,tag_name,desired_tag_value,existing_tags) %} + {{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at table level', info=False) }} + {%- set existing_tag_for_table = existing_tags|selectattr('0','equalto','TABLE')|selectattr('1','equalto',tag_name|upper)|list -%} + {{ log('Filtered tags for table:', info=False) }} + {{ log(existing_tag_for_table[0], info=False) }} + {% if existing_tag_for_table|length > 0 and existing_tag_for_table[0][2]==desired_tag_value %} + {{ log('Correct tag value already exists', info=False) }} + {% else %} + {{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }} + {%- call statement('main', fetch_result=True) -%} + alter table {{model_schema}}.{{table_name}} set tag {{tag_name}} = '{{desired_tag_value}}' + {%- endcall -%} + {{ log(load_result('main').data, info=False) }} + {% endif %} +{% endmacro %} + +{% macro set_column_tag_value_if_different(table_name,column_name,tag_name,desired_tag_value,existing_tags) %} + {{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at column level', info=False) }} + {%- set existing_tag_for_column = existing_tags|selectattr('0','equalto','COLUMN')|selectattr('1','equalto',tag_name|upper)|list -%} + {{ log('Filtered tags for column:', info=False) }} + {{ log(existing_tag_for_column[0], info=False) }} + {% if existing_tag_for_column|length > 0 and existing_tag_for_column[0][2]==desired_tag_value %} + {{ log('Correct tag value already exists', info=False) }} + {% else %} + {{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }} + {%- call statement('main', fetch_result=True) -%} + alter table {{table_name}} modify column {{column_name}} set tag {{tag_name}} = '{{desired_tag_value}}' + {%- endcall -%} + {{ log(load_result('main').data, info=False) }} + {% endif %} +{% endmacro %} + +{% macro set_database_tag_value(tag_name,tag_value) %} + {% set query %} + create tag if not exists silver.{{tag_name}} + {% endset %} + {% do run_query(query) %} + {% set query %} + alter database {{target.database}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}' + {% endset %} + {% do run_query(query) %} +{% endmacro %} + +{% macro set_schema_tag_value(target_schema,tag_name,tag_value) %} + {% set query %} + create tag if not exists silver.{{tag_name}} + {% endset %} + {% do run_query(query) %} + {% set query %} + alter schema {{target.database}}.{{target_schema}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}' + {% endset %} + {% do run_query(query) %} +{% endmacro %} \ No newline at end of file diff --git a/macros/tests/missing_txs.sql b/macros/tests/missing_txs.sql new file mode 100644 index 0000000..293fb1b --- /dev/null +++ b/macros/tests/missing_txs.sql @@ -0,0 +1,103 @@ +{% macro missing_txs( + model + ) %} + WITH txs_base AS ( + SELECT + block_number AS base_block_number, + tx_hash AS base_tx_hash + FROM + {{ ref('test_silver__transactions_full') }} + ), + model_name AS ( + SELECT + block_number AS model_block_number, + tx_hash AS model_tx_hash + FROM + {{ model }} + ) +SELECT + base_block_number, + base_tx_hash, + model_block_number, + model_tx_hash +FROM + txs_base + LEFT JOIN model_name + ON base_block_number = model_block_number + AND base_tx_hash = model_tx_hash +WHERE + ( + model_tx_hash IS NULL + OR model_block_number IS NULL + ) +{% endmacro %} + +{% macro 
recent_missing_txs( + model + ) %} + WITH txs_base AS ( + SELECT + block_number AS base_block_number, + tx_hash AS base_tx_hash + FROM + {{ ref('test_silver__transactions_recent') }} + ), + model_name AS ( + SELECT + block_number AS model_block_number, + tx_hash AS model_tx_hash + FROM + {{ model }} + ) +SELECT + base_block_number, + base_tx_hash, + model_block_number, + model_tx_hash +FROM + txs_base + LEFT JOIN model_name + ON base_block_number = model_block_number + AND base_tx_hash = model_tx_hash +WHERE + model_tx_hash IS NULL + OR model_block_number IS NULL +{% endmacro %} + +{% macro missing_confirmed_txs( + model1, + model2 + ) %} + WITH txs_base AS ( + SELECT + block_number AS base_block_number, + block_hash AS base_block_hash, + tx_hash AS base_tx_hash + FROM + {{ model1 }} + ), + model_name AS ( + SELECT + block_number AS model_block_number, + block_hash AS model_block_hash, + tx_hash AS model_tx_hash + FROM + {{ model2 }} + ) +SELECT + DISTINCT base_block_number AS block_number +FROM + txs_base + LEFT JOIN model_name + ON base_block_number = model_block_number + AND base_tx_hash = model_tx_hash + AND base_block_hash = model_block_hash +WHERE + model_tx_hash IS NULL + AND model_block_number <= ( + SELECT + MAX(base_block_number) + FROM + txs_base + ) +{% endmacro %} diff --git a/macros/utils.sql b/macros/utils.sql new file mode 100644 index 0000000..2e3b3cb --- /dev/null +++ b/macros/utils.sql @@ -0,0 +1,42 @@ +{% macro if_data_call_wait() %} + {% if var( + "STREAMLINE_INVOKE_STREAMS" + ) %} + {% set query %} + SELECT + 1 + WHERE + EXISTS( + SELECT + 1 + FROM + {{ model.schema ~ "." ~ model.alias }} + LIMIT + 1 + ) {% endset %} + {% if execute %} + {% set results = run_query( + query + ) %} + {% if results %} + {{ log( + "Waiting...", + info = True + ) }} + + {% set wait_query %} + SELECT + system$wait( + {{ var( + "WAIT", + 600 + ) }} + ) {% endset %} + {% do run_query(wait_query) %} + {% else %} + SELECT + NULL; + {% endif %} + {% endif %} + {% endif %} +{% endmacro %} diff --git a/models/bronze/bronze_lq__get_block_by_number.sql b/models/bronze/bronze_lq__get_block_by_number.sql new file mode 100644 index 0000000..5ddd16c --- /dev/null +++ b/models/bronze/bronze_lq__get_block_by_number.sql @@ -0,0 +1,59 @@ +{{ config ( + materialized = "incremental", + unique_key = "block_number", + cluster_by = "ROUND(block_number, -3)", + full_refresh = false, + tags = ['streamline_core_complete'] +) }} + +WITH blocks_to_do AS ( + + SELECT + block_number, + block_number_hex + FROM + {{ ref('streamline__blocks') }} + +{% if is_incremental() %} +EXCEPT +SELECT + block_number, + block_number_hex +FROM + {{ this }} +{% endif %} +), +ordered_blocks AS ( + SELECT + block_number, + block_number_hex, + MOD( + block_number, + 10 + ) AS batch + FROM + blocks_to_do + ORDER BY + block_number DESC + LIMIT + 3000 +), batched AS ({% for item in range(11) %} +SELECT + block_number, block_number_hex, ROUND(block_number, -3) :: INT AS partition_key, {{ target.database }}.live.udf_api('POST', '{service}/{Authentication}', OBJECT_CONSTRUCT('Content-Type', 'application/json'), OBJECT_CONSTRUCT('id', block_number :: STRING, 'jsonrpc', '2.0', 'method', 'eth_getBlockByNumber', 'params', ARRAY_CONSTRUCT(block_number_hex, FALSE)), 'Vault/prod/berachain/quicknode/artio') AS node_call, SYSDATE() AS _inserted_timestamp +FROM + ordered_blocks +WHERE + batch = {{ item }} + + {% if not loop.last %} + UNION ALL + {% endif %} +{% endfor %}) +SELECT + block_number, + block_number_hex, + partition_key, + node_call, + 
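+    -- _inserted_timestamp (SYSDATE() at load) marks when the row landed; downstream incremental models can use it as a watermark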
+    _inserted_timestamp
+FROM
+    batched
diff --git a/models/bronze/bronze_lq__get_transaction_receipts.sql b/models/bronze/bronze_lq__get_transaction_receipts.sql
new file mode 100644
index 0000000..185da1b
--- /dev/null
+++ b/models/bronze/bronze_lq__get_transaction_receipts.sql
@@ -0,0 +1,64 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = ["block_number", "tx_hash"],
+    cluster_by = "ROUND(block_number, -3)",
+    full_refresh = false,
+    tags = ['streamline_core_complete']
+) }}
+
+WITH blocks_to_do AS (
+
+    SELECT
+        block_number,
+        tx_hash
+    FROM
+        {{ ref('bronze_lq__tx_hashes') }}
+
+{% if is_incremental() %}
+EXCEPT
+SELECT
+    block_number,
+    tx_hash
+FROM
+    {{ this }}
+{% endif %}
+),
+ordered_blocks AS (
+    SELECT
+        block_number,
+        tx_hash,
+        ROW_NUMBER() over (
+            ORDER BY
+                block_number,
+                tx_hash
+        ) AS row_no,
+        MOD(
+            row_no,
+            10
+        ) AS batch
+    FROM
+        blocks_to_do
+    ORDER BY
+        block_number DESC
+    LIMIT
+        3000
+), batched AS ({% for item in range(11) %}
+SELECT
+    block_number, tx_hash, ROUND(block_number, -3) :: INT AS partition_key, {{ target.database }}.live.udf_api('POST', '{service}/{Authentication}', OBJECT_CONSTRUCT('Content-Type', 'application/json'), OBJECT_CONSTRUCT('id', block_number :: STRING, 'jsonrpc', '2.0', 'method', 'eth_getTransactionReceipt', 'params', ARRAY_CONSTRUCT(tx_hash)), 'Vault/prod/berachain/quicknode/artio') AS node_call, SYSDATE() AS _inserted_timestamp
+FROM
+    ordered_blocks
+WHERE
+    batch = {{ item }}
+
+    {% if not loop.last %}
+    UNION ALL
+    {% endif %}
+{% endfor %})
+SELECT
+    block_number,
+    tx_hash,
+    partition_key,
+    node_call,
+    _inserted_timestamp
+FROM
+    batched
diff --git a/models/bronze/bronze_lq__get_transactions.sql b/models/bronze/bronze_lq__get_transactions.sql
new file mode 100644
index 0000000..15b8e58
--- /dev/null
+++ b/models/bronze/bronze_lq__get_transactions.sql
@@ -0,0 +1,64 @@
+{{ config (
+    materialized = "incremental",
+    unique_key = ["block_number", "tx_hash"],
+    cluster_by = "ROUND(block_number, -3)",
+    full_refresh = false,
+    tags = ['streamline_core_complete']
+) }}
+
+WITH blocks_to_do AS (
+
+    SELECT
+        block_number,
+        tx_hash
+    FROM
+        {{ ref('bronze_lq__tx_hashes') }}
+
+{% if is_incremental() %}
+EXCEPT
+SELECT
+    block_number,
+    tx_hash
+FROM
+    {{ this }}
+{% endif %}
+),
+ordered_blocks AS (
+    SELECT
+        block_number,
+        tx_hash,
+        ROW_NUMBER() over (
+            ORDER BY
+                block_number,
+                tx_hash
+        ) AS row_no,
+        MOD(
+            row_no,
+            10
+        ) AS batch
+    FROM
+        blocks_to_do
+    ORDER BY
+        block_number DESC
+    LIMIT
+        3000
+), batched AS ({% for item in range(11) %}
+SELECT
+    block_number, tx_hash, ROUND(block_number, -3) :: INT AS partition_key, {{ target.database }}.live.udf_api('POST', '{service}/{Authentication}', OBJECT_CONSTRUCT('Content-Type', 'application/json'), OBJECT_CONSTRUCT('id', block_number :: STRING, 'jsonrpc', '2.0', 'method', 'eth_getTransactionByHash', 'params', ARRAY_CONSTRUCT(tx_hash)), 'Vault/prod/berachain/quicknode/artio') AS node_call, SYSDATE() AS _inserted_timestamp
+FROM
+    ordered_blocks
+WHERE
+    batch = {{ item }}
+
+    {% if not loop.last %}
+    UNION ALL
+    {% endif %}
+{% endfor %})
+SELECT
+    block_number,
+    tx_hash,
+    partition_key,
+    node_call,
+    _inserted_timestamp
+FROM
+    batched
diff --git a/models/bronze/bronze_lq__tx_hashes.sql b/models/bronze/bronze_lq__tx_hashes.sql
new file mode 100644
index 0000000..64f98aa
--- /dev/null
+++ b/models/bronze/bronze_lq__tx_hashes.sql
@@ -0,0 +1,13 @@
+{{ config (
+    materialized = "view",
+    tags = ['streamline_core_complete']
+) }}
+
+SELECT
+    block_number,
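+    -- LATERAL FLATTEN below yields one row per entry in the block's transactions array; VALUE is the transaction hash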
VALUE :: STRING AS tx_hash +FROM + {{ ref('bronze_lq__get_block_by_number') }}, + LATERAL FLATTEN ( + input => node_call :data :result :transactions + ) diff --git a/models/bronze/views/bronze__blocks.sql b/models/bronze/views/bronze__blocks.sql new file mode 100644 index 0000000..e44ad9e --- /dev/null +++ b/models/bronze/views/bronze__blocks.sql @@ -0,0 +1,10 @@ +{{ config ( + materialized = "view", + tags = ['streamline_core_complete'] +) }} + +SELECT + block_number, + node_call :data :result AS block_response +FROM + {{ ref('bronze_lq__get_block_by_number') }} diff --git a/models/bronze/views/bronze__receipts.sql b/models/bronze/views/bronze__receipts.sql new file mode 100644 index 0000000..9a29a89 --- /dev/null +++ b/models/bronze/views/bronze__receipts.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = "view", + tags = ['streamline_core_complete'] +) }} + +SELECT + block_number, + tx_hash, + node_call :data :result AS receipt_response +FROM + {{ ref('bronze_lq__get_transaction_receipts') }} diff --git a/models/bronze/views/bronze__transactions.sql b/models/bronze/views/bronze__transactions.sql new file mode 100644 index 0000000..a7bd43e --- /dev/null +++ b/models/bronze/views/bronze__transactions.sql @@ -0,0 +1,11 @@ +{{ config ( + materialized = "view", + tags = ['streamline_core_complete'] +) }} + +SELECT + block_number, + tx_hash, + node_call :data :result AS tx_response +FROM + {{ ref('bronze_lq__get_transactions') }} diff --git a/models/silver/silver__dummy.sql b/models/silver/silver__dummy.sql new file mode 100644 index 0000000..f006b7b --- /dev/null +++ b/models/silver/silver__dummy.sql @@ -0,0 +1,5 @@ +{{ config ( + materialized = "view" +) }} + +select 1 as dummy \ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml new file mode 100644 index 0000000..3e7d471 --- /dev/null +++ b/models/sources.yml @@ -0,0 +1,13 @@ +version: 2 + +sources: + - name: silver_crosschain + database: "{{ 'crosschain' if target.database == 'BERACHAIN' else 'crosschain_dev' }}" + schema: silver + tables: + - name: number_sequence + - name: github_actions + database: berachain + schema: github_actions + tables: + - name: workflows diff --git a/models/streamline/streamline__blocks.sql b/models/streamline/streamline__blocks.sql new file mode 100644 index 0000000..5897f6d --- /dev/null +++ b/models/streamline/streamline__blocks.sql @@ -0,0 +1,19 @@ +{{ config ( + materialized = "view", + tags = ['streamline_core_complete'] +) }} + +SELECT + _id AS block_number, + REPLACE( + concat_ws('', '0x', to_char(block_number, 'XXXXXXXX')), + ' ', + '' + ) AS block_number_hex +FROM + {{ source( + 'silver_crosschain', + 'number_sequence' + ) }} +WHERE + _id between 762066 and 2172232 diff --git a/package-lock.yml b/package-lock.yml new file mode 100644 index 0000000..6588f36 --- /dev/null +++ b/package-lock.yml @@ -0,0 +1,16 @@ +packages: +- package: calogica/dbt_expectations + version: 0.8.2 +- package: dbt-labs/dbt_external_tables + version: 0.8.2 +- package: dbt-labs/dbt_utils + version: 1.0.0 +- git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: 484e9db07d2060286768bb745e1b0e879178d43b +- package: get-select/dbt_snowflake_query_tags + version: 2.3.3 +- package: calogica/dbt_date + version: 0.7.2 +- git: https://github.com/FlipsideCrypto/livequery-models.git + revision: b024188be4e9c6bc00ed77797ebdc92d351d620e +sha1_hash: 5b45e0f95979d82d85fd603d44f5bf35be9a7064 diff --git a/packages.yml b/packages.yml new file mode 100644 index 0000000..6b10e79 --- /dev/null +++ 
b/packages.yml @@ -0,0 +1,11 @@ +packages: + - package: calogica/dbt_expectations + version: 0.8.2 + - package: dbt-labs/dbt_external_tables + version: 0.8.2 + - package: dbt-labs/dbt_utils + version: 1.0.0 + - git: https://github.com/FlipsideCrypto/fsc-utils.git + revision: v1.23.0 + - package: get-select/dbt_snowflake_query_tags + version: [">=2.0.0", "<3.0.0"] \ No newline at end of file diff --git a/profiles.yml b/profiles.yml new file mode 100644 index 0000000..c787be4 --- /dev/null +++ b/profiles.yml @@ -0,0 +1,31 @@ +berachain: + target: prod + outputs: + dev: + type: snowflake + account: "{{ env_var('ACCOUNT') }}" + role: "{{ env_var('ROLE') }}" + user: "{{ env_var('USER') }}" + password: "{{ env_var('PASSWORD') }}" + region: "{{ env_var('REGION') }}" + database: "{{ env_var('DATABASE') }}" + warehouse: "{{ env_var('WAREHOUSE') }}" + schema: SILVER + threads: 4 + client_session_keep_alive: False + query_tag: berachain_curator + prod: + type: snowflake + account: "{{ env_var('ACCOUNT') }}" + role: "{{ env_var('ROLE') }}" + user: "{{ env_var('USER') }}" + password: "{{ env_var('PASSWORD') }}" + region: "{{ env_var('REGION') }}" + database: "{{ env_var('DATABASE') }}" + warehouse: "{{ env_var('WAREHOUSE') }}" + schema: SILVER + threads: 4 + client_session_keep_alive: False + query_tag: berachain_curator + config: + send_anonymous_usage_stats: False \ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..39b82bb --- /dev/null +++ b/requirements.txt @@ -0,0 +1,2 @@ +dbt-snowflake>=1.7,<1.8 +protobuf==4.25.3 \ No newline at end of file diff --git a/snapshots/.gitkeep b/snapshots/.gitkeep new file mode 100644 index 0000000..e69de29