Spark 기초개념

2019-03-13

Data_Engineering_TIL_(20190312)

study program

1) https://www.fastcampus.co.kr/extension_des

2) https://www.fastcampus.co.kr/data_camp_hadoop

[학습목표]

  • Apache Spark 개요 및 기본원리 이해

[학습기록]

1. 스파크의 등장

  • 피그하고 하이브는 데이터를 저장하고 처리, 집계, 샘플링하는 것까지는 가능했다. 이렇게 한다음에 실질적인 분석은 R로 수행했다. 그러나 R은 싱글프로세스로 돌아가는 프로그램이기 때문에 데이터가 크면 R은 한계가 발생한다. 그래서 과거에는 R에서 한방에 돌아갈 수 있도록 피그와 하이브에서 다 전처리한 데이터를 던져줬다. 그래서 우리가 원하는 분석을 하려면 시간이 매우 많이 걸렸다. 그러나 스파크가 등장하면서 분산병렬로 데이터를 처리하고, 분석도 가능해졌다.

  • 피그나 하이브는 집계와 샘플링이 가능하여 일반적으로 데이터 전처리 시 사용된다.

  • 먼저 데이터 저장은 스파크의 역할이 아니다. 스파크는 R의 싱글프로세스를 극복하는 분산병렬처리가 가능한 데이터 분석용으로 만들어 진것이다. 데이터는 HDFS가 되었던 오라클 디비가 되었던, NoSQL 이 되었던 어디에서든지 데이터를 가져와서 분산병렬처리가 가능하다.

2. 스파크에서 분석하고자 하는 데이터의 저장양상

데이터가 파일형식이 되었던, HDFS가 되었던, DB파일이 되었던 상관이 없다.

스파크는 데이터를 가져와서 데이터를 처리해주는 분산병렬처리 엔진이다.

양상 1) 데이터는 분석가가 접속하는 클라이언트 머신의 로컬에 저장되어 있는 경우

그 데이터를 가져와서 분산병렬처리를 해줄 수 있다.

양상 2) HDFS 안에 저장되어 있는 경우

양상 3) NoSQL(Hbase,Redis), RDBMS(오라클,MYSQL) 등 디비에 저장되어 있는 경우

3. 맵리듀스가 있는데 왜 굳이 스파크가 필요한 것인가

하둡은 상당히 높은 안정성을 요구한다. 과거에는 프로그램이 돌아가다가 서버가 죽을 수도 있다는 가정하에서 맵리듀스가 만들어진 것이다. 따라서 하둡은 항상 노드들에게 “죽었니 살았니”를 끊임없이 집착하는 경향이 있다. 그래서 데이터가 많던 적던 처리속도에 대해서 지연시간이 크게 발생하게 되었다.

그래서 기존의 맵리듀스보다 더 발전된 새로운 분산병렬처리 프레임워크를 만들겠다는 취지로 스파크가 나온것이다. 맵리듀스도 분산병렬처리를 위한 프레임워크이다. 철저한 틀안에서 돌아간다는 얘기다.

4. 스파크 개요

  • 스파크는 피그의 기능도 내장되어 있고, 하이브의 기능도 내장되어 있다. 또한 머신러닝 라이브러리도 포함하면서 마훗을 대체하게 된다. 또한 sparkR기능을 제공함으로써 R도 대체하게 되었다. 심지어 스트리밍 처리기능도 제공한다.

  • 스파크가 처리할 수 있는 데이터는 뭐든지, 어디에 있든지 상관없다. 예를들어서 DB에 있는 데이터를 가져와서 데이터 처리결과를 HDFS에 저장할 수도 있고, HDFS에서 데이터를 가져와서 데이터를 처리한 다음에 RDBMS에 저장할 수도 있다. 그래서 스파크의 인풋 아웃풋을 자유롭게 채택해서 활용할 수 있다.

  • 스파크는 클러스터 매니저가 있어야 한다. 하둡에서는 얀이 있었는데 스파크에서도 얀을 쓸수도 있고, 메소스라고해서 하둡뿐만 아니라 다른분야에서도 쓰이는 일반적인 클러스터 매니저를 쓸 수도 있다. 또한 스파크에서 제공하는 스파크에서만 돌아가는 클러스터 매니저도 지원해준다. 예를들어서 나는 DB에서 데이터를 가져와서 스파크로 처리하고 다시 DB에 데이터를 저장할거라고하면 굳이 메소스를 안쓰고 스파크 스텐드얼론 매니저를 쓸수도 있는것이다.

  • 스파크 코어는 스칼라라는 언어를 사용해야한다. 하지만 스칼라라는 언어로 다 할수 없기 때문에 스파크 코어의 내장패키지로 제공해주는게 sparkSQL이 있다. SQL 쿼리도 날릴 수 있다는 얘기다.

  • 그래프액스는 네트워크 관련된 작업들을 해주는 기능이다. 예를들어서 구글검색엔진에 들어가는 알고리즘은 패이지랭크 알고리즘을 쓰는데 이는 일반적인 머신러닝 알고리즘이 아니기 때문에 기능을 따로 때서 제공하는 것이다. 그래프 알고리즘을 제외한 것은 머신러닝 라이브러리에 거의다 있다고 할 수 있다. 딥러닝도 DNN까지 지원해준다. 딥러닝을 스파크에서 하기위해 스파크코어에 추가적인 패키지를 탑재하여 쓰기도 한다.

  • 결국 이런 스파크의 개요를 봤을때 스파크로 데이터 처리부터 분석까지 다 할수있다.

