An 1325/integrate balances lambda (#27)

* integrate balances api, add raw balances external table

* remove silver balances placeholder

* fix hardcoded table names

* update snapshot date
This commit is contained in:
desmond-hui 2022-06-01 12:52:25 -07:00 committed by GitHub
parent 15832efa73
commit 5cf0ef5eba
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 124 additions and 4 deletions

View File

@ -37,9 +37,10 @@ models:
columns: true
vars:
"dbt_date:time_zone": America/Los_Angeles
"dbt_date:time_zone": GMT
on-run-start:
- '{{create_sps()}}'
- '{{create_udfs()}}'
- '{{sp_bulk_get_asset_metadata()}}'
- '{{sp_bulk_get_asset_metadata()}}'
- '{{sp_create_bulk_get_balances()}}'

View File

@ -1,7 +1,10 @@
{% macro create_udfs() %}
{% set sql %}
{{ udf_bulk_get_asset_metadata() }};
{# Add crate udf macros here #}
{% endset %}
{% do run_query(sql) %}
{% set sql %}
{{ udf_bulk_get_balances() }};
{% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,30 @@
{% macro sp_create_bulk_get_balances() %}
{% set sql %}
CREATE OR REPLACE PROCEDURE silver.sp_bulk_get_balances()
RETURNS variant
LANGUAGE SQL
AS
$$
DECLARE
RESULT VARCHAR;
row_cnt INTEGER;
BEGIN
row_cnt:= (
SELECT
COUNT(1)
FROM
silver.all_unknown_balances
);
if (
row_cnt > 0
) THEN RESULT:= (
SELECT
silver.udf_bulk_get_balances()
);
ELSE RESULT:= NULL;
END if;
RETURN RESULT;
END;
$${% endset %}
{% do run_query(sql) %}
{% endmacro %}

View File

@ -0,0 +1,8 @@
{% macro udf_bulk_get_balances() %}
CREATE
OR REPLACE EXTERNAL FUNCTION silver.udf_bulk_get_balances() returns text api_integration = aws_osmosis_api_dev AS {% if target.name == "prod" -%}
'https://k7jc1bnb8i.execute-api.us-east-1.amazonaws.com/prod/bulk_get_balances'
{% else %}
'https://auacbjh2tj.execute-api.us-east-1.amazonaws.com/dev/bulk_get_balances'
{%- endif %}
{% endmacro %}

View File

@ -0,0 +1,58 @@
{{ config(
materialized = 'view',
post_hook = 'call silver.sp_bulk_get_balances()',
) }}
WITH all_wallets AS (
SELECT
DISTINCT attribute_value AS address
FROM
{{ ref('silver__msg_attributes') }}
WHERE
RLIKE(
attribute_value,
'osmo\\w{39}'
)
AND block_timestamp :: DATE <= '2022-05-31' -- some snapshot date
),
wallets_per_block AS (
SELECT
DISTINCT block_id,
attribute_value AS address
FROM
{{ ref('silver__msg_attributes') }}
WHERE
RLIKE(
attribute_value,
'osmo\\w{39}'
)
AND block_id > 2383300
),
possible_balances_needed AS (
SELECT
2383300 AS block_id,
address
FROM
all_wallets
UNION ALL
SELECT
*
FROM
wallets_per_block
)
SELECT
block_id,
address
FROM
possible_balances_needed
EXCEPT
SELECT
DISTINCT block_id,
address
FROM
{{ source('osmosis_external','balances_api') }}
ORDER BY
block_id
LIMIT
50000

View File

@ -129,4 +129,24 @@ sources:
- name: symbol
data_type: string
description: ""
expression: value:symbol::string
expression: value:symbol::string
- name: balances_api
description: "Wallet balances from osmosis api"
external:
location: "@osmosis.bronze.analytics_external_tables/{{target.database}}/BALANCES_API"
file_format: "( type = json, strip_outer_array = TRUE )"
auto_refresh: true
partitions:
- name: _inserted_date
data_type: string
expression: substr((split_part(METADATA$FILENAME,'/',3)),16,10)
columns:
- name: block_id
data_type: number
description: ""
- name: address
data_type: string
description: ""
- name: balances
data_type: array
description: ""