Compare commits

...

96 Commits

Author SHA1 Message Date
Austin
a363429861
Merge pull request #70 from FlipsideCrypto/fix/udf-compile-error
Fix/udf compile error
2025-12-10 13:50:06 -05:00
Austin
f0ef82c39a int to hex 2025-12-10 13:42:54 -05:00
Austin
e84c7f4601 db 2025-12-10 13:38:21 -05:00
Austin
4dd7280484 move comments to read me 2025-12-10 13:34:41 -05:00
Austin
1036b6833a remove comments 2025-12-10 13:29:48 -05:00
Austin
a91069c976 updates 2025-12-10 13:28:43 -05:00
Austin
c9a5b819c8
Merge pull request #69 from FlipsideCrypto/DAT2-195/read-functions
Dat2 195/read functions
2025-12-10 13:19:47 -05:00
Austin
d0e3f57772 format 2025-12-10 13:15:16 -05:00
Austin
70d5fc1c3e updates 2025-12-10 12:58:54 -05:00
Austin
027f73276c dbs 2025-12-09 13:36:48 -05:00
Austin
6981b8b42d db 2025-12-09 13:31:48 -05:00
Austin
1fd0466311 comment 2025-12-09 13:28:53 -05:00
Austin
b0e51e2b4d updates 2025-12-09 13:24:24 -05:00
drethereum
3697967c46
Merge pull request #68 from FlipsideCrypto/addnew/decompress-zlib-udf
addnew/decompress-zlib-udf
2025-11-04 13:45:28 -07:00
drethereum
70e238a548 closing macro 2025-11-04 12:33:39 -07:00
drethereum
4317c353a5 merge 2025-10-22 17:12:17 -06:00
drethereum
3985d78199 udf 2025-10-22 16:54:55 -06:00
Jensen Yap
6415fc4873
Merge pull request #66 from FlipsideCrypto/hotfix/upgrade-livequery-models-bug
Upgrade livequery models revision to v1.10.2
2025-08-19 11:33:25 +09:00
Jensen Yap
de65b99f86 Upgrade livequery models revision to v1.10.2 2025-08-16 02:34:40 +09:00
Jensen Yap
45fcf86aea
Merge pull request #65 from FlipsideCrypto/STREAM-1324/upgrade-livequery-version 2025-08-13 02:29:50 +09:00
Jensen Yap
ef0f0deec0 update to 1.10.1 2025-08-13 02:03:12 +09:00
Jensen Yap
76b46b9026 update livequery models revision to v1.10.0 2025-08-08 14:06:49 +09:00
Jensen Yap
4799e897e1 fix branch name 2025-08-08 13:56:37 +09:00
Jensen Yap
7b6feb4a40 update 2025-08-08 13:55:55 +09:00
Jensen Yap
36dab6002f bump version to 1.10.0 2025-08-08 12:20:27 +09:00
Jensen Yap
a0672aff35 update livequery models version 2025-08-08 01:21:33 +09:00
Jensen Yap
88e94f5160 update 2025-08-07 23:39:45 +09:00
Jensen Yap
567b311ca8 upgrade livequery models version 2025-08-07 16:13:09 +09:00
Matt Romano
3def5e5c44
Merge pull request #64 from FlipsideCrypto/add-coingecko-stablecoin-parse-udf
add-coingecko-stablecoin-parse-udf
2025-07-30 12:30:30 -07:00
mattromano
cfc2c69de8 add stable coin parse udf configs and logic 2025-07-30 11:10:13 -07:00
Austin
957f7252ab
Merge pull request #63 from FlipsideCrypto/bump-lq-models-1-9
livequery models update
2025-06-12 12:53:40 -04:00
Austin
d5a43b13ef livequery models update 2025-06-12 12:26:49 -04:00
drethereum
f62ca7b724
Merge pull request #61 from FlipsideCrypto/AN-6327/python-version
AN-6327/python-version
2025-06-02 09:30:54 -06:00
drethereum
77301474a1 3.9 2025-05-30 07:39:11 -06:00
Shah Newaz Khan
ae4cb69458
Merge pull request #60 from FlipsideCrypto/STREAM-1273/lq-udf-api-batched
bump livequery -> v1.8.0
2025-05-22 10:20:32 -07:00
shah
633873c64f update readme 2025-05-22 09:19:29 -07:00
shah
6a7231846c bump livequery -> v1.8.0 2025-05-22 09:14:59 -07:00
Shah Newaz Khan
87e00eb90a
Merge pull request #59 from FlipsideCrypto/STREAM-1175/livequery-models-reinstated
Stream 1175/livequery models reinstated
2025-04-17 09:17:09 -07:00
shah
219a8741e3 chane lq-models to 1.7.0 2025-04-16 22:24:39 -07:00
shah
55fe98490a pin livequery-models to v2.0.0 2025-04-16 09:27:51 -07:00
shah
33b43638b8 test livequery-models 2025-04-14 13:38:29 -07:00
shah
a964f313c4 bump lq-base -> v2.1.1 2025-04-11 07:32:33 -07:00
Shah Newaz Khan
62dbf23768
Merge pull request #57 from FlipsideCrypto/STREAM-1089/bump-livequery-base
bump livequery-base to v2.1.0 with namespace scope macro invocation
2025-04-10 16:09:48 -07:00
shah
7f9ff06ebf bump livequery-base to v2.1.0 with namespace scope macro invocation 2025-04-10 16:00:27 -07:00
Jensen Yap
042a1bcb4c
Merge pull request #56 from FlipsideCrypto/fix/update-cyclic-
Fix: Livequery Cyclic Dependency
2025-04-04 09:14:39 +09:00
Jensen Yap
1398cb500f
fix livequery package dependency 2025-04-04 09:10:03 +09:00
Shah Newaz Khan
657cd6c296
Merge pull request #55 from FlipsideCrypto/STREAM-1175/add-lq-deps
add livequery dep | update lq docs
2025-03-10 15:20:29 -07:00
shah
0c4e84498b remove livequery-base as dep 2025-03-05 16:53:37 -08:00
shah
532f641a80 pin livequery-models to v1.6.1 2025-03-05 16:49:22 -08:00
shah
b8a75bb04a add livequery dep | update lq docs 2025-03-05 13:26:37 -08:00
Jensen Yap
5f6015eb85
Merge pull request #54 from FlipsideCrypto/STREAM-1156/update-livequery-base-version
[STREAM-1156] Update livequery-base package to v1.0.1
2025-02-25 11:50:38 +09:00
Jensen Yap
98e099acf7 Update livequery-base package to v1.0.1 2025-02-25 11:36:52 +09:00
Shah Newaz Khan
81412cb3b4
Merge pull request #53 from FlipsideCrypto/STREAM-1111/add-lq-base
pin lq-base to v1.0.0
2024-12-09 20:19:17 -08:00
shah
4136383578 pin lq-base to v1.0.0 2024-12-06 18:25:14 -08:00
Shah Newaz Khan
6096046f00
Merge pull request #52 from FlipsideCrypto/STREAM-1111/add-lq-base
removed dbt non core artifacts
2024-12-06 10:57:15 -08:00
shah
17a2f7a00c pin lq-base to main 2024-12-06 09:00:01 -08:00
shah
decabb1992 update revision from branch name to commit hash 2024-12-02 20:59:51 -08:00
shah
04474dfdcd removed dbt non core artifacts 2024-12-02 14:29:21 -08:00
eric-laurello
ba1555481f
Merge pull request #51 from FlipsideCrypto/querytags
update snowflake query tag
2024-11-20 11:20:50 -05:00
Eric Laurello
d3cf679e07 Bump version to 1.32.0 2024-11-20 10:21:43 -05:00
Eric Laurello
21fd5f1494 ref 2024-11-20 09:43:14 -05:00
Eric Laurello
f76f2e8f89 update existing 2024-11-19 17:16:57 -05:00
Eric Laurello
e31b50e52d custom qery tags 2024-11-19 16:42:57 -05:00
Austin
15d75588e8
Merge pull request #50 from FlipsideCrypto/add/streamline-schema-exists
add
2024-09-25 16:59:09 -04:00
Austin
c3ab97e8e0 remove comment 2024-09-25 16:41:51 -04:00
Austin
eac2e05ee8 add 2024-09-25 16:33:53 -04:00
drethereum
90df5cf5fc
Merge pull request #49 from FlipsideCrypto/AN-5013/evm-sl-2
AN-5013/evm-sl-2
2024-09-11 11:20:41 -06:00
drethereum
8c99db4996 remove arn and api_integration macro, docs 2024-09-03 13:53:59 -06:00
drethereum
d786d4b85b evm udf, decoded_traces and wait function 2024-08-28 15:53:18 -06:00
Austin
2c8b7404f3
Merge pull request #46 from FlipsideCrypto/update-gha-logic
update performance view logic
2024-07-24 09:05:04 -04:00
Austin
eb33ac727a update performance view logic 2024-07-24 08:43:58 -04:00
drethereum
9a91c3025f
Merge pull request #44 from FlipsideCrypto/AN-4890/base58
AN-4890/base58
2024-06-27 12:58:28 -06:00
drethereum
f285fe2bdb merge 2024-06-26 14:19:34 -06:00
drethereum
a8caf58381
Merge pull request #45 from FlipsideCrypto/AN-4974/remove-api-intregration-macro
An 4974/remove api intregration macro
2024-06-26 08:14:42 -06:00
Austin
375c069b87 readme 2024-06-26 08:32:57 -04:00
Austin
280fb9ac53 remove api integration macro 2024-06-26 08:31:19 -04:00
drethereum
e64c8604eb update input 2024-06-21 11:25:52 -06:00
drethereum
a4ff4a46fb leading zeroes 2024-06-20 17:27:19 -06:00
Austin
f560e45de5
Merge pull request #43 from FlipsideCrypto/AN-4930/integration-macros
add integration macros and other tweaks
2024-06-19 16:44:40 -04:00
Austin
12aed56282 add external table queries 2024-06-18 22:11:37 -04:00
Austin
22ec6ca4ff add integration macros and other tweaks 2024-06-18 21:14:51 -04:00
Austin
e2dc225457
Merge pull request #42 from FlipsideCrypto/add/evm-create-macro
create evm udfs
2024-06-18 12:20:47 -04:00
Austin
2b645b2935 empty for tags 2024-06-18 11:59:42 -04:00
Austin
7e56e4c32f add execute 2024-06-18 11:56:30 -04:00
Austin
758c1cd076 remove double set 2024-06-18 11:53:25 -04:00
Austin
3560e17902 add begin/commit 2024-06-18 11:42:18 -04:00
Austin
98adeb6e2a remove decoder create 2024-06-18 11:36:50 -04:00
Austin
b70ad45daa create evm udfs 2024-06-18 11:15:05 -04:00
Austin
52b1ed36b7
Merge pull request #41 from FlipsideCrypto/AN-4759/upgrade-lq
Upgrade LQ to v1.4
2024-04-18 15:05:45 -04:00
Austin
484e9db07d Upgrade LQ to v1.4 2024-04-18 14:34:11 -04:00
Julius Remigio
e94e3d6964 Update revision of livequery-models to v1.3.0 2024-04-11 11:12:49 -07:00
Shah Newaz Khan
7cd34b3dda
Merge pull request #39 from FlipsideCrypto/STREAM-827/patch-udf-bulk-rest-api-v2
remove trailing / from EXTERNAL_FUNCTION_URI
2024-04-04 16:50:14 -07:00
shah
945ac6f610 Added readme.me docs for trailing / 2024-04-04 16:49:32 -07:00
shah
b1e612ace7 remove trailing / from EXTERNAL_FUNCTION_URI 2024-04-04 15:37:38 -07:00
Shah Newaz Khan
69938b7c35
Merge pull request #38 from FlipsideCrypto/STREAM-776/update-lq-deploy-docs
Update livequery deployment docs
2024-03-20 08:50:06 -07:00
shah
f86395ef9e add ephemeral model materialization snippet to livequery readme 2024-03-19 14:05:53 -07:00
10 changed files with 1304 additions and 299 deletions

README.md

@ -6,9 +6,9 @@ Dbt repo for managing the Flipside Utility Functions (FSC_UTILS) dbt package.
Control the creation of `UDF` or `SP` macros with dbt run:
- `UPDATE_UDFS_AND_SPS` -
When `True`, executes all macros included in the on-run-start hooks within dbt_project.yml on model run as normal.
When `False`, none of the on-run-start macros are executed on model run.
Default value is `False`.
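For example, the variable can be toggled on the command line for a single invocation (a minimal sketch; select whatever models your project needs):
```
dbt run --vars '{"UPDATE_UDFS_AND_SPS": True}'
```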
@ -38,9 +38,10 @@ dbt run-operation create_udfs --var 'UPDATE_UDFS_AND_SPS": True' --args 'drop_:t
packages:
- git: "https://github.com/FlipsideCrypto/fsc-utils.git"
revision: "v1.1.0"
```
---
**NOTE** Steps `2-5` above can also be automated using the `make tag` directive:
### Tag Makefile Directives
@ -54,6 +55,7 @@ The `tag` directive is used to tag the current commit with a version number.
```sh
make tag version=<version_number>
```
Replace <version_number> with the version number you want to use.
What it does:
@ -64,6 +66,7 @@ Creates a new `git tag` with the name `v<version_number>` and a message of `vers
Pushes the new tag to the origin remote.
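Roughly, the directive is equivalent to the following git commands (a sketch; the exact tag message format comes from the Makefile):
```sh
# Assuming version=1.2.3 was passed to `make tag`
git tag -a v1.2.3 -m "version 1.2.3"
git push origin v1.2.3
```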
#### get_latest_tags
The get_latest_tags directive is used to display the latest git tags. By default, it displays the latest tag. You can change the number of tags displayed by setting the MAX_COUNT variable.
Usage:
@ -71,20 +74,25 @@ Usage:
```sh
make get_latest_tags MAX_COUNT=<count>
```
Replace <count> with the number of latest tags you want to display. If you don't specify a count, it defaults to `1`.
What it does:
Displays the latest `<count> git tags` in green text.
---
7. Run dbt deps in the other dbt project to pull the specific version of the package or follow the steps on `adding the dbt package` below.
Regarding Semantic Versioning:
1. Semantic versioning is a versioning scheme for software that aims to convey meaning about the underlying changes with each new release.
2. It's typically formatted as MAJOR.MINOR.PATCH (e.g. v1.2.3), where:
- MAJOR version (first number) should increment when there are potential breaking or incompatible changes.
- MINOR version (second number) should increment when functionality or features are added in a backwards-compatible manner.
- PATCH version (third number) should increment when bug fixes are made without adding new features.
3. Semantic versioning helps package users understand the degree of changes in a new release, and decide when to adopt new versions. With dbt packages, when you tag a release with a semantic version, users can specify the exact version they want to use in their projects.
## Adding the `fsc_utils` dbt package
@ -92,19 +100,23 @@ Regarding Semantic Versioning;
The `fsc_utils` dbt package is a centralized repository consisting of various dbt macros and snowflake functions that can be utilized across other repos.
1. Navigate to the `create_udfs.sql` macro in your respective repo where you want to install the package.
2. Add the following:
```
{% set sql %}
{{- fsc_utils.create_udfs() -}}
{% endset %}
{% do run_query(sql) %}
```
3. Note: fsc_utils.create_udfs() takes two parameters (drop_=False, schema="utils"). Set `drop_` to `True` to drop existing functions or define `schema` for the functions (default set to `utils`). Params not required (see the example after step 7 below).
4. Navigate to `packages.yml` in your respective repo.
5. Add the following:
```
- git: https://github.com/FlipsideCrypto/fsc-utils.git
```
6. Run `dbt deps` to install the package
7. Run the macro `dbt run-operation create_udfs --vars '{"UPDATE_UDFS_AND_SPS":True}'`
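For the parameters noted in step 3, a hedged sketch of invoking the macro directly with explicit arguments (standard `dbt run-operation` argument quoting):
```
dbt run-operation fsc_utils.create_udfs --vars '{"UPDATE_UDFS_AND_SPS": True}' --args '{"drop_": True, "schema": "utils"}'
```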
@ -113,158 +125,313 @@ The `fsc_utils` dbt package is a centralized repository consisting of various db
#### **UTILS Functions**
- `utils.udf_hex_to_int`: Use this UDF to transform any hex string to integer
```
ex: Curve Swaps
SELECT
regexp_substr_all(SUBSTR(DATA, 3, len(DATA)), '.{64}') AS segmented_data,
utils.udf_hex_to_int(segmented_data [1] :: STRING) :: INTEGER AS tokens_sold
FROM
optimism.core.fact_event_logs
WHERE
topics [0] :: STRING IN (
'0x8b3e96f2b889fa771c53c981b40daf005f63f637f1869f707052d15a3dd97140',
'0xd013ca23e77a65003c2c659c5442c00c805371b7fc1ebd4c206c41d1536bd90b'
)
```
- `utils.udf_hex_to_string`: Use this UDF to transform any hexadecimal string to a regular string, removing any non-printable or control characters from the resulting string.
```
ex: Token Names
WITH base AS (
SELECT
'0x0000000000000000000000000000000000000000000000000000000000000020000000000000000000000000000000000000000000000000000000000000005452617265202d204368616e74616c20486167656c202d20576f6d656e2773204575726f2032303232202d2032303232205371756164202d20576f6d656e2773204e6174696f6e616c205465616d202d2032303232000000000000000000000000' AS input_token_name
)
SELECT
utils.udf_hex_to_string(SUBSTR(input_token_name,(64*2+3),LEN(input_token_name))) AS output_token_name
FROM base;
NOTE: The expression 64 * 2 + 3 in the query navigates to the 131st character of the hexadecimal string returned by an EVM blockchain contract's function, skipping metadata and adjusting for Snowflake's 1-based indexing. Keep in mind that the exact start of relevant data may vary between different contracts and functions.
```
- `utils.udf_encode_contract_call`: Encodes EVM contract function calls into ABI-encoded calldata format for eth_call RPC requests. Handles all Solidity types including tuples and arrays.
```
-- Simple function with no inputs
SELECT utils.udf_encode_contract_call(
PARSE_JSON('{"name": "totalSupply", "inputs": []}'),
ARRAY_CONSTRUCT()
);
-- Returns: 0x18160ddd
-- Function with single address parameter
SELECT utils.udf_encode_contract_call(
PARSE_JSON('{
"name": "balanceOf",
"inputs": [{"name": "account", "type": "address"}]
}'),
ARRAY_CONSTRUCT('0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48')
);
-- Returns: 0x70a08231000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48
-- Function with multiple parameters
SELECT utils.udf_encode_contract_call(
PARSE_JSON('{
"name": "transfer",
"inputs": [
{"name": "to", "type": "address"},
{"name": "amount", "type": "uint256"}
]
}'),
ARRAY_CONSTRUCT('0x1234567890123456789012345678901234567890', 1000000)
);
-- Complex function with nested tuples
SELECT utils.udf_encode_contract_call(
PARSE_JSON('{
"name": "swap",
"inputs": [{
"name": "params",
"type": "tuple",
"components": [
{"name": "tokenIn", "type": "address"},
{"name": "tokenOut", "type": "address"},
{"name": "amountIn", "type": "uint256"}
]
}]
}'),
ARRAY_CONSTRUCT(
ARRAY_CONSTRUCT(
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
'0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2',
1000000
)
)
);
```
- `utils.udf_create_eth_call`: Creates an eth_call JSON-RPC request object from contract address and encoded calldata. Supports block parameter as string or number (auto-converts numbers to hex).
```
-- Using default 'latest' block
SELECT utils.udf_create_eth_call(
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
'0x70a08231000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48'
);
-- Using specific block number (auto-converted to hex)
SELECT utils.udf_create_eth_call(
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
'0x70a08231000000000000000000000000a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
18500000
);
```
- `utils.udf_create_eth_call_from_abi`: Convenience function that combines contract call encoding and JSON-RPC request creation in a single call. Recommended for most use cases.
```
-- Simple balanceOf call with default 'latest' block
SELECT utils.udf_create_eth_call_from_abi(
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
PARSE_JSON('{
"name": "balanceOf",
"inputs": [{"name": "account", "type": "address"}]
}'),
ARRAY_CONSTRUCT('0xbcca60bb61934080951369a648fb03df4f96263c')
);
-- Same call but at a specific block number
SELECT utils.udf_create_eth_call_from_abi(
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
PARSE_JSON('{
"name": "balanceOf",
"inputs": [{"name": "account", "type": "address"}]
}'),
ARRAY_CONSTRUCT('0xbcca60bb61934080951369a648fb03df4f96263c'),
18500000
);
-- Using ABI from a table
WITH abi_data AS (
SELECT
abi,
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' as contract_address,
'0xbcca60bb61934080951369a648fb03df4f96263c' as user_address
FROM ethereum.silver.flat_function_abis
WHERE contract_address = LOWER('0x43506849d7c04f9138d1a2050bbf3a0c054402dd')
AND function_name = 'balanceOf'
)
SELECT
utils.udf_create_eth_call_from_abi(
contract_address,
abi,
ARRAY_CONSTRUCT(user_address)
) as rpc_call
FROM abi_data;
```
## **Streamline V 2.0 Functions**
The `Streamline V 2.0` functions are a set of macros and UDFs that are designed to be used with `Streamline V 2.0` deployments.
### Available macros:
- [if_data_call_function_v2](/macros/streamline/utils.sql#L86): This macro is used to call a udf in the `Streamline V 2.0` deployment. It is defined in the dbt model config block and accepts the `udf name` and the `udf` parameters. For legibility the `udf` parameters are passed as a `JSON object`.
**NOTE**: Ensure your project has registered the `udf` being invoked here prior to using this macro.
**`Parameters`**:
- `func` - The name of the udf to be called.
- `target` - The target table for the udf to be called on, interpolated in the [if_data_call_function_v2 macro](/macros/streamline/utils.sql#L101).
- `params` - The parameters to be passed to the udf, a `JSON object` that contains the minimum parameters required by all Streamline 2.0 udfs.
```sql
-- Example usage in a dbt model config block
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "external_table",
"sql_limit": "10",
"producer_batch_size": "10",
"worker_batch_size": "10",
"sm_secret_name": "aws/sm/path",
"sql_source": "{{this.identifier}}"
}
),
tags = ['model_tags']
) }}
```
```sql
-- Example usage in a dbt model config block
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "external_table",
"sql_limit": "10",
"producer_batch_size": "10",
"worker_batch_size": "10",
"sql_source": "{{this.identifier}}",
"exploded_key": tojson(["result.transactions"])
}
),
tags = ['model_tags']
) }}
```
When a dbt model with this config block is run we will see the following in the logs:
```sh
# Example dbt run logs
21:59:44 Found 244 models, 15 seeds, 7 operations, 5 analyses, 875 tests, 282 sources, 0 exposures, 0 metrics, 1024 macros, 0 groups, 0 semantic models
21:59:44
21:59:49
21:59:49 Running 6 on-run-start hooks
...
21:59:50
21:59:51 Concurrency: 12 threads (target='dev')
21:59:51
21:59:51 1 of 1 START sql view model streamline.coingecko_realtime_ohlc ................. [RUN]
21:59:51 Running macro `if_data_call_function`: Calling udf udf_bulk_rest_api_v2 with params:
{
"external_table": "ASSET_OHLC_API/COINGECKO",
"producer_batch_size": "10",
"sql_limit": "10",
"sql_source": "{{this.identifier}}",
"worker_batch_size": "10",
"exploded_key": tojson(["result.transactions"])
}
on {{this.schema}}.{{this.identifier}}
22:00:03 1 of 1 OK created sql view model streamline.coingecko_realtime_ohlc ............ [SUCCESS 1 in 12.75s]
22:00:03
```
```yml
# Setup variables in dbt_project.yml
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: '{{ var("config")[target.name]["ROLES"] }}'
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_CROSSCHAIN_API_STG
EXTERNAL_FUNCTION_URI: q0bnjqvs9a.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_CROSSCHAIN_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_CROSSCHAIN_API_PROD
EXTERNAL_FUNCTION_URI: 35hm1qhag9.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_CROSSCHAIN_API
- INTERNAL_DEV
- DBT_CLOUD_CROSSCHAIN
```
- [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1): This macro is used to create a `udf` named `udf_bulk_rest_api_v2` in the `streamline` schema of the database this is invoked in. This function returns a `variant` type and uses an API integration. The API integration and the external function URI are determined based on the target environment (`prod`, `dev`, or `sbx`).
The [macro interpolates](/macros/streamline/udfs.sql#L9) the `API_INTEGRATION` and `EXTERNAL_FUNCTION_URI` vars from the `dbt_project.yml` file. This is available starting with `v1.27.0`.
```yml
# Setup variables in dbt_project.yml
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] }}'
config:
# The keys correspond to dbt profiles and are case sensitive
dev:
API_INTEGRATION: AWS_CROSSCHAIN_API_STG
EXTERNAL_FUNCTION_URI: q0bnjqvs9a.execute-api.us-east-1.amazonaws.com/stg
prod:
API_INTEGRATION: AWS_CROSSCHAIN_API_PROD
EXTERNAL_FUNCTION_URI: 35hm1qhag9.execute-api.us-east-1.amazonaws.com/prod
```
**NOTE**: To be congruent with how `EXTERNAL_FUNCTION_URI` is being used by other macros and maintain consistency, starting from `v1.21.7` we need to append a trailing `/` to the `EXTERNAL_FUNCTION_URI` in the `dbt_project.yml` file.
- [create_udf_bulk_decode_logs](/macros/streamline/udfs.sql#L25): This macro is used to create a `udf` named `udf_bulk_decode_logs_v2` in the `streamline` schema of the database this is invoked in. This function returns a `variant` type and uses an API integration. The API integration and the external function URI are determined based on the target environment (`prod`, `dev`, or `sbx`).
The [macro interpolates](/macros/streamline/udfs.sql#L32) the `API_INTEGRATION` and `EXTERNAL_FUNCTION_URI` vars from the `dbt_project.yml` file.
- [create_streamline_udfs](macros/create_streamline_udfs.sql#L1). This macro runs [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1) when run with `--vars '{UPDATE_UDFS_AND_SPS: true}'`.
- [create_evm_streamline_udfs](macros/create_streamline_udfs.sql#L8). This macro runs [create_udf_bulk_rest_api_v2](/macros/streamline/udfs.sql#L1), [create_udf_bulk_decode_logs](/macros/streamline/udfs.sql#L25), and [create_udf_bulk_decode_traces](/macros/streamline/udfs.sql#L49) when run with `--vars '{UPDATE_UDFS_AND_SPS: true}'`. This is designed to be used on EVM chains due to the inclusion of `create_udf_bulk_decode_logs` and `create_udf_bulk_decode_traces`.
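One possible way to wire these macros into a project is through the `on-run-start` hooks that `UPDATE_UDFS_AND_SPS` already gates (a sketch, assuming the package is installed under the `fsc_utils` namespace; adjust to your project's conventions):
```yml
# dbt_project.yml (illustrative only)
on-run-start:
  - '{{ fsc_utils.create_streamline_udfs() }}'
```
Run with `--vars '{UPDATE_UDFS_AND_SPS: true}'` for the schema and UDFs to actually be created.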
## **LiveQuery Functions**
LiveQuery is now available to be deployed into individual projects. For base functionality, you will need to deploy the core functions with `dbt run` in your project, selecting them by the LiveQuery schema path or by tag.
### Basic Setup
1. Make sure `fsc-utils` package referenced in the project is version `v1.33.2` or greater. Re-run `dbt deps` if revision was changed.
`livequery_models.deploy.core` uses ephemeral models; therefore it is recommended to set the materialization for `livequery_models` in your project's `dbt_project.yml` to `ephemeral` to avoid any conflicts.
```yml
# dbt_project.yml
---
models:
livequery_models:
deploy:
core:
materialized: ephemeral
```
2. Deploy the core LiveQuery functions by schema or tag
By Schema
```
dbt run -s livequery_models.deploy.core --vars '{UPDATE_UDFS_AND_SPS: true}'
```
By Tag
```
dbt run -s "livequery_models,tag:core" --vars '{UPDATE_UDFS_AND_SPS: true}'
```
3. Deploy any additional functions
For example, deploy quicknode solana nft function + any dependencies (in this case the quicknode utils function)
```
dbt run -s livequery_models.deploy.quicknode.quicknode_utils__quicknode_utils livequery_models.deploy.quicknode.quicknode_solana_nfts__quicknode_utils --vars '{UPDATE_UDFS_AND_SPS: true}'
```
4. Override default LiveQuery configuration values by adding the below lines in the `vars` section of your project's `dbt_project.yml`
```
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
```
### Configuring LiveQuery API endpoints
Individual projects have the option to point to a different LiveQuery API endpoint. To do so, modify your project's `dbt_project.yml` to include the additional configurations within the project `vars`. If no configurations are specified, the default endpoints defined in the `livequery_models` package are used.
Below is a sample configuration. The `API_INTEGRATION` and `EXTERNAL_FUNCTION_URI` should point to the specific resources deployed for your project. The `ROLES` property is a list of Snowflake role names that are granted usage to the LiveQuery functions on deployment. You can also add the optional `MAX_BATCH_ROWS` variable to limit the number of rows processed in a single batch to the `udf_api_batched` function (available starting with `v1.8.0`).
```
config:
@ -274,38 +441,48 @@ config:
EXTERNAL_FUNCTION_URI: myproject.api.livequery.com/path-to-endpoint/
ROLES:
- INTERNAL_DEV
MAX_BATCH_ROWS: 10
```
## Snowflake Tasks for GitHub Actions
A set of macros and UDFs have been created to help with the creation of Snowflake tasks to manage runs in GitHub Actions.
### Basic Setup
1. Make sure `fsc-utils` package referenced in the project is version `v1.11.0` or greater. Re-run `dbt deps` if revision was changed.
2. Make sure LiveQuery has been deployed to the project. See [LiveQuery Functions](#livequery-functions) for more information.
> If you are using tags to run your workflows, it is highly recommended to add the project name to the tag. For example, `"ethereum_models,tag:core"` instead of `tag:core`. This will ensure that the correct workflows are being run within your project.
3. Install the GitHub LiveQuery Functions
```
dbt run -s livequery_models.deploy.marketplace.github --vars '{UPDATE_UDFS_AND_SPS: true}'
```
Use `-t prod` when running in production
GitHub secrets have been registered to the Snowflake System account, which is the user that will execute tasks. If you wish to use a different user to interact with the GitHub API, you will need to register the secrets to that user using [Ephit](https://science.flipsidecrypto.xyz/ephit).
4. Deploy UDFs from `fsc-utils` package
```
dbt run-operation fsc_utils.create_udfs --vars '{UPDATE_UDFS_AND_SPS: true}'
```
Use `-t prod` when running in production
Alternatively, you can add `{{- fsc_utils.create_udfs() -}}` to the `create_udfs` macro in your project to deploy the UDFs from `fsc-utils` on model start and when `UPDATE_UDFS_AND_SPS` is set to `True`.
5. Add `github_actions__workflows.csv` to the data folder in your project. This file will contain the list of workflows to be created. The workflow name should be the same as the name of the `.yml` file in your project. It is recommended that the file name be the same as the workflow and run name. See [Polygon](https://github.com/FlipsideCrypto/polygon-models/blob/main/data/github_actions__workflows.csv) for sample format.
Seed the file into dbt
```
dbt seed -s github_actions__workflows
```
Add file to `sources.yml`
```
- name: github_actions
database: {{prod_db}}
@ -313,17 +490,21 @@ A set of macros and UDFs have been created to help with the creation of Snowflak
tables:
- name: workflows
```
If you would like to test in dev, you will need to seed your file to prod with a separate PR.
6. Add the `github_actions` folder to your project's `models` folder. This folder contains the models that will be used to create and monitor the workflows. See [Polygon](https://github.com/FlipsideCrypto/polygon-models/tree/main/models/github_actions)
Build the GitHub Actions View
```
dbt run -m models/github_actions --full-refresh
```
Add `--vars '{UPDATE_UDFS_AND_SPS: true}'` if you have not already created UDFs on version `v1.11.0` or greater.
7. Add the template workflows `dbt_alter_gha_tasks.yml` and `dbt_test_tasks.yml`
> The [alter workflow](https://github.com/FlipsideCrypto/arbitrum-models/blob/main/.github/workflows/dbt_alter_gha_task.yml) is used to `SUSPEND` or `RESUME` tasks, which you will need to do if you want to pause a workflow while merging a big PR, for example. This is intended to be run on an ad-hoc basis.
> The [test workflow](https://github.com/FlipsideCrypto/arbitrum-models/blob/main/.github/workflows/dbt_test_tasks.yml) is used to test the workflows. It ensures that workflows are running according to the schedule and that the tasks are completing successfully. You will want to include this workflow within `github_actions__workflows.csv`. You can change the `.yml` included in the `models/github_actions` folder to better suit your testing needs, if necessary.
@ -338,11 +519,13 @@ A set of macros and UDFs have been created to help with the creation of Snowflak
9. Add the `START_GHA_TASKS` variable to `dbt_project.yml`
```
START_GHA_TASKS: False
```
10. Create the Tasks
```
dbt run-operation fsc_utils.create_gha_tasks --vars '{"START_GHA_TASKS":True}'
```
> This will create the tasks in Snowflake and the workflows in GitHub Actions. The tasks will only be started if `START_GHA_TASKS` is set to `True` and the target is the production database for your project.
11. Add a Data Dog CI Pipeline Alert on the logs of `dbt_test_tasks` to ensure that the test is checking the workflows successfully. See `Polygon Task Alert` in Data Dog for sample alert.
@ -351,41 +534,40 @@ A set of macros and UDFs have been created to help with the creation of Snowflak
A set of macros to help with generating dynamic merge predicate statements for models in chain projects. Specifically, this will output a concatenated set of BETWEEN statements of contiguous ranges.
### Setup and Usage
The macro only supports generating predicates for column types of DATE and INTEGER
1. Make sure fsc-utils package referenced in the project is version `v1.16.1` or greater. Re-run dbt deps if revision was changed.
#### Inline Usage
{% set between_stmts = fsc_utils.dynamic_range_predicate("silver.my_temp_table", "block_timestamp::date") %}
...
SELECT
*
FROM
some_other_table
WHERE
{{ between_stmts }}
#### DBT Snowflake incremental_predicate Usage
1. Requires overriding behavior of `get_merge_sql` macro
2. Create a file in `macros/dbt/` ex: `macros/dbt/get_merge.sql`
3. Copy this to the new file
```
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{% set merge_sql = fsc_utils.get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{{ return(merge_sql) }}
{% endmacro %}
```
**NOTE**: This is backwards compatible with the default dbt merge behavior, however it does override the default macro. If additional customization is needed, the above macro should be modified.
4. Example usage to create predicates using block_id
```
{{ config(
...
@ -393,12 +575,12 @@ The macro only supports generating predicates for column types of DATE and INTEG
...
) }}
```
Example Output: `(DBT_INTERNAL_DEST.block_id between 100 and 200 OR DBT_INTERNAL_DEST.block_id between 100000 and 150000)`
## Resources
- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction)
- Check out [Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers
- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support
- Find [dbt events](https://events.getdbt.com) near you
- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices


@ -0,0 +1,17 @@
{% macro create_streamline_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% do run_query("CREATE SCHEMA IF NOT EXISTS streamline") %}
{{ create_udf_bulk_rest_api_v2() }}
{% endif %}
{% endmacro %}
{% macro create_evm_streamline_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% do run_query("CREATE SCHEMA IF NOT EXISTS streamline") %}
{{ create_udf_bulk_rest_api_v2() }}
{{ create_udf_bulk_decode_logs() }}
{{ create_udf_bulk_decode_traces() }}
{% endif %}
{% endmacro %}


@ -1,11 +1,37 @@
{% macro set_query_tag() -%}
{% set new_json = {"repo":project_name, "object":this.table, "profile":target.profile_name, "env":target.name, "existing_tag":get_current_query_tag() } %}
{% set new_query_tag = tojson(new_json) | as_text %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("alter session set query_tag = '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}
{% macro get_query_tag() %}
{# Get the full path of the model #}
{% set model_path = model.path | string %}
{% set folder_path = '/'.join(model_path.split('/')[:-1]) %}
{# Get core folders from vars #}
{% set core_folders = var('core_folders') %}
{# Initialize is_core and check each path pattern #}
{% set ns = namespace(is_core=false) %}
{% for folder in core_folders %}
{% if folder in folder_path %}
{% set ns.is_core = true %}
{% endif %}
{% endfor %}
{# Build the JSON query tag #}
{% set tag_dict = {
"project": project_name,
"model": model.name,
"model_type": "core" if ns.is_core else "non_core",
"invocation_id": invocation_id,
"dbt_tags": config.get('tags', [])
} %}
{% set query_tag = tojson(tag_dict) %}
{# Return the properly escaped string #}
{{ return("'" ~ query_tag ~ "'") }}
{% endmacro %}
{% macro set_query_tag() %}
{% set tag = fsc_utils.get_query_tag() %}
{% do run_query("alter session set query_tag = " ~ tag) %}
{{ return("") }}
{% endmacro %}
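{# Illustration only (not part of the diff): with hypothetical project and model names,
   fsc_utils.set_query_tag() would execute something along these lines:
   alter session set query_tag = '{"project": "my_models", "model": "my_core_model", "model_type": "core", "invocation_id": "<uuid>", "dbt_tags": ["core"]}' #}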


@ -12,7 +12,7 @@
NULL
LANGUAGE PYTHON
STRICT IMMUTABLE
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'hex_to_int'
sql: |
{{ fsc_utils.python_hex_to_int() | indent(4) }}
@ -25,11 +25,23 @@
NULL
LANGUAGE PYTHON
STRICT IMMUTABLE
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'hex_to_int'
sql: |
{{ fsc_utils.python_udf_hex_to_int_with_encoding() | indent(4) }}
- name: {{ schema }}.udf_int_to_hex
signature:
- [int, NUMBER]
return_type: VARCHAR(16777216)
options: |
NULL
LANGUAGE SQL
STRICT IMMUTABLE
sql: |
SELECT CONCAT('0x', TRIM(TO_CHAR(int, 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')))
- name: {{ schema }}.udf_hex_to_string
signature:
- [hex, STRING]
@ -105,7 +117,7 @@
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'get_simplified_signature'
sql: |
{{ fsc_utils.create_udf_evm_text_signature() | indent(4) }}
@ -116,7 +128,7 @@
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
PACKAGES = ('pycryptodome==3.15.0')
HANDLER = 'udf_encode'
sql: |
@ -129,7 +141,7 @@
return_type: VARCHAR
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'custom_divide'
sql: |
{{ fsc_utils.create_udf_decimal_adjust() | indent(4) }}
@ -141,7 +153,7 @@
return_type: TABLE(workflow_name STRING, workflow_schedule STRING, timestamp TIMESTAMP_NTZ)
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
PACKAGES = ('croniter')
HANDLER = 'TimestampGenerator'
sql: |
@ -153,18 +165,18 @@
return_type: VARIANT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'transform'
sql: |
{{ fsc_utils.create_udf_transform_logs() | indent(4) }}
- name: {{ schema }}.udf_base58_to_hex
signature:
- [input, STRING]
- [base58, STRING]
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'transform_base58_to_hex'
sql: |
{{ fsc_utils.create_udf_base58_to_hex() | indent(4) }}
@ -175,7 +187,7 @@
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'transform_hex_to_base58'
sql: |
{{ fsc_utils.create_udf_hex_to_base58() | indent(4) }}
@ -187,7 +199,7 @@
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'transform_hex_to_bech32'
sql: |
{{ fsc_utils.create_udf_hex_to_bech32() | indent(4) }}
@ -198,7 +210,7 @@
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'transform_hex_to_algorand'
sql: |
{{ fsc_utils.create_udf_hex_to_algorand() | indent(4) }}
@ -210,7 +222,7 @@
return_type: TEXT
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.8'
RUNTIME_VERSION = '3.9'
HANDLER = 'transform_hex_to_tezos'
sql: |
{{ fsc_utils.create_udf_hex_to_tezos() | indent(4) }}
@ -254,5 +266,144 @@
sql: |
{{ fsc_utils.create_udtf_flatten_overflowed_responses() | indent(4) }}
- name: {{ schema }}.udf_decompress_zlib
signature:
- [compressed_string, STRING]
return_type: STRING
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
COMMENT = 'Decompresses zlib/deflate-compressed data from Python bytes literal string format'
HANDLER = 'decompress_zlib'
sql: |
{{ fsc_utils.create_udf_decompress_zlib() | indent(4) }}
- name: {{ schema }}.udf_stablecoin_data_parse
signature:
- [peggeddata_content, STRING]
return_type: |
TABLE (
id STRING,
name STRING,
address STRING,
symbol STRING,
onCoinGecko BOOLEAN,
gecko_id STRING,
cmcId STRING,
pegType STRING,
pegMechanism STRING,
priceSource STRING,
deadFrom STRING,
delisted BOOLEAN,
deprecated BOOLEAN,
doublecounted BOOLEAN
)
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
HANDLER = 'udf_stablecoin_data_parse'
sql: |
{{ fsc_utils.create_udf_stablecoin_data_parse() | indent(4) }}
- name: {{ schema }}.udf_encode_contract_call
signature:
- [function_abi, VARIANT]
- [input_values, ARRAY]
return_type: STRING
options: |
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('eth-abi')
HANDLER = 'encode_call'
COMMENT = 'Encodes EVM contract function calls into ABI-encoded calldata format for eth_call RPC requests. Handles all Solidity types including tuples and arrays.'
sql: |
{{ fsc_utils.create_udf_encode_contract_call() | indent(4) }}
- name: {{ schema }}.udf_create_eth_call
signature:
- [contract_address, STRING]
- [encoded_calldata, STRING]
return_type: OBJECT
options: |
NULL
LANGUAGE SQL
STRICT IMMUTABLE
COMMENT = 'Creates an eth_call JSON-RPC request object with default block parameter "latest".'
sql: |
{{ schema }}.udf_json_rpc_call(
'eth_call',
ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'to', contract_address,
'data', encoded_calldata
),
'latest'
)
)
- name: {{ schema }}.udf_create_eth_call
signature:
- [contract_address, STRING]
- [encoded_calldata, STRING]
- [block_parameter, VARIANT]
return_type: OBJECT
options: |
NULL
LANGUAGE SQL
STRICT IMMUTABLE
COMMENT = 'Creates an eth_call JSON-RPC request object. Accepts contract address, encoded calldata, and optional block parameter (string or number). If block_parameter is a number, it will be converted to hex format using ai.utils.udf_int_to_hex.'
sql: |
{{ schema }}.udf_json_rpc_call(
'eth_call',
ARRAY_CONSTRUCT(
OBJECT_CONSTRUCT(
'to', contract_address,
'data', encoded_calldata
),
CASE
WHEN block_parameter IS NULL THEN 'latest'
WHEN TYPEOF(block_parameter) IN ('INTEGER', 'NUMBER', 'FIXED', 'FLOAT') THEN
{{ schema }}.udf_int_to_hex(block_parameter::NUMBER)
ELSE block_parameter::STRING
END
)
)
- name: {{ schema }}.udf_create_eth_call_from_abi
signature:
- [contract_address, STRING]
- [function_abi, VARIANT]
- [input_values, ARRAY]
return_type: OBJECT
options: |
NULL
LANGUAGE SQL
STRICT IMMUTABLE
COMMENT = 'Convenience function that combines contract call encoding and JSON-RPC request creation for eth_call. Encodes function call from ABI and creates RPC request with default block parameter "latest".'
sql: |
{{ schema }}.udf_create_eth_call(
contract_address,
{{ schema }}.udf_encode_contract_call(function_abi, input_values)
)
- name: {{ schema }}.udf_create_eth_call_from_abi
signature:
- [contract_address, STRING]
- [function_abi, VARIANT]
- [input_values, ARRAY]
- [block_parameter, VARIANT]
return_type: OBJECT
options: |
NULL
LANGUAGE SQL
STRICT IMMUTABLE
COMMENT = 'Convenience function that combines contract call encoding and JSON-RPC request creation for eth_call. Encodes function call from ABI and creates RPC request with specified block parameter.'
sql: |
{{ schema }}.udf_create_eth_call(
contract_address,
{{ schema }}.udf_encode_contract_call(function_abi, input_values),
block_parameter
)
{% endmacro %}


@ -181,15 +181,23 @@ def transform(events: dict):
{% macro create_udf_base58_to_hex() %}
def transform_base58_to_hex(input):
if input is None:
def transform_base58_to_hex(base58):
if base58 is None:
return 'Invalid input'
ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"
base_count = len(ALPHABET)
num = 0
for char in input:
leading_zeros = 0
for char in base58:
if char == '1':
leading_zeros += 1
else:
break
for char in base58:
num *= base_count
if char in ALPHABET:
num += ALPHABET.index(char)
@ -201,7 +209,9 @@ def transform_base58_to_hex(input):
if len(hex_string) % 2 != 0:
hex_string = '0' + hex_string
return '0x' + hex_string
hex_leading_zeros = '00' * leading_zeros
return '0x' + hex_leading_zeros + hex_string
{% endmacro %}
@ -530,4 +540,473 @@ class FlattenRows:
cleansed["index_cols"] = cleansed[temp_index_cols].apply(list, axis=1)
cleansed.drop(columns=temp_index_cols, inplace=True, errors="ignore")
return list(cleansed[np.roll(cleansed.columns.values, 1).tolist()].itertuples(index=False, name=None))
{% endmacro %}
{% macro create_udf_decompress_zlib() %}
import zlib
import codecs
def decompress_zlib(compressed_string):
try:
if not compressed_string:
return None
# Remove b prefix and suffix if present
if compressed_string.startswith("b'") and compressed_string.endswith("'"):
compressed_string = compressed_string[2:-1]
elif compressed_string.startswith('b"') and compressed_string.endswith('"'):
compressed_string = compressed_string[2:-1]
# Decode the escaped string to bytes
compressed_bytes = codecs.decode(compressed_string, 'unicode_escape')
# Convert to bytes if string
if isinstance(compressed_bytes, str):
compressed_bytes = compressed_bytes.encode('latin-1')
# Decompress the zlib data
decompressed = zlib.decompress(compressed_bytes)
# Return as UTF-8 string
return decompressed.decode('utf-8')
except Exception as e:
return f"Error decompressing: {str(e)}"
{% endmacro %}
{% macro create_udf_stablecoin_data_parse() %}
import re
class udf_stablecoin_data_parse:
def process(self, peggeddata_content):
"""Main parsing function"""
def extract_field_value(obj_text, field_name):
"""Extract field value from object text using regex patterns"""
# Handle different field patterns
patterns = [
rf'{field_name}\s*:\s*"([^"]*)"',
rf"{field_name}\s*:\s*'([^']*)'",
rf'{field_name}\s*:\s*`([^`]*)`',
rf'{field_name}\s*:\s*(true|false|null|undefined)',
rf'{field_name}\s*:\s*([^,}}\n]+)'
]
for pattern in patterns:
match = re.search(pattern, obj_text, re.IGNORECASE | re.DOTALL)
if match:
value = match.group(1).strip()
# Clean up the value
value = re.sub(r'[,}}\n]', '', value).strip()
if value.lower() in ('null', 'undefined', ''):
return None
# Handle boolean values
if value.lower() == 'true':
return True
if value.lower() == 'false':
return False
return value
return None
def convert_value(value, expected_type):
"""Convert value to appropriate type"""
if value is None:
return None
if expected_type == 'BOOLEAN':
if isinstance(value, bool):
return value
if isinstance(value, str):
lower = value.lower()
if lower == 'true':
return True
if lower == 'false':
return False
return None
return str(value) if value is not None else None
try:
# Find the main array content - make the regex non-greedy but capture everything
array_match = re.search(r'export\s+default\s*\[(.*)\];?\s*$', peggeddata_content, re.DOTALL)
if not array_match:
raise Exception('Could not find exported array in peggedData content')
array_content = array_match.group(1).strip()
# Use a simpler regex-based approach to split objects
# Remove comments and clean up the array content first
# Instead of removing line comments entirely, just remove the // markers but keep the content
clean_content = re.sub(r'^\s*//\s*', '', array_content, flags=re.MULTILINE) # Remove // at start of lines
clean_content = re.sub(r'\n\s*//\s*', '\n', clean_content) # Remove // from middle of lines
# Instead of removing block comments entirely, just remove the comment markers but keep the content
clean_content = re.sub(r'/\*', '', clean_content) # Remove opening block comment markers
clean_content = re.sub(r'\*/', '', clean_content) # Remove closing block comment markers
# Find all objects using regex - look for {...} patterns
# This is more reliable than manual parsing
object_pattern = r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}'
matches = re.finditer(object_pattern, clean_content, re.DOTALL)
objects = []
for match in matches:
obj_text = match.group(0).strip()
if obj_text and len(obj_text) > 10: # Filter out small matches
objects.append(obj_text)
# If the simple regex didn't work, try a more complex nested approach
if not objects:
# More complex regex for nested objects
nested_pattern = r'\{(?:[^{}]|(?:\{[^{}]*\}))*\}'
nested_matches = re.findall(nested_pattern, clean_content, re.DOTALL)
objects = [obj.strip() for obj in nested_matches if len(obj.strip()) > 20]
# Still no objects? Try manual parsing with better logic
if not objects:
objects = []
current_object = ''
brace_count = 0
in_string = False
string_char = ''
i = 0
while i < len(clean_content):
char = clean_content[i]
# Handle string literals
if not in_string and char in ('"', "'", '`'):
in_string = True
string_char = char
elif in_string and char == string_char:
# Check if it's escaped
if i > 0 and clean_content[i-1] != '\\':
in_string = False
string_char = ''
# Handle braces only when not in string
if not in_string:
if char == '{':
if brace_count == 0:
current_object = '{' # Start new object
else:
current_object += char
brace_count += 1
elif char == '}':
current_object += char
brace_count -= 1
if brace_count == 0 and current_object.strip():
# Complete object found
objects.append(current_object.strip())
current_object = ''
elif brace_count > 0:
current_object += char
else:
if brace_count > 0:
current_object += char
i += 1
if not objects:
# Last resort: try splitting on id: pattern
id_splits = re.split(r'\n\s*id:\s*["\']', clean_content)
if len(id_splits) > 1:
objects = []
for i, part in enumerate(id_splits[1:], 1): # Skip first empty part
# Try to reconstruct the object
obj_start = clean_content.find(f'id:', clean_content.find(part))
if obj_start > 0:
# Look backwards for opening brace
brace_start = clean_content.rfind('{', 0, obj_start)
if brace_start >= 0:
# Look forward for matching closing brace
brace_count = 0
for j in range(brace_start, len(clean_content)):
if clean_content[j] == '{':
brace_count += 1
elif clean_content[j] == '}':
brace_count -= 1
if brace_count == 0:
obj_text = clean_content[brace_start:j+1].strip()
if len(obj_text) > 20:
objects.append(obj_text)
break
if not objects:
raise Exception(f'No objects found after all parsing attempts. Sample content: {clean_content[:500]}...')
# Process each object and extract the required fields
for i, obj_text in enumerate(objects):
try:
data = {
'id': extract_field_value(obj_text, 'id'),
'name': extract_field_value(obj_text, 'name'),
'address': extract_field_value(obj_text, 'address'),
'symbol': extract_field_value(obj_text, 'symbol'),
'onCoinGecko': extract_field_value(obj_text, 'onCoinGecko'),
'gecko_id': extract_field_value(obj_text, 'gecko_id'),
'cmcId': extract_field_value(obj_text, 'cmcId'),
'pegType': extract_field_value(obj_text, 'pegType'),
'pegMechanism': extract_field_value(obj_text, 'pegMechanism'),
'priceSource': extract_field_value(obj_text, 'priceSource'),
'deadFrom': extract_field_value(obj_text, 'deadFrom'),
'delisted': extract_field_value(obj_text, 'delisted'),
'deprecated': extract_field_value(obj_text, 'deprecated'),
'doublecounted': extract_field_value(obj_text, 'doublecounted')
}
# Only include objects that have at least id and name
if data['id'] and data['name']:
yield (
convert_value(data['id'], 'STRING'),
convert_value(data['name'], 'STRING'),
convert_value(data['address'], 'STRING'),
convert_value(data['symbol'], 'STRING'),
convert_value(data['onCoinGecko'], 'BOOLEAN'),
convert_value(data['gecko_id'], 'STRING'),
convert_value(data['cmcId'], 'STRING'),
convert_value(data['pegType'], 'STRING'),
convert_value(data['pegMechanism'], 'STRING'),
convert_value(data['priceSource'], 'STRING'),
convert_value(data['deadFrom'], 'STRING'),
convert_value(data['delisted'], 'BOOLEAN'),
convert_value(data['deprecated'], 'BOOLEAN'),
convert_value(data['doublecounted'], 'BOOLEAN')
)
except Exception as obj_error:
# Skip malformed objects but continue processing
continue
except Exception as error:
raise Exception(f'Error parsing peggedData content: {str(error)}')
{% endmacro %}
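For reference, a minimal standalone sketch of the brace-matching extraction used in the macro above; the function name, sample input, and printed output are illustrative assumptions, not part of the committed code:

def extract_objects(clean_content):
    """Return top-level {...} blocks from loosely formatted JS object text."""
    objects = []
    current_object = ''
    brace_count = 0
    in_string = False
    string_char = ''
    for i, char in enumerate(clean_content):
        # Track string literals so braces inside quotes are ignored
        if not in_string and char in ('"', "'", '`'):
            in_string = True
            string_char = char
        elif in_string and char == string_char and (i == 0 or clean_content[i - 1] != '\\'):
            in_string = False
            string_char = ''
        if in_string:
            if brace_count > 0:
                current_object += char
            continue
        if char == '{':
            current_object = '{' if brace_count == 0 else current_object + char
            brace_count += 1
        elif char == '}':
            current_object += char
            brace_count -= 1
            if brace_count == 0 and current_object.strip():
                objects.append(current_object.strip())
                current_object = ''
        elif brace_count > 0:
            current_object += char
    return objects

# Illustrative input shaped like a peggedData export (values assumed):
sample = 'export default [ { id: "1", name: "Tether", symbol: "USDT" }, { id: "2", name: "USDC" } ]'
print(extract_objects(sample))
# ['{ id: "1", name: "Tether", symbol: "USDT" }', '{ id: "2", name: "USDC" }']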
{% macro create_udf_encode_contract_call() %}
def encode_call(function_abi, input_values):
"""
Encodes EVM contract function calls into ABI-encoded calldata.
This function generates complete calldata (selector + encoded params) that can be
used directly in eth_call JSON-RPC requests to query contract state.
"""
import eth_abi
from eth_hash.auto import keccak
import json
def get_function_signature(abi):
"""
Generate function signature using the same logic as utils.udf_evm_text_signature.
Examples:
balanceOf(address)
transfer(address,uint256)
swap((address,address,uint256))
"""
def generate_signature(inputs):
signature_parts = []
for input_data in inputs:
if 'components' in input_data:
# Handle nested tuples
component_signature_parts = []
components = input_data['components']
component_signature_parts.extend(generate_signature(components))
component_signature_parts[-1] = component_signature_parts[-1].rstrip(",")
if input_data['type'].endswith('[]'):
signature_parts.append("(" + "".join(component_signature_parts) + ")[],")
else:
signature_parts.append("(" + "".join(component_signature_parts) + "),")
else:
# Clean up Solidity-specific modifiers
signature_parts.append(input_data['type'].replace('enum ', '').replace(' payable', '') + ",")
return signature_parts
signature_parts = [abi['name'] + "("]
signature_parts.extend(generate_signature(abi.get('inputs', [])))
if len(signature_parts) > 1:
signature_parts[-1] = signature_parts[-1].rstrip(",") + ")"
else:
signature_parts.append(")")
return "".join(signature_parts)
def function_selector(abi):
"""Calculate 4-byte function selector using Keccak256 hash."""
signature = get_function_signature(abi)
hash_bytes = keccak(signature.encode('utf-8'))
return hash_bytes[:4].hex(), signature
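# For example (illustrative, not in the source): for the ERC-20 ABI entry
# {"name": "transfer", "inputs": [{"type": "address"}, {"type": "uint256"}]}
# get_function_signature yields "transfer(address,uint256)" and
# function_selector returns ("a9059cbb", "transfer(address,uint256)").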
def get_canonical_type(input_spec):
"""
Convert ABI input spec to canonical type string for eth_abi encoding.
Handles tuple expansion: tuple -> (address,uint256,bytes)
"""
param_type = input_spec['type']
if param_type.startswith('tuple'):
components = input_spec.get('components', [])
component_types = ','.join([get_canonical_type(comp) for comp in components])
canonical = f"({component_types})"
# Preserve array suffixes: tuple[] -> (address,uint256)[]
if param_type.endswith('[]'):
array_suffix = param_type[5:] # Everything after 'tuple'
canonical += array_suffix
return canonical
return param_type
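# For example (illustrative, not in the source): a "tuple" input with components
# [{"type": "address"}, {"type": "uint256"}] becomes "(address,uint256)",
# and the same components with type "tuple[]" become "(address,uint256)[]".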
def prepare_value(value, param_type, components=None):
"""
Convert Snowflake values to Python types suitable for eth_abi encoding.
Handles type coercion and format normalization for all Solidity types.
"""
# Handle null/None values with sensible defaults
if value is None:
if param_type.startswith('uint') or param_type.startswith('int'):
return 0
elif param_type == 'address':
return '0x' + '0' * 40
elif param_type == 'bool':
return False
elif param_type.startswith('bytes'):
return b''
else:
return value
# CRITICAL: Check arrays FIRST (before base types)
# This prevents bytes[] from matching the bytes check
if param_type.endswith('[]'):
base_type = param_type[:-2]
if not isinstance(value, list):
return []
# Special handling for tuple arrays
if base_type == 'tuple' and components:
return [prepare_tuple(v, components) for v in value]
else:
return [prepare_value(v, base_type) for v in value]
# Base type conversions
if param_type == 'address':
addr = str(value).lower()
if not addr.startswith('0x'):
addr = '0x' + addr
return addr
if param_type.startswith('uint') or param_type.startswith('int'):
return int(value)
if param_type == 'bool':
if isinstance(value, str):
return value.lower() in ('true', '1', 'yes')
return bool(value)
if param_type.startswith('bytes'):
if isinstance(value, str):
if value.startswith('0x'):
value = value[2:]
return bytes.fromhex(value)
return value
if param_type == 'string':
return str(value)
return value
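# For example (illustrative, not in the source):
# prepare_value(None, 'uint256') -> 0
# prepare_value('42', 'uint256') -> 42
# prepare_value('true', 'bool') -> True
# prepare_value('0xdeadbeef', 'bytes4') -> b'\xde\xad\xbe\xef'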
def prepare_tuple(value, components):
"""
Recursively prepare tuple values, handling nested structures.
Tuples can contain other tuples, arrays, or tuple arrays.
"""
if not isinstance(value, (list, tuple)):
# Support dict-style input (by component name)
if isinstance(value, dict):
value = [value.get(comp.get('name', f'field_{i}'))
for i, comp in enumerate(components)]
else:
return value
result = []
for i, comp in enumerate(components):
if i >= len(value):
result.append(None)
continue
comp_type = comp['type']
val = value[i]
# Handle tuple arrays within tuples
if comp_type.endswith('[]') and comp_type.startswith('tuple'):
sub_components = comp.get('components', [])
result.append(prepare_value(val, comp_type, sub_components))
elif comp_type.startswith('tuple'):
# Single tuple (not array)
sub_components = comp.get('components', [])
result.append(prepare_tuple(val, sub_components))
else:
result.append(prepare_value(val, comp_type))
return tuple(result)
try:
inputs = function_abi.get('inputs', [])
# Calculate selector using battle-tested signature generation
selector_hex, signature = function_selector(function_abi)
# Functions with no inputs only need the selector
if not inputs:
return '0x' + selector_hex
# Prepare values for encoding
prepared_values = []
for i, inp in enumerate(inputs):
if i >= len(input_values):
prepared_values.append(None)
continue
value = input_values[i]
param_type = inp['type']
# Handle tuple arrays at top level
if param_type.endswith('[]') and param_type.startswith('tuple'):
components = inp.get('components', [])
prepared_values.append(prepare_value(value, param_type, components))
elif param_type.startswith('tuple'):
# Single tuple (not array)
components = inp.get('components', [])
prepared_values.append(prepare_tuple(value, components))
else:
prepared_values.append(prepare_value(value, param_type))
# Get canonical type strings for eth_abi (expands tuples)
types = [get_canonical_type(inp) for inp in inputs]
# Encode parameters using eth_abi
encoded_params = eth_abi.encode(types, prepared_values).hex()
# Return complete calldata: selector + encoded params
return '0x' + selector_hex + encoded_params
except Exception as e:
# Return structured error for debugging
import traceback
return json.dumps({
'error': str(e),
'traceback': traceback.format_exc(),
'function': function_abi.get('name', 'unknown'),
'signature': signature if 'signature' in locals() else 'not computed',
'selector': '0x' + selector_hex if 'selector_hex' in locals() else 'not computed',
'types': types if 'types' in locals() else 'not computed'
})
{% endmacro %}
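As a sanity check, the two steps that encode_call performs (a Keccak-256 selector plus eth_abi-encoded parameters) can be reproduced outside Snowflake. A minimal sketch, assuming the eth-abi and eth-hash packages are available locally and using an illustrative holder address:

import eth_abi
from eth_hash.auto import keccak

# Step 1: 4-byte selector from the canonical signature
signature = 'balanceOf(address)'
selector = keccak(signature.encode('utf-8'))[:4].hex()  # '70a08231'

# Step 2: ABI-encode the parameters (an address is left-padded to 32 bytes)
holder = '0xd8da6bf26964af9d7eed9e03e53415d37aa96045'  # illustrative address
params = eth_abi.encode(['address'], [holder]).hex()

# Complete calldata: selector + encoded params, usable as the "data" field of eth_call
calldata = '0x' + selector + params
print(calldata)
# 0x70a08231000000000000000000000000d8da6bf26964af9d7eed9e03e53415d37aa96045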


@@ -88,3 +88,72 @@ WHERE
{% endfor %}
{% endmacro %}
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
job_created_time AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA is not null
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS _inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
_inserted_timestamp
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
AND DATA is not null
{% endmacro %}


@@ -3,21 +3,69 @@
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json variant) returns variant api_integration =
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}/udf_bulk_rest_api'
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}/udf_bulk_rest_api'
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_rest_api_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}/udf_bulk_rest_api'
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}udf_bulk_rest_api'
{% else %}
{{ log("Creating default (dev) udf_bulk_rest_api_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}/udf_bulk_rest_api'
{% endif %}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}udf_bulk_rest_api'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% endmacro %}
{% macro create_udf_bulk_decode_logs() %}
{{ log("Creating udf udf_bulk_decode_logs_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_logs_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_decode_logs_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_decode_logs_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_decode_logs_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_logs'
{% else %}
{{ log("Creating default (dev) udf_bulk_decode_logs_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_logs'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}
{% macro create_udf_bulk_decode_traces() %}
{{ log("Creating udf udf_bulk_decode_traces_v2 for target:" ~ target.name ~ ", schema: " ~ target.schema ~ ", DB: " ~ target.database, info=True) }}
{{ log("role:" ~ target.role ~ ", user:" ~ target.user, info=True) }}
{% set sql %}
CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_decode_traces_v2(json object) returns array api_integration =
{% if target.name == "prod" %}
{{ log("Creating prod udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% elif target.name == "dev" %}
{{ log("Creating dev udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% elif target.name == "sbx" %}
{{ log("Creating stg udf_bulk_decode_traces_v2", info=True) }}
{{ var("API_INTEGRATION") }} AS 'https://{{ var("EXTERNAL_FUNCTION_URI") | lower }}bulk_decode_traces'
{% else %}
{{ log("Creating default (dev) udf_bulk_decode_traces_v2", info=True) }}
{{ var("config")["dev"]["API_INTEGRATION"] }} AS 'https://{{ var("config")["dev"]["EXTERNAL_FUNCTION_URI"] | lower }}bulk_decode_traces'
{% endif %};
{% endset %}
{{ log(sql, info=True) }}
{% do adapter.execute(sql) %}
{% endmacro %}


@@ -118,4 +118,47 @@
SELECT
NULL
{% endif %}
{% endmacro %}
{% macro if_data_call_wait() %}
{% if var(
"STREAMLINE_INVOKE_STREAMS"
) %}
{% set query %}
SELECT
1
WHERE
EXISTS(
SELECT
1
FROM
{{ model.schema ~ "." ~ model.alias }}
LIMIT
1
) {% endset %}
{% if execute %}
{% set results = run_query(
query
) %}
{% if results %}
{{ log(
"Waiting...",
info = True
) }}
{% set wait_query %}
SELECT
system$wait(
{{ var(
"WAIT",
400
) }}
) {% endset %}
{% do run_query(wait_query) %}
{% else %}
SELECT
NULL;
{% endif %}
{% endif %}
{% endif %}
{% endmacro %}


@@ -1,51 +1,37 @@
{% macro create_gha_tasks() %}
{% set query %}
SELECT
task_name,
workflow_name,
workflow_schedule
FROM
{{ ref('github_actions__tasks') }}
SELECT
task_name,
workflow_name,
workflow_schedule
FROM
{{ ref('github_actions__tasks') }}
{% endset %}
{% set results = run_query(query) %}
{% if execute and results is not none %}
{% set results_list = results.rows %}
{% else %}
{% set results_list = [] %}
{% endif %}
{% set prod_db = target.database.lower().replace('_dev', '') %}
{% set prod_db = target.database.lower().replace(
'_dev',
''
) %}
{% for result in results_list %}
{% set task_name = result[0] %}
{% set workflow_name = result[1] %}
{% set workflow_schedule = result[2] %}
{% set task_name = result [0] %}
{% set workflow_name = result [1] %}
{% set workflow_schedule = result [2] %}
{% set sql %}
EXECUTE IMMEDIATE
'CREATE OR REPLACE TASK github_actions.{{ task_name }}
WAREHOUSE = DBT_CLOUD
SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\'
COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS
DECLARE
rs resultset;
output string;
BEGIN
rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code);
SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1;
CALL SYSTEM$SET_RETURN_VALUE(:output);
END;'
{% endset %}
EXECUTE IMMEDIATE 'CREATE OR REPLACE TASK github_actions.{{ task_name }} WAREHOUSE = DBT_CLOUD SCHEDULE = \'USING CRON {{ workflow_schedule }} UTC\' COMMENT = \'Task to trigger {{ workflow_name }}.yml workflow according to {{ workflow_schedule }}\' AS DECLARE rs resultset; output string; BEGIN rs := (SELECT github_actions.workflow_dispatches(\'FlipsideCrypto\', \'{{ prod_db }}-models\', \'{{ workflow_name }}.yml\', NULL):status_code::int AS status_code); SELECT LISTAGG($1, \';\') INTO :output FROM TABLE(result_scan(LAST_QUERY_ID())) LIMIT 1; CALL SYSTEM$SET_RETURN_VALUE(:output); END;' {% endset %}
{% do run_query(sql) %}
{% if var("START_GHA_TASKS") %}
{% if target.database.lower() == prod_db %}
{% set sql %}
ALTER TASK github_actions.{{ task_name }} RESUME;
{% endset %}
ALTER task github_actions.{{ task_name }}
resume;
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endif %}
@@ -53,27 +39,28 @@
{% endmacro %}
{% macro gha_tasks_view() %}
SELECT
workflow_name,
concat_ws(
'_',
'TRIGGER',
UPPER(workflow_name)
) AS task_name,
workflow_schedule
FROM
{{ source(
'github_actions',
'workflows'
) }}
SELECT
workflow_name,
concat_ws(
'_',
'TRIGGER',
UPPER(workflow_name)
) AS task_name,
workflow_schedule
FROM
{{ source(
'github_actions',
'workflows'
) }}
{% endmacro %}
{% macro gha_task_history_view() %}
{% set query %}
SELECT
DISTINCT task_name
FROM
{{ ref('github_actions__tasks') }}
SELECT
DISTINCT task_name
FROM
{{ ref('github_actions__tasks') }}
{% endset %}
{% set results = run_query(query) %}
{% if execute and results is not none %}
@@ -88,21 +75,14 @@
FROM
({% for result in results_list %}
SELECT
NAME AS task_name,
completed_time,
return_value,
state,
database_name,
schema_name,
scheduled_time,
query_start_time
NAME AS task_name, completed_time, return_value, state, database_name, schema_name, scheduled_time, query_start_time
FROM
TABLE(information_schema.task_history(scheduled_time_range_start => DATEADD('hour', -24, CURRENT_TIMESTAMP()), task_name => '{{ result[0]}}')) {% if not loop.last %}
UNION ALL
{% endif %}
{% endfor %}) AS subquery
WHERE
database_name = '{{ target.database }}'
database_name = '{{ target.database }}'
AND schema_name = 'GITHUB_ACTIONS')
SELECT
*
@@ -126,62 +106,76 @@
)
) AS t
)
SELECT
task_name,
workflow_name,
workflow_schedule,
scheduled_time
FROM
base
SELECT
task_name,
workflow_name,
workflow_schedule,
scheduled_time
FROM
base
{% endmacro %}
{% macro gha_task_performance_view() %}
SELECT
s.task_name,
s.workflow_name,
s.scheduled_time,
SELECT
s.task_name,
s.workflow_name,
s.scheduled_time,
h.return_value
FROM
{{ ref('github_actions__task_schedule') }}
s
LEFT JOIN {{ ref('github_actions__task_history') }}
h
ON s.task_name = h.task_name
AND TO_TIMESTAMP_NTZ(
s.scheduled_time
) BETWEEN TO_TIMESTAMP_NTZ(DATEADD(MINUTE, -1, h.scheduled_time))
AND TO_TIMESTAMP_NTZ(DATEADD(MINUTE, 1, h.scheduled_time))
AND TRY_TO_NUMBER(
h.return_value
FROM
{{ ref('github_actions__task_schedule') }}
s
LEFT JOIN {{ ref('github_actions__task_history') }}
h
ON s.task_name = h.task_name
AND TO_TIMESTAMP_NTZ(DATE_TRUNC('minute', s.scheduled_time)) = TO_TIMESTAMP_NTZ(DATE_TRUNC('minute', h.scheduled_time))
AND try_to_number(h.return_value) between 200 and 299
AND h.state = 'SUCCEEDED'
ORDER BY
task_name,
scheduled_time
) BETWEEN 200
AND 299
AND h.state = 'SUCCEEDED'
ORDER BY
task_name,
scheduled_time
{% endmacro %}
{% macro gha_task_current_status_view() %}
WITH base AS (
SELECT
SELECT
task_name,
workflow_name,
scheduled_time,
return_value,
return_value IS NOT NULL AS was_successful
FROM {{ ref('github_actions__task_performance') }}
QUALIFY row_number() OVER (PARTITION BY task_name ORDER BY scheduled_time DESC) <= 2
FROM
{{ ref('github_actions__task_performance') }}
qualify ROW_NUMBER() over (
PARTITION BY task_name
ORDER BY
scheduled_time DESC
) <= 2
)
SELECT
task_name,
workflow_name,
MAX(scheduled_time) AS recent_scheduled_time,
MIN(scheduled_time) AS prior_scheduled_time,
SUM(IFF(return_value = 204, 1, 0)) AS successes,
successes > 0 AS pipeline_active
FROM base
GROUP BY task_name, workflow_name
SELECT
task_name,
workflow_name,
MAX(scheduled_time) AS recent_scheduled_time,
MIN(scheduled_time) AS prior_scheduled_time,
SUM(IFF(return_value = 204, 1, 0)) AS successes,
successes > 0 AS pipeline_active
FROM
base
GROUP BY
task_name,
workflow_name
{% endmacro %}
{% macro alter_gha_task(task_name, task_action) %}
{% macro alter_gha_task(
task_name,
task_action
) %}
{% set sql %}
EXECUTE IMMEDIATE
'ALTER TASK IF EXISTS github_actions.{{ task_name }} {{ task_action }};'
{% endset %}
EXECUTE IMMEDIATE 'ALTER TASK IF EXISTS github_actions.{{ task_name }} {{ task_action }};' {% endset %}
{% do run_query(sql) %}
{% endmacro %}
{% endmacro %}


@@ -1,7 +1,3 @@
packages:
- package: calogica/dbt_expectations
version: [">=0.8.0", "<0.9.0"]
- package: dbt-labs/dbt_utils
version: [">=1.0.0", "<1.1.0"]
- git: https://github.com/FlipsideCrypto/livequery-models.git
revision: "v1.2.0"
revision: "v1.10.2"