All files / src/packages/event-sdk/shared shared.consumer.ts

14.58% Statements 7/48
0% Branches 0/11
0% Functions 0/6
10.86% Lines 5/46

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 1141x 1x 1x 1x               1x                                                                                                                                                                                                            
import { Injectable } from '@nestjs/common';
import { ExternalContextCreator, MetadataScanner, ModulesContainer } from '@nestjs/core';
import { sleep } from 'src/shared/utils';
import {
  EVENT_SDK_CONSUMER_METADATA,
  EVENT_SDK_CONTEXT_TYPE,
  RETRIABLE_ERROR_CODES,
} from './shared.const';
import { IContext, IRetryWithBackoffOptions } from './shared.type';
 
@Injectable()
export abstract class SharedConsumer {
  protected readonly subscriberMap = new Map<
    string,
    {
      [index: string]: {
        instance: Record<string, (...args: any[]) => any>;
        methodKey: string;
      };
    }
  >();
 
  constructor(
    protected readonly modulesContainer: ModulesContainer,
    protected readonly metadataScanner: MetadataScanner,
    protected readonly externalContextCreator: ExternalContextCreator,
  ) {}
 
  protected setupSubscriberMap() {
    const modules = this.modulesContainer.values();
    for (const nestModule of Array.from(modules)) {
      nestModule.controllers.forEach((controller) => {
        const { instance } = controller;
        const instancePrototype = Object.getPrototypeOf(instance);
        const methodKeys = this.metadataScanner.getAllMethodNames(instancePrototype);
 
        for (const methodKey of methodKeys) {
          const targetCallback = instancePrototype[methodKey];
 
          const topic = this.getSubscribeMetadata(targetCallback);
          Iif (topic) {
            const callbackList = this.subscriberMap.get(topic) || {};
            callbackList[Object.keys(callbackList).length] = {
              instance: instance as any,
              methodKey,
            };
            this.subscriberMap.set(topic, callbackList);
          }
        }
      });
    }
  }
 
  private getSubscribeMetadata(callback: (...args: any[]) => any): string {
    return Reflect.getMetadata(EVENT_SDK_CONSUMER_METADATA, callback);
  }
 
  protected getHandler(methodKey: string, instance: Record<string, (...args: any[]) => any>) {
    return this.externalContextCreator.create(
      instance,
      instance[methodKey],
      methodKey,
      undefined,
      undefined,
      undefined,
      undefined,
      { guards: true, interceptors: true, filters: true },
      EVENT_SDK_CONTEXT_TYPE,
    );
  }
 
  protected async handleRetryWithBackoff(
    { topic, partition, offset }: IContext,
    handler: (...args: any[]) => Promise<any>,
    { retries, backoffInterval, timeout, logger }: IRetryWithBackoffOptions,
  ) {
    let attempts = 0;
    const startTime = Date.now();
 
    while (attempts <= retries) {
      const elapsed = Date.now() - startTime;
      Iif (elapsed > timeout) {
        logger.error(`Retry timeout (${timeout}ms) exceeded, committing offset and skipping.`);
        await this.commitOffset({ topic, partition, offset });
        break;
      }
 
      try {
        await handler();
        break;
      } catch (err) {
        attempts++;
        if (!err.code || (err.code && !RETRIABLE_ERROR_CODES.includes(err.code))) {
          logger.error('Non-retriable error, committing offset and skipping', err);
          await this.commitOffset({ topic, partition, offset });
          break;
        } else if (attempts > retries) {
          logger.error('Max retries reached, committing offset and skipping');
          await this.commitOffset({ topic, partition, offset });
          break;
        } else {
          logger.warn(
            `Something wrong when processing message, retried with attempt ${attempts}`,
            err,
          );
          await sleep(backoffInterval);
        }
      }
    }
  }
 
  protected abstract commitOffset({ topic, partition, offset }: IContext): Promise<void>;
}