mirror of
https://github.com/containers/kubernetes-mcp-server.git
synced 2025-10-23 01:22:57 +03:00
test(mcp): refactor core toolset tests (namespaces) (#330)
Signed-off-by: Marc Nuri <marc@marcnuri.com>
This commit is contained in:
@@ -429,6 +429,7 @@ type BaseMcpSuite struct {
|
||||
|
||||
func (s *BaseMcpSuite) SetupTest() {
|
||||
s.Cfg = config.Default()
|
||||
s.Cfg.ListOutput = "yaml"
|
||||
s.Cfg.KubeConfig = filepath.Join(s.T().TempDir(), "config")
|
||||
s.Require().NoError(os.WriteFile(s.Cfg.KubeConfig, envTest.KubeConfig, 0600), "Expected to write kubeconfig")
|
||||
}
|
||||
|
||||
@@ -107,15 +107,15 @@ func (s *EventsSuite) TestEventsListDenied() {
|
||||
`), s.Cfg), "Expected to parse denied resources config")
|
||||
s.InitMcpClient()
|
||||
s.Run("events_list (denied)", func() {
|
||||
eventList, err := s.CallTool("events_list", map[string]interface{}{})
|
||||
s.Run("events_list has error", func() {
|
||||
s.Truef(eventList.IsError, "call tool should fail")
|
||||
toolResult, err := s.CallTool("events_list", map[string]interface{}{})
|
||||
s.Run("has error", func() {
|
||||
s.Truef(toolResult.IsError, "call tool should fail")
|
||||
s.Nilf(err, "call tool should not return error object")
|
||||
})
|
||||
s.Run("events_list describes denial", func() {
|
||||
s.Run("describes denial", func() {
|
||||
expectedMessage := "failed to list events in all namespaces: resource not allowed: /v1, Kind=Event"
|
||||
s.Equalf(expectedMessage, eventList.Content[0].(mcp.TextContent).Text,
|
||||
"expected descriptive error '%s', got %v", expectedMessage, eventList.Content[0].(mcp.TextContent).Text)
|
||||
s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
|
||||
"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
@@ -5,7 +5,9 @@ import (
|
||||
"slices"
|
||||
"testing"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
"github.com/stretchr/testify/suite"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
@@ -14,108 +16,100 @@ import (
|
||||
|
||||
"github.com/containers/kubernetes-mcp-server/internal/test"
|
||||
"github.com/containers/kubernetes-mcp-server/pkg/config"
|
||||
"github.com/containers/kubernetes-mcp-server/pkg/output"
|
||||
)
|
||||
|
||||
func TestNamespacesList(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
toolResult, err := c.callTool("namespaces_list", map[string]interface{}{})
|
||||
t.Run("namespaces_list returns namespace list", func(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("call tool failed %v", err)
|
||||
}
|
||||
if toolResult.IsError {
|
||||
t.Fatalf("call tool failed")
|
||||
}
|
||||
type NamespacesSuite struct {
|
||||
BaseMcpSuite
|
||||
}
|
||||
|
||||
func (s *NamespacesSuite) TestNamespacesList() {
|
||||
s.InitMcpClient()
|
||||
s.Run("namespaces_list", func() {
|
||||
toolResult, err := s.CallTool("namespaces_list", map[string]interface{}{})
|
||||
s.Run("no error", func() {
|
||||
s.Nilf(err, "call tool failed %v", err)
|
||||
s.Falsef(toolResult.IsError, "call tool failed")
|
||||
})
|
||||
s.Require().NotNil(toolResult, "Expected tool result from call")
|
||||
var decoded []unstructured.Unstructured
|
||||
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
|
||||
t.Run("namespaces_list has yaml content", func(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("invalid tool result content %v", err)
|
||||
}
|
||||
s.Run("has yaml content", func() {
|
||||
s.Nilf(err, "invalid tool result content %v", err)
|
||||
})
|
||||
t.Run("namespaces_list returns at least 3 items", func(t *testing.T) {
|
||||
if len(decoded) < 3 {
|
||||
t.Errorf("invalid namespace count, expected at least 3, got %v", len(decoded))
|
||||
}
|
||||
s.Run("returns at least 3 items", func() {
|
||||
s.Truef(len(decoded) >= 3, "expected at least 3 items, got %v", len(decoded))
|
||||
for _, expectedNamespace := range []string{"default", "ns-1", "ns-2"} {
|
||||
idx := slices.IndexFunc(decoded, func(ns unstructured.Unstructured) bool {
|
||||
s.Truef(slices.ContainsFunc(decoded, func(ns unstructured.Unstructured) bool {
|
||||
return ns.GetName() == expectedNamespace
|
||||
})
|
||||
if idx == -1 {
|
||||
t.Errorf("namespace %s not found in the list", expectedNamespace)
|
||||
}
|
||||
}), "namespace %s not found in the list", expectedNamespace)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespacesListDenied(t *testing.T) {
|
||||
deniedResourcesServer := test.Must(config.ReadToml([]byte(`
|
||||
func (s *NamespacesSuite) TestNamespacesListDenied() {
|
||||
s.Require().NoError(toml.Unmarshal([]byte(`
|
||||
denied_resources = [ { version = "v1", kind = "Namespace" } ]
|
||||
`)))
|
||||
testCaseWithContext(t, &mcpContext{staticConfig: deniedResourcesServer}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
namespacesList, _ := c.callTool("namespaces_list", map[string]interface{}{})
|
||||
t.Run("namespaces_list has error", func(t *testing.T) {
|
||||
if !namespacesList.IsError {
|
||||
t.Fatalf("call tool should fail")
|
||||
}
|
||||
`), s.Cfg), "Expected to parse denied resources config")
|
||||
s.InitMcpClient()
|
||||
s.Run("namespaces_list (denied)", func() {
|
||||
toolResult, err := s.CallTool("namespaces_list", map[string]interface{}{})
|
||||
s.Run("has error", func() {
|
||||
s.Truef(toolResult.IsError, "call tool should fail")
|
||||
s.Nilf(err, "call tool should not return error object")
|
||||
})
|
||||
t.Run("namespaces_list describes denial", func(t *testing.T) {
|
||||
s.Run("describes denial", func() {
|
||||
expectedMessage := "failed to list namespaces: resource not allowed: /v1, Kind=Namespace"
|
||||
if namespacesList.Content[0].(mcp.TextContent).Text != expectedMessage {
|
||||
t.Fatalf("expected descriptive error '%s', got %v", expectedMessage, namespacesList.Content[0].(mcp.TextContent).Text)
|
||||
}
|
||||
s.Equalf(expectedMessage, toolResult.Content[0].(mcp.TextContent).Text,
|
||||
"expected descriptive error '%s', got %v", expectedMessage, toolResult.Content[0].(mcp.TextContent).Text)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespacesListAsTable(t *testing.T) {
|
||||
testCaseWithContext(t, &mcpContext{listOutput: output.Table}, func(c *mcpContext) {
|
||||
c.withEnvTest()
|
||||
toolResult, err := c.callTool("namespaces_list", map[string]interface{}{})
|
||||
t.Run("namespaces_list returns namespace list", func(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("call tool failed %v", err)
|
||||
}
|
||||
if toolResult.IsError {
|
||||
t.Fatalf("call tool failed")
|
||||
}
|
||||
func (s *NamespacesSuite) TestNamespacesListAsTable() {
|
||||
s.Cfg.ListOutput = "table"
|
||||
s.InitMcpClient()
|
||||
s.Run("namespaces_list (list_output=table)", func() {
|
||||
toolResult, err := s.CallTool("namespaces_list", map[string]interface{}{})
|
||||
s.Run("no error", func() {
|
||||
s.Nilf(err, "call tool failed %v", err)
|
||||
s.Falsef(toolResult.IsError, "call tool failed")
|
||||
})
|
||||
s.Require().NotNil(toolResult, "Expected tool result from call")
|
||||
out := toolResult.Content[0].(mcp.TextContent).Text
|
||||
t.Run("namespaces_list returns column headers", func(t *testing.T) {
|
||||
s.Run("returns column headers", func() {
|
||||
expectedHeaders := "APIVERSION\\s+KIND\\s+NAME\\s+STATUS\\s+AGE\\s+LABELS"
|
||||
if m, e := regexp.MatchString(expectedHeaders, out); !m || e != nil {
|
||||
t.Fatalf("Expected headers '%s' not found in output:\n%s", expectedHeaders, out)
|
||||
}
|
||||
m, e := regexp.MatchString(expectedHeaders, out)
|
||||
s.Truef(m, "Expected headers '%s' not found in output:\n%s", expectedHeaders, out)
|
||||
s.NoErrorf(e, "Error matching headers regex: %v", e)
|
||||
})
|
||||
t.Run("namespaces_list returns formatted row for ns-1", func(t *testing.T) {
|
||||
s.Run("returns formatted row for ns-1", func() {
|
||||
expectedRow := "(?<apiVersion>v1)\\s+" +
|
||||
"(?<kind>Namespace)\\s+" +
|
||||
"(?<name>ns-1)\\s+" +
|
||||
"(?<status>Active)\\s+" +
|
||||
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
|
||||
"(?<labels>kubernetes.io/metadata.name=ns-1)"
|
||||
if m, e := regexp.MatchString(expectedRow, out); !m || e != nil {
|
||||
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, out)
|
||||
}
|
||||
m, e := regexp.MatchString(expectedRow, out)
|
||||
s.Truef(m, "Expected row '%s' not found in output:\n%s", expectedRow, out)
|
||||
s.NoErrorf(e, "Error matching ns-1 regex: %v", e)
|
||||
})
|
||||
t.Run("namespaces_list returns formatted row for ns-2", func(t *testing.T) {
|
||||
s.Run("returns formatted row for ns-2", func() {
|
||||
expectedRow := "(?<apiVersion>v1)\\s+" +
|
||||
"(?<kind>Namespace)\\s+" +
|
||||
"(?<name>ns-2)\\s+" +
|
||||
"(?<status>Active)\\s+" +
|
||||
"(?<age>(\\d+m)?(\\d+s)?)\\s+" +
|
||||
"(?<labels>kubernetes.io/metadata.name=ns-2)"
|
||||
if m, e := regexp.MatchString(expectedRow, out); !m || e != nil {
|
||||
t.Fatalf("Expected row '%s' not found in output:\n%s", expectedRow, out)
|
||||
}
|
||||
m, e := regexp.MatchString(expectedRow, out)
|
||||
s.Truef(m, "Expected row '%s' not found in output:\n%s", expectedRow, out)
|
||||
s.NoErrorf(e, "Error matching ns-2 regex: %v", e)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestNamespaces(t *testing.T) {
|
||||
suite.Run(t, new(NamespacesSuite))
|
||||
}
|
||||
|
||||
func TestProjectsListInOpenShift(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user