LiveQuery prod schemas (#4)

This commit is contained in:
Julius Remigio 2023-02-24 10:03:53 -08:00 committed by GitHub
parent 8e55f678c2
commit eb99c42fb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 150 additions and 90 deletions

View File

@ -1,8 +1,10 @@
# LiveQuery Models
Dbt repo for managing LiveQuery database.
## Profile Set Up
#### Use the following within profiles.yml
----
Use the following within profiles.yml
```yml
livequery:
@ -36,9 +38,10 @@ livequery:
query_tag: <TAG>
```
### Variables
## Variables
To control the creation of UDF or SP macros with dbt run:
* UPDATE_UDFS_AND_SPS
When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal
When False, none of the on-run-start macros are executed on model run
@ -51,11 +54,11 @@ dbt run --var 'UPDATE_UDFS_AND_SPS": True' -m ...
Dropping and creating udfs can also be done without running a model:
```sh
dbt run-operation create_udfs --args 'drop_:false'
dbt run-operation create_udfs --args 'drop_:true'
dbt run-operation create_udfs --var 'UPDATE_UDFS_AND_SPS": True' --args 'drop_:false'
dbt run-operation create_udfs --var 'UPDATE_UDFS_AND_SPS": True' --args 'drop_:true'
```
### Resources:
## Resources
* Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
* Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
@ -65,11 +68,11 @@ dbt run-operation create_udfs --args 'drop_:true'
## Applying Model Tags
### Database / Schema level tags
## Database / Schema level tags
Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add/modify tags call the appropriate tag set function within the macro.
```
```jinja
{{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }}
{{ set_schema_tag_value('SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }}
```
@ -78,9 +81,9 @@ Database and schema tags are applied via the `add_database_or_schema_tags` macro
To add/update a model's snowflake tags, add/modify the `meta` model property under `config` . Only table level tags are supported at this time via DBT.
```
```jinja
{{ config(
...,
...
meta={
'database_tags':{
'table': {
@ -94,13 +97,13 @@ To add/update a model's snowflake tags, add/modify the `meta` model property und
By default, model tags are pushed to Snowflake on each load. You can disable this by setting the `UPDATE_SNOWFLAKE_TAGS` project variable to `False` during a run.
```
```sh
dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql
```
### Querying for existing tags on a model in snowflake
```
```sql
select *
from table(livequery.information_schema.tag_references('livequery.core.fact_blocks', 'table'));
```

View File

@ -1,17 +1,8 @@
{% set name %}
{%- set name -%}
{{- udf_configs() -}}
{% endset %}
{% set udfs = fromyaml(name) %}
{{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=True) -}}
{{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=True) -}}
{{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=True) -}}
{{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=True) -}}
{{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=True) -}}
{{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=True) -}}
{{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=False) -}}
{{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=False) -}}
{{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=False) -}}
{{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=False) -}}
{{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=False) -}}
{{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=False) -}}
{%- for udf in udfs -%}
{{- create_or_drop_function_from_config(udf, drop_=True) -}}
{{- create_or_drop_function_from_config(udf, drop_=False) -}}
{% endfor %}

View File

@ -43,11 +43,11 @@ on-run-end:
vars:
"dbt_date:time_zone": GMT
UPDATE_UDFS_AND_SPS: true
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: false
UPDATE_SNOWFLAKE_TAGS: true
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
REST_API_ID_PROD: hn8uqhku77
REST_API_ID_DEV: hn8uqhku77
API_INTEGRATION: AWS_LIVE_QUERY_DEV
API_INTEGRATION: AWS_LIVE_QUERY{{ '_DEV' if target.name != 'prod' else '' }}
AWS_REGION: us-east-1

View File

@ -6,20 +6,14 @@
{% set udfs = fromyaml(name) %}
{% set sql %}
CREATE schema if NOT EXISTS silver;
CREATE schema if NOT EXISTS streamline;
CREATE schema if NOT EXISTS beta;
{{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=True) }}
{{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=True) }}
{{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=True) }}
{{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=True) }}
{{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=True) }}
{{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=True) }}
{{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=False) }}
{{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=False) }}
{{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=False) }}
{{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=False) }}
{{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=False) }}
{{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=False) }}
CREATE schema if NOT EXISTS utils;
CREATE schema if NOT EXISTS _utils;
CREATE schema if NOT EXISTS _live;
CREATE schema if NOT EXISTS live;
{%- for udf in udfs -%}
{{- create_or_drop_function_from_config(udf, drop_=drop_) -}}
{% endfor %}
{% endset %}
{% do run_query(sql) %}
{% endif %}

View File

@ -1,27 +1,90 @@
{% macro udf_configs() %}
streamline.introspect:
name: streamline.udf_introspect
{#
UTILITY SCHEMA
#}
- name: _utils.udf_introspect
signature:
- [echo, STRING]
func_type: SECURE EXTERNAL
return_type: TEXT
api_integration: AWS_LIVE_QUERY_DEV
api_integration: '{{ var("API_INTEGRATION") }}'
sql: introspect
beta.udf_register_secret:
name: beta.udf_register_secret
- name: _utils.udf_whoami
signature: []
func_type: SECURE
return_type: TEXT
options: NOT NULL STRICT IMMUTABLE MEMOIZABLE
sql: |
SELECT
COALESCE(SPLIT_PART(GETVARIABLE('QUERY_TAG_SESSION'), ',',2), CURRENT_USER())
- name: _utils.udf_register_secret
signature:
- [request_id, string]
- [key, string]
- [request_id, STRING]
- [user_id, STRING]
- [key, STRING]
return_type: TEXT
func_type: SECURE EXTERNAL
api_integration: '{{ var("API_INTEGRATION") }}'
options: NOT NULL STRICT
sql: secret/register
- name: utils.udf_register_secret
signature:
- [request_id, STRING]
- [key, STRING]
func_type: SECURE
return_type: TEXT
options: NOT NULL STRICT IMMUTABLE
sql: |
SELECT
STREAMLINE.UDF_REGISTER_SECRET(REQUEST_ID, STREAMLINE.UDF_WHOAMI(), KEY)
_utils.UDF_REGISTER_SECRET(REQUEST_ID, _utils.UDF_WHOAMI(), KEY)
beta.udf_api:
name: beta.udf_api
- name: utils.udf_hex_to_int
signature:
- [hex, STRING]
return_type: TEXT
options: |
NULL
LANGUAGE PYTHON
STRICT IMMUTABLE
RUNTIME_VERSION = '3.8'
HANDLER = 'hex_to_int'
sql: |
{{ python_hex_to_int() | indent(4) }}
- name: utils.udf_hex_to_int
signature:
- [encoding, STRING]
- [hex, STRING]
return_type: TEXT
options: |
NULL
LANGUAGE PYTHON
STRICT IMMUTABLE
RUNTIME_VERSION = '3.8'
HANDLER = 'hex_to_int'
sql: |
{{ python_udf_hex_to_int_with_encoding() | indent(4) }}
{#
LIVE SCHEMA
#}
- name: _live.udf_api
signature:
- [method, STRING]
- [url, STRING]
- [headers, OBJECT]
- [DATA, OBJECT]
- [user_id, STRING]
- [SECRET, STRING]
return_type: VARIANT
func_type: SECURE EXTERNAL
api_integration: '{{ var("API_INTEGRATION") }}'
options: NOT NULL STRICT
sql: udf_api
- name: live.udf_api
signature:
- [method, STRING]
- [url, STRING]
@ -33,49 +96,15 @@ beta.udf_api:
options: NOT NULL STRICT VOLATILE
sql: |
SELECT
STREAMLINE.UDF_API(
_live.UDF_API(
method,
url,
headers,
data,
STREAMLINE.UDF_WHOAMI(),
_utils.UDF_WHOAMI(),
secret_name
)
streamline.udf_api:
name: streamline.udf_api
signature:
- [method, STRING]
- [url, STRING]
- [headers, OBJECT]
- [DATA, OBJECT]
- [user_id, STRING]
- [SECRET, STRING]
return_type: VARIANT
func_type: SECURE EXTERNAL
api_integration: AWS_LIVE_QUERY_DEV
options: NOT NULL STRICT
sql: udf_api
streamline.udf_register_secret:
name: streamline.udf_register_secret
signature:
- [request_id, string]
- [user_id, string]
- [key, string]
return_type: TEXT
func_type: SECURE EXTERNAL
api_integration: AWS_LIVE_QUERY_DEV
options: NOT NULL STRICT
sql: secret/register
streamline.whoami:
name: streamline.udf_whoami
signature: []
func_type: SECURE
return_type: TEXT
options: NOT NULL STRICT IMMUTABLE MEMOIZABLE
sql: |
SELECT
COALESCE(SPLIT_PART(GETVARIABLE('QUERY_TAG_SESSION'), ',',2), CURRENT_USER())
{% endmacro %}

View File

@ -0,0 +1,42 @@
{% macro python_hex_to_int() %}
def hex_to_int(hex) -> str:
"""
Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision)
hex_to_int('200000000000000000000000000000211');
>> 680564733841876926926749214863536423441
hex_to_int('0x200000000000000000000000000000211');
>> 680564733841876926926749214863536423441
hex_to_int(NULL);
>> NULL
"""
return (str(int(hex, 16)) if hex and hex != "0x" else None)
{% endmacro %}
{% macro python_udf_hex_to_int_with_encoding() %}
def hex_to_int(encoding, hex) -> str:
"""
Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision)
hex_to_int('hex', '200000000000000000000000000000211');
>> 680564733841876926926749214863536423441
hex_to_int('hex', '0x200000000000000000000000000000211');
>> 680564733841876926926749214863536423441
hex_to_int('hex', NULL);
>> NULL
hex_to_int('s2c', 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffe5b83acf');
>> -440911153
"""
if not hex:
return None
if encoding.lower() == 's2c':
if hex[0:2].lower() != '0x':
hex = f'0x{hex}'
bits = len(hex[2:])*4
value = int(hex, 0)
if value & (1 << (bits-1)):
value -= 1 << bits
return str(value)
else:
return str(int(hex, 16))
{% endmacro %}

View File

@ -37,6 +37,7 @@
CREATE OR REPLACE {{ func_type }} FUNCTION {{ name_ }}(
{{- compile_signature(signature) }}
)
COPY GRANTS
RETURNS {{ return_type }}
{% if options -%}
{{ options }}