Adds Last-Event-ID (#449)
This commit is contained in:
@@ -35,7 +35,7 @@ type dockerProxy interface {
|
||||
type Client interface {
|
||||
ListContainers() ([]Container, error)
|
||||
FindContainer(string) (Container, error)
|
||||
ContainerLogs(context.Context, string, int) (<-chan string, <-chan error)
|
||||
ContainerLogs(context.Context, string, int, string) (<-chan string, <-chan error)
|
||||
Events(context.Context) (<-chan events.Message, <-chan error)
|
||||
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) ([]string, error)
|
||||
}
|
||||
@@ -151,8 +151,17 @@ func logReader(reader io.ReadCloser, tty bool) func() (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int) (<-chan string, <-chan error) {
|
||||
options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: strconv.Itoa(tailSize), Timestamps: true}
|
||||
func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (<-chan string, <-chan error) {
|
||||
log.WithField("id", id).WithField("since", since).Debug("Streaming logs for container")
|
||||
|
||||
options := types.ContainerLogsOptions{
|
||||
ShowStdout: true,
|
||||
ShowStderr: true,
|
||||
Follow: true,
|
||||
Tail: strconv.Itoa(tailSize),
|
||||
Timestamps: true,
|
||||
Since: since,
|
||||
}
|
||||
reader, err := d.cli.ContainerLogs(ctx, id, options)
|
||||
errChannel := make(chan error, 1)
|
||||
|
||||
|
||||
@@ -120,14 +120,14 @@ func Test_dockerClient_ContainerLogs_happy(t *testing.T) {
|
||||
b = append(b, []byte(expected)...)
|
||||
|
||||
reader := ioutil.NopCloser(bytes.NewReader(b))
|
||||
options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true}
|
||||
options := types.ContainerLogsOptions{ShowStdout: true, ShowStderr: true, Follow: true, Tail: "300", Timestamps: true, Since: "since"}
|
||||
proxy.On("ContainerLogs", mock.Anything, id, options).Return(reader, nil)
|
||||
|
||||
json := types.ContainerJSON{Config: &container.Config{Tty: false}}
|
||||
proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil)
|
||||
|
||||
client := &dockerClient{proxy, filters.NewArgs()}
|
||||
messages, _ := client.ContainerLogs(context.Background(), id, 300)
|
||||
messages, _ := client.ContainerLogs(context.Background(), id, 300, "since")
|
||||
|
||||
actual, _ := <-messages
|
||||
assert.Equal(t, expected, actual, "message doesn't match expected")
|
||||
@@ -151,7 +151,7 @@ func Test_dockerClient_ContainerLogs_happy_with_tty(t *testing.T) {
|
||||
proxy.On("ContainerInspect", mock.Anything, id).Return(json, nil)
|
||||
|
||||
client := &dockerClient{proxy, filters.NewArgs()}
|
||||
messages, _ := client.ContainerLogs(context.Background(), id, 300)
|
||||
messages, _ := client.ContainerLogs(context.Background(), id, 300, "")
|
||||
|
||||
actual, _ := <-messages
|
||||
assert.Equal(t, expected, actual, "message doesn't match expected")
|
||||
@@ -169,7 +169,7 @@ func Test_dockerClient_ContainerLogs_error(t *testing.T) {
|
||||
|
||||
client := &dockerClient{proxy, filters.NewArgs()}
|
||||
|
||||
messages, err := client.ContainerLogs(context.Background(), id, 300)
|
||||
messages, err := client.ContainerLogs(context.Background(), id, 300, "")
|
||||
|
||||
assert.Nil(t, messages, "messages should be nil")
|
||||
|
||||
|
||||
@@ -41,7 +41,7 @@ func (m *MockedClient) ListContainers() ([]docker.Container, error) {
|
||||
return containers, args.Error(1)
|
||||
}
|
||||
|
||||
func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize int) (<-chan string, <-chan error) {
|
||||
func (m *MockedClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (<-chan string, <-chan error) {
|
||||
args := m.Called(ctx, id, tailSize)
|
||||
channel, ok := args.Get(0).(chan string)
|
||||
if !ok {
|
||||
|
||||
16
routes.go
16
routes.go
@@ -7,6 +7,7 @@ import (
|
||||
"io"
|
||||
"net/http"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
@@ -114,14 +115,12 @@ func (h *handler) streamLogs(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
messages, err := h.client.ContainerLogs(r.Context(), container.ID, tailSize)
|
||||
messages, err := h.client.ContainerLogs(r.Context(), container.ID, tailSize, r.Header.Get("Last-Event-ID"))
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
|
||||
log.Debugf("Starting to stream logs for %s", id)
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
@@ -129,11 +128,14 @@ Loop:
|
||||
if !ok {
|
||||
break Loop
|
||||
}
|
||||
_, e := fmt.Fprintf(w, "data: %s\n\n", message)
|
||||
if e != nil {
|
||||
log.Debugf("Error while writing to log stream: %v", e)
|
||||
break Loop
|
||||
fmt.Fprintf(w, "data: %s\n", message)
|
||||
if index := strings.IndexAny(message, " "); index != -1 {
|
||||
id := message[:index]
|
||||
if _, err := time.Parse(time.RFC3339Nano, id); err == nil {
|
||||
fmt.Fprintf(w, "id: %s\n", id)
|
||||
}
|
||||
}
|
||||
fmt.Fprintf(w, "\n")
|
||||
f.Flush()
|
||||
case e := <-err:
|
||||
if e == io.EOF {
|
||||
|
||||
Reference in New Issue
Block a user