AN-5831 bitquery (#91)

Co-authored-by: San Yong <22216004+SanYongxie@users.noreply.github.com>
Co-authored-by: gregoriustanleyy <gstanleytejakusuma@gmail.com>
This commit is contained in:
eric-laurello 2025-03-14 13:45:03 -04:00 committed by GitHub
parent a9b6ef8e14
commit 20a1126d5b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 654 additions and 11 deletions

View File

@ -44,4 +44,4 @@ jobs:
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m tag:defillama tag:deepnftvalue tag:core tag:blast tag:polymarket --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+
dbt run -m external_models,tag:defillama external_models,tag:deepnftvalue external_models,tag:core external_models,tag:blast external_models,tag:polymarket external_models,tag:bitquery --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+

View File

@ -0,0 +1,47 @@
# Daily scheduled GitHub Actions workflow that invokes the streamline
# bitquery realtime dbt model (and its parents) with stream invocation enabled.
name: dbt_run_streamline_daily
run-name: dbt_run_streamline_daily
on:
workflow_dispatch:
schedule:
# Runs at 07:30 UTC every day (see https://crontab.guru)
- cron: '30 7 * * *'
env:
DBT_PROFILES_DIR: ${{ vars.DBT_PROFILES_DIR }}
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
# serialize runs of this workflow so concurrent dbt invocations don't collide
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/bitquery/streamline/streamline__bitquery_realtime.sql

View File

@ -50,9 +50,33 @@ on-run-end:
vars:
"dbt_date:time_zone": GMT
UPDATE_UDFS_AND_SPS: False
OBSERV_FULL_TEST: False
START_GHA_TASKS: False
STREAMLINE_INVOKE_STREAMS: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
STREAMLINE_RUN_HISTORY: False
STREAMLINE_RETRY_UNKNOWN: False
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: False
API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
ROLES: |
["INTERNAL_DEV"]
["INTERNAL_DEV"]
config:
dev:
API_INTEGRATION: AWS_EXTERNAL_API_STG_V2
EXTERNAL_FUNCTION_URI: qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/
ROLES:
- AWS_LAMBDA_EXTERNAL_API
- INTERNAL_DEV
prod:
API_INTEGRATION: AWS_EXTERNAL_API_PRO_V2
EXTERNAL_FUNCTION_URI: zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/
ROLES:
- AWS_LAMBDA_EXTERNAL_API
- INTERNAL_DEV
- DBT_CLOUD_EXTERNAL

View File

@ -1,8 +1,8 @@
-- {% macro create_udfs() %}
-- {% if var("UPDATE_UDFS_AND_SPS") %}
-- {% set sql %}
-- CREATE schema if NOT EXISTS silver;
-- {% endset %}
-- {% do run_query(sql) %}
-- {% endif %}
-- {% endmacro %}
{# Entry-point macro that (re)creates all UDFs / external functions when the
   UPDATE_UDFS_AND_SPS var is truthy. Currently only creates the bulk REST
   API external function used by streamline. #}
{% macro create_udfs() %}
{% if var("UPDATE_UDFS_AND_SPS") %}
{% set sql %}
{{ create_udf_bulk_rest_api_v2() }};
{% endset %}
{% do run_query(sql) %}
{% endif %}
{% endmacro %}

View File

@ -0,0 +1,83 @@
{# Incremental bronze query over a streamline external table.
   Joins the table's rows to its file-registration history, restricted to
   files registered in the last 3 days, and drops rows whose payload carries
   an API error code.
   Args:
     model              - source table name under the bronze_streamline source
     partition_function - SQL expression deriving the partition value from file_name
     partition_name     - alias for the derived partition column
     other_cols         - extra projected columns (comma-separated SQL fragment) #}
{% macro streamline_external_table_query_v2(
model,
partition_function,
partition_name,
other_cols
) %}
WITH meta AS (
SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
-- incremental window: only files registered in the last 3 days
information_schema.external_table_file_registration_history(
start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
table_name => '{{ source( "bronze_streamline", model) }}')
) A
)
SELECT
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
-- NOTE(review): repeats the join predicate above; redundant but harmless
b.{{ partition_name }} = s.{{ partition_name }}
AND (
-- exclude rows where the upstream API call returned an error payload
data:error:code IS NULL
)
{% endmacro %}
{# Full-refresh (FR) variant of streamline_external_table_query_v2:
   scans ALL registered files for the external table via
   information_schema.external_table_files (no 3-day window); otherwise
   identical projection, join, and error filtering. #}
{% macro streamline_external_table_FR_query_v2(
model,
partition_function,
partition_name,
other_cols
) %}
WITH meta AS (
SELECT
LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
file_name,
{{ partition_function }} AS {{ partition_name }}
FROM
TABLE(
-- full scan of every file registered to the external table
information_schema.external_table_files(
table_name => '{{ source( "bronze_streamline", model) }}'
)
) A
)
SELECT
{{ other_cols }},
DATA,
_inserted_timestamp,
s.{{ partition_name }},
s.value AS VALUE,
file_name
FROM
{{ source(
"bronze_streamline",
model
) }}
s
JOIN
meta b
ON b.file_name = metadata$filename
AND b.{{ partition_name }} = s.{{ partition_name }}
WHERE
-- NOTE(review): repeats the join predicate above; redundant but harmless
b.{{ partition_name }} = s.{{ partition_name }}
AND (
-- exclude rows where the upstream API call returned an error payload
data:error:code IS NULL
)
{% endmacro %}

View File

@ -0,0 +1,10 @@
{# Creates the external function streamline uses to bulk-call REST APIs.
   Routes to the prod API gateway when running against the EXTERNAL (prod)
   database, otherwise to staging.
   BUG FIX: the condition was target.database == 'EXTERNAL:' (stray trailing
   colon), which can never match -- sources.yml gates prod on
   target.database == 'EXTERNAL' -- so the prod integration was unreachable.
   NOTE(review): prod uses integration aws_external_api_prod while the project
   vars declare AWS_EXTERNAL_API_PRO_V2 -- confirm which is intended. #}
{% macro create_udf_bulk_rest_api_v2() %}
CREATE
OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
json OBJECT
) returns ARRAY {% if target.database == 'EXTERNAL' -%}
api_integration = aws_external_api_prod AS 'https://zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
{% else %}
api_integration = aws_external_api_stg_v2 AS 'https://qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
{%- endif %}
{% endmacro %}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{# Bronze view over the bitquery external table (3-day incremental window).
   BUG FIX: other_cols projected DATE_DAY twice, yielding a duplicate output
   column; the FR twin model projects it once. #}
{{ streamline_external_table_query_v2(
model = 'bitquery',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view'
) }}
{# Full-refresh bronze view over the bitquery external table: scans every
   registered file rather than the 3-day incremental window used by
   bronze__bitquery. #}
{{ streamline_external_table_FR_query_v2(
model = 'bitquery',
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
partition_name = "partition_key",
other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
) }}

View File

@ -0,0 +1,75 @@
-- depends_on: {{ ref('bronze__bitquery') }}
{# Daily active-user counts per blockchain, regex-parsed from the raw bitquery
   GraphQL response. Incremental on (blockchain, metric, as_of_date); the
   full-refresh path reads the FR bronze model instead. #}
{{ config(
materialized = 'incremental',
unique_key = ['blockchain', 'metric', 'as_of_date'],
tags = ['bitquery']
) }}
WITH base AS(
SELECT
A.blockchain,
A.metric,
A.date_Day AS as_of_date,
-- response shape differs by chain: most expose "countBigInt"; filecoin
-- aliases it as "senders" (see streamline__bitquery_metrics) -- take
-- whichever is present
COALESCE(
REGEXP_SUBSTR(
A.data,
'"countBigInt"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
),
REGEXP_SUBSTR(
A.data,
'"senders"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
)
) active_users,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A
WHERE
A.data :errors IS NULL
AND A.metric = 'active_users'
-- lateral reference to the SELECT-list alias (Snowflake permits this)
AND active_users IS NOT NULL
{% if is_incremental() %}
AND _inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
blockchain,
metric,
as_of_date,
active_users,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['blockchain','metric','as_of_date']
) }} AS active_users_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
-- keep only the latest-loaded row per key
base qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric,
as_of_date
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,37 @@
# Schema tests for bitquery__active_users: grain uniqueness on
# (BLOCKCHAIN, METRIC, AS_OF_DATE), not-null columns, and 2-day freshness.
version: 2
models:
- name: bitquery__active_users
description: >
This model returns the number of active users for each blockchain and as of date. The distinct number of users is from the 30 days prior to the as of date.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCKCHAIN
- METRIC
- AS_OF_DATE
columns:
- name: BLOCKCHAIN
description: >
The blockchain where the active users are from.
tests:
- not_null
- name: METRIC
description: >
The metric name - always 'active_users'.
tests:
- not_null
- name: AS_OF_DATE
description: >
The date when the active users are counted.
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- name: ACTIVE_USERS
description: >
The number of active users. The distinct number of users is from the 30 days prior to the as of date.
tests:
- not_null

View File

@ -0,0 +1,72 @@
-- depends_on: {{ ref('bronze__bitquery') }}
{# Daily transaction counts per blockchain, regex-parsed from the raw bitquery
   GraphQL response. Incremental on (blockchain, metric, block_date); the
   full-refresh path reads the FR bronze model instead. #}
{{ config(
materialized = 'incremental',
unique_key = ['blockchain', 'metric', 'block_date'],
tags = ['bitquery']
) }}
WITH base AS (
SELECT
A.blockchain,
A.metric,
-- pull the nested {"date": {"date": "..."}} value out of the response JSON
REGEXP_SUBSTR(
A.data,
'"date"\\s*:\\s*\\{\\s*"date"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
) AS block_date,
REGEXP_SUBSTR(
A.data,
'"countBigInt"\\s*:\\s*"([^"]+)"',
1,
1,
'e',
1
) AS tx_count,
A._inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
A
WHERE
A.data :errors IS NULL
AND A.metric = 'tx_count'
-- lateral reference to the SELECT-list alias (Snowflake permits this)
AND tx_count IS NOT NULL
{% if is_incremental() %}
AND _inserted_timestamp > (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
blockchain,
metric,
block_date,
tx_count,
_inserted_timestamp,
-- NOTE(review): surrogate key is named accounts_id; sibling model uses
-- active_users_id, so tx_count_id looks intended -- renaming would break
-- downstream consumers, flagging only
{{ dbt_utils.generate_surrogate_key(
['blockchain','metric','block_date']
) }} AS accounts_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id
FROM
-- keep only the latest-loaded row per key
base qualify ROW_NUMBER() over (
PARTITION BY blockchain,
metric,
block_date
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,36 @@
# Schema tests for bitquery__tx_count: grain uniqueness on
# (BLOCKCHAIN, METRIC, BLOCK_DATE), not-null columns, and 2-day freshness.
version: 2
models:
- name: bitquery__tx_count
description: >
This model returns the number of transactions for each blockchain and block date.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- BLOCKCHAIN
- METRIC
- BLOCK_DATE
columns:
- name: BLOCKCHAIN
description: >
The blockchain where the transactions are from.
tests:
- not_null
- name: METRIC
description: >
The metric name - always 'tx_count'.
tests:
- not_null
- name: BLOCK_DATE
description: >
The date when the transactions are counted.
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 2
- name: TX_COUNT
description: >
The number of transactions for this date.
tests:
- not_null

View File

@ -0,0 +1,44 @@
-- depends_on: {{ ref('bronze__bitquery_FR') }}
-- depends_on: {{ ref('bronze__bitquery') }}
{# Tracking table of (date_day, blockchain, metric) combinations already
   fetched from bitquery; the realtime model anti-joins against this to find
   pending requests.
   BUG FIXES:
   - unique_key listed ' date_day' with a leading space, which would break
     the incremental merge-key match against the real column name;
   - stray trailing comma after _invocation_id in the SELECT list. #}
{{ config (
materialized = "incremental",
unique_key = ['date_day','blockchain','metric'],
merge_exclude_columns = ["inserted_timestamp"],
tags = ['streamline_realtime']
) }}
SELECT
date_day,
blockchain,
metric,
partition_key,
_inserted_timestamp,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
file_name,
'{{ invocation_id }}' AS _invocation_id
FROM
{% if is_incremental() %}
{{ ref('bronze__bitquery') }}
{% else %}
{{ ref('bronze__bitquery_FR') }}
{% endif %}
WHERE
DATA :errors IS NULL
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
FROM
{{ this }})
{% endif %}
-- keep only the latest-loaded row per (date_day, blockchain, metric)
qualify ROW_NUMBER() over (
PARTITION BY date_day,
blockchain,
metric
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -0,0 +1,128 @@
{{ config (
materialized = "view",
tags = ['streamline_view']
) }}
{# Catalog of bitquery GraphQL requests: one (blockchain, metric) pair per
   branch of the UNION ALL, cross-joined with every date from 2025-01-01 up to
   (but not including) today to produce one request row per metric per day.
   tx_count queries bucket by day; active_users queries count distinct
   senders/payers/addresses over a 30-day lookback window. #}
WITH metrics AS (
SELECT
'hedera' AS blockchain,
'tx_count' AS metric,
'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { hedera(network: $network) { transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date {date(format: $dateFormat) } countBigInt}} }' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
SELECT
'ripple' AS blockchain,
'tx_count' AS metric,
'query ($network: RippleNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ripple(network: $network){ transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
SELECT
'moonbeam' AS blockchain,
'tx_count' AS metric,
'query ($network: EthereumNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
SELECT
'celo_mainnet' AS blockchain,
'tx_count' AS metric,
'query ($network: EthereumNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
SELECT
'algorand' AS blockchain,
'tx_count' AS metric,
'query ($network: AlgorandNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { algorand(network: $network) { transactions(options: {asc: "date.date"}, date: {till: $till, since: $from} ) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
'Count of tx hashes by day' AS description
UNION ALL
SELECT
'filecoin' AS blockchain,
'tx_count' AS metric,
'query ($network: FilecoinNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { filecoin(network: $network) { messages(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
'Count of messages by day' AS description
UNION ALL
SELECT
'cardano' AS blockchain,
'tx_count' AS metric,
'query ($network: CardanoNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { cardano(network: $network) { transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
'Count of messages by day' AS description
UNION ALL
SELECT
'hedera' AS blockchain,
'active_users' AS metric,
'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account) } } }' AS query_text,
'distinct counts of payer accounts over the last 30 days' AS description
UNION ALL
SELECT
'ripple' AS blockchain,
'active_users' AS metric,
'query ($network: RippleNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ripple(network: $network) { transactions( date: {since: $from, till: $till}) { countBigInt(uniq: senders) } } } ' AS query_text,
'distinct counts of senders over the last 30 days' AS description
UNION ALL
SELECT
'moonbeam' AS blockchain,
'active_users' AS metric,
'query ($network: EthereumNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(date: {since: $from, till: $till}) { countBigInt(uniq: senders) }} }' AS query_text,
'distinct counts of senders over the last 30 days' AS description
UNION ALL
SELECT
'celo_mainnet' AS blockchain,
'active_users' AS metric,
'query ($network: EthereumNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(date: {since: $from, till: $till}) { countBigInt(uniq: senders) }} }' AS query_text,
'distinct counts of senders over the last 30 days' AS description
UNION ALL
SELECT
'algorand' AS blockchain,
'active_users' AS metric,
'query ($network: AlgorandNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { algorand(network: $network) { transactions( date: {since: $from, till: $till} ) { countBigInt(uniq: senders) }} }' AS query_text,
'distinct counts of senders over the last 30 days' AS description
UNION ALL
SELECT
'filecoin' AS blockchain,
'active_users' AS metric,
'query ($network: FilecoinNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { filecoin(network: $network) { messages(date: {since: $from, till: $till}) {senders: countBigInt(uniq: senders) }} }' AS query_text,
'distinct counts of message senders over the last 30 days' AS description
UNION ALL
SELECT
'cardano' AS blockchain,
'active_users' AS metric,
'query ($network: CardanoNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { cardano(network: $network) { activeAddresses(date: {since: $from, till: $till}) { countBigInt(uniq: address) }} }' AS query_text,
'distinct counts of addresses over the last 30 days' AS description
)
SELECT
date_day,
DATEADD(
'day',
-30,
date_day
) AS date_day_minus_30,
blockchain,
metric,
query_text,
-- GraphQL variables payload for the request; 'from' widens to the 30-day
-- lookback for active_users (date_day_minus_30 is a lateral alias reference)
OBJECT_CONSTRUCT(
'limit',
'1',
'offset',
'0',
'network',
blockchain,
'from',
CASE
WHEN metric = 'active_users' THEN date_day_minus_30
ELSE date_day
END,
'till',
date_day,
'dateFormat',
'%Y-%m-%d'
) AS variables,
description
FROM
{{ source(
'crosschain_core',
'dim_dates'
) }}
CROSS JOIN metrics
WHERE
-- backfill start date; exclude today since its data is still accruing
date_day >= '2025-01-01'
AND date_day < SYSDATE() :: DATE

View File

@ -0,0 +1,59 @@
{# Builds the set of pending bitquery GraphQL requests (metric/date combos in
   streamline__bitquery_metrics not yet present in streamline__bitquery_complete)
   and, via the post_hook, submits them in batches to the bulk REST API
   external function for landing in the 'bitquery' external table. #}
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = 'streamline.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params ={ "external_table" :"bitquery",
"sql_limit" :"100",
"producer_batch_size" :"100",
"worker_batch_size" :"100",
"async_concurrent_requests": "10",
"sql_source" :"{{this.identifier}}",
"order_by_column": "date_day" }
),
tags = ['streamline_realtime']
) }}
WITH metrics AS (
SELECT
date_day,
blockchain,
metric,
query_text,
variables
FROM
{{ ref("streamline__bitquery_metrics") }} A
-- anti-join: keep only combinations not already completed
LEFT JOIN {{ ref("streamline__bitquery_complete") }}
b USING (
blockchain,
metric,
date_day
)
WHERE
b._invocation_id IS NULL
)
SELECT
TO_NUMBER(to_char(date_day, 'YYYYMMDD')) AS date_day,
blockchain,
metric,
TO_NUMBER(to_char(SYSDATE() :: DATE, 'YYYYMMDD')) AS partition_key,
{{ target.database }}.live.udf_api(
'POST',
'https://graphql.bitquery.io',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'Authorization',
-- literal placeholder; presumably substituted with the secret from the
-- Vault path argument below -- confirm against udf_api's contract
'Bearer {Authentication}'
),
OBJECT_CONSTRUCT(
'query',
query_text,
'variables',
variables
),
'Vault/prod/external/bitquery'
) AS request
FROM
metrics

View File

@ -1,6 +1,11 @@
version: 2
sources:
- name: bronze_streamline
database: streamline
schema: "{{ 'external' if target.database == 'EXTERNAL' else 'external_dev' }}"
tables:
- name: bitquery
- name: tokenflow_starknet_l1_data
database: tokenflow_starknet
schema: l1_data
@ -37,6 +42,11 @@ sources:
schema: silver
tables:
- name: apis_keys
- name: crosschain_core
database: crosschain
schema: core
tables:
- name: dim_dates
- name: ethereum_silver
database: ethereum
schema: silver