티아카데미 아파치 스파크 입문과 활용 TIL - spark 개념과 활용

2020-12-03

.

Data_Engineering_TIL(20201202)

study program : T아카데미 - 아파치 스파크 입문과 활용

** URL : https://tacademy.skplanet.com/frontMain.action

[학습내용]

  • HDFS에서 블락단위는 read&write하는 단위이다. HDFS는 사용하는데 가장 워스트 케이스는 작은 파일을이 다수 존재하는 경우이다. 이 경우에서 이 데이터를 HDFS에서 잘 사용하고 싶다면 이 작은 파일들을 병합하는 등의 추가적으로 작업을 해줘야 한다.

  • HDFS에서 하나의 블락은 하나의 노드에만 존재하는 것이 아니라 기본적으로 3개 카피로 복제되어 저장된다. 이는 특정 노드에 문제가 생겼을때 fault tolerance를 보장하기 위함이다. 또한 블럭이 노드에 고르게 분배되어 저장된다면 병렬처리시에 효율성도 올라가게 된다.

  • spark은 hadoop mapreduce의 사상을 그대로 계승했다.

  • Hadoop mapreduce의 Word Count Example

어떤 file이 있고 이 file이 용량이 좀 커서 4개의 block으로 구성되어 있다. 이 블락들은 특정노드들에 분산되어 위치하게 된다. Map하는 단계에서는 각각의 노드가 자기노드에 있는 각 단어들을 tokenizing해서 몇번씩 나오는지 counting을 하게 된다. shuffle과 sort를 통해 각 노드에서 산출된 결과를 갖고 I는 I끼리, sam은 sam끼리 accumulate 해준다. 그런 다음에 reduce하는 작업을 통해 이 결과들을 취합해서 i는 몇번 나왔는지 sam은 몇번나왔는지가 도출되는 것이다.

image

  • spark은 2009년에 UC 버클리에서 개발을 시작했고, cluster computing을 좀 더 빠르게 하는 목적을 갖은 프로젝트였다. 결론적으로 hadoop mapreduce보다 빠른 처리를 하는 것이 목표였다.

“Fast and general purpose cluster computing system”

  • Most popular for running Iterative Machine Learning Algorithms

머신러닝 알고리즘 같은 경우 반복적으로 데이터에 접근해서 Iterative하게 연산을 하는게 일반적이다. 기존의 하둡 mapreduce는 매번 연산요청이 있을때마다 데이터 접근을 위해 디스크로 내려가게 된다. 반면에 spark은 이런 disk i/o가 비효율적이기 때문에 inmemory에서 처리하므로 성능이 빠를 수 밖에 없다.

  • 그래서 spark은 뭐냐

Unified Computing Engine and a Set of Libraries for Parallel Data Processing on Computer Clusters

컴퓨터 클러스터에서 병렬 데이터 프로세싱을 하는 모든 라이브러리의 집합 & 통합된 컴퓨팅 엔진

로우레벨 API로 대표적으로 RDD가 있고 정형데이터 처리를 위한 API는 대표적으로 spark dataframe이 있다.

정형화된 스트리밍 데이터 같은 경우에는 spark dataframe과 spark streaming이 결합된 형태로 structed streaming api를 사용할 수 있다.

2

  • 그러면 Hadoop MapReduce와 spark은 프로세싱 관점에서 어떤 차이가 있냐

윗쪽에 있는 그림은 mapreduce에서 어떤 데이터를 처리하는 예제고, 아래쪽에 있는 그림은 SQL on Hadoop 기술을 이용해서 sql 쿼리로 어떤 데이터를 처리하는 그림이다.

iteration 연산을 할때마다 HDFS 디스크에 읽고 쓰는 비효율성이 있다. sql도 마찬가지로 sql query를 하면 HDFS가 읽어서 결과를 리턴하는 방식이다.

3

반면에 스파크에서는 디스크 I/O가 발생할 수 있는 부분에 대해서 메모리에서 전부 처리하도록 개선이 되었다.

그리고 shuffle&sort 연산은 네트워크 I/O가 발생한다. 왜냐하면 모든 노드에 있는 값들을 전부 모아서 reducer 쪽으로 보내야하기 때문이다.

그래서 디스크 기준으로는 10배, 분산 메로리를 이용했을때는 100배 이상 빠르게 연산할 수 있고, 디스크 I/O와 네트워크 I/O를 줄일수 있다는 것이 spark의 컨셉이다.

4

