_full change

This commit is contained in:
Eric Laurello 2023-05-19 11:15:10 -04:00
parent 4a17b7458f
commit 3683b28be6
7 changed files with 233 additions and 7 deletions

45
analyses/datashares.sql Normal file
View File

@ -0,0 +1,45 @@
{% set dag = {} %}
{% for key, value in graph.nodes.items() -%}
{%
if value.refs
and set(value.fqn).intersection(["gold"])
and value.config.materialized not in ["view", "test"]
and value.config.enabled
and not value.sources
-%}
==================================================================
{{ key }}
{{ value.database }}.{{ value.schema }}.{{ value.alias }}
{%- set name = value.database + "." + value.schema + "." + value.alias %}
{{ "MATERIALIZATION: "~ value.config.materialized }}
{{ value["fqn"]}}
------------------------------------------------------------------
Dependencies:
{{ value.depends_on.nodes | unique | list | pprint }}
ALL ANCESTORS:
{% set ancestors = fromjson("[" ~ get_ancestors(value, include_depth=true, exclude_source=false)[:-1] ~ "]") -%}
{{- ancestors | pprint -}}
{# build dictionary[db_object, dag] #}
{%- set _result = fromjson("[" ~ get_ancestors(value, exclude_source=true)[:-1] ~ "]") %}
{% if _result %}
{% do _result.insert(0, key) %}
{% do dag.update({name : _result | reverse|list}) %}
{% else %}
{% do dag.update({name : [key] }) %}
{%- endif %}
Refs:
{%- set not_ephemeral = [] -%}
{% for item in value.refs if not item[0].startswith("_") %}
{%- set _ = not_ephemeral.append(item) -%}
{%- endfor -%}
{{ not_ephemeral | pprint}}
Sources:
{{ value.sources | pprint }}
{%- endif %}
{%- endfor %}
total views: {{ views | length}}
view with most dependencies: {{ views | sort(reverse=True) | first | pprint }}

View File

@ -1,6 +1,7 @@
{% macro create_sps() %}
{% if target.database == 'AXELAR' %}
CREATE SCHEMA IF NOT EXISTS _internal;
-- {{ sp_create_prod_clone('_internal') }};
CREATE schema IF NOT EXISTS _internal;
CREATE schema IF NOT EXISTS _datashare;
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{% endmacro %}
{% endmacro %}

160
macros/datashares.sql Normal file
View File

@ -0,0 +1,160 @@
{%- macro get_ancestors(node, include_depth=false, exclude_source=false) -%}
{#
Return a list of ancestors for a node in a DAG.
#}
{%- for dep in node.depends_on.nodes | unique | list recursive %}
{% if dep.startswith("model.") and "bronze__" not in dep %}
"{{- loop.depth0 ~ '-'if include_depth else '' }}{{node.config.materialized }}-{{ dep -}}",
{{- loop(graph.nodes[dep].depends_on.nodes) -}}
{% elif not exclude_source %}
"{{- loop.depth0 ~ '-'if include_depth else '' }}{{node.config.materialized }}-{{ dep -}}",
{%- endif -%}
{%- endfor %}
{%- endmacro -%}
{% macro get_view_ddl() %}
{#
Return a dictionary of view names and their DDL statements.
The DDL statements are escaped to be used in a Snowflake query.
The dictionary is converted to JSON to be used in a dbt macro..
#}
{% if execute %}
{% set query %}
SELECT
CONCAT_WS('.', TABLE_SCHEMA, TABLE_NAME) as VIEW_NAME,
VIEW_DEFINITION
FROM {{target.database}}.INFORMATION_SCHEMA.VIEWS
WHERE TABLE_SCHEMA NOT IN ('INFORMATION_SCHEMA', 'STREAMLINE')
AND TABLE_SCHEMA NOT LIKE 'TEST_%'
{%- endset -%}
{%- set results = run_query(query) -%}
{% set ddl = {} %}
{% for key, value in results.rows %}
{%- do ddl.update({key: value|replace("$$", "\$\$")}) -%}
{%- endfor -%}
{{- tojson(ddl) -}}
{%- endif -%}
{%- endmacro -%}
{% macro replace_database_references(references_to_replace, ddl, new_database) %}
{#
Return the DDL statement for a view with the references replaced.
references_to_replace: a dictionary of references to replace
ddl: the DDL statement to replace the references in
new_database: the new database to replace the references with
#}
{% set outer = namespace(replaced=ddl) %}
{% for key in references_to_replace %}
{%- set original = target.database ~ "." ~ key.upper() -%}
{%- set replacement = new_database ~ "." ~ key -%}
{%- set outer.replaced = outer.replaced|replace(original, replacement) -%}
{%- set original = target.database ~ "." ~ key.lower() -%}
{%- set replacement = new_database ~ "." ~ key -%}
{%- set outer.replaced = outer.replaced|replace(original, replacement) -%}
{%- endfor -%}
{% set outer.replaced = outer.replaced|replace(target.database.upper() ~ ".", "__SOURCE__.") %}
{% set outer.replaced = outer.replaced|replace(target.database.lower() ~ ".", "__SOURCE__.") %}
{{- outer.replaced -}}
{%- endmacro -%}
{% macro generate_view_ddl(dag, schema) %}
{#
Return a list of DDL statements for views in a DAG.
dag: a DAG of views
schema: schemas to create schema DDL for
#}
{%- set ddl = fromjson(get_view_ddl()) -%}
{%- set created = {} -%}
{%- set final_text = [] -%}
{%- for view, deps in dag.items() -%}
{%- for d in deps -%}
{%- set table_name = d.split(".")[-1].replace("__", ".").upper() -%}
{%- if ddl.get(table_name) and table_name not in created -%}
{%- set replaced = replace_database_references(ddl.keys(), ddl[table_name], "__NEW__") -%}
{%- do final_text.append(replaced) -%}
{%- do created.update({table_name:true}) -%}
{%- endif -%}
{%- endfor -%}
{%- endfor -%}
{%- set schema_ddl = [] -%}
{%- for s in schema -%}
{%- do schema_ddl.append("CREATE SCHEMA IF NOT EXISTS __NEW__." ~ s ~ ";") -%}
{%- endfor -%}
{{- toyaml(schema_ddl + final_text) -}}
{%- endmacro -%}
{% macro generate_dag_and_schemas(node_paths, materializations) %}
{#
Return a DAG of views and a list of schemas to create.
node_paths: a list of node paths to include in the DAG
materializations: a list of materializations to include in the DAG
#}
{%- set dag = {} -%}
{%- set schema = [] -%}
{%- for key, value in graph.nodes.items() -%}
{%
if value.refs
and set(value.fqn).intersection(node_paths)
and value.config.materialized in materializations
and value.config.enabled
and not value.sources
and not key.endswith("_create_gold")
-%}
{%- set name = value.schema + "." + value.alias -%}
{%- set _result = fromyaml("[" ~ get_ancestors(value, exclude_source=true)[:-1] ~ "]") -%}
{% if _result -%}
{%- do _result.insert(0, key) -%}
{%- do dag.update({name.upper() : _result | reverse|list}) -%}
{% for d in _result -%}
{%- if d.split(".")[-1].split("__")[0] not in schema -%}
{%- do schema.append(d.split(".")[-1].split("__")[0]) -%}
{%- endif -%}
{%- endfor -%}
{%- else -%}
{%- do dag.update({name.upper() : [key] }) -%}
{%- if value.schema not in schema -%}
{%- do schema.append(value.schema) -%}
{%- endif -%}
{%- endif -%}
{%- endif -%}
{%- endfor -%}
{%- set final = {"dag": dag, "schema": schema} -%}
{{- tojson(final) -}}
{%- endmacro -%}
{% macro generate_table_views_ddl(tables, schema) %}
{#
Return a list of DDL statements for views of tables from a list.
tables: a list of tables to create views for
schema: schemas to create schema DDL for
#}
{%- set schema_ddl = [] -%}
{%- set view_ddl = [] -%}
{% for s in schema %}
{%- do schema_ddl.append("CREATE SCHEMA IF NOT EXISTS __NEW__." ~ s ~ ";") -%}
{%- endfor -%}
{% for table in tables %}
{%- do view_ddl.append("CREATE OR REPLACE VIEW __NEW__." ~ table ~ " AS SELECT * FROM " ~ "__SOURCE__." ~ table ~";") -%}
{%- endfor -%}
{{- toyaml(schema_ddl + view_ddl) -}}
{%- endmacro -%}
{% macro generate_datashare_ddl() %}
{#
generate DDL for datashare
Return: DDL for datashare
#}
{%- set gold_views = fromjson(generate_dag_and_schemas(["gold"], ["view"])) -%}
{%- set gold_views_ddl = fromyaml(generate_view_ddl(gold_views["dag"], gold_views["schema"])) -%}
{%- set gold_tables = fromjson(generate_dag_and_schemas(["gold"], ["incremental", "table"])) -%}
{%- set gold_tables_ddl = fromyaml(generate_table_views_ddl(gold_tables["dag"].keys(), gold_tables["schema"])) -%}
{%- set combined_ddl = gold_views_ddl + gold_tables_ddl -%}
{%- do combined_ddl.insert(0, "CREATE DATABASE IF NOT EXISTS __NEW__;") -%}
{{- "BEGIN\n" ~ (combined_ddl | join("\n")) ~ "\nEND" -}}
{%- endmacro -%}

View File

@ -0,0 +1,20 @@
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
unique_key = 'ddl_hash',
merge_update_columns = [],
)
}}
{% if execute %}
SELECT
$${{- generate_datashare_ddl() -}}$$ AS ddl,
md5(ddl) AS ddl_hash,
sysdate() as ddl_created_at
{% else %}
SELECT
null as ddl,
null as ddl_hash,
null as ddl_created_at
from dual limit 0
{% endif %}

View File

@ -22,7 +22,7 @@ WITH deco_logs_base AS (
FROM
{{ source(
'avalanche_silver',
'decoded_logs_full'
'decoded_logs'
) }}
WHERE
block_timestamp :: DATE >= '2022-11-01'

View File

@ -22,7 +22,7 @@ WITH deco_logs_base AS (
FROM
{{ source(
'polygon_silver',
'decoded_logs_full'
'decoded_logs'
) }}
WHERE
block_timestamp :: DATE >= '2022-11-01'

View File

@ -46,7 +46,7 @@ sources:
schema: silver
tables:
- name: contracts
- name: decoded_logs_full
- name: decoded_logs
# - name: logs
- name: transfers
- name: bsc_silver
@ -68,7 +68,7 @@ sources:
schema: silver
tables:
- name: contracts
- name: decoded_logs_full
- name: decoded_logs
# - name: logs
- name: transfers
- name: arbitrum