polardbxoperator/pkg/hpfs/backupbinlog/heartbeat.go

172 lines
4.5 KiB
Go

package backupbinlog
import (
"context"
"database/sql"
"fmt"
"github.com/go-logr/logr"
"os"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"strconv"
"sync"
"time"
)
// Heartbeat row identity and table definition.
const (
	// Sname is the value written to the `sname` column by the heartbeat
	// statement built in NewHeatBeat.
	Sname = "pitr_sname"

	// PitrHeartbeatId is the fixed `id` of the row that the heartbeat
	// statement replaces on every beat.
	PitrHeartbeatId = 111

	// heartbeatTableDDL creates the `__cdc_heartbeat__` table when it does
	// not exist yet. The leading comment is a TDDL hint that sets
	// ENABLE_ASYNC_DDL=FALSE (presumably to run the DDL synchronously —
	// confirm against the PolarDB-X documentation).
	heartbeatTableDDL = "/*+TDDL:cmd_extra(ENABLE_ASYNC_DDL=FALSE)*/\n" +
		"CREATE TABLE IF NOT EXISTS `__cdc_heartbeat__` (\n" +
		" `id` bigint(20) NOT NULL AUTO_INCREMENT BY GROUP,\n" +
		" `sname` varchar(10) DEFAULT NULL,\n" +
		" `gmt_modified` datetime(3) DEFAULT NULL,\n" +
		" PRIMARY KEY (`id`)\n" +
		") ENGINE = InnoDB DEFAULT CHARSET = utf8mb4 broadcast"
)
// Names of the environment variables read by NewHeatBeat. Every name shares
// the EnvHeartbeatPrefix prefix.
const (
	// EnvHeartbeatPrefix is the common prefix of all heartbeat variables.
	EnvHeartbeatPrefix = "heartbeat_"

	// Connection settings.
	EnvHeartbeatHost     = EnvHeartbeatPrefix + "host"
	EnvHeartbeatPort     = EnvHeartbeatPrefix + "port"
	EnvHeartbeatUser     = EnvHeartbeatPrefix + "user"
	EnvHeartbeatPassword = EnvHeartbeatPrefix + "password"

	// Scheduling settings: the interval is parsed with time.ParseDuration
	// and the retry count as a base-10 integer.
	EnvHeartbeatInterval = EnvHeartbeatPrefix + "interval"
	EnvMaxRetryCount     = EnvHeartbeatPrefix + "max_retry_count"
)
// HeartBeat periodically runs a fixed list of SQL statements against a
// database reached through the "mysql" database/sql driver. Instances are
// built by NewHeatBeat, started with Do, stopped with Cancel and awaited with
// Wait.
type HeartBeat struct {
	// Connection parameters used to build the DSN in connect.
	host string
	port int
	user string
	pwd  string

	// interval is the target time between two heartbeats.
	interval time.Duration
	// sql holds the statements executed, in order, inside one transaction
	// per heartbeat.
	sql []string
	// maxRetryCount bounds the number of consecutive failed heartbeats: the
	// loop in Do runs only while its failure counter is below this value.
	maxRetryCount int

	// ctx is passed to every database call and is what Wait blocks on;
	// cancelFunc cancels it.
	ctx        context.Context
	cancelFunc context.CancelFunc

	// lock is held by Cancel while it invokes cancelFunc.
	lock   sync.Mutex
	logger logr.Logger
}
// checkValid panics with "invalid value" as soon as it meets an empty string
// among values. It returns normally when every value is non-empty, including
// when no values are given.
func checkValid(values ...string) {
	for i := 0; i < len(values); i++ {
		if len(values[i]) == 0 {
			panic("invalid value")
		}
	}
}
// NewHeatBeat builds a HeartBeat from the heartbeat_* environment variables
// (see the Env* constants).
//
// Host, port, user, interval and max retry count are mandatory; the password
// may be empty. The port must be a base-10 integer in 1..65535 and the
// interval a positive duration accepted by time.ParseDuration (for example
// "1s"). Configuration is treated as fail-fast: any missing or malformed value
// makes the function panic with a message naming the offending value.
//
// The returned HeartBeat owns a fresh cancellable context; nothing is started
// until Do is called.
func NewHeatBeat() *HeartBeat {
	host := os.Getenv(EnvHeartbeatHost)
	portStr := os.Getenv(EnvHeartbeatPort)
	user := os.Getenv(EnvHeartbeatUser)
	pwd := os.Getenv(EnvHeartbeatPassword)
	intervalStr := os.Getenv(EnvHeartbeatInterval)
	maxRetryCountStr := os.Getenv(EnvMaxRetryCount)

	logger := zap.New(zap.UseDevMode(true)).WithName("HeatBeat")
	// The password is intentionally absent from the logged values.
	logger.Info("env values", "host", host, "port", portStr, "user", user, "interval", intervalStr, "maxRetryCount", maxRetryCountStr)
	// pwd is not checked: an empty password is accepted.
	checkValid(host, portStr, user, intervalStr, maxRetryCountStr)

	// Atoi rejects values that do not fit in int instead of truncating them.
	port, err := strconv.Atoi(portStr)
	if err != nil {
		panic("failed to parse port=" + portStr + ": " + err.Error())
	}
	if port < 1 || port > 65535 {
		panic("invalid port=" + portStr + ": must be in 1..65535")
	}

	interval, err := time.ParseDuration(intervalStr)
	if err != nil {
		panic("failed to parse interval=" + intervalStr + ": " + err.Error())
	}
	// A non-positive interval would make the wait in Do fire immediately on
	// every iteration and hammer the database.
	if interval <= 0 {
		panic("invalid interval=" + intervalStr + ": must be positive")
	}

	// NOTE(review): a max retry count <= 0 is accepted here, but it makes the
	// loop in Do exit before the first heartbeat — confirm whether it should
	// be rejected as well.
	maxRetryCount, err := strconv.Atoi(maxRetryCountStr)
	if err != nil {
		panic("failed to parse maxRetryCount=" + maxRetryCountStr + ": " + err.Error())
	}

	ctx, cancelFunc := context.WithCancel(context.Background())
	return &HeartBeat{
		interval: interval,
		sql: []string{
			"set drds_transaction_policy='TSO'",
			fmt.Sprintf("replace into `__cdc_heartbeat__`(id,Sname, gmt_modified) values(%d,'%s', now())", PitrHeartbeatId, Sname),
		},
		host:          host,
		port:          port,
		user:          user,
		pwd:           pwd,
		maxRetryCount: maxRetryCount,
		ctx:           ctx,
		cancelFunc:    cancelFunc,
		logger:        logger,
	}
}
// Cancel cancels the HeartBeat's context while holding h.lock, which stops
// the loop started by Do and unblocks Wait. It is a no-op when no cancel
// function is set. Each effective call logs "cancelled".
func (h *HeartBeat) Cancel() {
	h.lock.Lock()
	defer h.lock.Unlock()

	if h.cancelFunc == nil {
		return
	}
	h.logger.Info("cancelled")
	h.cancelFunc()
}
// connect opens a database/sql handle on the `__cdc__` schema through the
// "mysql" driver, using the configured user, password, host and port with a
// 5s connect timeout and a 5s read timeout. It panics when sql.Open reports
// an error. Note that sql.Open may only validate its arguments; it does not
// necessarily establish a connection.
func (h *HeartBeat) connect() *sql.DB {
	dsn := fmt.Sprintf(
		"%s:%s@tcp(%s:%d)/__cdc__?timeout=5s&readTimeout=5s",
		h.user, h.pwd, h.host, h.port,
	)
	handle, openErr := sql.Open("mysql", dsn)
	if openErr != nil {
		panic(openErr)
	}
	return handle
}
// Do starts the heartbeat loop in a background goroutine and returns
// immediately.
//
// The goroutine first ensures the heartbeat table exists, then sends one
// heartbeat per interval (the first one right away). It stops when the
// context is cancelled or when maxRetryCount consecutive heartbeats have
// failed; a successful heartbeat resets the failure counter. When the
// goroutine ends, for whatever reason, it calls Cancel so that Wait unblocks.
//
// Failing to open the database handle or to create the heartbeat table is
// treated as fatal: the goroutine panics, which terminates the process.
func (h *HeartBeat) Do() {
	go func() {
		defer h.Cancel()

		db := h.connect()
		defer db.Close()

		if _, err := db.ExecContext(h.ctx, heartbeatTableDDL); err != nil {
			panic(err)
		}

		// Pretend the previous beat happened one interval ago so the first
		// wait below expires immediately.
		lastTime := time.Now().Add(-h.interval)
		retryCount := 0
		for retryCount < h.maxRetryCount {
			// time.Since uses the monotonic clock, so wall-clock jumps do
			// not stretch or shrink the wait. A non-positive duration makes
			// time.After fire at once.
			select {
			case <-time.After(h.interval - time.Since(lastTime)):
				h.logger.Info("heartbeat")
				lastTime = time.Now()
			case <-h.ctx.Done():
				return
			}

			if h.sendOnce(db) {
				retryCount = 0
			} else {
				retryCount++
			}
		}
		h.logger.Info("heartbeat loop stopped: retry limit reached", "retryCount", retryCount, "maxRetryCount", h.maxRetryCount)
	}()
}

