Apache Spark 기초실습 step 1)

2019-03-19

Data Engineering TIL (20190316)

study program : https://www.fastcampus.co.kr/extension_des

[학습목표]

  • Apache Spark 기초실습

[학습기록]

  • 윈도우 환경에서 권장 실습환경 초기셋팅

step1) 우분투 가상머신 설치

step1-1) 버추얼 박스 설치 참고 URL : https://extrememanual.net/7184

step1-2) 우분투 iso 다운 및 설치 참고 URL : https://extrememanual.net/7223

step2) 파이썬 3.7설치

우분투 기본 파이썬 버전을 3.7으로 설정까지 해줘야함

참고 URL : https://codechacha.com/ko/change-python-version/

우분투 기본 파이썬 2.7 이 설치 되었을 경우 아래와 같이 파이썬 3.7로 환경변수를 설정해주는 방법도 있다.

$ export PYSPARK_PYTHON=python3.7

step3) 자바 설치

참고 URL : https://m.blog.naver.com/opusk/220985259485

  • 아파치 스파크 설치

step1) https://spark.apache.org/downloads.html 접속

step2) Spark release는 최신버전, package type는 pre-built for Apache Hadoop 2.7 and later로 선택

step3) Download Spark: spark-2.4.0-bin-hadoop2.7.tgz 클릭

step4) http://mirror.navercorp.com/apache/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz 클릭 및 다운로드

step5) 터미널을 이용하여 파일이 다운로드 되어진 경로로 이동’

step6) ‘tar -xvf spark-2.4.0-bin-hadoop2.7.tgz’ 입력하여 압축해제

step7) ‘cd /home/minman/다운로드/spark-2.4.0-bin-hadoop2.7/bin’ 입력하여 이동

step8) ‘./pyspark’로 pyspark 실행

./에서 .은 현재 디렉토리를 말하고 이는 현재 디렉토리의 pyspark를 실행하라는 의미이다. 통상 실행할때는 그래서 ./를 많이 쓴다.

  • word count 실습

text = sc.textFile(“../README.md”)

text.first()

text.take(3)

text.map(lambda s:s.split(‘ ‘)).take(3)

words = text.flatMap(lambda s:s.split(‘ ‘))

flatmap을 이용해서 sentence로 된 문장을 단어로 다 뺀것이다.

words.take(10)

words.map(lambda w: (w, 1)).take(10)

wordcount = words.map(lambda w: (w, 1)).reduceByKey(lambda a,b: a+b)

맵리듀스 방법을 사용해서 단어를 키벨류 형식으로 만든다음에 reduce함수를 이용해서 다 합친다. 맵리듀스의 리듀스를 할 경우에는 키벨류 형식으로 미리 만들어놓고 그거를 정해져있는 키로 오퍼레이션을 합치는 형식이기 때문에 위와 같이 map을 이용해서 먼저 딕셔너리로 만든것이다. 리듀스는 word라는 키워드를 기준으로 1이 있으면 그 다음에 똑같은 (word,1)이 있을 시 word는 고정하고 a(=1) + b(=1)을 해주는 것이다.

wordcount.sortBy(lambda x:x[0]).take(30)

알파벳순으로 정렬 후 출력

wordcount.sortBy(lambda x:x[1], False).take(30)

빈도많은순으로 정렬 후 출력

  • 실행결과

1

  • 스칼라코드는 ‘zen he’교수의 홈페이지를 참고해서 공부하는게 좋다.

관련 URL : http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

zen he 교수 홈페이지의 설명은 스칼라 언어로 되어있는데 공식문서의 해당부분 파이썬 코드 예시를 찾아 공부하면 된다.

  • Narrow & Wide Dependency 실습

1) Narrow dependency : 아래처럼 효율적으로 처리하는 이런류의 map, filter를 말한다.

a = sc.parallelize(range(1, 10000000)).map(lambda x: (x,x))

a.filter(lambda x: x[1] < 100).count()

==> 출력값 : 99

2) Wide dependency : 속도가 느려지는 오퍼레이션이다.

a = sc.parallelize(range(1, 10000000)).map(lambda x: (x,x))

b = sc.parallelize(range(1, 10000)).map(lambda x: (x,x))

a.join(b).count()

==> 출력값 : 9999

출력되는 시간이 엄청 오래걸린다.

3) Join with partition (Narrow dependency) : 헤시파티셔너를 이용한것이다.

from pyspark.rdd import portable_hash

p = portable_hash(10)

a = sc.parallelize(range(1, 10000000)).map(lambda x: (x,x)).partitionBy(p)

b = sc.parallelize(range(1, 10000)).map(lambda x: (x,x)).partitionBy(p)

a.join(b).count()

==> 출력값 : 9999

출력되는 시간이 위에것보다는 짧지만 여전히 오래걸린다.

  • Caching 실습

a = sc.parallelize(range(1, 1000000000)) # 큰 RDD 정의

a.count() # 시간이 어느정도 걸림

b = a.filter(lambda x: x < 100000) # 필터링된 작은 RDD 정의

b.count() # 연산 시간이 꽤 걸림

b.count() # 다시 해봐도 연산 시간이 꽤 걸림

b.persist() # 캐싱을 수행, transpormation 명령과 같이 캐싱명령을 이렇게 날린다고 당장 뭐가 실행되는 것은 아니다. 캐싱을 하겠다고 정의하는 것이다.

