test: initial approach to pods_exec

This commit is contained in:
Marc Nuri
2025-03-30 10:03:50 +02:00
parent b08fe66d56
commit 61289cf1df
4 changed files with 228 additions and 7 deletions

View File

@@ -35,7 +35,8 @@ type Kubernetes struct {
kubeConfigFiles []string
CloseWatchKubeConfig CloseWatchKubeConfig
scheme *runtime.Scheme
parameterCodec *runtime.ParameterCodec
parameterCodec runtime.ParameterCodec
restClient rest.Interface
clientSet kubernetes.Interface
discoveryClient *discovery.DiscoveryClient
deferredDiscoveryRESTMapper *restmapper.DeferredDiscoveryRESTMapper
@@ -47,7 +48,11 @@ func NewKubernetes() (*Kubernetes, error) {
if err != nil {
return nil, err
}
clientSet, err := kubernetes.NewForConfig(cfg)
restClient, err := rest.HTTPClientFor(cfg)
if err != nil {
return nil, err
}
clientSet, err := kubernetes.NewForConfigAndClient(cfg, restClient)
if err != nil {
return nil, err
}
@@ -63,12 +68,11 @@ func NewKubernetes() (*Kubernetes, error) {
if err = v1.AddToScheme(scheme); err != nil {
return nil, err
}
parameterCodec := runtime.NewParameterCodec(scheme)
return &Kubernetes{
cfg: cfg,
kubeConfigFiles: resolveConfig().ConfigAccess().GetLoadingPrecedence(),
scheme: scheme,
parameterCodec: &parameterCodec,
parameterCodec: runtime.NewParameterCodec(scheme),
clientSet: clientSet,
discoveryClient: discoveryClient,
deferredDiscoveryRESTMapper: restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)),
@@ -148,7 +152,11 @@ func resolveClientConfig() (*rest.Config, error) {
if err == nil && inClusterConfig != nil {
return inClusterConfig, nil
}
return resolveConfig().ClientConfig()
cfg, err := resolveConfig().ClientConfig()
if cfg != nil && cfg.UserAgent == "" {
cfg.UserAgent = rest.DefaultKubernetesUserAgent()
}
return cfg, err
}
func configuredNamespace() string {

View File

@@ -0,0 +1,165 @@
package kubernetes
import (
"encoding/json"
"errors"
"io"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/httpstream/spdy"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
"net/http"
"net/http/httptest"
)
type MockServer struct {
server *httptest.Server
config *rest.Config
restClient *rest.RESTClient
restHandlers []http.HandlerFunc
clientSet *fake.Clientset
parameterCodec runtime.ParameterCodec
}
func NewMockServer() *MockServer {
ms := &MockServer{
clientSet: fake.NewClientset(),
}
scheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(scheme)
ms.parameterCodec = runtime.NewParameterCodec(scheme)
ms.server = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
for _, handler := range ms.restHandlers {
handler(w, req)
}
}))
ms.config = &rest.Config{
Host: ms.server.URL,
APIPath: "/api",
ContentConfig: rest.ContentConfig{
NegotiatedSerializer: codecs,
ContentType: runtime.ContentTypeJSON,
GroupVersion: &v1.SchemeGroupVersion,
},
}
ms.restClient, _ = rest.RESTClientFor(ms.config)
ms.restHandlers = make([]http.HandlerFunc, 0)
return ms
}
func (m *MockServer) Close() {
m.server.Close()
}
func (m *MockServer) ClientSet() *fake.Clientset {
return m.clientSet
}
func (m *MockServer) Handle(handler http.Handler) {
m.restHandlers = append(m.restHandlers, handler.ServeHTTP)
}
func (m *MockServer) NewKubernetes() *Kubernetes {
return &Kubernetes{
cfg: m.config,
restClient: m.restClient,
clientSet: m.clientSet,
parameterCodec: m.parameterCodec,
}
}
type streamAndReply struct {
httpstream.Stream
replySent <-chan struct{}
}
type streamContext struct {
conn io.Closer
stdinStream io.ReadCloser
stdoutStream io.WriteCloser
stderrStream io.WriteCloser
writeStatus func(status *apierrors.StatusError) error
}
type StreamOptions struct {
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
}
func v4WriteStatusFunc(stream io.Writer) func(status *apierrors.StatusError) error {
return func(status *apierrors.StatusError) error {
bs, err := json.Marshal(status.Status())
if err != nil {
return err
}
_, err = stream.Write(bs)
return err
}
}
func createHTTPStreams(w http.ResponseWriter, req *http.Request, opts *StreamOptions) (*streamContext, error) {
_, err := httpstream.Handshake(req, w, []string{"v4.channel.k8s.io"})
if err != nil {
return nil, err
}
upgrader := spdy.NewResponseUpgrader()
streamCh := make(chan streamAndReply)
conn := upgrader.UpgradeResponse(w, req, func(stream httpstream.Stream, replySent <-chan struct{}) error {
streamCh <- streamAndReply{Stream: stream, replySent: replySent}
return nil
})
ctx := &streamContext{
conn: conn,
}
// wait for stream
replyChan := make(chan struct{}, 4)
defer close(replyChan)
receivedStreams := 0
expectedStreams := 1
if opts.Stdout != nil {
expectedStreams++
}
if opts.Stdin != nil {
expectedStreams++
}
if opts.Stderr != nil {
expectedStreams++
}
WaitForStreams:
for {
select {
case stream := <-streamCh:
streamType := stream.Headers().Get(v1.StreamType)
switch streamType {
case v1.StreamTypeError:
replyChan <- struct{}{}
ctx.writeStatus = v4WriteStatusFunc(stream)
case v1.StreamTypeStdout:
replyChan <- struct{}{}
ctx.stdoutStream = stream
case v1.StreamTypeStdin:
replyChan <- struct{}{}
ctx.stdinStream = stream
case v1.StreamTypeStderr:
replyChan <- struct{}{}
ctx.stderrStream = stream
default:
// add other stream ...
return nil, errors.New("unimplemented stream type")
}
case <-replyChan:
receivedStreams++
if receivedStreams == expectedStreams {
break WaitForStreams
}
}
}
return ctx, nil
}

