# Copyright 2021 Alibaba Group Holding Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
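
# Overview: this command computes the [min, max) binlog range for a backup,
# uploads the complete binlogs as-is, truncates the max binlog at the
# recorded end offset, and records the uploaded file names for later restore.
#
# Illustrative invocation (the parent CLI entrypoint is an assumption; this
# group is registered via binbackup_group at the bottom of the file):
#
#   <cli> binlogbackup start --backup_context /tmp/ctx.json \
#       --start_index 4 --gms_label false --xstore_name pxc-dn-1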

import json
import os
import re
import subprocess

import click

from core.context import Context
from core.log import LogFactory
from core.convention import *  # supplies VOLUME_DATA used below
from core.backup_restore.xstore_binlog import XStoreBinlog
from core.backup_restore.storage.filestream_client import FileStreamClient, BackupStorage


@click.group(name="binlogbackup")
def binbackup_group():
    pass

@click.command(name='start')
@click.option('--backup_context', required=True, type=str)
@click.option('-si', '--start_index', required=True, type=str)
@click.option('-g', '--gms_label', required=True, type=str)
@click.option('-xs', '--xstore_name', required=True, type=str)
def start_binlogbackup(backup_context, start_index, gms_label, xstore_name):
    logger = LogFactory.get_logger("binlogbackup.log")

    with open(backup_context, 'r') as f:
        params = json.load(f)
    remote_binlog_end_offset_path = params["binlogEndOffsetPath"]
    indexes_path = params["indexesPath"]
    remote_binlog_backup_dir = params["binlogBackupDir"]
    storage_name = params["storageName"]
    sink = params["sink"]
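    # Expected shape of the backup context file, inferred from the keys read
    # above (all values illustrative):
    #
    #   {
    #       "binlogEndOffsetPath": "remote/path/to/binlogOffsetEnd",
    #       "indexesPath": "remote/path/to/indexes",
    #       "binlogBackupDir": "remote/binlog/backup/dir",
    #       "storageName": "oss",
    #       "sink": "default"
    #   }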

    logger.info("start binlog backup")
    context = Context()
    mysql_port = context.port_access()
    binlog = XStoreBinlog(mysql_port)
    log_dir = context.volume_path(VOLUME_DATA, "log")
    backup_dir = context.volume_path(VOLUME_DATA, "backup")
    local_binlog_backup_dir = os.path.join(backup_dir, "binlogbackup")

    filestream_client = FileStreamClient(context, BackupStorage[storage_name.upper()], sink)

    os.makedirs(local_binlog_backup_dir, exist_ok=True)

    # Determine the first and last binlog files for this backup.
    min_log_name = get_min_log_name(context, log_dir, start_index, logger)
    logger.info("min_log_name:%s" % min_log_name)
    if gms_label == "true":
        # TODO: optimize cutting of the GMS binlog
        max_log_name, max_log_index = get_max_log_from_offset_gms(filestream_client, remote_binlog_end_offset_path,
                                                                  local_binlog_backup_dir, logger)
    else:
        max_log_name, max_log_index = get_max_log_from_cp(filestream_client, indexes_path, local_binlog_backup_dir,
                                                          xstore_name, logger)
    # Upload the binlogs that can go up as-is: [min_log_name, max_log_name),
    # i.e. everything except the max binlog, which is truncated first.
    # Note: the 'max_binglog_name' spelling follows the XStoreBinlog API.
    binlog_list = binlog.get_local_binlog(min_binlog_name=min_log_name, max_binglog_name=max_log_name,
                                          left_contain=True, right_contain=False)
    upload_binlog_info(binlog_list, log_dir, remote_binlog_backup_dir, filestream_client, logger)
    truncate_and_upload_binlog_info(context, log_dir, local_binlog_backup_dir, remote_binlog_backup_dir,
                                    filestream_client, max_log_name, max_log_index, logger)

    # Record every uploaded binlog name so that a later restore knows which
    # binlogs to download.
    uploaded_binlog_list = [log_name for log_name, _ in binlog_list]
    if max_log_name not in uploaded_binlog_list:
        uploaded_binlog_list.append(max_log_name)
    filestream_client.upload_from_string(remote=os.path.join(remote_binlog_backup_dir, "binlog_list"),
                                         string='\n'.join(uploaded_binlog_list), logger=logger)
    logger.info("list of uploaded binlogs: %s", uploaded_binlog_list)

    logger.info("upload finished")


def get_min_log_name(context, log_dir, start_index, logger):
    logger.info("start_index: %s" % str(start_index))
    for i in range(1, 999999):
        binlog_tmp_name = "mysql_bin.%06d" % i
        binlog_tmp_file = os.path.join(log_dir, binlog_tmp_name)
        if os.path.exists(binlog_tmp_file):
            tailor_cmd = [context.mysqlbinlogtailor, "--show-index-info", binlog_tmp_file]
            logger.info("tailor_cmd:%s" % tailor_cmd)
            with subprocess.Popen(tailor_cmd, stdout=subprocess.PIPE) as pipe:
                # The first output line carries this binlog's start/end index
                # info; parse out the two indexes.
                index_line = pipe.stdout.readline().decode("utf-8").lstrip('[')
                start_log_index = index_line.split(',')[0].split(':')[0]
                end_log_index = index_line.split(',')[1].split(':')[0].strip()
                logger.info("tailor_info:%s" % index_line)
            if int(start_log_index) <= int(start_index) <= int(end_log_index):
                logger.info("start_log_index:%s;end_log_index:%s;" % (str(start_log_index), str(end_log_index)))
                return binlog_tmp_name
            if int(start_log_index) > int(start_index):
                # The requested start index precedes the earliest local binlog.
                return ""
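

# Note (inferred from the parsing above; values illustrative): the
# mysqlbinlogtailor --show-index-info output is expected to start with a
# line shaped like
#
#   [000000000000004:120, 000000000000042:40960]
#
# i.e. "[<start_index>:<offset>, <end_index>:<offset>]".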


def get_max_log_from_offset_gms(filestream_client, binlog_end_offset_path, binlog_backup_dir, logger):
    local_end_offset_file_path = os.path.join(binlog_backup_dir, "binlogOffsetEnd")
    filestream_client.download_to_file(remote=binlog_end_offset_path, local=local_end_offset_file_path, logger=logger)
    with open(local_end_offset_file_path) as f:
        end = f.readline().rstrip('\n')
    max_log_name = end.split(':')[0]
    max_log_index = end.split(':')[1]
    logger.info("end_offset: %s;max_log_name:%s;max_log_index:%s" % (str(end), max_log_name, str(max_log_index)))
    return max_log_name, max_log_index
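

# Note (inferred from the parsing above; name and offset illustrative): the
# remote "binlogOffsetEnd" file is expected to hold one line of the form
#
#   mysql_bin.000009:104857600
#
# i.e. "<max_binlog_name>:<end_offset>".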


# noinspection DuplicatedCode
def get_max_log_from_cp(filestream_client, indexes_path, binlog_backup_dir, xstore_name, logger):
    indexes_local_path = os.path.join(binlog_backup_dir, "indexes")
    filestream_client.download_to_file(remote=indexes_path, local=indexes_local_path, logger=logger)
    xstore_pattern = xstore_name + ':'  # such as "pxc-dn-1:"
    max_log_info = None
    with open(indexes_local_path, 'r') as f:
        for text_line in f:
            m = re.search(xstore_pattern, text_line)
            if m:
                max_log_info = text_line.split(xstore_pattern)[-1].strip()
                break
    if max_log_info is None:
        raise ValueError("no max log entry found for %s in indexes file" % xstore_name)
    logger.info("max_log_info:" + max_log_info)
    return max_log_info.split(':')[0], max_log_info.split(':')[1]
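

# Note (inferred from the parsing above; values illustrative): each line of
# the consistency-point "indexes" file is expected to look like
#
#   pxc-dn-1:mysql_bin.000007:4521
#
# i.e. "<xstore_name>:<max_binlog_name>:<max_log_index>".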


def truncate_and_upload_binlog_info(context, log_dir, binlogbackup_dir, binlogbackupdir_path, filestream_client,
                                    max_log_name, max_log_index, logger):
    binlog_file_path = os.path.join(log_dir, max_log_name)
    truncated_log_name = "{}_trunc.{}".format(*max_log_name.split('.'))
    truncate_file_path = os.path.join(binlogbackup_dir, truncated_log_name)
    last_event_timestamp_path = os.path.join(binlogbackup_dir, "last_event_timestamp")
    truncate_cmd = "%s truncate %s --end-offset %s -o %s" % (context.bb_home, binlog_file_path,
                                                             max_log_index, truncate_file_path)
    logger.info("truncate_cmd:" + truncate_cmd)
    with subprocess.Popen(truncate_cmd, shell=True, stdout=subprocess.PIPE) as pipe:
        logger.info("cut max log")
        output = pipe.stdout.readline().decode("utf-8")
    res = re.search(r"LAST EVENT TIMESTAMP: (\d+)", output)
    if res:
        last_event_timestamp = res.group(1)
        logger.info("last event timestamp: " + last_event_timestamp)
        with open(last_event_timestamp_path, 'w') as f:  # used to display in PXB right after the binlog backup
            f.write(last_event_timestamp)
        filestream_client.upload_from_file(remote=os.path.join(binlogbackupdir_path, "last_event_timestamp"),
                                           local=last_event_timestamp_path, logger=logger)
    # Upload the truncated max binlog under its original name.
    filestream_client.upload_from_file(remote=os.path.join(binlogbackupdir_path, max_log_name),
                                       local=truncate_file_path, logger=logger)
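

# Example of the rendered truncate command (binary path, offsets, and file
# names are illustrative):
#
#   /path/to/bb truncate /data/mysql/log/mysql_bin.000009 \
#       --end-offset 104857600 -o /data/mysql/backup/binlogbackup/mysql_bin_trunc.000009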


def upload_binlog_info(binlog_list, log_dir, binlog_backup_dir_path, filestream_client, logger):
    for log_name, _ in binlog_list:
        logger.info("log to upload:%s during binlog backup" % log_name)
        binlog_file_path = os.path.join(log_dir, log_name)
        filestream_client.upload_from_file(remote=os.path.join(binlog_backup_dir_path, log_name),
                                           local=binlog_file_path, logger=logger)


binbackup_group.add_command(start_binlogbackup)
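

# Minimal sketch (an assumption, not part of the original tool): binbackup_group
# is normally mounted into a parent click CLI; this guard only makes the module
# runnable on its own for ad-hoc testing.
if __name__ == "__main__":
    binbackup_group()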