All files / src/shared/config kafka.config.ts

0% Statements 0/15
100% Branches 0/0
0% Functions 0/4
0% Lines 0/15

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102                                                                                                                                                                                                           
import { KafkaOptions, Transport } from '@nestjs/microservices';
import { CompressionTypes } from 'kafkajs';
import { IKafkaJS, IRdKafka, EventSdk } from 'src/packages/event-sdk';
import { SERVICE_NAME } from '../const';
import ENV from '../env';
import { eventSdkLogger } from '../logger';
 
export function getKafkaOptions() {
  return {
    client: {
      clientId: SERVICE_NAME,
      brokers: [ENV.KAFKA.BROKER],
      requestTimeout: 30_000,
      connectionTimeout: 10_000,
      enforceRequestTimeout: true,
    },
    consumer: {
      groupId: SERVICE_NAME,
      allowAutoTopicCreation: true,
    },
    send: {
      timeout: 30_000,
      compression: CompressionTypes.GZIP,
    },
    run: {
      partitionsConsumedConcurrently: 3,
    },
    subscribe: {
      fromBeginning: false,
    },
  };
}
 
export function getKafkaConfig(): KafkaOptions {
  return {
    transport: Transport.KAFKA,
    options: getKafkaOptions(),
  };
}
 
export function getKafkaCustomOptionsV1(): IKafkaJS.IEventSdkOptions {
  const baseConfig = getKafkaOptions();
 
  return {
    ...baseConfig,
    client: {
      ...baseConfig.client,
      logLevel: EventSdk.LogLevel.INFO,
      logger: eventSdkLogger,
      retry: {
        initialRetryTime: 300,
        maxRetryTime: 30_000,
        retries: 5,
      },
    },
    producer: {
      ...baseConfig.send,
      compression: EventSdk.CompressionTypes.GZIP,
      logLevel: EventSdk.LogLevel.INFO,
    },
    consumer: {
      ...baseConfig.consumer,
      ...baseConfig.subscribe,
      partitionAssigners: [EventSdk.PartitionAssigners.cooperativeSticky],
      logLevel: EventSdk.LogLevel.INFO,
    },
  };
}
 
export function getKafkaCustomOptionsV2(): IRdKafka.IEventSdkOptions {
  return {
    client: {
      'client.id': SERVICE_NAME,
      'metadata.broker.list': ENV.KAFKA.BROKER,
      'socket.timeout.ms': 30_000,
      'socket.connection.setup.timeout.ms': 10_000,
      'allow.auto.create.topics': true,
      'retry.backoff.ms': 300,
      'reconnect.backoff.max.ms': 30_000,
    },
    producer: {
      'compression.codec': EventSdk.CompressionTypes.GZIP,
      log_level: EventSdk.LogLevel.DEBUG,
      'message.send.max.retries': 5,
    },
    producerTopic: {
      'message.timeout.ms': 30_000,
      'delivery.timeout.ms': 30_000,
    },
    consumer: {
      'group.id': SERVICE_NAME,
      'partition.assignment.strategy': EventSdk.PartitionAssigners.cooperativeSticky,
      log_level: EventSdk.LogLevel.DEBUG,
      debug: 'consumer',
    },
    consumerTopic: {
      'auto.offset.reset': 'latest',
    },
    logger: eventSdkLogger,
  };
}