실시간 데이터 처리를 위한 분산컴퓨팅 개론

2020-04-14

.

Data_Engineering_TIL_(20200414)

카프카, 스팍 스트리밍 등 실시간데이터 분산컴퓨팅에 관련한 학습내용 정리

1. 카프카

단위데이터 메세지라고 하기도 하고, 이벤트라고 하기도 한다.

이런 데이터들이 들어와서 쌓이면 이 쌓여있는 데이터를 끄집어갈 수 있는 저장소로 생각하면 된다. 순차성을 유지하면서 queue된다.

보통은 이벤트가 들어오면 여러머신에 쌓는다. 왜 그러냐면 보안에 한계가 있고, 또한 만약에 전세계 겔럭시 사용자들이 쏘는 데이터가 들어온다고 하면 그거를 받아서 어떤 로직을 돌려서 빠른시간안에 우리가 원하는 결과를 내야한다. 초당 만약에 수천억개씩 데이터가 들어오면 어떻게 처리할거냐 라는 문제가 생긴다. 이 수많은 데이터들이 큐안에 쌓였다해도 컨슈머가 데이터를 한개씩 뽑아먹는다면 느려서 일이 안된다는 것이다.

그래서 나온게 또 분산컴튜팅이다. 다수의 컴퓨터를 이용해서 데이터가 쌓이는 속도 만큼 뽑아먹자 이거다. 데이터가 들어올때는 기본적으로는 라운드 로빈으로 각 토픽으로 들어간다.

어쨌든 분산되어 있다는게 중요하다. 각 토픽당 데이터를 분산처리하는 단위가 파티션(노드단위)이다. 카프카에서는 각 파티션당 복제된 파티션 중에서 하나의 리더가 선출된다. 이 리더는 모든 읽기, 쓰기 연산을 담당한다. 리더를 제외한 나머지는 팔로워가 되고, 이들은 단순히 리더의 데이터를 복사하는 역할만 한다.

뽑아가는 컨슈머가 데이터를 뽑아갈때 마킹을 해야하는데 그거를 offset이라고 한다. 토픽에서 헬스한지 안헬스한지의 노드의 상태정보도 중요하고, 얼마나 데이커가 쌓여있는지 중요하고, 얼마나 데이터를 뽑아갔는지도 중요하고, 이런것들이 중요하다. 그런것들을 코디네이트 주키퍼가 해준다.

카프카는 마스터 슬레이브 구조가 아니다. 물론 논리적으로는 논리적으로 파티션해서 리더가 있고 팔로워가 있기는 하지만 중앙관제라는 개념이 없다. 대신에 파티션 1, 파티션 2, 파티션 3으로 되어 있다면 하나의 토픽을 쪼갤때 이말은 컨슈머가 최소 3개가 있다는 말이다. 얘네들을 컨슈머 그룹이라고 부른다. 그러면 같은 토픽을 컨슈머 그룹이 뽑아갔네라고 하면 이 정보들이 같이 인지가 되고, 주키퍼에 기록이 된다.

토픽당, 파티션당, 컨슈머 그룹당 offset 설정도 기록이 된다. 그리고 상호교환을 한다.

카프카 ui라고 ui 툴도 있다. 위와 같은 정보들을 가져와서 디스플레이 하는것이다. 쌓인게 얼마고 뽑아먹은게 얼마고 이런것까지 다 보여준다. 왜냐하면 주키퍼에 해당 정보가 저장되어 있기 때문에 가능하다. 그래서 카프카가 분산큐라는 것이다.

그런데 카프카가 왜 헤비해지냐면 고려할게 많기 때문이다. 메세지의 사이즈에 따라 구성을 고려해야하고, 카프카의 기본원리가 메모리에 넣었다가 얼마 시간이 지나면 파일로 만들어서 내려버린다.

하나의 토픽을 3개의 파티션으로 만들면 이 파티션에서는 복제 개념인 replication이 있다. 얘도 데이터가 깨졌을때, 장애가 났을때 복구를 해야하기 때문이다. 홀수배이든 단독으로 놓던 복제본을 만든다. 파티션의 복제본은 파티션 1번은 세개 파티션중에 reader인 복제번 1번이 있다. 그거를 파티션의 리더라고 한다. 기본적으로 따로 배정을 안하면 파티션이 배정된 첫번째 노드가 리더가 되고 나머지는 팔로워라고 해서 리플리카가 요기도 복제가 된다. 왜 리더가 중요하냐면 리더한테 쓰기를 시키고 리러한테 read를 하기 때문이다. 백업본에는 read write가 안일어난다. 백단에서 복제만 일어난다. 실질적으로 io발생은 파티션 리더한테만 일어난다.

