hisosic

(Python) AWS boto3 활용 S3 Multipart Upload 그리고 Docker Container Control 본문

Cloud/AWS

(Python) AWS boto3 활용 S3 Multipart Upload 그리고 Docker Container Control

hisosic 2021. 9. 8. 18:04

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import boto3
import logging
import sys
import os
import datetime
import shutil
import argparse
import requests
from boto3.s3.transfer import TransferConfig
from timeit import default_timer


# Directory containing the node's docker-compose *.yml files.
docker_path = "/app"
# Default database directory: the --db_path default and the tar source.
db_dir = "/data"
# Slack incoming-webhook URL used for alerts. Webhook URLs are credentials,
# so allow overriding via the SLACK_WEBHOOK_URL environment variable instead
# of editing the source; the placeholder below is used when it is unset.
send_url = os.environ.get(
    "SLACK_WEBHOOK_URL",
    "https://hooks.slack.com/services/TB#######/B0########/asdfasdfasdf",
)


def parse_args():
    """Parse the backup script's command-line options.

    Options:
        -db/--db_path      database directory (defaults to ``db_dir``).
        -r/--region        bucket region suffix: "kr" (default) or "jp".
        -p/--profile       AWS profile name (default None).
        -ut/--upload-type  "single" or "multi" (default "multi").
        -n/--network       service name: "Service1" (default) or "Service2".

    NOTE(review): ``--profile`` does not appear to be read anywhere in this
    script -- confirm whether it should be passed to boto3.

    Returns:
        argparse.Namespace with attributes db_path, region, profile,
        upload_type and network.
    """
    parser = argparse.ArgumentParser(description="leveldb Backup")
    # (flags, add_argument keyword options), registered in this order.
    option_table = (
        (("-db", "--db_path"), {"help": "DB Path", "default": f"{db_dir}"}),
        (
            ("-r", "--region"),
            {"type": str, "help": "Region", "choices": ["kr", "jp"], "default": "kr"},
        ),
        (("-p", "--profile"), {"default": None}),
        (
            ("-ut", "--upload-type"),
            {
                "type": str,
                "help": "upload type",
                "choices": ["single", "multi"],
                "default": "multi",
            },
        ),
        (
            ("-n", "--network"),
            {
                "type": str,
                "help": "Service name",
                "choices": ["Service1", "Service2"],
                "default": "Service1",
            },
        ),
    )
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    return parser.parse_args()


def multi_part_upload_with_s3(filename=None, key_path=None, region=None, upload_type="single"):
    """Upload a local file to the ``service-backup-<region>`` S3 bucket.

    Args:
        filename: Local path of the file to upload (required).
        key_path: Object key in the bucket; defaults to ``filename``.
        region: Bucket suffix; None or "" selects "kr". "hk" additionally
            pins the S3 client to the ap-east-1 region.
        upload_type: "single" sends unsigned (anonymous) requests with an
            800 MiB multipart threshold; "multi" uses signed requests with a
            small multipart threshold.

    Raises:
        SystemExit: status 1 when ``filename`` is missing, ``upload_type`` is
            unknown, or the upload fails.
    """
    start_time = default_timer()

    # Validate arguments before creating any AWS client.
    if filename is None:
        print("[ERROR] filename is None", file=sys.stderr)
        raise SystemExit(1)
    if key_path is None:
        key_path = filename

    bucket_name = f"service-backup-{region or 'kr'}"
    if region == "hk":
        s3 = boto3.resource("s3", region_name="ap-east-1")
    else:
        s3 = boto3.resource("s3")

    if upload_type == "single":
        # Anonymous users cannot initiate multipart uploads, so unsigned
        # requests are paired with a high threshold (800 MiB) that keeps
        # smaller files on a single PUT.
        from botocore.handlers import disable_signing

        s3.meta.client.meta.events.register("choose-signer.s3.*", disable_signing)
        config = TransferConfig(
            multipart_threshold=838860800,  # 800 MiB
            max_concurrency=10,
            multipart_chunksize=8388608,  # 8 MiB
            num_download_attempts=5,
            max_io_queue=100,
            io_chunksize=262144,  # 256 KiB
            use_threads=True,
        )
    elif upload_type == "multi":
        # NOTE(review): 25 KiB is below S3's 5 MiB minimum part size; s3transfer
        # is expected to raise the chunk size to that minimum -- confirm.
        config = TransferConfig(
            multipart_threshold=1024 * 25,
            max_concurrency=10,
            multipart_chunksize=1024 * 25,
            use_threads=True,
        )
    else:
        # Previously fell through with `config` unbound.
        print(f"Unknown upload_type-> {upload_type}", file=sys.stderr)
        raise SystemExit(1)

    try:
        s3.meta.client.upload_file(
            filename,
            bucket_name,
            key_path,
            Config=config,
            Callback=ProgressPercentage(filename),
        )
    except Exception as exc:
        cause = str(exc).replace(":", ":\n")
        print(f"\n[ERROR] File upload fail / cause->{cause}\n", file=sys.stderr)
        # Non-zero status so schedulers/monitoring see the failure.
        raise SystemExit(1) from exc

    elapsed = default_timer() - start_time
    time_completed_at = "{:5.3f}s".format(elapsed)
    print(f"\n\t time_completed_at = {time_completed_at}")


