Server-Sent-Events (이하 Sse)

  • 클라이언트 측에서 폴링을 따로 사용하지 않고, HTTP 커넥션을 통해 서버에서 이벤트 발생 시 클라이언트 측으로 데이터를 푸시하는 기술이다.
  • 가장 초기에는 클라이언트 측에서 서버와 커넥션을 맺기 위한 동작을 수행한다.
  • 커넥션이 끊어지면 클라이언트 측은 다시 서버와 커넥션을 맺기 위한 동작을 수행한다.
  • HTML5 에 있는 EventSource 인터페이스가 Server-Sent-Events 를 수행할 수 있도록 도와준다.
    • EventSource 인터페이스는 모든 브라우저와 호환되고 있지 않다. 관련링크
    • 따라서 나는 3개의 브라우저 상에서 해당 인터페이스를 이용해볼 수 있었다.
      • 크롬 브라우저 ( + 시크릿 모드 )
      • 파이어폭스 브라우저 ( + 시크릿 모드 )
      • 오페라 ( + 시크릿 모드 )
  • 웹 소켓과는 달리 단방향 데이터를 스트리밍 할 수 있으며, 방향은 서버에서 클라이언트로 향한다.

 

Server-Sent-Events 화면 예시

  • 하나의 서비스에 A 라는 유저가 접속해있는 상태
  • 동시에 B 라는 유저도 같이 접속을 하고 있는 상태
  • B 라는 유저가 특정한 동작을 수행하고, 서버는 해당 동작을 이벤트로 판단한다.
    • 판단된 이벤트는 Sse 에 의해서 HTTP 커넥션 위에 클라이언트 측으로 푸시를 수행
    • 해당 내용은 EventSource 의 이벤트 리스너에 의해서 감지되고 이를 A 유저에게 알려준다.

 

Server-Sent-Events 적용 구조

  • 가장 최초에 한번 setupStream() 을 통해서 서버와 커넥션을 맺는다.
  • 서버에서는 서비스에 접속한 유저들의 유무를 파악하는 스케쥴링을 실시한다.
  • 접속한 유저가 존재하고,
    • 생성된 지 30분 이내의 알림
    • 아직 알림을 푸시하지 않은 상태의 알림
    • 위 두 조건을 만족하는 알람에 대해서 사용자에게 해당 데이터를 푸시

 

EventSource 생성 및 스트림 연결 예시

let url =  "api/notification/user/push?uuid=" + STREAM.id;
let eventSource = new EventSource(url, {withCredentials:true});
  • 클라이언트에서 new EventSource() 를 통해 커넥션을 맺기위한 동작을 수행한다.
  • 기본적으로 표준 CORS (Cross Origin Resource Sharing 이하 CORS) 는 기본적으로 타 도메인에 대해서 쿠키 값을 전송하지 않지만, withCredentials : true 옵션을 통해서 쿠키값을 전송하도록 설정한다.
  • 서버는 이벤트 발생 시, 이벤트를 전송할 때 text/event-stream 포맷에 맞추어서 전송해주어야 한다.
  • eventSource 객체가 생성되는 순간에 이벤트 스트림이 열리게 된다.

 

Sse 를 가능토록 하는 클래스, ResponseBodyEmitter & SseEmitter

 

ResponseBodyEmitter

비동기 요청이 들어오고, 하나 이상의 오브젝트 타입의 내용들을 반환하는 리턴 값 타입이다. DeferredResult 으로는 한 번의 결과 값을 반환하지만, ReponseBodyEmitter 는 여러 결과 값을 반환할 수 있다. (DefferredResult 는 비동기 요청을 처리하기 위한 클래스이다.)

 

SseEmitter

스프링에서 Server-Sents-Events 를 구현하기 위한 타입이다. (ResponseBodyEmitter 를 상속받는다.)

 

비동기 요청에 대한 간략 설명

  • 요청이 온다.
    1. HttpServlet 의 service() 수행 및 HttpServletRequest & HttpServletResponse 객체 생성
    2. HTTP Method 타입 (Get/Post/Patch/Delete) 에 맞는 메소드 수행 
    3. processRequest() → doService() → doDispatch() 순으로 수행
    4. processRequest() 에서 비동기 요청에 대한 유틸 클래스 WebAsyncUtils 의 객체를 이용
    5. WebAsyncUtils.getAsyncManager(request) 를 통하여 WebAsyncManager 객체를 획득
    6. WebAsyncManager 는 AsyncTaskExecutor 를 사용하여, 애플리케이션 대신에 Callable 을 실행
    7. Callable 을 통해서 요청 스레드가 아닌 별도의 스레드에서 동작을 수행
    8. 요청 스레드는 이외의 별도의 스레드가 생성되기 때문에 요청을 수행함에 있어서 block 되지 않는다.
    9. 따라서 별도 스레드는 구현된 동작을 수행한 뒤 이후에 요청에 대해 응답

