티아카데미 아파치 스파크 입문과 활용 TIL - spark core

2020-12-06

.

Data_Engineering_TIL(20201206)

study program : T아카데미 - 아파치 스파크 입문과 활용

** URL : https://tacademy.skplanet.com/frontMain.action

[학습내용]

  • EC2에 spark 커널이 있는 제플린을 구동해서 간단한 spark 코드를 실행해보는 실습

spark을 띄우기 위해서 spark cluster를 구성해야하는데 시간상 제플린 환경으로 실습한다. 제플린에서 spark context를 띄워서 실습해볼 수 있다.

참고로 spark을 설치한다는 것은 spark만 설치하면 끝나는게 아니라 yarn과 Hadoop을 설치를 해주고 연동해줘야 한다.

아래와 같은 spec으로 aws ec2 생성

aws ec2 run-instances --image-id ami-03b42693dc6a7dc35 --count 1 --instance-type t3.medium --key-name pms-seoul-key --security-group-ids sg-xxxxxxxxxxxx --subnet-id subnet-xxxxxxxxxxxxx --associate-public-ip-address --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=pms-spark-test},{Key=owner,Value=pms},{Key=expiry-date,Value=2020-12-04}]' --block-device-mappings 'DeviceName=/dev/xvda,Ebs={VolumeSize=30,DeleteOnTermination=true}'

그런 다음에 해당 ec2로 ssh 접속해서 아래와 같은 명령어 실행

[ec2-user@ip-10-0-1-170 ~]$ sudo yum update -y

[ec2-user@ip-10-0-1-170 ~]$ sudo amazon-linux-extras install docker -y

[ec2-user@ip-10-0-1-170 ~]$ sudo service docker start
Redirecting to /bin/systemctl start docker.service

[ec2-user@ip-10-0-1-170 ~]$ sudo usermod -a -G docker ec2-user

[ec2-user@ip-10-0-1-170 ~]$ exit

터미널 로그아웃 후 다시 ec2로 재접속하고 아래와 같이 명령어를 실행했을때 sudo 명령어를 안줘도 되는지 확인해본다.

[ec2-user@ip-10-0-1-170 ~]$ docker info
Client:
 Debug Mode: false

Server:
 Containers: 0
  Running: 0
  Paused: 0
  Stopped: 0
 Images: 0
 Server Version: 19.03.13-ce
 Storage Driver: overlay2
  Backing Filesystem: xfs
  Supports d_type: true
  Native Overlay Diff: true
 Logging Driver: json-file
 Cgroup Driver: cgroupfs
 Plugins:
  Volume: local
  Network: bridge host ipvlan macvlan null overlay
  Log: awslogs fluentd gcplogs gelf journald json-file local logentries splunk syslog
 Swarm: inactive
 Runtimes: runc
 Default Runtime: runc
 Init Binary: docker-init
 containerd version: c623d1b36f09f8ef6536a057bd658b3aa8632828
 runc version: ff819c7e9184c13b7c2607fe6c30ae19403a7aff
 init version: de40ad0 (expected: fec3683)
 Security Options:
  seccomp
   Profile: default
 Kernel Version: 4.14.193-149.317.amzn2.x86_64
 Operating System: Amazon Linux 2
 OSType: linux
 Architecture: x86_64
 CPUs: 2
 Total Memory: 3.794GiB
 Name: ip-10-0-1-170.ap-northeast-2.compute.internal
 ID: SPCF:Y75R:B22D:BLRW:ELZ5:3O4B:HPDJ:R3JI:DSQM:W74R:TRA2:BXWE
 Docker Root Dir: /var/lib/docker
 Debug Mode: false
 Registry: https://index.docker.io/v1/
 Labels:
 Experimental: false
 Insecure Registries:
  127.0.0.0/8
 Live Restore Enabled: false

아래와 같은 docker run 명령어를 실행해서 제플린 개발환경을 구성한다.

docker run -p 4040:4040 -p 8080:8080 --privileged=true -v $PWD/logs:/logs -v $PWD/notebook:/notebook -e ZEPPELIN_NOTEBOOK_DIR='/notebook' -e ZEPPELIN_LOG_DIR='/logs' -d apache/zeppelin:0.8.1 /zeppelin/bin/zeppelin.sh

