All files / src/packages/event-sdk/rdkafka rdkafka.consumer.ts

19.56% Statements 9/46
0% Branches 0/19
0% Functions 0/11
15.9% Lines 7/44

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 1131x 1x 1x 1x 1x     1x     1x                                                                                                                                                                                                            
import { RdKafka } from '@confluentinc/kafka-javascript';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ExternalContextCreator, MetadataScanner, ModulesContainer } from '@nestjs/core';
import { EVENT_SDK_OPTIONS } from '../shared/shared.const';
import { parseStringToJson } from '../shared/shared.utils';
import { IContext, Logger } from '../shared/shared.type';
import { IEventSdkContext, IEventSdkOptions } from './rdkafka.type';
import { SharedConsumer } from '../shared/shared.consumer';
 
@Injectable()
export class RdKafkaConsumer extends SharedConsumer implements OnModuleInit, OnModuleDestroy {
  private readonly consumer?: RdKafka.KafkaConsumer;
  private readonly logger: Logger;
 
  constructor(
    @Inject(EVENT_SDK_OPTIONS) private readonly options: IEventSdkOptions,
    protected readonly modulesContainer: ModulesContainer,
    protected readonly metadataScanner: MetadataScanner,
    protected readonly externalContextCreator: ExternalContextCreator,
  ) {
    super(modulesContainer, metadataScanner, externalContextCreator);
 
    this.logger = this.options.logger.namespace(
      RdKafkaConsumer.name,
      this.options.consumer?.log_level,
    );
 
    Iif (this.options.consumer) {
      this.consumer = new RdKafka.KafkaConsumer(
        { ...this.options.client, ...this.options.consumer },
        this.options.consumerTopic,
      );
 
      this.consumer.on('event.log', (log) => {
        this.logger.debug(`${log.fac}: ${log.message}`);
      });
      this.consumer.on('event.error', (err) => {
        this.logger.error(`Error:`, err);
      });
      this.consumer.on('ready', () => {
        this.logger.info(`Connected to Kafka consumer`);
        this.consumerSubscription();
      });
      this.consumer.on('data', (message) => {
        this.handleMessage(message);
      });
    }
  }
 
  onModuleInit() {
    Iif (this.consumer) {
      this.consumer.connect();
    }
  }
 
  onModuleDestroy() {
    Iif (this.consumer) {
      this.consumer.disconnect();
    }
  }
 
  private consumerSubscription() {
    Iif (this.consumer) {
      this.setupSubscriberMap();
 
      const topics = Array.from(this.subscriberMap.keys());
      this.consumer.subscribe(topics);
      this.consumer.consume();
    }
  }
 
  private async handleMessage({ topic, partition, value, offset, headers, key }: RdKafka.Message) {
    const callbackList = this.subscriberMap.get(topic) || {};
    for (const callbackKey in callbackList) {
      const { instance, methodKey } = callbackList[callbackKey];
      Iif (instance && methodKey) {
        const payload = value ? parseStringToJson(value.toString()) : null;
        const context: IEventSdkContext = {
          topic,
          partition,
          key,
          offset,
          headers,
        };
 
        const handler = this.getHandler(methodKey, instance);
 
        await this.handleRetryWithBackoff(
          context,
          async () => {
            await handler(payload, context);
          },
          {
            retries:
              this.options.producer?.['message.send.max.retries'] ||
              this.options.producer?.retries ||
              5,
            backoffInterval: this.options.client['retry.backoff.ms'] || 3000,
            timeout: this.options.client['reconnect.backoff.max.ms'] || 30_000,
            logger: this.logger,
          },
        );
      }
    }
  }
 
  protected async commitOffset({ topic, partition, offset }: IContext) {
    Iif (this.consumer) {
      this.consumer.commitMessage({ topic, partition, offset: offset as number });
    }
  }
}