diff --git a/data/silver__native_assets_seed.csv b/data/silver__native_assets_seed.csv new file mode 100644 index 0000000..aa6fc86 --- /dev/null +++ b/data/silver__native_assets_seed.csv @@ -0,0 +1,208 @@ +provider,name,symbol,id,decimals +coingecko,Bitcoin,BTC,bitcoin,8 +coingecko,Ethereum,ETH,ethereum,18 +coingecko,BNB,BNB,binancecoin,18 +coingecko,Solana,SOL,solana,18 +coingecko,Cardano,ADA,cardano +coingecko,Avalanche,AVAX,avalanche-2,18 +coingecko,Toncoin,TON,the-open-network +coingecko,Bitcoin Cash,BCH,bitcoin-cash +coingecko,TRON,TRX,tron +coingecko,NEAR Protocol,NEAR,near,18 +coingecko,Aptos,APT,aptos,18 +coingecko,Filecoin,FIL,filecoin +coingecko,Cosmos Hub,ATOM,cosmos,18 +coingecko,Cronos,CRO,crypto-com-chain +coingecko,Hedera,HBAR,hedera-hashgraph +coingecko,Kaspa,KAS,kaspa +coingecko,Injective,INJ,injective-protocol +coingecko,Fantom,FTM,fantom,18 +coingecko,Monero,XMR,monero +coingecko,Arweave,AR,arweave +coingecko,Sui,SUI,sui +coingecko,GALA,GALA,gala +coingecko,Core,CORE,coredaoorg +coingecko,Sei,SEI,sei-network,18 +coingecko,Algorand,ALGO,algorand +coingecko,Flow,FLOW,flow,18 +coingecko,MultiversX,EGLD,elrond-erd-2 +coingecko,Flare,FLR,flare-networks +coingecko,eCash,XEC,ecash +coingecko,Chiliz,CHZ,chiliz +coingecko,Tezos,XTZ,tezos +coingecko,Ronin,RON,ronin +coingecko,Kava,KAVA,kava +coingecko,Klaytn,KLAY,klay-token +coingecko,AIOZ Network,AIOZ,aioz-network +coingecko,Gnosis,GNO,gnosis,18 +coingecko,Oasis Network,ROSE,oasis-network +coingecko,Radix,XRD,radix +coingecko,Astar,ASTR,astar +coingecko,Enjin Coin,ENJ,enjincoin +coingecko,MANTRA,OM,mantra-dao +coingecko,ZetaChain,ZETA,zetachain +coingecko,Chia,XCH,chia +coingecko,Decred,DCR,decred +coingecko,Moonbeam,GLMR,moonbeam +coingecko,Vanar Chain,VANRY,vanar-chain +coingecko,Kadena,KDA,kadena +coingecko,Chromia,CHR,chromaway +coingecko,Aleph Zero,AZERO,aleph-zero +coingecko,WAX,WAXP,wax +coingecko,Dusk,DUSK,dusk-network +coingecko,Oraichain,ORAI,oraichain-token +coingecko,Syscoin,SYS,syscoin +coingecko,Alephium,ALPH,alephium +coingecko,Humans.ai,HEART,humans-ai +coingecko,Tectum,TET,tectum +coingecko,Moonriver,MOVR,moonriver +coingecko,Hive,HIVE,hive +coingecko,Oasys,OAS,oasys +coingecko,QANplatform,QANX,qanplatform +coingecko,Cudos,CUDOS,cudos +coingecko,Ergo,ERG,ergo +coingecko,CANTO,CANTO,canto +coingecko,LUKSO,LYX,lukso-token-2 +coingecko,LTO Network,LTO,lto-network +coingecko,Viction,VIC,tomochain +coingecko,Agoric,BLD,agoric +coingecko,MELD,MELD,meld-2 +coingecko,Picasso,PICA,picasso +coingecko,Concordium,CCD,concordium +coingecko,Archway,ARCH,archway +coingecko,Jackal Protocol,JKL,jackal-protocol +coingecko,Zano,ZANO,zano +coingecko,Thought,THT,thought +coingecko,Taraxa,TARA,taraxa +coingecko,OctaSpace,OCTA,octaspace +coingecko,Evmos,EVMOS,evmos +coingecko,AirDAO,AMB,amber +coingecko,Neurai,XNA,neurai +coingecko,FIO Protocol,FIO,fio-protocol +coingecko,Nexa,NEXA,nexacoin +coingecko,Phantasma,SOUL,phantasma +coingecko,Radiant,RXD,radiant +coingecko,Electra Protocol,XEP,electra-protocol +coingecko,KIRA Network,KEX,kira-network +coingecko,Fuse,FUSE,fuse-network-token +coingecko,Saito,SAITO,saito +coingecko,Hacash,HAC,hacash +coingecko,Chihuahua Chain,HUAHUA,chihuahua-token +coingecko,MainnetZ,NETZ,mainnetz +coingecko,Aura Network,AURA,aura-network +coingecko,Witnet,WIT,witnet +coingecko,White Whale,WHALE,white-whale +coingecko,Script Network,SCPT,script-network +coingecko,Diamond,DMD,diamond +coingecko,Interlay,INTR,interlay +coingecko,MultiVAC,MTV,multivac +coingecko,LiquidLayer,LILA,liquidlayer +coingecko,Epic Cash,EPIC,epic-cash +coingecko,Unique Network,UNQ,unique-network +coingecko,THORChain,RUNE,thorchain,18 +coingecko,Axelar,AXL,axelar,18 +coingecko,Osmosis,OSMO,osmosis,18 +coinmarketcap,Bitcoin,BTC,1,8 +coinmarketcap,Ethereum,ETH,1027,18 +coinmarketcap,BNB,BNB,1839,18 +coinmarketcap,Solana,SOL,5426,18 +coinmarketcap,Cardano,ADA,2010 +coinmarketcap,Avalanche,AVAX,5805,18 +coinmarketcap,Toncoin,TON,11419 +coinmarketcap,Bitcoin Cash,BCH,1831 +coinmarketcap,Polkadot,DOT,6636 +coinmarketcap,TRON,TRX,1958 +coinmarketcap,Internet Computer,ICP,8916 +coinmarketcap,NEAR Protocol,NEAR,6535,18 +coinmarketcap,Aptos,APT,21794,18 +coinmarketcap,Cosmos,ATOM,3794,18 +coinmarketcap,Cronos,CRO,3635 +coinmarketcap,Hedera,HBAR,4642 +coinmarketcap,Injective,INJ,7226 +coinmarketcap,Kaspa,KAS,20396 +coinmarketcap,Theta Network,THETA,2416 +coinmarketcap,Fantom,FTM,3513,18 +coinmarketcap,Sui,SUI,20947 +coinmarketcap,Sei,SEI,23149,18 +coinmarketcap,Algorand,ALGO,4030 +coinmarketcap,Flow,FLOW,4558,18 +coinmarketcap,MultiversX,EGLD,6892 +coinmarketcap,eCash,XEC,10791 +coinmarketcap,Chiliz,CHZ,4066 +coinmarketcap,Tezos,XTZ,2011 +coinmarketcap,Ronin,RON,14101 +coinmarketcap,Kava,KAVA,4846 +coinmarketcap,Klaytn,KLAY,4256 +coinmarketcap,Gnosis,GNO,1659,18 +coinmarketcap,AIOZ Network,AIOZ,9104 +coinmarketcap,Radix,XRD,11948 +coinmarketcap,Astar,ASTR,12885 +coinmarketcap,XDC Network,XDC,2634 +coinmarketcap,Enjin Coin,ENJ,2130 +coinmarketcap,ZetaChain,ZETA,21259 +coinmarketcap,Casper,CSPR,5899 +coinmarketcap,OriginTrail,TRAC,2467 +coinmarketcap,Chia,XCH,9258 +coinmarketcap,Kadena,KDA,5647 +coinmarketcap,Chromia,CHR,3978 +coinmarketcap,WAX,WAXP,2300 +coinmarketcap,Stride,STRD,21781 +coinmarketcap,Aleph Zero,AZERO,11976 +coinmarketcap,Syscoin,SYS,541 +coinmarketcap,Alephium,ALPH,14878 +coinmarketcap,Dusk,DUSK,4092 +coinmarketcap,Hive,HIVE,5370 +coinmarketcap,Oraichain,ORAI,7533 +coinmarketcap,Moonriver,MOVR,9285 +coinmarketcap,Oasys,OAS,22265 +coinmarketcap,NYM,NYM,17591 +coinmarketcap,Ergo,ERG,1762 +coinmarketcap,Tectum,TET,21964 +coinmarketcap,CUDOS,CUDOS,8258 +coinmarketcap,LUKSO,LYX,27622 +coinmarketcap,LTO Network,LTO,3714 +coinmarketcap,The Root Network,ROOT,28479 +coinmarketcap,Dynex,DNX,22858 +coinmarketcap,Coreum,COREUM,16399 +coinmarketcap,KYVE Network,KYVE,27766 +coinmarketcap,Concordium,CCD,18031 +coinmarketcap,Graphlinq Chain,GLQ,9029 +coinmarketcap,Zano,ZANO,4691 +coinmarketcap,AirDAO,AMB,2081 +coinmarketcap,Taraxa,TARA,8715 +coinmarketcap,OctaSpace,OCTA,24261 +coinmarketcap,FIO Protocol,FIO,5865 +coinmarketcap,Abelian,ABEL,25232 +coinmarketcap,Openfabric AI,OFN,28245 +coinmarketcap,Neurai,XNA,27195 +coinmarketcap,Phantasma,SOUL,2827 +coinmarketcap,Matrix AI Network,MAN,2474 +coinmarketcap,Radiant,RXD,22866 +coinmarketcap,Electra Protocol,XEP,8216 +coinmarketcap,Fuse,FUSE,5634 +coinmarketcap,Saito,SAITO,9194 +coinmarketcap,Nexa,NEXA,23380 +coinmarketcap,Areon Network,AREA,23262 +coinmarketcap,Script Network,SCPT,12621 +coinmarketcap,Humanode,HMND,23806 +coinmarketcap,Zenon,ZNN,4003 +coinmarketcap,MultiVAC,MTV,3853 +coinmarketcap,MainnetZ,NetZ,28659 +coinmarketcap,Epic Cash,EPIC,5435 +coinmarketcap,Integritee Network,TEER,13323 +coinmarketcap,Interlay,INTR,20366 +coinmarketcap,Busy DAO,BUSY,9002 +coinmarketcap,Qubic,QUBIC,29169 +coinmarketcap,Picasso,PICA,13678 +coinmarketcap,QANplatform,QANX,5858 +coinmarketcap,Evmos,EVMOS,19899 +coinmarketcap,CANTO,CANTO,21516 +coinmarketcap,LiquidLayer,LILA,28498 +coinmarketcap,MELD,MELD,13397 +coinmarketcap,ULTRON,ULX,21524 +coinmarketcap,Chihuahua,HUAHUA,17208 +coinmarketcap,Karlsen,KLS,29968 +coinmarketcap,Jackal Protocol,JKL,25261 +coinmarketcap,THORChain,RUNE,4157,18 +coinmarketcap,Axelar,AXL,17799,18 +coinmarketcap,Osmosis,OSMO,12220,18 \ No newline at end of file diff --git a/macros/tests/sequence_gaps.sql b/macros/tests/sequence_gaps.sql index aac5ce2..2357951 100644 --- a/macros/tests/sequence_gaps.sql +++ b/macros/tests/sequence_gaps.sql @@ -86,14 +86,14 @@ ORDER BY {%- set previous_column = "prev_" ~ column_name -%} WITH base_source AS ( SELECT - {{ partition_by_1 }}, - {{ partition_by_2 }}, + {{ partition_by_1 }} + {% if partition_by_2 %}, {{ partition_by_2 }} {% endif %}, {{ column_name }}, LAG( {{ column_name }}, 1 ) over ( - PARTITION BY LOWER({{ partition_by_1 }}), {{ partition_by_2 }} + PARTITION BY LOWER({{ partition_by_1 }}) {% if partition_by_2 %}, {{ partition_by_2 }} {% endif %} ORDER BY {{ column_name }} ASC ) AS {{ previous_column }} @@ -104,8 +104,8 @@ ORDER BY {% endif %} ) SELECT - {{ partition_by_1 }}, - {{ partition_by_2 }}, + {{ partition_by_1 }} + {% if partition_by_2 %}, {{ partition_by_2 }} {% endif %}, {{ previous_column }}, {{ column_name }}, DATEDIFF( diff --git a/models/gold/price_temp/price_temp__ez_asset_metadata_temp.sql b/models/gold/price_temp/price_temp__ez_asset_metadata_temp.sql index ff1308a..fe7842c 100644 --- a/models/gold/price_temp/price_temp__ez_asset_metadata_temp.sql +++ b/models/gold/price_temp/price_temp__ez_asset_metadata_temp.sql @@ -14,9 +14,28 @@ SELECT blockchain, blockchain_name, blockchain_id, + FALSE AS is_native, is_deprecated, inserted_timestamp, modified_timestamp, complete_token_asset_metadata_id AS ez_asset_metadata_id FROM {{ ref('silver__complete_token_asset_metadata') }} +UNION ALL +SELECT + NULL AS token_address, + asset_id AS id, -- id column pending deprecation + asset_id, + symbol, + name, + decimals, + blockchain, + blockchain AS blockchain_name, + blockchain AS blockchain_id, + TRUE AS is_native, + is_deprecated, + inserted_timestamp, + modified_timestamp, + complete_native_asset_metadata_id AS ez_asset_metadata_id +FROM + {{ ref('silver__complete_native_asset_metadata')}} \ No newline at end of file diff --git a/models/gold/price_temp/price_temp__ez_asset_metadata_temp.yml b/models/gold/price_temp/price_temp__ez_asset_metadata_temp.yml index 1769e58..daa54a0 100644 --- a/models/gold/price_temp/price_temp__ez_asset_metadata_temp.yml +++ b/models/gold/price_temp/price_temp__ez_asset_metadata_temp.yml @@ -13,7 +13,7 @@ models: - name: SYMBOL description: The symbol of asset. - name: TOKEN_ADDRESS - description: The specific address representing the asset in a specific platform. + description: Address of the token. This will be NULL if referring to a native asset. - name: BLOCKCHAIN description: The Blockchain, Network, or Platform where this asset has a token address. This column is derived by manually grouping blockchain_name, for reference purposes. - name: BLOCKCHAIN_NAME @@ -21,7 +21,9 @@ models: - name: BLOCKCHAIN_ID description: The unique identifier of the Blockchain, Network, or Platform where this asset has a token address. - name: DECIMALS - description: The number of decimal places the token needs adjusted where token values exist. + description: The number of decimals for token contract. May be NULL if referring to a native asset. + - name: IS_NATIVE + description: A flag indicating assets native to the respective blockchain. - name: IS_DEPRECATED description: A flag indicating if the asset is deprecated or no longer supported by the provider. - name: EZ_ASSET_METADATA_ID diff --git a/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.sql b/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.sql index c1c3e94..cce60e2 100644 --- a/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.sql +++ b/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.sql @@ -8,11 +8,13 @@ SELECT HOUR, token_address, symbol, + name, decimals, price, blockchain, blockchain_name, blockchain_id, + FALSE AS is_native, is_imputed, is_deprecated, inserted_timestamp, @@ -20,3 +22,22 @@ SELECT complete_token_prices_id AS ez_hourly_token_prices_id FROM {{ ref('silver__complete_token_prices') }} +UNION ALL +SELECT + HOUR, + NULL AS token_address, + symbol, + name, + decimals, + price, + blockchain, + blockchain AS blockchain_name, + blockchain AS blockchain_id, + TRUE AS is_native, + is_imputed, + is_deprecated, + inserted_timestamp, + modified_timestamp, + complete_native_prices_id AS ez_hourly_native_prices_id +FROM + {{ ref('silver__complete_native_prices') }} \ No newline at end of file diff --git a/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.yml b/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.yml index ab859ba..da6d578 100644 --- a/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.yml +++ b/models/gold/price_temp/price_temp__ez_hourly_token_prices_temp.yml @@ -7,19 +7,21 @@ models: - name: HOUR description: Hour that the price was recorded at - name: TOKEN_ADDRESS - description: Address of the token + description: Address of the token. This will be NULL if referring to a native asset. - name: SYMBOL description: Symbol of the token - name: DECIMALS - description: The number of decimals for token contract + description: The number of decimals for token contract. May be NULL if referring to a native asset. - name: BLOCKCHAIN - description: The Blockchain, Network, or Platform where this asset has a token address. This column is derived by manually grouping blockchain_name, for reference purposes. + description: The Blockchain, Network, or Platform where this asset has a token address. This column is derived by manually grouping blockchain_name, for reference purposes. - name: BLOCKCHAIN_NAME description: The name of the Blockchain, Network, or Platform where this asset has a token address. This may differ by provider. - name: BLOCKCHAIN_ID description: The unique identifier of the Blockchain, Network, or Platform where this asset has a token address. - name: PRICE description: Closing price of the recorded hour in USD + - name: IS_NATIVE + description: A flag indicating assets native to the respective blockchain. - name: IS_IMPUTED description: A flag indicating if the price was imputed, or derived, from the last arriving record. This is generally used for tokens with low-liquidity or inconsistent reporting. - name: IS_DEPRECATED diff --git a/models/silver/prices/all_providers2/silver__native_asset_metadata_all_providers.sql b/models/silver/prices/all_providers2/silver__native_asset_metadata_all_providers.sql new file mode 100644 index 0000000..b863a46 --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_asset_metadata_all_providers.sql @@ -0,0 +1,103 @@ +{{ config( + materialized = 'incremental', + unique_key = ['native_asset_metadata_all_providers_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['_inserted_timestamp::DATE'], + tags = ['prices'] +) }} + +WITH coin_gecko AS ( + + SELECT + id, + NAME, + symbol, + decimals, + 'coingecko' AS provider, + source, + is_deprecated, + _inserted_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_coingecko' + ) }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +), +coin_market_cap AS ( + SELECT + id, + NAME, + symbol, + decimals, + 'coinmarketcap' AS provider, + source, + is_deprecated, + _inserted_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_coinmarketcap' + ) }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +), +all_providers AS ( + SELECT + * + FROM + coin_gecko + UNION ALL + SELECT + * + FROM + coin_market_cap +) +SELECT + id, + CASE + WHEN NAME ilike 'bnb' THEN 'bsc' + ELSE NAME + END AS blockchain, + symbol, + name, + decimals, + provider, + source, + is_deprecated, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['symbol','provider']) }} AS native_asset_metadata_all_providers_id, + '{{ invocation_id }}' AS _invocation_id +FROM + all_providers p qualify(ROW_NUMBER() over (PARTITION BY symbol, provider +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/prices/all_providers2/silver__native_asset_metadata_all_providers.yml b/models/silver/prices/all_providers2/silver__native_asset_metadata_all_providers.yml new file mode 100644 index 0000000..595162e --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_asset_metadata_all_providers.yml @@ -0,0 +1,21 @@ +version: 2 +models: + - name: silver__native_asset_metadata_all_providers + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - SYMBOL + - PROVIDER + columns: + - name: PROVIDER + tests: + - not_null + - name: ID + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: BLOCKCHAIN + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/all_providers2/silver__native_asset_metadata_priority.sql b/models/silver/prices/all_providers2/silver__native_asset_metadata_priority.sql new file mode 100644 index 0000000..031567f --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_asset_metadata_priority.sql @@ -0,0 +1,61 @@ +{{ config( + materialized = 'incremental', + unique_key = ['native_asset_metadata_priority_id'], + incremental_strategy = 'delete+insert', + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(symbol, blockchain)", + tags = ['prices'] +) }} + +WITH all_providers AS ( + SELECT + id, + symbol, + name, + decimals, + blockchain, + provider, + CASE + WHEN provider = 'coingecko' THEN 1 + WHEN provider = 'coinmarketcap' THEN 2 + END AS priority, + source, + is_deprecated, + _inserted_timestamp + FROM + {{ ref('silver__native_asset_metadata_all_providers') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) - INTERVAL '4 hours' + FROM + {{ this }} + ) + OR symbol NOT IN ( + SELECT + DISTINCT symbol + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +) +SELECT + id, + symbol, + name, + decimals, + blockchain, + provider, + priority, + source, + is_deprecated, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['symbol']) }} AS native_asset_metadata_priority_id, + '{{ invocation_id }}' AS _invocation_id +FROM + all_providers qualify(ROW_NUMBER() over (PARTITION BY symbol +ORDER BY + _inserted_timestamp DESC, priority ASC, blockchain ASC, id ASC)) = 1 -- select the last inserted record (latest supported provider), then by priority etc. diff --git a/models/silver/prices/all_providers2/silver__native_asset_metadata_priority.yml b/models/silver/prices/all_providers2/silver__native_asset_metadata_priority.yml new file mode 100644 index 0000000..54d3b48 --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_asset_metadata_priority.yml @@ -0,0 +1,21 @@ +version: 2 +models: + - name: silver__native_asset_metadata_priority + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - SYMBOL + + columns: + - name: PROVIDER + tests: + - not_null + - name: ID + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: BLOCKCHAIN + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/all_providers2/silver__native_prices_all_providers.sql b/models/silver/prices/all_providers2/silver__native_prices_all_providers.sql new file mode 100644 index 0000000..6ca8fe7 --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_prices_all_providers.sql @@ -0,0 +1,105 @@ +{{ config( + materialized = 'incremental', + unique_key = ['native_prices_all_providers_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['recorded_hour::DATE'], + tags = ['prices'] +) }} + +WITH coin_gecko AS ( + + SELECT + recorded_hour, + symbol, + id, + NAME, + decimals, + CLOSE AS price, + is_imputed, + 'coingecko' AS provider, + source, + _inserted_timestamp + FROM + {{ ref('silver__native_prices_coingecko') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +), +coin_market_cap AS ( + SELECT + recorded_hour, + symbol, + id, + NAME, + decimals, + CLOSE AS price, + is_imputed, + 'coinmarketcap' AS provider, + source, + _inserted_timestamp + FROM + {{ ref('silver__native_prices_coinmarketcap') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +), +all_providers AS ( + SELECT + * + FROM + coin_gecko + UNION ALL + SELECT + * + FROM + coin_market_cap +) +SELECT + recorded_hour, + symbol, + name, + id, + decimals, + CASE + WHEN NAME ilike 'bnb' THEN 'bsc' + ELSE NAME + END AS blockchain, + price, + is_imputed, + provider, + source, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['recorded_hour','symbol','provider']) }} AS native_prices_all_providers_id, + '{{ invocation_id }}' AS _invocation_id +FROM + all_providers qualify(ROW_NUMBER() over (PARTITION BY recorded_hour, symbol, provider +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/prices/all_providers2/silver__native_prices_all_providers.yml b/models/silver/prices/all_providers2/silver__native_prices_all_providers.yml new file mode 100644 index 0000000..c27ff9b --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_prices_all_providers.yml @@ -0,0 +1,36 @@ +version: 2 +models: + - name: silver__native_prices_all_providers + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - RECORDED_HOUR + - SYMBOL + - PROVIDER + columns: + - name: RECORDED_HOUR + tests: + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - name: ID + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: BLOCKCHAIN + tests: + - not_null + - name: PROVIDER + tests: + - not_null + - name: PRICE + tests: + - not_null + - name: IS_IMPUTED + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/all_providers2/silver__native_prices_priority.sql b/models/silver/prices/all_providers2/silver__native_prices_priority.sql new file mode 100644 index 0000000..e186284 --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_prices_priority.sql @@ -0,0 +1,304 @@ +-- depends_on: {{ ref('core__dim_date_hours') }} +-- depends_on: {{ ref('silver__native_asset_metadata_priority') }} +{{ config( + materialized = 'incremental', + unique_key = ['native_prices_priority_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['recorded_hour::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(symbol, recorded_hour, blockchain)", + tags = ['prices'] +) }} + +WITH priority_prices AS ( + -- get all prices and qualify by priority + + SELECT + recorded_hour, + symbol, + name, + id, + decimals, + blockchain, + price, + is_imputed, + provider, + CASE + WHEN provider = 'coingecko' + AND is_imputed = FALSE THEN 1 + WHEN provider = 'coinmarketcap' + AND is_imputed = FALSE THEN 2 + WHEN provider = 'coingecko' + AND is_imputed = TRUE THEN 3 + WHEN provider = 'coinmarketcap' + AND is_imputed = TRUE THEN 4 + END AS priority, + source, + _inserted_timestamp + FROM + {{ ref('silver__native_prices_all_providers') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR symbol NOT IN ( + SELECT + DISTINCT symbol + FROM + {{ this }} + ) --load all data for new assets +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY recorded_hour, symbol +ORDER BY + priority ASC, id ASC, blockchain ASC, _inserted_timestamp DESC)) = 1 +) + +{% if is_incremental() %}, +price_gaps AS ( + -- identify missing prices by symbol, gaps most likely to exist between providers + SELECT + symbol, + recorded_hour, + prev_recorded_hour, + gap + FROM + ( + SELECT + symbol, + recorded_hour, + LAG( + recorded_hour, + 1 + ) over ( + PARTITION BY symbol, + blockchain + ORDER BY + recorded_hour ASC + ) AS prev_RECORDED_HOUR, + DATEDIFF( + HOUR, + prev_RECORDED_HOUR, + recorded_hour + ) - 1 AS gap + FROM + {{ this }} + ) + WHERE + gap > 0 +), +native_asset_metadata AS ( + -- get all token metadata for tokens with missing prices + SELECT + symbol, + name, + decimals, + _inserted_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_priority' + ) }} + WHERE + symbol IN ( + SELECT + symbol + FROM + price_gaps + ) +), +latest_supported_assets AS ( + --get the latest supported timestamp for each asset with missing prices + SELECT + symbol, + DATE_TRUNC('hour', MAX(_inserted_timestamp)) AS last_supported_timestamp + FROM + native_asset_metadata + GROUP BY + 1), + date_hours AS ( + SELECT + date_hour, + symbol, + name, + decimals + FROM + {{ ref('core__dim_date_hours') }} + CROSS JOIN native_asset_metadata + WHERE + date_hour <= ( + SELECT + MAX(recorded_hour) + FROM + price_gaps + ) + AND date_hour >= ( + SELECT + MIN(prev_recorded_hour) + FROM + price_gaps + ) + ), + imputed_prices AS ( + -- impute missing prices + SELECT + d.date_hour, + d.symbol, + d.name, + d.decimals, + CASE + WHEN d.date_hour <= s.last_supported_timestamp THEN p.price + ELSE NULL + END AS hourly_price, + CASE + WHEN hourly_price IS NULL + AND ( + d.date_hour <= s.last_supported_timestamp + ) THEN LAST_VALUE( + hourly_price ignore nulls + ) over ( + PARTITION BY d.symbol + ORDER BY + d.date_hour rows BETWEEN unbounded preceding + AND CURRENT ROW + ) + ELSE NULL + END AS imputed_price, + CASE + WHEN imputed_price IS NOT NULL THEN TRUE + ELSE p.is_imputed + END AS imputed, + COALESCE( + hourly_price, + imputed_price + ) AS final_price, + CASE + WHEN imputed_price IS NOT NULL THEN LAST_VALUE( + p.blockchain ignore nulls + ) over ( + PARTITION BY d.symbol + ORDER BY + d.date_hour rows BETWEEN unbounded preceding + AND CURRENT ROW + ) + ELSE p.blockchain + END AS blockchain, + CASE + WHEN imputed_price IS NOT NULL THEN LAST_VALUE( + p.id ignore nulls + ) over ( + PARTITION BY d.symbol + ORDER BY + d.date_hour rows BETWEEN unbounded preceding + AND CURRENT ROW + ) + ELSE p.id + END AS id, + CASE + WHEN imputed_price IS NOT NULL THEN LAST_VALUE( + p.provider ignore nulls + ) over ( + PARTITION BY d.symbol + ORDER BY + d.date_hour rows BETWEEN unbounded preceding + AND CURRENT ROW + ) + ELSE p.provider + END AS provider, + CASE + WHEN imputed_price IS NOT NULL THEN 'imputed_priority' + ELSE p.source + END AS source, + CASE + WHEN imputed_price IS NOT NULL THEN 7 + ELSE p.priority + END AS priority, + CASE + WHEN imputed_price IS NOT NULL THEN SYSDATE() + ELSE p._inserted_timestamp + END AS _inserted_timestamp + FROM + date_hours d + LEFT JOIN {{ this }} + p + ON LOWER( + d.symbol + ) = LOWER( + p.symbol + ) + AND d.date_hour = p.recorded_hour + LEFT JOIN latest_supported_assets s + ON LOWER( + d.symbol + ) = LOWER( + s.symbol + ) + ) + {% endif %}, + FINAL AS ( + SELECT + recorded_hour, + symbol, + name, + decimals, + blockchain, + price, + is_imputed, + id, + provider, + priority, + source, + _inserted_timestamp + FROM + priority_prices + +{% if is_incremental() %} +UNION ALL +SELECT + date_hour AS recorded_hour, + symbol, + name, + decimals, + blockchain, + final_price AS price, + imputed AS is_imputed, + id, + provider, + priority, + source, + _inserted_timestamp +FROM + imputed_prices +WHERE + price IS NOT NULL + AND is_imputed +{% endif %} +) +SELECT + recorded_hour, + symbol, + name, + decimals, + blockchain, + price, + is_imputed, + id, + provider, + priority, + source, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['recorded_hour','symbol']) }} AS native_prices_priority_id, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL + +{% if is_incremental() %} +qualify(ROW_NUMBER() over (PARTITION BY recorded_hour, symbol +ORDER BY + priority ASC, id ASC, _inserted_timestamp DESC)) = 1 +{% endif %} diff --git a/models/silver/prices/all_providers2/silver__native_prices_priority.yml b/models/silver/prices/all_providers2/silver__native_prices_priority.yml new file mode 100644 index 0000000..deca20c --- /dev/null +++ b/models/silver/prices/all_providers2/silver__native_prices_priority.yml @@ -0,0 +1,39 @@ +version: 2 +models: + - name: silver__native_prices_priority + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - RECORDED_HOUR + - SYMBOL + + columns: + - name: RECORDED_HOUR + tests: + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - price_hour_sequence_gaps: + partition_by_1: SYMBOL + column_name: RECORDED_HOUR + - name: ID + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: BLOCKCHAIN + tests: + - not_null + - name: PROVIDER + tests: + - not_null + - name: PRICE + tests: + - not_null + - name: IS_IMPUTED + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/coin_gecko2/silver__native_asset_metadata_coingecko.sql b/models/silver/prices/coin_gecko2/silver__native_asset_metadata_coingecko.sql new file mode 100644 index 0000000..677ffd1 --- /dev/null +++ b/models/silver/prices/coin_gecko2/silver__native_asset_metadata_coingecko.sql @@ -0,0 +1,125 @@ +{{ config( + materialized = 'incremental', + unique_key = ['native_asset_metadata_coingecko_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['_inserted_timestamp::DATE'], + tags = ['prices'] +) }} + +WITH base_assets AS ( + -- get all native asset metdata + + SELECT + A.id, + LOWER( + A.name + ) AS NAME, + LOWER( + A.symbol + ) AS symbol, + decimals, + source, + _inserted_timestamp + FROM + {{ ref('bronze__all_asset_metadata_coingecko2') }} A + INNER JOIN {{ ref('silver__native_assets_seed') }} + n + ON LOWER( + A.id + ) = LOWER( + n.id + ) + AND LOWER( + A.name + ) = LOWER( + n.name + ) + AND LOWER( + A.symbol + ) = LOWER(n.symbol) + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR A.id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +), +current_supported_assets AS ( + -- get all assets currently supported + SELECT + symbol, + _inserted_timestamp + FROM + base_assets + WHERE + _inserted_timestamp = ( + SELECT + MAX(_inserted_timestamp) + FROM + base_assets + ) +), +base_adj AS ( + -- make generic adjustments to asset metadata + SELECT + LOWER( + CASE + WHEN LENGTH(TRIM(A.id)) <= 0 THEN NULL + ELSE TRIM( + A.id + ) + END + ) AS id_adj, + CASE + WHEN LENGTH(TRIM(A.name)) <= 0 THEN NULL + ELSE TRIM( + A.name + ) + END AS name_adj, + CASE + WHEN LENGTH(TRIM(A.symbol)) <= 0 THEN NULL + ELSE TRIM( + A.symbol + ) + END AS symbol_adj, + decimals, + source, + CASE + WHEN C.symbol IS NOT NULL THEN FALSE + ELSE TRUE + END AS is_deprecated, + A._inserted_timestamp + FROM + base_assets A + LEFT JOIN current_supported_assets C + ON A.symbol = C.symbol +) +SELECT + id_adj AS id, + name_adj AS NAME, + symbol_adj AS symbol, + decimals, + source, + is_deprecated, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['symbol']) }} AS native_asset_metadata_coingecko_id, + '{{ invocation_id }}' AS _invocation_id +FROM + base_adj +WHERE + symbol IS NOT NULL + AND NAME IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY symbol +ORDER BY + _inserted_timestamp DESC)) = 1 -- built for native assets diff --git a/models/silver/prices/coin_gecko2/silver__native_asset_metadata_coingecko.yml b/models/silver/prices/coin_gecko2/silver__native_asset_metadata_coingecko.yml new file mode 100644 index 0000000..864f37a --- /dev/null +++ b/models/silver/prices/coin_gecko2/silver__native_asset_metadata_coingecko.yml @@ -0,0 +1,21 @@ +version: 2 +models: + - name: silver__native_asset_metadata_coingecko + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - SYMBOL + columns: + - name: ID + tests: + - not_null + - name: NAME + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null + \ No newline at end of file diff --git a/models/silver/prices/coin_gecko2/silver__native_prices_coingecko.sql b/models/silver/prices/coin_gecko2/silver__native_prices_coingecko.sql new file mode 100644 index 0000000..12cf2ce --- /dev/null +++ b/models/silver/prices/coin_gecko2/silver__native_prices_coingecko.sql @@ -0,0 +1,253 @@ +-- depends_on: {{ ref('core__dim_date_hours') }} +{{ config( + materialized = 'incremental', + unique_key = ['native_prices_coingecko_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['recorded_hour::DATE'], + tags = ['prices'] +) }} + +WITH base_prices AS ( + -- get all prices and join to asset metadata + + SELECT + p.recorded_hour, + m.symbol, + p.id, + m.name, + m.decimals, + p.close, + p.source, + p._inserted_timestamp + FROM + {{ ref( + 'bronze__all_prices_coingecko2' + ) }} + p + INNER JOIN {{ ref( + 'silver__native_asset_metadata_coingecko' + ) }} + m + ON m.id = LOWER(TRIM(p.id)) + WHERE + ( + p.close <> 0 + AND p.recorded_hour :: DATE <> '1970-01-01' + ) + +{% if is_incremental() %} +AND ( + p._inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR p.id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +) +{% endif %} +), +latest_supported_assets AS ( + -- get the latest supported timestamp for each asset + SELECT + symbol, + DATE_TRUNC('hour', MAX(_inserted_timestamp)) AS last_supported_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_coingecko' + ) }} + GROUP BY + 1 +) + +{% if is_incremental() %}, +price_gaps AS ( + -- identify missing prices by symbol + SELECT + symbol, + recorded_hour, + prev_recorded_hour, + gap + FROM + ( + SELECT + symbol, + recorded_hour, + LAG( + recorded_hour, + 1 + ) over ( + PARTITION BY symbol + ORDER BY + recorded_hour ASC + ) AS prev_RECORDED_HOUR, + DATEDIFF( + HOUR, + prev_RECORDED_HOUR, + recorded_hour + ) - 1 AS gap + FROM + {{ this }} + ) + WHERE + gap > 0 +), +native_asset_metadata AS ( + -- get all metadata for assets with missing prices + SELECT + symbol, + id, + NAME, + decimals, + _inserted_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_coingecko' + ) }} + WHERE + symbol IN ( + SELECT + symbol + FROM + price_gaps + ) +), +date_hours AS ( + -- generate spine of all possible hours, between gaps + SELECT + date_hour, + symbol, + id, + NAME, + decimals + FROM + {{ ref('core__dim_date_hours') }} + CROSS JOIN native_asset_metadata + WHERE + date_hour <= ( + SELECT + MAX(recorded_hour) + FROM + price_gaps + ) + AND date_hour >= ( + SELECT + MIN(prev_recorded_hour) + FROM + price_gaps + ) +), +imputed_prices AS ( + -- impute missing prices + SELECT + d.date_hour, + d.symbol, + d.id, + d.name, + d.decimals, + CASE + WHEN d.date_hour <= s.last_supported_timestamp THEN p.close + ELSE NULL + END AS hourly_price, + CASE + WHEN hourly_price IS NULL + AND d.date_hour <= s.last_supported_timestamp THEN LAST_VALUE( + hourly_price ignore nulls + ) over ( + PARTITION BY d.symbol + ORDER BY + d.date_hour rows BETWEEN unbounded preceding + AND CURRENT ROW + ) + ELSE NULL + END AS imputed_price, + CASE + WHEN imputed_price IS NOT NULL THEN TRUE + ELSE p.is_imputed + END AS imputed, + COALESCE( + hourly_price, + imputed_price + ) AS final_price, + CASE + WHEN imputed_price IS NOT NULL THEN 'imputed_cg' + ELSE p.source + END AS source, + CASE + WHEN imputed_price IS NOT NULL THEN SYSDATE() + ELSE p._inserted_timestamp + END AS _inserted_timestamp + FROM + date_hours d + LEFT JOIN {{ this }} + p + ON d.symbol = p.symbol + AND d.date_hour = p.recorded_hour + LEFT JOIN latest_supported_assets s + ON d.symbol = s.symbol +) +{% endif %}, +FINAL AS ( + SELECT + p.recorded_hour, + p.symbol, + p.id, + p.name, + p.decimals, + CASE + WHEN p.recorded_hour <= s.last_supported_timestamp THEN p.close + ELSE NULL + END AS close_price, + -- only include prices during supported ranges + FALSE AS is_imputed, + p.source, + p._inserted_timestamp + FROM + base_prices p + LEFT JOIN latest_supported_assets s + ON p.symbol = s.symbol + WHERE + close_price IS NOT NULL + +{% if is_incremental() %} +UNION ALL +SELECT + date_hour AS recorded_hour, + symbol, + id, + NAME, + decimals, + final_price AS close_price, + imputed AS is_imputed, + source, + _inserted_timestamp +FROM + imputed_prices +WHERE + close_price IS NOT NULL + AND is_imputed +{% endif %} +) +SELECT + recorded_hour, + symbol, + id, + NAME, + decimals, + close_price AS CLOSE, + is_imputed, + source, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['recorded_hour','symbol']) }} AS native_prices_coingecko_id, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL qualify(ROW_NUMBER() over (PARTITION BY recorded_hour, symbol +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/prices/coin_gecko2/silver__native_prices_coingecko.yml b/models/silver/prices/coin_gecko2/silver__native_prices_coingecko.yml new file mode 100644 index 0000000..29c5a35 --- /dev/null +++ b/models/silver/prices/coin_gecko2/silver__native_prices_coingecko.yml @@ -0,0 +1,39 @@ +version: 2 +models: + - name: silver__native_prices_coingecko + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - RECORDED_HOUR + - SYMBOL + + columns: + - name: RECORDED_HOUR + tests: + - price_hour_sequence_gaps: + partition_by_1: SYMBOL + column_name: RECORDED_HOUR + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - name: ID + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: NAME + tests: + - not_null + - name: CLOSE + tests: + - not_null + - name: IS_IMPUTED + tests: + - not_null + - name: NATIVE_PRICES_COINGECKO_ID + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/coin_market_cap2/silver__native_asset_metadata_coinmarketcap.sql b/models/silver/prices/coin_market_cap2/silver__native_asset_metadata_coinmarketcap.sql new file mode 100644 index 0000000..bb22e52 --- /dev/null +++ b/models/silver/prices/coin_market_cap2/silver__native_asset_metadata_coinmarketcap.sql @@ -0,0 +1,125 @@ +{{ config( + materialized = 'incremental', + unique_key = ['native_asset_metadata_coinmarketcap_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['_inserted_timestamp::DATE'], + tags = ['prices'] +) }} + +WITH base_assets AS ( + -- get all native asset metdata + + SELECT + A.id, + LOWER( + A.name + ) AS NAME, + LOWER( + A.symbol + ) AS symbol, + decimals, + source, + _inserted_timestamp + FROM + {{ ref('bronze__all_asset_metadata_coinmarketcap2') }} A + INNER JOIN {{ ref('silver__native_assets_seed') }} + n + ON LOWER( + A.id + ) = LOWER( + n.id + ) + AND LOWER( + A.name + ) = LOWER( + n.name + ) + AND LOWER( + A.symbol + ) = LOWER(n.symbol) + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR A.id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +), +current_supported_assets AS ( + -- get all assets currently supported + SELECT + symbol, + _inserted_timestamp + FROM + base_assets + WHERE + _inserted_timestamp = ( + SELECT + MAX(_inserted_timestamp) + FROM + base_assets + ) +), +base_adj AS ( + -- make generic adjustments to asset metadata + SELECT + LOWER( + CASE + WHEN LENGTH(TRIM(A.id)) <= 0 THEN NULL + ELSE TRIM( + A.id + ) + END + ) AS id_adj, + CASE + WHEN LENGTH(TRIM(A.name)) <= 0 THEN NULL + ELSE TRIM( + A.name + ) + END AS name_adj, + CASE + WHEN LENGTH(TRIM(A.symbol)) <= 0 THEN NULL + ELSE TRIM( + A.symbol + ) + END AS symbol_adj, + decimals, + source, + CASE + WHEN C.symbol IS NOT NULL THEN FALSE + ELSE TRUE + END AS is_deprecated, + A._inserted_timestamp + FROM + base_assets A + LEFT JOIN current_supported_assets C + ON A.symbol = C.symbol +) +SELECT + id_adj AS id, + name_adj AS NAME, + symbol_adj AS symbol, + decimals, + source, + is_deprecated, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['symbol']) }} AS native_asset_metadata_coinmarketcap_id, + '{{ invocation_id }}' AS _invocation_id +FROM + base_adj +WHERE + symbol IS NOT NULL + AND NAME IS NOT NULL qualify(ROW_NUMBER() over (PARTITION BY symbol +ORDER BY + _inserted_timestamp DESC)) = 1 -- built for native assets \ No newline at end of file diff --git a/models/silver/prices/coin_market_cap2/silver__native_asset_metadata_coinmarketcap.yml b/models/silver/prices/coin_market_cap2/silver__native_asset_metadata_coinmarketcap.yml new file mode 100644 index 0000000..eccd469 --- /dev/null +++ b/models/silver/prices/coin_market_cap2/silver__native_asset_metadata_coinmarketcap.yml @@ -0,0 +1,21 @@ +version: 2 +models: + - name: silver__native_asset_metadata_coinmarketcap + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - SYMBOL + columns: + - name: ID + tests: + - not_null + - name: NAME + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null + \ No newline at end of file diff --git a/models/silver/prices/coin_market_cap2/silver__native_prices_coinmarketcap.sql b/models/silver/prices/coin_market_cap2/silver__native_prices_coinmarketcap.sql new file mode 100644 index 0000000..10e88f0 --- /dev/null +++ b/models/silver/prices/coin_market_cap2/silver__native_prices_coinmarketcap.sql @@ -0,0 +1,253 @@ +-- depends_on: {{ ref('core__dim_date_hours') }} +{{ config( + materialized = 'incremental', + unique_key = ['native_prices_coinmarketcap_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['recorded_hour::DATE'], + tags = ['prices'] +) }} + +WITH base_prices AS ( + -- get all prices and join to asset metadata + + SELECT + p.recorded_hour, + m.symbol, + p.id, + m.name, + m.decimals, + p.close, + p.source, + p._inserted_timestamp + FROM + {{ ref( + 'bronze__all_prices_coinmarketcap2' + ) }} + p + INNER JOIN {{ ref( + 'silver__native_asset_metadata_coinmarketcap' + ) }} + m + ON m.id = LOWER(TRIM(p.id)) + WHERE + ( + p.close <> 0 + AND p.recorded_hour :: DATE <> '1970-01-01' + ) + +{% if is_incremental() %} +AND ( + p._inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR p.id NOT IN ( + SELECT + DISTINCT id + FROM + {{ this }} + ) --load all data for new assets +) +{% endif %} +), +latest_supported_assets AS ( + -- get the latest supported timestamp for each asset + SELECT + symbol, + DATE_TRUNC('hour', MAX(_inserted_timestamp)) AS last_supported_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_coinmarketcap' + ) }} + GROUP BY + 1 +) + +{% if is_incremental() %}, +price_gaps AS ( + -- identify missing prices by symbol + SELECT + symbol, + recorded_hour, + prev_recorded_hour, + gap + FROM + ( + SELECT + symbol, + recorded_hour, + LAG( + recorded_hour, + 1 + ) over ( + PARTITION BY symbol + ORDER BY + recorded_hour ASC + ) AS prev_RECORDED_HOUR, + DATEDIFF( + HOUR, + prev_RECORDED_HOUR, + recorded_hour + ) - 1 AS gap + FROM + {{ this }} + ) + WHERE + gap > 0 +), +native_asset_metadata AS ( + -- get all metadata for assets with missing prices + SELECT + symbol, + id, + NAME, + decimals, + _inserted_timestamp + FROM + {{ ref( + 'silver__native_asset_metadata_coinmarketcap' + ) }} + WHERE + symbol IN ( + SELECT + symbol + FROM + price_gaps + ) +), +date_hours AS ( + -- generate spine of all possible hours, between gaps + SELECT + date_hour, + symbol, + id, + NAME, + decimals + FROM + {{ ref('core__dim_date_hours') }} + CROSS JOIN native_asset_metadata + WHERE + date_hour <= ( + SELECT + MAX(recorded_hour) + FROM + price_gaps + ) + AND date_hour >= ( + SELECT + MIN(prev_recorded_hour) + FROM + price_gaps + ) +), +imputed_prices AS ( + -- impute missing prices + SELECT + d.date_hour, + d.symbol, + d.id, + d.name, + d.decimals, + CASE + WHEN d.date_hour <= s.last_supported_timestamp THEN p.close + ELSE NULL + END AS hourly_price, + CASE + WHEN hourly_price IS NULL + AND d.date_hour <= s.last_supported_timestamp THEN LAST_VALUE( + hourly_price ignore nulls + ) over ( + PARTITION BY d.symbol + ORDER BY + d.date_hour rows BETWEEN unbounded preceding + AND CURRENT ROW + ) + ELSE NULL + END AS imputed_price, + CASE + WHEN imputed_price IS NOT NULL THEN TRUE + ELSE p.is_imputed + END AS imputed, + COALESCE( + hourly_price, + imputed_price + ) AS final_price, + CASE + WHEN imputed_price IS NOT NULL THEN 'imputed_cg' + ELSE p.source + END AS source, + CASE + WHEN imputed_price IS NOT NULL THEN SYSDATE() + ELSE p._inserted_timestamp + END AS _inserted_timestamp + FROM + date_hours d + LEFT JOIN {{ this }} + p + ON d.symbol = p.symbol + AND d.date_hour = p.recorded_hour + LEFT JOIN latest_supported_assets s + ON d.symbol = s.symbol +) +{% endif %}, +FINAL AS ( + SELECT + p.recorded_hour, + p.symbol, + p.id, + p.name, + p.decimals, + CASE + WHEN p.recorded_hour <= s.last_supported_timestamp THEN p.close + ELSE NULL + END AS close_price, + -- only include prices during supported ranges + FALSE AS is_imputed, + p.source, + p._inserted_timestamp + FROM + base_prices p + LEFT JOIN latest_supported_assets s + ON p.symbol = s.symbol + WHERE + close_price IS NOT NULL + +{% if is_incremental() %} +UNION ALL +SELECT + date_hour AS recorded_hour, + symbol, + id, + NAME, + decimals, + final_price AS close_price, + imputed AS is_imputed, + source, + _inserted_timestamp +FROM + imputed_prices +WHERE + close_price IS NOT NULL + AND is_imputed +{% endif %} +) +SELECT + recorded_hour, + symbol, + id, + NAME, + decimals, + close_price AS CLOSE, + is_imputed, + source, + _inserted_timestamp, + SYSDATE() AS inserted_timestamp, + SYSDATE() AS modified_timestamp, + {{ dbt_utils.generate_surrogate_key(['recorded_hour','symbol']) }} AS native_prices_coinmarketcap_id, + '{{ invocation_id }}' AS _invocation_id +FROM + FINAL qualify(ROW_NUMBER() over (PARTITION BY recorded_hour, symbol +ORDER BY + _inserted_timestamp DESC)) = 1 diff --git a/models/silver/prices/coin_market_cap2/silver__native_prices_coinmarketcap.yml b/models/silver/prices/coin_market_cap2/silver__native_prices_coinmarketcap.yml new file mode 100644 index 0000000..7f1fa06 --- /dev/null +++ b/models/silver/prices/coin_market_cap2/silver__native_prices_coinmarketcap.yml @@ -0,0 +1,39 @@ +version: 2 +models: + - name: silver__native_prices_coinmarketcap + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - RECORDED_HOUR + - SYMBOL + + columns: + - name: RECORDED_HOUR + tests: + - price_hour_sequence_gaps: + partition_by_1: SYMBOL + column_name: RECORDED_HOUR + - dbt_expectations.expect_row_values_to_have_recent_data: + datepart: day + interval: 1 + - name: ID + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: NAME + tests: + - not_null + - name: CLOSE + tests: + - not_null + - name: IS_IMPUTED + tests: + - not_null + - name: NATIVE_PRICES_COINMARKETCAP_ID + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/complete/silver__complete_native_asset_metadata.sql b/models/silver/prices/complete/silver__complete_native_asset_metadata.sql new file mode 100644 index 0000000..6af52a1 --- /dev/null +++ b/models/silver/prices/complete/silver__complete_native_asset_metadata.sql @@ -0,0 +1,57 @@ +{{ config( + materialized = 'incremental', + unique_key = ['complete_native_asset_metadata_id'], + incremental_strategy = 'delete+insert', + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(symbol, blockchain)", + tags = ['prices'] +) }} + +WITH base_assets AS ( + +SELECT + id AS asset_id, + UPPER(symbol) AS symbol_adj, + name, + decimals, + blockchain, + is_deprecated, + provider, + source, + _inserted_timestamp, + inserted_timestamp, + modified_timestamp, + native_asset_metadata_priority_id AS complete_native_asset_metadata_id +FROM + {{ ref('silver__native_asset_metadata_priority') }} + +{% if is_incremental() %} +WHERE + _inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR symbol_adj NOT IN ( + SELECT + DISTINCT symbol + FROM + {{ this }} + ) --load all data for new assets +{% endif %} +) + +SELECT + asset_id, + symbol_adj AS symbol, + name, + decimals, + blockchain, + is_deprecated, + provider, + source, + _inserted_timestamp, + inserted_timestamp, + modified_timestamp, + complete_native_asset_metadata_id +FROM base_assets \ No newline at end of file diff --git a/models/silver/prices/complete/silver__complete_native_asset_metadata.yml b/models/silver/prices/complete/silver__complete_native_asset_metadata.yml new file mode 100644 index 0000000..955cd2e --- /dev/null +++ b/models/silver/prices/complete/silver__complete_native_asset_metadata.yml @@ -0,0 +1,18 @@ +version: 2 +models: + - name: silver__complete_native_asset_metadata + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - SYMBOL + + columns: + - name: PROVIDER + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: BLOCKCHAIN + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/complete/silver__complete_native_prices.sql b/models/silver/prices/complete/silver__complete_native_prices.sql new file mode 100644 index 0000000..ffce048 --- /dev/null +++ b/models/silver/prices/complete/silver__complete_native_prices.sql @@ -0,0 +1,86 @@ +{{ config( + materialized = 'incremental', + unique_key = ['complete_native_prices_id'], + incremental_strategy = 'delete+insert', + cluster_by = ['hour::DATE'], + post_hook = "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(symbol, hour, blockchain)", + tags = ['prices'] +) }} + +WITH base_prices AS ( + + SELECT + DATEADD( + HOUR, + 1, + recorded_hour + ) AS HOUR, + --roll the close price forward 1 hour + p.id AS asset_id, + UPPER( + p.symbol + ) AS symbol_adj, + m.name, + m.decimals, + price, + p.blockchain, + is_imputed, + m.is_deprecated, + p.provider, + p.source, + p._inserted_timestamp, + GREATEST(COALESCE(p.inserted_timestamp, '2000-01-01'), COALESCE(m.inserted_timestamp, '2000-01-01')) AS inserted_timestamp, + GREATEST(COALESCE(p.modified_timestamp, '2000-01-01'), COALESCE(m.modified_timestamp, '2000-01-01')) AS modified_timestamp, + native_prices_priority_id AS complete_native_prices_id + FROM + {{ ref('silver__native_prices_priority') }} + p + LEFT JOIN {{ ref('silver__complete_native_asset_metadata') }} + m + ON UPPER( + p.symbol + ) = m.symbol + +{% if is_incremental() %} +WHERE + p._inserted_timestamp >= ( + SELECT + MAX(_inserted_timestamp) + FROM + {{ this }} + ) + OR symbol_adj NOT IN ( + SELECT + symbol + FROM + ( + SELECT + DISTINCT symbol + FROM + {{ this }} + ) + ) --load all data for new assets, requires additional select statement for compiler +{% endif %} + +qualify(ROW_NUMBER() over (PARTITION BY symbol_adj, HOUR +ORDER BY + p._inserted_timestamp DESC)) = 1 +) +SELECT + HOUR, + asset_id, + symbol_adj AS symbol, + name, + decimals, + price, + blockchain, + is_imputed, + is_deprecated, + provider, + source, + _inserted_timestamp, + inserted_timestamp, + modified_timestamp, + complete_native_prices_id +FROM + base_prices diff --git a/models/silver/prices/complete/silver__complete_native_prices.yml b/models/silver/prices/complete/silver__complete_native_prices.yml new file mode 100644 index 0000000..4493cd6 --- /dev/null +++ b/models/silver/prices/complete/silver__complete_native_prices.yml @@ -0,0 +1,31 @@ +version: 2 +models: + - name: silver__complete_native_prices + tests: + - dbt_utils.unique_combination_of_columns: + combination_of_columns: + - HOUR + - SYMBOL + + columns: + - name: HOUR + tests: + - not_null + - name: SYMBOL + tests: + - not_null + - name: BLOCKCHAIN + tests: + - not_null + - name: PROVIDER + tests: + - not_null + - name: PRICE + tests: + - not_null + - name: IS_IMPUTED + tests: + - not_null + - name: _INSERTED_TIMESTAMP + tests: + - not_null \ No newline at end of file diff --git a/models/silver/prices/complete/silver__complete_token_prices.sql b/models/silver/prices/complete/silver__complete_token_prices.sql index 0edf840..31e596a 100644 --- a/models/silver/prices/complete/silver__complete_token_prices.sql +++ b/models/silver/prices/complete/silver__complete_token_prices.sql @@ -17,6 +17,7 @@ SELECT p.token_address, p.id AS asset_id, UPPER(symbol) AS symbol, + name, decimals, price, p.blockchain,