[ec2-user@ip-10-0-1-170 ~]$ docker run -p 4040:4040 -p 8080:8080 --privileged=true -v $PWD/logs:/logs -v $PWD/notebook:/notebook \
> -e ZEPPELIN_NOTEBOOK_DIR='/notebook' \
> -e ZEPPELIN_LOG_DIR='/logs' \
> -d apache/zeppelin:0.8.1 \
> /zeppelin/bin/zeppelin.sh
Unable to find image 'apache/zeppelin:0.8.1' locally
0.8.1: Pulling from apache/zeppelin
7b722c1070cd: Pull complete
5fbf74db61f1: Pull complete
ed41cb72e5c9: Pull complete
7ea47a67709e: Pull complete
7ba34fd9f5e0: Pull complete
8f2f09b83582: Pull complete
40260f0a8f69: Pull complete
48946af5572c: Pull complete
8b38acee7e8d: Pull complete
a806f41d7e41: Pull complete
7dcaf396dead: Pull complete
8db355f40e66: Pull complete
Digest: sha256:a3a90ec1579f5171ebac565e739547b885ed75efc1ec7581128ec1033a4496cb
Status: Downloaded newer image for apache/zeppelin:0.8.1
9276b4344e403414c354ec41816abcd6005e637836d31f691319323b672fe279

[ec2-user@ip-10-0-1-170 ~]$ docker ps
CONTAINER ID        IMAGE                   COMMAND                  CREATED             STATUS              PORTS                                            NAMES
9276b4344e40        apache/zeppelin:0.8.1   "/usr/bin/tini -- /z…"   7 seconds ago       Up 5 seconds        0.0.0.0:4040->4040/tcp, 0.0.0.0:8080->8080/tcp   confident_swirles

위와 같이 정상적으로 도커를 실행하고 나면 웹브라우저를 열고 [ec2 public ip]:8080 으로 접속해서 제플린으로 정상 접근가능한지 확인해본다.

접속한 다음에 Create new note을 클릭해서 spark 노트북을 새로 만들어준다. 그런 다음에 아래 그림과 같이 첫번째 블럭에 spark 문자열을 입력후 실행해본다. 실행하면 sparksession이 구동되는 것을 확인할 수 있다.

1

그런 다음에 웹브라우저를 하나 새로열고 [ec2 public ip]:4040를 입력해서 아래 그림과 같이 spark ui에 정상접속하는지 확인해본다.

2

생성한 제플린 노트북에서 아래와 같은 코드로 실습을 해본다.

‘티아카데미 아파치 스파크 입문과 활용 - 데이터 처리 실습 제플린 파일’ 폴더에서 ‘test.json’ 참고할것

%spark
//sparksession driver process,어플리케이션 기준 
spark


%spark
//한개의 컬럼과 1000개의 row(0~999) 생성한 데이터 프레임을 만든다
val myRange = spark.range(1000).toDF("number")


%spark
//transformation - narrow dependency
val divisBy2 = myRange.where("number % 2 = 0")

%spark
//action
divisBy2.count()


%sh
//데이터 다운로드
wget https://raw.githubusercontent.com/databricks/Spark-The-Definitive-Guide/master/data/flight-data/csv/2015-summary.csv


%sh
//데이터 확인
head /zeppelin/2015-summary.csv


%spark
//CSV데이터 데이터 프레임으로 바로 읽기, visit to http://localhost:4040
val flightData2015 = spark.read.option("inferSchema", "true").option("header", "true").csv("/zeppelin/2015-summary.csv")


%spark
//3건만 가져오기
//take같이 action이 발생할때 spark 대시보드에 로그가 남는다.
flightData2015.take(3)


%spark
//실행계획 확인하기
flightData2015.sort("count").explain()


%spark
//count필드로 정렬해서 2 가져오기
flightData2015.sort("count").take(2)


%spark
//SQL을 사용하기 위해 임시 view만들기
// tempview나 temptable은 sparkcontext안에서만 유효하기 때문에 sparkcontext가 종료되면 이것들도 같이 날아간다.
flightData2015.createOrReplaceTempView("flight_data_2015")


%spark
//SQL로DEST_COUNTRY_NAME 기준으로 몇건인지 확인하는 spark sql실행하기
val sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")


%spark
//데이터 프레임 통계 보기
sqlWay.describe().show()