0

  • 스파크도 하둡이 발전하는 양상과 유사하게 발전하게 된다. 하둡은 처음에 얀이 없었다. 그래서 얀이라는 새로운 클러스터 매니저가 만들어지게 되었다. 얀이 등장하면서 맵리듀스 이외에 다른것도 할 수 있게 되었다.

  • 그러다가 테즈라고 하는 것이 등장했다. 테즈는 스파크 코어랑 비슷하다. 아파치 하둡2.0의 YARN 위에서 동작하는 비동기 사이클 그래프 프레임워크이다. pig, hive와 연동하여 사용하면 기존의 mapreduce에 비하여 성능이 향상되는 효과를 얻을 수 있다. (이런과정과 유사하게 스파크가 등장하게 되었다.)

  • 그리고 파일을 저장할때 로우단위로 저장하는 것이 일반적인데 분석은 사실 컬럼단위 모델이 많다. 그래서 데이터를 분산저장할때 컬럼단위로 압축해서 저장하는 파일포맷이 이후에 나오게 된다. 스파크도 이 파일포맷을 채용하게 된다.

  • 과거에는 프로그램을 짜면 그때 컴파일을하고, 소스코드를 배포하였다면 데이터베이스처럼 서버(서비스)를 띄워놓고 요청이 오면 바로 실행할 수 있는 개념으로 스파크는 바뀌게 된다. 스파크는 항상 떠서 대기하는 형태다 그래서. 반면에 맵리듀스는 서버가 아니다. 맵리듀스는 어플리케이션이다. 그래서 사용자가 요청을 날리면 그때서야 프로그램이 실행되고 실행이 종료되면 없어진다.

  • 그리고 데이터 처리속도를 향상시키기위해 스파크는 버퍼캐싱도 사용할 수 있다.

5. 빅데이터 처리방안의 변화

1) 1안 Hive파

데이터웨어하우스의 집중해서 이 기능을 발전시키자

결론적으로는 일반적으로 1안은 Hdaoop에서 hive를 집중적으로 발전시켜서 데이터 웨어하우스의 기능에 충실하고자 했는데 spark의 등장으로 거의 망했다.

2) 2안 spark파

스파크는 하나의 클러스터내에서 스파크 버전이 다른 잡을 동시에 돌릴 수 있다.

빅데이터는 결국에는 분석이다. 이 분석이라는 것은 결국에 처리까지 포함해야 하기 때문에 데이터 처리기능만 할 수 있는 하이브는 한계가 있다. 데이터를 처리하고 집계하는 것이 목표가 아니라 궁극적으로는 분석도 가능해야 한다는 것이다.

스파크가 더 빨라지기 위해서는 여러가지 조건을 갖추어야 하는데 특히 자바를 더이상 사용하면 안된다. 즉 컴파일 언어를 지양해야 한다. 그래서 스파크는 스칼라라는 함수형 언어를 채택했다.

0-1

6. 스파크 프로그램 동작

사용자가 임의로 코드를 작성을 하고 실행을 하면 스파크 클라이언트에서 돌아간다. 어플리케이션 마스터는 과거와 다르게 서버방식이다. 계속 떠서 돌아가는 형태다.

하둡은 실행을 하면 데이터노드에서만 돌아가고 클라이언트는 일을 시키는 역할만 한다. 그래서 하둡에서 클라이언트는 그냥 놀고만 있는 것이다. 자원낭비가 일어나고 있는 것이다.

