Boto3를 이용한 Amazon S3, Kinesis Data Firehose 사용하기

테스트 목적

AWS를 Python 개발환경에서 사용하기 위한 방법중 하나는 Boto3 라이브러리를 이용하는 것이다.

이 테스트는 Boto3를 이용하여 아래의 서비스에 접근하여 사용하는 방법을 공유하기 위한 목적이 있다.

  • Amazon S3

  • Kinesis Firehose

테스트 환경

테스트 환경은 다음과 같다.

OS : macOS mojave 10.14.6

Python 버전 : 3.7.3

Boto3 버전 : 1.9.215

AWS CLI 버전 정보

  • aws-cli : 1.16.222 

  • Python : 2.7.16 

  • Darwin : 18.7.0 

  • botocore : 1.12.212

Tree 버전 : 1.8.0

테스트 사전 정보

다음과 같은 사전 정보가 필요하다.

  • 테스트 환경 구성 방법

  • awscli 설치 및 사용법

테스트 환경 구성 방법

virtual env로 환경을 구성한다.

$ virtualenv .venv
Using base prefix '/usr/local/Cellar/python/3.7.3/Frameworks/Python.framework/Versions/3.7'
New python executable in /Users/kevin/dev/boto3_test/amazon_s3/.venv/bin/python3.7
Also creating executable in /Users/kevin/dev/boto3_test/amazon_s3/.venv/bin/python
Installing setuptools, pip, wheel... 

virtual env 활성화후 boto3 설치

$ source .venv/bin/activate
$ pip3 install boto3

AWS CLI 사용법

다음 링크를 참고한다.


테스트 : Amazon S3

테스트 목적

S3 사용에 있어 다음과 같은 케이스가 발생할 것으로 예상된다.

  • 로컬 파일을 S3에 업로드 하기

  • S3에서 로컬로 파일 다운로드하기

  • S3에서 파일 삭제하기 

  • S3의 디렉터리를 로컬로 다운로드하기

  • 로컬 디렉터리를 S3에 업로드 하기

이 테스트는 위의 케이스 별로 시나리오대로 진행하고 나온 결과가 올바른 지를 확인하는 데에 목적이 있다.

케이스별 공통사항

세부 케이스를 진행하는데 이 테스트에서는 다음과 같은 공통사항이 있다.

  • 미리 지정한 profile로 session 인스턴스 생성

  • session 인스턴스로 s3 resource 인스턴스 생성

Python 코드로 정리하면 다음과 같다.

import boto3

def get_s3():
    # aws profile
    session = boto3.Session(profile_name="test")
    s3 = session.resource('s3')
    return s3 

케이스 : 로컬 파일을 S3에 업로드 하기

다음과 같은 시나리오를 바탕으로 테스트를 진행한다.

  1. 로컬 파일을 생성한다.

  2. 생성된 로컬 파일을 s3://bucket/boto3-test/ 경로에 업로드 한다.

다음과 같이 Python 코드를 작성하고 실행한다.

import boto3

def get_s3():
    session = boto3.Session(profile_name="test")
    s3 = session.resource('s3')
    return s3


def make_file(path="./boto3_file_upload_test.txt"):
    with open(path, "w") as wf:
        wf.write("This file boto3 file upload test !!!!")


def main():
    # make file
    file_path = "./boto3_file_upload_test.txt"
    make_file(file_path)

    # upload s3
    s3 = get_s3()
    bucket = s3.Bucket('bucket')
    bucket.upload_file(
        file_path, 'boto3-test/boto3_file_upload_test.txt')


if __name__ == "__main__":
    main()

생성한 로컬 파일이 s3에 업로드 되었는지 확인한다.

$ aws s3 ls s3://bucket/boto3-test/
2019-08-23 14:39:54          0
2019-08-23 14:45:08         37 boto3_file_upload_test.txt

테스트 결과는 다음과 정리할 수 있다.

  • 파일을 생성하여 Boto3를 이용하여 S3에 업로드한다.

  • AWS CLI를 통해 생성된 파일이 업로드가 된 것을 확인할 수 있다.

케이스 : S3에서 로컬로 파일 다운로드하기

다음과 같은 시나리오를 바탕으로 테스트를 진행한다.

s3://bucket/boto3-test/boto3_file_upload.txt 파일을 로컬로 다운로드 한다.