class ProgressPercentage(object):
    """boto3 transfer callback that prints cumulative upload progress.

    boto3 calls the instance with the number of bytes transferred since the
    previous call. With ``use_threads=True`` it may be invoked from several
    worker threads, hence the lock around the shared counter.
    """

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # Assumes one instance is attached to a single file's transfer.
        with self._lock:
            self._seen_so_far += bytes_amount
            # An empty file would otherwise divide by zero.
            if self._size:
                percentage = (self._seen_so_far / self._size) * 100
            else:
                percentage = 100.0
            sys.stdout.write(
                "\r \t %s  %s / %s  (%.2f%%) "
                % (
                    self._filename,
                    sizeof_fmt(self._seen_so_far),
                    sizeof_fmt(self._size),
                    percentage,
                )
            )
            # Actually flush: the original referenced the method without calling it.
            sys.stdout.flush()


def upload_s3(filename=None, bucket_name="service-backup-kr"):
    """Upload a file to S3 with boto3's default transfer settings.

    Args:
        filename: Local path to upload; also used as the object key.
            Defaults to "data/dummy.file" (the path previously hard-coded).
        bucket_name: Target bucket. Defaults to the "kr" backup bucket, the
            same bucket multi_part_upload_with_s3 uses when no region is given.
            (The original referenced an undefined BUCKET_NAME global.)
    """
    if filename is None:
        filename = "data/dummy.file"
    s3 = boto3.client("s3")
    s3.upload_file(filename, bucket_name, filename)


def dump(obj, nested_level=0, output=None):
    """Pretty-print nested dicts/lists with ANSI colours.

    Dict keys are printed green and scalar values yellow. Nested dicts and
    lists are printed recursively, each level indented three more spaces.
    Anything else (including strings and tuples) is printed as a scalar.

    Args:
        obj: Value to print.
        nested_level: Current depth (0 for the top-level call).
        output: Text stream to write to. None means ``sys.stdout`` as it is
            at call time, so a redirected stdout is honoured.
    """
    if output is None:
        output = sys.stdout

    class bcolors:
        HEADER = "\033[95m"
        OKBLUE = "\033[94m"
        OKGREEN = "\033[92m"
        WARNING = "\033[93m"
        FAIL = "\033[91m"
        ENDC = "\033[0m"
        BOLD = "\033[1m"
        UNDERLINE = "\033[4m"

    spacing = "   "
    def_spacing = "   "
    indent = def_spacing + nested_level * spacing
    child_indent = def_spacing + (nested_level + 1) * spacing

    if isinstance(obj, dict):
        print(f"{indent}{{", file=output)
        for k, v in obj.items():
            if isinstance(v, (dict, list)):
                # The key shares a line with the nested structure's opening bracket.
                print(f"{bcolors.OKGREEN}{child_indent}{k}:{bcolors.ENDC}", end="", file=output)
                dump(v, nested_level + 1, output)
            else:
                print(
                    f"{bcolors.OKGREEN}{child_indent}{k}:{bcolors.WARNING} {v}{bcolors.ENDC}",
                    file=output,
                )
        print(f"{indent}}}", file=output)
    elif isinstance(obj, list):
        print(f"{indent}[", file=output)
        for v in obj:
            if isinstance(v, (dict, list)):
                dump(v, nested_level + 1, output)
            else:
                print(f"{bcolors.WARNING}{child_indent}{v}{bcolors.ENDC}", file=output)
        print(f"{indent}]", file=output)
    else:
        print(f"{bcolors.WARNING}{indent}{obj}{bcolors.ENDC}", file=output)



