opentracing -> opencensus (#802)

* update vendor directory, add go.opencensus.io

* update imports

* oops

* s/opentracing/opencensus/ & remove prometheus / zipkin stuff & remove old stats

* the dep train rides again

* fix gin build

* deps from last guy

* start in on the agent metrics

* she builds

* remove tags for now, cardinality error is fussing. subscribe instead of register

* update to patched version of opencensus to proceed for now TODO switch to a release

* meh

fix imports

* println debug the bad boys

* lace it with the tags

* update deps again

* fix all inconsistent cardinality errors

* add our own logger

* fix init

* fix oom measure

* remove bugged removal code

* fix s3 measures

* fix prom handler nil
Reed Allman · 2018-03-05 09:35:28 -08:00 · committed by GitHub
parent 924d27559c · commit 206aa3c203
5975 changed files with 158755 additions and 566592 deletions
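
Every file below repeats the same mechanical substitution: each opentracing span becomes an opencensus span, and each common gauge/counter helper becomes a stats.Record call against a pre-declared measure. A minimal sketch of the span half of the pattern, using only APIs that appear in this diff:

package main

import (
    "context"

    "go.opencensus.io/trace"
)

// Before (opentracing):
//   span, ctx := opentracing.StartSpanFromContext(ctx, "agent_submit")
//   defer span.Finish()
//
// After (opencensus): trace.StartSpan returns the span second, and the
// span is closed with End() rather than Finish().
func submit(ctx context.Context) {
    ctx, span := trace.StartSpan(ctx, "agent_submit")
    defer span.End()

    work(ctx) // the returned ctx carries the span down to callees
}

func work(ctx context.Context) {}

func main() { submit(context.Background()) }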


@@ -3,7 +3,6 @@ package agent
import (
"context"
"io"
"net/http"
"strings"
"sync"
"time"
@@ -16,9 +15,11 @@ import (
"github.com/fnproject/fn/api/models"
"github.com/fnproject/fn/fnext"
"github.com/go-openapi/strfmt"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)
// TODO we should prob store async calls in db immediately since we're returning id (will 404 until post-execution)
@@ -85,8 +86,6 @@ type Agent interface {
// Close is not safe to be called from multiple threads.
io.Closer
// Return the http.Handler used to handle Prometheus metric requests
PromHandler() http.Handler
AddCallListener(fnext.CallListener)
// Enqueue is to use the agent's sweet sweet client bindings to remotely
@@ -109,9 +108,6 @@ type agent struct {
wg sync.WaitGroup // TODO rename
shutonce sync.Once
shutdown chan struct{}
// Prometheus HTTP handler
promHandler http.Handler
}
func New(da DataAccess) Agent {
@@ -128,13 +124,12 @@ func New(da DataAccess) Agent {
})
a := &agent{
cfg: *cfg,
da: da,
driver: driver,
slotMgr: NewSlotQueueMgr(),
resources: NewResourceTracker(),
shutdown: make(chan struct{}),
promHandler: promhttp.Handler(),
cfg: *cfg,
da: da,
driver: driver,
slotMgr: NewSlotQueueMgr(),
resources: NewResourceTracker(),
shutdown: make(chan struct{}),
}
// TODO assert that agent doesn't get started for API nodes up above ?
@@ -174,8 +169,8 @@ func (a *agent) Submit(callI Call) error {
call.req = call.req.WithContext(ctx)
defer cancel()
ctx, finish := statSpans(ctx, call)
defer finish()
ctx, span := trace.StartSpan(ctx, "agent_submit")
defer span.End()
err := a.submit(ctx, call)
return err
@@ -202,14 +197,15 @@ func (a *agent) endStateTrackers(ctx context.Context, call *call) {
}
func (a *agent) submit(ctx context.Context, call *call) error {
StatsEnqueue(ctx)
statsEnqueue(ctx)
// TODO can we replace state trackers with metrics?
a.startStateTrackers(ctx, call)
defer a.endStateTrackers(ctx, call)
slot, err := a.getSlot(ctx, call)
if err != nil {
a.handleStatsDequeue(ctx, call, err)
handleStatsDequeue(ctx, err)
return transformTimeout(err, true)
}
@@ -217,20 +213,19 @@ func (a *agent) submit(ctx context.Context, call *call) error {
err = call.Start(ctx)
if err != nil {
a.handleStatsDequeue(ctx, call, err)
handleStatsDequeue(ctx, err)
return transformTimeout(err, true)
}
// decrement queued count, increment running count
StatsDequeueAndStart(ctx)
statsDequeueAndStart(ctx)
// pass this error (nil or otherwise) to end directly, to store status, etc
err = slot.exec(ctx, call)
a.handleStatsEnd(ctx, call, err)
handleStatsEnd(ctx, err)
// TODO: we need to allocate more time to store the call + logs in case the call timed out,
// but this could put us over the timeout if the call did not reply yet (need better policy).
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
ctx = common.BackgroundContext(ctx)
err = call.End(ctx, err)
return transformTimeout(err, false)
}
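
common.BackgroundContext, introduced by this commit and used at several call sites below, replaces the old opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx)) idiom. Its body is not shown in this diff; a plausible reconstruction, assuming it only drops the deadline and cancelation while keeping the span attached (trace.FromContext and trace.WithSpan are the opencensus helpers of this vintage):

package common

import (
    "context"

    "go.opencensus.io/trace"
)

// BackgroundContext, hypothetically reconstructed; the real helper lives
// in github.com/fnproject/fn/api/common and is not part of this diff.
func BackgroundContext(ctx context.Context) context.Context {
    span := trace.FromContext(ctx) // nil if ctx carries no span
    // Fresh context: no deadline, no cancelation, same span.
    return trace.WithSpan(context.Background(), span)
}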
@@ -247,54 +242,33 @@ func transformTimeout(e error, isRetriable bool) error {
// handleStatsDequeue handles stats for dequeuing for early exit (getSlot or Start)
// cases. Only timeouts can be a simple dequeue while other cases are actual errors.
func (a *agent) handleStatsDequeue(ctx context.Context, call *call, err error) {
func handleStatsDequeue(ctx context.Context, err error) {
if err == context.DeadlineExceeded {
StatsDequeue(ctx)
StatsIncrementTooBusy(ctx)
statsDequeue(ctx)
statsTooBusy(ctx)
} else {
StatsDequeueAndFail(ctx)
StatsIncrementErrors(ctx)
statsDequeueAndFail(ctx)
statsErrors(ctx)
}
}
// handleStatsEnd handles stats after a call has run, depending on the error.
func (a *agent) handleStatsEnd(ctx context.Context, call *call, err error) {
func handleStatsEnd(ctx context.Context, err error) {
if err == nil {
// decrement running count, increment completed count
StatsComplete(ctx)
statsComplete(ctx)
} else {
// decrement running count, increment failed count
StatsFailed(ctx)
statsFailed(ctx)
// increment the timeout or errors count, as appropriate
if err == context.DeadlineExceeded {
StatsIncrementTimedout(ctx)
statsTimedout(ctx)
} else {
StatsIncrementErrors(ctx)
statsErrors(ctx)
}
}
}
func statSpans(ctx context.Context, call *call) (_ context.Context, finish func()) {
// agent_submit_global has no parent span because we don't want it to inherit fn_appname or fn_path
spanGlobal := opentracing.StartSpan("agent_submit_global")
// agent_submit_app likewise has no parent span because we don't want it to inherit fn_path
spanApp := opentracing.StartSpan("agent_submit_app")
spanApp.SetBaggageItem("fn_appname", call.AppName)
// agent_submit has a parent span in the usual way
// it doesn't matter if it inherits fn_appname or fn_path (and we set them here in any case)
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_submit")
span.SetBaggageItem("fn_appname", call.AppName)
span.SetBaggageItem("fn_path", call.Path)
return ctx, func() {
spanGlobal.Finish()
spanApp.Finish()
span.Finish()
}
}
// getSlot returns a Slot (or error) for the request to run. Depending on hot/cold
// request type, this may launch a new container or wait for other containers to become idle
// or it may wait for resources to become available to launch a new container.
@@ -303,8 +277,8 @@ func (a *agent) getSlot(ctx context.Context, call *call) (Slot, error) {
ctx, cancel := context.WithDeadline(ctx, call.slotDeadline)
defer cancel()
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_slot")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_get_slot")
defer span.End()
if protocol.IsStreamable(protocol.Protocol(call.Format)) {
// For hot requests, we use a long lived slot queue, which we use to manage hot containers
@@ -340,9 +314,9 @@ func (a *agent) hotLauncher(ctx context.Context, call *call) {
// IMPORTANT: get a context that has a child span / logger but NO timeout
// TODO this is a 'FollowsFrom'
ctx = opentracing.ContextWithSpan(common.WithLogger(context.Background(), logger), opentracing.SpanFromContext(ctx))
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_launcher")
defer span.Finish()
ctx = common.BackgroundContext(ctx)
ctx, span := trace.StartSpan(ctx, "agent_hot_launcher")
defer span.End()
for {
ctx, cancel := context.WithTimeout(ctx, timeout)
@@ -395,8 +369,8 @@ func (a *agent) checkLaunch(ctx context.Context, call *call) {
// waitHot pings and waits for a hot container from the slot queue
func (a *agent) waitHot(ctx context.Context, call *call) (Slot, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_wait_hot")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_wait_hot")
defer span.End()
ctx, cancel := context.WithCancel(ctx)
defer cancel() // shut down dequeuer if we grab a slot
@@ -442,8 +416,8 @@ func (a *agent) launchCold(ctx context.Context, call *call) (Slot, error) {
isAsync := call.Type == models.TypeAsync
ch := make(chan Slot)
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_launch_cold")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_launch_cold")
defer span.End()
call.containerState.UpdateState(ctx, ContainerStateWait, call.slots)
@@ -479,8 +453,8 @@ func (s *coldSlot) Error() error {
}
func (s *coldSlot) exec(ctx context.Context, call *call) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_cold_exec")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_cold_exec")
defer span.End()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
call.containerState.UpdateState(ctx, ContainerStateBusy, call.slots)
@@ -507,7 +481,7 @@ func (s *coldSlot) Close(ctx context.Context) error {
// call this from here so that in exec we don't have to eat container
// removal latency
// NOTE ensure container removal, no ctx timeout
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
ctx = common.BackgroundContext(ctx)
s.cookie.Close(ctx)
}
if s.tok != nil {
@@ -535,8 +509,8 @@ func (s *hotSlot) Error() error {
}
func (s *hotSlot) exec(ctx context.Context, call *call) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_hot_exec")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_hot_exec")
defer span.End()
call.requestState.UpdateState(ctx, RequestStateExec, call.slots)
@@ -575,8 +549,8 @@ func (s *hotSlot) exec(ctx context.Context, call *call) error {
}
func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch chan Slot) {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_prep_cold")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_prep_cold")
defer span.End()
call.containerState.UpdateState(ctx, ContainerStateStart, call.slots)
@@ -622,9 +596,9 @@ func (a *agent) prepCold(ctx context.Context, call *call, tok ResourceToken, ch
func (a *agent) runHot(ctx context.Context, call *call, tok ResourceToken, state ContainerState) {
// IMPORTANT: get a context that has a child span / logger but NO timeout
// TODO this is a 'FollowsFrom'
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_run_hot")
defer span.Finish()
ctx = common.BackgroundContext(ctx)
ctx, span := trace.StartSpan(ctx, "agent_run_hot")
defer span.End()
defer tok.Close() // IMPORTANT: this MUST get called
state.UpdateState(ctx, ContainerStateStart, call.slots)
@@ -852,17 +826,10 @@ func (c *container) CPUs() uint64 { return c.cpus }
// WriteStat publishes each metric in the specified Stats structure as a histogram metric
func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
// Convert each metric value from uint64 to float64
// and, for backward compatibility reasons, prepend each metric name with "docker_stats_fn_"
// (if we don't care about compatibility then we can remove that)
var metrics = make(map[string]float64)
for key, value := range stat.Metrics {
metrics["docker_stats_fn_"+key] = float64(value)
stats.Record(ctx, stats.FindMeasure("docker_stats_"+key).(*stats.Int64Measure).M(int64(value)))
}
common.PublishHistograms(ctx, metrics)
c.statsMu.Lock()
if c.stats != nil {
*(c.stats) = append(*(c.stats), stat)
@@ -870,6 +837,45 @@ func (c *container) WriteStat(ctx context.Context, stat drivers.Stat) {
c.statsMu.Unlock()
}
func init() {
// TODO this is nasty figure out how to use opencensus to not have to declare these
keys := []string{"net_rx", "net_tx", "mem_limit", "mem_usage", "disk_read", "disk_write", "cpu_user", "cpu_total", "cpu_kernel"}
// TODO necessary?
appKey, err := tag.NewKey("fn_appname")
if err != nil {
logrus.Fatal(err)
}
pathKey, err := tag.NewKey("fn_path")
if err != nil {
logrus.Fatal(err)
}
for _, key := range keys {
units := "bytes"
if strings.Contains(key, "cpu") {
units = "cpu"
}
dockerStatsDist, err := stats.Int64("docker_stats_"+key, "docker container stats for "+key, units)
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"docker_stats_"+key,
"docker container stats for "+key,
[]tag.Key{appKey, pathKey},
dockerStatsDist,
view.DistributionAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
}
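
The TODOs here and below ask whether tags already on the context show up without being declared. In opencensus, a view aggregates only over the tag keys it explicitly lists; other tags on the context are dropped rather than propagated, and a mismatch between the tags recorded and a view's declared keys is the likely source of the "inconsistent cardinality" errors the commit log mentions. A compact sketch of the whole declare/subscribe/tag/record cycle, with illustrative metric names, on the same era-specific API as the init() above:

package main

import (
    "context"
    "log"

    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
    "go.opencensus.io/tag"
)

func main() {
    appKey, err := tag.NewKey("fn_appname")
    if err != nil {
        log.Fatal(err)
    }
    pathKey, err := tag.NewKey("fn_path")
    if err != nil {
        log.Fatal(err)
    }

    // Declare a measure and a view over it, as in the init() above.
    m, err := stats.Int64("example_calls", "illustrative call counter", "")
    if err != nil {
        log.Fatal(err)
    }
    v, err := view.New(
        "example_calls",
        "illustrative call counter",
        []tag.Key{appKey, pathKey}, // only these keys survive aggregation
        m,
        view.SumAggregation{},
    )
    if err != nil {
        log.Fatalf("cannot create view: %v", err)
    }
    if err := v.Subscribe(); err != nil {
        log.Fatal(err)
    }

    // Tags ride on the context; keys not listed in the view are ignored.
    ctx, err := tag.New(context.Background(),
        tag.Insert(appKey, "myapp"),
        tag.Insert(pathKey, "/hello"),
    )
    if err != nil {
        log.Fatal(err)
    }
    stats.Record(ctx, m.M(1))
}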
//func (c *container) DockerAuth() (docker.AuthConfiguration, error) {
// Implementing the docker.AuthConfiguration interface.
// TODO per call could implement this stored somewhere (vs. configured on host)


@@ -4,9 +4,11 @@ import (
"context"
"time"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
opentracing "github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)
func (a *agent) asyncDequeue() {
@@ -17,8 +19,8 @@ func (a *agent) asyncDequeue() {
defer cancel()
// parent span here so that we can see how many async calls are running
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_async_dequeue")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_async_dequeue")
defer span.End()
for {
select {
@@ -73,11 +75,29 @@ func (a *agent) asyncChew(ctx context.Context) <-chan *models.Call {
func (a *agent) asyncRun(ctx context.Context, model *models.Call) {
// IMPORTANT: get a context that has a child span but NO timeout (Submit imposes timeout)
// TODO this is a 'FollowsFrom'
ctx = opentracing.ContextWithSpan(context.Background(), opentracing.SpanFromContext(ctx))
ctx = common.BackgroundContext(ctx)
// since async doesn't come in through the normal request path,
// we've gotta add tags here for stats to come out properly.
appKey, err := tag.NewKey("fn_appname")
if err != nil {
logrus.Fatal(err)
}
pathKey, err := tag.NewKey("fn_path")
if err != nil {
logrus.Fatal(err)
}
ctx, err = tag.New(ctx,
tag.Insert(appKey, model.AppName),
tag.Insert(pathKey, model.Path),
)
if err != nil {
logrus.Fatal(err)
}
// additional enclosing context here since this isn't spawned from an http request
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_async_run")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_async_run")
defer span.End()
call, err := a.GetCall(
FromModel(model),


@@ -9,12 +9,13 @@ import (
"strings"
"time"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/id"
"github.com/fnproject/fn/api/models"
"github.com/go-openapi/strfmt"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
@@ -249,8 +250,8 @@ type call struct {
func (c *call) Model() *models.Call { return c.Call }
func (c *call) Start(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_start")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_call_start")
defer span.End()
// Check context timeouts, errors
if ctx.Err() != nil {
@@ -290,8 +291,8 @@ func (c *call) Start(ctx context.Context) error {
}
func (c *call) End(ctx context.Context, errIn error) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_call_end")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "agent_call_end")
defer span.End()
c.CompletedAt = strfmt.DateTime(time.Now())


@@ -12,13 +12,15 @@ import (
"strings"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
"github.com/coreos/go-semver/semver"
"github.com/fnproject/fn/api/agent/drivers"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
"github.com/fsouza/go-dockerclient"
"github.com/go-openapi/strfmt"
"github.com/opentracing/opentracing-go"
"github.com/sirupsen/logrus"
)
@@ -77,7 +79,6 @@ func NewDocker(conf drivers.Config) *DockerDriver {
}
func checkDockerVersion(driver *DockerDriver, expected string) error {
info, err := driver.docker.Info(context.Background())
if err != nil {
return err
@@ -269,9 +270,9 @@ func (drv *DockerDriver) ensureImage(ctx context.Context, task drivers.Container
if task, ok := task.(Auther); ok {
var err error
span, _ := opentracing.StartSpanFromContext(ctx, "docker_auth")
_, span := trace.StartSpan(ctx, "docker_auth")
config, err = task.DockerAuth()
span.Finish()
span.End()
if err != nil {
return err
}
@@ -396,8 +397,8 @@ func (w *waitResult) Wait(ctx context.Context) (drivers.RunResult, error) {
// Repeatedly collect stats from the specified docker container until the stopSignal is closed or the context is cancelled
func (drv *DockerDriver) collectStats(ctx context.Context, stopSignal <-chan struct{}, container string, task drivers.ContainerTask) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_collect_stats")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "docker_collect_stats")
defer span.End()
log := common.Logger(ctx)
@@ -576,7 +577,7 @@ func (w *waitResult) wait(ctx context.Context) (status string, err error) {
case 0:
return drivers.StatusSuccess, nil
case 137: // OOM
// TODO put in stats opentracing.SpanFromContext(ctx).LogFields(log.String("docker", "oom"))
stats.Record(ctx, dockerOOMMeasure.M(1))
common.Logger(ctx).Error("docker oom")
err := errors.New("container out of memory, you may want to raise route.memory for this route (default: 128MB)")
return drivers.StatusKilled, models.NewAPIError(http.StatusBadGateway, err)


@@ -12,9 +12,11 @@ import (
"github.com/fnproject/fn/api/common"
"github.com/fsouza/go-dockerclient"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
)
const (
@@ -98,18 +100,123 @@ type dockerWrap struct {
dockerNoTimeout *docker.Client
}
func init() {
// TODO doing this at each call site seems not the intention of the library since measurements
// need to be created and views registered. doing this up front seems painful but maybe there
// are benefits?
// TODO do we have to do this? the measurements will be tagged on the context, will they be propagated
// or we have to white list them in the view for them to show up? test...
var err error
appKey, err := tag.NewKey("fn_appname")
if err != nil {
logrus.Fatal(err)
}
pathKey, err := tag.NewKey("fn_path")
if err != nil {
logrus.Fatal(err)
}
{
dockerRetriesMeasure, err = stats.Int64("docker_api_retries", "docker api retries", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"docker_api_retries",
"number of times we've retried docker API upon failure",
[]tag.Key{appKey, pathKey},
dockerRetriesMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
dockerTimeoutMeasure, err = stats.Int64("docker_api_timeout", "docker api timeouts", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"docker_api_timeout_count",
"number of times we've timed out calling docker API",
[]tag.Key{appKey, pathKey},
dockerTimeoutMeasure,
view.CountAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
dockerErrorMeasure, err = stats.Int64("docker_api_error", "docker api errors", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"docker_api_error_count",
"number of unrecoverable errors from docker API",
[]tag.Key{appKey, pathKey},
dockerErrorMeasure,
view.CountAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
dockerOOMMeasure, err = stats.Int64("docker_oom", "docker oom", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
"docker_oom_count",
"number of docker container oom",
[]tag.Key{appKey, pathKey},
dockerOOMMeasure,
view.CountAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
}
var (
// TODO it's either this or stats.FindMeasure("string").M() -- this is safer but painful
dockerRetriesMeasure *stats.Int64Measure
dockerTimeoutMeasure *stats.Int64Measure
dockerErrorMeasure *stats.Int64Measure
dockerOOMMeasure *stats.Int64Measure
)
func (d *dockerWrap) retry(ctx context.Context, logger logrus.FieldLogger, f func() error) error {
var i int
var err error
span := opentracing.SpanFromContext(ctx)
defer func() { span.LogFields(log.Int("docker_call_retries", i)) }()
defer func() { stats.Record(ctx, dockerRetriesMeasure.M(int64(i))) }()
var b common.Backoff
// 10 retries w/o change to backoff is ~13s if ops take ~0 time
for ; i < 10; i++ {
select {
case <-ctx.Done():
span.LogFields(log.String("task", "fail.docker"))
stats.Record(ctx, dockerTimeoutMeasure.M(1))
logger.WithError(ctx.Err()).Warnf("docker call timed out")
return ctx.Err()
default:
@@ -119,11 +226,10 @@ func (d *dockerWrap) retry(ctx context.Context, logger logrus.FieldLogger, f fun
if common.IsTemporary(err) || isDocker50x(err) {
logger.WithError(err).Warn("docker temporary error, retrying")
b.Sleep(ctx)
span.LogFields(log.String("task", "tmperror.docker"))
continue
}
if err != nil {
span.LogFields(log.String("task", "error.docker"))
stats.Record(ctx, dockerErrorMeasure.M(1))
}
return err
}
@@ -176,22 +282,17 @@ func filterNoSuchContainer(ctx context.Context, err error) error {
}
func (d *dockerWrap) Info(ctx context.Context) (info *docker.DockerInfo, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_server_version")
defer span.Finish()
logger := common.Logger(ctx).WithField("docker_cmd", "DockerInfo")
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()
err = d.retry(ctx, logger, func() error {
info, err = d.docker.Info()
return err
})
return info, err
// NOTE: we're not very responsible and prometheus wasn't loved as a child; this
// threads straight through to the docker call, skipping retries, so that we
// don't have to add tags / tracing / logger to the bare context handed in by
// the one caller in initialization, which has no context to report consistent
// stats like everything else in here. tl;dr this works, just don't use it for anything else.
return d.docker.Info()
}
func (d *dockerWrap) AttachToContainerNonBlocking(ctx context.Context, opts docker.AttachToContainerOptions) (w docker.CloseWaiter, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_attach_container")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "docker_attach_container")
defer span.End()
logger := common.Logger(ctx).WithField("docker_cmd", "AttachContainer")
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
@@ -208,8 +309,8 @@ func (d *dockerWrap) AttachToContainerNonBlocking(ctx context.Context, opts dock
}
func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (code int, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_wait_container")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "docker_wait_container")
defer span.End()
logger := common.Logger(ctx).WithField("docker_cmd", "WaitContainer")
err = d.retry(ctx, logger, func() error {
@@ -220,8 +321,8 @@ func (d *dockerWrap) WaitContainerWithContext(id string, ctx context.Context) (c
}
func (d *dockerWrap) StartContainerWithContext(id string, hostConfig *docker.HostConfig, ctx context.Context) (err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_start_container")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "docker_start_container")
defer span.End()
logger := common.Logger(ctx).WithField("docker_cmd", "StartContainer")
err = d.retry(ctx, logger, func() error {
@@ -236,8 +337,8 @@ func (d *dockerWrap) StartContainerWithContext(id string, hostConfig *docker.Hos
}
func (d *dockerWrap) CreateContainer(opts docker.CreateContainerOptions) (c *docker.Container, err error) {
span, ctx := opentracing.StartSpanFromContext(opts.Context, "docker_create_container")
defer span.Finish()
ctx, span := trace.StartSpan(opts.Context, "docker_create_container")
defer span.End()
logger := common.Logger(ctx).WithField("docker_cmd", "CreateContainer")
err = d.retry(ctx, logger, func() error {
@@ -248,8 +349,8 @@ func (d *dockerWrap) CreateContainer(opts docker.CreateContainerOptions) (c *doc
}
func (d *dockerWrap) PullImage(opts docker.PullImageOptions, auth docker.AuthConfiguration) (err error) {
span, ctx := opentracing.StartSpanFromContext(opts.Context, "docker_pull_image")
defer span.Finish()
ctx, span := trace.StartSpan(opts.Context, "docker_pull_image")
defer span.End()
logger := common.Logger(ctx).WithField("docker_cmd", "PullImage")
err = d.retry(ctx, logger, func() error {
@@ -262,9 +363,9 @@ func (d *dockerWrap) PullImage(opts docker.PullImageOptions, auth docker.AuthCon
func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err error) {
// extract the span, but do not keep the context, since the enclosing context
// may be timed out, and we still want to remove the container. TODO in caller? who cares?
span, _ := opentracing.StartSpanFromContext(opts.Context, "docker_remove_container")
defer span.Finish()
ctx := opentracing.ContextWithSpan(context.Background(), span)
ctx := common.BackgroundContext(opts.Context)
ctx, span := trace.StartSpan(ctx, "docker_remove_container")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()
@@ -278,8 +379,8 @@ func (d *dockerWrap) RemoveContainer(opts docker.RemoveContainerOptions) (err er
}
func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error) {
span, _ := opentracing.StartSpanFromContext(ctx, "docker_pause_container")
defer span.Finish()
_, span := trace.StartSpan(ctx, "docker_pause_container")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
defer cancel()
@@ -292,8 +393,8 @@ func (d *dockerWrap) PauseContainer(id string, ctx context.Context) (err error)
}
func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error) {
span, _ := opentracing.StartSpanFromContext(ctx, "docker_unpause_container")
defer span.Finish()
_, span := trace.StartSpan(ctx, "docker_unpause_container")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, pauseTimeout)
defer cancel()
@@ -306,8 +407,8 @@ func (d *dockerWrap) UnpauseContainer(id string, ctx context.Context) (err error
}
func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.Image, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_inspect_image")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "docker_inspect_image")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()
@@ -320,8 +421,8 @@ func (d *dockerWrap) InspectImage(ctx context.Context, name string) (i *docker.I
}
func (d *dockerWrap) InspectContainerWithContext(container string, ctx context.Context) (c *docker.Container, err error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "docker_inspect_container")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "docker_inspect_container")
defer span.End()
ctx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()


@@ -17,7 +17,8 @@ import (
"github.com/fnproject/fn/api/agent"
"github.com/fnproject/fn/api/common"
"github.com/fnproject/fn/api/models"
opentracing "github.com/opentracing/opentracing-go"
"go.opencensus.io/plugin/ochttp/propagation/b3"
"go.opencensus.io/trace"
)
// client implements agent.DataAccess
@@ -66,16 +67,16 @@ func NewClient(u string) (agent.DataAccess, error) {
}
func (cl *client) Enqueue(ctx context.Context, c *models.Call) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_enqueue")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_enqueue")
defer span.End()
err := cl.do(ctx, c, nil, "PUT", "runner", "async")
return err
}
func (cl *client) Dequeue(ctx context.Context) (*models.Call, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_dequeue")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_dequeue")
defer span.End()
var c struct {
C []*models.Call `json:"calls"`
@@ -88,16 +89,16 @@ func (cl *client) Dequeue(ctx context.Context) (*models.Call, error) {
}
func (cl *client) Start(ctx context.Context, c *models.Call) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_start")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_start")
defer span.End()
err := cl.do(ctx, c, nil, "POST", "runner", "start")
return err
}
func (cl *client) Finish(ctx context.Context, c *models.Call, r io.Reader, async bool) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_end")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_end")
defer span.End()
var b bytes.Buffer // TODO pool / we should multipart this?
_, err := io.Copy(&b, r)
@@ -118,8 +119,8 @@ func (cl *client) Finish(ctx context.Context, c *models.Call, r io.Reader, async
}
func (cl *client) GetApp(ctx context.Context, appName string) (*models.App, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_get_app")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_get_app")
defer span.End()
var a struct {
A models.App `json:"app"`
@@ -129,8 +130,8 @@ func (cl *client) GetApp(ctx context.Context, appName string) (*models.App, erro
}
func (cl *client) GetRoute(ctx context.Context, appName, route string) (*models.Route, error) {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_get_route")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_get_route")
defer span.End()
// TODO trim prefix is pretty odd here eh?
var r struct {
@@ -181,8 +182,8 @@ func (cl *client) do(ctx context.Context, request, result interface{}, method st
}
func (cl *client) once(ctx context.Context, request, result interface{}, method string, url ...string) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "hybrid_client_http_do")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "hybrid_client_http_do")
defer span.End()
var b bytes.Buffer // TODO pool
if request != nil {
@@ -196,12 +197,9 @@ func (cl *client) once(ctx context.Context, request, result interface{}, method
if err != nil {
return err
}
req = req.WithContext(ctx)
// shove the span headers in so that the server will continue this span
opentracing.GlobalTracer().Inject(
span.Context(),
opentracing.HTTPHeaders,
opentracing.HTTPHeadersCarrier(req.Header))
var xxx b3.HTTPFormat
xxx.SpanContextToRequest(span.SpanContext(), req)
resp, err := cl.http.Do(req)
if err != nil {
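
b3.HTTPFormat writes the span context into the B3 headers (X-B3-TraceId, X-B3-SpanId, X-B3-Sampled). The receiving half is not part of this diff; a hedged sketch of how the server side would continue the trace, assuming trace.StartSpanWithRemoteParent as the opencensus helper (its exact signature moved around in early releases; the handler name and route are illustrative):

package main

import (
    "log"
    "net/http"

    "go.opencensus.io/plugin/ochttp/propagation/b3"
    "go.opencensus.io/trace"
)

// handleStart is a hypothetical server-side counterpart to the client
// injection above: pull the B3 span context off the request and continue
// the trace with a remote parent.
func handleStart(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()
    var f b3.HTTPFormat
    if parent, ok := f.SpanContextFromRequest(r); ok {
        var span *trace.Span
        ctx, span = trace.StartSpanWithRemoteParent(ctx, "hybrid_server_start", parent)
        defer span.End()
    }
    _ = ctx // hand ctx on to the real work
    w.WriteHeader(http.StatusNoContent)
}

func main() {
    http.HandleFunc("/runner/start", handleStart)
    log.Fatal(http.ListenAndServe(":8080", nil))
}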


@@ -1,9 +0,0 @@
package agent
import (
"net/http"
)
func (a *agent) PromHandler() http.Handler {
return a.promHandler
}


@@ -7,8 +7,9 @@ import (
"io"
"net/http"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/models"
opentracing "github.com/opentracing/opentracing-go"
)
// HTTPProtocol converts stdin/stdout streams into HTTP/1.1 compliant
@@ -23,8 +24,8 @@ type HTTPProtocol struct {
func (p *HTTPProtocol) IsStreamable() bool { return true }
func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "dispatch_http")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "dispatch_http")
defer span.End()
req := ci.Request()
@@ -36,23 +37,23 @@ func (h *HTTPProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) e
req.Header.Set("FN_REQUEST_URL", ci.RequestURL())
req.Header.Set("FN_CALL_ID", ci.CallID())
span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_request")
_, span = trace.StartSpan(ctx, "dispatch_http_write_request")
// req.Write handles the case where the user does not specify a content length
err := req.Write(h.in)
span.Finish()
span.End()
if err != nil {
return err
}
span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_read_response")
_, span = trace.StartSpan(ctx, "dispatch_http_read_response")
resp, err := http.ReadResponse(bufio.NewReader(h.out), ci.Request())
span.Finish()
span.End()
if err != nil {
return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid http response from function err: %v", err))
}
span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_http_write_response")
defer span.Finish()
_, span = trace.StartSpan(ctx, "dispatch_http_write_response")
defer span.End()
rw, ok := w.(http.ResponseWriter)
if !ok {


@@ -9,8 +9,9 @@ import (
"net/http"
"sync"
"go.opencensus.io/trace"
"github.com/fnproject/fn/api/models"
opentracing "github.com/opentracing/opentracing-go"
)
var (
@@ -87,26 +88,26 @@ func (h *JSONProtocol) writeJSONToContainer(ci CallInfo) error {
}
func (h *JSONProtocol) Dispatch(ctx context.Context, ci CallInfo, w io.Writer) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "dispatch_json")
defer span.Finish()
ctx, span := trace.StartSpan(ctx, "dispatch_json")
defer span.End()
span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_write_request")
_, span = trace.StartSpan(ctx, "dispatch_json_write_request")
err := h.writeJSONToContainer(ci)
span.Finish()
span.End()
if err != nil {
return err
}
span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_read_response")
_, span = trace.StartSpan(ctx, "dispatch_json_read_response")
var jout jsonOut
err = json.NewDecoder(h.out).Decode(&jout)
span.Finish()
span.End()
if err != nil {
return models.NewAPIError(http.StatusBadGateway, fmt.Errorf("invalid json response from function err: %v", err))
}
span, _ = opentracing.StartSpanFromContext(ctx, "dispatch_json_write_response")
defer span.Finish()
_, span = trace.StartSpan(ctx, "dispatch_json_write_response")
defer span.End()
rw, ok := w.(http.ResponseWriter)
if !ok {


@@ -13,7 +13,8 @@ import (
"strings"
"sync"
opentracing "github.com/opentracing/opentracing-go"
"go.opencensus.io/trace"
"github.com/sirupsen/logrus"
)
@@ -166,9 +167,9 @@ func (a *resourceTracker) GetResourceToken(ctx context.Context, memory uint64, c
c.L.Unlock()
}()
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_get_resource_token")
ctx, span := trace.StartSpan(ctx, "agent_get_resource_token")
go func() {
defer span.Finish()
defer span.End()
defer cancel()
c.L.Lock()
@@ -254,9 +255,9 @@ func (a *resourceTracker) WaitAsyncResource(ctx context.Context) chan struct{} {
c.L.Unlock()
}()
span, ctx := opentracing.StartSpanFromContext(ctx, "agent_wait_async_resource")
ctx, span := trace.StartSpan(ctx, "agent_wait_async_resource")
go func() {
defer span.Finish()
defer span.End()
defer cancel()
c.L.Lock()
isWaiting = true


@@ -5,7 +5,10 @@ import (
"sync"
"time"
"github.com/fnproject/fn/api/common"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
type RequestStateType int
@@ -137,16 +140,76 @@ func (c *containerState) UpdateState(ctx context.Context, newState ContainerStat
// update old state stats
gaugeKey := containerGaugeKeys[oldState]
if gaugeKey != "" {
common.DecrementGauge(ctx, gaugeKey)
stats.Record(ctx, stats.FindMeasure(gaugeKey).(*stats.Int64Measure).M(-1))
}
timeKey := containerTimeKeys[oldState]
if timeKey != "" {
common.PublishElapsedTimeHistogram(ctx, timeKey, before, now)
stats.Record(ctx, stats.FindMeasure(timeKey).(*stats.Int64Measure).M(int64(now.Sub(before).Round(time.Millisecond))))
}
// update new state stats
gaugeKey = containerGaugeKeys[newState]
if gaugeKey != "" {
common.IncrementGauge(ctx, gaugeKey)
stats.Record(ctx, stats.FindMeasure(gaugeKey).(*stats.Int64Measure).M(1))
}
}
func init() {
// TODO(reed): do we have to do this? the measurements will be tagged on the context, will they be propagated
// or we have to white list them in the view for them to show up? test...
appKey, err := tag.NewKey("fn_appname")
if err != nil {
logrus.Fatal(err)
}
pathKey, err := tag.NewKey("fn_path")
if err != nil {
logrus.Fatal(err)
}
for _, key := range containerGaugeKeys {
if key == "" { // leave nil intentionally, let it panic
continue
}
measure, err := stats.Int64(key, "containers in state "+key, "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
key,
"containers in state "+key,
[]tag.Key{appKey, pathKey},
measure,
view.CountAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
for _, key := range containerTimeKeys {
if key == "" {
continue
}
measure, err := stats.Int64(key, "time spent in container state "+key, "ms")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
key,
"time spent in container state "+key,
[]tag.Key{appKey, pathKey},
measure,
view.DistributionAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
}


@@ -2,54 +2,63 @@ package agent
import (
"context"
"github.com/fnproject/fn/api/common"
"github.com/sirupsen/logrus"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
)
func StatsEnqueue(ctx context.Context) {
common.IncrementGauge(ctx, queuedMetricName)
common.IncrementCounter(ctx, callsMetricName)
// TODO add some suga:
// * hot containers active
// * memory used / available
func statsEnqueue(ctx context.Context) {
stats.Record(ctx, queuedMeasure.M(1))
stats.Record(ctx, callsMeasure.M(1))
}
// Call when a function has been queued but cannot be started because of an error
func StatsDequeue(ctx context.Context) {
common.DecrementGauge(ctx, queuedMetricName)
func statsDequeue(ctx context.Context) {
stats.Record(ctx, queuedMeasure.M(-1))
}
func StatsDequeueAndStart(ctx context.Context) {
common.DecrementGauge(ctx, queuedMetricName)
common.IncrementGauge(ctx, runningMetricName)
func statsDequeueAndStart(ctx context.Context) {
stats.Record(ctx, queuedMeasure.M(-1))
stats.Record(ctx, runningMeasure.M(1))
}
func StatsComplete(ctx context.Context) {
common.DecrementGauge(ctx, runningMetricName)
common.IncrementCounter(ctx, completedMetricName)
func statsComplete(ctx context.Context) {
stats.Record(ctx, runningMeasure.M(-1))
stats.Record(ctx, completedMeasure.M(1))
}
func StatsFailed(ctx context.Context) {
common.DecrementGauge(ctx, runningMetricName)
common.IncrementCounter(ctx, failedMetricName)
func statsFailed(ctx context.Context) {
stats.Record(ctx, runningMeasure.M(-1))
stats.Record(ctx, failedMeasure.M(1))
}
func StatsDequeueAndFail(ctx context.Context) {
common.DecrementGauge(ctx, queuedMetricName)
common.IncrementCounter(ctx, failedMetricName)
func statsDequeueAndFail(ctx context.Context) {
stats.Record(ctx, queuedMeasure.M(-1))
stats.Record(ctx, failedMeasure.M(1))
}
func StatsIncrementTimedout(ctx context.Context) {
common.IncrementCounter(ctx, timedoutMetricName)
func statsTimedout(ctx context.Context) {
stats.Record(ctx, timedoutMeasure.M(1))
}
func StatsIncrementErrors(ctx context.Context) {
common.IncrementCounter(ctx, errorsMetricName)
func statsErrors(ctx context.Context) {
stats.Record(ctx, errorsMeasure.M(1))
}
func StatsIncrementTooBusy(ctx context.Context) {
common.IncrementCounter(ctx, serverBusyMetricName)
func statsTooBusy(ctx context.Context) {
stats.Record(ctx, serverBusyMeasure.M(1))
}
const (
// TODO we should probably prefix these with calls_ ?
queuedMetricName = "queued"
callsMetricName = "calls"
callsMetricName = "calls" // TODO this is a dupe of sum {complete,failed} ?
runningMetricName = "running"
completedMetricName = "completed"
failedMetricName = "failed"
@@ -57,3 +66,192 @@ const (
errorsMetricName = "errors"
serverBusyMetricName = "server_busy"
)
var (
queuedMeasure *stats.Int64Measure
callsMeasure *stats.Int64Measure // TODO this is a dupe of sum {complete,failed} ?
runningMeasure *stats.Int64Measure
completedMeasure *stats.Int64Measure
failedMeasure *stats.Int64Measure
timedoutMeasure *stats.Int64Measure
errorsMeasure *stats.Int64Measure
serverBusyMeasure *stats.Int64Measure
)
func init() {
// TODO(reed): doing this at each call site seems not the intention of the library since measurements
// need to be created and views registered. doing this up front seems painful but maybe there
// are benefits?
// TODO(reed): do we have to do this? the measurements will be tagged on the context, will they be propagated
// or we have to white list them in the view for them to show up? test...
var err error
appKey, err := tag.NewKey("fn_appname")
if err != nil {
logrus.Fatal(err)
}
pathKey, err := tag.NewKey("fn_path")
if err != nil {
logrus.Fatal(err)
}
{
queuedMeasure, err = stats.Int64(queuedMetricName, "calls currently queued against agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
queuedMetricName,
"calls currently queued to agent",
[]tag.Key{appKey, pathKey},
queuedMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
callsMeasure, err = stats.Int64(callsMetricName, "calls created in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
callsMetricName,
"calls created in agent",
[]tag.Key{appKey, pathKey},
callsMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
runningMeasure, err = stats.Int64(runningMetricName, "calls currently running in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
runningMetricName,
"calls currently running in agent",
[]tag.Key{appKey, pathKey},
runningMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
completedMeasure, err = stats.Int64(completedMetricName, "calls completed in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
completedMetricName,
"calls completed in agent",
[]tag.Key{appKey, pathKey},
completedMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
failedMeasure, err = stats.Int64(failedMetricName, "calls failed in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
failedMetricName,
"calls failed in agent",
[]tag.Key{appKey, pathKey},
failedMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
timedoutMeasure, err = stats.Int64(timedoutMetricName, "calls timed out in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
timedoutMetricName,
"calls timed out in agent",
[]tag.Key{appKey, pathKey},
timedoutMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
errorsMeasure, err = stats.Int64(errorsMetricName, "calls errored in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
errorsMetricName,
"calls errored in agent",
[]tag.Key{appKey, pathKey},
errorsMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
{
serverBusyMeasure, err = stats.Int64(serverBusyMetricName, "calls where server was too busy in agent", "")
if err != nil {
logrus.Fatal(err)
}
v, err := view.New(
serverBusyMetricName,
"calls where server was too busy in agent",
[]tag.Key{appKey, pathKey},
serverBusyMeasure,
view.SumAggregation{},
)
if err != nil {
logrus.Fatalf("cannot create view: %v", err)
}
if err := v.Subscribe(); err != nil {
logrus.Fatal(err)
}
}
}
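
Nothing in this diff registers an exporter for all the views subscribed above, and PromHandler was deleted; presumably the server wires up the opencensus Prometheus exporter elsewhere (hence the "fix prom handler nil" bullet). For orientation only, a hedged sketch of that wiring as the early exporter API had it; the package path and Options fields are from memory and may differ in the patched vendored copy:

package main

import (
    "log"
    "net/http"

    "go.opencensus.io/exporter/prometheus" // early-2018 path; later moved out of core
    "go.opencensus.io/stats/view"
)

func main() {
    // Assumption: NewExporter/Options per the early opencensus-go API.
    pe, err := prometheus.NewExporter(prometheus.Options{Namespace: "fn"})
    if err != nil {
        log.Fatal(err)
    }
    view.RegisterExporter(pe)   // subscribed views now flow to Prometheus
    http.Handle("/metrics", pe) // the exporter doubles as an http.Handler
    log.Fatal(http.ListenAndServe(":8080", nil))
}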