다음과 같이 Python코드를 작성하고 실행한다.

import boto3

def get_s3():
    # aws profile
    session = boto3.Session(profile_name="test")
    s3 = session.resource('s3')
    return s3

def main():
    download_file_path = "./boto3_file_upload_test2.txt"

    # download s3
    s3 = get_s3()
    bucket = s3.Bucket('bucket')
    bucket.download_file(
        'boto3-test/boto3_file_upload_test.txt', download_file_path)

if __name__ == "__main__":
    main()

다운로드한 파일을 확인하면 다음과 같다.

$ cat boto3_file_upload_test2.txt
This file boto3 file upload test !!!!%

테스트 결과는 다음과 정리할 수 있다.

  • Boto3를 이용하여 S3의 업로드된 파일을 다운로드한다.

  • 커널 명령을 통해 실제로 S3에서 파일이 다운로드 된 것을 확인할 수 있다.

케이스 : S3의 파일 삭제하기 

다음과 같은 시나리오를 바탕으로 테스트를 진행한다.

s3://bucket/boto3-test/boto3_file_upload.txt 파일을 삭제한다.

다음과 같이 Python코드를 작성하고 실행한다.

import boto3

def get_s3():
    # aws profile
    session = boto3.Session(profile_name="test")
    s3 = session.resource('s3')
    return s3

def main():
    # delete s3
    s3 = get_s3()
    bucket = s3.Bucket('bucket')
    obj = bucket.Object('boto3-test/boto3_file_upload_test.txt')
    obj.delete()

if __name__ == "__main__":
    main()

삭제 되었는지 확인하면 다음과 같다.

$ aws s3 ls s3://bucket/boto3-test/
2019-08-23 14:39:54          0

테스트 결과는 다음과 정리할 수 있다.

  • Boto3를 이용하여 S3의 업로드된 파일을 삭제한다.

  • AWS CLI를 통해 실제로 S3에서 파일이 삭제 된 것을 확인할 수 있었다

케이스 : S3 디렉터리 다운로드 하기

다음과 같은 시나리오를 바탕으로 테스트를 진행한다.

위의 S3 경로 s3://bucket/boto3-test/download_test_directory 디렉터리( 디렉터리는 제외) 로컬로 다운로드한다.

다운로드 할 파일을 확인하면 다음과 같다.

$ aws s3 ls s3://bucket/boto3-test/download_test_directory --recursive
2019-08-26 19:07:16         39 boto3-test/download_test_directory/dir1/dir1_1/dir1_1_file1.txt
2019-08-26 19:07:17         39 boto3-test/download_test_directory/dir1/dir1_1/dir1_1_file2.txt
2019-08-26 19:07:15         39 boto3-test/download_test_directory/dir1/dir1_file1.txt
2019-08-26 19:07:14         39 boto3-test/download_test_directory/dir2/dir2_file1.txt
2019-08-26 19:07:14         39 boto3-test/download_test_directory/dir2/dir2_file2.txt
2019-08-26 19:08:10          0 boto3-test/download_test_directory/dir3/
2019-08-26 19:07:13         39 boto3-test/download_test_directory/file1.txt
2019-08-26 19:07:13         39 boto3-test/download_test_directory/file2.txt 

다음과 같은 Python 코드를 작성하고 실행한다.

import boto3
import os


def get_s3():
    session = boto3.Session(profile_name="test")
    s3 = session.resource('s3')
    return s3


def download_directory_to_s3(s3_dir_path, local_dir_path):
    s3 = get_s3()
    bucket = s3.Bucket('bucket')

    path_list = list()
    for obj in bucket.objects.filter(Prefix=s3_dir_path):
        s3_path = obj.key
        local_path = s3_path.replace(s3_dir_path, local_dir_path)

        if s3_path[-1] == '/':
            continue

        dir_name = os.path.dirname(local_path)
        if not os.path.exists(dir_name):
            print("make directory {}".format(dir_name))
            os.makedirs(dir_name)

        path_list.append([s3_path, local_path])

    for path in path_list:
        print("Download from {} to {}".format(path[0], path[1]))
        bucket.download_file(path[0], path[1])


def main():
    parameter = {"s3_dir_path": "boto3-test/download_test_directory/",
                 "local_dir_path": "./download_test_directory/",
                 }
    download_directory_to_s3(**parameter)


if __name__ == "__main__":
    main()