그래서 스파크에서는 스파크 프로그램이 클라이언트에서도 돌아가겠금 하였다. 스파크 클라이언트가 마스터가 되는것이다. 프로그램을 실행하면 어플리케이션 마스터에서 RDD 구성을 한다. 그리고 스케쥴링 계획을 잡고, 데이터를 블록단위로 가져올때 정상적으로 가져오는지 트레킹하고, 중간에 부하가 많이 걸리는게 셔플이고 퍼포먼스를 낼 수 있는 관건이기 때문에 이 셔플을 따로 관리한다. 네트워크 부하를 관리하겠다는 것이다.

이런 역할을 클라이언트에서하고 클러스터 매니저는 프로그램이 실행되면 잠깐 사용하고 거의 사용을 하지는 않는 편이다.

그리소 스파크 워커는 맵리듀스는 태스크이다. 프로그램을 실행할때 맵리듀스는 코어하나가 있으면 맵리듀스가 원코어를 사용한다고 하면 프로세스 하나만 돌아가는 것이다. 코어가 두개면 프로세스가 두개 돌아간다. 현재는 컴퓨터 성능이 좋아져서 쓰레드 방식으로 처리하는 경우가 많다. 그래서 실제로 맵이 하나돌아간다고 하지만 쓰레드로 하게되면 동시에 많이 돌릴 수 있다. 이런 쓰레드 방식을 사용하려면 스칼라 같이 함수형 언어를 사용해야 한다. 함수형 언어는 프로세스는 하나지만 쓰레드는 다수의 쓰레드를 쓸 수 있다. 이렇게 되면 속도향상을 도모할 수 있다.

스파크 워커는 데이터를 받아서 split 조각이 있으면 이 조각별로 하나씩 처리하는 것이 아니라 쓰레드의 병렬성을 활용하여 처리할 수 있도록 해준다. 그다음에 블록관리자라는게 있어서 데이터를 읽고 쓰게 해준다. 블록매니저를 따로 둔 이유는 데이터가 HDFS에만 있는게 아니라 DB나 다른 곳에 있을 수 있기 때문이다.

0-2

7. 데이터 프로세싱과 스케쥴링

아래 그림과 가장 좌측단계처럼 코드를 작성하고 실행하면 예를 들어서 물리적인 RDD Object가 나온다는 것이다. 2개의 물리서버가 필요할 것이다. 하나의 RDD를 처리하기 위해서는. 그 다음 단계로 넘어가서 머지하게 되면 3대의 서버가 필요하다는 것이다. 즉 맵이 실행될때 두곳에서 실행되고, 리듀스는 세곳에서 실행된다는 것이다. 아래 그림에서는 이런 과정에서 셔플이 한번 일어나는 것이다.

데그 스케쥴러라는 것이 있어서 프로그램 코드는 논리적이지만 물리적인 계획을 스케쥴링 해준다.

그리고 테스크 스케쥴러가 세번째로 나와 있는데 사실 이 그림은 잘못된 것이다. 실제로는 처음에 프로그램이 실행될때 벌써 클러스터 할당을 다 받기 때문이다.

워커에는 쓰레드가 있어서 자기가 알아서 쓰레드처리를 해주고 데이터도 관리한다.

0-3

8. 데이터 프로세스 모델

스파크가 빠른 이유를 알아보자.

하둡은 항상 데이터가 HDFS 즉 디스크에 저장되어 있다. 프로그램이 실행되면 한번 작업을 한다. 그리고 맵리듀스가 끝나면 그 결과를 다시 HDFS에 저장한다. 실행하는 프로그램의 복잡성에 따라 맵리듀스가 여러번 실행되는 경우가 자주 있기 때문에 연산하고 디스크에 저장하고 이런 과정을 반복하면 시간이 엄청오래 걸린다.

그러나 스파크는 데이터를 가져와서 맵리듀스를 돌린다음에 일단은 메모리에 저장한다. 중간결과를 디스크에 저장하고 가져오는 시간과 중간결과를 메모리에 저장하고 가져오는 시간의 차이는 엄청나게 날 수 밖에 없다.

또한 반복적인 연산이나 쿼리를 요청하는 경우에도 하둡은 디스크i/o를 그대로 다 부담해야 하지만, 스파크는 메모리에 한번 데이터를 전부 적재하는 과정이 있기 때문에 반복을 하면 할수록 시간절약 효과가 엄청나게 커지게 된다.

단 DB가 실시간으로 바뀌는 것에 대해서는 메모리에 어떻게 반영할 수가 없는 것이 단점이다.

0-4

9. Hadoop 클러스터에서 스파크 동작

