From 53fcb53a571b14ddd563277b917b1ca7fa64c1d8 Mon Sep 17 00:00:00 2001 From: Eric Laurello Date: Tue, 4 Feb 2025 14:01:55 -0500 Subject: [PATCH] bronze & silver --- macros/streamline/models.sql | 6 +- models/bronze/core/bronze__accounts.sql | 2 +- models/bronze/core/bronze__accounts_FR.sql | 2 +- models/bronze/core/bronze__assets.sql | 8 +- models/bronze/core/bronze__assets_FR.sql | 8 +- .../bronze/core/bronze__liquidity_pools.sql | 6 +- .../core/bronze__liquidity_pools_FR.sql | 6 +- models/bronze/core/bronze__operations.sql | 2 +- models/bronze/core/bronze__operations_FR.sql | 2 +- models/bronze/core/bronze__trades.sql | 6 +- models/bronze/core/bronze__trades_FR.sql | 6 +- models/bronze/core/bronze__transactions.sql | 6 +- .../bronze/core/bronze__transactions_FR.sql | 6 +- models/silver/silver__accounts.sql | 87 ++++++++------ models/silver/silver__assets.sql | 68 +++++------ models/silver/silver__ledgers.sql | 13 +- models/silver/silver__liquidity_pools.sql | 113 ++++++++++-------- models/silver/silver__operations.sql | 23 ++-- models/silver/silver__trades.sql | 76 +++++++----- models/silver/silver__transactions.sql | 113 ++++++++++-------- 20 files changed, 308 insertions(+), 251 deletions(-) diff --git a/macros/streamline/models.sql b/macros/streamline/models.sql index 3f6d900..2819e7d 100644 --- a/macros/streamline/models.sql +++ b/macros/streamline/models.sql @@ -22,8 +22,7 @@ SELECT {{ other_cols }}, value, _inserted_timestamp, - s.{{ partition_name }}, - s.value AS VALUE + s.{{ partition_name }} FROM {{ source( "bronze_streamline", @@ -63,8 +62,7 @@ SELECT {{ other_cols }}, value, _inserted_timestamp, - s.{{ partition_name }}, - s.value AS VALUE + s.{{ partition_name }} FROM {{ source( "bronze_streamline", diff --git a/models/bronze/core/bronze__accounts.sql b/models/bronze/core/bronze__accounts.sql index 4bc8b6b..fc82ee2 100644 --- a/models/bronze/core/bronze__accounts.sql +++ b/models/bronze/core/bronze__accounts.sql @@ -6,5 +6,5 @@ partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", partition_name = "partition_gte_id", unique_key = "ACCOUNT_ID", - other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__accounts_FR.sql b/models/bronze/core/bronze__accounts_FR.sql index b3bac40..e8c666a 100644 --- a/models/bronze/core/bronze__accounts_FR.sql +++ b/models/bronze/core/bronze__accounts_FR.sql @@ -6,5 +6,5 @@ partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", partition_name = "partition_gte_id", unique_key = "ACCOUNT_ID", - other_cols = "partition_id, BALANCE, BUYING_LIABILITIES, SELLING_LIABILITIES, SEQUENCE_NUMBER, NUM_SUBENTRIES, INFLATION_DESTINATION, FLAGS, HOME_DOMAIN, MASTER_WEIGHT, THRESHOLD_LOW, THRESHOLD_MEDIUM, THRESHOLD_HIGH, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SPONSOR, NUM_SPONSORED, NUM_SPONSORING, SEQUENCE_LEDGER, SEQUENCE_TIME, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__assets.sql b/models/bronze/core/bronze__assets.sql index caf2690..31bc4e1 100644 --- a/models/bronze/core/bronze__assets.sql +++ b/models/bronze/core/bronze__assets.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_assets", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", - unique_key = "ID", - other_cols = "ASSET_TYPE, ASSET_CODE, ASSET_ISSUER, BATCH_RUN_DATE, BATCH_ID, BATCH_INSERT_TS, ASSET_ID" + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", + unique_key = "asset_id", + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__assets_FR.sql b/models/bronze/core/bronze__assets_FR.sql index 5ff1c52..c3e1bb2 100644 --- a/models/bronze/core/bronze__assets_FR.sql +++ b/models/bronze/core/bronze__assets_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_assets", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", - unique_key = "ID", - other_cols = " ASSET_TYPE, ASSET_CODE, ASSET_ISSUER, BATCH_RUN_DATE, BATCH_ID, BATCH_INSERT_TS, ASSET_ID" + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", + unique_key = "asset_id", + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__liquidity_pools.sql b/models/bronze/core/bronze__liquidity_pools.sql index 7fd33eb..d07b08a 100644 --- a/models/bronze/core/bronze__liquidity_pools.sql +++ b/models/bronze/core/bronze__liquidity_pools.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "liquidity_pools", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "LIQUIDITY_POOL_ID", - other_cols = "TYPE, FEE, TRUSTLINE_COUNT, POOL_SHARE_COUNT, ASSET_A_TYPE, ASSET_A_CODE, ASSET_A_ISSUER, ASSET_A_ID, ASSET_A_AMOUNT, ASSET_B_TYPE, ASSET_B_CODE, ASSET_B_ISSUER, ASSET_B_ID, ASSET_B_AMOUNT, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__liquidity_pools_FR.sql b/models/bronze/core/bronze__liquidity_pools_FR.sql index 9c65159..7005abe 100644 --- a/models/bronze/core/bronze__liquidity_pools_FR.sql +++ b/models/bronze/core/bronze__liquidity_pools_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "liquidity_pools", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "LIQUIDITY_POOL_ID", - other_cols = "TYPE, FEE, TRUSTLINE_COUNT, POOL_SHARE_COUNT, ASSET_A_TYPE, ASSET_A_CODE, ASSET_A_ISSUER, ASSET_A_ID, ASSET_A_AMOUNT, ASSET_B_TYPE, ASSET_B_CODE, ASSET_B_ISSUER, ASSET_B_ID, ASSET_B_AMOUNT, LAST_MODIFIED_LEDGER, LEDGER_ENTRY_CHANGE, DELETED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, CLOSED_AT, LEDGER_SEQUENCE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__operations.sql b/models/bronze/core/bronze__operations.sql index 027dadd..fba8d4b 100644 --- a/models/bronze/core/bronze__operations.sql +++ b/models/bronze/core/bronze__operations.sql @@ -6,5 +6,5 @@ partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", partition_name = "partition_gte_id", unique_key = "ID", - other_cols = 'partition_id,VALUE' + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__operations_FR.sql b/models/bronze/core/bronze__operations_FR.sql index 488cb77..f3fa7e0 100644 --- a/models/bronze/core/bronze__operations_FR.sql +++ b/models/bronze/core/bronze__operations_FR.sql @@ -6,5 +6,5 @@ partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", partition_name = "partition_gte_id", unique_key = "ID", - other_cols = 'partition_id,VALUE' + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__trades.sql b/models/bronze/core/bronze__trades.sql index bb588e7..f433a6d 100644 --- a/models/bronze/core/bronze__trades.sql +++ b/models/bronze/core/bronze__trades.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_trades", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "HISTORY_OPERATION_ID", - other_cols = '"order", LEDGER_CLOSED_AT, SELLING_ACCOUNT_ADDRESS, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SELLING_AMOUNT, BUYING_ACCOUNT_ADDRESS, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID, BUYING_AMOUNT, PRICE_N, PRICE_D, SELLING_OFFER_ID, BUYING_OFFER_ID, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SELLING_LIQUIDITY_POOL_ID, LIQUIDITY_POOL_FEE, TRADE_TYPE, ROUNDING_SLIPPAGE, SELLER_IS_EXACT' + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__trades_FR.sql b/models/bronze/core/bronze__trades_FR.sql index 09c1ce2..61c2713 100644 --- a/models/bronze/core/bronze__trades_FR.sql +++ b/models/bronze/core/bronze__trades_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_trades", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "HISTORY_OPERATION_ID", - other_cols = '"order", LEDGER_CLOSED_AT, SELLING_ACCOUNT_ADDRESS, SELLING_ASSET_CODE, SELLING_ASSET_ISSUER, SELLING_ASSET_TYPE, SELLING_ASSET_ID, SELLING_AMOUNT, BUYING_ACCOUNT_ADDRESS, BUYING_ASSET_CODE, BUYING_ASSET_ISSUER, BUYING_ASSET_TYPE, BUYING_ASSET_ID, BUYING_AMOUNT, PRICE_N, PRICE_D, SELLING_OFFER_ID, BUYING_OFFER_ID, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, SELLING_LIQUIDITY_POOL_ID, LIQUIDITY_POOL_FEE, TRADE_TYPE, ROUNDING_SLIPPAGE, SELLER_IS_EXACT' + other_cols = 'partition_id' ) }} diff --git a/models/bronze/core/bronze__transactions.sql b/models/bronze/core/bronze__transactions.sql index ae7740d..aad1ef8 100644 --- a/models/bronze/core/bronze__transactions.sql +++ b/models/bronze/core/bronze__transactions.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_query_v2( model = "history_transactions", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ID", - other_cols = "TRANSACTION_HASH, LEDGER_SEQUENCE, ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, OPERATION_COUNT, CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, INNER_TRANSACTION_HASH, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, TX_ENVELOPE, TX_RESULT, TX_META, TX_FEE_META, EXTRA_SIGNERS, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, CLOSED_AT, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, NON_REFUNDABLE_RESOURCE_FEE_CHARGED, REFUNDABLE_RESOURCE_FEE_CHARGED, RENT_FEE_CHARGED, TX_SIGNERS, REFUNDABLE_FEE" + other_cols = "partition_id" ) }} diff --git a/models/bronze/core/bronze__transactions_FR.sql b/models/bronze/core/bronze__transactions_FR.sql index a721ef5..85b37e1 100644 --- a/models/bronze/core/bronze__transactions_FR.sql +++ b/models/bronze/core/bronze__transactions_FR.sql @@ -3,8 +3,8 @@ ) }} {{ streamline_external_table_FR_query_v2( model = "history_transactions", - partition_function = " TO_DATE( SPLIT_PART(SPLIT_PART(file_name, '/', 3), '=', 2)|| '01', 'YYYYMMDD')", - partition_name = "partition_id", + partition_function = "TRY_TO_DATE(left(split_part(split_part(file_name, '=', -1), '/', -1),8), 'YYYYMMDD')", + partition_name = "partition_gte_id", unique_key = "ID", - other_cols = "TRANSACTION_HASH, LEDGER_SEQUENCE, ACCOUNT, ACCOUNT_SEQUENCE, MAX_FEE, OPERATION_COUNT, CREATED_AT, MEMO_TYPE, MEMO, TIME_BOUNDS, SUCCESSFUL, FEE_CHARGED, INNER_TRANSACTION_HASH, FEE_ACCOUNT, NEW_MAX_FEE, ACCOUNT_MUXED, FEE_ACCOUNT_MUXED, BATCH_ID, BATCH_RUN_DATE, BATCH_INSERT_TS, LEDGER_BOUNDS, MIN_ACCOUNT_SEQUENCE, MIN_ACCOUNT_SEQUENCE_AGE, MIN_ACCOUNT_SEQUENCE_LEDGER_GAP, TX_ENVELOPE, TX_RESULT, TX_META, TX_FEE_META, EXTRA_SIGNERS, RESOURCE_FEE, SOROBAN_RESOURCES_INSTRUCTIONS, SOROBAN_RESOURCES_READ_BYTES, SOROBAN_RESOURCES_WRITE_BYTES, CLOSED_AT, TRANSACTION_RESULT_CODE, INCLUSION_FEE_BID, INCLUSION_FEE_CHARGED, RESOURCE_FEE_REFUND, NON_REFUNDABLE_RESOURCE_FEE_CHARGED, REFUNDABLE_RESOURCE_FEE_CHARGED, RENT_FEE_CHARGED, TX_SIGNERS, REFUNDABLE_FEE" + other_cols = "partition_id" ) }} diff --git a/models/silver/silver__accounts.sql b/models/silver/silver__accounts.sql index ecd88d1..5a70873 100644 --- a/models/silver/silver__accounts.sql +++ b/models/silver/silver__accounts.sql @@ -5,7 +5,6 @@ incremental_predicates = ["dynamic_range_predicate", "partition_id::date"], merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['closed_at::DATE','partition_id','modified_timestamp::DATE'], - full_refresh = false, tags = ['scheduled_core'], ) }} @@ -15,20 +14,15 @@ {% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp -FROM - {{ this }} - - {% endset %} - {% set max_is = run_query(max_is_query) [0] [0] %} - {% set max_part_query %} -SELECT + MAX(_inserted_timestamp) AS _inserted_timestamp, MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_part = run_query(max_part_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} @@ -36,32 +30,53 @@ WITH pre_final AS ( SELECT partition_id, partition_gte_id, - account_id :: STRING AS account_id, - balance :: FLOAT AS balance, - buying_liabilities :: FLOAT AS buying_liabilities, - selling_liabilities :: FLOAT AS selling_liabilities, - sequence_number :: INTEGER AS sequence_number, - num_subentries :: INTEGER AS num_subentries, - inflation_destination :: STRING AS inflation_destination, - flags :: INTEGER AS flags, - home_domain :: STRING AS home_domain, - master_weight :: INTEGER AS master_weight, - threshold_low :: INTEGER AS threshold_low, - threshold_medium :: INTEGER AS threshold_medium, - threshold_high :: INTEGER AS threshold_high, - last_modified_ledger :: INTEGER AS last_modified_ledger, - ledger_entry_change :: INTEGER AS ledger_entry_change, - deleted :: BOOLEAN AS deleted, - batch_id :: STRING AS batch_id, - batch_run_date :: TIMESTAMP AS batch_run_date, - batch_insert_ts :: TIMESTAMP AS batch_insert_ts, - sponsor :: STRING AS sponsor, - num_sponsored :: INTEGER AS num_sponsored, - num_sponsoring :: INTEGER AS num_sponsoring, - sequence_ledger :: INTEGER AS sequence_ledger, - sequence_time :: TIMESTAMP AS sequence_time, - closed_at :: TIMESTAMP AS closed_at, - ledger_sequence :: INTEGER AS ledger_sequence, + VALUE :account_id :: STRING AS account_id, + TRY_CAST( + VALUE :balance :: STRING AS FLOAT + ) AS balance, + TRY_CAST( + VALUE :buying_liabilities :: STRING AS FLOAT + ) AS buying_liabilities, + TRY_CAST( + VALUE :selling_liabilities :: STRING AS FLOAT + ) AS selling_liabilities, + VALUE :sequence_number :: INTEGER AS sequence_number, + VALUE :num_subentries :: INTEGER AS num_subentries, + VALUE :inflation_destination :: STRING AS inflation_destination, + VALUE :flags :: INTEGER AS flags, + VALUE :home_domain :: STRING AS home_domain, + VALUE :master_weight :: INTEGER AS master_weight, + VALUE :threshold_low :: INTEGER AS threshold_low, + VALUE :threshold_medium :: INTEGER AS threshold_medium, + VALUE :threshold_high :: INTEGER AS threshold_high, + VALUE :last_modified_ledger :: INTEGER AS last_modified_ledger, + VALUE :ledger_entry_change :: INTEGER AS ledger_entry_change, + VALUE :deleted :: BOOLEAN AS deleted, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + VALUE: batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE :sponsor :: STRING AS sponsor, + VALUE :num_sponsored :: INTEGER AS num_sponsored, + VALUE :num_sponsoring :: INTEGER AS num_sponsoring, + VALUE :sequence_ledger :: INTEGER AS sequence_ledger, + TO_TIMESTAMP( + VALUE :sequence_time :: INT, + 6 + ) AS sequence_time, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + TO_TIMESTAMP( + VALUE :ledger_sequence :: INT, + 6 + ) AS ledger_sequence, _inserted_timestamp FROM diff --git a/models/silver/silver__assets.sql b/models/silver/silver__assets.sql index ceff93e..4006d4e 100644 --- a/models/silver/silver__assets.sql +++ b/models/silver/silver__assets.sql @@ -11,67 +11,64 @@ {% if execute %} {% if is_incremental() %} -{% set max_bat_query %} +{% set max_is_query %} SELECT - MAX(batch_insert_ts) AS batch_insert_ts + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_batch = run_query(max_bat_query) [0] [0] %} -{% endif %} - -{% if is_incremental() %} -{% set max_part_query %} -SELECT - MAX(partition_id) AS partition_id -FROM - {{ this }} - - {% endset %} - {% set max_part = run_query(max_part_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, - id :: FLOAT AS id, - asset_type :: STRING AS asset_type, - asset_code :: STRING AS asset_code, - asset_issuer :: STRING AS asset_issuer, - batch_run_date :: datetime AS batch_run_date, - batch_id :: STRING AS batch_id, - batch_insert_ts :: datetime AS batch_insert_ts, - asset_id :: INT AS asset_id + partition_gte_id, + VALUE :id :: FLOAT AS id, + VALUE :asset_type :: STRING AS asset_type, + VALUE :asset_code :: STRING AS asset_code, + VALUE :asset_issuer :: STRING AS asset_issuer, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + VALUE: batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE :asset_id :: INT AS asset_id, + _inserted_timestamp FROM - {# {% if is_incremental() %} - {{ ref('bronze__assets') }} - {% else %} - {{ ref('bronze__assets_FR') }} - {% endif %} - #} - {{ source( - 'bronze_streamline', - 'history_assets' - ) }} +{% if is_incremental() %} +{{ ref('bronze__assets') }} +{% else %} + {{ ref('bronze__assets_FR') }} +{% endif %} {% if is_incremental() %} WHERE - partition_id >= '{{ max_part }}' - AND batch_insert_ts > '{{ max_batch }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY asset_id ORDER BY - batch_insert_ts DESC + batch_insert_ts DESC, + _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, id, asset_type, asset_code, @@ -80,6 +77,7 @@ SELECT batch_id, batch_insert_ts, asset_id, + _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['asset_id'] ) }} AS assets_id, diff --git a/models/silver/silver__ledgers.sql b/models/silver/silver__ledgers.sql index 487a1f2..fcba131 100644 --- a/models/silver/silver__ledgers.sql +++ b/models/silver/silver__ledgers.sql @@ -14,20 +14,15 @@ {% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp -FROM - {{ this }} - - {% endset %} - {% set max_is = run_query(max_is_query) [0] [0] %} - {% set max_part_query %} -SELECT + MAX(_inserted_timestamp) AS _inserted_timestamp, MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_part = run_query(max_part_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} diff --git a/models/silver/silver__liquidity_pools.sql b/models/silver/silver__liquidity_pools.sql index 7ab591d..aa2e8f1 100644 --- a/models/silver/silver__liquidity_pools.sql +++ b/models/silver/silver__liquidity_pools.sql @@ -11,44 +11,57 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, + partition_gte_id, liquidity_pool_id, - TYPE, - fee, - trustline_count, - pool_share_count, - asset_a_type, - asset_a_code, - asset_a_issuer, - asset_a_id, - asset_a_amount, - asset_b_type, - asset_b_code, - asset_b_issuer, - asset_b_id, - asset_b_amount, - last_modified_ledger, - ledger_entry_change, - deleted, - batch_id, - batch_run_date, - batch_insert_ts, - closed_at, - ledger_sequence, + VALUE :TYPE :: STRING AS TYPE, + VALUE :fee :: INTEGER AS fee, + VALUE :trustline_count :: INTEGER AS trustline_count, + VALUE :pool_share_count :: FLOAT AS pool_share_count, + VALUE :asset_a_type :: STRING AS asset_a_type, + VALUE :asset_a_code :: STRING AS asset_a_code, + VALUE :asset_a_issuer :: STRING AS asset_a_issuer, + VALUE :asset_a_id :: INTEGER AS asset_a_id, + VALUE :asset_a_amount :: FLOAT AS asset_a_amount, + VALUE :asset_b_type :: STRING AS asset_b_type, + VALUE :asset_b_code :: STRING AS asset_b_code, + VALUE :asset_b_issuer :: STRING AS asset_b_issuer, + VALUE :asset_b_id :: INTEGER AS asset_b_id, + VALUE :asset_b_amount :: FLOAT AS asset_b_amount, + VALUE :last_modified_ledger :: INTEGER AS last_modified_ledger, + VALUE :ledger_entry_change :: INTEGER AS ledger_entry_change, + VALUE :deleted :: BOOLEAN AS deleted, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + VALUE: batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + VALUE :ledger_sequence :: INTEGER AS ledger_sequence, _inserted_timestamp FROM @@ -60,42 +73,44 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - partition_id >= '{{ max_part }}' - AND _inserted_timestamp > '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY liquidity_pool_id, closed_at ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, liquidity_pool_id :: STRING AS liquidity_pool_id, - TYPE :: STRING AS TYPE, - fee :: INTEGER AS fee, - trustline_count :: INTEGER AS trustline_count, - pool_share_count :: FLOAT AS pool_share_count, - asset_a_type :: STRING AS asset_a_type, - asset_a_code :: STRING AS asset_a_code, - asset_a_issuer :: STRING AS asset_a_issuer, - asset_a_id :: INTEGER AS asset_a_id, - asset_a_amount :: FLOAT AS asset_a_amount, - asset_b_type :: STRING AS asset_b_type, - asset_b_code :: STRING AS asset_b_code, - asset_b_issuer :: STRING AS asset_b_issuer, - asset_b_id :: INTEGER AS asset_b_id, - asset_b_amount :: FLOAT AS asset_b_amount, - last_modified_ledger :: INTEGER AS last_modified_ledger, - ledger_entry_change :: INTEGER AS ledger_entry_change, - deleted :: BOOLEAN AS deleted, - batch_id :: STRING AS batch_id, - batch_run_date :: TIMESTAMP AS batch_run_date, - batch_insert_ts :: TIMESTAMP AS batch_insert_ts, - closed_at :: TIMESTAMP AS closed_at, - ledger_sequence :: INTEGER AS ledger_sequence, + TYPE, + fee, + trustline_count, + pool_share_count, + asset_a_type, + asset_a_code, + asset_a_issuer, + asset_a_id, + asset_a_amount, + asset_b_type, + asset_b_code, + asset_b_issuer, + asset_b_id, + asset_b_amount, + last_modified_ledger, + ledger_entry_change, + deleted, + batch_id, + batch_run_date, + batch_insert_ts, + closed_at, + ledger_sequence, _inserted_timestamp, {{ dbt_utils.generate_surrogate_key( ['liquidity_pool_id','closed_at'] diff --git a/models/silver/silver__operations.sql b/models/silver/silver__operations.sql index b42ff4f..a88d545 100644 --- a/models/silver/silver__operations.sql +++ b/models/silver/silver__operations.sql @@ -14,20 +14,15 @@ {% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp -FROM - {{ this }} - - {% endset %} - {% set max_is = run_query(max_is_query) [0] [0] %} - {% set max_part_query %} -SELECT + MAX(_inserted_timestamp) AS _inserted_timestamp, MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_part = run_query(max_part_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} @@ -167,8 +162,14 @@ WITH pre_final AS ( 6 ) AS closed_at, VALUE :batch_id :: STRING AS batch_id, - VALUE :batch_run_date :: TIMESTAMP AS batch_run_date, - VALUE :batch_insert_ts :: TIMESTAMP AS batch_insert_ts, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, _inserted_timestamp FROM diff --git a/models/silver/silver__trades.sql b/models/silver/silver__trades.sql index a139356..a27cfc0 100644 --- a/models/silver/silver__trades.sql +++ b/models/silver/silver__trades.sql @@ -11,48 +11,61 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, + partition_gte_id, history_operation_id :: INTEGER AS history_operation_id, - "order" :: INTEGER AS "order", - ledger_closed_at :: TIMESTAMP AS ledger_closed_at, - selling_account_address :: STRING AS selling_account_address, - selling_asset_code :: STRING AS selling_asset_code, - selling_asset_issuer :: STRING AS selling_asset_issuer, - selling_asset_type :: STRING AS selling_asset_type, - selling_asset_id :: INTEGER AS selling_asset_id, - selling_amount :: FLOAT AS selling_amount, - buying_account_address :: STRING AS buying_account_address, - buying_asset_code :: STRING AS buying_asset_code, - buying_asset_issuer :: STRING AS buying_asset_issuer, - buying_asset_type :: STRING AS buying_asset_type, - buying_asset_id :: INTEGER AS buying_asset_id, - buying_amount :: FLOAT AS buying_amount, - price_n :: INTEGER AS price_n, - price_d :: INTEGER AS price_d, - selling_offer_id :: INTEGER AS selling_offer_id, - buying_offer_id :: INTEGER AS buying_offer_id, - batch_id :: STRING AS batch_id, - batch_run_date :: TIMESTAMP AS batch_run_date, - batch_insert_ts :: TIMESTAMP AS batch_insert_ts, - selling_liquidity_pool_id :: STRING AS selling_liquidity_pool_id, - liquidity_pool_fee :: INTEGER AS liquidity_pool_fee, - trade_type :: INTEGER AS trade_type, - rounding_slippage :: INTEGER AS rounding_slippage, - seller_is_exact :: BOOLEAN AS seller_is_exact, + VALUE: "order" :: INTEGER AS "order", + TO_TIMESTAMP( + VALUE :ledger_closed_at :: INT, + 6 + ) AS ledger_closed_at, + VALUE: selling_account_address :: STRING AS selling_account_address, + VALUE: selling_asset_code :: STRING AS selling_asset_code, + VALUE: selling_asset_issuer :: STRING AS selling_asset_issuer, + VALUE: selling_asset_type :: STRING AS selling_asset_type, + VALUE: selling_asset_id :: INTEGER AS selling_asset_id, + VALUE: selling_amount :: FLOAT AS selling_amount, + VALUE: buying_account_address :: STRING AS buying_account_address, + VALUE: buying_asset_code :: STRING AS buying_asset_code, + VALUE: buying_asset_issuer :: STRING AS buying_asset_issuer, + VALUE: buying_asset_type :: STRING AS buying_asset_type, + VALUE: buying_asset_id :: INTEGER AS buying_asset_id, + VALUE: buying_amount :: FLOAT AS buying_amount, + VALUE: price_n :: INTEGER AS price_n, + VALUE: price_d :: INTEGER AS price_d, + VALUE: selling_offer_id :: INTEGER AS selling_offer_id, + VALUE: buying_offer_id :: INTEGER AS buying_offer_id, + VALUE :batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE: selling_liquidity_pool_id :: STRING AS selling_liquidity_pool_id, + VALUE: liquidity_pool_fee :: INTEGER AS liquidity_pool_fee, + VALUE: trade_type :: INTEGER AS trade_type, + VALUE: rounding_slippage :: INTEGER AS rounding_slippage, + VALUE: seller_is_exact :: BOOLEAN AS seller_is_exact, _inserted_timestamp FROM @@ -64,18 +77,21 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY history_operation_id, "order" ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, history_operation_id, "order", ledger_closed_at, diff --git a/models/silver/silver__transactions.sql b/models/silver/silver__transactions.sql index b0976e2..b719498 100644 --- a/models/silver/silver__transactions.sql +++ b/models/silver/silver__transactions.sql @@ -11,65 +11,81 @@ {% if execute %} {% if is_incremental() %} -{% set max_inserted_query %} +{% set max_is_query %} SELECT - MAX(_inserted_timestamp) AS _inserted_timestamp + MAX(_inserted_timestamp) AS _inserted_timestamp, + MAX(partition_gte_id) AS partition__gte_id FROM {{ this }} {% endset %} - {% set max_inserted_timestamp = run_query(max_inserted_query) [0] [0] %} + {% set result = run_query(max_is_query) %} + {% set max_is = result [0] [0] %} + {% set max_part = result [0] [1] %} {% endif %} {% endif %} WITH pre_final AS ( SELECT partition_id, + partition_gte_id, id :: INTEGER AS id, - transaction_hash :: STRING AS transaction_hash, - ledger_sequence :: INTEGER AS ledger_sequence, - account :: STRING AS account, - account_sequence :: INTEGER AS account_sequence, - max_fee :: INTEGER AS max_fee, - operation_count :: INTEGER AS operation_count, - created_at :: TIMESTAMP AS created_at, - memo_type :: STRING AS memo_type, - memo :: STRING AS memo, - time_bounds :: STRING AS time_bounds, - SUCCESSFUL :: BOOLEAN AS SUCCESSFUL, - fee_charged :: INTEGER AS fee_charged, - inner_transaction_hash :: STRING AS inner_transaction_hash, - fee_account :: STRING AS fee_account, - new_max_fee :: INTEGER AS new_max_fee, - account_muxed :: STRING AS account_muxed, - fee_account_muxed :: STRING AS fee_account_muxed, - batch_id :: STRING AS batch_id, - batch_run_date :: TIMESTAMP AS batch_run_date, - batch_insert_ts :: TIMESTAMP AS batch_insert_ts, - ledger_bounds :: STRING AS ledger_bounds, - min_account_sequence :: INTEGER AS min_account_sequence, - min_account_sequence_age :: INTEGER AS min_account_sequence_age, - min_account_sequence_ledger_gap :: INTEGER AS min_account_sequence_ledger_gap, - tx_envelope :: STRING AS tx_envelope, - tx_result :: STRING AS tx_result, - tx_meta :: STRING AS tx_meta, - tx_fee_meta :: STRING AS tx_fee_meta, - extra_signers :: ARRAY AS extra_signers, - resource_fee :: INTEGER AS resource_fee, - soroban_resources_instructions :: INTEGER AS soroban_resources_instructions, - soroban_resources_read_bytes :: INTEGER AS soroban_resources_read_bytes, - soroban_resources_write_bytes :: INTEGER AS soroban_resources_write_bytes, - closed_at :: TIMESTAMP AS closed_at, - transaction_result_code :: STRING AS transaction_result_code, - inclusion_fee_bid :: INTEGER AS inclusion_fee_bid, - inclusion_fee_charged :: INTEGER AS inclusion_fee_charged, - resource_fee_refund :: INTEGER AS resource_fee_refund, - non_refundable_resource_fee_charged :: INTEGER AS non_refundable_resource_fee_charged, - refundable_resource_fee_charged :: INTEGER AS refundable_resource_fee_charged, - rent_fee_charged :: INTEGER AS rent_fee_charged, - tx_signers :: STRING AS tx_signers, - refundable_fee :: INTEGER AS refundable_fee, + VALUE :transaction_hash :: STRING AS transaction_hash, + VALUE :ledger_sequence :: INTEGER AS ledger_sequence, + VALUE :account :: STRING AS account, + VALUE :account_sequence :: INTEGER AS account_sequence, + VALUE :max_fee :: INTEGER AS max_fee, + VALUE :operation_count :: INTEGER AS operation_count, + TO_TIMESTAMP( + VALUE :created_at :: INT, + 6 + ) AS created_at, + VALUE :memo_type :: STRING AS memo_type, + VALUE :memo :: STRING AS memo, + VALUE :time_bounds :: STRING AS time_bounds, + VALUE :SUCCESSFUL :: BOOLEAN AS SUCCESSFUL, + VALUE :fee_charged :: INTEGER AS fee_charged, + VALUE :inner_transaction_hash :: STRING AS inner_transaction_hash, + VALUE :fee_account :: STRING AS fee_account, + VALUE :new_max_fee :: INTEGER AS new_max_fee, + VALUE :account_muxed :: STRING AS account_muxed, + VALUE :fee_account_muxed :: STRING AS fee_account_muxed, + VALUE :batch_id :: STRING AS batch_id, + TO_TIMESTAMP( + VALUE :batch_run_date :: INT, + 6 + ) AS batch_run_date, + TO_TIMESTAMP( + VALUE :batch_insert_ts :: INT, + 6 + ) AS batch_insert_ts, + VALUE :ledger_bounds :: STRING AS ledger_bounds, + VALUE :min_account_sequence :: INTEGER AS min_account_sequence, + VALUE :min_account_sequence_age :: INTEGER AS min_account_sequence_age, + VALUE :min_account_sequence_ledger_gap :: INTEGER AS min_account_sequence_ledger_gap, + VALUE :tx_envelope :: STRING AS tx_envelope, + VALUE :tx_result :: STRING AS tx_result, + VALUE :tx_meta :: STRING AS tx_meta, + VALUE :tx_fee_meta :: STRING AS tx_fee_meta, + VALUE :extra_signers :: ARRAY AS extra_signers, + VALUE :resource_fee :: INTEGER AS resource_fee, + VALUE :soroban_resources_instructions :: INTEGER AS soroban_resources_instructions, + VALUE :soroban_resources_read_bytes :: INTEGER AS soroban_resources_read_bytes, + VALUE :soroban_resources_write_bytes :: INTEGER AS soroban_resources_write_bytes, + TO_TIMESTAMP( + VALUE :closed_at :: INT, + 6 + ) AS closed_at, + VALUE :transaction_result_code :: STRING AS transaction_result_code, + VALUE :inclusion_fee_bid :: INTEGER AS inclusion_fee_bid, + VALUE :inclusion_fee_charged :: INTEGER AS inclusion_fee_charged, + VALUE :resource_fee_refund :: INTEGER AS resource_fee_refund, + VALUE :non_refundable_resource_fee_charged :: INTEGER AS non_refundable_resource_fee_charged, + VALUE :refundable_resource_fee_charged :: INTEGER AS refundable_resource_fee_charged, + VALUE :rent_fee_charged :: INTEGER AS rent_fee_charged, + VALUE :tx_signers :: STRING AS tx_signers, + VALUE :refundable_fee :: INTEGER AS refundable_fee, _inserted_timestamp FROM @@ -81,17 +97,20 @@ WITH pre_final AS ( {% if is_incremental() %} WHERE - _inserted_timestamp >= '{{ max_inserted_timestamp }}' + partition_gte_id >= '{{ max_part }}' + AND _inserted_timestamp > '{{ max_is }}' {% endif %} qualify ROW_NUMBER() over ( PARTITION BY id ORDER BY + batch_insert_ts DESC, _inserted_timestamp DESC ) = 1 ) SELECT partition_id, + partition_gte_id, id, transaction_hash, ledger_sequence,