This commit is contained in:
Mike Stepanovic 2025-04-07 14:16:36 -06:00
commit ab33217518
28 changed files with 1548 additions and 5 deletions

44
.github/workflows/dbt_run_noncore.yml vendored Normal file
View File

@ -0,0 +1,44 @@
name: dbt_run_noncore
run-name: dbt_run_noncore
on:
workflow_dispatch:
schedule:
- cron: "25,55 0/4 * * *"
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -m movement_models,tag:noncore

View File

@ -0,0 +1,45 @@
name: dbt_run_observability
run-name: dbt_run_observability
on:
workflow_dispatch:
schedule:
# Runs “At minute 0 past every 8th hour.” (see https://crontab.guru)
- cron: '0 */8 * * *'
env:
DBT_PROFILES_DIR: ./
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: "${{ vars.PYTHON_VERSION }}"
cache: "pip"
- name: install dependencies
run: |
pip install -r requirements.txt
dbt deps
- name: Run DBT Jobs
run: |
dbt run -s tag:observability

View File

@ -0,0 +1,26 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
SELECT
asset_id,
symbol,
NAME,
decimals,
blockchain,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_native_asset_metadata_id,
_invocation_id
FROM
{{ source(
'crosschain_silver',
'complete_native_asset_metadata'
) }}
WHERE
blockchain = 'movement'

View File

@ -0,0 +1,29 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
SELECT
HOUR,
asset_id,
symbol,
NAME,
decimals,
price,
blockchain,
is_imputed,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_native_prices_id,
_invocation_id
FROM
{{ source(
'crosschain_silver',
'complete_native_prices'
) }}
WHERE
blockchain = 'movement'

View File

@ -0,0 +1,27 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
SELECT
asset_id,
'movement' AS token_address,
NAME,
symbol,
'movement' AS platform,
platform_id,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_provider_asset_metadata_id,
_invocation_id
FROM
{{ source(
'crosschain_silver',
'complete_provider_asset_metadata'
) }}
WHERE
asset_id = 'movement'
AND token_address IS NULL

View File

@ -0,0 +1,25 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
SELECT
asset_id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_provider_prices_id,
_invocation_id
FROM
{{ source(
'crosschain_silver',
'complete_provider_prices'
) }}
-- prices for all ids

View File

@ -0,0 +1,31 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
SELECT
'movement' AS token_address,
asset_id,
symbol,
'Movement' AS NAME,
decimals,
'movement' AS blockchain,
'movement' AS blockchain_name,
blockchain_id,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_token_asset_metadata_id,
_invocation_id
FROM
{{ source(
'crosschain_silver',
'complete_token_asset_metadata'
) }}
WHERE
asset_id = 'movement' qualify(ROW_NUMBER() over(PARTITION BY asset_id
ORDER BY
is_deprecated, blockchain_id) = 1)

View File

@ -0,0 +1,34 @@
{{ config (
materialized = 'view',
tags = ['noncore']
) }}
SELECT
HOUR,
'movement' AS token_address,
asset_id,
symbol,
'Movement' AS NAME,
decimals,
price,
'movement' AS blockchain,
'movement' AS blockchain_name,
blockchain_id,
is_imputed,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_token_prices_id,
_invocation_id
FROM
{{ source(
'crosschain_silver',
'complete_token_prices'
) }}
WHERE
asset_id = 'movement' qualify(ROW_NUMBER() over(PARTITION BY asset_id
ORDER BY
is_deprecated, blockchain_id) = 1)

View File