// sendOnce runs every statement of h.sql inside a single transaction on a
// dedicated connection and reports whether the transaction was committed.
// Each failure is logged here; the caller only needs the outcome to maintain
// its retry counter.
func (h *HeartBeat) sendOnce(db *sql.DB) bool {
	conn, err := db.Conn(h.ctx)
	if err != nil {
		h.logger.Error(err, "failed to get conn")
		return false
	}
	defer conn.Close()

	tx, err := conn.BeginTx(h.ctx, &sql.TxOptions{})
	if err != nil {
		h.logger.Error(err, "fail to begin tx")
		return false
	}

	for _, query := range h.sql {
		if _, err := tx.ExecContext(h.ctx, query); err != nil {
			h.logger.Error(err, fmt.Sprintf("failed to exec query=%s", query))
			// Best effort: the statement failure above is what matters, and
			// closing the connection discards the transaction anyway.
			_ = tx.Rollback()
			return false
		}
	}

	// Only a committed transaction counts as a successful heartbeat.
	if err := tx.Commit(); err != nil {
		h.logger.Error(err, "failed to commit tx")
		return false
	}
	return true
}
// Wait blocks until the HeartBeat's context is done, i.e. until Cancel has
// been called (Do's goroutine calls it when it exits). It returns immediately
// when the HeartBeat has no context.
func (h *HeartBeat) Wait() {
	if h.ctx == nil {
		return
	}
	<-h.ctx.Done()
}