tags and schedules

This commit is contained in:
Eric Laurello 2023-11-10 17:16:08 -05:00
parent 0a102432e3
commit 8f9f6987a4
38 changed files with 203 additions and 1150 deletions

View File

@ -1,5 +1,5 @@
name: dbt_run_scheduled
run-name: dbt_run_scheduled
name: dbt_run_scheduled_core
run-name: dbt_run_scheduled_core
on:
workflow_dispatch:
@ -28,7 +28,7 @@ jobs:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run -m models/bronze models/gold models/silver --exclude tag:classic models/silver/_observability models/silver/silver__messages.sql;
dbt run -m "terra_models,tag:core"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -0,0 +1,34 @@
name: dbt_run_scheduled_non_core
run-name: dbt_run_scheduled_non_core
on:
workflow_dispatch:
# schedule:
# # Runs 0700 daily (see https://crontab.guru)
# - cron: '20,50 * * * *'
env:
USE_VARS: "${{ vars.USE_VARS }}"
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"
DBT_VERSION: "${{ vars.DBT_VERSION }}"
ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"
concurrency:
group: ${{ github.workflow }}
jobs:
called_workflow_template:
uses: FlipsideCrypto/analytics-workflow-templates/.github/workflows/dbt_run_template.yml@main
with:
dbt_command: |
dbt run -m "terra_models,tag:core"
environment: workflow_prod
warehouse: ${{ vars.WAREHOUSE }}
secrets: inherit

View File

