티아카데미 아파치 스파크 입문과 활용 TIL - spark streaming

2020-12-03

.

Data_Engineering_TIL(20201202)

study program : T아카데미 - 아파치 스파크 입문과 활용

** URL : https://tacademy.skplanet.com/frontMain.action

[학습내용]

  • spark은 데이터 병렬처리를 위한 프레임워크다.

  • 빅데이터는 데이터 규모가 단순히 크다는 의미가 아니다.

1

  • 데이터 처리의 세가지 양상

이 장표에서 얘기하고 싶은거는 spark streaming은 완전실시간 보다 Micro batch에 가깝다는 것이다. small batch 사이즈를 갖고 있다.

1) Batch Processing

processing data en masse

big & complex

higher latencies ex) MR

2) Stream Processing

one-at-a-time processing

computations are relatively simple and generally independent

sub-second latency ex) Storm

3) Micro-Batching

small batch size (batch+streaming)

latency in seconds

windowing and stageful computation ex) Spark Streaming

  • Stream Processing Framework

스트리밍데이터는 아래 그림과 같이 두가지 양상이 있다.

어떤 데이터가 들어오고 source operator(예를들어 카프카)로부터 데이터를 pulling하게 되고, 각각의 n개의 processing operator가 동작을 한다. 최종적으로 straming processing에서 sink를 시킨다. 스톰이나 flink같은 것들이 예시다.

2

그리고 아래와 같은 경우가 spark streaming을 쓰는 경우의 예시다. 여기서 receiver가 카프카가 되는 것이고, 데이터가 들어오면 바로바로 처리하는 것이 아니라 마이크로 배치로 레코드를 작은 배치를 가져가게 된다.

3

이 그림에서 얘기하고 싶은 것은 streaming data 처리는 반드시 spark streaming이 아니라는 것이다. 스트리밍 데이터 처리 양상에 따라 또는 상황에 따라 여러 옵션이 있다는 것이다.

4

  • Spark Streaming Integration

data source가 있을테고, streaming processing 후에 sink하는 destination이 있을것이다.

카프카에서 데이터를 가져와서 그 데이터를 연산을 한 후에 HDFS나 데이터베이스에 sink를 하겠다는 것이다.

가져오는 streaming input data는 spark streaming을 통해서 내부적으로 사용자가 지정한 배치사이즈로 데이터가 쪼개지게 된다.

5

  • spark streaming architecture

하단에 DStream 별로 연산이 수행된다.

6

  • spark streaming code 예시 (스칼라 스크립트)

spark streaming 개발을 할때 이런 코드를 사용하게 된다.

정형데이터를 처리할 경우에는 spark dataframe을 사용하기 때문에 RDD 프로그래밍이라고 보면 된다.

import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

// create a StreamingContext with a SparkConf configuration
// 처리하고자 하는 데이터의 시간간격을 10초로 주었다.
val ssc = new StreamingContext(sparkConf, Seconds(10))

// create a DStream that will connect to serverIP:serverPort
// 소켓으로 특정 아이피포트를 연결을 했다.
val lines = ssc.socketTextStream(serverIP, serverPort)

// split each line into words
// 데이터를 line by line으로 가져오는데 위에서 설정한것처럼 10 단위로 가져온다.
val words = lines.flatMap(_.split(" "))

// count each word in each batch
// 그리고 10초단위로 aggregation을 하게 된다.
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// print a few of the counts to the console
// 스트림 데이터를 받아서 단어가 각각 몇번나오는지 print하게 된다.
wordCounts.print()

ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

트위터의 hashtag를 카운팅하는 예제이다.

트위터 스트리밍 같은 경우에는 spark streaming에서 데이터를 처리할 수 있도록 api를 제공해준다.

로그인 정보를 집어넣으면 트위터 hastag 데이터를 가져올 수 있다. 여기서는 1초단위로 데이터값들을 가져오게 되고 이 데이터를 가져와서 분단위로 해서 value 연산한다. time window length를 1초로 했다. 그리고 sliding interval을 갖고 그 값을 연산하게 된다.

val tweets = ssc.twitterStream(<Twitter username>, <Twitter password>)
val hashTags = tweets.flatMap (status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(1)).countByValue()

7

아래와 같이 라스트 1분에 대한 hashtag count를 가져오게 된다. 그리고 초단위로 계속 슬라이딩이 된다.

val tagCounts = hashTags.window(Minutes(1), Seconds(1)).countByValue()

8

  • spark streaming Checkpointing

인메모리에서 연산을 하다가 중간에 문제가 생기면 얘를 재연산을 어떻게 해야하냐라는 의문이 생길수가 있다. spark straming에서는 infinite한 lineages 형태이기 때문에 각각의 rdd가 처리(변환)됨에 따라서 이 RDD가 재생성되는데 이것들을 checkpoint로 저장하게 된다. 근데 이런 checkpoint와 같이 status를 계속 저장하게 되어 너무 커지게 되면 복구시간이라던지 연산하는 시간이 또 커질 수 있다.

카프카에서 내가 데이터를 얼마나 처리했는지 offset 을 관리하는 것도 정말 중요하다.

