import { Injectable } from '@nestjs/common';
import { ExternalContextCreator, MetadataScanner, ModulesContainer } from '@nestjs/core';
import { sleep } from 'src/shared/utils';
import {
EVENT_SDK_CONSUMER_METADATA,
EVENT_SDK_CONTEXT_TYPE,
RETRIABLE_ERROR_CODES,
} from './shared.const';
import { IContext, IRetryWithBackoffOptions } from './shared.type';
/**
 * Base class for event consumers.
 *
 * It discovers controller methods that carry `EVENT_SDK_CONSUMER_METADATA`,
 * indexes them by topic, wraps them in a Nest external context, and offers a
 * retry loop that commits the message offset whenever processing is abandoned.
 * Concrete consumers supply {@link SharedConsumer.commitOffset}.
 */
@Injectable()
export abstract class SharedConsumer {
  /**
   * Handlers grouped by topic. For each topic the handlers are stored in an
   * object keyed by their registration index ("0", "1", ...).
   */
  protected readonly subscriberMap = new Map<
    string,
    {
      [index: string]: {
        instance: Record<string, (...args: any[]) => any>;
        methodKey: string;
      };
    }
  >();

  constructor(
    protected readonly modulesContainer: ModulesContainer,
    protected readonly metadataScanner: MetadataScanner,
    protected readonly externalContextCreator: ExternalContextCreator,
  ) {}

  /**
   * Scans every controller of every registered module and records each method
   * that has a topic attached under `EVENT_SDK_CONSUMER_METADATA`.
   *
   * Entries are appended without de-duplication, so calling this more than
   * once registers the same handlers again.
   */
  protected setupSubscriberMap(): void {
    const modules = this.modulesContainer.values();
    for (const nestModule of Array.from(modules)) {
      nestModule.controllers.forEach((controller) => {
        const { instance } = controller;
        // A wrapper without an instance has nothing to scan, and
        // Object.getPrototypeOf(undefined) would throw.
        if (!instance) {
          return;
        }
        const instancePrototype = Object.getPrototypeOf(instance);
        if (!instancePrototype) {
          return;
        }
        const methodKeys = this.metadataScanner.getAllMethodNames(instancePrototype);
        for (const methodKey of methodKeys) {
          const targetCallback = instancePrototype[methodKey];
          const topic = this.getSubscribeMetadata(targetCallback);
          if (topic) {
            const callbackList = this.subscriberMap.get(topic) || {};
            // The next free index is the current number of handlers for the topic.
            callbackList[Object.keys(callbackList).length] = {
              instance: instance as Record<string, (...args: any[]) => any>,
              methodKey,
            };
            this.subscriberMap.set(topic, callbackList);
          }
        }
      });
    }
  }

  /**
   * Reads the topic stored on a method under `EVENT_SDK_CONSUMER_METADATA`.
   *
   * @returns the stored value, or `undefined` when the method has no such metadata.
   */
  private getSubscribeMetadata(callback: (...args: any[]) => any): string | undefined {
    return Reflect.getMetadata(EVENT_SDK_CONSUMER_METADATA, callback);
  }

  /**
   * Wraps `instance[methodKey]` with `ExternalContextCreator.create`, enabling
   * guards, interceptors and filters, using `EVENT_SDK_CONTEXT_TYPE` as the
   * context type.
   *
   * @param methodKey name of the handler method on `instance`
   * @param instance controller instance that owns the method
   * @returns whatever `ExternalContextCreator.create` returns for the method
   */
  protected getHandler(methodKey: string, instance: Record<string, (...args: any[]) => any>) {
    return this.externalContextCreator.create(
      instance,
      instance[methodKey],
      methodKey,
      // The four optional positional arguments are intentionally left unset.
      undefined,
      undefined,
      undefined,
      undefined,
      { guards: true, interceptors: true, filters: true },
      EVENT_SDK_CONTEXT_TYPE,
    );
  }

  /**
   * Runs `handler`, retrying after `backoffInterval` when it fails with an
   * error whose `code` is listed in `RETRIABLE_ERROR_CODES`.
   *
   * The offset is committed (and the message skipped) when:
   * - the error has no `code` or a non-retriable `code`,
   * - more than `retries` failures have occurred, or
   * - more than `timeout` ms have elapsed when the next attempt is about to start.
   *
   * On success the method returns without committing the offset itself.
   * The elapsed-time check runs only between attempts; a running handler is
   * not interrupted.
   *
   * @param context topic / partition / offset of the message being processed
   * @param handler zero-argument function that processes the message
   * @param options retry budget, delay between attempts, time budget and logger
   */
  protected async handleRetryWithBackoff(
    { topic, partition, offset }: IContext,
    handler: (...args: any[]) => Promise<any>,
    { retries, backoffInterval, timeout, logger }: IRetryWithBackoffOptions,
  ): Promise<void> {
    let attempts = 0;
    const startTime = Date.now();
    while (attempts <= retries) {
      const elapsed = Date.now() - startTime;
      if (elapsed > timeout) {
        logger.error(`Retry timeout (${timeout}ms) exceeded, committing offset and skipping.`);
        await this.commitOffset({ topic, partition, offset });
        break;
      }
      try {
        await handler();
        break;
      } catch (err: unknown) {
        attempts++;
        // Anything can be thrown, including null/undefined; read `code` defensively.
        // `any` is used because the element type of RETRIABLE_ERROR_CODES is declared elsewhere.
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
        const code = err == null ? undefined : (err as { code?: any }).code;
        if (!code || !RETRIABLE_ERROR_CODES.includes(code)) {
          logger.error('Non-retriable error, committing offset and skipping', err);
          await this.commitOffset({ topic, partition, offset });
          break;
        } else if (attempts > retries) {
          logger.error('Max retries reached, committing offset and skipping');
          await this.commitOffset({ topic, partition, offset });
          break;
        } else {
          logger.warn(
            `Something wrong when processing message, retried with attempt ${attempts}`,
            err,
          );
          await sleep(backoffInterval);
        }
      }
    }
  }

  /**
   * Commits the given offset for the topic partition. Called by
   * {@link SharedConsumer.handleRetryWithBackoff} when a message is skipped.
   */
  protected abstract commitOffset({ topic, partition, offset }: IContext): Promise<void>;
}
|