근데 보통은 처음에 셋팅하면 카프카가 알아서 밸런싱을 해준다. 그런데 장애가 나거나 주키퍼 통신에 문제가 생겨서 특정노드가 이탈을 하면 인싱크그룹이라는 곳에서 리더 순차번호가 매겨져 있는데 순차번호 다음번호로 리더 배정을 해준다. 예를들어서 1번노드가 고장나면 2번이 받아서하고 2번이 고장나면 3번이 받아서 하는 이런 구조다. 그리고 복제그룹에서 튕겨져 나간 노드는 탈퇴된다. 노드가 날아갔으니까.

토픽이 많고 배정한게 많으면 잘못하면 위와 같이 비정상적인 상황에서 부하가 확올라가는 파티션 리더 언밸런싱이 발생할 수 있다. 한노드만 피지컬적으로 자원을 과도하게 쓰는 상황을 말한다. 여러개가 자원을 고르게 써야하는데.

카프카의 기본사상이 거의 동일한 시점에 데이터를 당겨와서 동일하게 처리하고 동일하게 마킹한다는 것이다. 이런 여러 노드의 균형을 잡아주는 것이 중요하다.

그래서 상태관리할때는 모니터링해줘야 할게 까다로운 편이다. 위와 같이 언벨린싱한 현상이 발생하는 순간 클러스터 전체가 느려질 수 있다.

0.8버전부터는 리벨런싱을 오토로 한다고 하지만은 잘 안될수도 있다. 위와 같은 괴정처럼 끊어졌다 올라갔다 이런게 반복되면 불균형이 발생하고 그러면 클러스터에 악영향을 줄수밖에 없다.

많은양의 데이터가 오고가면 메모리 작업도 많아진다. 데이터 보유량도 많아지고.. 그러면 이 메모리에 적재된 데이터가 많으면 많을 수록 가비지컬랙션 작업도 잘 해줘야한다. 그러나 full GC작업해도 소요시간이 걸린다. full GC하면 그렇게 되면 서비스가 스탑될 수 밖에 없다. 아무것도 못한다. full GC일때 아무동작도 못하기 때문이다. 이런 경우가 자주 있기 때문에 카프카를 사용할때는 GC옵션도 잘 줘야 한다. 특히 자바 VM을 쓰는 것들은 잘 줘야 한다.

자바 VM안에는 많은 것들이 돌아갈 수 있다. 주키퍼에 연결 또는 GC 동안에는 보통은 헬스체크 컨넥션 인터벌이 3초 정도로 짧게 된다. 뭔가 문제가 있으면 주키퍼 컨넥션이 계속 리셋 현상이 반복적으로 일어나고 그러면 클러스터에서 이탈이 일어났거나 문제가 생겼다고 판단할 수 있다. 그러면 또 노드간에 불균형이 일어나기 시작하는 문제로 이어진다. 이런 문제가 발생하면 따라서 정리작업을 확실하게 조치를 해줘야지 안그러면 계속 이런문제들이 반복되서 클러스터 전체적으로 불균형에 의한 악영향이 발생한다.

그래서 자원추적을 할때는 리소스들 어떻게 쓰는지 보고, 파티션단에서는 렉이 얼마나 발생하는지 한쪽이 많이 데이터가 누적되고 한쪽은 또 너무 빨리 데이터가 소모되는지 불균형이 발생하는 것을 체크를 잘 해줘야한다. 피지컬도 보고, 사용률도 보고, 토픽도 describe해서 리벨런싱이 잘못되어 있는지 체크도 해줘야한다. describe topic은 각 파티션에 대해서 리드가 몇번인지, 복제본의 그룹은 어떻게 이루어져 있는지 보고 잘못되어 있으면 파티션 재정렬 작업을 해줘야한다. 이 재정렬 작업기능은 카프카에서 제공하는 기능이다.

QA ) 카프카의 1차 헬스체크는 그러면 노드들이 균등하게 작업을 잘 하고 있는지를 체크하는 것인지

A ) 그렇다. 파티션 노드도 설계도 잘해야한다. 노드가 3개인데 토픽을 4개의 파티션 노드로 나눈다고 하면 허느 한 노드는 2개의 파티션을 갖고 있어야 한다. 이런식으로 설계하면 안된다는 것이다. 차라리 2개, 2개, 2개 하던지 1개, 1개, 1개 하던지 균형을 맞춰줘야 한다.

