티아카데미 아파치 스파크 입문과 활용 TIL - spark 운영과 모니터링

2020-12-08

.

Data_Engineering_TIL(20201208)

study program : T아카데미 - 아파치 스파크 입문과 활용

** URL : https://tacademy.skplanet.com/frontMain.action

[학습내용]

  • long running processing인 spark streaming에서는 GC 모니터링을 잘 해줘야 한다.

  • 드라이버로 가는 액션들이 상당히 헤비한 편이기 때문에 이부분도 조심해야 한다.

  • task 갯수는 core갯수와 연관이 있기 때문에 가용할 수 있는 물리적인 코어가 100개라고 하면 동시에 돌 수 있는 코어는 100개이다. 근데 내가 수행하고자 하는 전체 spark task가 3000개라면 코어가 100개가 동시에 계속 연산하면서 처리할 것이다.

  • 데이터가 스큐되었는지도 확인해야한다. 특정 task에 부하(많은 record)가 확 몰려있는 경우가 있다. 프로세스가 os 문제든, 네트워크 문제든, 디스크 문제든 스큐되었다면 RDD를 repartition하던지 아니면 하는일이 거의없는 task들이 많다면 coalesce를 이용해서 병렬성을 유지해줘야 한다.

  • spark runtime architecture

spark driver에서 sparksession(sparkcontext)이 생성이 된다. spark-submit을 하면 얀같은 클러스터 매니저로 잡이 실행될 것을 요청한다. spark job은 단일노드에서 실행되는 것이 아니라 다수의 노드들이 병렬하게 수행되는 것이기 때문에 spark-submit을 할때 클러스터 매니저에게 필요한 리소스를 요청한다. 익스큐터 갯수, 코어갯수, 익스큐터별 메모리설정 등을 요청한다. 그러면 클러스터 매니저는 spark-submit으로 요청받은 내용에 맞게 cluster worker node를 할당해준다. 리소스가 부족하다면 클러스터 매니저는 spark-submit 한것을 리소스가 부족하다는 메세지와 함께 취소시킨다.

1

  • Spark runtime components

아래에 왼쪽 그림은 클러스터 안에 드라이버가 포함된 것이고, 오른쪽 그림은 클러스터 밖에 드라이버가 포함되어 있는 경우이다. spark-submit cluster 모드와 client 모드라고 보면 된다. 일반적으로 spark-submit은 cluster 모드로 실행하는 경우가 많다. 만약에 드라이버가 문제가 생기면 다시 재구동해야하거나 하는 것들을 클러스터 매니저가 담당하고 있기 때문이다. 간혹가다 드라이버가 클러스터 외부에 존재하는 경우도 있다. 드라이버에 클러스터 안에 있는 것보다 별도의 전용 리소스를 점유해도 될때는 클러스터 외부에서 띄우는 경우도 있다.

2

  • Spark Driver

spark session을 생성하고, 클러스터 매니저에게 자원을 요청한다. 각각의 action을 통해서 job이 submit되고 n개의 task로 나뉘어지게 되는데 각 task들이 일하게 되는 것들은 각 slave worker들에서 각각의 task들이 수행이 되는 것이다.

3

  • 용어정리

1) driver : Process that contains the SparkSession(SparkContext)

2) executor : Process that executes one or more Spark tasks

spark의 task들이 병렬하게 동작할 수 있도록 각각의 노드에 분산되는 프로세스

3) master : Process that manages applications across the cluster

4) worker : Process that manages executors on a particular node

  • Spark Deployment - StandAlone Mode

spark standalone 모드에서는 마스터 노드의 백업이나 리더지정이나 이런것들을 지정해서 관리해줘야 한다.

4

  • Spark Deployment - Yarn Mode

일반적으로 하둡클러스터를 사용하는 경우 yarn을 많이 사용하게 된다.

yarn을 쓰는 목적은 사실상 하둡 클러스터에 spark만 있는게 아니기 때문이다. 클러스터 자원을 hive도 쓰고, hadoop mapreduce도 쓰고, spark도 쓰고 다양한 어플리케이션을 쓰겠다는 것이다. 얀에서 전체 리소스 매니지먼트를 하고 내가 던진 spark job에 대해서 리소스를 요청하고, 리소스를 획득해서 비로소 spark job을 돌릴 수가 있다. 이말은 spark job을 제출할때 yarn과 통신하는 과정에서 boot time이 생길 수 밖에 없다. 그래서 yarn에 어떤 spark job을 돌릴때 메모리랑 cpu 코어 몇개 쓸거고 드라이버는 어떻게할거고 익스큐터는 어떻게할거고 이런정보를 보내면 yarn에서 해당 자원의 사용가능 여부를 확인해서 리소스 정보를 주게된다. 만약에 리소스가 부족한 경우 리소스가 없다고 fail을 return할 것이다.

