polardbxoperator/test/framework/polardbxparameter/factory.go

182 lines
4.7 KiB
Go

package polardbxparameter
import (
"context"
"errors"
"io"
"time"
polardbxv1 "github.com/alibaba/polardbx-operator/api/v1"
k8shelper "github.com/alibaba/polardbx-operator/pkg/k8s/helper"
polardbxmeta "github.com/alibaba/polardbx-operator/pkg/operator/v1/polardbx/meta"
"github.com/alibaba/polardbx-operator/pkg/operator/v1/xstore/convention"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/remotecommand"
)
type ParameterFactoryOption func(parameter *polardbxv1.PolarDBXParameter)
type ParameterTemplateFactoryOption func(parameterTemplate *polardbxv1.PolarDBXParameterTemplate)
func NewPolarDBXParameter(name, namespace string, opts ...ParameterFactoryOption) *polardbxv1.PolarDBXParameter {
obj := &polardbxv1.PolarDBXParameter{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
for _, opt := range opts {
opt(obj)
}
return obj
}
func SetLabel(key, value string) ParameterFactoryOption {
return func(polardbxparameter *polardbxv1.PolarDBXParameter) {
if polardbxparameter.Labels == nil {
polardbxparameter.Labels = make(map[string]string)
}
polardbxparameter.Labels[key] = value
}
}
func SetTemplateNameAndClusterName(templateName, clusterName string) ParameterFactoryOption {
return func(polardbxparameter *polardbxv1.PolarDBXParameter) {
polardbxparameter.Spec.TemplateName = templateName
polardbxparameter.Spec.ClusterName = clusterName
}
}
const (
RollingRestart = "rollingRestart"
Restart = "restart"
)
func SetNodeType(role, name, restartType string, paramList []polardbxv1.Params) ParameterFactoryOption {
switch role {
case polardbxmeta.RoleCN:
return func(polardbxparameter *polardbxv1.PolarDBXParameter) {
polardbxparameter.Spec.NodeType.CN = polardbxv1.ParamNode{
Name: name,
RestartType: restartType,
ParamList: paramList,
}
}
case polardbxmeta.RoleDN:
return func(polardbxparameter *polardbxv1.PolarDBXParameter) {
polardbxparameter.Spec.NodeType.DN = polardbxv1.ParamNode{
Name: name,
RestartType: restartType,
ParamList: paramList,
}
}
case polardbxmeta.RoleGMS:
return func(polardbxparameter *polardbxv1.PolarDBXParameter) {
polardbxparameter.Spec.NodeType.GMS = &polardbxv1.ParamNode{
Name: name,
RestartType: restartType,
ParamList: paramList,
}
}
default:
panic("unrecognized role: " + role)
}
}
func NewPolarDBXParameterTemplate(name, namespace string, opts ...ParameterTemplateFactoryOption) *polardbxv1.PolarDBXParameterTemplate {
obj := &polardbxv1.PolarDBXParameterTemplate{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
}
for _, opt := range opts {
opt(obj)
}
return obj
}
func SetTemplateNodeType(role string, paramList []polardbxv1.TemplateParams) ParameterTemplateFactoryOption {
switch role {
case polardbxmeta.RoleCN:
return func(polardbxparameter *polardbxv1.PolarDBXParameterTemplate) {
polardbxparameter.Spec.NodeType.CN = polardbxv1.TemplateNode{
Name: "e2e-test",
ParamList: paramList,
}
}
case polardbxmeta.RoleDN:
return func(polardbxparameter *polardbxv1.PolarDBXParameterTemplate) {
polardbxparameter.Spec.NodeType.DN = polardbxv1.TemplateNode{
Name: "e2e-test",
ParamList: paramList,
}
}
default:
panic("unrecognized role: " + role)
}
}
// ExecCmd exec command on specific pod and wait the command's output.
func ExecCmd(client kubernetes.Interface, config *restclient.Config, pod *corev1.Pod, namespace string,
command string, stdin io.Reader, stdout io.Writer, stderr io.Writer) error {
if k8shelper.GetContainerFromPod(pod, convention.ContainerEngine) == nil {
return errors.New("container " + "engine" + " not found in pod " + pod.Name)
}
cmd := []string{
"sh",
"-c",
command,
}
req := client.
CoreV1().
RESTClient().
Post().
Resource("pods").
Name(pod.Name).
Namespace(namespace).
SubResource("exec").
Timeout(5 * time.Second)
option := &corev1.PodExecOptions{
Container: convention.ContainerEngine,
Command: cmd,
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}
if stdin == nil {
option.Stdin = false
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
if err != nil {
return err
}
ctx := context.Background()
cancelFn := func() {}
ctx, cancelFn = context.WithTimeout(ctx, 5*time.Second)
defer cancelFn()
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdin: stdin,
Stdout: stdout,
Stderr: stderr,
})
if err != nil {
return err
}
return nil
}