add tests for Opey status and invoke endpoints

This commit is contained in:
nemo 2025-02-12 15:06:52 +00:00
parent 7ec6a447c1
commit b78daae06e
4 changed files with 185 additions and 72 deletions

View File

@ -22,7 +22,110 @@ export class OpeyController {
async getStatus(
@Res() response: Response
): Response {
return response.status(200).json({status: 'Opey is running'});
try {
const opeyStatus = await this.opeyClientService.getOpeyStatus()
console.log("Opey status: ", opeyStatus)
return response.status(200).json({status: 'Opey is running'});
} catch (error) {
console.error("Error in /opey endpoint: ", error);
return response.status(500).json({ error: 'Internal Server Error' });
}
}
@Post('/stream')
async streamOpey(
@Session() session: any,
@Req() request: Request,
@Res() response: Response
) {
let user_input: UserInput
try {
user_input = {
"message": request.body.message,
"thread_id": request.body.thread_id,
"is_tool_call_approval": request.body.is_tool_call_approval
}
} catch (error) {
console.error("Error in stream endpoint, could not parse into UserInput: ", error)
return response.status(500).json({ error: 'Internal Server Error' })
}
console.log("Calling OpeyClientService.stream")
const streamMiddlewareTransform = new Transform({
transform(chunk, encoding, callback) {
console.log(`Logged Chunk: ${chunk}`)
this.push(chunk);
callback();
}
})
try {
const nodeStream = await this.opeyClientService.stream(user_input)
console.log(`Stream received from OpeyClientService.stream: ${nodeStream.readable}`)
nodeStream.pipe(streamMiddlewareTransform).pipe(response)
response.status(200)
response.setHeader('Content-Type', 'text/event-stream')
response.setHeader('Cache-Control', 'no-cache')
response.setHeader('Connection', 'keep-alive')
// nodeStream.on('data', (chunk) => {
// const data = chunk.toString()
// console.log(`data: ${data}`)
// response.write(`data: ${data}\n\n`)
// })
// nodeStream.on('end', () => {
// console.log('Stream ended')
// response.end()
// })
// nodeStream.on('error', (error) => {
// console.error(error)
// response.write(`data: Error reading stream\n\n`)
// response.end()
// })
} catch (error) {
console.error(error)
response.status(500).json({ error: 'Internal Server Error' })
}
}
@Post('/invoke')
async invokeOpey(
@Session() session: any,
@Req() request: Request,
@Res() response: Response
): Response {
let user_input: UserInput
try {
user_input = {
"message": request.body.message,
"thread_id": request.body.thread_id,
"is_tool_call_approval": request.body.is_tool_call_approval
}
} catch (error) {
console.error("Error in stream endpoint, could not parse into UserInput: ", error)
return response.status(500).json({ error: 'Internal Server Error' })
}
try {
const opey_response = await this.opeyClientService.invoke(user_input)
console.log("Opey response: ", opey_response)
return response.status(200).json(opey_response)
} catch (error) {
console.error(error)
return response.status(500).json({ error: 'Internal Server Error' })
}
}
@Post('/consent')
@ -113,65 +216,5 @@ export class OpeyController {
}
@Post('/stream')
async streamOpey(
@Session() session: any,
@Req() request: Request,
@Res() response: Response
) {
let user_input: UserInput
try {
user_input = {
"message": request.body.message,
"thread_id": request.body.thread_id,
"is_tool_call_approval": request.body.is_tool_call_approval
}
} catch (error) {
console.error("Error in stream endpoint, could not parse into UserInput: ", error)
return response.status(500).json({ error: 'Internal Server Error' })
}
console.log("Calling OpeyClientService.stream")
const streamMiddlewareTransform = new Transform({
transform(chunk, encoding, callback) {
console.log(`Logged Chunk: ${chunk}`)
this.push(chunk);
callback();
}
})
try {
const nodeStream = await this.opeyClientService.stream(user_input)
console.log(`Stream received from OpeyClientService.stream: ${nodeStream.readable}`)
nodeStream.pipe(streamMiddlewareTransform).pipe(response)
response.status(200)
response.setHeader('Content-Type', 'text/event-stream')
response.setHeader('Cache-Control', 'no-cache')
response.setHeader('Connection', 'keep-alive')
nodeStream.on('data', (chunk) => {
const data = chunk.toString()
console.log(`data: ${data}`)
response.write(`data: ${data}\n\n`)
})
nodeStream.on('end', () => {
console.log('Stream ended')
response.end()
})
nodeStream.on('error', (error) => {
console.error(error)
response.write(`data: Error reading stream\n\n`)
response.end()
})
} catch (error) {
console.error(error)
response.status(500).json({ error: 'Internal Server Error' })
}
}
}

