Bitquery: add silver__bitquery_active_users, skip error payloads, re-enable streamline__bitquery_complete

Review notes (free-form text ahead of the first "diff --git" header is skipped by git apply / patch):

  * Layout: this patch was recovered from a copy whose newlines had been collapsed.
    Line breaks, indentation and blank context lines were rebuilt so that every hunk
    agrees with its @@ line counts. Re-check context whitespace against the base
    files before applying with strict whitespace matching.

  * silver__bitquery_active_users.sql: the hedera CTE read bronze__bitquery_FR in both
    arms of its is_incremental() switch. The incremental arm now reads bronze__bitquery,
    matching the ripple CTE and the depends_on hint on the first line of the model.
    Three SQL comments were added, so the new-file hunk is 108 lines (was 105) and the
    blob id on its index line no longer matches the content.

  * NOTE(review): streamline__bitquery_realtime.sql now LEFT JOINs
    streamline__bitquery_metrics to itself and applies no filter afterwards. The
    unqualified query_text / variables columns in the select list look ambiguous across
    the two aliases, and the join cannot exclude any row. Presumably the right-hand side
    should be streamline__bitquery_complete together with an "unmatched rows only"
    predicate. Confirm with the author; left unchanged here.

  * NOTE(review): streamline_udfs.sql compares target.database to 'EXTERNAL:' (with a
    trailing colon). If that never matches the prod database name, the new prod URL is
    never used. Pre-existing context line; left unchanged here.

  * NOTE(review): streamline__bitquery_complete.sql has a leading space inside the
    unique_key entry ' date_day'. Pre-existing context line; left unchanged here.

diff --git a/dbt_project.yml b/dbt_project.yml
index dabbeca..e2ce619 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -52,7 +52,7 @@ vars:
   "dbt_date:time_zone": GMT
   OBSERV_FULL_TEST: False
   START_GHA_TASKS: False
-  STREAMLINE_INVOKE_STREAMS: True
+  STREAMLINE_INVOKE_STREAMS: False
   STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True
   STREAMLINE_RUN_HISTORY: False
   STREAMLINE_RETRY_UNKNOWN: False
@@ -75,7 +75,7 @@ vars:
 
     prod:
       API_INTEGRATION: AWS_EXTERNAL_API_PRO_V2
-      EXTERNAL_FUNCTION_URI:
+      EXTERNAL_FUNCTION_URI: zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/
       ROLES:
         - AWS_LAMBDA_EXTERNAL_API
         - INTERNAL_DEV
diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql
index e5ea4d2..bc16432 100644
--- a/macros/streamline/streamline_udfs.sql
+++ b/macros/streamline/streamline_udfs.sql
@@ -3,7 +3,7 @@
     OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
         json OBJECT
     ) returns ARRAY {% if target.database == 'EXTERNAL:' -%}
-        api_integration = aws_external_api_prod AS ''
+        api_integration = aws_external_api_prod AS 'https://zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
     {% else %}
         api_integration = aws_external_api_stg_v2 AS 'https://qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
     {%- endif %}
