From dc442db717f91cb59d889bd2724e23253d88555d Mon Sep 17 00:00:00 2001 From: Stanley Date: Thu, 10 Apr 2025 23:11:12 +0700 Subject: [PATCH] AN-5829-Artemis (#98) Co-authored-by: Eric Laurello --- .github/workflows/dbt_run_daily.yml | 2 +- .../workflows/dbt_run_streamline_daily.yml | 2 +- models/artemis/bronze/bronze__artemis.sql | 9 ++ models/artemis/bronze/bronze__artemis_FR.sql | 9 ++ models/artemis/gold/artemis__active_users.sql | 34 ++++++ models/artemis/gold/artemis__active_users.yml | 30 +++++ models/artemis/gold/artemis__tx_count.sql | 35 ++++++ models/artemis/gold/artemis__tx_count.yml | 30 +++++ models/artemis/silver/silver__artemis.sql | 62 ++++++++++ models/artemis/silver/silver__artemis.yml | 29 +++++ .../streamline__artemis_complete.sql | 67 +++++++++++ .../streamline__artemis_metrics.sql | 112 ++++++++++++++++++ .../streamline__artemis_realtime.sql | 83 +++++++++++++ models/sources.yml | 1 + 14 files changed, 503 insertions(+), 2 deletions(-) create mode 100644 models/artemis/bronze/bronze__artemis.sql create mode 100644 models/artemis/bronze/bronze__artemis_FR.sql create mode 100644 models/artemis/gold/artemis__active_users.sql create mode 100644 models/artemis/gold/artemis__active_users.yml create mode 100644 models/artemis/gold/artemis__tx_count.sql create mode 100644 models/artemis/gold/artemis__tx_count.yml create mode 100644 models/artemis/silver/silver__artemis.sql create mode 100644 models/artemis/silver/silver__artemis.yml create mode 100644 models/artemis/streamline/streamline__artemis_complete.sql create mode 100644 models/artemis/streamline/streamline__artemis_metrics.sql create mode 100644 models/artemis/streamline/streamline__artemis_realtime.sql diff --git a/.github/workflows/dbt_run_daily.yml b/.github/workflows/dbt_run_daily.yml index 97bd72b..4a562ff 100644 --- a/.github/workflows/dbt_run_daily.yml +++ b/.github/workflows/dbt_run_daily.yml @@ -44,4 +44,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run -m external_models,tag:defillama external_models,tag:deepnftvalue external_models,tag:core external_models,tag:blast external_models,tag:polymarket external_models,tag:bitquery external_models,tag:oklink --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+ \ No newline at end of file + dbt run -m external_models,tag:defillama external_models,tag:deepnftvalue external_models,tag:core external_models,tag:blast external_models,tag:polymarket external_models,tag:bitquery external_models,tag:oklink external_models,tag:artemis --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+ \ No newline at end of file diff --git a/.github/workflows/dbt_run_streamline_daily.yml b/.github/workflows/dbt_run_streamline_daily.yml index 38a1586..4f9b487 100644 --- a/.github/workflows/dbt_run_streamline_daily.yml +++ b/.github/workflows/dbt_run_streamline_daily.yml @@ -44,4 +44,4 @@ jobs: dbt deps - name: Run DBT Jobs run: | - dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/bitquery/streamline/streamline__bitquery_realtime.sql 1+models/oklink/streamline/streamline__oklink_realtime.sql \ No newline at end of file + dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/bitquery/streamline/streamline__bitquery_realtime.sql 1+models/oklink/streamline/streamline__oklink_realtime.sql 1+models/artemis/streamline/streamline__artemis_realtime.sql \ No newline at end of file diff --git a/models/artemis/bronze/bronze__artemis.sql b/models/artemis/bronze/bronze__artemis.sql new file mode 100644 index 0000000..b2b69be --- /dev/null +++ b/models/artemis/bronze/bronze__artemis.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_v2( + model = 'artemis', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "partition_key", + other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:data AS raw_data" +) }} \ No newline at end of file diff --git a/models/artemis/bronze/bronze__artemis_FR.sql b/models/artemis/bronze/bronze__artemis_FR.sql new file mode 100644 index 0000000..bc00585 --- /dev/null +++ b/models/artemis/bronze/bronze__artemis_FR.sql @@ -0,0 +1,9 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + model = 'artemis', + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)", + partition_name = "partition_key", + other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:data AS raw_data" +) }} \ No newline at end of file diff --git a/models/artemis/gold/artemis__active_users.sql b/models/artemis/gold/artemis__active_users.sql new file mode 100644 index 0000000..e58938f --- /dev/null +++ b/models/artemis/gold/artemis__active_users.sql @@ -0,0 +1,34 @@ +{{ config( + materialized = 'incremental', + unique_key = ['block_date', 'blockchain', 'metric'], + tags = ['artemis'] +) }} + +SELECT + blockchain, + 'active_users' AS metric, + 'The reported number of active users as of the block_date' AS description, + metric_date AS block_date, + metric_value AS active_users, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['blockchain', 'metric', 'block_date']) }} AS active_users_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {{ ref('silver__artemis') }} +WHERE + metric = 'dau' + AND metric_value IS NOT NULL + + {% if is_incremental() %} + AND _inserted_timestamp >= ( + SELECT COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP_NTZ) + FROM {{ this }} + ) + {% endif %} +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY active_users_id + ORDER BY + _inserted_timestamp DESC + ) = 1 \ No newline at end of file diff --git a/models/artemis/gold/artemis__active_users.yml b/models/artemis/gold/artemis__active_users.yml new file mode 100644 index 0000000..9123f70 --- /dev/null +++ b/models/artemis/gold/artemis__active_users.yml @@ -0,0 +1,30 @@ +version: 2 + +models: + - name: artemis__active_users + description: "This model returns the number of active users for each blockchain and block date" + + columns: + - name: BLOCKCHAIN + description: "The blockchain where active users are counted from." + tests: + - not_null + - name: METRIC + description: "The metric name is always set as 'active_users'" + - name: DESCRIPTION + description: "The metric description" + - name: BLOCK_DATE + description: "The date as of which the active users are counted." + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - name: ACTIVE_USERS + description: "The number of active users for the given blockchain and as of date." + tests: + - not_null + - name: ACTIVE_USERS_ID + description: "The surrogate key for the tx count." + tests: + - unique \ No newline at end of file diff --git a/models/artemis/gold/artemis__tx_count.sql b/models/artemis/gold/artemis__tx_count.sql new file mode 100644 index 0000000..d67b724 --- /dev/null +++ b/models/artemis/gold/artemis__tx_count.sql @@ -0,0 +1,35 @@ +{{ config( + materialized = 'incremental', + unique_key = ['block_date', 'blockchain', 'metric'], + tags = ['artemis'] +) }} + +SELECT + blockchain, + 'tx_count' AS metric, + 'The reported number of transactions as of the block_date' AS description, + metric_date AS block_date, + metric_value AS tx_count, + _inserted_timestamp, + {{ dbt_utils.generate_surrogate_key(['blockchain', 'metric', 'block_date']) }} AS tx_count_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + {{ ref('silver__artemis') }} +WHERE + metric = 'daily_txns' + AND metric_value IS NOT NULL + + {% if is_incremental() %} + AND _inserted_timestamp >= ( + SELECT COALESCE(MAX(_inserted_timestamp), '1970-01-01'::TIMESTAMP_NTZ) + FROM {{ this }} + ) + {% endif %} + +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY tx_count_id + ORDER BY + _inserted_timestamp DESC + ) = 1 \ No newline at end of file diff --git a/models/artemis/gold/artemis__tx_count.yml b/models/artemis/gold/artemis__tx_count.yml new file mode 100644 index 0000000..4e91fd4 --- /dev/null +++ b/models/artemis/gold/artemis__tx_count.yml @@ -0,0 +1,30 @@ +version: 2 + +models: + - name: artemis__tx_count + description: "This model returns the number of transactions for each blockchain and block date" + + columns: + - name: BLOCKCHAIN + description: "The blockchain where tx count is counted from." + tests: + - not_null + - name: METRIC + description: "The metric name is always set as 'tx_count'" + - name: DESCRIPTION + description: "The metric description" + - name: BLOCK_DATE + description: "The date as of which the tx count is counted." + tests: + - not_null + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 2 + - name: TX_COUNT + description: "The number of tx count for the given blockchain and as of date." + tests: + - not_null + - name: TX_COUNT_ID + description: "The surrogate key for the tx count." + tests: + - unique \ No newline at end of file diff --git a/models/artemis/silver/silver__artemis.sql b/models/artemis/silver/silver__artemis.sql new file mode 100644 index 0000000..07b905a --- /dev/null +++ b/models/artemis/silver/silver__artemis.sql @@ -0,0 +1,62 @@ +-- depends_on: {{ ref("bronze__artemis")}} +-- depends_on: {{ ref("bronze__artemis_FR")}} +{{ config( + materialized = "incremental", + unique_key = ['metric_date', 'blockchain', 'metric'], + tags = ['silver', 'artemis'] +)}} + +WITH source_data AS ( + SELECT + raw_data, + partition_key, + _inserted_timestamp, + file_name + FROM + +{% if is_incremental() %} + {{ ref('bronze__artemis') }} +{% else %} + {{ ref('bronze__artemis_FR') }} +{% endif %} + +{% if is_incremental() %} + +WHERE _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM {{ this }} +) +{% endif %} +), +parsed_data AS ( + SELECT + s._inserted_timestamp, + s.partition_key, + s.file_name, + blockchain_flat.KEY AS blockchain, + metric_flat.KEY AS metric, + metrics.value:date::STRING AS metric_date, + metrics.value:val AS metric_value + FROM + source_data s, + LATERAL FLATTEN(INPUT => raw_data:data:artemis_ids) AS blockchain_flat, + LATERAL FLATTEN(INPUT => blockchain_flat.value) AS metric_flat, + LATERAL FLATTEN(INPUT => metric_flat.value) AS metrics + WHERE + raw_data:data:artemis_ids IS NOT NULL +) +SELECT + TO_DATE(metric_date) AS metric_date, + blockchain, + metric, + metric_value, + partition_key, + _inserted_timestamp, + file_name, + {{ dbt_utils.generate_surrogate_key(['blockchain', 'metric', 'metric_date']) }} AS artemis_id, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + parsed_data \ No newline at end of file diff --git a/models/artemis/silver/silver__artemis.yml b/models/artemis/silver/silver__artemis.yml new file mode 100644 index 0000000..3c1d8b8 --- /dev/null +++ b/models/artemis/silver/silver__artemis.yml @@ -0,0 +1,29 @@ +version: 2 + +models: + - name: silver__artemis + description: "Parsed Artemis blockchain metrics data" + columns: + - name: metric_date + description: "The date of the metric measurement" + tests: + - not_null + + - name: blockchain + description: "Blockchain name" + tests: + - not_null + + - name: metric_value + description: "Reported metric value" + + - name: artemis_id + description: "Unique identifier for each metric data point" + tests: + - unique + - not_null + + - name: _inserted_timestamp + description: "Timestamp when record was inserted into the bronze layer" + tests: + - not_null \ No newline at end of file diff --git a/models/artemis/streamline/streamline__artemis_complete.sql b/models/artemis/streamline/streamline__artemis_complete.sql new file mode 100644 index 0000000..01aacd7 --- /dev/null +++ b/models/artemis/streamline/streamline__artemis_complete.sql @@ -0,0 +1,67 @@ +-- depends_on: {{ ref("bronze__artemis")}} +-- depends_on: {{ ref("bronze__artemis_FR")}} +{{ config ( + materialized = "incremental", + unique_key = ['date_day'], + merge_exclude_columns = ["inserted_timestamp"], + tags = ['streamline_realtime'] +) }} + +WITH bronze AS ( + + SELECT + date_day AS request_date_day, + data, + partition_key, + _inserted_timestamp, + file_name + FROM + + {% if is_incremental() %} + {{ ref('bronze__artemis') }} + {% else %} + {{ ref('bronze__artemis_FR') }} + {% endif %} + WHERE + DATA IS NOT NULL + + {% if is_incremental() %} + AND _inserted_timestamp >= ( + SELECT + COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP + FROM + {{ this }}) + {% endif %} +), +extracted_dates AS ( + SELECT + request_date_day, + TO_DATE(date_vals.value:date::STRING) AS extracted_date, + data, + partition_key, + _inserted_timestamp, + file_name + FROM + bronze, + LATERAL FLATTEN(INPUT => data:data:artemis_ids) AS blockchain_flat, + LATERAL FLATTEN(INPUT => blockchain_flat.value) AS metric_flat, + LATERAL FLATTEN(INPUT => metric_flat.value) AS date_vals + WHERE + data:data:artemis_ids IS NOT NULL +) +SELECT + extracted_date as date_day, + partition_key, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + file_name, + '{{ invocation_id }}' AS _invocation_id +FROM + extracted_dates + +QUALIFY ROW_NUMBER() OVER ( + PARTITION BY extracted_date + ORDER BY + _inserted_timestamp DESC +) = 1 diff --git a/models/artemis/streamline/streamline__artemis_metrics.sql b/models/artemis/streamline/streamline__artemis_metrics.sql new file mode 100644 index 0000000..6b1a362 --- /dev/null +++ b/models/artemis/streamline/streamline__artemis_metrics.sql @@ -0,0 +1,112 @@ +{{ config( + materialized = 'view', + tags = ['streamline_view'] +) }} + +-- Chains: Scroll, Sui, Stacks, Cardano, Celo, Linea, MultiversX, Starknet, Immutable, Celestia, Wormhole, Acala, Astar, zkSync, Gnosis, Metis, Centrifuge, Fantom, Sonic, Moonbeam, Unichain, Algorand, Berachain +-- Metrics: DAILY_TXNS (tx_count), DAU (active_users), DAU Cumulative for (TAM? - might introduce double counting since unique wallets are counted daily) + +WITH chains AS ( + + SELECT + 'scroll' AS artemis_id + UNION ALL + SELECT + 'sui' + UNION ALL + SELECT + 'stacks' + UNION ALL + SELECT + 'cardano' + UNION ALL + SELECT + 'celo' + UNION ALL + SELECT + 'linea' + UNION ALL + SELECT + 'multiversx' + UNION ALL + SELECT + 'starknet' + UNION ALL + SELECT + 'immutable_x' + UNION ALL + SELECT + 'celestia' + UNION ALL + SELECT + 'wormhole' + UNION ALL + SELECT + 'acala' + UNION ALL + SELECT + 'astar' + UNION ALL + SELECT + 'zksync' + UNION ALL + SELECT + 'gnosis' + UNION ALL + SELECT + 'metis' + UNION ALL + SELECT + 'centrifuge' + UNION ALL + SELECT + 'fantom' + UNION ALL + SELECT + 'sonic' + UNION ALL + SELECT + 'moonbeam' + UNION ALL + SELECT + 'unichain' + UNION ALL + SELECT + 'algorand' + UNION ALL + SELECT + 'berachain' +), +metrics AS ( + SELECT + artemis_id AS blockchain, + 'tx_count' AS metric, + '{Service}/data/' AS url, + 'daily_txns' AS endpoint, + 'Daily transaction count' AS description + FROM + chains + UNION ALL + SELECT + artemis_id AS blockchain, + 'active_users' AS metric, + '{Service}/data/' AS url, + 'dau' AS endpoint, + 'Daily active users count' AS description + FROM + chains +) +SELECT + date_day, + blockchain, + metric, + endpoint, + url, + description +FROM + {{ source( + 'crosschain_core', + 'dim_dates' + ) }} + CROSS JOIN metrics +WHERE date_day >= '2025-01-01' \ No newline at end of file diff --git a/models/artemis/streamline/streamline__artemis_realtime.sql b/models/artemis/streamline/streamline__artemis_realtime.sql new file mode 100644 index 0000000..40dee23 --- /dev/null +++ b/models/artemis/streamline/streamline__artemis_realtime.sql @@ -0,0 +1,83 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"artemis", + "sql_limit" :"10", + "producer_batch_size" :"10", + "worker_batch_size" :"10", + "async_concurrent_requests": "1", + "sql_source" :"{{this.identifier}}", + "order_by_column": "date_day" } + ), + tags = ['streamline_realtime'] +) }} + +WITH complete_data AS ( + + {% if is_incremental() %} + SELECT + date_day, + _invocation_id, + MAX(date_day) OVER () AS max_complete_date + FROM + {{ ref("streamline__artemis_complete") }} + {% else %} + SELECT + null as date_day, + null as _invocation_id, + '2025-01-01'::DATE AS max_complete_date + {% endif %} +), +date_params AS ( + SELECT + COALESCE( + DATEADD(day, 1, (SELECT MAX(max_complete_date) FROM complete_data)), + '2025-01-01'::DATE -- Default backfill start date + ) AS min_date, + DATEADD(day, -2, SYSDATE()) AS max_date + FROM + complete_data +), +metrics AS ( + SELECT + m.date_day, + m.blockchain, + m.metric, + m.url, + m.endpoint, + TO_CHAR(p.min_date, 'YYYY-MM-DD') AS start_date, + TO_CHAR(p.max_date, 'YYYY-MM-DD') AS end_date + FROM + {{ ref("streamline__artemis_metrics") }} m + CROSS JOIN date_params p + LEFT JOIN complete_data c + ON m.date_day = c.date_day + WHERE + m.date_day between p.min_date and p.max_date + AND c._invocation_id IS NULL +), +batch_data AS ( + SELECT + MIN(date_day) AS first_day, + url, + REPLACE(LISTAGG(DISTINCT endpoint, ',') WITHIN GROUP (ORDER BY endpoint), ',', '%2C') AS encoded_endpoints, + REPLACE(LISTAGG(DISTINCT blockchain, ',') WITHIN GROUP (ORDER BY blockchain), ',', '%2C') AS encoded_ids, + MIN(start_date) AS min_start_date, + MAX(end_date) AS max_end_date + FROM + metrics + GROUP BY + 2 +) +SELECT + TO_NUMBER(TO_CHAR(first_day, 'YYYYMMDD')) AS DATE_DAY, + TO_NUMBER(TO_CHAR(SYSDATE(), 'YYYYMMDD')) AS partition_key, + {{ target.database }}.live.udf_api( + 'GET', + url || encoded_endpoints || '?APIKey={Authentication}' || '&artemisIds=' || encoded_ids || '&startDate=' || min_start_date || '&endDate=' || max_end_date, {}, {}, + 'Vault/prod/external/artemis' + ) AS request +FROM + batch_data diff --git a/models/sources.yml b/models/sources.yml index dee298a..ca8b613 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -7,6 +7,7 @@ sources: tables: - name: bitquery - name: oklink + - name: artemis - name: tokenflow_starknet_l1_data database: tokenflow_starknet schema: l1_data