Update socketcluster, sc-broker. Add scc-broker-client, sc-framework-health-check, sc-hot-reboot.

This commit is contained in:
Daniel Rose 2018-12-18 13:01:06 +01:00
parent 83d71fa6b9
commit b9526c830f
22 changed files with 473 additions and 7 deletions

View File

@ -117,7 +117,7 @@ export class Server extends EventEmitter {
on(event: "ready", listener: () => void): this;
on(event: "error", listener: (err?: Error) => void): this;
sendToBroker(brokerId: string, data: any, callback: (err: Error | null, data: any) => void): void;
sendToBroker(brokerId: string, data: any, callback?: (err: Error | null, data: any) => void): void;
killBrokers(): void;
destroy(): void;
}

View File

@ -5,6 +5,7 @@
// TypeScript Version: 2.4
import { SCServer } from "socketcluster-server";
import { SCBrokerOptions } from "./scbroker";
import { WorkerExitInfo } from "socketcluster";
import { EventEmitter } from "events";
import { KeyChain, FlexiMap } from "fleximap";
@ -23,7 +24,7 @@ export interface SCBrokerServerOptions {
processTermTimeout?: number;
ipcAckTimeout?: number;
secretKey?: string;
brokerOptions?: SCServer.SCServerOptions;
brokerOptions?: SCBrokerOptions;
}
export interface SCBrokerServer extends EventEmitter {

View File

@ -1,16 +1,19 @@
import { SCServer, SCServerSocket } from "socketcluster-server";
import { SCServerSocket } from "socketcluster-server";
import SCBroker = require("sc-broker/scbroker");
import { FlexiMap } from "fleximap";
import { ExpiryManager } from "expirymanager";
import * as scClusterBrokerClient from "scc-broker-client";
////////////////////////////////////////////////////
/// SCBroker tests
////////////////////////////////////////////////////
const options: SCServer.SCServerOptions = { port: 80 };
const run = () => {
console.log("run called!");
};
let scBroker = new SCBroker();
scBroker = new SCBroker(options);
scBroker = new SCBroker({ run });
scBroker.options = { environment: "prod" };
const id: number = scBroker.id;
@ -56,3 +59,25 @@ class MyBroker extends SCBroker {
this.on("subscribe", channel => {});
}
}
// From the socketcluster sample
class Broker extends SCBroker {
run() {
console.log(" >> Broker PID:", process.pid);
if (this.options.clusterStateServerHost) {
scClusterBrokerClient.attach(this, {
stateServerHost: this.options.clusterStateServerHost,
stateServerPort: this.options.clusterStateServerPort,
mappingEngine: this.options.clusterMappingEngine,
clientPoolSize: this.options.clusterClientPoolSize,
authKey: this.options.clusterAuthKey,
stateServerConnectTimeout: this.options.clusterStateServerConnectTimeout,
stateServerAckTimeout: this.options.clusterStateServerAckTimeout,
stateServerReconnectRandomness: this.options.clusterStateServerReconnectRandomness
});
}
}
}
new Broker();

View File

@ -12,21 +12,53 @@ interface Subscriptions {
}
declare class SCBroker extends EventEmitter {
readonly type: "broker";
readonly MIDDLEWARE_SUBSCRIBE: "subscribe";
readonly MIDDLEWARE_PUBLISH_IN: "publishIn";
id: number;
options: SCServer.SCServerOptions;
debugPort: number;
options: SCBroker.SCBrokerOptions;
instanceId: number;
dataMap: FlexiMap;
dataExpirer: ExpiryManager;
subscriptions: Subscriptions;
constructor(options?: SCServer.SCServerOptions);
constructor(options?: { run?: () => void });
on(event: "subscribe" | "unsubscribe", listener: (channel: string) => void): this;
on(event: "publish", listener: (channel: string, data: any) => void): this;
on(event: "masterMessage", listener: (data: any, respond: (err: Error | null, responseData: any) => void) => void): this;
on(event: "message", listener: (message: any, respond: (err: Error | null, responseData: any) => void) => void): this;
on(event: "warning", listener: (err: Error) => void): this;
publish(channel: string, message: any): void;
run(): void;
exec(query: (dataMap: FlexiMap, dataExpirer: ExpiryManager, subscriptions: Subscriptions) => any, baseKey?: KeyChain): any;
sendToMaster(data: any, callback?: (err: Error | null, responseData: any) => void): void;
}
declare namespace SCBroker {
interface SCBrokerOptions {
// An ID to associate with this specific instance of SC
// this may be useful if you are running an SC app on multiple
// hosts - You can access the instanceId from the Broker object
// (inside brokerController) - If you don't provide an instanceId,
// SC will generate a random one (UUID v4)
instanceId?: string;
// A key which various SC processes will use to interact with
// scBroker broker processes securely, defaults to a 256 bits
// cryptographically random hex string
secretKey?: string;
// In milliseconds, the timeout for calling res(err, data) when
// your sendToWorker, sendToBroker or sendToMaster (IPC) call
// expects an ACK response from the other process
// (when callback is provided)
ipcAckTimeout?: number;
[additionalOptions: string]: any;
}
}