비동기 컨트롤러 샘플 코드 (Controller)

  • 서비스를 접속한 유저에 한해서만 푸시를 받을 수 있음, 그에 따라 SseEmitter 객체를 생성
  • 동일한 유저 상에서 서로 클라이언트 접속은 UUID 로 구분
  • 비동기 요청을 시작하고, 이벤트가 발생하면 클라이언트로 데이터를 보낼 수 있도록 응답은 열려있는 상태이다
  • 응답 포맷은 text/event-stream 에 맞춘 상태이다.
@Slf4j
@RestController
@RequestMapping("/api/notification")
public class NotificationController {

    private final NotificationService notificationService;

    public NotificationController(NotificationService notificationService) {
        this.notificationService = notificationService;
    }
	
    /** 생략 **/
    
    @GetMapping(value = "user/push", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> fetchNotify(@AuthenticationPrincipal CustomOAuth2User oAuth2User,
    											  @RequestParam(required = false) String uuid) {

        if (oAuth2User == null || uuid == null) {
            throw new UnauthorizedException("식별되지 않은 유저의 요청입니다.");
        }

        final SseEmitter emitter = new SseEmitter();
        final User user = oAuth2User.getUser();
        final StreamDataSet DATA_SET = new StreamDataSet(user, emitter);
        final String UNIQUE_UUID = uuid;

        if (log.isDebugEnabled()) {
            log.debug("[ 이미터 체크 ] ==============================");
            log.debug("request user :: {}", user);
            log.debug("request uuid :: {}", UNIQUE_UUID);
            log.debug("=============================================");
        }

        try {
            notificationService.addEmitter(UNIQUE_UUID, DATA_SET);
        } catch (Exception e) {
            throw new InternalServerException(e.getMessage());
        }

        emitter.onCompletion(() -> {
            notificationService.removeEmitter(UNIQUE_UUID);
        });
        emitter.onTimeout(() -> {
            emitter.complete();
            notificationService.removeEmitter(UNIQUE_UUID);
        });

        return new ResponseEntity<>(emitter, HttpStatus.OK);
    }
    
}

 

알림 서비스 샘플 코드 1 (Service)

  • 스케쥴링을 통해서 해당 서비스에 접속한 유저의 여부를 판단
@Slf4j
@Service
@EnableScheduling
public class NotificationService {

	/** 생략 **/
    
    private final ConcurrentHashMap<String, StreamDataSet> eventMap = new ConcurrentHashMap<>();
    
    void addEmitter(final String UNIQUE_UUID, final StreamDataSet dataSet) {
        eventMap.put(UNIQUE_UUID, dataSet);
    }

    void removeEmitter(final String UNIQUE_UUID) {
        eventMap.remove(UNIQUE_UUID);
    }
    
    @Scheduled(initialDelay = 2000L, fixedDelay = 5000L)
    public void fetch() {

        if (log.isDebugEnabled()) {
            log.debug("SseEmitter 스케쥴링 =======================================");
            log.debug("현재 시간 :: {} ", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            log.debug("클라이언트 :: {}", eventMap.size());
            log.debug("==========================================================");
        }

        if (eventMap.size() == 0) {
            return;
        }

        this.handleAlert();
    }
    
}

 

알림 서비스 샘플 코드 2 (Service)

  • 접속한 유저의 유니크한 값을 통해, 현재 읽지 않은 알림의 개수를 파악
  • 읽지 않은 알림의 개수
    • 30 분 이내에 작성된 알림
    • 아직 사용자로 전송하지 않은 알림
    • send() 수행
  • send() 수행 이후에 전송 알림에 대해서 컬럼 isAlert 을 'Y' 로 변경
@Slf4j
@Service
@EnableScheduling
public class NotificationService {

	/** 생략 **/

