Apache Spark 클러스터링 및 스트리밍 구현실습

2019-04-01

Data_Engineering_TIL_(20190330)

study program : https://www.fastcampus.co.kr/extension_des

[학습목표]

  • AWS EMR을 이용한 스파크 클러스터링 구축실습

  • 스파크 프로그램을 이용한 로컬영역 내 클러스터링 구축실습

  • 스파크 스트리밍 구현실습

[학습기록]

# AWS EMR을 이용한 스파크 클러스터링 구축실습

스텝 1) aws emr 콘솔접속

참고로 아테나는 프레스토랑 같은기능을 하는 툴이라고 생각하면 된다.

glacier는 s3보다 속도가 많이 느린데 저장비용이 매우 저렴한 스토리지다(자기테이프라고 알려져있다).

S3는 하드디스크를 하둡처럼 데이터를 여러벌 저장할 수 있는 툴이고 비싼것이 특징이다.

스텝 2) 클러스터 생성 클릭

클러스터 생성을 클릭하면 아래와 같은 화면이 전시된다.

1

스텝 3) 클러스터 이름입력, 시작모드에서 클러스터 선택, 로깅은 로그를 어디에 남길지를 말하는 것이다.

시작모드는 두개의 옵션이 있는데 클러스터는 클러스터를 생성해서 사용자가 쓸 수 있게 해주는 것이고, 단계실행은 작업수행을 미리정의해놓고 클러스터 띄워서 종료시키는 프로비저닝 같은 기능이다.

스텝 4) emr 최신버전선택, 어플리케이션은 스파크 선택

스텝 5) 인스턴스유형은 적당한 것을 하면 되는데 m4.large로 선택해준다.

스텝 6) 인스턴스 개수는 2개로 한다 마스터 하나, 워커 하나

스텝 7) 내 키페어를 선택

스텝 8) 생성클릭

여기까지 진행 후 AWS에서 클러스터링을 구축해주는데 시간이 약간 소요된다. 결과는 아래의 이미지와 같다.

아래의 이미지에서 상단에 ‘시작’이라고 쓰여져 있는 부분은 현재 클러스터를 구축하고 있는중이라는 의미이고, 구축이 전부 완료되면 ‘시작’ -> ‘대기’가 된다. 또한 이미지 중간에 ‘네트워크 및 하드웨어’ 메뉴에서도 주항색 글씨의 ‘자동 구성’ 또는 ‘프로비저닝 중’ 표기가 ‘실행’이라는 초록색 글씨로 바뀐다.

‘네트워크 및 하드웨어’ 영역에서 가용영역이라는 것이 있는데 실제 이 하드웨어가 어디에 서버가 있는지 표기된 것이다.

‘네트워크 및 하드웨어’ 영역에서 ‘서브넷 ID’는 특정 가상클라우드 서버간 보안이 서로 뚤려있는 네트워크를 말하는데 그 네트워크 아이디를 말하는 것이다.

‘보안 및 액세스’는 IAM이라는 특정 서비스 및 하드웨어 접근 및 실행 권한을 할당할 수 있는 매니지먼트와 IAM으로부터 권한을 얻어서 사용하는 키파일들로 통상 수행을 한다. 또한 시큐리티그룹이라고 해서 서로 통신가능한 설정을 규정하고 있는 그룹이 있고 그 그룹에서 특정 접근에 대해 접근할 수 있는 권한을 설정할 수 있고, 내부에서만 접근할 수 있게 설정할 수도 있는 이런 설정을 할 수 있는 기능이 있다.

‘부트스트랩 작업’은 클러스터가 가동되었을때 무슨 작업을 처음에 수행할것인가 이것을 설정할 수 있다.

2

‘하드웨어’ 탭을 클릭하면 아래 이미지와 같이 마스터, 워커 노드를 확인 할 수 있다.

3

스텝 9) 클러스터가 생성완료되면 ‘웹 연결 활성화’를 클릭한다.

클릭하면 아래와 같은 화면이 전시되는데 사실 아래와 같이 지시하는대로 수행하면 된다.

4

5

6

스텝 10) bash 쉘을 실행하여 내 키페어가 있는 폴더로 이동한다.

스텝 11) ‘ssh -i 키페어이름.pem -ND 8157 hadoop@마스터퍼블릭DNS.ap-northeast-1.compute.amazonaws.com -v’를 입력하여 접속한다.

스텝 12) 인터넷 익스플로러가 크롬인 것을 기준으로 하면 일단, https://chrome.google.com/webstore/detail/foxyproxy-standard/gcknhkkoolaabfmlnjonogaaifnjlfnp?hl=en 로 접속하여 FoxyProxy를 설치한다.

스텝 13) 메모장 프로그램을 실행하여 위의 텍스트를 복붙하고, ‘foxyproxy-settings.xml’으로 저장한다. 인코딩 방식은 UTF-8로 한다.

FoxyProxy는 88포트가 다 막혀있기 때문에 이것을 ssh 터널링을 통해서 ssh로 접속이 가능한데, 그냥 브라우저로 접속하면 ssh를 통해서 접속하는 것이 아니라 인터넷을 통해서 마스터퍼블릭DNS로 접근하려고 시도한다. 그러면 접속이 안되기 때문에 브라우저가 ssh를 통해서 접속할 수 있도록해주는 툴이다. 그것을 가능하게 해주는 설정파일이 프록시셋팅.xml인 것이다.