@ -1,5 +1,6 @@
workflow_name,workflow_schedule
dbt_run_observability,"15 0,4,8,12,17,20 * * *"
dbt_run_scheduled,"6,18,30,42,54 * * * *"
dbt_run_streamline,"0,12,24,36,48 * * * *"
dbt_test_tasks,"0,30 * * * *"
dbt_run_scheduled_core,"3,33 * * * *"
dbt_run_scheduled_non_core,"42 * * * *"
dbt_run_streamline,"22,52 * * * *"
dbt_test_tasks,"10,40 * * * *"
1 workflow_name workflow_schedule
2 dbt_run_observability 15 0,4,8,12,17,20 * * *
3 dbt_run_scheduled dbt_run_scheduled_core 6,18,30,42,54 * * * * 3,33 * * * *
4 dbt_run_streamline dbt_run_scheduled_non_core 0,12,24,36,48 * * * * 42 * * * *
5 dbt_test_tasks dbt_run_streamline 0,30 * * * * 22,52 * * * *
6 dbt_test_tasks 10,40 * * * *

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
WITH lq_base AS (

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
WITH lq_base AS (

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'incremental',
full_refresh = false
full_refresh = false,
tags = ['noncore']
) }}
WITH min_block AS (

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{{ streamline_external_table_FR_query(
"blocks",

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{{ streamline_external_table_FR_query(
"transactions",

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
tags = ['core']
) }}
{% set model = this.identifier.split("_") [-1] %}

View File

@ -1,6 +1,6 @@
{{ config(
materialized = "view",
secure = true
tags = ['core']
) }}
WITH staking AS (

View File

@ -1,8 +1,16 @@
{{ config(materialized="view", secure=true) }}
{{ config(
materialized = "view",
tags = ['core']
) }}
with swap as (select * from {{ ref("silver__dex_swaps") }})
WITH swap AS (
select
SELECT
*
FROM
{{ ref("silver__dex_swaps") }}
)
SELECT
block_id,
block_timestamp,
blockchain,
@ -15,7 +23,7 @@ select
from_decimal,
to_amount,
to_currency,
to_decimal,
TO_DECIMAL,
pool_id
from swap
FROM
swap

View File

@ -1,6 +1,7 @@
{{ config(
materialized = 'incremental',
unique_key = 'address'
unique_key = 'address',
tags = ['noncore']
) }}
WITH labels AS (

View File

@ -3,6 +3,7 @@
unique_key = "block_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH base AS (

View File

@ -3,6 +3,7 @@
unique_key = 'block_id',
incremental_strategy = 'delete+insert',
cluster_by = ['block_timestamp::DATE', '_inserted_timestamp::DATE'],
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__streamline_blocks') }}
-- depends_on: {{ ref('bronze__streamline_FR_blocks') }}

View File

@ -3,6 +3,7 @@
unique_key = 'block_id',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__streamline_blocks') }}
-- depends_on: {{ ref('bronze__streamline_FR_blocks') }}

View File

@ -3,10 +3,10 @@
unique_key = "SWAP_ID",
incremental_strategy = "delete+insert",
cluster_by = ["block_timestamp::DATE", "_inserted_timestamp::DATE"],
tags = ['noncore']
) }}
WITH all_swaps AS(
WITH all_swaps AS(
SELECT
block_id,
@ -22,15 +22,17 @@ WITH all_swaps AS(
{{ ref("silver__messages") }}
WHERE
message_type ILIKE '%msgexecutecontract%'
AND (message_value :msg :swap IS NOT NULL
OR message_value :msg :execute_swap_operations IS NOT NULL)
AND (
message_value :msg :swap IS NOT NULL
OR message_value :msg :execute_swap_operations IS NOT NULL
)
AND {{ incremental_load_filter('_inserted_timestamp') }}
),
flattened_data AS( --- flattened allswaps table
flattened_data AS(
--- flattened allswaps table
SELECT
swap.*,
flat.seq,
flat.seq,
flat.key,
flat.value
FROM
@ -39,8 +41,8 @@ flattened_data AS( --- flattened allswaps table
input => swap_info
) flat
),
offer_amount_pattern AS( ---associated swaps columns excluding swap contract_addresses column
offer_amount_pattern AS(
---associated swaps columns excluding swap contract_addresses column
SELECT
*,
len('offer_amount') AS offer_amtlen,
@ -49,102 +51,98 @@ offer_amount_pattern AS( ---associated swaps columns excluding swap contract_
flattened_data
WHERE
key ILIKE '%offer_amount%'),
action_pattern AS( --- only swap contract_addresses column
SELECT
*,
len('action') AS actlen,
RIGHT(key, len(key) - actlen) AS suffix_ch
FROM
flattened_data
WHERE
key ILIKE '%action%'
AND VALUE = 'swap'
),
processed_swaps AS( --- Associated swap columns including the swap contract addresses column
SELECT
offer.block_id,
offer.block_timestamp,
offer._inserted_timestamp,
offer.blockchain,
offer.chain_id,
offer.tx_id,
offer.msg_index,
offer.tx_succeeded,
COALESCE(
offer.swap_info [CONCAT('offer_amount',offer.suffix_ch)] :: INTEGER,
offer.swap_info ['offer_amount'] :: INTEGER
) AS from_amount,
COALESCE(
offer.swap_info [CONCAT('offer_asset',offer.suffix_ch)] :: STRING,
offer.swap_info ['offer_asset'] :: STRING
) AS from_currency,
6 :: INTEGER AS from_decimal,
COALESCE(
offer.swap_info [CONCAT('return_amount',offer.suffix_ch)] :: INTEGER,
offer.swap_info ['return_amount'] :: INTEGER
) AS to_amount,
COALESCE(
offer.swap_info [CONCAT('ask_asset',offer.suffix_ch)] :: STRING,
offer.swap_info ['ask_asset'] :: STRING
) AS to_currency,
6 :: INTEGER AS TO_DECIMAL,
COALESCE(
action.swap_info [CONCAT('_contract_address',action.suffix_ch)] :: STRING,
action.swap_info ['_contract_address'] :: STRING
) AS contract_address
FROM
offer_amount_pattern offer
FULL JOIN action_pattern action
ON offer.tx_id = action.tx_id
AND offer.seq = action.seq
),
transactions AS (
SELECT
tx_id,
tx_sender
FROM
{{ ref ('silver__transactions') }}
WHERE
{{ incremental_load_filter("_inserted_timestamp") }}
),
final_table AS (
SELECT
DISTINCT concat_ws (
'-',
s.tx_id,
msg_index,
s.contract_address
) AS swap_id,
s.block_id,
s.block_timestamp,
s._inserted_timestamp,
s.blockchain,
s.chain_id,
s.tx_id,
s.tx_succeeded,
t.tx_sender AS trader,
from_amount,
from_currency,
from_decimal,
to_amount,
to_currency,
TO_DECIMAL,
contract_address AS pool_id
FROM
processed_swaps s
INNER JOIN transactions t
ON s.tx_id = t.tx_id
QUALIFY ROW_NUMBER() OVER (
PARTITION BY swap_id
ORDER BY s.block_timestamp DESC
) = 1
)
SELECT
*
FROM
final_table
action_pattern AS(
--- only swap contract_addresses column
SELECT
*,
len('action') AS actlen,
RIGHT(key, len(key) - actlen) AS suffix_ch
FROM
flattened_data
WHERE
key ILIKE '%action%'
AND VALUE = 'swap'),
processed_swaps AS(
--- Associated swap columns including the swap contract addresses column
SELECT
offer.block_id,
offer.block_timestamp,
offer._inserted_timestamp,
offer.blockchain,
offer.chain_id,
offer.tx_id,
offer.msg_index,
offer.tx_succeeded,
COALESCE(
offer.swap_info [CONCAT('offer_amount',offer.suffix_ch)] :: INTEGER,
offer.swap_info ['offer_amount'] :: INTEGER
) AS from_amount,
COALESCE(
offer.swap_info [CONCAT('offer_asset',offer.suffix_ch)] :: STRING,
offer.swap_info ['offer_asset'] :: STRING
) AS from_currency,
6 :: INTEGER AS from_decimal,
COALESCE(
offer.swap_info [CONCAT('return_amount',offer.suffix_ch)] :: INTEGER,
offer.swap_info ['return_amount'] :: INTEGER
) AS to_amount,
COALESCE(
offer.swap_info [CONCAT('ask_asset',offer.suffix_ch)] :: STRING,
offer.swap_info ['ask_asset'] :: STRING
) AS to_currency,
6 :: INTEGER AS TO_DECIMAL,
COALESCE(
action.swap_info [CONCAT('_contract_address',action.suffix_ch)] :: STRING,
action.swap_info ['_contract_address'] :: STRING
) AS contract_address
FROM
offer_amount_pattern offer full
JOIN action_pattern action
ON offer.tx_id = action.tx_id
AND offer.seq = action.seq
),
transactions AS (
SELECT
tx_id,
tx_sender
FROM
{{ ref ('silver__transactions') }}
WHERE
{{ incremental_load_filter("_inserted_timestamp") }}
),
final_table AS (
SELECT
DISTINCT concat_ws (
'-',
s.tx_id,
msg_index,
s.contract_address
) AS swap_id,
s.block_id,
s.block_timestamp,
s._inserted_timestamp,
s.blockchain,
s.chain_id,
s.tx_id,
s.tx_succeeded,
t.tx_sender AS trader,
from_amount,
from_currency,
from_decimal,
to_amount,
to_currency,
TO_DECIMAL,
contract_address AS pool_id
FROM
processed_swaps s
INNER JOIN transactions t
ON s.tx_id = t.tx_id qualify ROW_NUMBER() over (
PARTITION BY swap_id
ORDER BY
s.block_timestamp DESC
) = 1
)
SELECT
*
FROM
final_table

View File

@ -1,51 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = "tx_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE'],
enabled = false
) }}
WITH base AS (
SELECT
block_id,
block_timestamp,
'terra' AS blockchain,
chain_id,
tx_id,
tx_succeeded,
message_value :proposer :: STRING AS proposer,
attributes :submit_proposal :proposal_id :: INTEGER AS proposal_id,
attributes :submit_proposal :proposal_type :: STRING AS proposal_type,
_ingested_at,
_inserted_timestamp
FROM
{{ ref('silver__messages') }}
WHERE
message_type ILIKE '%MsgSubmitProposal%'
AND attributes :message :module :: STRING = 'governance'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
block_id,
block_timestamp,
blockchain,
chain_id,
tx_id,
tx_succeeded,
proposer,
proposal_id,
proposal_type,
_ingested_at,
_inserted_timestamp
FROM
base

View File

@ -1,104 +0,0 @@
version: 2
models:
- name: silver__governance_submit_proposal
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- TX_ID
columns:
- name: BLOCK_ID
description: "{{ doc('block_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: BLOCKCHAIN
description: "{{ doc('blockchain') }}"
tests:
- not_null
- accepted_values:
values:
- "terra"
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: CHAIN_ID
description: "{{ doc('chain_id') }}"
tests:
- not_null
- accepted_values:
values:
- "phoenix-1"
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_ID
description: "{{ doc('tx_id') }}"
tests:
- unique
- not_null
- relationships:
to: ref('silver__transactions')
field: tx_id
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_SUCCEEDED
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- boolean
- name: PROPOSER
description: "{{ doc('proposer') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: PROPOSAL_ID
description: "{{ doc('proposal_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: PROPOSAL_TYPE
description: "{{ doc('proposal_type') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARCHAR
- STRING
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- timestamp_ntz

View File

@ -1,70 +0,0 @@
{{ config(
materialized = 'incremental',
unique_key = 'tx_id',
incremental_strategy = 'delete+insert',
enabled = false
) }}
WITH base_votes AS (
SELECT
tx_id,
block_id,
block_timestamp,
tx_succeeded,
chain_id,
attributes,
_inserted_timestamp
FROM
{{ ref('silver__messages') }}
WHERE
message_type = '/cosmos.gov.v1beta1.MsgVote'
AND {{ incremental_load_filter('_inserted_timestamp') }}
),
parsed_votes AS (
SELECT
tx_id,
block_id,
block_timestamp,
tx_succeeded,
chain_id,
attributes :message :sender :: text AS voter,
attributes :proposal_vote :proposal_id :: NUMBER AS proposal_id,
PARSE_JSON(
attributes :proposal_vote :option
) AS parsed_vote_option,
parsed_vote_option :option :: NUMBER AS vote_option,
CASE
vote_option
WHEN 1 THEN 'Yes'
WHEN 2 THEN 'Abstain'
WHEN 3 THEN 'No'
WHEN 4 THEN 'NoWithVeto'
END AS vote_option_text,
parsed_vote_option :weight :: NUMBER AS vote_weight,
'terra' AS blockchain,
_inserted_timestamp
FROM
base_votes
),
FINAL AS (
SELECT
tx_id,
block_id,
block_timestamp,
blockchain,
chain_id,
voter,
proposal_id,
vote_option,
vote_option_text,
vote_weight,
tx_succeeded,
_inserted_timestamp
FROM
parsed_votes
)
SELECT
*
FROM
FINAL

View File

@ -1,133 +0,0 @@
version: 2
models:
- name: silver__governance_votes
description: |-
This table contains votes cast on governance proposals.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- unique
- not_null
- relationships:
to: ref('silver__transactions')
field: tx_id
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- string
- varchar
- name: block_id
description: "{{ doc('block_id') }}"
tests:
- not_null
- relationships:
to: ref('silver__blocks')
field: block_id
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- number
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- timestamp_ntz
- name: blockchain
description: "{{ doc('blockchain') }}"
tests:
- not_null
- accepted_values:
values:
- 'terra'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- string
- varchar
- name: chain_id
description: "{{ doc('chain_id') }}"
tests:
- not_null
- accepted_values:
values:
- 'phoenix-1'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- string
- varchar
- name: voter
description: "{{ doc('voter') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- string
- varchar
- name: proposal_id
description: "{{ doc('proposal_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- number
- name: vote_option
description: "{{ doc('vote_option') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_between:
min_value: 1
max_value: 4
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- number
- name: vote_option_text
description: "{{ doc('vote_option_text') }}"
tests:
- not_null
- accepted_values:
values:
- 'Yes'
- 'Abstain'
- 'No'
- 'NoWithVeto'
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- string
- varchar
- name: vote_weight
description: "{{ doc('vote_weight') }}"
tests:
- not_null
- accepted_values:
values:
- 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- number
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- boolean
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- timestamp_ntz

View File

@ -1,134 +0,0 @@
{{ config(
materialized = "incremental",
cluster_by = ["_inserted_timestamp"],
unique_key = "action_id",
enabled = false
) }}
WITH pools AS (
SELECT
*
FROM
{{ ref("core__dim_address_labels") }}
WHERE
label_subtype = 'pool'
),
prelim_table AS (
SELECT
block_id,
block_timestamp,
'terra' AS blockchain,
tx_id,
tx_succeeded,
chain_id,
message_value,
message_type,
message_index,
NULLIF(
message_value :contract,
message_value :msg :send :contract
) :: STRING AS pool_address,
message_value :sender :: STRING AS liquidity_provider_address,
attributes,
path,
VALUE :: STRING AS obj_value,
_ingested_at,
_inserted_timestamp
FROM
{{ ref("silver__messages") }},
LATERAL FLATTEN(
input => attributes :wasm
)
WHERE
attributes :wasm IS NOT NULL
AND message_type = '/cosmwasm.wasm.v1.MsgExecuteContract'
AND {{ incremental_load_filter("_inserted_timestamp") }}
),
intermediate_table AS (
SELECT
prelim_table.*,
label
FROM
prelim_table
JOIN pools
ON prelim_table.pool_address = pools.address
),
final_table AS (
SELECT
block_id,
block_timestamp,
blockchain,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
_inserted_timestamp DESC
) AS action_index,
CONCAT(
tx_id,
'-',
action_index -1
) AS action_id,
tx_id,
tx_succeeded,
chain_id,
pool_address,
liquidity_provider_address,
CASE
WHEN path = 'refund_assets' THEN 'withdraw_liquidity'
WHEN path = 'assets' THEN 'provide_liquidity'
WHEN path = 'withdrawn_share' THEN 'burn_lp_token'
WHEN path = 'share' THEN 'mint_lp_token'
ELSE NULL
END AS action,
REGEXP_SUBSTR(
VALUE,
'[0-9]+'
) :: bigint AS amount,
IFF(path IN ('withdrawn_share', 'share'), label, REGEXP_SUBSTR(VALUE, '[^[:digit:]](.*)')) AS currency,
NULL AS decimals,
_ingested_at,
_inserted_timestamp
FROM
intermediate_table,
LATERAL SPLIT_TO_TABLE(
intermediate_table.obj_value,
', '
)
WHERE
tx_id IN (
SELECT
tx_id
FROM
intermediate_table
WHERE
obj_value IN (
'provide_liquidity',
'withdraw_liquidity'
)
)
AND path IN (
'refund_assets',
'withdrawn_share',
'share',
'assets'
)
)
SELECT
block_id,
block_timestamp,
action_id,
tx_id,
tx_succeeded,
blockchain,
chain_id,
pool_address,
liquidity_provider_address,
action,
amount,
currency,
decimals,
_ingested_at,
_inserted_timestamp
FROM
final_table

View File

@ -1,132 +0,0 @@
version: 2
models:
- name: silver__lp_actions
description: |-
This table contains actions on liquidity pools (LP) that impact liquidity.
These include the creation of LP tokens for providing liquidity, and the
burning of LP tokens for withdrawing liquidity.
columns:
- name: action_id
description: "{{ doc('liquidity_action_id') }}"
tests:
- unique
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: block_id
description: "{{ doc('block_id') }}"
tests:
- not_null
- relationships:
to: ref('silver__blocks')
field: block_id
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: number
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: timestamp_ntz
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- relationships:
to: ref('silver__transactions')
field: tx_id
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: boolean
- name: chain_id
description: "{{ doc('chain_id') }}"
tests:
- not_null
- accepted_values:
values:
- "phoenix-1"
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: blockchain
description: "{{ doc('blockchain') }}"
tests:
- not_null
- accepted_values:
values:
- "terra"
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: liquidity_provider_address
description: "{{ doc('liquidity_provider_address') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- dbt_expectations.expect_column_values_to_match_like_pattern:
like_pattern: "terra%"
- name: action
description: "{{ doc('liquidity_pool_action') }}"
tests:
- not_null
- accepted_values:
values:
- "burn_lp_token"
- "mint_lp_token"
- "provide_liquidity"
- "withdraw_liquidity"
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: pool_address
description: "{{ doc('liquidity_pool_address') }}"
tests:
- not_null
- relationships:
to: ref('core__dim_address_labels')
field: address
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- dbt_expectations.expect_column_values_to_match_like_pattern:
like_pattern: "terra%"
- name: amount
description: "{{ doc('liquidity_pool_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: number
- dbt_expectations.expect_column_min_to_be_between:
min_value: 0
- name: currency
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: decimals
description: "{{ doc('decimal') }}"
tests:
- not_null:
enabled: false
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: number
enabled: false
- dbt_expectations.expect_column_min_to_be_between:
min_value: 0

View File

@ -1,8 +1,8 @@
{{ config(
materialized = "incremental",
cluster_by = ["_inserted_timestamp"],
unique_key = "message_id",
tags = ' ["run_now"]'
unique_key = "message_id",,
tags = ['core']
) }}
WITH txs AS (

View File

@ -2,7 +2,8 @@
materialized = "incremental",
cluster_by = ["_inserted_timestamp"],
unique_key = "message_id",
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['core']
) }}
WITH txs AS (

View File

@ -3,7 +3,8 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
SELECT

View File

@ -2,7 +2,8 @@
materialized = "incremental",
cluster_by = ["_inserted_timestamp"],
unique_key = "message_id",
incremental_strategy = 'delete+insert'
incremental_strategy = 'delete+insert',
tags = ['core']
) }}
WITH txs AS (

View File

@ -3,7 +3,8 @@
unique_key = "_unique_key",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE','_inserted_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION"
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION",
tags = ['core']
) }}
WITH b AS (

View File

@ -1,74 +0,0 @@
{{ config(
materialized = "incremental",
cluster_by = ["_inserted_timestamp"],
unique_key = "mint_id",
enabled = false
) }}
WITH nft_mints AS (
SELECT
block_id,
block_timestamp,
'terra' AS blockchain,
chain_id,
tx_id,
tx_succeeded,
CASE
WHEN attributes :wasm :_contract_address IS NOT NULL THEN attributes :wasm :_contract_address :: STRING
WHEN attributes :wasm :_contract_address_1 IS NOT NULL THEN attributes :wasm :_contract_address_1 :: STRING
WHEN message_value :msg :mint :mint_request :nft_contract IS NOT NULL THEN message_value :msg :mint :mint_request :nft_contract :: STRING
ELSE NULL
END AS contract_address,
message_value :msg :mint AS mint_obj,
attributes,
NULLIF(
message_value :funds [0] :amount :: bigint,
0
) AS mint_price,
message_value :sender :: STRING AS minter,
attributes :wasm :token_id :: STRING AS token_id,
attributes :coin_spent :currency_0 :: STRING AS currency,
NULL AS decimals,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
_inserted_timestamp DESC
) AS INDEX,
CONCAT(
tx_id,
'-',
INDEX
) AS mint_id,
_ingested_at,
_inserted_timestamp
FROM
{{ ref('silver__messages') }}
WHERE
(
message_value :msg :mint :extension IS NOT NULL
OR message_value :msg :mint :metadata_uri IS NOT NULL
OR message_value :msg :mint :mint_request IS NOT NULL
OR message_value :msg :mint :metadata IS NOT NULL
)
AND message_type != '/cosmwasm.wasm.v1.MsgInstantiateContract'
AND {{ incremental_load_filter("_inserted_timestamp") }}
)
SELECT
block_id,
block_timestamp,
blockchain,
chain_id,
tx_id,
tx_succeeded,
contract_address,
mint_price,
minter,
token_id,
currency,
decimals,
mint_id,
_ingested_at,
_inserted_timestamp
FROM
nft_mints

