From 4df472bacedffe10ccbc38ceeb4f0880765e4633 Mon Sep 17 00:00:00 2001
From: Jack Forgash <58153492+forgxyz@users.noreply.github.com>
Date: Fri, 15 Nov 2024 09:43:52 -0700
Subject: [PATCH] AN-5459/point transfers api integration (#378)

* point transfers api integration
* upd 2 silver models
* gha workflow with auth
* upd gha workflow
* add gha on push config for testing
* add print step
* chg echo
* upd auth.py
* rm run on commit from gha
* upd model tags
* merge 2 silver into 1, add yml, upd gha
* upd auth return vals
* add exit 1 on failure
* upd return on success True
* add gha on-run config for final test
* add backup default to env_var(JWT)
* add backup default to env_var(JWT) - 2
* rm run on commit from gha and upd sql limit for balance call
* add yml for evm address model
* add evm address threshold test
* upd gha workflow
* upd per CRs
---
 ..._run_scheduled_reward_points_realtime.yml} |  19 ++-
 ...dbt_run_scheduled_reward_points_silver.yml |   4 +-
 .../bronze_api__FR_reward_points.sql          |   0
 .../bronze_api__reward_points.sql             |   0
 .../silver_api__reward_points.sql             |   3 +-
 .../silver_api__reward_points.yml             |   0
 .../streamline__reward_points_realtime.sql    |   5 +-
 .../external/streamline__evm_addresses.yml    |  24 ++++
 .../bronze_api__FR_points_transfers.sql       |   9 ++
 .../bronze_api__points_transfers.sql          |   9 ++
 .../silver_api__points_transfers.sql          | 102 +++++++++++++
 .../silver_api__points_transfers.yml          |  57 ++++++++
 .../streamline__points_transfers_realtime.sql |  32 +++++
 models/sources.yml                            |   1 +
 python/points/authenticate.py                 | 136 ++++++++++++++++++
 requirements.txt                              |   2 +
 .../tests__unique_address_count_threshold.sql |  21 +++
 17 files changed, 414 insertions(+), 10 deletions(-)
 rename .github/workflows/{dbt_run_scheduled_reward_points_api.yml => dbt_run_scheduled_reward_points_realtime.yml} (61%)
 rename models/silver/streamline/external/{ => balances}/bronze_api__FR_reward_points.sql (100%)
 rename models/silver/streamline/external/{ => balances}/bronze_api__reward_points.sql (100%)
 rename models/silver/streamline/external/{ => balances}/silver_api__reward_points.sql (92%)
 rename models/silver/streamline/external/{ => balances}/silver_api__reward_points.yml (100%)
 rename models/silver/streamline/external/{ => balances}/streamline__reward_points_realtime.sql (92%)
 create mode 100644 models/silver/streamline/external/streamline__evm_addresses.yml
 create mode 100644 models/silver/streamline/external/transfers/bronze_api__FR_points_transfers.sql
 create mode 100644 models/silver/streamline/external/transfers/bronze_api__points_transfers.sql
 create mode 100644 models/silver/streamline/external/transfers/silver_api__points_transfers.sql
 create mode 100644 models/silver/streamline/external/transfers/silver_api__points_transfers.yml
 create mode 100644 models/silver/streamline/external/transfers/streamline__points_transfers_realtime.sql
 create mode 100644 python/points/authenticate.py
 create mode 100644 tests/points/tests__unique_address_count_threshold.sql

diff --git a/.github/workflows/dbt_run_scheduled_reward_points_api.yml b/.github/workflows/dbt_run_scheduled_reward_points_realtime.yml
similarity index 61%
rename from .github/workflows/dbt_run_scheduled_reward_points_api.yml
rename to .github/workflows/dbt_run_scheduled_reward_points_realtime.yml
index f9b5a3e..8ff16e7 100644
--- a/.github/workflows/dbt_run_scheduled_reward_points_api.yml
+++ b/.github/workflows/dbt_run_scheduled_reward_points_realtime.yml
@@ -1,5 +1,5 @@
-name: dbt_run_scheduled_reward_points_api
-run-name: dbt_run_scheduled_reward_points_api
+name: dbt_run_scheduled_reward_points_realtime
+run-name: dbt_run_scheduled_reward_points_realtime
 
 on:
   workflow_dispatch:
@@ -19,6 +19,9 @@ env:
   DATABASE: "${{ vars.DATABASE }}"
   WAREHOUSE: "${{ vars.WAREHOUSE }}"
   SCHEMA: "${{ vars.SCHEMA }}"
+  FLOW_POINTS_URL: "${{ secrets.FLOW_POINTS_URL }}"
+  PRIVATE_KEY: "${{ secrets.PRIVATE_KEY }}"
+  PUBLIC_ADDRESS: "${{ secrets.PUBLIC_ADDRESS }}"
 
 concurrency:
   group: ${{ github.workflow }}
@@ -40,9 +43,17 @@ jobs:
         pip install -r requirements.txt
         dbt deps
 
-    - name: Run DBT Jobs
+    - name: Request User Points Balances
       run: >
-        dbt run -s tag:streamline_non_core --vars '{"STREAMLINE_INVOKE_STREAMS": True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}'
+        dbt run -s streamline__reward_points_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
+
+    - name: Authenticate with Flow Points API
+      run: |
+        python python/points/authenticate.py
+
+    - name: Request Points Transfers
+      run: >
+        dbt run -s streamline__points_transfers_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
 
     - name: Store logs
       uses: actions/upload-artifact@v3
diff --git a/.github/workflows/dbt_run_scheduled_reward_points_silver.yml b/.github/workflows/dbt_run_scheduled_reward_points_silver.yml
index baa122e..91605de 100644
--- a/.github/workflows/dbt_run_scheduled_reward_points_silver.yml
+++ b/.github/workflows/dbt_run_scheduled_reward_points_silver.yml
@@ -42,11 +42,11 @@ jobs:
 
     - name: Run DBT Jobs
       run: >
-        dbt run -s silver_api__reward_points
+        dbt run -s tag:streamline_non_core
 
     - name: Test DBT Models
       run: >
-        dbt test -s silver_api__reward_points
+        dbt test -s tag:streamline_non_core
       continue-on-error: true
 
     - name: Log test results
diff --git a/models/silver/streamline/external/bronze_api__FR_reward_points.sql b/models/silver/streamline/external/balances/bronze_api__FR_reward_points.sql
similarity index 100%
rename from models/silver/streamline/external/bronze_api__FR_reward_points.sql
rename to models/silver/streamline/external/balances/bronze_api__FR_reward_points.sql
diff --git a/models/silver/streamline/external/bronze_api__reward_points.sql b/models/silver/streamline/external/balances/bronze_api__reward_points.sql
similarity index 100%
rename from models/silver/streamline/external/bronze_api__reward_points.sql
rename to models/silver/streamline/external/balances/bronze_api__reward_points.sql
diff --git a/models/silver/streamline/external/silver_api__reward_points.sql b/models/silver/streamline/external/balances/silver_api__reward_points.sql
similarity index 92%
rename from models/silver/streamline/external/silver_api__reward_points.sql
rename to models/silver/streamline/external/balances/silver_api__reward_points.sql
index 15a8541..7882e8b 100644
--- a/models/silver/streamline/external/silver_api__reward_points.sql
+++ b/models/silver/streamline/external/balances/silver_api__reward_points.sql
@@ -5,7 +5,8 @@
     unique_key = "reward_points_id",
     incremental_strategy = 'merge',
     merge_exclude_columns = ["inserted_timestamp"],
-    cluster_by = ['_inserted_timestamp :: DATE', 'address']
+    cluster_by = ['_inserted_timestamp :: DATE', 'address'],
+    tags = ['streamline_non_core']
 ) }}
 
 SELECT
diff --git a/models/silver/streamline/external/silver_api__reward_points.yml b/models/silver/streamline/external/balances/silver_api__reward_points.yml
similarity index 100%
rename from models/silver/streamline/external/silver_api__reward_points.yml
rename to models/silver/streamline/external/balances/silver_api__reward_points.yml
diff --git a/models/silver/streamline/external/streamline__reward_points_realtime.sql b/models/silver/streamline/external/balances/streamline__reward_points_realtime.sql
similarity index 92%
rename from models/silver/streamline/external/streamline__reward_points_realtime.sql
rename to models/silver/streamline/external/balances/streamline__reward_points_realtime.sql
index aa3f761..5a41111 100644
--- a/models/silver/streamline/external/streamline__reward_points_realtime.sql
+++ b/models/silver/streamline/external/balances/streamline__reward_points_realtime.sql
@@ -5,13 +5,12 @@
         target = "{{this.schema}}.{{this.identifier}}",
         params = {
             "external_table": "reward_points",
-            "sql_limit": "1000",
+            "sql_limit": "10000",
             "producer_batch_size": "1000",
             "worker_batch_size": "1000",
             "sql_source": "{{this.identifier}}"
         }
-    ),
-    tags = ['streamline_non_core']
+    )
 ) }}
 
 WITH evm_addresses AS (
diff --git a/models/silver/streamline/external/streamline__evm_addresses.yml b/models/silver/streamline/external/streamline__evm_addresses.yml
new file mode 100644
index 0000000..e29c654
--- /dev/null
+++ b/models/silver/streamline/external/streamline__evm_addresses.yml
@@ -0,0 +1,24 @@
+version: 2
+
+models:
+  - name: streamline__evm_addresses
+    description: "Table of unique EVM addresses."
+    data_tests:
+      - dbt_utils.recency:
+          datepart: day
+          field: modified_timestamp
+          interval: 1
+
+    columns:
+      - name: address
+        tests:
+          - not_null
+          - unique
+
+      - name: modified_timestamp
+
+      - name: inserted_timestamp
+
+      - name: evm_addresses_id
+
+      - name: _invocation_id
diff --git a/models/silver/streamline/external/transfers/bronze_api__FR_points_transfers.sql b/models/silver/streamline/external/transfers/bronze_api__FR_points_transfers.sql
new file mode 100644
index 0000000..a693989
--- /dev/null
+++ b/models/silver/streamline/external/transfers/bronze_api__FR_points_transfers.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view',
+    tags = ['streamline_non_core']
+) }}
+
+{{ streamline_external_table_FR_query_v2(
+    model = "points_transfers",
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
+) }}
diff --git a/models/silver/streamline/external/transfers/bronze_api__points_transfers.sql b/models/silver/streamline/external/transfers/bronze_api__points_transfers.sql
new file mode 100644
index 0000000..d097933
--- /dev/null
+++ b/models/silver/streamline/external/transfers/bronze_api__points_transfers.sql
@@ -0,0 +1,9 @@
+{{ config (
+    materialized = 'view',
+    tags = ['streamline_non_core']
+) }}
+
+{{ streamline_external_table_query_v2(
+    model = "points_transfers",
+    partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
+) }}
diff --git a/models/silver/streamline/external/transfers/silver_api__points_transfers.sql b/models/silver/streamline/external/transfers/silver_api__points_transfers.sql
new file mode 100644
index 0000000..a110c01
--- /dev/null
+++ b/models/silver/streamline/external/transfers/silver_api__points_transfers.sql
@@ -0,0 +1,102 @@
+-- depends_on: {{ ref('bronze_api__points_transfers') }}
+-- depends_on: {{ ref('bronze_api__FR_points_transfers') }}
+{{ config(
+    materialized = 'incremental',
+    unique_key = "batch_id",
+    incremental_strategy = 'merge',
+    merge_exclude_columns = ["inserted_timestamp", "_inserted_timestamp"],
+    cluster_by = ['modified_timestamp :: DATE', 'from_address'],
+    post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(from_address, to_address)" ],
+    tags = ['streamline_non_core']
+) }}
+
+WITH
+points_transfers_raw AS (
+
+    SELECT
+        partition_key,
+        TO_TIMESTAMP(partition_key) :: DATE AS request_date,
+        DATA,
+        _inserted_timestamp
+    FROM
+
+{% if is_incremental() %}
+{{ ref('bronze_api__points_transfers') }}
+WHERE
+    _inserted_timestamp >= (
+        SELECT
+            MAX(_inserted_timestamp)
+        FROM
+            {{ this }}
+    )
+{% else %}
+    {{ ref('bronze_api__FR_points_transfers') }}
+{% endif %}
+),
+flatten_protocols AS (
+    SELECT
+        partition_key,
+        request_date,
+        _inserted_timestamp,
+        A.value :address :: STRING AS address,
+        A.value :transfers :: ARRAY AS transfers
+    FROM
+        points_transfers_raw,
+        LATERAL FLATTEN(DATA) A
+),
+flatten_batches AS (
+    SELECT
+        partition_key,
+        request_date,
+        _inserted_timestamp,
+        address AS from_address,
+        A.index AS batch_index,
+        A.value :batchId :: STRING AS batch_id,
+        A.value :status :: STRING AS batch_status,
+        A.value :transfers :: ARRAY AS batch_transfers
+    FROM
+        flatten_protocols,
+        LATERAL FLATTEN(
+            transfers
+        ) A
+),
+flatten_transfers AS (
+    SELECT
+        partition_key,
+        request_date,
+        from_address,
+        batch_index,
+        batch_id,
+        _inserted_timestamp,
+        A.index AS transfer_index,
+        A.value :boxes :: NUMBER AS boxes,
+        A.value :keys :: NUMBER AS keys,
+        A.value :points :: NUMBER AS points,
+        A.value :toAddressId :: STRING AS to_address
+    FROM
+        flatten_batches,
+        LATERAL FLATTEN(batch_transfers) A
+)
+SELECT
+    request_date,
+    batch_id,
+    batch_index,
+    transfer_index,
+    from_address,
+    to_address,
+    boxes,
+    keys,
+    points,
+    partition_key,
+    {{ dbt_utils.generate_surrogate_key(
+        ['batch_id', 'transfer_index']
+    ) }} AS points_transfers_id,
+    SYSDATE() AS inserted_timestamp,
+    SYSDATE() AS modified_timestamp,
+    '{{ invocation_id }}' AS _invocation_id,
+    _inserted_timestamp
+FROM
+    flatten_transfers
+
+qualify(ROW_NUMBER() over (PARTITION BY batch_id
+ORDER BY
+    _inserted_timestamp ASC)) = 1
diff --git a/models/silver/streamline/external/transfers/silver_api__points_transfers.yml b/models/silver/streamline/external/transfers/silver_api__points_transfers.yml
new file mode 100644
index 0000000..6c6d9f5
--- /dev/null
+++ b/models/silver/streamline/external/transfers/silver_api__points_transfers.yml
@@ -0,0 +1,57 @@
+version: 2
+
+models:
+  - name: silver_api__points_transfers
+    description: "Response from the Reward Points API Transfers Endpoint. Logs each response and deduplicates by batch_id. The original _inserted_timestamp is preserved as the request timestamp."
+    tests:
+      - dbt_utils.recency:
+          datepart: day
+          field: request_date
+          interval: 1
+
+    columns:
+      - name: partition_key
+
+      - name: request_date
+
+      - name: from_address
+        tests:
+          - not_null
+
+      - name: batch_id
+        tests:
+          - not_null
+
+      - name: batch_index
+        tests:
+          - not_null
+
+      - name: transfer_index
+        tests:
+          - not_null
+
+      - name: boxes
+        tests:
+          - not_null
+
+      - name: keys
+        tests:
+          - not_null
+
+      - name: points
+
+      - name: to_address
+        tests:
+          - not_null
+
+      - name: points_transfers_id
+        tests:
+          - not_null
+          - unique
+
+      - name: inserted_timestamp
+
+      - name: modified_timestamp
+
+      - name: _invocation_id
+
+      - name: _inserted_timestamp
diff --git a/models/silver/streamline/external/transfers/streamline__points_transfers_realtime.sql b/models/silver/streamline/external/transfers/streamline__points_transfers_realtime.sql
new file mode 100644
index 0000000..8e07099
--- /dev/null
+++ b/models/silver/streamline/external/transfers/streamline__points_transfers_realtime.sql
@@ -0,0 +1,32 @@
+{{ config (
+    materialized = "view",
+    post_hook = fsc_utils.if_data_call_function_v2(
+        func = '{{this.schema}}.udf_bulk_rest_api_v2',
+        target = "{{this.schema}}.{{this.identifier}}",
+        params = {
+            "external_table": "points_transfers",
+            "sql_limit": "1",
+            "producer_batch_size": "1",
+            "worker_batch_size": "1",
+            "sql_source": "{{this.identifier}}"
+        }
+    )
+) }}
+
+SELECT
+    DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
+    {{ target.database }}.live.udf_api(
+        'GET',
+        '{Service}/points/dapp/transfer/all',
+        {
+            'Authorization': 'Bearer ' || '{{ env_var("JWT", "") }}',
+            'Accept': 'application/json',
+            'Connection': 'keep-alive',
+            'Content-Type': 'application/json',
+            'User-Agent': 'Flipside/0.1'
+        },
+        {},
+        'Vault/prod/flow/points-api/prod'
+    ) AS request
diff --git a/models/sources.yml b/models/sources.yml
index e487fb9..26fe39a 100644
--- a/models/sources.yml
+++ b/models/sources.yml
@@ -126,6 +126,7 @@ sources:
       - name: evm_receipts
       - name: evm_traces
      - name: reward_points
+      - name: points_transfers
 
   - name: crosschain_silver
     database: crosschain
diff --git a/python/points/authenticate.py b/python/points/authenticate.py
new file mode 100644
index 0000000..119e30c
--- /dev/null
+++ b/python/points/authenticate.py
@@ -0,0 +1,136 @@
+import os
+import sys
+import requests
+from eth_account import Account
+from eth_account.messages import encode_defunct
+from web3 import Web3
+import json
+from datetime import datetime
+import logging
+
+# Setup logging
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s'
+)
+logger = logging.getLogger(__name__)
+
+# Retrieve environment variables
+FLOW_POINTS_URL = os.getenv('FLOW_POINTS_URL')
+PRIVATE_KEY = os.getenv('PRIVATE_KEY')
+PUBLIC_ADDRESS = os.getenv('PUBLIC_ADDRESS')
+
+def verify_wallet_address(address):
+    """
+    Verifies if the provided address is a valid Ethereum address.
+
+    Args:
+        address (str): The Ethereum address to verify.
+
+    Returns:
+        bool: Returns True if valid, False otherwise.
+    """
+    if not Web3.is_address(address):
+        logger.error("Invalid Ethereum address format.")
+        return False
+
+    logger.info("The wallet address is valid and correctly formatted.")
+    return True
+
+def authenticate_dapp():
+    """
+    Authenticates the Dapp by generating a JWT.
+
+    Steps:
+        1. Create a Dapp Challenge.
+        2. Sign the challengeData using the private key.
+        3. Solve the Dapp Challenge to receive a JWT.
+
+    Returns:
+        bool: True if a JWT was generated and exported, False otherwise.
+ """ + if not PRIVATE_KEY or not PUBLIC_ADDRESS: + logger.error("Error: PRIVATE_KEY or PUBLIC_ADDRESS not set in environment.") + return False + + # Verify the public address format + if not verify_wallet_address(PUBLIC_ADDRESS): + logger.error("Error: Invalid PUBLIC_ADDRESS format in environment.") + return False + + try: + # Validate private key by attempting to create a wallet + _ = Account.from_key(PRIVATE_KEY) + logger.info("Wallet initialized successfully.") + + # Step 1: Create Dapp Challenge + logger.info("Creating Dapp Challenge...") + challenge_endpoint = f"{FLOW_POINTS_URL}/points/dapp/challenge" + challenge_payload = { + "addressId": PUBLIC_ADDRESS.lower(), + } + headers = { + "Content-Type": "application/json" + } + + challenge_response = requests.post(challenge_endpoint, headers=headers, json=challenge_payload) + + if challenge_response.status_code != 200: + error_message = f"Failed to create Dapp challenge: {challenge_response.status_code} {challenge_response.reason}" + logger.error(error_message) + return False + + challenge_data = challenge_response.json().get("challengeData") + logger.info("Dapp Challenge created successfully.") + + # Step 2: Sign the challengeData + logger.info("Signing challengeData...") + message = encode_defunct(text=challenge_data) + signed_message = Account.sign_message(message, private_key=PRIVATE_KEY) + signature = f"0x{signed_message.signature.hex()}" + logger.info("challengeData signed successfully.") + + # Step 3: Solve Dapp Challenge to get JWT + logger.info("Solving Dapp Challenge...") + solve_endpoint = f"{FLOW_POINTS_URL}/points/dapp/solve" + solve_payload = { + "challengeData": challenge_data, + "signature": signature, + } + + solve_response = requests.post(solve_endpoint, headers=headers, json=solve_payload) + + if solve_response.status_code != 200: + error_message = f"Failed to solve Dapp challenge: {solve_response.status_code} {solve_response.reason}" + logger.error(error_message) + return False + + token = solve_response.json().get("token") + logger.info("JWT generated successfully.") + + # Set the JWT as an environment variable + with open(os.environ['GITHUB_ENV'], 'a') as f: + f.write(f"JWT={token}\n") + + return True + + except requests.exceptions.RequestException as e: + error_message = f"HTTP Request failed: {e}" + logger.error(error_message) + return False + except Exception as e: + error_message = f"An unexpected error occurred: {e}" + logger.error(error_message) + return False + +def main(): + """ + Main function to execute the authentication process. + """ + if not authenticate_dapp(): + logger.error("Authentication failed. 
Exiting with status code 1.") + sys.exit(1) + logger.info("Authentication succeeded.") + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index aacfb7d..864b572 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ dbt-snowflake>=1.7,<1.8 +eth-account==0.13.4 protobuf==4.25.3 +web3==7.5.0 diff --git a/tests/points/tests__unique_address_count_threshold.sql b/tests/points/tests__unique_address_count_threshold.sql new file mode 100644 index 0000000..cb67930 --- /dev/null +++ b/tests/points/tests__unique_address_count_threshold.sql @@ -0,0 +1,21 @@ +{{ config( + severity = "error", + tags = ["streamline_non_core"] +) }} +{# This test is to alert if the total EVM Addresses increases and the +model calling balances needs to be adjusted with a higher SQL Limit #} +WITH distinct_count AS ( + + SELECT + COUNT( + DISTINCT address + ) AS ct + FROM + {{ ref('streamline__evm_addresses') }} +) +SELECT + * +FROM + distinct_count +WHERE + ct > 7000
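
Reviewer note: the three LATERAL FLATTEN levels in silver_api__points_transfers.sql imply a nested response payload. The Python sketch below reconstructs that assumed shape purely from the JSON paths the model reads; the field names come from the model, but every value is invented for illustration.

    # Assumed shape of the DATA column, inferred from the model's JSON paths.
    sample_response = [
        {                                   # flatten_protocols: one row per sending address
            "address": "0xabc...",          # -> from_address
            "transfers": [
                {                           # flatten_batches: one row per batch
                    "batchId": "batch-1",   # -> batch_id (also the merge unique_key)
                    "status": "completed",  # -> batch_status
                    "transfers": [
                        {                   # flatten_transfers: one row per transfer
                            "boxes": 2,     # -> boxes
                            "keys": 1,      # -> keys
                            "points": 150,  # -> points
                            "toAddressId": "0xdef...",  # -> to_address
                        }
                    ],
                }
            ],
        }
    ]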
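Reviewer note: outside of GitHub Actions there is no GITHUB_ENV file for authenticate.py to write to, so the JWT hand-off to env_var("JWT", "") can be exercised locally with a stub. A minimal sketch, assuming the repo layout in this PR; the temp-file stub and the env parsing are illustrative, not part of the PR.

    import json
    import os
    import subprocess
    import tempfile

    # Stub the Actions env file so authenticate.py has somewhere to write "JWT=<token>".
    env_path = os.path.join(tempfile.mkdtemp(), "github.env")
    open(env_path, "w").close()
    os.environ["GITHUB_ENV"] = env_path

    if subprocess.run(["python", "python/points/authenticate.py"]).returncode == 0:
        with open(env_path) as fh:
            token = dict(line.strip().split("=", 1) for line in fh if "=" in line)["JWT"]
        os.environ["JWT"] = token  # read by env_var("JWT", "") when the model compiles
        subprocess.run([
            "dbt", "run", "-s", "streamline__points_transfers_realtime",
            "--vars", json.dumps({"STREAMLINE_INVOKE_STREAMS": True}),
        ])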