View File

@ -0,0 +1,10 @@
// Type definitions for sc-framework-health-check 2.0
// Project: https://github.com/SocketCluster/sc-framework-health-check
// Definitions by: Daniel Rose <https://github.com/DanielRose>
// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped
// TypeScript Version: 2.4
import SCWorker = require("socketcluster/scworker");
import { Express } from "express";
export function attach(worker: SCWorker, expressApp: Express): void;

View File

@ -0,0 +1,11 @@
// Adapted from the socketcluster sample
import healthChecker = require("sc-framework-health-check");
import express = require("express");
import SCWorker = require("socketcluster/scworker");
const app = express();
const worker = new SCWorker();
// Add GET /health-check express route
healthChecker.attach(worker, app);

View File

@ -0,0 +1,23 @@
{
"compilerOptions": {
"module": "commonjs",
"lib": [
"es6"
],
"noImplicitAny": true,
"noImplicitThis": true,
"strictNullChecks": true,
"strictFunctionTypes": true,
"baseUrl": "../",
"typeRoots": [
"../"
],
"types": [],
"noEmit": true,
"forceConsistentCasingInFileNames": true
},
"files": [
"index.d.ts",
"sc-framework-health-check-tests.ts"
]
}

View File

@ -0,0 +1,3 @@
{
"extends": "dtslint/dt.json"
}

10
types/sc-hot-reboot/index.d.ts vendored Normal file
View File

@ -0,0 +1,10 @@
// Type definitions for sc-hot-reboot 1.0
// Project: https://github.com/SocketCluster/sc-channel
// Definitions by: Daniel Rose <https://github.com/DanielRose>
// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped
// TypeScript Version: 2.4
import SocketCluster = require("socketcluster");
import { WatchOptions } from "chokidar";
export function attach(scMasterInstance: SocketCluster, options?: WatchOptions): void;

View File

@ -0,0 +1,9 @@
import * as scHotReboot from "sc-hot-reboot";
import SocketCluster = require("socketcluster");
const socketCluster = new SocketCluster();
scHotReboot.attach(socketCluster, {
cwd: __dirname,
ignored: ["public", "node_modules", "README.md", "Dockerfile", "server.js", "broker.js", /[\/\\]\./, "*.log"]
});

View File

@ -0,0 +1,23 @@
{
"compilerOptions": {
"module": "commonjs",
"lib": [
"es6"
],
"noImplicitAny": true,
"noImplicitThis": true,
"strictNullChecks": true,
"strictFunctionTypes": true,
"baseUrl": "../",
"typeRoots": [
"../"
],
"types": [],
"noEmit": true,
"forceConsistentCasingInFileNames": true
},
"files": [
"index.d.ts",
"sc-hot-reboot-tests.ts"
]
}

View File

@ -0,0 +1,3 @@
{
"extends": "dtslint/dt.json"
}

View File