디렉터리가 정상적으로 다운로드 되었는지 확인한다.

tree download_test_directory
download_test_directory
├── dir1
   ├── dir1_1
      ├── dir1_1_file1.txt
      └── dir1_1_file2.txt
   └── dir1_file1.txt
├── dir2
   ├── dir2_file1.txt
   └── dir2_file2.txt
├── file1.txt
└── file2.txt
3 directories, 7 files

테스트 결과는 다음과 같이 정리할 수 있다.

  • Boto3를 이용하여 S3의 업로드된 디렉터리를 로컬로 다운로드한다. (빈 디렉터리는 제외)

  • 커널 명령을 통해 실제로 S3에서 로컬로 디렉터리가 다운로드 된 것을 확인할 수 있다.

케이스 : 로컬 디렉터리를 S3에 업로드 하기

다음과 같은 시나리오를 바탕으로 테스트를 진행한다.

로컬 디렉터리( 디렉터리는 제외) s3 경로 s3://bucket/boto3-test/upload_test_directory에 업로드 한다.

업로드할 디렉터리는 다음과 같은 구조로 이루어져 있다.

$ tree upload_test_directory
upload_test_directory
├── dir1
   ├── dir1_1
      ├── dir1_1_file1.txt
      └── dir1_1_file2.txt
   └── dir1_file1.txt
├── dir2
   ├── dir2_file1.txt
   └── dir2_file2.txt
├── dir3
├── file1.txt
└── file2.txt

다음과 같은 Python 코드를 작성하고 실행한다.

import boto3
import os


def get_s3():
    session = boto3.Session(profile_name="test")
    s3 = session.resource('s3')
    return s3


def upload_directory_to_s3(local_dir_path, s3_dir_path):
    path_list = list()
    for path, sub_dirs, files in os.walk(local_dir_path):
        for filename in files:
            local_file_path = os.path.join(path, filename)

            s3_file_path_depart = [s3_dir_path,
                                   os.path.relpath(local_file_path, local_dir_path)]
            s3_file_path = "".join(s3_file_path_depart)
            path_list.append([local_file_path, s3_file_path])

    s3 = get_s3()
    bucket = s3.Bucket('bucket')

    for paths in path_list:
        print("upload from {} to {}".format(paths[0], paths[1]))
        bucket.upload_file(paths[0], paths[1])


def main():
    parameter = {"local_dir_path": "./upload_test_directory/",
                 "s3_dir_path": "boto3-test/upload_test_directory/"}
    upload_directory_to_s3(**parameter)


if __name__ == "__main__":
    main()

os 모듈중 정리가 필요한 메서드를 아래에 정리하였다.

os.walk(디렉터리 경로)

  • 해당 디렉터리 경로를 중심으로 하위 디렉터리를 탐색하여 generator 타입로 반환해준다.

  • 반환되는 정보는 path, sub_dirs, files 형태이며 이에 대한 설명은 다음과 같다

    • path : 경로

    • sub_dirs : path 하위 디렉터리들의 이름들

    • files : path 하위 파일의 이름들

디렉터리의 업로드가 잘 되었는지 확인하면 다음과 같다.

$ aws s3 ls s3://bucket/boto3-test/upload_test_directory/ --recursive
2019-08-26 16:42:02       6148 boto3-test/upload_test_directory/.DS_Store
2019-08-26 16:42:04       6148 boto3-test/upload_test_directory/dir1/.DS_Store
2019-08-26 16:42:05       6148 boto3-test/upload_test_directory/dir1/dir1_1/.DS_Store
2019-08-26 16:42:07         37 boto3-test/upload_test_directory/dir1/dir1_1/dir1_1_file1.txt
2019-08-26 16:42:08         37 boto3-test/upload_test_directory/dir1/dir1_1/dir1_1_file2.txt
2019-08-26 16:42:04         37 boto3-test/upload_test_directory/dir1/dir1_file1.txt
2019-08-26 16:42:03         37 boto3-test/upload_test_directory/dir2/dir2_file1.txt
2019-08-26 16:42:03         37 boto3-test/upload_test_directory/dir2/dir2_file2.txt
2019-08-26 16:42:02         37 boto3-test/upload_test_directory/file1.txt
2019-08-26 16:42:01         37 boto3-test/upload_test_directory/file2.txt 

