This commit is contained in:
Eric Laurello 2024-11-05 14:44:45 -05:00
parent 5cc76df685
commit df766b539a
10 changed files with 405 additions and 0 deletions

View File

@ -87,3 +87,74 @@ WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND DATA :error :code IS NULL
{% endmacro %}
{% macro streamline_external_table_query_v2(
model,
partition_function
) %}
{% set days = var("BRONZE_LOOKBACK_DAYS")%}
WITH meta AS (
SELECT
last_modified AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -ABS({{days}}), CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze", model) }}')
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
FROM
{{ source(
"bronze",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function
) %}
WITH meta AS (
SELECT
registered_on AS inserted_timestamp,
file_name,
{{ partition_function }} AS partition_key
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze", model) }}'
)
) A
)
SELECT
s.*,
b.file_name,
inserted_timestamp
FROM
{{ source(
"bronze",
model
) }}
s
JOIN meta b
ON b.file_name = metadata$filename
AND b.partition_key = s.partition_key
WHERE
b.partition_key = s.partition_key
AND DATA :error IS NULL
{% endmacro %}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "axelscan_day_counts_gmp",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "axelscan_day_counts_gmp",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = "axelscan_day_counts_gmp",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,7 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = "axelscan_day_counts_gmp",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -15,6 +15,7 @@ sources:
- name: txs_details
- name: validators
- name: blocks_test_queue
- name: axelscan_day_counts_gmp
- name: tokens
database: osmosis
schema: core

View File

@ -0,0 +1,34 @@
-- depends_on: {{ ref('bronze__axelscan_day_counts_gmp') }}
{{ config (
materialized = "incremental",
unique_key = 'date_day',
) }}
SELECT
(LEFT(partition_key, 4) || '-' || SUBSTRING(partition_key, 5, 2) || '-' || RIGHT(partition_key, 2)) :: DATE date_day,
VALUE :FROMTIME :: bigint AS fromTime,
VALUE :TOTIME :: bigint AS toTime,
DATA :total AS day_count,
{{ dbt_utils.generate_surrogate_key(
['date_day']
) }} AS axelscan_day_counts_gmp_complete_ID,
inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__axelscan_day_counts_gmp') }}
WHERE
inserted_timestamp >= (
SELECT
COALESCE(MAX(inserted_timestamp), '1970-01-01' :: DATE) inserted_timestamp
FROM
{{ this }})
{% else %}
{{ ref('bronze__axelscan_day_counts_gmp_FR') }}
{% endif %}
qualify(ROW_NUMBER() over (PARTITION BY date_day
ORDER BY
inserted_timestamp DESC)) = 1

View File