%spark
//데이터 프레임으로 실행계획 확인하기
val dataFrameWay = flightData2015
  .groupBy('DEST_COUNTRY_NAME)
  .count.explain


%spark
//sql과 dataframe실행계획 비교하기           
sqlWay.explain


%spark
//sql질의로 max값 가져와서 take로 한건만 확인하기           
spark.sql("SELECT max(count) from flight_data_2015").take(1)


%spark
//데이터 프레임에서 max함수 이용하여 count의 최대값 가져오기           
import org.apache.spark.sql.functions.max

flightData2015.select(max("count")).take(1)


%spark
//spark sql로 DEST_COUNTRY_NAME 을 집계연산을 수행하여 합계를 구하여 큰 순서대로 5건 확인하기           
val maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")



%spark
//sql결과 확인하기           
maxSql.explain
maxSql.show()



%spark
//동일한 쿼리를 dataframe으로 구현해 봅니다.  End-to-End example
flightData2015
.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.explain


%spark
flightData2015
.groupBy("DEST_COUNTRY_NAME")
.sum("count")
.withColumnRenamed("sum(count)", "destination_total")
.sort(desc("destination_total"))
.limit(5)
.show()


%spark
//임의의 값을 가지고 데이터 프레임 생성하기
val data = Seq(("Banana",1000,"USA"), ("Carrots",1500,"USA"), ("Beans",1600,"USA"),
      ("Orange",2000,"USA"),("Orange",2000,"USA"),("Banana",400,"China"),
      ("Carrots",1200,"China"),("Beans",1500,"China"),("Orange",4000,"China"),
      ("Banana",2000,"Canada"),("Carrots",2000,"Canada"),("Beans",2000,"Mexico"))

import spark.sqlContext.implicits._
val df = data.toDF("Product","Amount","Country")
df.show()

%spark
//각 제품의 각 국가로 수출 된 총 금액을 얻으려면 제품 별 그룹화, 국가 별 피봇 팅 및 금액 합계를 얻어온다
val pivotDF = df.groupBy("Product").pivot("Country").sum("Amount")
pivotDF.show()

%spark
//spark2.0에서 성능향상을 위해 pivot대상열을 선언함 
val countries = Seq("USA","China","Canada","Mexico")
val pivotDF = df.groupBy("Product").pivot("Country", countries).sum("Amount")
pivotDF.show()

%spark
//성능 향상을 위해 두단계 집계를 사용함 
val pivotDF = df.groupBy("Product","Country")
      .sum("Amount")
      .groupBy("Product")
      .pivot("Country")
      .sum("sum(Amount)")
pivotDF.show()
           
%spark
//stack기능을 이용하여 주요 국가에 대해 unpivot을 수행
val unPivotDF = pivotDF.select($"Product",
expr("stack(4, 'Canada', Canada, 'China', China, 'Mexico', Mexico, 'USA', USA) as (Country,Total)"))
.where("Total is not null")
unPivotDF.show()
           
  
//%sql로 하면 sparksql 커널로 잡히게 된다.
%sql
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
  • 참고사항 : sparksession과 sparkcontext

1) 둘의 차이점 결론

spark.sparkContext = RDD 생성시 사용하는 API 객체로 application 전체의 실행 관련 정보 집약 객체로 스케줄러 등이 포함된 개념이다.

spark.sparkSession = DataFrame 생성시 사용하는 API 객체로 sparkcontext에 세션정보가 추가로 포함된 개념이다.

RDD를 생성하려면 SparkContext 객체를 생성하고, DataFrame 또는 DataSet을 생성하려면 SparkSession 객체를 생성해야 한다. 그런데 sparksession 객체 안에는 sparkcontext 객체가 포함돼 있기 때문에 RDD를 만들때나 데이터프레임을 만들때나 상관없이 sparksession 객체를 생성하면 된다.

2) SparkSession

Spark2.0 이후부터는 SparkSession을 사용해서 Spark Dataset, DataFrame API를 사용할 수 있다. 또한 SparkContext에서 사용 가능한 모든 기능은 SparkSession에서도 사용이 가능하다.

3) sparkcontext

SparkSession를 이용해서 SparkContext()를 사용하고 싶다면 SparkSession에서 sparkContext() 메서드를 사용하면 SparkContext를 사용할 수 있다.SparkConext는 Spark 클러스터에 대한 연결을 나타내며 해당 클러스터에서 RDD, broadcast, accumulator 등의 변수를 사용하여 사용이 가능하다. JVM 당 하나의 SparkContext만 활성화 할 수 있다.

  • RDD란

1) RDDs (Resilient Distributed Datasets) is Data Containers

2) All the different processing components in Spark share the same abstraction called RDD

spark의 모든 프로세싱은 RDD 기반으로 모든 데이터를 share한다고 이해하면 된다.

3) As applications share the RDD abstraction, you can mix different kind of transformations to create new RDDs

4) Created by parallelizing a collection or reading a file

5) Fault tolerant

  • RDD 개념이해를 위한 예시 : log mining

HDFS에 로그가 쌓여있다고 가정하자. 데이터를 line by line으로 읽는데 ‘ERROR’가 포함된 문자열을 detect하고 싶은것이다.

‘ERROR’가 포함된 RDD를 따로 만들고, 거기에서 map작업을 하는데 ‘\t’ split했을때 두번째 문자열을 message 변수로 가져오고 그거를 캐싱을 해서 거기에서 foo가 몇개이고 bar가 몇개인지 filter해서 count한다.

RDD가 변환시킬때마다 transformation이 일어난다. 변환될때마다 transformed RDD가 계속 생성된다. 그리고 그거를 count를 해서 action을 하게되면 driver로 이벤트가 날아가게 된다.

