Update types for kafkajs from official repository (#36063)

Authored by Michel van der Hulst on 2019-06-13 10:11:03 +02:00; committed by Ron Buckton
parent 5ce5329b56
commit 5749a49e50
2 changed files with 533 additions and 522 deletions

File diff suppressed because it is too large.


@@ -2,42 +2,30 @@ import * as fs from "fs";
 import {
     Kafka,
     AssignerProtocol,
     PartitionAssigners,
     logLevel,
     CompressionTypes,
-    CompressionCodecs,
-    ResourceTypes,
-    PartitionAssigner,
-    LoggerMessage
+    CompressionCodecs
 } from "kafkajs";
 const { MemberMetadata, MemberAssignment } = AssignerProtocol;
 const { roundRobin } = PartitionAssigners;
 // COMMON
 const host = "localhost";
 const topic = "topic-test";
-const logger = (loggerMessage: LoggerMessage): void => {
-    console.log(`[${loggerMessage.namespace}] ${loggerMessage.log.message}`);
-};
 const kafka = new Kafka({
     logLevel: logLevel.INFO,
     brokers: [`${host}:9094`, `${host}:9097`, `${host}:9100`],
     clientId: "example-consumer",
     ssl: {
         servername: "localhost",
         rejectUnauthorized: false,
         ca: [fs.readFileSync("./testHelpers/certs/cert-signed", "utf-8")]
-    },
-    sasl: {
-        mechanism: "plain",
-        username: "test",
-        password: "testtest"
-    },
-    logCreator: () => logger
+    }
 });
 // CONSUMER
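The updated test drops sasl and logCreator from the client options, which suggests the refreshed declarations do not yet cover them. Below is a minimal sketch of a client that compiles against the surface the test still exercises, plus a clearly hypothetical cast for options kafkajs continues to accept at runtime; the cast and the variable names are illustrative, not part of this commit:

import { Kafka, logLevel } from "kafkajs";

// Compiles against the options the updated test still exercises:
const plainClient = new Kafka({
    logLevel: logLevel.INFO,
    brokers: ["localhost:9094"],
    clientId: "example-consumer"
});

// Hypothetical escape hatch: kafkajs supports SASL at runtime, so code that
// relied on the old typings could cast until the declarations cover it again.
const saslClient = new Kafka({
    brokers: ["localhost:9094"],
    clientId: "example-consumer",
    sasl: { mechanism: "plain", username: "test", password: "testtest" }
} as any);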
@@ -47,13 +35,20 @@ const runConsumer = async () => {
     await consumer.connect();
     await consumer.subscribe({ topic });
     await consumer.run({
-        // eachBatch: async ({ batch }) => {
-        //     console.log(batch)
-        // },
+        eachBatch: async ({ commitOffsetsIfNecessary }) => {
+            commitOffsetsIfNecessary({
+                topics: [{
+                    topic: 'topic-name',
+                    partitions: [
+                        { partition: 0, offset: '500' }
+                    ]
+                }]
+            });
+        },
         eachMessage: async ({ topic, partition, message }) => {
             const prefix = `${topic}[${partition} | ${message.offset}] / ${
                 message.timestamp
             }`;
             console.log(`- ${prefix} ${message.key}#${message.value}`);
         }
     });
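For reference, the batch-commit shape added above, restated as a self-contained sketch; the broker address, group id and topic name are placeholders:

import { Kafka } from "kafkajs";

const sketchKafka = new Kafka({ clientId: "sketch", brokers: ["localhost:9092"] });
const sketchConsumer = sketchKafka.consumer({ groupId: "sketch-group" });

const runSketchConsumer = async () => {
    await sketchConsumer.connect();
    await sketchConsumer.subscribe({ topic: "topic-test" });
    await sketchConsumer.run({
        // Commit explicitly from inside the batch handler, mirroring the test:
        eachBatch: async ({ commitOffsetsIfNecessary }) => {
            commitOffsetsIfNecessary({
                topics: [{ topic: "topic-test", partitions: [{ partition: 0, offset: "500" }] }]
            });
        }
    });
};
runSketchConsumer().catch(e => console.error(e));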
@@ -96,10 +91,17 @@ runProducer().catch(e => console.error(`[example/producer] ${e.message}`, e));
 const admin = kafka.admin({ retry: { retries: 10 } });
 const runAdmin = async () => {
-    await admin.connect();
-    const { topics } = await admin.getTopicMetadata({});
-    await admin.createTopics({ topics: [{ topic, numPartitions: 10, replicationFactor: 1}], timeout: 30000, waitForLeaders: true });
-    await admin.disconnect();
+    await admin.connect();
+    await admin.fetchTopicMetadata({
+        topic: 'string',
+        partitions: []
+    });
+    await admin.createTopics({
+        topics: [{ topic, numPartitions: 10, replicationFactor: 1 }],
+        timeout: 30000,
+        waitForLeaders: true
+    });
+    await admin.disconnect();
 };
 runAdmin().catch(e => console.error(`[example/admin] ${e.message}`, e));
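The admin hunk swaps getTopicMetadata for fetchTopicMetadata and reformats createTopics. A standalone sketch of the createTopics call, whose shape the diff confirms; the client options and topic name are placeholders:

import { Kafka } from "kafkajs";

const sketchAdmin = new Kafka({ clientId: "sketch", brokers: ["localhost:9092"] }).admin({ retry: { retries: 10 } });

const runSketchAdmin = async () => {
    await sketchAdmin.connect();
    await sketchAdmin.createTopics({
        topics: [{ topic: "topic-test", numPartitions: 10, replicationFactor: 1 }],
        timeout: 30000,
        waitForLeaders: true
    });
    await sketchAdmin.disconnect();
};
runSketchAdmin().catch(e => console.error(e));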
@@ -113,39 +115,10 @@ async () => {
     });
 };
 // import SnappyCodec from "kafkajs-snappy";
 const SnappyCodec: any = undefined;
 CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;
-const myCustomAssignmentArray = [0];
-const assignment: { [key: number]: { [key: string]: number[] } } = {
-    0: { a: [0] }
-};
-const MyPartitionAssigner: PartitionAssigner = ({ cluster: any }) => ({
-    name: "MyPartitionAssigner",
-    version: 1,
-    async assign({ members, topics }) {
-        // perform assignment
-        return myCustomAssignmentArray.map(memberId => ({
-            memberId,
-            memberAssignment: MemberAssignment.encode({
-                version: this.version,
-                assignment: assignment[memberId]
-            })
-        }));
-    },
-    protocol({ topics }) {
-        return {
-            name: this.name,
-            metadata: MemberMetadata.encode({
-                version: this.version,
-                topics
-            })
-        };
-    }
-});
 kafka.consumer({
     groupId: "my-group",
-    partitionAssigners: [MyPartitionAssigner, roundRobin]
+    partitionAssigners: [roundRobin]
 });
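The test stubs the Snappy codec with undefined so that type-checking needs no extra dependency. With the kafkajs-snappy package actually installed, the registration sketched above would plausibly read:

import { CompressionTypes, CompressionCodecs } from "kafkajs";
// Assumes kafkajs-snappy is installed and default-exports a codec factory,
// as its documentation describes; this import is not part of the commit.
import SnappyCodec from "kafkajs-snappy";

CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;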