241 lines
6.1 KiB
Go
241 lines
6.1 KiB
Go
package docker
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"sort"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/docker/docker/api/types"
|
|
"github.com/docker/docker/api/types/events"
|
|
"github.com/docker/docker/api/types/filters"
|
|
"github.com/docker/docker/client"
|
|
log "github.com/sirupsen/logrus"
|
|
)
|
|
|
|
type dockerClient struct {
|
|
cli dockerProxy
|
|
filters filters.Args
|
|
}
|
|
|
|
type dockerProxy interface {
|
|
ContainerList(context.Context, types.ContainerListOptions) ([]types.Container, error)
|
|
ContainerLogs(context.Context, string, types.ContainerLogsOptions) (io.ReadCloser, error)
|
|
Events(context.Context, types.EventsOptions) (<-chan events.Message, <-chan error)
|
|
ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
|
|
ContainerStats(ctx context.Context, containerID string, stream bool) (types.ContainerStats, error)
|
|
}
|
|
|
|
// Client is a proxy around the docker client
|
|
type Client interface {
|
|
ListContainers() ([]Container, error)
|
|
FindContainer(string) (Container, error)
|
|
ContainerLogs(context.Context, string, int, string) (io.ReadCloser, error)
|
|
Events(context.Context) (<-chan ContainerEvent, <-chan error)
|
|
ContainerLogsBetweenDates(context.Context, string, time.Time, time.Time) (io.ReadCloser, error)
|
|
ContainerStats(context.Context, string, chan<- ContainerStat) error
|
|
}
|
|
|
|
// NewClient creates a new instance of Client
|
|
func NewClient() Client {
|
|
return NewClientWithFilters(map[string]string{})
|
|
}
|
|
|
|
// NewClientWithFilters creates a new instance of Client with docker filters
|
|
func NewClientWithFilters(f map[string]string) Client {
|
|
filterArgs := filters.NewArgs()
|
|
for k, v := range f {
|
|
filterArgs.Add(k, v)
|
|
}
|
|
|
|
log.Debugf("filterArgs = %v", filterArgs)
|
|
|
|
cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
|
|
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
return &dockerClient{cli, filterArgs}
|
|
}
|
|
|
|
func (d *dockerClient) FindContainer(id string) (Container, error) {
|
|
var container Container
|
|
containers, err := d.ListContainers()
|
|
if err != nil {
|
|
return container, err
|
|
}
|
|
|
|
found := false
|
|
for _, c := range containers {
|
|
if c.ID == id {
|
|
container = c
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
if found == false {
|
|
return container, fmt.Errorf("Unable to find container with id: %s", id)
|
|
}
|
|
|
|
return container, nil
|
|
}
|
|
|
|
func (d *dockerClient) ListContainers() ([]Container, error) {
|
|
containerListOptions := types.ContainerListOptions{
|
|
Filters: d.filters,
|
|
All: true,
|
|
}
|
|
list, err := d.cli.ContainerList(context.Background(), containerListOptions)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var containers = make([]Container, 0, len(list))
|
|
for _, c := range list {
|
|
container := Container{
|
|
ID: c.ID[:12],
|
|
Names: c.Names,
|
|
Name: strings.TrimPrefix(c.Names[0], "/"),
|
|
Image: c.Image,
|
|
ImageID: c.ImageID,
|
|
Command: c.Command,
|
|
Created: c.Created,
|
|
State: c.State,
|
|
Status: c.Status,
|
|
}
|
|
containers = append(containers, container)
|
|
}
|
|
|
|
sort.Slice(containers, func(i, j int) bool {
|
|
return strings.ToLower(containers[i].Name) < strings.ToLower(containers[j].Name)
|
|
})
|
|
|
|
return containers, nil
|
|
}
|
|
|
|
func (d *dockerClient) ContainerStats(ctx context.Context, id string, stats chan<- ContainerStat) error {
|
|
response, err := d.cli.ContainerStats(ctx, id, true)
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
log.Debugf("starting to stream stats for: %s", id)
|
|
defer response.Body.Close()
|
|
decoder := json.NewDecoder(response.Body)
|
|
var v *types.StatsJSON
|
|
for {
|
|
if err := decoder.Decode(&v); err != nil {
|
|
if err == context.Canceled || err == io.EOF {
|
|
log.Debugf("stopping stats streaming for container %s", id)
|
|
return
|
|
}
|
|
log.Errorf("decoder for stats api returned an unknown error %v", err)
|
|
}
|
|
|
|
var (
|
|
cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(v.PreCPUStats.CPUUsage.TotalUsage)
|
|
systemDelta = float64(v.CPUStats.SystemUsage) - float64(v.PreCPUStats.SystemUsage)
|
|
cpuPercent = int64((cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100)
|
|
memUsage = int64(v.MemoryStats.Usage - v.MemoryStats.Stats["cache"])
|
|
memPercent = int64(float64(memUsage) / float64(v.MemoryStats.Limit) * 100)
|
|
)
|
|
|
|
if cpuPercent > 0 || memUsage > 0 {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case stats <- ContainerStat{
|
|
ID: id,
|
|
CPUPercent: cpuPercent,
|
|
MemoryPercent: memPercent,
|
|
MemoryUsage: memUsage,
|
|
}:
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *dockerClient) ContainerLogs(ctx context.Context, id string, tailSize int, since string) (io.ReadCloser, error) {
|
|
log.WithField("id", id).WithField("since", since).Debug("streaming logs for container")
|
|
|
|
options := types.ContainerLogsOptions{
|
|
ShowStdout: true,
|
|
ShowStderr: true,
|
|
Follow: true,
|
|
Tail: strconv.Itoa(tailSize),
|
|
Timestamps: true,
|
|
Since: since,
|
|
}
|
|
|
|
reader, err := d.cli.ContainerLogs(ctx, id, options)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
containerJSON, err := d.cli.ContainerInspect(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newLogReader(reader, containerJSON.Config.Tty), nil
|
|
}
|
|
|
|
func (d *dockerClient) Events(ctx context.Context) (<-chan ContainerEvent, <-chan error) {
|
|
dockerMessages, errors := d.cli.Events(ctx, types.EventsOptions{})
|
|
messages := make(chan ContainerEvent)
|
|
|
|
go func() {
|
|
defer close(messages)
|
|
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case message, ok := <-dockerMessages:
|
|
if !ok {
|
|
return
|
|
}
|
|
messages <- ContainerEvent{
|
|
ActorID: message.Actor.ID[:12],
|
|
Name: message.Action,
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return messages, errors
|
|
}
|
|
|
|
func (d *dockerClient) ContainerLogsBetweenDates(ctx context.Context, id string, from time.Time, to time.Time) (io.ReadCloser, error) {
|
|
options := types.ContainerLogsOptions{
|
|
ShowStdout: true,
|
|
ShowStderr: true,
|
|
Timestamps: true,
|
|
Since: strconv.FormatInt(from.Unix(), 10),
|
|
Until: strconv.FormatInt(to.Unix(), 10),
|
|
}
|
|
|
|
reader, err := d.cli.ContainerLogs(ctx, id, options)
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
containerJSON, err := d.cli.ContainerInspect(ctx, id)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newLogReader(reader, containerJSON.Config.Tty), nil
|
|
}
|