AN-5459/point transfers api integration (#378)

* point transfers api integration

* upd 2 silver models

* gha workflow with auth

* upd gha workflow

* add gha on push config for testing

* add print step

* chg echo

* upd auth.py

* rm run on commit from gha

* upd model tags

* merge 2 silver into 1, add yml, upd gha

* upd auth return vals

* add exit 1 on failure

* upd return on success True

* add gha on-run config for final test

* add backup default to env_var(JWT)

* add backup default to env_var(JWT) - 2

* rm run on commit from gha and upd sql limit for balance call

* add yml for evm address model

* add evm address threshold test

* upd gha workflow

* upd per CRs
This commit is contained in:
Jack Forgash 2024-11-15 09:43:52 -07:00 committed by GitHub
parent 7edd60669c
commit 4df472bace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 414 additions and 10 deletions

View File

@ -1,5 +1,5 @@
name: dbt_run_scheduled_reward_points_api name: dbt_run_scheduled_reward_points_realtime
run-name: dbt_run_scheduled_reward_points_api run-name: dbt_run_scheduled_reward_points_realtime
on: on:
workflow_dispatch: workflow_dispatch:
@ -19,6 +19,9 @@ env:
DATABASE: "${{ vars.DATABASE }}" DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}" WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}" SCHEMA: "${{ vars.SCHEMA }}"
FLOW_POINTS_URL: "${{ secrets.FLOW_POINTS_URL }}"
PRIVATE_KEY: "${{ secrets.PRIVATE_KEY }}"
PUBLIC_ADDRESS: "${{ secrets.PUBLIC_ADDRESS }}"
concurrency: concurrency:
group: ${{ github.workflow }} group: ${{ github.workflow }}
@ -40,9 +43,17 @@ jobs:
pip install -r requirements.txt pip install -r requirements.txt
dbt deps dbt deps
- name: Run DBT Jobs - name: Request User Points Balances
run: > run: >
dbt run -s tag:streamline_non_core --vars '{"STREAMLINE_INVOKE_STREAMS": True, "STREAMLINE_USE_DEV_FOR_EXTERNAL_TABLES": False}' dbt run -s streamline__reward_points_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Authenticate with Flow Points API
run: |
python python/points/authenticate.py
- name: Request Points Transfers
run: >
dbt run -s streamline__points_transfers_realtime --vars '{"STREAMLINE_INVOKE_STREAMS": True}'
- name: Store logs - name: Store logs
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3

View File

@ -42,11 +42,11 @@ jobs:
- name: Run DBT Jobs - name: Run DBT Jobs
run: > run: >
dbt run -s silver_api__reward_points dbt run -s tag:streamline_non_core
- name: Test DBT Models - name: Test DBT Models
run: > run: >
dbt test -s silver_api__reward_points dbt test -s tag:streamline_non_core
continue-on-error: true continue-on-error: true
- name: Log test results - name: Log test results

View File

@ -5,7 +5,8 @@
unique_key = "reward_points_id", unique_key = "reward_points_id",
incremental_strategy = 'merge', incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp"], merge_exclude_columns = ["inserted_timestamp"],
cluster_by = ['_inserted_timestamp :: DATE', 'address'] cluster_by = ['_inserted_timestamp :: DATE', 'address'],
tags = ['streamline_non_core']
) }} ) }}
SELECT SELECT

View File

