빅쿼리 기본개념

2021-12-11

.

Data_engineering_TIL(20211210)

[학습자료]

‘형준킴 염창동형준킴’ 님이 작성한 “갈아먹는 BigQuery” 블로그 글을 읽고 공부한 내용입니다.

** URL :

갈아먹는 BigQuery [1] 빅쿼리 소개 (tistory.com)

갈아먹는 BigQuery [2] 빅쿼리 스키마 및 데이터 모델 (tistory.com)

갈아먹는 BigQuery[3] 빅쿼리 SQL 분산 실행 (tistory.com)

갈아먹는 BigQuery[4] 빅쿼리 아키텍쳐 (tistory.com)

[학습내용]

  • 빅쿼리 개요

1) 개요

페타바이트 이상의 빅데이터에 대해서도 SQL 쿼리를 빠르게 처리하고 저장할 수 있는 앤진겸 데이터 웨어하우스로 GCP 서비스임. 친숙한 SQL문 만으로 분산 저장되어 있는 방대한 데이터를 빠르게 분석할 수 있는 서비스임

2) 등장배경

개발 당시 유행했던 하둡기반의 맵리듀스 방식을 쓰기에는 너무 속도가 느리다고 판단해서 구글에서 드래멜이라는 프로젝트를 시작함. 이 드래멜이 뭐냐면 정형데이터를 분산 저장하고 SQL 문을 통해서 빠르게 데이터를 분석하도록 개발한 엔진임. 분석가들이 이 드래멜 엔진을 이용해서 대용량 데이터를 분석이 가능하도록 하고자 개발하게 되었음. 빅쿼리는 이 드래멜 프로젝트를 기반으로 대중이 사용할 수 있도록 클라우드 서비스로 제공한 것이 바로 빅쿼리임.

  • 빅쿼리가 극강의 성능을 낼 수 있는 요인 두가지

첫번째, Columnar 스토리지

두번째, 트리구조 기반의 분산처리

1) Columnar 스토리지

일반적인 RDBMS가 레코드를 저장하는 방식과 다르게 빅쿼리는 컬럼단위로 데이터를 저장함.

드래멜 프로젝트는 애초에 빅데이터를 분석하기 위해 설계되었기 때문에 이렇게 설계함.

대용량 데이터를 걸럼 기반으로 저장할 경우 장점은 아래와 같음

장점 1. 트래픽 최소화

예를 들어서 “SELECT top(title) from my_table”과 같이 쿼리를 실행한다고 하면 컬럼 기반 저장방식에서는 해당하는 컬럼만 조죄하면 됨. 이는 데이터 양이 커지면 커질수록 효과가 극대회 될 수 밖에 없음. 그리고 반면에 “SELECT *“과 같은 문법은 가급적이면 사용하지 않는 것이 좋음

장점 2. 높은 데이터 압축률

컬럼 단위로 데이터를 저장한다는 것은 곧 데이터 타입의 데이터들이 몰려서 저장된다는 것을 의미함. 이는 데이터를 압축하기에 유리할 수 밨에 없음. 일반적인 RDBMS는 데이터를 1:3 정도의 비율로 압축하지만 컬럼 기반으로 저장할 경우 1:10 정도의 비율로 압축이 가능함. 데이터 압축률이 높다는 것은 결국에는 쿼리 수행성능도 향상 될 수밖에 없음.

이러한 컬림 기반 저장방식은 사실 기존의 RDBMS에서도 구현이 되어 있고, 카산드라 같은 컬럼 베이스 NoSQL 솔루션과 같이 이미 존재하는 기술임. 하지만 드래멜은 이러한 컬럼 기반 저장기술에 더해서 대규모 분산 처리를 하도록 결합해서 성능을 극대화 시킨 것이 기존의 솔루션들과 차별점이 있는 것임.

2) 트리구조 기반의 분산처리

1

구글 클라우드는 수천대의 머신에 쿼리 연산을 분산시키기 위해서 트리 구조를 활용함. 위에 그림과 같이 루트 서버는 클라이언트의 SQL 쿼리문을 분석하여 분산 머신들에서 동작하는 수많은 작은 단위의 쿼리문들을 만들고 이를 intermediate 서버들에게 전달함. 이 서버들은 실제 연산을 수행하는 leaf 서버들에게 쿼리를 전달하고, 쿼리의 결과 값으로 반환하는 값들을 합쳐서 부모 노드에게 전달해주게 됨.

