247 lines
6.9 KiB
Go
247 lines
6.9 KiB
Go
/*
|
|
Copyright 2021 Alibaba Group Holding Limited.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package init
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
_ "github.com/go-sql-driver/mysql"
|
|
|
|
"github.com/alibaba/polardbx-operator/pkg/meta/core/gms/security"
|
|
dbutil "github.com/alibaba/polardbx-operator/pkg/util/database"
|
|
"github.com/alibaba/polardbx-operator/pkg/util/network"
|
|
)
|
|
|
|
// metadbDatabaseName is the database name selected when Do opens its
// connection to the metadb.
const metadbDatabaseName = "polardbx_meta_db"
|
|
|
|
// Env holds the settings this package reads from the process environment.
// It is populated by Load and consumed by Do; the JSON tags define the field
// names used when the value is marshalled for logging.
type Env struct {
	// Identity of the instance; Load reads these from "instanceId",
	// "instanceType" and "primaryInstanceId". PrimaryInstanceID falls back to
	// InstanceID when "primaryInstanceId" is not set.
	InstanceID        string `json:"instance_id"`
	InstanceType      string `json:"instance_type"`
	PrimaryInstanceID string `json:"primary_instance_id"`

	// PodId comes from "POD_ID". LocalIP comes from "POD_IP" when that
	// variable is set, otherwise from network.GetLocalIP.
	PodId   string `json:"pod_id"`
	LocalIP string `json:"local_ip"`

	// Ports read from "serverPort", "htapPort", "mgrPort" and "mppPort".
	ServerPort int `json:"server_port"`
	HtapPort   int `json:"htap_port"`
	MgrPort    int `json:"mgr_port"`
	MppPort    int `json:"mpp_port"`

	// Resource figures read from "cpuCore" and "memSize".
	// NOTE(review): the unit of MemSize is not visible in this file.
	CpuCore int   `json:"cpu_core"`
	MemSize int64 `json:"mem_size"`

	// Metadb endpoint and user: host and port are split out of "metaDbAddr"
	// (host:port), the user comes from "metaDbUser".
	MetaDBHost string `json:"metadb_host"`
	MetaDBPort int    `json:"metadb_port"`
	MetaDBUser string `json:"metadb_user"`

	// MetaDBEncPasswd ("metaDbPasswd") is the encrypted metadb password and
	// EncKey ("dnPasswordKey") is the key Do hands to the password cipher to
	// decrypt it.
	MetaDBEncPasswd string `json:"metadb_enc_passwd"`
	EncKey          string `json:"enc_key"`
}
|
|
|
|
func (env *Env) lookupInt64Env(key string) (int64, error) {
|
|
if strVal, exists := os.LookupEnv(key); !exists {
|
|
return 0, fmt.Errorf("env '%s' not found", key)
|
|
} else {
|
|
val, err := strconv.ParseInt(strVal, 10, 64)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("env '%s' is invalid: %s", key, err.Error())
|
|
}
|
|
return val, nil
|
|
}
|
|
}
|
|
|
|
func (env *Env) lookupIntEnv(key string) (int, error) {
|
|
val, err := env.lookupInt64Env(key)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return int(val), err
|
|
}
|
|
|
|
func (env *Env) Load() error {
|
|
var exists bool
|
|
var err error
|
|
|
|
if env.PodId, exists = os.LookupEnv("POD_ID"); !exists {
|
|
return errors.New("env 'POD_ID' not found")
|
|
}
|
|
|
|
if localIp, exists := os.LookupEnv("POD_IP"); exists {
|
|
env.LocalIP = localIp
|
|
} else {
|
|
ip, err := network.GetLocalIP()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
env.LocalIP = ip.String()
|
|
}
|
|
fmt.Println("Local IP: " + env.LocalIP)
|
|
|
|
if metaDbAddr, exists := os.LookupEnv("metaDbAddr"); !exists {
|
|
return errors.New("env 'metaDbAddr' not found")
|
|
} else {
|
|
hostPort := strings.SplitN(metaDbAddr, ":", 2)
|
|
if len(hostPort) < 2 {
|
|
return errors.New("env 'metaDbAddr' is invalid: invalid addr, syntax host:port")
|
|
}
|
|
env.MetaDBHost = hostPort[0]
|
|
if env.MetaDBPort, err = strconv.Atoi(hostPort[1]); err != nil {
|
|
return errors.New("env 'metaDbAddr' is invalid: invalid port, " + err.Error())
|
|
}
|
|
}
|
|
|
|
if env.MetaDBUser, exists = os.LookupEnv("metaDbUser"); !exists {
|
|
return errors.New("env 'metaDbUser' not found")
|
|
}
|
|
|
|
if encPasswd, exists := os.LookupEnv("metaDbPasswd"); !exists {
|
|
return errors.New("env 'metaDbPasswd' not found")
|
|
} else {
|
|
env.MetaDBEncPasswd = encPasswd
|
|
}
|
|
|
|
if encKey, exists := os.LookupEnv("dnPasswordKey"); !exists {
|
|
return errors.New("env 'dnPasswordKey' not found")
|
|
} else {
|
|
env.EncKey = encKey
|
|
}
|
|
|
|
if env.InstanceID, exists = os.LookupEnv("instanceId"); !exists {
|
|
return errors.New("env 'instanceId' not found")
|
|
}
|
|
|
|
if env.InstanceType, exists = os.LookupEnv("instanceType"); !exists {
|
|
return errors.New("env 'instanceType' not found")
|
|
}
|
|
|
|
if env.PrimaryInstanceID, exists = os.LookupEnv("primaryInstanceId"); !exists {
|
|
env.PrimaryInstanceID = env.InstanceID
|
|
}
|
|
|
|
if env.ServerPort, err = env.lookupIntEnv("serverPort"); err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.HtapPort, err = env.lookupIntEnv("htapPort"); err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.MgrPort, err = env.lookupIntEnv("mgrPort"); err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.MppPort, err = env.lookupIntEnv("mppPort"); err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.CpuCore, err = env.lookupIntEnv("cpuCore"); err != nil {
|
|
return err
|
|
}
|
|
|
|
if env.MemSize, err = env.lookupInt64Env("memSize"); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// MustMarshalJSON returns the JSON encoding of obj as a string.
// It panics with the error from json.Marshal if obj cannot be encoded.
func MustMarshalJSON(obj interface{}) string {
	encoded, marshalErr := json.Marshal(obj)
	if marshalErr == nil {
		return string(encoded)
	}
	panic(marshalErr)
}
|
|
|
|
func Do() {
|
|
fmt.Println("Begin initializing...")
|
|
|
|
fmt.Println("Loading from environment...")
|
|
env := &Env{}
|
|
if err := env.Load(); err != nil {
|
|
fmt.Println("Error when loading from environment: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
|
|
fmt.Println("Environment loaded: " + MustMarshalJSON(env))
|
|
|
|
fmt.Println("Connecting to metadb...")
|
|
passwd, err := security.MustNewPasswordCipher(env.EncKey).Decrypt(env.MetaDBEncPasswd)
|
|
if err != nil {
|
|
fmt.Println("Error when decrypting password: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
db, err := dbutil.OpenMySQLDB(&dbutil.MySQLDataSource{
|
|
Host: env.MetaDBHost,
|
|
Port: env.MetaDBPort,
|
|
Username: env.MetaDBUser,
|
|
Password: passwd,
|
|
Database: metadbDatabaseName,
|
|
Timeout: 10 * time.Second,
|
|
})
|
|
if err != nil {
|
|
fmt.Println("Error when connecting to metadb: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
defer db.Close()
|
|
fmt.Println("Connected to metadb!")
|
|
|
|
fmt.Printf("Try self-registration, register %s:%d to metadb...\n", env.LocalIP, env.ServerPort)
|
|
stmt := fmt.Sprintf(`REPLACE INTO server_info
|
|
(inst_id, inst_type, ip, port, htap_port, mgr_port, mpp_port, status, cpu_core, mem_size, extras)
|
|
VALUES ('%s', '%s', '%s', %d, %d, %d, %d, %d, %d, %d, '%s')`, env.InstanceID, env.InstanceType, env.LocalIP,
|
|
env.ServerPort, env.HtapPort, env.MgrPort, env.MppPort, 0, env.CpuCore, env.MemSize, env.PodId)
|
|
notifyStmt := fmt.Sprintf(`UPDATE config_listener SET op_version = op_version + 1 WHERE data_id = 'polardbx.server.info.%s'`, env.InstanceID)
|
|
|
|
notifyStmtForPrimary := fmt.Sprintf(`UPDATE config_listener SET op_version = op_version + 1 WHERE data_id = 'polardbx.server.info.%s'`, env.PrimaryInstanceID)
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
tx, err := db.BeginTx(ctx, nil)
|
|
if err != nil {
|
|
fmt.Println("Error when begin transaction: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
|
|
if _, err := tx.ExecContext(ctx, stmt); err != nil {
|
|
fmt.Println("Error when self-registering: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
|
|
if _, err := tx.ExecContext(ctx, notifyStmt); err != nil {
|
|
fmt.Println("Error when updating config listener: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
|
|
if _, err := tx.ExecContext(ctx, notifyStmtForPrimary); err != nil {
|
|
fmt.Println("Error when updating config listener: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
|
|
if err = tx.Commit(); err != nil {
|
|
fmt.Println("Failed to commit transaction: " + err.Error())
|
|
os.Exit(-1)
|
|
}
|
|
|
|
fmt.Println("Registered!")
|
|
|
|
fmt.Println("Successfully initialized!")
|
|
}
|