streams WIP

This commit is contained in:
nemo 2025-02-11 17:12:07 +00:00
parent a98dc346e8
commit 7ec6a447c1
8 changed files with 954 additions and 163 deletions

6
babel.config.js Normal file
View File

@ -0,0 +1,6 @@
module.exports = {
presets: [
['@babel/preset-env', {targets: {node: 'current'}}],
'@babel/preset-typescript',
],
};

View File

@ -1,5 +1,8 @@
module.exports = {
transform: {'^.+\\.ts?$': 'ts-jest'},
transform: {
'^.+\\.ts?$': 'ts-jest',
"^.+\\.(js)$": "babel-jest",
},
preset: 'ts-jest',
testEnvironment: 'node',
testRegex: '/tests/.*\\.(test|spec)?\\.(ts|tsx)$',

View File

@ -12,7 +12,7 @@
"build-server": "tsc --project tsconfig.server.json",
"preview": "vite preview",
"test:unit": "vitest",
"test": "jest",
"test": "jest --silent=false",
"build-only": "vite build",
"type-check": "vue-tsc --noEmit -p tsconfig.vitest.json --composite false",
"lint": "eslint . --ext .vue,.js,.jsx,.cjs,.mjs,.ts,.tsx,.cts,.mts --fix --ignore-path .gitignore",
@ -39,6 +39,7 @@
"json-editor-vue": "^0.17.3",
"jsonwebtoken": "^9.0.2",
"markdown-it": "^14.1.0",
"node-fetch": "v2.6",
"oauth": "^0.10.0",
"obp-typescript": "^1.0.36",
"pinia": "^2.0.37",
@ -59,17 +60,21 @@
"ws": "^8.18.0"
},
"devDependencies": {
"@babel/core": "^7.26.8",
"@babel/preset-env": "^7.26.8",
"@babel/preset-typescript": "^7.26.0",
"@rushstack/eslint-patch": "^1.4.0",
"@types/jsdom": "^21.1.7",
"@types/jsonwebtoken": "^9.0.6",
"@types/markdown-it": "^14.1.1",
"@types/node": "^20.17.16",
"@types/node": "^20.17.17",
"@vitejs/plugin-vue": "^4.3.0",
"@vitejs/plugin-vue-jsx": "^3.1.0",
"@vue/eslint-config-prettier": "^9.0.0",
"@vue/eslint-config-typescript": "^14.0.0",
"@vue/test-utils": "^2.4.0",
"@vue/tsconfig": "^0.1.3",
"babel-jest": "^29.7.0",
"eslint": "^9.15.0",
"eslint-plugin-vue": "^9.12.0",
"jest": "^29.7.0",

View File

@ -6,6 +6,7 @@ import { Service } from 'typedi'
import OBPClientService from '../services/OBPClientService'
import OpeyClientService from '../services/OpeyClientService'
import { v6 as uuid6 } from 'uuid';
import { Transform } from 'stream'
import { UserInput } from '../schema/OpeySchema'
@Service()
@ -133,29 +134,44 @@ export class OpeyController {
}
console.log("Calling OpeyClientService.stream")
const streamMiddlewareTransform = new Transform({
transform(chunk, encoding, callback) {
console.log(`Logged Chunk: ${chunk}`)
this.push(chunk);
callback();
}
})
try {
console.log("Calling OpeyClientService.stream")
const stream = await this.opeyClientService.stream(user_input)
try{
const nodeStream = await this.opeyClientService.stream(user_input)
console.log(`Stream received from OpeyClientService.stream: ${nodeStream.readable}`)
nodeStream.pipe(streamMiddlewareTransform).pipe(response)
response.status(200)
response.setHeader('Content-Type', 'text/event-stream')
return stream
response.setHeader('Cache-Control', 'no-cache')
response.setHeader('Connection', 'keep-alive')
nodeStream.on('data', (chunk) => {
const data = chunk.toString()
console.log(`data: ${data}`)
response.write(`data: ${data}\n\n`)
})
nodeStream.on('end', () => {
console.log('Stream ended')
response.end()
})
nodeStream.on('error', (error) => {
console.error(error)
response.write(`data: Error reading stream\n\n`)
response.end()
})
} catch (error) {
console.error("Error in stream endpoint: ", error)
return response.status(500).json({ error: 'Internal Server Error' })
console.error(error)
response.status(500).json({ error: 'Internal Server Error' })
}
} catch (error) {
console.error("Error in stream endpoint: ", error)
return response.status(500).json({ error: 'Internal Server Error' })
}
}
}

View File

