Spark core & RDD 개념 TIL - Youtube 'min zzang' 님 영상자료

2020-10-11

.

Data_Engineering_TIL(20201011)

  • Youtube 채널 ‘min zzang’ 님의 ‘아파치 스파크 개념 설명’ 을 공부한 내용입니다.

** URL : https://www.youtube.com/watch?v=D3TLh_QVGPg

[학습내용]

  • spark이 뭐냐

스팍 홈페이지에 가면 “Lightning-fast unified analytics engine” 으로 소개가 되어 있다.

스팍은 분석엔진 툴로 사용자는 스팍이라는 통합된 분석엔진을 가지고 데이터 분석이나 ETL을 할 수 있다. 스팍의 특징은 스팍 홈페이지에서 소개한 대로 빠르다는 것이다.

  • 그러면 스팍은 얼마나 빠르냐

하둡과 비교했을때 100배이상 빠르다고 한다.

하둡보다 그렇게 빠른 이유는 두가지에 근거할 수 있다.

첫번째 스팍은 in-memory 연산엔진이다.

하둡은 하드디스크를 기본으로 연산이 되기 때문에 인메모리 구조의 스팍보다 많은 속도차이를 보일 수 밖에 없다.

두번째 대규모 분산병렬처리가 가능하다.

  • 그리고 스팍은 사용하기 쉽다.

스팍 어플리케이션을 개발하여 적용하기 쉽도록 자바, 스칼라, 파이썬 등 다양한 프로그래밍 언어를 지원한다.

따라서 개발자들이 사용하기 쉽다고 할 수 있다.

스팍에서 사용하는 언어중에 대표주자는 스칼라 언어이다.

  • spark process

image

스팍 코어를 중심으로 그 위에 스팍 sql, streaming, ml 까지 다양한 기능을 지원한다.

ml 같은 경우에는 머신러닝 관련 라이브러리를 스팍 자체적으로 다양하게 지원하려고 하는데 추가적으로 나는 케라스나 텐서플로우 같은 프레임워크를 사용하고 싶다면 스팍 코어위에 텐서플로우를 설치를해서 사용할 수도 있다.

스팍 코어 아래에 standalone scheduler, yarn, mesos 세가지가 보이는데 이것들은 리소스 매니저다. 스팍은 클러스터 구조이기 때문에 리소스매니저는 클러스터 차원의 전체적인 자원을 관리한다. standalone scheduler은 yarn이나 mesos 같은 리소스 매니저를 따로 설치하지 않고 스팍이 자체적으로 갖고 있는 리소스 매니저를 사용하는 것이다. standalone scheduler를 리소스 매니저로 활용할 수는 있지만 여러대의 노드를 관리해야 한다면 일반적으로는 Yarn을 많이 쓴다. Yarn은 하둡 애코시스템에 있는 리소스매니저인데 스팍에서도 리소스매니저 역할을 수행할 수 있다.

  • 그러면 클러스터는 뭐냐

여러대의 컴퓨터를 하나의 자원단위로 쓰는 것을 말한다. 일반적으로 마스터 슬레이브 구조이다.

  • 스팍 클러스터 아키텍처

image

좌측에 Driver program이 마스터의 역할을 하고, 워커노드들이 슬레이브 역할을 하는 것이다.

스팍은 드라이버 프로세스와 워커노드 내에 다수의 executor 프로세스로 구성되어 있다.

SparkContext에서 main 함수가 실행이 되고, spark application의 정보를 유지하거나 관리한다. 또한 전반적인 executor 프로세스 작업에 관련된 스케쥴링 역할을 한다.

그렇다면 화면중간에 Cluster manager는 클러스터의 리소스를 매니징하는 역할을 한다. 중간에서 SparkContextexecutor와 통신을 하고 있다. 또한 참고로 executor 간에도 서로 통신을 하는 것을 알 수 있다. standalone scheduler, yarn, mesos가 Cluster manager라고 보면된다.