View File

@@ -214,12 +214,13 @@ func (k *Kubernetes) PodsExec(ctx context.Context, namespace, name, container st
func (k *Kubernetes) createExecutor(namespace, name string, podExecOptions *v1.PodExecOptions) (remotecommand.Executor, error) {
// Compute URL
// https://github.com/kubernetes/kubectl/blob/5366de04e168bcbc11f5e340d131a9ca8b7d0df4/pkg/cmd/exec/exec.go#L382-L397
req := k.clientSet.CoreV1().RESTClient().Post().
req := k.restClient.
Post().
Resource("pods").
Namespace(namespace).
Name(name).
SubResource("exec")
req.VersionedParams(podExecOptions, *k.parameterCodec)
req.VersionedParams(podExecOptions, k.parameterCodec)
spdyExec, err := remotecommand.NewSPDYExecutor(k.cfg, "POST", req.URL())
if err != nil {
return nil, err

View File

@@ -0,0 +1,47 @@
package kubernetes
import (
"bytes"
"context"
"io"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"testing"
)
func TestPodsExec(t *testing.T) {
mockServer := NewMockServer()
defer mockServer.Close()
_ = mockServer.ClientSet().Tracker().Add(&v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: "default",
Name: "pod-to-exec",
},
Spec: v1.PodSpec{Containers: []v1.Container{{Name: "container-to-exec"}}},
})
mockServer.Handle(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
var stdin, stdout bytes.Buffer
ctx, err := createHTTPStreams(w, req, &StreamOptions{
Stdin: &stdin,
Stdout: &stdout,
})
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
return
}
defer ctx.conn.Close()
if req.URL.Path == "/api/v1/namespaces/default/pods/pod-to-exec/exec" {
_, _ = io.WriteString(ctx.stdoutStream, "total 0\n")
}
}))
k8s := mockServer.NewKubernetes()
out, err := k8s.PodsExec(context.Background(), "default", "pod-to-exec", "", []string{"ls", "-l"})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if out != "total 0\n" {
t.Fatalf("unexpected output: %s", out)
}
}