Data Science/Spark Streaming

스파크 스트리밍 + 카프카, Spark streaming + Kafka

알파해커 테크노트 2017. 11. 16. 16:32
반응형

요즘에 구상하고 있는 연구의 모델이, 각 서버로 부터 실시간 로그를 받아서 값을 분석하여 최적화하는거라 스파크 스트리밍을 보고있다.


스파크 스트리밍을 선택하자라는 결론에 도달했을때, 입력 소스는 어떻게 할 것인가 하는 고민이 있었다. 뭐, 다양한 방법이 있을 수 있겠지만. 요새 핫하다는 아파치 카프카를 사용해봐야겠다고 마음먹었다. (엔지니어라면 핫한 기술에 손이 가기 마련이니..)


무엇보다, 스파크 공식 홈페이지에서 카프카와 연동하는 방법에 대해 아주 잘 기술되어 있어서 기술을 적용하는데에 있어서도 어려움이 상대적으로 적다고 할 수 있다.


그렇다면, 아파치 카프카(Apache Kafka)는 무엇일까.

카프카는 대용량 실시간 처리를 위해 사용하는 메시징 시스템이며, Pub-Sub 구조를 이용한다.

요즘 잘나간다는 글로벌 서비스들(LinkedIn, Twitter, Netflix, Tumblr, Foursquare 등)은 대용량 데이터를 처리하기 위해 카프카를 쓰고 있다.


카프카에 더 자세한 이야기는 다른 포스팅을 통해 따로 다루도록 하고,

이번 포스팅의 주 목적인 스파크 스트리밍 + 카프카에 대해서 알아보자.




카프카에 대한 내장 지원 모듈을 쓰면 쉽게 많은 토픽에 대한 메세지를 처리할 수 있다.

이를 사용하기 위해서는 프로젝트에 (Maven을 쓸 경우엔 Pom.xml에) spark-streaming-kafka_2.10 아티팩트 (artifact)를 추가해야한다. 


물론 아티팩트나 버전에 기술되는 값은 상황에 맞게 적절하게 기술되어져야 한다. 본인이 사용하고 있는 스파크 버전, 스칼라 버전, 카프카 버전 등을 잘 살펴봐야하며, 서로 호환이 되는 것인지도 확인해야한다. 


또한, 자신이 작성한 메소드나 객체가 해당 버전의 디펜던시를 통해 import 시킬 수 있는 것인지도 확인이 되어야, 실제로 코딩할 때 예상치 못한 부분에서 에러가 발생하여, 어리둥절해 지는 순간을 최소화할 수 있다. 


참고로 이런 부분은 스파크 공식 홈페이지를 통해 가장 간단하게 해결할 수 있다. 그곳의 도큐먼트에 아주 친절하게 잘 설명되어있다. (어설프게 구글링하다가 디펜던시도 꼬이고 코드도 꼬이는 불상사를 막자..)


어쨌든 디펜던시들이 정상적으로 잘 적용됐을때, 제공되는 KafkaUtils 객체는 StreamingContext에서 동작하며, 카프카 메세지의 DStream을 생성한다.


카프카 스트림은 여러 토픽에 대한 메세지를 받아 볼 수 있으므로, 여기서 만들어진 DStream은 토픽과 메세지의 쌍들로 구성된다.




스파크 스트리밍과 카프카를 연결하는 방법에는 두가지 접근법있다.

  1. Recevier-based Approah
  2. Direct Approach (No Receviers)
스파크는 데이터를 주고 받는 과정에 있어서, 더 나은 수준의 fault-tolerance와 신뢰 수준을 보장하기 위해, Spark 1.2에서 Write Ahead Logs (WAL)를 선보였었다. 이를 통해 스파크는 신뢰가 높은 입력 소스(Flume, Kafka, Kinesis)로 부터 들어온 것들 뿐만 아니라, 평범하고 오래된 소켓으로 부터 들어온 신뢰도 떨어지는 것들까지, 오류로 인한 데이터 손실이 없도록 (최소화하도록) 보장했다.


