Build a Serverless Real-Time Data Processing App By AWS services

2019-10-24

.

Data_Engineering_TIL_(20190909)

[학습한 Contents]

  • AWS 튜토리얼

1) 주제 : Build a Serverless Real-Time Data Processing App

2) URL : https://aws.amazon.com/ko/getting-started/projects/build-serverless-real-time-data-processing-app-lambda-kinesis-s3-dynamodb-cognito-athena/

[학습목표]

  • AWS 서비스를 이용한 실시간 데이터처리

[구현목표 아키텍처]

0

[실습내용 요약]

step 1) Intro

  • Set up your AWS Cloud9 IDE

  • Set up the Command Line Clients

step 2) Build a data stream

  • Create an Amazon Kinesis stream

  • Produce messages into the stream

  • Read messages from the stream

  • Create an identity pool for the unicorn dashboard

  • Grant the unauthenticated role access to the stream

  • View unicorn status on the dashboard

  • Experiment with the producer

step 3) Aggregate data

  • Create an Amazon Kinesis stream

  • Create an Amazon Kinesis Data Analytics application

  • Read messages from the stream

  • Experiment with the producer

step 4) Process streaming data

  • Create an Amazon DynamoDB tables

  • Create an IAM role for your Lambda function

  • Create a Lambda function to process the stream

  • Monitor the Lambda function

  • Query the DynamoDB table

step 5) Store & query Data

  • Create an Amazon S3 bucket

  • Create an Amazon Kinesis Data Firehose delivery stream

  • Create an Amazon Athena table

  • Explore the batched data files

  • Query the data files

[실습 상세내용]

step 1) Intro

step 1-1) Set up your AWS Cloud9 IDE

클라우드9 콘솔 접속 및 create environment 클릭

아래 그림과 같이 설정 후 생성

1

2

3

다음을 누르면 최종검토 화면 전시, 생성한 내용이 맞는지 확인 후 최종생성

4

생성 클릭하면 아래와 같이 로딩되면서 클라우드9이 생성됨

5

생성이 완료되면 하단의 CLI 창에 aws sts get-caller-identity 명령어를 입력하여 계정관련 정보를 확인

6

step 1-2) Set up the Command Line Clients

https://aws.amazon.com/ko/getting-started/projects/build-serverless-real-time-data-processing-app-lambda-kinesis-s3-dynamodb-cognito-athena/ 의 아래와 같은 화면에서 produce.go와 consumer.go를 다운로드

7

cloud9 cli 창으로 돌아와서 curl -s https://dataprocessing.wildrydes.com/client/client.tar | tar -xv 명령어를 실행하면 아래 그림과 같이 컨슈머와 프로듀서 파일이 좌측 상단화면에 생성되는 것을 확인할 수 있다.

8

주의사항 : cloud9 화면은 실습 간 계속 사용할 예정으로 항상 띄워놓고 있어야 한다.

step 2) Build a data stream

step 2) 에서 구현하고자 하는 아키텍처 부분

9

step 2-1) Create an Amazon Kinesis stream

키네시스 콘솔 접속 -> create data streams 클릭

아래 화면과 같은 설정으로 생성해준다.

10

생성된 결과는 아래 그림과 같다.

11

step 2-2) Produce messages into the stream

다시 cloud9 콘솔로 넘어가서 콘솔에서 ./producer 명령어를 실행한다.

그러면 아래 그림과 같이 producer가 작동하는 것을 확인할 수 있다.

12

step 2-3) Read messages from the stream

그리고 콘솔창을 하나 더 열어서 ./consumer 명령어를 실행하면 아래 그림과 같이 제이슨 형식의 데이터를 초단위로 수집하게 된다.

13

step 2-4) Create an identity pool for the unicorn dashboard

AWS 콘솔에서 Cognito 로 접속해서 ‘자격증명 폴 관리’를 클릭한다.

아래 그림과 같이 pool name을 부여하고 create pool을 클릭한다.

14

다음과 같은 화면이 전시될것인데 그러면 allow를 클릭한다.

15

그러면 pool이 생성되는데 우측상단에 edit identity pool을 클릭한다.

16

아래 그림과 같이 pool name을 부여하고 저장한다.

identity pool id는 메모장에 잘 적어둔다.

17

step 2-5) Grant the unauthenticated role access to the stream

IAM 서비스 콘솔로 이동해서

좌측 메뉴에서 roles클릭 -> 아래와 같이 검색하여 해당 롤을 클릭

18

클릭해서 아래 그림과 같은 화면이 전시되면 add inline policy를 클릭한다.

19

아래 그림과 같은 화면이 전시되면 service는 kinesis로 선택, actions에서는 list와 read 선택한다.

20

그리고 resources로 넘어가서 stream 옆에 add arn 클릭,

지역, 어카운트넘버, stream name을 아래 그림과 같이 입력해주고 add 클릭

21

위의 내용들을 설정하면 아래 그림과 같이 setting이 된것을 확인할 수 있다.

22

그리고 좌측하단에 review policy를 클릭하면 아래 그림과 같은 화면이 전시되는데 Name을 아래 그림과 같이 부여해주고 create policy를 클릭한다.

