Notice
Recent Posts
Recent Comments
Link
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | |
| 7 | 8 | 9 | 10 | 11 | 12 | 13 |
| 14 | 15 | 16 | 17 | 18 | 19 | 20 |
| 21 | 22 | 23 | 24 | 25 | 26 | 27 |
| 28 | 29 | 30 |
Tags
- 테라폼
- invalid apiVersion
- 빠르게
- ERR_CERT_INVALID
- tfenv
- v1alpha1
- 맥북 엑셀 한글
- 인증서
- 맥북 한글
- Chrome
- ovirt
- yum
- 맥북 백업
- Cannot upload enabled repos report
- EKS
- 버전관리
- 연결이 비공개
- rhv
- 테라폼 버전
- OSX 엑셀
- AWS
- terraform
- 타임머신
Archives
- Today
- Total
hisosic
(Python) AWS boto3 활용 S3 Multipart Upload 그리고 Docker Container Control 본문
Cloud/AWS
(Python) AWS boto3 활용 S3 Multipart Upload 그리고 Docker Container Control
hisosic 2021. 9. 8. 18:04
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import threading
import boto3
import logging
import sys
import os
import datetime
import shutil
import argparse
import requests
from boto3.s3.transfer import TransferConfig
from timeit import default_timer
docker_path = "/app"
db_dir = "/data"
send_url = (
"https://hooks.slack.com/services/TB#######/B0########/asdfasdfasdf"
)
def parse_args():
parser = argparse.ArgumentParser(description="leveldb Backup")
parser.add_argument("-db", "--db_path",
help=f"DB Path", default=f"{db_dir}")
parser.add_argument(
"-r",
"--region",
type=str,
help=f"Region",
choices=["kr", "jp"],
default="kr"
)
parser.add_argument("-p", "--profile", default=None)
parser.add_argument('-ut', '--upload-type', type=str,
help=f'upload type', choices=["single", "multi"], default="multi")
parser.add_argument(
"-n",
"--network",
type=str,
help=f"Service name",
choices=["Service1", "Service2"],
default="Service1",
)
return parser.parse_args()
def multi_part_upload_with_s3(filename=None, key_path=None, region=None, upload_type="single"):
start_time = default_timer()
bucket_name_prefix = "service-backup-"
if region is None or region == "":
BUCKET_NAME = f"{bucket_name_prefix}kr"
else:
BUCKET_NAME = f"{bucket_name_prefix}{region}"
if region == "hk":
s3 = boto3.resource(
's3',
region_name="ap-east-1"
)
else:
s3 = boto3.resource(
's3',
)
# single parts
if upload_type == "single":
s3.meta.client.meta.events.register(
'choose-signer.s3.*', disable_signing)
# config = TransferConfig(use_threads=True, multipart_threshold=1024*1024*8, multipart_chunksize=1024*1024*8)
config = TransferConfig(multipart_threshold=838860800, max_concurrency=10, multipart_chunksize=8388608,
num_download_attempts=5, max_io_queue=100, io_chunksize=262144, use_threads=True)
# multiparts mode -> AWS S3 CLI: Anonymous users cannot initiate multipart uploads
elif upload_type == "multi":
pass
config = TransferConfig(multipart_threshold=1024 * 25, max_concurrency=10,
multipart_chunksize=1024 * 25, use_threads=True)
else:
cprint(f"Unknown upload_type-> {upload_type}", "red")
if filename is None:
cprint(f"[ERROR] filename is None", "red")
raise SystemExit()
if key_path is None:
key_path = filename
try:
s3.meta.client.upload_file(filename, BUCKET_NAME, key_path,
# ExtraArgs={'ACL': 'public-read', 'ContentType': 'text/pdf'},
Config=config,
Callback=ProgressPercentage(filename)
)
except Exception as e:
e = str(e).replace(":", ":\n")
cprint(f"\n[ERROR] File upload fail / cause->{e}\n", "red")
raise SystemExit()
elapsed = default_timer() - start_time
time_completed_at = "{:5.3f}s".format(elapsed)
cprint(f"\n\t time_completed_at = {time_completed_at}")
class ProgressPercentage(object):
def __init__(self, filename):
self._filename = filename
self._size = float(os.path.getsize(filename))
self._seen_so_far = 0
self._lock = threading.Lock()
self._prevent_bytes = 0
def __call__(self, bytes_amount):
# To simplify we'll assume this is hooked up
# to a single filename.
with self._lock:
self._seen_so_far += bytes_amount
percentage = (self._seen_so_far / self._size) * 100
# tx = bytes_amount - self._prevent_bytes
sys.stdout.write(
"\r \t %s %s / %s (%.2f%%) " % (
self._filename, sizeof_fmt(
self._seen_so_far), sizeof_fmt(self._size),
percentage))
sys.stdout.flush
def upload_s3(filename=None):
s3 = boto3.client('s3')
filename = 'data/dummy.file'
s3.upload_file(filename, BUCKET_NAME, filename)
def dump(obj, nested_level=0, output=sys.stdout):
class bcolors:
HEADER = "\033[95m"
OKBLUE = "\033[94m"
OKGREEN = "\033[92m"
WARNING = "\033[93m"
FAIL = "\033[91m"
ENDC = "\033[0m"
BOLD = "\033[1m"
UNDERLINE = "\033[4m"
spacing = " "
def_spacing = " "
if type(obj) == dict:
print("%s{" % (def_spacing + (nested_level) * spacing))
for k, v in obj.items():
if hasattr(v, "__iter__"):
print(
bcolors.OKGREEN
+ "%s%s:" % (def_spacing + (nested_level + 1) * spacing, k)
+ bcolors.ENDC,
end="",
)
dump(v, nested_level + 1, output)
else:
print(
bcolors.OKGREEN
+ "%s%s:" % (def_spacing + (nested_level + 1) * spacing, k)
+ bcolors.WARNING
+ " %s" % v
+ bcolors.ENDC,
file=output,
)
print("%s}" % (def_spacing + nested_level * spacing), file=output)
elif type(obj) == list:
print("%s[" % (def_spacing + (nested_level) * spacing), file=output)
for v in obj:
if hasattr(v, "__iter__"):
dump(v, nested_level + 1, output)
else:
print(
bcolors.WARNING
+ "%s%s" % (def_spacing + (nested_level + 1) * spacing, v)
+ bcolors.ENDC,
file=output,
)
print("%s]" % (def_spacing + (nested_level) * spacing), file=output)
else:
print(
bcolors.WARNING
+ "%s%s" % (def_spacing + nested_level * spacing, obj)
+ bcolors.ENDC
)
def get_loopchain_state(ipaddr="localhost", port=os.environ.get("RPC_PORT", 9000)):
url = f"http://{ipaddr}:{port}/api/v1/status/peer"
try:
r = requests.get(url, verify=False, timeout=5)
peer_status = r.json()["status"]
if peer_status == "Service is online: 0":
block_height = r.json()["block_height"]
print("Block_Height : ", block_height)
global upload_filename
upload_filename = (
f"{args.network}_BH{block_height}_data-{today_time}.tar.gz"
)
else:
print(f"Please check the peer status. : {peer_status}")
send_slack(send_url, "Please check the peer status. : ",
peer_status, "error")
sys.exit(1)
except:
print(f"Please check the node status. : {url}")
send_slack(send_url, "Please check the node status. : ", url, "error")
sys.exit(1)
return block_height
def as_file_remove():
global backup_dir
backup_dir = os.path.join(db_path, "BACKUP")
filelist = os.listdir(f"{backup_dir}")
for item in filelist:
if item.endswith(f".tar.gz"):
os.remove(os.path.join(backup_dir, item))
print("delete", f"{backup_dir}", item)
def sizeof_fmt(num, suffix="B"):
for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
if abs(num) < 1024.0:
return "%3.1f%s%s" % (num, unit, suffix)
num /= 1024.0
return "%.1f%s%s" % (num, "Y", suffix)
def get_dir_size(path):
total = 0
with os.scandir(path) as it:
for entry in it:
if entry.is_file():
total += entry.stat().st_size
elif entry.is_dir():
total += get_dir_size(entry.path)
return total
def as_percent(num, denom):
return float(num) / float(denom) * 100.0
def run_peer(docker_path, dc_file, run_mode):
docker_cmd = "docker-compose"
os.chdir(docker_path)
# print(os.getcwd())
# print(run_mode)
# file_lists = os.listdir(run_path)
docker_files = [file for file in os.listdir(
docker_path) if file.endswith(".yml")]
# print(len(docker_files))
if len(docker_files) > 0:
# print(docker_files)
if len(docker_files) > 1:
fileNameAndTimeList = []
for file in docker_files:
file_mtime = os.path.getmtime(file)
fileNameAndTimeList.append((file, file_mtime))
sortlist = sorted(fileNameAndTimeList,
key=lambda x: x[1], reverse=True)
dc_file = sortlist[0][0]
else:
dc_file = docker_files[0]
# run mode
run_mode = run_mode.lower()
try:
if run_mode == "start":
docker_ops = f" -f {dc_file}"
run_stat = os.system(docker_cmd + docker_ops + " up -d")
elif run_mode == "stop" or run_mode == "down":
docker_ops = f" -f {dc_file}"
run_stat = os.system(docker_cmd + docker_ops + " down")
elif run_mode == "status":
run_stat = os.system(docker_cmd + " ps")
else:
print(f"Not found run_mode : {run_mode} - [start|stop|status]")
return run_stat
except:
pass
def createFolder(directory):
try:
if not os.path.exists(directory):
os.makedirs(directory)
except OSError:
print("Error: Creating directory. " + directory)
def db_file_compress():
os.chdir(db_path)
cmd = f'tar -I "zstd -T10" -cf {upload_filename} {db_dir}'
print(f"cmd = {cmd}")
if os.system(f"{cmd}") is not 0:
send_slack(send_url, "Compress Failed", cmd, "error")
sys.exit(1)
def send_slack(url, msg_text, code, msgLevel="info"):
import socket
webhook_url = url
channel_name = "#hisosic_alert" # Slack channel name
send_user_name = "AWS S3 Backup Upload" # Sender name or description
msg_title = "MSG TITLE" # Notification Title MSG
msgLevel = msgLevel.lower()
# Message Level color
green_line = "5be312"
yello_line = "f2c744"
red_line = "f70202"
if msgLevel == "info":
p_color = green_line
elif msgLevel == "warn" or msgLevel == "warnning":
p_color = yello_line
elif msgLevel == "error":
p_color = red_line
else:
p_color = green_line
payload = {
# https://app.slack.com/block-kit-builder
"channel": channel_name,
"username": send_user_name,
"text": msg_title,
"blocks": [{"type": "divider"}],
"attachments": [
{
"color": "#" + p_color,
"blocks": [
{
"type": "section",
"text": {
"type": "plain_text",
"text": f"Job Title : {msg_title}",
},
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": f'{"+ [HOST]":^12s} : {socket.gethostname()}, {requests.get("https://api.ipify.org").text}',
},
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": f'{"+ [DATE]":^12s} : {(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3])}',
},
},
{
"type": "section",
"text": {
"type": "plain_text",
"text": f'{"+ [DESC]":^12s} : {args.network} {msg_text} {code}',
},
},
],
}
],
}
post_result = requests.post(webhook_url, json=payload)
def main():
global args
args = parse_args()
global db_path
if args.db_path:
db_path = args.db_path
else:
db_path = db_dir
# Backup WorkDir Create
createFolder(os.path.join(db_path, "BACKUP"))
# Node Status Check
get_loopchain_state()
# ASIS Backup File Delete
as_file_remove()
# Disk Free Size Check
total, used, free = shutil.disk_usage(db_path)
dirsize = get_dir_size(db_path)
# dirfree = "%.2f%%" % (dirsize / free * 100.0)
dirfree = dirsize / free * 100.0
if dirfree > 70:
print(f"Not enough disk space :", "%.2f%%" % dirfree)
send_slack(send_url, "Not enough disk space : ",
"%.2f%%" % dirfree, "error")
sys.exit(1)
else:
print(f"You have enough disk space :", "%.2f%%" % dirfree)
# Node Container Stop
# run_peer(f"{docker_path}", "docker-compose.yml", "stop")
# LevelDB File Compress
db_file_compress()
# Node Container Start
# run_peer(f"{docker_path}", "docker-compose.yml", "start")
# S3 Bucket Upload
multi_part_upload_with_s3(
f"{upload_filename}", f"{args.network}/{today}/{upload_filename}", args.region, args.upload_type)
# Backup Finish Message
send_slack(send_url, "Backup File Upload Finish", upload_filename, "info")
if __name__ == "__main__":
today = datetime.datetime.today().strftime("%Y%m%d")
today_time = datetime.datetime.today().strftime("%Y%m%d_%H%M")
main()
'Cloud > AWS' 카테고리의 다른 글
| boto3 리전 별 EC2 Instance 전체 정보 가져오기 (ID,Type,EBS,TAG) (0) | 2023.02.21 |
|---|---|
| iostat 로 EBS 볼륨 마이크로버스트 확인 (0) | 2022.01.03 |
| fio 사용 EBS 볼륨 속도 측정 (Random Access Performonce) (0) | 2021.09.02 |
Comments