데이터를 tokenizing하고 뭔가 연산을 하는 내용이다. hadoop은 mapreduce framework가 있어서 함수를 구현하도록 되어 있다. 로우레벨의 api라고 할 수 있다. spark은 기본적으로 scala 언어로 어플리케이션이 구현되는게 일반적으로 함수형 언어의 간결하고 성능도 빠르고 추상화도 잘되어 있는 장점이 있다. 따라서 개발의 생산성 측면에서도 spark이 hadoop mapreduce 대비해서 장점이 있다고 할 수 있다.

5

  • sort competition

데이터를 연산했을때 리소스를 얼마나 소비하고, 성능이 어떻게 나오는지 엔진의 우수성을 판단하고자 만든 참고자료다.

결론은 spark로 연산했을때 더 적은 노드로 더 빠르게 연산할 수 있다는 것이 결론이다.

6

  • 그래서 spark으로 무엇을 할 수 있는가

Apache Spark supports data analysis, machine learning, graphs, streaming data, etc. It can read/write from a range of data types and allows development in multiple languages.

7

  • spark의 basic 아키텍처

yarn과 같은 cluster manager에 사용자가 개발한 user code(spark application)를 만들어서 spark-submit을 하면 driver process가 뜨게 된다. driver process에서는 spark session을 생성한다. spark-submit을 할때 클러스터의 리소스를 어떻게 쓸지 지정해서 제출할 수 있다. 예를 들어서 익스큐터 갯수와 코어수 메모리 규모를 지정할 수 있다. 그래서 이런식으로 할당받은 익스큐터에서 user code가 처리되는 구조이다.

8

  • Spark Language APIs

스칼라, 파이썬 등 다양한 언어를 지원하지만 기본적으로 spark은 JVM 안에서 구동이 된다. 그리고 실제 연산을 하는 것은 다수의 익스큐터들이 하기 때문에 스칼라, 파이썬 등 이런 언어들은 spark을 쓰기 위한 인터페이스라고 이해하면 된다.

9

  • spark dataframe

pandas dataframe과는 다르게 단일노드가 아니라 멀티 클러스터 환경에서 데이터를 분산병렬처리하는 api라고 할 수 있다.

  • spark에서 데이터를 처리하는 End-to-End Example

json 또는 csv 데이터 형태의 특징은 스키마 정보를 포함하고 있다. 이 스키마 정보를 사용자가 직접 정의해줄 수도 있고, 아니면 파일 자체에 header 값을 읽어서 처리할 수도 있다.

아래에서 narrow는 mapreduce의 map이라고 이해하면 되고, wide는 shuffle&sort 연산으로 이해하면 된다. map은 각각의 노드에서 그 노드가 갖고 있는 블럭을 각각 연산하는 것이고, 디센딩이나 어센딩해서 데이터를 sort하고 가져올 수도 있다.

10

또는 csv 파일 읽고 –> 데이터 프레임 생성하고 –> 그룹바이(특정키값으로 aggregation하는 function) 연산을 하고 –> sum하는 등 데이터 프레임을 변형하는 연산이 쭉 이어진다. 그러면 데이터 프레임이 n개가 계속 생긴다. 아래에 파란색으로 연산되는 부분을 transformation 연산이 된다고 부른다. transformation 연산은 내가 원하는 어떤 데이터 형태로 변환하는 작업을 말한다. 모든 spark의 이벤트 발생은 transformation을 한다고해서 연산이 일어나지 않는다. 아래 그림에서 빨간색 글씨로 collect라는 action 연산을 명령하는 순간 이전의 transformation 연산들이 모두 수행이 된다.

11

  • spark use case

1) Streaming Data

2) Machine Learning

3) Interactive Analysis

4) Data Warehousing

5) Batch Processing

6) Exploratory Data Analysis (EDA)

7) Graph Data Analysis

8) Spatial (GIS) Data Analysis

9) And many more

  • spark을 쓰지 말아야하는 경우

Even though Spark is versatile, that doesn’t mean Spark’s in-memory capabilities are the best fit for all use cases:

1) For many simple use cases Apache MapReduce and Hive might be a more appropriate choice

스팍이 불가능한 use case가 간혹 있다. 데이터가 너무 복잡하거나 안정성 측면에서 고려했을때 스팍으로 사용하는 장점이 없는 경우도 있다.

2) Spark was not designed as a multi-user environment