@ -0,0 +1,143 @@
{% docs prices_dim_asset_metadata_table_doc %}
A comprehensive dimensional table holding asset metadata and other relevant details pertaining to each id, from multiple providers. This data set includes raw, non-transformed data coming directly from the provider APIs and rows are not intended to be unique. As a result, there may be data quality issues persisting in the APIs that flow through to this dimensional model. If you are interested in using a curated data set instead, please utilize ez_asset_metadata.
{% enddocs %}
{% docs prices_ez_asset_metadata_table_doc %}
A convenience table holding prioritized asset metadata and other relevant details pertaining to each token_address and native asset. This data set is highly curated and contains metadata for one unique asset per blockchain.
{% enddocs %}
{% docs prices_fact_prices_ohlc_hourly_table_doc %}
A comprehensive fact table holding id and provider specific open, high, low, close hourly prices, from multiple providers. This data set includes raw, non-transformed data coming directly from the provider APIs and rows are not intended to be unique. As a result, there may be data quality issues persisting in the APIs that flow through to this fact based model. If you are interested in using a curated data set instead, please utilize ez_prices_hourly.
{% enddocs %}
{% docs prices_ez_prices_hourly_table_doc %}
A convenience table for determining token prices by address and blockchain, and native asset prices by symbol and blockchain. This data set is highly curated and contains metadata for one price per hour per unique asset and blockchain.
{% enddocs %}
{% docs prices_provider %}
The provider or source of the data.
{% enddocs %}
{% docs prices_asset_id %}
The unique identifier representing the asset.
{% enddocs %}
{% docs prices_name %}
The name of asset.
{% enddocs %}
{% docs prices_symbol %}
The symbol of asset.
{% enddocs %}
{% docs prices_token_address %}
The specific address representing the asset on a specific platform. This will be NULL if referring to a native asset.
{% enddocs %}
{% docs prices_token_address_evm %}
The specific address representing the asset on a specific platform. This will be NULL if referring to a native asset. The case (upper / lower) may or may not be specified within the `dim_asset_metadata` table, as this column is raw and not transformed, coming directly from the provider APIs. However, in the `ez_` views, it will be lowercase by default for all EVMs.
{% enddocs %}
{% docs prices_blockchain %}
The Blockchain, Network, or Platform for this asset.
{% enddocs %}
{% docs prices_blockchain_id %}
The unique identifier of the Blockchain, Network, or Platform for this asset.
{% enddocs %}
{% docs prices_decimals %}
The number of decimals for the asset. May be NULL.
{% enddocs %}
{% docs prices_is_native %}
A flag indicating assets native to the respective blockchain.
{% enddocs %}
{% docs prices_is_deprecated %}
A flag indicating if the asset is deprecated or no longer supported by the provider.
{% enddocs %}
{% docs prices_id_deprecation %}
Deprecating soon! Please use the `asset_id` column instead.
{% enddocs %}
{% docs prices_decimals_deprecation %}
Deprecating soon! Please use the decimals column in `ez_asset_metadata` or join in `dim_contracts` instead.
{% enddocs %}
{% docs prices_hour %}
Hour that the price was recorded at.
{% enddocs %}
{% docs prices_price %}
Closing price of the recorded hour in USD.
{% enddocs %}
{% docs prices_is_imputed %}
A flag indicating if the price was imputed, or derived, from the last arriving record. This is generally used for tokens with low-liquidity or inconsistent reporting.
{% enddocs %}
{% docs prices_open %}
Opening price of the recorded hour in USD.
{% enddocs %}
{% docs prices_high %}
Highest price of the recorded hour in USD
{% enddocs %}
{% docs prices_low %}
Lowest price of the recorded hour in USD
{% enddocs %}
{% docs prices_close %}
Closing price of the recorded hour in USD
{% enddocs %}

View File