특히, 카프카와 같은 입력 소스는 데이터 스트림의 임의의 위치에 replaying을 허용한다. 이 경우, 그 입력 소스는 스파크 스트리밍에게 데이터 스트림 소비에 관한 더 많은 제어권을 부여하기 때문에, 더 강력한 fault-tolerance의 의미를 가진다. 이것이 Recevier-based Approach다.


Spark 1.3에서 Direct API라는 개념을 선보였는데, 이를 통해 WAL를 사용하지 않고도 한번만에 처리할 수 있도록 했다. 이것이 Direct Approach (No Receivers)이고 exactly-once 방식이라고도 한다.




Approach 1: Receiver-based Approach


이 방식이 돌아가는 방식은 아래와 같다.

  1. 데이터는 카프카 리시버 (Kafka Receiver)로 부터 지속적으로 받아진다. 
    • 이때, 리시버는 스파크 워커 노드에 있으며, 카프카의 High-Level Consumer API를 사용한다
  2. 들어온 데이터는 스파크 워커 노드의 메모리와 WAL에 저장된다 (또한, HDFS에도 복제될 것이다). 그리고 카프카 리시버는 Zookeeper에 카프카의 오프셋을 업데이트한다.
  3. 들어온 데이터와 그것의 WAL 위치에 대한 정보 역시 저장된다. 만약에 오류가 발생하면, 그 정보를 이용해서 다시 데이터를 읽고 처리할 수 있다.  



이렇게 하면, 카프카로 부터 들어오는 데이터가 유실되거나 손실되지 않을 거라는 보장을 할수는 있지만, 오류로 인해 어떤 데이터 레코드가 한번 이상 처리될 수 있는 가능성이 있다. 다시말해 이것은 at-least-once 방식이다. 


가령, 어떤 데이터가 잘들어와서 WAL에 까지 잘 저장됐는데, 주키퍼에 업데이트를 하기전에 오류가 발생한다면, 시스템 간의 일관성이 깨지게 된다. 왜냐면, 스파크 스트리밍 입장에서는 데이터를 정상적으로 잘 받아서, 잘 저장시켰지만. 카프카 입장에서는 주키퍼의 카프카 오프셋이 업데이트 되질 않았으니 데이터가 정상적으로 보내지지 않았다고 생각할 것이기 때문이다. 그래서 카프카는 이미 잘 저장되어 있는 데이터를 다시 보내게 된다.


이렇게 움직이는 정보(혹은 이미 보내진 정보)를 가지고, 시스템 간의 일관성을 맞추려니 잘 안되는 것이다. 이러한 문제를 방지하기 위해, 이런 일관성을 관리하고 유지하는 엔티티를 하나로 만들어주자는 생각에서 Direct Approach가 나오게 됐다.




Approach 2: Direct Approach (No Receivers)


이 방식에서는 데이터 오프셋에 대한 관리를 오직 스파크 스트리밍에서만 하도록 했다. 그리고 오류가 발생해서 데이터를 다시 보내야하는 경우(replaying data)엔 카프카의 Simple Consumer API를 사용하여 해결할 수 있도록 했다.


이것은 기존 리시버 + WAL 방식과는 완전히 다른 방식이다. 리시버를 통해 연속적인 데이터를 받고, 그것을 WAL에 저장하는 방식 대신, 간단하게, 모든 batch interval이 시작되는 순간마다 소비해야하는 (Consume) 데이터의 오프셋의 범위를 결정하게 했다. 그 후, 각 batch의 job이 실행될 때, 해당 범위의 데이터를 카프카로 부터 불러온다 (HDFS에서 파일을 읽는 방식과 비슷하다). 물론, 이 오프셋은 체크포인트를 이용해서 안정적으로 저장되며 오류가 발생했을때 그것을 이용해서 복구할 수 있다.



이러한 방식으로 스파크 스트리밍은 Direct Approach를 이용해, 카프카의 데이터들을 한번에, 효율적으로 받으면서도, 리시버와 WAL가 필요없게 만들었다.



반응형

'Data Science > Spark Streaming' 카테고리의 다른 글

스파크 스트리밍, Spark Streaming  (0) 2017.11.16