@ -0,0 +1,68 @@
import { EventEmitter } from "events";
import SCClientSocket = require("socketcluster-client/lib/scclientsocket");
import Hasher = require("./hasher");
import { Secret } from "jsonwebtoken";
interface ClientPoolOptions {
clientCount?: number;
targetURI: string;
authKey?: Secret;
}
interface BrokenDownURI {
hostname: string;
port?: string;
secure?: true;
}
declare class ClientPool extends EventEmitter {
hasher: Hasher;
clientCount: number;
targetURI: string;
authKey?: Secret;
areClientListenersBound: boolean;
clients: SCClientSocket[];
constructor(options?: ClientPoolOptions);
on(event: "error", listener: (err: Error) => void): this;
on(event: "subscribe", listener: (data: ClientPool.SubscribeData) => void): this;
on(event: "subscribeFail", listener: (data: ClientPool.SubscribeFailData) => void): this;
on(event: "publish" | "publishFail", listener: (data: ClientPool.PublishData) => void): this;
bindClientListeners(): void;
unbindClientListeners(): void;
breakDownURI(uri: string): BrokenDownURI;
selectClient(key: string): SCClientSocket;
publish(channelName: string, data: any): void;
subscriptions(includePending?: boolean): string[];
subscribeAndWatch(channelName: string, handler: (data: any) => void): void;
destroyChannel(channelName: string): void;
destroy(): void;
}
export = ClientPool;
declare namespace ClientPool {
interface SubscribeData {
targetURI: string;
poolIndex: number;
channel: string;
}
interface SubscribeFailData extends SubscribeData {
error: Error;
}
interface PublishData {
targetURI: string;
poolIndex: number;
channel: string;
data: any;
}
}

View File

@ -0,0 +1,34 @@
import { EventEmitter } from "events";
import SCBroker = require("sc-broker/scbroker");
import { Secret } from "jsonwebtoken";
import { MappingEngine, SCCBrokerClientOptions } from ".";
import ClientPool = require("./client-pool");
declare class ClusterBrokerClient extends EventEmitter {
broker: SCBroker;
sccBrokerClientPools: ClientPool[];
sccBrokerURIList: string[];
authKey?: Secret;
mappingEngine: "skeletonRendezvous" | "simple" | MappingEngine;
clientPoolSize: number;
mapper: MappingEngine;
constructor(broker: SCBroker, options?: SCCBrokerClientOptions);
on(event: "error", listener: (err: Error) => void): this;
on(event: "subscribe", listener: (data: ClientPool.SubscribeData) => void): this;
on(event: "subscribeFail", listener: (data: ClientPool.SubscribeFailData) => void): this;
on(event: "publish" | "publishFail", listener: (data: ClientPool.PublishData) => void): this;
on(event: "message", listener: (channelName: string, packet: any) => void): this;
mapChannelNameToBrokerURI(channelName: string): string;
setBrokers(sccBrokerURIList: string[]): void;
getAllSubscriptions(): string[];
subscribe(channelName: string): void;
unsubscribe(channelName: string): void;
publish(channelName: string, data: any): void;
}
export = ClusterBrokerClient;

6
types/scc-broker-client/hasher.d.ts vendored Normal file
View File

@ -0,0 +1,6 @@
declare class Hasher {
hashToIndex(key: string, modulo: number): number;
hashToHex(key: string, algorithm?: string): string;
}
export = Hasher;

33
types/scc-broker-client/index.d.ts vendored Normal file
View File

@ -0,0 +1,33 @@
// Type definitions for scc-broker-client 6.1
// Project: https://github.com/SocketCluster/scc-broker-client
// Definitions by: Daniel Rose <https://github.com/DanielRose>
// Definitions: https://github.com/DefinitelyTyped/DefinitelyTyped
// TypeScript Version: 2.4
import SCBroker = require("sc-broker/scbroker");
import ClusterBrokerClient = require("./cluster-broker-client");
import { Secret } from "jsonwebtoken";
export interface MappingEngine {
setSites(sites: string[]): void;
getSites(): string[];
findSite(key: string): string;
}
export interface SCCBrokerClientOptions {
stateServerReconnectRandomness?: number;
authKey?: Secret;
mappingEngine?: "skeletonRendezvous" | "simple" | MappingEngine;
clientPoolSize?: number;
stateServerHost: string;
stateServerPort?: number;
stateServerConnectTimeout?: number;
stateServerAckTimeout?: number;
noErrorLogging?: boolean;
brokerRetryDelay?: number;
}
export function attach(broker: SCBroker, options: SCCBrokerClientOptions): ClusterBrokerClient;

View File