이렇게 컬럼 베이스 저장 구조와 트리 기반의 분산처리를 통해서 방대한 양의 데이터를 분석하는 SQL 쿼리문을 순식간에 처리하는 드래멜(=빅쿼리)을 만들어냄.

  • 빅쿼리의 또다른 장점 : 구글 클라우드의 다른 서비스와의 연계

구글 클라우드는 빅쿼리와 함께 빅쿼리에 데이터를 저장할 수 있도록 Cloud Dataflow, Cloud Dataproc, Cloud Storage 등 다양한 에코 시스템이 구축되어 있음. 이러한 서비스들을 잘 조합해서 데이터 파이프라인 구축도 가능함.

  • 빅쿼리의 약점

빅쿼리는 OLAP에 특화되어 있기 기 때문에 OLTP에는 상대적으로 부적합한 편임. 적은 양의 데이터를 계속해서 입력하고, 업데이트 해야하는 작업에서는 오히려 성능이 떨어지는 현상을 보임. 결론적으로 소규모 데이터를 빈번하게 연산을 처리해야 하는 경우에는 적합하지 않음.

  • 빅쿼리의 내부 스키마구조

빅쿼리는 RDBMS 처럼 일정한 스키마를 가진 테이블을 생성하고, 정형데이터를 저장함. 따라서 raw data를 곧바로 저장하는 데이터 레이크로는 사실 적합하지 않고, 데이터 전처리를 거쳐서 정제된 데이터를 적재하는 데이터 웨어하우스에 적합하다고 할 수 있음. 여기서 어떻게 빅쿼리에 데이터를 저장할까에 대한 고민 포인트가 생기는데 빅 쿼리에 데이터를 적재할 테이블의 스키마는 어떻게 짜는게 좋을까라는 것이다.

2

위에 그림에서 왼쪽 표 처럼 원본 데이터가 있는 상황이라고 치자. 이를 전통적인 RDBMS에서는 테이블에 중복된 정보가 저장되지 않도록 정규화를 적용하여 테이블을 쪼개어 데이터를 저장한다. 그리고 특정 거래의 상세 정보를 확인하고 싶을 때에는 조인 연산을 통해서 원하는 정보를 가져오도록 한다. 이 경우 데이터의 중복은 줄어들지만 조인 연산에 많은 비용이 발생하는 단점이 있다. 특히 빅 쿼리의 분석 대상인 페타 바이트 단위의 데이터를 테이블 간 조인하는 것은 비효율 적이다. 그렇기 때문에 빅쿼리에서는 빅쿼리에 적재하기 위해 쪼개어 놓은 테이블을 다시 합쳐주는 denormalization 과정을 하게된다.

3

denormalization을 거친 테이블에서는 다시 중복된 데이터가 저장된다는 문제점이 생긴다. 또한 중복되는 키 값에 대해서 group by 연산을 수행할 경우 성능이 저하되는 문제점이 있다. 이를 해결하기 위해서 빅쿼리는 Nested와 Repeated 라는 개념이 있다.

4

Nested와 Repeated Column은 중복되는 값을 한번만 저장해도 되며, 미리 group by 연산을 수행한 것과 같은 효과를 낸다. 그 결과 데이터 중복은 줄어들고 성능이 향상된다.

  • 드래멜 데이터 모델

그러면 빅쿼리는 어떠한 방식으로 이런식으로 충첩이 있고 반복이 발생하는 스키마의 데이터를 컬럼 기반으로 저장할 수 있는지 의문이 생긴다. 빅쿼리의 전신인 Dremel 논문에 Nested and Repeated 스키마를 Columnar Storage에 저장하기 위한 데이터 모델이 자세히 소개되어 있다.

5

위에 그림은 드래멜에서 데이터를 저장하는 스키마와 그 예시인데 required, optional, repeated 등의 생소한 단어들이 보인다. json 형태로 이를 설명해보자면 group이란 여러 필드들이 묶여있는 객체이고, repeated란 배열을 의미한다. optional group Links라는 것은 Document에서 있어도 되고 없어도 괜찮은 객체 멤버 변수 Links라는 의미임. 메타데이터 r1과 r2를 json으로 바꾸어 표현하면 아래와 같다.

