Press n or j to go to the next uncovered block, b, p or k for the previous block.
| 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 | import { Controller, Param, Sse } from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import { MessagePattern } from '@nestjs/microservices'; import { ApiOperation, ApiTags } from '@nestjs/swagger'; import { Observable, Subject } from 'rxjs'; import { EventTypeEnum, KafkaTopicEnum, RabbitPatternEnum, } from 'src/modules/common/event/shared/event.const'; import { EventConsumer } from 'src/modules/common/event/shared/event.decorator'; import { EmitMessageDto } from 'src/modules/common/event/shared/event.dto'; import { SseService } from '../services/sse.service'; import { IMessageEvent } from '../shared/interface'; @ApiTags('Pubsub - SSE') @Controller({ path: 'pubsub/sse' }) export class SseController { constructor(private readonly sseService: SseService) {} @Sse(':id') @ApiOperation({ operationId: 'serverPushChannel', summary: 'SSE channel which push data to client side.', description: 'Received data from endpoint **emitMessage**', }) sse(@Param('id') channelId: string): Observable<IMessageEvent> { const subject = new Subject<IMessageEvent>(); this.sseService.addChannel(channelId, subject); subject.subscribe({ complete: () => this.sseService.removeChannel(channelId), }); return subject.asObservable(); } @OnEvent(EventTypeEnum.Base) handleEvent(payload: EmitMessageDto) { this.sseService.pushDataToChannel(payload.id, payload.data); } @MessagePattern(RabbitPatternEnum.Base) handleEventFromRabbitMQ(payload: EmitMessageDto) { this.sseService.pushDataToChannel(payload.id, payload.data); } @EventConsumer(KafkaTopicEnum.Base) handleEventFromKafka(payload: EmitMessageDto) { this.sseService.pushDataToChannel(payload.id, payload.data); } } |