diff --git a/.github/workflows/dbt_run_daily.yml b/.github/workflows/dbt_run_daily.yml
index 08c7a0f..6a9cb1c 100644
--- a/.github/workflows/dbt_run_daily.yml
+++ b/.github/workflows/dbt_run_daily.yml
@@ -44,4 +44,4 @@ jobs:
           dbt deps
       - name: Run DBT Jobs
         run: |
-          dbt run -m tag:defillama tag:deepnftvalue tag:core tag:blast tag:polymarket --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+
\ No newline at end of file
+          dbt run -m external_models,tag:defillama external_models,tag:deepnftvalue external_models,tag:core external_models,tag:blast external_models,tag:polymarket external_models,tag:bitquery --exclude models/defillama/bronze/bronze__defillama_stablecoin_supply.sql+
\ No newline at end of file
diff --git a/.github/workflows/dbt_run_streamline_daily.yml b/.github/workflows/dbt_run_streamline_daily.yml
new file mode 100644
index 0000000..af037b0
--- /dev/null
+++ b/.github/workflows/dbt_run_streamline_daily.yml
@@ -0,0 +1,47 @@
+name: dbt_run_streamline_daily
+run-name: dbt_run_streamline_daily
+
+on:
+  workflow_dispatch:
+  schedule:
+    # Runs at 07:30 UTC every day (see https://crontab.guru)
+    - cron: '30 7 * * *'
+
+env:
+  DBT_PROFILES_DIR: ${{ vars.DBT_PROFILES_DIR }}
+
+  ACCOUNT: "${{ vars.ACCOUNT }}"
+  ROLE: "${{ vars.ROLE }}"
+  USER: "${{ vars.USER }}"
+  PASSWORD: "${{ secrets.PASSWORD }}"
+  REGION: "${{ vars.REGION }}"
+  DATABASE: "${{ vars.DATABASE }}"
+  WAREHOUSE: "${{ vars.WAREHOUSE }}"
+  SCHEMA: "${{ vars.SCHEMA }}"
+
+concurrency:
+  group: ${{ github.workflow }}
+
+
+
+jobs:
+  run_dbt_jobs:
+    runs-on: ubuntu-latest
+    environment:
+      name: workflow_prod
+
+    steps:
+      - uses: actions/checkout@v3
+
+      - uses: actions/setup-python@v4
+        with:
+          python-version: "3.10"
+          cache: "pip"
+
+      - name: install dependencies
+        run: |
+          pip install -r requirements.txt
+          dbt deps
+      - name: Run DBT Jobs
+        run: |
+          dbt run --vars '{"STREAMLINE_INVOKE_STREAMS":True}' -m 1+models/bitquery/streamline/streamline__bitquery_realtime.sql
\ No newline at end of file
diff --git a/dbt_project.yml b/dbt_project.yml
index 3185a1f..1f24842 100644
--- a/dbt_project.yml
+++ b/dbt_project.yml
@@ -50,9 +50,33 @@ on-run-end:
 vars:
   "dbt_date:time_zone": GMT
-  UPDATE_UDFS_AND_SPS: False
+  OBSERV_FULL_TEST: False
+  START_GHA_TASKS: False
+  STREAMLINE_INVOKE_STREAMS: False
+  STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
+  STREAMLINE_RUN_HISTORY: False
+  STREAMLINE_RETRY_UNKNOWN: False
   UPDATE_SNOWFLAKE_TAGS: True
+  UPDATE_UDFS_AND_SPS: False
+  API_INTEGRATION: '{{ var("config")[target.name]["API_INTEGRATION"] if var("config")[target.name] else var("config")["dev"]["API_INTEGRATION"] }}'
   EXTERNAL_FUNCTION_URI: '{{ var("config")[target.name]["EXTERNAL_FUNCTION_URI"] if var("config")[target.name] else var("config")["dev"]["EXTERNAL_FUNCTION_URI"] }}'
   ROLES: |
-    ["INTERNAL_DEV"]
\ No newline at end of file
+    ["INTERNAL_DEV"]
+
+
+  config:
+    dev:
+      API_INTEGRATION: AWS_EXTERNAL_API_STG_V2
+      EXTERNAL_FUNCTION_URI: qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/
+      ROLES:
+        - AWS_LAMBDA_EXTERNAL_API
+        - INTERNAL_DEV
+
+    prod:
+      API_INTEGRATION: AWS_EXTERNAL_API_PRO_V2
+      EXTERNAL_FUNCTION_URI: zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/
+      ROLES:
+        - AWS_LAMBDA_EXTERNAL_API
+        - INTERNAL_DEV
+        - DBT_CLOUD_EXTERNAL
\ No newline at end of file
diff --git a/macros/create_udfs.sql b/macros/create_udfs.sql
index 0923536..29fb073 100644
--- a/macros/create_udfs.sql
+++ b/macros/create_udfs.sql
@@ -1,8 +1,8 @@
--- {% macro create_udfs() %}
--- {% if var("UPDATE_UDFS_AND_SPS") %}
--- {% set sql %}
--- CREATE schema if NOT EXISTS silver;
--- {% endset %}
--- {% do run_query(sql) %}
--- {% endif %}
--- {% endmacro %}
+{% macro create_udfs() %}
+    {% if var("UPDATE_UDFS_AND_SPS") %}
+        {% set sql %}
+        {{ create_udf_bulk_rest_api_v2() }};
+{% endset %}
+        {% do run_query(sql) %}
+    {% endif %}
+{% endmacro %}
diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql
new file mode 100644
index 0000000..cb26888
--- /dev/null
+++ b/macros/streamline/models.sql
@@ -0,0 +1,83 @@
+{% macro streamline_external_table_query_v2(
+        model,
+        partition_function,
+        partition_name,
+        other_cols
+    ) %}
+WITH meta AS (
+    SELECT
+        LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
+        file_name,
+        {{ partition_function }} AS {{ partition_name }}
+    FROM
+        TABLE(
+            information_schema.external_table_file_registration_history(
+                start_time => DATEADD('day', -3, CURRENT_TIMESTAMP()),
+                table_name => '{{ source( "bronze_streamline", model) }}')
+        ) A
+)
+SELECT
+    {{ other_cols }},
+    DATA,
+    _inserted_timestamp,
+    s.{{ partition_name }},
+    s.value AS VALUE,
+    file_name
+FROM
+    {{ source(
+        "bronze_streamline",
+        model
+    ) }}
+    s
+JOIN
+    meta b
+    ON b.file_name = metadata$filename
+    AND b.{{ partition_name }} = s.{{ partition_name }}
+WHERE
+    b.{{ partition_name }} = s.{{ partition_name }}
+    AND (
+        data:error:code IS NULL
+    )
+{% endmacro %}
+
+{% macro streamline_external_table_FR_query_v2(
+        model,
+        partition_function,
+        partition_name,
+        other_cols
+    ) %}
+WITH meta AS (
+    SELECT
+        LAST_MODIFIED::timestamp_ntz AS _inserted_timestamp,
+        file_name,
+        {{ partition_function }} AS {{ partition_name }}
+    FROM
+        TABLE(
+            information_schema.external_table_files(
+                table_name => '{{ source( "bronze_streamline", model) }}'
+            )
+        ) A
+)
+SELECT
+    {{ other_cols }},
+    DATA,
+    _inserted_timestamp,
+    s.{{ partition_name }},
+    s.value AS VALUE,
+    file_name
+FROM
+    {{ source(
+        "bronze_streamline",
+        model
+    ) }}
+    s
+JOIN
+    meta b
+    ON b.file_name = metadata$filename
+    AND b.{{ partition_name }} = s.{{ partition_name }}
+WHERE
+    b.{{ partition_name }} = s.{{ partition_name }}
+    AND (
+        data:error:code IS NULL
+    )
+{% endmacro %}
\ No newline at end of file
diff --git a/macros/streamline/streamline_udfs.sql b/macros/streamline/streamline_udfs.sql
new file mode 100644
index 0000000..bc16432
--- /dev/null
+++ b/macros/streamline/streamline_udfs.sql
@@ -0,0 +1,10 @@
+{% macro create_udf_bulk_rest_api_v2() %}
+    CREATE
+    OR REPLACE EXTERNAL FUNCTION streamline.udf_bulk_rest_api_v2(
+        json OBJECT
+    ) returns ARRAY {% if target.database == 'EXTERNAL' -%}
+    api_integration = aws_external_api_prod AS 'https://zv7a5qfhv9.execute-api.us-east-1.amazonaws.com/prod/udf_bulk_rest_api'
+    {% else %}
+    api_integration = aws_external_api_stg_v2 AS 'https://qoupd0givh.execute-api.us-east-1.amazonaws.com/stg/udf_bulk_rest_api'
+    {%- endif %}
+{% endmacro %}
diff --git a/models/bitquery/bronze/bronze__bitquery.sql b/models/bitquery/bronze/bronze__bitquery.sql
new file mode 100644
index 0000000..2d64d8a
--- /dev/null
+++ b/models/bitquery/bronze/bronze__bitquery.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view'
+) }}
+{{ streamline_external_table_query_v2(
+    model = 'bitquery',
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
+    partition_name = "partition_key",
+    other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
+) }}
diff --git a/models/bitquery/bronze/bronze__bitquery_FR.sql b/models/bitquery/bronze/bronze__bitquery_FR.sql
new file mode 100644
index 0000000..5b7589e
--- /dev/null
+++ b/models/bitquery/bronze/bronze__bitquery_FR.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view'
+) }}
+{{ streamline_external_table_FR_query_v2(
+    model = 'bitquery',
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER)",
+    partition_name = "partition_key",
+    other_cols = "to_date(value:DATE_DAY::STRING,'YYYYMMDD') AS DATE_DAY, value:BLOCKCHAIN::STRING AS BLOCKCHAIN, value:METRIC::STRING AS METRIC"
+) }}
diff --git a/models/bitquery/gold/bitquery__active_users.sql b/models/bitquery/gold/bitquery__active_users.sql
new file mode 100644
index 0000000..2509ead
--- /dev/null
+++ b/models/bitquery/gold/bitquery__active_users.sql
@@ -0,0 +1,75 @@
+-- depends_on: {{ ref('bronze__bitquery') }}
+{{ config(
+    materialized = 'incremental',
+    unique_key = ['blockchain', 'metric', 'as_of_date'],
+    tags = ['bitquery']
+) }}
+
+WITH base AS (
+
+    SELECT
+        A.blockchain,
+        A.metric,
+        A.date_day AS as_of_date,
+        COALESCE(
+            REGEXP_SUBSTR(
+                A.data,
+                '"countBigInt"\\s*:\\s*"([^"]+)"',
+                1,
+                1,
+                'e',
+                1
+            ),
+            REGEXP_SUBSTR(
+                A.data,
+                '"senders"\\s*:\\s*"([^"]+)"',
+                1,
+                1,
+                'e',
+                1
+            )
+        ) active_users,
+        A._inserted_timestamp
+    FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__bitquery') }}
+{% else %}
+    {{ ref('bronze__bitquery_FR') }}
+{% endif %}
+
+A
+WHERE
+    A.data :errors IS NULL
+    AND A.metric = 'active_users'
+    AND active_users IS NOT NULL
+
+{% if is_incremental() %}
+AND _inserted_timestamp > (
+    SELECT
+        MAX(_inserted_timestamp)
+    FROM
+        {{ this }}
+)
+{% endif %}
+)
+SELECT
+    blockchain,
+    metric,
+    as_of_date,
+    active_users,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(
+        ['blockchain','metric','as_of_date']
+    ) }} AS active_users_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    base qualify ROW_NUMBER() over (
+        PARTITION BY blockchain,
+        metric,
+        as_of_date
+        ORDER BY
+            _inserted_timestamp DESC
+    ) = 1
diff --git a/models/bitquery/gold/bitquery__active_users.yml b/models/bitquery/gold/bitquery__active_users.yml
new file mode 100644
index 0000000..6a8fe3b
--- /dev/null
+++ b/models/bitquery/gold/bitquery__active_users.yml
@@ -0,0 +1,37 @@
+version: 2
+models:
+  - name: bitquery__active_users
+    description: >
+      This model returns the number of active users for each blockchain and as-of date. Active users are counted as the distinct users seen in the 30 days prior to the as-of date.
+    tests:
+      - dbt_utils.unique_combination_of_columns:
+          combination_of_columns:
+            - BLOCKCHAIN
+            - METRIC
+            - AS_OF_DATE
+
+    columns:
+      - name: BLOCKCHAIN
+        description: >
+          The blockchain the active users are counted on.
+        tests:
+          - not_null
+      - name: METRIC
+        description: >
+          The metric name - always 'active_users'.
+        tests:
+          - not_null
+      - name: AS_OF_DATE
+        description: >
+          The date as of which the active users are counted.
+        tests:
+          - not_null
+          - dbt_expectations.expect_row_values_to_have_recent_data:
+              datepart: day
+              interval: 2
+      - name: ACTIVE_USERS
+        description: >
+          The number of active users, counted as the distinct users seen in the 30 days prior to the as-of date.
+        tests:
+          - not_null
+
diff --git a/models/bitquery/gold/bitquery__tx_count.sql b/models/bitquery/gold/bitquery__tx_count.sql
new file mode 100644
index 0000000..f2564b6
--- /dev/null
+++ b/models/bitquery/gold/bitquery__tx_count.sql
@@ -0,0 +1,72 @@
+-- depends_on: {{ ref('bronze__bitquery') }}
+{{ config(
+    materialized = 'incremental',
+    unique_key = ['blockchain', 'metric', 'block_date'],
+    tags = ['bitquery']
+) }}
+
+WITH base AS (
+
+    SELECT
+        A.blockchain,
+        A.metric,
+        REGEXP_SUBSTR(
+            A.data,
+            '"date"\\s*:\\s*\\{\\s*"date"\\s*:\\s*"([^"]+)"',
+            1,
+            1,
+            'e',
+            1
+        ) AS block_date,
+        REGEXP_SUBSTR(
+            A.data,
+            '"countBigInt"\\s*:\\s*"([^"]+)"',
+            1,
+            1,
+            'e',
+            1
+        ) AS tx_count,
+        A._inserted_timestamp
+    FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__bitquery') }}
+{% else %}
+    {{ ref('bronze__bitquery_FR') }}
+{% endif %}
+
+A
+WHERE
+    A.data :errors IS NULL
+    AND A.metric = 'tx_count'
+    AND tx_count IS NOT NULL
+
+{% if is_incremental() %}
+AND _inserted_timestamp > (
+    SELECT
+        MAX(_inserted_timestamp)
+    FROM
+        {{ this }}
+)
+{% endif %}
+)
+SELECT
+    blockchain,
+    metric,
+    block_date,
+    tx_count,
+    _inserted_timestamp,
+    {{ dbt_utils.generate_surrogate_key(
+        ['blockchain','metric','block_date']
+    ) }} AS tx_count_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+    base qualify ROW_NUMBER() over (
+        PARTITION BY blockchain,
+        metric,
+        block_date
+        ORDER BY
+            _inserted_timestamp DESC
+    ) = 1
diff --git a/models/bitquery/gold/bitquery__tx_count.yml b/models/bitquery/gold/bitquery__tx_count.yml
new file mode 100644
index 0000000..8b2cc64
--- /dev/null
+++ b/models/bitquery/gold/bitquery__tx_count.yml
@@ -0,0 +1,36 @@
+version: 2
+models:
+  - name: bitquery__tx_count
+    description: >
+      This model returns the number of transactions for each blockchain and block date.
+    tests:
+      - dbt_utils.unique_combination_of_columns:
+          combination_of_columns:
+            - BLOCKCHAIN
+            - METRIC
+            - BLOCK_DATE
+
+    columns:
+      - name: BLOCKCHAIN
+        description: >
+          The blockchain the transactions are counted on.
+        tests:
+          - not_null
+      - name: METRIC
+        description: >
+          The metric name - always 'tx_count'.
+        tests:
+          - not_null
+      - name: BLOCK_DATE
+        description: >
+          The date for which the transactions are counted.
+        tests:
+          - not_null
+          - dbt_expectations.expect_row_values_to_have_recent_data:
+              datepart: day
+              interval: 2
+      - name: TX_COUNT
+        description: >
+          The number of transactions for this date.
+        tests:
+          - not_null
\ No newline at end of file
diff --git a/models/bitquery/streamline/streamline__bitquery_complete.sql b/models/bitquery/streamline/streamline__bitquery_complete.sql
new file mode 100644
index 0000000..d0d5610
--- /dev/null
+++ b/models/bitquery/streamline/streamline__bitquery_complete.sql
@@ -0,0 +1,44 @@
+-- depends_on: {{ ref('bronze__bitquery_FR') }}
+-- depends_on: {{ ref('bronze__bitquery') }}
+{{ config (
+    materialized = "incremental",
+    unique_key = ['date_day','blockchain','metric'],
+    merge_exclude_columns = ["inserted_timestamp"],
+    tags = ['streamline_realtime']
+) }}
+
+SELECT
+    date_day,
+    blockchain,
+    metric,
+    partition_key,
+    _inserted_timestamp,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    file_name,
+    '{{ invocation_id }}' AS _invocation_id
+FROM
+
+{% if is_incremental() %}
+{{ ref('bronze__bitquery') }}
+{% else %}
+    {{ ref('bronze__bitquery_FR') }}
+{% endif %}
+WHERE
+    DATA :errors IS NULL
+
+{% if is_incremental() %}
+AND _inserted_timestamp >= (
+    SELECT
+        COALESCE(MAX(_INSERTED_TIMESTAMP), '1970-01-01' :: DATE) max_INSERTED_TIMESTAMP
+    FROM
+        {{ this }})
+    {% endif %}
+
+    qualify ROW_NUMBER() over (
+        PARTITION BY date_day,
+        blockchain,
+        metric
+        ORDER BY
+            _inserted_timestamp DESC
+    ) = 1
diff --git a/models/bitquery/streamline/streamline__bitquery_metrics.sql b/models/bitquery/streamline/streamline__bitquery_metrics.sql
new file mode 100644
index 0000000..d2c5e4f
--- /dev/null
+++ b/models/bitquery/streamline/streamline__bitquery_metrics.sql
@@ -0,0 +1,128 @@
+{{ config (
+    materialized = "view",
+    tags = ['streamline_view']
+) }}
+
+WITH metrics AS (
+
+    SELECT
+        'hedera' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: HederaNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { hedera(network: $network) { transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date {date(format: $dateFormat) } countBigInt}} }' AS query_text,
+        'Count of tx hashes by day' AS description
+    UNION ALL
+    SELECT
+        'ripple' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: RippleNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ripple(network: $network){ transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
+        'Count of tx hashes by day' AS description
+    UNION ALL
+    SELECT
+        'moonbeam' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: EthereumNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
+        'Count of tx hashes by day' AS description
+    UNION ALL
+    SELECT
+        'celo_mainnet' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: EthereumNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
+        'Count of tx hashes by day' AS description
+    UNION ALL
+    SELECT
+        'algorand' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: AlgorandNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { algorand(network: $network) { transactions(options: {asc: "date.date"}, date: {till: $till, since: $from} ) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
+        'Count of tx hashes by day' AS description
+    UNION ALL
+    SELECT
+        'filecoin' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: FilecoinNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { filecoin(network: $network) { messages(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
+        'Count of messages by day' AS description
+    UNION ALL
+    SELECT
+        'cardano' AS blockchain,
+        'tx_count' AS metric,
+        'query ($network: CardanoNetwork!, $dateFormat: String!, $from: ISO8601DateTime, $till: ISO8601DateTime) { cardano(network: $network) { transactions(options: {asc: "date.date"}, date: {since: $from, till: $till}) { date: date { date(format: $dateFormat) } countBigInt }} }' AS query_text,
+        'Count of tx hashes by day' AS description
+    UNION ALL
+    SELECT
+        'hedera' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: HederaNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) {hedera(network: $network) {transactions(date: {since: $from, till: $till}) { countBigInt(uniq: payer_account) } } }' AS query_text,
+        'distinct counts of payer accounts over the last 30 days' AS description
+    UNION ALL
+    SELECT
+        'ripple' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: RippleNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ripple(network: $network) { transactions( date: {since: $from, till: $till}) { countBigInt(uniq: senders) } } } ' AS query_text,
+        'distinct counts of senders over the last 30 days' AS description
+    UNION ALL
+    SELECT
+        'moonbeam' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: EthereumNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(date: {since: $from, till: $till}) { countBigInt(uniq: senders) }} }' AS query_text,
+        'distinct counts of senders over the last 30 days' AS description
+    UNION ALL
+    SELECT
+        'celo_mainnet' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: EthereumNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { ethereum(network: $network){ transactions(date: {since: $from, till: $till}) { countBigInt(uniq: senders) }} }' AS query_text,
+        'distinct counts of senders over the last 30 days' AS description
+    UNION ALL
+    SELECT
+        'algorand' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: AlgorandNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { algorand(network: $network) { transactions( date: {since: $from, till: $till} ) { countBigInt(uniq: senders) }} }' AS query_text,
+        'distinct counts of senders over the last 30 days' AS description
+    UNION ALL
+    SELECT
+        'filecoin' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: FilecoinNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { filecoin(network: $network) { messages(date: {since: $from, till: $till}) {senders: countBigInt(uniq: senders) }} }' AS query_text,
+        'distinct counts of message senders over the last 30 days' AS description
+    UNION ALL
+    SELECT
+        'cardano' AS blockchain,
+        'active_users' AS metric,
+        'query ($network: CardanoNetwork!, $from: ISO8601DateTime, $till: ISO8601DateTime) { cardano(network: $network) { activeAddresses(date: {since: $from, till: $till}) { countBigInt(uniq: address) }} }' AS query_text,
+        'distinct counts of addresses over the last 30 days' AS description
+)
+SELECT
+    date_day,
+    DATEADD(
+        'day',
+        -30,
+        date_day
+    ) AS date_day_minus_30,
+    blockchain,
+    metric,
+    query_text,
+    OBJECT_CONSTRUCT(
+        'limit',
+        '1',
+        'offset',
+        '0',
+        'network',
+        blockchain,
+        'from',
+        CASE
+            WHEN metric = 'active_users' THEN date_day_minus_30
+            ELSE date_day
+        END,
+        'till',
+        date_day,
+        'dateFormat',
+        '%Y-%m-%d'
+    ) AS variables,
+    description
+FROM
+    {{ source(
+        'crosschain_core',
+        'dim_dates'
+    ) }}
+    CROSS JOIN metrics
+WHERE
+    date_day >= '2025-01-01'
+    AND date_day < SYSDATE() :: DATE
diff --git a/models/bitquery/streamline/streamline__bitquery_realtime.sql b/models/bitquery/streamline/streamline__bitquery_realtime.sql
new file mode 100644
index 0000000..6bbacee
--- /dev/null
+++ b/models/bitquery/streamline/streamline__bitquery_realtime.sql
@@ -0,0 +1,59 @@
+{{ config (
+    materialized = "view",
+    post_hook = fsc_utils.if_data_call_function_v2(
+        func = 'streamline.udf_bulk_rest_api_v2',
+        target = "{{this.schema}}.{{this.identifier}}",
+        params ={ "external_table" :"bitquery",
+        "sql_limit" :"100",
+        "producer_batch_size" :"100",
+        "worker_batch_size" :"100",
+        "async_concurrent_requests": "10",
+        "sql_source" :"{{this.identifier}}",
+        "order_by_column": "date_day" }
+    ),
+    tags = ['streamline_realtime']
+) }}
+
+WITH metrics AS (
+
+    SELECT
+        date_day,
+        blockchain,
+        metric,
+        query_text,
+        variables
+    FROM
+        {{ ref("streamline__bitquery_metrics") }} A
+        LEFT JOIN {{ ref("streamline__bitquery_complete") }}
+        b USING (
+            blockchain,
+            metric,
+            date_day
+        )
+    WHERE
+        b._invocation_id IS NULL
+)
+SELECT
+    TO_NUMBER(to_char(date_day, 'YYYYMMDD')) AS date_day,
+    blockchain,
+    metric,
+    TO_NUMBER(to_char(SYSDATE() :: DATE, 'YYYYMMDD')) AS partition_key,
+    {{ target.database }}.live.udf_api(
+        'POST',
+        'https://graphql.bitquery.io',
+        OBJECT_CONSTRUCT(
+            'Content-Type',
+            'application/json',
+            'Authorization',
+            'Bearer {Authentication}'
+        ),
+        OBJECT_CONSTRUCT(
+            'query',
+            query_text,
+            'variables',
+            variables
+        ),
+        'Vault/prod/external/bitquery'
+    ) AS request
+FROM
+    metrics
diff --git a/models/sources.yml b/models/sources.yml
index 0e740f5..c15d769 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -1,6 +1,11 @@
 version: 2
 
 sources:
+  - name: bronze_streamline
+    database: streamline
+    schema: "{{ 'external' if target.database == 'EXTERNAL' else 'external_dev' }}"
+    tables:
+      - name: bitquery
  - name: tokenflow_starknet_l1_data
    database: tokenflow_starknet
    schema: l1_data
@@ -37,6 +42,11 @@ sources:
    schema: silver
    tables:
      - name: apis_keys
+  - name: crosschain_core
+    database: crosschain
+    schema: core
+    tables:
+      - name: dim_dates
  - name: ethereum_silver
    database: ethereum
    schema: silver
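Note (editorial, not part of the diff): a minimal sketch of how the partition_function passed into bronze__bitquery is assumed to resolve a partition_key. The staged path 'streamline/bitquery/20240601_0.json' below is hypothetical; the real layout is whatever the streamline loader writes for the YYYYMMDD partition_key emitted by streamline__bitquery_realtime.

-- Hypothetical file path: only the third '/'-delimited segment and its leading
-- '_'-delimited token matter to the expression used in the bronze models.
SELECT
    CAST(
        SPLIT_PART(SPLIT_PART('streamline/bitquery/20240601_0.json', '/', 3), '_', 1) AS INTEGER
    ) AS partition_key; -- returns 20240601 under this assumed layout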