diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 271aa2e..0b3775d 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -87,3 +87,74 @@ WHERE b.{{ partition_name }} = s.{{ partition_name }} AND DATA :error :code IS NULL {% endmacro %} + +{% macro streamline_external_table_query_v2( + model, + partition_function + ) %} + + {% set days = var("BRONZE_LOOKBACK_DAYS")%} + + WITH meta AS ( + SELECT + last_modified AS inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_file_registration_history( + start_time => DATEADD('day', -ABS({{days}}), CURRENT_TIMESTAMP()), + table_name => '{{ source( "bronze", model) }}') + ) A + ) + SELECT + s.*, + b.file_name, + inserted_timestamp + FROM + {{ source( + "bronze", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key + WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} + +{% macro streamline_external_table_FR_query_v2( + model, + partition_function + ) %} + WITH meta AS ( + SELECT + registered_on AS inserted_timestamp, + file_name, + {{ partition_function }} AS partition_key + FROM + TABLE( + information_schema.external_table_files( + table_name => '{{ source( "bronze", model) }}' + ) + ) A + ) +SELECT + s.*, + b.file_name, + inserted_timestamp +FROM + {{ source( + "bronze", + model + ) }} + s + JOIN meta b + ON b.file_name = metadata$filename + AND b.partition_key = s.partition_key +WHERE + b.partition_key = s.partition_key + AND DATA :error IS NULL +{% endmacro %} \ No newline at end of file diff --git a/models/bronze/axelscan/bronze__axelscan_day_counts_gmp.sql b/models/bronze/axelscan/bronze__axelscan_day_counts_gmp.sql new file mode 100644 index 0000000..26b25ad --- /dev/null +++ b/models/bronze/axelscan/bronze__axelscan_day_counts_gmp.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_v2( + model = "axelscan_day_counts_gmp", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/axelscan/bronze__axelscan_day_counts_gmp_FR.sql b/models/bronze/axelscan/bronze__axelscan_day_counts_gmp_FR.sql new file mode 100644 index 0000000..1a1e266 --- /dev/null +++ b/models/bronze/axelscan/bronze__axelscan_day_counts_gmp_FR.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + model = "axelscan_day_counts_gmp", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/axelscan/bronze__axelscan_searchgmp.sql b/models/bronze/axelscan/bronze__axelscan_searchgmp.sql new file mode 100644 index 0000000..26b25ad --- /dev/null +++ b/models/bronze/axelscan/bronze__axelscan_searchgmp.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_query_v2( + model = "axelscan_day_counts_gmp", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/bronze/axelscan/bronze__axelscan_searchgmp_FR.sql b/models/bronze/axelscan/bronze__axelscan_searchgmp_FR.sql new file mode 100644 index 0000000..1a1e266 --- /dev/null +++ b/models/bronze/axelscan/bronze__axelscan_searchgmp_FR.sql @@ -0,0 +1,7 @@ +{{ config ( + materialized = 'view' +) }} +{{ streamline_external_table_FR_query_v2( + model = "axelscan_day_counts_gmp", + partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )" +) }} diff --git a/models/sources.yml b/models/sources.yml index 107031c..78b50b7 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -15,6 +15,7 @@ sources: - name: txs_details - name: validators - name: blocks_test_queue + - name: axelscan_day_counts_gmp - name: tokens database: osmosis schema: core diff --git a/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql b/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql new file mode 100644 index 0000000..fe294b9 --- /dev/null +++ b/models/streamline/complete/streamline__axelscan_day_counts_gmp_complete.sql @@ -0,0 +1,34 @@ +-- depends_on: {{ ref('bronze__axelscan_day_counts_gmp') }} +{{ config ( + materialized = "incremental", + unique_key = 'date_day', +) }} + +SELECT + (LEFT(partition_key, 4) || '-' || SUBSTRING(partition_key, 5, 2) || '-' || RIGHT(partition_key, 2)) :: DATE date_day, + VALUE :FROMTIME :: bigint AS fromTime, + VALUE :TOTIME :: bigint AS toTime, + DATA :total AS day_count, + {{ dbt_utils.generate_surrogate_key( + ['date_day'] + ) }} AS axelscan_day_counts_gmp_complete_ID, + inserted_timestamp, + SYSDATE() AS modified_timestamp, + '{{ invocation_id }}' AS _invocation_id +FROM + +{% if is_incremental() %} +{{ ref('bronze__axelscan_day_counts_gmp') }} +WHERE + inserted_timestamp >= ( + SELECT + COALESCE(MAX(inserted_timestamp), '1970-01-01' :: DATE) inserted_timestamp + FROM + {{ this }}) + {% else %} + {{ ref('bronze__axelscan_day_counts_gmp_FR') }} + {% endif %} + + qualify(ROW_NUMBER() over (PARTITION BY date_day + ORDER BY + inserted_timestamp DESC)) = 1 diff --git a/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql b/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql new file mode 100644 index 0000000..c560f5a --- /dev/null +++ b/models/streamline/realtime/streamline__axelscan_day_counts_gmp_realtime.sql @@ -0,0 +1,93 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_rest_api', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"axelscan_day_counts_gmp", + "sql_limit" :"100", + "producer_batch_size" :"20", + "worker_batch_size" :"20", + "sql_source" :"{{this.identifier}}", + "order_by_column": "date_day" } + ) +) }} + +WITH dates_hist AS ( + + SELECT + A.date_day, + DATE_PART( + epoch_second, + A.date_day + ) AS fromTime, + DATE_PART(epoch_second, DATEADD (DAY, 1, A.date_day)) -1 AS toTime + FROM + {{ source( + 'crosschain', + 'dim_dates' + ) }} A {# LEFT JOIN {{ ref('streamline__axelscan_day_counts_gmp_complete') }} + b + ON A.date_day = b.date_day #} + WHERE + A.date_day BETWEEN '2022-05-09' + AND SYSDATE() :: DATE - 2 {# AND b.date_day IS NULL #} +), +dates_recent AS ( + SELECT + date_day, + DATE_PART( + epoch_second, + date_day + ) AS fromTime, + DATE_PART(epoch_second, DATEADD (DAY, 1, date_day)) -1 AS toTime + FROM + {{ source( + 'crosschain', + 'dim_dates' + ) }} + WHERE + date_day BETWEEN SYSDATE() :: DATE - 1 + AND SYSDATE() :: DATE +), +date_combo AS ( + SELECT + date_day, + fromTime, + toTime + FROM + dates_hist + UNION ALL + SELECT + date_day, + fromTime, + toTime + FROM + dates_recent +) +SELECT + REPLACE( + date_day :: STRING, + '-' + ) AS partition_key, + {# date_day, #} + fromTime, + toTime, + {{ target.database }}.live.udf_api( + 'GET', + 'https://api.gmp.axelarscan.io', + OBJECT_CONSTRUCT(), + OBJECT_CONSTRUCT( + 'method', + 'searchGMP', + 'fromTime', + fromTime, + 'toTime', + toTime, + 'size', + 1 + ) + ) AS request +FROM + date_combo +ORDER BY + date_day diff --git a/models/streamline/realtime/streamline__axelscan_day_counts_transfers_realtime.sql b/models/streamline/realtime/streamline__axelscan_day_counts_transfers_realtime.sql new file mode 100644 index 0000000..a6dc3fa --- /dev/null +++ b/models/streamline/realtime/streamline__axelscan_day_counts_transfers_realtime.sql @@ -0,0 +1,103 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"axelscan_day_counts_transfers", + "sql_limit" :"100", + "producer_batch_size" :"20", + "worker_batch_size" :"20", + "sql_source" :"{{this.identifier}}", + "order_by_column": "date_day" } + ) +) }} + +WITH dates_hist AS ( + + SELECT + A.date_day, + DATE_PART( + epoch_second, + A.date_day + ) AS fromTime, + DATE_PART(epoch_second, DATEADD (DAY, 1, A.date_day)) -1 AS toTime + FROM + {{ source( + 'crosschain', + 'dim_dates' + ) }} A + +{% if is_incremental() %} +LEFT JOIN {{ ref('streamline__axelscan_day_counts_transfers_complete') }} +b +ON A.date_day = b.date_day +WHERE + b.date_day IS NULL + AND A.date_day BETWEEN '2021-12-23' + AND SYSDATE() :: DATE - 2 +{% else %} +WHERE + A.date_day BETWEEN '2021-12-23' + AND SYSDATE() :: DATE - 2 +{% endif %} +), +dates_recent AS ( + SELECT + date_day, + DATE_PART( + epoch_second, + date_day + ) AS fromTime, + DATE_PART(epoch_second, DATEADD (DAY, 1, date_day)) -1 AS toTime + FROM + {{ source( + 'crosschain', + 'dim_dates' + ) }} + WHERE + date_day BETWEEN SYSDATE() :: DATE - 1 + AND SYSDATE() :: DATE +), +date_combo AS ( + SELECT + date_day, + fromTime, + toTime + FROM + dates_hist + UNION ALL + SELECT + date_day, + fromTime, + toTime + FROM + dates_recent +) +SELECT + REPLACE( + date_day :: STRING, + '-', + '_' + ) AS partition_key, + date_day, + fromTime, + toTime, + {{ target.database }}.live.udf_api( + 'GET', + 'https://api.axelarscan.io', + OBJECT_CONSTRUCT(), + OBJECT_CONSTRUCT( + 'method', + 'searchTransfers', + 'fromTime', + fromTime, + 'toTime', + toTime, + 'size', + 1 + ) + ) AS request +FROM + date_combo +ORDER BY + date_day diff --git a/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql b/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql new file mode 100644 index 0000000..e64b534 --- /dev/null +++ b/models/streamline/realtime/streamline__axelscan_searchgmp_realtime.sql @@ -0,0 +1,75 @@ +{{ config ( + materialized = "view", + post_hook = fsc_utils.if_data_call_function_v2( + func = 'streamline.udf_bulk_rest_api_v2', + target = "{{this.schema}}.{{this.identifier}}", + params ={ "external_table" :"axelscan_searchgmp", + "sql_limit" :"100", + "producer_batch_size" :"20", + "worker_batch_size" :"20", + "sql_source" :"{{this.identifier}}", + "order_by_column": "date_day" } + ) +) }} + +WITH ids AS ( + + SELECT + _id + FROM + {{ source( + 'crosschain_silver', + 'number_sequence' + ) }} + WHERE + _id < 200000 +), +dates_hist AS ( + SELECT + b._id -1 AS id, + fromTime, + toTime, + A.date_day + FROM + {{ ref('streamline__axelscan_day_counts_gmp_complete') }} A + JOIN ids b + ON b._id <= A.day_count {# EXCEPT + SELECT + id, + fromTime, + toTime, + date_day + FROM + {{ ref('streamline__axelscan_searchgmp_complete') }} + #} +) +SELECT + REPLACE( + date_day :: STRING, + '-', + '_' + ) AS partition_key, + date_day, + id, + {{ target.database }}.live.udf_api( + 'GET', + 'https://api.gmp.axelarscan.io', + OBJECT_CONSTRUCT(), + OBJECT_CONSTRUCT( + 'method', + 'searchGMP', + 'fromTime', + fromTime, + 'toTime', + toTime, + 'from', + id, + 'size', + 1 + ) + ) AS request +FROM + dates_hist +ORDER BY + date_day, + id