Enhance UDF definitions and add new UDF for S3 presigned URL retrieval

- Introduced `udf_api_sync` and `udf_api_async` functions for synchronous and asynchronous API calls.
- Added `udf_redirect_s3_presigned_url` function to handle S3 presigned URL requests with error handling for gzip decompression and JSON parsing.
- Updated the live YAML configuration with a new `udf_api` overload that accepts an `is_async` flag and dispatches between the sync and async external functions.
- Improved header formatting in the `manage_udfs.sql` file to support dynamic header generation for API requests.
Jensen Yap 2025-06-30 20:49:30 +09:00
parent b3d6329d32
commit 7894b18034
8 changed files with 177 additions and 10 deletions

View File

@@ -31,4 +31,40 @@
    NOT NULL
  sql: udf_api
{% endmacro %}
- name: {{ schema }}.udf_api_sync
  signature:
    - [method, STRING]
    - [url, STRING]
    - [headers, OBJECT]
    - [DATA, VARIANT]
    - [user_id, STRING]
    - [SECRET, STRING]
  return_type: VARIANT
  func_type: EXTERNAL
  api_integration: '{{ var("API_INTEGRATION") }}'
  max_batch_rows: '1'
  headers:
    - 'fsc-quantum-execution-mode': 'sync'
  options: |
    NOT NULL
  sql: 'v2/udf_api'
- name: {{ schema }}.udf_api_async
  signature:
    - [method, STRING]
    - [url, STRING]
    - [headers, OBJECT]
    - [DATA, VARIANT]
    - [user_id, STRING]
    - [SECRET, STRING]
  return_type: VARIANT
  func_type: EXTERNAL
  api_integration: '{{ var("API_INTEGRATION") }}'
  max_batch_rows: '1'
  headers:
    - 'fsc-quantum-execution-mode': 'async'
  options: |
    NOT NULL
  sql: 'v2/udf_api'
{% endmacro %}
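For orientation, a minimal sketch of the external-function DDL these two entries are expected to render to once `create_sql_function` applies the new `headers` support; the schema, API integration name, and gateway URL are placeholders rather than values taken from this repository:

-- Hypothetical rendered DDL for the sync variant (clause order mirrors the macro).
CREATE OR REPLACE EXTERNAL FUNCTION live.udf_api_sync(
    method STRING,
    url STRING,
    headers OBJECT,
    data VARIANT,
    user_id STRING,
    secret STRING
)
RETURNS VARIANT
NOT NULL
API_INTEGRATION = my_api_integration
MAX_BATCH_ROWS = 1
HEADERS = ('fsc-quantum-execution-mode' = 'sync')
AS 'https://<gateway-host>/v2/udf_api';

The async variant differs only in the header value ('async'), which is presumably how the backend tells the two execution modes apart.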

View File

@@ -236,7 +236,7 @@ def transform_hex_to_bech32(hex, hrp=''):
        return 'Data conversion failed'
    checksum = bech32_create_checksum(hrp, data5bit)
    return hrp + '1' + ''.join([CHARSET[d] for d in data5bit + checksum])
{% endmacro %}
@@ -260,14 +260,14 @@ def int_to_binary(num):
        if inverted_string[i] == "1" and carry == 1:
            result = "0" + result
        elif inverted_string[i] == "0" and carry == 1:
            result = "1" + result
            carry = 0
        else:
            result = inverted_string[i] + result
    binary_string = result
    return binary_string
{% endmacro %}
@@ -278,7 +278,7 @@ def binary_to_int(binary):
    for char in binary:
        if char not in "01":
            raise ValueError("Input string must be a valid binary string.")
    integer = 0
    for i, digit in enumerate(binary[::-1]):
@@ -287,5 +287,39 @@ def binary_to_int(binary):
        integer += digit_int * 2**i
    return str(integer)
{% endmacro %}
{% macro create_udf_redirect_s3_presigned_url() %}
import requests
import json
import gzip
import io
def process_request(url):
    resp = requests.get(url)
    content = resp.content

    # Decompress if URL contains .json.gz
    if '.json.gz' in url:
        try:
            # Decompress the gzipped content
            with gzip.GzipFile(fileobj=io.BytesIO(content), mode='rb') as f:
                content = f.read()
        except Exception as e:
            return {"error": "Failed to decompress gzip data", "message": str(e)}

    # Try to parse as JSON
    try:
        text_content = content.decode('utf-8')
        return json.loads(text_content)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # If not JSON or not valid UTF-8, return as string or base64
        try:
            # Try to return as string if it's valid text
            return content.decode('utf-8')
        except UnicodeDecodeError:
            # For binary data, return base64
            import base64
            return base64.b64encode(content).decode('ascii')
{% endmacro %}
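A hedged usage sketch for the handler above, once it is registered as a Snowflake UDF (the schema name and presigned URL are placeholders; a `.json.gz` object would be decompressed and JSON-parsed before being returned):

-- Hypothetical call; bucket, key, and query string are placeholders.
SELECT live.udf_redirect_s3_presigned_url(
    'https://example-bucket.s3express-use1-az4.us-east-1.amazonaws.com/results/run-123.json.gz?X-Amz-Expires=300'
) AS payload;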

View File

@@ -21,6 +21,35 @@
      secret_name
    )
- name: {{ schema }}.udf_api
  signature:
    - [method, STRING]
    - [url, STRING]
    - [headers, OBJECT]
    - [data, VARIANT]
    - [secret_name, STRING]
    - [is_async, BOOLEAN]
  return_type: VARIANT
  options: |
    VOLATILE
  sql: |
    SELECT
      CASE COALESCE(IS_ASYNC, FALSE)
        WHEN TRUE
          THEN
            -- Async execution: run async function then test_requests
            _live.redirect_s3_presigned_url(
              _live.udf_api_async(
                METHOD, URL, HEADERS, DATA, USER_ID, SECRET
              ) : s3_presigned_url :: STRING
            ) : data [0][1]
        ELSE
          -- Default execution: run regular function
          _live.udf_api_sync(
            METHOD, URL, HEADERS, DATA, USER_ID, SECRET
          )
      END AS results
- name: {{ schema }}.udf_api
  signature:
    - [method, STRING]
@@ -41,6 +70,7 @@
      _utils.UDF_WHOAMI(),
      secret_name
    )
- name: {{ schema }}.udf_api
  signature:
    - [method, STRING]

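To make the dispatch above concrete, a hedged example of how a caller might opt into the async path; the schema, endpoint, headers, and secret name are illustrative only:

-- Hypothetical invocation of the new is_async overload.
SELECT live.udf_api(
    'GET',
    'https://api.example.com/v1/status',
    OBJECT_CONSTRUCT('Content-Type', 'application/json'),
    NULL,
    'my_secret_name',
    TRUE  -- route through udf_api_async and the presigned-URL redirect
) AS response;

Passing TRUE sends the request through `udf_api_async`, and the returned `s3_presigned_url` is then fetched via the redirect function, as shown in the CASE expression above.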
View File

@@ -306,4 +306,17 @@
  sql: |
    {{ create_udf_binary_to_int() | indent(4) }}
- name: {{ schema }}.udf_redirect_s3_presigned_url
  signature:
    - [url, STRING]
  return_type: TEXT
  options: |
    LANGUAGE PYTHON
    RUNTIME_VERSION = '3.10'
    HANDLER = 'process_request'
    EXTERNAL_ACCESS_INTEGRATIONS = (S3_EXPRESS_EXTERNAL_ACCESS_INTEGRATION)
    PACKAGES = ('requests')
  sql: |
    {{ create_udf_redirect_s3_presigned_url() | indent(4) }}
{% endmacro %}
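Roughly the statement this entry should expand to; a sketch only: the schema name is assumed, and the body below is a stub standing in for the full `process_request` handler generated by the Python macro shown earlier:

CREATE OR REPLACE FUNCTION live.udf_redirect_s3_presigned_url(url STRING)
RETURNS TEXT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
HANDLER = 'process_request'
EXTERNAL_ACCESS_INTEGRATIONS = (S3_EXPRESS_EXTERNAL_ACCESS_INTEGRATION)
PACKAGES = ('requests')
AS
$$
# Stub body for illustration; the deployed body is the process_request
# handler defined in the Python macro.
def process_request(url):
    return url
$$;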

View File

@@ -0,0 +1,20 @@
{% macro create_s3_express_external_access_integration() %}
{% set sql %}
    CREATE OR REPLACE NETWORK RULE live.s3_express_network_rule
        MODE = EGRESS
        TYPE = HOST_PORT
        VALUE_LIST = (
            '*.s3express-use1-az4.us-east-1.amazonaws.com:443',
            '*.s3express-use1-az5.us-east-1.amazonaws.com:443',
            '*.s3express-use1-az6.us-east-1.amazonaws.com:443'
        );
    CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION live.s3_express_external_access_integration
        ALLOWED_NETWORK_RULES = (s3_express_network_rule)
        ENABLED = true;
{% endset %}
{% do run_query(sql) %}
{% do log("External S3 Express access integration successfully created", true) %}
{% endmacro %}
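Once the on-run hook has executed this macro, a quick hedged sanity check (assuming the objects land in a `live` schema as written above):

-- Hypothetical verification; adjust database/schema context as needed.
SHOW NETWORK RULES LIKE 's3_express_network_rule' IN SCHEMA live;
SHOW INTEGRATIONS LIKE 's3_express_external_access_integration';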

View File

@@ -26,6 +26,32 @@
{% endfor -%}
{%- endmacro -%}
{%- macro format_headers(headers) -%}
{%- if headers -%}
{%- if headers is mapping -%}
{%- set header_items = [] -%}
{%- for key, value in headers.items() -%}
{%- set _ = header_items.append("'" ~ key ~ "' = '" ~ value ~ "'") -%}
{%- endfor -%}
HEADERS = (
{{ header_items | join(',\n ') }}
)
{%- elif headers is iterable -%}
{%- set header_items = [] -%}
{%- for item in headers -%}
{%- if item is mapping -%}
{%- for key, value in item.items() -%}
{%- set _ = header_items.append("'" ~ key ~ "' = '" ~ value ~ "'") -%}
{%- endfor -%}
{%- endif -%}
{%- endfor -%}
HEADERS = (
{{ header_items | join(',\n ') }}
)
{%- endif -%}
{%- endif -%}
{%- endmacro -%}
{% macro create_sql_function(
    name_,
    signature,
@@ -34,7 +60,8 @@
    api_integration = none,
    options = none,
    func_type = none,
    max_batch_rows = none,
    headers = none
) %}
CREATE OR REPLACE {{ func_type }} FUNCTION {{ name_ }}(
{{- livequery_models.compile_signature(signature) }}
@@ -49,6 +76,9 @@
{%- if max_batch_rows -%}
{{ "\n max_batch_rows = " ~ max_batch_rows -}}
{%- endif -%}
{%- if headers -%}
{{ "\n" ~ livequery_models.format_headers(headers) -}}
{%- endif -%}
{{ "\n AS " ~ livequery_models.construct_api_route(sql_) ~ ";" -}}
{%- else -%}
AS
@@ -70,6 +100,7 @@
{% set api_integration = config ["api_integration"] %}
{% set func_type = config ["func_type"] %}
{% set max_batch_rows = config ["max_batch_rows"] %}
{% set headers = config ["headers"] %}
{% if not drop_ -%}
{{ livequery_models.create_sql_function(
name_ = name_,
@@ -79,7 +110,8 @@
options = options,
api_integration = api_integration,
max_batch_rows = max_batch_rows,
func_type = func_type,
headers = headers
) }}
{%- else -%}
{{ drop_function(

View File

@@ -0,0 +1 @@
{{ create_s3_express_external_access_integration() }}

View File

@@ -1,3 +1,4 @@
-- depends_on: {{ ref('_utils') }}
-- depends_on: {{ ref('_external_access')}}
{% set config = config_core_utils %}
{{ ephemeral_deploy_core(config) }}