6

위와 같이 RDBMS 테이블 형식으로 저장하기에는 다소 까다로운 형태로 mongodb와 같은 document oriented nosql 솔루션이 적합해 보인다. 하지만 드래멜은 이를 컬럼을 기준으로 저장하는 방식을 적용하였고, 이러한 복잡성을 핸들링하기 위해 repitition level과 definition level이라는 것을 사용한다.

1) repitition level

7

위에 그림의 도표는 컬럼 기반 저장 방식으로 위에서 언급한 r1과 r2를 저장한 것인데 위에서 r은 repetition level, d는 definition level을 의미한다. json 데이터를 컬럼별로 나누어 저장이 가능하다면, 이 컬럼별로 저장된 데이터만 보고 원래의 json 데이터를 복원도 가능하다는 것이다. 그리고 이를 위해서 추가적으로 필요한 정보가 바로 r과 d이다. r과 d가 없다고 가정하고 Name.Language.Code를 보자. 이를 앞으로 value의 path라고 하자. en-us, en, NULL, en-gb, NULL 데이터가 있지만 이들이 어느 위치에서 등장하는지 알 수 없다. 예를 들어서 아래 그림과 같이 분명히 en이라는 값이 저장되는 위치는 다르지만 컬럼별로 떼어내어 저장할 경우 결과가 동일해지게 된다.

8

이러한 상황을 방지하고자 도입한 개념이 repetition level 이다. 한마디로 표현하면 자신 이전에 등장한 같은 path의 value와 함께 묶여져 있는 level이다. 예를 다시 들어보면 문서 r1을 위에서 아래로 스캔하며 Name.Language.Code 변수들의 repetition level을 정해보자.

가장 먼저 첫 번째 Name 아래 en-us가 있을것이다. 이는 이전에 등장한 같은 path의 value가 없었기 때문에 r은 0이 된다.

그 다음으로 넘어가면 en이 보인다. 이는 같은 path를 지닌 en-us와 같은 Name.Language에 묶여있다. 따라서 r=2가 된다.

아래로 넘어가보면 Name의 두 번째 원소는 아예 Language를 포함하고 있지 않다. 따라서 Name.Language.Code는 Null 값이며, 이때 같은 path를 지닌 이전 값과는 같은 Name 아래에 묶여있다. 그러므로 r=1이 된다.

그 아래로 넘어가보면 마지막 Language가 보이고 en-gb라는 Code가 보인다. 이는 이전에 등장한 동일한 path의 값 en과는 같은 Name에 속하므로 r=1이 된다.

r2에도 동일한 방식을 적용하여 정리를 해보면 아래와 같은 결과를 얻을 수 있다.

9

2) definition level

repetition level을 통해서 변수들이 등장하는 위치에 대해서 알수 있지만 아직 컬럼기반 저장 데이터만 보고 json 형태의 데이터를 복구하기에는 부족하다. Null 값 문제 때문이다. 이번에는 Name.Language.Country의 예시를 통해서 알아보자. Name.Language.Country의 컬럼 기반 저장 값은 아래와 같다.

10

value가 있는 값들은 그냥 해당 변수의 path의 level이 d가 되며, Name.Language.Country의 경우에는 3이 된다. 문제는 NULL 값이다. r1을 살펴보면 Name 아래의 첫 번째 원소의 Language의 두 번째 원소의 경우 Country 값이 없다. 이 경우 Name.Language 까지는 정의되어 있으니, definition level은 2가 된다. 반면에 r1의 Name의 두 번째 원소의 경우 아예 Language가 없어서 자연스럽게 Country가 NULL 값을 가진다. 이 경우 definition level이 1이 된다.

11

definition level과 repetition level 값을 추가하는 것만으로 데이터 중첩과 반복이 등장하는 데이터를 컬럼 별로 쪼개어 효과적으로 저장할 수 있다.

3) Data Encoding