View File

@ -9,15 +9,14 @@ export class StreamInput extends UserInput {
stream_tokens: boolean;
}
export type OpeyPaths = {
[key: string]: string;
}
export type OpeyConfig = {
baseUri: string,
authConfig: any,
paths: {
stream: string,
invoke: string,
approve_tool: string,
feedback: string,
}
paths: OpeyPaths,
}
export type AuthConfig = {

View File

@ -15,6 +15,7 @@ export default class OpeyClientService {
baseUri: process.env.VITE_CHATBOT_URL? process.env.VITE_CHATBOT_URL : 'http://localhost:5000',
authConfig: this.authConfig,
paths: {
status: '/status',
stream: '/stream',
invoke: '/invoke',
approve_tool: '/approve_tool/{thead_id}',
@ -23,7 +24,31 @@ export default class OpeyClientService {
}
}
async getOpeyStatus(): Promise<any> {
// Endpoint to check if Opey is running
try {
const url = `${this.opeyConfig.baseUri}${this.opeyConfig.paths.status}`
const response = await fetch(url, {
method: 'GET',
headers: {}
})
if (response.status === 200) {
const status = await response.json()
return status
} else {
throw new Error(`Error getting status from Opey: ${response.status} ${response.statusText}`)
}
} catch (error) {
throw new Error(`Error getting status from Opey: ${error}`)
}
}
async stream(user_input: UserInput): Promise<NodeJS.ReadableStream> {
// Endpoint to post a message to Opey and stream the response tokens/messages
try {
const url = `${this.opeyConfig.baseUri}${this.opeyConfig.paths.stream}`
@ -31,7 +56,7 @@ export default class OpeyClientService {
const stream_input = user_input as StreamInput
stream_input.stream_tokens = true
console.log(`Posting to Opey: ${JSON.stringify(stream_input)}\n URL: ${url}`) //DEBUG
console.log(`Posting to Opey with streaming: ${JSON.stringify(stream_input)}\n URL: ${url}`) //DEBUG
const response = await fetch(url, {
method: 'POST',
@ -47,10 +72,34 @@ export default class OpeyClientService {
return response.body as NodeJS.ReadableStream
}
catch (error) {
throw new Error(`Error streaming to Opey: ${error}`)
throw new Error(`Error streaming from Opey: ${error}`)
}
}
async invoke(user_input: UserInput): Promise<any> {
// Endpoint to post a message to Opey and get a response without stream
// I.e. a normal REST call
const url = `${this.opeyConfig.baseUri}${this.opeyConfig.paths.invoke}`
console.log(`Posting to Opey, STREAMING OFF: ${JSON.stringify(user_input)}\n URL: ${url}`) //DEBUG
try {
const response = await fetch(url, {
method: 'POST',
headers: {
"Authorization": `Bearer ${this.opeyConfig.authConfig.opeyJWT}`,
"Content-Type": "application/json"
},
body: JSON.stringify(user_input)
})
if (response.status === 200) {
const opey_response = await response.json()
return opey_response
} else {
throw new Error(`Error invoking Opey: ${response.status} ${response.statusText}`)
}
} catch (error) {
throw new Error(`Error invoking Opey: ${error}`)
}
}
}

View File

@ -23,6 +23,28 @@ describe('GET /api/opey', () => {
});
});
describe('GET /api/opey/invoke', () => {
let response: Response;
let userInput: UserInput = {
message: "Hello Opey",
thread_id: uuidv4(),
is_tool_call_approval: false
}
it('Should return 200', async () => {
const response = await request(app)
.post("/api/opey/invoke")
.send(userInput)
.set('Content-Type', 'application/json')
.then(response => {
console.log(`Response: ${response.body}`)
expect(response.status).toBe(200);
})
});
})
describe('POST /api/opey/stream', () => {