이 슬라이드에서 말하고 싶은 것은 recomputing하기 위해서는 재연산이 필요한데 재연산할때 체크포인팅을 어떻게 관리할 것인가가 중요하다. 결론은 너무 큰 사이즈로 하지 말라는 것이다. 큰사이즈로 하면 얘를 다시 recomputing했을때 시간이 오래걸린다는 것이다.

그래서 체크포인팅을해서 실패된 작업이 재작업 될수 있도록 설정을 해줘야 한다.

9

  • Accumulator & Broadcast

이거는 RDD 공통내용이다. 원래 RDD의 클러스터 전체의 shared variable이고 spark straming에서 활용할 수 있는 부분은 Accumulator은 error count를 할 수 있다. 클러스터 전체에서 사용할 수 있는 증분이 가능한 데이터 구조라고 이해하면 된다. Broadcast는 클러스터에서 read-only로 가져오는 예를 들어서 불변하는 기준정보다.

1) Accumulator

Provides a simple syntax for aggregating values from worker nodes back to the driver program

ex) count events that occur during job execution for debugging purposes

2) Broadcast

Efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations

  • Accumulator

driver에서 갖고 있는 기준정보. 운영용에서는 적용하기 어려운 부분이 있다 이정보 자체가 드라이버에 메모리 부하가 갈 수 있기 때문이다. 테스트용 검증용으로 적합하다.

Often, an application needs to aggregate multiple values as it progresses

Accumulators generalize MapReduce’s counters to enable this

val badRecords = sc.accumulator(0)
val badBytes = sc.accumulator(0.0)

records.filter(r => {
    if (isBad(r)) {
        badRecords += 1
        badBytes += r.size
        false
    } else { true
           }
}).save(...)

printf(Total bad records: %d, avg size: %f\n,
badRecords.value, badBytes.value / badRecords.value)
  • Broadcast

이거자체가 large read-only variable 이라고 할 수 있다.

Normally, Spark closures, including variables they use, are sent separately with each task

In some cases, a large read-only variable needs to be shared across tasks, or across operations

Examples: large lookup tables, “map-side join”

lookup table인데 얘를 매번 join을 해야한다. 매번 join을 하지만 매번 읽어서 join하기 보다는 한번 로딩해놓고 얘를 join해버리면 되는 경우에 활용할 수 있다.

그런데 스팍 스트리밍 같은 경우에는 조심해야하는게 스팍 스트리밍은 한번 구동하면 일반적으로 오랜시간동안 계속 running한다. 이때 JVM GC문제가 있을 수 있다. 그래서 가급적이면 Broadcast나 Accumulator를 spark streaming에 적용하는 것은 별로 좋지 않다.

val pageNames = sc.textFile(pages.txt).map(...)
val pageMap = pageNames.collect().toMap()
val bc = sc.broadcast(pageMap)
Type is Broadcast[Map[...]]
val visits = sc.textFile(visits.txt).map(...)
val joined = visits.map(v => (v._1, (bc.value(v._1), v._2)))
  • Architecture적인 고려사항

1) Large Data : computational resources

2) Scalability : scale-out architecture

3) Distributed Processing : CPUs/RAM

4) Fault tolerant : resume processing of the failed steps in a job

5) Enterprise constrains : SLAs (job scheduling, reprocessing,clustering and so on)

  • Real-time data processing use cases

1) Real-Time monitoring

2) Real-Time Business Intelligence

3) Operational Intelligence (CEP)

특정조건에 트리거링 되었을때 동작하는 것

4) Assembly lines (제조업 데이터)

  • Performance Tuning 요소

1) Batch and Window Sizes

500 milliseconds has proven to be a good minimum size for many applications

Start with a larger batch size (around 10 seconds) and work your way down to a smaller batch size

Consider increasing this interval for expensive computations if it is a bottleneck.

2) Level of Parallelism

프로세싱의 병렬성을 높이는 요소가 RDD repartition이다. 받은 data pipe가 10개인데 이 10개의 task가 헤비하다고 하면 100개로 repartition을 하는 것이다. 그러면 task가 동시에 100개를 병렬로 처리할 수 있다.

** repartition의 주의사항 : repartition하게 되면 전체 데이터가 shuffing된다.

Increasing the number of receivers (kafka -> topic repartition)

Explicitly repartitioning received data : Stream.Repartition

Increasing parallelism in aggregation ex) reduceByKey() => hash-partitioned

rdd에서 aggregation group by key를 하지말고, reduceByKey로 하면 key가 shuffling되지 않기 때문에 group by key보다는 빠른 성능을 낼 수 있다는 것이다.

3) GC and Memory Usage

spark-submit –conf spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC App.jar

set spark.cleaner.ttl ( evil RDDs)

** spark streaming 의 경우 gc관리를 해줘야 한다. jvm 모니터링해서 어디서 병목이 일어나는지, 어디서 문제가 발생하는지 모니터링 해야한다.

  • Streaming Using Kafka