이제 각각의 컬럼은 디스크 공간에 블록 단위로 저장된다. 각각의 블록은 repetition level과 definition level을 포함하며, value들은 모두 압축되어 저장한다. NULL 값은 별도로 명시적으로 저장하지 않는다. definition level이 원래 path의 level과 같지 않으면 NULL 값인걸 알 수 있기 때문이다. 마찬가지로 NULL 값이 아닌 value들은 definition level이 필요하지 않으므로 이를 저장하지 않는다. 당연히 path의 level과 definition level은 같을 것이기 때문이다. 또한 repetition level과 definition level은 사용가능한 최소한의 비트만 사용하여 저장한다. 예를 들어서 repetition level의 최대 값이 3일 경우 2bit만 사용하여 인코딩한다.

  • 트리구조의 SQL 쿼리 처리방식 좀더 알아보기

1

드래멜은 입력 쿼리가 들어오면 execution tree를 생성한다. 트리는 root server, intermediate servers, leaf searvers로 구성된다. 루트 서버는 사용자로부터 SQL 쿼리를 입력받는다. 루트 서버는 이를 분산 노드들에서 실행할 수 있도록 작은 SQL문들로 쪼개어 새롭게 SQL문을 생성한 다음 intermediate 서버로 내려보낸다. intermediate 서버들도 전달받은 쿼리를 더 작게 쪼개어 리프 노드로 내려보낸다. 리프 노드는 빅쿼리의 경우 Colossus라는 저장소에 저장된 데이터를 읽어와서 쿼리 연산을 수행하고 결과를 부모 노드에게 전달한다. 이를 순차적으로 취합하여 최종 쿼리 수행 결과를 얻게 된다.

간단한 예시를 통해서 어떻게 SQL 문이 쪼개지고, 분산 노드들에게 전달되는 지를 알아보자

“SELECT A, COUNT(B) FROM T GROUP BY A”

루트 서버는 이 SQL을 입력받은 다음, 테이블 T의 메타 데이터를 확인한 뒤, 다음과 같이 쿼리를 재생성한다.

12

첫 번째 줄부터 보면 원본 테이블 T 대신에 R11부터 R1n을 union한 테이블로부터 sum을 계산한다. 이 때 R11부터 R1n은 루트 노드(레벨 1)가 1번부터 n번까지의 intermediate 노드들에게 분산 시킨 쿼리의 결과들을 의미한다.

노드들에게 쿼리는 어떻게 분산 시킬까요? 먼저 테이블 T는 horizontally partitioned 되어 있다. 테이블 하나에 저장되는 레코드의 양이 많아서 이를 횡으로 나누어 여러 물리 머신에 나눠 저장한 것이다. 분산된 테이블 T11부터 T1n에 대하여 각각 쿼리를 수행하도록 SQL을 쪼개어 준 뒤, 트리의 하위 노드들에 전달한다. Intermediate 노드들도 같은 작업을 반복하여 연산을 더 작게 쪼갠다. 이렇게 연산량이 작아진 SQL문에 리프 노드에 도달하고 여러 대의 머신에서 병렬적으로 연산을 진행하게 되는 것이다.

1) Query Dispatcher

드래멜은 여러 사용자가 동시에 쿼리를 요청할 수 있는 시스템이어야 한다. 이를 위해서 쿼리들의 우선 순위를 정하고 스케쥴링 및 로드 밸런싱을 해주는 Query Dispatcher라는게 있다. 드래멜에서 특정 쿼리에 대한 연산량을 slot이라는 단위로 계산한다. slot은 리프 서버에서 쿼리를 수행하는 단일 쓰레드를 의미한다. 만일 리프 서버가 3000대가 있고 각각이 8개의 쓰레드를 돌린다면 총 24000개의 슬럿이 있는 것이다. 만일 100,000개의 분산 테이블에 저장된 테이블에 대한 쿼리가 들어올 경우 각 슬럿에 5개의 테이블 조각을 할당하면 처리가 가능하다.

쿼리 디스패쳐는 이러한 스케쥴링 외에도 fault tolerence를 수행한다. 특정 머신에서 쿼리를 수행하는 속도가 지나치게 느리면 이를 다른 노드에게 일임한다. 또한 특정 데이터 복사본에 접근이 불가능할 경우, 다른 복사본에 접근하도록 한다는 등의 동작을 수행한다. 또한 쿼리 디스패쳐는 결과를 리턴하기 이전에 반드시 스캔해야하는 최소 테이블 조각 비율을 준수하는 역할도 수행한다. 예를 들어 98%라면 원본 테이블의 98%에 해당하는 분산 테이블을 스캔하기 이전에는 결과를 리턴하지 않도록 한다.

