요즘에 구상하고 있는 연구의 모델이, 각 서버로 부터 실시간 로그를 받아서 값을 분석하여 최적화하는거라 스파크 스트리밍을 보고있다.
스파크 스트리밍을 선택하자라는 결론에 도달했을때, 입력 소스는 어떻게 할 것인가 하는 고민이 있었다. 뭐, 다양한 방법이 있을 수 있겠지만. 요새 핫하다는 아파치 카프카를 사용해봐야겠다고 마음먹었다. (엔지니어라면 핫한 기술에 손이 가기 마련이니..)
무엇보다, 스파크 공식 홈페이지에서 카프카와 연동하는 방법에 대해 아주 잘 기술되어 있어서 기술을 적용하는데에 있어서도 어려움이 상대적으로 적다고 할 수 있다.
그렇다면, 아파치 카프카(Apache Kafka)는 무엇일까.
카프카는 대용량 실시간 처리를 위해 사용하는 메시징 시스템이며, Pub-Sub 구조를 이용한다.
요즘 잘나간다는 글로벌 서비스들(LinkedIn, Twitter, Netflix, Tumblr, Foursquare 등)은 대용량 데이터를 처리하기 위해 카프카를 쓰고 있다.
카프카에 더 자세한 이야기는 다른 포스팅을 통해 따로 다루도록 하고,
이번 포스팅의 주 목적인 스파크 스트리밍 + 카프카에 대해서 알아보자.
카프카에 대한 내장 지원 모듈을 쓰면 쉽게 많은 토픽에 대한 메세지를 처리할 수 있다.
이를 사용하기 위해서는 프로젝트에 (Maven을 쓸 경우엔 Pom.xml에) spark-streaming-kafka_2.10 아티팩트 (artifact)를 추가해야한다.
물론 아티팩트나 버전에 기술되는 값은 상황에 맞게 적절하게 기술되어져야 한다. 본인이 사용하고 있는 스파크 버전, 스칼라 버전, 카프카 버전 등을 잘 살펴봐야하며, 서로 호환이 되는 것인지도 확인해야한다.
또한, 자신이 작성한 메소드나 객체가 해당 버전의 디펜던시를 통해 import 시킬 수 있는 것인지도 확인이 되어야, 실제로 코딩할 때 예상치 못한 부분에서 에러가 발생하여, 어리둥절해 지는 순간을 최소화할 수 있다.
참고로 이런 부분은 스파크 공식 홈페이지를 통해 가장 간단하게 해결할 수 있다. 그곳의 도큐먼트에 아주 친절하게 잘 설명되어있다. (어설프게 구글링하다가 디펜던시도 꼬이고 코드도 꼬이는 불상사를 막자..)
어쨌든 디펜던시들이 정상적으로 잘 적용됐을때, 제공되는 KafkaUtils 객체는 StreamingContext에서 동작하며, 카프카 메세지의 DStream을 생성한다.
카프카 스트림은 여러 토픽에 대한 메세지를 받아 볼 수 있으므로, 여기서 만들어진 DStream은 토픽과 메세지의 쌍들로 구성된다.
스파크 스트리밍과 카프카를 연결하는 방법에는 두가지 접근법있다.
- Recevier-based Approah
- Direct Approach (No Receviers)
특히, 카프카와 같은 입력 소스는 데이터 스트림의 임의의 위치에 replaying을 허용한다. 이 경우, 그 입력 소스는 스파크 스트리밍에게 데이터 스트림 소비에 관한 더 많은 제어권을 부여하기 때문에, 더 강력한 fault-tolerance의 의미를 가진다. 이것이 Recevier-based Approach다.
Spark 1.3에서 Direct API라는 개념을 선보였는데, 이를 통해 WAL를 사용하지 않고도 한번만에 처리할 수 있도록 했다. 이것이 Direct Approach (No Receivers)이고 exactly-once 방식이라고도 한다.
Approach 1: Receiver-based Approach
이 방식이 돌아가는 방식은 아래와 같다.
- 데이터는 카프카 리시버 (Kafka Receiver)로 부터 지속적으로 받아진다.
- 이때, 리시버는 스파크 워커 노드에 있으며, 카프카의 High-Level Consumer API를 사용한다
- 들어온 데이터는 스파크 워커 노드의 메모리와 WAL에 저장된다 (또한, HDFS에도 복제될 것이다). 그리고 카프카 리시버는 Zookeeper에 카프카의 오프셋을 업데이트한다.
- 들어온 데이터와 그것의 WAL 위치에 대한 정보 역시 저장된다. 만약에 오류가 발생하면, 그 정보를 이용해서 다시 데이터를 읽고 처리할 수 있다.
이렇게 하면, 카프카로 부터 들어오는 데이터가 유실되거나 손실되지 않을 거라는 보장을 할수는 있지만, 오류로 인해 어떤 데이터 레코드가 한번 이상 처리될 수 있는 가능성이 있다. 다시말해 이것은 at-least-once 방식이다.
가령, 어떤 데이터가 잘들어와서 WAL에 까지 잘 저장됐는데, 주키퍼에 업데이트를 하기전에 오류가 발생한다면, 시스템 간의 일관성이 깨지게 된다. 왜냐면, 스파크 스트리밍 입장에서는 데이터를 정상적으로 잘 받아서, 잘 저장시켰지만. 카프카 입장에서는 주키퍼의 카프카 오프셋이 업데이트 되질 않았으니 데이터가 정상적으로 보내지지 않았다고 생각할 것이기 때문이다. 그래서 카프카는 이미 잘 저장되어 있는 데이터를 다시 보내게 된다.
이렇게 움직이는 정보(혹은 이미 보내진 정보)를 가지고, 시스템 간의 일관성을 맞추려니 잘 안되는 것이다. 이러한 문제를 방지하기 위해, 이런 일관성을 관리하고 유지하는 엔티티를 하나로 만들어주자는 생각에서 Direct Approach가 나오게 됐다.
Approach 2: Direct Approach (No Receivers)
이 방식에서는 데이터 오프셋에 대한 관리를 오직 스파크 스트리밍에서만 하도록 했다. 그리고 오류가 발생해서 데이터를 다시 보내야하는 경우(replaying data)엔 카프카의 Simple Consumer API를 사용하여 해결할 수 있도록 했다.
이것은 기존 리시버 + WAL 방식과는 완전히 다른 방식이다. 리시버를 통해 연속적인 데이터를 받고, 그것을 WAL에 저장하는 방식 대신, 간단하게, 모든 batch interval이 시작되는 순간마다 소비해야하는 (Consume) 데이터의 오프셋의 범위를 결정하게 했다. 그 후, 각 batch의 job이 실행될 때, 해당 범위의 데이터를 카프카로 부터 불러온다 (HDFS에서 파일을 읽는 방식과 비슷하다). 물론, 이 오프셋은 체크포인트를 이용해서 안정적으로 저장되며 오류가 발생했을때 그것을 이용해서 복구할 수 있다.
이러한 방식으로 스파크 스트리밍은 Direct Approach를 이용해, 카프카의 데이터들을 한번에, 효율적으로 받으면서도, 리시버와 WAL가 필요없게 만들었다.
'Data Science > Spark Streaming' 카테고리의 다른 글
스파크 스트리밍, Spark Streaming (0) | 2017.11.16 |
---|