	@Transactional
    public void handleAlert() {

        List<String> toBeRemoved = new ArrayList<>(eventMap.size());
        List<Long> alertIdList = new ArrayList<>();

        for (Map.Entry<String, StreamDataSet> entry : eventMap.entrySet()) {

            final String uniqueKey = entry.getKey();
            final StreamDataSet dataSet = entry.getValue();

            final User user = dataSet.getUser();
            final List<Notification> receivingAlert = notificationRepository.findByNotificationTargetUserUidAndIsReadIsFalse(user.getUid());
            final int noneReadCount = receivingAlert.size();

            /** 접속 유저가 읽지 않은 알람의 개수 **/
            if (noneReadCount == 0) {
                continue;
            }

            final SseEmitter emitter = dataSet.getSseEmitter();

            /** 30분 이내에 작성된 알람 목록 확인 **/
            final List<Notification> alertList = getListAnMinuteAndAlertFalse(receivingAlert);

            if (alertList.size() == 0) {
                continue;
            }

            /** 알림데이터 생성 **/
            NotificationAlert alert = NotificationAlert.builder()
                    .uid(user.getUid())
                    .notificationCount(noneReadCount)
                    .notifications(alertList)
                    .build();


            /** 알림 목록 ID 획득 **/
            alertIdList.addAll(alertList.stream()
                                    .map(Notification::getId)
                                    .collect(Collectors.toList()));

            try {

                /** 알림 전송 수행 **/
                emitter.send(alert, MediaType.APPLICATION_JSON_UTF8);

            } catch (Exception e) {
                log.error("이미터 센드 시 에러 발생 :: {}", e.getMessage());
                toBeRemoved.add(uniqueKey);
            }

        } // for

        /** 전송된 알람들 IS_ALERT 'Y' 로 변경 **/
        updateIsAlert(alertIdList);

        /** 전송 오류 SseEmitter 제거 **/
        for (String uuid : toBeRemoved) {
            eventMap.remove(uuid);
        }
    }

}
/**
 * - 30분 이전에 발생된 알람 여부
 * - 알람 푸시 수행 여부
 *
 * @param paramList 현재 접속 사용자에게 존재하는 전체 알림
 * @return 현재 시간으로부터 30분 이전에 발생한 알림 목록
 */
private ArrayList<Notification> getListAnMinuteAndAlertFalse(List<Notification> paramList) {

    ArrayList<Notification> alertList = new ArrayList<>();

    LocalDateTime beforeTime = LocalDateTime.now().minusMinutes(30);

    for (Notification notification : paramList) {

        boolean isAlert = notification.isAlert();
        LocalDateTime createdAt = notification.getCreatedAt();

        if (createdAt.isBefore(beforeTime) || isAlert) {
            continue;
        }

        // 30 분 이내 알리미 & 안 읽은 알리미
        alertList.add(notification);
    }

    return alertList;
}
/**
 * 전송된 알림에 대해서 IS_READ 값을 'Y' 로 변경
 *
 * @param alertIds 전송된 알림 ID 목록
 */
private void updateIsAlert(List<Long> alertIds) {

    Set<Long> idSet = new HashSet<>(alertIds);
    idSet.stream().forEach(notificationRepository::updateNotificationIsAlertById);

}

 

예외 처리 샘플 코드

  • 식별되지 않은 사용자에게 요청이 들어온 경우
  • 유저에게 데이터를 전달하는 경우
@ResponseStatus(value = HttpStatus.UNAUTHORIZED, reason = "request does not contain authentication credentials")
public class UnauthorizedException extends RuntimeException {

    public UnauthorizedException(String message){
        super(message);
    }

}
@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR, reason = "Internal Server Error")
public class InternalServerException extends RuntimeException {

    public InternalServerException(String message){
        super(message);
    }
    
}

 

알림 삭제 샘플 코드

  • 알림은 일정기간만 보관하고 이후에 삭제를 수행한다.
  • 일정한 주기마다 수행될 수 있도록 @Scheduled 어노테이션을 이용한다.
  • https://crontab.guru/
  • 여기서 중요한 느낀 점은 데이터라는 것은 매우 소중하게 다루어야 한다는 것이다.
    • 알림에 대해서 일정기간 보관을 수행하고 바로 삭제하는 것은 바람직하지 않다.
    • 바로 삭제하는 것이 아닌, 사용자에게 보여주지 않는 방식으로 진행하여야 한다.
    • 이후 시간이 충분히 흐른 뒤 데이터 삭제에 큰 문제가 없다고 판단되면 그 때 배치로 삭제하는 것이 효율적
/**
 * reference :: https://crontab.guru/#0_2_*_*_*
 * At 02 : 00
 */
@Scheduled(cron = "0 0 2 * * *")
public void deleteNotificationByCron() {
    
    notificationRepository.deleteNotificationByCron();
    
}

 

용어

UUID

네트워크 상에서 서로 모르는 개체들을 식별하고 구분하기 위한 고유 범용 식별자.

 

reference

https://www.w3.org/TR/eventsource/#server-sent-events-intro

https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events

https://html.spec.whatwg.org/multipage/server-sent-events.html#dom-eventsource

https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/SseEmitter.html

https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitter.html

https://xpadro.com/2015/07/understanding-callable-and-spring-deferredresult.html

 

Posted by doubler
,