diff --git a/models/bitquery/silver/silver__bitquery_active_users.sql b/models/bitquery/silver/silver__bitquery_active_users.sql
new file mode 100644
index 0000000..49babc0
--- /dev/null
+++ b/models/bitquery/silver/silver__bitquery_active_users.sql
@@ -0,0 +1,108 @@
+-- depends_on: {{ ref('bronze__bitquery') }}
+{{ config(
+    materialized = 'incremental',
+    unique_key = ['blockchain', 'metric', 'as_of_date'],
+    tags = ['bitquery']
+) }}
+
+-- Active-user counts per blockchain and as_of_date, read from the bitquery bronze payloads.
+WITH ripple AS (
+
+    SELECT
+        A.blockchain,
+        A.metric,
+        A.date_Day AS as_of_date,
+        b.value :countBigInt AS active_users,
+        A._inserted_timestamp
+    FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__bitquery') }}
+{% else %}
+    {{ ref('bronze__bitquery_FR') }}
+{% endif %}
+
+A,
+LATERAL FLATTEN(
+    A.data :data :ripple :transactions
+) b
+WHERE
+    A.data :errors IS NULL
+    AND A.metric = 'active_users'
+    AND A.blockchain = 'ripple'
+
+{% if is_incremental() %}
+AND _inserted_timestamp :: DATE > (
+    SELECT
+        MAX(_inserted_timestamp) :: DATE
+    FROM
+        {{ this }}
+)
+{% endif %}
+),
+hedera AS (
+    SELECT
+        A.blockchain,
+        A.metric,
+        A.date_Day AS as_of_date,
+        b.value :countBigInt AS active_users,
+        A._inserted_timestamp
+    FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__bitquery') }}
+{% else %}
+    {{ ref('bronze__bitquery_FR') }}
+{% endif %}
+
+A,
+LATERAL FLATTEN(
+    A.data :data :hedera :transactions
+) b
+WHERE
+    A.data :errors IS NULL
+    AND A.metric = 'active_users'
+    AND A.blockchain = 'hedera'
+
+{% if is_incremental() %}
+AND _inserted_timestamp :: DATE > (
+    SELECT
+        MAX(_inserted_timestamp) :: DATE
+    FROM
+        {{ this }}
+)
+{% endif %}
+),
+-- Stack both chains; the two CTEs project the same five columns.
+ua AS (
+    SELECT
+        *
+    FROM
+        ripple
+    UNION ALL
+    SELECT
+        *
+    FROM
+        hedera
+)
+-- Keep only the latest-loaded row per (blockchain, metric, as_of_date).
+SELECT
+    blockchain,
+    metric,
+    as_of_date,
+    active_users,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(
+        ['blockchain','metric','as_of_date']
+    ) }} AS accounts_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    ua qualify ROW_NUMBER() over (
+        PARTITION BY blockchain,
+        metric,
+        as_of_date
+        ORDER BY
+            _inserted_timestamp DESC
+    ) = 1
diff --git a/models/bitquery/silver/silver__bitquery_active_users.yml b/models/bitquery/silver/silver__bitquery_active_users.yml
new file mode 100644
index 0000000..55466f0
--- /dev/null
+++ b/models/bitquery/silver/silver__bitquery_active_users.yml
@@ -0,0 +1,33 @@
+version: 2
+models:
+  - name: silver__bitquery_active_users
+    tests:
+      - dbt_utils.unique_combination_of_columns:
+          combination_of_columns:
+            - BLOCKCHAIN
+            - METRIC
+            - AS_OF_DATE
+
+    columns:
+      - name: BLOCKCHAIN
+        tests:
+          - not_null
+      - name: METRIC
+        tests:
+          - not_null
+      - name: AS_OF_DATE
+        tests:
+          - not_null
+          - dbt_expectations.expect_row_values_to_have_recent_data:
+              datepart: day
+              interval: 2
+      - name: ACTIVE_USERS
+        tests:
+          - not_null
+      - name: _INSERTED_TIMESTAMP
+        tests:
+          - not_null
+          - dbt_expectations.expect_column_values_to_be_in_type_list:
+              column_type_list:
+                - TIMESTAMP_NTZ
+
diff --git a/models/bitquery/silver/silver__bitquery_tx_count.sql b/models/bitquery/silver/silver__bitquery_tx_count.sql
index fad8ce1..3d6e664 100644
--- a/models/bitquery/silver/silver__bitquery_tx_count.sql
+++ b/models/bitquery/silver/silver__bitquery_tx_count.sql
@@ -2,7 +2,6 @@
 {{ config(
     materialized = 'incremental',
     unique_key = ['blockchain', 'metric', 'block_date'],
-    full_refresh = false,
     tags = ['bitquery']
 ) }}
 
@@ -27,7 +26,8 @@ LATERAL FLATTEN(
     A.data :data :ripple :transactions
 ) b
 WHERE
-    A.metric = 'tx_count'
+    A.data :errors IS NULL
+    AND A.metric = 'tx_count'
     AND A.blockchain = 'ripple'
 
 {% if is_incremental() %}
@@ -59,7 +59,8 @@ LATERAL FLATTEN(
     A.data :data :hedera :transactions
 ) b
 WHERE
-    A.metric = 'tx_count'
+    A.data :errors IS NULL
+    AND A.metric = 'tx_count'
     AND A.blockchain = 'hedera'
 
 {% if is_incremental() %}