@ -5,6 +5,10 @@ export class UserInput {
is_tool_call_approval: boolean;
}
export class StreamInput extends UserInput {
stream_tokens: boolean;
}
export type OpeyConfig = {
baseUri: string,
authConfig: any,

View File

@ -1,6 +1,6 @@
import { Service } from 'typedi'
import { got } from 'got';
import { UserInput, OpeyConfig, AuthConfig } from '../schema/OpeySchema'
import { UserInput, StreamInput, OpeyConfig, AuthConfig } from '../schema/OpeySchema'
import fetch from 'node-fetch';
@Service()
export default class OpeyClientService {
@ -23,22 +23,28 @@ export default class OpeyClientService {
}
}
async stream(user_input: UserInput): Promise<any> {
async stream(user_input: UserInput): Promise<NodeJS.ReadableStream> {
try {
console.log(`Streaming to Opey: ${JSON.stringify(user_input)}`) //DEBUG
const stream = got.stream.post(`${this.opeyConfig.baseUri}${this.opeyConfig.paths.stream}`, {
const url = `${this.opeyConfig.baseUri}${this.opeyConfig.paths.stream}`
// We need to set whether we want to stream tokens or not
const stream_input = user_input as StreamInput
stream_input.stream_tokens = true
console.log(`Posting to Opey: ${JSON.stringify(stream_input)}\n URL: ${url}`) //DEBUG
const response = await fetch(url, {
method: 'POST',
headers: {
"Authorization": `Bearer ${this.opeyConfig.authConfig.opeyJWT}`
"Authorization": `Bearer ${this.opeyConfig.authConfig.opeyJWT}`,
"Content-Type": "application/json"
},
body: JSON.stringify(user_input),
});
console.log(`Response from Opey: ${stream}`) //DEBUG
//response.data.on('data', (chunk) => {console.log(`Recieved chunk: ${chunk.toString()}`)});
return stream;
body: JSON.stringify(stream_input)
})
if (!response.body) {
throw new Error("No response body")
}
return response.body as NodeJS.ReadableStream
}
catch (error) {
throw new Error(`Error streaming to Opey: ${error}`)

View File

@ -1,11 +1,15 @@
import { OpeyController } from "../server/controllers/OpeyController";
import app from '../server/app';
import request from 'supertest';
import fetch from 'node-fetch';
import http from 'node:http';
import { UserInput } from '../server/schema/OpeySchema';
import {v4 as uuidv4} from 'uuid';
import { agent } from "superagent";
const BEFORE_ALL_TIMEOUT = 30000; // 30 sec
const SERVER_URL = process.env.VITE_OBP_API_EXPLORER_HOST
describe('GET /api/opey', () => {
let response: Response;
@ -21,35 +25,74 @@ describe('GET /api/opey', () => {
describe('POST /api/opey/stream', () => {
let response: Response;
it('Should return 200', async () => {
const httpAgent = new http.Agent({ keepAlive: true, port: 9999 });
beforeAll(async () => {
app.listen(5173)
});
afterAll(async () => {
app.close()
httpAgent.destroy()
});
it('Should stream response', async () => {
let userInput: UserInput = {
message: "Hello Opey",
thread_id: uuidv4(),
is_tool_call_approval: false
}
const response = await request(app)
.post("/api/opey/stream")
.send(userInput)
.set('Content-Type', 'application/json')
.buffer(false)
.parse((res, callback) => {
res.on('data', (chunk) => {
console.log(`Recieved chunk: ${chunk.toString()}`);
expect(chunk.toString()).toBeTruthy();
});
res.on('end', () => {
callback(null, null);
});
});
expect(response.status).toBe(200);
expect(response.headers['content-type']).toContain('text/event-stream'); // Ensure it is an SSE stream
expect(response.headers['transfer-encoding']).toBe('chunked'); // Ensure it is streamed
});
// const response = await request(app)
// .post("/api/opey/stream")
// .set('Content-Type', 'text/event-stream')
// .responseType('blob')
// .send(userInput)
await fetch(`${SERVER_URL}/api/opey/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'connection': 'keep-alive'
},
body: JSON.stringify(userInput),
})
.catch(error => {
console.error(`Error performing test fetch: ${error}`)
})
.then(streamingResponse => {
console.log(streamingResponse)
streamingResponse.body.on('data', (chunk) => {
console.log(`${chunk}`)
})
// response.on
// console.log(response.body)
// const readable = response.body
// readable.on('data', (chunk) => {
// const data = chunk.toString()
// console.log(`data: ${data}`)
// })
})
.finally(() => {
httpAgent.destroy()
})
// while (true) {
// const {value, done} = await reader.read();
// if (done) break;
// console.log('Received', value);
// }
// expect(response.headers['content-type']).toBe('text/event-stream')
// expect(response.status).toBe(200)
// Optionally, parse chunks or check SSE headers
})
});

922
yarn.lock

File diff suppressed because it is too large Load Diff