From eb99c42fb8f3b7ded3ebb9d979496f3cfd52859b Mon Sep 17 00:00:00 2001 From: Julius Remigio <14811322+juls858@users.noreply.github.com> Date: Fri, 24 Feb 2023 10:03:53 -0800 Subject: [PATCH] LiveQuery prod schemas (#4) --- README.md | 29 +++-- ...st_create_or_drop_function_from_config.sql | 19 +-- dbt_project.yml | 6 +- macros/create_udfs.sql | 20 +-- macros/streamline/configs.yaml.sql | 123 +++++++++++------- macros/streamline/functions.py.sql | 42 ++++++ macros/streamline/utils.sql | 1 + 7 files changed, 150 insertions(+), 90 deletions(-) create mode 100644 macros/streamline/functions.py.sql diff --git a/README.md b/README.md index 51b91f5..181f6a7 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,10 @@ +# LiveQuery Models + +Dbt repo for managing LiveQuery database. + ## Profile Set Up -#### Use the following within profiles.yml - ----- +Use the following within profiles.yml ```yml livequery: @@ -36,9 +38,10 @@ livequery: query_tag: ``` -### Variables +## Variables To control the creation of UDF or SP macros with dbt run: + * UPDATE_UDFS_AND_SPS When True, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal When False, none of the on-run-start macros are executed on model run @@ -51,11 +54,11 @@ dbt run --var 'UPDATE_UDFS_AND_SPS": True' -m ... Dropping and creating udfs can also be done without running a model: ```sh -dbt run-operation create_udfs --args 'drop_:false' -dbt run-operation create_udfs --args 'drop_:true' +dbt run-operation create_udfs --var 'UPDATE_UDFS_AND_SPS": True' --args 'drop_:false' +dbt run-operation create_udfs --var 'UPDATE_UDFS_AND_SPS": True' --args 'drop_:true' ``` -### Resources: +## Resources * Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) * Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers @@ -65,11 +68,11 @@ dbt run-operation create_udfs --args 'drop_:true' ## Applying Model Tags -### Database / Schema level tags +## Database / Schema level tags Database and schema tags are applied via the `add_database_or_schema_tags` macro. These tags are inherited by their downstream objects. To add/modify tags call the appropriate tag set function within the macro. -``` +```jinja {{ set_database_tag_value('SOME_DATABASE_TAG_KEY','SOME_DATABASE_TAG_VALUE') }} {{ set_schema_tag_value('SOME_SCHEMA_TAG_KEY','SOME_SCHEMA_TAG_VALUE') }} ``` @@ -78,9 +81,9 @@ Database and schema tags are applied via the `add_database_or_schema_tags` macro To add/update a model's snowflake tags, add/modify the `meta` model property under `config` . Only table level tags are supported at this time via DBT. -``` +```jinja {{ config( - ..., + ... meta={ 'database_tags':{ 'table': { @@ -94,13 +97,13 @@ To add/update a model's snowflake tags, add/modify the `meta` model property und By default, model tags are pushed to Snowflake on each load. You can disable this by setting the `UPDATE_SNOWFLAKE_TAGS` project variable to `False` during a run. -``` +```sh dbt run --var '{"UPDATE_SNOWFLAKE_TAGS":False}' -s models/core/core__fact_blocks.sql ``` ### Querying for existing tags on a model in snowflake -``` +```sql select * from table(livequery.information_schema.tag_references('livequery.core.fact_blocks', 'table')); ``` diff --git a/analysis/test_create_or_drop_function_from_config.sql b/analysis/test_create_or_drop_function_from_config.sql index fa4eae0..32168bf 100644 --- a/analysis/test_create_or_drop_function_from_config.sql +++ b/analysis/test_create_or_drop_function_from_config.sql @@ -1,17 +1,8 @@ -{% set name %} +{%- set name -%} {{- udf_configs() -}} {% endset %} {% set udfs = fromyaml(name) %} - -{{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=True) -}} -{{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=True) -}} -{{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=True) -}} -{{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=True) -}} -{{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=True) -}} -{{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=True) -}} -{{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=False) -}} -{{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=False) -}} -{{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=False) -}} -{{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=False) -}} -{{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=False) -}} -{{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=False) -}} \ No newline at end of file +{%- for udf in udfs -%} +{{- create_or_drop_function_from_config(udf, drop_=True) -}} +{{- create_or_drop_function_from_config(udf, drop_=False) -}} +{% endfor %} diff --git a/dbt_project.yml b/dbt_project.yml index 0ffe08b..4257307 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -43,11 +43,11 @@ on-run-end: vars: "dbt_date:time_zone": GMT - UPDATE_UDFS_AND_SPS: true - UPDATE_SNOWFLAKE_TAGS: True + UPDATE_UDFS_AND_SPS: false + UPDATE_SNOWFLAKE_TAGS: true STREAMLINE_INVOKE_STREAMS: False STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False REST_API_ID_PROD: hn8uqhku77 REST_API_ID_DEV: hn8uqhku77 - API_INTEGRATION: AWS_LIVE_QUERY_DEV + API_INTEGRATION: AWS_LIVE_QUERY{{ '_DEV' if target.name != 'prod' else '' }} AWS_REGION: us-east-1 diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 4faba28..bdfa5da 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -6,20 +6,14 @@ {% set udfs = fromyaml(name) %} {% set sql %} CREATE schema if NOT EXISTS silver; - CREATE schema if NOT EXISTS streamline; CREATE schema if NOT EXISTS beta; - {{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=True) }} - {{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=True) }} - {{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=True) }} - {{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=True) }} - {{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=True) }} - {{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=True) }} - {{- create_or_drop_function_from_config(udfs["streamline.introspect"], drop_=False) }} - {{- create_or_drop_function_from_config(udfs["streamline.whoami"], drop_=False) }} - {{- create_or_drop_function_from_config(udfs["streamline.udf_register_secret"], drop_=False) }} - {{- create_or_drop_function_from_config(udfs["beta.udf_register_secret"], drop_=False) }} - {{- create_or_drop_function_from_config(udfs["streamline.udf_api"], drop_=False) }} - {{- create_or_drop_function_from_config(udfs["beta.udf_api"], drop_=False) }} + CREATE schema if NOT EXISTS utils; + CREATE schema if NOT EXISTS _utils; + CREATE schema if NOT EXISTS _live; + CREATE schema if NOT EXISTS live; + {%- for udf in udfs -%} + {{- create_or_drop_function_from_config(udf, drop_=drop_) -}} + {% endfor %} {% endset %} {% do run_query(sql) %} {% endif %} diff --git a/macros/streamline/configs.yaml.sql b/macros/streamline/configs.yaml.sql index 20a41ff..dfb8575 100644 --- a/macros/streamline/configs.yaml.sql +++ b/macros/streamline/configs.yaml.sql @@ -1,27 +1,90 @@ {% macro udf_configs() %} -streamline.introspect: - name: streamline.udf_introspect + +{# + UTILITY SCHEMA +#} +- name: _utils.udf_introspect signature: - [echo, STRING] func_type: SECURE EXTERNAL return_type: TEXT - api_integration: AWS_LIVE_QUERY_DEV + api_integration: '{{ var("API_INTEGRATION") }}' sql: introspect -beta.udf_register_secret: - name: beta.udf_register_secret + +- name: _utils.udf_whoami + signature: [] + func_type: SECURE + return_type: TEXT + options: NOT NULL STRICT IMMUTABLE MEMOIZABLE + sql: | + SELECT + COALESCE(SPLIT_PART(GETVARIABLE('QUERY_TAG_SESSION'), ',',2), CURRENT_USER()) + +- name: _utils.udf_register_secret signature: - - [request_id, string] - - [key, string] + - [request_id, STRING] + - [user_id, STRING] + - [key, STRING] + return_type: TEXT + func_type: SECURE EXTERNAL + api_integration: '{{ var("API_INTEGRATION") }}' + options: NOT NULL STRICT + sql: secret/register +- name: utils.udf_register_secret + signature: + - [request_id, STRING] + - [key, STRING] func_type: SECURE return_type: TEXT options: NOT NULL STRICT IMMUTABLE sql: | SELECT - STREAMLINE.UDF_REGISTER_SECRET(REQUEST_ID, STREAMLINE.UDF_WHOAMI(), KEY) + _utils.UDF_REGISTER_SECRET(REQUEST_ID, _utils.UDF_WHOAMI(), KEY) -beta.udf_api: - name: beta.udf_api +- name: utils.udf_hex_to_int + signature: + - [hex, STRING] + return_type: TEXT + options: | + NULL + LANGUAGE PYTHON + STRICT IMMUTABLE + RUNTIME_VERSION = '3.8' + HANDLER = 'hex_to_int' + sql: | + {{ python_hex_to_int() | indent(4) }} +- name: utils.udf_hex_to_int + signature: + - [encoding, STRING] + - [hex, STRING] + return_type: TEXT + options: | + NULL + LANGUAGE PYTHON + STRICT IMMUTABLE + RUNTIME_VERSION = '3.8' + HANDLER = 'hex_to_int' + sql: | + {{ python_udf_hex_to_int_with_encoding() | indent(4) }} + +{# + LIVE SCHEMA +#} +- name: _live.udf_api + signature: + - [method, STRING] + - [url, STRING] + - [headers, OBJECT] + - [DATA, OBJECT] + - [user_id, STRING] + - [SECRET, STRING] + return_type: VARIANT + func_type: SECURE EXTERNAL + api_integration: '{{ var("API_INTEGRATION") }}' + options: NOT NULL STRICT + sql: udf_api +- name: live.udf_api signature: - [method, STRING] - [url, STRING] @@ -33,49 +96,15 @@ beta.udf_api: options: NOT NULL STRICT VOLATILE sql: | SELECT - STREAMLINE.UDF_API( + _live.UDF_API( method, url, headers, data, - STREAMLINE.UDF_WHOAMI(), + _utils.UDF_WHOAMI(), secret_name ) -streamline.udf_api: - name: streamline.udf_api - signature: - - [method, STRING] - - [url, STRING] - - [headers, OBJECT] - - [DATA, OBJECT] - - [user_id, STRING] - - [SECRET, STRING] - return_type: VARIANT - func_type: SECURE EXTERNAL - api_integration: AWS_LIVE_QUERY_DEV - options: NOT NULL STRICT - sql: udf_api -streamline.udf_register_secret: - name: streamline.udf_register_secret - signature: - - [request_id, string] - - [user_id, string] - - [key, string] - return_type: TEXT - func_type: SECURE EXTERNAL - api_integration: AWS_LIVE_QUERY_DEV - options: NOT NULL STRICT - sql: secret/register - -streamline.whoami: - name: streamline.udf_whoami - signature: [] - func_type: SECURE - return_type: TEXT - options: NOT NULL STRICT IMMUTABLE MEMOIZABLE - sql: | - SELECT - COALESCE(SPLIT_PART(GETVARIABLE('QUERY_TAG_SESSION'), ',',2), CURRENT_USER()) {% endmacro %} + diff --git a/macros/streamline/functions.py.sql b/macros/streamline/functions.py.sql new file mode 100644 index 0000000..7f3acbc --- /dev/null +++ b/macros/streamline/functions.py.sql @@ -0,0 +1,42 @@ +{% macro python_hex_to_int() %} +def hex_to_int(hex) -> str: + """ + Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) + hex_to_int('200000000000000000000000000000211'); + >> 680564733841876926926749214863536423441 + hex_to_int('0x200000000000000000000000000000211'); + >> 680564733841876926926749214863536423441 + hex_to_int(NULL); + >> NULL + """ + return (str(int(hex, 16)) if hex and hex != "0x" else None) +{% endmacro %} + + +{% macro python_udf_hex_to_int_with_encoding() %} +def hex_to_int(encoding, hex) -> str: + """ + Converts hex (of any size) to int (as a string). Snowflake and java script can only handle up to 64-bit (38 digits of precision) + hex_to_int('hex', '200000000000000000000000000000211'); + >> 680564733841876926926749214863536423441 + hex_to_int('hex', '0x200000000000000000000000000000211'); + >> 680564733841876926926749214863536423441 + hex_to_int('hex', NULL); + >> NULL + hex_to_int('s2c', 'ffffffffffffffffffffffffffffffffffffffffffffffffffffffffe5b83acf'); + >> -440911153 + """ + if not hex: + return None + if encoding.lower() == 's2c': + if hex[0:2].lower() != '0x': + hex = f'0x{hex}' + + bits = len(hex[2:])*4 + value = int(hex, 0) + if value & (1 << (bits-1)): + value -= 1 << bits + return str(value) + else: + return str(int(hex, 16)) +{% endmacro %} \ No newline at end of file diff --git a/macros/streamline/utils.sql b/macros/streamline/utils.sql index 9a0031e..04e28e5 100644 --- a/macros/streamline/utils.sql +++ b/macros/streamline/utils.sql @@ -37,6 +37,7 @@ CREATE OR REPLACE {{ func_type }} FUNCTION {{ name_ }}( {{- compile_signature(signature) }} ) + COPY GRANTS RETURNS {{ return_type }} {% if options -%} {{ options }}