@ -0,0 +1,93 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_rest_api',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"axelscan_day_counts_gmp",
"sql_limit" :"100",
"producer_batch_size" :"20",
"worker_batch_size" :"20",
"sql_source" :"{{this.identifier}}",
"order_by_column": "date_day" }
)
) }}
WITH dates_hist AS (
SELECT
A.date_day,
DATE_PART(
epoch_second,
A.date_day
) AS fromTime,
DATE_PART(epoch_second, DATEADD (DAY, 1, A.date_day)) -1 AS toTime
FROM
{{ source(
'crosschain',
'dim_dates'
) }} A {# LEFT JOIN {{ ref('streamline__axelscan_day_counts_gmp_complete') }}
b
ON A.date_day = b.date_day #}
WHERE
A.date_day BETWEEN '2022-05-09'
AND SYSDATE() :: DATE - 2 {# AND b.date_day IS NULL #}
),
dates_recent AS (
SELECT
date_day,
DATE_PART(
epoch_second,
date_day
) AS fromTime,
DATE_PART(epoch_second, DATEADD (DAY, 1, date_day)) -1 AS toTime
FROM
{{ source(
'crosschain',
'dim_dates'
) }}
WHERE
date_day BETWEEN SYSDATE() :: DATE - 1
AND SYSDATE() :: DATE
),
date_combo AS (
SELECT
date_day,
fromTime,
toTime
FROM
dates_hist
UNION ALL
SELECT
date_day,
fromTime,
toTime
FROM
dates_recent
)
SELECT
REPLACE(
date_day :: STRING,
'-'
) AS partition_key,
{# date_day, #}
fromTime,
toTime,
{{ target.database }}.live.udf_api(
'GET',
'https://api.gmp.axelarscan.io',
OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(
'method',
'searchGMP',
'fromTime',
fromTime,
'toTime',
toTime,
'size',
1
)
) AS request
FROM
date_combo
ORDER BY
date_day

View File

@ -0,0 +1,103 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"axelscan_day_counts_transfers",
"sql_limit" :"100",
"producer_batch_size" :"20",
"worker_batch_size" :"20",
"sql_source" :"{{this.identifier}}",
"order_by_column": "date_day" }
)
) }}
WITH dates_hist AS (
SELECT
A.date_day,
DATE_PART(
epoch_second,
A.date_day
) AS fromTime,
DATE_PART(epoch_second, DATEADD (DAY, 1, A.date_day)) -1 AS toTime
FROM
{{ source(
'crosschain',
'dim_dates'
) }} A
{% if is_incremental() %}
LEFT JOIN {{ ref('streamline__axelscan_day_counts_transfers_complete') }}
b
ON A.date_day = b.date_day
WHERE
b.date_day IS NULL
AND A.date_day BETWEEN '2021-12-23'
AND SYSDATE() :: DATE - 2
{% else %}
WHERE
A.date_day BETWEEN '2021-12-23'
AND SYSDATE() :: DATE - 2
{% endif %}
),
dates_recent AS (
SELECT
date_day,
DATE_PART(
epoch_second,
date_day
) AS fromTime,
DATE_PART(epoch_second, DATEADD (DAY, 1, date_day)) -1 AS toTime
FROM
{{ source(
'crosschain',
'dim_dates'
) }}
WHERE
date_day BETWEEN SYSDATE() :: DATE - 1
AND SYSDATE() :: DATE
),
date_combo AS (
SELECT
date_day,
fromTime,
toTime
FROM
dates_hist
UNION ALL
SELECT
date_day,
fromTime,
toTime
FROM
dates_recent
)
SELECT
REPLACE(
date_day :: STRING,
'-',
'_'
) AS partition_key,
date_day,
fromTime,
toTime,
{{ target.database }}.live.udf_api(
'GET',
'https://api.axelarscan.io',
OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(
'method',
'searchTransfers',
'fromTime',
fromTime,
'toTime',
toTime,
'size',
1
)
) AS request
FROM
date_combo
ORDER BY
date_day

View File

@ -0,0 +1,75 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"axelscan_searchgmp",
"sql_limit" :"100",
"producer_batch_size" :"20",
"worker_batch_size" :"20",
"sql_source" :"{{this.identifier}}",
"order_by_column": "date_day" }
)
) }}
WITH ids AS (
SELECT
_id
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id < 200000
),
dates_hist AS (
SELECT
b._id -1 AS id,
fromTime,
toTime,
A.date_day
FROM
{{ ref('streamline__axelscan_day_counts_gmp_complete') }} A
JOIN ids b
ON b._id <= A.day_count {# EXCEPT
SELECT
id,
fromTime,
toTime,
date_day
FROM
{{ ref('streamline__axelscan_searchgmp_complete') }}
#}
)
SELECT
REPLACE(
date_day :: STRING,
'-',
'_'
) AS partition_key,
date_day,
id,
{{ target.database }}.live.udf_api(
'GET',
'https://api.gmp.axelarscan.io',
OBJECT_CONSTRUCT(),
OBJECT_CONSTRUCT(
'method',
'searchGMP',
'fromTime',
fromTime,
'toTime',
toTime,
'from',
id,
'size',
1
)
) AS request
FROM
dates_hist
ORDER BY
date_day,
id