check in bitquery work

This commit is contained in:
Eric Laurello 2025-03-04 16:33:15 -05:00
parent f508022d56
commit 2a797067d9
10 changed files with 337 additions and 10 deletions

View File

@ -50,9 +50,33 @@ on-run-end:
vars:
"dbt_date:time_zone": GMT
UPDATE_UDFS_AND_SPS: False
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
STREAMLINE_INVOKE_STREAMS: True
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: True
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RETRY_UNKNOWN: False
UPDATE_SNOWFLAKE_TAGS: True
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
["INTERNAL_DEV"]
config:
dev:
API_INTEGRATION: AWS_EXTERNAL_API_STG_V2
EXTERNAL_FUNCTION_URI: qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_EXTERNAL_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_EXTERNAL_API_PRO_V2
EXTERNAL_FUNCTION_URI:
ROLES:
- AWS_LAMBDA_EXTERNAL_API
- INTERNAL_DEV
- DBT_CLOUD_TON
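
Note: the API_INTEGRATION and EXTERNAL_FUNCTION_URI expressions perform the same dict lookup twice (once as the test, once as the value). An equivalent single-lookup form using dict.get, offered as a sketch and not part of this commit:

API_INTEGRATION: '{{ var("config").get(target.name, var("config")["dev"])["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config").get(target.name, var("config")["dev"])["EXTERNAL_FUNCTION_URI"] }}'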

View File

@ -1,8 +1,8 @@
-- {% macro create_udfs() %}
-- {% if var("UPDATE_UDFS_AND_SPS") %}
-- {% set sql %}
-- CREATE schema if NOT EXISTS silver;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
-- {% endmacro %}
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
{{ create_udf_bulk_rest_api_v2() }};
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}
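
create_udfs only does work when the UPDATE_UDFS_AND_SPS var is truthy (it defaults to False in the vars block above), so function creation is opt-in per run. Assuming standard dbt usage, it can also be invoked on demand:

dbt run-operation create_udfs --vars '{"UPDATE_UDFS_AND_SPS": true}'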

View File

@ -0,0 +1,83 @@
{% macro streamline_external_table_query_v2(
model,
partition_function,
partition_name,
other_cols
) %}
WITH meta AS (
SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
data:error:code IS NULL
)
{% endmacro %}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function,
partition_name,
other_cols
) %}
WITH meta AS (
SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
b.{{ partition_name }} = s.{{ partition_name }}
AND (
data:error:code IS NULL
)
{% endmacro %}
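
The two macros differ only in the file listing they scan: streamline_external_table_query_v2 reads external_table_file_registration_history with a three-day lookback (suited to incremental loads), while the _FR_ variant reads external_table_files, i.e. every registered file (suited to full refreshes). As a sketch, the meta CTE rendered for model = 'bitquery' on a dev target would look roughly like:

SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER) AS partition_key
FROM
TABLE(
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => 'streamline.external_dev.bitquery'
)
) A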

View File

@ -0,0 +1,10 @@
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY {% if target.database == 'EXTERNAL' -%}
api_integration = aws_external_api_prod AS ''
{% else %}
api_integration = aws_external_api_stg_v2 AS 'https://qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}
{% endmacro %}
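
Once deployed, the external function can be smoke-tested directly. A minimal sketch; the payload keys mirror the params passed by the realtime model below, but the exact shape the Streamline backend expects is an assumption:

SELECT streamline.udf_bulk_rest_api_v2(
-- illustrative payload only; key set is assumed, not confirmed by this commit
OBJECT_CONSTRUCT('external_table', 'bitquery', 'sql_limit', '1', 'sql_source', 'smoke_test')
);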

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_query_v2(
model = 'bitquery',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{{ streamline_external_table_FR_query_v2(
model = 'bitquery',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
) }}
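
Both bronze views share the same partition_function, which parses partition_key out of the staged file path. Assuming a hypothetical path whose third segment is prefixed with the YYYYMMDD partition key written by the realtime model below:

SELECT CAST(SPLIT_PART(SPLIT_PART('stg/bitquery/20250304_0.json.gz', '/', 3), '_', 1) AS INTEGER);
-- returns 20250304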

View File

@ -0,0 +1,72 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
WITH metrics AS (
SELECT
'hedera' AS blockchain,
'tx_count' AS metric,
'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) {date {date(format: $dateFormat)} count: countBigInt}}}' AS query_text
UNION ALL
SELECT
'ripple' AS blockchain,
'tx_count' AS metric,
'' AS query_text
UNION ALL
--the definition of active users is still open; for now it is the unique payer/sender count over the trailing 30 days
SELECT
'hedera' AS blockchain,
'active_users' AS metric,
'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) {count: countBigInt(uniq: payer_account)}}}' AS query_text
UNION ALL
SELECT
'ripple' AS blockchain,
'active_users' AS metric,
'' AS query_text
)
SELECT
date_day,
DATEADD(
'day',
-30,
date_day
) AS date_day_minus_30,
blockchain,
metric,
CASE
WHEN blockchain = 'ripple'
AND metric = 'tx_count' THEN '{ripple(network: ripple) {transactions(date: {after: "' || date_day || '"}) {countBigInt(hash: {}, date: {after: "' || date_day || '"}) date {date}}}}'
WHEN blockchain = 'ripple'
AND metric = 'active_users' THEN '{ripple(network: ripple) {transactions(date: {after: "' || date_day_minus_30 || '"}) {countBigInt(date: {after: "' || date_day_minus_30 || '"}, uniq: senders) }}}'
ELSE query_text
END AS query_text,
CASE
WHEN blockchain = 'hedera' THEN OBJECT_CONSTRUCT(
'limit',
'1',
'offset',
'0',
'network',
blockchain,
'from',
CASE
WHEN metric = 'active_users' THEN date_day_minus_30
ELSE date_day
END,
'till',
date_day,
'dateFormat',
'%Y-%m-%d'
)
END AS variables
FROM
{{ source(
'crosschain_core',
'dim_dates'
) }}
CROSS JOIN metrics
WHERE
date_day >= '2025-01-01'
AND date_day < SYSDATE() :: DATE
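
For the ripple branches, query_text is assembled by string concatenation; with date_day = '2025-01-02', the tx_count branch yields (illustrative):

{ripple(network: ripple) {transactions(date: {after: "2025-01-02"}) {countBigInt(hash: {}, date: {after: "2025-01-02"}) date {date}}}}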

View File

@ -0,0 +1,64 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"bitquery",
"sql_limit" :"100",
"producer_batch_size" :"100",
"worker_batch_size" :"100",
"async_concurrent_requests": "10",
"sql_source" :"{{this.identifier}}",
"order_by_column": "date_day" }
),
tags = ['streamline_realtime']
) }}
WITH metrics AS (
SELECT
date_day,
blockchain,
metric,
query_text,
variables
FROM
{{ ref("streamline__bitquery_metrics") }}
qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric
ORDER BY
date_day DESC
) = 3 {# EXCEPT
SELECT
date_day,
blockchain,
metric
FROM
{{ ref("streamline__blocks_complete") }}
#}
)
SELECT
TO_NUMBER(to_char(date_day, 'YYYYMMDD')) AS date_day,
blockchain,
metric,
TO_NUMBER(to_char(SYSDATE() :: DATE, 'YYYYMMDD')) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'https://graphql.bitquery.io',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'Authorization',
'Bearer {Authentication}'
),
OBJECT_CONSTRUCT(
'query',
query_text,
'variables',
variables
),
'Vault/prod/bitquery'
) AS request
FROM
metrics
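
Each surviving row becomes one POST to https://graphql.bitquery.io, with the {Authentication} placeholder presumably substituted by live.udf_api from the Vault/prod/bitquery secret at call time. The body OBJECT_CONSTRUCT serializes to the standard GraphQL envelope, roughly (illustrative hedera values):

{"query": "query ($network: HederaNetwork!, ...) {hedera(network: $network) {...}}", "variables": {"limit": "1", "offset": "0", "network": "hedera", "from": "2025-01-01", "till": "2025-01-01", "dateFormat": "%Y-%m-%d"}}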

View File

@ -0,0 +1,46 @@
-- depends_on: {{ ref('bronze__bitquery_FR') }}
-- depends_on: {{ ref('bronze__bitquery') }}
{{ config (
materialized = "incremental",
unique_key = ['date_day','blockchain','metric'],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['streamline_realtime'],
enabled = false
) }}
SELECT
date_day,
blockchain,
metric,
partition_key,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
WHERE
DATA :ok = TRUE
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
AND DATA IS NOT NULL
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY date_day,
blockchain,
metric
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,6 +1,11 @@
version: 2
sources:
- name: bronze_streamline
database: streamline
schema: "{{ 'external' if target.database == 'EXTERNAL' else 'external_dev' }}"
tables:
- name: bitquery
- name: tokenflow_starknet_l1_data
database: tokenflow_starknet
schema: l1_data
@ -37,6 +42,11 @@ sources:
schema: silver
tables:
- name: apis_keys
- name: crosschain_core
database: crosschain
schema: core
tables:
- name: dim_dates
- name: ethereum_silver
database: ethereum
schema: silver