테스트 결과는 다음과 같이 정리할 수 있다.

  • Boto3를 이용하여 로컬 디렉터리(빈 디렉터리는 제외)를 S3에 업로드한다.

  • AWS CLI 명령을 통해 실제로 로컬에서 S3에 업로드 된 것을 확인할 수 있었다.


테스트 : Kinesis Data Firehose

테스트 목적 

Kinesis Data Firehose 사용에 있어 다음과 같은 케이스가 발생할 것으로 예상된다. 

  • Delivery stream 리스트 확인하기

  • 단일 레코드를 Firehose에 보내기

  • 복수 레코드를 Firehose에 보내기

이 테스트는 위의 케이스 별로 시나리오대로 진행하고 나온 결과가 올바른 지를 확인하는 데에 목적이 있다.

테스트전 확인사항

테스트전 Firehose 정상 동작 확인

Firehose가 정상적으로 동작하는지에 대한 확인은 Management console상에서 해당 Delivery Stream의 “Test with demo data”를 통하여 알 수 있다.

케이스 : Delivery stream 리스트 확인하기

다음과 같은 시나리오를 바탕으로 진행한다.

Firehose 클라이언트에 접근하여 접근가능한 Delivery stream의 목록을 출력한다.

다음과 같은 파이썬 코드를 작성한다.

import boto3

def get_firehose_client():
    session = boto3.Session(profile_name="test")
    firehose = session.client("firehose")
    return firehose


def list_delivery_streams():
    firehose = get_firehose_client()

    response_direct_put = firehose.list_delivery_streams(
        DeliveryStreamType="DirectPut")
    response_kinesis_stream = firehose.list_delivery_streams(
        DeliveryStreamType="KinesisStreamAsSource")
    print("List of Deliver Streams")
    print("Direct Put : {}".format(response_direct_put["DeliveryStreamNames"]))
    print("Kinesis Stream As Source : {}".format(
        response_kinesis_stream["DeliveryStreamNames"]))


def main():
    list_delivery_streams()


if __name__ == "__main__":
    main()

파이썬 코드를 실행하면 다음과 같은 결과를 확인할 수 있다.

$ python3 main.py
List of Deliver Streams
DirectPut : ['test-firehose-es-poc']
KinesisStream As Source : ['test-stream-firehose-es-poc']

AWS CLI로 Delivery stream 리스트를 확인하면 다음과 같다.

$ aws firehose list-delivery-streams
{
 "DeliveryStreamNames": [
 "test-firehose-es-poc",
 "test-stream-firehose-es-poc"
 ],
 "HasMoreDeliveryStreams": false
}

테스트 결과는 다음과 같이 정리할 수 있다.

  • Boto3를 이용하여 Delivery Stream의 목록을 확인할 수 있다.

  • AWS CLI 명령을 통해 실제로 확인한 결과와 Boto3로 확인한 결과가 동일함을 알 수 있다.

케이스 : 단일 레코드를 Firehose에 보내기

다음과 같은 시나리오를 바탕으로 진행한다.

다음과 같은 샘플 데이터를 ”test-firehose-es-poc” Delivery stream으로 전송한다.

{
	"user_id": "Aster-Kotlin-Sapphire-564",
	"app_version": "v1.5.16(90)",
	"device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
	"device_manufacturer": "LG",
	"device_name": "LG G7 ThinQ",
	"device_model": "LM-G710N",
	"device_os": "Android",
	"device_os_number": "8.0",
	"event": "interest_service_click",
	"interest_id": 1201,
	"index": 1,
	"interest_name": "창업/스타트업",
	"interest_service_id": None,
	"interest_service_type": "2",
	"interest_service_name": "piano",
	"timestamp": "2019-04-30T07:30:00Z"
}

“test-firehose-es-poc” Deliver stream은 Direct put으로 들어온 데이터를 Amazon elasticsearch에 전송하고 s3에 백업 데이터를 생성한다.

다음과 같은 Python 코드를 작성한다.

import boto3
import json


def get_firehose_client():
    session = boto3.Session(profile_name="test")
    firehose = session.client("firehose")
    return firehose


def put_record_to_delivery_stream(test_data):
    print("Put record to Delivery Stream")

    firehose = get_firehose_client()

    response = firehose.put_record(
        DeliveryStreamName="test-firehose-es-poc",
        Record={"Data": json.dumps(test_data, ensure_ascii=False)}
    )

    print(response)