@ -0,0 +1,41 @@
import * as scClusterBrokerClient from "scc-broker-client";
import SCBroker = require("sc-broker/scbroker");
const scBroker = new SCBroker();
const clusterBrokerClient = scClusterBrokerClient
.attach(scBroker, {
stateServerHost: "localhost",
stateServerPort: 8000,
mappingEngine: "simple",
clientPoolSize: 100,
authKey: "secret-key",
stateServerConnectTimeout: 10000,
stateServerAckTimeout: 1000,
stateServerReconnectRandomness: 100
})
.on("error", err => {
console.log(`Received ${err}`);
})
.on("subscribe", data => {
console.log(`Subscribed to ${data.channel}, ${data.poolIndex}, ${data.targetURI}`);
})
.on("subscribeFail", data => {
console.log(`Error ${data.error} while subscribing to ${data.channel}, ${data.poolIndex}, ${data.targetURI}`);
})
.on("publish", data => {
console.log(`Published ${data.data} to ${data.channel}, ${data.poolIndex}, ${data.targetURI}`);
})
.on("publishFail", data => {
console.log(`Error while publishing ${data.data} to ${data.channel}, ${data.poolIndex}, ${data.targetURI}`);
})
.on("message", (channelName, packet) => {
console.log(`Received ${packet} on channel ${channelName}`);
});
clusterBrokerClient.subscribe("test-channel");
clusterBrokerClient.publish("test-channel", "lalala");
const subs = clusterBrokerClient.getAllSubscriptions();
clusterBrokerClient.unsubscribe("test-channel");

View File

@ -0,0 +1,26 @@
{
"compilerOptions": {
"module": "commonjs",
"lib": [
"es6"
],
"noImplicitAny": true,
"noImplicitThis": true,
"strictNullChecks": true,
"strictFunctionTypes": true,
"baseUrl": "../",
"typeRoots": [
"../"
],
"types": [],
"noEmit": true,
"forceConsistentCasingInFileNames": true
},
"files": [
"index.d.ts",
"client-pool.d.ts",
"cluster-broker-client.d.ts",
"hasher.d.ts",
"scc-broker-client-tests.ts"
]
}

View File

@ -0,0 +1,3 @@
{
"extends": "dtslint/dt.json"
}

View File