3

위에 그림에서 driver는 자바에서 메인함수를 가지는 어플리케이션이라고 할 수 있다. driver에 code가 submit이 되면 클러스터안에 워커한테 task들이 분배가 될것이다. 그리고 cache를 위에서 사용했는데 worker의 메모리 영역에서 일부영역을 캐시영역으로 활용하게 된다. 그리고 spark의 task들이 완료되면 결과값을 driver로 던져주게 된다.

  • RDD fault tolerance가 뭐냐

HDFS는 매번 읽고 쓰고를 하는데 읽고 쓰고 하는 만약에 iteration한 job이 있는데 이게 중간에 갑자기 문제가 생겨서 다시 이 job을 재개한다면 mapreduce는 다시 디스크를 읽어서 중간산출물을 다시 읽게 된다. 그런다음에 다시 computing을 하게 된다. 그러면 spark 같이 인메모리 구조에서는 이런 recomputing을 어떻게 하냐. RDD는 리니지 트레킹이라는 개념이 있다. 아래 그림과 같이 데이터가 변형이 가해질때마다 RDD가 새로 생성이 된다. 그래서 연산중간에 문제가 생기면 바로이전 단계 RDD를 트레킹해서 리트라이 할 수 있도록 한다.

“RDDs track the transformations used to build them (their lineage) to recompute lost data”

4

  • scala vs java

spark은 jvm 기반이기 때문에 자바 스칼라 모두 가능하고, 스칼라 언어의 간결성과 함수형 프로그래밍의 장점을 살리고자 주로 스칼라를 spark에서는 사용한다. spark-submit하고 spark job의 프로세싱은 jvm 위에서 모두 실행이 된다.

5

  • 참고로 spark 데이터프레임만 사용한다면 java, scala, python 언어의 성능차이는 최근에는 거의 없다고 할 수 있다.

  • scala cheat sheet

//variables
var x: Int = 7
    
var x = 7 
// type inferred

val y = "hi"
// read-only

// Functions
def square(x: Int): Int = x*x
def square(x: Int): Int = {
    x*x // last line returned
}
    
// Collections and closures
val nums = Array(1, 2, 3)
nums.map((x: Int) => x + 2) 
// => Array(3, 4,5)

nums.map(x => x + 2) 
// => same

nums.map(_ + 2) 
// => same

nums.reduce((x, y) => x + y) 
// => 6

nums.reduce(_ + _) 
// => 6
  • RDD operations

spark은 최적의 DAG를 action 시점에서 찾기 위해서 lazy operation을 한다. ** DAG : 특정 job이 돌기위해서 실행되는 프로세스

사용자가 이것저것 transformation을 하더라도 실제로 action단계에서 transformation의 실행계획이 최적화 되는것이다.

1) Transformations (e.g. map, filter, groupBy,join)

Lazy operations to build RDDs from other RDDs

2) Actions (e.g. count, collect, save)

Return a result or write it to storage

6

action이 수행되고 결과값은 driver로 return하거나 특정 스토리지에 저장할 수 있다.

  • RDD와 Dataframe

RDDs provide a low level interface into Spark

DataFrames have a schema

DataFrames are cached and optimized by Spark

DataFrames are built on top of the RDDs and the core Spark API

7

  • spark Dataframe 샘플코드

코드가 상당히 직관적이고 쉽다.

// Create a new DataFrame that contains students
students = users.filter(users.age < 21)

//Alternatively, using Pandas-like syntax
students = users[users.age < 21]

//Count the number of students users by gender
students.groupBy("gender").count()

// Join young students with another DataFrame called logs
students.join(logs, logs.userId == users.userId,"left_outer")

RDD와 Dataframe 샘플코드 비교

RDD같은 경우에는 \t으로 split을 한다음에 몇번째 array의 무슨값인지 다 지정을 해줘야 하는데 Dataframe은 스키마를 갖고 있기 때문에 Dataframe은 그런식으로 지정을 해줄 필요가 없다.

1) RDD

data = sc.textFile(...).split("\t")
data.map(lambda x: (x[0], [int(x[1]), 1]))
.reduceByKey(lambda x, y: [x[0] + y[0], x[1] + y[1]])
.map(lambda x: [x[0], x[1][0] / x[1][1]])
.collect()

2) DataFrame

sqlCtx.table("people").groupBy("name").agg("name",avg("age")).collect()
  • Directed Acyclic Graphs (DAG)

단방향을 가지는 Acyclic한 그래프를 말한다.

8

DAGs track dependencies (also known as Lineage )

각각의 node들은 RDD가 이전 RDD에서 transformed 된 형태이다.

