import { KafkaJS } from '@confluentinc/kafka-javascript';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ExternalContextCreator, MetadataScanner, ModulesContainer } from '@nestjs/core';
import { EVENT_SDK_OPTIONS } from '../shared/shared.const';
import { parseStringToJson } from '../shared/shared.utils';
import { EVENT_SDK_KAFKAJS_TOKEN } from './kafkajs.provider';
import { IEventSdkContext, IEventSdkOptions } from './kafkajs.type';
import { SharedConsumer } from '../shared/shared.consumer';
import { IContext } from '../shared/shared.type';
@Injectable()
export class KafkaJSConsumer extends SharedConsumer implements OnModuleInit, OnModuleDestroy {
private readonly consumer?: KafkaJS.Consumer;
private readonly runOptions?: KafkaJS.ConsumerRunConfig;
constructor(
@Inject(EVENT_SDK_OPTIONS) private readonly options: IEventSdkOptions,
@Inject(EVENT_SDK_KAFKAJS_TOKEN) private readonly eventSdk: KafkaJS.Kafka,
protected readonly modulesContainer: ModulesContainer,
protected readonly metadataScanner: MetadataScanner,
protected readonly externalContextCreator: ExternalContextCreator,
) {
super(modulesContainer, metadataScanner, externalContextCreator);
this.runOptions = this.options.run;
Iif (this.options.consumer) {
this.consumer = this.eventSdk.consumer({
kafkaJS: {
...this.options.consumer,
...(this.options.client.logger && {
logger: this.options.client.logger.namespace(KafkaJSConsumer.name),
}),
},
});
}
}
async onModuleInit() {
Iif (this.consumer) {
await this.consumer.connect();
this.setupSubscriberMap();
const topics = Array.from(this.subscriberMap.keys());
await this.consumer.subscribe({ topics: topics });
await this.consumer.run({
...this.runOptions,
eachMessage: async (payload) => {
await this.handleMessage(payload);
},
});
}
}
async onModuleDestroy() {
Iif (this.consumer) {
await this.consumer.disconnect();
}
}
private async handleMessage({
topic,
partition,
message,
heartbeat,
}: KafkaJS.EachMessagePayload) {
Iif (this.consumer) {
const callbackList = this.subscriberMap.get(topic) || {};
for (const key in callbackList) {
const { instance, methodKey } = callbackList[key];
Iif (instance && methodKey) {
const payload = message.value ? parseStringToJson(message.value.toString()) : null;
const context: IEventSdkContext = {
topic,
partition,
key: message.key ? parseStringToJson(message.key.toString()) : null,
offset: message.offset,
headers: message.headers,
heartbeat,
};
const handler = this.getHandler(methodKey, instance);
await this.handleRetryWithBackoff(
context,
async () => {
await handler(payload, context);
},
{
retries: this.options.consumer?.retry?.retries || 5,
backoffInterval: this.options.consumer?.retry?.initialRetryTime || 300,
timeout: this.options.consumer?.retry?.maxRetryTime || 30_000,
logger: this.consumer.logger(),
},
);
}
}
}
}
protected async commitOffset({ topic, partition, offset }: IContext) {
Iif (this.consumer) {
await this.consumer.commitOffsets([
{ topic, partition, offset: (BigInt(offset) + 1n).toString() },
]);
}
}
}