커멘드라인 인터페이스가 클라이언트 프로그램인 것이다. 이를 드라이버라는 명칭으로도 쓰인다.

스파크는 클라이언트 프로그램이기도 하지만 드라이버인 것이다.

클라이언트 프로그램 = 드라이버 = 마스터.

나머지는 워커가 되는 것이다.

연산을 하고 중간결과를 드라이버한테 보내고 이 드라이버가 다시 종합해서 또 일을 할당하고 이런 방식이다.

0-5

10. 데이터 규모 및 사용자 요구에 따른 데이터 처리 양상 변화

아래 그림과 같이 저장소는 전통적으로는 로컬 파일시스템이 있었고 데이터를 조금 더 쳬계적이고 관리하기 용이한 RDBMS가 있었다. RDBMS도 데이터를 물리적인 파일로 저장한다. 그리고 이런 파일들과 프로즈램 언어를 가지고 어플리케이션을 만들 수 있었다.

그러나 데이터 규모가 커짐에 따라 이런 전통적인 데이터 처리 양상은 달라지게 된다. 데이터가 이제는 한곳에만 저장되면 안되고 분산저장되야 한다. 그래서 나오게된 개념이 분산파일시스템(HDFS)이다. 사실상 HDFS는 분산파일시스템의 표준으로 쓰이게 된다. 또한 이런 파일들을 전통적인 RDBMS처럼 체계적으로 관리할 수 있어야한다. 그래서 나온 개념이 하이브이다. 이 이후로 다양한 것들이 등장한다. HDFS와 하이브로 사용자들의 수많은 요구들을 모두 만족시킬 수 없기 때문이다.

그래서 나온게 NoSQL DB인 HBase가 나왔고, 이런 NoSQL을 빠른속도로 색인하고 검색할 수 있는 Elastic Search가 나왔다. HDFS가 블록단위로 데이터를 처리한다면 HBase는 Row단위로 데이터를 처리한다.

IoT 실시간 데이터, 로그데이터 등 다양한 종류의 데이터가 다양한 시스템의 저장소에 저장되려면 데이터 허브라는 것을 사용하게 되는데 다양한 소스데이터를 한군데에서 모아서 보내주게 된다. 대표적으로 카프카가 있다. 쉽게생각해서 데이터가 쌓였다가 순차적으로 나가는 큐라고 생각하면 된다.

그리고 하둡 맵리듀스가 성능이 좀 떨어지기 때문에 하둡 맵리듀스는 이제 거의 쓰이지 않고, 하둡 맵리듀스의 대안으로 나온 Spark를 많이 쓴다. 통상적으로는 Hive는 DW 용도로 많이 쓰고 이 DW 쿼리를 위해(하둡기반의 SQL쿼리를 위해) 임팔라나 타조, 프레스토를 일반적으로 쓰는 편이다.

0-6

11. 함수형언어 Scala

0-7

자바, C : 컴파일 언어

자바의 장점은 자바버전 8이 되면서 자바버츄얼머신(JVM)을 제공해주게 되는데 자바가 컴파일방식이긴 하더라도 엄밀하게 JVM만 봤을때는 스크립트 방식과 동일하게 작동된다. 따라서 함수형언어처럼 병렬처리가 가능한 것이 자바 8버전부터 들어가게 된다. 그래서 자바 8이후의 JVM 환경에서는 자바도 일종의 함수형 언어를 지원하는 런타임이 된다.

스칼라는 독자적으로만 언어가 돌아가는 것이 아니라 JVM위에서 돌아가게 된다. 스파크는 기본적으로는 스칼라 언어로 구현되어 있다.

함수형 언어를 쓰게되면 코드를 한줄한줄 읽어들여서 해석해야 한다. 또한 공유메모리가 있어야 한다. 따라서 변수가 있으면 안된다. 변수는 메모리의 특정공간을 잡고 있으면서 값이 변하는 것이다. 함수형 방식에서는 값이 변하면 안된다. 값이 변하게되면 분산병렬처리가 안된다. 따라서 분산병렬처리를 하기 위해서는 변수를 쓰면 안된다. 어떤 특정한 메모리 값 하나를 가지고 값을 바꾸면 안된다는 말이다. 왜냐하면 서버가 여러군대에서 돌아가는데 어떤 값을 하나 공유한다는 것이 말이 안된다.

또한 함수형 언어는 알아서 병렬처리가 되야한다. 프로그래밍 하는 사람이 “이거를 병렬처리 할거야”라고 명시적으로 프로그래밍 하는 것이 아니라 언어에서 지원하는 어떤 특정함수를 쓰게되면 병렬처리가 자동적으로 되어도 아무 문제가 없는 것이다. 최근에 프로세스 단위에서 쓰레드 단위로 운영체제의 방식이 변했기 때문도 있다.