베이스 RDD는 a와 s이고 이것들이 transformed 되어 변형이 계속 이루어지는 형태이다. 만약에 C RDD에서 연산이 일어나다가 문제가 발생하면 다시 B RDD부터 연산을 재시도할 것이다.

각각의 화살표들은 Transformations을 의미한다.

  • DAG of a Job

실제로 spark action이 실행되면 spark 대시보드에 job이 생성된다. 아래는 spark DAG의 예시다. n개의 stage로 나뉘어지고 n개의 각각의 stage에서 n개의 task로 구성이된다. action이 하나 날아가면 spark job은 하나의 action을 포함하게 되고, 그 job에는 n개의 stage가 있고, 각각의 stage안에는 n개의 task로 구성이 된다.

그러면 stage를 구성을 어떻게 하냐면 operation 종류에 따라서 stage가 구성이된다. 무슨말이냐면 transformation은 같은 stage로 구성이 된다는 것이다. 생각을 해보면 map 단계에서는 shuffling이 필요가 없다. 그래서 같은 stage에서 transformation이 묶이게 된다. 그리고 reduce하거나 shuffle해야 하거나 다른 것들과 join해야 하면 다른 stage로 구성이 된다.

아래 그림에서 stage 1은 6개의 task로 구성이 되어 있다. file 하나를 읽고 split을 하고 tokenizing하는 작업인데 얘네들은 transformation하는 작업이기 때문에 하나의 stage로 묶이게 된 것이다.

stage 2는 다른 또 새로운 파일을 읽는 것이기 때문에 별도의 stage로 구성이 된 것이고, 그래서 stage 1과 stage 2의 결과를 갖고 stage 3에서 shuffling을 한다. inner join을 하는데 inner join을 한다는 것은 키별로 전부 sorting을 한 다음에 거기서 key를 매핑하는 것이다. final stage에서는 inner join한 것들을 write file하게 된다.

그러면 task는 어떤 기준으로 task가 나뉘어 지냐면 executor가 operation할때 core 갯수와 관계가 있다. 만약에 나의 spark cluster에 가용 core가 100개라면 100개의 task까지 생성이 가능한 것이다. 만약에 나의 spark cluster에 가용 core가 15개라면 15개의 task까지 생성이 가능한 것이다.

9

  • Transformation

spark transformation에는 narrow와 wide가 있는데 narrow는 map 연산(예를 들어서 \t으로 쪼개서 tokening 하는 연산)이라고 생각하면 되고, wide는 shuffling되는 것으로 이해하면 된다. 아래 그림에서는 key를 갖고 그룹핑 한 것이다. 그룹핑하기 위해서는 먼저 전체적으로 정렬이 되야 하고 키를 기반으로 다시 분배가 되야 하기 때문에 shuffling이 발생한다. 그래서 wide transformation시에는 network io도 발생한다.

10

  • Actions

The final stage of the workflow

Triggers the execution of the DAG

Returns the results to the driver Or writes the data to HDFS or to a file

아래 그림과 같이 어떤 spark job이 여러 작업을 하고 group by key로 transformation한 다음에 driver로 연산결과를 모은다(collect). 아래 그림에서 action은 collect를 말하는 것이다.

11

  • Python RDD example

1) word count

text_file = sc.textFile("hdfs://usr/godil/text/book.txt")
counts = text_file.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://usr/godil/output/wordCount.txt")

2) Logistic Regression

# Every record of this DataFrame contains the label and features represented by a vector.
df = sqlContext.createDataFrame(data, ["label", "features"])

# Set parameters for the algorithm.
# Here, we limit the number of iterations to 10.
lr = LogisticRegression(maxIter=10)

# Fit the model to the data.
model = lr.fit(df)

# Given a dataset, predict each point's label, and show the results.
model.transform(df).show()
  • RDD Persistence and Removal

RDD Persistence가 RDD 캐시를 말하는 것이다.

1) RDD Persistence

RDD.persist() 로 가능

Storage level: MEMORY_ONLY(디폴트옵션), MEMORY_AND_DISK, MEMORY_ONLY_SER, DISK_ONLY, …

MEMORY_ONLY_SER에서 SER은 뭐냐면 spark의 객체는 내부적으로 serialize하고 deserialize하는 연산을 하게 되는데 그때 serialize된 객체 자체를 저장하기 때문에 실제로는 메모리 용량은 줄일 수 있지만 얘를 또 deserialize하기 위한 오버헤드가 증가할 수 있는 옵션이다.

2) RDD Removal

RDD.unpersist()를 해주면 됨

  • Broadcast Variables and Accumulators (Shared Variables )

spark 클러스터 차원으로 인메모리에 어떤 값을 정의해놓고 참조해서 쓸수 있는 방법은 두가지가 있다. Broadcast Variables을 사용하는 방법이 있는데 driver에서 각각의 executor들로 데이터를 다 로드해서 전부 갖고 있는 것이고, Accumulators는 이벤트를 받아서 driver에서 그 값들을 들고 있는 것이다.

