mirror of
https://github.com/containers/kubernetes-mcp-server.git
synced 2025-10-23 01:22:57 +03:00
feat(http): add graceful shutdown of http server by catching interruption signals (164)
Move http serving under its specific dir --- Add gracefully shutdown for http server
This commit is contained in:
@@ -1,61 +1,71 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/manusa/kubernetes-mcp-server/pkg/mcp"
|
||||
)
|
||||
|
||||
func RequestMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
func Serve(ctx context.Context, mcpServer *mcp.Server, port, sseBaseUrl string) error {
|
||||
mux := http.NewServeMux()
|
||||
wrappedMux := RequestMiddleware(mux)
|
||||
|
||||
lrw := &loggingResponseWriter{
|
||||
ResponseWriter: w,
|
||||
statusCode: http.StatusOK,
|
||||
}
|
||||
httpServer := &http.Server{
|
||||
Addr: ":" + port,
|
||||
Handler: wrappedMux,
|
||||
}
|
||||
|
||||
next.ServeHTTP(lrw, r)
|
||||
|
||||
duration := time.Since(start)
|
||||
klog.V(5).Infof("%s %s %d %v", r.Method, r.URL.Path, lrw.statusCode, duration)
|
||||
sseServer := mcpServer.ServeSse(sseBaseUrl, httpServer)
|
||||
streamableHttpServer := mcpServer.ServeHTTP(httpServer)
|
||||
mux.Handle("/sse", sseServer)
|
||||
mux.Handle("/message", sseServer)
|
||||
mux.Handle("/mcp", streamableHttpServer)
|
||||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
}
|
||||
|
||||
type loggingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
headerWritten bool
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
func (lrw *loggingResponseWriter) WriteHeader(code int) {
|
||||
if !lrw.headerWritten {
|
||||
lrw.statusCode = code
|
||||
lrw.headerWritten = true
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
serverErr := make(chan error, 1)
|
||||
go func() {
|
||||
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths /mcp, /sse, /message", port)
|
||||
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
serverErr <- err
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case sig := <-sigChan:
|
||||
klog.V(0).Infof("Received signal %v, initiating graceful shutdown", sig)
|
||||
cancel()
|
||||
case <-ctx.Done():
|
||||
klog.V(0).Infof("Context cancelled, initiating graceful shutdown")
|
||||
case err := <-serverErr:
|
||||
klog.Errorf("HTTP server error: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
|
||||
if !lrw.headerWritten {
|
||||
lrw.statusCode = http.StatusOK
|
||||
lrw.headerWritten = true
|
||||
}
|
||||
return lrw.ResponseWriter.Write(b)
|
||||
}
|
||||
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer shutdownCancel()
|
||||
|
||||
func (lrw *loggingResponseWriter) Flush() {
|
||||
if flusher, ok := lrw.ResponseWriter.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
klog.V(0).Infof("Shutting down HTTP server gracefully...")
|
||||
if err := httpServer.Shutdown(shutdownCtx); err != nil {
|
||||
klog.Errorf("HTTP server shutdown error: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
if hijacker, ok := lrw.ResponseWriter.(http.Hijacker); ok {
|
||||
return hijacker.Hijack()
|
||||
}
|
||||
return nil, nil, http.ErrNotSupported
|
||||
klog.V(0).Infof("HTTP server shutdown complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
61
pkg/http/middleware.go
Normal file
61
pkg/http/middleware.go
Normal file
@@ -0,0 +1,61 @@
|
||||
package http
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
func RequestMiddleware(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
|
||||
lrw := &loggingResponseWriter{
|
||||
ResponseWriter: w,
|
||||
statusCode: http.StatusOK,
|
||||
}
|
||||
|
||||
next.ServeHTTP(lrw, r)
|
||||
|
||||
duration := time.Since(start)
|
||||
klog.V(5).Infof("%s %s %d %v", r.Method, r.URL.Path, lrw.statusCode, duration)
|
||||
})
|
||||
}
|
||||
|
||||
type loggingResponseWriter struct {
|
||||
http.ResponseWriter
|
||||
statusCode int
|
||||
headerWritten bool
|
||||
}
|
||||
|
||||
func (lrw *loggingResponseWriter) WriteHeader(code int) {
|
||||
if !lrw.headerWritten {
|
||||
lrw.statusCode = code
|
||||
lrw.headerWritten = true
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
}
|
||||
|
||||
func (lrw *loggingResponseWriter) Write(b []byte) (int, error) {
|
||||
if !lrw.headerWritten {
|
||||
lrw.statusCode = http.StatusOK
|
||||
lrw.headerWritten = true
|
||||
}
|
||||
return lrw.ResponseWriter.Write(b)
|
||||
}
|
||||
|
||||
func (lrw *loggingResponseWriter) Flush() {
|
||||
if flusher, ok := lrw.ResponseWriter.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (lrw *loggingResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
if hijacker, ok := lrw.ResponseWriter.(http.Hijacker); ok {
|
||||
return hijacker.Hijack()
|
||||
}
|
||||
return nil, nil, http.ErrNotSupported
|
||||
}
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
@@ -206,28 +205,8 @@ func (m *MCPServerOptions) Run() error {
|
||||
defer mcpServer.Close()
|
||||
|
||||
if m.StaticConfig.Port != "" {
|
||||
mux := http.NewServeMux()
|
||||
wrappedMux := internalhttp.RequestMiddleware(mux)
|
||||
|
||||
httpServer := &http.Server{
|
||||
Addr: ":" + m.StaticConfig.Port,
|
||||
Handler: wrappedMux,
|
||||
}
|
||||
|
||||
sseServer := mcpServer.ServeSse(m.SSEBaseUrl, httpServer)
|
||||
streamableHttpServer := mcpServer.ServeHTTP(httpServer)
|
||||
mux.Handle("/sse", sseServer)
|
||||
mux.Handle("/message", sseServer)
|
||||
mux.Handle("/mcp", streamableHttpServer)
|
||||
mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
})
|
||||
|
||||
klog.V(0).Infof("Streaming and SSE HTTP servers starting on port %s and paths /mcp, /sse, /message", m.StaticConfig.Port)
|
||||
if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
ctx := context.Background()
|
||||
return internalhttp.Serve(ctx, mcpServer, m.StaticConfig.Port, m.SSEBaseUrl)
|
||||
}
|
||||
|
||||
if err := mcpServer.ServeStdio(); err != nil && !errors.Is(err, context.Canceled) {
|
||||
|
||||
Reference in New Issue
Block a user