5

각각의 노드매니저 안에 얀 컨테이너로 구동이 되고, 얀 컨테이너 안에 구동되는게 익스큐터들이 뜨게 되고, 캐시영역이 들어가게 된다. 그리고 우리가 원하는 task 영역들이 병렬적으로 수행될 수 있도록 제공한다.

6

  • yarn 실행모드

1) YARN client mode: Spark driver executes the client machine (the machine used for submitting the job), and the YARN application master is just used for requesting the resources from YARN.

2) YARN cluster mode: Spark driver runs inside the YARN application master process, which is further managed by YARN on the cluster

클러스터 모드에서는 리소스 매니저로부터 이런 spark app master와 익스큐터들을 할당받게 된다. 얀에서 드라이버가 뜨는 구조를 가지게 된다. 반면에 클라이언트 모드는 드라이버가 클라이언트에 뜨는 것이다.

7

클라이언트 모드를 쓰는게 또 언제가 있냐 sparksql을 띄울때 아래 명령어와 같이 thrift server를 구동해서 쓸 수 있다. thrift server를 띄우면 jdbc나 odbc 연결이 가능하다. bi 솔루션을 연결해서 데이터를 조회하거나 sql 쿼리를 처리하는 경우인데 이거의 문제점은 thrift server를 띄우면 driver가 thrift server가 되는 것이고, 드라이브에 부하가 많이 갈 수 밖에 없다. 필요로하는 리소스가 상대적으로 익스큐터에 비해서 많이 점유를 해야한다. 그래서 아래 명령어와 같이 드라이버 코어와 메모리를 별도로 부여할 수 있다.

sbin/start-thriftserver.sh
--master yarn \
--deploy-mode client \
--executor-memory 30G \
--num-executors 60 \
--executor-cores 8 \
--driver-cores 12 \
--supervise \
--driver-memory 40G \
--name "thrift"
  • Dynamic Resource Allocation on YARN

spark이 구박받는 이유가 hive 잘 돌고 있는데 spark이 껴들어서 자원을 과점유 하는 경우가 있다. 리소스의 할당을 워크로드에 따라서 스케일 인아웃을 하는 것이다. 이게 어떻게 가능하냐면 external shuffle 이라는걸 이용한다. 익스큐터가 스케일 아웃할 경우 각각의 익스큐터들에 의해서 지금 수행하고 있는 RDD에 대한 정보를 보내줘야 한다. 그때 사용하는게 external shuffle이다.

8

Dynamic Resource Allocation 사용을 위해서 yarn에서 아래와 같이 설정을 해줘야 한다.

yarn-site.xml에 external shuffle 관련정보를 mapping을 해주고, spark config 또는 job을 submit할때 이런 설정들을 enable 시킬 수 있다.

1) shuffle plugin add jar

2) yarn-site.xml add plugin

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.auxservices.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>

3) edit spark-default.conf

spark.dynamicAllocation.enabled true
spark.shuffle.service.enabled true
spark.dynamicAllocation.minExecutors 50
spark.dynamicAllocation.maxExecutors 100
spark.dynamicAllocation.initialExecutors 50
# 아래의 설정을 통해 익스큐터가 늘어나야할지, failed 되는게 있다면, 지연이 되는게 있다면 더 늘어날 것이다. 
spark.dynamicAllocation.cacheExecutorIdleTimeout 600

참고로 Mesos에서도 Coarse-Grained Mode라고해서 Dynamic Resource Allocation을 지원한다.

  • Spark Deployment - Job Server

하나의 sparkcontext(sparksession)을 공유하는 개념이다. 매번 sparkcontext(sparksession)를 띄우고 죽이고 하는 시간이 아까울때 활용할수 있다. sparkcontext(sparksession)로 여러개의 쿼리나 application을 처리할 수 있다.

9

  • spark cluster 구성할때 Hardware sizing

1) Storage : HDFS or Local

2) Local Disks : to store data that doesn’t fit in RAM, intermediate output between stages, 4-8 disks per node

3) Memory : 8 GB to hundreds of gigabytes, allocating only at most 75% of the memory

