Refactor to use provider log request schema

**What**
- Remove the log schema and replace with the schema defined in the
provider.  This ensures that the cli uses the same schema as the
providers.

Signed-off-by: Lucas Roesler <roesler.lucas@gmail.com>
This commit is contained in:
Lucas Roesler
2019-06-22 20:35:11 +02:00
committed by Alex Ellis
parent cde4cfdc16
commit 495bc7042d
11 changed files with 374 additions and 210 deletions

12
Gopkg.lock generated
View File

@@ -92,6 +92,17 @@
revision = "4cbb7d968d21f4ae2daacfb27213c9371d4d4449"
version = "0.8.8"
[[projects]]
digest = "1:978f9a2199f9d63160619adf9cea50a8387c66269b81e299afe092bf51a0fd64"
name = "github.com/openfaas/faas-provider"
packages = [
"httputil",
"logs",
]
pruneopts = "UT"
revision = "0ca8ae603fee9736e011b81fbf02777d89a4ea85"
version = "0.9.2"
[[projects]]
digest = "1:cf31692c14422fa27c83a05292eb5cbe0fb2775972e8f1f8446a71549bd8980b"
name = "github.com/pkg/errors"
@@ -159,6 +170,7 @@
"github.com/drone/envsubst",
"github.com/mitchellh/go-homedir",
"github.com/morikuni/aec",
"github.com/openfaas/faas-provider/logs",
"github.com/openfaas/faas/gateway/requests",
"github.com/pkg/errors",
"github.com/ryanuber/go-glob",

View File

