AN-4936/Backfill Migration Final (#335)

* del cw models

* core views

* defi views

* gov view

* nft_views

* rm ref to old cw models

* del internal cols

* nv 13-15

* nv 13-15

* nv 17-18

* nv 20

* del temp logic

* upd tests on allday metadata

* upd to warn
This commit is contained in:
Jack Forgash 2024-07-01 13:27:42 -06:00 committed by GitHub
parent 3bf75dfd18
commit 30ce62f6a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
83 changed files with 120 additions and 6664 deletions

View File

@ -48,7 +48,7 @@ jobs:
run: >
dbt run-operation stage_external_sources --vars "ext_full_refresh: true";
dbt seed;
dbt run -s tag:scheduled_core tag:streamline_complete "flow_models,models/gold" --vars '{"STREAMLINE_START_BLOCK": ${{ vars.STREAMLINE_START_BLOCK }}}'
dbt run -s tag:scheduled_core tag:streamline_complete "flow_models,models/gold"
- name: Store logs
uses: actions/upload-artifact@v3

View File

@ -44,7 +44,7 @@ jobs:
run: >
dbt run-operation stage_external_sources --vars "ext_full_refresh: true";
dbt seed;
dbt run -s tag:scheduled_non_core --vars '{"STREAMLINE_START_BLOCK": ${{ vars.STREAMLINE_START_BLOCK }}}'
dbt run -s tag:scheduled_non_core
- name: Store logs
uses: actions/upload-artifact@v3

View File

@ -2,7 +2,7 @@
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: "flow_models"
version: "1.7.0"
version: "1.8.0"
config-version: 2
require-dbt-version: ">=1.7.0"

View File

@ -3,61 +3,16 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS event_contract_id,
event_contract,
contract_name,
account_address,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__contract_labels') }}
),
streamline AS (
SELECT
event_contract_id,
event_contract,
contract_name,
account_address,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__contract_labels_s') }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
event_contract_id,
event_contract,
contract_name,
account_address,
inserted_timestamp,
modified_timestamp,
COALESCE (
event_contract_id,
{{ dbt_utils.generate_surrogate_key(['event_contract']) }}
) AS dim_contract_labels_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
) AS dim_contract_labels_id
FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
) = 1
{{ ref('silver__contract_labels_s') }}

View File

@ -3,68 +3,6 @@
tags = ['ez', 'scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS token_transfers_id,
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__token_transfers') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
AND block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
token_transfers_id,
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__token_transfers_s') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
AND block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
streamline
UNION ALL
SELECT
*
FROM
chainwalkers
)
SELECT
block_height,
block_timestamp,
@ -80,13 +18,12 @@ SELECT
['tx_id','sender', 'recipient','token_contract', 'amount']
) }}
) AS ez_token_transfers_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__token_transfers_s') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)

View File

@ -15,7 +15,6 @@ models:
- amount
config:
severity: warn
error_if: ">10"
columns:
- name: TX_ID

View File

@ -3,67 +3,12 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS blocks_id,
block_height,
block_timestamp,
network,
network_version,
chain_id,
tx_count,
id,
parent_id,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__blocks') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
blocks_id,
block_height,
block_timestamp,
'mainnet' AS network,
network_version,
'flow' AS chain_id,
tx_count,
id,
parent_id,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
blocks_id,
block_height,
block_height :: INT AS block_height,
block_timestamp,
network,
'mainnet' AS network,
network_version,
chain_id,
'flow' AS chain_id,
tx_count,
id,
parent_id,
@ -71,13 +16,8 @@ SELECT
blocks_id,
{{ dbt_utils.generate_surrogate_key(['block_height']) }}
) AS fact_blocks_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__streamline_blocks') }}

View File

@ -3,64 +3,10 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS streamline_event_id,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__events_final') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
streamline_event_id,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__streamline_events') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
tx_id,
block_timestamp,
block_height,
block_height :: INT AS block_height,
tx_succeeded,
event_index,
event_contract,
@ -70,13 +16,8 @@ SELECT
streamline_event_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS fact_events_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__streamline_events') }}

View File

