diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 3f6d900..2819e7d 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -22,8 +22,7 @@ SELECT {{ other_cols }}, value, _inserted_timestamp, - s.{{ partition_name }}, - s.value AS VALUE + s.{{ partition_name }} FROM {{ source( "bronze_streamline", @@ -63,8 +62,7 @@ SELECT {{ other_cols }}, value, _inserted_timestamp, - s.{{ partition_name }}, - s.value AS VALUE + s.{{ partition_name }} FROM {{ source( "bronze_streamline", diff --git a/models/bronze/core/bronze__accounts.sql b/models/bronze/core/bronze__accounts.sql index a6dc716..fc82ee2 100644 --- a/models/bronze/core/bronze__accounts.sql +++ b/models/bronze/core/bronze__accounts.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "accounts", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ACCOUNT_ID", - other_cols = "BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__accounts_FR.sql b/models/bronze/core/bronze__accounts_FR.sql index d9129f3..e8c666a 100644 --- a/models/bronze/core/bronze__accounts_FR.sql +++ b/models/bronze/core/bronze__accounts_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "accounts", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ACCOUNT_ID", - other_cols = "BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__assets.sql b/models/bronze/core/bronze__assets.sql index caf2690..31bc4e1 100644 --- a/models/bronze/core/bronze__assets.sql +++ b/models/bronze/core/bronze__assets.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_assets", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", - unique_key = "ID", - other_cols = "ASSET_TYPE, ASSET_CODE, ASSET_ISSUER, BATCH_RUN_DATE, BATCH_ID, BATCH_INSERT_TS, ASSET_ID" + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", + unique_key = "asset_id", + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__assets_FR.sql b/models/bronze/core/bronze__assets_FR.sql index 5ff1c52..c3e1bb2 100644 --- a/models/bronze/core/bronze__assets_FR.sql +++ b/models/bronze/core/bronze__assets_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_assets", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", - unique_key = "ID", - other_cols = " ASSET_TYPE, ASSET_CODE, ASSET_ISSUER, BATCH_RUN_DATE, BATCH_ID, BATCH_INSERT_TS, ASSET_ID" + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", + unique_key = "asset_id", + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__ledgers.sql b/models/bronze/core/bronze__ledgers.sql index 7df8534..6822587 100644 --- a/models/bronze/core/bronze__ledgers.sql +++ b/models/bronze/core/bronze__ledgers.sql @@ -3,8 +3,9 @@ ) }} {{ streamline_external_table_query_v2( model = "history_ledgers", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "SEQUENCE", - other_cols = "LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, LEDGER_HEADER, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" + other_cols = "partition_id, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" ) }} +--LEDGER_HEADER, diff --git a/models/bronze/core/bronze__ledgers_FR.sql b/models/bronze/core/bronze__ledgers_FR.sql index 74a23c8..076523c 100644 --- a/models/bronze/core/bronze__ledgers_FR.sql +++ b/models/bronze/core/bronze__ledgers_FR.sql @@ -3,8 +3,9 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_ledgers", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "SEQUENCE", - other_cols = "LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, LEDGER_HEADER, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" + other_cols = "partition_id, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, OPERATION_COUNT, CLOSED_AT, ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, TX_SET_OPERATION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SOROBAN_FEE_WRITE_1KB, NODE_ID, SIGNATURE, TOTAL_BYTE_SIZE_OF_BUCKET_LIST" ) }} +--LEDGER_HEADER, diff --git a/models/bronze/core/bronze__liquidity_pools.sql b/models/bronze/core/bronze__liquidity_pools.sql index 7fd33eb..d07b08a 100644 --- a/models/bronze/core/bronze__liquidity_pools.sql +++ b/models/bronze/core/bronze__liquidity_pools.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "liquidity_pools", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "LIQUIDITY_POOL_ID", - other_cols = "TYPE, FEE, TRUSTLINE_COUNT, POOL_SHARE_COUNT, ASSET_A_TYPE, ASSET_A_CODE, ASSET_A_ISSUER, ASSET_A_ID, ASSET_A_AMOUNT, ASSET_B_TYPE, ASSET_B_CODE, ASSET_B_ISSUER, ASSET_B_ID, ASSET_B_AMOUNT, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__liquidity_pools_FR.sql b/models/bronze/core/bronze__liquidity_pools_FR.sql index 9c65159..7005abe 100644 --- a/models/bronze/core/bronze__liquidity_pools_FR.sql +++ b/models/bronze/core/bronze__liquidity_pools_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "liquidity_pools", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "LIQUIDITY_POOL_ID", - other_cols = "TYPE, FEE, TRUSTLINE_COUNT, POOL_SHARE_COUNT, ASSET_A_TYPE, ASSET_A_CODE, ASSET_A_ISSUER, ASSET_A_ID, ASSET_A_AMOUNT, ASSET_B_TYPE, ASSET_B_CODE, ASSET_B_ISSUER, ASSET_B_ID, ASSET_B_AMOUNT, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__operations.sql b/models/bronze/core/bronze__operations.sql index d16f58c..fba8d4b 100644 --- a/models/bronze/core/bronze__operations.sql +++ b/models/bronze/core/bronze__operations.sql @@ -2,9 +2,9 @@ materialized = 'view' ) }} {{ streamline_external_table_query_v2( - model = "enriched_history_operations", - partition_function = "TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", - unique_key = "OP_ID", - other_cols = 'ACCOUNT, AMOUNT, ASSET_CODE, ASSET_ISSUER, ASSET_TYPE, ASSET_ID, AUTHORIZE, BALANCE_ID, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID,"from", FUNDER, HIGH_THRESHOLD, HOME_DOMAIN, INFLATION_DEST,"into","limit", LOW_THRESHOLD, MASTER_KEY_WEIGHT, MED_THRESHOLD, NAME, OFFER_ID, PATH, PRICE, D, N, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SET_FLAGS, SET_FLAGS_S, SIGNER_KEY, SIGNER_WEIGHT, SOURCE_AMOUNT, SOURCE_ASSET_CODE, SOURCE_ASSET_ISSUER, SOURCE_ASSET_TYPE, SOURCE_ASSET_ID, SOURCE_MAX, STARTING_BALANCE,"to", TRUSTEE, TRUSTOR, TRUSTLINE_ASSET,"value", CLEAR_FLAGS, CLEAR_FLAGS_S, DESTINATION_MIN, BUMP_TO, SPONSOR, SPONSORED_ID, BEGIN_SPONSOR, AUTHORIZE_TO_MAINTAIN_LIABILITIES, CLAWBACK_ENABLED, LIQUIDITY_POOL_ID, RESERVE_A_ASSET_TYPE, RESERVE_A_ASSET_ID, RESERVE_A_ASSET_CODE, RESERVE_A_ASSET_ISSUER, RESERVE_A_MAX_AMOUNT, RESERVE_A_DEPOSIT_AMOUNT, RESERVE_B_ASSET_TYPE, RESERVE_B_ASSET_ID, RESERVE_B_ASSET_CODE, RESERVE_B_ASSET_ISSUER, RESERVE_B_MAX_AMOUNT, RESERVE_B_DEPOSIT_AMOUNT, MIN_PRICE, MIN_PRICE_R, MAX_PRICE, MAX_PRICE_R, SHARES_RECEIVED, RESERVE_A_MIN_AMOUNT, RESERVE_B_MIN_AMOUNT, SHARES, RESERVE_A_WITHDRAW_AMOUNT, RESERVE_B_WITHDRAW_AMOUNT, OP_SOURCE_ACCOUNT, OP_SOURCE_ACCOUNT_MUXED, TRANSACTION_ID, TYPE, TRANSACTION_HASH, LEDGER_SEQUENCE, TXN_ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, TXN_OPERATION_COUNT, TXN_CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, LEDGER_OPERATION_COUNT, CLOSED_AT, LEDGER_ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, EXTRA_SIGNERS, ASSET_BALANCE_CHANGES, PARAMETERS, PARAMETERS_DECODED, FUNCTION, ADDRESS, SOROBAN_OPERATION_TYPE, EXTEND_TO, CONTRACT_ID, CONTRACT_CODE_HASH, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, OPERATION_RESULT_CODE, OPERATION_TRACE_CODE, OP_APPLICATION_ORDER, TXN_APPLICATION_ORDER' + model = "history_operations", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", + unique_key = "ID", + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__operations_FR.sql b/models/bronze/core/bronze__operations_FR.sql index 6cd1a23..f3fa7e0 100644 --- a/models/bronze/core/bronze__operations_FR.sql +++ b/models/bronze/core/bronze__operations_FR.sql @@ -2,9 +2,9 @@ materialized = 'view' ) }} {{ streamline_external_table_FR_query_v2( - model = "enriched_history_operations", - partition_function = "TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", - unique_key = "OP_ID", - other_cols = 'ACCOUNT, AMOUNT, ASSET_CODE, ASSET_ISSUER, ASSET_TYPE, ASSET_ID, AUTHORIZE, BALANCE_ID, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID,"from", FUNDER, HIGH_THRESHOLD, HOME_DOMAIN, INFLATION_DEST,"into","limit", LOW_THRESHOLD, MASTER_KEY_WEIGHT, MED_THRESHOLD, NAME, OFFER_ID, PATH, PRICE, D, N, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SET_FLAGS, SET_FLAGS_S, SIGNER_KEY, SIGNER_WEIGHT, SOURCE_AMOUNT, SOURCE_ASSET_CODE, SOURCE_ASSET_ISSUER, SOURCE_ASSET_TYPE, SOURCE_ASSET_ID, SOURCE_MAX, STARTING_BALANCE,"to", TRUSTEE, TRUSTOR, TRUSTLINE_ASSET,"value", CLEAR_FLAGS, CLEAR_FLAGS_S, DESTINATION_MIN, BUMP_TO, SPONSOR, SPONSORED_ID, BEGIN_SPONSOR, AUTHORIZE_TO_MAINTAIN_LIABILITIES, CLAWBACK_ENABLED, LIQUIDITY_POOL_ID, RESERVE_A_ASSET_TYPE, RESERVE_A_ASSET_ID, RESERVE_A_ASSET_CODE, RESERVE_A_ASSET_ISSUER, RESERVE_A_MAX_AMOUNT, RESERVE_A_DEPOSIT_AMOUNT, RESERVE_B_ASSET_TYPE, RESERVE_B_ASSET_ID, RESERVE_B_ASSET_CODE, RESERVE_B_ASSET_ISSUER, RESERVE_B_MAX_AMOUNT, RESERVE_B_DEPOSIT_AMOUNT, MIN_PRICE, MIN_PRICE_R, MAX_PRICE, MAX_PRICE_R, SHARES_RECEIVED, RESERVE_A_MIN_AMOUNT, RESERVE_B_MIN_AMOUNT, SHARES, RESERVE_A_WITHDRAW_AMOUNT, RESERVE_B_WITHDRAW_AMOUNT, OP_SOURCE_ACCOUNT, OP_SOURCE_ACCOUNT_MUXED, TRANSACTION_ID, TYPE, TRANSACTION_HASH, LEDGER_SEQUENCE, TXN_ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, TXN_OPERATION_COUNT, TXN_CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, LEDGER_HASH, PREVIOUS_LEDGER_HASH, TRANSACTION_COUNT, LEDGER_OPERATION_COUNT, CLOSED_AT, LEDGER_ID, TOTAL_COINS, FEE_POOL, BASE_FEE, BASE_RESERVE, MAX_TX_SET_SIZE, PROTOCOL_VERSION, SUCCESSFUL_TRANSACTION_COUNT, FAILED_TRANSACTION_COUNT, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, EXTRA_SIGNERS, ASSET_BALANCE_CHANGES, PARAMETERS, PARAMETERS_DECODED, FUNCTION, ADDRESS, SOROBAN_OPERATION_TYPE, EXTEND_TO, CONTRACT_ID, CONTRACT_CODE_HASH, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, OPERATION_RESULT_CODE, OPERATION_TRACE_CODE, OP_APPLICATION_ORDER, TXN_APPLICATION_ORDER' + model = "history_operations", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", + unique_key = "ID", + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__trades.sql b/models/bronze/core/bronze__trades.sql index bb588e7..f433a6d 100644 --- a/models/bronze/core/bronze__trades.sql +++ b/models/bronze/core/bronze__trades.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_trades", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "HISTORY_OPERATION_ID", - other_cols = '"order", LEDGER_CLOSED_AT, SELLING_ACCOUNT_ADDRESS, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SELLING_AMOUNT, BUYING_ACCOUNT_ADDRESS, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID, BUYING_AMOUNT, PRICE_N, PRICE_D, SELLING_OFFER_ID, BUYING_OFFER_ID, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SELLING_LIQUIDITY_POOL_ID, LIQUIDITY_POOL_FEE, TRADE_TYPE, ROUNDING_SLIPPAGE, SELLER_IS_EXACT' + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__trades_FR.sql b/models/bronze/core/bronze__trades_FR.sql index 09c1ce2..61c2713 100644 --- a/models/bronze/core/bronze__trades_FR.sql +++ b/models/bronze/core/bronze__trades_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_trades", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "HISTORY_OPERATION_ID", - other_cols = '"order", LEDGER_CLOSED_AT, SELLING_ACCOUNT_ADDRESS, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SELLING_AMOUNT, BUYING_ACCOUNT_ADDRESS, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID, BUYING_AMOUNT, PRICE_N, PRICE_D, SELLING_OFFER_ID, BUYING_OFFER_ID, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SELLING_LIQUIDITY_POOL_ID, LIQUIDITY_POOL_FEE, TRADE_TYPE, ROUNDING_SLIPPAGE, SELLER_IS_EXACT' + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__transactions.sql b/models/bronze/core/bronze__transactions.sql index ae7740d..aad1ef8 100644 --- a/models/bronze/core/bronze__transactions.sql +++ b/models/bronze/core/bronze__transactions.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_transactions", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ID", - other_cols = "TRANSACTION_HASH, LEDGER_SEQUENCE, ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, OPERATION_COUNT, CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, INNER_TRANSACTION_HASH, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, TX_ENVELOPE, TX_RESULT, TX_META, TX_FEE_META, EXTRA_SIGNERS, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, CLOSED_AT, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, NON_REFUNDABLE_RESOURCE_FEE_CHARGED, REFUNDABLE_RESOURCE_FEE_CHARGED, RENT_FEE_CHARGED, TX_SIGNERS, REFUNDABLE_FEE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__transactions_FR.sql b/models/bronze/core/bronze__transactions_FR.sql index a721ef5..85b37e1 100644 --- a/models/bronze/core/bronze__transactions_FR.sql +++ b/models/bronze/core/bronze__transactions_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_transactions", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ID", - other_cols = "TRANSACTION_HASH, LEDGER_SEQUENCE, ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, OPERATION_COUNT, CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, INNER_TRANSACTION_HASH, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, TX_ENVELOPE, TX_RESULT, TX_META, TX_FEE_META, EXTRA_SIGNERS, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, CLOSED_AT, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, NON_REFUNDABLE_RESOURCE_FEE_CHARGED, REFUNDABLE_RESOURCE_FEE_CHARGED, RENT_FEE_CHARGED, TX_SIGNERS, REFUNDABLE_FEE" + other_cols = "partition_id" ) }} diff --git a/models/silver/silver__accounts.sql b/models/silver/silver__accounts.sql index 8a5b912..5a70873 100644 --- a/models/silver/silver__accounts.sql +++ b/models/silver/silver__accounts.sql @@ -5,54 +5,78 @@ incremental_predicates = ["dynamic_range_predicate", "partition_id::date"], merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'], - full_refresh = false, tags = ['scheduled_core'], ) }} {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - account_id, - balance, - buying_liabilities, - selling_liabilities, - sequence_number, - num_subentries, - inflation_destination, - flags, - home_domain, - master_weight, - threshold_low, - threshold_medium, - threshold_high, - last_modified_ledger, - ledger_entry_change, - deleted, - batch_id, - batch_run_date, - batch_insert_ts, - sponsor, - num_sponsored, - num_sponsoring, - sequence_ledger, - sequence_time, - closed_at, - ledger_sequence, + partition_gte_id, + VALUE :account_id :: STRING AS account_id, + TRY_CAST( + VALUE :balance :: STRING AS FLOAT + ) AS balance, + TRY_CAST( + VALUE :buying_liabilities :: STRING AS FLOAT + ) AS buying_liabilities, + TRY_CAST( + VALUE :selling_liabilities :: STRING AS FLOAT + ) AS selling_liabilities, + VALUE :sequence_number :: INTEGER AS sequence_number, + VALUE :num_subentries :: INTEGER AS num_subentries, + VALUE :inflation_destination :: STRING AS inflation_destination, + VALUE :flags :: INTEGER AS flags, + VALUE :home_domain :: STRING AS home_domain, + VALUE :master_weight :: INTEGER AS master_weight, + VALUE :threshold_low :: INTEGER AS threshold_low, + VALUE :threshold_medium :: INTEGER AS threshold_medium, + VALUE :threshold_high :: INTEGER AS threshold_high, + VALUE :last_modified_ledger :: INTEGER AS last_modified_ledger, + VALUE :ledger_entry_change :: INTEGER AS ledger_entry_change, + VALUE :deleted :: BOOLEAN AS deleted, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + VALUE: batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE :sponsor :: STRING AS sponsor, + VALUE :num_sponsored :: INTEGER AS num_sponsored, + VALUE :num_sponsoring :: INTEGER AS num_sponsoring, + VALUE :sequence_ledger :: INTEGER AS sequence_ledger, + TO_TIMESTAMP( + VALUE :sequence_time :: INT, + 6 + ) AS sequence_time, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + TO_TIMESTAMP( + VALUE :ledger_sequence :: INT, + 6 + ) AS ledger_sequence, _inserted_timestamp FROM @@ -64,18 +88,21 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY account_id, closed_at ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, account_id, balance, buying_liabilities, diff --git a/models/silver/silver__assets.sql b/models/silver/silver__assets.sql index f30659a..4006d4e 100644 --- a/models/silver/silver__assets.sql +++ b/models/silver/silver__assets.sql @@ -11,29 +11,39 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - id, - asset_type, - asset_code, - asset_issuer, - batch_run_date, - batch_id, - batch_insert_ts, - asset_id, + partition_gte_id, + VALUE :id :: FLOAT AS id, + VALUE :asset_type :: STRING AS asset_type, + VALUE :asset_code :: STRING AS asset_code, + VALUE :asset_issuer :: STRING AS asset_issuer, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + VALUE: batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE :asset_id :: INT AS asset_id, _inserted_timestamp FROM @@ -45,7 +55,8 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( @@ -57,6 +68,7 @@ qualify ROW_NUMBER() over ( ) SELECT partition_id, + partition_gte_id, id, asset_type, asset_code, diff --git a/models/silver/silver__ledgers.sql b/models/silver/silver__ledgers.sql index ae24d42..fcba131 100644 --- a/models/silver/silver__ledgers.sql +++ b/models/silver/silver__ledgers.sql @@ -5,52 +5,55 @@ incremental_predicates = ["dynamic_range_predicate", "partition_id::date"], merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'], - full_refresh = false, tags = ['scheduled_core'], ) }} {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - SEQUENCE, - ledger_hash, - previous_ledger_hash, - transaction_count, - operation_count, - closed_at, - id, - total_coins, - fee_pool, - base_fee, - base_reserve, - max_tx_set_size, - protocol_version, - ledger_header, - successful_transaction_count, - failed_transaction_count, - tx_set_operation_count, - batch_id, - batch_run_date, - batch_insert_ts, - soroban_fee_write_1kb, - node_id, - signature, - total_byte_size_of_bucket_list, + partition_gte_id, + SEQUENCE :: INTEGER AS SEQUENCE, + ledger_hash :: STRING AS ledger_hash, + previous_ledger_hash :: STRING AS previous_ledger_hash, + transaction_count :: INTEGER AS transaction_count, + operation_count :: INTEGER AS operation_count, + closed_at :: TIMESTAMP AS closed_at, + id :: INTEGER AS id, + total_coins :: INTEGER AS total_coins, + fee_pool :: INTEGER AS fee_pool, + base_fee :: INTEGER AS base_fee, + base_reserve :: INTEGER AS base_reserve, + max_tx_set_size :: INTEGER AS max_tx_set_size, + protocol_version :: INTEGER AS protocol_version, + {# ledger_header :: STRING AS ledger_header, #} + successful_transaction_count :: INTEGER AS successful_transaction_count, + failed_transaction_count :: INTEGER AS failed_transaction_count, + tx_set_operation_count :: INTEGER AS tx_set_operation_count, + batch_id :: STRING AS batch_id, + batch_run_date :: TIMESTAMP AS batch_run_date, + batch_insert_ts :: TIMESTAMP AS batch_insert_ts, + soroban_fee_write_1kb :: INTEGER AS soroban_fee_write_1kb, + node_id :: STRING AS node_id, + signature :: STRING AS signature, + total_byte_size_of_bucket_list :: INTEGER AS total_byte_size_of_bucket_list, _inserted_timestamp FROM @@ -62,17 +65,20 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY SEQUENCE ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, SEQUENCE, ledger_hash, previous_ledger_hash, @@ -86,7 +92,7 @@ SELECT base_reserve, max_tx_set_size, protocol_version, - ledger_header, + {# ledger_header, #} successful_transaction_count, failed_transaction_count, tx_set_operation_count, @@ -97,12 +103,12 @@ SELECT node_id, signature, total_byte_size_of_bucket_list, - _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['sequence'] ) }} AS ledgers_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, - '{{ invocation_id }}' AS _invocation_id + '{{ invocation_id }}' AS _invocation_id, + _inserted_timestamp FROM pre_final diff --git a/models/silver/silver__liquidity_pools.sql b/models/silver/silver__liquidity_pools.sql index d22ae5c..aa2e8f1 100644 --- a/models/silver/silver__liquidity_pools.sql +++ b/models/silver/silver__liquidity_pools.sql @@ -11,44 +11,57 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, + partition_gte_id, liquidity_pool_id, - TYPE, - fee, - trustline_count, - pool_share_count, - asset_a_type, - asset_a_code, - asset_a_issuer, - asset_a_id, - asset_a_amount, - asset_b_type, - asset_b_code, - asset_b_issuer, - asset_b_id, - asset_b_amount, - last_modified_ledger, - ledger_entry_change, - deleted, - batch_id, - batch_run_date, - batch_insert_ts, - closed_at, - ledger_sequence, + VALUE :TYPE :: STRING AS TYPE, + VALUE :fee :: INTEGER AS fee, + VALUE :trustline_count :: INTEGER AS trustline_count, + VALUE :pool_share_count :: FLOAT AS pool_share_count, + VALUE :asset_a_type :: STRING AS asset_a_type, + VALUE :asset_a_code :: STRING AS asset_a_code, + VALUE :asset_a_issuer :: STRING AS asset_a_issuer, + VALUE :asset_a_id :: INTEGER AS asset_a_id, + VALUE :asset_a_amount :: FLOAT AS asset_a_amount, + VALUE :asset_b_type :: STRING AS asset_b_type, + VALUE :asset_b_code :: STRING AS asset_b_code, + VALUE :asset_b_issuer :: STRING AS asset_b_issuer, + VALUE :asset_b_id :: INTEGER AS asset_b_id, + VALUE :asset_b_amount :: FLOAT AS asset_b_amount, + VALUE :last_modified_ledger :: INTEGER AS last_modified_ledger, + VALUE :ledger_entry_change :: INTEGER AS ledger_entry_change, + VALUE :deleted :: BOOLEAN AS deleted, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + VALUE: batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + VALUE :ledger_sequence :: INTEGER AS ledger_sequence, _inserted_timestamp FROM @@ -60,19 +73,22 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY liquidity_pool_id, closed_at ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, - liquidity_pool_id, + partition_gte_id, + liquidity_pool_id :: STRING AS liquidity_pool_id, TYPE, fee, trustline_count, diff --git a/models/silver/silver__operations.sql b/models/silver/silver__operations.sql index 77983cc..a88d545 100644 --- a/models/silver/silver__operations.sql +++ b/models/silver/silver__operations.sql @@ -1,8 +1,8 @@ -- depends_on: {{ ref('bronze__operations') }} {{ config( materialized = 'incremental', - unique_key = "op_id", - incremental_predicates = ["dynamic_range_predicate", "partition_id::date"], + unique_key = "id", + incremental_predicates = ["dynamic_range_predicate","partition_id::date"], merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'], tags = ['scheduled_core'], @@ -11,165 +11,165 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - op_id, - account, - amount, - asset_code, - asset_issuer, - asset_type, - asset_id, - authorize, - balance_id, - buying_asset_code, - buying_asset_issuer, - buying_asset_type, - buying_asset_id, - "from", - funder, - high_threshold, - home_domain, - inflation_dest, - "into", - "limit", - low_threshold, - master_key_weight, - med_threshold, - NAME, - offer_id, - path, - price, - d, - n, - selling_asset_code, - selling_asset_issuer, - selling_asset_type, - selling_asset_id, - set_flags, - set_flags_s, - signer_key, - signer_weight, - source_amount, - source_asset_code, - source_asset_issuer, - source_asset_type, - source_asset_id, - source_max, - starting_balance, - "to", - trustee, - trustor, - trustline_asset, - "value", - clear_flags, - clear_flags_s, - destination_min, - bump_to, - sponsor, - sponsored_id, - begin_sponsor, - authorize_to_maintain_liabilities, - clawback_enabled, - liquidity_pool_id, - reserve_a_asset_type, - reserve_a_asset_id, - reserve_a_asset_code, - reserve_a_asset_issuer, - reserve_a_max_amount, - reserve_a_deposit_amount, - reserve_b_asset_type, - reserve_b_asset_id, - reserve_b_asset_code, - reserve_b_asset_issuer, - reserve_b_max_amount, - reserve_b_deposit_amount, - min_price, - min_price_r, - max_price, - max_price_r, - shares_received, - reserve_a_min_amount, - reserve_b_min_amount, - shares, - reserve_a_withdraw_amount, - reserve_b_withdraw_amount, - op_source_account, - op_source_account_muxed, - transaction_id, - TYPE, - transaction_hash, - ledger_sequence, - txn_account, - account_sequence, - max_fee, - txn_operation_count, - txn_created_at, - memo_type, - memo, - time_bounds, - SUCCESSFUL, - fee_charged, - fee_account, - new_max_fee, - account_muxed, - fee_account_muxed, - ledger_hash, - previous_ledger_hash, - transaction_count, - ledger_operation_count, - closed_at, - ledger_id, - total_coins, - fee_pool, - base_fee, - base_reserve, - max_tx_set_size, - protocol_version, - successful_transaction_count, - failed_transaction_count, - batch_id, - batch_run_date, - batch_insert_ts, - ledger_bounds, - min_account_sequence, - min_account_sequence_age, - min_account_sequence_ledger_gap, - extra_signers, - asset_balance_changes, - PARAMETERS, - parameters_decoded, - FUNCTION, - address, - soroban_operation_type, - extend_to, - contract_id, - contract_code_hash, - resource_fee, - soroban_resources_instructions, - soroban_resources_read_bytes, - soroban_resources_write_bytes, - transaction_result_code, - inclusion_fee_bid, - inclusion_fee_charged, - resource_fee_refund, - operation_result_code, - operation_trace_code, - op_application_order, - txn_application_order, + partition_gte_id, + id :: INTEGER AS id, + VALUE :source_account :: STRING AS source_account, + VALUE :source_account_muxed :: STRING AS op_source_account_muxed, + VALUE :ledger_sequence :: INTEGER AS ledger_sequence, + VALUE :transaction_id :: INTEGER AS transaction_id, + VALUE :type :: INTEGER AS TYPE, + VALUE :type_string :: STRING AS type_string, + VALUE :details :account :: STRING AS account, + VALUE :details :account_muxed :: STRING AS op_account_muxed, + VALUE :details :account_muxed_id :: INTEGER AS op_account_muxed_id, + VALUE :details :account_id :: STRING AS op_account_id, + VALUE :details :amount :: FLOAT AS amount, + VALUE :details :asset :: STRING AS asset, + VALUE :details :asset_code :: STRING AS asset_code, + VALUE :details :asset_issuer :: STRING AS asset_issuer, + VALUE :details :asset_id :: STRING AS asset_id, + VALUE :details :asset_type :: STRING AS asset_type, + VALUE :details :authorize :: BOOLEAN AS authorize, + VALUE :details :balance_id :: STRING AS balance_id, + VALUE :details :buying_asset_code :: STRING AS buying_asset_code, + VALUE :details :buying_asset_issuer :: STRING AS buying_asset_issuer, + VALUE :details :buying_asset_id :: STRING AS buying_asset_id, + VALUE :details :buying_asset_type :: STRING AS buying_asset_type, + VALUE :details :claimable_balance_id :: STRING AS claimable_balance_id, + VALUE :details :claimant :: STRING AS claimant, + VALUE :details :claimant_muxed :: STRING AS claimant_muxed, + VALUE :details :claimant_muxed_id :: INTEGER AS claimant_muxed_id, + VALUE :details :claimants :: variant AS claimants, + VALUE :details :data_account_id :: STRING AS data_account_id, + VALUE :details :data_name :: STRING AS data_name, + VALUE :details :"from" :: STRING AS "from", + VALUE :details :from_muxed :: STRING AS from_muxed, + VALUE :details :from_muxed_id :: INTEGER AS from_muxed_id, + VALUE :details :funder :: STRING AS funder, + VALUE :details :funder_muxed :: STRING AS funder_muxed, + VALUE :details :funder_muxed_id :: INTEGER AS funder_muxed_id, + VALUE :details :high_threshold :: INTEGER AS high_threshold, + VALUE :details :home_domain :: STRING AS home_domain, + VALUE :details :inflation_dest :: STRING AS inflation_dest, + VALUE :details :"into" :: STRING AS "into", + VALUE :details :into_muxed :: STRING AS into_muxed, + VALUE :details :into_muxed_id :: INTEGER AS into_muxed_id, + VALUE :details :"limit" :: FLOAT AS "limit", + VALUE :details :low_threshold :: INTEGER AS low_threshold, + VALUE :details :master_key_weight :: INTEGER AS master_key_weight, + VALUE :details :med_threshold :: INTEGER AS med_threshold, + VALUE :details :name :: STRING AS NAME, + VALUE :details :offer_id :: INTEGER AS offer_id, + VALUE :details :path :: variant AS path, + VALUE :details :price :: ARRAY AS price, + VALUE :details :price_r :: variant AS price_r, + VALUE :details :selling_asset_code :: STRING AS selling_asset_code, + VALUE :details :selling_asset_issuer :: STRING AS selling_asset_issuer, + VALUE :details :selling_asset_id :: STRING AS selling_asset_id, + VALUE :details :selling_asset_type :: STRING AS selling_asset_type, + VALUE :details :set_flags :: ARRAY AS set_flags, + VALUE :details :set_flags_s :: ARRAY AS set_flags_s, + VALUE :details :signer_account_id :: STRING AS signer_account_id, + VALUE :details :signer_key :: STRING AS signer_key, + VALUE :details :signer_weight :: INTEGER AS signer_weight, + VALUE :details :source_amount :: FLOAT AS source_amount, + VALUE :details :source_asset_code :: STRING AS source_asset_code, + VALUE :details :source_asset_issuer :: STRING AS source_asset_issuer, + VALUE :details :source_asset_id :: STRING AS source_asset_id, + VALUE :details :source_asset_type :: STRING AS source_asset_type, + VALUE :details :source_max :: FLOAT AS source_max, + VALUE :details :starting_balance :: FLOAT AS starting_balance, + VALUE :details :"to" :: STRING AS "to", + VALUE :details :to_muxed :: STRING AS to_muxed, + VALUE :details :to_muxed_id :: INTEGER AS to_muxed_id, + VALUE :details :trustee :: STRING AS trustee, + VALUE :details :trustee_muxed :: STRING AS trustee_muxed, + VALUE :details :trustee_muxed_id :: INTEGER AS trustee_muxed_id, + VALUE :details :trustline_account_id :: STRING AS trustline_account_id, + VALUE :details :trustline_asset :: STRING AS trustline_asset, + VALUE :details :trustor :: STRING AS trustor, + VALUE :details :trustor_muxed :: STRING AS trustor_muxed, + VALUE :details :trustor_muxed_id :: INTEGER AS trustor_muxed_id, + VALUE :details :value :: STRING AS VALUE, + VALUE :details :clear_flags :: ARRAY AS clear_flags, + VALUE :details :clear_flags_s :: ARRAY AS clear_flags_s, + VALUE :details :destination_min :: STRING AS destination_min, + VALUE :details :bump_to :: STRING AS bump_to, + VALUE :details :authorize_to_maintain_liabilities :: BOOLEAN AS authorize_to_maintain_liabilities, + VALUE :details :clawback_enabled :: BOOLEAN AS clawback_enabled, + VALUE :details :sponsor :: STRING AS sponsor, + VALUE :details :sponsored_id :: STRING AS sponsored_id, + VALUE :details :begin_sponsor :: STRING AS begin_sponsor, + VALUE :details :begin_sponsor_muxed :: STRING AS begin_sponsor_muxed, + VALUE :details :begin_sponsor_muxed_id :: INTEGER AS begin_sponsor_muxed_id, + VALUE :details :liquidity_pool_id :: STRING AS liquidity_pool_id, + VALUE :details :reserve_a_asset_type :: STRING AS reserve_a_asset_type, + VALUE :details :reserve_a_asset_code :: STRING AS reserve_a_asset_code, + VALUE :details :reserve_a_asset_issuer :: STRING AS reserve_a_asset_issuer, + VALUE :details :reserve_a_asset_id :: STRING AS reserve_a_asset_id, + VALUE :details :reserve_a_max_amount :: FLOAT AS reserve_a_max_amount, + VALUE :details :reserve_a_deposit_amount :: FLOAT AS reserve_a_deposit_amount, + VALUE :details :reserve_b_asset_type :: STRING AS reserve_b_asset_type, + VALUE :details :reserve_b_asset_code :: STRING AS reserve_b_asset_code, + VALUE :details :reserve_b_asset_issuer :: STRING AS reserve_b_asset_issuer, + VALUE :details :reserve_b_asset_id :: STRING AS reserve_b_asset_id, + VALUE :details :reserve_b_max_amount :: FLOAT AS reserve_b_max_amount, + VALUE :details :reserve_b_deposit_amount :: FLOAT AS reserve_b_deposit_amount, + VALUE :details :min_price :: FLOAT AS min_price, + VALUE :details :min_price_r :: variant AS min_price_r, + VALUE :details :max_price :: FLOAT AS max_price, + VALUE :details :max_price_r :: variant AS max_price_r, + VALUE :details :shares_received :: FLOAT AS shares_received, + VALUE :details :reserve_a_min_amount :: FLOAT AS reserve_a_min_amount, + VALUE :details :reserve_a_withdraw_amount :: FLOAT AS reserve_a_withdraw_amount, + VALUE :details :reserve_b_min_amount :: FLOAT AS reserve_b_min_amount, + VALUE :details :reserve_b_withdraw_amount :: FLOAT AS reserve_b_withdraw_amount, + VALUE :details :shares :: FLOAT AS shares, + VALUE :details :asset_balance_changes :: variant AS asset_balance_changes, + VALUE :details :parameters :: variant AS PARAMETERS, + VALUE :details :parameters_decoded :: variant AS parameters_decoded, + VALUE :details :function :: STRING AS FUNCTION, + VALUE :details :address :: STRING AS address, + VALUE :details :type :: STRING AS soroban_operation_type, + VALUE :details :extend_to :: INTEGER AS extend_to, + VALUE :details :contract_id :: STRING AS contract_id, + VALUE :details :contract_code_hash :: STRING AS contract_code_hash, + VALUE :details :ledger_key_hash :: STRING AS ledger_key_hash, + VALUE :details :ledgers_to_expire :: INTEGER AS ledgers_to_expire, + VALUE :details :: variant AS details_json, + VALUE :operation_result_code :: STRING AS operation_result_code, + VALUE :operation_trace_code :: STRING AS operation_trace_code, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + VALUE :batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, _inserted_timestamp FROM @@ -181,164 +181,21 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( - PARTITION BY op_id + PARTITION BY id ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT - partition_id, - op_id, - account, - amount, - asset_code, - asset_issuer, - asset_type, - asset_id, - authorize, - balance_id, - buying_asset_code, - buying_asset_issuer, - buying_asset_type, - buying_asset_id, - "from", - funder, - high_threshold, - home_domain, - inflation_dest, - "into", - "limit", - low_threshold, - master_key_weight, - med_threshold, - NAME, - offer_id, - path, - price, - d, - n, - selling_asset_code, - selling_asset_issuer, - selling_asset_type, - selling_asset_id, - set_flags, - set_flags_s, - signer_key, - signer_weight, - source_amount, - source_asset_code, - source_asset_issuer, - source_asset_type, - source_asset_id, - source_max, - starting_balance, - "to", - trustee, - trustor, - trustline_asset, - "value", - clear_flags, - clear_flags_s, - destination_min, - bump_to, - sponsor, - sponsored_id, - begin_sponsor, - authorize_to_maintain_liabilities, - clawback_enabled, - liquidity_pool_id, - reserve_a_asset_type, - reserve_a_asset_id, - reserve_a_asset_code, - reserve_a_asset_issuer, - reserve_a_max_amount, - reserve_a_deposit_amount, - reserve_b_asset_type, - reserve_b_asset_id, - reserve_b_asset_code, - reserve_b_asset_issuer, - reserve_b_max_amount, - reserve_b_deposit_amount, - min_price, - min_price_r, - max_price, - max_price_r, - shares_received, - reserve_a_min_amount, - reserve_b_min_amount, - shares, - reserve_a_withdraw_amount, - reserve_b_withdraw_amount, - op_source_account, - op_source_account_muxed, - transaction_id, - TYPE, - transaction_hash, - ledger_sequence, - txn_account, - account_sequence, - max_fee, - txn_operation_count, - txn_created_at, - memo_type, - memo, - time_bounds, - SUCCESSFUL, - fee_charged, - fee_account, - new_max_fee, - account_muxed, - fee_account_muxed, - ledger_hash, - previous_ledger_hash, - transaction_count, - ledger_operation_count, - closed_at, - ledger_id, - total_coins, - fee_pool, - base_fee, - base_reserve, - max_tx_set_size, - protocol_version, - successful_transaction_count, - failed_transaction_count, - batch_id, - batch_run_date, - batch_insert_ts, - ledger_bounds, - min_account_sequence, - min_account_sequence_age, - min_account_sequence_ledger_gap, - extra_signers, - asset_balance_changes, - PARAMETERS, - parameters_decoded, - FUNCTION, - address, - soroban_operation_type, - extend_to, - contract_id, - contract_code_hash, - resource_fee, - soroban_resources_instructions, - soroban_resources_read_bytes, - soroban_resources_write_bytes, - transaction_result_code, - inclusion_fee_bid, - inclusion_fee_charged, - resource_fee_refund, - operation_result_code, - operation_trace_code, - op_application_order, - txn_application_order, - _inserted_timestamp, + *, {{ dbt_utils.generate_surrogate_key( - ['op_id'] + ['id'] ) }} AS operations_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/silver/silver__trades.sql b/models/silver/silver__trades.sql index d63e6dc..a27cfc0 100644 --- a/models/silver/silver__trades.sql +++ b/models/silver/silver__trades.sql @@ -11,48 +11,61 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - history_operation_id, - "order", - ledger_closed_at, - selling_account_address, - selling_asset_code, - selling_asset_issuer, - selling_asset_type, - selling_asset_id, - selling_amount, - buying_account_address, - buying_asset_code, - buying_asset_issuer, - buying_asset_type, - buying_asset_id, - buying_amount, - price_n, - price_d, - selling_offer_id, - buying_offer_id, - batch_id, - batch_run_date, - batch_insert_ts, - selling_liquidity_pool_id, - liquidity_pool_fee, - trade_type, - rounding_slippage, - seller_is_exact, + partition_gte_id, + history_operation_id :: INTEGER AS history_operation_id, + VALUE: "order" :: INTEGER AS "order", + TO_TIMESTAMP( + VALUE :ledger_closed_at :: INT, + 6 + ) AS ledger_closed_at, + VALUE: selling_account_address :: STRING AS selling_account_address, + VALUE: selling_asset_code :: STRING AS selling_asset_code, + VALUE: selling_asset_issuer :: STRING AS selling_asset_issuer, + VALUE: selling_asset_type :: STRING AS selling_asset_type, + VALUE: selling_asset_id :: INTEGER AS selling_asset_id, + VALUE: selling_amount :: FLOAT AS selling_amount, + VALUE: buying_account_address :: STRING AS buying_account_address, + VALUE: buying_asset_code :: STRING AS buying_asset_code, + VALUE: buying_asset_issuer :: STRING AS buying_asset_issuer, + VALUE: buying_asset_type :: STRING AS buying_asset_type, + VALUE: buying_asset_id :: INTEGER AS buying_asset_id, + VALUE: buying_amount :: FLOAT AS buying_amount, + VALUE: price_n :: INTEGER AS price_n, + VALUE: price_d :: INTEGER AS price_d, + VALUE: selling_offer_id :: INTEGER AS selling_offer_id, + VALUE: buying_offer_id :: INTEGER AS buying_offer_id, + VALUE :batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE: selling_liquidity_pool_id :: STRING AS selling_liquidity_pool_id, + VALUE: liquidity_pool_fee :: INTEGER AS liquidity_pool_fee, + VALUE: trade_type :: INTEGER AS trade_type, + VALUE: rounding_slippage :: INTEGER AS rounding_slippage, + VALUE: seller_is_exact :: BOOLEAN AS seller_is_exact, _inserted_timestamp FROM @@ -64,18 +77,21 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY history_operation_id, "order" ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, history_operation_id, "order", ledger_closed_at, diff --git a/models/silver/silver__transactions.sql b/models/silver/silver__transactions.sql index 3930a65..b719498 100644 --- a/models/silver/silver__transactions.sql +++ b/models/silver/silver__transactions.sql @@ -11,65 +11,81 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - id, - transaction_hash, - ledger_sequence, - account, - account_sequence, - max_fee, - operation_count, - created_at, - memo_type, - memo, - time_bounds, - SUCCESSFUL, - fee_charged, - inner_transaction_hash, - fee_account, - new_max_fee, - account_muxed, - fee_account_muxed, - batch_id, - batch_run_date, - batch_insert_ts, - ledger_bounds, - min_account_sequence, - min_account_sequence_age, - min_account_sequence_ledger_gap, - tx_envelope, - tx_result, - tx_meta, - tx_fee_meta, - extra_signers, - resource_fee, - soroban_resources_instructions, - soroban_resources_read_bytes, - soroban_resources_write_bytes, - closed_at, - transaction_result_code, - inclusion_fee_bid, - inclusion_fee_charged, - resource_fee_refund, - non_refundable_resource_fee_charged, - refundable_resource_fee_charged, - rent_fee_charged, - tx_signers, - refundable_fee, + partition_gte_id, + id :: INTEGER AS id, + VALUE :transaction_hash :: STRING AS transaction_hash, + VALUE :ledger_sequence :: INTEGER AS ledger_sequence, + VALUE :account :: STRING AS account, + VALUE :account_sequence :: INTEGER AS account_sequence, + VALUE :max_fee :: INTEGER AS max_fee, + VALUE :operation_count :: INTEGER AS operation_count, + TO_TIMESTAMP( + VALUE :created_at :: INT, + 6 + ) AS created_at, + VALUE :memo_type :: STRING AS memo_type, + VALUE :memo :: STRING AS memo, + VALUE :time_bounds :: STRING AS time_bounds, + VALUE :SUCCESSFUL :: BOOLEAN AS SUCCESSFUL, + VALUE :fee_charged :: INTEGER AS fee_charged, + VALUE :inner_transaction_hash :: STRING AS inner_transaction_hash, + VALUE :fee_account :: STRING AS fee_account, + VALUE :new_max_fee :: INTEGER AS new_max_fee, + VALUE :account_muxed :: STRING AS account_muxed, + VALUE :fee_account_muxed :: STRING AS fee_account_muxed, + VALUE :batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE :ledger_bounds :: STRING AS ledger_bounds, + VALUE :min_account_sequence :: INTEGER AS min_account_sequence, + VALUE :min_account_sequence_age :: INTEGER AS min_account_sequence_age, + VALUE :min_account_sequence_ledger_gap :: INTEGER AS min_account_sequence_ledger_gap, + VALUE :tx_envelope :: STRING AS tx_envelope, + VALUE :tx_result :: STRING AS tx_result, + VALUE :tx_meta :: STRING AS tx_meta, + VALUE :tx_fee_meta :: STRING AS tx_fee_meta, + VALUE :extra_signers :: ARRAY AS extra_signers, + VALUE :resource_fee :: INTEGER AS resource_fee, + VALUE :soroban_resources_instructions :: INTEGER AS soroban_resources_instructions, + VALUE :soroban_resources_read_bytes :: INTEGER AS soroban_resources_read_bytes, + VALUE :soroban_resources_write_bytes :: INTEGER AS soroban_resources_write_bytes, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + VALUE :transaction_result_code :: STRING AS transaction_result_code, + VALUE :inclusion_fee_bid :: INTEGER AS inclusion_fee_bid, + VALUE :inclusion_fee_charged :: INTEGER AS inclusion_fee_charged, + VALUE :resource_fee_refund :: INTEGER AS resource_fee_refund, + VALUE :non_refundable_resource_fee_charged :: INTEGER AS non_refundable_resource_fee_charged, + VALUE :refundable_resource_fee_charged :: INTEGER AS refundable_resource_fee_charged, + VALUE :rent_fee_charged :: INTEGER AS rent_fee_charged, + VALUE :tx_signers :: STRING AS tx_signers, + VALUE :refundable_fee :: INTEGER AS refundable_fee, _inserted_timestamp FROM @@ -81,17 +97,20 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY id ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, id, transaction_hash, ledger_sequence, diff --git a/models/sources.yml b/models/sources.yml index 7e37db7..e7d4761 100644 --- a/models/sources.yml +++ b/models/sources.yml @@ -3,14 +3,14 @@ version: 2 sources: - name: bronze_streamline database: streamline - schema: "{{ 'stellar' if target.database == 'STELLAR' else 'stellar_dev' }}" + schema: stellar tables: - name: accounts - name: contract_data - - name: enriched_history_operations - name: history_assets - name: history_contract_events - name: history_ledgers + - name: history_operations - name: history_trades - name: history_transactions - name: liquidity_pools @@ -30,16 +30,3 @@ sources: schema: github_actions tables: - name: workflows - - name: bronze_bq - database: STELLAR_SAMPLE - schema: SAMPLE_DATA - tables: - - name: ACCOUNTS - - name: CONTRACT_DATA - - name: ENRICHED_HISTORY_OPERATIONS - - name: HISTORY_ASSETS - - name: HISTORY_CONTRACT_EVENTS - - name: HISTORY_LEDGERS - - name: HISTORY_TRADES - - name: HISTORY_TRANSACTIONS - - name: LIQUIDITY_POOLS \ No newline at end of file