2) In Memory Query Execution

그러면 리프 노드에서 SQL을 실행하여 얻은 데이터들이 어떻게 상위 노드로 전달되는지, 노드들 간에 데이터 교환은 어떻게 이루어지는지 궁금할 수 있다. 빅쿼리에서는 이렇게 노드들 간에 데이터 교환 작업을 셔플링이라고 부른다. 맵리듀스 논문에서 나온 그 셔플링 개념과 유사하다. 빅쿼리에서는 셔플링을 row 단위로 노드들 간에 데이터를 주고 받는 것을 의미한다. 이를 위해서 세 가지 high level API이 구현되었고, 이를 기반으로 셔플링 구조를 설계하였다.

Producer (producer_id) {  
  void SendRow(row, consumer_id) : Called to send a row to a given consumer  
                                   on behalf of this producer.
}
Consumer (consumer_id) {  
  string ReceiveRow() : Called to receive one row for this consumer.
}
Controller {  
  StartShuffle() : Called before any producers or consumers start sending or 
                   receiving rows.   
  EndShuffle()   : Called after all producers and consumers have successfully                   
                   sent and received all rows. 
}

Producer는 데이터 처리 작업을 진행한 결과를 다음 노드에게 전달해주는 역할을 하며, Consumer는 이전 노드에서 전달한 데이터를 수신하는 역할을 한다. 하나의 워커 노드 안에는 이러한 Producer와 Consumer 모듈이 모두 들어있다. 그리고 Controller API를 통해서 데이터를 주고 받는 과정을 제어한다. 이런 과정을 통해서 데이터를 repartitioning하고, 동적 SQL 계획에 적합하게끔 셔플링을 진행한다고 논문에는 추상적으로 나와 있다.

13

이러한 빅 쿼리의 셔플 구조는 기존의 맵리듀스 구조와 크게 두 가지 측면에서 차이가 있다. 먼저 모든 연산과 데이터 통신을 디스크가 아닌 메모리 단에서 이루어지도록 하였다. Jupiter라는 고속 네트워크를 통해서 shared memeory abstraction를 기반으로 Producer는 자신이 처리한 데이터를 메모리에 직접 출력한다. 이는 local disk 공간에 파일을 출력한 뒤 마스터에게 알려주는 MapReduce와 큰 차이를 보인다. 그리고 빅쿼리에서는 셔플링 과정이 모두 끝나기 전에 repartition된 데이터는 곧바로 다음 데이터 처리 파이프라인을 수행할 수 있다. 이는 역시 모든 셔플링 과정이 끝난 이후에 리듀스 과정을 수행할 수 있었던 MapReduce와 큰 차이점을 보이며 성능 상의 향상을 가져온다고 한다.

3) 트리구조의 SQL 쿼리를 처리할때 논리적인 특징

특징 1. 테이블 형식의 데이터는 횡적으로 쪼개져서 분산 파일 시스템에 저장되어 있음

특징 2. 리프 노드들은 분산 파일 시스템과 통신을 통해 SQL 문을 실행하고, 그 결과를 부모 노드에 전달함

특징 3. 부모 노드들은 다시 이를 합쳐서 자신의 부모 노드에 전달하며, 루트 노드에 도달하면 최종 SQL 결과를 리턴함

특징 4. SQL을 실행하는 연산의 가장 작은 단위는 슬럿임. 특정 SQL에 필요한 슬럿 수는 빅 쿼리가 동적으로 계산함

특징 5. 빅쿼리는 슬럿을 할당하고 전체 쿼리 수행을 조율하며 falult tolerance를 보장하는 Query Dispatcher라는 모듈이 있음

특징 6. 노드들 간에 데이터를 주고 받는 셔플링은 모두 메모리 기반으로 실행되어 성능을 극대화 한다.

  • 빅쿼리 아키텍처

14

빅 쿼리는 크게 네 가지 구성 요소로 이루어져 있다.