def main():
    test_data = {
        "user_id": "Aster-Kotlin-Sapphire-564",
        "app_version": "v1.5.16(90)",
        "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
        "device_manufacturer": "LG",
        "device_name": "LG G7 ThinQ",
        "device_model": "LM-G710N",
        "device_os": "Android",
        "device_os_number": "8.0",
        "event": "interest_service_click",
        "interest_id": 1201,
        "index": 1,
        "interest_name": "창업/스타트업",
        "interest_service_id": None,
        "interest_service_type": "2",
        "interest_service_name": "piano",
        "timestamp": "2019-04-30T07:30:00Z"
    }

    put_record_to_delivery_stream(test_data)


if __name__ == "__main__":
    main()

Python 코드를 실행하면 다음과 같다.

$ python3 main.py
Put record to Delivery Stream
{'RecordId': 'OkvHgs4vIsuRjYw0O5VZB1EobIDBMq7nrvxe/eXDMtdQEDC+eFnEL3T5cILNN+pGD4cu284LsuuC0dAWO7GCZcU3tVcxnr8gFfYfFMsNLuz6KLrbOAV1NaN6oPcxhicH5CpdBHAsxFoZPugkAGe7axq9odkI+CMIsD76w7pHUF2I7rQUb8hswnKp1iHx6O6Ip4MbvwefA2U1kPyyqnP1hTPprg/M8xY4', 'Encrypted': False, 'ResponseMetadata': {'RequestId': 'f609df57-39b8-ccff-a3d4-eadd7b964c48', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f609df57-39b8-ccff-a3d4-eadd7b964c48', 'x-amz-id-2': 'Rwyt+v8XyhuqHJYMgpacWRLeSUAQg7Im9hEVdeR/4Utu49+fOyxC20NmHppA9UUg2v+FiDfvVquVx1K7mrGFUiVXOXmLalUd', 'content-type': 'application/x-amz-json-1.1', 'content-length': '257', 'date': 'Mon, 26 Aug 2019 01:18:02 GMT'}, 'RetryAttempts': 0}}

실제로 Amazon elasticsearch에 샘플데이터가 입력 되었는지 확인한다.

$ curl -XGET https://{es_address}/interest_service_click2/_search\?filter_path\=hits\&format\=json\&pretty
{
  "hits" : {
    "total" : 1,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "interest_service_click2",
        "_type" : "isc",
        "_id" : "49595937143545169094912243846452981551249391789660962818.0",
        "_score" : 1.0,
        "_source" : {
          "user_id" : "Aster-Kotlin-Sapphire-564",
          "app_version" : "v1.5.16(90)",
          "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
          "device_manufacturer" : "LG",
          "device_name" : "LG G7 ThinQ",
          "device_model" : "LM-G710N",
          "device_os" : "Android",
          "device_os_number" : "8.0",
          "event" : "interest_service_click",
          "interest_id" : 1201,
          "index" : 1,
          "interest_name" : "창업/스타트업",
          "interest_service_id" : null,
          "interest_service_type" : "2",
          "interest_service_name" : "piano",
          "timestamp" : "2019-04-30T07:30:00Z"
        }
      }
    ]
  }
}

Amazon elasticsearch의 백업용도로 저장된 S3를 확인하면 다음과 같다.

$ aws s3 ls s3://test-es-raw-poc/2019/08/26/01/
2019-08-26 10:19:05        520 test-firehose-es-poc-1-2019-08-26-01-18-03-d2346539-c5c2-4385-b163-4b60b14ea33c

$ aws s3 cp s3://test-es-raw-poc/2019/08/26/01/test-firehose-es-poc-1-2019-08-26-01-18-03-d2346539-c5c2-4385-b163-4b60b14ea33c ./single_record.txt
download: s3://test-es-raw-poc/2019/08/26/01/test-firehose-es-poc-1-2019-08-26-01-18-03-d2346539-c5c2-4385-b163-4b60b14ea33c to ./single_record.txt

$ cat single_record.txt
{"user_id": "Aster-Kotlin-Sapphire-564", "app_version": "v1.5.16(90)", "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0", "event": "interest_service_click", "interest_id": 1201, "index": 1, "interest_name": "창업/스타트업", "interest_service_id": null, "interest_service_type": "2", "interest_service_name": "piano", "timestamp": "2019-04-30T07:30:00Z"}%

