Merge pull request #5 from FlipsideCrypto/gold

Gold
This commit is contained in:
eric-laurello 2025-04-14 15:28:23 -04:00 committed by GitHub
commit bfaa232d52
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
25 changed files with 730 additions and 45 deletions

View File

@ -5,6 +5,11 @@ on:
workflow_dispatch:
branches:
- "main"
schedule:
# An hour after the daily TON exports Airflow job kicks off
# 20 minutes after the daily TON external table refresh happens
# https://github.com/ton-studio/ton-etl/blob/main/datalake/airflow/dags/datalake_daily_sync.py#L58
- cron: "35 1 * * *"
env:
DBT_PROFILES_DIR: "${{ vars.DBT_PROFILES_DIR }}"

View File

@ -1,5 +1,6 @@
{{ config (
materialized = 'view'
materialized = 'view',
enabled = false
) }}
{{ streamline_external_table_FR_query_v2(
model = 'messages_tdl',

View File

@ -0,0 +1,35 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_account_states_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(account);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(TIMESTAMP) AS block_timestamp,
account,
TIMESTAMP,
last_trans_lt,
last_trans_hash AS last_tx_hash,
account_status,
balance,
frozen_hash,
HASH AS account_state_hash,
account_states_id AS fact_account_states_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__account_states') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,32 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_balances_history_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(TIMESTAMP) AS block_timestamp,
address,
asset,
amount,
mintless_claimed,
lt,
balances_history_id AS fact_balances_history_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__balances_history') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,56 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_blocks_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE','workchain'],
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(gen_utime) AS block_timestamp,
gen_utime,
workchain,
version,
shard,
seqno,
vert_seqno,
start_lt,
end_lt,
mc_block_seqno,
mc_block_shard,
tx_count,
global_id,
created_by,
want_merge,
root_hash,
key_block,
vert_seqno_incr,
validator_list_hash_short,
after_merge,
want_split,
after_split,
master_ref_seqno,
mc_block_workchain,
file_hash,
prev_key_block_seqno,
flags,
rand_seed,
gen_catchain_seqno,
min_ref_mc_seqno,
before_split,
blocks_id AS fact_blocks_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__blocks') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,43 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_jetton_events_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE','source','destination'],
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(utime) AS block_timestamp,
tx_hash,
CASE
WHEN tx_aborted THEN FALSE
ELSE TRUE
END tx_succeeded,
TYPE,
source,
destination,
forward_ton_amount,
amount,
jetton_master,
jetton_wallet,
COMMENT,
query_id,
tx_lt,
utime,
tx_aborted,
jetton_events_id AS fact_jetton_events_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__jetton_events') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,45 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_jetton_metadata_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['update_timestamp_onchain::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address,symbol);",
tags = ['scheduled_core']
) }}
SELECT
address,
TO_TIMESTAMP(update_time_onchain) AS update_timestamp_onchain,
symbol,
decimals,
NAME,
description,
tonapi_image_url,
image_data,
image,
admin_address,
mintable,
jetton_content_onchain,
jetton_wallet_code_hash,
code_hash,
adding_at,
sources,
metadata_status,
update_time_onchain,
update_time_metadata,
jetton_metadata_id AS fact_jetton_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__jetton_metadata') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,50 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_messages_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,msg_hash,source,destination);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(tx_now) AS block_timestamp,
tx_hash,
msg_hash,
body_hash,
trace_id,
direction,
source,
destination,
_VALUE AS VALUE,
opcode,
created_at,
tx_now,
ihr_fee,
import_fee,
fwd_fee,
ihr_disabled,
bounced,
bounce,
COMMENT,
tx_lt,
created_lt,
init_state_hash,
init_state_boc,
body_boc,
messages_with_data_id AS fact_messages_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__messages_with_data') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,88 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_transactions_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,prev_tx_hash,account);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(now) AS block_timestamp,
HASH AS tx_hash,
prev_trans_hash AS prev_tx_hash,
CASE
WHEN aborted THEN FALSE
ELSE TRUE
END tx_succeeded,
aborted,
account,
orig_status,
end_status,
compute_success,
compute_skipped,
compute_gas_fees,
action_result_code,
action_success,
action_spec_actions,
action_result_arg,
action_skipped_actions,
action_valid,
action_tot_actions,
action_no_funds,
action_status_change,
compute_msg_state_used,
descr,
block_workchain,
block_seqno,
block_shard,
mc_block_seqno,
total_fees,
storage_fees_collected,
credit_due_fees_collected,
action_total_fwd_fees,
storage_fees_due,
action_total_action_fees,
account_state_balance_before,
account_state_balance_after,
account_state_hash_before,
account_state_hash_after,
account_state_code_hash_before,
account_state_code_hash_after,
installed,
destroyed,
is_tock,
credit_first,
compute_account_activated,
compute_vm_steps,
compute_exit_arg,
compute_gas_credit,
compute_gas_limit,
compute_gas_used,
compute_vm_init_state_hash,
compute_vm_final_state_hash,
skipped_reason,
compute_exit_code,
storage_status_change,
compute_mode,
credit,
trace_id,
lt,
prev_trans_lt,
now,
transactions_id AS fact_transactions_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__transactions') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,8 @@
version: 2
models:
- name: core__fact_blocks
config:
event_time: block_date