spark streaming에서 shared variables을 조심해서 써야한다. 왜냐하면 spark streaming은 일반적으로 long running process이기 때문에 메모리 이슈가 발생할 수 있기 때문이다.

12

Broadcast variables allow the programmer to keep a read-only variable cached on each node, rather than sending a copy of it with tasks

sc.broadcast해서 선언을 하면 그 값들을 각각의 executor들이 가져올 수 있는 것이다. 브로드캐스트는 매우 큰데이터를 올려서 매번 join해서 연산할때 이거를 메모리형태로 올리고 map side join을 하겠다는 의도로 쓰는게 예제 케이스이다.

프로그래밍 코드 ex)

broadcastV1 = sc.broadcast([1, 2, 3, 4, 5, 6])

broadcastV1.value

[1,2,3,4,5,6]

Accumulators are variables that are only “added” to through an associative operation and can be efficiently supported in parallel

sc.accumulator를 정의하고 accum.add()하면 그 값들을 driver가 참조하는 것이다. accumulator를 사용하는 예제 케이스가 디버깅 용도로 사용할 수 있다. accumulator는 mapreduce에서 디버깅 용도로 쓰는 counter와 같은 개념으로 이해하면 된다.

프로그래밍 코드 ex)

accum = sc.accumulator(0)

accum.add(x)

accum.value

  • 제플린에서 아래와 같은 코드를 실행해본다.
%sh
//zeppelin로그를 확인합니다.
ls -al /logs


%spark
//특정 파일을 읽어 10개의 라인을 가져옵니다.
val logs = sc.textFile("/logs/zeppelin--4c4bf3f64278.log")
logs.take(10)



%spark
//전체 문자길이와 개수  평균등을 구해봅니다.
val lengths = logs.map(str => str.length )
val totalLength = lengths.reduce( (acc, newValue) => acc + newValue )
val count = lengths.count()
val average = totalLength.toDouble / count

%spark
// line의 사이즈를 구한다
val log = logs.map((log) => log.size )
log.collect().foreach(println)


%spark
// 라인의 길이가 200이상인 것만 필터링
val filter = logs.filter( (log) => log.size > 200)


%spark
//출력하여 다음과 같이 확인한다
filter.collect().foreach(println)
  • RDD repartition

블락들이 여러개 있고, 각각의 노드에 블락이 저장되어 있다. 각 블락에 저장된 메세지가 아래와 같이 있다. 그래서 base rdd가 input이 된다. 메세지들은 어떤건 error 메세지고, 어떤건 info 메세지인데 여기서 나는 람다를 이용해서 Error가 포함된 메세지만 가져와서 RDD를 생성한다. Error가 포함된 메세지만 가져온 RDD를 생성하고, coalesce를 사용한다. 두번째 task에는 해당 내용이 한건도 없고, 첫번째 task는 두건, 세번째 task는 한건, 네번째 task는 4건이 있는 것이다. 여기서 parallelism을 조정할 수 있다. 다시말해서 RDD의 partition도 조절할 수 있다. 원래 errorsRDD의 partition은 4개인데 2개로 줄인것이다. partition을 조정할 수 있는 방법은 두가지이다. repartion과 coalesce 두가지가 있다. repartition은 전부 sort를 해서 전체를 shuffling 시킨다. 반면에 coalesce는 shuffling하지 않고, 현재 상태에서 partition을 조정한다. 여기서는 4개의 파티션을 두개로 죽인것이다.

실제 사례에서도 어떤 task에는 데이터가 몇백만건이 들어가 있고, 어떤 task에는 3건 들어가 있고 이런경우가 있다. 그렇게 되면 아무리 병렬처리를 해도 몇백만건 들어가 있는 RDD 연산이 안끝나면 전체가 기다려야 하는 비효율적인 상황이 발생한다.

13

  • RDD cache의 필요성

errorsRDD를 s3에 저장한다거나, 전체를 count한다거나, 이거를 또 가공을해서 처리를 한다고 하는 등의 재활용도 있는 경우가 있을것이다. errorsRDD를 캐싱하지 않는다면 매번 다른형태의 action을 할때마다 원천데이터를 가져오는 base RDD 맨처음부터 연산을 다시할 것이다.

14

  • spark 대시보드 보는법

