prod config - complete model, active users silver model

This commit is contained in:
Eric Laurello 2025-03-05 17:01:21 -05:00
parent c7ff545927
commit 7904bb6c31
8 changed files with 165 additions and 30 deletions

View File

@ -52,7 +52,7 @@ vars:
"dbt_date:time_zone": GMT
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
STREAMLINE_INVOKE_STREAMS: True
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RETRY_UNKNOWN: False
@ -75,7 +75,7 @@ vars:
prod:
API_INTEGRATION: AWS_EXTERNAL_API_PRO_V2
EXTERNAL_FUNCTION_URI:
EXTERNAL_FUNCTION_URI: zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_EXTERNAL_API
- INTERNAL_DEV

View File

@ -3,7 +3,7 @@
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY {% if target.database == 'EXTERNAL:' -%}
api_integration = aws_external_api_prod AS ''
api_integration = aws_external_api_prod AS 'https://zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
api_integration = aws_external_api_stg_v2 AS 'https://qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}

View File

@ -0,0 +1,105 @@
-- depends_on: {{ ref('bronze__bitquery') }}
{{ config(
materialized = 'incremental',
unique_key = ['blockchain', 'metric', 'as_of_date'],
tags = ['bitquery']
) }}
WITH ripple AS (
SELECT
A.blockchain,
A.metric,
A.date_Day AS as_of_date,
b.value :countBigInt AS active_users,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A,
LATERAL FLATTEN(
A.data :data :ripple :transactions
) b
WHERE
A.data :errors IS NULL
AND A.metric = 'active_users'
AND A.blockchain = 'ripple'
{% if is_incremental() %}
AND _inserted_timestamp :: DATE > (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
hedera AS (
SELECT
A.blockchain,
A.metric,
A.date_Day AS as_of_date,
b.value :countBigInt AS active_users,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery_FR') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A,
LATERAL FLATTEN(
A.data :data :hedera :transactions
) b
WHERE
A.data :errors IS NULL
AND A.metric = 'active_users'
AND A.blockchain = 'hedera'
{% if is_incremental() %}
AND _inserted_timestamp :: DATE > (
SELECT
MAX(_inserted_timestamp) :: DATE
FROM
{{ this }}
)
{% endif %}
),
ua AS (
SELECT
*
FROM
ripple
UNION ALL
SELECT
*
FROM
hedera
)
SELECT
blockchain,
metric,
as_of_date,
active_users,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['blockchain','metric','as_of_date']
) }} AS accounts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
ua qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric,
as_of_date
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,33 @@
version: 2
models:
- name: silver__bitquery_active_users
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCKCHAIN
- METRIC
- AS_OF_DATE
columns:
- name: BLOCKCHAIN
tests:
- not_null
- name: METRIC
tests:
- not_null
- name: AS_OF_DATE
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- name: ACTIVE_USERS
tests:
- not_null
- name: _INSERTED_TIMESTAMP
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -2,7 +2,6 @@
{{ config(
materialized = 'incremental',
unique_key = ['blockchain', 'metric', 'block_date'],
full_refresh = false,
tags = ['bitquery']
) }}
@ -27,7 +26,8 @@ LATERAL FLATTEN(
A.data :data :ripple :transactions
) b
WHERE
A.metric = 'tx_count'
A.data :errors IS NULL
AND A.metric = 'tx_count'
AND A.blockchain = 'ripple'
{% if is_incremental() %}
@ -59,7 +59,8 @@ LATERAL FLATTEN(
A.data :data :hedera :transactions
) b
WHERE
A.metric = 'tx_count'
A.data :errors IS NULL
AND A.metric = 'tx_count'
AND A.blockchain = 'hedera'
{% if is_incremental() %}

View File

@ -4,8 +4,7 @@
materialized = "incremental",
unique_key = [' date_day','blockchain','metric'],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['streamline_realtime'],
enabled = false
tags = ['streamline_realtime']
) }}
SELECT
@ -26,7 +25,7 @@ FROM
{{ ref('bronze__bitquery_FR') }}
{% endif %}
WHERE
len(DATA :data) > 10
DATA :errors IS NULL
{% if is_incremental() %}
AND _inserted_timestamp >= (
@ -34,13 +33,12 @@ AND _inserted_timestamp >= (
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
AND DATA IS NOT NULL
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY date_day,
blockchain,
metric,
metric
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -8,23 +8,27 @@ WITH metrics AS (
SELECT
'hedera' AS blockchain,
'tx_count' AS metric,
'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) {date {date(format: $dateFormat)} count: countBigInt}}}' AS query_text
'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) {date {date(format: $dateFormat)} count: countBigInt}}}' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
SELECT
'ripple' AS blockchain,
'tx_count' AS metric,
'' AS query_text
'' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
--idk how we're defining active users - thinking it will just be the unique count over the last 30 days
SELECT
'hedera' AS blockchain,
'active_users' AS metric,
'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account)}}}' AS query_text
'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account)}}}' AS query_text,
'distinct counts of payer accounts over the last 30 days' AS description
UNION ALL
SELECT
'ripple' AS blockchain,
'active_users' AS metric,
'' AS query_text
'' AS query_text,
'distinct counts of senders over the last 30 days' AS description
)
SELECT
date_day,
@ -60,7 +64,8 @@ SELECT
'dateFormat',
'%Y-%m-%d'
)
END AS variables
END AS variables,
description
FROM
{{ source(
'crosschain_core',

View File

@ -23,20 +23,13 @@ WITH metrics AS (
query_text,
variables
FROM
{{ ref("streamline__bitquery_metrics") }}
qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric
ORDER BY
date_day DESC
) = 3 {# EXCEPT
SELECT
date_day,
blockchain,
metric
FROM
{{ ref("streamline__blocks_complete") }}
#}
{{ ref("streamline__bitquery_metrics") }} A
LEFT JOIN {{ ref("streamline__bitquery_metrics") }}
b USING (
blockchain,
metric,
date_day
)
)
SELECT
TO_NUMBER(to_char(date_day, 'YYYYMMDD')) AS date_day,