All files / src/packages/event-sdk/rdkafka rdkafka.producer.ts

33.33% Statements 6/18
100% Branches 0/0
0% Functions 0/7
25% Lines 4/16

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 541x 1x 1x         1x                                                                                            
import { RdKafka } from '@confluentinc/kafka-javascript';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { EVENT_SDK_OPTIONS } from '../shared/shared.const';
import { IEventSdkProducer, Logger } from '../shared/shared.type';
import { IEventSdkEmitEvent, IEventSdkOptions } from './rdkafka.type';
 
@Injectable()
export class RdKafkaProducer implements IEventSdkProducer, OnModuleInit, OnModuleDestroy {
  private readonly producer: RdKafka.Producer;
  private readonly logger: Logger;
 
  constructor(@Inject(EVENT_SDK_OPTIONS) private readonly options: IEventSdkOptions) {
    this.logger = this.options.logger.namespace(
      RdKafkaProducer.name,
      this.options.producer?.log_level,
    );
 
    this.producer = new RdKafka.Producer(
      { ...this.options.client, ...this.options.producer },
      this.options.producerTopic,
    );
 
    this.producer.on('event.log', (log) => {
      this.logger.debug(`${log.fac}: ${log.message}`);
    });
    this.producer.on('event.error', (err) => {
      this.logger.error(`Error:`, err);
    });
    this.producer.on('ready', () => {
      this.logger.info(`Connected to Kafka producer`);
    });
  }
 
  async onModuleInit() {
    this.producer.connect();
  }
 
  async onModuleDestroy() {
    this.producer.disconnect();
  }
 
  async emit<T>(payload: IEventSdkEmitEvent<T>) {
    return this.producer.produce(
      payload.topic,
      null,
      Buffer.from(JSON.stringify(payload.data)),
      payload.key,
      null,
      null,
      payload.headers,
    );
  }
}