또한 모든 spark cluster는 JVM(Java Virtual Machine) 위에서 운영된다.

  • 만약에 물리적인 서버 한대에 standalone 모드로 spark cluster를 구성한다고 가정하면 아래와 같은 아키텍처이다.

image

1대의 Centos 서버위에서 스팍 클러스터를 구성한다고 하면 어떻게 해야할까. 한대의 물리적인 머신위에 3개의 쓰레드 프로세스가 실행되는 것이다. executor마다 cpu 코어가 3개이고, 메모리를 갖고 있다. standalone cluster 중간에서 manager가 executor의 실행을 위해서 리소스를 할당한다.

  • 스파크 완전분산모드에서는 다음과 같은 아키텍처이다.

image

논리적인 머신이 1대가 아니라 4대인 경우를 살펴보자. 마스터노드가 물리적으로 1대가 있을 것이고, executor process가 물리적인 서버 한대에서 실행되는 것을 알 수 있다.

물리적인 머신의 배치가 달라질뿐 spark cluster 아키텍처는 standalone이나 완전분산모드나 거의 같은것을 알수있다.

Yarn은 별도로 설치해서 구성을 해줘야 한다. 만약에서 물리적인 서버 한대에서 spark cluster를 구성해야 한다면 yarn을 따로 설치해서 사용할수도 있지만 보통은 스팍에서 자체 제공하는 standalone scheduler를 사용하는 편이다. 그러나 실제 여러대의 서버로 클러스터를 구성하는 경우 yarn이나 mesos를 설치해서 사용해야 한다.

HDFS는 스팍 클러스터에서 공통으로 사용하는 파일시스템이다.

  • 스팍 실행 프로세스

image

위에 그림은 스팍의 프로세스가 내부엔진에서 어떻게 실행되는지에 대한 그림이다.

개발자가 스팍 어플리케이션에서 main 함수를 만든다고 했다. 그리고 spark application이 실행이 되면 spark context 객체가 생성이 된다.

스팍은 action 과 관련된 함수를 사용할때만 실제 엔진이 구동되는데 action(예를 들어서 show, count 같은 연산)이 아닌 연산들은 어떻게 연산을해서 처리할건지 스팍엔진이 계획만 짜두게 된다(= lazy process). 그래서 action 연산에 도달하기 전까지는 실행을 하지 않다고 action 연산에 도달했을때 비로소 job을 실행한다.

job이 실행이되면 stage들이 보인다. 중간에 skip된 것이라고 표시된 것도 있는데 이것은 job이라는 1번 ~ 35번까지 일을 해야지 개발자가 원하는 일련의 데이터 처리 과정이라고 할때 중간에 31번이나 32번 스테이지는 한두개 과정을 스킵하고 가도 무방하다고 스팍이 자체적으로 판단을 한 것이다.

job에 있는 전체적인 계획을 transformation(넓게 쪼개다) 한게 stage고 stage 내부에서 진짜로 연산하는 것이 task 이다.

  • spark 에서 configuration mode : cluster mode vs client mode

image

image

스팍이 운영되는 형태는 cluster 모드나 client 모드나 같은데 spark driver가 외부에서 실행되느냐 아니면 내부에 있는 마스터에서 실행되느냐의 차이다. 스팍을 운영하는 관점에서는 큰차이는 없다고 보면된다.

  • spark & yarn 필수 환경설정

image

spark와 yarn을 설정하고 어떤 config 파일들을 필수적으로 설정을 해줘야 올바르게 클러스터를 돌릴 수 있는가

클러스터 운영의도에 따라 여러 옵션이 있을 수 있다.

yarn 관련설정은 아래와 같다. (일반적으로 hadoop/etc/hadoop 경로)

1) yarn-site.xml : yarn 리소스 매니저 설정파일

2) hdfs-site.xml : HDFS 설정파일

3) core-site.xml : 보안이나 고가용성등 다양한 옵션적용이 가능한 파일

