Press n or j to go to the next uncovered block, b, p or k for the previous block.
import { CallHandler, ExecutionContext, Injectable, NestInterceptor } from '@nestjs/common';
import { KafkaContext } from '@nestjs/microservices';
import { Request, Response } from 'express';
import { Observable } from 'rxjs';
import { tap } from 'rxjs/operators';
import { EVENT_SDK_CONTEXT_TYPE, IKafkaJS } from 'src/packages/event-sdk';
import { logger } from 'src/shared/logger';

/**
 * Interceptor that writes a request log before the handler runs and a
 * response (or error) log once the handler's stream emits or fails.
 *
 * The logging strategy is picked from the execution context type:
 * `'http'`, the event-SDK context type (`EVENT_SDK_CONTEXT_TYPE`), or `'rpc'`.
 * Any other context type is passed through without logging.
 */
@Injectable()
export class LoggingInterceptor implements NestInterceptor {
  /**
   * Dispatches to the logging routine that matches the context type.
   *
   * @param context - Execution context of the current invocation.
   * @param next - Handle used to invoke the downstream handler.
   * @returns The handler's stream, with logging attached for the recognised
   *   context types and returned as-is otherwise.
   */
  intercept(context: ExecutionContext, next: CallHandler): Observable<any> {
    const contextType = context.getType<string>();

    if (contextType === 'http') {
      return this.httpLog(context, next);
    } else if (contextType === EVENT_SDK_CONTEXT_TYPE) {
      return this.kafkaLog(context, next);
    } else if (contextType === 'rpc') {
      return this.rpcLog(context, next);
    }

    // Unrecognised context type: do not log, just continue the pipeline.
    return next.handle();
  }

  /**
   * Logs the incoming HTTP request, then the HTTP response on each emission
   * or an `'http error'` entry when the stream errors.
   */
  private httpLog(context: ExecutionContext, next: CallHandler): Observable<any> {
    const httpRequest = context.switchToHttp().getRequest<Request>();
    logger.httpRequestLog(httpRequest);

    return next.handle().pipe(
      tap({
        next: () => {
          // The response object is fetched only once the handler has emitted.
          const httpResponse = context.switchToHttp().getResponse<Response>();
          logger.httpResponseLog(httpResponse);
        },
        error: (err: Error): void => {
          logger.error('http error', err);
        },
      }),
    );
  }

  /**
   * Logs topic/partition/offset for an event-SDK invocation before the
   * handler runs and again on each emission; stream errors go to
   * `kafkaResponseErrorLog`.
   *
   * The event-SDK context is read from the second handler argument.
   */
  private kafkaLog(context: ExecutionContext, next: CallHandler): Observable<any> {
    const [, kafkaContext] = context.getArgs<[any, IKafkaJS.IEventSdkContext]>();

    logger.kafkaRequestLog({
      topic: kafkaContext.topic,
      partition: kafkaContext.partition,
      offset: kafkaContext.offset,
    });

    return next.handle().pipe(
      tap({
        next: () => {
          logger.kafkaResponseLog({
            topic: kafkaContext.topic,
            partition: kafkaContext.partition,
            offset: kafkaContext.offset,
          });
        },
        error: (err: Error): void => {
          logger.kafkaResponseErrorLog(err);
        },
      }),
    );
  }

  /**
   * Logs RPC invocations. Request and response logs are written only when the
   * RPC context is a `KafkaContext`; errors are logged for every RPC context,
   * using the Kafka error log for `KafkaContext` and `'rpc error'` otherwise.
   */
  private rpcLog(context: ExecutionContext, next: CallHandler): Observable<any> {
    const rpcContext = context.switchToRpc().getContext();

    if (rpcContext instanceof KafkaContext) {
      logger.kafkaRequestLog({
        topic: rpcContext.getTopic(),
        partition: rpcContext.getPartition(),
        offset: rpcContext.getMessage().offset,
      });
    }

    return next.handle().pipe(
      tap({
        next: () => {
          if (rpcContext instanceof KafkaContext) {
            logger.kafkaResponseLog({
              topic: rpcContext.getTopic(),
              partition: rpcContext.getPartition(),
              offset: rpcContext.getMessage().offset,
            });
          }
        },
        error: (err: Error): void => {
          if (rpcContext instanceof KafkaContext) {
            logger.kafkaResponseErrorLog(err);
          } else {
            logger.error('rpc error', err);
          }
        },
      }),
    );
  }
}