데이터 분석정도는 가능할지 모르나 BI사용을 위해 아주 헤비한 Query를 무분별하게 쓰는 경우(이런 경우는 차라리 DB를 쓰는게 낫다)는 지양해야 한다. 스팍이 데이터를 읽는구조가 shared storage나 HDFS에서 데이터를 읽기 때문에 디스크 I/O가 있을수 있고, 어떤 쿼리는 DB처럼 성능이 안나올 수도 있다. DB는 스팍과 다르게 인덱스 구조를 갖고 있고, Query에 최적화 되어있기 때문이다.

3) Spark users are required to know that memory they have is sufficient for a dataset

경우에 따라 메모리 이슈가 발생하는 경우가 종종 있다.

4) Adding more users adds complications, since the users will have to coordinate memory usage to run code

초창기 spark 버전에서는 로우레벨 api인 RDD를 이용해서 데이터를 처리하기 때문에 메모리 관리를 사용자가 직접해줘야 하는 부분들이 있어서 까다로운 점이 있었다.

  • [use case] sk텔레콤 Big Telco Realtime Network Analytics

** 관련내용 참고자료 : https://www2.slideshare.net/jerryjung7/stsg17-speaker-yousunjeong

무선통신 네트워크의 품질을 모니터링하고 성능저하에 선제적으로 조치하기 위해서 아폴로라는 과제를 했다. 실시간 데이터 분석과제였다.

베이스 스테이션 스토리지라고 해서 sk텔레콤의 전국의 기지국에서 각각 축적하는 데이터를 실시간으로 수집을 한다. 그런 다음에 실시간으로

카프카가 이 각각의 기지국으로부터 데이터를 수집을 하고, spark streaming을 통해서 데이터를 10초 단위로 통계정보를 생성한다.

어려웠던 점은 기지국에서 오는 데이터 타입이 하나의 데이터 타입이 아니라 여러 데이터 타입이 있었는데 이것들을 파싱하고 10초마다 통계정보를 생성했다. 그런 다음에 HDFS에 10초단위로 또 적제를 하고, 네트워크 품질에 영향을 요소에 대해 abnormality detection을 하게 된다. 예를 들어서 네트워크 장비의 온도라던지, 트레픽 정보에서 비정상적인 정보라던지.

만약에 어떤 행사나 사건으로 인해서 트레픽이 갑자기 몰리는 경우가 있는데 이런 경우를 대비해서 튜닝할 수 있도록 했다. 그리고 유저나 서비스 레벨에서 최적화가 될 수 있도록 했다.

스팍으로 데이터를 처리함과 동시에 실시간 대시보드로 처리하여 알람을 전시하는 것도 구현하였고, 또한 해당 기지국에 들어가서 기본적인 metric 정보를 가져와서 또 실시간 대시보드로 전시할 수 있게도 구현했다.

12

일부 네트워크 작업내용에 대해서는 Hadoop Datawarehouse를 구축하였다. 기존의 RDBMS에서 하드웨어를 기준으로 했을때 MPP나 다양한 것들이 가능했다. 그러나 이게 어플라이언스 형태로 상당히 고비용이었다. 무슨말이냐면 어떤 RDBMS 솔루션을 도입한다면 하드웨어+솔루션 세트로 들어오게 된다는 것이다. 그래서 이런 RDBMS 인프라에서 Hadoop Datawarehouse로 전환을 했었다. Hadoop Datawarehouse로 전환을 해서 spark sql을 사용하고자 했다. 저렴한 코모디티 하드웨어를 스케일 아웃할 수 있는 형태로 비용을 절감했다.

13

기존의 문제점은 각각의 네트워크 종류에 따라 네트워크 구간이 많았다. 또한 NMS라고해서 각각의 네트워크를 모니터링하고 품질을 측정하는 서비스가 130개정도의 각각의 DBMS에 전부 각각 존재했다. 이거를 하둡 데이터웨어하우스에 전부 통합했다.

만약에 특정 네트워크 구간에서 문제가 생기면 각 구간에서 데이터를 관리하고 있었다. 특정장비나 특정노드가 문제가 생기면 서비스 품질에 영향을 주기 때문에 그래서 데이터를 하둡 데이터웨어하우스에 통합하고, 네트워크 failure에 대해서 추적할 수 있도록 했다. 그래서 통합한 데이터를 갖고 spark 으로 분석할 수 있도록 했다.

14

[기타 spark 참고사항]

  • spark 익스큐터가 실제 spark code에 대해 일을 하느 주체고, 드라이버는 익스큐터에서 수행한 결과를 모으는 작업을 한다.

  • 기본적으로 hadoop의 file system은 append only다. RDBMS에서 제공하는 update나 delete 같은 transaction에는 적합하지 않을 수 있다.