Downloading data with the Kaggle API and loading it into ES

2021-10-02


Data_Engineering_TIL(20211002)

[References]

“Loading and Checking Data in Elasticsearch”, GitHub material by 최정민

URL : https://github.com/cjungm/with-aws/tree/main/ElasticSearch/data_input_ES

[Notes]

This lab continues from “Building an ElasticSearch Cluster on EC2 - Installing Logstash and Configuring the Default ES Data Template”.

URL : https://minman2115.github.io/DE_TIL287

[Lab goal]

Load data into an ES cluster installed on EC2 instances.

The scenario: request the data through the Kaggle API, download it to the coordinating node (or master node), convert it to UTF-8, and then load it into ES with a python script.

[Lab data]

The lab uses the kaggle topic ‘Nearby Social Network - All Posts’.

URL : https://www.kaggle.com/brianhamachek/nearby-social-network-all-posts

  • The data files in this topic are as follows

allposts.csv: 43.09 GB

deletedposts.csv: 5 GB

chats_2021-04.csv: 11.17 GB

[Lab summary]

STEP 1) Create a kaggle api key and set up the connection from the ES coordinating node (or master node)

STEP 2) Download the required data through the kaggle api

STEP 3) Load the downloaded data into ES

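[Lab details]

STEP 1) Create a kaggle api key and set up the connection from the ES coordinating node (or master node)

Log in to kaggle and issue the token used to authenticate API calls, as shown in the figure below. Then upload the token file (kaggle.json) to the ES coordinating node (or master node) with whatever tool you prefer for connecting to that node.

[Figure 1: issuing an API token on kaggle]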

Then run the commands below on the coordinating node (or master node) to install the kaggle CLI and register the token so that the data can be downloaded.

[ec2-user@es-coordinater ~]$ pip3 install --user kaggle
Collecting kaggle
  Downloading kaggle-1.5.12.tar.gz (58 kB)
     |████████████████████████████████| 58 kB 1.5 MB/s
Collecting six>=1.10
  Downloading six-1.16.0-py2.py3-none-any.whl (11 kB)
Collecting certifi
  Downloading certifi-2021.5.30-py2.py3-none-any.whl (145 kB)
     |████████████████████████████████| 145 kB 3.0 MB/s
Collecting python-dateutil
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
     |████████████████████████████████| 247 kB 18.4 MB/s
Collecting requests
  Downloading requests-2.26.0-py2.py3-none-any.whl (62 kB)
     |████████████████████████████████| 62 kB 1.9 MB/s
Collecting tqdm
  Downloading tqdm-4.62.3-py2.py3-none-any.whl (76 kB)
     |████████████████████████████████| 76 kB 6.7 MB/s
Collecting python-slugify
  Downloading python_slugify-5.0.2-py2.py3-none-any.whl (6.7 kB)
Collecting urllib3
  Downloading urllib3-1.26.7-py2.py3-none-any.whl (138 kB)
     |████████████████████████████████| 138 kB 47.0 MB/s
Collecting idna<4,>=2.5; python_version >= "3"
  Downloading idna-3.2-py3-none-any.whl (59 kB)
     |████████████████████████████████| 59 kB 11.5 MB/s
Collecting charset-normalizer~=2.0.0; python_version >= "3"
  Downloading charset_normalizer-2.0.6-py3-none-any.whl (37 kB)
Collecting text-unidecode>=1.3
  Downloading text_unidecode-1.3-py2.py3-none-any.whl (78 kB)
     |████████████████████████████████| 78 kB 11.8 MB/s
Using legacy 'setup.py install' for kaggle, since package 'wheel' is not installed.
Installing collected packages: six, certifi, python-dateutil, urllib3, idna, charset-normalizer, requests, tqdm, text-unidecode, python-slugify, kaggle
    Running setup.py install for kaggle ... done
Successfully installed certifi-2021.5.30 charset-normalizer-2.0.6 idna-3.2 kaggle-1.5.12 python-dateutil-2.8.2 python-slugify-5.0.2 requests-2.26.0 six-1.16.0 text-unidecode-1.3 tqdm-4.62.3 urllib3-1.26.7

# Create the .kaggle folder in the user's home directory
[ec2-user@es-coordinater ~]$ mkdir -p ~/.kaggle

# Copy kaggle.json from the current folder
[ec2-user@es-coordinater ~]$ cp kaggle.json ~/.kaggle/kaggle.json

# Give only the owner read and write permission on kaggle.json
[ec2-user@es-coordinater ~]$ chmod 600 ~/.kaggle/kaggle.json

# Add the install location to PATH so the kaggle command can be run from anywhere
[ec2-user@es-coordinater ~]$ export PATH=$PATH:/home/ec2-user/.local/bin
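The token setup above (mkdir, cp, chmod 600) can also be scripted. The following is only a minimal sketch of the same three steps in Python; the function name `install_kaggle_token` and its `home` parameter are hypothetical names introduced here for illustration and are not part of the kaggle package.

```python
import shutil
import stat
from pathlib import Path


def install_kaggle_token(token_path, home=None):
    """Copy kaggle.json into <home>/.kaggle and restrict it to the owner.

    `home` is a hypothetical override for illustration; by default the
    current user's home directory is used, as in the shell commands.
    """
    home_dir = Path(home) if home is not None else Path.home()
    config_dir = home_dir / ".kaggle"
    config_dir.mkdir(parents=True, exist_ok=True)   # mkdir -p ~/.kaggle
    target = config_dir / "kaggle.json"
    shutil.copyfile(token_path, target)             # cp kaggle.json ~/.kaggle/kaggle.json
    target.chmod(stat.S_IRUSR | stat.S_IWUSR)       # chmod 600 ~/.kaggle/kaggle.json
    return target
```

Calling `install_kaggle_token("kaggle.json")` from the directory that holds the uploaded token would mirror the commands above. The owner-only permission matters because the file contains the API key in plain text.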

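Open the kaggle topic below in a web browser and copy the API command, as shown in the figure below.

The kaggle topic ‘Nearby Social Network - All Posts’

URL : https://www.kaggle.com/brianhamachek/nearby-social-network-all-posts

[Figure 2: copying the API command from the kaggle topic page]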

STEP 2) Download the required data through the kaggle api

# {KAGGLE API COMMAND copied above} -p download_data
[ec2-user@es-coordinater ~]$ kaggle datasets download -d brianhamachek/nearby-social-network-all-posts -p download_data
Downloading nearby-social-network-all-posts.zip to download_data
100%|████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████▉| 12.7G/12.7G [03:30<00:00, 87.3MB/s]
100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 12.7G/12.7G [03:30<00:00, 64.6MB/s]

[ec2-user@es-coordinater ~]$ ll
total 8
drwxrwxr-x 2 ec2-user ec2-user   49 Oct  2 16:14 download_data
-rw-rw-r-- 1 ec2-user ec2-user   66 Oct  2 16:02 kaggle.json
    
[ec2-user@es-coordinater ~]$ cd download_data/

[ec2-user@es-coordinater download_data]$ ll
total 13295620
-rw-rw-r-- 1 ec2-user ec2-user 13614711527 Oct  2 16:17 nearby-social-network-all-posts.zip
    
# The full dataset is 52 GB, so given the EC2 disk limit, load only deletedposts.csv into ES
[ec2-user@es-coordinater download_data]$ unzip nearby-social-network-all-posts.zip
Archive:  nearby-social-network-all-posts.zip
  inflating: deleted-posts/deletedposts.csv

...
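Because disk space on the node is limited, it may help to extract only the one file that will be loaded instead of the whole archive. The following is a minimal sketch using Python's standard `zipfile` module; the helper name `extract_members` is hypothetical, and the member name is taken from the unzip output above.

```python
import zipfile
from pathlib import Path


def extract_members(zip_path, wanted_suffix, dest_dir):
    """Extract only the archive members whose name ends with wanted_suffix."""
    dest = Path(dest_dir)
    extracted = []
    with zipfile.ZipFile(zip_path) as archive:
        for info in archive.infolist():
            if info.filename.endswith(wanted_suffix):
                # Writes dest/<member path>, creating parent folders as needed
                archive.extract(info, path=dest)
                extracted.append(dest / info.filename)
    return extracted


# Example call (paths follow the transcript above):
# extract_members("download_data/nearby-social-network-all-posts.zip",
#                 "deletedposts.csv", "download_data")
```

With this approach the other members of the archive are never written to disk, so only the zip file and the selected CSV occupy space.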

STEP 3) Load the downloaded data into ES

[ec2-user@es-coordinater download_data]$ cd ~

[ec2-user@es-coordinater ~]$ sudo pip3 install elasticsearch pandas
WARNING: Running pip install with root privileges is generally not a good idea. Try `pip3 install --user` instead.
Collecting elasticsearch
  Downloading elasticsearch-7.15.0-py2.py3-none-any.whl (378 kB)
     |████████████████████████████████| 378 kB 2.2 MB/s
Collecting pandas
  Downloading pandas-1.3.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (11.3 MB)
     |████████████████████████████████| 11.3 MB 43.3 MB/s
Collecting certifi
  Downloading certifi-2021.5.30-py2.py3-none-any.whl (145 kB)
     |████████████████████████████████| 145 kB 61.6 MB/s
Collecting urllib3<2,>=1.21.1
  Downloading urllib3-1.26.7-py2.py3-none-any.whl (138 kB)
     |████████████████████████████████| 138 kB 62.6 MB/s
Collecting numpy>=1.17.3
  Downloading numpy-1.21.2-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (15.7 MB)
     |████████████████████████████████| 15.7 MB 43.6 MB/s
Collecting pytz>=2017.3
  Downloading pytz-2021.3-py2.py3-none-any.whl (503 kB)
     |████████████████████████████████| 503 kB 57.9 MB/s
Collecting python-dateutil>=2.7.3
  Downloading python_dateutil-2.8.2-py2.py3-none-any.whl (247 kB)
     |████████████████████████████████| 247 kB 63.2 MB/s
Collecting six>=1.5
  Downloading six-1.16.0-py2.py3-none-any.whl (11 kB)
Installing collected packages: certifi, urllib3, elasticsearch, numpy, pytz, six, python-dateutil, pandas
  WARNING: The scripts f2py, f2py3 and f2py3.7 are installed in '/usr/local/bin' which is not on PATH.
  Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
Successfully installed certifi-2021.5.30 elasticsearch-7.15.0 numpy-1.21.2 pandas-1.3.3 python-dateutil-2.8.2 pytz-2021.3 six-1.16.0 urllib3-1.26.7

# Check the encoding of the downloaded data
# file -bi {data file}
[ec2-user@es-coordinater ~]$ file -bi /home/ec2-user/download_data/deleted-posts/deletedposts.csv
text/plain; charset=utf-16le

# Convert the data to UTF-8
# iconv -f {data file encoding} -t UTF-8 {data file} > utf8_{data file}
[ec2-user@es-coordinater ~]$ iconv -f utf-16le -t UTF-8 /home/ec2-user/download_data/deleted-posts/deletedposts.csv > /home/ec2-user/download_data/deleted-posts/utf8_deletedposts.csv

[ec2-user@es-coordinater deleted-posts]$ ll
total 8412600
-rw-rw-r-- 1 ec2-user ec2-user 5742999086 Mar 24  2021 deletedposts.csv
-rw-rw-r-- 1 ec2-user ec2-user 2871499545 Oct  2 17:20 utf8_deletedposts.csv
    
[ec2-user@es-coordinater deleted-posts]$ file -bi /home/ec2-user/download_data/deleted-posts/utf8_deletedposts.csv
text/plain; charset=utf-8
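The same conversion can be done in Python without reading the whole 5 GB file into memory. The sketch below is an assumption-laden alternative to the iconv command, not what was run in this lab; the function name `transcode_utf16le_to_utf8` and the `chunk_chars` parameter are hypothetical. It also drops a leading byte order mark (BOM) if one is present. The byte counts in the listing above (5742999086 bytes before, 2871499545 after) are consistent with an almost entirely ASCII file whose BOM was carried over by iconv, although that is an inference from the sizes, not something the output confirms.

```python
def transcode_utf16le_to_utf8(src_path, dst_path, chunk_chars=1 << 20):
    """Stream a UTF-16LE text file into a UTF-8 file, chunk by chunk."""
    # newline="" keeps line endings exactly as they are in the source file
    with open(src_path, "r", encoding="utf-16-le", newline="") as src, \
         open(dst_path, "w", encoding="utf-8", newline="") as dst:
        first_chunk = True
        while True:
            chunk = src.read(chunk_chars)
            if not chunk:
                break
            if first_chunk:
                # Drop a leading BOM so the first CSV header stays clean
                if chunk.startswith("\ufeff"):
                    chunk = chunk[1:]
                first_chunk = False
            dst.write(chunk)
```

Reading in text mode lets Python's incremental decoder handle characters that straddle a chunk boundary, which a naive byte-level split would not.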

Once that is done, write a python script that loads the downloaded file into ES, as shown below.

  • /home/ec2-user/json_data_to_ES.py (when the downloaded file is in json format)
from elasticsearch import Elasticsearch
import json

json_file = "{file_name}"
index_name = "{index_name}"
doc_name = "{doc_name}"

es = Elasticsearch(
    # Talk to ES over http using the master account credentials
    hosts = [{'host': '{host}', 'port': '{port}'}],
    http_auth = ('{username}', '{password}'),
    scheme="http"
)

# The file is expected to contain a JSON array of documents
with open(json_file, "r", encoding="utf-8") as jsonfile:
    data = json.load(jsonfile)
    for doc in data:
        # Index one document per request
        es.index(index=index_name, doc_type=doc_name, body=doc)

# The with block closes the file, so no explicit close() is needed
print(f"=========================== {json_file} ===========================")
print("ES index count : ", es.count(index=index_name,doc_type=doc_name)['count'])
  • /home/ec2-user/csv_data_to_ES.py (when the downloaded file is in csv format)
from elasticsearch import helpers, Elasticsearch
import csv
import sys
import warnings

# Raise the csv field size limit to the maximum
csv.field_size_limit(sys.maxsize)
# Silence warnings raised while the script runs
warnings.filterwarnings(action='ignore')

csv_file = "{file_name}"
index_name = "{index_name}"
doc_name = "{doc_name}"

es = Elasticsearch(
    # Talk to ES over http using the master account credentials
    hosts = [{'host': '{host}', 'port': '{port}'}],
    http_auth = ('{username}', '{password}'),
    scheme="http"
)

# utf-8-sig reads UTF-8 and also drops a leading BOM if one is present
with open(csv_file, encoding="utf-8-sig", newline="") as csvfile:
    reader = csv.DictReader(csvfile)
    # Each csv row is sent to ES as one document through the bulk helper
    helpers.bulk(es, reader, index=index_name, doc_type=doc_name)

# The with block closes the file, so no explicit close() is needed
print("=========================== {csv_file} ===========================")
print("ES index count : ", es.count(index=index_name,doc_type=doc_name)['count'])
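The search results further down show ID fields padded with trailing spaces, because the csv script passes each row to ES unchanged. If cleaner values are wanted, the rows can be wrapped in a generator that trims them before they reach the bulk helper. The following is only a sketch under that assumption; `csv_to_actions` is a hypothetical helper name, and the action layout (`_index`, `_source`) is the dictionary form that `helpers.bulk` accepts. Note that this variant does not set a document type, so it would not reproduce the `minman_doc` type used in this lab.

```python
import csv


def csv_to_actions(csv_path, index_name):
    """Yield one bulk action per csv row, trimming padding from keys and values."""
    # utf-8-sig also drops a leading BOM, which would otherwise end up
    # inside the first column name
    with open(csv_path, newline="", encoding="utf-8-sig") as handle:
        for row in csv.DictReader(handle):
            source = {
                key.strip(): value.strip() if isinstance(value, str) else value
                for key, value in row.items()
                if key is not None  # skip overflow columns that have no header
            }
            yield {"_index": index_name, "_source": source}


# Possible usage with the client from the script above (not run here):
# helpers.bulk(es, csv_to_actions(csv_file, index_name), chunk_size=1000)
```

Because the function is a generator, rows are still streamed one at a time, so memory use stays flat even for a multi-gigabyte file.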

Run the commands below to load the data.

[ec2-user@es-coordinater deleted-posts]$ cd ~

[ec2-user@es-coordinater ~]$ vim csv_data_to_ES.py
from elasticsearch import helpers, Elasticsearch
import csv
from pprint import pprint
import sys
maxInt = sys.maxsize
# Raise the csv field size limit to the maximum
csv.field_size_limit(maxInt)
import warnings
warnings.filterwarnings(action='ignore')

csv_file = "/home/ec2-user/download_data/deleted-posts/utf8_deletedposts.csv"
index_name = "minman_test_index"
doc_name = "minman_doc"

es = Elasticsearch(
    # Talk to ES over http using the master account credentials
    hosts = [{'host': '10.10.1.226', 'port': '9200'}],
    http_auth = ('elastic', 'xxxxxxxxxx'),
    scheme="http"
)

# utf-8-sig reads UTF-8 and also drops a leading BOM if one is present
with open(csv_file, encoding="utf-8-sig", newline="") as csvfile:
    reader = csv.DictReader(csvfile)
    helpers.bulk(es, reader, index=index_name, doc_type=doc_name)

print("=========================== {csv_file} ===========================")
print("ES index count : ", es.count(index=index_name,doc_type=doc_name)['count'])

# nohup python3 -u {data loading script} > input.log &
[ec2-user@es-coordinater ~]$ nohup python3 -u /home/ec2-user/csv_data_to_ES.py > /home/ec2-user/input_to_ES.log &
[1] 32381

[ec2-user@es-coordinater ~]$ ps -ef | grep 32381
ec2-user 32381  6600 37 17:26 pts/0    00:00:11 python3 -u /home/ec2-user/csv_data_to_ES.py
ec2-user 32496  6600  0 17:27 pts/0    00:00:00 grep --color=auto 32381
            
# After the load into ES finishes, the log looks like this.
[ec2-user@es-coordinater ~]$ cat input_to_ES.log
=========================== {csv_file} ===========================
ES index count :  10933000
    
# List every index in the cluster
[ec2-user@es-coordinater ~]$ curl -XGET 'es-coordinater:9200/_cat/indices?v' -k -u elastic
Enter host password for user 'elastic':
health status index                               uuid                   pri rep docs.count docs.deleted store.size pri.store.size

...

green  open   minman_test_index                   nE6j6oTrRRORJn1--55Ung   1   1   10956404            0      7.1gb          3.5gb

...

# Query the documents in minman_test_index
[ec2-user@es-coordinater ~]$ curl -XGET 'es-coordinater:9200/minman_test_index/_search?pretty' -k -u elastic
Enter host password for user 'elastic':
{
  "took" : 3,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "RFE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "3202B6632D2BD175CF8E2CE7C111C530855FA646                        ",
          "ParentPostID" : "C849682A57C86081FBED108FC86DE16A9DAE4946                        ",
          "ProfileID" : "9331D603C1268611940395393BB87A06322F2158                        ",
          "PostedOn" : "2017-04-07 13:10:29.973000000",
          "latitude" : "12.9",
          "longitude" : "77.5",
          "ImageID" : "0",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "RVE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "CF57EB232757F0917B14CE466272F33901C2CC43                        ",
          "ParentPostID" : "BA8A1361CF1A0EB1A75A7ACBDBA9A7633F8377DA                        ",
          "ProfileID" : "62C34DC2A476C23B3D9DC69F055F153914A4303A                        ",
          "PostedOn" : "2017-04-07 13:10:31.990000000",
          "latitude" : "5.5",
          "longitude" : "-0.20000000000000001",
          "ImageID" : "0",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "RlE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "B1FC5A13A8B7B47E12AD53C9CE01EB09733DC1E3                        ",
          "ParentPostID" : "5DE5620A8240157D97ADBC27826E1C9EDC23219C                        ",
          "ProfileID" : "7139F9A2AACC500EEF632B3E8D8C9D89119BC7E6                        ",
          "PostedOn" : "2017-04-07 13:10:31.990000000",
          "latitude" : "12.800000000000001",
          "longitude" : "77.599999999999994",
          "ImageID" : "0",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "R1E0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "F4E870B72F752E97A300E499CACC98FD38BC2196                        ",
          "ParentPostID" : "7984B0A0E139CABADB5AFC7756D473FB34D23819                        ",
          "ProfileID" : "1B2FA31C7A794E3422A2FB445BE7B11D055BFB69                        ",
          "PostedOn" : "2017-04-07 13:10:43.240000000",
          "latitude" : "56.899999999999999",
          "longitude" : "23.600000000000001",
          "ImageID" : "4",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "SFE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "C70FCE67245C5F207B525E0F41343A46D2B2F9FF                        ",
          "ParentPostID" : "BD614D906722852D146F77C05B92CBEF9EC390E9                        ",
          "ProfileID" : "9331D603C1268611940395393BB87A06322F2158                        ",
          "PostedOn" : "2017-04-07 13:11:10.430000000",
          "latitude" : "12.9",
          "longitude" : "77.5",
          "ImageID" : "0",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "SVE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "199AE9F18AEA8660B71F7635E367226F7912AF81                        ",
          "ParentPostID" : "04C67CD85AD7AD888FCB2FFF9A7C57FEB91E758C                        ",
          "ProfileID" : "7BC60E2783B43BC77A28A5B86235925C25DF1806                        ",
          "PostedOn" : "2017-04-07 13:11:16.510000000",
          "latitude" : "31",
          "longitude" : "73.900000000000006",
          "ImageID" : "0",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "SlE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "7E3C99D8537DE704DAD7F8677C5E7550E8F2D157                        ",
          "ParentPostID" : "5DE5620A8240157D97ADBC27826E1C9EDC23219C                        ",
          "ProfileID" : "57C5D5FDA9983C32C0F2F1C45AB9D823B381DB1F                        ",
          "PostedOn" : "2017-04-07 13:11:45.627000000",
          "latitude" : "35.100000000000001",
          "longitude" : "-90",
          "ImageID" : "0",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "S1E0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "E1AC93DCD740B90437E631F9AAF793ACC6301D94                        ",
          "ParentPostID" : "7984B0A0E139CABADB5AFC7756D473FB34D23819                        ",
          "ProfileID" : "C7671F94A8BB4462083DB0FAE4F34F9C0CBC7F08                        ",
          "PostedOn" : "2017-04-07 13:12:00.083000000",
          "latitude" : "54.5",
          "longitude" : "-5.9000000000000004",
          "ImageID" : "1",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "TFE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "D8CE1F3368026B2384E42F038FA7117B7AC5EECC                        ",
          "ParentPostID" : "7984B0A0E139CABADB5AFC7756D473FB34D23819                        ",
          "ProfileID" : "B3260A90EC5CE59A01FA16FAC105C9867B0EB451                        ",
          "PostedOn" : "2017-04-07 13:12:02.570000000",
          "latitude" : "14.300000000000001",
          "longitude" : "121.09999999999999",
          "ImageID" : "8",
          "PostText" : "-1"
        }
      },
      {
        "_index" : "minman_test_index",
        "_type" : "minman_doc",
        "_id" : "TVE0QHwBD-vVNjEEU-wX",
        "_score" : 1.0,
        "_source" : {
          "PostID" : "DA6EAEB616822A876E8AAFA6C244F1D769A943AF                        ",
          "ParentPostID" : "7984B0A0E139CABADB5AFC7756D473FB34D23819                        ",
          "ProfileID" : "A2ECCDCA067861D9453D26D6D48F1FA989D083B2                        ",
          "PostedOn" : "2017-04-07 13:12:10.227000000",
          "latitude" : "52.200000000000003",
          "longitude" : "-0.90000000000000002",
          "ImageID" : "12",
          "PostText" : "-1"
        }
      }
    ]
  }
}
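Two numbers in the output above deserve a note. The script logged a count of 10933000 while `_cat/indices` later reported 10956404 documents, and the search response reports a total of 10000 with relation "gte". One plausible explanation for the first gap (an assumption, not verified in this lab) is that the count ran before the most recently indexed documents had been refreshed and become searchable. The second is the default behavior of ES 7, which stops counting hits at 10000 unless `track_total_hits` is set. The sketch below shows how both could be checked with the python client; the function names are hypothetical.

```python
def exact_doc_count(es, index_name):
    """Refresh the index first so the count includes the latest documents."""
    es.indices.refresh(index=index_name)
    return es.count(index=index_name)["count"]


def exact_search_total(es, index_name):
    """Ask the search API for an exact hit total instead of the 10000 cap."""
    response = es.search(
        index=index_name,
        body={"size": 0, "track_total_hits": True},  # size 0: totals only, no hits
    )
    return response["hits"]["total"]["value"]


# Possible usage with the client from csv_data_to_ES.py (not run here):
# print(exact_doc_count(es, "minman_test_index"))
# print(exact_search_total(es, "minman_test_index"))
```

Both functions take the client as an argument, so they can be appended to the loading script or run separately once the load has finished.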