무식하게 노드가 3개인데 빠르게 처리하는 스팍execute 동시에 처리자를 9대를 주고 파티션을 파티션을 9개라고 하면 잘 돌아가겠지라는 것도 바람직하지 않다. 처음에는 3대씩 배정받아서 잘 된다. 그런데 위에서 언급했듯이 하나의 파티션이 무너지기 시작하면 전체가 균형이 깨져서 와르르 무너질 소지가 있다. 그래서 너무 과하게 파티션을 가져갈 필요도 없다. 1대가 3개씩 파티셔닝이 배정된 것들이 리벨런싱되면 벨런싱이 잘 안될 수 있다. 이 리벨런싱이 두번,세번,네번.. 점점 하다보면 점점 균형이 깨지면서 리벨런싱이 더 어려워진다. 이게 또 잘 체크할수 없어서 관리하기 힘들다는 것이다.

파티션리더는 파티션 노드가 세개 있다면 이 노드들의 리더라는 얘기가 하니다. 한 파티션의 복제본간에 리더를 말하는 것이다. 즉 한카피에 대한 리더를 얘기하는 것이다.

파티션 하나에 데이터가 들어올때 이 데이터본을 다른 노드에다가도 만드는데 그 중에서 I/O가 일어나는 리더를 정하는데 그게 파티션리더인 것이다.

카프카와 같이 노드가 균형있게 리소스를 쓰면서 동작해야 한다는 사상은 HDFS도 마찬가지다.

하이브이던 스팍이던 MR이던 모든 노드가 동일하게 일을 수행완료해서 동일하게 결과를 서머리 할 수 있어야 한다. 그런데 만약에 특정노드가 너무 많은 일을 혼자서 하고 나머지 노드는 상대적으로 너무 적은 일을 한다? 나머지 노드들은 일이 금방끝나서 가만히 기다린다 특정노드가 일을 다 끝낼때까지. 이러면 클러스터의 의미가 없다. 이런 문제는 왜발생하냐 HDFS의 데이터 보유량이 그 특정노드가 너무 많이 갖고 있어서 그렇다. 그래서 블락을 리벨런싱하는 기능이 HDFS가 잘되어 있는 이유가 여기에 있는 것이다. 5TB 사이즈 객체 하나가 있다고 치자. 이거를 HDFS가 노드의 갯수를 고려해서 블락단위로 등분을 슬라이싱을 균등하게 하는 것이다.

HDFS도 크고 많은 파일들이 저장될 수록 언벨런싱이 생길수 밖에 없는데 이런 경우를 잘 캐치하고 리벨런싱을 잘 해줘야한다. HDFS 리벨런싱은 HDFS에서 기능적으로 제공해준다.

HDFS filecheck라는 기능을 이용하면 벨런싱 상태를 볼 수 있다. 일단 감지는 ‘잡이 돌아갈때 특정노드가 느리네’ 라는 것에서 시작하고, ‘리소스 사용량도 거의 동일한데?, 리소스도 모자른거 같지 않는데 이상하네?’를 감지 할 수 있고, 그런다음에 감지할 수 있는게 특정노드에 IO가 많은거 뭔가 많이 읽는 노드가 있으면 filecheck을 해보면 특정노드에 데이터가 몰려서 저장되어 있는것을 확인할 수 있을것이다. 그러면 리벨런스 API를 통해서 명령어를 날리면 최소 쓰레드홀 10%로 만약에 설정하면 노드간에 10% 차이까지는 언벨런싱을 허용한다는 의미로 리벨런싱을 수행하게 된다. 이 쓰레드홀을 조절할 수 있다.

이 리벨런싱 작업은 IO + 네트워크 작업이다보니 비용이 비싸다. 또한 서비스도 리벨런싱하는 동안 올스탑된다. 원천들이 옮겨가기 때문이다.

카프카도 기본적으로 특정시점에서는 백그라운드에서 알아서 리벨런싱하도록 설계되어 있다. kafka read rebalancing.enable = True or False로 설정할 수 있다. 그러나 카프카 구조가 복잡해질 수록 이런 자동으로 처리하는 것도 한계가 있다. 카프카도 마찬가지로 리벨런싱할때 서비스를 다 내려야한다. 만약에 신생노드가 들어오면 리벨런싱 작업시간이 더 오래 걸린다. 데이터가 무빙하기 때문이다. 그룹에 신생노드를 추가하고 토픽자체를 리벨런싱하라는 명령을 날리면된다.

QA) 카프카는 파티션이 추가가 될수있지만 뺄수는 없다는 말은 무슨말인지?