View File

@ -0,0 +1,44 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_dex_pools_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp_last_updated::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(pool);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(last_updated) AS block_timestamp_last_updated,
TO_TIMESTAMP(discovered_at) AS block_timestamp_discovered_at,
project,
pool,
version,
is_liquid,
reserves_right,
reserves_left,
jetton_right,
jetton_left,
total_supply,
protocol_fee,
referral_fee,
lp_fee,
tvl_ton,
tvl_usd,
last_updated,
discovered_at,
dex_pools_id AS fact_dex_pools_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__dex_pools') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,46 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_dex_trades_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,pool_address,trader_address,token_bought_address,token_sold_address);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(event_time) AS block_timestamp,
event_type,
tx_hash,
project_type,
project,
pool_address,
version,
trader_address,
token_bought_address,
token_sold_address,
amount_bought_raw,
amount_sold_raw,
router_address,
volume_ton,
volume_usd,
referral_address,
platform_tag,
trace_id,
query_id,
event_time,
dex_trades_id AS fact_dex_trades_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__dex_trades') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,57 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_nft_events_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,nft_item_address,sale_contract,marketplace_address );",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(TIMESTAMP) AS block_timestamp,
tx_hash,
TYPE,
sale_type,
nft_item_address,
nft_item_index,
payment_asset,
sale_price,
forward_amount,
royalty_amount,
marketplace_fee,
auction_max_bid,
auction_min_bid,
sale_contract,
royalty_address,
marketplace_address,
marketplace_fee_address,
owner_address,
collection_address,
content_onchain,
trace_id,
query_id,
is_init,
custom_payload,
COMMENT,
sale_end_time,
forward_payload,
auction_min_step,
prev_owner,
TIMESTAMP,
lt,
nft_events_id AS fact_nft_events_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__nft_events') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,35 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_nft_items_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address,collection_address,owner_address);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(TIMESTAMP) AS block_timestamp,
address,
INDEX,
collection_address,
owner_address,
content_onchain,
is_init,
lt,
TIMESTAMP,
nft_items_id AS fact_nft_items_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__nft_items') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,42 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_nft_metadata_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['update_timestamp_onchain::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address,parent_address);",
tags = ['scheduled_core']
) }}
SELECT
address,
parent_address,
TO_TIMESTAMP(update_time_onchain) AS update_timestamp_onchain,
TYPE,
NAME,
description,
metadata_status,
attributes,
image,
image_data,
tonapi_image_url,
content_onchain,
sources,
update_time_onchain,
update_time_metadata,
adding_at,
nft_metadata_id AS fact_nft_metadata_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__nft_metadata') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,49 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_nft_sales_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(address,nft_address,asset,nft_owner_address);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(TIMESTAMP) AS block_timestamp,
address,
nft_address,
TYPE,
asset,
price,
marketplace_fee,
royalty_amount,
nft_owner_address,
marketplace_address,
marketplace_fee_address,
royalty_address,
is_complete,
is_canceled,
min_bid,
max_bid,
min_step,
end_time,
last_bid_at,
last_member,
created_at,
TIMESTAMP,
lt,
nft_sales_id AS fact_nft_sales_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__nft_sales') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -0,0 +1,47 @@
{{ config(
materialized = 'incremental',
unique_key = ['fact_nft_transfers_id'],
incremental_strategy = 'merge',
merge_exclude_columns = ['inserted_timestamp'],
cluster_by = ['block_timestamp::DATE'],
post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION ON EQUALITY(tx_hash,old_owner,new_owner,nft_collection_address,nft_item_address);",
tags = ['scheduled_core']
) }}
SELECT
TO_TIMESTAMP(tx_now) AS block_timestamp,
tx_hash,
CASE
WHEN tx_aborted THEN FALSE
ELSE TRUE
END tx_succeeded,
tx_aborted,
old_owner,
new_owner,
nft_collection_address,
nft_item_address,
nft_item_index,
response_destination,
forward_amount,
custom_payload,
forward_payload,
COMMENT,
trace_id,
query_id,
tx_now,
tx_lt,
nft_transfers_id AS fact_nft_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp
FROM
{{ ref('silver__nft_transfers') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= (
SELECT
MAX(modified_timestamp)
FROM
{{ this }}
)
{% endif %}

View File

@ -36,6 +36,7 @@ SELECT
address,
amount,
TIMESTAMP,
mintless_claimed,
_inserted_timestamp,
{{ dbt_utils.generate_surrogate_key(
['address','asset','TIMESTAMP']

View File

@ -45,13 +45,13 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__blocks') }}
qualify ROW_NUMBER() over (
PARTITION BY seqno,
shard,
workchain
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY seqno,
shard,
workchain
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
block_date,

View File

@ -31,12 +31,12 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__dex_pools') }}
qualify ROW_NUMBER() over (
PARTITION BY pool,
last_updated
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY pool,
last_updated
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
block_date,

View File

@ -34,12 +34,12 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__dex_trades') }}
qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
trace_id
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
trace_id
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
block_date,

View File

@ -28,11 +28,11 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__jetton_events') }}
qualify ROW_NUMBER() over (
PARTITION BY tx_hash
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY tx_hash
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
block_date,

View File

@ -33,13 +33,13 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__jetton_metadata') }}
qualify ROW_NUMBER() over (
PARTITION BY address,
update_time_metadata,
update_time_onchain
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY address,
update_time_metadata,
update_time_onchain
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
adding_date,

View File

@ -5,7 +5,8 @@
begin = '2024-01-01',
batch_size = 'day',
cluster_by = ['block_date::DATE','modified_timestamp::DATE'],
tags = ['scheduled_core']
tags = ['scheduled_core'],
enabled = false
) }}
WITH pre_final AS (
@ -36,12 +37,12 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__messages') }}
qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
msg_hash
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
msg_hash
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
block_date,

View File

@ -38,12 +38,12 @@ WITH pre_final AS (
_inserted_timestamp
FROM
{{ ref('bronze__messages_with_data') }}
qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
msg_hash
ORDER BY
_inserted_timestamp DESC
) = 1
{# qualify ROW_NUMBER() over (
PARTITION BY tx_hash,
msg_hash
ORDER BY
_inserted_timestamp DESC
) = 1 #}
)
SELECT
block_date,