안녕하세요. 사람인에이치알 IT연구소 서비스인프라개발팀의 이현규 입니다.
지난 8월에 런칭한 대량 메일 컨텐츠 생성 시스템 개발을 진행하며,
경험했던 것들을 공유하고자 이 포스트를 작성합니다.

먼저, 이번에 개발된 시스템은
이전에 포스팅된 분산처리 시스템 구조개선 프로젝트와 동일한 아키텍쳐를 바탕으로 개발되어 있습니다. 구조가 궁금하시다면 이전 포스트를 참고 부탁 드립니다.

저희 시스템은 크게 Spring Cloud Stream과 kafka를 사용하여 구성으로 하였습니다.
그 중에 이번에 작성할 포스트의 내용은 Spring Cloud Stream에 대한 내용입니다. Spring Cloud Stream은 Spring cloud 프로젝트에서 제공해 주는 프레임워크 중 하나입니다.
먼저 Spring cloud에 대해서 알아보기로 합니다.

Spring cloud란?

Spring Cloud


Building distributed systems doesn't need to be complex and error-prone.
Spring Cloud offers a simple and accessible programming model
to the most common distributed system patterns, helping developers build resilient,
reliable, and coordinated applications.
Spring Cloud is built on top of Spring Boot, making it easy for developers
to get started and become productive quickly.
출처 : https://spring.io/
  • Spring Cloud는 가장 보편적인 분산 시스템 패턴에 대한 단순하고 액세스 가능한 프로그래밍 모델을 제공.
  • 개발자가 탄력적이고 안정적이며 조정된 애플리케이션을 빌드할 수 있도록 지원.
  • Spring Boot위에 구축됨으로써 쉽고 빠르게 생산성을 높일 수 있음.
  • 마이크로서비스를 구축하기 위해 필요한 라이브러리의 집합(프레임워크).
  • Spring Cloud는 spring-cloud-xxxxx 프로젝트들의 umbrella 프로젝트.
  • 마이크로서비스를 구축하기 위한 각각의 독립적인 프로젝트들이 굉장히 많이 존재함.

줄여서 말하자면, Spring Cloud란 마이크로 서비스를 구축하기 위한 프레임워크들 입니다.
그 중 저희팀이 사용한 Spring Cloud Stream에 대해서 알아보기로 합니다.

Spring Cloud Stream이란?

A lightweight event-driven microservices framework to quickly build applications
that can connect to external systems.
Simple declarative model to send and receive messages using Apache Kafka or RabbitMQ
between Spring Boot apps.
출처 : https://spring.io/
  • 외부 시스템에 연결할 수 있는 애플리케이션을 신속하게 구축할 수 있는 경량의 마이크로 서비스 프레임 워크.
  • Apache Kafka 또는 RabbitMQ 등을 사용하여 Spring Boot 어플리케이션과 메세지를 보내고 받음.
  • 이벤트 중심 마이크로 서비스를 구축하기 위한 프레임워크.
    위에 기술된 내용처럼 Spring Cloud Stream에서는 외부 미들웨어와의 통신을 하기 위해서 통합 컴포넌트를 제공합니다. 미들웨어와의 통신을 하기 위해서 꼭 알아야 하는 개념이 있습니다. BinderBinding이라는 개념입니다.

Binder?

  • Spring Cloud Stream이 제공해 주는 미들웨어(kafka)와의 통신 컴포넌트.
  • Spring이 @Configuration 정보를 읽어 미들웨어(kafka) 바인더를 구현체로 제공.
  • 미들웨어(kafka)와 producer 및 consumer의 연결, 위임 및 라우팅 등을 담당.
  • 따라서 Application에서는 미들웨어(kafka)와는 독립적으로 개발 진행 가능.
  • 해당 properties는 spring.cloud.stream.binder에 해당.
    실제 kafka topic에 매핑하는 설정은 spring.cloud.stream.kafka.binder에 해당.

Binding?

  • 바인더의 입/출력을 미들웨어(kafka)에 연결하기 위한 Bridge(해당 바인더에서 만든 Pub/Sub역할).
  • @EnableBinding을 사용하여 Application에서 사용할 바인더 인터페이스를 추가 바인딩이 적용됨.
  • 해당 properties는 spring.cloud.stream.bindings.에 해당.
    실제 kafka topic에 매핑하는 설정은 spring.cloud.stream.kafka.bindings에 해당.

또한, Spring Cloud Stream은 Sink, Source, Processor와 같은 일반적인 바인딩 인터페이스를 제공합니다.

Sink, Source, Processor의 개념
Sink: input(inbound)
Source: output(outbound)
Processor: input, output(both)

사용자가 바인더를 정의하여 사용하고 싶다면
MessageChannel, SubscribableChannel 인터페이스를 이용하여 정의할 수 있고,
@Input과 @Output 사용하여 채널의 이름을 정의할 수 있습니다.

현재 Spring Cloud Stream 이용하여 개발된 시스템의 작성한 코드를 발췌하였습니다.

//interface
public interface MessageBinder {

	String PROCESS_OUT = "process-out";
	String PROCESS_IN = "process-in";

	@Output(PROCESS_OUT)
	MessageChannel createMessage();