또한 데이터의 work flow에 대해서 방식이 DAG를 따라야 한다. 프로그램이 절차형으로 실행되는 것이 아니라 함수 단위로 데이터의 흐름이 각각 처리되어야한다. 플로우 중간에 서클이 생기면 안되고 시작과 끝이 분명하게 존재해야 한다.

0-8

그래서 위의 그림과 같이 자바 8이 되면서 내부적으로 절차형언어와 함수형언어를 다 쓸 수 있도록 바뀌게 된다. 그래서 자바 소스코드가 있으면 컴파일을 하게 되면 jar 파일과 같이 바이트 코드로 바뀌게 된다. 그리고 JVM위에 올라가서 실행되게 된다. 그리고 JVM이 이 코드들을 실행하면서 병렬로 처리가 가능하다. 그래서 스칼라 언어도 JVM위에서 실행된다. 초창기에는 스칼라 독립적으로 실행되었다. 그래서 스칼라언어로 코드를 작성해서 실행하면 JVM 내부에서 바이트 코드로 바뀌게 되어 실행된다.

0-9

그러면 하둡 맵리듀스가 실행되는 방식과 스파크가 실행되는 방식을 비교해보자.

전통적인 하둡 맵리듀스 방식은 다음과 같다. 만약에 클라이언트 머신에서 하이브 쿼리를 날린다고 가정하자. 그러면 자바로 된 맵리듀스 코드를 생성해준다. 그리고 바이트 코드인 jar로 변환되기 위해 컴파일을 또 수행한다. 이 시간이 2초 ~ 3초 소요된다. 컴파일이 완료되면 클라이언트에는 프로그램이 있는데 실제 돌아가는 것은 데이터노드에서 돌아가야 한다. 그런 다음에 이 자르파일을 실행파일로 만든다. 그리고 하이브에서 잡트래커에게 잡을 수행해달라고 요청한다. 그러면 스케쥴러가 일을 할당해준다. 그런다음에 이 클라이언트의 jar 파일을 각 데이터노드에 배포를 하게된다. 그래서 만약에 데이터노드가 100개 있으면 이 자르파일을 100개의 데이터노드에 전부 배포될때까지 프로그램이 실행이 안된다. 100대의 데이터노드가 테스크를 할당받은 다음에 자르파일을 배포받게 된다. 그런다음에 JVM위에서 돌아갈 수 있도록 실행을 해준다.

반면에 스파크는 처음에 명령어를 입력할 수 있는 쉘을 띄운다. 그게 스파크 드라이버다. 클라이언트 머신에서 스파크 드라이버를 실행하면 동시에 워커노드의 스파크 익스큐터(스파크 실행기)가 JVM위에서 실행된다. 그래서 스파크 드라이버에서 스파크 스크립트를 작성하고 명령을 때리면 클라이언트가 워커노드에게 자르파일을 전송하는 것이 아니라 “너는 뭘 해라”라는 명령을 바로 내린다. 워커노드들이 어떤일을 할지 클라이언트에서 이미 전부 스케쥴링이 된 상태로 명령(스크립트)이 전달된다. 워커노드는 이 명령을 받아서 바로 실행하게 된다. 드라이브에서 명령을 실행하면 자동으로 리소스(자원)을 할당을 받고 실행하게된다.

스크립트 실행할때마다 리소스를 할당받아야 하면 얀이 필요했는데 이제는 얀이 필요없게 되는 것이다. 기존의 하둡 맵리듀스시에서는 명령을 실행할때마다 맵리듀스가 몇개 돌아가는지 쿼리를 전부 실행하고 컴파일하고 잡트레커에 요청하기 전까지는 맵이 몇개인지 리듀스가 몇개인지 알수가 없다. 그리고 데이터의 규모가 큰지 작은지도 알 수 없기 때문에 그때마다 테스크 스케쥴링을 해줘야 한다. 반면에 스파크는 마스터가 필요없이 클아이언트의 스파크 드라이버에서 이미 이런 스케쥴링이랑 자원할당이 자동적으로 다 된다는 것이다.

