From c79ec93d98dfeed45307f668a3ddeeb6b2d0fea3 Mon Sep 17 00:00:00 2001 From: desmond-hui <97470747+desmond-hui@users.noreply.github.com> Date: Fri, 22 Jul 2022 08:58:02 -0700 Subject: [PATCH] An 1740/new crosschain prices (#4) * integrate coin market cap asset ohlc api * revert, accidentally moved this file * add sample profile --- README.md | 31 ++++++- dbt_project.yml | 1 + macros/create_udfs.sql | 2 + ...te_bulk_fill_cmc_historical_price_gaps.sql | 2 +- ...p_bulk_get_coin_market_cap_hourly_ohlc.sql | 7 ++ ...create_get_coin_market_cap_hourly_ohlc.sql | 30 +++++++ .../udf_get_coin_market_cap_hourly_ohlc.sql | 8 ++ models/sources.yml | 50 ++++++++++- ...nown_coin_market_cap_asset_ohlc_hourly.sql | 90 +++++++++++++++++++ .../streamline__legacy_prices_gaps.sql} | 1 - 10 files changed, 217 insertions(+), 5 deletions(-) create mode 100644 macros/streamline/get_coin_market_cap_hourly_ohlc/run_sp_bulk_get_coin_market_cap_hourly_ohlc.sql create mode 100644 macros/streamline/get_coin_market_cap_hourly_ohlc/sp_create_get_coin_market_cap_hourly_ohlc.sql create mode 100644 macros/streamline/get_coin_market_cap_hourly_ohlc/udf_get_coin_market_cap_hourly_ohlc.sql create mode 100644 models/streamline/streamline__all_unknown_coin_market_cap_asset_ohlc_hourly.sql rename models/{silver/silver__legacy_prices_gaps.sql => streamline/streamline__legacy_prices_gaps.sql} (97%) diff --git a/README.md b/README.md index 18ac77a..dee8af9 100644 --- a/README.md +++ b/README.md @@ -1 +1,30 @@ -# crosschain-models \ No newline at end of file +## Profile Set Up + +#### Use the following within profiles.yml +---- + +```yml +crosschain: + target: dev + outputs: + dev: + type: snowflake + account: + role: + user: + password: + region: + database: CROSSCHAIN_DEV + warehouse: + schema: silver + threads: 4 + client_session_keep_alive: False + query_tag: +``` + +### Resources: +- Learn more about dbt [in the docs](https://docs.getdbt.com/docs/introduction) +- Check out 
[Discourse](https://discourse.getdbt.com/) for commonly asked questions and answers +- Join the [chat](https://community.getdbt.com/) on Slack for live discussions and support +- Find [dbt events](https://events.getdbt.com) near you +- Check out [the blog](https://blog.getdbt.com/) for the latest news on dbt's development and best practices \ No newline at end of file diff --git a/dbt_project.yml b/dbt_project.yml index 72ef7cd..12f8931 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -32,6 +32,7 @@ on-run-start: - '{{ sp_create_bulk_get_coin_market_cap_asset_metadata() }}' - '{{ sp_create_bulk_get_coin_market_cap_prices() }}' - '{{ sp_create_bulk_get_coin_gecko_prices() }}' + - '{{ sp_create_bulk_get_coin_market_cap_hourly_ohlc() }}' - '{{create_tasks()}}' diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql index 429a4eb..c310969 100644 --- a/macros/create_udfs.sql +++ b/macros/create_udfs.sql @@ -1,10 +1,12 @@ {% macro create_udfs() %} {% set sql %} + CREATE schema if NOT EXISTS streamline; {{ udf_bulk_fill_cmc_historical_price_gaps() }}; {{ udf_bulk_get_coin_gecko_asset_metadata() }}; {{ udf_bulk_get_coin_market_cap_asset_metadata() }}; {{ udf_bulk_get_coin_market_cap_prices() }}; {{ udf_bulk_get_coin_gecko_prices() }}; + {{ udf_bulk_get_coin_market_cap_hourly_ohlc() }}; {% endset %} {% do run_query(sql) %} {% endmacro %} diff --git a/macros/streamline/fill_cmc_historical_price_gaps/sp_create_bulk_fill_cmc_historical_price_gaps.sql b/macros/streamline/fill_cmc_historical_price_gaps/sp_create_bulk_fill_cmc_historical_price_gaps.sql index 4f0f3ad..4b5a2ec 100644 --- a/macros/streamline/fill_cmc_historical_price_gaps/sp_create_bulk_fill_cmc_historical_price_gaps.sql +++ b/macros/streamline/fill_cmc_historical_price_gaps/sp_create_bulk_fill_cmc_historical_price_gaps.sql @@ -13,7 +13,7 @@ $$ SELECT COUNT(1) FROM - silver.legacy_prices_gaps + streamline.legacy_prices_gaps ); if ( row_cnt > 0 diff --git 
a/macros/streamline/get_coin_market_cap_hourly_ohlc/run_sp_bulk_get_coin_market_cap_hourly_ohlc.sql b/macros/streamline/get_coin_market_cap_hourly_ohlc/run_sp_bulk_get_coin_market_cap_hourly_ohlc.sql
new file mode 100644
index 0000000..827cfd6
--- /dev/null
+++ b/macros/streamline/get_coin_market_cap_hourly_ohlc/run_sp_bulk_get_coin_market_cap_hourly_ohlc.sql
@@ -0,0 +1,7 @@
+{% macro run_sp_bulk_get_coin_market_cap_hourly_ohlc() %}
+  {# Runs streamline.sp_bulk_get_coin_market_cap_hourly_ohlc() through run_query. #}
+  {% set sql %}
+    call streamline.sp_bulk_get_coin_market_cap_hourly_ohlc();
+  {% endset %}
+  {% do run_query(sql) %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/get_coin_market_cap_hourly_ohlc/sp_create_get_coin_market_cap_hourly_ohlc.sql b/macros/streamline/get_coin_market_cap_hourly_ohlc/sp_create_get_coin_market_cap_hourly_ohlc.sql
new file mode 100644
index 0000000..c8cae08
--- /dev/null
+++ b/macros/streamline/get_coin_market_cap_hourly_ohlc/sp_create_get_coin_market_cap_hourly_ohlc.sql
@@ -0,0 +1,30 @@
+{% macro sp_create_bulk_get_coin_market_cap_hourly_ohlc() %}
+{# Creates the procedure that calls the bulk hourly OHLC external function only when the pending-work view has rows. #}
+{% set sql %}
+CREATE OR REPLACE PROCEDURE streamline.sp_bulk_get_coin_market_cap_hourly_ohlc()
+RETURNS variant
+LANGUAGE SQL
+AS
+$$
+  DECLARE
+    RESULT VARCHAR;
+    row_cnt INTEGER;
+  BEGIN
+    -- pending rows come from the streamline__all_unknown_coin_market_cap_asset_ohlc_hourly model
+    row_cnt := (
+      SELECT
+        COUNT(1)
+      FROM
+        streamline.all_unknown_coin_market_cap_asset_ohlc_hourly
+    );
+    -- skip the external function call when nothing is pending
+    IF (row_cnt > 0) THEN
+      RESULT := (SELECT streamline.udf_bulk_get_coin_market_cap_hourly_ohlc());
+    ELSE
+      RESULT := NULL;
+    END IF;
+    RETURN RESULT;
+  END;
+$${% endset %}
+{% do run_query(sql) %}
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/get_coin_market_cap_hourly_ohlc/udf_get_coin_market_cap_hourly_ohlc.sql b/macros/streamline/get_coin_market_cap_hourly_ohlc/udf_get_coin_market_cap_hourly_ohlc.sql
new file mode 100644
index 0000000..dadc1d3
--- /dev/null
+++ b/macros/streamline/get_coin_market_cap_hourly_ohlc/udf_get_coin_market_cap_hourly_ohlc.sql
@@ -0,0 +1,8 @@
+{% macro udf_bulk_get_coin_market_cap_hourly_ohlc() %}
+    {# NOTE(review): api_integration is aws_crosschain_api_dev for every target, including prod - confirm this is intended. #}
+    CREATE OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_get_coin_market_cap_hourly_ohlc() returns text api_integration = aws_crosschain_api_dev AS {% if target.name == "prod" -%}
+        'https://q2il6n5mmg.execute-api.us-east-1.amazonaws.com/prod/bulk_get_coin_market_cap_hourly_ohlc'
+    {% else %}
+        'https://ubuxgfotp2.execute-api.us-east-1.amazonaws.com/dev/bulk_get_coin_market_cap_hourly_ohlc'
+    {%- endif %}
+{% endmacro %}
\ No newline at end of file
diff --git a/models/sources.yml b/models/sources.yml
index d62247b..495497e 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -20,6 +20,52 @@ sources: schema: core tables: - name: fact_transactions - - + - name: crosschain_external + schema: bronze + loader: S3 + tables: + - name: asset_metadata_api + description: "all coins supported by provider" + external: + location: "@crosschain.bronze.analytics_external_tables/{{target.database}}/ASSET_METADATA_API" + file_format: "( type = json, strip_outer_array = TRUE )" + auto_refresh: true + partitions: + - name: provider + data_type: string + expression: split_part(METADATA$FILENAME,'/',3) + columns: + - name: id + data_type: string + description: "" + - name: symbol + data_type: string + description: "" + - name: name + data_type: string + description: "" + - name: asset_ohlc_coin_market_cap_api + description: "all coins supported by provider" + external: + location: "@crosschain.bronze.analytics_external_tables/{{target.database}}/ASSET_OHLC_API/coinmarketcap" + file_format: "( type = json, strip_outer_array = TRUE )" + auto_refresh: true + partitions: + - name: _inserted_date + data_type: string + expression: substr((split_part(METADATA$FILENAME,'/',4)),16,10) + columns: + - name: id + data_type: number + description: "" + - name: api_start_time + data_type: number + - name: api_end_time + data_type: number + - name: metadata + data_type: string + description: "" + - name: data + data_type: variant + description: ""
\ No newline at end of file
diff --git a/models/streamline/streamline__all_unknown_coin_market_cap_asset_ohlc_hourly.sql b/models/streamline/streamline__all_unknown_coin_market_cap_asset_ohlc_hourly.sql
new file mode 100644
index 0000000..b3bb7ab
--- /dev/null
+++ b/models/streamline/streamline__all_unknown_coin_market_cap_asset_ohlc_hourly.sql
@@ -0,0 +1,90 @@
+{{ config(
+    materialized = 'view'
+) }}
+
+-- Lists the (start_time, end_time, id) request windows of active CoinMarketCap assets that have no
+-- matching row yet in asset_ohlc_coin_market_cap_api, packed into comma-separated batches of up to 1000 ids.
+WITH hours AS (
+    -- Completed hours since 2022-07-20, read from the legacy hours table of the current target.
+    SELECT
+        HOUR
+    FROM
+        {% if target.name == 'prod' %}
+            {{ source(
+                'legacy_db',
+                'hours'
+            ) }}
+        {% else %}
+            {{ source(
+                'legacy_dev_db',
+                'hours'
+            ) }}
+        {% endif %}
+    WHERE
+        HOUR >= '2022-07-20'
+        -- the hour should always be less than current time because it must be "completed" before ohlcv is available
+        AND HOUR < DATE_TRUNC('hour', CURRENT_TIMESTAMP)
+),
+cmc_active_assets AS (
+    -- CoinMarketCap assets whose metadata status is 'active'.
+    SELECT
+        id :: NUMBER AS id
+    FROM
+        {{ source(
+            'crosschain_external',
+            'asset_metadata_api'
+        ) }}
+    WHERE
+        provider = 'coinmarketcap'
+        AND VALUE :status :: STRING = 'active'
+),
+base AS (
+    -- Every asset/hour window, minus the windows already present in the OHLC source.
+    -- EXCEPT returns distinct rows, so each (start_time, end_time, id) appears at most once.
+    SELECT
+        -- window runs from one minute before the hour to the start of the next hour, in epoch seconds
+        DATE_PART('epoch', DATEADD('minute', -1, HOUR)) AS start_time,
+        DATE_PART('epoch', DATEADD('hour', 1, HOUR)) AS end_time,
+        id
+    FROM
+        cmc_active_assets
+        CROSS JOIN hours
+    EXCEPT
+    SELECT
+        api_start_time,
+        api_end_time,
+        id
+    FROM
+        {{ source(
+            'crosschain_external',
+            'asset_ohlc_coin_market_cap_api'
+        ) }}
+),
+base_params AS (
+    -- Number the ids inside each window and cut them into batches of 1000.
+    -- ROW_NUMBER gives a 1-based position with no ties because ids are unique per window (see base).
+    SELECT
+        start_time,
+        end_time,
+        id,
+        CEIL(
+            ROW_NUMBER() over (
+                PARTITION BY start_time, end_time
+                ORDER BY id
+            ) / 1000
+        ) AS group_cnt
+    FROM
+        base
+)
+SELECT
+    start_time,
+    end_time,
+    group_cnt,
+    -- ordered so the id list is the same on every run
+    LISTAGG(id, ',') within GROUP (ORDER BY id) AS asset_ids
+FROM
+    base_params
+GROUP BY
+    start_time,
+    end_time,
+    group_cnt
diff --git a/models/silver/silver__legacy_prices_gaps.sql b/models/streamline/streamline__legacy_prices_gaps.sql
similarity index 97%
rename from models/silver/silver__legacy_prices_gaps.sql
rename to models/streamline/streamline__legacy_prices_gaps.sql
index c21b10e..97e4ae5 100644
---
a/models/silver/silver__legacy_prices_gaps.sql +++ b/models/streamline/streamline__legacy_prices_gaps.sql @@ -1,6 +1,5 @@ {{ config( materialized = 'view', - post_hook = 'call silver.sp_bulk_fill_cmc_historical_price_gaps()' ) }} WITH cte_date (date_rec) AS (