예를들어서 describe라는 action이 일어나면 대시보드에 로그가 생긴다. 그러면 해당 job이 submit되는 것이다. 해당 job을 (describe을) 클릭해서 들어가면 n개의 stage로 구성이 된 것을 확인할 수 있다. 여기서는 3개의 stage가 구성이 된 것을 확인할 수 있다. DAG visualization도 확인할 수 있다. 그래서 거기서 특정 stage를 클릭해서 들어가면 각각의 stage 정보를 볼 수 있다. shuffled로 RDD가 생성이 되었고, 그거를 다시 sort aggregate해서 mappartitionRDD가 생성이 되는 형태를 확인할 수 있다. 그리고 이 stage를 구성하는 task를 볼 수 있다.

현재 실습용 제플린 도커를 쓰고 있기 때문에 driver 밖에는 없다. 아무리 연산해도 task가 하나만 생성된다. 실제 spark cluster를 사용한다면 n개의 task들이 쭉 뜰것이다. 여기서 체크를 해줘야 할게 각각의 task 별로 gc time이라던지 데이터가 스큐되었는지도 확인해야한다. 특정 task에 부하(많은 record)가 확 몰려있는 경우가 있다. 이거는 블락 size가 언발란스한 것도 있는등의 이유가 있을것이다.

참고로 만약에 내가 persist를 이용해서 rdd를 캐싱하였다면. 아래 그림에서 storage 메뉴에 캐싱한게 표시가 될 것이다. 캐시 영역은 각각의 executor 들에서 일정 메모리영역을 점유하는 것이기 때문에 spark에서 자동으로 캐싱을 해주는 것은 아니다. 명시적으로 persist를 이용해서 해줘야 한다.

15

  • Spark Job 용어

1) job : The work required to compute an RDD

2) stage : A wave of work within a job, corresponding to one or more pipelined RDD’s

pipelined RDD가 subset으로 존재하는 개념

3) task : A unit of work within a stage, corresponding to one RDD partition

각각의 stage는 그 안에 task unit을 갖는다.

4) shuffle : The transfer of data between stages

  • Structured API Execution

Datasets, DataFrames, SQL tables and views에 대한 Structured API Execution은 아래와 같다.

카탈리스트 옵티마이저라는게 있다. spark Dataframe을 연산할때 특정 프로그래밍 언어가 아니라 모든 언어가 동일한 성능을 보이는데 이런 성능개선을 제공하는 것이 카탈리스트 옵티마이저이다. 그래서 Structured API를 실행하면 실제 물리 실행계획(DAG)을 관장해서 DAG가 어떻게 수행이 될지 최적화해준다.

16

  • 카탈리스트 옵티마이저가 수행하는 플래닝 단계

step 1) Logical Planning

카탈리스트 옵티마이저가 수행하는 플래닝 단계이다. 카탈로그라고 하는 데이터의 메타정보를 통해서 논리적으로 최적화를 한다.

17

step 2) Physical Planning

위에서 도출한 optimized logical plan를 갖고 cost model을 최적화한다. 그래서 최종적으로 실행하고자 하는 spark code가 클러스터 안에서 최적의 DAG를 만들게 된다. 그래서 java던 scala던 python이던 spark dataframe을 사용하면 이런 최적화를 통해서 특정 언어와 상관없이 동등한 성능을 가져갈 수 있다.

18

  • Structured Operation 예시

예시 1)