@ -0,0 +1,115 @@
version: 2
models:
- name: price__dim_asset_metadata
description: '{{ doc("prices_dim_asset_metadata_table_doc") }}'
columns:
- name: PROVIDER
description: '{{ doc("prices_provider")}}'
- name: ASSET_ID
description: '{{ doc("prices_asset_id") }}'
- name: NAME
description: '{{ doc("prices_name") }}'
- name: SYMBOL
description: '{{ doc("prices_symbol") }}'
- name: TOKEN_ADDRESS
description: '{{ doc("prices_token_address_evm") }}'
- name: BLOCKCHAIN
description: '{{ doc("prices_blockchain") }}'
- name: BLOCKCHAIN_ID
description: '{{ doc("prices_blockchain_id") }}'
- name: DIM_ASSET_METADATA_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: price__ez_asset_metadata
description: '{{ doc("prices_ez_asset_metadata_table_doc") }}'
columns:
- name: ASSET_ID
description: '{{ doc("prices_asset_id") }}'
- name: NAME
description: '{{ doc("prices_name") }}'
- name: SYMBOL
description: '{{ doc("prices_symbol") }}'
- name: TOKEN_ADDRESS
description: '{{ doc("prices_token_address_evm") }}'
- name: BLOCKCHAIN
description: '{{ doc("prices_blockchain") }}'
- name: DECIMALS
description: '{{ doc("prices_decimals") }}'
- name: IS_NATIVE
description: '{{ doc("prices_is_native") }}'
- name: IS_DEPRECATED
description: '{{ doc("prices_is_deprecated") }}'
- name: EZ_ASSET_METADATA_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: price__ez_prices_hourly
description: '{{ doc("prices_ez_prices_hourly_table_doc") }}'
tests:
- dbt_utils.recency:
datepart: hour
field: MODIFIED_TIMESTAMP
interval: 6
severity: error
tags: ['test_recency']
columns:
- name: HOUR
description: '{{ doc("prices_hour")}}'
- name: TOKEN_ADDRESS
description: '{{ doc("prices_token_address_evm") }}'
- name: SYMBOL
description: '{{ doc("prices_symbol") }}'
- name: BLOCKCHAIN
description: '{{ doc("prices_blockchain") }}'
- name: DECIMALS
description: '{{ doc("prices_decimals") }}'
- name: PRICE
description: '{{ doc("prices_price") }}'
- name: IS_NATIVE
description: '{{ doc("prices_is_native") }}'
- name: IS_IMPUTED
description: '{{ doc("prices_is_imputed") }}'
- name: IS_DEPRECATED
description: '{{ doc("prices_is_deprecated") }}'
- name: EZ_PRICES_HOURLY_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'
- name: price__fact_prices_ohlc_hourly
description: '{{ doc("prices_fact_prices_ohlc_hourly_table_doc") }}'
tests:
- dbt_utils.recency:
datepart: hour
field: MODIFIED_TIMESTAMP
interval: 6
severity: error
tags: ['test_recency']
columns:
- name: HOUR
description: '{{ doc("prices_hour")}}'
- name: ASSET_ID
description: '{{ doc("prices_asset_id") }}'
- name: OPEN
description: '{{ doc("prices_open") }}'
- name: HIGH
description: '{{ doc("prices_high") }}'
- name: LOW
description: '{{ doc("prices_low") }}'
- name: CLOSE
description: '{{ doc("prices_close") }}'
- name: FACT_PRICES_OHLC_HOURLY_ID
description: '{{ doc("pk") }}'
- name: INSERTED_TIMESTAMP
description: '{{ doc("inserted_timestamp") }}'
- name: MODIFIED_TIMESTAMP
description: '{{ doc("modified_timestamp") }}'

View File