@@ -8,17 +8,27 @@ import (
"os"
"time"
"github.com/openfaas/faas-cli/flags"
"github.com/openfaas/faas-provider/logs"
"github.com/openfaas/faas-cli/proxy"
"github.com/openfaas/faas-cli/schema"
"github.com/spf13/cobra"
)
var (
nowFunc = time.Now
logFlagValues logFlags
nowFunc = time.Now
)
func init() {
type logFlags struct {
instance string
since time.Duration
sinceTime flags.TimestampFlag
follow bool
tail int
}
func init() {
initLogCmdFlags(functionLogsCmd)
faasCmd.AddCommand(functionLogsCmd)
@@ -45,15 +55,15 @@ func noopPreRunCmd(cmd *cobra.Command, args []string) error {
// initLogCmdFlags configures the logs command flags, this allows the developer to
// reset and reinitialize the flags on the command in unit tests
func initLogCmdFlags(cmd *cobra.Command) {
logFlagValues = logFlags{}
cmd.Flags().StringVarP(&gateway, "gateway", "g", defaultGateway, "Gateway URL starting with http(s)://")
cmd.Flags().BoolVar(&tlsInsecure, "tls-no-verify", false, "Disable TLS validation")
cmd.Flags().String("instance", "", "filter to a specific function instance")
cmd.Flags().String("since", "", "include logs since the given timestamp (RFC3339)")
cmd.Flags().String("pattern", "", "filter logs that matching the given pattern")
cmd.Flags().Bool("invert", false, "invert the pattern match")
cmd.Flags().Bool("follow", true, "tail logs")
cmd.Flags().Int("limit", 0, "maximum number of log entries to return, unlimited if <=0 ")
cmd.Flags().DurationVar(&logFlagValues.since, "since", 0*time.Second, "return logs newer than a relative duration like 5s")
cmd.Flags().Var(&logFlagValues.sinceTime, "since-time", "include logs since the given timestamp (RFC3339)")
cmd.Flags().IntVar(&logFlagValues.tail, "tail", -1, "number of recent log lines file to display. Defaults to -1, unlimited if <=0")
cmd.Flags().BoolVar(&logFlagValues.follow, "follow", true, "continue printing new logs until the end of the request, up to 30s")
}
func runLogs(cmd *cobra.Command, args []string) error {
@@ -64,7 +74,6 @@ func runLogs(cmd *cobra.Command, args []string) error {
}
logRequest := logRequestFromFlags(cmd, args)
logEvents, err := proxy.GetLogs(gatewayAddress, tlsInsecure, logRequest)
if err != nil {
return err
@@ -77,74 +86,23 @@ func runLogs(cmd *cobra.Command, args []string) error {
return nil
}
func logRequestFromFlags(cmd *cobra.Command, args []string) schema.LogRequest {
flags := cmd.Flags()
return schema.LogRequest{
Name: args[0],
Instance: mustString(flags.GetString("instance")),
Limit: mustInt(flags.GetInt("limit")),
Since: mustTimestampP(flags.GetString("since")),
Follow: mustBool(flags.GetBool("follow")),
Pattern: mustStringP(flags.GetString("pattern")),
Invert: mustBool(flags.GetBool("invert")),
func logRequestFromFlags(cmd *cobra.Command, args []string) logs.Request {
return logs.Request{
Name: args[0],
Tail: logFlagValues.tail,
Since: sinceValue(logFlagValues.sinceTime.AsTime(), logFlagValues.since),
Follow: logFlagValues.follow,
}
}
func mustString(v string, e error) string {
if e != nil {
panic(e)
}
return v
}
func mustStringP(v string, e error) *string {
if e != nil {
panic(e)
}
if v == "" {
return nil
func sinceValue(t time.Time, d time.Duration) *time.Time {
if !t.IsZero() {
return &t
}
return &v
}
func mustBool(v bool, e error) bool {
if e != nil {
panic(e)
}
return v
}
func mustInt(v int, e error) int {
if e != nil {
panic(e)
}
return v
}
// return timestamp from a string flag, if it is not a valid duration, then we
// attempt to parse the string as RFC3339
func mustTimestampP(v string, e error) *time.Time {
if e != nil {
panic(e)
}
if v == "" {
return nil
}
d, err := time.ParseDuration(v)
if err == nil {
if d.String() != "0s" {
ts := nowFunc().Add(-1 * d)
return &ts
}
ts, err := time.Parse(time.RFC3339, v)
if err != nil {
panic(e)
}
return &ts
return nil
}
// func mustTimestamp(v string, e error)

View File

@@ -1,11 +1,10 @@
package commands
import (
"errors"
"testing"
"time"
"github.com/openfaas/faas-cli/schema"
"github.com/openfaas/faas-provider/logs"
)
func Test_logsCmdFlagParsing(t *testing.T) {
@@ -20,22 +19,20 @@ func Test_logsCmdFlagParsing(t *testing.T) {
scenarios := []struct {
name string
args []string
expected schema.LogRequest
expected logs.Request
}{
{"name only passed, follow on by default", []string{"funcFoo"}, schema.LogRequest{Name: "funcFoo", Follow: true}},
{"can disable follow", []string{"funcFoo", "--follow=false"}, schema.LogRequest{Name: "funcFoo", Follow: false}},
{"can supply filter pattern", []string{"funcFoo", "--pattern=abc"}, schema.LogRequest{Name: "funcFoo", Follow: true, Pattern: strP("abc")}},
{"can invert filter pattern", []string{"funcFoo", "--pattern=abc", "--invert=true"}, schema.LogRequest{Name: "funcFoo", Follow: true, Pattern: strP("abc"), Invert: true}},
{"can limit number of messages returned", []string{"funcFoo", "--limit=5"}, schema.LogRequest{Name: "funcFoo", Follow: true, Limit: 5}},
{"can set timestamp to send logs since using duration", []string{"funcFoo", "--since=5m"}, schema.LogRequest{Name: "funcFoo", Follow: true, Since: &fiveMinAgo}},
{"can set timestamp to send logs since using timestamp", []string{"funcFoo", "--since=" + fiveMinAgoStr}, schema.LogRequest{Name: "funcFoo", Follow: true, Since: &fiveMinAgo}},
{"name only passed, follow on by default", []string{"funcFoo"}, logs.Request{Name: "funcFoo", Follow: true, Tail: -1}},
{"can disable follow", []string{"funcFoo", "--follow=false"}, logs.Request{Name: "funcFoo", Follow: false, Tail: -1}},
{"can limit number of messages returned", []string{"funcFoo", "--tail=5"}, logs.Request{Name: "funcFoo", Follow: true, Tail: 5}},
{"can set timestamp to send logs since using duration", []string{"funcFoo", "--since=5m"}, logs.Request{Name: "funcFoo", Follow: true, Tail: -1, Since: &fiveMinAgo}},
{"can set timestamp to send logs since using timestamp", []string{"funcFoo", "--since-time=" + fiveMinAgoStr}, logs.Request{Name: "funcFoo", Follow: true, Tail: -1, Since: &fiveMinAgo}},
}
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
functionLogsCmd.ResetFlags()
initLogCmdFlags(functionLogsCmd)
initLogCmdFlags(functionLogsCmd)
functionLogsCmd.ParseFlags(s.args)
logRequest := logRequestFromFlags(functionLogsCmd, functionLogsCmd.Flags().Args())
@@ -49,46 +46,3 @@ func Test_logsCmdFlagParsing(t *testing.T) {
func strP(s string) *string {
return &s
}
func Test_mustTimestampP(t *testing.T) {
nowFunc = func() time.Time {
ts, _ := time.Parse(time.RFC3339, "2019-01-01T01:00:00Z")
return ts
}
scenarios := []struct {
name string
value string
err error
panic bool
expected string
}{
{"empty string returns nil", "", nil, false, "nil"},
{"duration is parsed", "1h", nil, false, "2019-01-01T00:00:00Z"},
{"timestamp is parsed", "2019-01-01T05:01:10Z", nil, false, "2019-01-01T05:01:10Z"},
{"will panic if timestamp cannot be parsed", "-2019-01-01T05:01:10Z", nil, true, "2019-01-01T05:01:10Z"},
{"will panic if supplied an error", "", errors.New("some flag error"), true, ""},
}
for _, s := range scenarios {
t.Run(s.name, func(t *testing.T) {
defer func() {
if r := recover(); r != nil && !s.panic {
t.Errorf("mustTimestampP should only panic when given an error or when time.Parse errors")
}
}()
ts := mustTimestampP(s.value, s.err)
switch s.expected {
case "nil":
if ts != nil {
t.Errorf("expected nil time, got %v", ts)
}
default:
if ts.Format(time.RFC3339) != s.expected {
t.Errorf("expected %s time, got %s", s.expected, ts)
}
}
})
}
}

38
flags/timestamp.go Normal file
View File

@@ -0,0 +1,38 @@
// Copyright (c) OpenFaaS Author(s) 2019. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
package flags
import "time"
// TimestampFlag implements the Value interface to accept and validate a
// RFC3339 timestamp string as a flag
type TimestampFlag string
// Type implements pflag.Value
func (t *TimestampFlag) Type() string {
return "timestamp"
}
// String implements Stringer
func (t *TimestampFlag) String() string {
if t == nil {
return ""
}
return string(*t)
}
// Set implements pflag.Value
func (t *TimestampFlag) Set(value string) error {
_, err := time.Parse(time.RFC3339, value)
if err == nil {
*t = TimestampFlag(value)
}
return err
}
// AsTime returns the underlying time instance
func (t TimestampFlag) AsTime() time.Time {
v, _ := time.Parse(time.RFC3339, t.String())
return v
}

34
flags/timestamp_test.go Normal file
View File

@@ -0,0 +1,34 @@
package flags
import (
"testing"
"time"
)
func TestTimestamp(t *testing.T) {
cases := []struct {
name string
value string
expected time.Time
err error
}{
{"valid rfc3339 parses", "2012-01-02T10:01:12Z", time.Date(2012, time.January, 2, 10, 1, 12, 0, time.UTC), nil},
{"valid rfc3339 parses", "2012-01-02T10:01:12Z", time.Date(2012, time.January, 2, 10, 1, 12, 0, time.UTC), nil},
{"in-valid rfc3339 parses", "2012-01-02T10:01:12Z", time.Date(2012, time.January, 2, 10, 1, 12, 0, time.UTC), nil},
}
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
var ts TimestampFlag
err := ts.Set(tc.value)
if tc.err != err {
t.Errorf("expected err %s, got %s", tc.err, err)
}
if ts.AsTime().String() != tc.expected.String() {
t.Errorf("expected time %s, got %s", tc.expected.String(), ts.String())
}
})
}
}

View File

@@ -7,13 +7,16 @@ import (
"io/ioutil"
"log"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/openfaas/faas-cli/schema"
"github.com/openfaas/faas-provider/logs"
)
// GetLogs list deployed functions
func GetLogs(gateway string, tlsInsecure bool, params schema.LogRequest) (<-chan schema.LogMessage, error) {
func GetLogs(gateway string, tlsInsecure bool, params logs.Request) (<-chan logs.Message, error) {
gateway = strings.TrimRight(gateway, "/")
// replace with a client that allows keep alive, Default?
@@ -25,14 +28,14 @@ func GetLogs(gateway string, tlsInsecure bool, params schema.LogRequest) (<-chan
return nil, fmt.Errorf("cannot connect to OpenFaaS on URL: %s", gateway)
}
logRequest.URL.RawQuery = params.AsQueryValues().Encode()
logRequest.URL.RawQuery = reqAsQueryValues(params).Encode()
res, err := client.Do(logRequest)
if err != nil {
return nil, fmt.Errorf("cannot connect to OpenFaaS on URL: %s", gateway)
}
logStream := make(chan schema.LogMessage, 1000)
logStream := make(chan logs.Message, 1000)
switch res.StatusCode {
case http.StatusOK:
go func() {
@@ -41,7 +44,7 @@ func GetLogs(gateway string, tlsInsecure bool, params schema.LogRequest) (<-chan
decoder := json.NewDecoder(res.Body)
for decoder.More() {
msg := schema.LogMessage{}
msg := logs.Message{}
err := decoder.Decode(&msg)
if err != nil {
log.Printf("cannot parse log results: %s\n", err.Error())
@@ -61,6 +64,25 @@ func GetLogs(gateway string, tlsInsecure bool, params schema.LogRequest) (<-chan
return logStream, nil
}
func reqAsQueryValues(r logs.Request) url.Values {
query := url.Values{}
query.Add("name", r.Name)
query.Add("follow", strconv.FormatBool(r.Follow))
if r.Instance != "" {
query.Add("instance", r.Instance)
}
if r.Since != nil {
query.Add("since", r.Since.Format(time.RFC3339))
}
if r.Tail != 0 {
query.Add("tail", strconv.Itoa(r.Tail))
}
return query
}
func makeStreamingHTTPClient(tlsInsecure bool) http.Client {
client := http.Client{}

View File

@@ -1,79 +0,0 @@
package schema
import (
"fmt"
"net/url"
"strconv"
"time"
)
// import or alias these from the provider?
// LogRequest is the query to return the function logs.
type LogRequest struct {
// Name is the function name and is required
Name string `json:"name"`
// Instance is the optional container name, that allows you to request logs from a specific function instance
Instance string `json:"instance"`
// Since is the optional datetime value to start the logs from
Since *time.Time `json:"since"`
// Limit sets the maximum number of log messages to return, <=0 means unlimited
Limit int `json:"limit"`
// Follow is allows the user to request a stream of logs
Follow bool `json:"follow"`
// Pattern is an optional regexp value to filter the log messages
Pattern *string `json:"pattern"`
// Invert allows you to control if the Pattern should be matched or negated
Invert bool `json:"invert"`
}
// String implements that Stringer interface and prints the log Request in a consistent way that
// allows you to safely compare if two requests have the same value.
func (r LogRequest) String() string {
pattern := ""
if r.Pattern != nil {
pattern = *r.Pattern
}
return fmt.Sprintf("name:%s instance:%s since:%v limit:%d follow:%v pattern:%v invert:%v", r.Name, r.Instance, r.Since, r.Limit, r.Follow, pattern, r.Invert)
}
func (r *LogRequest) AsQueryValues() url.Values {
query := url.Values{}
query.Add("name", r.Name)
query.Add("follow", strconv.FormatBool(r.Follow))
if r.Instance != "" {
query.Add("instance", r.Instance)
}
if r.Since != nil {
query.Add("since", r.Since.Format(time.RFC3339))
}
if r.Limit != 0 {
query.Add("limit", strconv.Itoa(r.Limit))
}
if r.Pattern != nil {
query.Add("pattern", *r.Pattern)
query.Add("invert", strconv.FormatBool(r.Invert))
}
return query
}
// LogMessage is a specific log message from a function container log stream
type LogMessage struct {
// Name is the function name
Name string `json:"name"`
// instance is the name/id of the specific function instance
Instance string `json:"instance"`
// Timestamp is the timestamp of when the log message was recorded
Timestamp time.Time `json:"timestamp"`
// Text is the raw log message content
Text string `json:"text"`
}
// String implements the Stringer interface and allows for nice and simple string formatting of a log Message.
func (m LogMessage) String() string {
return fmt.Sprintf("%s %s (%s) %s", m.Timestamp.Format(time.RFC3339), m.Name, m.Instance, m.Text)
}

21
vendor/github.com/openfaas/faas-provider/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2017 Alex Ellis
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@@ -0,0 +1,12 @@
package httputil
import (
"fmt"
"net/http"
)
// Errorf sets the response status code and write formats the provided message as the
// response body
func Errorf(w http.ResponseWriter, statusCode int, msg string, args ...interface{}) {
http.Error(w, fmt.Sprintf(msg, args...), statusCode)
}

View File

@@ -0,0 +1,142 @@
package logs
import (
"context"
"encoding/json"
"log"
"net/http"
"net/url"
"strconv"
"time"
"github.com/openfaas/faas-provider/httputil"
)
// Requester submits queries the logging system.
// This will be passed to the log handler constructor.
type Requester interface {
// Query submits a log request to the actual logging system.
Query(context.Context, Request) (<-chan Message, error)
}
// NewLogHandlerFunc creates an http HandlerFunc from the supplied log Requestor.
func NewLogHandlerFunc(requestor Requester, timeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if r.Body != nil {
defer r.Body.Close()
}
cn, ok := w.(http.CloseNotifier)
if !ok {
log.Println("LogHandler: response is not a CloseNotifier, required for streaming response")
http.NotFound(w, r)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
log.Println("LogHandler: response is not a Flusher, required for streaming response")
http.NotFound(w, r)
return
}
logRequest, err := parseRequest(r)
if err != nil {
log.Printf("LogHandler: could not parse request %s", err)
httputil.Errorf(w, http.StatusUnprocessableEntity, "could not parse the log request")
return
}
ctx, cancelQuery := context.WithTimeout(r.Context(), timeout)
defer cancelQuery()
messages, err := requestor.Query(ctx, logRequest)
if err != nil {
// add smarter error handling here
httputil.Errorf(w, http.StatusInternalServerError, "function log request failed")
return
}
// Send the initial headers saying we're gonna stream the response.
w.Header().Set("Connection", "Keep-Alive")
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set(http.CanonicalHeaderKey("Content-Type"), "application/x-ndjson")
w.WriteHeader(http.StatusOK)
flusher.Flush()
// ensure that we always try to send the closing chunk, not the inverted order due to how
// the defer stack works. We need two flush statements to ensure that the empty slice is
// sent as its own chunk
defer flusher.Flush()
defer w.Write([]byte{})
defer flusher.Flush()
jsonEncoder := json.NewEncoder(w)
for messages != nil {
select {
case <-cn.CloseNotify():
log.Println("LogHandler: client stopped listening")
return
case msg, ok := <-messages:
if !ok {
log.Println("LogHandler: end of log stream")
messages = nil
return
}
// serialize and write the msg to the http ResponseWriter
err := jsonEncoder.Encode(msg)
if err != nil {
// can't actually write the status header here so we should json serialize an error
// and return that because we have already sent the content type and status code
log.Printf("LogHandler: failed to serialize log message: '%s'\n", msg.String())
log.Println(err.Error())
// write json error message here ?
jsonEncoder.Encode(Message{Text: "failed to serialize log message"})
flusher.Flush()
return
}
flusher.Flush()
}
}
return
}
}
// parseRequest extracts the logRequest from the GET variables or from the POST body
func parseRequest(r *http.Request) (logRequest Request, err error) {
query := r.URL.Query()
logRequest.Name = getValue(query, "name")
logRequest.Instance = getValue(query, "instance")
tailStr := getValue(query, "tail")
if tailStr != "" {
logRequest.Tail, err = strconv.Atoi(tailStr)
if err != nil {
return logRequest, err
}
}
// ignore error because it will default to false if we can't parse it
logRequest.Follow, _ = strconv.ParseBool(getValue(query, "follow"))
sinceStr := getValue(query, "since")
if sinceStr != "" {
since, err := time.Parse(time.RFC3339, sinceStr)
logRequest.Since = &since
if err != nil {
return logRequest, err
}
}
return logRequest, nil
}
// getValue returns the value for the given key. If the key has more than one value, it returns the
// last value. if the value does not exist, it returns the empty string.
func getValue(queryValues url.Values, name string) string {
values := queryValues[name]
if len(values) == 0 {
return ""
}
return values[len(values)-1]
}

50
vendor/github.com/openfaas/faas-provider/logs/logs.go generated vendored Normal file
View File

@@ -0,0 +1,50 @@
// Package logs provides the standard interface and handler for OpenFaaS providers to expose function logs.
//
// The package defines the Requester interface that OpenFaaS providers should implement and then expose using
// the predefined NewLogHandlerFunc. See the example folder for a minimal log provider implementation.
//
// The Requester is where the actual specific logic for connecting to and querying the log system should be implemented.
//
package logs
import (
"fmt"
"time"
)
// Request is the query to return the function logs.
type Request struct {
// Name is the function name and is required
Name string `json:"name"`
// Instance is the optional container name, that allows you to request logs from a specific function instance
Instance string `json:"instance"`
// Since is the optional datetime value to start the logs from
Since *time.Time `json:"since"`
// Tail sets the maximum number of log messages to return, <=0 means unlimited
Tail int `json:"tail"`
// Follow is allows the user to request a stream of logs until the timeout
Follow bool `json:"follow"`
}
// String implements that Stringer interface and prints the log Request in a consistent way that
// allows you to safely compare if two requests have the same value.
func (r Request) String() string {
return fmt.Sprintf("name:%s instance:%s since:%v tail:%d follow:%v", r.Name, r.Instance, r.Since, r.Tail, r.Follow)
}
// Message is a specific log message from a function container log stream
type Message struct {
// Name is the function name
Name string `json:"name"`
// instance is the name/id of the specific function instance
Instance string `json:"instance"`
// Timestamp is the timestamp of when the log message was recorded
Timestamp time.Time `json:"timestamp"`
// Text is the raw log message content
Text string `json:"text"`
}
// String implements the Stringer interface and allows for nice and simple string formatting of a log Message.
func (m Message) String() string {
return fmt.Sprintf("%s %s (%s) %s", m.Timestamp.String(), m.Name, m.Instance, m.Text)
}