A) 위에 언급한 내용과 결국에는 같은 얘기다. 기존에 데이터가 있으니까 그것들을 쪼개서 재분배하는 것은 어렵지 않는데 파티션을 줄인다? 줄이는 작업은 프로세스가 중요하다. 줄일때 무슨일이 일어나냐면 원천데이터를 갖고 있음에도 불구하고 싹 비워버린다. 원천소스가 날아가버린다는 말이다. 비우기 전에 기존에 있는 노드들로 데이터들을 재분배해줘야하는데 이렇게 하도록 아직 카프카 소스가 안만들어져있다. 카프카에서 이걸 개선하려고 노력하고 있다.

카프카는 기본사상은 가급적이면 데이터가 들어오는 대로 바로바로 소비가 되는 것이 중요하다. 다시말해서 프로듀서와 컨슈머가 속도가 잘 맞으면 좋다. 소비하는 속도보다 데이터가 들어오는 속도가 더 빠르다? 결국에는 문제가 생길 수 밖에 없다.

카프카로 데이터가 들어오면 플러싱인터벌 사이즈가 되기 전까지는 일단 메모리에 넣는다. 그런데 들어간 데이터가 시간이 지나서 낡은 메세지가 된다 그러면 새로운 메세지가 보관하기 위해 이 낡은 메세지를 파일(디스크로 저장)로 비워야 한다. 당연히 메모리에서 읽는것보다 파일로 읽는것이 IO비용이 더 많이 든다. 따라서 컨슈머의 소비 속도가 중요하다.

하지만 모든 어플리케이션이 모두 꾸준한 속도를 유지할 수는 없다. 기본적으로 모든어플리케이션들은 시스템에 자원을 쓰는데 시스템 자원들이 남으면 상관없지만 언젠가는 결국에는 문제가 발생할수 밖에 없다. 그러면 카프카 입장에서는 외부에서 시스템 자원을 덜쓰고 있을때는 많이 처리하고, 반면에 외부에서 시스템 자원을 많이 쓰면 조금만 처리할 수 있도록 자동 조정되는게 가장 좋다. 카프카는 스트리밍이 자기가 알아서 인풋(블락)을 자동으로 계속 계산해서 조절한다. 과거에 데이터가 들어오는 속도를 기록해두고 그걸 참고해서 조절한다. 이걸 백프레셔라고 한다. 사실 이런 개념없이 막 쌓기만 하면 언젠가는 무리가 오기 때문에 당연한 것이다.

2. 스팍 스트리밍

카프카는 저장하고 보관하는 큐의 개념이다. 반면에 스팍스트리밍은 기본적으로는 스팍이다. 그런데 라이브러리 하나인것이다. 뭐냐하면 원천에서 끊임없이 데이터를 뽑아와서 끊임없이 select를 하는것이다. 끊임없이 get을 한다는 얘기이다. 즉 끈임없이 컨슈밍을 한다는 얘기이다. 내가 정한 poll 사이즈만큼 할 수 있다.예를들어서 10000개 갖고과 1000개 갖고와 . 그렇게해서 데이터프레임이나 원하는 사이즈의 데이터셋에 데이터를 담는 것이다.

전통적인 실시간 데이터 처리방식은 정해진 시간에 예를들어서 1시간마다 한번씩 데이터 10000개를 처리하는데 처리하는 시간이 한시간 걸린다. 문제는 데이터를 가져와서 result까지 결과를 도출하는 시간이 갭이 1시간이라는 것이다. 한시간 동안 저장할 수 있는 자원도 필요하다.그러면 이렇게 데이터를 처리하는 것을 10분단위로 쪼개는 개념으로 해보자. 그러면 약 2000개씩 10분단위로 쪼개서 배치로 짜면된다. 이거를 더 잘개쪼개서 1분단위로 해보자. 그러면 처리해야할 사이즈도 줄고 빠르게 처리할 수 있을것이다. 이렇게 아주 작은 시간단위로 배치로 쪼개서 해보자 라는게 스팍 스트리밍이다. 그래서 스팍스트리밍을 마이크로 배치라고도 부른다. 결국에는 배치고 스팍에서는 윈도우라는 개념을 쓴다. 데이터 개수이던 크기던 윈도우 개념으로 인풋을 받아서 그게 마이크로 배치한다는 것이다.

그런데 중요한것은 배치 1번잡과 배치 2번잡은 시리얼하게 진행된다. 즉 1번잡이 끝나야 2번잡이 진행된다.

만약에 앞에서 처리한 데이터를 뒷단에서도 참조하고 싶다. 예를들어 앞에서 수집한 시간데이터 누적치의 계산 서머리를 뒷단에 넘기고 누적치를 유동적으로 변화시켜서 할수 없을까 라는 생각을 할 수 있다. 원래 스팍스트리밍은 데이터를 처리하면 싹다 메모리를 비워버린다. 과거 배치의 데이터를 참고할 수 없는 구조이다.