@ -0,0 +1,33 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'dim_asset_metadata_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(token_address, asset_id, symbol, name)",
tags = ['noncore']
) }}
SELECT
token_address,
asset_id,
symbol,
NAME,
platform AS blockchain,
platform_id AS blockchain_id,
provider,
inserted_timestamp,
modified_timestamp,
complete_provider_asset_metadata_id AS dim_asset_metadata_id
FROM
{{ ref('silver__complete_provider_asset_metadata') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,61 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'ez_asset_metadata_id',
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(asset_id, symbol, name)",
tags = ['noncore']
) }}
SELECT
token_address,
asset_id,
symbol,
NAME,
decimals,
blockchain,
TRUE AS is_native,
is_deprecated,
inserted_timestamp,
modified_timestamp,
complete_token_asset_metadata_id AS ez_asset_metadata_id
FROM
{{ ref('silver__complete_token_asset_metadata') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
NULL AS token_address,
asset_id,
symbol,
NAME,
decimals,
blockchain,
TRUE AS is_native,
is_deprecated,
inserted_timestamp,
modified_timestamp,
complete_native_asset_metadata_id AS ez_asset_metadata_id
FROM
{{ ref('silver__complete_native_asset_metadata') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,64 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'ez_prices_hourly_id',
cluster_by = ['HOUR::DATE','is_native'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(token_address, symbol, NAME)",
tags = ['noncore']
) }}
SELECT
HOUR,
token_address,
symbol,
NAME,
decimals,
price,
blockchain,
FALSE AS is_native,
is_imputed,
is_deprecated,
inserted_timestamp,
modified_timestamp,
complete_token_prices_id AS ez_prices_hourly_id
FROM
{{ ref('silver__complete_token_prices') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}
UNION ALL
SELECT
HOUR,
NULL AS token_address,
symbol,
NAME,
decimals,
price,
blockchain,
TRUE AS is_native,
is_imputed,
is_deprecated,
inserted_timestamp,
modified_timestamp,
complete_native_prices_id AS ez_prices_hourly_id
FROM
{{ ref('silver__complete_native_prices') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,33 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
unique_key = 'fact_prices_ohlc_hourly_id',
cluster_by = ['HOUR::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(asset_id, provider)",
tags = ['noncore']
) }}
SELECT
asset_id,
recorded_hour AS HOUR,
OPEN,
high,
low,
CLOSE,
provider,
inserted_timestamp,
modified_timestamp,
complete_provider_prices_id AS fact_prices_ohlc_hourly_id
FROM
{{ ref('silver__complete_provider_prices') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(
modified_timestamp
)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,130 @@
version: 2
models:
- name: silver_observability__blocks_completeness
description: Records of all blocks block gaps (missing blocks) with a timestamp the test was run
config:
contract:
enforced: true
tests:
- dbt_utils.recency:
datepart: day
field: TEST_TIMESTAMP
interval: 2
severity: error
tags: ['test_recency']
columns:
- name: TEST_NAME
data_type: VARCHAR
description: Name for the test
- name: MIN_BLOCK
data_type: NUMBER
description: The lowest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK
data_type: NUMBER
description: The highest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MIN_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The lowest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The highest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_TESTED
data_type: NUMBER
description: Count of blocks in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_COUNT
data_type: NUMBER
description: Count of block gaps in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_ARRAY
data_type: ARRAY
description: Array of affected blocks
- name: TEST_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: When the test was run
tests:
- not_null:
tags: ['test_quality']
- unique:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
data_type: TIMESTAMP_NTZ
- name: silver_observability__transactions_completeness
description: Records of all blocks with missing transactions with a timestamp the test was run
tests:
- dbt_utils.recency:
datepart: day
field: TEST_TIMESTAMP
interval: 2
severity: error
tags: ['test_recency']
columns:
- name: TEST_NAME
data_type: VARCHAR
description: Name for the test
- name: MIN_BLOCK
data_type: NUMBER
description: The lowest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK
data_type: NUMBER
description: The highest block id in the test
tests:
- not_null:
tags: ['test_quality']
- name: MIN_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The lowest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: MAX_BLOCK_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: The highest block timestamp in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_TESTED
data_type: NUMBER
description: Count of blocks in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_COUNT
data_type: NUMBER
description: Count of block gaps in the test
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKS_IMPACTED_ARRAY
data_type: ARRAY
description: Array of affected blocks
- name: TEST_TIMESTAMP
data_type: TIMESTAMP_NTZ
description: When the test was run
tests:
- not_null:
tags: ['test_quality']
- unique:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
data_type: TIMESTAMP_NTZ

View File

@ -0,0 +1,169 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability'],
enabled = true
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
block_range AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
),
blocks AS (
SELECT
l.block_number,
block_timestamp,
LAG(
l.block_number,
1
) over (
ORDER BY
l.block_number ASC
) AS prev_BLOCK_NUMBER
FROM
{{ ref("core__fact_blocks") }}
l
INNER JOIN block_range b
ON l.block_number = b.block_number
AND l.block_number >= (
SELECT
MIN(block_number)
FROM
block_range
)
),
block_gen AS (
SELECT
_id AS block_number
FROM
{{ source(
'crosschain_silver',
'number_sequence'
) }}
WHERE
_id BETWEEN (
SELECT
MIN(block_number)
FROM
blocks
)
AND (
SELECT
MAX(block_number)
FROM
blocks
)
)
SELECT
'blocks' AS test_name,
MIN(
b.block_number
) AS min_block,
MAX(
b.block_number
) AS max_block,
MIN(
b.block_timestamp
) AS min_block_timestamp,
MAX(
b.block_timestamp
) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) AS blocks_impacted_count,
ARRAY_AGG(
CASE
WHEN C.block_number IS NOT NULL THEN A.block_number
END
) within GROUP (
ORDER BY
A.block_number
) AS blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
block_gen A
LEFT JOIN blocks b
ON A.block_number = b.block_number
LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1
WHERE
COALESCE(
b.block_number,
C.block_number
) IS NOT NULL

View File

@ -0,0 +1,142 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}
WITH summary_stats AS (
SELECT
MIN(block_number) AS min_block,
MAX(block_number) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())
{% if is_incremental() %}
AND (
block_number >= (
SELECT
MIN(block_number)
FROM
(
SELECT
MIN(block_number) AS block_number
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())
UNION
SELECT
MIN(VALUE) - 1 AS block_number
FROM
(
SELECT
blocks_impacted_array
FROM
{{ this }}
qualify ROW_NUMBER() over (
ORDER BY
test_timestamp DESC
) = 1
),
LATERAL FLATTEN(
input => blocks_impacted_array
)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}
),
base_blocks AS (
SELECT
block_number,
tx_count AS transaction_count
FROM
{{ ref('core__fact_blocks') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
AND
block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766)
),
actual_tx_counts AS (
SELECT
block_number,
COUNT(1) AS transaction_count
FROM
{{ ref('core__fact_transactions') }}
WHERE
block_number BETWEEN (
SELECT
min_block
FROM
summary_stats
)
AND (
SELECT
max_block
FROM
summary_stats
)
AND
block_number NOT IN (0, 1758, 1760, 1761, 1762, 1763, 1764, 1766)
GROUP BY
block_number
),
potential_missing_txs AS (
SELECT
e.block_number
FROM
base_blocks e
LEFT OUTER JOIN actual_tx_counts A
ON e.block_number = A.block_number
WHERE
COALESCE(
A.transaction_count,
0
) <> e.transaction_count
),
impacted_blocks AS (
SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (
ORDER BY
block_number
) AS blocks_impacted_array
FROM
potential_missing_txs
)
SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
SYSDATE() AS test_timestamp,
SYSDATE() AS modified_timestamp
FROM
summary_stats
JOIN impacted_blocks
ON 1 = 1

View File

@ -0,0 +1,24 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true, "columns": true },
tags = ['noncore']
) }}
SELECT
asset_id,
symbol,
NAME,
decimals,
blockchain,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_native_asset_metadata_id,
_invocation_id
FROM
{{ ref(
'bronze__complete_native_asset_metadata'
) }}