def get_loopchain_state(ipaddr="localhost", port=os.environ.get("RPC_PORT", 9000)):
    """Query the node's peer-status API and return its block height.

    On success, sets the module-global ``upload_filename`` to
    ``<network>_BH<height>_data-<today_time>.tar.gz``. It reads the globals
    ``args`` and ``today_time``.

    Exits with status 1 after a Slack alert in two cases:
    - the node is unreachable or returns an unexpected payload
      ("node status" alert);
    - the node reports a status other than "Service is online: 0"
      ("peer status" alert).

    Only one alert is sent per failure. Previously a bare ``except:`` caught
    the SystemExit from the peer-status branch and sent a second alert.
    """
    global upload_filename
    online_status = "Service is online: 0"
    url = f"http://{ipaddr}:{port}/api/v1/status/peer"

    try:
        response = requests.get(url, verify=False, timeout=5)
        status = response.json()  # parse once
        peer_status = status["status"]
        block_height = status["block_height"] if peer_status == online_status else None
    except (requests.RequestException, ValueError, KeyError, TypeError):
        # Connection errors, non-JSON bodies, or missing/ill-typed fields.
        print(f"Please check the node status. : {url}")
        send_slack(send_url, "Please check the node status. : ", url, "error")
        sys.exit(1)

    if peer_status != online_status:
        print(f"Please check the peer status. : {peer_status}")
        send_slack(send_url, "Please check the peer status. : ",
                   peer_status, "error")
        sys.exit(1)

    print("Block_Height : ", block_height)
    upload_filename = (
        f"{args.network}_BH{block_height}_data-{today_time}.tar.gz"
    )
    return block_height


def as_file_remove():
    """Delete every ``*.tar.gz`` entry directly inside ``<db_path>/BACKUP``.

    Reads the module-global ``db_path`` and, as a side effect, sets the
    module-global ``backup_dir`` to that BACKUP directory. Each deletion is
    reported on stdout.
    """
    global backup_dir
    backup_dir = os.path.join(db_path, "BACKUP")
    # The directory listing is taken once, up front, before any removal.
    stale_archives = (
        name for name in os.listdir(f"{backup_dir}") if name.endswith(".tar.gz")
    )
    for name in stale_archives:
        os.remove(os.path.join(backup_dir, name))
        print("delete", f"{backup_dir}", name)


def sizeof_fmt(num, suffix="B"):
    """Render a byte count with binary (1024-based) unit prefixes.

    Values below 1024 of a prefix use that prefix; anything at or beyond
    1024 Z-units is shown in Y-units.

    >>> sizeof_fmt(1536)
    '1.5KB'
    """
    value = num
    for prefix in ("", "K", "M", "G", "T", "P", "E", "Z"):
        if abs(value) < 1024.0:
            return f"{value:3.1f}{prefix}{suffix}"
        value /= 1024.0
    return f"{value:.1f}Y{suffix}"


def get_dir_size(path):
    """Return the total size in bytes of regular files under *path*, recursively.

    Symbolic links are not followed, whether they point to files or
    directories. This avoids infinite recursion on symlink loops and avoids
    counting data that lives outside the tree. tar also stores links rather
    than their targets by default.
    """
    total = 0
    with os.scandir(path) as it:
        for entry in it:
            if entry.is_file(follow_symlinks=False):
                total += entry.stat(follow_symlinks=False).st_size
            elif entry.is_dir(follow_symlinks=False):
                total += get_dir_size(entry.path)
    return total