@ -5,13 +5,12 @@
target = "{{this.schema}}.{{this.identifier}}", target = "{{this.schema}}.{{this.identifier}}",
params = { params = {
"external_table": "reward_points", "external_table": "reward_points",
"sql_limit": "1000", "sql_limit": "10000",
"producer_batch_size": "1000", "producer_batch_size": "1000",
"worker_batch_size": "1000", "worker_batch_size": "1000",
"sql_source": "{{this.identifier}}" "sql_source": "{{this.identifier}}"
} }
), )
tags = ['streamline_non_core']
) }} ) }}
WITH evm_addresses AS ( WITH evm_addresses AS (

View File

@ -0,0 +1,24 @@
version: 2
models:
- name: streamline__evm_addresses
description: "Table of unique EVM addresses."
data_tests:
- dbt_utils.recency:
datepart: day
field: modified_timestamp
interval: 1
columns:
- name: address
tests:
- not_null
- unique
- name: modified_timestamp
- name: inserted_timestamp
- name: evm_addresses_id
- name: _invocation_id

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_non_core']
) }}
{{ streamline_external_table_FR_query_v2(
model = "points_transfers",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,9 @@
{{ config (
materialized = 'view',
tags = ['streamline_non_core']
) }}
{{ streamline_external_table_query_v2(
model = "points_transfers",
partition_function = "CAST(SPLIT_PART(SPLIT_PART(file_name, '/', 3), '_', 1) AS INTEGER )"
) }}

View File

@ -0,0 +1,102 @@
-- depends_on: {{ ref('bronze_api__points_transfers') }}
-- depends_on: {{ ref('bronze_api__FR_points_transfers') }}
{{ config(
materialized = 'incremental',
unique_key = "batch_id",
incremental_strategy = 'merge',
merge_exclude_columns = ["inserted_timestamp", "_inserted_timestamp"],
cluster_by = ['modified_timestamp :: DATE', 'from_address'],
post_hook = [ "ALTER TABLE {{ this }} ADD SEARCH OPTIMIZATION on equality(from_address, to_address)" ],
tags = ['streamline_non_core']
) }}
WITH points_transfers_raw AS (
SELECT
partition_key,
TO_TIMESTAMP(partition_key) :: DATE AS request_date,
DATA,
_inserted_timestamp
FROM
{% if is_incremental() %}
{{ ref('bronze_api__points_transfers') }}
WHERE
_inserted_timestamp >= (
SELECT
MAX(_inserted_timestamp)
FROM
{{ this }}
)
{% else %}
{{ ref('bronze_api__FR_points_transfers') }}
{% endif %}
),
flatten_protocols AS (
SELECT
partition_key,
request_date,
_inserted_timestamp,
A.value :address :: STRING AS address,
A.value :transfers :: ARRAY AS transfers
FROM
points_transfers_raw,
LATERAL FLATTEN(DATA) A
),
flatten_batches AS (
SELECT
partition_key,
request_date,
_inserted_timestamp,
address AS from_address,
A.index AS batch_index,
A.value :batchId :: STRING AS batch_id,
A.value :status :: STRING AS batch_status,
A.value :transfers :: ARRAY AS batch_transfers
FROM
flatten_protocols,
LATERAL FLATTEN(
transfers
) A
),
flatten_transfers AS (
SELECT
partition_key,
request_date,
from_address,
batch_index,
batch_id,
_inserted_timestamp,
A.index AS transfer_index,
A.value :boxes :: NUMBER AS boxes,
A.value :keys :: NUMBER AS keys,
A.value :points :: NUMBER AS points,
A.value :toAddressId :: STRING AS to_address
FROM
flatten_batches,
LATERAL FLATTEN(batch_transfers) A
)
SELECT
request_date,
batch_id,
batch_index,
transfer_index,
from_address,
to_address,
boxes,
keys,
points,
partition_key,
{{ dbt_utils.generate_surrogate_key(
['batch_id', 'transfer_index']
) }} AS points_transfers_id,
SYSDATE() AS inserted_timestamp,
SYSDATE() AS modified_timestamp,
'{{ invocation_id }}' AS _invocation_id,
_inserted_timestamp
FROM
flatten_transfers
qualify(ROW_NUMBER() over (PARTITION BY batch_id
ORDER BY
_inserted_timestamp ASC)) = 1

View File

@ -0,0 +1,57 @@
version: 2
models:
- name: silver_api__points_transfers
description: "Response from the Reward Points API Transfers Endpoint. Logs each response and dedplicates by batch_id. Original _inserted_timestamp preserved as request timestamp."
tests:
- dbt_utils.recency:
datepart: day
field: request_date
interval: 1
columns:
- name: partition_key
- name: request_date
- name: from_address
tests:
- not_null
- name: batch_id
tests:
- not_null
- name: batch_index
tests:
- not_null
- name: transfer_index
tests:
- not_null
- name: boxes
tests:
- not_null
- name: keys
tests:
- not_null
- name: points
- name: to_address
tests:
- not_null
- name: points_transfers_id
tests:
- not_null
- unique
- name: inserted_timestamp
- name: modified_timestamp
- name: _invocation_id
- name: _inserted_timestamp

View File

@ -0,0 +1,32 @@
{{ config (
materialized = "view",
post_hook = fsc_utils.if_data_call_function_v2(
func = '{{this.schema}}.udf_bulk_rest_api_v2',
target = "{{this.schema}}.{{this.identifier}}",
params = {
"external_table": "points_transfers",
"sql_limit": "1",
"producer_batch_size": "1",
"worker_batch_size": "1",
"sql_source": "{{this.identifier}}"
}
)
) }}
SELECT
DATE_PART('EPOCH', SYSDATE()) :: INTEGER AS partition_key,
{{ target.database }}.live.udf_api(
'GET',
'{Service}/points/dapp/transfer/all',
{
'Authorization': 'Bearer ' || '{{ env_var("JWT", "") }}',
'Accept': 'application/json',
'Connection': 'keep-alive',
'Content-Type': 'application/json',
'User-Agent': 'Flipside/0.1'
},
{},
'Vault/prod/flow/points-api/prod'
) AS request

View File

@ -126,6 +126,7 @@ sources:
- name: evm_receipts - name: evm_receipts
- name: evm_traces - name: evm_traces
- name: reward_points - name: reward_points
- name: points_transfers
- name: crosschain_silver - name: crosschain_silver
database: crosschain database: crosschain

View File

@ -0,0 +1,136 @@
import os
import sys
import requests
from eth_account import Account
from eth_account.messages import encode_defunct
from web3 import Web3
import json
from datetime import datetime
import logging
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Retrieve environment variables
FLOW_POINTS_URL = os.getenv('FLOW_POINTS_URL')
PRIVATE_KEY = os.getenv('PRIVATE_KEY')
PUBLIC_ADDRESS = os.getenv('PUBLIC_ADDRESS')
def verify_wallet_address(address):
"""
Verifies if the provided address is a valid Ethereum address.
Args:
address (str): The Ethereum address to verify.
Returns:
bool: Returns True if valid, False otherwise.
"""
if not Web3.is_address(address):
logger.error("Invalid Ethereum address format.")
return False
logger.info("The wallet address is valid and correctly formatted.")
return True
def authenticate_dapp():
"""
Authenticates the Dapp by generating a JWT.
Steps:
1. Create a Dapp Challenge.
2. Sign the challengeData using the private key.
3. Solve the Dapp Challenge to receive a JWT.
Returns:
str: The JWT token.
"""
if not PRIVATE_KEY or not PUBLIC_ADDRESS:
logger.error("Error: PRIVATE_KEY or PUBLIC_ADDRESS not set in environment.")
return False
# Verify the public address format
if not verify_wallet_address(PUBLIC_ADDRESS):
logger.error("Error: Invalid PUBLIC_ADDRESS format in environment.")
return False
try:
# Validate private key by attempting to create a wallet
_ = Account.from_key(PRIVATE_KEY)
logger.info("Wallet initialized successfully.")
# Step 1: Create Dapp Challenge
logger.info("Creating Dapp Challenge...")
challenge_endpoint = f"{FLOW_POINTS_URL}/points/dapp/challenge"
challenge_payload = {
"addressId": PUBLIC_ADDRESS.lower(),
}
headers = {
"Content-Type": "application/json"
}
challenge_response = requests.post(challenge_endpoint, headers=headers, json=challenge_payload)
if challenge_response.status_code != 200:
error_message = f"Failed to create Dapp challenge: {challenge_response.status_code} {challenge_response.reason}"
logger.error(error_message)
return False
challenge_data = challenge_response.json().get("challengeData")
logger.info("Dapp Challenge created successfully.")
# Step 2: Sign the challengeData
logger.info("Signing challengeData...")
message = encode_defunct(text=challenge_data)
signed_message = Account.sign_message(message, private_key=PRIVATE_KEY)
signature = f"0x{signed_message.signature.hex()}"
logger.info("challengeData signed successfully.")
# Step 3: Solve Dapp Challenge to get JWT
logger.info("Solving Dapp Challenge...")
solve_endpoint = f"{FLOW_POINTS_URL}/points/dapp/solve"
solve_payload = {
"challengeData": challenge_data,
"signature": signature,
}
solve_response = requests.post(solve_endpoint, headers=headers, json=solve_payload)
if solve_response.status_code != 200:
error_message = f"Failed to solve Dapp challenge: {solve_response.status_code} {solve_response.reason}"
logger.error(error_message)
return False
token = solve_response.json().get("token")
logger.info("JWT generated successfully.")
# Set the JWT as an environment variable
with open(os.environ['GITHUB_ENV'], 'a') as f:
f.write(f"JWT={token}\n")
return True
except requests.exceptions.RequestException as e:
error_message = f"HTTP Request failed: {e}"
logger.error(error_message)
return False
except Exception as e:
error_message = f"An unexpected error occurred: {e}"
logger.error(error_message)
return False
def main():
"""
Main function to execute the authentication process.
"""
if not authenticate_dapp():
logger.error("Authentication failed. Exiting with status code 1.")
sys.exit(1)
logger.info("Authentication succeeded.")
if __name__ == "__main__":
main()

View File

@ -1,2 +1,4 @@
dbt-snowflake>=1.7,<1.8 dbt-snowflake>=1.7,<1.8
eth-account==0.13.4
protobuf==4.25.3 protobuf==4.25.3
web3==7.5.0

View File

@ -0,0 +1,21 @@
{{ config(
severity = "error",
tags = ["streamline_non_core"]
) }}
{# This test is to alert if the total EVM Addresses increases and the
model calling balances needs to be adjusted with a higher SQL Limit #}
WITH distinct_count AS (
SELECT
COUNT(
DISTINCT address
) AS ct
FROM
{{ ref('streamline__evm_addresses') }}
)
SELECT
*
FROM
distinct_count
WHERE
ct > 7000