구성 요소 1. Dremel(Compute): 방대한 분산 노드들에서 SQL 쿼리를 실행

구성 요소 2. Colossus(Storage): 데이터를 저장하고 실시간 처리를 할 수 있는 구글의 차세대 파일 시스템

구성 요소 3. Jupiter(Network): Compute와 Storage 사이의 통신 담당

구성 요소 4. Borg(Orchestration): 이 모든 분산 노드들을 조율 및 운영, 쿠버네티스의 전신

1) Dremel - Execution Engine

빅 쿼리에서 쿼리의 수행 연산은 Dremel을 통해서 이루어짐. Dremel은 SQL 쿼리가 들어오면 이를 execution tree로 만든다. 그리고 분산된 노드들에서 쿼리를 수행할 수 있게끔 원본 쿼리를 쪼갠 다음, 앞서 만든 트리를 통해 리프 노드로 전달한다. 리프 노드는 Colossus라는 빅쿼리의 파일시스템에서 데이터를 읽어와서 쿼리 연산을 수행한 뒤, 결과를 부모 노드에게 전달하고 이를 모두 취합하여 쿼리 결과를 가져오게 됨.

2) Colossus - Distributed Storage

Colossus는 GFS(구글 파일 시스템, HDFS의 전신) 이후 등장한 구글의 가장 최신 버전의 분산 파일 시스템임. 구글의 각 데이터 센터 별로 콜로수스 클러스터가 구성되어 있으며, 이는 빅 쿼리에 넉넉한 저장 용량을 제공함. BigQuery는 Columnar Storage 방식으로 Colossus에 데이터를 저장하게 됨.

3) Borg - Compute

방대한 규모의 분산 서버 클러스터를 운영하다보면 일부 머신이 고장난다던가, 파워가 끊긴다던가, 네트워크 장애가 발생한다던가 하는 문제가 발생할 수 있다. 또한 CPU 코어와 같은 물리적인 리소스를 적절히 할당해주는 것도 큰 과제다. 빅 쿼리는 이러한 운영 상의 문제를 해결하기 위해서 Borg를 사용하며, 이는 쿠버네티스의 전신 격으로 생각하면 된다.

4) Jupiter - The Network

빅 데이터 분산 처리의 가장 큰 병목 구간은 연산도 저장도 아닌 네트워크 병목인 경우가 많았다. Jupiter 네트워크는 초당 1 페타바이트 규모의 데이터를 전송가능하여 거대한 작업량도 순식간에 분산시켜준다. 또한 이는 10만대 가량의 머신들 간에 초당 10Gbs로 통신을 할 수 있도록 지원한다.

  • 빅쿼리가 데이터를 저장하는 개념적인 구조 : BigQuery Storage Hierarchy

15

BigQuery Storage Hierarchy은 크게 구글에 의해서 운영되어지는 Managed by Google 레이어 위에 사용자가 직접 정의하고 데이터를 저장하거나 조작할 수 있는 User-level Features가 정의되어 있다. 가장 아랫단에는 구글의 인프라가 있으며 그 위에 구글의 분산 파일 시스템인 Colossus가 얹어져서 동작한다.

Colossus 레이어 위에 Capacitor란 Columnar Storage 방식으로 데이터를 압축하여 저장하는 일종의 포맷이다. Automated DBA Tasks란 데이터 베이스가 더 빨리 동작하도록 주기적으로 물리 데이터 영역을 최적화 시키는 동작을 의미한다. 사용자들이 쓰는 로지컬 데이터 영역에 영향을 주지 않기 때문에 사용자는 인식하지 못한다. Dynamic Execution은 SQL 쿼리를 동적으로 계획하여 분산 노드에서 실행시켜주는 모듈을 말한다.

Managed by Google 레이어 위에 이제 사용자가 직접 정의하고 사용할 수 있는 레이어가 얹어지게 된다. Logical Storage Layer는 사용자가 직접 테이블을 정의하고 데이터를 적재하는 레이어이다. 그 위로 IAM을 직접 정의하여 데이터 접근 권한을 제어한다. 그리고 그 위로 public dataset을 사용한다던가 다른 데이터 셋들을 구매해서 사용한다는 등의 모듈들이 추가되는 방식이다.