def as_percent(num, denom):
    """Return ``num / denom`` as a percentage (float).

    Both arguments are coerced with ``float()``; a zero ``denom`` raises
    ZeroDivisionError.
    """
    fraction = float(num) / float(denom)
    return fraction * 100.0


def run_peer(docker_path, dc_file, run_mode):
    """Start, stop or inspect the node's docker-compose project.

    Changes the process working directory to ``docker_path``. If that
    directory contains any ``*.yml`` files, the most recently modified one
    replaces ``dc_file``.

    Args:
        docker_path: Directory holding the compose file(s).
        dc_file: Compose file to use when ``docker_path`` has no *.yml files.
        run_mode: "start", "stop"/"down" or "status" (case-insensitive).

    Returns:
        The ``os.system`` status of the docker-compose command, or None for
        an unrecognised ``run_mode``. Previously None was produced by a bare
        ``except: pass`` hiding an UnboundLocalError.
    """
    import shlex

    docker_cmd = "docker-compose"
    os.chdir(docker_path)

    # List the current directory: after chdir, a relative docker_path would
    # no longer resolve to the same place.
    docker_files = [name for name in os.listdir(os.curdir) if name.endswith(".yml")]
    if len(docker_files) == 1:
        dc_file = docker_files[0]
    elif docker_files:
        # Newest compose file wins; on ties the first one listed is kept.
        dc_file = max(docker_files, key=os.path.getmtime)

    run_mode = run_mode.lower()
    # Quote the file name: it comes from the filesystem and goes to a shell.
    docker_ops = f" -f {shlex.quote(dc_file)}"
    if run_mode == "start":
        return os.system(docker_cmd + docker_ops + " up -d")
    if run_mode in ("stop", "down"):
        return os.system(docker_cmd + docker_ops + " down")
    if run_mode == "status":
        return os.system(docker_cmd + " ps")

    print(f"Not found run_mode : {run_mode}  - [start|stop|status]")
    return None


def createFolder(directory):
    """Create *directory* and any missing parents; no-op if it already exists.

    This is a best-effort helper: an OSError (e.g. permission denied, or the
    path exists as a regular file) is reported on stdout rather than raised.
    ``exist_ok=True`` avoids the race between checking for existence and
    creating the directory.
    """
    try:
        os.makedirs(directory, exist_ok=True)
    except OSError:
        print(f"Error: Creating directory. {directory}")


def db_file_compress():
    """Archive ``db_dir`` into ``upload_filename`` with tar + zstd.

    Reads the module globals ``db_path``, ``db_dir``, ``upload_filename`` and
    ``send_url``. It changes the working directory to ``db_path``, so the
    archive is written there. A non-zero tar status sends a Slack error
    alert and exits with status 1.

    NOTE(review): the archive is zstd-compressed despite its ``.tar.gz`` name.
    It archives ``db_dir`` rather than ``db_path``. With the defaults it is
    written inside the tree being archived. Confirm all three are intended.
    """
    import shlex

    os.chdir(db_path)
    # Quote shell arguments; safe names such as the generated file name are
    # left unchanged by shlex.quote.
    cmd = f'tar -I "zstd -T10" -cf {shlex.quote(upload_filename)} {shlex.quote(db_dir)}'
    print(f"cmd = {cmd}")

    # Compare by value: `is not 0` relied on small-int caching.
    if os.system(cmd) != 0:
        send_slack(send_url, "Compress Failed", cmd, "error")
        sys.exit(1)


