From 15dcc024764998891b2530eb9fc6c7fb43213846 Mon Sep 17 00:00:00 2001 From: Jack Forgash <58153492+forgxyz@users.noreply.github.com> Date: Thu, 21 Nov 2024 13:47:12 -0700 Subject: [PATCH] AN-5528/Flow API Balances - chg evm_address source (#382) * chg evm_address source * disable transaction_entries * change batch --- ...scheduled_transaction_entries_realtime.yml | 4 +- .../streamline__reward_points_realtime.sql | 6 +-- .../bronze_api__FR_transaction_entries.sql | 2 +- .../bronze_api__transaction_entries.sql | 2 +- .../silver_api__reward_points_spend.sql | 2 +- .../silver_api__transaction_entries.sql | 2 +- .../external/streamline__evm_addresses.sql | 39 +++++++++++++++++-- .../tests__unique_address_count_threshold.sql | 2 +- 8 files changed, 45 insertions(+), 14 deletions(-) diff --git a/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml b/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml index f70662e..aa0e61b 100644 --- a/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml +++ b/.github/workflows/dbt_run_scheduled_transaction_entries_realtime.yml @@ -3,9 +3,9 @@ run-name: dbt_run_scheduled_transaction_entries_realtime on: workflow_dispatch: - schedule: + # schedule: # Daily at 00:00 UTC - - cron: "0 0 * * *" + # - cron: "0 0 * * *" env: USE_VARS: "${{ vars.USE_VARS }}" diff --git a/models/streamline/external/balances/streamline__reward_points_realtime.sql b/models/streamline/external/balances/streamline__reward_points_realtime.sql index 5a41111..bf569d5 100644 --- a/models/streamline/external/balances/streamline__reward_points_realtime.sql +++ b/models/streamline/external/balances/streamline__reward_points_realtime.sql @@ -5,9 +5,9 @@ target = "{{this.schema}}.{{this.identifier}}", params = { "external_table": "reward_points", - "sql_limit": "10000", - "producer_batch_size": "1000", - "worker_batch_size": "1000", + "sql_limit": "18000", + "producer_batch_size": "3000", + "worker_batch_size": "3000", "sql_source": "{{this.identifier}}" } ) diff --git a/models/streamline/external/storefront/bronze_api__FR_transaction_entries.sql b/models/streamline/external/storefront/bronze_api__FR_transaction_entries.sql index b115411..dc8773a 100644 --- a/models/streamline/external/storefront/bronze_api__FR_transaction_entries.sql +++ b/models/streamline/external/storefront/bronze_api__FR_transaction_entries.sql @@ -1,6 +1,6 @@ {{ config ( materialized = 'view', - tags = ['streamline_non_core', 'rewards_points_spend'] + tags = ['rewards_points_spend'] ) }} {{ streamline_external_table_FR_query_v2( diff --git a/models/streamline/external/storefront/bronze_api__transaction_entries.sql b/models/streamline/external/storefront/bronze_api__transaction_entries.sql index 5ab47e9..ac82267 100644 --- a/models/streamline/external/storefront/bronze_api__transaction_entries.sql +++ b/models/streamline/external/storefront/bronze_api__transaction_entries.sql @@ -1,6 +1,6 @@ {{ config ( materialized = 'view', - tags = ['streamline_non_core', 'rewards_points_spend'] + tags = ['rewards_points_spend'] ) }} {{ streamline_external_table_query_v2( diff --git a/models/streamline/external/storefront/silver_api__reward_points_spend.sql b/models/streamline/external/storefront/silver_api__reward_points_spend.sql index 87281d3..9e207c9 100644 --- a/models/streamline/external/storefront/silver_api__reward_points_spend.sql +++ b/models/streamline/external/storefront/silver_api__reward_points_spend.sql @@ -5,7 +5,7 @@ merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['_inserted_timestamp :: DATE'], post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(user_wallet_address)" ], - tags = ['streamline_non_core', 'rewards_points_spend'] + tags = ['rewards_points_spend'] ) }} WITH silver_responses AS ( diff --git a/models/streamline/external/storefront/silver_api__transaction_entries.sql b/models/streamline/external/storefront/silver_api__transaction_entries.sql index 08c56ba..73419c7 100644 --- a/models/streamline/external/storefront/silver_api__transaction_entries.sql +++ b/models/streamline/external/storefront/silver_api__transaction_entries.sql @@ -6,7 +6,7 @@ incremental_strategy = 'merge', merge_exclude_columns = ["inserted_timestamp"], cluster_by = ['_inserted_timestamp :: DATE'], - tags = ['streamline_non_core', 'rewards_points_spend'] + tags = ['rewards_points_spend'] ) }} WITH bronze AS ( diff --git a/models/streamline/external/streamline__evm_addresses.sql b/models/streamline/external/streamline__evm_addresses.sql index f0147ae..1a81e05 100644 --- a/models/streamline/external/streamline__evm_addresses.sql +++ b/models/streamline/external/streamline__evm_addresses.sql @@ -7,8 +7,25 @@ tags = ['streamline_non_core'] ) }} -WITH evm_transactions AS ( +WITH points_transfers AS ( + SELECT + from_address, + to_address + FROM + {{ ref('silver_api__points_transfers') }} + +{% if is_incremental() %} +WHERE + modified_timestamp > ( + SELECT + MAX(modified_timestamp) + FROM + {{ this }} + ) +{% endif %} +), +onchain AS ( SELECT DISTINCT from_address AS address FROM @@ -23,6 +40,22 @@ WHERE {{ this }} ) {% endif %} +), +evm_addresses AS ( + SELECT + DISTINCT from_address AS address + FROM + points_transfers + UNION + SELECT + DISTINCT to_address AS address + FROM + points_transfers + UNION + SELECT + DISTINCT address + FROM + onchain ) SELECT address, @@ -33,8 +66,6 @@ SELECT ) }} AS evm_addresses_id, '{{ invocation_id }}' AS _invocation_id FROM - evm_transactions - -qualify(ROW_NUMBER() over (PARTITION BY address + evm_addresses qualify(ROW_NUMBER() over (PARTITION BY address ORDER BY inserted_timestamp DESC)) = 1 diff --git a/tests/points/tests__unique_address_count_threshold.sql b/tests/points/tests__unique_address_count_threshold.sql index cb67930..9f365b7 100644 --- a/tests/points/tests__unique_address_count_threshold.sql +++ b/tests/points/tests__unique_address_count_threshold.sql @@ -18,4 +18,4 @@ SELECT FROM distinct_count WHERE - ct > 7000 + ct > 20000