View File

@ -0,0 +1,27 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true, "columns": true },
tags = ['noncore']
) }}
SELECT
HOUR,
asset_id,
symbol,
NAME,
decimals,
price,
blockchain,
is_imputed,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_native_prices_id,
_invocation_id
FROM
{{ ref(
'bronze__complete_native_prices'
) }}

View File

@ -0,0 +1,24 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true, "columns": true },
tags = ['noncore']
) }}
SELECT
asset_id,
token_address,
NAME,
symbol,
platform,
platform_id,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_provider_asset_metadata_id,
_invocation_id
FROM
{{ ref(
'bronze__complete_provider_asset_metadata'
) }}

View File

@ -0,0 +1,32 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true, "columns": true },
tags = ['noncore']
) }}
SELECT
p.asset_id,
recorded_hour,
OPEN,
high,
low,
CLOSE,
p.provider,
p.source,
p._inserted_timestamp,
p.inserted_timestamp,
p.modified_timestamp,
p.complete_provider_prices_id,
p._invocation_id
FROM
{{ ref(
'bronze__complete_provider_prices'
) }}
p
INNER JOIN {{ ref('bronze__complete_provider_asset_metadata') }}
m
ON p.asset_id = m.asset_id
qualify(ROW_NUMBER() over (PARTITION BY p.asset_id, recorded_hour, p.provider
ORDER BY
p.modified_timestamp DESC)) = 1

View File

@ -0,0 +1,29 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true, "columns": true },
tags = ['noncore']
) }}
SELECT
LOWER(
A.token_address
) AS token_address,
asset_id,
symbol,
NAME,
decimals,
blockchain,
blockchain_name,
blockchain_id,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_token_asset_metadata_id,
_invocation_id
FROM
{{ ref(
'bronze__complete_token_asset_metadata'
) }} A

View File

@ -0,0 +1,32 @@
{{ config(
materialized = 'view',
persist_docs ={ "relation": true, "columns": true },
tags = ['noncore']
) }}
SELECT
HOUR,
LOWER(
token_address
) AS token_address,
asset_id,
symbol,
NAME,
decimals,
price,
blockchain,
blockchain_name,
blockchain_id,
is_imputed,
is_deprecated,
provider,
source,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp,
complete_token_prices_id,
_invocation_id
FROM
{{ ref(
'bronze__complete_token_prices'
) }}

View File

