All files / src/packages/event-sdk/kafkajs kafkajs.consumer.ts

25% Statements 10/40
0% Branches 0/22
0% Functions 0/7
21.05% Lines 8/38

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 1111x 1x 1x 1x 1x 1x   1x       1x                                                                                                                                                                                                      
import { KafkaJS } from '@confluentinc/kafka-javascript';
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ExternalContextCreator, MetadataScanner, ModulesContainer } from '@nestjs/core';
import { EVENT_SDK_OPTIONS } from '../shared/shared.const';
import { parseStringToJson } from '../shared/shared.utils';
import { EVENT_SDK_KAFKAJS_TOKEN } from './kafkajs.provider';
import { IEventSdkContext, IEventSdkOptions } from './kafkajs.type';
import { SharedConsumer } from '../shared/shared.consumer';
import { IContext } from '../shared/shared.type';
 
@Injectable()
export class KafkaJSConsumer extends SharedConsumer implements OnModuleInit, OnModuleDestroy {
  private readonly consumer?: KafkaJS.Consumer;
  private readonly runOptions?: KafkaJS.ConsumerRunConfig;
 
  constructor(
    @Inject(EVENT_SDK_OPTIONS) private readonly options: IEventSdkOptions,
    @Inject(EVENT_SDK_KAFKAJS_TOKEN) private readonly eventSdk: KafkaJS.Kafka,
 
    protected readonly modulesContainer: ModulesContainer,
    protected readonly metadataScanner: MetadataScanner,
    protected readonly externalContextCreator: ExternalContextCreator,
  ) {
    super(modulesContainer, metadataScanner, externalContextCreator);
 
    this.runOptions = this.options.run;
 
    Iif (this.options.consumer) {
      this.consumer = this.eventSdk.consumer({
        kafkaJS: {
          ...this.options.consumer,
          ...(this.options.client.logger && {
            logger: this.options.client.logger.namespace(KafkaJSConsumer.name),
          }),
        },
      });
    }
  }
 
  async onModuleInit() {
    Iif (this.consumer) {
      await this.consumer.connect();
 
      this.setupSubscriberMap();
 
      const topics = Array.from(this.subscriberMap.keys());
      await this.consumer.subscribe({ topics: topics });
      await this.consumer.run({
        ...this.runOptions,
        eachMessage: async (payload) => {
          await this.handleMessage(payload);
        },
      });
    }
  }
 
  async onModuleDestroy() {
    Iif (this.consumer) {
      await this.consumer.disconnect();
    }
  }
 
  private async handleMessage({
    topic,
    partition,
    message,
    heartbeat,
  }: KafkaJS.EachMessagePayload) {
    Iif (this.consumer) {
      const callbackList = this.subscriberMap.get(topic) || {};
      for (const key in callbackList) {
        const { instance, methodKey } = callbackList[key];
        Iif (instance && methodKey) {
          const payload = message.value ? parseStringToJson(message.value.toString()) : null;
          const context: IEventSdkContext = {
            topic,
            partition,
            key: message.key ? parseStringToJson(message.key.toString()) : null,
            offset: message.offset,
            headers: message.headers,
            heartbeat,
          };
 
          const handler = this.getHandler(methodKey, instance);
 
          await this.handleRetryWithBackoff(
            context,
            async () => {
              await handler(payload, context);
            },
            {
              retries: this.options.consumer?.retry?.retries || 5,
              backoffInterval: this.options.consumer?.retry?.initialRetryTime || 300,
              timeout: this.options.consumer?.retry?.maxRetryTime || 30_000,
              logger: this.consumer.logger(),
            },
          );
        }
      }
    }
  }
 
  protected async commitOffset({ topic, partition, offset }: IContext) {
    Iif (this.consumer) {
      await this.consumer.commitOffsets([
        { topic, partition, offset: (BigInt(offset) + 1n).toString() },
      ]);
    }
  }
}