테스트 결과는 다음과 같이 정리할 수 있다.

  • Boto3를 이용하여 단일 레코드를 firehose에 전송한다.

  • Amazon elasticsearch의 index와 S3의 백업 데이터를 확인한 결과 단일 레코드가 정상적으로 전송된 것을 확인할 수 있다.

케이스 : 다중 레코드를 firehose에 보내기

다음과 같은 시나리오를 바탕으로 진행한다.

다음과 같은 샘플 데이터를 “test-firehose-es-poc” Delivery stream으로 전송한다.

{ "user_id" : "Aster-Kotlin-Sapphire-564", "app_version" : "v1.5.16(90)", "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer" : "LG", "device_name" : "LG G7 ThinQ", "device_model" : "LM-G710N", "device_os" : "Android", "device_os_number" : "8.0", "event" : "interest_service_click", "interest_id" : 1201, "index" : 1, "interest_name" : "창업/스타트업", "interest_service_id" : null, "interest_service_type" : "2", "interest_service_name" : "piano", "timestamp" : "2019-05-03T01:15:00Z" }
{ "user_id" : "Aster-Kotlin-Sapphire-564", "app_version" : "v1.5.16(90)", "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer" : "LG", "device_name" : "LG G7 ThinQ", "device_model" : "LM-G710N", "device_os" : "Android", "device_os_number" : "8.0", "event" : "interest_service_click", "interest_id" : 1301, "index" : 1, "interest_name" : "취업", "interest_service_id" : null, "interest_service_type" : "배워봐요", "interest_service_name" : "학원",  "timestamp" : "2019-05-03T00:50:00Z" }
{ "user_id" : "Aster-Kotlin-Sapphire-564", "app_version" : "v1.5.16(90)", "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer" : "LG", "device_name" : "LG G7 ThinQ", "device_model" : "LM-G710N", "device_os" : "Android", "device_os_number" : "8.0", "event" : "interest_service_click", "interest_id" : 1101, "index" : 1, "interest_name" : "초등학교", "interest_service_id" : null, "interest_service_type" : "물어봐요", "interest_service_name" : "대나무숲", "timestamp" : "2019-05-03T00:20:00Z" }

“test-firehose-es-poc” Deliver stream은 Direct put으로 들어온 데이터를 Amazon elasticsearch에 전송하고 s3에 백업 데이터를 생성한다.

다음과 같은 Python 코드를 작성한다.

import boto3
import json


def get_firehose_client():
    session = boto3.Session(profile_name="test")
    firehose = session.client("firehose")
    return firehose

def batch_record_to_delivery_stream(test_data_list):
    print("Put multiple records to Deliver Stream")

    records = list(map(lambda data : {"Data": json.dumps(data, ensure_ascii=False)}, test_data_list))

    firehose = get_firehose_client()
    
    response = firehose.put_record_batch(DeliveryStreamName="test-firehose-es-poc",Records=records)
    print(response)

def main():
    test_data_list = [
            {
                "user_id": "Aster-Kotlin-Sapphire-564",
                "app_version": "v1.5.16(90)",
                "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
                "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0",
                "event": "interest_service_click",
                "interest_id": 1101,
                "index": 1,
                "interest_name": "초등학교", "interest_service_id": None, "interest_service_type": "물어봐요","interest_service_name": "대나무숲",
                "timestamp": "2019-05-03T00:20:00Z"
            },
            {
                "user_id": "Aster-Kotlin-Sapphire-564",
                "app_version": "v1.5.16(90)",
                "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
                "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0",
                "event": "interest_service_click",
                "interest_id": 1301,
                "index": 1,
                "interest_name": "취업", "interest_service_id": None, "interest_service_type": "배워봐요","interest_service_name": "학원",
                "timestamp": "2019-05-03T00:50:00Z"
            },
            {
                "user_id": "Aster-Kotlin-Sapphire-564",
                "app_version": "v1.5.16(90)",
                "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
                "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0",
                "event": "interest_service_click",
                "interest_id": 1201,
                "index": 1,
                "interest_name": "창업/스타트업", "interest_service_id": None, "interest_service_type": "2","interest_service_name": "piano",
                "timestamp": "2019-05-03T01:15:00Z"
            },
            {
                "user_id": "Aster-Kotlin-Sapphire-564",
                "app_version": "v1.5.16(90)",
                "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
                "device_manufacturer": "LG",
                "device_name": "LG G7 ThinQ",
                "device_model": "LM-G710N",
                "device_os": "Android",
                "device_os_number": "8.0",
                "event": "interest_service_click",
                "interest_id": 1201,
                "index": 1,
                "interest_name": "창업/스타트업",
                "interest_service_id": None,
                "interest_service_type": "2",
                "interest_service_name": "piano",
                "timestamp": "2019-04-30T07:30:00Z"
            }
        ]
    
	batch_record_to_delivery_stream(test_data_list)