def send_slack(url, msg_text, code, msgLevel="info"):
    """Post a colour-coded alert to a Slack incoming webhook.

    Args:
        url: Webhook URL.
        msg_text: Description text; it is prefixed with the global
            ``args.network``.
        code: Extra detail appended after ``msg_text``.
        msgLevel: "info" (green), "warn"/"warning" (yellow), or "error"
            (red), case-insensitive. Unknown levels fall back to green.
            The misspelling "warnning" is still accepted.

    Returns:
        The ``requests.Response`` from the webhook, or None if the post
        failed. Notification is best-effort: a network error is reported on
        stdout and never raised, so it cannot mask the caller's own
        ``sys.exit``.
    """
    import socket

    channel_name = "#hisosic_alert"  # Slack channel name
    send_user_name = "AWS S3 Backup Upload"  # Sender name or description
    msg_title = "MSG TITLE"  # Notification title

    green_line = "5be312"
    yello_line = "f2c744"
    red_line = "f70202"
    level_colors = {
        "info": green_line,
        "warn": yello_line,
        "warning": yello_line,
        "warnning": yello_line,  # kept for backward compatibility
        "error": red_line,
    }
    p_color = level_colors.get(msgLevel.lower(), green_line)

    # The public-IP lookup is informational only; never let it block or
    # break the alert itself.
    try:
        public_ip = requests.get("https://api.ipify.org", timeout=5).text
    except requests.RequestException:
        public_ip = "unknown"

    payload = {
        # https://app.slack.com/block-kit-builder
        "channel": channel_name,
        "username": send_user_name,
        "text": msg_title,
        "blocks": [{"type": "divider"}],
        "attachments": [
            {
                "color": "#" + p_color,
                "blocks": [
                    {
                        "type": "section",
                        "text": {
                            "type": "plain_text",
                            "text": f"Job Title : {msg_title}",
                        },
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "plain_text",
                            "text": f'{"+ [HOST]":^12s} : {socket.gethostname()}, {public_ip}',
                        },
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "plain_text",
                            "text": f'{"+ [DATE]":^12s} : {(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])}',
                        },
                    },
                    {
                        "type": "section",
                        "text": {
                            "type": "plain_text",
                            "text": f'{"+ [DESC]":^12s} : {args.network} {msg_text} {code}',
                        },
                    },
                ],
            }
        ],
    }

    try:
        return requests.post(url, json=payload, timeout=10)
    except requests.RequestException as exc:
        print(f"Slack notification failed: {exc}")
        return None


def main():
    """Run one backup cycle.

    Steps, in order:
    1. Node status check.
    2. Removal of old archives.
    3. Disk-space check.
    4. Compression.
    5. S3 upload.
    6. Completion alert.

    Sets the module globals ``args`` and ``db_path``. It relies on ``today``
    and ``today_time``, which are set by the ``__main__`` guard. Exits with
    status 1 (after a Slack alert) when the DB directory's size exceeds 70%
    of the free space on its filesystem.
    """
    global args
    args = parse_args()

    global db_path
    db_path = args.db_path or db_dir

    # Backup WorkDir Create
    createFolder(os.path.join(db_path, "BACKUP"))

    # Node Status Check (also sets the global upload_filename)
    get_loopchain_state()

    # ASIS Backup File Delete
    as_file_remove()

    # Disk Free Size Check: compare the DB size with the free space left.
    _total, _used, free = shutil.disk_usage(db_path)
    dirsize = get_dir_size(db_path)
    # A completely full disk (free == 0) previously raised ZeroDivisionError.
    dirfree = dirsize / free * 100.0 if free else float("inf")
    if dirfree > 70:
        print("Not enough disk space :", "%.2f%%" % dirfree)
        send_slack(send_url, "Not enough disk space : ",
                   "%.2f%%" % dirfree, "error")
        sys.exit(1)
    print("You have enough disk space :", "%.2f%%" % dirfree)

    # Node Container Stop
    # run_peer(f"{docker_path}", "docker-compose.yml", "stop")

    # LevelDB File Compress
    db_file_compress()

    # Node Container Start
    # run_peer(f"{docker_path}", "docker-compose.yml", "start")

    # S3 Bucket Upload
    multi_part_upload_with_s3(
        upload_filename,
        f"{args.network}/{today}/{upload_filename}",
        args.region,
        args.upload_type,
    )

    # Backup Finish Message
    send_slack(send_url, "Backup File Upload Finish", upload_filename, "info")


if __name__ == "__main__":
    # Read the clock once so the S3 date folder (`today`) and the file-name
    # timestamp (`today_time`) always agree, even across midnight.
    now = datetime.datetime.today()
    today = now.strftime("%Y%m%d")
    today_time = now.strftime("%Y%m%d_%H%M")
    main()

 

Comments