Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | 1x 1x 1x 1x | import { RdKafka } from '@confluentinc/kafka-javascript';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { EVENT_SDK_OPTIONS } from '../shared/shared.const';
import { IEventSdkProducer, Logger } from '../shared/shared.type';
import { IEventSdkEmitEvent, IEventSdkOptions } from './rdkafka.type';
/**
 * NestJS provider that publishes events to Kafka through an `RdKafka.Producer`.
 *
 * The underlying producer is created in the constructor from the injected SDK
 * options, starts connecting when the module is initialised and is
 * disconnected when the module is destroyed.
 */
@Injectable()
export class RdKafkaProducer implements IEventSdkProducer, OnModuleInit, OnModuleDestroy {
  private readonly producer: RdKafka.Producer;
  private readonly logger: Logger;

  /**
   * Builds the namespaced logger and the Kafka producer, and wires the
   * producer's log / error / ready events to that logger.
   *
   * @param options SDK options; `client` and `producer` are merged into the
   *   producer configuration (keys in `producer` win on conflict) and
   *   `producerTopic` is passed as the topic-level configuration.
   */
  constructor(@Inject(EVENT_SDK_OPTIONS) private readonly options: IEventSdkOptions) {
    this.logger = this.options.logger.namespace(
      RdKafkaProducer.name,
      this.options.producer?.log_level,
    );

    this.producer = new RdKafka.Producer(
      { ...this.options.client, ...this.options.producer },
      this.options.producerTopic,
    );

    this.producer.on('event.log', (log) => {
      this.logger.debug(`${log.fac}: ${log.message}`);
    });
    this.producer.on('event.error', (err) => {
      this.logger.error(`Error:`, err);
    });
    this.producer.on('ready', () => {
      this.logger.info(`Connected to Kafka producer`);
    });
  }

  /**
   * Starts connecting the producer.
   *
   * The connection is intentionally not awaited, so application bootstrap does
   * not depend on broker availability; readiness is reported by the `ready`
   * handler registered in the constructor.
   */
  async onModuleInit(): Promise<void> {
    this.producer.connect();
  }

  /**
   * Disconnects the producer and waits until the client reports completion.
   *
   * Waiting matters because Nest considers the hook finished as soon as the
   * returned promise settles; without it the process may exit while the client
   * is still shutting down (the client is expected to flush queued messages as
   * part of `disconnect` - see the client documentation).
   */
  async onModuleDestroy(): Promise<void> {
    // Nothing to tear down if the connection was never established (for
    // example the broker was unreachable); calling disconnect in that state is
    // expected to fail in the client, which would break application shutdown.
    if (!this.producer.isConnected()) {
      return;
    }

    await new Promise<void>((resolve) => {
      this.producer.disconnect((err) => {
        if (err) {
          // Shutdown stays best-effort: report the failure but never block it.
          this.logger.error(`Error while disconnecting Kafka producer:`, err);
        }
        resolve();
      });
    });
  }

  /**
   * Publishes one event.
   *
   * `payload.data` is serialised with `JSON.stringify` and sent as the message
   * value; `payload.key` and `payload.headers` are forwarded unchanged.
   *
   * @param payload Topic, data, key and headers of the event to publish.
   * @returns Whatever the producer's `produce` call returns.
   * @throws Any error raised synchronously by `produce` or by serialising the
   *   data surfaces as a rejection of the returned promise.
   */
  async emit<T>(payload: IEventSdkEmitEvent<T>) {
    return this.producer.produce(
      payload.topic,
      null, // partition: left to the client
      Buffer.from(JSON.stringify(payload.data)),
      payload.key,
      null, // timestamp: left to the client
      null, // opaque: unused
      payload.headers,
    );
  }
}
|