23

생성하면 아래 그림처럼 role이 셋팅된 것을 확인할 수 있다.

24

step 2-6) View unicorn status on the dashboard

브라우저에서 https://dataprocessing.wildrydes.com/dashboard.html 로 접속하면 아래 그림과 같은 하면이 전시가 되면서 identity pool ID를 입력하라고 나오는데 아까 메모해 두었던 identity pool ID를 넣어주고 start를 클릭한다.

25

그러면 아래 그림과 같이 유니콘이 실시간으로 이동하는 화면이 전시가 될 것이다.

26

step 2-7) Experiment with the producer

cloud9 콘솔로 돌아와서 컨트롤 + c 명령어로 producer를 중단했다가 다시 실행했을때 정상적으로 유니콘이 사라졌다 다시 움직이는지 확인한다.

27

그리고 아래 그림과 같이 터미널을 하나 더 띄워서 ./producer -name Bucephalus 명령어를 실행해서 시각화 대시보드에 다른 유니콘 하나를 더 띄워본다.

28

29

step 3) Aggregate data

step 3) 에서 구현하고자 하는 아키텍처 부분

30

step 3-1) Create an Amazon Kinesis stream

키네시스 콘솔로 이동해서 create data streams 클릭, 아래 그림과 같이 세팅 후 생성

31

아래 그림과 같이 생성결과를 확인할 수 있다.

32

step 3-2) Create an Amazon Kinesis Data Analytics application

대시보드로 이동해서 create analytics를 클릭한다.

그리고 아래 그림과 같이 세팅 후 create application을 클릭한다.

33

그리고 아래 그림과 같은 화면이 나오면 connect streaming data를 클릭한다.

34

그리고 아래 그림과 같이 셋팅해준다.

35

이 셋팅화면 하단에 discover schema를 클릭해서 아래 그림과 같이 전시가 되는지 확인한다.

36

그리고 하단에 save and continue를 클릭한다.

37

세이브 엔 컨티뉴를 누르면 아래 그림과 같은 화면이 전시되는데 여기서 go to SQL editor를 클릭한다.

38

아래 그림과 같은 화면이 전시되면 yes, start application 클릭

39

그리고 아래 그림과 같이 쿼리 화면에 다음과 같은 내용을 복붙한다.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  "Name"                VARCHAR(16),
  "StatusTime"          TIMESTAMP,
  "Distance"            SMALLINT,
  "MinMagicPoints"      SMALLINT,
  "MaxMagicPoints"      SMALLINT,
  "MinHealthPoints"     SMALLINT,
  "MaxHealthPoints"     SMALLINT
);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "Name", "ROWTIME", SUM("Distance"), MIN("MagicPoints"),
                  MAX("MagicPoints"), MIN("HealthPoints"), MAX("HealthPoints")
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY FLOOR("SOURCE_SQL_STREAM_001"."ROWTIME" TO MINUTE), "Name";

그리고 save and run SQL을 클릭한다.

40

그러면 아래 그림과 같이 집계된 결과가 전시된다.

41

화면 상단에 wildrydes를 클릭해서 아래 그림과 같은 콘솔화면으로 빠져나온다.

그런 다음에 connect to a destination을 클릭한다.

43

아래 그림과 같이 설정 셋팅을 해준다.

42

step 3-3) Read messages from the stream

cloud9 콘솔로 돌아가서 아래 그림과 같이 ./consumer -stream wildrydes-summary 명령어를 실행해서 데이터가 잘 전시되는지 확인한다.

44

step 3-4) Experiment with the producer

새로운 콘솔 터미널을 열어서 아래 그림과 같이 ./producer -name Bucephalus 명령어를 실행한다.

45

step 4) Process streaming data

step 4) 에서 구현하고자 하는 아키텍처

46

step 4-1) Create an Amazon DynamoDB tables

DynamoDB 서비스 콘솔로 접속 -> create table 클릭 -> 아래 그림과 같이 셋팅 및 생성

47

생성후 overview 메뉴에서 하단에 ARN 주소 확인, 메모장에 임시기록

48

step 4-2) Create an IAM role for your Lambda function

IAM 서비스 콘솔에 접속 -> 좌측 메뉴에서 policies 클릭 -> create policy 클릭

49

아래 그림과 같이 서비스는 다이나모 디비로 설정, actions는 BatchWriteItem를 검색하여 설정, 그리고 리소스에서 table에 add arn 클릭

50

아래 그림과 같은 화면에서 지역, 아이디고유번호,테이블이름 기입(메모장에 기록해둔 다이나모디비 arn 주소 참고할 것) 그리고 add 클릭

51

add를 클릭하면 아래 그림과 같은 화면이 전시되는데 우측하단에 review policy 클릭

52

그러면 아래 그림과 같은 화면이 전시되는데 Name에 WildRydesStreamProcessorRole 기입 후 create policy 클릭

53

IAM 콘솔의 좌측메뉴에서 roles 클릭 -> create role 클릭 -> 아래 그림과 같이 설정 -> 우측하단에 next : permission 클릭

