diff --git a/LICENSE b/LICENSE index 61f6df5..ce1c29d 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License
-Copyright (c) 2024 Flipside Crypto
+Copyright (c) 2023 Flipside Crypto
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md new file mode 100644 index 0000000..7cfa887 --- /dev/null +++ b/README.md @@ -0,0 +1,74 @@ +## Profile Setup

#### Use the following within profiles.yml
----

```yml
aleo:
  target: dev
  outputs:
    dev:
      type: snowflake
      account:
      role:
      user:
      password:
      region:
      database: ALEO_DEV
      warehouse:
      schema: silver
      threads: 4
      client_session_keep_alive: False
      query_tag:
```

### Resources:
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices

## Applying Model Tags

### Database / Schema level tags

Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add or modify tags, call the appropriate tag-set function within the macro.

```
{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
{{ set_schema_tag_value('SOME_SCHEMA','SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}
```

Note that `set_schema_tag_value` takes the target schema as its first argument, matching the macro signature in `macros/tags/snowflake_tagging.sql`.

### Model tags

To add/update a model's Snowflake tags, add/modify the `meta` model property under `config`. Only table-level tags are supported at this time via dbt.

```
{{ config(
    ...,
    meta={
        'database_tags':{
            'table': {
                'PURPOSE': 'SOME_PURPOSE'
            }
        }
    },
    ...
) }}
```

By default, model tags are not pushed to Snowflake on each load. You can push a tag update for a model by specifying the `UPDATE_SNOWFLAKE_TAGS` project variable during a run.

```
dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":True}' -s models/gold/core/core__fact_blocks.sql
```

### Querying for existing tags on a model in Snowflake

```
select *
from table(aleo.information_schema.tag_references('aleo.core.fact_blocks', 'table'));
```
\ No newline at end of file diff --git a/analyses/.gitkeep b/analyses/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/data/.gitkeep b/data/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/dbt_project.yml b/dbt_project.yml new file mode 100644 index 0000000..8e6d866 --- /dev/null +++ b/dbt_project.yml @@ -0,0 +1,114 @@ +# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "aleo_models"
version: "1.0.0"
config-version: 2

require-dbt-version: ">=1.8.0"

# This setting configures which "profile" dbt uses for this project.
profile: "aleo"

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
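# Note: these paths must match the directories that actually exist in this
# repo. Seeds live in "data/" and analyses in "analyses/", which is why
# seed-paths and analysis-paths below point at those folders rather than
# dbt's defaults ("seeds/" and "analyses/").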
model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
  - "target"
  - "dbt_modules"
  - "dbt_packages"

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models

# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
  +copy_grants: true
  +persist_docs:
    relation: true
    columns: true
  +on_schema_change: "append_new_columns"
  livequery_models:
    deploy:
      core:
        materialized: ephemeral

tests:
  +store_failures: true # all tests

on-run-start:
  - '{{create_sps()}}'
  - '{{create_udfs()}}'

on-run-end:
  - '{{ apply_meta_as_tags(results) }}'

dispatch:
  - macro_namespace: dbt
    search_order:
      - aleo_models
      - dbt_snowflake_query_tags
      - dbt

query-comment:
  comment: '{{ dbt_snowflake_query_tags.get_query_comment(node) }}'
  append: true # Snowflake removes prefixed comments.

vars:
  "dbt_date:time_zone": GMT
  OBSERV_FULL_TEST: False
  START_GHA_TASKS: False
  STREAMLINE_INVOKE_STREAMS: False
  STREAMLINE_RUN_HISTORY: False
  STREAMLINE_RETRY_UNKNOWN: False
  UPDATE_SNOWFLAKE_TAGS: True
  UPDATE_UDFS_AND_SPS: True
  BRONZE_LOOKBACK_DAYS: '{{ env_var("BRONZE_LOOKBACK_DAYS", 3) }}'

  #### STREAMLINE 2.0 BEGIN ####

  API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
  EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
  ROLES: '{{ var("config")[target.name]["ROLES"] }}'

  config:
    # The keys correspond to dbt profiles and are case sensitive
    dev:
      API_INTEGRATION: aws_aleo_api_stg_v2
      EXTERNAL_FUNCTION_URI: gvmfebiq6g.execute-api.us-east-1.amazonaws.com/stg/
      ROLES:
        - AWS_LAMBDA_ALEO_API
        - INTERNAL_DEV
    dev-2xl:
      API_INTEGRATION: aws_aleo_api_stg_v2
      EXTERNAL_FUNCTION_URI: gvmfebiq6g.execute-api.us-east-1.amazonaws.com/stg/
      ROLES:
        - AWS_LAMBDA_ALEO_API
        - INTERNAL_DEV
    prod:
      API_INTEGRATION: aws_aleo_api
      EXTERNAL_FUNCTION_URI: vnmhcb1q2j.execute-api.us-east-1.amazonaws.com/prod/
      ROLES:
        - AWS_LAMBDA_ALEO_API
        - INTERNAL_DEV
        - DBT_CLOUD_ALEO
    prod-2xl:
      API_INTEGRATION: aws_aleo_api
      EXTERNAL_FUNCTION_URI: vnmhcb1q2j.execute-api.us-east-1.amazonaws.com/prod/
      ROLES:
        - AWS_LAMBDA_ALEO_API
        - INTERNAL_DEV
        - DBT_CLOUD_ALEO

  #### STREAMLINE 2.0 END ####
\ No newline at end of file diff --git a/docs/.gitkeep b/docs/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/macros/.gitkeep b/macros/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/macros/create_sps.sql b/macros/create_sps.sql new file mode 100644 index 0000000..19c09d8 --- /dev/null +++ b/macros/create_sps.sql @@ -0,0 +1,12 @@ +{% macro create_sps() %}
    {% if target.database == 'ALEO' %}
        CREATE SCHEMA IF NOT EXISTS _internal;
        {{ sp_create_prod_clone('_internal') }};
    {% endif %}
{% endmacro %}

{% macro enable_search_optimization(schema_name, table_name, condition = '') %}
    {% if target.database == 'ALEO' %}
        ALTER TABLE {{ schema_name }}.{{ table_name }} ADD SEARCH OPTIMIZATION {{ condition }}
    {% endif %}
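    {# Usage note: `condition` is passed through verbatim, so a caller can scope the optimization to specific columns, e.g. enable_search_optimization('silver', 'transactions', 'ON EQUALITY(tx_id)'). The schema/table names in that example are illustrative only. #}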
{% endmacro %}
\ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql new file mode 100644 index 0000000..56fe0ff --- /dev/null +++ b/macros/create_udfs.sql @@ -0,0 +1,2 @@ +{% macro create_udfs() %}
{% endmacro %}
diff --git a/macros/custom_naming_macros.sql b/macros/custom_naming_macros.sql new file mode 100644 index 0000000..53fe8c7 --- /dev/null +++ b/macros/custom_naming_macros.sql @@ -0,0 +1,33 @@ +{% macro generate_schema_name(
    custom_schema_name = none,
    node = none
) -%}
    {% set node_name = node.name %}
    {% set split_name = node_name.split('__') %}
    {{ split_name[0] | trim }}
{%- endmacro %}

{% macro generate_alias_name(
    custom_alias_name = none,
    node = none
) -%}
    {% set node_name = node.name %}
    {% set split_name = node_name.split('__') %}
    {% if split_name | length < 2 %}
        {{ split_name[0] | trim }}
    {% else %}
        {{ split_name[1] | trim }}
    {% endif %}
{%- endmacro %}

{% macro generate_tmp_view_name(model_name) -%}
    {% set node_name = model_name.name %}
    {% set split_name = node_name.split('__') %}
    {{ target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name[1] ~ '__dbt_tmp' | trim }}
{%- endmacro %}

{% macro generate_view_name(model_name) -%}
    {% set node_name = model_name.name %}
    {% set split_name = node_name.split('__') %}
    {{ target.database ~ '.' ~ split_name[0] ~ '.' ~ split_name[1] | trim }}
{%- endmacro %}
\ No newline at end of file diff --git a/macros/custom_query_tag.sql b/macros/custom_query_tag.sql new file mode 100644 index 0000000..809e1bb --- /dev/null +++ b/macros/custom_query_tag.sql @@ -0,0 +1,11 @@ +{% macro set_query_tag() -%}
    {% set new_json = {"repo": project_name, "object": this.table, "profile": target.profile_name, "env": target.name, "existing_tag": get_current_query_tag()} %}
    {% set new_query_tag = tojson(new_json) | as_text %}
    {% if new_query_tag %}
        {% set original_query_tag = get_current_query_tag() %}
        {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
        {% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
        {{ return(original_query_tag) }}
    {% endif %}
    {{ return(none) }}
{% endmacro %}
\ No newline at end of file diff --git a/macros/run_sp_create_prod_clone.sql b/macros/run_sp_create_prod_clone.sql new file mode 100644 index 0000000..2fbd0eb --- /dev/null +++ b/macros/run_sp_create_prod_clone.sql @@ -0,0 +1,7 @@ +{% macro run_sp_create_prod_clone() %}
{% set clone_query %}
call aleo._internal.create_prod_clone('aleo', 'aleo_dev', 'internal_dev');
{% endset %}

{% do run_query(clone_query) %}
{% endmacro %}
\ No newline at end of file diff --git a/macros/sp_create_prod_clone.sql b/macros/sp_create_prod_clone.sql new file mode 100644 index 0000000..20ee897 --- /dev/null +++ b/macros/sp_create_prod_clone.sql @@ -0,0 +1,44 @@ +{% macro sp_create_prod_clone(target_schema) -%}

create or replace procedure {{ target_schema }}.create_prod_clone(source_db_name string, destination_db_name string, role_name string)
returns boolean
language javascript
execute as caller
as
$$
    snowflake.execute({sqlText: `BEGIN TRANSACTION;`});
    try {
        snowflake.execute({sqlText: `CREATE OR REPLACE DATABASE ${DESTINATION_DB_NAME} CLONE ${SOURCE_DB_NAME}`});
        snowflake.execute({sqlText: `DROP SCHEMA IF EXISTS ${DESTINATION_DB_NAME}._INTERNAL`}); /* this only needs to be in prod */

        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL SCHEMAS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON ALL TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE FUNCTIONS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE PROCEDURES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE VIEWS IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE STAGES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});
        snowflake.execute({sqlText: `GRANT OWNERSHIP ON FUTURE TABLES IN DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME};`});

        snowflake.execute({sqlText: `GRANT OWNERSHIP ON DATABASE ${DESTINATION_DB_NAME} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});

        var existing_tags = snowflake.execute({sqlText: `SHOW TAGS IN DATABASE ${DESTINATION_DB_NAME};`});
        while (existing_tags.next()) {
            var schema = existing_tags.getColumnValue(4);
            var tag_name = existing_tags.getColumnValue(2);
            snowflake.execute({sqlText: `GRANT OWNERSHIP ON TAG ${DESTINATION_DB_NAME}.${schema}.${tag_name} TO ROLE ${ROLE_NAME} COPY CURRENT GRANTS;`});
        }

        snowflake.execute({sqlText: `COMMIT;`});
    } catch (err) {
        snowflake.execute({sqlText: `ROLLBACK;`});
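        // Re-throw after rolling back so the CALL statement itself fails and run_query() in the calling macro aborts the dbt run instead of reporting success.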
        throw(err);
    }

    return true;
$$

{%- endmacro %}
\ No newline at end of file diff --git a/macros/streamline/api_integrations.sql b/macros/streamline/api_integrations.sql new file mode 100644 index 0000000..e4134d5 --- /dev/null +++ b/macros/streamline/api_integrations.sql @@ -0,0 +1,21 @@ +{% macro create_aws_aleo_api() %}
    {{ log(
        "Creating integration for target:" ~ target
    ) }}

    {% if target.name == "prod" %} -- TODO WHEN PROD DEPLOYS
        {% set sql %}
        CREATE api integration IF NOT EXISTS aws_aleo_api_prod_v2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/aleo-api-stg-rolesnowflakeudfsAF733095-JHFljSYqZXeT' api_allowed_prefixes = (
            'https://gvmfebiq6g.execute-api.us-east-1.amazonaws.com/stg/'
        ) enabled = TRUE;
        {% endset %}
        {% do run_query(sql) %}
    {% else %}
        {% set sql %}
        CREATE api integration IF NOT EXISTS aws_aleo_api_stg_v2 api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::704693948482:role/aleo-api-stg-rolesnowflakeudfsAF733095-JHFljSYqZXeT' api_allowed_prefixes = (
            'https://gvmfebiq6g.execute-api.us-east-1.amazonaws.com/stg/'
        ) enabled = TRUE;
        {% endset %}
        {% do run_query(sql) %}
    {% endif %}
{% endmacro %}
diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql new file mode 100644 index 0000000..97b78fd --- /dev/null +++ b/macros/streamline/models.sql @@ -0,0 +1,70 @@ +{% macro streamline_external_table_query_v2(
    model,
    partition_function
) %}

    {% set days = var("BRONZE_LOOKBACK_DAYS") %}

    WITH meta AS (
        SELECT
            last_modified AS inserted_timestamp,
            file_name,
            {{ partition_function }} AS partition_key
        FROM
            TABLE(
                information_schema.external_table_file_registration_history(
                    start_time => DATEADD('day', -ABS({{ days }}), CURRENT_TIMESTAMP()),
                    table_name => '{{ source( "bronze_streamline", model) }}')
            ) A
    )
    SELECT
        s.*,
        b.file_name,
        inserted_timestamp
    FROM
        {{ source(
            "bronze_streamline",
            model
        ) }}
        s
        JOIN meta b
        ON b.file_name = metadata$filename
        AND b.partition_key = s.partition_key
    WHERE
        b.partition_key = s.partition_key
        AND DATA :error IS NULL
{% endmacro %}

{% macro streamline_external_table_FR_query_v2(
    model,
    partition_function
) %}
    WITH meta AS (
        SELECT
            registered_on AS inserted_timestamp,
            file_name,
            {{ partition_function }} AS partition_key
        FROM
            TABLE(
                information_schema.external_table_files(
                    table_name => '{{ source( "bronze_streamline", model) }}'
                )
            ) A
    )
    SELECT
        s.*,
        b.file_name,
        inserted_timestamp
    FROM
        {{ source(
            "bronze_streamline",
            model
        ) }}
        s
        JOIN meta b
        ON b.file_name = metadata$filename
        AND b.partition_key = s.partition_key
    WHERE
        b.partition_key = s.partition_key
        AND DATA :error IS NULL
{% endmacro %}
diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql new file mode 100644 index 0000000..69ca947 --- /dev/null +++ b/macros/streamline/streamline_udfs.sql @@ -0,0 +1,10 @@ +{% macro create_udf_bulk_rest_api_v2() %}
    CREATE
    OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
        json OBJECT
    ) returns ARRAY api_integration = {% if target.name == "prod" %}
        aws_aleo_api_prod_v2 AS 'https://gvmfebiq6g.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api' -- CHANGE TO PROD URL WHEN DEPLOYED
    {% else %}
        aws_aleo_api_stg_v2 AS 'https://gvmfebiq6g.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
    {%- endif %};
{% endmacro %}
diff --git a/macros/tags/add_database_or_schema_tags.sql b/macros/tags/add_database_or_schema_tags.sql new file mode 100644 index 0000000..66f17de --- /dev/null +++ b/macros/tags/add_database_or_schema_tags.sql @@ -0,0 +1,10 @@ +{% macro add_database_or_schema_tags() %}
    {{ set_database_tag_value(
        'BLOCKCHAIN_NAME',
        'aleo'
    ) }}
    {{ set_database_tag_value(
        'BLOCKCHAIN_TYPE',
        'IBC'
    ) }}
{% endmacro %}
diff --git a/macros/tags/snowflake_tagging.sql b/macros/tags/snowflake_tagging.sql new file mode 100644 index 0000000..bc25e69 --- /dev/null +++ b/macros/tags/snowflake_tagging.sql @@ -0,0 +1,127 @@ +{% macro apply_meta_as_tags(results) %}
    {% if var("UPDATE_SNOWFLAKE_TAGS") %}
        {{ log('apply_meta_as_tags', info=False) }}
        {{ log(results, info=False) }}
        {% if execute %}

            {%- set tags_by_schema = {} -%}
            {% for res in results -%}
                {% if res.node.meta.database_tags %}

                    {%- set model_database = res.node.database -%}
                    {%- set model_schema = res.node.schema -%}
                    {%- set model_schema_full = model_database+'.'+model_schema -%}
                    {%- set model_alias = res.node.alias -%}

                    {% if model_schema_full not in tags_by_schema.keys() %}
                        {{ log('need to fetch tags for schema '+model_schema_full, info=False) }}
                        {%- call statement('main', fetch_result=True) -%}
                            show tags in {{model_database}}.{{model_schema}}
                        {%- endcall -%}
                        {%- set _ = tags_by_schema.update({model_schema_full: load_result('main')['table'].columns.get('name').values()|list}) -%}
                        {{ log('Added tags to cache', info=False) }}
                    {% else %}
                        {{ log('already have tag info for schema', info=False) }}
                    {% endif %}

                    {%- set current_tags_in_schema = tags_by_schema[model_schema_full] -%}
                    {{ log('current_tags_in_schema:', info=False) }}
                    {{ log(current_tags_in_schema, info=False) }}
                    {{ log("========== Processing tags for "+model_schema_full+"."+model_alias+" ==========", info=False) }}

                    {% set line -%}
                        node: {{ res.node.unique_id }}; status: {{ res.status }} (message: {{ res.message }})
                        node full: {{ res.node }}
                        meta: {{ res.node.meta }}
                        materialized: {{ res.node.config.materialized }}
                    {%- endset %}
                    {{ log(line, info=False) }}

                    {%- call statement('main', fetch_result=True) -%}
                        select LEVEL, UPPER(TAG_NAME) as TAG_NAME, TAG_VALUE from table(information_schema.tag_references_all_columns('{{model_schema}}.{{model_alias}}', 'table'))
                    {%- endcall -%}
                    {%- set existing_tags_for_table = load_result('main')['data'] -%}
                    {{ log('Existing tags for table:', info=False) }}
                    {{ log(existing_tags_for_table, info=False) }}

                    {{ log('--', info=False) }}
                    {% for table_tag in res.node.meta.database_tags.table %}

                        {{ create_tag_if_missing(current_tags_in_schema, table_tag|upper) }}
                        {% set desired_tag_value = res.node.meta.database_tags.table[table_tag] %}

                        {{ set_table_tag_value_if_different(model_schema, model_alias, table_tag, desired_tag_value, existing_tags_for_table) }}
                    {% endfor %}
                    {{ log("========== Finished processing tags for "+model_alias+" ==========", info=False) }}
                {% endif %}
            {% endfor %}
        {% endif %}
    {% endif %}
{% endmacro %}


{% macro create_tag_if_missing(all_tag_names, table_tag) %}
    {% if table_tag not in all_tag_names %}
        {{ log('Creating missing tag '+table_tag, info=False) }}
        {%- call statement('main', fetch_result=True) -%}
            create tag if not exists silver.{{table_tag}}
        {%- endcall -%}
        {{ log(load_result('main').data, info=False) }}
    {% else %}
        {{ log('Tag already exists: '+table_tag, info=False) }}
    {% endif %}
{% endmacro %}

{% macro set_table_tag_value_if_different(model_schema, table_name, tag_name, desired_tag_value, existing_tags) %}
    {{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at table level', info=False) }}
    {%- set existing_tag_for_table = existing_tags|selectattr('0','equalto','TABLE')|selectattr('1','equalto',tag_name|upper)|list -%}
    {{ log('Filtered tags for table:', info=False) }}
    {{ log(existing_tag_for_table[0], info=False) }}
    {% if existing_tag_for_table|length > 0 and existing_tag_for_table[0][2]==desired_tag_value %}
        {{ log('Correct tag value already exists', info=False) }}
    {% else %}
        {{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
        {%- call statement('main', fetch_result=True) -%}
            alter table {{model_schema}}.{{table_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
        {%- endcall -%}
        {{ log(load_result('main').data, info=False) }}
    {% endif %}
{% endmacro %}

{% macro set_column_tag_value_if_different(table_name, column_name, tag_name, desired_tag_value, existing_tags) %}
    {{ log('Ensuring tag '+tag_name+' has value '+desired_tag_value+' at column level', info=False) }}
    {%- set existing_tag_for_column = existing_tags|selectattr('0','equalto','COLUMN')|selectattr('1','equalto',tag_name|upper)|list -%}
    {{ log('Filtered tags for column:', info=False) }}
    {{ log(existing_tag_for_column[0], info=False) }}
    {% if existing_tag_for_column|length > 0 and existing_tag_for_column[0][2]==desired_tag_value %}
        {{ log('Correct tag value already exists', info=False) }}
    {% else %}
        {{ log('Setting tag value for '+tag_name+' to value '+desired_tag_value, info=False) }}
        {%- call statement('main', fetch_result=True) -%}
            alter table {{table_name}} modify column {{column_name}} set tag {{tag_name}} = '{{desired_tag_value}}'
        {%- endcall -%}
        {{ log(load_result('main').data, info=False) }}
    {% endif %}
{% endmacro %}

{% macro set_database_tag_value(tag_name, tag_value) %}
    {% set query %}
        create tag if not exists silver.{{tag_name}}
    {% endset %}
    {% do run_query(query) %}
    {% set query %}
        alter database {{target.database}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
    {% endset %}
    {% do run_query(query) %}
{% endmacro %}

{% macro set_schema_tag_value(target_schema, tag_name, tag_value) %}
    {% set query %}
        create tag if not exists silver.{{tag_name}}
    {% endset %}
    {% do run_query(query) %}
    {% set query %}
        alter schema {{target.database}}.{{target_schema}} set tag {{target.database}}.silver.{{tag_name}} = '{{tag_value}}'
    {% endset %}
    {% do run_query(query) %}
{% endmacro %}
\ No newline at end of file diff --git a/macros/tests/compare_model_subset.sql b/macros/tests/compare_model_subset.sql new file mode 100644 index 0000000..18aa624 --- /dev/null +++ b/macros/tests/compare_model_subset.sql @@ -0,0 +1,29 @@ +{% test compare_model_subset(model, compare_model, compare_columns, model_condition) %}

{% set compare_cols_csv = compare_columns | join(', ') %}

with a as (
    select {{compare_cols_csv}} from {{ model }}
    {{ model_condition }}
),
b as (
    select {{compare_cols_csv}} from {{ compare_model }}
),
a_minus_b as (
    select * from a
    except
    select * from b
),
b_minus_a as (
    select * from b
    except
    select * from a
),
unioned as (
    select 'in_actual_not_in_expected' as which_diff, a_minus_b.* from a_minus_b
    union all
    select 'in_expected_not_in_actual' as which_diff, b_minus_a.* from b_minus_a
)
select * from unioned

{% endtest %}
\ No newline at end of file
diff --git a/macros/tests/sequence_gaps.sql b/macros/tests/sequence_gaps.sql new file mode 100644 index 0000000..84a9aa9 --- /dev/null +++ b/macros/tests/sequence_gaps.sql @@ -0,0 +1,37 @@ +{% macro sequence_gaps(
    table,
    partition_by,
    column
) %}
    {%- set partition_sql = partition_by | join(", ") -%}
    {%- set previous_column = "prev_" ~ column -%}
    WITH source AS (
        SELECT
            {{ partition_sql + "," if partition_sql }}
            {{ column }},
            LAG(
                {{ column }},
                1
            ) over (
                {{ "PARTITION BY " ~ partition_sql if partition_sql }}
                ORDER BY
                    {{ column }} ASC
            ) AS {{ previous_column }}
        FROM
            {{ table }}
        WHERE
            block_timestamp::date <= current_date - 1
    )
SELECT
    {{ partition_sql + "," if partition_sql }}
    {{ previous_column }},
    {{ column }},
    {{ column }} - {{ previous_column }} - 1 AS gap
FROM
    source
WHERE
    {{ column }} - {{ previous_column }} <> 1
ORDER BY
    gap DESC
{% endmacro %}
diff --git a/macros/tests/tx_gaps.sql b/macros/tests/tx_gaps.sql new file mode 100644 index 0000000..82b449f --- /dev/null +++ b/macros/tests/tx_gaps.sql @@ -0,0 +1,33 @@ +{% macro tx_gaps(
    model
) %}
    WITH block_base AS (
        SELECT
            block_id,
            tx_count
        FROM
            {{ ref('silver__blocks') }}
    ),
    model_name AS (
        SELECT
            block_id,
            COUNT(
                DISTINCT tx_id
            ) AS model_tx_count
        FROM
            {{ model }}
        GROUP BY
            block_id
    )
SELECT
    block_base.block_id,
    tx_count,
    model_name.block_id,
    model_tx_count
FROM
    block_base
    LEFT JOIN model_name
    ON block_base.block_id = model_name.block_id
WHERE
    tx_count <> model_tx_count
{% endmacro %}
diff --git a/models/bronze/bronze__blocks.sql b/models/bronze/bronze__blocks.sql new file mode 100644 index 0000000..766be13 --- /dev/null +++ b/models/bronze/bronze__blocks.sql @@ -0,0 +1,7 @@ +{{ config (
    materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
    model = "blocks",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}
diff --git a/models/bronze/bronze__blocks_FR.sql b/models/bronze/bronze__blocks_FR.sql new file mode 100644 index 0000000..291a12c --- /dev/null +++ b/models/bronze/bronze__blocks_FR.sql @@ -0,0 +1,7 @@ +{{ config (
    materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
    model = "blocks",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}
diff --git a/models/bronze/bronze__transactions.sql b/models/bronze/bronze__transactions.sql new file mode 100644 index 0000000..a7af4cc --- /dev/null +++ b/models/bronze/bronze__transactions.sql @@ -0,0 +1,7 @@ +{{ config (
    materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
    model = "transactions",
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}
diff --git a/models/bronze/bronze__transactions_FR.sql b/models/bronze/bronze__transactions_FR.sql new file mode 100644 index 0000000..eeba24a --- /dev/null +++ b/models/bronze/bronze__transactions_FR.sql @@ -0,0 +1,7 @@ +{{ config (
    materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
    model = 'transactions',
    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 4), '_', 1) AS INTEGER )"
) }}
diff --git a/models/bronze/bronze_api/bronze__programs.sql b/models/bronze/bronze_api/bronze__programs.sql new file mode 100644 index 0000000..acb4f76 --- /dev/null +++ b/models/bronze/bronze_api/bronze__programs.sql @@ -0,0 +1,41 @@ +{{ config(
    materialized = 'table',
    unique_key = "program_id",
    tags = ['core','full_test']
) }}
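-- One GET per deployed program: mappings are pulled from the node's /program/<program_id>/mappings endpoint via live.udf_api, so this model re-queries the API on every full-table build.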
"program_id", + tags = ['core','full_test'] +) }} + +WITH programs AS ( + SELECT + block_id, + block_timestamp, + REGEXP_SUBSTR(tx_data :program :: STRING, 'program\\s+(\\S+);', 1, 1, 'e', 1) AS program_id, + tx_data + FROM + {{ ref('silver__transactions') }} + WHERE + tx_type = 'deploy' +), +mappings AS ( + SELECT + block_id, + block_timestamp, + program_id, + {{ target.database }}.live.udf_api( + 'GET', + '{Service}/program/' || program_id || '/mappings', + OBJECT_CONSTRUCT( + 'Content-Type', + 'application/json' + ),{}, + 'Vault/dev/aleo/mainnet' + ) :data AS mappings, + tx_data + FROM programs +) +SELECT + block_id, + block_timestamp, + program_id, + mappings, + tx_data +FROM mappings \ No newline at end of file diff --git a/models/descriptions/__overview__.md b/models/descriptions/__overview__.md new file mode 100644 index 0000000..d063315 --- /dev/null +++ b/models/descriptions/__overview__.md @@ -0,0 +1,74 @@ +{% docs __overview__ %} + +# Welcome to the Flipside Crypto aleo Models Documentation + +## **What does this documentation cover?** +The documentation included here details the design of the aleo + tables and views available via [Flipside Crypto.](https://flipsidecrypto.aleo/) For more information on how these models are built, please see [the github repository.](https://github.com/flipsideCrypto/aleo-models/) + +## **How do I use these docs?** +The easiest way to navigate this documentation is to use the Quick Links below. These links will take you to the documentation for each table, which contains a description, a list of the columns, and other helpful information. + +If you are experienced with dbt docs, feel free to use the sidebar to navigate the documentation, as well as explore the relationships between tables and the logic building them. + +There is more information on how to use dbt docs in the last section of this document. + +## **Quick Links to Table Documentation** + +**Click on the links below to jump to the documentation for each schema.** + +### Core Tables (`aleo`.`CORE`.``) + +**Dimension Tables:** + + +**Fact Tables:** + + + +**Convenience Tables:** + + + + +## **Data Model Overview** + +The aleo + models are built a few different ways, but the core fact tables are built using three layers of sql models: **bronze, silver, and gold (or core).** + +- Bronze: Data is loaded in from the source as a view +- Silver: All necessary parsing, filtering, de-duping, and other transformations are done here +- Gold (or core): Final views and tables that are available publicly + +The dimension tables are sourced from a variety of on-chain and off-chain sources. + +Convenience views (denoted ez_) are a combination of different fact and dimension tables. These views are built to make it easier to query the data. + +## **Using dbt docs** +### Navigation + +You can use the ```Project``` and ```Database``` navigation tabs on the left side of the window to explore the models in the project. + +### Database Tab + +This view shows relations (tables and views) grouped into database schemas. Note that ephemeral models are *not* shown in this interface, as they do not exist in the database. + +### Graph Exploration + +You can click the blue icon on the bottom-right corner of the page to view the lineage graph of your models. + +On model pages, you'll see the immediate parents and children of the model you're exploring. 
Note that you can also right-click on models to interactively filter and explore the graph.


### **More information**
- [Flipside](https://flipsidecrypto.xyz/)
- [Velocity](https://app.flipsidecrypto.com/velocity?nav=Discover)
- [Tutorials](https://docs.flipsidecrypto.com/our-data/tutorials)
- [Github](https://github.com/FlipsideCrypto/aleo-models)
- [What is dbt?](https://docs.getdbt.com/docs/introduction)

{% enddocs %}
\ No newline at end of file diff --git a/models/gold/core/core__dim_programs.sql b/models/gold/core/core__dim_programs.sql new file mode 100644 index 0000000..57f80ba --- /dev/null +++ b/models/gold/core/core__dim_programs.sql @@ -0,0 +1,24 @@ +{{ config(
    materialized = 'incremental',
    incremental_predicates = ['DBT_INTERNAL_DEST.deployment_block_timestamp::DATE >= (select min(deployment_block_timestamp::DATE) from ' ~ generate_tmp_view_name(this) ~ ')'],
    unique_key = ['program_id'],
    incremental_strategy = 'merge',
    cluster_by = 'deployment_block_timestamp::DATE',
    tags = ['core','full_test']
) }}

SELECT
    deployment_block_id,
    deployment_block_timestamp,
    program_id,
    edition,
    program,
    mappings,
    verifying_keys,
    {{ dbt_utils.generate_surrogate_key(
        ['program_id']
    ) }} AS complete_program_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM {{ ref('silver__programs') }}
\ No newline at end of file diff --git a/models/gold/core/core__fact_blocks.sql b/models/gold/core/core__fact_blocks.sql new file mode 100644 index 0000000..6c80c5c --- /dev/null +++ b/models/gold/core/core__fact_blocks.sql @@ -0,0 +1,34 @@ +{{ config(
    materialized = 'incremental',
    incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::DATE >= (select min(block_timestamp::DATE) from ' ~ generate_tmp_view_name(this) ~ ')'],
    unique_key = "block_id",
    incremental_strategy = 'merge',
    merge_exclude_columns = ["inserted_timestamp"],
    cluster_by = ['block_timestamp::DATE'],
    tags = ['core', 'full_test']
) }}

SELECT
    block_id,
    block_timestamp,
    network_id,
    tx_count,
    proving_round,
    prover_rounds,
    block_rewards,
    puzzle_solutions,
    {{ dbt_utils.generate_surrogate_key(['block_id']) }} AS fact_blocks_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('silver__blocks') }}

{% if is_incremental() %}
WHERE
    block_timestamp >= DATEADD(
        'hour',
        -1,
        (SELECT MAX(block_timestamp) FROM {{ this }})
    )
{% endif %}
\ No newline at end of file diff --git a/models/gold/core/core__fact_transactions.sql b/models/gold/core/core__fact_transactions.sql new file mode 100644 index 0000000..85f17e6 --- /dev/null +++ b/models/gold/core/core__fact_transactions.sql @@ -0,0 +1,39 @@ +{{ config(
    materialized = 'incremental',
    incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::DATE >= (select min(block_timestamp::DATE) from ' ~ generate_tmp_view_name(this) ~ ')'],
    unique_key = "tx_id",
    incremental_strategy = 'merge',
    merge_exclude_columns = ["inserted_timestamp"],
    cluster_by = ['block_timestamp::DATE'],
    tags = ['core', 'full_test']
) }}

SELECT
    block_id,
    block_timestamp,
    tx_id,
    tx_type,
    tx_data,
    tx_transition_count,
    fee_proof,
    fee_global_state_root,
    fee_transition_id,
    fee_transition_program,
    fee_transition_function,
    fee_transition_inputs,
    fee_transition_outputs,
    complete_transactions_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('silver__transactions') }}

{% if is_incremental() %}
WHERE
    block_timestamp >= DATEADD(
        'hour',
        -1,
        (SELECT MAX(block_timestamp) FROM {{ this }})
    )
{% endif %}
\ No newline at end of file diff --git a/models/gold/core/core__fact_transitions.sql b/models/gold/core/core__fact_transitions.sql new file mode 100644 index 0000000..947033c --- /dev/null +++ b/models/gold/core/core__fact_transitions.sql @@ -0,0 +1,35 @@ +{{ config(
    materialized = 'incremental',
    incremental_predicates = ['DBT_INTERNAL_DEST.block_timestamp::DATE >= (select min(block_timestamp::DATE) from ' ~ generate_tmp_view_name(this) ~ ')'],
    unique_key = "transition_id",
    incremental_strategy = 'merge',
    merge_exclude_columns = ["inserted_timestamp"],
    cluster_by = ['block_timestamp::DATE'],
    tags = ['core', 'full_test']
) }}

SELECT
    block_id,
    block_timestamp,
    tx_id,
    transition_id,
    program_id,
    transition_function,
    transition_inputs,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','transition_id']
    ) }} AS complete_transition_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    {{ ref('silver__transitions') }}

{% if is_incremental() %}
WHERE
    block_timestamp >= DATEADD(
        'hour',
        -1,
        (SELECT MAX(block_timestamp) FROM {{ this }})
    )
{% endif %}
\ No newline at end of file diff --git a/models/silver/silver__blocks.sql b/models/silver/silver__blocks.sql new file mode 100644 index 0000000..2a97800 --- /dev/null +++ b/models/silver/silver__blocks.sql @@ -0,0 +1,72 @@ +{{ config(
    materialized = 'incremental',
    unique_key = "block_id",
    incremental_strategy = 'merge',
    merge_exclude_columns = ["inserted_timestamp"],
    cluster_by = ['modified_timestamp::DATE'],
    tags = ['core','full_test']
) }}

-- depends_on: {{ ref('bronze__blocks') }}

WITH base AS (

    SELECT
        DATA, -- IN FINAL
        DATA :header AS header, -- IN FINAL
        header :metadata :height :: INT AS block_id, -- IN FINAL
        header :metadata :timestamp :: datetime AS block_timestamp,
        DATA :block_hash :: STRING AS block_hash, -- IN FINAL
        COALESCE(ARRAY_SIZE(DATA :transactions) :: NUMBER, 0) AS tx_count, -- IN FINAL
        header :metadata :network as network_id, -- IN FINAL
        header :metadata :round as proving_round, -- use to identify address of block producer (validator) -- IN FINAL
        OBJECT_KEYS(DATA :authority :subdag :subdag) as prover_rounds, -- IN FINAL, REPLACES PROPOSER ADDRESS
        DATA :transactions as transactions, -- IN FINAL
        DATA :ratifications as block_rewards, -- puzzle rewards (provers) and staker rewards (block reward). puzzle rewards are split by weight -- IN FINAL
        DATA :solutions :solutions :solutions as puzzle_solutions -- target is the proportion of prover rewards
    FROM

    {% if is_incremental() %}
        {{ ref('bronze__blocks') }}
    WHERE
        inserted_timestamp >= DATEADD(
            MINUTE,
            -5,(
                SELECT
                    MAX(
                        modified_timestamp
                    )
                FROM
                    {{ this }}
            )
        )
    {% else %}
        {{ ref('bronze__blocks_FR') }}
    {% endif %}

    qualify(
        ROW_NUMBER() over (
            PARTITION BY network_id, block_id
            ORDER BY block_id DESC, inserted_timestamp DESC
        )
    ) = 1
)
SELECT
    block_id,
    block_timestamp,
    network_id,
    tx_count,
    proving_round,
    prover_rounds,
    transactions,
    block_rewards,
    puzzle_solutions,
    DATA,
    {{ dbt_utils.generate_surrogate_key(
        ['network_id','block_id']
    ) }} AS blocks_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    base
diff --git a/models/silver/silver__programs.sql b/models/silver/silver__programs.sql new file mode 100644 index 0000000..c60e4bd --- /dev/null +++ b/models/silver/silver__programs.sql @@ -0,0 +1,41 @@ +{{ config(
    materialized = 'table',
    unique_key = "program_id",
    tags = ['core','full_test']
) }}

WITH base as (
    SELECT
        block_id,
        block_timestamp,
        tx_data,
        mappings
    FROM
        {{ ref('bronze__programs') }}
),
parsed AS (
    SELECT
        block_id AS deployment_block_id,
        block_timestamp AS deployment_block_timestamp,
        REGEXP_SUBSTR(tx_data :program :: STRING, 'program\\s+(\\S+);', 1, 1, 'e', 1) AS program_id,
        tx_data :edition :: INT AS edition,
        tx_data :program :: STRING AS program,
        tx_data :verifying_keys :: STRING AS verifying_keys,
        mappings
    FROM base
)
SELECT
    deployment_block_id,
    deployment_block_timestamp,
    program_id,
    edition,
    program,
    mappings,
    verifying_keys,
    {{ dbt_utils.generate_surrogate_key(
        ['program_id']
    ) }} AS complete_program_id
FROM
    parsed
ORDER BY
    program_id
\ No newline at end of file diff --git a/models/silver/silver__transactions.sql b/models/silver/silver__transactions.sql new file mode 100644 index 0000000..def73c4 --- /dev/null +++ b/models/silver/silver__transactions.sql @@ -0,0 +1,91 @@ +{{ config(
    materialized = 'table',
    unique_key = "tx_id",
    tags = ['core','full_test']
) }}

WITH base AS (
    SELECT
        t.VALUE :BLOCK_ID_REQUESTED :: INT AS block_id,
        t.DATA :id :: STRING as tx_id,
        t.DATA :type AS tx_type,
        CASE WHEN f.key :: STRING NOT IN ('id', 'type', 'fee') THEN f.value END AS tx_msg,
        CASE WHEN f.key :: STRING = 'fee' THEN f.value END AS fee_msg,
        t.DATA,
        t.PARTITION_KEY
    FROM
        {% if is_incremental() %}
        {{ ref('bronze__transactions') }} t,
        {% else %}
        {{ ref('bronze__transactions_FR') }} t,
        {% endif %}
        LATERAL FLATTEN(input => t.DATA) AS f

    {% if is_incremental() %}
    WHERE inserted_timestamp >= DATEADD(
        MINUTE,
        -5,(
            SELECT
                MAX(modified_timestamp)
            FROM
                {{ this }}
        )
    )
    {% endif %}

),
detail AS (
    SELECT
        block_id,
        tx_id,
        MAX(COALESCE(tx_type, '')) AS tx_type,
        PARSE_JSON(MAX(COALESCE(tx_msg, ''))) AS tx_msg,
        PARSE_JSON(MAX(COALESCE(fee_msg, ''))) AS fee_msg,
        DATA,
        PARTITION_KEY
    FROM base
    GROUP BY ALL
)
SELECT
    block_id,
    block_timestamp,
    tx_id,
    tx_type,
    CASE
        WHEN tx_type = 'fee' then fee_msg
        ELSE tx_msg
    END as tx_data,
    fee_msg,
    COALESCE(ARRAY_SIZE(tx_data :transitions) :: NUMBER, 0) AS tx_transition_count,
    CASE
        WHEN tx_type = 'fee' then fee_msg :transition
        ELSE tx_msg :transitions
    END as tx_transitions,
    fee_msg :proof :: STRING AS fee_proof,
    fee_msg :global_state_root :: STRING AS fee_global_state_root,
    fee_msg :transition :id :: STRING AS fee_transition_id,
    fee_msg :transition :program :: STRING AS fee_transition_program,
    fee_msg :transition :function :: STRING AS fee_transition_function,
    fee_msg :transition :inputs :: STRING AS fee_transition_inputs,
    fee_msg :transition :outputs :: STRING AS fee_transition_outputs,
    DATA,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id']
    ) }} AS complete_transactions_id,
    partition_key,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM
    detail
JOIN (
    SELECT
        block_id,
        block_timestamp
    FROM
        {{ ref('silver__blocks') }}
    WHERE
        tx_count > 0
    ) b
USING(block_id)

diff --git a/models/silver/silver__transitions.sql b/models/silver/silver__transitions.sql new file mode 100644 index 0000000..cac66f7 --- /dev/null +++ b/models/silver/silver__transitions.sql @@ -0,0 +1,54 @@ +{{ config(
    materialized = 'table',
    unique_key = "transition_id",
    tags = ['core','full_test']
) }}

WITH base AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        tx_data
    FROM
        {{ ref('silver__transactions') }}
),
transitions AS (
    SELECT
        t.block_id,
        t.tx_id,
        t.block_timestamp,
        f.value AS transition,
        f.index AS transition_index
    FROM base t,
        TABLE(FLATTEN(input => PARSE_JSON(tx_data):transitions)) f
    WHERE block_id is not null
),
parsed AS (
    SELECT
        block_id,
        block_timestamp,
        tx_id,
        transition :id :: STRING AS transition_id,
        transition :program :: STRING AS program_id,
        transition :function :: STRING AS transition_function,
        transition :inputs :: STRING AS transition_inputs,
        transition :outputs :: STRING AS transition_outputs
    FROM transitions
)
SELECT
    block_id,
    block_timestamp,
    tx_id,
    transition_id,
    program_id,
    transition_function,
    transition_inputs,
    transition_outputs,
    {{ dbt_utils.generate_surrogate_key(
        ['tx_id','transition_id']
    ) }} AS complete_transition_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    '{{ invocation_id }}' AS _invocation_id
FROM parsed
\ No newline at end of file diff --git a/models/sources.yml b/models/sources.yml new file mode 100644 index 0000000..e15a830 --- /dev/null +++ b/models/sources.yml @@ -0,0 +1,30 @@ +version: 2

sources:
  - name: crosschain
    database: "{{ 'crosschain' if target.database == 'ALEO' else 'crosschain_dev' }}"
    schema: core
    tables:
      - name: dim_date_hours
      - name: address_tags
      - name: dim_dates

  - name: crosschain_silver
    database: "{{ 'crosschain' if target.database == 'ALEO' else 'crosschain_dev' }}"
    schema: silver
    tables:
      - name: number_sequence
      - name: labels_combined

  - name: bronze_streamline
    database: streamline
    schema: |
      "{{ 'ALEO' if target.database == 'ALEO' else 'ALEO_DEV' }}"
    tables:
      - name: blocks
      - name: transactions

  - name: github_actions
    database: ALEO
    schema: github_actions
    tables:
      - name: workflows
\ No newline at end of file diff --git a/models/streamline/silver/_max_block_by_date.sql b/models/streamline/silver/_max_block_by_date.sql new file mode 100644 index 0000000..5340acc --- /dev/null +++ b/models/streamline/silver/_max_block_by_date.sql @@ -0,0 +1,27 @@ +{{ config (
    materialized = "ephemeral",
    unique_key = "block_id"
) }}

WITH base AS (

    SELECT
        block_timestamp :: DATE AS block_date,
        MAX(block_id) AS block_id
    FROM
        {{ ref("silver__blocks") }}
    GROUP BY
        block_timestamp :: DATE
)
SELECT
    block_date,
    block_id
FROM
    base
WHERE
    block_date <> (
        SELECT
            MAX(block_date)
        FROM
            base
    )
diff --git a/models/streamline/silver/core/complete/streamline__blocks_complete.sql b/models/streamline/silver/core/complete/streamline__blocks_complete.sql new file mode 100644 index 0000000..eb83ae8 --- /dev/null +++ b/models/streamline/silver/core/complete/streamline__blocks_complete.sql @@ -0,0 +1,36 @@ +{{ config (
    materialized = "incremental",
    incremental_strategy = 'merge',
    unique_key = "block_number",
    cluster_by = "ROUND(block_number, -3)",
    merge_exclude_columns = ["inserted_timestamp"],
    post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(block_number)"
) }}

SELECT
    DATA :header :metadata :height :: INT AS block_number,
    {{ dbt_utils.generate_surrogate_key(
        ['block_number']
    ) }} AS complete_blocks_id,
    SYSDATE() AS inserted_timestamp,
    SYSDATE() AS modified_timestamp,
    file_name,
    '{{ invocation_id }}' AS _invocation_id
FROM

{% if is_incremental() %}
    {{ ref('bronze__blocks') }}
WHERE
    inserted_timestamp >= (
        SELECT
            MAX(modified_timestamp) modified_timestamp
        FROM
            {{ this }}
    )
{% else %}
    {{ ref('bronze__blocks_FR') }}
{% endif %}

qualify(ROW_NUMBER() over (PARTITION BY block_number
ORDER BY
    inserted_timestamp DESC)) = 1
diff --git a/models/streamline/silver/core/complete/streamline__transactions_complete.sql b/models/streamline/silver/core/complete/streamline__transactions_complete.sql new file mode 100644 index 0000000..2ab908e --- /dev/null +++ b/models/streamline/silver/core/complete/streamline__transactions_complete.sql @@ -0,0 +1,47 @@ +{{ config (
    materialized = "incremental",
    incremental_strategy = 'merge',
    unique_key = "complete_transactions_id"
) }}

-- depends_on: {{ ref('bronze__transactions') }}

WITH transactions AS (
    SELECT
        VALUE:BLOCK_ID_REQUESTED :: INT AS block_id,
        DATA :id :: STRING AS transaction_id,
        {{ dbt_utils.generate_surrogate_key(
            ['VALUE:BLOCK_ID_REQUESTED :: INT', 'DATA :id :: STRING']
        ) }} AS complete_transactions_id,
        SYSDATE() AS inserted_timestamp,
        SYSDATE() AS modified_timestamp,
        file_name,
        '{{ invocation_id }}' AS _invocation_id,
        ROW_NUMBER() OVER (PARTITION BY DATA :id ORDER BY inserted_timestamp DESC) AS rn
    FROM
        {% if is_incremental() %}
        {{ ref('bronze__transactions') }}
        {% else %}
        {{ ref('bronze__transactions_FR') }}
        {% endif %}

        {% if is_incremental() %}
        WHERE inserted_timestamp >= (
            SELECT MAX(modified_timestamp)
            FROM {{ this }}
        )
        {% endif %}
)

SELECT
    block_id,
    transaction_id,
    complete_transactions_id,
    inserted_timestamp,
    modified_timestamp,
    file_name,
    _invocation_id
FROM transactions
WHERE
    rn = 1
    AND block_id IS NOT NULL
\ No newline at end of file diff --git a/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql b/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql new file mode 100644 index 0000000..b0d4cd2 --- /dev/null +++ b/models/streamline/silver/core/realtime/streamline__blocks_realtime.sql @@ -0,0 +1,43 @@ +{{ config (
    materialized = "view",
    post_hook = fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_rest_api_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"blocks",
        "sql_limit" :"10000",
        "producer_batch_size" :"1000",
        "worker_batch_size" :"100",
        "sql_source" :"{{this.identifier}}" }
    )
) }}
-- depends_on: {{ ref('streamline__blocks_complete') }}
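-- The CTE below computes the ingestion backlog: every expected height from streamline__blocks EXCEPT the heights already landed in streamline__blocks_complete.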
WITH blocks AS (

    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks") }}
    EXCEPT
    SELECT
        block_number
    FROM
        {{ ref("streamline__blocks_complete") }}
)
SELECT
    ROUND(
        block_number,
        -4
    ) :: INT AS partition_key,
    {{ target.database }}.live.udf_api(
        'GET',
        '{Service}/block/' || block_number,
        OBJECT_CONSTRUCT(
            'Content-Type',
            'application/json'
        ),{},
        'Vault/dev/aleo/mainnet'
    ) AS request
FROM
    blocks
ORDER BY
    block_number
\ No newline at end of file diff --git a/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql b/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql new file mode 100644 index 0000000..df66de0 --- /dev/null +++ b/models/streamline/silver/core/realtime/streamline__transactions_realtime.sql @@ -0,0 +1,56 @@ +{{ config (
    materialized = "view",
    post_hook = fsc_utils.if_data_call_function_v2(
        func = 'streamline.udf_bulk_rest_api_v2',
        target = "{{this.schema}}.{{this.identifier}}",
        params ={ "external_table" :"transactions",
        "sql_limit" :"500",
        "producer_batch_size" :"1000",
        "worker_batch_size" :"100",
        "sql_source" :"{{this.identifier}}" }
    )
) }}

WITH blocks AS (

    SELECT
        block_id,
        block_timestamp,
        transactions,
        tx_count
    FROM
        {{ ref("silver__blocks") }}
    WHERE
        tx_count > 0
),
transaction_ids AS (
    SELECT
        b.block_id,
        b.block_timestamp,
        t.value:transaction:id::STRING AS transaction_id
    FROM
        blocks b,
        TABLE(FLATTEN(PARSE_JSON(transactions))) t
    WHERE
        t.value:transaction:id IS NOT NULL
)
SELECT
    ROUND(
        block_id,
        -4
    ) :: INT AS partition_key,
    block_timestamp,
    {{ target.database }}.live.udf_api(
        'GET',
        '{Service}/transaction/' || transaction_id,
        OBJECT_CONSTRUCT(
            'Content-Type',
            'application/json'
        ),{},
        'Vault/dev/aleo/mainnet'
    ) AS request,
    block_id AS block_id_requested
FROM
    transaction_ids
ORDER BY
    block_id
diff --git a/models/streamline/streamline__blocks.sql b/models/streamline/streamline__blocks.sql new file mode 100644 index 0000000..8398acb --- /dev/null +++ b/models/streamline/streamline__blocks.sql @@ -0,0 +1,19 @@ +{{ config (
    materialized = "table",
    tags = ['streamline_view']
) }}

SELECT
    _id AS block_number
FROM
    {{ source(
        'crosschain_silver',
        'number_sequence'
    ) }}
WHERE
    _id <= (
        SELECT
            MAX(block_number)
        FROM
            {{ ref('streamline__chainhead') }}
    )
diff --git a/models/streamline/streamline__chainhead.sql b/models/streamline/streamline__chainhead.sql new file mode 100644 index 0000000..36a47d5 --- /dev/null +++ b/models/streamline/streamline__chainhead.sql @@ -0,0 +1,15 @@ +{{ config (
    materialized = "view",
    tags = ['streamline_view']
) }}

SELECT
    {{ target.database }}.live.udf_api(
        'GET',
        '{Service}/latest/height',
        OBJECT_CONSTRUCT(
            'Content-Type',
            'application/json'
        ),{},
        'Vault/dev/aleo/mainnet'
    ) :data :: INT AS block_number
diff --git a/package-lock.yml b/package-lock.yml new file mode 100644 index 0000000..1a8ba22 --- /dev/null +++ b/package-lock.yml @@ -0,0 +1,16 @@ +packages:
  - package: calogica/dbt_expectations
    version: 0.8.5
  - git: https://github.com/FlipsideCrypto/fsc-utils.git
    revision: eb33ac727af26ebc8a8cc9711d4a6ebc3790a107
  - package: get-select/dbt_snowflake_query_tags
    version: 2.5.0
  - package: dbt-labs/dbt_external_tables
    version: 0.8.2
  - package: dbt-labs/dbt_utils
    version: 1.0.0
  - package: calogica/dbt_date
    version: 0.7.2
  - git: https://github.com/FlipsideCrypto/livequery-models.git
    revision: b024188be4e9c6bc00ed77797ebdc92d351d620e
sha1_hash: 4d94cfdedbff54a9267b86ee2d14f7aa324282ab
diff --git a/packages.yml b/packages.yml new file mode 100644 index 0000000..fed58de --- /dev/null +++ b/packages.yml @@ -0,0 +1,11 @@ +packages:
  - package: calogica/dbt_expectations
    version: [">=0.4.0", "<0.9.0"]
  - git: https://github.com/FlipsideCrypto/fsc-utils.git
    revision: v1.29.0
  - package: get-select/dbt_snowflake_query_tags
    version: [">=2.0.0", "<3.0.0"]
  - package: dbt-labs/dbt_external_tables
    version: 0.8.2
  - package: dbt-labs/dbt_utils
    version: 1.0.0
\ No newline at end of file diff --git a/profiles.yml b/profiles.yml new file mode 100644 index 0000000..0661f98 --- /dev/null +++ b/profiles.yml @@ -0,0 +1,29 @@ +aleo:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('ACCOUNT') }}"
      user: "{{ env_var('USER') }}"
      password: "{{ env_var('PASSWORD') }}"
      role: "{{ env_var('ROLE') }}"
      schema: "{{ env_var('SCHEMA') }}"
      region: "{{ env_var('REGION') }}"
      database: "{{ env_var('DATABASE') }}"
      warehouse: "{{ env_var('WAREHOUSE') }}"
      threads: 8
      client_session_keep_alive: False
    prod:
      type: snowflake
      account: "{{ env_var('ACCOUNT') }}"
      user: "{{ env_var('USER') }}"
      password: "{{ env_var('PASSWORD') }}"
      role: "{{ env_var('ROLE') }}"
      schema: "{{ env_var('SCHEMA') }}"
      region: "{{ env_var('REGION') }}"
      database: "{{ env_var('DATABASE') }}"
      warehouse: "{{ env_var('WAREHOUSE') }}"
      threads: 8
      client_session_keep_alive: False
config:
  send_anonymous_usage_stats: False
\ No newline at end of file diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..235f1a5 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +dbt-core>=1.8,<1.9
dbt-snowflake>=1.8,<1.9
protobuf==4.25.3
\ No newline at end of file diff --git a/snapshots/.gitkeep b/snapshots/.gitkeep new file mode 100644 index 0000000..e69de29