@ -0,0 +1,218 @@
version: 2
models:
- name: silver__complete_native_asset_metadata
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- SYMBOL
tags: ['test_quality']
columns:
- name: PROVIDER
tests:
- not_null:
tags: ['test_quality']
- name: SYMBOL
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKCHAIN
tests:
- not_null:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: COMPLETE_NATIVE_ASSET_METADATA_ID
tests:
- unique:
tags: ['test_quality']
- name: silver__complete_native_prices
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- HOUR
- SYMBOL
tags: ['test_quality']
columns:
- name: HOUR
tests:
- not_null:
tags: ['test_quality']
- name: SYMBOL
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKCHAIN
tests:
- not_null:
tags: ['test_quality']
- name: PROVIDER
tests:
- not_null:
tags: ['test_quality']
- name: PRICE
tests:
- not_null:
tags: ['test_quality']
- name: IS_IMPUTED
tests:
- not_null:
tags: ['test_quality']
- name: _INSERTED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: COMPLETE_NATIVE_PRICES_ID
tests:
- unique:
tags: ['test_quality']
- name: silver__complete_provider_asset_metadata
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ASSET_ID
- TOKEN_ADDRESS
- NAME
- SYMBOL
- PLATFORM
- PLATFORM_ID
- PROVIDER
tags: ['test_quality']
columns:
- name: PROVIDER
tests:
- not_null:
tags: ['test_quality']
- name: ASSET_ID
tests:
- not_null:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: COMPLETE_PROVIDER_ASSET_METADATA_ID
tests:
- unique:
tags: ['test_quality']
- name: silver__complete_provider_prices
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- ASSET_ID
- RECORDED_HOUR
- PROVIDER
tags: ['test_quality']
columns:
- name: PROVIDER
tests:
- not_null:
tags: ['test_quality']
- name: ASSET_ID
tests:
- not_null:
tags: ['test_quality']
- name: RECORDED_HOUR
tests:
- not_null:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: COMPLETE_PROVIDER_PRICES_ID
tests:
- unique:
tags: ['test_quality']
- name: silver__complete_token_asset_metadata
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TOKEN_ADDRESS
- BLOCKCHAIN
tags: ['test_quality']
columns:
- name: PROVIDER
tests:
- not_null:
tags: ['test_quality']
- name: TOKEN_ADDRESS
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKCHAIN
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKCHAIN_ID
tests:
- not_null:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: COMPLETE_TOKEN_ASSET_METADATA_ID
tests:
- unique:
tags: ['test_quality']
- name: silver__complete_token_prices
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- HOUR
- TOKEN_ADDRESS
- BLOCKCHAIN
tags: ['test_quality']
columns:
- name: HOUR
tests:
- not_null:
tags: ['test_quality']
- name: TOKEN_ADDRESS
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKCHAIN
tests:
- not_null:
tags: ['test_quality']
- name: BLOCKCHAIN_ID
tests:
- not_null:
tags: ['test_quality']
- name: PROVIDER
tests:
- not_null:
tags: ['test_quality']
- name: PRICE
tests:
- not_null:
tags: ['test_quality']
- name: IS_IMPUTED
tests:
- not_null:
tags: ['test_quality']
- name: _INSERTED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: MODIFIED_TIMESTAMP
tests:
- not_null:
tags: ['test_quality']
- name: COMPLETE_TOKEN_PRICES_ID
tests:
- unique:
tags: ['test_quality']

View File

@ -1,5 +1,4 @@
version: 2
sources:
- name: crosschain
database: "{{ 'crosschain' if target.database == 'MOVEMENT' else 'crosschain_dev' }}"
@ -13,6 +12,13 @@ sources:
schema: silver
tables:
- name: number_sequence
- name: labels_combined
- name: complete_token_asset_metadata
- name: complete_token_prices
- name: complete_provider_asset_metadata
- name: complete_provider_prices
- name: complete_native_asset_metadata
- name: complete_native_prices
- name: bronze_streamline
database: streamline
schema: |
@ -30,4 +36,4 @@ sources:
database: movement
schema: github_actions
tables:
- name: workflows
- name: workflows

View File

@ -41,7 +41,7 @@ SELECT
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
'Vault/prod/movement/mainnet_fsc'
) AS request
FROM
blocks

View File

@ -105,7 +105,7 @@ numbers AS (
'Flipside_Crypto/0.1'
),
PARSE_JSON('{}'),
'Vault/prod/movement/mainnet'
'Vault/prod/movement/mainnet_fsc'
) AS request
FROM
WORK

View File

@ -16,5 +16,5 @@ SELECT
'Flipside_Crypto/0.1'
),
OBJECT_CONSTRUCT(),
'Vault/prod/movement/mainnet'
'Vault/prod/movement/mainnet_fsc'
) :data :block_height :: INT AS block_number