diff --git a/models/bitquery/streamline/streamline__bitquery_complete.sql b/models/bitquery/streamline/streamline__bitquery_complete.sql
index db38448..d0d5610 100644
--- a/models/bitquery/streamline/streamline__bitquery_complete.sql
+++ b/models/bitquery/streamline/streamline__bitquery_complete.sql
@@ -4,8 +4,7 @@
     materialized = "incremental",
     unique_key = [' date_day','blockchain','metric'],
     merge_exclude_columns = ["inserted_timestamp"],
-    tags = ['streamline_realtime'],
-    enabled = false
+    tags = ['streamline_realtime']
 ) }}
 
 SELECT
@@ -26,7 +25,7 @@ FROM
     {{ ref('bronze__bitquery_FR') }}
 {% endif %}
 WHERE
-    len(DATA :data) > 10
+    DATA :errors IS NULL
 
 {% if is_incremental() %}
 AND _inserted_timestamp >= (
@@ -34,13 +33,12 @@ AND _inserted_timestamp >= (
         COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
     FROM
         {{ this }})
-    AND DATA IS NOT NULL
 {% endif %}
 
 qualify ROW_NUMBER() over (
     PARTITION BY date_day,
     blockchain,
-    metric,
+    metric
     ORDER BY
         _inserted_timestamp DESC
 ) = 1
diff --git a/models/bitquery/streamline/streamline__bitquery_metrics.sql b/models/bitquery/streamline/streamline__bitquery_metrics.sql
index fe78806..59e7107 100644
--- a/models/bitquery/streamline/streamline__bitquery_metrics.sql
+++ b/models/bitquery/streamline/streamline__bitquery_metrics.sql
@@ -8,23 +8,27 @@ WITH metrics AS (
     SELECT
         'hedera' AS blockchain,
         'tx_count' AS metric,
-        'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) {date {date(format: $dateFormat)} count: countBigInt}}}' AS query_text
+        'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) {date {date(format: $dateFormat)} count: countBigInt}}}' AS query_text,
+        'Count of tx hashes by day' AS description
     UNION ALL
     SELECT
         'ripple' AS blockchain,
         'tx_count' AS metric,
-        '' AS query_text
+        '' AS query_text,
+        'Count of tx hashes by day' AS description
     UNION ALL
     --idk how we're defining active users - thinking it will just be the unique count over the last 30 days
     SELECT
         'hedera' AS blockchain,
         'active_users' AS metric,
-        'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account)}}}' AS query_text
+        'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account)}}}' AS query_text,
+        'distinct counts of payer accounts over the last 30 days' AS description
     UNION ALL
     SELECT
         'ripple' AS blockchain,
         'active_users' AS metric,
-        '' AS query_text
+        '' AS query_text,
+        'distinct counts of senders over the last 30 days' AS description
 )
 SELECT
     date_day,
@@ -60,7 +64,8 @@ SELECT
             'dateFormat',
             '%Y-%m-%d'
         )
-    END AS variables
+    END AS variables,
+    description
 FROM
     {{ source(
         'crosschain_core',
diff --git a/models/bitquery/streamline/streamline__bitquery_realtime.sql b/models/bitquery/streamline/streamline__bitquery_realtime.sql
index afbb7e4..fcad59a 100644
--- a/models/bitquery/streamline/streamline__bitquery_realtime.sql
+++ b/models/bitquery/streamline/streamline__bitquery_realtime.sql
@@ -23,20 +23,13 @@ WITH metrics AS (
         query_text,
         variables
     FROM
-        {{ ref("streamline__bitquery_metrics") }}
-        qualify ROW_NUMBER() over (
-            PARTITION BY blockchain,
-            metric
-            ORDER BY
-                date_day DESC
-        ) = 3 {# EXCEPT
-    SELECT
-        date_day,
-        blockchain,
-        metric
-    FROM
-        {{ ref("streamline__blocks_complete") }}
-        #}
+        {{ ref("streamline__bitquery_metrics") }} A
+        LEFT JOIN {{ ref("streamline__bitquery_metrics") }}
+        b USING (
+            blockchain,
+            metric,
+            date_day
+        )
 )
 SELECT
     TO_NUMBER(to_char(date_day, 'YYYYMMDD')) AS date_day,