View File

@ -1,124 +0,0 @@
version: 2
models:
- name: silver__nft_mints
description: |-
This table records all the nft mints transfer in the terra db
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- MINT_ID
columns:
- name: BLOCK_ID
description: "{{ doc('block_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: BLOCK_TIMESTAMP
description: "{{ doc('block_timestamp')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: BLOCKCHAIN
description: "{{ doc('blockchain')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: CHAIN_ID
description: "{{ doc('chain_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_ID
description: "{{ doc('tx_id')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TX_SUCCEEDED
description: "{{ doc('tx_succeeded')}}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: MINTER
description: "{{ doc('minter')}}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: CONTRACT_ADDRESS
description: "{{ doc('contract_address') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: TOKEN_ID
description: "{{ doc('token_id') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- VARCHAR
- name: MINT_PRICE
description: "{{ doc('mint_price') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: CURRENCY
description: "{{ doc ('currency') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: DECIMAL
description: "{{ doc('decimal') }}"
tests:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: MINT_ID
description: "{{ doc('mint_id') }}"
tests:
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: _INGESTED_AT
description: "{{ doc('_ingested_at') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,65 +0,0 @@
{{ config(
materialized = "incremental",
cluster_by = ["_inserted_timestamp"],
unique_key = "unique_id",
enabled = false
) }}
WITH msg AS (
SELECT
block_id,
block_timestamp,
'terra' AS blockchain,
chain_id,
tx_id,
tx_succeeded,
6 AS decimals,
attributes :wasm :recipient :: STRING AS purchaser,
attributes :wasm :seller :: STRING AS seller,
attributes :wasm :amount :: NUMBER AS sales_amount,
attributes :wasm :denom :: STRING AS currency,
attributes :wasm :_contract_address_0 :: STRING AS marketplace,
attributes :wasm :nft_contract :: STRING AS contract_address,
attributes :wasm :token_id_0 :: STRING AS token_id,
_ingested_at,
_inserted_timestamp
FROM
{{ ref("silver__messages") }}
WHERE
message_type = '/cosmwasm.wasm.v1.MsgExecuteContract'
AND attributes :wasm :action_0 = 'settle'
AND attributes :wasm :action_1 = 'transfer_nft'
AND attributes :wasm :action_2 = 'settle_hook'
AND {{ incremental_load_filter("_inserted_timestamp") }}
),
FINAL AS (
SELECT
block_id,
block_timestamp,
blockchain,
chain_id,
tx_id,
tx_succeeded,
decimals,
purchaser,
seller,
sales_amount,
currency,
marketplace,
contract_address,
token_id,
CONCAT(
tx_id,
'-',
token_id
) AS unique_id,
_ingested_at,
_inserted_timestamp
FROM
msg
)
SELECT
*
FROM
FINAL

