Server-Sent-Events (이하 Sse)
- 클라이언트 측에서 폴링을 따로 사용하지 않고, HTTP 커넥션을 통해 서버에서 이벤트 발생 시 클라이언트 측으로 데이터를 푸시하는 기술이다.
- 가장 초기에는 클라이언트 측에서 서버와 커넥션을 맺기 위한 동작을 수행한다.
- 커넥션이 끊어지면 클라이언트 측은 다시 서버와 커넥션을 맺기 위한 동작을 수행한다.
- HTML5 에 있는 EventSource 인터페이스가 Server-Sent-Events 를 수행할 수 있도록 도와준다.
- EventSource 인터페이스는 모든 브라우저와 호환되고 있지 않다. 관련링크
- 따라서 나는 3개의 브라우저 상에서 해당 인터페이스를 이용해볼 수 있었다.
- 크롬 브라우저 ( + 시크릿 모드 )
- 파이어폭스 브라우저 ( + 시크릿 모드 )
- 오페라 ( + 시크릿 모드 )
- 웹 소켓과는 달리 단방향 데이터를 스트리밍 할 수 있으며, 방향은 서버에서 클라이언트로 향한다.
Server-Sent-Events 화면 예시
- 하나의 서비스에 A 라는 유저가 접속해있는 상태
- 동시에 B 라는 유저도 같이 접속을 하고 있는 상태
- B 라는 유저가 특정한 동작을 수행하고, 서버는 해당 동작을 이벤트로 판단한다.
- 판단된 이벤트는 Sse 에 의해서 HTTP 커넥션 위에 클라이언트 측으로 푸시를 수행
- 해당 내용은 EventSource 의 이벤트 리스너에 의해서 감지되고 이를 A 유저에게 알려준다.
Server-Sent-Events 적용 구조
- 가장 최초에 한번 setupStream() 을 통해서 서버와 커넥션을 맺는다.
- 서버에서는 서비스에 접속한 유저들의 유무를 파악하는 스케쥴링을 실시한다.
- 접속한 유저가 존재하고,
- 생성된 지 30분 이내의 알림
- 아직 알림을 푸시하지 않은 상태의 알림
- 위 두 조건을 만족하는 알람에 대해서 사용자에게 해당 데이터를 푸시
EventSource 생성 및 스트림 연결 예시
let url = "api/notification/user/push?uuid=" + STREAM.id;
let eventSource = new EventSource(url, {withCredentials:true});
- 클라이언트에서 new EventSource() 를 통해 커넥션을 맺기위한 동작을 수행한다.
- 기본적으로 표준 CORS (Cross Origin Resource Sharing 이하 CORS) 는 기본적으로 타 도메인에 대해서 쿠키 값을 전송하지 않지만, withCredentials : true 옵션을 통해서 쿠키값을 전송하도록 설정한다.
- 서버는 이벤트 발생 시, 이벤트를 전송할 때 text/event-stream 포맷에 맞추어서 전송해주어야 한다.
- eventSource 객체가 생성되는 순간에 이벤트 스트림이 열리게 된다.
Sse 를 가능토록 하는 클래스, ResponseBodyEmitter & SseEmitter
비동기 요청이 들어오고, 하나 이상의 오브젝트 타입의 내용들을 반환하는 리턴 값 타입이다. DeferredResult 으로는 한 번의 결과 값을 반환하지만, ReponseBodyEmitter 는 여러 결과 값을 반환할 수 있다. (DefferredResult 는 비동기 요청을 처리하기 위한 클래스이다.)
스프링에서 Server-Sents-Events 를 구현하기 위한 타입이다. (ResponseBodyEmitter 를 상속받는다.)
비동기 요청에 대한 간략 설명
- 요청이 온다.
- HttpServlet 의 service() 수행 및 HttpServletRequest & HttpServletResponse 객체 생성
- HTTP Method 타입 (Get/Post/Patch/Delete) 에 맞는 메소드 수행
- processRequest() → doService() → doDispatch() 순으로 수행
- processRequest() 에서 비동기 요청에 대한 유틸 클래스 WebAsyncUtils 의 객체를 이용
- WebAsyncUtils.getAsyncManager(request) 를 통하여 WebAsyncManager 객체를 획득
- WebAsyncManager 는 AsyncTaskExecutor 를 사용하여, 애플리케이션 대신에 Callable 을 실행
- Callable 을 통해서 요청 스레드가 아닌 별도의 스레드에서 동작을 수행
- 요청 스레드는 이외의 별도의 스레드가 생성되기 때문에 요청을 수행함에 있어서 block 되지 않는다.
- 따라서 별도 스레드는 구현된 동작을 수행한 뒤 이후에 요청에 대해 응답
비동기 컨트롤러 샘플 코드 (Controller)
- 서비스를 접속한 유저에 한해서만 푸시를 받을 수 있음, 그에 따라 SseEmitter 객체를 생성
- 동일한 유저 상에서 서로 클라이언트 접속은 UUID 로 구분
- 비동기 요청을 시작하고, 이벤트가 발생하면 클라이언트로 데이터를 보낼 수 있도록 응답은 열려있는 상태이다
- 응답 포맷은 text/event-stream 에 맞춘 상태이다.
@Slf4j
@RestController
@RequestMapping("/api/notification")
public class NotificationController {
private final NotificationService notificationService;
public NotificationController(NotificationService notificationService) {
this.notificationService = notificationService;
}
/** 생략 **/
@GetMapping(value = "user/push", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public ResponseEntity<SseEmitter> fetchNotify(@AuthenticationPrincipal CustomOAuth2User oAuth2User,
@RequestParam(required = false) String uuid) {
if (oAuth2User == null || uuid == null) {
throw new UnauthorizedException("식별되지 않은 유저의 요청입니다.");
}
final SseEmitter emitter = new SseEmitter();
final User user = oAuth2User.getUser();
final StreamDataSet DATA_SET = new StreamDataSet(user, emitter);
final String UNIQUE_UUID = uuid;
if (log.isDebugEnabled()) {
log.debug("[ 이미터 체크 ] ==============================");
log.debug("request user :: {}", user);
log.debug("request uuid :: {}", UNIQUE_UUID);
log.debug("=============================================");
}
try {
notificationService.addEmitter(UNIQUE_UUID, DATA_SET);
} catch (Exception e) {
throw new InternalServerException(e.getMessage());
}
emitter.onCompletion(() -> {
notificationService.removeEmitter(UNIQUE_UUID);
});
emitter.onTimeout(() -> {
emitter.complete();
notificationService.removeEmitter(UNIQUE_UUID);
});
return new ResponseEntity<>(emitter, HttpStatus.OK);
}
}
알림 서비스 샘플 코드 1 (Service)
- 스케쥴링을 통해서 해당 서비스에 접속한 유저의 여부를 판단
@Slf4j
@Service
@EnableScheduling
public class NotificationService {
/** 생략 **/
private final ConcurrentHashMap<String, StreamDataSet> eventMap = new ConcurrentHashMap<>();
void addEmitter(final String UNIQUE_UUID, final StreamDataSet dataSet) {
eventMap.put(UNIQUE_UUID, dataSet);
}
void removeEmitter(final String UNIQUE_UUID) {
eventMap.remove(UNIQUE_UUID);
}
@Scheduled(initialDelay = 2000L, fixedDelay = 5000L)
public void fetch() {
if (log.isDebugEnabled()) {
log.debug("SseEmitter 스케쥴링 =======================================");
log.debug("현재 시간 :: {} ", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
log.debug("클라이언트 :: {}", eventMap.size());
log.debug("==========================================================");
}
if (eventMap.size() == 0) {
return;
}
this.handleAlert();
}
}
알림 서비스 샘플 코드 2 (Service)
- 접속한 유저의 유니크한 값을 통해, 현재 읽지 않은 알림의 개수를 파악
- 읽지 않은 알림의 개수
- 30 분 이내에 작성된 알림
- 아직 사용자로 전송하지 않은 알림
- send() 수행
- send() 수행 이후에 전송 알림에 대해서 컬럼 isAlert 을 'Y' 로 변경
@Slf4j
@Service
@EnableScheduling
public class NotificationService {
/** 생략 **/
@Transactional
public void handleAlert() {
List<String> toBeRemoved = new ArrayList<>(eventMap.size());
List<Long> alertIdList = new ArrayList<>();
for (Map.Entry<String, StreamDataSet> entry : eventMap.entrySet()) {
final String uniqueKey = entry.getKey();
final StreamDataSet dataSet = entry.getValue();
final User user = dataSet.getUser();
final List<Notification> receivingAlert = notificationRepository.findByNotificationTargetUserUidAndIsReadIsFalse(user.getUid());
final int noneReadCount = receivingAlert.size();
/** 접속 유저가 읽지 않은 알람의 개수 **/
if (noneReadCount == 0) {
continue;
}
final SseEmitter emitter = dataSet.getSseEmitter();
/** 30분 이내에 작성된 알람 목록 확인 **/
final List<Notification> alertList = getListAnMinuteAndAlertFalse(receivingAlert);
if (alertList.size() == 0) {
continue;
}
/** 알림데이터 생성 **/
NotificationAlert alert = NotificationAlert.builder()
.uid(user.getUid())
.notificationCount(noneReadCount)
.notifications(alertList)
.build();
/** 알림 목록 ID 획득 **/
alertIdList.addAll(alertList.stream()
.map(Notification::getId)
.collect(Collectors.toList()));
try {
/** 알림 전송 수행 **/
emitter.send(alert, MediaType.APPLICATION_JSON_UTF8);
} catch (Exception e) {
log.error("이미터 센드 시 에러 발생 :: {}", e.getMessage());
toBeRemoved.add(uniqueKey);
}
} // for
/** 전송된 알람들 IS_ALERT 'Y' 로 변경 **/
updateIsAlert(alertIdList);
/** 전송 오류 SseEmitter 제거 **/
for (String uuid : toBeRemoved) {
eventMap.remove(uuid);
}
}
}
/**
* - 30분 이전에 발생된 알람 여부
* - 알람 푸시 수행 여부
*
* @param paramList 현재 접속 사용자에게 존재하는 전체 알림
* @return 현재 시간으로부터 30분 이전에 발생한 알림 목록
*/
private ArrayList<Notification> getListAnMinuteAndAlertFalse(List<Notification> paramList) {
ArrayList<Notification> alertList = new ArrayList<>();
LocalDateTime beforeTime = LocalDateTime.now().minusMinutes(30);
for (Notification notification : paramList) {
boolean isAlert = notification.isAlert();
LocalDateTime createdAt = notification.getCreatedAt();
if (createdAt.isBefore(beforeTime) || isAlert) {
continue;
}
// 30 분 이내 알리미 & 안 읽은 알리미
alertList.add(notification);
}
return alertList;
}
/**
* 전송된 알림에 대해서 IS_READ 값을 'Y' 로 변경
*
* @param alertIds 전송된 알림 ID 목록
*/
private void updateIsAlert(List<Long> alertIds) {
Set<Long> idSet = new HashSet<>(alertIds);
idSet.stream().forEach(notificationRepository::updateNotificationIsAlertById);
}
예외 처리 샘플 코드
- 식별되지 않은 사용자에게 요청이 들어온 경우
- 유저에게 데이터를 전달하는 경우
@ResponseStatus(value = HttpStatus.UNAUTHORIZED, reason = "request does not contain authentication credentials")
public class UnauthorizedException extends RuntimeException {
public UnauthorizedException(String message){
super(message);
}
}
@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR, reason = "Internal Server Error")
public class InternalServerException extends RuntimeException {
public InternalServerException(String message){
super(message);
}
}
알림 삭제 샘플 코드
- 알림은 일정기간만 보관하고 이후에 삭제를 수행한다.
- 일정한 주기마다 수행될 수 있도록 @Scheduled 어노테이션을 이용한다.
- https://crontab.guru/
- 여기서 중요한 느낀 점은 데이터라는 것은 매우 소중하게 다루어야 한다는 것이다.
- 알림에 대해서 일정기간 보관을 수행하고 바로 삭제하는 것은 바람직하지 않다.
- 바로 삭제하는 것이 아닌, 사용자에게 보여주지 않는 방식으로 진행하여야 한다.
- 이후 시간이 충분히 흐른 뒤 데이터 삭제에 큰 문제가 없다고 판단되면 그 때 배치로 삭제하는 것이 효율적
/**
* reference :: https://crontab.guru/#0_2_*_*_*
* At 02 : 00
*/
@Scheduled(cron = "0 0 2 * * *")
public void deleteNotificationByCron() {
notificationRepository.deleteNotificationByCron();
}
용어
네트워크 상에서 서로 모르는 개체들을 식별하고 구분하기 위한 고유 범용 식별자.
reference
https://www.w3.org/TR/eventsource/#server-sent-events-intro
https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events
https://html.spec.whatwg.org/multipage/server-sent-events.html#dom-eventsource
https://xpadro.com/2015/07/understanding-callable-and-spring-deferredresult.html
'Spring' 카테고리의 다른 글
20190901 [환경설정] application.yml 설정 및 내용정리. (0) | 2019.09.01 |
---|---|
20190826 리버스 프록시 & nginx 이용 무중단배포 [여러가지 용어들] (0) | 2019.08.26 |
20180705 properties 파일 사용기. (0) | 2018.07.05 |
20180622 Spring Security (0) | 2018.06.22 |
20180620 스프링 인터셉터 사용기. (0) | 2018.06.20 |