mirror of
https://github.com/containers/kubernetes-mcp-server.git
synced 2025-10-23 01:22:57 +03:00
test(config): extensive test suite for denied lists
This commit is contained in:
@@ -124,23 +124,12 @@ func (c *mcpContext) beforeEach(t *testing.T) {
|
||||
if c.listOutput == nil {
|
||||
c.listOutput = output.Yaml
|
||||
}
|
||||
if c.staticConfig == nil {
|
||||
c.staticConfig = &config.StaticConfig{}
|
||||
}
|
||||
if c.before != nil {
|
||||
c.before(c)
|
||||
}
|
||||
if c.staticConfig == nil {
|
||||
c.staticConfig = &config.StaticConfig{
|
||||
DeniedResources: []config.GroupVersionKind{
|
||||
{
|
||||
Version: "v1",
|
||||
Kind: "Secret",
|
||||
},
|
||||
{
|
||||
Group: "rbac.authorization.k8s.io",
|
||||
Version: "v1",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
if c.mcpServer, err = NewServer(Configuration{
|
||||
Profile: c.profile,
|
||||
ListOutput: c.listOutput,
|
||||
@@ -222,10 +211,6 @@ func (c *mcpContext) withKubeConfig(rc *rest.Config) *api.Config {
|
||||
return fakeConfig
|
||||
}
|
||||
|
||||
func (c *mcpContext) withStaticConfig(config *config.StaticConfig) {
|
||||
c.staticConfig = config
|
||||
}
|
||||
|
||||
// withEnvTest sets up the environment for kubeconfig to be used with envTest
|
||||
func (c *mcpContext) withEnvTest() {
|
||||
c.withKubeConfig(envTestRestConfig)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/config"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -93,3 +94,22 @@ func TestEventsList(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestEventsListDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Event"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
eventList, _ := c.callTool("events_list", map[string]interface{}{})
|
||||
t.Run("events_list has error", func(t *testing.T) {
|
||||
if !eventList.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("events_list describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list events in all namespaces: resource not allowed: /v1, Kind=Event"
|
||||
if eventList.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, eventList.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package mcp
|
||||
import (
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/config"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
@@ -57,6 +58,30 @@ func TestHelmInstall(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestHelmInstallDenied(t *testing.T) {
|
||||
t.Skip("To be implemented") // TODO: helm_install is not checking for denied resources
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Secret"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
_, file, _, _ := runtime.Caller(0)
|
||||
chartPath := filepath.Join(filepath.Dir(file), "testdata", "helm-chart-secret")
|
||||
helmInstall, _ := c.callTool("helm_install", map[string]interface{}{
|
||||
"chart": chartPath,
|
||||
})
|
||||
t.Run("helm_install has error", func(t *testing.T) {
|
||||
if !helmInstall.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("helm_install describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to install helm chart: resource not allowed: /v1, Kind=Secret"
|
||||
if helmInstall.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, helmInstall.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestHelmList(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -199,6 +224,10 @@ func TestHelmUninstall(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestHelmUninstallDenied(t *testing.T) {
|
||||
t.Skip("To be implemented") // TODO: helm_uninstall is not checking for denied resources
|
||||
}
|
||||
|
||||
func clearHelmReleases(ctx context.Context, kc *kubernetes.Clientset) {
|
||||
secrets, _ := kc.CoreV1().Secrets("default").List(ctx, metav1.ListOptions{})
|
||||
for _, secret := range secrets.Items {
|
||||
|
||||
@@ -11,8 +11,6 @@ import (
|
||||
|
||||
"github.com/mark3labs/mcp-go/client"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/config"
|
||||
)
|
||||
|
||||
func TestWatchKubeConfig(t *testing.T) {
|
||||
@@ -99,7 +97,6 @@ func TestSseHeaders(t *testing.T) {
|
||||
defer mockServer.Close()
|
||||
before := func(c *mcpContext) {
|
||||
c.withKubeConfig(mockServer.config)
|
||||
c.withStaticConfig(&config.StaticConfig{})
|
||||
c.clientOptions = append(c.clientOptions, client.WithHeaders(map[string]string{"kubernetes-authorization": "Bearer a-token-from-mcp-client"}))
|
||||
}
|
||||
pathHeaders := make(map[string]http.Header, 0)
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/config"
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/output"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@@ -48,6 +49,25 @@ func TestNamespacesList(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespacesListDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Namespace"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
namespacesList, _ := c.callTool("namespaces_list", map[string]interface{}{})
|
||||
t.Run("namespaces_list has error", func(t *testing.T) {
|
||||
if !namespacesList.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("namespaces_list describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list namespaces: resource not allowed: /v1, Kind=Namespace"
|
||||
if namespacesList.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, namespacesList.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespacesListAsTable(t *testing.T) {
|
||||
testCaseWithContext(t, &mcpContext{listOutput: output.Table}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -133,3 +153,22 @@ func TestProjectsListInOpenShift(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestProjectsListInOpenShiftDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Group: "project.openshift.io", Version: "v1"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer, before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
projectsList, _ := c.callTool("projects_list", map[string]interface{}{})
|
||||
t.Run("projects_list has error", func(t *testing.T) {
|
||||
if !projectsList.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("projects_list describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list projects: resource not allowed: project.openshift.io/v1, Kind=Project"
|
||||
if projectsList.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, projectsList.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -288,7 +288,7 @@ func (s *Server) podsRun(ctx context.Context, ctr mcp.CallToolRequest) (*mcp.Cal
|
||||
}
|
||||
resources, err := s.k.Derived(ctx).PodsRun(ctx, ns.(string), name.(string), image.(string), int32(port.(float64)))
|
||||
if err != nil {
|
||||
return NewTextResult("", fmt.Errorf("failed to get pod %s log in namespace %s: %v", name, ns, err)), nil
|
||||
return NewTextResult("", fmt.Errorf("failed to run pod %s in namespace %s: %v", name, ns, err)), nil
|
||||
}
|
||||
marshalledYaml, err := output.MarshalYaml(resources)
|
||||
if err != nil {
|
||||
|
||||
@@ -99,3 +99,7 @@ func TestPodsExec(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsExecDenied(t *testing.T) {
|
||||
t.Skip("To be implemented") // TODO: exec is not checking for denied resources
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/config"
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/output"
|
||||
"regexp"
|
||||
"strings"
|
||||
@@ -176,6 +177,37 @@ func TestPodsListInNamespace(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsListDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Pod"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
podsList, _ := c.callTool("pods_list", map[string]interface{}{})
|
||||
t.Run("pods_list has error", func(t *testing.T) {
|
||||
if !podsList.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("pods_list describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list pods in all namespaces: resource not allowed: /v1, Kind=Pod"
|
||||
if podsList.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, podsList.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
podsListInNamespace, _ := c.callTool("pods_list_in_namespace", map[string]interface{}{"namespace": "ns-1"})
|
||||
t.Run("pods_list_in_namespace has error", func(t *testing.T) {
|
||||
if !podsListInNamespace.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("pods_list_in_namespace describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list pods in namespace ns-1: resource not allowed: /v1, Kind=Pod"
|
||||
if podsListInNamespace.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, podsListInNamespace.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsListAsTable(t *testing.T) {
|
||||
testCaseWithContext(t, &mcpContext{listOutput: output.Table}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -380,6 +412,25 @@ func TestPodsGet(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsGetDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Pod"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
podsGet, _ := c.callTool("pods_get", map[string]interface{}{"name": "a-pod-in-default"})
|
||||
t.Run("pods_get has error", func(t *testing.T) {
|
||||
if !podsGet.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("pods_get describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to get pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
|
||||
if podsGet.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, podsGet.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsDelete(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -511,6 +562,26 @@ func TestPodsDelete(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsDeleteDenied(t *testing.T) {
|
||||
t.Skip("To be implemented") // TODO: delete is not checking for denied resources
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Pod"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
podsDelete, _ := c.callTool("pods_delete", map[string]interface{}{"name": "a-pod-in-default"})
|
||||
t.Run("pods_delete has error", func(t *testing.T) {
|
||||
if !podsDelete.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("pods_delete describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to delete pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
|
||||
if podsDelete.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, podsDelete.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsDeleteInOpenShift(t *testing.T) {
|
||||
testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
|
||||
managedLabels := map[string]string{
|
||||
@@ -651,6 +722,26 @@ func TestPodsLog(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsLogDenied(t *testing.T) {
|
||||
t.Skip("To be implemented") // TODO: log is not checking for denied resources
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Pod"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
podsLog, _ := c.callTool("pods_log", map[string]interface{}{"name": "a-pod-in-default"})
|
||||
t.Run("pods_log has error", func(t *testing.T) {
|
||||
if !podsLog.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("pods_log describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to log pod a-pod-in-default in namespace : resource not allowed: /v1, Kind=Pod"
|
||||
if podsLog.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, podsLog.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsRun(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -801,6 +892,25 @@ func TestPodsRun(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsRunDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{DeniedResources: []config.GroupVersionKind{{Version: "v1", Kind: "Pod"}}}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
podsRun, _ := c.callTool("pods_run", map[string]interface{}{"image": "nginx"})
|
||||
t.Run("pods_run has error", func(t *testing.T) {
|
||||
if !podsRun.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("pods_run describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to run pod in namespace : resource not allowed: /v1, Kind=Pod"
|
||||
if podsRun.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, podsRun.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsRunInOpenShift(t *testing.T) {
|
||||
testCaseWithContext(t, &mcpContext{before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
|
||||
t.Run("pods_run with image, namespace, and port returns route with port", func(t *testing.T) {
|
||||
|
||||
@@ -204,3 +204,7 @@ func TestPodsTopMetricsAvailable(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestPodsTopDenied(t *testing.T) {
|
||||
t.Skip("To be implemented") // TODO: top is not checking for denied resources
|
||||
}
|
||||
|
||||
@@ -1,18 +1,21 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/output"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"regexp"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/rbac/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"sigs.k8s.io/yaml"
|
||||
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/config"
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/output"
|
||||
)
|
||||
|
||||
func TestResourcesList(t *testing.T) {
|
||||
@@ -54,26 +57,6 @@ func TestResourcesList(t *testing.T) {
|
||||
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
t.Run("resources_list with a resource in denied list as kind", func(t *testing.T) {
|
||||
toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Secret"})
|
||||
if !toolResult.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
//failed to list resources: resource not allowed: /v1, Kind=Secret
|
||||
if toolResult.Content[0].(mcp.TextContent).Text != `failed to list resources: resource not allowed: /v1, Kind=Secret` {
|
||||
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
t.Run("resources_list with a resource in denied list as group", func(t *testing.T) {
|
||||
toolResult, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role"})
|
||||
if !toolResult.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
//failed to list resources: resource not allowed: /v1, Kind=Secret
|
||||
if toolResult.Content[0].(mcp.TextContent).Text != `failed to list resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role` {
|
||||
t.Fatalf("invalid error message, got %v", toolResult.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
namespaces, err := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
|
||||
t.Run("resources_list returns namespaces", func(t *testing.T) {
|
||||
if err != nil {
|
||||
@@ -168,6 +151,48 @@ func TestResourcesList(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesListDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{
|
||||
DeniedResources: []config.GroupVersionKind{
|
||||
{Version: "v1", Kind: "Secret"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1"},
|
||||
},
|
||||
}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
deniedByKind, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Secret"})
|
||||
t.Run("resources_list (denied by kind) has error", func(t *testing.T) {
|
||||
if !deniedByKind.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_list (denied by kind) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list resources: resource not allowed: /v1, Kind=Secret"
|
||||
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
deniedByGroup, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role"})
|
||||
t.Run("resources_list (denied by group) has error", func(t *testing.T) {
|
||||
if !deniedByGroup.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_list (denied by group) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to list resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
|
||||
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
allowedResource, _ := c.callTool("resources_list", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace"})
|
||||
t.Run("resources_list (not denied) returns list", func(t *testing.T) {
|
||||
if allowedResource.IsError {
|
||||
t.Fatalf("call tool should not fail")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesListAsTable(t *testing.T) {
|
||||
testCaseWithContext(t, &mcpContext{listOutput: output.Table, before: inOpenShift, after: inOpenShiftClear}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -331,6 +356,55 @@ func TestResourcesGet(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesGetDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{
|
||||
DeniedResources: []config.GroupVersionKind{
|
||||
{Version: "v1", Kind: "Secret"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1"},
|
||||
},
|
||||
}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
kc := c.newKubernetesClient()
|
||||
_, _ = kc.CoreV1().Secrets("default").Create(c.ctx, &corev1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "denied-secret"},
|
||||
}, metav1.CreateOptions{})
|
||||
_, _ = kc.RbacV1().Roles("default").Create(c.ctx, &v1.Role{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "denied-role"},
|
||||
}, metav1.CreateOptions{})
|
||||
deniedByKind, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Secret", "namespace": "default", "name": "denied-secret"})
|
||||
t.Run("resources_get (denied by kind) has error", func(t *testing.T) {
|
||||
if !deniedByKind.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_get (denied by kind) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to get resource: resource not allowed: /v1, Kind=Secret"
|
||||
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
deniedByGroup, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role", "namespace": "default", "name": "denied-role"})
|
||||
t.Run("resources_get (denied by group) has error", func(t *testing.T) {
|
||||
if !deniedByGroup.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_get (denied by group) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to get resource: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
|
||||
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
allowedResource, _ := c.callTool("resources_get", map[string]interface{}{"apiVersion": "v1", "kind": "Namespace", "name": "default"})
|
||||
t.Run("resources_get (not denied) returns resource", func(t *testing.T) {
|
||||
if allowedResource.IsError {
|
||||
t.Fatalf("call tool should not fail")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesCreateOrUpdate(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -508,6 +582,51 @@ func TestResourcesCreateOrUpdate(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesCreateOrUpdateDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{
|
||||
DeniedResources: []config.GroupVersionKind{
|
||||
{Version: "v1", Kind: "Secret"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1"},
|
||||
},
|
||||
}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
secretYaml := "apiVersion: v1\nkind: Secret\nmetadata:\n name: a-denied-secret\n namespace: default\n"
|
||||
deniedByKind, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": secretYaml})
|
||||
t.Run("resources_create_or_update (denied by kind) has error", func(t *testing.T) {
|
||||
if !deniedByKind.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_create_or_update (denied by kind) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to create or update resources: resource not allowed: /v1, Kind=Secret"
|
||||
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
roleYaml := "apiVersion: rbac.authorization.k8s.io/v1\nkind: Role\nmetadata:\n name: a-denied-role\n namespace: default\n"
|
||||
deniedByGroup, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": roleYaml})
|
||||
t.Run("resources_create_or_update (denied by group) has error", func(t *testing.T) {
|
||||
if !deniedByGroup.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_create_or_update (denied by group) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to create or update resources: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
|
||||
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
configMapYaml := "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: a-cm-created-or-updated\n namespace: default\n"
|
||||
allowedResource, _ := c.callTool("resources_create_or_update", map[string]interface{}{"resource": configMapYaml})
|
||||
t.Run("resources_create_or_update (not denied) creates or updates resource", func(t *testing.T) {
|
||||
if allowedResource.IsError {
|
||||
t.Fatalf("call tool should not fail")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesDelete(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
@@ -624,3 +743,49 @@ func TestResourcesDelete(t *testing.T) {
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestResourcesDeleteDenied(t *testing.T) {
|
||||
deniedResourcesServer := &config.StaticConfig{
|
||||
DeniedResources: []config.GroupVersionKind{
|
||||
{Version: "v1", Kind: "Secret"},
|
||||
{Group: "rbac.authorization.k8s.io", Version: "v1"},
|
||||
},
|
||||
}
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
kc := c.newKubernetesClient()
|
||||
_, _ = kc.CoreV1().ConfigMaps("default").Create(c.ctx, &corev1.ConfigMap{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "allowed-configmap-to-delete"},
|
||||
}, metav1.CreateOptions{})
|
||||
deniedByKind, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "Secret", "namespace": "default", "name": "denied-secret"})
|
||||
t.Run("resources_delete (denied by kind) has error", func(t *testing.T) {
|
||||
if !deniedByKind.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_delete (denied by kind) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to delete resource: resource not allowed: /v1, Kind=Secret"
|
||||
if deniedByKind.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
deniedByGroup, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "rbac.authorization.k8s.io/v1", "kind": "Role", "namespace": "default", "name": "denied-role"})
|
||||
t.Run("resources_delete (denied by group) has error", func(t *testing.T) {
|
||||
if !deniedByGroup.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
})
|
||||
t.Run("resources_delete (denied by group) describes denial", func(t *testing.T) {
|
||||
expectedMessage := "failed to delete resource: resource not allowed: rbac.authorization.k8s.io/v1, Kind=Role"
|
||||
if deniedByGroup.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected desciptive error '%s', got %v", expectedMessage, deniedByKind.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
})
|
||||
allowedResource, _ := c.callTool("resources_delete", map[string]interface{}{"apiVersion": "v1", "kind": "ConfigMap", "name": "allowed-configmap-to-delete"})
|
||||
t.Run("resources_delete (not denied) deletes resource", func(t *testing.T) {
|
||||
if allowedResource.IsError {
|
||||
t.Fatalf("call tool should not fail")
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
5
pkg/mcp/testdata/helm-chart-secret/Chart.yaml
vendored
Normal file
5
pkg/mcp/testdata/helm-chart-secret/Chart.yaml
vendored
Normal file
@@ -0,0 +1,5 @@
|
||||
apiVersion: v2
|
||||
name: secret-chart
|
||||
version: 0.1.0
|
||||
type: application
|
||||
|
||||
12
pkg/mcp/testdata/helm-chart-secret/templates/secret.yaml
vendored
Normal file
12
pkg/mcp/testdata/helm-chart-secret/templates/secret.yaml
vendored
Normal file
@@ -0,0 +1,12 @@
|
||||
apiVersion: v1
|
||||
kind: Secret
|
||||
metadata:
|
||||
name: {{ .Release.Name }}-secret
|
||||
labels:
|
||||
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||
type: Opaque
|
||||
data:
|
||||
username: {{ b64enc "aitana" }}
|
||||
password: {{ b64enc "alex" }}
|
||||
|
||||
Reference in New Issue
Block a user