An 4015/streamline cutover (#200)

* core views

* defi schema

* bridge test upd

* gov schema

* nft schema

* price schema

* move tx count to blocks model

* add collection_count_agg to blocks

* upd core views to UNION ALL

* upd collections cluster_by

* upd dim moment metadata to qualify

* upd dim moment metadata test

* upd core dim contract labels to qualify and silver (cw) to incr

* upd str blocks lookback`
This commit is contained in:
Jack Forgash 2023-10-12 16:23:20 -06:00 committed by GitHub
parent 964f14ab16
commit bd08ee7f8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 666 additions and 143 deletions

View File

@ -68,6 +68,7 @@ vars:
DROP_UDFS_AND_SPS: False
REST_API_PREFIX_PROD: quxfxtl934.execute-api.us-east-1.amazonaws.com/prod/
REST_API_PREFIX_DEV: ul6x832e8l.execute-api.us-east-1.amazonaws.com/dev/
STREAMLINE_START_BLOCK: 55114467
dispatch:
- macro_namespace: dbt

View File

@ -0,0 +1,5 @@
{% docs collection_count_agg %}
Collection count from the tx_count aggregation, used to check against the collection_count in the block header.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs error_msg %}
The error message associated with the transaction, if it failed.
{% enddocs %}

View File

@ -0,0 +1,5 @@
{% docs tx_index %}
Deprecating soon. Note, on previous iterations the tx index was included indicating a transactions position within a block. However, on Flow, transactions are included within a collection. A block contains a number of collections (which themselves contain the transactions). Thus, tx_index is an inaccurate view of position within a block, as it is not collection based. This column is being deprecated.
{% enddocs %}

View File

@ -3,14 +3,41 @@
tags = ['scheduled']
) }}
WITH contract_labels AS (
WITH chainwalkers AS (
SELECT
event_contract,
contract_name,
account_address,
_inserted_timestamp
FROM
{{ ref('silver__contract_labels') }}
),
streamline AS (
SELECT
event_contract,
contract_name,
account_address,
_inserted_timestamp
FROM
{{ ref('silver__contract_labels_s') }}
),
FINAL AS (
SELECT
*
FROM
{{ ref('silver__contract_labels') }}
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
*
FROM
contract_labels
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -16,6 +16,7 @@ models:
description: "{{ doc('event_contract') }}"
tests:
- not_null
- unique
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING

View File

@ -3,19 +3,55 @@
tags = ['ez', 'scheduled']
) }}
WITH chainwalkers AS (
SELECT
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded
FROM
{{ ref('silver__token_transfers') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
AND block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded
FROM
{{ ref('silver__token_transfers_s') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
AND block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
)
SELECT
block_height,
block_timestamp,
tx_id,
sender,
recipient,
token_contract,
amount,
tx_succeeded
*
FROM
{{ ref('silver__token_transfers') }}
WHERE
token_contract NOT IN (
'A.c38aea683c0c4d38.ZelosAccountingToken',
'A.f1b97c06745f37ad.SwapPair'
)
streamline
UNION ALL
SELECT
*
FROM
chainwalkers

View File

@ -3,14 +3,8 @@
tags = ['scheduled']
) }}
WITH silver_blocks AS (
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__blocks') }}
),
gold_blocks AS (
SELECT
block_height,
block_timestamp,
@ -21,9 +15,35 @@ gold_blocks AS (
id,
parent_id
FROM
silver_blocks
{{ ref('silver__blocks') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
block_height,
block_timestamp,
'mainnet' AS network,
network_version,
'flow' AS chain_id,
tx_count,
id,
parent_id
FROM
{{ ref('silver__streamline_blocks') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
)
SELECT
*
FROM
gold_blocks
chainwalkers
UNION ALL
SELECT
*
FROM
streamline

View File

@ -3,14 +3,8 @@
tags = ['scheduled']
) }}
WITH events_final AS (
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__events_final') }}
),
events AS (
SELECT
tx_id,
block_timestamp,
@ -21,9 +15,35 @@ events AS (
event_type,
event_data
FROM
events_final
{{ ref('silver__events_final') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
tx_id,
block_timestamp,
block_height,
tx_succeeded,
event_index,
event_contract,
event_type,
event_data
FROM
{{ ref('silver__streamline_events') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
)
SELECT
*
FROM
events
chainwalkers
UNION ALL
SELECT
*
FROM
streamline

View File

@ -23,6 +23,7 @@ models:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
where: block_height >= 55114467
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
@ -73,7 +74,6 @@ models:
- name: EVENT_DATA
description: "{{ doc('event_attributes') }}"
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- OBJECT

View File

@ -3,14 +3,8 @@
tags = ['scheduled']
) }}
WITH silver_txs AS (
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__transactions') }}
),
gold_txs AS (
SELECT
tx_id,
block_timestamp,
@ -26,9 +20,48 @@ gold_txs AS (
tx_succeeded,
error_msg
FROM
silver_txs
{{ ref('silver__transactions') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
tx_id,
block_timestamp,
block_height,
'flow' AS chain_id,
NULL AS tx_index,
proposer,
payer,
authorizers,
count_authorizers,
gas_limit,
OBJECT_CONSTRUCT(
'error',
error_message,
'events',
events,
'status',
status
) AS transaction_result,
tx_succeeded,
error_message AS error_msg
FROM
{{ ref('silver__streamline_transactions_final') }}
WHERE
NOT pending_result_response
AND block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
)
SELECT
*
FROM
gold_txs
chainwalkers
UNION ALL
SELECT
*
FROM
streamline

View File

@ -4,11 +4,6 @@ models:
- name: core__fact_transactions
description: |-
This table records all the transactions of the FLOW blockchain.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- tx_id
- block_height
columns:
- name: TX_ID
@ -24,6 +19,7 @@ models:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 1
where: block_height >= 55114467
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ
@ -47,12 +43,7 @@ models:
- VARCHAR
- name: TX_INDEX
description: tbd
tests:
- not_null
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- NUMBER
description: "{{ doc('tx_index') }}"
- name: PROPOSER
description: "{{ doc('proposer') }}"
@ -111,9 +102,10 @@ models:
- BOOLEAN
- name: ERROR_MSG
description: tbd
description: "{{ doc('error_msg') }}"
tests:
- not_null
- not_null:
where: not TX_SUCCEEDED
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- STRING

View File

@ -3,7 +3,7 @@
tag = ['scheduled']
) }}
WITH pairs AS (
WITH pairs_cw AS (
SELECT
swap_contract,
@ -15,7 +15,7 @@ WITH pairs AS (
FROM
{{ ref('silver__labels_pools') }}
),
metapier AS (
metapier_cw AS (
SELECT
swap_contract,
deployment_timestamp,
@ -26,16 +26,48 @@ metapier AS (
FROM
{{ ref('silver__labels_pools_metapier') }}
),
pairs_s AS (
SELECT
swap_contract,
deployment_timestamp,
token0_contract,
token1_contract,
pool_id,
vault_address
FROM
{{ ref('silver__labels_pools_s') }}
),
metapier_s AS (
SELECT
swap_contract,
deployment_timestamp,
token0_contract,
token1_contract,
pool_id,
vault_address
FROM
{{ ref('silver__labels_pools_metapier_s') }}
),
FINAL AS (
SELECT
*
FROM
pairs
pairs_cw
UNION
SELECT
*
FROM
metapier
metapier_cw
UNION
SELECT
*
FROM
pairs_s
UNION
SELECT
*
FROM
metapier_s
)
SELECT
*

View File

@ -10,18 +10,46 @@
}
) }}
WITH blocto AS (
WITH blocto_cw AS (
SELECT
*
FROM
{{ ref('silver__bridge_blocto') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
celer AS (
celer_cw AS (
SELECT
*
FROM
{{ ref('silver__bridge_celer') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
blocto_s AS (
SELECT
*
FROM
{{ ref('silver__bridge_blocto_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
celer_s AS (
SELECT
*
FROM
{{ ref('silver__bridge_celer_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
combo AS (
SELECT
@ -36,8 +64,8 @@ combo AS (
teleport_direction AS direction,
bridge
FROM
blocto
UNION
blocto_cw
UNION ALL
SELECT
tx_id,
block_timestamp,
@ -50,7 +78,35 @@ combo AS (
direction,
bridge
FROM
celer
celer_cw
UNION ALL
SELECT
tx_id,
block_timestamp,
block_height,
teleport_contract AS bridge_contract,
token_contract,
gross_amount AS amount,
flow_wallet_address,
blockchain,
teleport_direction AS direction,
bridge
FROM
blocto_s
UNION ALL
SELECT
tx_id,
block_timestamp,
block_height,
bridge_contract,
token_contract,
amount,
flow_wallet_address,
blockchain,
direction,
bridge
FROM
celer_s
)
SELECT
*

View File

@ -23,6 +23,7 @@ models:
- dbt_expectations.expect_row_values_to_have_recent_data:
datepart: day
interval: 3
where: block_height >= 55114467
- dbt_expectations.expect_column_values_to_be_in_type_list:
column_type_list:
- TIMESTAMP_NTZ

View File

@ -10,18 +10,61 @@
}
) }}
WITH chainwalkers AS (
SELECT
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount
FROM
{{ ref('silver__swaps') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount
FROM
{{ ref('silver__swaps_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
tx_id,
block_timestamp,
block_height,
swap_contract,
swap_index,
trader,
token_out_source,
token_out_contract,
token_out_amount,
token_in_destination,
token_in_contract,
token_in_amount
*
FROM
{{ ref('silver__swaps') }}
FINAL

View File

@ -10,12 +10,26 @@
}
) }}
WITH staking_actions AS (
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__staking_actions') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
streamline AS (
SELECT
*
FROM
{{ ref('silver__staking_actions_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
@ -29,7 +43,20 @@ FINAL AS (
amount,
node_id
FROM
staking_actions
chainwalkers
UNION ALL
SELECT
tx_id,
event_index,
block_timestamp,
block_height,
tx_succeeded,
delegator,
action,
amount,
node_id
FROM
streamline
)
SELECT
*

View File

@ -10,27 +10,79 @@
}
) }}
WITH chainwalkers AS (
SELECT
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata,
_inserted_timestamp
FROM
{{ ref('silver__nft_moment_metadata_final') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
),
streamline AS (
SELECT
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata,
_inserted_timestamp
FROM
{{ ref('silver__nft_moment_metadata_final_s') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
),
FINAL AS (
SELECT
*
FROM
chainwalkers
UNION ALL
SELECT
*
FROM
streamline
)
SELECT
event_contract AS nft_collection,
nft_id,
serial_number,
max_mint_size,
play_id,
series_id,
series_name,
set_id,
set_name,
edition_id,
tier,
metadata
*
FROM
{{ ref('silver__nft_moment_metadata_final') }}
WHERE
NOT (
nft_collection = 'A.87ca73a41bb50ad5.Golazos'
AND edition_id = 486
)
AND NOT (
nft_collection = 'A.e4cf4bdc1751c65d.AllDay'
AND edition_id = 1486
)
FINAL qualify ROW_NUMBER() over (
PARTITION BY nft_collection,
nft_id
ORDER BY
series_name is not null DESC,
_inserted_timestamp DESC
) = 1

View File

@ -38,16 +38,12 @@ models:
- not_null
- name: SERIES_NAME
description: "{{ doc('series_name') }}"
tests:
- not_null
- name: SET_ID
description: "{{ doc('set_id') }}"
tests:
- not_null
- name: SET_NAME
description: "{{ doc('set_name') }}"
tests:
- not_null
- name: EDITION_ID
description: "{{ doc('edition_id') }}"
tests:

View File

@ -10,14 +10,28 @@
}
) }}
WITH silver_nfts AS (
WITH chainwalkers AS (
SELECT
*
FROM
{{ ref('silver__nft_sales') }}
WHERE
block_height < {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
gold_nfts AS (
streamline AS (
SELECT
*
FROM
{{ ref('silver__nft_sales_s') }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
),
FINAL AS (
SELECT
tx_id,
block_height,
@ -33,9 +47,26 @@ gold_nfts AS (
tokenflow,
counterparties
FROM
silver_nfts
chainwalkers
UNION ALL
SELECT
tx_id,
block_height,
block_timestamp,
marketplace,
nft_collection,
nft_id,
buyer,
seller,
price,
currency,
tx_succeeded,
tokenflow,
counterparties
FROM
streamline
)
SELECT
*
FROM
gold_nfts
FINAL

View File

@ -17,7 +17,7 @@ WITH api AS (
FROM
{{ ref('silver__prices_hourly') }}
),
swaps AS (
swaps_cw AS (
SELECT
recorded_hour,
id,
@ -40,18 +40,48 @@ swaps AS (
FROM
{{ ref('silver__prices_swaps_hourly') }}
),
swaps_s AS (
SELECT
recorded_hour,
id,
CASE
WHEN id = 'A.1654653399040a61.FlowToken' THEN 'Flow'
WHEN id = 'A.cfdd90d4a00f7b5b.TeleportedTetherToken' THEN 'USDT'
WHEN id = 'A.3c5959b568896393.FUSD' THEN 'FUSD'
WHEN id = 'A.0f9df91c9121c460.BloctoToken' THEN 'Blocto'
WHEN id = 'A.d01e482eb680ec9f.REVV' THEN 'Revv'
WHEN id = 'A.b19436aae4d94622.FiatToken' THEN 'USDC'
WHEN id = 'A.142fa6570b62fd97.StarlyToken' THEN 'Starly'
WHEN id = 'A.475755d2c9dccc3a.TeleportedSportiumToken' THEN 'Sportium'
ELSE NULL -- will trigger alert if swaps model picks up another token
END AS token,
OPEN,
high,
low,
CLOSE,
provider
FROM
{{ ref('silver__prices_swaps_hourly_s') }}
),
FINAL AS (
SELECT
*
FROM
api
UNION
UNION ALL
SELECT
*
FROM
swaps
swaps_cw
UNION ALL
SELECT
*
FROM
swaps_s
)
SELECT
*
FROM
FINAL
WHERE
recorded_hour IS NOT NULL

View File

@ -22,7 +22,7 @@ prices AS (
FROM
{{ this.database }}.silver.prices
),
prices_swaps AS (
prices_swaps_cw AS (
SELECT
tx_id,
block_timestamp AS TIMESTAMP,
@ -32,6 +32,16 @@ prices_swaps AS (
FROM
{{ ref('silver__prices_swaps') }}
),
prices_swaps_s AS (
SELECT
tx_id,
block_timestamp AS TIMESTAMP,
token_contract,
swap_price AS price_usd,
source
FROM
{{ ref('silver__prices_swaps_s') }}
),
viewnion AS (
SELECT
TIMESTAMP,
@ -44,7 +54,7 @@ viewnion AS (
FROM
prices p
LEFT JOIN token_labels l USING (symbol)
UNION
UNION ALL
SELECT
TIMESTAMP,
l.token,
@ -54,10 +64,24 @@ viewnion AS (
source,
tx_id
FROM
prices_swaps ps
prices_swaps_cw ps
LEFT JOIN token_labels l USING (token_contract)
UNION ALL
SELECT
TIMESTAMP,
l.token,
l.symbol,
pss.token_contract,
price_usd,
source,
tx_id
FROM
prices_swaps_s pss
LEFT JOIN token_labels l USING (token_contract)
)
SELECT
*
FROM
viewnion
WHERE
TIMESTAMP IS NOT NULL

View File

@ -12,7 +12,6 @@ WITH streamline_blocks AS (
block_number,
DATA: height :: STRING AS block_height,
DATA: id :: STRING AS block_id,
-- TODO in core view alias this as just id
DATA :timestamp :: timestamp_ntz AS block_timestamp,
ARRAY_SIZE(
DATA :collection_guarantees :: ARRAY
@ -34,6 +33,23 @@ WHERE
FROM
{{ this }}
)
OR block_height IN (
-- lookback to ensure tx count is correct
SELECT
block_height
FROM
{{ this }}
WHERE
block_height >= {{ var(
'STREAMLINE_START_BLOCK'
) }}
-- limit to half a day for performance
AND _inserted_timestamp >= SYSDATE() - INTERVAL '12 hours'
AND (
tx_count IS NULL
OR collection_count != collection_count_agg
)
)
{% else %}
{{ ref('bronze__streamline_fr_blocks') }}
{% endif %}
@ -52,6 +68,33 @@ network_version AS (
FROM
{{ ref('seeds__network_version') }}
),
collections AS (
SELECT
*
FROM
{{ ref('silver__streamline_collections') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
tx_count AS (
SELECT
block_number AS block_height,
SUM(tx_count) AS tx_count,
COUNT(1) AS collection_count,
MIN(_inserted_timestamp) AS _inserted_timestamp
FROM
collections
GROUP BY
1
),
FINAL AS (
SELECT
b.block_number,
@ -60,10 +103,16 @@ FINAL AS (
b.block_id AS id,
b.block_timestamp,
b.collection_count,
IFF(
b.collection_count = 0,
b.collection_count,
C.tx_count
) AS tx_count,
b.parent_id,
b.signatures,
b.collection_guarantees,
b.block_seals,
C.collection_count AS collection_count_agg,
b._partition_by_block_id,
b._inserted_timestamp
FROM
@ -71,6 +120,7 @@ FINAL AS (
LEFT JOIN network_version v
ON b.block_height BETWEEN v.root_height
AND v.end_height
LEFT JOIN tx_count C USING (block_height)
)
SELECT
*

View File

@ -32,6 +32,9 @@ models:
- name: BLOCK_SEALS
description: "{{ doc('block_seals') }}"
- name: COLLECTION_COUNT_AGG
description: "{{ doc('collection_count_agg') }}"
- name: _PARTITION_BY_BLOCK_ID
description: "{{ doc('_partition_by_block_id') }}"

View File

@ -2,7 +2,7 @@
{{ config(
materialized = 'incremental',
unique_key = "collection_id",
cluster_by = "block_number",
cluster_by = ['_inserted_timestamp :: DATE', 'block_number'],
tags = ['streamline_load', 'core']
) }}

View File

@ -27,16 +27,26 @@ WHERE
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
DISTINCT event_contract,
ec_s [array_size(ec_s)-1] :: STRING AS contract_name,
CONCAT(
'0x',
ec_s [array_size(ec_s)-2] :: STRING
) AS account_address,
_inserted_timestamp
FROM
splt
WHERE
ec_s [0] != 'flow'
)
SELECT
DISTINCT event_contract,
ec_s [array_size(ec_s)-1] :: STRING AS contract_name,
CONCAT(
'0x',
ec_s [array_size(ec_s)-2] :: STRING
) AS account_address,
_inserted_timestamp
*
FROM
splt
WHERE
ec_s [0] != 'flow'
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -1,5 +1,6 @@
{{ config(
materialized = 'table',
materialized = 'incremental',
incremental_strategy = 'delete+insert',
cluster_by = ['event_contract'],
unique_key = 'event_contract',
tags = ['scheduled', 'chainwalkers_scheduled']
@ -12,18 +13,40 @@ WITH splt AS (
SPLIT(
event_contract,
'.'
) AS ec_s
) AS ec_s,
_inserted_timestamp
FROM
{{ ref('silver__events') }}
{% if is_incremental() %}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% endif %}
),
FINAL AS (
SELECT
DISTINCT event_contract,
ec_s [array_size(ec_s)-1] :: STRING AS contract_name,
CONCAT(
'0x',
ec_s [array_size(ec_s)-2] :: STRING
) AS account_address,
_inserted_timestamp
FROM
splt
WHERE
ec_s [0] != 'flow'
)
SELECT
DISTINCT event_contract,
ec_s [array_size(ec_s)-1] :: STRING AS contract_name,
CONCAT(
'0x',
ec_s [array_size(ec_s)-2] :: STRING
) AS account_address
*
FROM
splt
WHERE
ec_s [0] != 'flow'
FINAL qualify ROW_NUMBER() over (
PARTITION BY event_contract
ORDER BY
_inserted_timestamp DESC
) = 1

View File

@ -7,7 +7,7 @@ WITH dev_sample AS (
SELECT
*
FROM
flow_dev.silver.streamline_events
{{ ref('silver__streamline_events') }}
WHERE
block_height >= 55114467
),
@ -15,7 +15,7 @@ prod_sample AS (
SELECT
*
FROM
flow.silver.events_final
{{ ref('silver__events_final') }}
WHERE
block_height >= 55114467
),