An 3660/observability models (#155)

* blocks completeness

* block tx count external api model

* txs completeness model

* add col alias

* test result agg cte and temp test range

* curr range test

* revert source change - other pr

* xchain silver

* nuke api key & move to secrets, create workflow

* add scheduled tag

* update workflow w cache

* add obs tag

* rem testing range from txs complete

* comment out schedule and del double comman

* add default null for key var
This commit is contained in:
Jack Forgash 2023-08-23 13:37:16 -06:00 committed by GitHub
parent 2d4753ffda
commit 45f72e71d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
66 changed files with 560 additions and 64 deletions

View File

@ -0,0 +1,52 @@
name: dbt_get_block_tx_count
run-name: dbt_get_block_tx_count
on:
workflow_dispatch:
# schedule:
# Runs "every minute" (see https://crontab.guru)
# TODO enable after manual test
# - cron: '* * * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
BITQUERY_API_KEY: "${{ secrets.BITQUERY_API_KEY }}"
concurrency:
group: ${{ github.workflow }}
jobs:
dbt:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "3.10"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s tag:get_block_tx_count --vars 'BITQUERY_API_KEY: ${{ secrets.BITQUERY_API_KEY }}'
- name: Store logs
uses: actions/upload-artifact@v3
with:
name: dbt-logs
path: |
logs
target

View File

@ -30,7 +30,7 @@ jobs:
dbt_command: >
dbt run-operation stage_external_sources --vars "ext_full_refresh: true";
dbt seed;
dbt run --exclude streamline__all_topshot_moments_minted_metadata_needed streamline__allday_moments_metadata_needed models/silver/streamline;
dbt run -s tag:scheduled
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -56,5 +56,6 @@ vars:
"dbt_date:time_zone": GMT
UPDATE_SNOWFLAKE_TAGS: True
UPDATE_UDFS_AND_SPS: False
OBSERV_FULL_TEST: False
STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES: False
STREAMLINE_INVOKE_STREAMS: True
STREAMLINE_INVOKE_STREAMS: False

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
SELECT

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH labels AS (

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
SELECT

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH coingecko AS (
@ -15,7 +16,7 @@ WITH coingecko AS (
_inserted_timestamp
FROM
{{ source(
'crosschain_v2',
'crosschain_silver',
'hourly_prices_coin_gecko'
) }}
),
@ -31,7 +32,7 @@ coinmarketcap AS (
_inserted_timestamp
FROM
{{ source(
'crosschain_v2',
'crosschain_silver',
'hourly_prices_coin_market_cap'
) }}
),

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
SELECT

View File

@ -6,7 +6,8 @@
'PURPOSE': 'NFT, ALLDAY'
}
}
}
},
tags = ['scheduled']
) }}
WITH allday AS (

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH contract_labels AS (

View File

@ -1,6 +1,6 @@
{{ config (
materialized = 'view',
tags = ['nft', 'dapper'],
tags = ['nft', 'dapper', 'scheduled'],
meta = {
'database_tags':{
'table': {

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH pairs AS (

View File

@ -6,7 +6,8 @@
'PURPOSE': 'NFT, TOPSHOT'
}
}
}
},
tags = ['scheduled']
) }}
WITH topshot AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH validators AS (

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['ez', 'bridge'],
tags = ['ez', 'bridge', 'scheduled'],
meta={
'database_tags':{
'table': {

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['nft', 'ez'],
tags = ['nft', 'ez', 'scheduled'],
meta={
'database_tags':{
'table': {

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['ez'],
tags = ['ez', 'scheduled'],
meta={
'database_tags':{
'table': {

View File

@ -1,6 +1,6 @@
{{ config (
materialized = 'view',
tags = ['ez'],
tags = ['ez', 'scheduled'],
meta={
'database_tags':{
'table': {

View File

@ -1,6 +1,6 @@
{{ config(
materialized = 'view',
tags = ['ez']
tags = ['ez', 'scheduled']
) }}
SELECT

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH silver_blocks AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH events_final AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH api AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH token_labels AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
WITH silver_txs AS (

View File

@ -0,0 +1,66 @@
{{ config(
materialized = 'incremental',
unique_key = 'block_height',
tags = ['observability', 'get_block_tx_count']
) }}
WITH starting_block AS (
{% if is_incremental() %}
SELECT
MAX(block_height) AS block_height_start, {{ target.database }}.streamline.udf_get_chainhead() AS max_block_height
FROM
{{ this }}
{% else %}
-- Candidate 4 starts at 4132133
SELECT
4132133 AS block_height_start, 1000000000 AS max_block_height
{% endif %}),
params AS (
SELECT
'query ($network: FlowNetwork!, $block_height_start: Int!, $block_height_end: Int!) { flow(network: $network) { blocks(height: {gt: $block_height_start, lteq: $block_height_end}) { height transactionsCount } } }' AS query,
OBJECT_CONSTRUCT(
'network',
'flow',
'block_height_start',
block_height_start,
'block_height_end',
IFF(
block_height_start + 25000 > max_block_height,
max_block_height,
block_height_start + 25000
)
) AS variables,
'{{ var('BITQUERY_API_KEY', Null) }}' AS api_key
FROM
starting_block
),
get_bitquery AS (
SELECT
livequery_dev.live.udf_api(
'POST',
'https://graphql.bitquery.io',
OBJECT_CONSTRUCT(
'Content-Type',
'application/json',
'X-API-KEY',
api_key
),
OBJECT_CONSTRUCT(
'query',
query,
'variables',
variables
)
) AS res,
res :data :data :flow :blocks :: ARRAY AS blocks_res
FROM
params
)
SELECT
VALUE :height :: INTEGER AS block_height,
VALUE :transactionsCount :: INTEGER AS transaction_ct
FROM
get_bitquery,
LATERAL FLATTEN(blocks_res)

View File

@ -0,0 +1,187 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = False,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_height) AS min_block,
MAX(block_height) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, SYSDATE())
{% if is_incremental() %}
AND (
block_height >= (
SELECT
MIN(block_height)
FROM
(
SELECT
MIN(block_height) AS block_height
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, SYSDATE())
AND DATEADD('hour', -95, SYSDATE())
UNION
SELECT
MIN(VALUE) - 1 AS block_height
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_height >= 7601063
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_height
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
blocks AS (
SELECT
l.block_height,
block_timestamp,
LAG(
l.block_height,
1
) over (
ORDER BY
l.block_height ASC
) AS prev_BLOCK_HEIGHT
FROM
{{ ref("silver__blocks") }}
l
INNER JOIN block_range b
ON l.block_height = b.block_height
AND l.block_height >= (
SELECT
MIN(block_height)
FROM
block_range
)
),
block_gen AS (
SELECT
_id AS block_height
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_height)
FROM
blocks
)
AND (
SELECT
MAX(block_height)
FROM
blocks
)
),
test_blocks AS (
SELECT
'blocks' AS test_name,
MIN(
b.block_height
) AS min_block,
MAX(
b.block_height
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_height IS NOT NULL THEN A.block_height
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_height IS NOT NULL THEN A.block_height
END
) within GROUP (
ORDER BY
A.block_height
) AS blocks_impacted_array,
SYSDATE() AS test_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_height = b.block_height
LEFT JOIN blocks C
ON A.block_height > C.prev_block_height
AND A.block_height < C.block_height
AND C.block_height - C.prev_block_height <> 1
WHERE
COALESCE(
b.block_height,
C.block_height
) IS NOT NULL
),
FINAL AS (
SELECT
test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
test_timestamp
FROM
test_blocks
)
SELECT
*
FROM
FINAL

View File

@ -0,0 +1,152 @@
{{ config(
materialized = 'incremental',
unique_key = '_test_timestamp',
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_height) AS min_block,
MAX(block_height) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, SYSDATE())
{% if is_incremental() %}
AND (
block_height >= (
SELECT
MIN(block_height)
FROM
(
SELECT
MIN(block_height) AS block_height
FROM
{{ ref('silver__blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, SYSDATE())
AND DATEADD('hour', -95, SYSDATE())
UNION
SELECT
MIN(VALUE) - 1 AS block_height
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_height >= 7601063
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_height
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
block_height BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
txs_per_block_actual AS (
SELECT
block_height,
COUNT(
DISTINCT tx_id
) AS txs
FROM
{{ ref('silver__transactions') }}
WHERE
block_height IN (
SELECT
block_height
FROM
block_range
)
GROUP BY
1
),
txs_per_block_expected AS (
SELECT
block_height,
transaction_ct AS txs
FROM
{{ ref('silver_observability__block_tx_count') }}
WHERE
block_height IN (
SELECT
block_height
FROM
block_range
)
),
comparison AS (
SELECT
id.block_height,
A.txs AS actual_tx_count,
e.txs AS expected_tx_count
FROM
block_range id
LEFT JOIN txs_per_block_actual A USING (block_height)
LEFT JOIN txs_per_block_expected e USING (block_height)
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_height) within GROUP (
ORDER BY
block_height
) AS blocks_impacted_array
FROM
comparison
WHERE
actual_tx_count != expected_tx_count
),
FINAL AS (
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS _test_timestamp
FROM
summary_stats
JOIN impacted_blocks
)
SELECT
*
FROM
FINAL

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'block_height',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH bronze_blocks AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'attribute_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'attribute_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'event_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH transactions AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = "CONCAT_WS('-', tx_id, event_index)"
unique_key = "CONCAT_WS('-', tx_id, event_index)",
tags = ['scheduled']
) }}
WITH silver_events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH bronze_txs AS (

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'table',
cluster_by = ['event_contract'],
unique_key = 'event_contract'
unique_key = 'event_contract',
tags = ['scheduled']
) }}
WITH splt AS (

View File

@ -2,6 +2,7 @@
materialized = 'table',
cluster_by = ['address'],
unique_key = 'event_id',
tags = ['scheduled']
) }}
WITH labels AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH events AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'table'
materialized = 'table',
tags = ['scheduled']
) }}
WITH labels AS (

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'view'
materialized = 'view',
tags = ['scheduled']
) }}
SELECT

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'nft_id'
unique_key = 'nft_id',
tags = ['scheduled']
) }}
WITH metadata AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft']
tags = ['nft', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, edition_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['play_id'],
unique_key = "concat_ws('-', event_contract, play_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH play_creation AS (

View File

@ -2,7 +2,7 @@
materialized = 'table',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-',event_contract,edition_id,nft_id)",
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH moments AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, edition_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, moment_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, series_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, set_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper']
tags = ['nft', 'dapper', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = "CONCAT_WS('-', tx_id, event_index)",
tags = ['nft']
tags = ['nft', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft']
tags = ['nft', 'scheduled']
) }}
WITH topshot AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'nft_id'
unique_key = 'nft_id',
tags = ['scheduled']
) }}
WITH metadata AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = ['tx_id','nft_id'],
tags = ['nft']
tags = ['nft', 'scheduled']
) }}
WITH silver_events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft']
tags = ['nft', 'scheduled']
) }}
WITH silver_events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft']
tags = ['nft', 'scheduled']
) }}
WITH silver_events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['recorded_at::DATE'],
unique_key = "concat_ws( '-', recorded_at, asset_id )"
unique_key = "concat_ws( '-', recorded_at, asset_id )",
tags = ['scheduled']
) }}
WITH token_prices AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = "concat_ws( '-', recorded_hour, id )"
unique_key = "concat_ws( '-', recorded_hour, id )",
tags = ['scheduled']
) }}
-- model named prices hourly but core view is hourly prices
-- this is to organize the silver model with other prices models

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::date'],
unique_key = "CONCAT_WS('-', block_timestamp, token_contract)"
unique_key = "CONCAT_WS('-', block_timestamp, token_contract)",
tags = ['scheduled']
) }}
WITH swaps AS (

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'table',
cluster_by = ['recorded_hour::date'],
unique_key = "CONCAT_WS('-', recorded_hour, token)"
unique_key = "CONCAT_WS('-', recorded_hour, token)",
tags = ['scheduled']
) }}
WITH swap_prices AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id'
unique_key = 'tx_id',
tags = ['scheduled']
) }}
WITH silver_events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = "CONCAT_WS('-', tx_id, swap_index)",
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = "CONCAT_WS('-', tx_id, event_index)",
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH swap_contracts AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['scheduled']
) }}
WITH swaps_events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = 'tx_id',
tags = ['bridge']
tags = ['bridge', 'scheduled']
) }}
WITH events AS (

View File

@ -3,7 +3,7 @@
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = 'tx_id',
tags = ['bridge']
tags = ['bridge', 'scheduled']
) }}
WITH events AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = "CONCAT_WS('-', tx_id, sender, recipient, token_contract, amount)"
unique_key = "CONCAT_WS('-', tx_id, sender, recipient, token_contract, amount)",
tags = ['scheduled']
) }}
WITH events AS (

View File

@ -33,10 +33,10 @@ sources:
- name: transaction_results
- name: crosschain_v2
- name: crosschain_silver
database: crosschain
schema: silver
tables:
- name: hourly_prices_coin_gecko
- name: hourly_prices_coin_market_cap
- name: number_sequence