Kafka is a distributed, partitioned, and replicated commit log service. It is a distributed messaging server. Kafka maintains the message feed in categories called topics. For each topic, Kafka maintains the partitioned log. This partitioned log consists of one or more partitions spread across the cluster

카프카는 메세지의 카테고리를 topic으로 관리한다. 각각의 topic들은 n개의 partitioning으로 구성이 된다. 데이터가 한곳에 저장되는 것이 아니라 여러곳에 복제되어 저장된다. HDFS에 블락단위로 저장되는 컨셉과 유사하다.

카프카는 메세지를 보내는 것을 producer라고 하고, 컨슈머 그룹을 여러개를 정할 수가 있다. 그래서 컨슈머 그룹을 여러개 둬서 하나의 데이터에 대해서 각각의 컨슈머가 데이터를 어디까지 소비했는지에 대한 정보인 offset을 이용하여 데이터를 재소비하거나 재가공이 가능하다.

10

  • 그러면 spark에서 kafka에서 오는 데이터를 어떻게 처리하냐

아래는 RDD 코드로 카프카에 연결해서 데이터를 처리하는 시나리오다.

val ssc = new StreamingContext(sc, Seconds(2))
val zkQuorum = "localhost:2181"
// 아래에 group이라는게 consumer group을 의미한다.
val group = "test-group"
val topics = "test"
val numThreads = 1
val topicMap = topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum, group, topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(x => (x,1))
val runningCounts = pairs.updateStateByKey( (values: Seq[Int], state: Option[Int]) => Some(state.sum + values.sum)) 
runningCounts.print
//hdfs에 checkpoint를 설정하게 된다.
ssc.checkpoint("hdfs://localhost:9000/user/hduser/checkpoint")
ssc.start
ssc.awaitTermination

11

  • Spark 2.0 : Structured Streaming

spark streaming 자체는 RDD 프로그래밍인데 2.0버전 이후로 하이레벨의 데이터프레임 기반에 스트리밍을 지원하는 것이 Structured Streaming이다.

12

1) Structured Streaming

High-level streaming API built on Spark SQL engine

Runs the same queries on DataFrames

Event time, windowing, sessions, sources & sinks

2) Unifies streaming, interactive and batch queries

Aggregate data in a stream, then serve using JDBC

Change queries at runtime

Build and apply ML models

  • Spark 2.0 Example: Page View Count

Input: records in Kafka

Query: select count(*) group by page, minute(evtime)

Trigger:“every 5 sec”

Output mode: “update-in-place”, into MySQL sink

logs = ctx.read.format("json").stream("s3://logs")
logs.groupBy(logs.user_id).agg(sum(logs.time)).write.format("jdbc").stream("jdbc:mysql//...")

pure streaming system 같은 경우에는 input 들어오고 연산하고 sink하는 구조를 갖는다면 continuous application은 dataframe같은 기존에 static data와 결합해서 그 결과를 받을 수도 있고, 또는 stream으로 들어오는 데이터에 대해서 add hoc query를 날릴수도 있다.

13

Structured Streaming을 spark 의 high level api로 구현이 가능하다.

14

  • Continuous windowed aggregation

15

  • watermarking for handling late data

late 메세지는 기지국 데이터가 데이터를 바로바로 제때 보내주면 좋은데 여러가지 네트워크 사정으로 late한 메세지가 발생하게 된다. spark streaming 기준으로는 이런 late한 메세지는 그냥 탈락인데 이 경우에는 watermark(=쓰레시홀드) 옵션을 줘서 아래 코드를 기준으로 15분은 기다려준다. 그래서 late 한 데이터도 데이터 프로세싱을 할 수 있게 해준다.

16

  • JOINING streams with static data

당연하겠지만 이거 자체가 데이터 프레임이기 때문에 static data를 바로 붙여서 사용할 수 있다. 예제를 보면 카프카에서 ‘iot-updates’라는 토픽을 땡겨오고, jdbc에서 ‘iot-device-info’ 라는 메타데이터도 땡겨온다. 그런 다음에 데이터프레임이기 때문에 아래 예제와 같이 join이 가능하다. 그래서 joined dataset을 구성하고, HDFS나 mysql이나 다른 external system으로 sink할 수 있는 부분을 지원한다.

17

  • 또한 다양한 output mode를 지원한다.

디버깅도 지원하고 여기서는 complete mode라고 해서 aggregation이 한번에 가는 것이고, 디버깅용이라고 하면 append mode로 데이터를 계속 확인할 수 있다. 그래서 특정 조건에 대해서 데이터를 남길때 어떤 방식으로 남길지 다 조정이 가능하다.

18

  • query management

하나의 데이터 스트림에 대해서 여러가지 스트림을 생성할 수 있다. 만약에 카프카 스트림에서 데이터를 받아온다고 하면 이것들에 대해서 여러가지 쿼리를 생성해서 각각의 쿼리별로 어떤거는 HDFS에 저장하고 어떤거는 집계해서 다른 in-memory로 넘기는 작업을 할 수 있다. 그래서 각각의 쿼리 매니지먼트를 데이터 프레임 기반으로 할 수 있다.

19