4) mapred-site.xml : mapreduce 관련설정

spark 관련설정은 일반적으로 spark/conf/spark-env.sh에서 해준다.

  • spark 설치시 필요한 것들

spark, java, yarn(설치 및 설정도 필요) … 기타 등등

  • spark API

image

RDD : spark의 가장 기본 API, 저수준 API

spark에서는 데이터프레임과 데이터셋을 많이 쓰지만 내부적으로는 결국 전부 RDD 형태로 운영된다.

따라서 Dataframe과 Dataset을 고수준 API라고 부른다.

그러면 spark에서 RDD를 spark에서 잘 쓰고 있었는데 데이터프레임과 데이터셋을 왜 도입한거냐

spark는 인메모리로 하나의 서버가 아니라 여러대의 서버에서 병렬처리를 하는 목적으로 만든것이 RDD이다. 그러나 RDD로 테이블 조인이나 효율화를 할때는 사용자가 직접 구현해서 RDD에 적용을 해줘야 하는 아쉬움이 있었다. 따라서 RDD의 내부 원리를 정확하게 이해하는 사람들만이 spark를 잘 쓸수 밖에 없었다. 다시말해서 대중들이 사용하기 어려웠다.

그래서 스팍 개발자들이 spark 버전 2부터 최적화 엔진을 개발했다. 텅스텐 프로젝트라는 것을 통해서 카탈리스트를 개발했다. 데이터들이 메모리위에 저장이되고 연산이 되기 때문에 데이터 관리방식을 새로 개발을 했고, 이진인코딩 방식으로 L1, L2 캐시를 효율적으로 활용하도록 개선했다. 그래서 Dataframe과 Dataset이라는 개념에서 카탈리스트라는 optimizer가 지원하도록 만들었다. 그래서 정렬이나 집계, 셔플링 연산도 성능이 대폭 개선되었다.

그리고 UDF도 버전 2의 Dataframe에서 사용할수 있게되었다.

이런 성능개선 때문에 spark 2부터는 API를 쓸때 RDD보다 Dataframe을 사용하도록 권장하고 있다.

왜냐하면 일반사용자가 RDD를 쓰면서 partitioning이라던지 분산병렬처리에 대한 로직을 정확하게 모르기 때문에 Dataframe을 써서 스팍엔진에서 최적화를 알아서 해준다는 것이다.

Dataset은 Dataframe과 유사한데 safe라는 개념이라는게 있다. 데이터에 관련되서 객체화해서 사용할 수 있게 한다. 다시말해서 스키마를 가지고 있어야 하는 API이다. RDD는 schemaless라고해서 스키마가 없어도 되고, Dataframe은 자동적으로 스키마를 갖도록 옵션을 줄수도 있고, 스키마를 직접 사용자가 입력할수도 있다. 하지만 Dataset은 스키마를 갖고 있고, 객체화되어 있기 때문에 좀더 safe하다. 또한 Dataset은 python에서 쓸수없다. 자바나 스칼라에서만 쓸 수 있는 API이다.

실무에서는 대용량의 데이터를 안정적으로 처리해야하기 때문에 Dataset을 많이 쓰는 편이다.

  • RDD(Resilient Distributed Dataset) = 클러스터에 병렬분산된 데이터셋

RDD는 스팍 하위레벨의 인터페이스이자 런타임의 핵심이다. RDD는 저수준이기 때문에 스팍엔진에서 실질적으로 돌아가는 데이터셋 개념이다.

RDD는 크게 두가지로 나눌 수 있다.

1) Transformation 연산자 (변환연산자)

ex) filter, map 등

2) Action 연산자 (실행연산자)

ex) count, take 등

왜 RDD는 Transformation 연산자와 Action 연산자로 나누어져 있을까

그 이유는 lazy evaluation 연산을 하기 때문이다.