@ -3,88 +3,11 @@
tags = ['scheduled']
) }}
WITH chainwalkers AS (
SELECT
NULL AS streamline_transaction_id,
tx_id,
block_timestamp,
block_height,
chain_id,
tx_index,
proposer,
payer,
authorizers,
count_authorizers,
gas_limit,
NULL AS script,
NULL AS arguments,
transaction_result,
tx_succeeded,
error_msg,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__transactions') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
streamline_transaction_id,
tx_id,
block_timestamp,
block_height,
'flow' AS chain_id,
NULL AS tx_index,
proposer,
payer,
authorizers,
count_authorizers,
gas_limit,
script,
arguments,
OBJECT_CONSTRUCT(
'error',
error_message,
'events',
events,
'status',
status
) AS transaction_result,
tx_succeeded,
error_message AS error_msg,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response
AND block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
tx_id,
block_timestamp,
block_height,
chain_id,
tx_index,
block_height :: INT AS block_height,
'flow' AS chain_id,
proposer,
payer,
authorizers,
@ -92,20 +15,23 @@ SELECT
gas_limit,
script,
arguments,
transaction_result,
OBJECT_CONSTRUCT(
'error',
error_message,
'events',
events,
'status',
status
) AS transaction_result,
tx_succeeded,
error_msg,
error_message AS error_msg,
COALESCE (
streamline_transaction_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS fact_transactions_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response

View File

@ -98,6 +98,7 @@ models:
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
- OBJECT
- name: TX_SUCCEEDED
description: "{{ doc('tx_succeeded') }}"

View File

@ -3,40 +3,8 @@
tag = ['scheduled']
) }}
WITH pairs_cw AS (
WITH pairs_s AS (
SELECT
tx_id,
NULL AS labels_pools_metapier_id,
swap_contract,
deployment_timestamp,
token0_contract,
token1_contract,
pool_id,
vault_address,
NULL AS inserted_timestamp,
_inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__labels_pools') }}
),
metapier_cw AS (
SELECT
tx_id,
NULL AS labels_pools_metapier_id,
swap_contract,
deployment_timestamp,
token0_contract,
token1_contract,
pool_id,
vault_address,
NULL AS inserted_timestamp,
_inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__labels_pools_metapier') }}
),
pairs_s AS (
SELECT
tx_id,
labels_pools_id AS labels_pools_metapier_id,
@ -47,7 +15,6 @@ pairs_s AS (
pool_id,
vault_address,
inserted_timestamp,
_inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__labels_pools_s') }}
@ -63,27 +30,16 @@ metapier_s AS (
pool_id,
vault_address,
inserted_timestamp,
_inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__labels_pools_metapier_s') }}
),
FINAL AS (
SELECT
*
FROM
pairs_cw
UNION
SELECT
*
FROM
metapier_cw
UNION
SELECT
*
FROM
pairs_s
UNION
UNION ALL
SELECT
*
FROM
@ -100,13 +56,7 @@ SELECT
labels_pools_metapier_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS dim_swap_pool_labels_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
FINAL

View File

@ -4,84 +4,20 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'BRIDGE' }} }
) }}
WITH blocto_cw AS (
SELECT
*
FROM
{{ ref('silver__bridge_blocto') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
celer_cw AS (
SELECT
*
FROM
{{ ref('silver__bridge_celer') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
WITH
blocto_s AS (
SELECT
*
FROM
{{ ref('silver__bridge_blocto_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
celer_s AS (
SELECT
*
FROM
{{ ref('silver__bridge_celer_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
combo AS (
SELECT
NULL AS bridge_id,
tx_id,
block_timestamp,
block_height,
teleport_contract AS bridge_contract,
token_contract,
gross_amount AS amount,
flow_wallet_address,
blockchain,
teleport_direction AS direction,
bridge,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
blocto_cw
UNION ALL
SELECT
NULL AS bridge_id,
tx_id,
block_timestamp,
block_height,
bridge_contract,
token_contract,
amount,
flow_wallet_address,
blockchain,
direction,
bridge,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
celer_cw
UNION ALL
SELECT
bridge_blocto_id AS bridge_id,
tx_id,
@ -94,7 +30,6 @@ combo AS (
blockchain,
teleport_direction AS direction,
bridge,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
@ -112,7 +47,6 @@ combo AS (
blockchain,
direction,
bridge,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
@ -133,13 +67,7 @@ SELECT
bridge_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS ez_bridge_transactions_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
combo

View File

@ -4,67 +4,8 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'SWAPS' }} }
) }}
WITH chainwalkers AS (
SELECT
NULL AS swaps_id,
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__swaps') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
swaps_final_id AS swaps_id,
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__swaps_final') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
swaps_final_id AS swaps_id,
tx_id,
block_timestamp,
block_height,
@ -81,9 +22,9 @@ SELECT
swaps_id,
{{ dbt_utils.generate_surrogate_key(['tx_id', 'swap_index']) }}
) AS ez_swaps_id,
COALESCE(inserted_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS inserted_timestamp,
COALESCE(modified_timestamp, '2000-01-01' :: TIMESTAMP_NTZ) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__swaps_final') }}
WHERE
token_in_contract IS NOT NULL

View File

@ -34,7 +34,8 @@ models:
- name: TRADER
description: "{{ doc('trader') }}"
tests:
- not_null
- not_null:
severity: warn
- name: TOKEN_OUT_SOURCE
description: "{{ doc('token_out_source') }}"

View File

@ -1,65 +1,9 @@
{{ config(
materialized = 'view',
tags = ['ez', 'scheduled'],
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STAKING' }}}
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'STAKING' }} }
) }}
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__staking_actions') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
*
FROM
{{ ref('silver__staking_actions_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
NULL AS staking_actions_id,
tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
delegator,
action,
amount,
node_id,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
chainwalkers
UNION ALL
SELECT
staking_actions_id,
tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
delegator,
action,
amount,
node_id,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
streamline
)
SELECT
tx_id,
event_index,
@ -74,13 +18,7 @@ SELECT
staking_actions_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS ez_staking_actions_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__staking_actions_s') }}

View File

@ -6,32 +6,6 @@
WITH allday AS (
SELECT
NULL AS nft_unique_id,
nft_id,
nft_collection,
nflallday_id,
serial_number,
moment_tier,
total_circulation,
moment_description,
player,
team,
season,
week,
classification,
play_type,
moment_date,
series,
set_name,
video_urls,
moment_stats_full,
_inserted_timestamp,
_inserted_timestamp AS inserted_timestamp,
_inserted_timestamp AS modified_timestamp
FROM
{{ ref('silver__nft_allday_metadata') }}
UNION
SELECT
nft_allday_metadata_s_id AS nft_unique_id,
nft_id,
@ -52,7 +26,6 @@ WITH allday AS (
set_name,
video_urls,
moment_stats_full,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
@ -78,7 +51,6 @@ WITH allday AS (
set_name,
video_urls,
moment_stats_full,
_inserted_timestamp,
_inserted_timestamp AS inserted_timestamp,
_inserted_timestamp AS modified_timestamp
FROM
@ -107,13 +79,7 @@ SELECT
nft_unique_id,
{{ dbt_utils.generate_surrogate_key(['nft_id']) }}
) AS dim_allday_metadata_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
allday

View File

@ -5,99 +5,56 @@ models:
description: |-
Data for NFL AllDay Moments, including player, team, stats and more. This is produced via API and may differ in structure from metadata available on-chain in the `dim_moment_metadata` table.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- nft_id
severity: error
error_if: ">10"
warn_if: "<10"
columns:
- name: NFT_ID
description: "{{ doc('nft_id') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- unique:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
severity: error
error_if: ">10"
warn_if: "<10"
- name: NFT_COLLECTION
description: "{{ doc('nft_collection') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
severity: error
error_if: ">10"
warn_if: "<10"
- name: NFLALLDAY_ID
description: "{{ doc('nflallday_id') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
severity: error
error_if: ">10"
warn_if: "<10"
- name: SERIAL_NUMBER
description: "{{ doc('serial_number') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
severity: error
error_if: ">10"
warn_if: "<10"
- name: TOTAL_CIRCULATION
description: "{{ doc('total_circulation') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
severity: error
error_if: ">10"
warn_if: "<10"
- name: MOMENT_DESCRIPTION
description: "{{ doc('moment_description') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -106,10 +63,7 @@ models:
- name: PLAYER
description: "{{ doc('player') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -118,10 +72,7 @@ models:
- name: TEAM
description: "{{ doc('team') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -130,10 +81,7 @@ models:
- name: SEASON
description: "{{ doc('season') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -142,10 +90,7 @@ models:
- name: WEEK
description: "{{ doc('week') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -154,10 +99,7 @@ models:
- name: CLASSIFICATION
description: "{{ doc('classification') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -166,10 +108,7 @@ models:
- name: PLAY_TYPE
description: "{{ doc('play_type') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -178,10 +117,7 @@ models:
- name: MOMENT_DATE
description: "{{ doc('moment_date') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
@ -189,10 +125,7 @@ models:
- name: SERIES
description: "{{ doc('series') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -201,10 +134,7 @@ models:
- name: SET_NAME
description: "{{ doc('set_name') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
@ -213,10 +143,7 @@ models:
- name: VIDEO_URLS
description: "{{ doc('video_urls') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
@ -224,10 +151,7 @@ models:
- name: MOMENT_STATS_FULL
description: "{{ doc('moment_stats_full') }}"
tests:
- not_null:
severity: error
error_if: ">10"
warn_if: "<10"
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- OBJECT

View File

@ -4,80 +4,8 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT, ALLDAY, GOLAZOS, TOPSHOT' }} }
) }}
WITH chainwalkers AS (
SELECT
NULL AS nft_moment_metadata_id,
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata,
_inserted_timestamp,
NULL AS inserted_timestamp,
NULL AS modified_timestamp
FROM
{{ ref('silver__nft_moment_metadata_final') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
),
streamline AS (
SELECT
nft_moment_metadata_id,
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata,
_inserted_timestamp,
inserted_timestamp,
modified_timestamp
FROM
{{ ref('silver__nft_moment_metadata_final_s') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
nft_collection,
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
@ -95,17 +23,22 @@ SELECT
['nft_collection','edition_id','nft_id']
) }}
) AS dim_moment_metadata_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY dim_moment_metadata_id
ORDER BY
_inserted_timestamp DESC
) = 1
{{ ref('silver__nft_moment_metadata_final_s') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
qualify ROW_NUMBER() over (
PARTITION BY dim_moment_metadata_id
ORDER BY
inserted_timestamp DESC
) = 1

View File

@ -4,70 +4,6 @@
meta ={ 'database_tags':{ 'table':{ 'PURPOSE': 'NFT' }} }
) }}
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__nft_sales') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
*
FROM
{{ ref('silver__nft_sales_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
NULL AS nft_sales_id,
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
tokenflow,
counterparties,
NULL AS inserted_timestamp,
_inserted_timestamp,
NULL AS modified_timestamp
FROM
chainwalkers
UNION ALL
SELECT
nft_sales_id,
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
tokenflow,
counterparties,
inserted_timestamp,
_inserted_timestamp,
modified_timestamp
FROM
streamline
)
SELECT
tx_id,
block_height,
@ -86,13 +22,7 @@ SELECT
nft_sales_id,
{{ dbt_utils.generate_surrogate_key(['tx_id']) }}
) AS ez_nft_sales_id,
COALESCE (
inserted_timestamp,
_inserted_timestamp
) AS inserted_timestamp,
COALESCE (
modified_timestamp,
_inserted_timestamp
) AS modified_timestamp
inserted_timestamp,
modified_timestamp
FROM
FINAL
{{ ref('silver__nft_sales_s') }}

View File

@ -13,9 +13,8 @@ models:
- buyer
- nft_collection
- nft_id
severity: error
error_if: ">10"
warn_if: "<10"
severity: warn
error_if: ">5000"
columns:
- name: TX_ID

View File

@ -30,7 +30,9 @@
%}
{% set incr = "" %}
{% if is_incremental() %}
{% set incr = """
WHERE
modified_timestamp >= (
SELECT
@ -50,7 +52,7 @@
)
)
""" %}
{% endif %}
{% endif %}
{% set run = run_query(query ~ incr) %}
{% endif %}
@ -86,6 +88,7 @@ WHERE
silver.streamline_transactions_final_intermediate_tmp
)
{% endif %}
),
blocks AS (
SELECT
@ -93,6 +96,7 @@ blocks AS (
FROM
{{ ref('silver__streamline_blocks') }}
{% if is_incremental() %}
WHERE
modified_timestamp >= SYSDATE() - INTERVAL '3 days'
@ -103,6 +107,7 @@ WHERE
silver.streamline_transactions_final_intermediate_tmp
)
{% endif %}
),
FINAL AS (
SELECT

View File

@ -129,7 +129,7 @@ trade_data AS (
_inserted_timestamp
FROM
swaps_single_trade sst
LEFT JOIN {{ ref('silver__contract_labels') }}
LEFT JOIN {{ ref('silver__contract_labels_s') }}
l USING (event_contract)
WHERE
event_type = 'Trade'

View File

@ -36,7 +36,7 @@ pair_labels AS (
SELECT
*
FROM
{{ ref('silver__contract_labels') }}
{{ ref('silver__contract_labels_s') }}
WHERE
contract_name ILIKE '%swappair%'
),

View File

@ -9,7 +9,7 @@ WITH mints AS (
event_contract,
event_data :momentID :: STRING AS moment_id
FROM
{{ ref('silver__nft_moments') }}
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
AND event_type = 'MomentMinted'
@ -19,7 +19,7 @@ sales AS (
nft_collection AS event_contract,
nft_id AS moment_id
FROM
{{ ref('silver__nft_sales') }}
{{ ref('silver__nft_sales_s') }}
WHERE
nft_collection ILIKE '%topshot%'
),

View File

@ -1,9 +0,0 @@
# SILVER_CW / The Streamline Migration
This directory is a copy of the silver directory with models downstream of the Chainwalkers source. Chainwalkers is being de-commissioned in October 2023, but the Streamline backfill will take some time to finish. These models are thus copied over as an archive of the Chainwalkers data, and to keep the new (streamline) models separate and organized.
This entire directory may be deleted once Chainwalkers is fully deprecated.
New models that have been migrated to use Streamline as a source are suffixed with `_s`. No suffix was added for models that do not require a migration (prices, labels, external APIs, etc.). While I would prefer the new models have no suffix, the decision was made to append `_s` to the new streamline models (instead of copying data into new tables and appending `_cw` to the deprecating data) to minimize the touchpoints on prod data. A decision may be made once the CW data is fully deprecated to drop the suffix from all models and rebuild in cleanly named tables.
The `scheduled` tag will remain on all models that should run on an hourly schedule. 2 new tags have been added for org purposes: `streamline_scheduled` and `chainwalkers_scheduled` to enable clear model selection. These tags were added only where a model was already tagged with `scheduled` to limit net-new.

View File

@ -1,85 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'block_height',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH bronze_blocks AS (
SELECT
*
FROM
{{ ref('bronze__blocks') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY block_id
ORDER BY
_ingested_at DESC
) = 1
),
silver_blocks AS (
SELECT
block_id AS block_height,
block_timestamp,
network,
chain_id,
tx_count,
COALESCE(
header :block_id,
header :block_header :block_id,
header :id
) :: STRING AS id,
COALESCE(
header :parent_id,
header :parentId,
header :block_header :parent_id
) :: STRING AS parent_id,
_ingested_at,
_inserted_timestamp
FROM
bronze_blocks
),
network_version AS (
SELECT
root_height,
network_version,
COALESCE(LAG(root_height) over (
ORDER BY
network_version DESC) - 1, 'inf' :: FLOAT) AS end_height
FROM
{{ ref('seeds__network_version') }}
),
add_version AS (
SELECT
block_height,
block_timestamp,
network,
v.network_version,
chain_id,
tx_count,
id,
parent_id,
_ingested_at,
_inserted_timestamp
FROM
silver_blocks b
LEFT JOIN network_version v
ON b.block_height BETWEEN v.root_height
AND v.end_height
)
SELECT
*
FROM
add_version

View File

@ -1,36 +0,0 @@
version: 2
models:
- name: silver__blocks
description: Information about blocks on the FLOW network and corresponding metadata.
columns:
- name: block_height
description: "{{ doc('block_height') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: network
description: "{{ doc('network') }}"
- name: network_version
description: "{{ doc('network_version') }}"
- name: chain_id
description: "{{ doc('chain_id') }}"
- name: tx_count
description: "{{ doc('tx_count') }}"
- name: id
description: "{{ doc('id') }}"
- name: parent_id
description: "{{ doc('parent_id') }}"
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,170 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'attribute_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events') }}
WHERE
_inserted_timestamp :: DATE < '2022-07-18'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
events_data AS (
SELECT
event_id,
tx_id,
block_timestamp,
event_index,
event_contract,
event_type,
_event_data_type,
_event_data_fields,
_ingested_at,
COALESCE(
_event_data_type :fields,
_event_data_type :Fields
) AS event_data_type_fields,
_inserted_timestamp
FROM
events
),
attributes AS (
SELECT
event_id,
tx_id,
block_timestamp,
event_index,
event_contract,
event_type,
COALESCE(
VALUE :identifier,
VALUE :Identifier
) :: STRING AS attribute_key,
COALESCE(
_event_data_fields [index] :Value,
_event_data_fields [index]
) AS attribute_value,
concat_ws(
'-',
event_id,
INDEX
) AS attribute_id,
INDEX AS attribute_index,
_ingested_at,
_inserted_timestamp
FROM
events_data,
LATERAL FLATTEN(
input => event_data_type_fields
)
),
handle_address_arrays AS (
SELECT
attribute_id,
b.index,
LPAD(TRIM(to_char(b.value :: INT, 'XXXXXXX')) :: STRING, 2, '0') AS hex
FROM
attributes A,
TABLE(FLATTEN(attribute_value, recursive => TRUE)) b
WHERE
IS_ARRAY(attribute_value) = TRUE
ORDER BY
1,
2
),
recombine_address AS (
SELECT
attribute_id,
CONCAT(
'0x',
ARRAY_TO_STRING(ARRAY_AGG(hex) within GROUP (
ORDER BY
INDEX ASC), '')
) AS decoded_address
FROM
handle_address_arrays
GROUP BY
1
),
replace_arrays AS (
SELECT
A.attribute_id,
event_id,
tx_id,
block_timestamp,
event_index,
attribute_index,
event_contract,
event_type,
attribute_key,
attribute_value,
decoded_address,
COALESCE(
decoded_address,
attribute_value
) :: STRING AS attribute_value_adj,
_ingested_at,
_inserted_timestamp
FROM
attributes A
LEFT JOIN recombine_address USING (attribute_id)
),
address_adjustment AS (
SELECT
attribute_id,
LENGTH(attribute_value_adj) AS ava_len,
CONCAT(
'0x',
LPAD(SPLIT(attribute_value_adj, '0x') [1], 16, '0') :: STRING
) AS address_adj
FROM
replace_arrays
WHERE
attribute_value_adj LIKE '0x%'
AND ava_len < 19
),
FINAL AS (
SELECT
A.attribute_id,
event_id,
tx_id,
block_timestamp,
event_index,
attribute_index,
event_contract,
event_type,
attribute_key,
decoded_address,
attribute_value,
REPLACE(
COALESCE(
address_adj,
attribute_value_adj
),
'"'
) AS attribute_value_adj,
_ingested_at,
_inserted_timestamp
FROM
replace_arrays A
LEFT JOIN address_adjustment USING (attribute_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,154 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'attribute_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events') }}
WHERE
_inserted_timestamp :: DATE >= '2022-07-18'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
payload_method AS (
SELECT
*
FROM
events
WHERE
_try_parse_payload IS NOT NULL
),
events_data AS (
SELECT
event_id,
tx_id,
block_timestamp,
event_index,
event_contract,
event_type,
_event_data_type,
_event_data_fields,
_ingested_at,
COALESCE(
_event_data_type :fields,
_event_data_type :Fields
) AS event_data_type_fields,
_inserted_timestamp
FROM
events
WHERE
_try_parse_payload IS NULL
),
attributes AS (
SELECT
event_id,
tx_id,
block_timestamp,
event_index,
event_contract,
event_type,
COALESCE(
VALUE :identifier,
VALUE :Identifier
) :: STRING AS attribute_key,
COALESCE(
_event_data_fields [index] :fields,
_event_data_fields [index] :staticType :typeID,
_event_data_fields [index] :value :value,
_event_data_fields [index] :value,
_event_data_fields [index] :Value,
_event_data_fields [index]
) :: STRING AS attribute_value,
concat_ws(
'-',
event_id,
INDEX
) AS attribute_id,
INDEX AS attribute_index,
_ingested_at,
_inserted_timestamp,
'attributes' AS _cte
FROM
events_data,
LATERAL FLATTEN(
input => event_data_type_fields
)
),
attributes_2 AS (
SELECT
event_id,
tx_id,
block_timestamp,
event_index,
event_contract,
event_type,
VALUE :name :: STRING AS attribute_key,
COALESCE(
VALUE :value :value :fields,
VALUE :value :value :staticType,
VALUE :value :value :value :value,
VALUE :value :value :value,
VALUE :value :value
) :: STRING AS attribute_value,
concat_ws(
'-',
event_id,
INDEX
) AS attribute_id,
INDEX AS attribute_index,
_ingested_at,
_inserted_timestamp,
'attributes_2' AS _cte
FROM
payload_method,
LATERAL FLATTEN(
_try_parse_payload :value :fields
)
),
combo AS (
SELECT
*
FROM
attributes
UNION
SELECT
*
FROM
attributes_2
),
FINAL AS (
SELECT
attribute_id,
event_id,
tx_id,
block_timestamp,
event_index,
attribute_index,
event_contract,
event_type,
attribute_key,
attribute_value,
_ingested_at,
_inserted_timestamp,
_cte
FROM
combo
)
SELECT
*
FROM
FINAL

View File

@ -1,114 +0,0 @@
version: 2
models:
- name: silver__event_attributes_https
description: |-
This table cleans and transform attributes from each event in the events table.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- event_index
- attribute_index
columns:
- name: attribute_id
description: "{{ doc('attribute_id') }}"
tests:
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: event_id
description: "{{ doc('event_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: event_index
description: "{{ doc('event_index') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: attribute_index
description: "{{ doc('attribute_index') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: event_contract
description: "{{ doc('event_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: event_type
description: "{{ doc('event_type') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: attribute_key
description: "{{ doc('attribute_key') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: decoded_address
description: "{{ doc('decoded_address') }}"
- name: attribute_value
description: "{{ doc('attribute_value') }}"
- name: attribute_value_adj
description: "{{ doc('attribute_value_adj') }}"
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,100 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'event_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH transactions AS (
SELECT
*
FROM
{{ ref('silver__transactions') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
events AS (
SELECT
VALUE,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
INDEX AS _index_from_flatten,
COALESCE(
VALUE :event_index,
VALUE :eventIndex
) :: NUMBER AS event_index,
VALUE :value :: variant AS event_data,
COALESCE(
VALUE :value :EventType,
VALUE :value :eventType
) :: variant AS event_data_type,
COALESCE(
VALUE :value :Fields,
VALUE :value :fields
) :: variant AS event_data_fields,
SPLIT(
IFF(
(
event_data_type :qualifiedIdentifier LIKE 'A.%'
OR event_data_type :qualifiedIdentifier LIKE 'flow%'
),
event_data_type :qualifiedIdentifier,
VALUE :type
),
'.'
) AS type_split,
ARRAY_TO_STRING(
ARRAY_SLICE(type_split, 0, ARRAY_SIZE(type_split) -1),
'.') AS event_contract,
type_split [array_size(type_split)-1] :: STRING AS event_type,
concat_ws(
'-',
tx_id,
event_index
) AS event_id,
TRY_PARSE_JSON(BASE64_DECODE_STRING(VALUE :payload)) AS try_parse_payload,
_ingested_at,
_inserted_timestamp
FROM
transactions,
LATERAL FLATTEN(
input => transaction_result :events
)
),
FINAL AS (
SELECT
event_id,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
event_data_type AS _event_data_type,
event_data_fields AS _event_data_fields,
try_parse_payload AS _try_parse_payload,
_event_data_type :fields AS _attribute_fields,
_index_from_flatten,
_ingested_at,
_inserted_timestamp
FROM
events
)
SELECT
*
FROM
FINAL

View File

@ -1,116 +0,0 @@
version: 2
models:
- name: silver__events
description: |-
This table records events from each transaction on the FLOW blockchain.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- event_index
columns:
- name: event_id
description: "{{ doc('event_id') }}"
tests:
- not_null
- unique
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: event_index
description: "{{ doc('event_index') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: event_contract
description: "{{ doc('event_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: event_type
description: "{{ doc('event_type') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: event_data
description: "{{ doc('event_data') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
- name: _event_data_type
description: "{{ doc('_event_data_type') }}"
tests:
- not_null:
name: event_data_type__not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
name: event_data_type__expected_type
- name: _event_data_fields
description: "{{ doc('_event_data_fields') }}"
tests:
- not_null:
name: event_data_fields__not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- VARIANT
name: event_data_fields__expected_type
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,183 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = "CONCAT_WS('-', tx_id, event_index)",
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH silver_events AS (
SELECT
*
FROM
{{ ref('silver__events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
silver_event_attributes AS (
SELECT
*
FROM
{{ ref('silver__event_attributes') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
silver_event_attributes_https AS (
SELECT
*
FROM
{{ ref('silver__event_attributes_https') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
all_event_attributes AS (
SELECT
attribute_id,
event_id,
tx_id,
block_timestamp,
event_index,
attribute_index,
event_contract,
event_type,
attribute_key,
attribute_value_adj AS attribute_value,
_ingested_at,
_inserted_timestamp
FROM
silver_event_attributes
UNION
SELECT
attribute_id,
event_id,
tx_id,
block_timestamp,
event_index,
attribute_index,
event_contract,
event_type,
attribute_key,
attribute_value,
_ingested_at,
_inserted_timestamp
FROM
silver_event_attributes_https
),
objs AS (
SELECT
event_id,
OBJECT_AGG(
attribute_key,
IFF(
attribute_value IS NULL,
'null',
attribute_value
) :: variant
) AS event_data
FROM
all_event_attributes
GROUP BY
1
),
location_object AS (
SELECT
event_id,
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
COALESCE(
_event_data_type :location,
_event_data_type :Location
) AS event_data,
_ingested_at,
_inserted_timestamp
FROM
silver_events
WHERE
_event_data_fields = '[]'
AND tx_id NOT IN (
SELECT
tx_id
FROM
silver_event_attributes_https
)
),
gold_events AS (
SELECT
e.event_id,
e.tx_id,
e.block_timestamp,
e.block_height,
e.tx_succeeded,
e.event_index,
e.event_contract,
e.event_type,
A.event_data,
e._ingested_at,
e._inserted_timestamp
FROM
objs A
LEFT JOIN silver_events e USING (event_id)
),
FINAL AS (
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_ingested_at,
_inserted_timestamp
FROM
gold_events
UNION
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_ingested_at,
_inserted_timestamp
FROM
location_object
)
SELECT
*
FROM
FINAL

View File

@ -1,37 +0,0 @@
version: 2
models:
- name: silver__events_final
description: |-
This table records events from each transaction on the FLOW blockchain.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
- name: event_index
description: "{{ doc('event_index') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: event_type
description: "{{ doc('event_type') }}"
- name: event_data
description: "{{ doc('event_attributes') }}"
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,123 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH bronze_txs AS (
SELECT
*
FROM
{{ ref('bronze__transactions') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
qualify ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
_ingested_at DESC
) = 1
),
silver_txs AS (
SELECT
tx_id,
block_timestamp,
block_id AS block_height,
chain_id,
tx_block_index AS tx_index,
COALESCE(
tx :proposal_key :Address,
tx :proposalKeyAddress
) :: STRING AS proposer,
tx :payer :: STRING AS payer,
tx :authorizers :: ARRAY AS authorizers,
ARRAY_SIZE(authorizers) AS count_authorizers,
COALESCE(
tx :gas_limit,
tx :gasLimit
) :: NUMBER AS gas_limit,
COALESCE(
tx :transaction_result,
tx :result
) :: variant AS transaction_result,
CASE
WHEN transaction_result :error = '' THEN TRUE
WHEN transaction_result :error :: STRING IS NULL THEN TRUE
ELSE FALSE
END AS tx_succeeded,
COALESCE(
transaction_result :error,
''
) :: STRING AS error_msg,
_ingested_at,
_inserted_timestamp
FROM
bronze_txs
),
concat_authorizers AS (
SELECT
tx_id,
CONCAT(
'0x',
VALUE
) AS x_auth
FROM
silver_txs,
LATERAL FLATTEN (
input => authorizers
)
),
authorizers_array AS (
SELECT
tx_id,
ARRAY_AGG(x_auth) AS authorizers
FROM
concat_authorizers
GROUP BY
1
),
FINAL AS (
SELECT
t.tx_id,
block_timestamp,
block_height,
chain_id,
tx_index,
CONCAT(
'0x',
proposer
) AS proposer,
CONCAT(
'0x',
payer
) AS payer,
COALESCE(
aa.authorizers,
t.authorizers
) AS authorizers,
count_authorizers,
gas_limit,
transaction_result,
tx_succeeded,
error_msg,
_ingested_at,
_inserted_timestamp
FROM
silver_txs t
LEFT JOIN authorizers_array aa USING (tx_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,52 +0,0 @@
version: 2
models:
- name: silver__transactions
description: |-
This table records all the transactions of the FLOW blockchain.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: chain_id
description: "{{ doc('chain_id') }}"
- name: tx_index
description: tbd
- name: proposer
description: "{{ doc('proposer') }}"
- name: payer
description: "{{ doc('payer') }}"
- name: authorizers
description: "{{ doc('authorizers') }}"
- name: count_authorizers
description: "{{ doc('count_authorizers') }}"
- name: gas_limit
description: "{{ doc('gas_limit') }}"
- name: transaction_result
description: "{{ doc('transaction_result') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
- name: error_msg
description: tbd
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,52 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['event_contract'],
unique_key = 'event_contract',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH splt AS (
SELECT
event_contract,
SPLIT(
event_contract,
'.'
) AS ec_s,
_inserted_timestamp
FROM
{{ ref('silver__events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
DISTINCT event_contract,
ec_s [array_size(ec_s)-1] :: STRING AS contract_name,
CONCAT(
'0x',
ec_s [array_size(ec_s)-2] :: STRING
) AS account_address,
_inserted_timestamp
FROM
splt
WHERE
ec_s [0] != 'flow'
)
SELECT
*
FROM
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,16 +0,0 @@
version: 2
models:
- name: silver__contract_labels
description: |-
This table extracts all contract labels referenced in the events item of a Flow transaction.
columns:
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: contract_name
description: "{{ doc('contract_name') }}"
- name: account_address
description: "{{ doc('account_address') }}"

View File

@ -1,66 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
pair_labels AS (
SELECT
*
FROM
{{ ref('silver__contract_labels') }}
WHERE
contract_name ILIKE '%swappair%'
),
pair_creation AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :numPairs :: NUMBER AS pair_number,
event_data :pairAddress :: STRING AS account_address,
event_data :token0Key :: STRING AS token0_contract,
event_data :token1Key :: STRING AS token1_contract,
_inserted_timestamp
FROM
events
WHERE
event_type = 'PairCreated'
),
FINAL AS (
SELECT
tx_id,
block_timestamp AS deployment_timestamp,
pair_number as pool_id,
p.account_address as vault_address,
token0_contract,
token1_contract,
l.event_contract AS swap_contract,
_inserted_timestamp
FROM
pair_creation p
LEFT JOIN pair_labels l USING (account_address)
)
SELECT
*
FROM
FINAL

View File

@ -1,49 +0,0 @@
version: 2
models:
- name: silver__labels_pools
description: |-
Looks for new PairCreated events from the Swap pair factory and records info.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- unique
- name: deployment_timestamp
description: "{{ doc('deployment_timestamp') }}"
tests:
- not_null
- name: token0_contract
description: "{{ doc('token0_contract') }}"
tests:
- not_null
- name: token1_contract
description: "{{ doc('token1_contract') }}"
tests:
- not_null
- name: pool_id
description: "{{ doc('pool_id') }}"
tests:
- not_null
- name: vault_address
description: "{{ doc('vault_address') }}"
tests:
- not_null
- unique
- name: swap_contract
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null

View File

@ -1,120 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
pierpools AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_type,
event_data :poolId :: STRING AS pool_id,
_inserted_timestamp
FROM
events
WHERE
event_contract = 'A.609e10301860b683.PierSwapFactory'
AND event_type = 'NewPoolCreated'
),
pier_events AS (
SELECT
*
FROM
events
WHERE
tx_id IN (
SELECT
DISTINCT tx_id
FROM
pierpools
)
),
token_withdraws AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_index,
_inserted_timestamp
FROM
pier_events
WHERE
event_type = 'TokensWithdrawn'
AND event_data :amount :: DOUBLE = 0
),
pairs AS (
SELECT
tx_id,
block_timestamp AS deployment_timestamp,
event_contract AS token0_contract,
LAG(event_contract) over (
PARTITION BY tx_id
ORDER BY
event_index
) AS token1_contract,
_inserted_timestamp
FROM
token_withdraws
),
escrow AS (
SELECT
tx_id,
event_data :address :: STRING AS vault_address
FROM
pier_events
WHERE
event_contract = 'flow'
AND event_type = 'AccountCreated'
),
pool_addr AS (
SELECT
tx_id,
event_contract AS swap_contract
FROM
pier_events
WHERE
event_type = 'Mint'
),
FINAL AS (
SELECT
C.tx_id,
C.deployment_timestamp,
C.token0_contract,
C.token1_contract,
p.pool_id,
e.vault_address,
pa.swap_contract,
C._inserted_timestamp
FROM
pairs C
LEFT JOIN pierpools p USING (tx_id)
LEFT JOIN escrow e USING (tx_id)
LEFT JOIN pool_addr pa USING (tx_id)
WHERE
token1_contract IS NOT NULL
)
SELECT
*
FROM
FINAL

View File

@ -1,50 +0,0 @@
version: 2
models:
- name: silver__labels_pools_metapier
description: |-
Looks for new PoolCreated events from the Metapier factory and recordes info.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- unique
- name: deployment_timestamp
description: "{{ doc('deployment_timestamp') }}"
tests:
- not_null
- name: token0_contract
description: "{{ doc('token0_contract') }}"
tests:
- not_null
- name: token1_contract
description: "{{ doc('token1_contract') }}"
tests:
- not_null
- name: pool_id
description: "{{ doc('pool_id') }}"
tests:
- not_null
- unique
- name: vault_address
description: "{{ doc('vault_address') }}"
tests:
- not_null
- unique
- name: swap_contract
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null

View File

@ -1,337 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
mapped_sales AS (
SELECT
DISTINCT tx_id
FROM
{{ ref('silver__nft_transactions_secondary_market') }}
UNION
SELECT
DISTINCT tx_id
FROM
{{ ref('silver__nft_topshot_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
duc AS (
SELECT
DISTINCT tx_id
FROM
events
WHERE
event_contract = 'A.ead892083b3e2c6c.DapperUtilityCoin'
),
duc_events AS (
SELECT
*
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
duc
)
AND tx_id NOT IN (
SELECT
tx_id
FROM
mapped_sales
)
),
duc_transfers AS (
SELECT
_inserted_timestamp,
tx_id,
COUNT(event_type) AS event_count,
MAX(
event_index + 1
) AS max_index
FROM
duc_events
WHERE
event_type IN (
'TokensDeposited',
'TokensWithdrawn',
'FeesDeducted',
'ForwardedDeposit'
)
GROUP BY
_inserted_timestamp,
tx_id
HAVING
event_count = max_index
),
gig_nfts AS (
SELECT
*
FROM
duc_events
WHERE
tx_id NOT IN (
SELECT
DISTINCT tx_id
FROM
duc_transfers
)
AND event_contract ILIKE 'A.329feb3ab062d289%'
AND event_type IN (
'Withdraw',
'Deposit'
)
),
gig_sales_events AS (
SELECT
*
FROM
events
WHERE
tx_id IN (
SELECT
DISTINCT tx_id
FROM
gig_nfts
)
),
missing_contract AS (
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
_inserted_timestamp,
event_contract AS currency,
event_data :amount :: DOUBLE AS amount,
event_data :from :: STRING AS forwarded_from,
TRUE AS missing
FROM
gig_sales_events
WHERE
event_index = 0
AND event_type = 'TokensWithdrawn'
),
purchase_amt AS (
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
_inserted_timestamp,
'A.ead892083b3e2c6c.DapperUtilityCoin' AS currency,
event_data :amount :: DOUBLE AS amount,
event_data :from :: STRING AS forwarded_from,
FALSE AS missing
FROM
gig_sales_events
WHERE
event_type = 'ForwardedDeposit'
AND tx_id NOT IN (
SELECT
tx_id
FROM
missing_contract
)
),
triage AS (
SELECT
*
FROM
missing_contract
UNION
SELECT
*
FROM
purchase_amt
),
withdraw_event AS (
SELECT
tx_id,
block_timestamp,
block_height,
_inserted_timestamp,
event_contract AS nft_collection,
event_data :from :: STRING AS seller,
event_data :id :: STRING AS nft_id
FROM
gig_sales_events
WHERE
event_type = 'Withdraw'
AND event_data :from :: STRING != 'null'
),
deposit_event AS (
SELECT
tx_id,
block_timestamp,
block_height,
_inserted_timestamp,
event_contract AS nft_collection,
event_data :to :: STRING AS buyer,
event_data :id :: STRING AS nft_id
FROM
gig_sales_events
WHERE
event_type = 'Deposit'
AND event_data :to :: STRING != 'null'
),
gl_sales AS (
SELECT
p.tx_id,
p.block_timestamp,
p.block_height,
p.tx_succeeded,
p._inserted_timestamp,
'Gigantik Primary Market' AS marketplace,
p.missing,
p.currency,
p.amount,
p.forwarded_from,
w.seller,
d.buyer,
w.nft_collection,
w.nft_id AS withdraw_nft_id,
d.nft_id AS deposit_nft_id,
w.nft_collection = d.nft_collection AS collection_check,
w.nft_id = d.nft_id AS nft_id_check
FROM
triage p
LEFT JOIN withdraw_event w USING (
tx_id,
block_timestamp,
block_height,
_inserted_timestamp
)
LEFT JOIN deposit_event d USING (
tx_id,
block_timestamp,
block_height,
_inserted_timestamp
)
),
multi AS (
SELECT
tx_id,
COUNT(
DISTINCT deposit_nft_id
) AS nfts
FROM
gl_sales
WHERE
nft_id_check
GROUP BY
1
),
giglabs_final AS (
SELECT
s.tx_id,
block_timestamp,
block_height,
marketplace,
currency,
amount / m.nfts AS price,
seller,
buyer,
nft_collection,
withdraw_nft_id AS nft_id,
m.nfts,
tx_succeeded,
_inserted_timestamp
FROM
gl_sales s
LEFT JOIN multi m USING (tx_id)
WHERE
nft_id_check
),
step_data AS (
SELECT
tx_id,
event_index,
event_type,
event_data
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
giglabs_final
)
AND event_type IN (
'TokensWithdrawn',
'TokensDeposited',
'ForwardedDeposit'
)
),
counterparty_data AS (
SELECT
tx_id,
ARRAY_AGG(OBJECT_CONSTRUCT(event_type, event_data)) within GROUP (
ORDER BY
event_index
) AS tokenflow,
ARRAY_AGG(COALESCE(event_data :to, event_data :from) :: STRING) within GROUP (
ORDER BY
event_index
) AS counterparties
FROM
step_data
GROUP BY
1
),
FINAL AS (
SELECT
s.tx_id,
block_timestamp,
block_height,
marketplace,
currency,
price,
seller,
buyer,
nft_collection,
nft_id,
nfts,
tokenflow,
counterparties,
tx_succeeded,
_inserted_timestamp
FROM
giglabs_final s
LEFT JOIN counterparty_data C USING (tx_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,74 +0,0 @@
version: 2
models:
- name: silver__nft_giglabs
description: |-
NFT primary market sales for Giglabs marketplaces.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- seller
- buyer
- nft_collection
- nft_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: marketplace
description: "{{ doc('marketplace') }}"
tests:
- not_null
- name: nft_collection
description: "{{ doc('nft_collection') }}"
tests:
- not_null
- name: nft_id
description: "{{ doc('nft_id') }}"
tests:
- not_null
- name: buyer
description: "{{ doc('buyer') }}"
tests:
- not_null
- name: seller
description: "{{ doc('seller') }}"
tests:
- not_null
- name: price
description: "{{ doc('price') }}"
tests:
- not_null
- name: currency
description: "{{ doc('currency') }}"
tests:
- not_null
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: tokenflow
description: "{{ doc('tokenflow') }}"
- name: counterparties
description: "{{ doc('counterparties') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,50 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, edition_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_type = 'EditionCreated'
AND event_contract IN (
'A.e4cf4bdc1751c65d.AllDay',
'A.b715b81853fef53f.AllDay',
'A.87ca73a41bb50ad5.Golazos'
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
org AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :id :: STRING AS edition_id,
event_data :maxMintSize :: STRING AS max_mint_size,
event_data :playID :: STRING AS play_id,
event_data :seriesID :: STRING AS series_id,
event_data :setID :: STRING AS set_id,
event_data :tier :: STRING AS tier,
_inserted_timestamp
FROM
events
)
SELECT
*
FROM
org

View File

@ -1,32 +0,0 @@
version: 2
models:
- name: silver__nft_moment_editions
description: |-
Cleaned EditionCreated events.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- event_contract
- edition_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: edition_id
description: "{{ doc('edition_id') }}"
- name: max_mint_size
description: "{{ doc('max_mint_size') }}"
- name: play_id
description: "{{ doc('play_id') }}"
- name: series_id
description: "{{ doc('series_id') }}"
- name: set_id
description: "{{ doc('set_id') }}"
- name: tier
description: "{{ doc('tier') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,75 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['play_id'],
unique_key = "concat_ws('-', event_contract, play_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH play_creation AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_type = 'PlayCreated'
{#
currently includes the following contracts
A.c38aea683c0c4d38.Eternal
A.b715b81853fef53f.AllDay
A.67af7ecf76556cd3.ABD
A.0b2a3299cc857e29.TopShot
A.5c0992b465832a94.TKNZ
A.e4cf4bdc1751c65d.AllDay
A.87ca73a41bb50ad5.Golazos
#}
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
play_metadata AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :id :: NUMBER AS play_id,
VALUE :key :value :: STRING AS column_header,
VALUE :value :value :: STRING AS column_value,
_inserted_timestamp
FROM
play_creation,
LATERAL FLATTEN(input => TRY_PARSE_JSON(event_data :metadata))
),
FINAL AS (
SELECT
tx_id,
block_timestamp,
event_contract,
play_id,
_inserted_timestamp,
OBJECT_AGG(
column_header :: variant,
column_value :: variant
) AS metadata
FROM
play_metadata
GROUP BY
1,
2,
3,
4,
5
)
SELECT
*
FROM
FINAL

View File

@ -1,22 +0,0 @@
version: 2
models:
- name: silver__nft_moment_metadata
description: |-
Cleaned PlayCreated events, which is what records the metadata on-chain.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- event_contract
- play_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: play_id
description: "{{ doc('play_id') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,80 +0,0 @@
{{ config(
materialized = 'table',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-',event_contract,edition_id,nft_id)",
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH moments AS (
SELECT
*
FROM
{{ ref('silver__nft_moment_minted') }}
),
metadata AS (
SELECT
*
FROM
{{ ref('silver__nft_moment_metadata') }}
),
editions AS (
SELECT
*
FROM
{{ ref('silver__nft_moment_editions') }}
),
series AS (
SELECT
*
FROM
{{ ref('silver__nft_moment_series') }}
),
set_nm AS (
SELECT
*
FROM
{{ ref('silver__nft_moment_set') }}
),
FINAL AS (
SELECT
-- tx id and block timestamp don't matter for the final table
m.tx_id,
m.block_timestamp,
m.event_contract,
m.nft_id,
m.serial_number,
e.max_mint_size,
e.play_id,
e.series_id,
s.series_name,
e.set_id,
sn.set_name,
e.edition_id,
e.tier,
pl.metadata,
m._inserted_timestamp,
sn._inserted_timestamp AS _inserted_timestamp_set
FROM
moments m
LEFT JOIN editions e USING (
event_contract,
edition_id
)
LEFT JOIN metadata pl USING (
event_contract,
play_id
)
LEFT JOIN series s USING (
event_contract,
series_id
)
LEFT JOIN set_nm sn USING (
event_contract,
set_id
)
)
SELECT
*
FROM
FINAL

View File

@ -1,39 +0,0 @@
version: 2
models:
- name: silver__nft_moment_metadata_final
description: |-
Cleaned PlayCreated events.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: nft_id
description: "{{ doc('nft_id') }}"
- name: serial_number
description: "{{ doc('serial_number') }}"
- name: max_mint_size
description: "{{ doc('max_mint_size') }}"
- name: play_id
description: "{{ doc('play_id') }}"
- name: series_id
description: "{{ doc('series_id') }}"
- name: series_name
description: "{{ doc('series_name') }}"
- name: set_id
description: "{{ doc('set_id') }}"
- name: set_name
description: "{{ doc('set_name') }}"
- name: edition_id
description: "{{ doc('edition_id') }}"
- name: tier
description: "{{ doc('tier') }}"
- name: metadata
description: "{{ doc('metadata') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null

View File

@ -1,42 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, edition_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_type = 'MomentNFTMinted'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
org AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :editionID :: STRING AS edition_id,
event_data :id :: STRING AS nft_id,
event_data :serialNumber :: STRING AS serial_number,
_inserted_timestamp
FROM
events
)
SELECT
*
FROM
org

View File

@ -1,32 +0,0 @@
version: 2
models:
- name: silver__nft_moment_minted
description: |-
Cleaned MomentNFTMinted events.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- event_contract
- nft_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
tests:
- accepted_values:
values:
- "A.e4cf4bdc1751c65d.AllDay"
- "A.b715b81853fef53f.AllDay"
- "A.87ca73a41bb50ad5.Golazos"
- name: serial_number
description: "{{ doc('serial_number') }}"
- name: edition_id
description: "{{ doc('edition_id') }}"
- name: nft_id
description: "{{ doc('nft_id') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,43 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, moment_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_type = 'MomentMinted'
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
org AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :momentID :: STRING AS moment_id,
event_data :serialNumber :: STRING AS serial_number,
event_data :seriesID :: STRING AS series_id,
event_data :setID :: STRING AS set_id,
_inserted_timestamp
FROM
events
)
SELECT
*
FROM
org

View File

@ -1,35 +0,0 @@
version: 2
models:
- name: silver__nft_moment_minted_2
description: |-
Cleaned MomentNFTMinted events.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- event_contract
- moment_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
tests:
- accepted_values:
values:
- "A.c38aea683c0c4d38.Eternal"
- "A.d4ad4740ee426334.Moments"
- "A.67af7ecf76556cd3.ABD"
- "A.0b2a3299cc857e29.TopShot"
- name: moment_id
description: "{{ doc('moment_id') }}"
- name: serial_number
description: "{{ doc('serial_number') }}"
- name: set_id
description: "{{ doc('set_id') }}"
- name: series_id
description: "{{ doc('series_id') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,42 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, series_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_type = 'SeriesCreated'
AND ARRAY_CONTAINS('name' :: variant, object_keys(event_data))
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
org AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :id :: STRING AS series_id,
event_data :name :: STRING AS series_name,
_inserted_timestamp
FROM
events
)
SELECT
*
FROM
org

View File

@ -1,30 +0,0 @@
version: 2
models:
- name: silver__nft_moment_series
description: |-
Cleaned SeriesCreated events.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- event_contract
- series_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
tests:
- accepted_values:
values:
- "A.e4cf4bdc1751c65d.AllDay"
- "A.b715b81853fef53f.AllDay"
- "A.87ca73a41bb50ad5.Golazos"
- name: series_id
description: "{{ doc('series_id') }}"
- name: series_name
description: "{{ doc('series_name') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,44 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp'],
unique_key = "concat_ws('-', event_contract, set_id)",
incremental_strategy = 'delete+insert',
tags = ['nft', 'dapper', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_type = 'SetCreated'
AND ARRAY_CONTAINS('name' :: variant, object_keys(event_data))
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
org AS (
SELECT
tx_id,
block_timestamp,
event_contract,
event_data :id :: STRING AS set_id,
event_data :name :: STRING AS set_name,
_inserted_timestamp
FROM
events
WHERE
set_id IS NOT NULL
)
SELECT
*
FROM
org

View File

@ -1,24 +0,0 @@
version: 2
models:
- name: silver__nft_moment_set
description: |-
Cleaned SetCreated events.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- event_contract
- set_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: set_id
description: "{{ doc('set_id') }}"
- name: set_name
description: "{{ doc('set_name') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,47 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = "CONCAT_WS('-', tx_id, event_index)",
tags = ['nft', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
moment_events AS (
SELECT
*
FROM
events
WHERE
event_type IN (
'MomentPurchased',
'MomentLocked',
'MomentCreated',
'MomentNFTBurned',
'MomentListed',
'MomentDestroyed',
'MomentWithdrawn',
'MomentMinted',
'MomentNFTMinted'
)
)
SELECT
*
FROM
moment_events

View File

@ -1,116 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH topshot AS (
SELECT
*
FROM
{{ ref('silver__nft_topshot_sales') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
secondary_mkts AS (
SELECT
*
FROM
{{ ref('silver__nft_transactions_secondary_market') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
giglabs AS (
SELECT
*
FROM
{{ ref('silver__nft_giglabs') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
combo AS (
SELECT
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
_inserted_timestamp,
tokenflow,
counterparties
FROM
topshot
UNION
SELECT
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
_inserted_timestamp,
tokenflow,
counterparties
FROM
secondary_mkts
UNION
SELECT
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
_inserted_timestamp,
tokenflow,
counterparties
FROM
giglabs
)
SELECT
*
FROM
combo

View File

@ -1,52 +0,0 @@
version: 2
models:
- name: silver__nft_sales
description: |-
NFT market sales on the Flow blockchain. This table will only contain successful transactions, as failed transactions will not have the requisite events to determine it was an attempted NFT purchase.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: marketplace
description: "{{ doc('marketplace') }}"
- name: nft_collection
description: "{{ doc('nft_collection') }}"
- name: nft_id
description: "{{ doc('nft_id') }}"
- name: buyer
description: "{{ doc('buyer') }}"
- name: seller
description: "{{ doc('seller') }}"
- name: price
description: "{{ doc('price') }}"
- name: currency
description: "{{ doc('currency') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: tokenflow
description: "{{ doc('tokenflow') }}"
- name: counterparties
description: "{{ doc('counterparties') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,66 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'merge',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = ['tx_id','nft_id'],
tags = ['nft', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH silver_events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
nft_txs AS (
SELECT
tx_id,
event_index,
block_height,
block_timestamp,
tx_succeeded,
event_data :id :: INT AS nft_id,
_inserted_timestamp
FROM
silver_events
WHERE
event_data :from = '0xe1f2a091f7bb5245'
AND event_contract = 'A.0b2a3299cc857e29.TopShot'
)
SELECT
A.tx_id,
A.block_height,
A.block_timestamp,
'topshot pack' AS marketplace,
NULL AS nft_collection,
A.nft_id,
b.event_data :to :: STRING buyer,
NULL AS seller,
NULL price,
NULL currency,
A.tx_succeeded,
NULL AS tokenflow,
NULL AS counterparties,
MD5(
CAST(COALESCE(CAST(A.tx_id AS VARCHAR), '') AS VARCHAR)
) AS pack_id,
A._inserted_timestamp
FROM
nft_txs A
JOIN silver_events b
ON A.tx_id = b.tx_id
AND A.nft_id :: STRING = b.event_data :id :: STRING
WHERE
event_data :to IS NOT NULL
AND A.event_index <> b.event_index

View File

@ -1,100 +0,0 @@
version: 2
models:
- name: silver__nft_topshot_pack_sales
description: TopShot pack sales
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: marketplace
description: "{{ doc('marketplace') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: nft_collection
description: "{{ doc('nft_collection') }}"
- name: nft_id
description: "{{ doc('nft_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- NUMBER
- name: buyer
description: "{{ doc('buyer') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: seller
description: "{{ doc('seller') }}"
- name: price
description: "{{ doc('price') }}"
- name: currency
description: "{{ doc('currency') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: tokenflow
description: "{{ doc('tokenflow') }}"
- name: counterparties
description: "{{ doc('counterparties') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,158 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH silver_events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
-- WHERE
-- event_data :: STRING != '{}'
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
moment_data AS (
SELECT
block_height,
block_timestamp,
tx_id,
event_contract :: STRING AS marketplace,
event_data :id :: STRING AS nft_id,
event_data :price :: DOUBLE AS price,
event_data :seller :: STRING AS seller,
tx_succeeded,
_ingested_at,
_inserted_timestamp
FROM
silver_events
WHERE
event_type = 'MomentPurchased'
AND event_contract LIKE 'A.c1e4f4f4c4257510%' -- topshot
),
currency_data AS (
SELECT
tx_id,
event_contract :: STRING AS currency
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
moment_data
)
AND event_index = 0
),
nft_data AS (
SELECT
tx_id,
event_contract :: STRING AS nft_collection,
event_data :to :: STRING AS buyer
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
moment_data
)
AND event_type = 'Deposit'
),
combo AS (
SELECT
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
_ingested_at,
_inserted_timestamp
FROM
moment_data
LEFT JOIN currency_data USING (tx_id)
LEFT JOIN nft_data USING (tx_id)
),
step_data AS (
SELECT
tx_id,
event_index,
event_type,
event_data
FROM
{{ ref('silver__events_final') }}
WHERE
tx_id IN (
SELECT
tx_id
FROM
combo
)
AND event_type IN (
'TokensWithdrawn',
'TokensDeposited',
'ForwardedDeposit'
)
),
counterparty_data AS (
SELECT
tx_id,
ARRAY_AGG(OBJECT_CONSTRUCT(event_type, event_data)) within GROUP (
ORDER BY
event_index
) AS tokenflow,
ARRAY_AGG(COALESCE(event_data :to, event_data :from) :: STRING) within GROUP (
ORDER BY
event_index
) AS counterparties
FROM
step_data
GROUP BY
1
),
FINAL AS (
SELECT
C.tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
_ingested_at,
_inserted_timestamp,
cd.tokenflow,
cd.counterparties
FROM
combo C
LEFT JOIN counterparty_data cd USING (tx_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,139 +0,0 @@
version: 2
models:
- name: silver__nft_topshot_sales
description: |-
TopShot direct market sales on the Flow blockchain.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: marketplace
description: "{{ doc('marketplace') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: nft_collection
description: "{{ doc('nft_collection') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: nft_id
description: "{{ doc('nft_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- NUMBER
- name: buyer
description: "{{ doc('buyer') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: seller
description: "{{ doc('seller') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: price
description: "{{ doc('price') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: currency
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: tokenflow
description: "{{ doc('tokenflow') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- OBJECT
- VARIANT
- name: counterparties
description: "{{ doc('counterparties') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- OBJECT
- VARIANT
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,509 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['nft', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH silver_events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
sale_trigger AS (
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_contract AS marketplace,
event_data,
COALESCE(
event_data :purchased :: BOOLEAN,
event_data :accepted :: BOOLEAN,
IFF(
event_data :status = 'sold'
OR event_data :status IS NULL,
TRUE,
FALSE
),
TRUE
) AS is_purchased,
_ingested_at,
_inserted_timestamp
FROM
silver_events
WHERE
is_purchased
AND -- each market uses a slightly different sale trigger
(
(
event_contract = 'A.30cf5dcf6ea8d379.AeraPack'
AND event_type = 'Purchased'
)
OR (
event_contract = 'A.8f9231920da9af6d.AFLPack'
AND event_type = 'PackBought'
)
OR (
event_contract = 'A.e2e1689b53e92a82.AniqueMarket'
AND event_type = 'CollectiblePurchased'
)
OR (
event_contract = 'A.9969d64233d69723.BlockleteMarket_NFT_V2'
AND event_type = 'BlockletePurchased'
)
OR (
event_contract = 'A.64f83c60989ce555.ChainmonstersMarketplace'
AND event_type = 'CollectionRemovedSaleOffer'
)
OR (
event_contract = 'A.c8c340cebd11f690.DarkCountryMarket'
AND event_type = 'SaleOfferAccepted'
)
OR (
event_contract = 'A.921ea449dffec68a.FlovatarMarketplace'
AND event_type IN (
'FlovatarPurchased',
'FlovatarComponentPurchased'
)
)
OR (
event_contract = 'A.09e03b1f871b3513.TheFabricantMarketplace'
AND event_type = 'NFTPurchased'
)
OR (
event_contract = 'A.097bafa4e0b48eef.FindMarketAuctionEscrow'
AND event_type = 'EnglishAuction'
)
OR (
event_contract = 'A.097bafa4e0b48eef.FindMarketDirectOfferEscrow'
AND event_type = 'DirectOffer'
)
OR (
event_contract = 'A.097bafa4e0b48eef.FindMarketSale'
AND event_type = 'Sale'
)
OR (
event_contract = 'A.097bafa4e0b48eef.FindPack'
AND event_type = 'Purchased'
)
OR (
event_contract = 'A.8b148183c28ff88f.GaiaOrder'
AND event_type = 'OrderClosed'
)
OR (
event_contract = 'A.abda6627c70c7f52.GeniaceMarketplace'
AND event_type = 'SaleOfferCompleted'
)
OR (
event_contract = 'A.82ed1b9cba5bb1b3.KaratNFTMarket'
AND event_type = 'SaleOfferAccepted'
)
OR (
event_contract = 'A.2162bbe13ade251e.MatrixMarketOpenOffer'
AND event_type = 'OfferCompleted'
)
OR (
event_contract = 'A.49b8e5d4d66ae880.MintStoreMarketFactory'
AND event_type = 'MintStoreItemPurchased'
)
OR (
event_contract = 'A.a49cc0ee46c54bfb.MotoGPNFTStorefront'
AND event_type = 'SaleOfferCompleted'
)
OR (
event_contract = 'A.b8ea91944fd51c43.Offers'
AND event_type = 'OfferCompleted'
)
OR (
event_contract = 'A.b8ea91944fd51c43.OffersV2'
AND event_type = 'OfferCompleted'
)
OR (
event_contract = 'A.856bd81e73e6752b.PonsNftMarketContract'
AND event_type = 'PonsNFTSold'
)
OR (
event_contract = 'A.52cbea4e6f616b8e.PublishedNFTStorefront'
AND event_type = 'ListingCompleted'
)
OR (
event_contract = 'A.489fcc527edc21cf.TuneGOMarket'
AND event_type = 'SaleOfferAccepted'
)
OR (
event_contract = 'A.4eb8a10cb9f87357.NFTStorefront' -- general storefront
AND event_type = 'ListingCompleted'
)
OR (
event_contract = 'A.4eb8a10cb9f87357.NFTStorefrontV2' -- funds move in 2ND TOKEN MVMT not FIRST
AND event_type = 'ListingCompleted'
)
OR (
event_contract = 'A.85b8bbf926dcddfa.NFTStoreFront'
AND event_type = 'ListingSold'
)
OR (
event_contract = 'A.85b075e08d13f697.OlympicPinMarket'
AND event_type = 'PiecePurchased'
)
OR (
event_contract = 'A.5b82f21c0edf76e3.StarlyCardMarket'
AND event_type = 'CollectionRemovedSaleOffer'
)
OR (
event_contract = 'A.62b3063fbe672fc8.ZeedzMarketplace'
AND event_type = 'RemovedListing'
)
)
),
num_triggers AS (
SELECT
tx_id,
-- storing the marketplace contract interactions
ARRAY_AGG(marketplace) within GROUP (
ORDER BY
marketplace
) AS marketplaces,
-- compare total sales (by listing id) with distinct to eliminate the case where 2 marketplaces
-- (general & gaia) are called for 1 sale
ARRAY_AGG(
COALESCE(
event_data :orderId,
-- general
event_data :listingResourceID,
-- gaia, zeedz
event_data :saleItemID,
--chainmonster
event_data :itemID,
--starly, darkcountry
event_data :id,
-- olympic pin, flovatar, fina
event_data :saleOfferResourceID,
-- moto gp
event_data :bidId,
-- matrix
event_data :listingID,
-- fabricant
event_data :templateId,
-- AFLPack
event_data :saleOfferId,
-- tunego
event_data :offerId,
-- OffersV2
event_data :nftId,
-- pons doesn't do order ids
event_data :packId -- find pack, aera
)
) AS sale_ids,
ARRAY_AGG(
DISTINCT COALESCE(
event_data :orderId,
-- general
event_data :listingResourceID,
-- gaia, zeedz
event_data :saleItemID,
--chainmonster
event_data :itemID,
--starly, darkcountry
event_data :id,
-- olympic pin, flovatar, fina
event_data :saleOfferResourceID,
-- moto gp
event_data :bidId,
-- matrix
event_data :listingID,
-- fabricant
event_data :templateId,
-- AFLPack
event_data :saleOfferId,
-- tunego
event_data :offerId,
-- OffersV2
event_data :nftId,
-- pons doesn't do order ids
event_data :packId -- find pack, aera
)
) AS dist_sale_ids,
COUNT(1) AS sale_trigger_count,
ARRAY_SIZE(dist_sale_ids) AS num_sales
FROM
sale_trigger
GROUP BY
1
),
omit_nft_nontransfers AS (
SELECT
tx_id,
ARRAY_AGG(
DISTINCT event_type
) AS events,
-- don't forget to update below if adding any new movement method !
ARRAY_SIZE(
array_intersection(
['Deposit', 'Withdraw', 'FlovatarSaleWithdrawn', 'FlovatarComponentSaleWithdrawn'],
events
)
) = 2 AS nft_transferred,
count_if(
event_type = 'Deposit'
) AS nft_deposits
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
num_triggers
WHERE
num_sales < 2
)
GROUP BY
1
HAVING
nft_deposits = 1
),
first_token_withdraw AS (
SELECT
tx_id,
MIN(event_index) AS min_index
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
omit_nft_nontransfers
WHERE
nft_transferred
)
AND event_type = 'TokensWithdrawn'
GROUP BY
1
),
-- 3 most important events are the first TokenWithdraw, then Withdraw and Deposit (NFT movement)
token_withdraw_event AS (
SELECT
tx_id,
event_contract AS currency,
event_data :amount :: DOUBLE AS amount,
event_data :from :: STRING AS buyer_purchase,
min_index
FROM
silver_events
LEFT JOIN first_token_withdraw USING (tx_id)
WHERE
tx_id IN (
SELECT
tx_id
FROM
omit_nft_nontransfers
WHERE
nft_transferred
)
AND event_index = min_index
AND event_type = 'TokensWithdrawn'
),
nft_withdraw_event_seller AS (
SELECT
tx_id,
event_index AS event_index_seller,
event_contract AS nft_collection_seller,
COALESCE(
event_data :from,
event_data :address
) :: STRING AS seller,
COALESCE(
event_data :id,
event_data :tokenId
) :: STRING AS nft_id_seller
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
omit_nft_nontransfers
WHERE
nft_transferred
)
AND event_type IN (
'Withdraw',
'FlovatarSaleWithdrawn',
'FlovatarComponentSaleWithdrawn' -- if adding anything new, don't forget about omit_nft_nontransfers check!
)
),
nft_deposit_event_buyer AS (
SELECT
tx_id,
event_contract AS nft_collection_deposit,
event_data :id :: STRING AS nft_id_deposit,
event_data :to :: STRING AS buyer_deposit
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
omit_nft_nontransfers
WHERE
nft_transferred
)
AND event_type = 'Deposit'
),
nft_sales AS (
SELECT
e.tx_id,
e.block_timestamp,
e.block_height,
e.tx_succeeded,
e.is_purchased,
e.marketplace,
w.currency,
IFF(
e.marketplace = 'A.4eb8a10cb9f87357.NFTStorefrontV2',
e.event_data :salePrice :: DOUBLE,
w.amount
) AS amount,
w.buyer_purchase,
s.nft_collection_seller,
s.seller,
s.nft_id_seller,
b.nft_collection_deposit,
b.nft_id_deposit,
b.buyer_deposit,
e._ingested_at,
e._inserted_timestamp
FROM
sale_trigger e
LEFT JOIN token_withdraw_event w USING (tx_id)
LEFT JOIN nft_withdraw_event_seller s USING (tx_id)
LEFT JOIN nft_deposit_event_buyer b USING (tx_id)
WHERE
tx_id IN (
SELECT
tx_id
FROM
omit_nft_nontransfers
WHERE
nft_transferred
)
),
step_data AS (
SELECT
tx_id,
event_index,
event_type,
event_data
FROM
silver_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
nft_sales
)
AND event_type IN (
'TokensWithdrawn',
'TokensDeposited',
'ForwardedDeposit',
'RoyaltyDeposited'
)
),
counterparty_data AS (
SELECT
tx_id,
ARRAY_AGG(OBJECT_CONSTRUCT(event_type, event_data)) within GROUP (
ORDER BY
event_index
) AS tokenflow,
ARRAY_SIZE(tokenflow) AS steps,
ARRAY_AGG(event_type) within GROUP (
ORDER BY
event_index
) AS action,
ARRAY_AGG(event_data) within GROUP (
ORDER BY
event_index
) AS step_data,
ARRAY_AGG(COALESCE(event_data :to, event_data :from) :: STRING) within GROUP (
ORDER BY
event_index
) AS counterparties
FROM
step_data
GROUP BY
1
),
FINAL AS (
SELECT
ns.tx_id,
block_timestamp,
block_height,
marketplace,
nft_collection_deposit AS nft_collection,
nft_id_seller AS nft_id,
currency,
amount AS price,
seller,
buyer_deposit AS buyer,
cd.tokenflow,
cd.steps AS num_steps,
cd.action AS step_action,
cd.step_data,
cd.counterparties,
tx_succeeded,
_ingested_at,
_inserted_timestamp
FROM
nft_sales ns
LEFT JOIN counterparty_data cd USING (tx_id)
),
dedupe_gaia AS (
SELECT
*
FROM
FINAL
WHERE
tx_id IN (
SELECT
tx_id
FROM
num_triggers
WHERE
sale_trigger_count = 2
AND num_sales = 1
) qualify ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
marketplace
) = 1
)
SELECT
*
FROM
FINAL
EXCEPT
SELECT
*
FROM
dedupe_gaia

View File

@ -1,175 +0,0 @@
version: 2
models:
- name: silver__nft_transactions_secondary_market
description: |-
This table filters all NFT sales that interact with the general purpose contract `A.4eb8a10cb9f87357.NFTStorefront`. Transactions that purchase multiple NFTs in a single transaction are presently excluded from this table!
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- seller
- buyer
- nft_collection
- nft_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: marketplace
description: "{{ doc('marketplace') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: nft_collection
description: "{{ doc('nft_collection') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: nft_id
description: "{{ doc('nft_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- NUMBER
- name: currency
description: "{{ doc('currency') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: price
description: "{{ doc('price') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: seller
description: "{{ doc('seller') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: buyer
description: "{{ doc('buyer') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: tokenflow
description: "{{ doc('tokenflow') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- OBJECT
- VARIANT
- name: num_steps
description: "{{ doc('num_steps') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: step_action
description: "{{ doc('step_action') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- OBJECT
- VARIANT
- name: step_data
description: "{{ doc('step_data') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- OBJECT
- VARIANT
- name: counterparties
description: "{{ doc('counterparties') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- ARRAY
- OBJECT
- VARIANT
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- BOOLEAN
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,99 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH silver_events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
-- WHERE
-- event_data :: STRING != '{}'
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
flow_staking AS (
SELECT
tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
event_contract,
event_type AS action,
event_data :amount :: FLOAT AS amount,
event_data :delegatorID :: STRING AS delegator_id,
event_data :nodeID :: STRING AS node_id,
_inserted_timestamp
FROM
silver_events
WHERE
event_contract = 'A.8624b52f9ddcd04a.FlowIDTableStaking'
AND event_type IN (
'DelegatorTokensCommitted',
'DelegatorRewardTokensWithdrawn',
'DelegatorUnstakedTokensWithdrawn',
'TokensCommitted',
'RewardTokensWithdrawn',
'UnstakedTokensWithdrawn'
)
),
add_auth AS (
SELECT
tx_id,
COALESCE(
authorizers [1],
authorizers [0]
) :: STRING AS primary_authorizer
FROM
{{ ref('silver__transactions') }}
WHERE
tx_id IN (
SELECT
tx_id
FROM
flow_staking
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
s.tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
primary_authorizer AS delegator,
action,
amount,
node_id,
_inserted_timestamp
FROM
flow_staking s
LEFT JOIN add_auth A USING (tx_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,37 +0,0 @@
version: 2
models:
- name: silver__staking_actions
description: |-
This table provides transaction-level info on FLOW staking activities.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: event_index
description: "{{ doc('event_index') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
- name: delegator
description: "{{ doc('delegator') }}"
- name: action
description: "{{ doc('action') }}"
- name: amount
description: "{{ doc('amount') }}"
- name: node_id
description: "{{ doc('node_id') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,318 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = "CONCAT_WS('-', tx_id, swap_index)",
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__swaps_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swap_events AS (
SELECT
*
FROM
events
WHERE
tx_id NOT IN (
SELECT
DISTINCT tx_id
FROM
events
WHERE
event_type = 'RewardTokensWithdrawn'
OR event_type = 'NFTReceived'
) -- PierPair needs a bespoke model as it does not deposit traded token to Pool contract
AND tx_id NOT IN (
SELECT
DISTINCT tx_id
FROM
events
WHERE
event_contract LIKE '%PierPair%'
)
),
pool_info AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_index,
event_type,
RANK() over (
PARTITION BY tx_id
ORDER BY
event_index
) - 1 AS swap_index,
event_contract AS pool_contract,
CASE
WHEN object_keys(event_data) [0] :: STRING = 'side' THEN 'Blocto'
WHEN object_keys(event_data) [0] :: STRING = 'direction' THEN 'Increment'
WHEN object_keys(event_data) [3] :: STRING = 'swapAForB' THEN 'Metapier'
ELSE 'Other'
END AS likely_dex,
COALESCE(
event_data :direction :: NUMBER,
event_data :side :: NUMBER - 1,
event_data :swapAForB :: BOOLEAN :: NUMBER
) AS direction,
COALESCE(
event_data :inTokenAmount,
event_data :token1Amount,
event_data :amountIn
) :: DOUBLE AS in_token_amount,
COALESCE(
event_data :outTokenAmount,
event_data :token2Amount,
event_data :amountOut
) :: DOUBLE AS out_token_amount,
_inserted_timestamp
FROM
swap_events
WHERE
event_type IN (
'Trade',
'Swap'
)
),
token_withdraws AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_index,
RANK() over (
PARTITION BY tx_id
ORDER BY
event_index
) - 1 AS token_index,
RANK() over (
PARTITION BY CONCAT(
tx_id,
event_data :amount :: STRING,
event_data :from :: STRING
)
ORDER BY
event_index
) - 1 AS unique_order,
event_contract,
event_data,
_inserted_timestamp
FROM
swap_events
WHERE
event_type = 'TokensWithdrawn'
AND tx_id IN (
SELECT
DISTINCT tx_id
FROM
pool_info
)
),
token_deposits AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_index,
RANK() over (
PARTITION BY tx_id
ORDER BY
event_index
) - 1 AS token_index,
RANK() over (
PARTITION BY CONCAT(
tx_id,
event_data :amount :: STRING,
event_data :to :: STRING
)
ORDER BY
event_index
) - 1 AS unique_order,
event_contract,
event_data,
_inserted_timestamp
FROM
swap_events
WHERE
event_type = 'TokensDeposited'
AND tx_id IN (
SELECT
DISTINCT tx_id
FROM
pool_info
)
),
link_token_movement AS (
SELECT
w.tx_id,
w.block_timestamp,
w.block_height,
w._inserted_timestamp,
-- set transfer index based on execution via deposit, not withdraw, event
RANK() over (
PARTITION BY w.tx_id
ORDER BY
d.event_index
) - 1 AS transfer_index,
w.event_data :from :: STRING AS withdraw_from,
d.event_data :to :: STRING AS deposit_to,
w.event_data :amount :: DOUBLE AS amount,
w.event_contract AS token_contract,
w.event_contract = d.event_contract AS contract_check
FROM
token_withdraws w
LEFT JOIN token_deposits d
ON w.tx_id = d.tx_id
AND w.event_contract = d.event_contract
AND w.event_data :amount :: STRING = d.event_data :amount :: STRING
AND w.unique_order = d.unique_order
),
restructure AS (
SELECT
t.tx_id,
t.transfer_index,
p.swap_index,
RANK() over (
PARTITION BY t.tx_id,
swap_index
ORDER BY
transfer_index
) - 1 AS token_position,
t.withdraw_from,
t.deposit_to,
CONCAT('0x', SPLIT(pool_contract, '.') [1]) AS pool_address,
sub.trader,
ARRAYS_OVERLAP(ARRAY_CONSTRUCT(t.withdraw_from, t.deposit_to), ARRAY_CONSTRUCT(pool_address, sub.trader)) AS transfer_involve_pool_or_trader,
t.amount,
t.token_contract,
p.pool_contract,
p.direction,
p.in_token_amount,
p.out_token_amount
FROM
link_token_movement t
LEFT JOIN pool_info p
ON p.tx_id = t.tx_id
AND (
p.in_token_amount = t.amount -- blocto takes a 0.3% fee
OR ROUND((p.in_token_amount / 0.997) - t.amount) = 0
OR p.out_token_amount = t.amount
OR ROUND((p.out_token_amount / 0.997) - t.amount) = 0
)
AND transfer_index >= swap_index
LEFT JOIN (
SELECT
tx_id,
withdraw_from AS trader
FROM
link_token_movement
WHERE
transfer_index = 0
) sub
ON t.tx_id = sub.tx_id
WHERE
swap_index IS NOT NULL -- exclude the network fee token movement
AND transfer_involve_pool_or_trader
),
-- there are some cases where the same token is transferred multiple times in a single swap
-- causing an error with object agg
excl_dup_transfer AS (
SELECT
tx_id,
pool_contract,
swap_index,
CONCAT(
'token',
token_position
),
COUNT(1) AS identical_transfer_count
FROM
restructure
GROUP BY
1,
2,
3,
4
HAVING
identical_transfer_count > 1
),
pool_token_alignment AS (
SELECT
tx_id,
pool_contract,
swap_index,
OBJECT_AGG(CONCAT('token', token_position), token_contract :: variant) AS tokens,
OBJECT_AGG(CONCAT('amount', token_position), amount) AS amounts,
OBJECT_AGG(CONCAT('from', token_position), withdraw_from :: variant) AS withdraws,
OBJECT_AGG(CONCAT('to', token_position), deposit_to :: variant) AS deposits
FROM
restructure
WHERE
tx_id NOT IN (
SELECT
DISTINCT tx_id
FROM
excl_dup_transfer
)
GROUP BY
1,
2,
3
),
boilerplate AS (
SELECT
tx_id,
block_timestamp,
block_height,
_inserted_timestamp,
withdraw_from AS trader
FROM
link_token_movement
WHERE
transfer_index = 0
AND tx_id NOT IN (
SELECT
DISTINCT tx_id
FROM
excl_dup_transfer
)
),
FINAL AS (
SELECT
tx_id,
block_timestamp,
block_height,
pool_contract AS swap_contract,
swap_index,
trader,
withdraws :from0 :: STRING AS token_out_source,
tokens :token0 :: STRING AS token_out_contract,
amounts :amount0 :: DOUBLE AS token_out_amount,
deposits :to1 :: STRING AS token_in_destination,
tokens :token1 :: STRING AS token_in_contract,
amounts :amount1 :: DOUBLE AS token_in_amount,
_inserted_timestamp
FROM
boilerplate
LEFT JOIN pool_token_alignment USING (tx_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,112 +0,0 @@
version: 2
models:
- name: silver__swaps
description: |-
This table records asset swaps on the Flow blockchain.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- trader
- swap_index
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: swap_contract
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: swap_index
description: "{{ doc('swap_index') }}"
- name: trader
description: "{{ doc('trader') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_out_source
description: "{{ doc('token_out_source') }}"
- name: token_out_amount
description: "{{ doc('token_out_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_out_contract
description: "{{ doc('token_out_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_in_destination
description: "{{ doc('token_in_destination') }}"
- name: token_in_amount
description: "{{ doc('token_in_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_in_contract
description: "{{ doc('token_in_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,65 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = "CONCAT_WS('-', tx_id, event_index)",
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH swap_contracts AS (
SELECT
*
FROM
{{ ref('silver__contract_labels') }}
WHERE
contract_name LIKE '%SwapPair%'
),
swaps_txs AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
event_contract IN (
SELECT
event_contract
FROM
swap_contracts
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
swap_events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
WHERE
tx_id IN (
SELECT
tx_id
FROM
swaps_txs
)
{% if is_incremental() %}
AND _inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
)
SELECT
*
FROM
swap_events

View File

@ -1,37 +0,0 @@
version: 2
models:
- name: silver__swaps_events
description: |-
This table records events from each Swap related transaction on the FLOW blockchain, as determined by an interaction with a SwapPair contract. This is intended to be internal only for downstream usage.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"
- name: event_index
description: "{{ doc('event_index') }}"
- name: event_contract
description: "{{ doc('event_contract') }}"
- name: event_type
description: "{{ doc('event_type') }}"
- name: event_data
description: "{{ doc('event_attributes') }}"
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"

View File

@ -1,174 +0,0 @@
{{ config(
materialized = 'incremental',
cluster_by = ['_inserted_timestamp::DATE'],
unique_key = 'tx_id',
incremental_strategy = 'delete+insert',
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH swaps_events AS (
SELECT
*
FROM
{{ ref('silver__swaps_events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
action_ct AS (
SELECT
tx_id,
event_type,
COUNT(1) AS n
FROM
swaps_events
WHERE
event_type IN (
'Trade',
'Swap'
)
GROUP BY
1,
2
),
step_ct AS (
SELECT
tx_id,
OBJECT_AGG(
event_type,
n
) AS ob
FROM
action_ct
GROUP BY
1
),
single_trade AS (
SELECT
tx_id
FROM
step_ct
WHERE
ob :Trade = 1
AND ob :Swap IS NULL
),
swaps_single_trade AS (
SELECT
*
FROM
swaps_events
WHERE
tx_id IN (
SELECT
tx_id
FROM
single_trade
)
),
index_id AS (
SELECT
tx_id,
event_type,
MIN(event_index) AS event_index
FROM
swaps_single_trade
WHERE
event_type = 'TokensWithdrawn'
GROUP BY
1,
2
),
token_out_data AS (
SELECT
sst.tx_id,
block_timestamp,
block_height,
event_contract AS event_contract_token_out,
event_data AS event_data_token_out,
event_data :amount :: DOUBLE AS token_amount_token_out,
LOWER(
event_data :from :: STRING
) AS trader_token_out,
_ingested_at,
_inserted_timestamp
FROM
index_id ii
LEFT JOIN swaps_single_trade sst USING (
tx_id,
event_index
)
),
trade_data AS (
SELECT
tx_id,
block_timestamp,
event_type,
event_contract AS event_contract_trade,
event_data AS event_data_trade,
event_data :side :: NUMBER AS swap_side,
event_data :token1Amount :: DOUBLE AS token_1_amount,
-- note some are decimal adjusted, some are not. identify by contract
event_data :token2Amount :: DOUBLE AS token_2_amount,
l.account_address AS swap_account,
_ingested_at,
_inserted_timestamp
FROM
swaps_single_trade sst
LEFT JOIN {{ ref('silver__contract_labels') }}
l USING (event_contract)
WHERE
event_type = 'Trade'
),
token_in_data AS (
SELECT
sst.tx_id,
sst.block_timestamp,
sst.event_contract AS event_contract_token_in,
sst.event_data AS event_data_token_in,
sst.event_data :amount :: DOUBLE AS amount_token_in
FROM
trade_data t
LEFT JOIN swaps_single_trade sst
ON sst.tx_id = t.tx_id
AND t.swap_account = LOWER(
sst.event_data :from :: STRING
)
WHERE
sst.event_type = 'TokensWithdrawn'
),
combo AS (
SELECT
tod.tx_id,
tod.block_timestamp,
tod.block_height,
td.event_contract_trade AS swap_contract,
LOWER(
tod.trader_token_out
) AS trader,
tod.token_amount_token_out AS token_out_amount,
tod.event_contract_token_out AS token_out_contract,
tid.amount_token_in AS token_in_amount,
tid.event_contract_token_in AS token_in_contract,
-- keep these next 3 columns bc i can derive fees from the difference in token_out_amount and token_[n]_amount where n = swap_side
td.swap_side,
td.token_1_amount,
td.token_2_amount,
tod._ingested_at,
tod._inserted_timestamp
FROM
token_out_data tod
LEFT JOIN trade_data td USING (tx_id)
LEFT JOIN token_in_data tid USING (tx_id)
)
SELECT
*
FROM
combo

View File

@ -1,129 +0,0 @@
version: 2
models:
- name: silver__swaps_single_trade
description: |-
This table records single-pool asset swaps on the Flow blockchain. A single-pool asset swap is one where a liquidity pool exists for the trade such that no routing is required. (i.e. Swapping FUSD to USDT using `A.87f3f233f34b0733.FusdUsdtSwapPair`)
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- trader
- swap_contract
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: swap_contract
description: "{{ doc('swap_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: trader
description: "{{ doc('trader') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_out_amount
description: "{{ doc('token_out_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_out_contract
description: "{{ doc('token_out_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_in_amount
description: "{{ doc('token_in_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_in_contract
description: "{{ doc('token_in_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: swap_side
description: "{{ doc('swap_side') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: token_1_amount
description: "{{ doc('token_1_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: token_2_amount
description: "{{ doc('token_2_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,298 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = 'tx_id',
tags = ['bridge', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
-- WHERE
-- event_data :: STRING != '{}'
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
teleport_events AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_contract AS teleport_contract_fee,
event_data :amount :: DOUBLE AS amount_fee,
event_data :type :: NUMBER AS teleport_direction,
_ingested_at,
_inserted_timestamp
FROM
events
WHERE
event_type = 'FeeCollected'
AND event_contract LIKE '%Teleport%'
),
teleports_in AS (
SELECT
tx_id,
event_contract AS event_contract_teleport,
event_data :amount :: DOUBLE AS amount_teleport,
event_data :from AS from_teleport,
STRTOK_TO_ARRAY(
REPLACE(REPLACE(event_data :from :: STRING, '['), ']'),
', '
) :: ARRAY AS from_teleport_array,
COALESCE(
event_data :hash,
event_data :txHash
) :: STRING AS hash_teleport,
_ingested_at,
_inserted_timestamp
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
teleport_events
WHERE
teleport_direction = 1
)
AND event_index = 0
AND event_contract LIKE '%Teleport%'
AND event_type IN (
'TokensTeleportedIn',
'Unlocked'
)
AND from_teleport :: STRING NOT LIKE '%{\"ArrayType%'
),
deposits AS (
SELECT
tx_id,
event_contract,
event_index,
event_data :amount :: DOUBLE AS amount_deposits,
ROW_NUMBER() over (
PARTITION BY tx_id
ORDER BY
amount_deposits DESC
) AS rn,
event_data :to :: STRING AS to_deposits
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
teleports_in
)
AND event_type = 'TokensDeposited'
ORDER BY
tx_id,
amount_deposits DESC
),
blocto_inbound AS (
SELECT
t.tx_id,
f.block_timestamp,
f.block_height,
t.event_contract_teleport AS teleport_contract,
d.event_contract AS token_contract,
t.amount_teleport AS gross_amount,
f.amount_fee,
d.amount_deposits AS net_amount,
d.to_deposits AS flow_wallet_address,
f.teleport_direction,
'blocto' AS bridge,
f._ingested_at,
f._inserted_timestamp
FROM
teleports_in t
LEFT JOIN deposits d USING (tx_id)
LEFT JOIN teleport_events f USING (tx_id)
WHERE
d.rn = 1
),
teleports_out_withdraw_non_fiat AS (
SELECT
tx_id,
event_contract,
event_data :amount :: DOUBLE AS amount_withdraw,
event_data :from :: STRING AS from_withdraw
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
teleport_events
WHERE
teleport_direction = 0
)
AND event_index = 0
AND event_contract != 'A.b19436aae4d94622.FiatToken'
),
teleports_out_withdraw_fiat AS (
SELECT
tx_id,
event_contract,
event_data :amount :: DOUBLE AS amount_withdraw,
event_data :from :: STRING AS from_withdraw
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
teleport_events
WHERE
teleport_direction = 0
)
AND event_index = 1
AND event_contract = 'A.b19436aae4d94622.FiatToken'
),
teleports_out_withdraw AS (
SELECT
tx_id,
event_contract,
amount_withdraw,
from_withdraw
FROM
teleports_out_withdraw_non_fiat
UNION
SELECT
tx_id,
event_contract,
amount_withdraw,
from_withdraw
FROM
teleports_out_withdraw_fiat
),
teleports_out AS (
SELECT
tx_id,
event_contract AS event_contract_teleport,
event_data :amount :: DOUBLE AS amount_teleport,
event_data :to AS to_teleport,
STRTOK_TO_ARRAY(
REPLACE(REPLACE(event_data :to :: STRING, '['), ']'),
', '
) :: ARRAY AS to_teleport_array
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
teleport_events
WHERE
teleport_direction = 0
)
AND event_type IN (
'TokensTeleportedOut',
'Locked'
)
),
blocto_outbound AS (
SELECT
t.tx_id,
f.block_timestamp,
f.block_height,
t.event_contract_teleport AS teleport_contract,
w.event_contract AS token_contract,
w.amount_withdraw AS gross_amount,
f.amount_fee,
t.amount_teleport AS net_amount,
w.from_withdraw AS flow_wallet_address,
f.teleport_direction,
'blocto' AS bridge,
f._ingested_at,
f._inserted_timestamp
FROM
teleports_out t
LEFT JOIN teleports_out_withdraw w USING (tx_id)
LEFT JOIN teleport_events f USING (tx_id)
),
tbl_union AS (
SELECT
tx_id,
block_timestamp,
block_height,
teleport_contract,
token_contract,
gross_amount,
amount_fee,
net_amount,
flow_wallet_address,
teleport_direction,
bridge,
_ingested_at,
_inserted_timestamp
FROM
blocto_inbound
UNION
SELECT
tx_id,
block_timestamp,
block_height,
teleport_contract,
token_contract,
gross_amount,
amount_fee,
net_amount,
flow_wallet_address,
teleport_direction,
bridge,
_ingested_at,
_inserted_timestamp
FROM
blocto_outbound
),
tele_labels AS (
SELECT
teleport_contract,
blockchain
FROM
{{ ref('seeds__blocto_teleport_labels') }}
),
FINAL AS (
SELECT
tx_id,
block_timestamp,
block_height,
t.teleport_contract,
token_contract,
gross_amount,
amount_fee,
net_amount,
flow_wallet_address,
CASE
WHEN teleport_direction = 0 THEN 'outbound'
ELSE 'inbound'
END AS teleport_direction,
l.blockchain,
bridge,
_ingested_at,
_inserted_timestamp
FROM
tbl_union t
LEFT JOIN tele_labels l USING (teleport_contract)
)
SELECT
*
FROM
FINAL

View File

@ -1,131 +0,0 @@
version: 2
models:
- name: silver__bridge_blocto
description: |-
This table parses transactions where tokens are bridged to or from the Flow network via Blocto Teleport.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 3
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: teleport_contract
description: "{{ doc('teleport_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_contract
description: "{{ doc('token_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: gross_amount
description: "{{ doc('gross_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- name: amount_fee
description: "{{ doc('amount_fee') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- name: net_amount
description: "{{ doc('net_amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- name: flow_wallet_address
description: "{{ doc('flow_wallet_address') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: teleport_direction
description: "{{ doc('teleport_direction') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: blockchain
description: "{{ doc('blockchain') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: bridge
description: "{{ doc('bridge') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,165 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = 'tx_id',
tags = ['bridge', 'scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
-- WHERE
-- event_data :: STRING != '{}'
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
cbridge_txs AS (
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data,
_ingested_at,
_inserted_timestamp
FROM
events
WHERE
event_contract = 'A.08dd120226ec2213.PegBridge'
),
inbound AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_contract AS bridge_contract,
REPLACE(REPLACE(event_data :token :: STRING, '.Vault'), '"') AS token_contract,
event_data :amount :: DOUBLE AS amount,
event_data :receiver :: STRING AS flow_wallet_address,
REPLACE(CONCAT('0x', event_data :depositor) :: STRING, '"') AS counterparty,
event_data :refChId :: NUMBER AS chain_id,
'inbound' AS direction,
'cbridge' AS bridge,
_ingested_at,
_inserted_timestamp
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
cbridge_txs
)
AND event_type = 'Mint'
),
outbound AS (
SELECT
tx_id,
block_timestamp,
block_height,
event_contract AS bridge_contract,
REPLACE(REPLACE(event_data :token :: STRING, '.Vault'), '"') AS token_contract,
event_data :amount :: DOUBLE AS amount,
event_data :burner :: STRING AS flow_wallet_address,
REPLACE(
event_data :toAddr :: STRING,
'"'
) AS counterparty,
event_data :toChain :: NUMBER AS chain_id,
'outbound' AS direction,
'cbridge' AS bridge,
_ingested_at,
_inserted_timestamp
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
cbridge_txs
)
AND event_type = 'Burn'
),
tbl_union AS (
SELECT
tx_id,
block_timestamp,
block_height,
bridge_contract,
token_contract,
amount,
flow_wallet_address,
counterparty,
chain_id,
direction,
bridge,
_ingested_at,
_inserted_timestamp
FROM
inbound
UNION
SELECT
tx_id,
block_timestamp,
block_height,
bridge_contract,
token_contract,
amount,
flow_wallet_address,
counterparty,
chain_id,
direction,
bridge,
_ingested_at,
_inserted_timestamp
FROM
outbound
),
chain_ids AS (
SELECT
chain_id,
blockchain
FROM
{{ ref('seeds__celer_chain_ids') }}
),
FINAL AS (
SELECT
tx_id,
block_timestamp,
block_height,
bridge_contract,
token_contract,
amount,
flow_wallet_address,
counterparty,
t.chain_id,
l.blockchain,
direction,
bridge,
_ingested_at,
_inserted_timestamp
FROM
tbl_union t
LEFT JOIN chain_ids l USING (chain_id)
)
SELECT
*
FROM
FINAL

View File

@ -1,128 +0,0 @@
version: 2
models:
- name: silver__bridge_celer
description: |-
This table parses transactions where tokens are bridged to or from the Flow network using the Celer cBridge.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
tests:
- not_null
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 3
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
- name: block_height
description: "{{ doc('block_height') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- FLOAT
- name: bridge_contract
description: "{{ doc('bridge_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: token_contract
description: "{{ doc('token_contract') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: amount
description: "{{ doc('amount') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- DOUBLE
- FLOAT
- name: flow_wallet_address
description: "{{ doc('flow_wallet_address') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: counterparty
description: "{{ doc('counterparty') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: chain_id
description: "{{ doc('chain_id') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
- name: blockchain
description: "{{ doc('blockchain') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: direction
description: "{{ doc('direction') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: bridge
description: "{{ doc('bridge') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING
- VARCHAR
- name: _ingested_at
description: "{{ doc('_ingested_at') }}"
- name: _inserted_timestamp
description: "{{ doc('_inserted_timestamp') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -1,139 +0,0 @@
{{ config(
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['_inserted_timestamp::date'],
unique_key = "CONCAT_WS('-', tx_id, sender, recipient, token_contract, amount)",
tags = ['scheduled', 'chainwalkers_scheduled']
) }}
WITH events AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
-- WHERE
-- event_data :: STRING != '{}'
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
transfers AS (
SELECT
_inserted_timestamp,
tx_id,
event_contract,
COUNT(event_type) AS event_count,
MAX(
event_index + 1
) AS max_index
FROM
events
WHERE
event_type IN (
'TokensDeposited',
'TokensWithdrawn',
'FeesDeducted'
)
GROUP BY
_inserted_timestamp,
tx_id,
event_contract
HAVING
event_count = max_index
OR event_contract = 'A.b19436aae4d94622.FiatToken'
),
withdraws AS (
SELECT
block_height,
_inserted_timestamp,
block_timestamp,
tx_id,
event_data :from :: STRING AS sender,
event_contract AS token_contract,
event_data :amount :: FLOAT AS amount,
tx_succeeded
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
transfers
)
AND event_type = 'TokensWithdrawn'
GROUP BY
block_height,
_inserted_timestamp,
block_timestamp,
tx_id,
sender,
token_contract,
amount,
tx_succeeded
),
deposits AS (
SELECT
tx_id,
_inserted_timestamp,
event_data :to :: STRING AS recipient,
event_contract AS token_contract,
event_data :amount :: FLOAT AS amount
FROM
events
WHERE
tx_id IN (
SELECT
tx_id
FROM
transfers
)
AND event_type = 'TokensDeposited'
GROUP BY
tx_id,
_inserted_timestamp,
recipient,
token_contract,
amount
),
FINAL AS (
SELECT
block_height,
w._inserted_timestamp AS _inserted_timestamp,
block_timestamp,
w.tx_id,
sender,
recipient,
w.token_contract,
SUM(COALESCE(d.amount, w.amount)) AS amount,
tx_succeeded
FROM
withdraws w
LEFT JOIN deposits d
ON w.tx_id = d.tx_id
AND w.token_contract = d.token_contract
AND w.amount = d.amount
WHERE
sender IS NOT NULL
GROUP BY
block_height,
w._inserted_timestamp,
block_timestamp,
w.tx_id,
sender,
recipient,
w.token_contract,
tx_succeeded
)
SELECT
*
FROM
FINAL

View File

@ -1,31 +0,0 @@
version: 2
models:
- name: silver__token_transfers
description: |-
This table records all token transfers on the FLOW blockchain.
columns:
- name: tx_id
description: "{{ doc('tx_id') }}"
- name: block_timestamp
description: "{{ doc('block_timestamp') }}"
- name: block_height
description: "{{ doc('block_height') }}"
- name: sender
description: "{{ doc('sender') }}"
- name: recipient
description: "{{ doc('recipient') }}"
- name: token_contract
description: "{{ doc('token_contract') }}"
- name: amount
description: "{{ doc('amount') }}"
- name: tx_succeeded
description: "{{ doc('tx_succeeded') }}"

View File

@ -2,7 +2,7 @@ WITH mint_events AS (
SELECT
MAX(block_timestamp) :: DATE AS last_mint_date
FROM
{{ ref('silver__nft_moments') }}
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.e4cf4bdc1751c65d.AllDay'
AND event_type = 'MomentNFTMinted'

View File

@ -2,7 +2,7 @@ WITH mint_events AS (
SELECT
MAX(block_timestamp) :: DATE AS last_mint_date
FROM
{{ ref('silver__nft_moments') }}
{{ ref('silver__nft_moments_s') }}
WHERE
event_contract = 'A.0b2a3299cc857e29.TopShot'
AND event_type = 'MomentMinted'