diff --git a/dbt_project.yml b/dbt_project.yml index b234c33..66cb0ba 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -39,3 +39,8 @@ models: vars: "dbt_date:time_zone": America/Los_Angeles +on-run-start: + - '{{create_sps()}}' + - '{{create_tasks()}}' + - '{{create_udfs()}}' + - '{{sp_bulk_get_asset_metadata()}}' \ No newline at end of file diff --git a/macros/create_tasks.sql b/macros/create_tasks.sql index 5176696..2fd46f5 100644 --- a/macros/create_tasks.sql +++ b/macros/create_tasks.sql @@ -3,5 +3,4 @@ CREATE SCHEMA IF NOT EXISTS _internal; {{ task_run_sp_create_prod_clone('_internal') }}; {% endif %} - {% endmacro %} \ No newline at end of file diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql new file mode 100644 index 0000000..ae89852 --- /dev/null +++ b/macros/create_udfs.sql @@ -0,0 +1,7 @@ +{% macro create_udfs() %} + {% set sql %} + {{ udf_bulk_get_asset_metadata() }}; +{# Add crate udf macros here #} + {% endset %} + {% do run_query(sql) %} +{% endmacro %} diff --git a/macros/sp_bulk_get_asset_metadata.sql b/macros/sp_bulk_get_asset_metadata.sql new file mode 100644 index 0000000..4b7977f --- /dev/null +++ b/macros/sp_bulk_get_asset_metadata.sql @@ -0,0 +1,26 @@ +{% macro sp_bulk_get_asset_metadata() %} + {% set sql %} + CREATE + OR REPLACE PROCEDURE silver.sp_bulk_get_asset_metadata() returns variant LANGUAGE SQL AS $$ +DECLARE + RESULT VARCHAR; +row_cnt INTEGER; +BEGIN + row_cnt:= ( + SELECT + COUNT(1) + FROM + {{ ref('silver__all_undecoded_assets') }} + ); +if ( + row_cnt > 0 + ) THEN RESULT:= ( + SELECT + silver.udf_bulk_get_asset_metadata() + ); + ELSE RESULT:= NULL; +END if; +RETURN RESULT; +END;$$ {% endset %} +{% do run_query(sql) %} +{% endmacro %} diff --git a/macros/udf_bulk_get_asset_metadata.sql b/macros/udf_bulk_get_asset_metadata.sql new file mode 100644 index 0000000..bd1a94f --- /dev/null +++ b/macros/udf_bulk_get_asset_metadata.sql @@ -0,0 +1,8 @@ +{% macro udf_bulk_get_asset_metadata() %} + CREATE + OR REPLACE EXTERNAL FUNCTION silver.udf_bulk_get_asset_metadata() returns text api_integration = aws_osmosis_api_dev AS {% if target.name == "prod" -%} + 'https://k7jc1bnb8i.execute-api.us-east-1.amazonaws.com/prod/get_asset_metadata' + {% else %} + 'https://auacbjh2tj.execute-api.us-east-1.amazonaws.com/dev/get_asset_metadata' + {%- endif %} +{% endmacro %} \ No newline at end of file diff --git a/models/descriptions/pool_assets.md b/models/descriptions/pool_assets.md new file mode 100644 index 0000000..c185e17 --- /dev/null +++ b/models/descriptions/pool_assets.md @@ -0,0 +1,5 @@ +{% docs pool_assets %} + +List of 2 or more asset addresses belonging to the pool + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/pool_id.md b/models/descriptions/pool_id.md new file mode 100644 index 0000000..8aa92e3 --- /dev/null +++ b/models/descriptions/pool_id.md @@ -0,0 +1,5 @@ +{% docs pool_id %} + +Identifier for the pool within the module + +{% enddocs %} \ No newline at end of file diff --git a/models/descriptions/pool_module.md b/models/descriptions/pool_module.md new file mode 100644 index 0000000..ae8713b --- /dev/null +++ b/models/descriptions/pool_module.md @@ -0,0 +1,5 @@ +{% docs pool_module %} + +Module used to create liquidity pool + +{% enddocs %} \ No newline at end of file diff --git a/models/silver/silver__all_undecoded_assets.sql b/models/silver/silver__all_undecoded_assets.sql new file mode 100644 index 0000000..8a438d2 --- /dev/null +++ b/models/silver/silver__all_undecoded_assets.sql @@ -0,0 +1,33 @@ +{{ config( + materialized = 'view', + post_hook = "call silver.sp_bulk_get_asset_metadata()" +) }} + +SELECT + DISTINCT A.value :asset_address :: STRING AS address +FROM + {{ ref('silver__pool_metadata') }}, + TABLE(FLATTEN(assets)) A +WHERE + --ignore low liquidity pools with unlabeled assets + pool_id NOT IN ( + 290, + 291, + 677, + 684, + 354, + 691, + 694, + 293, + 654, + 647, + 646 + ) +EXCEPT +SELECT + base AS address +FROM + {{ source( + 'osmosis_external', + 'asset_metadata_api' + ) }} diff --git a/models/silver/silver__asset_metadata.sql b/models/silver/silver__asset_metadata.sql index 71a33b0..d46c8f2 100644 --- a/models/silver/silver__asset_metadata.sql +++ b/models/silver/silver__asset_metadata.sql @@ -2,16 +2,32 @@ materialized = 'table' ) }} -SELECT - 'osmosis' AS blockchain, - base AS address, - 'flipside' AS creator, - 'token' AS label_type, - 'token_contract' AS label_subtype, - name AS label, - symbol AS project_name, - denom_units AS value -FROM {{ source( - 'osmosis_external', - 'asset_metadata_api' - ) }} \ No newline at end of file +WITH base AS ( + + SELECT + base AS address, + NAME AS label, + symbol AS project_name, + denom_units AS VALUE + FROM + {{ source( + 'osmosis_external', + 'asset_metadata_api' + ) }} + GROUP BY + 1, + 2, + 3, + 4 +) +SELECT + 'osmosis' AS blockchain, + address, + 'flipside' AS creator, + 'token' AS label_type, + 'token_contract' AS label_subtype, + label, + project_name, + VALUE +FROM + base diff --git a/models/silver/silver__pool_metadata.sql b/models/silver/silver__pool_metadata.sql new file mode 100644 index 0000000..1143b74 --- /dev/null +++ b/models/silver/silver__pool_metadata.sql @@ -0,0 +1,104 @@ +{{ config( + materialized = 'incremental', + unique_key = "CONCAT_WS('-', module, pool_id)", + incremental_strategy = 'delete+insert', +) }} + +WITH pool_creation_txs AS ( + + SELECT + DISTINCT tx_id + FROM + {{ ref('silver__msgs') }} + WHERE + msg_type = 'pool_created' + +{% if is_incremental() %} +AND ingested_at :: DATE <= CURRENT_DATE - 2 +{% endif %} +), +b AS ( + SELECT + ma.tx_id, + msg_type, + msg_index, + attribute_index, + attribute_key, + attribute_value + FROM + {{ ref('silver__msg_attributes') }} + ma + INNER JOIN pool_creation_txs t + ON t.tx_id = ma.tx_id + WHERE + ( + attribute_key IN ( + 'module', + 'pool_id' + ) + OR ( + msg_type = 'transfer' + AND attribute_key = 'amount' + AND attribute_value IS NOT NULL + AND attribute_value NOT LIKE '%/pool/%' + AND ARRAY_SIZE(SPLIT(attribute_value, ',')) :: NUMBER > 1 + ) + ) + +{% if is_incremental() %} +AND ingested_at :: DATE <= CURRENT_DATE - 2 +{% endif %} +), +C AS ( + SELECT + tx_id, + OBJECT_AGG( + attribute_key, + attribute_value :: variant + ) AS obj + FROM + b + GROUP BY + 1 +), +d AS ( + SELECT + tx_id, + obj :module :: STRING AS module, + obj :pool_id :: NUMBER AS pool_id, + 'asset_address' AS object_key, + LTRIM( + A.value, + '0123456789' + ) AS asset_address + FROM + C, + TABLE(FLATTEN(SPLIT(obj :amount, ','))) A +), +e AS ( + SELECT + tx_id, + module, + pool_id, + asset_address, + OBJECT_AGG( + object_key, + asset_address :: variant + ) AS asset_obj + FROM + d + GROUP BY + 1, + 2, + 3, + 4 +) +SELECT + module, + pool_id, + ARRAY_AGG(asset_obj) AS assets +FROM + e +GROUP BY + 1, + 2 diff --git a/models/silver/silver__pool_metadata.yml b/models/silver/silver__pool_metadata.yml new file mode 100644 index 0000000..d636f02 --- /dev/null +++ b/models/silver/silver__pool_metadata.yml @@ -0,0 +1,21 @@ +version: 2 +models: + - name: silver__pool_metadata + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - MODULE + - POOL_ID + columns: + - name: MODULE + description: "{{ doc('pool_module') }}" + tests: + - not_null + - name: POOL_ID + description: "{{ doc('pool_id') }}" + tests: + - not_null + - name: ASSETS + description: "{{ doc('pool_assets') }}" + tests: + - not_null \ No newline at end of file