All files / src/modules/common/event/services builtin-kafka.service.ts

0% Statements 0/17
100% Branches 0/0
0% Functions 0/6
0% Lines 0/15

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37                                                                         
import { Inject, Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { instanceToPlain } from 'class-transformer';
import { firstValueFrom } from 'rxjs';
import { logger } from 'src/shared/logger';
import { KafkaTopicEnum } from '../shared/event.const';
import { EmitMessageDto } from '../shared/event.dto';
import { IKafkaService } from '../shared/event.interface';
import { KAFKA_TOKEN } from '../shared/event.provider';
 
@Injectable()
export class BuiltinKafkaService implements IKafkaService, OnModuleInit, OnModuleDestroy {
  constructor(@Inject(KAFKA_TOKEN) private readonly kafkaClient: ClientKafka) {}
 
  onModuleInit() {
    this.kafkaClient.connect().then(() => {
      logger.info('Connected to Kafka');
    });
  }
 
  onModuleDestroy() {
    this.kafkaClient.close().then(() => {
      logger.info('Kafka connection closed');
    });
  }
 
  emitMessage(payload: EmitMessageDto) {
    const data = instanceToPlain(payload);
    return firstValueFrom(
      this.kafkaClient.emit(KafkaTopicEnum.Base, {
        key: data.id,
        value: data,
      }),
    );
  }
}