그런데 특정메세지가 들어오면 약간 딜레이되는 condition을 만들어서 , 그 다음 후속에서 마킹을 줘서 다음 윈도우에 반영시킬수는 없을까? 라는 개념이 나온게 2.0부터이다.

이런 한계는 네트워크 문제 때문이다. 특정시간에 어떤 데이터를 처리한다고 해도 같은 클러스터내 노드끼리의 거리거 수만km이상 떨어져 있으면 데이터를 동시에 같은 시간에 처리한다고해도 네트워크를 타고 오면서 마이크로배치가 딜레이 되는 경우가 있다. 스팍1.0대에서는 이런경우는 오류가 일어난다. 그래서 스파크2.0에서는 워터마크를 살짝 딜레이 하는 기능을 또 추가했다. 최소한의 쓰레시홀드를 잡아서 할 수 있다.

대시보드를 보고 있다가 사용자가 어느정도 타임을 딜레이시켜서 오는 데이터까지를 인풋으로 잡아야겠네 라는 설정이 가능하다.

다른 애코시스템도 마찬가지겠지만 메이저 버전이 바뀌면 많은 개념이 바뀌게 된다. 연계도 달라지고 기능도 달라진다.

3. 샤드

QA) 샤드라는 개념도 파티션이랑 비슷한 것인지

A) 파티션의 개념과 비슷하다. Replication은 똑같은 데이터를 다른곳에 저장하는 것이다. 디스크도 그만큼 많이 들 것이다. 복구용으로 쓰는것이다. 반면에 샤드는 리플리카가 있던없던 원천데이터는 하나인데 여러개로 슬라이싱해서 각각의 데이터 프레임으로 분리하는 것이다. 예를 들어서 특정 데이터 슬라이스를 조회하는 구간은 날짜던 뭐든 슬라이스를 구분할 수 있는 샤드키가 있다. 해시테이블로 딱 나누면 각각의 슬라이스 범위를 나눌 수 있도록 구분을 해줄 수 있다. 샤드번호를 기준으로 슬라이스를 나눈다. 샤드번호는 누군가는 갖고 있어야 한다. 중앙관제이던 어디든. 슬라이스가 용이한 이유는 단순히 디스크를 복제본을 안떠서 절약하자는용도가 아니고 누군가는 이 샤드키에 대해 정리해서 내가 찾고자 하는 데이터들이 예를들아서 1번 샤드번호에 저기 저 노드에만 있다라고만 알고 있으면 리듀스 작업이 필요없다. 각 노드에 전부 안찔러봐도 된다는 말이다. 이 머신에 찔러서 내가 원하는 데이터 범위만 있으면 되는 것이고, 또 다른 특정 머신을 찔러서 내가 원하는 데이터 범위가 있으면 되는 것이다. 굳이 클러스터내에 모든 노드를 찔러볼 필요가 없는 것이다. 이거를 클라이언트가 결정할 수도 있다. 통신을 통해 받아올수도 있고 샤드키를 내가 계산하는 디폴트 계산식에 의해 만들어질수도 있다. 해시키로 만들어질수도 있다. 그게 날짜가 될수도 있고..

샤드의 개념은 필요한 노드만 찔러서 결과를 얻고자 함에 있다. 분산시스템에서 불필요한 작업을 줄여보겠다는 것이다. HDFS에서는 내가 HDFS의 3번 4번 노드를 안찔러보고 싶어도, 내가 원하는 데이터가 없어도 VM이 executor들은 배정을 한다.

샤드도 replication을 한다. 그래서 샤드 한노드가 깨지면 replica 2번이 근처에 있으면 대체하게 된다. 역시 언벨런싱이 생길 여지가 있고 샤드 리벨런싱을 잘 해줘야한다. 재빠르게 반영을 잘해줘야한다.

분산시스템의 큰 사상들은 대부분 애코시스템에서 일치한다. 하둡의 리플리카 갯수는 왜 홀수야, 복구용으로 리플리카 쓰네 마찬가지다. 분산시스템 처음 만드는 이유부터해서 나오는 아이디어가 동일하기 때문이다. 깨졌을때 복구해야하는 리플리카 개념도 있어야 하고, 정합성 체크를 위해 리플리카 갯수는 홀수개여야하고, 특정노드가 깨졌을때 과반수가 맞으면 그 데이터의 정합성이 맞다고 판단하고 데이터를 복제를 하고 이런거다.