if __name__ == "__main__":
    main()

위의 코드를 실행하면 다음과 같다.

$ python3 main.py
Put multiple records to Deliver Stream
{'FailedPutCount': 0, 'Encrypted': False, 'RequestResponses': [{'RecordId': 'JvtN3+H/2oeKA6g087JPipW+g8STzCQQp50mcIw5FQ0wQIC2EihXrB1C31kMjNXY0lcQvXo6bn3H1O/PQMWI3veWxxCb9vkL3lieblJOu+v/q8TLAlsfNCsMADGz9yPLQg4B5uRfogNKv1+M44c6RnAHIg+n5aX/K8UUgbVVF47ZeyiJsokbshNmhO7F8+3BA4rAdXW2O4hSZj640O0RAtw6QuTB3CQN'}, {'RecordId': 'JT+5YNtI/3nwtCiiaoA/w5pBxkHYdbJFwGv8KiYDk7cTjIhenCGmnlBZqpwDswVWNGdnTT+e/qEP3VQI2dx1cwjHDpRRlilNBOYwKZaueoazeQ77p2NKgarweztvVVK1r91PjIlEUlW3fJfSAqiZsCoa6NQDs3u51gRNxi8Z9iBA/3uUXDehUkszxmlIzpvFKmrYrJ6UGIdBDHIpyqmWYdCAwAXJJTBk'}, {'RecordId': 'Mriv4BFuAEpziVO0hX0zZsoCspb8Sl4fdTOUitqYYTKGhfr+7noUWfYTTp0QF0JkuS8z/KHEr8aiV53IrLe860NzZY1mSN4ZAxmIKLxQqwQceFaj0xEBJJuYjrSOuDKRVD3rCz8n/Ab4QzEP/FOrUioKK7uf4kySsE7TvTkW1pWeKHabE27FjK/+9x4//3ep3fjp5K3TO81UX6Xk7AFa5BxiGUjVzcTI'}], 'ResponseMetadata': {'RequestId': 'c59f84d4-d42a-432f-9042-d44926133eb6', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'c59f84d4-d42a-432f-9042-d44926133eb6', 'x-amz-id-2': '01OYNLqVZ9T7VGgpQ/pSJ1MtwblA5/kShluUDM6sLzv4ncJEyTrM7lHoKxEwo1EWzbXZmIu7Nz0oaw72UhQhAzAl3Ji9BgqZ', 'content-type': 'application/x-amz-json-1.1', 'content-length': '779', 'date': 'Mon, 26 Aug 2019 03:13:32 GMT'}, 'RetryAttempts': 0}}

실제 Amazon elasticsearch에서 레코드가 입력되었는지 확인하면 다음과 같다.

