livequery-models/macros/livequery/manage_udfs.sql
Jensen Yap a3b004d0cc
Some checks failed
docs_update / docs_update (push) Has been cancelled
dbt_run_dev_refresh / dev_refresh (push) Has been cancelled
integration test / ${{ matrix.environment }} (hosted, XSMALL) (push) Has been cancelled
integration test / ${{ matrix.environment }} (prod, DBT_CLOUD) (push) Has been cancelled
[STREAM-1155] Enhance UDF definitions and add new UDF for S3 presigned URL retrieval (#125)
2025-07-30 01:26:54 +09:00

253 lines
9.0 KiB
PL/PgSQL

{% macro drop_function(
func_name,
signature
) %}
DROP FUNCTION IF EXISTS {{ func_name }}({{ compile_signature(signature, drop_ = True) }});
{% endmacro %}
{%- macro construct_api_route(route) -%}
'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}{{ route }}'
{%- endmacro -%}
{%- macro compile_signature(
params,
drop_ = False
) -%}
{% for p in params -%}
{%- set name = p.0 -%}
{%- set data_type = p.1 -%}
{% if drop_ %}
{{ data_type -}}
{% else %}
{{ name ~ " " ~ data_type -}}
{%- endif -%}
{%-if not loop.last -%},
{%- endif -%}
{% endfor -%}
{%- endmacro -%}
{%- macro format_headers(headers) -%}
{%- if headers -%}
{%- if headers is mapping -%}
{%- set header_items = [] -%}
{%- for key, value in headers.items() -%}
{%- set _ = header_items.append("'" ~ key ~ "' = '" ~ value ~ "'") -%}
{%- endfor -%}
HEADERS = (
{{ header_items | join(',\n ') }}
)
{%- elif headers is iterable -%}
{%- set header_items = [] -%}
{%- for item in headers -%}
{%- if item is mapping -%}
{%- for key, value in item.items() -%}
{%- set _ = header_items.append("'" ~ key ~ "' = '" ~ value ~ "'") -%}
{%- endfor -%}
{%- endif -%}
{%- endfor -%}
HEADERS = (
{{ header_items | join(',\n ') }}
)
{%- endif -%}
{%- endif -%}
{%- endmacro -%}
{% macro create_sql_function(
name_,
signature,
return_type,
sql_,
api_integration = none,
options = none,
func_type = none,
max_batch_rows = none,
headers = none
) %}
CREATE OR REPLACE {{ func_type }} FUNCTION {{ name_ }}(
{{- livequery_models.compile_signature(signature) }}
)
COPY GRANTS
RETURNS {{ return_type }}
{% if options -%}
{{ options }}
{% endif %}
{%- if api_integration -%}
api_integration = {{ api_integration -}}
{%- if max_batch_rows -%}
{{ "\n max_batch_rows = " ~ max_batch_rows -}}
{%- endif -%}
{%- if headers -%}
{{ "\n" ~ livequery_models.format_headers(headers) -}}
{%- endif -%}
{{ "\n AS " ~ livequery_models.construct_api_route(sql_) ~ ";" -}}
{%- else -%}
AS
$$
{{ sql_ }}
$$;
{%- endif -%}
{%- endmacro -%}
{%- macro create_or_drop_function_from_config(
config,
drop_ = False
) -%}
{% set name_ = config ["name"] %}
{% set signature = config ["signature"] %}
{% set return_type = config ["return_type"] if config ["return_type"] is string else config ["return_type"][0] %}
{% set sql_ = config ["sql"] %}
{% set options = config ["options"] %}
{% set api_integration = config ["api_integration"] %}
{% set func_type = config ["func_type"] %}
{% set max_batch_rows = config ["max_batch_rows"] %}
{% set headers = config ["headers"] %}
{% if not drop_ -%}
{{ livequery_models.create_sql_function(
name_ = name_,
signature = signature,
return_type = return_type,
sql_ = sql_,
options = options,
api_integration = api_integration,
max_batch_rows = max_batch_rows,
func_type = func_type,
headers = headers
) }}
{%- else -%}
{{ drop_function(
name_,
signature = signature,
) }}
{%- endif %}
{% endmacro %}
{% macro crud_udfs(config_func, schema, drop_) %}
{#
Generate create or drop statements for a list of udf configs for a given schema
config_func: function that returns a list of udf configs
drop_: whether to drop or create the udfs
#}
{% set udfs = fromyaml(config_func())%}
{%- for udf in udfs -%}
{% if udf["name"].split(".") | first == schema %}
CREATE SCHEMA IF NOT EXISTS {{ schema }};
{{- create_or_drop_function_from_config(udf, drop_=drop_) -}}
{%- endif -%}
{%- endfor -%}
{%- endmacro -%}
{% macro crud_udfs_by_chain(config_func, blockchain, network, drop_) %}
{#
Generate create or drop statements for a list of udf configs for a given blockchain and network
config_func: function that returns a list of udf configs
blockchain: blockchain name
network: network name
drop_: whether to drop or create the udfs
#}
{% set schema = blockchain if not network else blockchain ~ "_" ~ network %}
CREATE SCHEMA IF NOT EXISTS {{ schema }};
{%- set configs = fromyaml(config_func(blockchain, network)) if network else fromyaml(config_func(schema, blockchain)) -%}
{%- for udf in configs -%}
{{- livequery_models.create_or_drop_function_from_config(udf, drop_=drop_) -}}
{%- endfor -%}
{%- endmacro -%}
{% macro crud_udfs_by_marketplace(config_func, schema, utility_schema, drop_) %}
{#
Generate create or drop statements for a list of udf configs for a given blockchain and network
config_func: function that returns a list of udf configs
schema: schema name
utility_schema: utility schema name
#}
CREATE SCHEMA IF NOT EXISTS {{ schema }};
{%- set configs = fromyaml(config_func(schema, utility_schema)) if utility_schema else fromyaml(config_func(schema, schema)) -%}
{%- for udf in configs -%}
{{- create_or_drop_function_from_config(udf, drop_=drop_) -}}
{%- endfor -%}
{%- endmacro -%}
{% macro crud_marketplace_udfs(config_func, schemaName, base_api_schema_name, drop_) %}
{#
Generate create or drop statements for a list of udf configs for a given schema and api
config_func: function that returns a list of udf configs
schemaName: the target schema to build the udfs
base_api_schema_name: the schema that contains base api functions
drop_: whether to drop or create the udfs
#}
{%- set udfs = fromyaml(config_func(schemaName, base_api_schema_name)) -%}
{%- for udf in udfs -%}
{{- create_or_drop_function_from_config(udf, drop_=drop_) -}}
{%- endfor -%}
{%- endmacro -%}
{% macro ephemeral_deploy_core(config) %}
{#
This macro is used to deploy functions using ephemeral models.
It should only be used within an ephemeral model.
#}
{% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{{- livequery_models.crud_udfs(config, this.schema, var("DROP_UDFS_AND_SPS")) -}}
{%- endset -%}
{%- if var("DROP_UDFS_AND_SPS") -%}
{%- do log("Drop core udfs: " ~ this.database ~ "." ~ this.schema, true) -%}
{%- else -%}
{%- do log("Deploy core udfs: " ~ this.database ~ "." ~ this.schema, true) -%}
{%- endif -%}
{%- do run_query(sql ~ livequery_models.apply_grants_by_schema(this.schema)) -%}
{%- endif -%}
SELECT '{{ model.schema }}' as schema_
{%- endmacro -%}
{% macro ephemeral_deploy(configs) %}
{#
This macro is used to deploy functions using ephemeral models.
It should only be used within an ephemeral model.
#}
{%- set blockchain = this.schema -%}
{%- set network = this.identifier -%}
{% set schema = blockchain ~ "_" ~ network %}
{% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{% for config in configs %}
{{- livequery_models.crud_udfs_by_chain(config, blockchain, network, var("DROP_UDFS_AND_SPS")) -}}
{%- endfor -%}
{%- endset -%}
{%- if var("DROP_UDFS_AND_SPS") -%}
{%- do log("Drop partner udfs: " ~ this.database ~ "." ~ schema, true) -%}
{%- else -%}
{%- do log("Deploy partner udfs: " ~ this.database ~ "." ~ schema, true) -%}
{%- endif -%}
{%- do run_query(sql ~ livequery_models.apply_grants_by_schema(schema)) -%}
{%- endif -%}
SELECT '{{ model.schema }}' as schema_
{%- endmacro -%}
{% macro ephemeral_deploy_marketplace(configs) %}
{#
This macro is used to deploy functions using ephemeral models.
It should only be used within an ephemeral model.
#}
{%- set schema = this.schema -%}
{%- set utility_schema = this.identifier -%}
{% if execute and (var("UPDATE_UDFS_AND_SPS") or var("DROP_UDFS_AND_SPS")) and model.unique_id in selected_resources %}
{% set sql %}
{% for config in configs %}
{{- crud_udfs_by_marketplace(config, schema, utility_schema, var("DROP_UDFS_AND_SPS")) -}}
{%- endfor -%}
{%- endset -%}
{%- if var("DROP_UDFS_AND_SPS") -%}
{%- do log("Drop marketplace udfs: " ~ this.database ~ "." ~ schema, true) -%}
{%- else -%}
{%- do log("Deploy marketplace udfs: " ~ this.database ~ "." ~ schema, true) -%}
{%- endif -%}
{%- do run_query(sql ~ livequery_models.apply_grants_by_schema(schema)) -%}
{%- endif -%}
SELECT '{{ model.schema }}' as schema_
{%- endmacro -%}