test(mcp): refactor configuration toolset tests (#327)

Signed-off-by: Marc Nuri <marc@marcnuri.com>
This commit is contained in:
Marc Nuri
2025-09-17 11:11:50 +02:00
committed by GitHub
parent d9d35b9834
commit f496c643e7

View File

@@ -3,177 +3,151 @@ package mcp
import ( import (
"testing" "testing"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
"github.com/mark3labs/mcp-go/mcp" "github.com/mark3labs/mcp-go/mcp"
"github.com/stretchr/testify/suite"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
v1 "k8s.io/client-go/tools/clientcmd/api/v1" v1 "k8s.io/client-go/tools/clientcmd/api/v1"
"sigs.k8s.io/yaml" "sigs.k8s.io/yaml"
"github.com/containers/kubernetes-mcp-server/internal/test"
"github.com/containers/kubernetes-mcp-server/pkg/config"
"github.com/containers/kubernetes-mcp-server/pkg/kubernetes"
) )
func TestConfigurationView(t *testing.T) { type ConfigurationSuite struct {
testCase(t, func(c *mcpContext) { suite.Suite
toolResult, err := c.callTool("configuration_view", map[string]interface{}{}) *test.McpClient
t.Run("configuration_view returns configuration", func(t *testing.T) { mcpServer *Server
if err != nil { Cfg *config.StaticConfig
t.Fatalf("call tool failed %v", err) }
}
func (s *ConfigurationSuite) SetupTest() {
s.Cfg = config.Default()
}
func (s *ConfigurationSuite) TearDownTest() {
if s.McpClient != nil {
s.McpClient.Close()
}
if s.mcpServer != nil {
s.mcpServer.Close()
}
}
func (s *ConfigurationSuite) InitMcpClient() {
var err error
s.mcpServer, err = NewServer(Configuration{StaticConfig: s.Cfg})
s.Require().NoError(err, "Expected no error creating MCP server")
s.McpClient = test.NewMcpClient(s.T(), s.mcpServer.ServeHTTP(nil))
}
func (s *ConfigurationSuite) TestConfigurationView() {
// Out of cluster requires kubeconfig
mockServer := test.NewMockServer()
s.T().Cleanup(mockServer.Close)
s.Cfg.KubeConfig = mockServer.KubeconfigFile(s.T())
s.InitMcpClient()
s.Run("configuration_view", func() {
toolResult, err := s.CallTool("configuration_view", map[string]interface{}{})
s.Run("returns configuration", func() {
s.Nilf(err, "call tool failed %v", err)
})
s.Require().NotNil(toolResult, "Expected tool result from call")
var decoded *v1.Config
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
s.Run("has yaml content", func() {
s.Nilf(err, "invalid tool result content %v", err)
})
s.Run("returns current-context", func() {
s.Equalf("fake-context", decoded.CurrentContext, "fake-context not found: %v", decoded.CurrentContext)
})
s.Run("returns context info", func() {
s.Lenf(decoded.Contexts, 1, "invalid context count, expected 1, got %v", len(decoded.Contexts))
s.Equalf("fake-context", decoded.Contexts[0].Name, "fake-context not found: %v", decoded.Contexts)
s.Equalf("fake", decoded.Contexts[0].Context.Cluster, "fake-cluster not found: %v", decoded.Contexts)
s.Equalf("fake", decoded.Contexts[0].Context.AuthInfo, "fake-auth not found: %v", decoded.Contexts)
})
s.Run("returns cluster info", func() {
s.Lenf(decoded.Clusters, 1, "invalid cluster count, expected 1, got %v", len(decoded.Clusters))
s.Equalf("fake", decoded.Clusters[0].Name, "fake-cluster not found: %v", decoded.Clusters)
s.Regexpf(`^https?://(127\.0\.0\.1|localhost):\d{1,5}$`, decoded.Clusters[0].Cluster.Server, "fake-server not found: %v", decoded.Clusters)
})
s.Run("returns auth info", func() {
s.Lenf(decoded.AuthInfos, 1, "invalid auth info count, expected 1, got %v", len(decoded.AuthInfos))
s.Equalf("fake", decoded.AuthInfos[0].Name, "fake-auth not found: %v", decoded.AuthInfos)
})
})
s.Run("configuration_view(minified=false)", func() {
toolResult, err := s.CallTool("configuration_view", map[string]interface{}{
"minified": false,
})
s.Run("returns configuration", func() {
s.Nilf(err, "call tool failed %v", err)
}) })
var decoded *v1.Config var decoded *v1.Config
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded) err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
t.Run("configuration_view has yaml content", func(t *testing.T) { s.Run("has yaml content", func() {
if err != nil { s.Nilf(err, "invalid tool result content %v", err)
t.Fatalf("invalid tool result content %v", err)
}
}) })
t.Run("configuration_view returns current-context", func(t *testing.T) { s.Run("returns additional context info", func() {
if decoded.CurrentContext != "fake-context" { s.Lenf(decoded.Contexts, 2, "invalid context count, expected 2, got %v", len(decoded.Contexts))
t.Errorf("fake-context not found: %v", decoded.CurrentContext) s.Equalf("additional-context", decoded.Contexts[0].Name, "additional-context not found: %v", decoded.Contexts)
} s.Equalf("additional-cluster", decoded.Contexts[0].Context.Cluster, "additional-cluster not found: %v", decoded.Contexts)
s.Equalf("additional-auth", decoded.Contexts[0].Context.AuthInfo, "additional-auth not found: %v", decoded.Contexts)
s.Equalf("fake-context", decoded.Contexts[1].Name, "fake-context not found: %v", decoded.Contexts)
}) })
t.Run("configuration_view returns context info", func(t *testing.T) { s.Run("returns cluster info", func() {
if len(decoded.Contexts) != 1 { s.Lenf(decoded.Clusters, 2, "invalid cluster count, expected 2, got %v", len(decoded.Clusters))
t.Errorf("invalid context count, expected 1, got %v", len(decoded.Contexts)) s.Equalf("additional-cluster", decoded.Clusters[0].Name, "additional-cluster not found: %v", decoded.Clusters)
}
if decoded.Contexts[0].Name != "fake-context" {
t.Errorf("fake-context not found: %v", decoded.Contexts)
}
if decoded.Contexts[0].Context.Cluster != "fake" {
t.Errorf("fake-cluster not found: %v", decoded.Contexts)
}
if decoded.Contexts[0].Context.AuthInfo != "fake" {
t.Errorf("fake-auth not found: %v", decoded.Contexts)
}
}) })
t.Run("configuration_view returns cluster info", func(t *testing.T) { s.Run("configuration_view with minified=false returns auth info", func() {
if len(decoded.Clusters) != 1 { s.Lenf(decoded.AuthInfos, 2, "invalid auth info count, expected 2, got %v", len(decoded.AuthInfos))
t.Errorf("invalid cluster count, expected 1, got %v", len(decoded.Clusters)) s.Equalf("additional-auth", decoded.AuthInfos[0].Name, "additional-auth not found: %v", decoded.AuthInfos)
}
if decoded.Clusters[0].Name != "fake" {
t.Errorf("fake-cluster not found: %v", decoded.Clusters)
}
if decoded.Clusters[0].Cluster.Server != "https://127.0.0.1:6443" {
t.Errorf("fake-server not found: %v", decoded.Clusters)
}
})
t.Run("configuration_view returns auth info", func(t *testing.T) {
if len(decoded.AuthInfos) != 1 {
t.Errorf("invalid auth info count, expected 1, got %v", len(decoded.AuthInfos))
}
if decoded.AuthInfos[0].Name != "fake" {
t.Errorf("fake-auth not found: %v", decoded.AuthInfos)
}
})
toolResult, err = c.callTool("configuration_view", map[string]interface{}{
"minified": false,
})
t.Run("configuration_view with minified=false returns configuration", func(t *testing.T) {
if err != nil {
t.Fatalf("call tool failed %v", err)
}
})
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
t.Run("configuration_view with minified=false has yaml content", func(t *testing.T) {
if err != nil {
t.Fatalf("invalid tool result content %v", err)
}
})
t.Run("configuration_view with minified=false returns additional context info", func(t *testing.T) {
if len(decoded.Contexts) != 2 {
t.Fatalf("invalid context count, expected2, got %v", len(decoded.Contexts))
}
if decoded.Contexts[0].Name != "additional-context" {
t.Errorf("additional-context not found: %v", decoded.Contexts)
}
if decoded.Contexts[0].Context.Cluster != "additional-cluster" {
t.Errorf("additional-cluster not found: %v", decoded.Contexts)
}
if decoded.Contexts[0].Context.AuthInfo != "additional-auth" {
t.Errorf("additional-auth not found: %v", decoded.Contexts)
}
if decoded.Contexts[1].Name != "fake-context" {
t.Errorf("fake-context not found: %v", decoded.Contexts)
}
})
t.Run("configuration_view with minified=false returns cluster info", func(t *testing.T) {
if len(decoded.Clusters) != 2 {
t.Errorf("invalid cluster count, expected 2, got %v", len(decoded.Clusters))
}
if decoded.Clusters[0].Name != "additional-cluster" {
t.Errorf("additional-cluster not found: %v", decoded.Clusters)
}
})
t.Run("configuration_view with minified=false returns auth info", func(t *testing.T) {
if len(decoded.AuthInfos) != 2 {
t.Errorf("invalid auth info count, expected 2, got %v", len(decoded.AuthInfos))
}
if decoded.AuthInfos[0].Name != "additional-auth" {
t.Errorf("additional-auth not found: %v", decoded.AuthInfos)
}
}) })
}) })
} }
func TestConfigurationViewInCluster(t *testing.T) { func (s *ConfigurationSuite) TestConfigurationViewInCluster() {
kubernetes.InClusterConfig = func() (*rest.Config, error) { kubernetes.InClusterConfig = func() (*rest.Config, error) {
return &rest.Config{ return &rest.Config{
Host: "https://kubernetes.default.svc", Host: "https://kubernetes.default.svc",
BearerToken: "fake-token", BearerToken: "fake-token",
}, nil }, nil
} }
defer func() { s.T().Cleanup(func() { kubernetes.InClusterConfig = rest.InClusterConfig })
kubernetes.InClusterConfig = rest.InClusterConfig s.InitMcpClient()
}() s.Run("configuration_view", func() {
testCase(t, func(c *mcpContext) { toolResult, err := s.CallTool("configuration_view", map[string]interface{}{})
toolResult, err := c.callTool("configuration_view", map[string]interface{}{}) s.Run("returns configuration", func() {
t.Run("configuration_view returns configuration", func(t *testing.T) { s.Nilf(err, "call tool failed %v", err)
if err != nil {
t.Fatalf("call tool failed %v", err)
}
}) })
s.Require().NotNil(toolResult, "Expected tool result from call")
var decoded *v1.Config var decoded *v1.Config
err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded) err = yaml.Unmarshal([]byte(toolResult.Content[0].(mcp.TextContent).Text), &decoded)
t.Run("configuration_view has yaml content", func(t *testing.T) { s.Run("has yaml content", func() {
if err != nil { s.Nilf(err, "invalid tool result content %v", err)
t.Fatalf("invalid tool result content %v", err)
}
}) })
t.Run("configuration_view returns current-context", func(t *testing.T) { s.Run("returns current-context", func() {
if decoded.CurrentContext != "context" { s.Equalf("context", decoded.CurrentContext, "context not found: %v", decoded.CurrentContext)
t.Fatalf("context not found: %v", decoded.CurrentContext)
}
}) })
t.Run("configuration_view returns context info", func(t *testing.T) { s.Run("returns context info", func() {
if len(decoded.Contexts) != 1 { s.Lenf(decoded.Contexts, 1, "invalid context count, expected 1, got %v", len(decoded.Contexts))
t.Fatalf("invalid context count, expected 1, got %v", len(decoded.Contexts)) s.Equalf("context", decoded.Contexts[0].Name, "context not found: %v", decoded.Contexts)
} s.Equalf("cluster", decoded.Contexts[0].Context.Cluster, "cluster not found: %v", decoded.Contexts)
if decoded.Contexts[0].Name != "context" { s.Equalf("user", decoded.Contexts[0].Context.AuthInfo, "user not found: %v", decoded.Contexts)
t.Fatalf("context not found: %v", decoded.Contexts)
}
if decoded.Contexts[0].Context.Cluster != "cluster" {
t.Fatalf("cluster not found: %v", decoded.Contexts)
}
if decoded.Contexts[0].Context.AuthInfo != "user" {
t.Fatalf("user not found: %v", decoded.Contexts)
}
}) })
t.Run("configuration_view returns cluster info", func(t *testing.T) { s.Run("returns cluster info", func() {
if len(decoded.Clusters) != 1 { s.Lenf(decoded.Clusters, 1, "invalid cluster count, expected 1, got %v", len(decoded.Clusters))
t.Fatalf("invalid cluster count, expected 1, got %v", len(decoded.Clusters)) s.Equalf("cluster", decoded.Clusters[0].Name, "cluster not found: %v", decoded.Clusters)
} s.Equalf("https://kubernetes.default.svc", decoded.Clusters[0].Cluster.Server, "server not found: %v", decoded.Clusters)
if decoded.Clusters[0].Name != "cluster" {
t.Fatalf("cluster not found: %v", decoded.Clusters)
}
if decoded.Clusters[0].Cluster.Server != "https://kubernetes.default.svc" {
t.Fatalf("server not found: %v", decoded.Clusters)
}
}) })
t.Run("configuration_view returns auth info", func(t *testing.T) { s.Run("returns auth info", func() {
if len(decoded.AuthInfos) != 1 { s.Lenf(decoded.AuthInfos, 1, "invalid auth info count, expected 1, got %v", len(decoded.AuthInfos))
t.Fatalf("invalid auth info count, expected 1, got %v", len(decoded.AuthInfos)) s.Equalf("user", decoded.AuthInfos[0].Name, "user not found: %v", decoded.AuthInfos)
}
if decoded.AuthInfos[0].Name != "user" {
t.Fatalf("user not found: %v", decoded.AuthInfos)
}
}) })
}) })
} }
func TestConfiguration(t *testing.T) {
suite.Run(t, new(ConfigurationSuite))
}