feat(kubernetes): fall back to configured namespace when listing from all namespaces

Fixes #4

If the user is not authorized to list from all namespaces, fall back to listing from the configured namespace only.
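
A minimal sketch of the resulting flow (illustrative only: canList mirrors the canIUse helper added in the diff below, while the package name, the listResources wrapper, and its clientset/dynamic-client parameters are assumptions, not code from this commit):

package example

import (
	"context"

	authv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"
)

// canList asks the API server, via a SelfSubjectAccessReview, whether the
// current user may "list" the given resource in the given namespace
// (an empty namespace means "all namespaces").
func canList(ctx context.Context, cs kubernetes.Interface, gvr schema.GroupVersionResource, namespace string) bool {
	review := &authv1.SelfSubjectAccessReview{
		Spec: authv1.SelfSubjectAccessReviewSpec{ResourceAttributes: &authv1.ResourceAttributes{
			Namespace: namespace,
			Verb:      "list",
			Group:     gvr.Group,
			Version:   gvr.Version,
			Resource:  gvr.Resource,
		}},
	}
	response, err := cs.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, review, metav1.CreateOptions{})
	if err != nil {
		return false
	}
	return response.Status.Allowed
}

// listResources is a hypothetical wrapper: list cluster-wide when allowed,
// otherwise fall back to the namespace configured in the kubeconfig.
// (The actual change below additionally skips the check for cluster-scoped resources.)
func listResources(ctx context.Context, cs kubernetes.Interface, dyn dynamic.Interface,
	gvr schema.GroupVersionResource, configuredNamespace string) (*unstructured.UnstructuredList, error) {
	namespace := "" // empty means all namespaces
	if !canList(ctx, cs, gvr, namespace) {
		namespace = configuredNamespace
	}
	return dyn.Resource(gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
}

As in the change below, the fallback only applies when no namespace was explicitly requested; a request for a specific namespace keeps its normal authorization behaviour.
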
Marc Nuri
2025-02-20 16:52:16 +01:00
parent 90c2802429
commit 3522e4fb44
4 changed files with 139 additions and 14 deletions

View File

@@ -95,11 +95,16 @@ func resolveClientConfig() (*rest.Config, error) {
 	return resolveConfig().ClientConfig()
 }
 
+func configuredNamespace() string {
+	if ns, _, nsErr := resolveConfig().Namespace(); nsErr == nil {
+		return ns
+	}
+	return ""
+}
+
 func namespaceOrDefault(namespace string) string {
 	if namespace == "" {
-		if ns, _, nsErr := resolveConfig().Namespace(); nsErr == nil {
-			namespace = ns
-		}
+		return configuredNamespace()
 	}
 	return namespace
 }

View File

@@ -3,6 +3,7 @@ package kubernetes
 import (
 	"context"
 	"github.com/manusa/kubernetes-mcp-server/pkg/version"
+	authv1 "k8s.io/api/authorization/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -23,6 +24,11 @@ func (k *Kubernetes) ResourcesList(ctx context.Context, gvk *schema.GroupVersion
 	if err != nil {
 		return "", err
 	}
+	// Check if operation is allowed for all namespaces (applicable for namespaced resources)
+	isNamespaced, _ := k.isNamespaced(gvk)
+	if isNamespaced && !k.canIUse(ctx, gvr, namespace, "list") && namespace == "" {
+		namespace = configuredNamespace()
+	}
 	rl, err := k.dynamicClient.Resource(*gvr).Namespace(namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return "", err
@@ -125,3 +131,20 @@ func (k *Kubernetes) supportsGroupVersion(groupVersion string) bool {
 	}
 	return true
 }
+
+func (k *Kubernetes) canIUse(ctx context.Context, gvr *schema.GroupVersionResource, namespace, verb string) bool {
+	response, err := k.clientSet.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, &authv1.SelfSubjectAccessReview{
+		Spec: authv1.SelfSubjectAccessReviewSpec{ResourceAttributes: &authv1.ResourceAttributes{
+			Namespace: namespace,
+			Verb:      verb,
+			Group:     gvr.Group,
+			Version:   gvr.Version,
+			Resource:  gvr.Resource,
+		}},
+	}, metav1.CreateOptions{})
+	if err != nil {
+		// TODO: maybe return the error too
+		return false
+	}
+	return response.Status.Allowed
+}

View File

@@ -9,6 +9,7 @@ import (
 	"github.com/mark3labs/mcp-go/server"
 	"github.com/spf13/afero"
 	corev1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
 	apiextensionsv1spec "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
 	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -34,6 +35,7 @@ import (
 // envTest has an expensive setup, so we only want to do it once per entire test run.
 var envTest *envtest.Environment
 var envTestRestConfig *rest.Config
+var envTestUser = envtest.User{Name: "test-user", Groups: []string{"test:users"}}
 
 func TestMain(m *testing.M) {
 	// Set up
@@ -62,9 +64,17 @@ func TestMain(m *testing.M) {
 	envTest = &envtest.Environment{
 		BinaryAssetsDirectory: filepath.Join(envTestDir, "k8s", versionDir),
 	}
-	envTestRestConfig, _ = envTest.Start()
-	kc, _ := kubernetes.NewForConfig(envTestRestConfig)
-	createTestData(context.Background(), kc)
+	adminSystemMasterBaseConfig, _ := envTest.Start()
+	au, err := envTest.AddUser(envTestUser, adminSystemMasterBaseConfig)
+	if err != nil {
+		panic(err)
+	}
+	envTestRestConfig = au.Config()
+
+	//Create test data as administrator
+	ctx := context.Background()
+	restoreAuth(ctx)
+	createTestData(ctx)
 
 	// Test!
 	code := m.Run()
@@ -232,25 +242,46 @@ func (c *mcpContext) callTool(name string, args map[string]interface{}) (*mcp.Ca
 	return c.mcpClient.CallTool(c.ctx, callToolRequest)
 }
 
-func createTestData(ctx context.Context, kc *kubernetes.Clientset) {
-	_, _ = kc.CoreV1().Namespaces().
+func restoreAuth(ctx context.Context) {
+	kubernetesAdmin := kubernetes.NewForConfigOrDie(envTest.Config)
+	// Authorization
+	_, _ = kubernetesAdmin.RbacV1().ClusterRoles().Update(ctx, &rbacv1.ClusterRole{
+		ObjectMeta: metav1.ObjectMeta{Name: "allow-all"},
+		Rules: []rbacv1.PolicyRule{{
+			Verbs:     []string{"*"},
+			APIGroups: []string{"*"},
+			Resources: []string{"*"},
+		}},
+	}, metav1.UpdateOptions{})
+	_, _ = kubernetesAdmin.RbacV1().ClusterRoleBindings().Update(ctx, &rbacv1.ClusterRoleBinding{
+		ObjectMeta: metav1.ObjectMeta{Name: "allow-all"},
+		Subjects:   []rbacv1.Subject{{Kind: "Group", Name: envTestUser.Groups[0]}},
+		RoleRef:    rbacv1.RoleRef{Kind: "ClusterRole", Name: "allow-all"},
+	}, metav1.UpdateOptions{})
+}
+
+func createTestData(ctx context.Context) {
+	kubernetesAdmin := kubernetes.NewForConfigOrDie(envTestRestConfig)
+	// Namespaces
+	_, _ = kubernetesAdmin.CoreV1().Namespaces().
 		Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns-1"}}, metav1.CreateOptions{})
-	_, _ = kc.CoreV1().Namespaces().
+	_, _ = kubernetesAdmin.CoreV1().Namespaces().
 		Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns-2"}}, metav1.CreateOptions{})
-	_, _ = kc.CoreV1().Namespaces().
+	_, _ = kubernetesAdmin.CoreV1().Namespaces().
 		Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns-to-delete"}}, metav1.CreateOptions{})
-	_, _ = kc.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
+	_, _ = kubernetesAdmin.CoreV1().Pods("default").Create(ctx, &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{Name: "a-pod-in-default"},
 		Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
 	}, metav1.CreateOptions{})
-	_, _ = kc.CoreV1().Pods("ns-1").Create(ctx, &corev1.Pod{
+	// Pods for listing
+	_, _ = kubernetesAdmin.CoreV1().Pods("ns-1").Create(ctx, &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{Name: "a-pod-in-ns-1"},
 		Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
 	}, metav1.CreateOptions{})
-	_, _ = kc.CoreV1().Pods("ns-2").Create(ctx, &corev1.Pod{
+	_, _ = kubernetesAdmin.CoreV1().Pods("ns-2").Create(ctx, &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{Name: "a-pod-in-ns-2"},
 		Spec:       corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
 	}, metav1.CreateOptions{})
-	_, _ = kc.CoreV1().ConfigMaps("default").
+	_, _ = kubernetesAdmin.CoreV1().ConfigMaps("default").
 		Create(ctx, &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "a-configmap-to-delete"}}, metav1.CreateOptions{})
 }

View File

@@ -2,6 +2,7 @@ package mcp
 
 import (
 	corev1 "k8s.io/api/core/v1"
+	rbacv1 "k8s.io/api/rbac/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -68,6 +69,65 @@ func TestPodsListInAllNamespaces(t *testing.T) {
 	})
 }
 
+func TestPodsListInAllNamespacesUnauthorized(t *testing.T) {
+	testCase(t, func(c *mcpContext) {
+		c.withEnvTest()
+		defer restoreAuth(c.ctx)
+		client := c.newKubernetesClient()
+		// Authorize user only for default/configured namespace
+		r, _ := client.RbacV1().Roles("default").Create(c.ctx, &rbacv1.Role{
+			ObjectMeta: metav1.ObjectMeta{Name: "allow-pods-list"},
+			Rules: []rbacv1.PolicyRule{{
+				Verbs:     []string{"get", "list"},
+				APIGroups: []string{""},
+				Resources: []string{"pods"},
+			}},
+		}, metav1.CreateOptions{})
+		_, _ = client.RbacV1().RoleBindings("default").Create(c.ctx, &rbacv1.RoleBinding{
+			ObjectMeta: metav1.ObjectMeta{Name: "allow-pods-list"},
+			Subjects:   []rbacv1.Subject{{Kind: "User", Name: envTestUser.Name}},
+			RoleRef:    rbacv1.RoleRef{Kind: "Role", Name: r.Name},
+		}, metav1.CreateOptions{})
+		// Deny cluster by removing cluster rule
+		_ = client.RbacV1().ClusterRoles().Delete(c.ctx, "allow-all", metav1.DeleteOptions{})
+		toolResult, err := c.callTool("pods_list", map[string]interface{}{})
+		t.Run("pods_list returns pods list for default namespace only", func(t *testing.T) {
+			if err != nil {
+				t.Fatalf("call tool failed %v", err)
+				return
+			}
+			if toolResult.IsError {
+				t.Fatalf("call tool failed")
+				return
+			}
+		})
+		var decoded []unstructured.Unstructured
+		err = yaml.Unmarshal([]byte(toolResult.Content[0].(map[string]interface{})["text"].(string)), &decoded)
+		t.Run("pods_list has yaml content", func(t *testing.T) {
+			if err != nil {
+				t.Fatalf("invalid tool result content %v", err)
+				return
+			}
+		})
+		t.Run("pods_list returns 1 items", func(t *testing.T) {
+			if len(decoded) != 1 {
+				t.Fatalf("invalid pods count, expected 1, got %v", len(decoded))
+				return
+			}
+		})
+		t.Run("pods_list returns pod in default", func(t *testing.T) {
+			if decoded[0].GetName() != "a-pod-in-default" {
+				t.Fatalf("invalid pod name, expected a-pod-in-default, got %v", decoded[0].GetName())
+				return
+			}
+			if decoded[0].GetNamespace() != "default" {
+				t.Fatalf("invalid pod namespace, expected default, got %v", decoded[0].GetNamespace())
+				return
+			}
+		})
+	})
+}
+
 func TestPodsListInNamespace(t *testing.T) {
 	testCase(t, func(c *mcpContext) {
 		c.withEnvTest()
@@ -184,6 +244,12 @@ func TestPodsGet(t *testing.T) {
 				return
 			}
 		})
+		t.Run("pods_get with name and nil namespace omits managed fields", func(t *testing.T) {
+			if decodedNilNamespace.GetManagedFields() != nil {
+				t.Fatalf("managed fields should be omitted, got %v", decodedNilNamespace.GetManagedFields())
+				return
+			}
+		})
 		podsGetInNamespace, err := c.callTool("pods_get", map[string]interface{}{
 			"namespace": "ns-1",
 			"name":      "a-pod-in-ns-1",