diff --git a/.github/workflows/dbt_run_scheduled_reward_points_realtime.yml b/.github/workflows/dbt_run_scheduled_reward_points_realtime.yml index 3f48c6e..35911a9 100644 --- a/.github/workflows/dbt_run_scheduled_reward_points_realtime.yml +++ b/.github/workflows/dbt_run_scheduled_reward_points_realtime.yml @@ -45,7 +45,7 @@ jobs: - name: Request User Points Balances and Storefront Items run: > - dbt run -s streamline__reward_points_realtime streamline__minting_assets_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' + dbt run -s 1+streamline__reward_points_realtime streamline__minting_assets_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}' - name: Authenticate with Flow Points API run: | diff --git a/.github/workflows/dbt_run_streamline_evm_daily_silver.yml b/.github/workflows/dbt_run_streamline_evm_daily_silver.yml index 70cd23b..f11c872 100644 --- a/.github/workflows/dbt_run_streamline_evm_daily_silver.yml +++ b/.github/workflows/dbt_run_streamline_evm_daily_silver.yml @@ -4,8 +4,8 @@ run-name: dbt_run_streamline_evm_daily_silver on: workflow_dispatch: schedule: - # Daily at 01:00 UTC (see https://crontab.guru) - - cron: "0 1 * * *" + # Daily at 02:00 UTC (see https://crontab.guru) + - cron: "0 2 * * *" env: USE_VARS: "${{ vars.USE_VARS }}" diff --git a/models/streamline/external/balances/streamline__reward_points_realtime.sql b/models/streamline/external/balances/streamline__reward_points_realtime.sql index 7f14579..bc2adea 100644 --- a/models/streamline/external/balances/streamline__reward_points_realtime.sql +++ b/models/streamline/external/balances/streamline__reward_points_realtime.sql @@ -5,9 +5,9 @@ target = "{{this.schema}}.{{this.identifier}}", params = { "external_table": "reward_points", - "sql_limit": "24000", - "producer_batch_size": "3000", - "worker_batch_size": "3000", + "sql_limit": "32000", + "producer_batch_size": "8000", + "worker_batch_size": "1600", "sql_source": "{{this.identifier}}" } ) diff --git a/models/streamline/external/streamline__evm_addresses.sql b/models/streamline/external/streamline__evm_addresses.sql index 1a81e05..b562eda 100644 --- a/models/streamline/external/streamline__evm_addresses.sql +++ b/models/streamline/external/streamline__evm_addresses.sql @@ -29,7 +29,7 @@ onchain AS ( SELECT DISTINCT from_address AS address FROM - {{ ref('silver_evm__transactions') }} + {{ ref('core_evm__fact_transactions') }} {% if is_incremental() %} WHERE diff --git a/models/streamline/external/transfers/silver_api__points_transfers.sql b/models/streamline/external/transfers/silver_api__points_transfers.sql index eee1b90..a198261 100644 --- a/models/streamline/external/transfers/silver_api__points_transfers.sql +++ b/models/streamline/external/transfers/silver_api__points_transfers.sql @@ -6,36 +6,21 @@ tags = ['streamline_non_core'] ) }} - - WITH points_transfers_raw AS ( SELECT partition_key, request_date, - DATA, + from_address, + batch_index, + batch_id, + created_at, + batch_status, + batch_transfers, _inserted_timestamp FROM {{ ref('silver_api__points_transfers_response') }} -), -flatten_batches AS ( - SELECT - partition_key, - request_date, - _inserted_timestamp, - DATA :address :: STRING AS from_address, - A.index AS batch_index, - A.value :createdAt :: TIMESTAMP_NTZ AS created_at, - A.value :batchId :: STRING AS batch_id, - A.value :status :: STRING AS batch_status, - A.value :transfers :: ARRAY AS batch_transfers - FROM - points_transfers_raw, - LATERAL FLATTEN( - DATA :transfers :: ARRAY - ) A - ), flatten_transfers AS ( SELECT @@ -52,7 +37,7 @@ flatten_transfers AS ( A.value :points :: NUMBER AS points, A.value :toAddressId :: STRING AS to_address FROM - flatten_batches, + points_transfers_raw, LATERAL FLATTEN(batch_transfers) A ) SELECT @@ -68,7 +53,7 @@ SELECT points, partition_key, {{ dbt_utils.generate_surrogate_key( - ['batch_id', 'transfer_index'] + ['from_address', 'batch_id', 'transfer_index'] ) }} AS points_transfers_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/streamline/external/transfers/silver_api__points_transfers_response.sql b/models/streamline/external/transfers/silver_api__points_transfers_response.sql index 7b40b71..2d13a3b 100644 --- a/models/streamline/external/transfers/silver_api__points_transfers_response.sql +++ b/models/streamline/external/transfers/silver_api__points_transfers_response.sql @@ -29,11 +29,16 @@ SELECT partition_key, TO_TIMESTAMP(partition_key) :: DATE AS request_date, - DATA, + VALUE :address :: STRING AS from_address, + ZEROIFNULL(VALUE :array_index :: INTEGER) AS batch_index, + DATA :batchId :: STRING AS batch_id, + DATA :createdAt :: TIMESTAMP_NTZ AS created_at, + DATA :secondsToFinalize :: INTEGER AS seconds_to_finalize, + DATA :status :: STRING AS batch_status, + DATA :transfers :: ARRAY AS batch_transfers, _inserted_timestamp, - ROUND(OCTET_LENGTH(DATA) / 1048576, 2) AS data_mb, {{ dbt_utils.generate_surrogate_key( - ['file_name', 'data :address :: STRING'] + ['VALUE :address :: STRING', 'DATA :batchId :: STRING'] ) }} AS points_transfers_id, SYSDATE() AS inserted_timestamp, SYSDATE() AS modified_timestamp, diff --git a/models/streamline/external/transfers/streamline__points_transfers_realtime.sql b/models/streamline/external/transfers/streamline__points_transfers_realtime.sql index 0063031..9cb8486 100644 --- a/models/streamline/external/transfers/streamline__points_transfers_realtime.sql +++ b/models/streamline/external/transfers/streamline__points_transfers_realtime.sql @@ -9,7 +9,8 @@ "producer_batch_size": "1", "worker_batch_size": "1", "sql_source": "{{this.identifier}}", - "exploded_key": tojson(["result"]) + "exploded_key": tojson(["transfers"]), + "include_top_level_json": tojson(["address"]) } ) ) }} @@ -19,7 +20,7 @@ SELECT DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key, {{ target.database }}.live.udf_api( 'GET', - '{Service}/points/dapp/transfer/all', + '{Service}points/dapp/transfer/all', { 'Authorization': 'Bearer ' || '{{ env_var("JWT", "") }}', 'Accept': 'application/json', @@ -30,4 +31,3 @@ SELECT {}, 'Vault/prod/flow/points-api/prod' ) AS request - diff --git a/tests/points/tests__unique_address_count_threshold.sql b/tests/points/tests__unique_address_count_threshold.sql index 9f365b7..77663ef 100644 --- a/tests/points/tests__unique_address_count_threshold.sql +++ b/tests/points/tests__unique_address_count_threshold.sql @@ -18,4 +18,4 @@ SELECT FROM distinct_count WHERE - ct > 20000 + ct > 30000