@ -257,6 +257,8 @@ declare namespace SCServer {
authSignAsync?: boolean;
authVerifyAsync?: boolean;
httpServer?: Server;
[additionalOptions: string]: any;
}
interface SCServerSocketStatus {

View File

@ -31,6 +31,16 @@ interface WorkerClusterExitInfo {
childProcess: ChildProcess;
}
interface KillWorkersOptions {
// Shut down the workers immediately without waiting for termination timeout.
immediate?: boolean;
// Shut down the cluster master (load balancer) as well as all the workers.
killClusterMaster?: boolean;
}
type ColorCodes = "red" | "green" | "yellow";
export = SocketCluster;
declare class SocketCluster extends EventEmitter {
@ -59,6 +69,21 @@ declare class SocketCluster extends EventEmitter {
on(event: "workerClusterStart", listener: (workerClusterInfo: WorkerClusterStartInfo) => void): this;
on(event: "workerClusterReady", listener: (workerClusterInfo: WorkerClusterReadyInfo) => void): this;
on(event: "workerClusterExit", listener: (workerClusterInfo: WorkerClusterExitInfo) => void): this;
run(): void;
sendToWorker(workerId: number, data: any, callback?: (err: Error, responseData: any, workerId: number) => void): void;
sendToBroker(brokerId: number, data: any, callback?: (err: Error | null, responseData: any) => void): void;
killWorkers(options?: KillWorkersOptions): void;
killBrokers(): void;
log(message: string, time?: number): void;
colorText(message: string, color?: ColorCodes | number): string;
destroy(callback?: () => void): void;
static create(options?: SCServer.SCServerOptions): SocketCluster;
}
declare namespace SocketCluster {

View File

@ -2,6 +2,9 @@ import * as fsutil from "socketcluster/fsutil";
import SocketCluster = require("socketcluster");
import { SCServer } from "socketcluster-server";
import { ChildProcess } from "child_process";
import path = require("path");
import * as minimist from "minimist";
import * as scHotReboot from "sc-hot-reboot";
////////////////////////////////////////////////////
/// SocketCluster tests
@ -12,6 +15,8 @@ import { ChildProcess } from "child_process";
let sc = new SocketCluster();
sc = new SocketCluster(options);
sc = SocketCluster.create(options);
sc.options = { environment: "prod" };
sc.on(sc.EVENT_FAIL, err => {
@ -61,6 +66,79 @@ import { ChildProcess } from "child_process";
const signal: string = workerClusterInfo.signal;
const childProcess: ChildProcess = workerClusterInfo.childProcess;
});
sc.sendToWorker(123, "test");
sc.sendToWorker(222, "test 2", (err, responseData, workerId) => {
if (!err) {
sc.log(`Received ${responseData} from ${workerId}`);
}
});
sc.sendToBroker(123, "test");
sc.sendToBroker(222, "test 2", (err, responseData) => {
if (!err) {
sc.colorText(`Received ${responseData} from broker`, "yellow");
}
});
sc.killWorkers({ immediate: true });
sc.killBrokers();
sc.destroy();
}
// Adapted from the socketcluster sample
{
const argv = minimist(process.argv.slice(2));
const workerControllerPath = argv.wc || process.env.SOCKETCLUSTER_WORKER_CONTROLLER;
const brokerControllerPath = argv.bc || process.env.SOCKETCLUSTER_BROKER_CONTROLLER;
const workerClusterControllerPath = argv.wcc || process.env.SOCKETCLUSTER_WORKERCLUSTER_CONTROLLER;
const environment = process.env.ENV || "dev";
const options: SCServer.SCServerOptions = {
workers: Number(argv.w) || Number(process.env.SOCKETCLUSTER_WORKERS) || 1,
brokers: Number(argv.b) || Number(process.env.SOCKETCLUSTER_BROKERS) || 1,
port: Number(argv.p) || Number(process.env.SOCKETCLUSTER_PORT) || 8000,
// You can switch to 'sc-uws' for improved performance.
wsEngine: process.env.SOCKETCLUSTER_WS_ENGINE || "ws",
appName: argv.n || process.env.SOCKETCLUSTER_APP_NAME || null,
workerController: workerControllerPath || path.join(__dirname, "worker.js"),
brokerController: brokerControllerPath || path.join(__dirname, "broker.js"),
workerClusterController: workerClusterControllerPath || null,
socketChannelLimit: Number(process.env.SOCKETCLUSTER_SOCKET_CHANNEL_LIMIT) || 1000,
clusterStateServerHost: argv.cssh || process.env.SCC_STATE_SERVER_HOST || null,
clusterStateServerPort: process.env.SCC_STATE_SERVER_PORT || null,
clusterMappingEngine: process.env.SCC_MAPPING_ENGINE || null,
clusterClientPoolSize: process.env.SCC_CLIENT_POOL_SIZE || null,
clusterAuthKey: process.env.SCC_AUTH_KEY || null,
clusterInstanceIp: process.env.SCC_INSTANCE_IP || null,
clusterInstanceIpFamily: process.env.SCC_INSTANCE_IP_FAMILY || null,
clusterStateServerConnectTimeout: Number(process.env.SCC_STATE_SERVER_CONNECT_TIMEOUT) || null,
clusterStateServerAckTimeout: Number(process.env.SCC_STATE_SERVER_ACK_TIMEOUT) || null,
clusterStateServerReconnectRandomness: Number(process.env.SCC_STATE_SERVER_RECONNECT_RANDOMNESS) || null,
crashWorkerOnError: argv["auto-reboot"] !== false,
// If using nodemon, set this to true, and make sure that environment is 'dev'.
killMasterOnSignal: false,
environment
};
const socketCluster = new SocketCluster(options);
socketCluster.on(socketCluster.EVENT_WORKER_CLUSTER_START, workerClusterInfo => {
console.log(" >> WorkerCluster PID:", workerClusterInfo.pid);
});
if (socketCluster.options.environment === "dev") {
// This will cause SC workers to reboot when code changes anywhere in the app directory.
// The second options argument here is passed directly to chokidar.
// See https://github.com/paulmillr/chokidar#api for details.
console.log(` !! The sc-hot-reboot plugin is watching for code changes in the ${__dirname} directory`);
scHotReboot.attach(socketCluster, {
cwd: __dirname,
ignored: ["public", "node_modules", "README.md", "Dockerfile", "server.js", "broker.js", /[\/\\]\./, "*.log"]
});
}
}
////////////////////////////////////////////////////