다시말하면 기존의 하둡 맵리듀스의 클라이언트는 일을 시키기만 했었다. 클라이언트에서 프로그램 돌아가는게 아무것도 없었다. 오직 컴파일하고 실행하는 작업만 수행했고, 마스터노드한테 자원을 할당받고, 스케쥴링을 맡겼는데, 스파크에서는 클라이언트가 전부 다 통합해서 하게 된다. 기존의 하둡맵리듀스에서는 소계/통계를 하게되어도 그 결과는 그 일을 한 워커노드의 HDFS에 임시저장되는 반면에 스파크는 작업은 워커노드에서 하되 각각의 결과는 클라이언트가 가져와서 자기가 취합하게 되고 연산이 추가적으로 더 필요하면 해당 연산을하는 워커노드에 다시 보내주는 이런 차이점이 있다. 그래서 스파크에는 collect라는 명령어가 있는데 하둡맵리듀스에서는 그런명령어는 없다.

12. Job DAG 관점에서 하둡 맵리듀스와 스파크 비교

0-10

하둡 맵리듀스를 실행하면 엄청나게 많은 Temp가 HDFS에 생겼다가 잡이 끝나면 클랜징작업으로 싹 지워지는 현상을 볼 수 있다. 비효율적이다.

반면에 스파크의 처리방식은 아래와 같다.

0-11

데이터 직렬화 : 로우기반의 데이터가 아니라 컬럼기반의 데이터로 바꾸는데 그 과정에서 로우단위에서 컬럼단위로 바꾸고 압축을해서 메모리나 디스크에 상주를 시킨다. 통상 MEMORY_AND_DISK_SER과 DISK_ONLY를 많이 쓰는 편이다.

13. RDD

  • 스파크의 핵심개념은 RDD이다. RDD는 Resilient Distributed Dataset으로 탄력적이면서 분산된 데이터셋이라는 의미이다. 분산처리(병렬처리)가 되는 가상의 데이터셋(리스트(array))인데 오류 자동복구가 가능하다는 것이다. 다양한 계산수행이 가능하고 메모리를 활용하여 높은 성능을 보여주는 것이 특징이다.

  • RDD는 결국에 리스트인데 가상에 리스트라고 생각하면 된다. map, reduce, count, filter, join 등 다양한 작업 가능하고, 작업을 병렬적으로 처리할수 있으며, 여러 작업을 설정해두고 결과를 얻을 때 lazy하게 계산이 가능하다. lazy하게 계산이 가능하다는 것은 RDD에 연산되는 모든스텝의 연산과정을 메모리에 담고 있고, 한단계 더 나아가서 빅데이터 스케일에서 장점을 보인다. 예를들어서 3단계의 맵리듀스 연산 단계가 있다면 각 단계별로 결과를 확인하면서 한스텝 한스텝 나아가는데 스파크에서는 메모리에 각 연산과정을 저장해두고 실행을 했을때 메모리에 저장되었던 연산과정이 한번에 일어난다.

  • 하둡의 맵리듀스를 어떻게하면 좀 더 편하게 써볼까하는 아이디어에서 나온게 RDD이다. 이후에 사람들이 빅데이터를 접하다보니까 비정형데이터 보다는 그래도 정형으로 된 데이블데이터를 많이 실제로는 활용하게 되어서 테이블데이터를 조금 더 편하게 다룰수 있는 도구가 있어야 겠다는 필요성을 느꼈고 또한 머신러닝 기술이 발전하면서 머신러닝 알고리즘에 들어가는 데이터도 많은경우에 테이블형태의 데이터이기 때문에 이런 요구사항이 커져서 RDD형태보다는 데이터프레임 형태로 된게 여러모로 유용하다고 판단되어 스파크 데이터프레임이 만들어지게 되었다.

  • 스파크가 큰 인기를 끈 또다른 이유는 스칼라 언어로 된 인터페이스가 굉장히 잘 되어 있기 때문이다. 프로그램을 사용하기 매우 편하다는 의미이다. 예를들어서 맵리듀스 코드를 구현하려면 매우 어렵고 많은 양의 코드타이핑이 필요하지만 스파크에서는 단 몇줄이면 전부 구현 가능하다.

  • 스파크의 또다른 장점은 스파크 엔진을 기반으로 다양한 확장 프로젝트를 제공한다는 것이다. 스파크 하나만 잘 배워도 아래의 확장 프로젝트도 쉽게 접근할 수 있다.

1) Spark SQL: Hive와 비슷하게 SQL로 데이터 분석

2) Spark Streaming: 실시간 분석

3) MLlib: 머신러닝 라이브러리

