All files / src/modules/domain/pubsub/sse/controllers sse.controller.ts

0% Statements 0/25
100% Branches 0/0
0% Functions 0/6
0% Lines 0/23

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54                                                                                                           
import { Controller, Param, Sse } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { MessagePattern } from '@nestjs/microservices';
import { ApiOperation, ApiTags } from '@nestjs/swagger';
import { Observable, Subject } from 'rxjs';
import {
  EventTypeEnum,
  KafkaTopicEnum,
  RabbitPatternEnum,
} from 'src/modules/common/event/shared/event.const';
import { EventConsumer } from 'src/modules/common/event/shared/event.decorator';
import { EmitMessageDto } from 'src/modules/common/event/shared/event.dto';
import { SseService } from '../services/sse.service';
import { IMessageEvent } from '../shared/interface';
 
/**
 * Exposes Server-Sent Events channels under `pubsub/sse/:id` and forwards
 * messages received from the in-process event emitter, RabbitMQ and Kafka
 * to the channel named by the message's `id`.
 */
@ApiTags('Pubsub - SSE')
@Controller({ path: 'pubsub/sse' })
export class SseController {
  constructor(private readonly sseService: SseService) {}

  /**
   * Opens an SSE stream for the given channel.
   *
   * A fresh `Subject` is registered with the `SseService` under `channelId`,
   * and the returned observable relays everything emitted on that subject.
   *
   * @param channelId Channel identifier taken from the `:id` route parameter.
   * @returns Observable that emits every message pushed to the channel.
   */
  @Sse(':id')
  @ApiOperation({
    operationId: 'serverPushChannel',
    summary: 'SSE channel which push data to client side.',
    description: 'Received data from endpoint **emitMessage**',
  })
  sse(@Param('id') channelId: string): Observable<IMessageEvent> {
    const subject = new Subject<IMessageEvent>();

    this.sseService.addChannel(channelId, subject);

    // Tie channel cleanup to the subscription's teardown rather than to the
    // subject's `complete` notification only. When the HTTP client closes the
    // connection the framework unsubscribes from the returned observable but
    // never completes the subject, so a `complete`-only hook would leave the
    // channel registered forever. The teardown below runs on unsubscribe,
    // complete and error alike.
    // NOTE(review): if several clients may connect with the same id, whether
    // removing here affects the other connection depends on SseService —
    // confirm against its implementation.
    return new Observable<IMessageEvent>((subscriber) => {
      const subscription = subject.subscribe(subscriber);

      return () => {
        subscription.unsubscribe();
        this.sseService.removeChannel(channelId);
      };
    });
  }

  /**
   * Handles `EventTypeEnum.Base` events from the in-process event emitter and
   * pushes the payload data to the channel identified by `payload.id`.
   *
   * @param payload Message carrying the target channel id and the data to push.
   */
  @OnEvent(EventTypeEnum.Base)
  handleEvent(payload: EmitMessageDto) {
    this.sseService.pushDataToChannel(payload.id, payload.data);
  }

  /**
   * Handles messages matching `RabbitPatternEnum.Base` and pushes the payload
   * data to the channel identified by `payload.id`.
   *
   * @param payload Message carrying the target channel id and the data to push.
   */
  @MessagePattern(RabbitPatternEnum.Base)
  handleEventFromRabbitMQ(payload: EmitMessageDto) {
    this.sseService.pushDataToChannel(payload.id, payload.data);
  }

  /**
   * Handles messages consumed for `KafkaTopicEnum.Base` and pushes the payload
   * data to the channel identified by `payload.id`.
   *
   * @param payload Message carrying the target channel id and the data to push.
   */
  @EventConsumer(KafkaTopicEnum.Base)
  handleEventFromKafka(payload: EmitMessageDto) {
    this.sseService.pushDataToChannel(payload.id, payload.data);
  }
}