4) Network: 10 Gigabit or higher network

5) CPU cores : 8-16 cores per machine

CPU가 가장 버틀넥이라고 할 수 있다. RDD 객체를 네트워크로 각 노드가 주고받는걸 보면 serialize, deserialize를 반복한다. 객체를 file이나 메모리와 같은 형태로 저장하거나 binary 형태로 네트워크를 타고 이동하기 위해서 serialize, deserialize를 반복할때 CPU가 주요한 일을 한다.

  • 클라우드에서 spark cluster

EMR은 다른 aws 서비스와의 유기적인 연결이 가능하다.

10

  • 최근에 쿠버네티스 기반으로 spark을 구동할 수 있는 것도 개발됨

11

쿠버네티스에서는 pod라는 개념이 있고, 그 안에서 여러가지 프로세스를 띄울수가 있다. 그리고 shuffle 서비스를 통해서 scaling을 지원을 한다.

12

  • 튜닝관련 디자인 초이스

1) Scala versus Java versus Python versus R

2) DataFrames versus SQL versus Datasets versus RDDs

3) Object Serialization in RDDs( spark.serializer to org.apache.spark.serializer.KryoSerializer)

시리얼라이저를 이용해서 성능개선을 할 수 있다.

4) Dynamic allocation을 활용해서 성능을 개선할 수 있다.

  • Memory Pressure and Garbage Collection

1회성 작업에 대해서는 굳이 가비지 컬렉션을 할 필요는 없지만 long running process는 관리해줄 필요가 있다.

1) Measuring the impact of garbage collection

adding -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps to Spark’s JVM options using the spark.executor.extraJavaOptions configuration parameter

2) Garbage collection tuning

Try the G1GC garbage collector with -XX:+UseG1GC

large executor heap sizes, it can be important to increase the G1 region size with -XX:G1HeapRegionSize

  • Parallelism

아래와 같이 spark config 차원에서 병렬성을 조절할 수도 있고, spark application에서 repartition이나 coalesce로도 조절이 가능하다.

1) spark.default.parallelism

default option : 200

2) spark.sql.shuffle.partitions

default option : 200

  • Temporary Data Storage (Caching)

아래 그림과 같이 raw data를 읽어와서 각각 처리하는 4개의 Dataframe이 존재한다고 가정하자. 이것들이 base dataframe이 될것이다. 이것들을 캐싱해서 각각의 익스큐터 캐시 영역으로 올려버리면 해당 dataframe는 항상 캐시영역을 바라보고 있기 때문에 Raw data를 처음부터 읽지 않는다.

13

캐싱 예제코드

모든 액션이 일어날때 캐시를하면 캐시된 데이터부터 액션이 일어난다. 그래서 캐시를 안하면 DF1, DF2, DF3, DF4를 다 읽을 것이다.

조심해야할 점은 메모리영역이 각각의 익스큐터 영역에서 캐시영역에 들어가는거여서 리소스 관리차원에서 조심해야 한다.

DF1 = spark.read.format("csv").option("inferSchema", "true").option("header", "true").load("/data/flight-data/csv/2015-summary.csv")
DF2 = DF1.groupBy("DEST_COUNTRY_NAME").count().collect()
DF3 = DF1.groupBy("ORIGIN_COUNTRY_NAME").count().collect()
DF4 = DF1.groupBy("count").count().collect()
DF1.cache()
DF1.count()

캐싱 옵션

MEMORY_AND_DISK : spill out 이라고 해서 메모리 영역에 벗어나는 경우에는 디스크에 저장한다.

14

  • cluster 환경에서 조심해야 할 점이 데이터 지역성이다.

Data locality means how close the data is to the code to be processed.

spark dashboard에서 task 들의 지역성이 tab에 나오게 된다. 가장 워스트가 any이다. 이게 뭐냐면 어디있는지는 모르지만 일단 데이터를 찾는 컨셉이다. 프로세스 로컬이 가장 베스트한 경우다. 해당 jvm에 데이터가 포함되어 있는 경우이다.

데이터 지역성은 항상 유심히 봐야하는 정보고 최적화 할 수 있는부분에 대해서는 최적화를 해줘야 한다.

15

  • JVM Memory Management