View File

@ -1,122 +0,0 @@
version: 2
models:
- name: silver__nft_sales
description: |-
This table contains nft sales transactions that happens on various marketplace in the terra2 blockchain
columns:
- name: unique_id
description: "{{ doc('unique_key') }}"
tests:
- unique
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: varchar
- name: block_id
description: "{{ doc('block_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_of_type:
column_type: timestamp_ntz
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: chain_id
description: "{{ doc('chain_id') }}"
tests:
- not_null
- accepted_values:
values:
- "phoenix-1"
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: blockchain
description: "{{ doc('blockchain') }}"
tests:
- not_null
- accepted_values:
values:
- "terra"
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: purchaser
description: "{{ doc('purchaser') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: seller
description: "{{ doc('seller') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: sales_amount
description: "{{ doc('sales_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: currency
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: decimals
description: "{{ doc('decimal') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: _INGESTED_AT
description: "{{ doc('_ingested_at') }}"
- name: _INSERTED_TIMESTAMP
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = "staking_id",
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['noncore']
) }}
WITH

View File

@ -2,6 +2,7 @@
materialized = "incremental",
cluster_by = ["_inserted_timestamp::DATE"],
unique_key = "address",
tags = ['noncore']
) }}
WITH token_labels AS (
@ -26,7 +27,7 @@ WITH token_labels AS (
AND {{ incremental_load_filter("_inserted_timestamp") }}
)
SELECT
'terra' as blockchain,
'terra' AS blockchain,
block_timestamp,
tx_id,
label,

View File

@ -1,7 +1,8 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id'
unique_key = 'tx_id',
tags = ['core']
) }}
-- depends_on: {{ ref('bronze__streamline_transactions') }}
-- depends_on: {{ ref('bronze__streamline_FR_transactions') }}

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ['tx_id','msg_index'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['core']
) }}
WITH base_atts AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ['tx_id','msg_index'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['core']
) }}
WITH base_atts AS (

View File

@ -2,7 +2,8 @@
materialized = 'incremental',
unique_key = ['tx_id','msg_index'],
incremental_strategy = 'merge',
cluster_by = ['block_timestamp::DATE']
cluster_by = ['block_timestamp::DATE'],
tags = ['core']
) }}
WITH base_atts AS (