	@Input(PROCESS_IN)
	SubscribableChannel messagePush();
}
//interface
public interface TargetUserBinder {

	String TARGET_OUT = "target-out";
	String TARGET_IN = "target-in";

	@Output(TARGET_OUT)
	MessageChannel targetSearch();

	@Input(TARGET_IN)
	SubscribableChannel targetFilter();
}
//method
@EnableBinding(value= {TargetUserBinder.class, MessageBinder.class})
public class TargetUserProcess {
	private MessageBinder pushMessageBinder;

	@Autowired
	public TargetUserProcess(MessageBinder pushMessageBinder) {
		this.pushMessageBinder = pushMessageBinder;
	}

	@StreamListener(TargetUserBinder.TARGET_IN)
	public void targetUserSearch(@Payload TargetMeta targetMeta) {
	   //.......
	   //중략
	   //.......
	   pushMessageBinder.createMessage().send(MessageBuilder.withPayload(targetMeta).build());
	}
}

  • @EnableBinding은 어노테이션 파라미터로 필요한 만큼 바인딩 인터페이스를 적용할 수 있습니다.
  • 위 코드의 내용은 MessageBinder와 TargetUserBinder 채널을 사용하는 내용의 코드입니다.
    TargetUserBinder의 targetFilter 채널의 메세지를 구독하여,
    MessageBinder의 createMessage 채널로 메세지를 보내는 내용의 코드입니다.
  • 위 내용에 기재했던 것처럼 제공된 인터페이스와 어노테이션을 사용하여 바인더를 구현하였습니다.

Spring Cloud Stream의 개념과 실제 kafka 바인드 하는 방법에 대해서 알아 보았습니다.


지금부터는 살짝 다른 이야기를 해볼까 합니다. 초기에 kafka는 대용량 메세지를 빠르게 처리하기 위해 만들어진 제품이라고 알고 있었습니다. 하지만, kafka가 발전 되면서 연속적으로 유입되는 메세지(스트림)를 처리할 수 있는 Kafka Streams API를 라이브러리를 제공하고 있습니다.

Kafka Streams?

Kafka Streams은 kafka에 저장된 데이터를 처리하고 분석하기 위한 라이브러리 입니다. 또한, 데이터를 처리하거나 분석하가 위한 상태 저장소도 제공하고 있습니다.

왜 사용하는가?

  • 연속적인 이벤트 스트림 데이터를 좀 더 빠르고 간단하게 처리하여 연속적인 결과값을 얻기 위하여 사용합니다.


배치프로세싱


스트림프로세싱

스트림 프로세싱과 배치 프로세싱의 차이

  • 배치 프로세싱은 정적인 데이터를 특정시간에 일괄(이 단어 자체가 batch임)로 처리하는 것을 의미합니다.
  • 스트림 프로세싱은 일괄처리와는 다르게 다음 일괄처리 시간까지 기다리지 않고 데이터가 도착할 때마다 지속적으로 처리되는 것을 의미합니다.

상태 기반/무상태 기반 스트림 처리란?

  • 둘의 차이점은 스트리밍 데이터를 처리 혹은 분석하기 위한 이전 데이터와 상태 저장소의 유무.

KStream이란?

카프카 저장소에 기록되는 스트리밍 데이터를 간추려 내어 처리(추상화)하거나 분석할 수 있게 해주는 인터페이스. 일반적인 RDB의 테이블 비유하자면 스트림의 데이터를 저장소에 “INSERT” 한다라고 생각하시면 됩니다.

v1 = {"success" : "0", "error" : "1"}
v2 = {"success" : "0", "error" : "2"}

.reduce((v1, v2) -> v1 + v2)

{"success" : "0", "error" : "3"}

위에 간단하게 예시된 설명처럼 지속적으로 유입되는 데이터를 “INSERT” 하게 되면, .reduce() 연산을 한 결과값이 굉장히 짧은 주기로 반환하게 됩니다.

우리가 사용한 이유?

  • 메세지를 생성하고 난 후, 메세지 성공/실패/에러 등등에 대한 여부를 집계하기 위해 사용하였습니다.
  • 또한, 집계처리를 어플리케이션에서 하는게 아닌 카프카 저장소에서 진행되기 때문에 scale-in/out 혹은 컨테이터 다운이 되었을 시에도 안정적으로 동작할 수 있기 때문에 사용하였습니다.

마치며

Spring Cloud Stream과 kafka를 사용하며 고민했던 내용들을 정리하여 기술해 보았습니다. 이 프로젝트를 진행하면서 개인적으로는 조금이나마 발전한 것 같습니다. 기술을 쓰기 위하여 내부적으로 동작하는 내용이라던지 레퍼런스를 꼼꼼히 읽었던 것들이 많은 도움이 되었습니다. 이런 경험과 기술들이 계속 쌓이다 보면, 새로운 것을 접했을 때도 두려워 하지 않고 더 나아갈 수 있을 것 같습니다.

Reference

https://spring.io/
https://doc.spring.io/
https://www.baeldung.com/java-kafka-streams/
https://kafka.apache.org/documentation/streams/
https://docs.confluent.io/