Merge pull request #43 from FlipsideCrypto/AN-4930/integration-macros

add integration macros and other tweaks
This commit is contained in:
Austin 2024-06-19 16:44:40 -04:00 committed by GitHub
commit f560e45de5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 149 additions and 65 deletions

View File

@ -165,15 +165,15 @@ The `Streamline V 2.0` functions are a set of macros and UDFs that are designed
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'udf_bulk_rest_api_v2',
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "external_table",
"sql_limit": "10",
"producer_batch_size": "10",
"worker_batch_size": "10",
"sm_secret_name": "aws/sm/path",
"sql_source": "{{this.identifier}}"
"sql_source": "{{this.identifier}}",
"exploded_key": tojson(["result.transactions"])
}
),
tags = ['model_tags']
@ -198,37 +198,56 @@ The `Streamline V 2.0` functions are a set of macros and UDFs that are designed
{
"external_table": "ASSET_OHLC_API/COINGECKO",
"producer_batch_size": "10",
"sm_secret_name": "prod/coingecko/rest",
"sql_limit": "10",
"sql_source": "{{this.identifier}}",
"worker_batch_size": "10"
"worker_batch_size": "10",
"exploded_key": tojson(["result.transactions"])
}
on {{this.schema}}.{{this.identifier}}
22:00:03 1 of 1 OK created sql view model streamline.coingecko_realtime_ohlc ............ [SUCCESS 1 in 12.75s]
22:00:03
```
- [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1): This macro is used to create a `udf` named `udf_bulk_rest_api_v2` in the `streamline` schema of the database this is invoked in. This function returns a `variant` type and uses an API integration. The API integration and the external function URI are determined based on the target environment (`prod`, `dev`, or `sbx`).
The [macro interpolates](/macros/streamline/udfs.sql#L9) the `API_INTEGRATION` and `EXTERNAL_FUNCTION_URI` vars from the `dbt_project.yml` file.
**NOTE**: To be congruent with how `EXTERNAL_FUNCTION_URI` is being used by other macros and maintain consistency, starting from `v1.21.7` we need to append a trailing `/` to the `EXTERNAL_FUNCTION_URI` in the `dbt_project.yml` file.
```sql
- [create_aws_api_integrations](/macros/streamline/udfs.sql#L49): This macro is used to build the API Integrations necessary for the streamline UDFs. It requires the `API_INTEGRATION`, `EXTERNAL_FUNCTION_URI`, `API_AWS_ROLE_ARN`, and `ROLES` vars from the `dbt_project.yml` file and is available starting with `v1.25.0`. The API integration is determined based on the target environment. `prod` and `dev` are the two options. If you use a target other than `prod` or `dev`, it will default to `dev`.
```yml
# Setup variables in dbt_project.yml
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] }}'
API_AWS_ROLE_ARN: '{{ var("config")[target.name]["API_AWS_ROLE_ARN"] if var("config")[target.name] else var("config")["dev"]["API_AWS_ROLE_ARN"] }}'
ROLES: '{{ var("config")[target.name]["ROLES"] }}'
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_CROSSCHAIN_API_STG
EXTERNAL_FUNCTION_URI: q0bnjqvs9a.execute-api.us-east-1.amazonaws.com/stg/
API_AWS_ROLE_ARN: arn:aws:iam::704693948482:role/crosschain-api-stg-rolesnowflakeudfsAF733095-CCDPFHsmlGmu
ROLES:
- AWS_LAMBDA_CROSSCHAIN_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_CROSSCHAIN_API_PROD
EXTERNAL_FUNCTION_URI: 35hm1qhag9.execute-api.us-east-1.amazonaws.com/prod/
API_AWS_ROLE_ARN: arn:aws:iam::924682671219:role/crosschain-api-prod-rolesnowflakeudfsAF733095-jBvAIUHiR70D
ROLES:
- AWS_LAMBDA_CROSSCHAIN_API
- INTERNAL_DEV
- DBT_CLOUD_CROSSCHAIN
```
- [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1): This macro is used to create a `udf` named `udf_bulk_rest_api_v2` in the `streamline` schema of the database this is invoked in. This function returns a `variant` type and uses an API integration. The API integration and the external function URI are determined based on the target environment (`prod`, `dev`, or `sbx`).
The [macro interpolates](/macros/streamline/udfs.sql#L9) the `API_INTEGRATION` and `EXTERNAL_FUNCTION_URI` vars from the `dbt_project.yml` file.
**NOTE**: To be congruent with how `EXTERNAL_FUNCTION_URI` is being used by other macros and maintain consistency, starting from `v1.21.7` we need to append a trailing `/` to the `EXTERNAL_FUNCTION_URI` in the `dbt_project.yml` file.
- [create_udf_bulk_decode_logs](/macros/streamline/udfs.sql#L25): This macro is used to create a `udf` name `udf_bulk_decode_logs_v2 ` in the `streamline` schema of the databae this is invoked in. This function returns a `variant` type and uses an API integration. The API integration and the external function URI are determined based on the target environment (`prod`, `dev`, or `sbx`).
The [macro interpolates](/macros/streamline/udfs.sql#L32) the `API_INTEGRATION` and `EXTERNAL_FUNCTION_URI` vars from the `dbt_project.yml` file.
- [create_streamline_udfs](macros/create_streamline_udfs.sql#L1). This macro runs [create_aws_api_integrations](/macros/streamline/udfs.sql#L49) and [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1) when ran with `--vars '{UPDATE_UDFS_AND_SPS: true}'`.
- [create_evm_streamline_udfs](macros/create_streamline_udfs.sql#L8). This macro runs [create_aws_api_integrations](/macros/streamline/udfs.sql#L49), [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1), and [create_udf_bulk_decode_logs](/macros/streamline/udfs.sql#L25) when ran with `--vars '{UPDATE_UDFS_AND_SPS: true}'`. This is designed to be used on the EVM chains due to the inclusion of `create_udf_bulk_decode_logs`.

View File

@ -1,6 +0,0 @@
{% macro create_evm_streamline_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{{ create_udf_bulk_rest_api_v2_evm() }}
{{ create_udf_bulk_decode_logs() }}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,14 @@
{% macro create_streamline_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{{ create_aws_api_integrations() }}
{{ create_udf_bulk_rest_api_v2() }}
{% endif %}
{% endmacro %}
{% macro create_evm_streamline_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{{ create_aws_api_integrations() }}
{{ create_udf_bulk_rest_api_v2() }}
{{ create_udf_bulk_decode_logs() }}
{% endif %}
{% endmacro %}

View File

@ -88,3 +88,72 @@ WHERE
{% endfor %}
{% endmacro %}
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA is not null
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA is not null
{% endmacro %}

View File

@ -2,55 +2,6 @@
{{ log("Creating udf udf_bulk_rest_api for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json variant) returns variant api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% else %}
{{ log("Creating default (dev) udf_bulk_rest_api_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}udf_bulk_rest_api'
{% endif %}
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_udf_bulk_decode_logs() %}
{{ log("Creating udf udf_bulk_decode_logs for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_decode_logs", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_decode_logs", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_decode_logs", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% else %}
{{ log("Creating default (dev) udf_bulk_decode_logs", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_logs'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_udf_bulk_rest_api_v2_evm() %}
{{ log("Creating udf udf_bulk_rest_api for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
@ -70,3 +21,40 @@
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_udf_bulk_decode_logs() %}
{{ log("Creating udf udf_bulk_decode_logs_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_decode_logs_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_decode_logs_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_decode_logs_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% else %}
{{ log("Creating default (dev) udf_bulk_decode_logs_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_logs'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_aws_api_integrations() %}
{{ log("Creating api integration for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE api integration {{ var("API_INTEGRATION") }} api_provider = aws_api_gateway api_aws_role_arn = '{{ var("API_AWS_ROLE_ARN") }}' api_allowed_prefixes = (
'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}'
) enabled = TRUE;
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}