1. 배경
모바일 디바이스와 더불에 IoT 시대로 들어서면서, 데이터는 양만큼이나 빠른 속도가 중요해졌다. 빅데이터가 만족시켜야 하는 3가지 요소, (3V, Volume, Velocity, Variety), 중 속도(Velocity)의 중요성이 더욱 커졌다는 것이다.
그에 따라, 우리가 평소에도 많이 들어본 것 처럼, "실시간" 처리가 중요해졌고, 그와 밀접한 기술적 명명인 "스트림" 처리가 중요해졌다.
스파크 스트리밍은 그런 니즈로 부터 나왔다고 할 수 있다. 스파크 스트리밍을 통해서, 우리는 사용자로 부터 혹은 서버로 부터 실시간으로 데이터를 받아들여서 처리하고 분석할 수 있다.
2. 스파크 스트리밍과 DStream
RDD의 개념을 바탕으로 구축된 스파크처럼 스파크 스트리밍은 DStream (Discretized stream, 이산 스트림)이라 불리는 추상화 개념을 바탕으로 한다.
DStream은 시간별로 도착한 데이터들의 연속적인 모음이다.
내부적으로 각각의 DStream은 각 시간별로 도착한 RDD들의 연속적인 모음으로 구성된다.
DStream은 플럼(Flume), 카프카(Kafka), HDFS 등 다양한 입력 원천으로부터 만들어질 수 있다.
한번 만들어지면 DStream은 두 가지 타입의 연산을 제공한다.
새로운 DStream을 만들어 낼 수 있는 트랜스포메이션(transformation)
외부 시스템에 데이터를 써주는 결과 연산(output operation)
DStream은 RDD에서 가능한 것과 동일한 종류의 많은 연산들을 지원한다.
- 구현
- 스트리밍 기능의 주 시작점인 StreamingContext를 생성한다. 이것은 하부 계층에 존재하는 SparkContext도 생성시키며, 데이터를 처리하는데 사용된다.
- 예제 코드
시스템이 데이터를 받았을 때, "error"라는 단어를 포함하는 라인만 필터링해서 프린트한다.
val ssc = new StreamingContext(conf, Second(1))
val lines = ssc.socketTextStream(“localhost”, 7777)
val errorLines = lines.filter(_.contains(“error”))
errorLines.print()
위의 코드는 데이터가 들어왔을때 처리되는 로직이 기술되어 있는 것이다. 스파크 스트리밍이 데이터를 받기 시작하려면 명시적으로 StreamingContext에 start()를 호출해야 한다. 그러면 스파크 스트리밍은 내부의 SparkContext에 스파크 작업들을 스케줄링하기 시작한다.
이는 별로의 스레드에서 실행되므로 사용자 애플리케이션이 종료하더라도 작업을 유지하기 위해 스트리밍 연산이 완료되기를 기다리도록 awaitTermination을 호출할 필요가 있다.
StreamingContext는 한 번만 시작할 수 있으므로 DStream과 출력 연산에 대한 것들을 모두 작업한 후에 시작해야 한다는 점을 명심해야한다.
ssc.start() //StreamingContext를 시작하고 “종료”를 기다리게 한다.
ssc.awaitTermination() //작업이 끝나기를 기다린다.
3. 아키텍처와 추상 개념
“마이크로 배치(micro-batch)”라 불리는 아키텍처를 사용한다. 스트리밍 처리를 데이터의 작은 배치 단위들 위에서 각 배치 처리의 연속적인 흐름으로 간주하고, 스파크 스트리밍은 다양한 입력 소스로부터 데이터를 받아들여 이것들을 작은 그룹들로 묶는다.
새로운 배치들은 정해진 시간 간격마다 만들어진다.
스파크 스트리밍에서 프로그래밍적인 추상화 개념은 이산 스트림 혹은 DStream이라 불리는 RDD의 연속적인 묶음이다 (아래 그림 참조).
DStream은 외부의 입력 소스로부터 만들 수도 있지만, 다른 DStream에 트랜스포메이션을 적용해서 만들 수도 있다.
DStream은 새로운 “상태 유지(stateful)” 트랜스포메이션을 지원해서, 여러 시간 범위에 걸쳐진 데이터를 집합 연산하는 것도 가능하게 해준다.
트랜스포메이션 이외에 DStream은 위의 예제 코드에 썼던 print() 처럼 출력 연산을 지원한다.
출력 연산은 데이터를 외부 시스템에 쓴다는 점에서 RDD의 액션과 유사하나 스파크 스트리밍에서 이는 매시간 단계마다 주기적으로 실행되며 출력을 묶음(배치) 단위로 생성한다.
드라이버와 작업 노드 간의 (그리고 내부에서) 일어나는 스파크 스트리밍의 실행은 아래의 그림과 같다. 각 입력 소스마다 스파크 스트리밍은 리시버(Receiver)를 실행시키는데, 이는 애플리케이션의 익스큐터들 안에서 데이터를 모으고 RDD에 저장하는 태스크이다. 이들은 데이터를 받아서 장애 대응을 위해 다른 익스큐터에 복제한다 (기본적으로 설정된 동작). 이 데이터는 캐시되는 RDD와 동일한 방식으로 익스큐터의 메모리에 저장된다.
드라이버 프로그램의 StreamingContext는 이 데이터들을 처리하기 위해 주기적으로 스파크 작업들을 실행하고 이전 시간 단계의 RDD와 연결한다.
스파크 스트리밍은 기본적으로 들어온 데이터를 복제하기 때문에 단일 작업 노드 장애에 대응할 수 있다. 그러나 단순히 리니지를 이용할 경우 재연산은 프로그램의 시작 단계부터 다시 이루어지므로 시간이 오래 걸리 수 있다. 따라서 스파크 스트리밍에서는 체크포인팅(checkpointing)이라는 매커니즘을 이용해서 파일 시스템(HDFS 나 S3 같은..)에 주기적으로 상태를 저장한다.
대개는 데이터의 5~10개 배치 묶음마다 체크포인팅을 설정한다. 따라서 스파크 스트리밍은 유실된 데이터를 복구하기 위해 가장 최근의 체크포인트로 돌아가기만 하면 된다.
'Data Science > Spark Streaming' 카테고리의 다른 글
스파크 스트리밍 + 카프카, Spark streaming + Kafka (1) | 2017.11.16 |
---|