브라우저에서 프록시를 설정해서 ec2 위의 주소(ssh -i 키페어이름.pem -ND 8157 hadoop@마스터퍼블릭DNS.ap-northeast-1.compute.amazonaws.com -v)를 8890으로 접속하게 되면 인터넷을 통해서 접속하는 것이 아니라 8157 포트의 ssh 터널을 통해서 접속할 수 있다.

스텝 14) 위의 그림 4. ~ 8. 지시내용을 수행한다.

스텝 15) 아래의 화면으로 돌아가서 새로고침을 하게되면 ‘연결’에서 spark, 리소스 관리자 등의 링크가 활성화 될 것이다.

스텝 16) 활성화 된 링크로 접속하여 정상작동하는지 확인한다.

2

# 스파크 프로그램을 이용한 로컬영역 내 클러스터링 구축실습

리눅스 운영체제 환경기준

스텝1) 스파크 설치한 디렉토리에 접속, sbin폴더 접속

sbin폴더의 구성파일들은 아래 그림과 같다.

7

스텝2) 마스터서버 실행하면 되는데 ./start-master.sh 명령어를 실행한다.

스텝3) 크롬열어서 localhost:8080 접속하여 제대로 띄워졌는지 확인

localhost:8080는 스파크 마스터 관리 UI를 말한다

스텝4) ./start-slave.sh -m 4G spark://minman-VirtualBox:7077 명령어 실행

스텝5) 여기까지하면 클러스터는 완성이 되어있고 어플리케이션을 띄워서 자원을 써주면 된다.

./pyspark –help 명령어를 실행하면 파이스파크에 대한 옵션을 볼 수 있다.

스텝6) ./pyspark –master spark://minman-VirtualBox:7077 명령어 실행

스텝7) 클러스터를 중지하고 삭제하고 싶을 경우 아래와 같은 순서대로 명령어를 실행하면 된다.

./stop-slave.sh 명령어 실행 -> ./stop-master.sh 명령어 실행

통상 마스터와 슬래이브가 컴퓨터가 다를 것이다. 그 경우에는 stop-slave 명령어를 실행하려면 슬래이브 컴퓨터에서 입력을 해야하고, stop-slaves는 마스터에서 실행할 수 있는 명령어이다. 마스터에서 슬레이브로 ssh 접속이 가능하게 셋업을 하면 슬레이브로 접속해서 스탑슬레이브를 하는 구조이다. 방금 한 실습은 로컬에서 했기 때문에 로컬로 슬레이브에 ssh접속한다음에 스탑슬레이브를 실행한 경우이다.

# 스파크 스트리밍 구현실습

스텝 1) 리눅스 쉘 2개를 띄우고 1개에서는 ‘nc -lk 9999’ 명령어 실행

Linux nc command란

nc is the command which runs netcat, a simple Unix utility that reads and writes data across network connections, using the TCP or UDP protocol.

관련 URL : https://www.computerhope.com/unix/nc.htm

스텝 2) 나머지 1개의 쉘에서 pyspark 실행

스텝 3) 아래의 코드들을 실행

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "minman-VirtualBox") \
    .option("port", 9999) \
    .load()

# 주의 점으로는 옵션에서 본인의 로컬에서 'hostname'을 확인하고 그것을 입력해야함
# EX) 쉘에서 'hostname' 명령어를 입력하면 결과로 minman-VirtualBox 라고 나옴


# Split the lines into words
words = lines.select(
   explode(
       split(lines.value, " ")
   ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

## 스트리밍 종료를 원하면 다음과 같은 명령어를 입력하면 된다. query.awaitTermination()

위와 같이 코드를 실행하면 writer와 listener 같은 구도를 나타내고, awaitTemination에서 소켓을 열고 인풋을 기다렸다가 들어오면 워드카운팅을 하는 형태이다.

스텝 4) ‘nc -lk 9999’를 실행한 쉘에서 원하는 문장을 입력하면 입력한 문장을 wordcount해주고, wordcount된 결과는 pyspark 쉘에 전시가 된다.

그래서 ‘nc -lk 9999’를 실행한 쉘에서 예를들어 “hi minsu my name is cathy”를 입력해보았다.

위의코드 실행과정 1) pyspark 실행한 쉘에서 스트리밍을 위한 코드를 입력하여 실행함

8

9

위의코드 실행과정 2) ‘nc -lk 9999’를 실행한 쉘에서 “hi minsu my name is cathy”를 입력한다.

10

위의코드 실행과정 3) pyspark 실행한 쉘에서 아래와 같은 결과가 전시된다.

11

위와 같이 스트리밍이나 이벤트 큐로는 최근에는 카프카를 많이 쓰고 있다.

카프카 관련 URL : https://soul0.tistory.com/498

넷플릭스의 Keystone Real-time Stream Processing Platform 사례 : https://medium.com/netflix-techblog/keystone-real-time-stream-processing-platform-a3ee651812a

우버의 실시간 처리 사례 : https://eng.uber.com/reliable-reprocessing/