spark.read.json("/zeppelin/2015-summary.json").createOrReplaceTempView("some_sql_view") # DF => SQL
spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count)
FROM some_sql_view GROUP BY DEST_COUNTRY_NAME
""").where("DEST_COUNTRY_NAME like 'S%'").where("`sum(count)` > 10").count() # SQL => DF

예시2)

users = context.jdbc("jdbc:postgresql:production","users")
logs = context.load("/path/to/traffic.log")
logs.join(users, logs.userId == users.userId,"left_outer").groupBy("userId").agg({"*": "count"})
  • Catalyst Optimizer

Structured API가 동일한 Catalyst Optimizer를 쓰기 때문에 jdbc로 연결하던 odbc로 연결하던 어떤 프로그래밍 언어를 사용하던 동일한 성능을 내는 것이다.

19

Analyzing a logical plan to resolve references

Logical plan optimization

Physical planning

Code generation to compile the parts of the query to Java bytecode

  • Plan Optimization & Execution

20

  • Optimization 예시

쿼리가 인풋으로 들어오게 되면 쿼리에 대한 논리적인 플랜을 짤것이다. 근데 쿼리를 할때 데이터가 엄청 큰데 데이터를 전체를 읽고 전체를 읽어서 join하면 매우 비효율적일 것이다. 실제로 물리계획에서는 user 테이블에 조건에 맞는 user를 스캔하고, 그거를 필터링하고 이벤트에 조건에 맞는 데이터를 스캔해서 조인하겠다는 것이다. 그래서 실제 조인할때는 전체 데이터가 아니라 일부 필요한 데이터만 가져와서 조인하겠다는 것이다.

21

그리고 추가적으로 Catalyst Optimizer를 통해서 파티션된 테이블 정보를 읽고, 거기서 유저 아이디를 조인하고 다시 함수를 통해서 우편번호를 넣는다. 얘를 paruqet format으로 load해서 join하는 단계에서 optimized scan을 통해서 partition된 데이터 조건에 맞게 얘네들을 일부만 스캔해서 가져온다.

하둡사상에서 가장 안좋은 케이스가 파티션을 제대로 지정하지 않고 데이터를 full scan하는 것이다. 하둡은 디비가 아니다. 따라서 partition key에 맞는 일부 데이터만 읽도록 해야한다.

사용자 선에서는 spark dataframe api만 사용하면 Catalyst Optimizer가 이런 최적화 작업을 알아서 해주기 때문에 이런고민은 안해도 된다.

간혹가다가 하이브에서는 잘 도는데 spark에서는 잘 안도는 경우가 있다. 이런 경우에는 실행계획을 잘 확인해서 조치해줘야 한다. 실제 그런 사례가 있었는데 이때 optimizer를 끄도록 했다. Catalyst Optimizer로 최적화 하는게 경우에 따라서는 적절하지 않을 수도 있다.

22

  • spark dataframe을 이용한 Window Functions 적용예시

23

product, category, revenue 테이블이 있는데 이거를 category 기준으로 revenue를 sum하는 것이다. 거기서 가장 높은 두개만 list up 하고 싶다는 의도이다. 여기서 window funtion을 쓸 수 있다.

SELECT product, category, revenue
FROM ( SELECT product, category, revenue, dense_rank() 
       OVER (PARTITION BY category 
             ORDER BY revenue DESC) as rank FROM productRevenue) tmp
WHERE rank <= 2
  • spark dataframe을 이용한 pivot 예시

24

# Pandas
pivot_table(df, values='D', index=['A', 'B'], columns=['C'], aggfunc=np.sum)
reshape2 : dcast(df, A + B ~ C, sum)

# DF
df.groupBy(A", "B").pivot("C").sum("D")
  • spark Dataframe으로 할 수 있는 것들

25

  • Spark SQL

spark에서 사용할 수 있는 sql interface이다. ANSI SQL 기반이다. sparksql에서 작성한 sql query도 Catalyst Optimizer가 최적화를 해준다. 물론 쿼리 자체를 잘못짜면 성능이 떨어지는건 당연하다. 예를 들어서 파티션키가 존재함에도 불구하고 파티션 키를 지정하지 않아서 전체를 풀스캔하는 쿼리가 있다.

SQL or Structured Query Language is a domainspecific language for expressing relational operations over data.

Spark implements a subset of ANSI SQL:2003. This SQL standard is one that is available in the majority of SQL databases and this support means that Spark successfully runs the popular benchmark TPC-DS.

  • Dataset

Dataset의 row기반이 dataframe이다. 데이터프레임은 데이터셋의 서브셋이라고 보면된다.

데이터셋은 자바와 스칼라만 사용할 수 있다. jvm기반의 언어만 제공한다. 스칼라에서는 스키마가 정의된 case class 객체로 데이터셋을 정의하게 된다.

26

데이터셋은 스키마에 명시된 데이터 타입의 일치여부를 compile time에 실행한다. 그래서 jvm 기반의 언어 특성을 살린것이라고 보면 된다.

그래서 Error를 캐치할때 dataframe이나 SQL보다 빠르다. 그렇다고해서 Dataset을 쓴다고해서 성능상의 이점이 있는 것은 아니다. 프로그래밍 코드를 좀 더 방어적이고 고도화(Type 안정성 증대)해서 짤 수 있다는 장점이 있는 것이다.

Type-safe: operate on domain objects with compiled lambda functions

DataFrame = Dataset[ROW]

스칼라코드 예시

val df = spark.read.json("people.json")

// Convert data to domain objects.
case class Person(name: String, age: Int)
val ds: Dataset[Person] = df.as[Person]
ds.filter(_.age > 30)

// Compute histogram of age by name.
val hist = ds.groupBy(_.name).mapGroups {
case (name, people: Iter[Person]) => val buckets = new Array[Int](10)
people.map(_.age).foreach { a => buckets(a / 10) += 1} (name, buckets) }

데이터셋에는 인코더라고해서 sparksql의 io를 최적화할 수 있는데 실제로 이부분이 상당히 성능개선에 크지는 않다.

27

  • RDD와 Dataset과는 무슨차이가 있냐

별도의 엔코더를 만들어서 적용하면 성능향상이 있을것으로 판단될때, type safe한 데이터를 처리하고 싶을때, Dataframe에서는 구현이 안되는 custom한 기능을 추가하고 싶을때 dataset을 쓴다.

28