4) GraphX: 페이지랭크같은 그래프 연산(분석)

  • 스파크 데이터프레임은 판다스 데이터프레임과 매우 유사한데 차이점은 판다스 데이터프레임은 컴퓨터 한대가 커버할 수 있는 범위만 허용되지만 스파크 데이터 프레임은 빅데이터 스킬도 커버가 가능하다.

  • 스파크에서 RDD 동작은 action(표현된 데이터를 가져옴)과 transformation(데이터를 어떻게 구해낼지를 표현)로 이루어져 있다. Map, Filter 등이 transformation에 해당되고 count, collect, take, data load 등이 action에 해당된다.

  • Lineage와 Lazy Execution도 잘 알고 있어야 한다. Lineage는 특정데이터가 어떤 연산을 거쳐 어떻게 바뀌어야 하는지에 대한 수행단계 내용을 말한다. Lineage는 잘 갖고 있어야 하는데 그 이유는 클러스터 중 일부의 고장 등으로 작업이 중간에 실패하더라도 Lineage를 통해 데이터를 복구가 가능하기 때문이다. Lazy Execution은 Transformation시에는 계산을 수행하지 않고 Action이 수행되는 시점부터 데이터를 읽어들여서 계산을 시작하는 것을 말하다. Lazy Execution의 장점은 맵리듀스에 비해서 중간단계의 연산결과를 디스크에 저장하지 않기 때문에 성능면에서 효율적이다.

  • 예를 들어서 데이터양이 많아서 한 스텝의 연산별로 2시간씩 걸린다고 치면 과거 하둡 맵리듀스 시절에는 2시간 기다렸다 연산결과 확인하고, 또 2시간 기다렸다 연산결과 확인하고, 또 2시간 기다렸다 연산결과 확인하는 방식이라면 스파크는 실행을 누르면 그냥 6시간을 기다리면 되지만 실제로는 6시간도 아니고 1~2시간이면 끝난다. 굉장히 일이 빨리 수행되는 그런 장점이 있다.

  • take는 collect와 비슷한데 collect는 RDD에 있는 데이터를 전부 꺼내는데, take는 RDD에 있는 데이터 일부만 꺼낸다. 빅데이터 스케일에서는 collect를 쓸 수 없고, take를 써야한다.

  • 예를 들어서 1000억개의 데이터를 로드를 한다면 먼저 로드를 하고 RDD를 만드는데 실제로 로드하지는 않고 로드 하는척만하고 로드한 정보만 갖고 있다. 그러다가 count같은 액션이 일어나면 그때 데이터를 로드를 하기 시작한다. 이때 클러스터에서 응답이 가능한 메모리 용량만큼 읽어서 RDD에서 count한 count 끝나면 읽어온 데이터 버리고, 그다음에 또 응답가능한 메모리만큼 읽어서 RDD에서 count한 다음에 count 끝나면 읽어온 데이터 버리고 이런 방식으로 연산하여 최종결과를 얻어오는 방식이다.

  • 사실 RDD에서 count action을 할때는 데이터가 메모리에 있을 필요가 없이 필요한 만큼 하나씩 읽어서 count하면 되는건데 join이나 group을 해야할때는 메모리가 많이 필요하다. 메모리가 부족한 경우도 있는데 이 경우에는 부득이하게 디스크에 저장하게된다.

  • RDD Transformations은 RDD의 데이터를 다른 형태로 변환하는것으로 실제로 데이터가 변환되는 것이 아니라, 데이터를 읽어들여서 어떻게 바꾸는지 방식만을 기록하는 것이다. 실제 변환은 Action이 수행되는 시점에서 이루어진다. map, filter, flatMap, mapPartitions, sample, union, intersection, distinct, groupByKey, reduceByKey, join, repartition 등 다양한 연산을 할 수 있다.

  • RDD Actions은 여러가지 변환 (Transformation)이 담긴 RDD의 정보를 통한 계산을 수행하는 것이다. reduce, collect, count, first, take, saveAsTextFile, countByKey, foreach 등이 있다.

  • RDD Caching는 반복 계산에서의 성능 향상을 위해 RDD의 내용을 메모리에 캐싱이 가능하다는 의미이다. 다르게 얘기하면 반복계산이 아닌경우 캐싱을 할필요가 없다는 말이다. rdd.persist() 또는 rdd.cache() 메서드를 통해 구현할 수 있다. rdd.unpersist()는 캐싱을 해제하는 것을 말한다.

  • 스파크에서 캐싱을 하려면 예를들어서 스파크 클러스터가 있다고 치자 클러스터를 구성하는 컴퓨터는 10대가 있다고 치자. 각각의 클러스터는 32기가의 메모리를 갖고 있다고 가정하자. 그러면 총 320기가의 메모리의 클러스터가 된다. 이때 연산하고자 하는 데이터가 100기가라고 치자. 이 클러스터에서 캐싱을 위해 활용하는 메모리가 전체 메모리의 60%라고 쳐도 100기가의 데이터는 충분히 수용할 수 있을것이다. 만약에 이 데이터가 100기가가 아니라 400기가면 메모리에 넣어도 넘쳐 흐르기 때문에 넘치는 부분을 디스크로 내려버린다. 그러면 또 느려지게 된다. 그래서 전부 다시 읽는것보다 느릴 수도 있다. 이럴 경우 캐싱을 안하는게 오히려 이득일 수도 있다. 따라서 캐싱을 한다고해서 무조건 빠른것은 아니다.

  • 참고로 오픈소스 프로젝트는 공식문서를 참고하는게 가장 정확하고 가장 좋다고 할 수 있다.

  • RDD 내부는 4가지 파트로 구성된다.

