mirror of
https://github.com/openshift/openshift-mcp-server.git
synced 2025-10-17 14:27:48 +03:00
feat: watch for configuration changes
Watch kube config files for changes. Automatically reload kubernetes client and list of tools. Useful for logins or context changes after an MCP session has started.
This commit is contained in:
@@ -42,6 +42,7 @@ Kubernetes Model Context Protocol (MCP) server
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer mcpServer.Close()
|
||||
|
||||
var sseServer *server.SSEServer
|
||||
if ssePort := viper.GetInt("sse-port"); ssePort > 0 {
|
||||
@@ -49,13 +50,11 @@ Kubernetes Model Context Protocol (MCP) server
|
||||
if err := sseServer.Start(fmt.Sprintf(":%d", ssePort)); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer sseServer.Shutdown(cmd.Context())
|
||||
}
|
||||
if err := mcpServer.ServeStdio(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
panic(err)
|
||||
}
|
||||
if sseServer != nil {
|
||||
_ = sseServer.Shutdown(cmd.Context())
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package kubernetes
|
||||
|
||||
import (
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/client-go/discovery"
|
||||
"k8s.io/client-go/discovery/cached/memory"
|
||||
@@ -17,8 +18,12 @@ import (
|
||||
// Exposed for testing
|
||||
var InClusterConfig = rest.InClusterConfig
|
||||
|
||||
type CloseWatchKubeConfig func() error
|
||||
|
||||
type Kubernetes struct {
|
||||
cfg *rest.Config
|
||||
kubeConfigFiles []string
|
||||
CloseWatchKubeConfig CloseWatchKubeConfig
|
||||
clientSet *kubernetes.Clientset
|
||||
discoveryClient *discovery.DiscoveryClient
|
||||
deferredDiscoveryRESTMapper *restmapper.DeferredDiscoveryRESTMapper
|
||||
@@ -44,6 +49,7 @@ func NewKubernetes() (*Kubernetes, error) {
|
||||
}
|
||||
return &Kubernetes{
|
||||
cfg: cfg,
|
||||
kubeConfigFiles: resolveConfig().ConfigAccess().GetLoadingPrecedence(),
|
||||
clientSet: clientSet,
|
||||
discoveryClient: discoveryClient,
|
||||
deferredDiscoveryRESTMapper: restmapper.NewDeferredDiscoveryRESTMapper(memory.NewMemCacheClient(discoveryClient)),
|
||||
@@ -51,6 +57,44 @@ func NewKubernetes() (*Kubernetes, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (k *Kubernetes) WatchKubeConfig(onKubeConfigChange func() error) {
|
||||
if len(k.kubeConfigFiles) == 0 {
|
||||
return
|
||||
}
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, file := range k.kubeConfigFiles {
|
||||
_ = watcher.Add(file)
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case _, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
_ = onKubeConfigChange()
|
||||
case _, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
if k.CloseWatchKubeConfig != nil {
|
||||
_ = k.CloseWatchKubeConfig()
|
||||
}
|
||||
k.CloseWatchKubeConfig = watcher.Close
|
||||
}
|
||||
|
||||
func (k *Kubernetes) Close() {
|
||||
if k.CloseWatchKubeConfig != nil {
|
||||
_ = k.CloseWatchKubeConfig()
|
||||
}
|
||||
}
|
||||
|
||||
func marshal(v any) (string, error) {
|
||||
switch t := v.(type) {
|
||||
case []unstructured.Unstructured:
|
||||
|
||||
@@ -129,6 +129,7 @@ func (c *mcpContext) beforeEach(t *testing.T) {
|
||||
|
||||
func (c *mcpContext) afterEach() {
|
||||
c.cancel()
|
||||
c.mcpServer.Close()
|
||||
_ = c.mcpClient.Close()
|
||||
c.mcpHttpServer.Close()
|
||||
}
|
||||
|
||||
@@ -27,6 +27,7 @@ func NewSever() (*Server, error) {
|
||||
if err := s.reloadKubernetesClient(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
s.k.WatchKubeConfig(s.reloadKubernetesClient)
|
||||
return s, nil
|
||||
}
|
||||
|
||||
@@ -57,6 +58,12 @@ func (s *Server) ServeSse(baseUrl string) *server.SSEServer {
|
||||
return server.NewSSEServer(s.server, options...)
|
||||
}
|
||||
|
||||
func (s *Server) Close() {
|
||||
if s.k != nil {
|
||||
s.k.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func NewTextResult(content string, err error) *mcp.CallToolResult {
|
||||
if err != nil {
|
||||
return &mcp.CallToolResult{
|
||||
|
||||
@@ -1,12 +1,51 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/mark3labs/mcp-go/mcp"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestWatchKubeConfig(t *testing.T) {
|
||||
testCase(t, func(c *mcpContext) {
|
||||
// Given
|
||||
withTimeout, cancel := context.WithTimeout(c.ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
var notification *mcp.JSONRPCNotification
|
||||
c.mcpClient.OnNotification(func(n mcp.JSONRPCNotification) {
|
||||
notification = &n
|
||||
})
|
||||
// When
|
||||
f, _ := os.OpenFile(filepath.Join(c.tempDir, "config"), os.O_APPEND|os.O_WRONLY, 0644)
|
||||
_, _ = f.WriteString("\n")
|
||||
for {
|
||||
if notification != nil {
|
||||
break
|
||||
}
|
||||
select {
|
||||
case <-withTimeout.Done():
|
||||
break
|
||||
default:
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
// Then
|
||||
t.Run("WatchKubeConfig notifies tools change", func(t *testing.T) {
|
||||
if notification == nil {
|
||||
t.Fatalf("WatchKubeConfig did not notify")
|
||||
}
|
||||
if notification.Method != "notifications/tools/list_changed" {
|
||||
t.Fatalf("WatchKubeConfig did not notify tools change, got %s", notification.Method)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestTools(t *testing.T) {
|
||||
expectedNames := []string{
|
||||
"configuration_view",
|
||||
|
||||
Reference in New Issue
Block a user