Initial commit for Cosmos DBT Project (#1)

* Initial commit for Cosmos DBT Project

* Add gitignore

* Removed dbt packages

* Removed dbt logs

* Delete target

* Remove DS_Stores

* Delete ds_store
This commit is contained in:
Ryan-Loofy 2022-11-09 09:19:00 -05:00 committed by GitHub
parent 60e117cc20
commit 3bfdba7ebf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 569 additions and 0 deletions

17
.gitignore vendored Normal file
View File

@ -0,0 +1,17 @@
# dbt build artifacts
target/
dbt_modules/
# newer versions of dbt use this directory instead of dbt_modules for test dependencies
dbt_packages/
logs/
# local Python environment
.venv/
.python-version
# Visual Studio Code files
*/.vscode
*.code-workspace
.history/
# macOS Finder metadata
**/.DS_Store
.vscode/
# local environment variables (credentials) -- never commit
.env

9
Dockerfile Normal file
View File

@ -0,0 +1,9 @@
# dbt CLI container for the Cosmos project.
FROM fishtownanalytics/dbt:1.0.0

# Install the connection profile where dbt looks for it by default.
RUN mkdir -p /root/.dbt
COPY profiles.yml /root/.dbt

# Project source lives at /cosmos; docker-compose mounts the repo here,
# so the baked-in copy is only a fallback for running without a mount.
# (WORKDIR creates the directory, so no explicit mkdir is needed; the
# original's `WORKDIR /support` and `mkdir /root/cosmos` were unused.)
WORKDIR /cosmos
COPY . .

EXPOSE 8080
ENTRYPOINT ["bash"]

6
Makefile Normal file
View File

@ -0,0 +1,6 @@
SHELL := /bin/bash

# Open an interactive dbt shell inside the compose service.
# NOTE: make requires recipe lines to be tab-indented (the indentation was
# lost in the pasted source).
dbt-console:
	docker-compose run dbt_console

.PHONY: dbt-console

59
README.md Normal file
View File

@ -0,0 +1,59 @@
## Profile Set Up
#### Use the following within profiles.yml
----
```yml
cosmos:
target: dev
outputs:
dev:
type: snowflake
account: vna27887.us-east-1
role: INTERNAL_DEV
user: <USERNAME>
password: <PASSWORD>
region: us-east-1
database: COSMOS_DEV
warehouse: DBT_EMERGENCY
schema: silver
threads: 12
client_session_keep_alive: False
query_tag: <QUERY_TAG>
prod:
type: snowflake
account: vna27887.us-east-1
role: DBT_CLOUD_COSMOS
user: <USERNAME>
password: <PASSWORD>
region: us-east-1
database: COSMOS
warehouse: DBT_EMERGENCY
schema: silver
threads: 12
client_session_keep_alive: False
query_tag: <QUERY_TAG>
```
### Variables
To control which external-table environment a model references, as well as whether a Stream is invoked at runtime, use the following control variables:
* STREAMLINE_INVOKE_STREAMS
When True, invokes streamline on model run as normal
When False, NO-OP
* STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES
When True, uses DEV schema Streamline.Cosmos_DEV
When False, uses PROD schema Streamline.Cosmos
Default values are False
* Usage:
dbt run --var '{"STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES":True, "STREAMLINE_INVOKE_STREAMS":True}' -m ...
### Resources:
* Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
* Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
* Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
* Find [dbt events](https://events.getdbt.com) near you
* Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices

0
analysis/.gitkeep Normal file
View File

0
data/.gitkeep Normal file
View File

44
dbt_project.yml Normal file
View File

@ -0,0 +1,44 @@
# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "cosmos_models"
version: "1.0.0"
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: "cosmos"

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
analysis-paths: ["analysis"]
test-paths: ["tests"]
seed-paths: ["data"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target" # directory which will store compiled SQL files
clean-targets: # directories to be removed by `dbt clean`
  - "target"
  - "dbt_modules"
  - "dbt_packages"

# Project-wide model configs (overridable per model with {{ config(...) }}).
models:
  +copy_grants: true
  +on_schema_change: sync_all_columns

tests:
  +store_failures: true # all tests

# Configuring models
# Full documentation: https://docs.getdbt.com/docs/configuring-models
vars:
  "dbt_date:time_zone": GMT
  STREAMLINE_INVOKE_STREAMS: False
  STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False

9
docker-compose.yml Normal file
View File

@ -0,0 +1,9 @@
version: "3.4"

services:
  dbt_console:
    build: .
    # Mount the repo at /cosmos (matches WORKDIR in the Dockerfile).
    volumes:
      - .:/cosmos
    # Snowflake credentials are supplied via .env (see profiles.yml env_var calls).
    env_file:
      - .env

0
macros/.gitkeep Normal file
View File

6
macros/create_sps.sql Normal file
View File

@ -0,0 +1,6 @@
{# Install stored procedures: creates the _internal schema and the
   prod-clone procedure, but only when targeting the prod database
   (COSMOS); a no-op on other targets. #}
{% macro create_sps() %}
{% if target.database == 'COSMOS' %}
CREATE SCHEMA IF NOT EXISTS _internal;
{{ sp_create_prod_clone('_internal') }};
{% endif %}
{% endmacro %}

15
macros/create_udfs.sql Normal file
View File

@ -0,0 +1,15 @@
{# Install the project's UDFs/UDTFs, executing the DDL via run_query
   in two batches: first the silver schema + base-table UDTF, then the
   external block-fetch UDF. #}
{% macro create_udfs() %}
{% set sql %}
CREATE schema if NOT EXISTS silver;
{{ create_udtf_get_base_table(
schema = "streamline"
) }}
{% endset %}
{% do run_query(sql) %}
{% set sql %}
{# chainhead UDF is currently disabled (see create_udf_get_cosmos_blocks.sql) #}
{# {{ create_udf_get_cosmos_chainhead() }} #}
{{ create_udf_get_cosmos_blocks() }}
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,11 @@
{# Custom schema resolution: the schema is everything before the '__'
   in the model name (e.g. silver__blocks -> schema "silver").
   Overrides dbt's built-in generate_schema_name; custom_schema_name is
   accepted for signature compatibility but ignored. #}
{% macro generate_schema_name(custom_schema_name=none, node=none) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[0] | trim }}
{%- endmacro %}
{# Custom alias resolution: the relation name is everything after the '__'
   in the model name (e.g. silver__blocks -> alias "blocks").
   NOTE(review): split_name[1] is undefined for a model name that contains
   no '__' -- confirm every model follows the schema__name convention. #}
{% macro generate_alias_name(custom_alias_name=none, node=none) -%}
{% set node_name = node.name %}
{% set split_name = node_name.split('__') %}
{{ split_name[1] | trim }}
{%- endmacro %}

View File

@ -0,0 +1,7 @@
{# Invoke the prod-clone stored procedure (cosmos -> cosmos_dev).
   Third argument presumably names the grantee role ('internal_dev') --
   confirm against the procedure definition. Intended for dbt run-operation. #}
{% macro run_sp_create_prod_clone() %}
{% set clone_query %}
call cosmos._internal.create_prod_clone('cosmos', 'cosmos_dev', 'internal_dev');
{% endset %}
{% do run_query(clone_query) %}
{% endmacro %}

View File

@ -0,0 +1,11 @@
{# Create the Snowflake API integration for the Cosmos AWS API Gateway.
   Runs only on the prod target; both the prod and dev endpoint prefixes
   are allow-listed so dev external functions can share the integration. #}
{% macro create_aws_cosmos_api() %}
{% if target.name == "prod" %}
{% set sql %}
CREATE api integration IF NOT EXISTS aws_cosmos_api api_provider = aws_api_gateway api_aws_role_arn = 'arn:aws:iam::661245089684:role/snowflake-api-cosmos' api_allowed_prefixes = (
'https://e03pt6v501.execute-api.us-east-1.amazonaws.com/prod/',
'https://mryeusnrob.execute-api.us-east-1.amazonaws.com/dev/'
) enabled = TRUE;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,24 @@
{# Create a table function returning the sequence of block heights
   1..max_height (capped at 100M rows by the generator), used to build
   the expected-blocks base table. `schema` is the target schema name. #}
{% macro create_udtf_get_base_table(schema) %}
create or replace function {{ schema }}.udtf_get_base_table(max_height integer)
returns table (height number)
as
$$
with base as (
select
row_number() over (
order by
seq4()
) as id
from
table(generator(rowcount => 100000000))
)
select
id as height
from
base
where
id <= max_height
$$
;
{% endmacro %}

View File

@ -0,0 +1,26 @@
{# Create streamline.sp_get_blocks_history: if the blocks_history model
   has any rows, invoke the block-fetch UDF; otherwise return NULL (no-op).
   NOTE(review): calls streamline.udf_get_blocks(), but this project's
   create_udfs installs udf_get_cosmos_blocks -- confirm the intended name. #}
{% macro create_sp_get_blocks_history() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_blocks_history() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__blocks_history') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_blocks()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,26 @@
{# Create streamline.sp_get_blocks_realtime: if the blocks_realtime model
   has any rows, invoke the block-fetch UDF; otherwise return NULL (no-op).
   Mirrors sp_get_blocks_history apart from the source model.
   NOTE(review): calls streamline.udf_get_blocks(), but this project's
   create_udfs installs udf_get_cosmos_blocks -- confirm the intended name. #}
{% macro create_sp_get_blocks_realtime() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE streamline.sp_get_blocks_realtime() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT variant;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('streamline__blocks_realtime') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
streamline.udf_get_blocks()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,18 @@
{# Create the external function that posts block-fetch requests to the
   Cosmos API Gateway; the endpoint URL is chosen by target (prod vs dev).
   The chainhead variant below is intentionally commented out. #}
{% macro create_udf_get_cosmos_blocks() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_get_cosmos_blocks(
json variant
) returns text api_integration = aws_cosmos_api AS {% if target.name == "prod" %}
'https://e03pt6v501.execute-api.us-east-1.amazonaws.com/prod/bulk_get_cosmos_blocks'
{% else %}
'https://mryeusnrob.execute-api.us-east-1.amazonaws.com/dev/bulk_get_cosmos_blocks'
{%- endif %};
{% endmacro %}
{# {% macro create_udf_get_cosmos_chainhead() %}
CREATE EXTERNAL FUNCTION IF NOT EXISTS streamline.udf_get_chainhead() returns variant api_integration = aws_cosmos_api AS {% if target.name == "prod" %}
'https://e03pt6v501.execute-api.us-east-1.amazonaws.com/prod/get_cosmos_chainhead'
{% else %}
'https://mryeusnrob.execute-api.us-east-1.amazonaws.com/dev/get_cosmos_chainhead'
{%- endif %};
{% endmacro %} #}

View File

@ -0,0 +1,34 @@
{# Generic dbt test: fails (returns rows) wherever column_name does not
   increase by exactly 1 relative to the previous row, optionally within
   partitions. partition_by is a list of column names (may be empty). #}
{% test sequence_gaps(
model,
partition_by,
column_name
) %}
{%- set partition_sql = partition_by | join(", ") -%}
{%- set previous_column = "prev_" ~ column_name -%}
WITH source AS (
SELECT
{{ partition_sql + "," if partition_sql }}
{{ column_name }},
LAG(
{{ column_name }},
1
) over (
{{ "PARTITION BY " ~ partition_sql if partition_sql }}
ORDER BY
{{ column_name }} ASC
) AS {{ previous_column }}
FROM
{{ model }}
)
SELECT
{{ partition_sql + "," if partition_sql }}
{{ previous_column }},
{{ column_name }},
{{ column_name }} - {{ previous_column }}
- 1 AS gap
FROM
source
WHERE
{{ column_name }} - {{ previous_column }} <> 1
ORDER BY
gap DESC {% endtest %}

33
macros/tests/tx_gaps.sql Normal file
View File

@ -0,0 +1,33 @@
{# Return blocks whose distinct-tx count in `model` disagrees with the
   per-block tx_count recorded in silver__blocks.
   NOTE(review): silver__blocks as defined in this commit does not select a
   tx_count column -- confirm it is added before this macro is used.
   NOTE(review): blocks entirely missing from `model` yield NULL
   model_tx_count, and NULL <> tx_count filters them out -- confirm whether
   fully-missing blocks should also be flagged. #}
{% macro tx_gaps(
model
) %}
WITH block_base AS (
SELECT
block_number,
tx_count
FROM
{{ ref('silver__blocks') }}
),
model_name AS (
SELECT
block_number,
COUNT(
DISTINCT tx_hash
) AS model_tx_count
FROM
{{ model }}
GROUP BY
block_number
)
SELECT
block_base.block_number,
tx_count,
model_name.block_number AS model_block_number,
model_tx_count
FROM
block_base
LEFT JOIN model_name
ON block_base.block_number = model_name.block_number
WHERE
tx_count <> model_tx_count
{% endmacro %}

35
macros/utils.sql Normal file
View File

@ -0,0 +1,35 @@
{# Post-hook helper: when var STREAMLINE_INVOKE_STREAMS is truthy, emit a
   SELECT that calls `func` only if `target` (a fully-qualified relation
   string) has at least one row; otherwise emit SELECT NULL (no-op).
   Logs which branch was taken when `execute` is set. #}
{% macro if_data_call_function(
func,
target
) %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% if execute %}
{{ log(
"Running macro `if_data_call_function`: Calling udf " ~ func ~ " on " ~ target,
True
) }}
{% endif %}
SELECT
{{ func }}
WHERE
EXISTS(
SELECT
1
FROM
{{ target }}
LIMIT
1
)
{% else %}
{% if execute %}
{{ log(
"Running macro `if_data_call_function`: NOOP",
False
) }}
{% endif %}
SELECT
NULL
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,19 @@
{# View of every block height the archive is expected to contain
   (1,000,000 .. block_height), generated via the base-table UDTF.
   NOTE(review): the chainhead lookup is disabled; the surrounding comment
   swallowed the original else-branch, so block_height is hard-coded to
   12000000 whenever `execute` is true -- confirm this is intentional. #}
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
{% if execute %}
{# {% set height = run_query('SELECT streamline.udf_get_cosmos_chainhead()') %}
{% set block_height = height.columns[0].values()[0] %}
{% else %} #}
{% set block_height = 12000000 %}
{% endif %}
SELECT
height as block_number
FROM
TABLE(streamline.udtf_get_base_table({{block_height}}))
WHERE
height >= 1000000 -- earliest block the archive has available -- TODO confirm

View File

@ -0,0 +1,40 @@
{# Backfill work queue: blocks 1..12,000,000 present in streamline__blocks
   but not yet in streamline__complete_blocks, chunked into twelve 1M-block
   ranges and UNION ALLed. The post-hook fires the external block-fetch UDF
   against this view when STREAMLINE_INVOKE_STREAMS is set. #}
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_get_cosmos_blocks(object_construct('sql_source', '{{this.identifier}}'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
{% for item in range(12) %}
(
SELECT
{{ dbt_utils.surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
WHERE
block_number BETWEEN {{ item * 1000000 + 1 }}
AND {{(
item + 1
) * 1000000 }}
ORDER BY
block_number
) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}

View File

@ -0,0 +1,32 @@
{# Realtime work queue: blocks above 12,000,000 present in
   streamline__blocks but not yet in streamline__complete_blocks. The
   post-hook fires the external block-fetch UDF against this view when
   STREAMLINE_INVOKE_STREAMS is set. #}
{{ config (
materialized = "view",
post_hook = if_data_call_function(
func = "{{this.schema}}.udf_get_cosmos_blocks(object_construct('sql_source', '{{this.identifier}}'))",
target = "{{this.schema}}.{{this.identifier}}"
)
) }}
SELECT
{{ dbt_utils.surrogate_key(
['block_number']
) }} AS id,
block_number
FROM
{{ ref("streamline__blocks") }}
WHERE
block_number > 12000000
AND block_number IS NOT NULL
EXCEPT
SELECT
id,
block_number
FROM
{{ ref("streamline__complete_blocks") }}
WHERE
block_number > 12000000
{# UNION ALL
SELECT
id,
block_number
FROM
{{ ref("streamline__blocks_history") }} #}

View File

@ -0,0 +1,54 @@
{# Incremental model of ingested blocks, keyed by the surrogate id of
   block_number. Reads the bronze external table, restricting incremental
   runs to files modified after the latest _inserted_timestamp already
   loaded, and keeps only the newest row per id. #}
{{ config (
materialized = "incremental",
unique_key = "id",
cluster_by = "ROUND(block_number, -3)",
merge_update_columns = ["id"]
) }}
{# meta: per-file modification times for the external table, used as the
   ingestion watermark. #}
WITH meta AS (
SELECT
last_modified,
file_name
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", "blocks") }}'
)
) A
)
{% if is_incremental() %},
max_date AS (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
SELECT
{{ dbt_utils.surrogate_key(
['block_number']
) }} AS id,
block_number,
last_modified AS _inserted_timestamp
FROM
{{ source(
"bronze_streamline",
"blocks"
) }}
JOIN meta b
ON b.file_name = metadata$filename
{% if is_incremental() %}
WHERE
b.last_modified > (
SELECT
max_INSERTED_TIMESTAMP
FROM
max_date
)
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY id
ORDER BY
_inserted_timestamp DESC)) = 1

9
models/sources.yml Normal file
View File

@ -0,0 +1,9 @@
version: 2

sources:
  - name: bronze_streamline
    database: streamline
    # Schema is resolved at compile time from the
    # STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES var (dev vs prod external tables).
    schema: |
      {{ "COSMOS_DEV" if var("STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES", False) else "COSMOS" }}
    tables:
      - name: blocks

6
packages.yml Normal file
View File

@ -0,0 +1,6 @@
packages:
  - package: calogica/dbt_expectations
    version: [">=0.4.0", "<0.9.0"]
  - package: dbt-labs/dbt_utils
    # quoted so YAML keeps the version as a string
    version: "0.9.2"

19
profiles.yml Normal file
View File

@ -0,0 +1,19 @@
cosmos:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SF_ACCOUNT') }}"
      # User/password auth; all connection settings come from the
      # environment (see docker-compose env_file: .env).
      user: "{{ env_var('SF_USERNAME') }}"
      password: "{{ env_var('SF_PASSWORD') }}"
      role: "{{ env_var('SF_ROLE') }}"
      schema: "{{ env_var('SF_SCHEMA') }}"
      region: "{{ env_var('SF_REGION') }}"
      database: "{{ env_var('SF_DATABASE') }}"
      warehouse: "{{ env_var('SF_WAREHOUSE') }}"
      threads: 4
      client_session_keep_alive: False
      query_tag: cosmos_curator

config:
  send_anonymous_usage_stats: False

0
snapshots/.gitkeep Normal file
View File

0
tests/.gitkeep Normal file
View File