diff --git a/.github/workflows/dbt_run_axelscan.yml b/.github/workflows/dbt_run_axelscan.yml index 9b84598..8e74246 100644 --- a/.github/workflows/dbt_run_axelscan.yml +++ b/.github/workflows/dbt_run_axelscan.yml @@ -56,7 +56,9 @@ jobs: dbt run -m "axelar_models,tag:axelscan_search" dbt run -m "axelar_models,tag:axelscan_search" dbt run -m "axelar_models,tag:axelscan_search" - + dbt run -m 1+models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql 1+models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql --vars '{"STREAMLINE_INVOKE_STREAMS":True}' + + diff --git a/models/bronze/bronze_api/axelscan/bronze_api__axelscan_day_counts_gmp.sql b/models/bronze/bronze_api/axelscan/bronze_api__axelscan_day_counts_gmp.sql index a36fea2..9856faa 100644 --- a/models/bronze/bronze_api/axelscan/bronze_api__axelscan_day_counts_gmp.sql +++ b/models/bronze/bronze_api/axelscan/bronze_api__axelscan_day_counts_gmp.sql @@ -2,7 +2,8 @@ materialized = 'incremental', unique_key = 'date_day', full_refresh = false, - tags = ['axelscan'] + tags = ['axelscan'], + enabled = false ) }} WITH dates_hist AS ( diff --git a/models/bronze/bronze_api/axelscan/bronze_api__axelscan_id_groups_gmp.sql b/models/bronze/bronze_api/axelscan/bronze_api__axelscan_id_groups_gmp.sql index 61ab065..4fe67c9 100644 --- a/models/bronze/bronze_api/axelscan/bronze_api__axelscan_id_groups_gmp.sql +++ b/models/bronze/bronze_api/axelscan/bronze_api__axelscan_id_groups_gmp.sql @@ -2,7 +2,8 @@ materialized = 'incremental', unique_key = 'date_day', incremental_strategy = 'delete+insert', - tags = ['axelscan'] + tags = ['axelscan'], + enabled = false ) }} WITH ids_days AS ( diff --git a/models/bronze/bronze_api/axelscan/bronze_api__axelscan_searchgmp.sql b/models/bronze/bronze_api/axelscan/bronze_api__axelscan_searchgmp.sql index 11b105d..ab5a498 100644 --- a/models/bronze/bronze_api/axelscan/bronze_api__axelscan_searchgmp.sql +++ b/models/bronze/bronze_api/axelscan/bronze_api__axelscan_searchgmp.sql @@ -2,7 +2,8 @@ materialized = 'incremental', full_refresh = false, cluster_by = ['_inserted_timestamp::DATE'], - tags = ['axelscan','axelscan_search'] + tags = ['axelscan','axelscan_search'], + enabled = false ) }} WITH max_id AS ( diff --git a/models/gold/axelscan/axelscan__fact_gmp.sql b/models/gold/axelscan/axelscan__fact_gmp.sql index 10e8b76..bb49cf0 100644 --- a/models/gold/axelscan/axelscan__fact_gmp.sql +++ b/models/gold/axelscan/axelscan__fact_gmp.sql @@ -2,7 +2,8 @@ materialized = 'view', meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'AXELSCAN', }} }, - tags = ['noncore'] + tags = ['noncore'], + enabled = false ) }} SELECT diff --git a/models/silver/defi/axelscan/silver__axelscan_gmp.sql b/models/silver/defi/axelscan/silver__axelscan_gmp.sql index fffab70..4f7f9ae 100644 --- a/models/silver/defi/axelscan/silver__axelscan_gmp.sql +++ b/models/silver/defi/axelscan/silver__axelscan_gmp.sql @@ -5,7 +5,8 @@ merge_exclude_columns = ["inserted_timestamp"], cluster_by = 'created_at::DATE', post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(id);", - tags = ['noncore'] + tags = ['noncore'], + enabled = false ) }} WITH base AS ( diff --git a/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql b/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql index c426978..0453fe8 100644 --- a/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql +++ b/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql @@ -1,7 +1,7 @@ -- depends_on: {{ ref('bronze__axelscan_day_counts_gmp') }} {{ config ( materialized = "incremental", - unique_key = 'date_day', + unique_key = ['date_day','from_time'], tags = ['streamline_axelscan'] ) }} @@ -11,7 +11,7 @@ SELECT VALUE :TO_TIME :: bigint AS TO_TIME, DATA :total AS day_count, {{ dbt_utils.generate_surrogate_key( - ['date_day'] + ['date_day','from_time'] ) }} AS axelscan_day_counts_gmp_complete_id, inserted_timestamp, SYSDATE() AS modified_timestamp, @@ -30,6 +30,6 @@ WHERE {{ ref('bronze__axelscan_day_counts_gmp_FR') }} {% endif %} - qualify(ROW_NUMBER() over (PARTITION BY date_day + qualify(ROW_NUMBER() over (PARTITION BY date_day, from_time ORDER BY inserted_timestamp DESC)) = 1 diff --git a/models/streamline/complete/streamline__axelscan_searchgmp_complete.sql b/models/streamline/complete/streamline__axelscan_searchgmp_complete.sql index aa3c795..4c25753 100644 --- a/models/streamline/complete/streamline__axelscan_searchgmp_complete.sql +++ b/models/streamline/complete/streamline__axelscan_searchgmp_complete.sql @@ -9,8 +9,10 @@ SELECT partition_key AS date_day, VALUE :ID :: INT AS id, + VALUE :FROM_TIME :: bigint AS from_time, + VALUE :TO_TIME :: bigint AS TO_TIME, {{ dbt_utils.generate_surrogate_key( - ['date_day','id'] + ['date_day','id','from_time'] ) }} AS axelscan_searchgmp_complete_id, inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql b/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql index e56ab70..be953f1 100644 --- a/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql +++ b/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql @@ -4,11 +4,12 @@ func = 'streamline.udf_rest_api', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :"axelscan_day_counts_gmp", - "sql_limit" :"200", - "producer_batch_size" :"100", - "worker_batch_size" :"100", + "sql_limit" :"4000", + "producer_batch_size" :"200", + "worker_batch_size" :"200", "sql_source" :"{{this.identifier}}", - "order_by_column": "date_day" } + "order_by_column": "ob", + "async_concurrent_requests": "5" } ), tags = ['streamline_axelscan'] ) }} @@ -17,37 +18,45 @@ WITH dates_hist AS ( SELECT A.date_day, - DATE_PART( + (ROW_NUMBER() over (PARTITION BY A.date_day + ORDER BY + SEQ4()) - 1) * 60 + DATE_PART( epoch_second, A.date_day ) AS from_time, - DATE_PART(epoch_second, DATEADD (DAY, 1, A.date_day)) -1 AS TO_TIME + from_time AS ft, + from_time + 59 AS TO_TIME FROM {{ source( 'crosschain', 'dim_dates' ) }} A + JOIN TABLE(GENERATOR(rowcount => 1440)) x LEFT JOIN {{ ref('streamline__axelscan_day_counts_gmp_complete') }} b ON A.date_day = b.date_day + AND ft = b.from_time WHERE - A.date_day BETWEEN '2022-05-09' + A.date_day BETWEEN '2024-12-10' AND SYSDATE() :: DATE - 2 AND b.date_day IS NULL ), dates_recent AS ( SELECT date_day, - DATE_PART( + (ROW_NUMBER() over (PARTITION BY date_day + ORDER BY + SEQ4()) - 1) * 60 + DATE_PART( epoch_second, date_day ) AS from_time, - DATE_PART(epoch_second, DATEADD (DAY, 1, date_day)) -1 AS TO_TIME + from_time + 59 AS TO_TIME FROM {{ source( 'crosschain', 'dim_dates' ) }} + JOIN TABLE(GENERATOR(rowcount => 1440)) x WHERE date_day BETWEEN SYSDATE() :: DATE - 1 AND SYSDATE() :: DATE @@ -55,7 +64,7 @@ dates_recent AS ( date_combo AS ( SELECT date_day, - from_time, + ft AS from_time, TO_TIME FROM dates_hist @@ -74,8 +83,9 @@ SELECT ) AS partition_key, from_time, TO_TIME, + partition_key || '-' || from_time :: STRING AS ob, {{ target.database }}.live.udf_api( - 'GET', + 'POST', 'https://api.gmp.axelarscan.io', OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT( diff --git a/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql b/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql index 7562b2d..907364d 100644 --- a/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql +++ b/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql @@ -4,17 +4,18 @@ func = 'streamline.udf_rest_api', target = "{{this.schema}}.{{this.identifier}}", params ={ "external_table" :"axelscan_searchgmp", - "sql_limit" :"1000", - "producer_batch_size" :"500", - "worker_batch_size" :"500", + "sql_limit" :"4000", + "producer_batch_size" :"100", + "worker_batch_size" :"100", "sql_source" :"{{this.identifier}}", - "order_by_column": "ob" } + "order_by_column": "ob", + "async_concurrent_requests" :"5" } ), tags = ['streamline_axelscan'] ) }} ---set a arbitrary limit for the number of ids to pull to speed up the performance. Shouldn't be more than 200K records in a day +--This is the max number of ids that the api supports for the searchGMP method {% set limit = var( - 'AXELSCAN_ID_LIMIT', 200000 + 'AXELSCAN_ID_LIMIT', 501 ) %} WITH ids AS ( @@ -50,6 +51,7 @@ ids_topull AS ( b ON A.date_day = b.date_day AND A.id = b.id + AND A.from_time = b.from_time WHERE b.date_day IS NULL ) @@ -59,9 +61,11 @@ SELECT '-' ) AS partition_key, id, - partition_key || '-' || id :: STRING AS ob, + from_time, + TO_TIME, + partition_key || '-' || from_time || '-' || id :: STRING AS ob, {{ target.database }}.live.udf_api( - 'GET', + 'POST', 'https://api.gmp.axelarscan.io', OBJECT_CONSTRUCT(), OBJECT_CONSTRUCT( @@ -81,5 +85,3 @@ FROM ids_topull ORDER BY ob -LIMIT - 2000