Transformation 연산은 스팍엔진이 해당연산에 대한 것만 플랜만 짜놓고 Action 연산일때 비로소 실제 실행을 하는 구조이다.

  • 스팍에서 고수준 API : Dataframe, Dataset

Dataframe이나 Dataset를 통해서 사용자가 쉽게 데이터를 가공시 이를 실질적으로 실행하는 저수준 엔진단으로 변환될때 optimizer라는 메모리 최적화 기술이 들어가 있다.

spark dataframe은 python의 pandas dataframe과 유사하다. 이 둘의 차이점은 역시 spark dataframe은 클러스터에서 분산처리가 되도록 설계가 되어 있다는 점이다. spark dataframe은 spark를 만든 개발자들이 만든 카탈리스트 엔진을 적용하여 분산처리가 빠르고 효율적인 기능을 낼 수 있도록 RDD 대비 개선하였다.

  • Dataframe 내부실행 flow

image

좌측에 spark sql, dataframe, dataset은 고수준 API다. 이 세가지 형태가 데이터 작업을 할때 스팍엔진단에서 어떻게 실행되냐를 보여주는 것이다. 스팍이 빠르다는 이유가 카탈시스트라는 옵티마이저 때문이다. 사용자가 코드를 작성을 해서 실행하면 코드가 어떻게 실행이 될 것인지 논리적인 플랜을 분석해서 스팍엔진이 그 플랜을 갖고 있게 된다. 그런다음에 최적화하는 옵티마이저가 해당 플랜을 가장 잘 실행될 수 있도록 최적화 시킨 논리적인 플랜을 찾게 된다. 그런다음에 이 논리적인 플랜을 실제 물리적으로 어떻게 실행할 것인지 여러방안의 물리적인 플랜들이 나오게 되고, 비용모델이라는 것을 통해서 어떤 물리적인 플랜이 리소스를 가장 효율적으로 연산할 수 있는가를 따져서 거기에서 선택된 물리적인 플랜이 저수준 API인 RDD 형태로 코드가 실행이 되는 것이다.

사용자는 사용하기 쉽게 sql이나 dataframe 형태로 뭔가 작업을 하더라도 스팍엔진단에서는 결국 RDD 형태로 연산하게 된다. spark는 메모리 위에서 분산처리를 하기 때문에 그렇게 할 수 있는 API가 RDD이기 때문에 결국에는 RDD로 연산이 되는 것이다.

  • RDD에서 Dataframe 생성방법

사용자가 어쨌든 데이터를 불러올때는 RDD 형태일 것이다. RDD에서 사용자가 데이터 프레임을 생성해서 그 위에서 데이터 작업을 하는 것이다.

그래서 개발자가 코드를 이용해서 RDD에서 Dataframe을 생성하는 방법은 다음과 같다.

1) 비정형데이터( ex) 로그데이터 )

데이터를 RDD로 로드 –> 각 줄을 파싱 –> 정형화 과정 –> Dataframe으로 활용

** RDD에서 Dataframe을 생성하는 3가지 방법

첫번째 방법 : 로우의 데이터를 튜플 형태로 저장한 RDD를 사용해서 to dataframe으로 활용하는 방법

두번째 방법 : 케이스 클래스를 적용하는 방법

세번째 방법 : 스키마를 명시적으로 지정하는 방법

2) 정형데이터를 Source Dataframe으로 불러오기

정형화된 데이터는 그대로 Dataframe으로 로드하면 되기 때문에 아래와 같은 커넥터 라이브러리를 이용하면 된다.

SQL DB, JDBC, ODBC, Mongo DB, MySQL DB 등 커넥터 라이브러리를 이용

  • spark context와 sparksession

RDD를 사용할 경우 spark context 객체가 main 함수로 활용을 하는데, 스팍 2버전 부터는 sparksession 이라는 객체를 사용해서 RDD 뿐만 아니라 Dataframe, spark sql API 등을 생성할 수 있다. 그래서 일반적으로는 sparksession으로 Dataframe을 생성해서 데이터 작업을 한다.