Watch for new pods when logs --follow (#6914)

* Watch for new pods when logs --follow

* Fix integration tests

* Implement --follow for podman platform

* Add integration test
This commit is contained in:
Philippe Martin
2023-06-28 14:21:04 +02:00
committed by GitHub
parent c8a1414926
commit f276d0d77b
9 changed files with 239 additions and 21 deletions

View File

@@ -9,6 +9,7 @@ import (
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)
const (
@@ -48,6 +49,10 @@ func (o fakePlatform) GetPodUsingComponentName(componentName string) (*corev1.Po
panic("not implemented yet")
}
func (o fakePlatform) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
return nil, nil
}
func TestExecuteCommand(t *testing.T) {
for _, tt := range []struct {
name string

View File

@@ -6,6 +6,7 @@ import (
"io"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
odolabels "github.com/redhat-developer/odo/pkg/labels"
odocontext "github.com/redhat-developer/odo/pkg/odo/context"
@@ -17,8 +18,9 @@ type LogsClient struct {
}
type ContainerLogs struct {
Name string
Logs io.ReadCloser
PodName string
ContainerName string
Logs io.ReadCloser
}
type Events struct {
@@ -80,7 +82,11 @@ func (o *LogsClient) getLogsForMode(
if err != nil {
events.Err <- fmt.Errorf("failed to get logs for container %s; error: %v", container.Name, err)
}
events.Logs <- ContainerLogs{container.Name, containerLogs}
events.Logs <- ContainerLogs{
PodName: pod.GetName(),
ContainerName: container.Name,
Logs: containerLogs,
}
}
case err := <-errChan:
events.Err <- err
@@ -92,18 +98,42 @@ func (o *LogsClient) getLogsForMode(
appname := odocontext.GetApplication(ctx)
if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
getPods := func() error {
if mode == odolabels.ComponentDevMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDevMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
return err
}
}
if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
return err
}
}
return nil
}
err := getPods()
if err != nil {
errChan <- err
}
if follow {
podWatcher, err := o.platformClient.PodWatcher(ctx, "")
if err != nil {
errChan <- err
}
}
if mode == odolabels.ComponentDeployMode || mode == odolabels.ComponentAnyMode {
selector = odolabels.GetSelector(componentName, appname, odolabels.ComponentDeployMode, false)
err := o.getPodsForSelector(selector, namespace, podChan)
if err != nil {
errChan <- err
for ev := range podWatcher.ResultChan() {
switch ev.Type {
case watch.Added, watch.Modified:
err = getPods()
if err != nil {
errChan <- err
}
}
}
}
@@ -125,7 +155,9 @@ func (o *LogsClient) getPodsForSelector(
return err
}
for _, pod := range podList.Items {
pods[pod.GetName()] = struct{}{}
if pod.Status.Phase == "Running" {
pods[pod.GetName()] = struct{}{}
}
}
// get all pods in the namespace
@@ -139,11 +171,15 @@ func (o *LogsClient) getPodsForSelector(
// Pod's logs have already been displayed to user
continue
}
podList.Items = append(podList.Items, pod)
if pod.Status.Phase == "Running" {
podList.Items = append(podList.Items, pod)
}
}
for _, pod := range podList.Items {
podChan <- pod
if pod.Status.Phase == "Running" {
podChan <- pod
}
}
return nil

View File

@@ -152,14 +152,36 @@ func (o *LogsOptions) Run(ctx context.Context) error {
errChan := make(chan error) // errors are put on this channel
var mu sync.Mutex
displayedLogs := map[string]struct{}{}
for {
select {
case containerLogs := <-events.Logs:
uniqueName := getUniqueContainerName(containerLogs.Name, uniqueContainerNames)
podContainerName := fmt.Sprintf("%s-%s", containerLogs.PodName, containerLogs.ContainerName)
if _, ok := displayedLogs[podContainerName]; ok {
continue
}
displayedLogs[podContainerName] = struct{}{}
uniqueName := getUniqueContainerName(containerLogs.ContainerName, uniqueContainerNames)
uniqueContainerNames[uniqueName] = struct{}{}
colour := log.ColorPicker()
logs := containerLogs.Logs
func() {
mu.Lock()
defer mu.Unlock()
color.Set(colour)
defer color.Unset()
help := ""
if uniqueName != containerLogs.ContainerName {
help = fmt.Sprintf(" (%s)", uniqueName)
}
_, err = fmt.Fprintf(o.out, "--> Logs for %s / %s%s\n", containerLogs.PodName, containerLogs.ContainerName, help)
if err != nil {
errChan <- err
}
}()
if o.follow {
atomic.AddInt64(&goroutines.count, 1)
go func(out io.Writer) {
@@ -170,6 +192,7 @@ func (o *LogsOptions) Run(ctx context.Context) error {
if err != nil {
errChan <- err
}
delete(displayedLogs, podContainerName)
events.Done <- struct{}{}
}(o.out)
} else {
@@ -183,7 +206,7 @@ func (o *LogsOptions) Run(ctx context.Context) error {
case err = <-events.Err:
return err
case <-events.Done:
if goroutines.count == 0 {
if !o.follow && goroutines.count == 0 {
if len(uniqueContainerNames) == 0 {
// This will be the case when:
// 1. user specifies --dev flag, but the component's running in Deploy mode

View File

@@ -6,6 +6,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)
// Client is the interface that wraps operations that can be performed on any supported platform.
@@ -33,4 +34,6 @@ type Client interface {
GetRunningPodFromSelector(selector string) (*corev1.Pod, error)
GetPodUsingComponentName(componentName string) (*corev1.Pod, error)
PodWatcher(ctx context.Context, selector string) (watch.Interface, error)
}

View File

@@ -12,6 +12,7 @@ import (
gomock "github.com/golang/mock/gomock"
v1 "k8s.io/api/core/v1"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
watch "k8s.io/apimachinery/pkg/watch"
)
// MockClient is a mock of Client interface.
@@ -140,3 +141,18 @@ func (mr *MockClientMockRecorder) GetRunningPodFromSelector(selector interface{}
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRunningPodFromSelector", reflect.TypeOf((*MockClient)(nil).GetRunningPodFromSelector), selector)
}
// PodWatcher mocks base method.
func (m *MockClient) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PodWatcher", ctx, selector)
ret0, _ := ret[0].(watch.Interface)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// PodWatcher indicates an expected call of PodWatcher.
func (mr *MockClientMockRecorder) PodWatcher(ctx, selector interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodWatcher", reflect.TypeOf((*MockClient)(nil).PodWatcher), ctx, selector)
}

View File

@@ -13,6 +13,7 @@ import (
api "github.com/redhat-developer/odo/pkg/api"
v1 "k8s.io/api/core/v1"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
watch "k8s.io/apimachinery/pkg/watch"
)
// MockClient is a mock of Client interface.
@@ -243,6 +244,21 @@ func (mr *MockClientMockRecorder) PodStop(podname interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodStop", reflect.TypeOf((*MockClient)(nil).PodStop), podname)
}
// PodWatcher mocks base method.
func (m *MockClient) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "PodWatcher", ctx, selector)
ret0, _ := ret[0].(watch.Interface)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// PodWatcher indicates an expected call of PodWatcher.
func (mr *MockClientMockRecorder) PodWatcher(ctx, selector interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PodWatcher", reflect.TypeOf((*MockClient)(nil).PodWatcher), ctx, selector)
}
// Version mocks base method.
func (m *MockClient) Version(ctx context.Context) (SystemVersionReport, error) {
m.ctrl.T.Helper()

View File

@@ -1,13 +1,18 @@
package podman
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os/exec"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog"
"github.com/redhat-developer/odo/pkg/platform"
@@ -24,7 +29,8 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e
for _, podReport := range podsReport {
pod, err := o.KubeGenerate(podReport.Name)
if err != nil {
return nil, err
// The pod has disappeared in the meantime, forget it
continue
}
// We remove the podname- prefix from the container names as Podman adds this prefix
// (to avoid colliding container names?)
@@ -33,6 +39,13 @@ func (o *PodmanCli) GetPodsMatchingSelector(selector string) (*corev1.PodList, e
prefix := pod.GetName() + "-"
container.Name = strings.TrimPrefix(container.Name, prefix)
}
inspect, err := o.PodInspect(podReport.Name)
if err != nil {
// The pod has disappeared in the meantime, forget it
continue
}
pod.Status.Phase = corev1.PodPhase(inspect.State)
result.Items = append(result.Items, *pod)
}
return &result, nil
@@ -129,3 +142,67 @@ func (o *PodmanCli) getPodsFromSelector(selector string) ([]ListPodsReport, erro
}
return list, nil
}
type podWatcher struct {
stop chan struct{}
pods map[string]struct{}
events chan watch.Event
}
func (o *PodmanCli) PodWatcher(ctx context.Context, selector string) (watch.Interface, error) {
watcher := podWatcher{
stop: make(chan struct{}),
pods: make(map[string]struct{}),
events: make(chan watch.Event),
}
go watcher.watch(o.podmanCmd, o.containerRunGlobalExtraArgs)
return watcher, nil
}
func (o podWatcher) watch(podmanCmd string, containerRunGlobalExtraArgs []string) {
args := []string{"ps", "--quiet"}
args = append(containerRunGlobalExtraArgs, args...)
ticker := time.NewTicker(3 * time.Second)
for {
select {
case <-o.stop:
return
case <-ticker.C:
cmd := exec.Command(podmanCmd, args...)
out, err := cmd.Output()
if err != nil {
klog.V(4).Infof("error getting containers from podman: %s", err)
continue
}
scanner := bufio.NewScanner(bytes.NewReader(out))
currentPods := make(map[string]struct{})
for scanner.Scan() {
podName := scanner.Text()
currentPods[podName] = struct{}{}
if _, ok := o.pods[podName]; !ok {
o.events <- watch.Event{
Type: watch.Added,
}
o.pods[podName] = struct{}{}
}
}
for p := range o.pods {
if _, ok := currentPods[p]; !ok {
o.events <- watch.Event{
Type: watch.Deleted,
}
delete(o.pods, p)
}
}
}
}
}
func (o podWatcher) Stop() {
o.stop <- struct{}{}
}
func (o podWatcher) ResultChan() <-chan watch.Event {
return o.events
}

View File

@@ -59,8 +59,8 @@ commands:
components:
- container:
endpoints:
- name: http-3000
targetPort: 3000
- name: http-8080
targetPort: 8080
image: registry.access.redhat.com/ubi8/nodejs-14:latest
memoryLimit: 1024Mi
mountSources: true

View File

@@ -101,7 +101,7 @@ var _ = Describe("odo logs command tests", func() {
cmd := getLogCommand(podman)
out := cmd.ShouldPass().Out()
Expect(out).To(ContainSubstring(noContainersRunning))
cmd = getLogCommand(podman, "--follow")
cmd = getLogCommand(podman)
out = cmd.ShouldPass().Out()
Expect(out).To(ContainSubstring(noContainersRunning))
})
@@ -182,6 +182,48 @@ var _ = Describe("odo logs command tests", func() {
})
})
}))
When("logs --follow is started", func() {
var logsSession helper.LogsSession
var err error
BeforeEach(func() {
logsSession, _, _, err = helper.StartLogsFollow(podman, "--dev")
Expect(err).ToNot(HaveOccurred())
})
AfterEach(func() {
logsSession.Kill()
})
When("running in Dev mode", helper.LabelPodmanIf(podman, func() {
var devSession helper.DevSession
BeforeEach(func() {
var err error
devSession, err = helper.StartDevMode(helper.DevSessionOpts{
RunOnPodman: podman,
})
Expect(err).ToNot(HaveOccurred())
if !podman {
// We need to wait for the pod deployed as a Kubernetes component
Eventually(func() bool {
return areAllPodsRunning()
}).Should(Equal(true))
}
})
AfterEach(func() {
devSession.Stop()
devSession.WaitEnd()
})
It("should successfully follow logs of running component", func() {
Eventually(func() bool {
logs := logsSession.OutContents()
return strings.Contains(string(logs), "Server running on")
}, 20*time.Second, 5).Should(BeTrue())
})
}))
})
}
When("running in Deploy mode", func() {