1) Partition : 데이터를 나누는 단위

2) Dependency : RDD의 파티션이 어디에서 파생되었는지를 기술하는 모델, 리니지의 한 단계라고 할 수 있다. 성능에 가장 impact가 있는 부분이다.

3) Function : 부모 RDD에서 어떻게 파생되었는지를 계산하는 함수

4) Metadata : 데이터 위치와 파티션 정보를 가지고 있음

1

  • 디펜던시는 두가지 타입이 있다. wide 디펜던시는 성능의 문제가 걸려있다. 왜냐하면 데이터를 다른 컴퓨터에 네트워크로 전송을 해야하기 때문에 네트워크 속도 문제도 있고, 많은 데이터를 엮어서 다른 데이터를 만드는 것이기 때문에 메모리 문제도 생긴다. 빅데이터 문제에서는 가능하면 wide 디펜던시를 피하는게 좋다.

2

  • 스파크 RDD에서 join 시 키를 그때그때 정해줘서 쿼리를 날리는 것이 아니라 데이터 구조를 아예 키벨류 구조로 키를 정해놓고 join하면 이미 정해진 키끼리 조인이 되는 것이다. outerjoin은 키가 매치가 안되어도 빈칸으로 가져오라는 것이다.

  • 반면에 스파크 데이터프레임에서는 sql처럼 그때그때 무슨키로 조인할때 지정해주는식으로 sql처럼 join을 해주면 된다.

3

  • 클러스터 매니저 : 마스터, 컴퓨터 한대

  • 워커 : 슬레이브로 실제 일을하는 요소다. 워커들은 각각 컴퓨터 한대

  • 드라이브 프로그램 : pyspark를 실행하면 spark context (sc)라는 객체가 있는데 컨텍스트를 통해서 마스터와 연락하거나 워커로 부터 데이터를 받으면서 일을 수행하는 것을 말한다.

  • 테스크 : 스파크에서는 테스크 단위로 수행한다. 데이터가 너무 크면 쪼개져서 하나의 파티션이라는 단위로 나뉘어져서 RDD에서 처리하게 된다. 파티션 하나를 맵하면 파티션 하나가 다른 파티션으로 변환될텐데 그 하나가 테스크 하나가 된다. 워커에는 executer라는게 있고 거기에서 테스크들을 여러개를 수행할 수 있는 방식으로 되어있다. 워커에 여러개의 executer를 띄울 수 있다. executer마다 캐시를 가지고 있어서 데이터를 캐싱하기도 한다.

  • 스파크 간단한 실습 예시 : word count

spark.md이라는 텍스트 파일의 word count를 해보자

step1) spark shell 구동

./bin/pyspark

step2) 텍스트 로드

text = sc.textFile(“README.md”)

step3) split

스페이스로 스플릿한다. flatmap은 string 하나를 split했더니 array로 출력되어 이중array가 된 데이터를 하나의 어레이 안에 정리해주는 함수를 말한다.

words = text.flatMap(lambda s: s.split(“ “))

step4) word count

words.map(lambda w: (w, 1))는 데이터를 키벨류로 정리해주는 것이고

reduceByKey(lambda a,b: a+b)는 정리한 키를 모아서 데이터가 몇개가 나왔는지 카운트 해준다.

counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a,b: a+b)

step5) print

result = counts.sortBy(lambda x: x[1], False).take(20)

for x in result:

print(x)

** 기타 참고자료

[참고자료 1] spark core concept

image

[참고자료 2] spark-submit

** 이미지 출처 : https://12bme.tistory.com/441?category=682904

image

[참고자료 3] Spark executor memory decomposition

image