$ curl -XGET https://{es_address}/interest_service_click2/_search\?filter_path\=hits\&format\=json\&pretty
{
  "hits" : {
    "total" : 4,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "interest_service_click2",
        "_type" : "isc",
        "_id" : "49595937143545169094912246787272632161781037484956712962.0",
        "_score" : 1.0,
        "_source" : {
          "user_id" : "Aster-Kotlin-Sapphire-564",
          "app_version" : "v1.5.16(90)",
          "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
          "device_manufacturer" : "LG",
          "device_name" : "LG G7 ThinQ",
          "device_model" : "LM-G710N",
          "device_os" : "Android",
          "device_os_number" : "8.0",
          "event" : "interest_service_click",
          "interest_id" : 1201,
          "index" : 1,
          "interest_name" : "창업/스타트업",
          "interest_service_id" : null,
          "interest_service_type" : "2",
          "interest_service_name" : "piano",
          "timestamp" : "2019-05-03T01:15:00Z"
        }
      },
      {
        "_index" : "interest_service_click2",
        "_type" : "isc",
        "_id" : "49595937143545169094912246787271423235961422855782006786.0",
        "_score" : 1.0,
        "_source" : {
          "user_id" : "Aster-Kotlin-Sapphire-564",
          "app_version" : "v1.5.16(90)",
          "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
          "device_manufacturer" : "LG",
          "device_name" : "LG G7 ThinQ",
          "device_model" : "LM-G710N",
          "device_os" : "Android",
          "device_os_number" : "8.0",
          "event" : "interest_service_click",
          "interest_id" : 1301,
          "index" : 1,
          "interest_name" : "취업",
          "interest_service_id" : null,
          "interest_service_type" : "배워봐요",
          "interest_service_name" : "학원",
          "timestamp" : "2019-05-03T00:50:00Z"
        }
      },
      {
        "_index" : "interest_service_click2",
        "_type" : "isc",
        "_id" : "49595937143545169094912246787270214310141808226607300610.0",
        "_score" : 1.0,
        "_source" : {
          "user_id" : "Aster-Kotlin-Sapphire-564",
          "app_version" : "v1.5.16(90)",
          "device_id" : "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738",
          "device_manufacturer" : "LG",
          "device_name" : "LG G7 ThinQ",
          "device_model" : "LM-G710N",
          "device_os" : "Android",
          "device_os_number" : "8.0",
          "event" : "interest_service_click",
          "interest_id" : 1101,
          "index" : 1,
          "interest_name" : "초등학교",
          "interest_service_id" : null,
          "interest_service_type" : "물어봐요",
          "interest_service_name" : "대나무숲",
          "timestamp" : "2019-05-03T00:20:00Z"
        }
      },
    ]
  }
}

백업용 s3 데이터를 확인하면 다음과 같다.

$ aws s3 ls s3://test-es-raw-poc/2019/08/26/03/
2019-08-26 12:14:35       1528 test-firehose-es-poc-1-2019-08-26-03-13-33-dd434742-87da-4cbe-b62a-b84c0366215f

$ aws s3 cp s3://test-es-raw-poc/2019/08/26/03/test-firehose-es-poc-1-2019-08-26-03-13-33-dd434742-87da-4cbe-b62a-b84c0366215f ~/dev/boto3_test/kinesis_data_firehose/multiple_record.txt
download: s3://test-es-raw-poc/2019/08/26/03/test-firehose-es-poc-1-2019-08-26-03-13-33-dd434742-87da-4cbe-b62a-b84c0366215f to dev/boto3_test/kinesis_data_firehose/multiple_record.txt

$ cat dev/boto3_test/kinesis_data_firehose/multiple_record.txt
{"user_id": "Aster-Kotlin-Sapphire-564", "app_version": "v1.5.16(90)", "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0", "event": "interest_service_click", "interest_id": 1101, "index": 1, "interest_name": "초등학교", "interest_service_id": null, "interest_service_type": "물어봐요", "interest_service_name": "대나무숲", "timestamp": "2019-05-03T00:20:00Z"}
{"user_id": "Aster-Kotlin-Sapphire-564", "app_version": "v1.5.16(90)", "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0", "event": "interest_service_click", "interest_id": 1301, "index": 1, "interest_name": "취업", "interest_service_id": null, "interest_service_type": "배워봐요", "interest_service_name": "학원", "timestamp": "2019-05-03T00:50:00Z"}
{"user_id": "Aster-Kotlin-Sapphire-564", "app_version": "v1.5.16(90)", "device_id": "a03cd3d6-0b14-46e1-a2c8-18b1c86cf738", "device_manufacturer": "LG", "device_name": "LG G7 ThinQ", "device_model": "LM-G710N", "device_os": "Android", "device_os_number": "8.0", "event": "interest_service_click", "interest_id": 1201, "index": 1, "interest_name": "창업/스타트업", "interest_service_id": null, "interest_service_type": "2", "interest_service_name": "piano", "timestamp": "2019-05-03T01:15:00Z"}%

테스트 결과는 다음과 같이 정리할 수 있다.

  • Boto3를 이용하여 다중 레코드를 firehose에 전송한다.

  • Amazon elasticsearch의 index와 S3의 백업 데이터를 확인한 결과 다중 레코드가 정상적으로 전송된 것을 확인할 수 있다.

댓글남기기