익스큐터 메모리 설정할때의 옵션이 있다. java heap size는 전체 메모리에서 약 90% 정도 된다. max값,min값,flag를 설정해줄 수 있다. 스토리지 영역이 실제 캐싱되는 부분이다. 60% 정도 된다. 캐시가 다차면 LRU 알고리즘에 의해 오래된 것들은 삭제가 된다. 아래의 옵션들은 모두 사용자가 임의로 바꿀 수 있는 옵션들이다.

16

  • 모니터링

모니터링은 뭘 해야하냐. 일단 클러스터 차원에서는 드라이버와 각각의 익스큐터를 모니터링 해야한다. 머신에 대한 모니터링은 기본적으로 하는 것이고, 네트워크에 대한 모니터링도 해줘야 한다. 결론적으로 중점적으로 봐야할게 jvm 모니터링이다.

spark ui를 통해 기본적인 모니터링이 가능하다.

17

모니터링 대상

1) Spark Applications and Jobs

2) JVM

3) OS/Machine

4) Cluster 상태

  • 트러블슈팅

케이스 1) Spark Jobs Not Starting

spark-submit했는데 아무 반응이 없을때, spark job이 시작이 안될때

Signs and symptoms

1) Spark jobs don’t start.

2) The Spark UI doesn’t show any nodes on the cluster except the driver.

3) The Spark UI seems to be reporting incorrect information.

Potential treatments

1) Ensure that machines can communicate with one another on the ports that you expect.

포트가 중복될때 이런경우가 있으니 확인해볼것

2) Ensure that your Spark resource configurations are correct and that your cluster manager is properly set up for Spark.

리소스를 제대로 할당 받지 못한 경우 spark driver가 안뜨는 경우가 있다.

케이스 2) Slow Tasks or Stragglers

가장 이상적인 경우가 모든 노드에 데이터가 균등하게 저장되어 있고, 각각의 노드가 균등하게 일을 하는 경우다. 근데 예를 들어서 전체 테스크의 80%는 일이 다 끝났는데 20% 정도가 계속 뭔가 일을 하는 경우가 있다. 애러를 내고, 리트라이를 계속하고 이런경우가 있다. 리트라이가 사실상 out of memory 인 경우가 있다. 아니면 특정디스크나 네트워크가 문제가 있는 경우가 있다.

Slow Tasks or Stragglers는 스큐드된 작업을 얘기하는 건데 이럴때는 파티셔닝을 늘려줘야 한다. 왜냐하면 블럭단위로 task를 물고 다 뜨게 되는데 여기서 특정블럭이 데이터가 균등하지 않고, 데이터가 커져있는 상태로 들어가 있다고 하면 그 task는 오래걸릴것이다.

Potential treatments

1) Try increasing the number of partitions to have less data per partition.

2) Try repartitioning by another combination of columns.

3) Try increasing the memory allocated to your executors if possible.

4) Monitor the executor that is having trouble and see if it is the same machine across jobs

케이스 3) OutOfMemoryErrors or garbage collection messages in the driver logs.

불필요하게 드라이버에 부하가 가는 코드들이 있는지 확인해서 제거한다. 예를들어서 collect, count, show는 굳이 필요가 없는 부분들이니까.

spark streaming같이 long running process의 경우 broadcast나 accumulate 사용을 조심해야 한다. 스물스물 메모리가 증가하다가 out of memory가 날 수 있기 때문이다.

Potential treatments

1) Your code might have tried to collect an overly large dataset to the driver node using operations such as collect.

2) You might be using a broadcast join where the data to be broadcast is too big.

3) A long-running application generated a large number of objects on the driver and is unable to release them.

4) Increase the driver’s memory allocation if possible to let it work with more data.

케이스 4) OutOfMemoryErrors or garbage collection messages in the executor logs.

익스큐터에서는 out of memory가 자주 일어나는 편이다.

역시 가장 쉬운 해결방법은 메모리를 늘리는 것이다. 그 다음에 익스큐터를 늘리는 것도 있다.

Executors that crash or become unresponsive.

Potential treatments

1) Try increasing the memory available to executors and the number of executors.

2) Look for garbage collection error messages in the executor logs.

garbage collection 디버그 옵션 켜서 익스큐터 로그에서 가비지 컬렉션 애러를 확인할 수 있다.

3) This is more likely to happen with RDDs or with Datasets because of object instantiations. Try using fewer UDFs and more of Spark’s structured operations when possible.

4) Use Java monitoring tools such as map to get a histogram of heap memory usage on your executors.

5) If executors are being placed on nodes that also have other workloads running on them, such as a key-value store, try to isolate your Spark jobs from other jobs.