54

그러면 아래 그림과 같은 화면이 전시되는데 filter policies창에 AWSLambdaKinesisExecutionRole 을 검색하여 나온 결과의 체크박스 체크

55

filter policies창에 AWSLambdaKinesisExecutionRole를 지우고 다시 WildRydesDynamoDBWritePolicy 를 입력하여 마찬가지로 검색결과로 나온 것의 체크박스 체크 그리고 next : tags 클릭

56

그러면 아래 그림과 같은 화면이 전시되는데 우측하단에 create role 클릭

57

step 4-3) Create a Lambda function to process the stream

AWS 람다 콘솔로 접속 -> create function 클릭 -> 아래 그림과 같이 설정

58

아래 그림과 같이 하단 코드에디터에 다음 코드들을 입력

59

'use strict';

const AWS = require('aws-sdk');
const dynamoDB = new AWS.DynamoDB.DocumentClient();
const tableName = process.env.TABLE_NAME;

exports.handler = function(event, context, callback) {
  const requestItems = buildRequestItems(event.Records);
  const requests = buildRequests(requestItems);

  Promise.all(requests)
    .then(() => callback(null, `Delivered ${event.Records.length} records`))
    .catch(callback);
};

function buildRequestItems(records) {
  return records.map((record) => {
    const json = Buffer.from(record.kinesis.data, 'base64').toString('ascii');
    const item = JSON.parse(json);

    return {
      PutRequest: {
        Item: item,
      },
    };
  });
}

function buildRequests(requestItems) {
  const requests = [];

  while (requestItems.length > 0) {
    const request = batchWrite(requestItems.splice(0, 25));

    requests.push(request);
  }

  return requests;
}

function batchWrite(requestItems, attempt = 0) {
  const params = {
    RequestItems: {
      [tableName]: requestItems,
    },
  };

  let delay = 0;

  if (attempt > 0) {
    delay = 50 * Math.pow(2, attempt);
  }

  return new Promise(function(resolve, reject) {
    setTimeout(function() {
      dynamoDB.batchWrite(params).promise()
        .then(function(data) {
          if (data.UnprocessedItems.hasOwnProperty(tableName)) {
            return batchWrite(data.UnprocessedItems[tableName], attempt + 1);
          }
        })
        .then(resolve)
        .catch(reject);
    }, delay);
  });
}

그리고 더 하단으로 스크롤을 내려서 아래 그림과 같이 설정

60

다시 가장 상단으로 스크롤을 올리면 아래 그림과 같은 화면에서 add trigger 클릭

61

아래 그림과 같이 설정 후 add 클릭 후 빠져나온 화면에서 우측 상단에 save 클릭

62

step 4-4) Monitor the Lambda function

다시 클라우드9 콘솔로 돌아와서 터미널을 하나 새로 열고 ./producer -name Rocinante 명령어를 실행해서 잘 작동하는지 확인한다.

63

step 4-5) Query the DynamoDB table

다이나모디비 서비스 콘솔로 접속 -> 왼쪽에서 Tables 클릭 -> UnicornSensorData 선택 -> items 클릭하면 아래 그림과 같이 결과가 나오는지 확인한다.

64

step 5) Store & query Data

step 5) 에서 구현하고자 하는 아키텍처 부분

65

step 5-1) Create an Amazon S3 bucket

s3 콘솔로 접속해서 아래 그림과 같이 s3 버킷 하나를 생성해준다.

66

step 5-2) Create an Amazon Kinesis Data Firehose delivery stream

키네시스 콘솔로 접속 후 create delivery stream 클릭, 아래 그림과 같이 설정 후 next 클릭

67

아래 그림과 같이 설정 후 next 클릭

68

아래 그림과 같이 설정 후 next

69

아래 그림과 같이 설정 후 create new or choose 클릭

70

71

아래 그림과 같은 화면이 나오면 allow 클릭

72

그러면 아래 그림과 같은 화면이 전시가 되는것을 확인하고 next 클릭

73

아래 그림과 같이 delivery streams가 생성된 것을 확인

74

step 5-3) Create an Amazon Athena table

아테나 콘솔로 접속후 아래 그림과 같이 다음 쿼리를 돌려본다.

75

CREATE EXTERNAL TABLE IF NOT EXISTS wildrydes (
       Name string,
       StatusTime timestamp,
       Latitude float,
       Longitude float,
       Distance float,
       HealthPoints int,
       MagicPoints int
     )
     ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
     LOCATION 's3://YOUR_BUCKET_NAME_HERE/';

step 5-4) Explore the batched data files

아래 그림과 같이 아까 생성한 s3 버킷으로 이동해서 배치 데이터별로 잘 데이터가 생성되었는지 확인한다.

76

step 5-5) Query the data files

아래 그림과 같이 다시 아테나 콘솔로 돌아와서 다음과 같은 쿼리를 날려서 결과를 확인해본다.

SELECT * FROM wildrydes

77

step 5-6) 실습이 종료되면 사용했던 자원들에 대해 모두 terminate한다.