An 1132/asset metadata refresh process (#8)

* wip

* create udfs and sp to support api call

* these were accidentally removed

* add view/udf/sp needed for api integration

* dedupe asset metadata before putting into silver

* add ignore list to view

* add model descriptions

* fix where logic
This commit is contained in:
desmond-hui 2022-05-04 08:08:15 -07:00 committed by GitHub
parent 89817b0c71
commit 85368245d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 248 additions and 14 deletions

View File

@ -39,3 +39,8 @@ models:
vars:
"dbt_date:time_zone": America/Los_Angeles
on-run-start:
- '{{create_sps()}}'
- '{{create_tasks()}}'
- '{{create_udfs()}}'
- '{{sp_bulk_get_asset_metadata()}}'

View File

@ -3,5 +3,4 @@
CREATE SCHEMA IF NOT EXISTS _internal;
{{ task_run_sp_create_prod_clone('_internal') }};
{% endif %}
{% endmacro %}

7
macros/create_udfs.sql Normal file
View File

@ -0,0 +1,7 @@
{% macro create_udfs() %}
{% set sql %}
{{ udf_bulk_get_asset_metadata() }};
{# Add crate udf macros here #}
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,26 @@
{% macro sp_bulk_get_asset_metadata() %}
{% set sql %}
CREATE
OR REPLACE PROCEDURE silver.sp_bulk_get_asset_metadata() returns variant LANGUAGE SQL AS $$
DECLARE
RESULT VARCHAR;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
{{ ref('silver__all_undecoded_assets') }}
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
silver.udf_bulk_get_asset_metadata()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;$$ {% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,8 @@
{% macro udf_bulk_get_asset_metadata() %}
CREATE
OR REPLACE EXTERNAL FUNCTION silver.udf_bulk_get_asset_metadata() returns text api_integration = aws_osmosis_api_dev AS {% if target.name == "prod" -%}
'https://k7jc1bnb8i.execute-api.us-east-1.amazonaws.com/prod/get_asset_metadata'
{% else %}
'https://auacbjh2tj.execute-api.us-east-1.amazonaws.com/dev/get_asset_metadata'
{%- endif %}
{% endmacro %}

View File

@ -0,0 +1,5 @@
{% docs pool_assets %}
List of 2 or more asset addresses belonging to the pool
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs pool_id %}
Identifier for the pool within the module
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs pool_module %}
Module used to create liquidity pool
{% enddocs %}

View File

@ -0,0 +1,33 @@
{{ config(
materialized = 'view',
post_hook = "call silver.sp_bulk_get_asset_metadata()"
) }}
SELECT
DISTINCT A.value :asset_address :: STRING AS address
FROM
{{ ref('silver__pool_metadata') }},
TABLE(FLATTEN(assets)) A
WHERE
--ignore low liquidity pools with unlabeled assets
pool_id NOT IN (
290,
291,
677,
684,
354,
691,
694,
293,
654,
647,
646
)
EXCEPT
SELECT
base AS address
FROM
{{ source(
'osmosis_external',
'asset_metadata_api'
) }}

View File

@ -2,16 +2,32 @@
materialized = 'table'
) }}
SELECT
'osmosis' AS blockchain,
base AS address,
'flipside' AS creator,
'token' AS label_type,
'token_contract' AS label_subtype,
name AS label,
symbol AS project_name,
denom_units AS value
FROM {{ source(
'osmosis_external',
'asset_metadata_api'
) }}
WITH base AS (
SELECT
base AS address,
NAME AS label,
symbol AS project_name,
denom_units AS VALUE
FROM
{{ source(
'osmosis_external',
'asset_metadata_api'
) }}
GROUP BY
1,
2,
3,
4
)
SELECT
'osmosis' AS blockchain,
address,
'flipside' AS creator,
'token' AS label_type,
'token_contract' AS label_subtype,
label,
project_name,
VALUE
FROM
base

View File

@ -0,0 +1,104 @@
{{ config(
materialized = 'incremental',
unique_key = "CONCAT_WS('-', module, pool_id)",
incremental_strategy = 'delete+insert',
) }}
WITH pool_creation_txs AS (
SELECT
DISTINCT tx_id
FROM
{{ ref('silver__msgs') }}
WHERE
msg_type = 'pool_created'
{% if is_incremental() %}
AND ingested_at :: DATE <= CURRENT_DATE - 2
{% endif %}
),
b AS (
SELECT
ma.tx_id,
msg_type,
msg_index,
attribute_index,
attribute_key,
attribute_value
FROM
{{ ref('silver__msg_attributes') }}
ma
INNER JOIN pool_creation_txs t
ON t.tx_id = ma.tx_id
WHERE
(
attribute_key IN (
'module',
'pool_id'
)
OR (
msg_type = 'transfer'
AND attribute_key = 'amount'
AND attribute_value IS NOT NULL
AND attribute_value NOT LIKE '%/pool/%'
AND ARRAY_SIZE(SPLIT(attribute_value, ',')) :: NUMBER > 1
)
)
{% if is_incremental() %}
AND ingested_at :: DATE <= CURRENT_DATE - 2
{% endif %}
),
C AS (
SELECT
tx_id,
OBJECT_AGG(
attribute_key,
attribute_value :: variant
) AS obj
FROM
b
GROUP BY
1
),
d AS (
SELECT
tx_id,
obj :module :: STRING AS module,
obj :pool_id :: NUMBER AS pool_id,
'asset_address' AS object_key,
LTRIM(
A.value,
'0123456789'
) AS asset_address
FROM
C,
TABLE(FLATTEN(SPLIT(obj :amount, ','))) A
),
e AS (
SELECT
tx_id,
module,
pool_id,
asset_address,
OBJECT_AGG(
object_key,
asset_address :: variant
) AS asset_obj
FROM
d
GROUP BY
1,
2,
3,
4
)
SELECT
module,
pool_id,
ARRAY_AGG(asset_obj) AS assets
FROM
e
GROUP BY
1,
2

View File

@ -0,0 +1,21 @@
version: 2
models:
- name: silver__pool_metadata
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- MODULE
- POOL_ID
columns:
- name: MODULE
description: "{{ doc('pool_module') }}"
tests:
- not_null
- name: POOL_ID
description: "{{ doc('pool_id') }}"
tests:
- not_null
- name: ASSETS
description: "{{ doc('pool_assets') }}"
tests:
- not_null