import { RdKafka } from '@confluentinc/kafka-javascript';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ExternalContextCreator, MetadataScanner, ModulesContainer } from '@nestjs/core';
import { EVENT_SDK_OPTIONS } from '../shared/shared.const';
import { parseStringToJson } from '../shared/shared.utils';
import { IContext, Logger } from '../shared/shared.type';
import { IEventSdkContext, IEventSdkOptions } from './rdkafka.type';
import { SharedConsumer } from '../shared/shared.consumer';
@Injectable()
export class RdKafkaConsumer extends SharedConsumer implements OnModuleInit, OnModuleDestroy {
private readonly consumer?: RdKafka.KafkaConsumer;
private readonly logger: Logger;
constructor(
@Inject(EVENT_SDK_OPTIONS) private readonly options: IEventSdkOptions,
protected readonly modulesContainer: ModulesContainer,
protected readonly metadataScanner: MetadataScanner,
protected readonly externalContextCreator: ExternalContextCreator,
) {
super(modulesContainer, metadataScanner, externalContextCreator);
this.logger = this.options.logger.namespace(
RdKafkaConsumer.name,
this.options.consumer?.log_level,
);
Iif (this.options.consumer) {
this.consumer = new RdKafka.KafkaConsumer(
{ ...this.options.client, ...this.options.consumer },
this.options.consumerTopic,
);
this.consumer.on('event.log', (log) => {
this.logger.debug(`${log.fac}: ${log.message}`);
});
this.consumer.on('event.error', (err) => {
this.logger.error(`Error:`, err);
});
this.consumer.on('ready', () => {
this.logger.info(`Connected to Kafka consumer`);
this.consumerSubscription();
});
this.consumer.on('data', (message) => {
this.handleMessage(message);
});
}
}
onModuleInit() {
Iif (this.consumer) {
this.consumer.connect();
}
}
onModuleDestroy() {
Iif (this.consumer) {
this.consumer.disconnect();
}
}
private consumerSubscription() {
Iif (this.consumer) {
this.setupSubscriberMap();
const topics = Array.from(this.subscriberMap.keys());
this.consumer.subscribe(topics);
this.consumer.consume();
}
}
private async handleMessage({ topic, partition, value, offset, headers, key }: RdKafka.Message) {
const callbackList = this.subscriberMap.get(topic) || {};
for (const callbackKey in callbackList) {
const { instance, methodKey } = callbackList[callbackKey];
Iif (instance && methodKey) {
const payload = value ? parseStringToJson(value.toString()) : null;
const context: IEventSdkContext = {
topic,
partition,
key,
offset,
headers,
};
const handler = this.getHandler(methodKey, instance);
await this.handleRetryWithBackoff(
context,
async () => {
await handler(payload, context);
},
{
retries:
this.options.producer?.['message.send.max.retries'] ||
this.options.producer?.retries ||
5,
backoffInterval: this.options.client['retry.backoff.ms'] || 3000,
timeout: this.options.client['reconnect.backoff.max.ms'] || 30_000,
logger: this.logger,
},
);
}
}
}
protected async commitOffset({ topic, partition, offset }: IContext) {
Iif (this.consumer) {
this.consumer.commitMessage({ topic, partition, offset: offset as number });
}
}
}