From 3683b28be65ca34c2fabd42e1fe61fa75eaefb5e Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Fri, 19 May 2023 11:15:10 -0400 Subject: [PATCH] _full change --- analyses/datashares.sql | 45 +++++ macros/create_sps.sql | 7 +- macros/datashares.sql | 160 ++++++++++++++++++ .../_datashare/_datashare___create_gold.sql | 20 +++ .../silver/squid/silver__squid_avalanche.sql | 2 +- models/silver/squid/silver__squid_polygon.sql | 2 +- models/sources.yml | 4 +- 7 files changed, 233 insertions(+), 7 deletions(-) create mode 100644 analyses/datashares.sql create mode 100644 macros/datashares.sql create mode 100644 models/_datashare/_datashare___create_gold.sql diff --git a/analyses/datashares.sql b/analyses/datashares.sql new file mode 100644 index 0000000..c797a13 --- /dev/null +++ b/analyses/datashares.sql @@ -0,0 +1,45 @@ + +{% set dag = {} %} +{% for key, value in graph.nodes.items() -%} + {% + if value.refs + and set(value.fqn).intersection(["gold"]) + and value.config.materialized not in ["view", "test"] + and value.config.enabled + and not value.sources + -%} + ================================================================== + {{ key }} + {{ value.database }}.{{ value.schema }}.{{ value.alias }} + {%- set name = value.database + "." + value.schema + "." + value.alias %} + {{ "MATERIALIZATION: "~ value.config.materialized }} + {{ value["fqn"]}} + ------------------------------------------------------------------ + Dependencies: + {{ value.depends_on.nodes | unique | list | pprint }} + ALL ANCESTORS: + {% set ancestors = fromjson("[" ~ get_ancestors(value, include_depth=true, exclude_source=false)[:-1] ~ "]") -%} + {{- ancestors | pprint -}} + {# build dictionary[db_object, dag] #} + {%- set _result = fromjson("[" ~ get_ancestors(value, exclude_source=true)[:-1] ~ "]") %} + {% if _result %} + {% do _result.insert(0, key) %} + {% do dag.update({name : _result | reverse|list}) %} + {% else %} + {% do dag.update({name : [key] }) %} + {%- endif %} + Refs: + {%- set not_ephemeral = [] -%} + {% for item in value.refs if not item[0].startswith("_") %} + {%- set _ = not_ephemeral.append(item) -%} + {%- endfor -%} + {{ not_ephemeral | pprint}} + Sources: + {{ value.sources | pprint }} + + {%- endif %} +{%- endfor %} + +total views: {{ views | length}} +view with most dependencies: {{ views | sort(reverse=True) | first | pprint }} + diff --git a/macros/create_sps.sql b/macros/create_sps.sql index 3b5bf79..2e96fd2 100644 --- a/macros/create_sps.sql +++ b/macros/create_sps.sql @@ -1,6 +1,7 @@ {% macro create_sps() %} {% if target.database == 'AXELAR' %} - CREATE SCHEMA IF NOT EXISTS _internal; - -- {{ sp_create_prod_clone('_internal') }}; + CREATE schema IF NOT EXISTS _internal; +CREATE schema IF NOT EXISTS _datashare; +{{ sp_create_prod_clone('_internal') }}; {% endif %} -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/macros/datashares.sql b/macros/datashares.sql new file mode 100644 index 0000000..2c58dbf --- /dev/null +++ b/macros/datashares.sql @@ -0,0 +1,160 @@ +{%- macro get_ancestors(node, include_depth=false, exclude_source=false) -%} +{# + Return a list of ancestors for a node in a DAG. + #} + {%- for dep in node.depends_on.nodes | unique | list recursive %} + {% if dep.startswith("model.") and "bronze__" not in dep %} + "{{- loop.depth0 ~ '-'if include_depth else '' }}{{node.config.materialized }}-{{ dep -}}", + {{- loop(graph.nodes[dep].depends_on.nodes) -}} + {% elif not exclude_source %} + "{{- loop.depth0 ~ '-'if include_depth else '' }}{{node.config.materialized }}-{{ dep -}}", + {%- endif -%} + {%- endfor %} +{%- endmacro -%} + +{% macro get_view_ddl() %} +{# + Return a dictionary of view names and their DDL statements. + The DDL statements are escaped to be used in a Snowflake query. + The dictionary is converted to JSON to be used in a dbt macro.. + #} + {% if execute %} + {% set query %} + SELECT + CONCAT_WS('.', TABLE_SCHEMA, TABLE_NAME) as VIEW_NAME, + VIEW_DEFINITION + FROM {{target.database}}.INFORMATION_SCHEMA.VIEWS + WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA', 'STREAMLINE') + AND TABLE_SCHEMA NOT LIKE 'TEST_%' + {%- endset -%} + {%- set results = run_query(query) -%} + {% set ddl = {} %} + {% for key, value in results.rows %} + {%- do ddl.update({key: value|replace("$$", "\$\$")}) -%} + {%- endfor -%} + {{- tojson(ddl) -}} + {%- endif -%} +{%- endmacro -%} + +{% macro replace_database_references(references_to_replace, ddl, new_database) %} +{# + Return the DDL statement for a view with the references replaced. + + references_to_replace: a dictionary of references to replace + ddl: the DDL statement to replace the references in + new_database: the new database to replace the references with +#} + {% set outer = namespace(replaced=ddl) %} + {% for key in references_to_replace %} + {%- set original = target.database ~ "." ~ key.upper() -%} + {%- set replacement = new_database ~ "." ~ key -%} + {%- set outer.replaced = outer.replaced|replace(original, replacement) -%} + {%- set original = target.database ~ "." ~ key.lower() -%} + {%- set replacement = new_database ~ "." ~ key -%} + {%- set outer.replaced = outer.replaced|replace(original, replacement) -%} + {%- endfor -%} + {% set outer.replaced = outer.replaced|replace(target.database.upper() ~ ".", "__SOURCE__.") %} + {% set outer.replaced = outer.replaced|replace(target.database.lower() ~ ".", "__SOURCE__.") %} + {{- outer.replaced -}} +{%- endmacro -%} + +{% macro generate_view_ddl(dag, schema) %} +{# + Return a list of DDL statements for views in a DAG. + + dag: a DAG of views + schema: schemas to create schema DDL for + #} + {%- set ddl = fromjson(get_view_ddl()) -%} + {%- set created = {} -%} + {%- set final_text = [] -%} + {%- for view, deps in dag.items() -%} + {%- for d in deps -%} + {%- set table_name = d.split(".")[-1].replace("__", ".").upper() -%} + {%- if ddl.get(table_name) and table_name not in created -%} + {%- set replaced = replace_database_references(ddl.keys(), ddl[table_name], "__NEW__") -%} + {%- do final_text.append(replaced) -%} + {%- do created.update({table_name:true}) -%} + {%- endif -%} + {%- endfor -%} + {%- endfor -%} + {%- set schema_ddl = [] -%} + {%- for s in schema -%} + {%- do schema_ddl.append("CREATE SCHEMA IF NOT EXISTS __NEW__." ~ s ~ ";") -%} + {%- endfor -%} + {{- toyaml(schema_ddl + final_text) -}} +{%- endmacro -%} + +{% macro generate_dag_and_schemas(node_paths, materializations) %} +{# + Return a DAG of views and a list of schemas to create. + + node_paths: a list of node paths to include in the DAG + materializations: a list of materializations to include in the DAG + #} + {%- set dag = {} -%} + {%- set schema = [] -%} + {%- for key, value in graph.nodes.items() -%} + {% + if value.refs + and set(value.fqn).intersection(node_paths) + and value.config.materialized in materializations + and value.config.enabled + and not value.sources + and not key.endswith("_create_gold") + -%} + {%- set name = value.schema + "." + value.alias -%} + {%- set _result = fromyaml("[" ~ get_ancestors(value, exclude_source=true)[:-1] ~ "]") -%} + {% if _result -%} + {%- do _result.insert(0, key) -%} + {%- do dag.update({name.upper() : _result | reverse|list}) -%} + {% for d in _result -%} + {%- if d.split(".")[-1].split("__")[0] not in schema -%} + {%- do schema.append(d.split(".")[-1].split("__")[0]) -%} + {%- endif -%} + {%- endfor -%} + {%- else -%} + {%- do dag.update({name.upper() : [key] }) -%} + {%- if value.schema not in schema -%} + {%- do schema.append(value.schema) -%} + {%- endif -%} + {%- endif -%} + {%- endif -%} + {%- endfor -%} + {%- set final = {"dag": dag, "schema": schema} -%} + {{- tojson(final) -}} +{%- endmacro -%} + +{% macro generate_table_views_ddl(tables, schema) %} +{# + Return a list of DDL statements for views of tables from a list. + + tables: a list of tables to create views for + schema: schemas to create schema DDL for + #} + {%- set schema_ddl = [] -%} + {%- set view_ddl = [] -%} + {% for s in schema %} + {%- do schema_ddl.append("CREATE SCHEMA IF NOT EXISTS __NEW__." ~ s ~ ";") -%} + {%- endfor -%} + {% for table in tables %} + {%- do view_ddl.append("CREATE OR REPLACE VIEW __NEW__." ~ table ~ " AS SELECT * FROM " ~ "__SOURCE__." ~ table ~";") -%} + {%- endfor -%} + {{- toyaml(schema_ddl + view_ddl) -}} +{%- endmacro -%} + +{% macro generate_datashare_ddl() %} +{# + generate DDL for datashare + + Return: DDL for datashare + #} + {%- set gold_views = fromjson(generate_dag_and_schemas(["gold"], ["view"])) -%} + {%- set gold_views_ddl = fromyaml(generate_view_ddl(gold_views["dag"], gold_views["schema"])) -%} + {%- set gold_tables = fromjson(generate_dag_and_schemas(["gold"], ["incremental", "table"])) -%} + {%- set gold_tables_ddl = fromyaml(generate_table_views_ddl(gold_tables["dag"].keys(), gold_tables["schema"])) -%} + {%- set combined_ddl = gold_views_ddl + gold_tables_ddl -%} + {%- do combined_ddl.insert(0, "CREATE DATABASE IF NOT EXISTS __NEW__;") -%} + {{- "BEGIN\n" ~ (combined_ddl | join("\n")) ~ "\nEND" -}} +{%- endmacro -%} + diff --git a/models/_datashare/_datashare___create_gold.sql b/models/_datashare/_datashare___create_gold.sql new file mode 100644 index 0000000..36d9199 --- /dev/null +++ b/models/_datashare/_datashare___create_gold.sql @@ -0,0 +1,20 @@ +{{ + config( + materialized = 'incremental', + incremental_strategy = 'merge', + unique_key = 'ddl_hash', + merge_update_columns = [], + ) +}} +{% if execute %} +SELECT +$${{- generate_datashare_ddl() -}}$$ AS ddl, +md5(ddl) AS ddl_hash, +sysdate() as ddl_created_at +{% else %} +SELECT +null as ddl, +null as ddl_hash, +null as ddl_created_at +from dual limit 0 +{% endif %} \ No newline at end of file diff --git a/models/silver/squid/silver__squid_avalanche.sql b/models/silver/squid/silver__squid_avalanche.sql index c770db9..bee3a37 100644 --- a/models/silver/squid/silver__squid_avalanche.sql +++ b/models/silver/squid/silver__squid_avalanche.sql @@ -22,7 +22,7 @@ WITH deco_logs_base AS ( FROM {{ source( 'avalanche_silver', - 'decoded_logs_full' + 'decoded_logs' ) }} WHERE block_timestamp :: DATE >= '2022-11-01' diff --git a/models/silver/squid/silver__squid_polygon.sql b/models/silver/squid/silver__squid_polygon.sql index 1216300..63d0e12 100644 --- a/models/silver/squid/silver__squid_polygon.sql +++ b/models/silver/squid/silver__squid_polygon.sql @@ -22,7 +22,7 @@ WITH deco_logs_base AS ( FROM {{ source( 'polygon_silver', - 'decoded_logs_full' + 'decoded_logs' ) }} WHERE block_timestamp :: DATE >= '2022-11-01' diff --git a/models/sources.yml b/models/sources.yml index 17cb003..45334de 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -46,7 +46,7 @@ sources: schema: silver tables: - name: contracts - - name: decoded_logs_full + - name: decoded_logs # - name: logs - name: transfers - name: bsc_silver @@ -68,7 +68,7 @@ sources: schema: silver tables: - name: contracts - - name: decoded_logs_full + - name: decoded_logs # - name: logs - name: transfers - name: arbitrum