b.count() # 이렇게 연산을 하면서, 결과를 캐싱을 실질적으로 수행하게 된다.

b.count() # 바로 앞에서 연산을 수행하면서 캐싱을 했기 때문에 순식간에 이번 명령어는 실행이 완료된다. 이렇게 캐싱을 써서 빨라지는 케이스는 지금과 같이 반복적인 연산이 있을때 활용하면 좋다.

  • 파이썬에는 append라는 함수가 있어서 리스트의 변경이 가능한 부분이 있으나 스칼라 언어와 functional 프로그래밍에서는 데이터를 변경하지 않는것이 기본 철학이다. immutable이라고도 하고 일단 데이터를 한번 로드하면 그것은 변경하지 않는다는 것이다. 이러한 성질은 많은 문제들을 단순화시켜주고 문제를 쉽게 만들어준다. 데이터가 중간에 변경될 수 있다고 하면 캐싱문제 뿐만이니라 병렬처리에서도 많은 문제들이 발생할 것이기 때문이다. 이런 철학은 빅데이터 세계에서 기본가정 같은 것이다.

  • 많은 사람들이 SQL을 원하는 이유

1) 익숙한 언어인 SQL로 데이터를 분석하고 싶다

2) 예전에 짜놓은 SQL 분석 코드들을 재활용하고싶다

3) 맵리듀스 프로그래밍보다 SQL이 빠르고 편하다

4) 정형화된 데이터의 경우 정형화된 방법으로 분석하면 더 높은 수준의 최적화를 구현할 수 있다

  • 스파크 데이터 프레임은 판다스 데이터프레임에서 아이디어를 얻어서 만들었지만 좀더 SQL 비슷한 것이라고 생각하면 된다. 실제 할 수 있는 명령도 slect, group by, join 이런것들을 할 수 있다.

  • 과거에는 하이브로 빅데이터 스케일의 SQL을 쓰기도 했지만 sql쿼리로 작성하고 맵리듀스로 컨버팅하는 과정이 오래걸리는 단점이 있다. 아무리 복잡한 쿼리라도 안정적으로 돌아가는것은 큰 장점이지만 몇일, 심하면 몇주가 걸리는 문제점이 있다. 이러한 느린 속도를 개선하기 위해 Presto, Spark SQL 같은 경쟁 프로젝트들이 나오게 되었다. 하이브는 현재 금융권이나 대기업에서도 많이 쓰이고 있다.

  • spark SQL

코딩이 가능한 SQL이라고 생각하면 쉽다.

1) Spark의 뛰어난 엔진을 이용하여 응답속도와 처리속도를 개선

2) 현재는 Low-level API인 RDD와 공존, 앞으로는 DataFrame, DataSet API 쪽으로 무게가 실릴 가능성 높다.

3) 머신러닝은 정형화된 데이터셋을 주로 다루므로 DataFrame API로 다시 쓰여짐

4) Spark SQL 등에서 사용하기 위해 구조화된 형태의 데이터가 필요했고 그래서 나온게 Spark DataSet, DataFrame API이다.

5) Dataset[Row] = DataFrame이다. 다시말해 데이터셋의 로우가 데이터 프레임이다.

6) RDD를 Dataset, DataFrame으로 변환 가능

7) Json파일 등에서 자동으로 스키마를 읽어들여 DataFrame을 만들 수 있고, 직접 스키마를 만들수도 있음

8) MLlib 에서도 구조화된 데이터를 다루며, 언어별 통일성 등의 장점을취하기 위해 DataFrame을 사용

  • SparkContext vs SparkSession

1) SparkContext : sc.textFile(“…”)

2) SparkSession : spark.read.csv(“…”)

  • DataFrame API : 데이터 프레임에도 transformation과 action개념이 그대로 반영되어 있다. show()가 액션이라고 보면된다.

Schema : df.printSchema(), df.show()

Transformations에 해당되는 명령어들 : df.select(), filter(), groupBy()..

  • SQL

df.createOrReplaceTempView(“table_name”) ## 테이블을 이런 명령어로 지정하면 sql을 쓸 수 있다.

spark.sql(“SELECT * FROM table_name”).show()

  • Spark SQL 주요 참고문서

1) Spark SQL Programming Guide

1-1) Spark SQL 전반적인 내용들에 대한 문서

1-2) https://spark.apache.org/docs/latest/sql-programmingguide.html

2) DataSet (DataFrame)

2-1) DS, DF의 기능들이 기술

2-2) https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset

3) DataFrame

3-1) select, filter, join, groupBy

3-2) http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame

4) Functions

4-1) SQL에서 사용할 수 있는 UDF들을 기술

4-2) http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#modulepyspark.sql.functions

5) Column

5-1) Column-wise operation을 참고할 수 있는 문서

5-2) http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.Column

6) DataFrameWriter

6-1) 파일을 쓰기위한 DataFrameWriter

6-2) http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

  • 왕좌의 게임 인물데이터를 이용한 스파크 실습

df = spark.read.csv(“파일설치한 경로”)

df.show()

2

df = spark.read.option(“header”, “true”).csv(“파일설치한 경로”) # 헤더 포함한 데이터를 RDD로 읽어오려면 데이터프레임을 읽어올때 헤더를 포함한다.

df.filter(df[‘Death Year’].isNull()).count() #몇명이 살아있나 산출

3