172 lines
4.5 KiB
Go
172 lines
4.5 KiB
Go
package backupbinlog
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"fmt"
|
|
"github.com/go-logr/logr"
|
|
"os"
|
|
"sigs.k8s.io/controller-runtime/pkg/log/zap"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
Sname = "pitr_sname"
|
|
heartbeatTableDDL = "/*+TDDL:cmd_extra(ENABLE_ASYNC_DDL=FALSE)*/\n" +
|
|
"CREATE TABLE IF NOT EXISTS `__cdc_heartbeat__` (\n" +
|
|
" `id` bigint(20) NOT NULL AUTO_INCREMENT BY GROUP,\n" +
|
|
" `sname` varchar(10) DEFAULT NULL,\n" +
|
|
" `gmt_modified` datetime(3) DEFAULT NULL,\n" +
|
|
" PRIMARY KEY (`id`)\n) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 broadcast"
|
|
PitrHeartbeatId = 111
|
|
)
|
|
|
|
const (
|
|
EnvHeartbeatPrefix = "heartbeat_"
|
|
EnvHeartbeatHost = EnvHeartbeatPrefix + "host"
|
|
EnvHeartbeatPort = EnvHeartbeatPrefix + "port"
|
|
EnvHeartbeatUser = EnvHeartbeatPrefix + "user"
|
|
EnvHeartbeatPassword = EnvHeartbeatPrefix + "password"
|
|
EnvHeartbeatInterval = EnvHeartbeatPrefix + "interval"
|
|
EnvMaxRetryCount = EnvHeartbeatPrefix + "max_retry_count"
|
|
)
|
|
|
|
type HeartBeat struct {
|
|
interval time.Duration
|
|
host string
|
|
port int
|
|
user string
|
|
pwd string
|
|
sql []string
|
|
maxRetryCount int
|
|
ctx context.Context
|
|
cancelFunc context.CancelFunc
|
|
lock sync.Mutex
|
|
logger logr.Logger
|
|
}
|
|
|
|
func checkValid(values ...string) {
|
|
for _, value := range values {
|
|
if value == "" {
|
|
panic("invalid value")
|
|
}
|
|
}
|
|
}
|
|
|
|
func NewHeatBeat() *HeartBeat {
|
|
host := os.Getenv(EnvHeartbeatHost)
|
|
portStr := os.Getenv(EnvHeartbeatPort)
|
|
user := os.Getenv(EnvHeartbeatUser)
|
|
pwd := os.Getenv(EnvHeartbeatPassword)
|
|
intervalStr := os.Getenv(EnvHeartbeatInterval)
|
|
maxRetryCountStr := os.Getenv(EnvMaxRetryCount)
|
|
logger := zap.New(zap.UseDevMode(true)).WithName("HeatBeat")
|
|
logger.Info("env values", "host", host, "port", portStr, "user", user, "interval", intervalStr, "maxRetryCount", maxRetryCountStr)
|
|
checkValid(host, portStr, user, intervalStr, maxRetryCountStr)
|
|
parsedPort, err := strconv.ParseInt(portStr, 10, 64)
|
|
if err != nil {
|
|
panic("failed to parse port=" + portStr)
|
|
}
|
|
interval, err := time.ParseDuration(intervalStr)
|
|
if err != nil {
|
|
panic("failed to parse interval=" + intervalStr)
|
|
}
|
|
maxRetryCount, err := strconv.ParseInt(maxRetryCountStr, 10, 64)
|
|
if err != nil {
|
|
panic("failed to parse maxRetryCount=" + maxRetryCountStr)
|
|
}
|
|
|
|
ctx, cancelFunc := context.WithCancel(context.Background())
|
|
return &HeartBeat{
|
|
interval: interval,
|
|
sql: []string{"set drds_transaction_policy='TSO'", fmt.Sprintf("replace into `__cdc_heartbeat__`(id,Sname, gmt_modified) values(%d,'%s', now())", PitrHeartbeatId, Sname)},
|
|
host: host,
|
|
port: int(parsedPort),
|
|
user: user,
|
|
pwd: pwd,
|
|
maxRetryCount: int(maxRetryCount),
|
|
ctx: ctx,
|
|
cancelFunc: cancelFunc,
|
|
logger: logger,
|
|
lock: sync.Mutex{},
|
|
}
|
|
}
|
|
|
|
func (h *HeartBeat) Cancel() {
|
|
h.lock.Lock()
|
|
defer h.lock.Unlock()
|
|
if h.cancelFunc != nil {
|
|
h.logger.Info("cancelled")
|
|
h.cancelFunc()
|
|
}
|
|
}
|
|
|
|
func (h *HeartBeat) connect() *sql.DB {
|
|
db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/__cdc__?timeout=5s&readTimeout=5s", h.user, h.pwd, h.host, h.port))
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return db
|
|
}
|
|
|
|
func (h *HeartBeat) Do() {
|
|
go func() {
|
|
defer h.Cancel()
|
|
db := h.connect()
|
|
defer db.Close()
|
|
lastTime := time.Now().Add(-h.interval)
|
|
_, err := db.ExecContext(h.ctx, heartbeatTableDDL)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
retryCount := 0
|
|
for retryCount < h.maxRetryCount {
|
|
nextDuration := int64(h.interval.Milliseconds()) - (time.Now().UnixMilli() - lastTime.UnixMilli())
|
|
select {
|
|
case <-time.After(time.Duration(nextDuration) * time.Millisecond):
|
|
h.logger.Info("heartbeat")
|
|
lastTime = time.Now()
|
|
case <-h.ctx.Done():
|
|
return
|
|
}
|
|
conn, err := db.Conn(h.ctx)
|
|
if err != nil {
|
|
h.logger.Error(err, "failed to get conn")
|
|
retryCount = retryCount + 1
|
|
return
|
|
}
|
|
tx, err := conn.BeginTx(h.ctx, &sql.TxOptions{})
|
|
if err != nil {
|
|
h.logger.Error(err, "fail to begin tx")
|
|
retryCount = retryCount + 1
|
|
conn.Close()
|
|
continue
|
|
}
|
|
commit := true
|
|
for _, query := range h.sql {
|
|
_, err := tx.ExecContext(h.ctx, query)
|
|
if err != nil {
|
|
h.logger.Error(err, fmt.Sprintf("failed to exec query=%s", query))
|
|
retryCount = retryCount + 1
|
|
tx.Rollback()
|
|
commit = false
|
|
break
|
|
}
|
|
}
|
|
if commit {
|
|
retryCount = 0
|
|
tx.Commit()
|
|
}
|
|
conn.Close()
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (h *HeartBeat) Wait() {
|
|
if h.ctx != nil {
|
|
<-h.ctx.Done()
|
|
}
|
|
}
|