package agent

import (
	"context"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"net/http"
	"sync"
	"sync/atomic"
	"time"

	runner "github.com/fnproject/fn/api/agent/grpc"
	"github.com/fnproject/fn/api/models"
	"github.com/fnproject/fn/fnext"
	"github.com/go-openapi/strfmt"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
)

// callHandle tracks the state of a call as handled by the pure runner. It also
// implements http.ResponseWriter, so the function's output can be streamed
// back over the gRPC engagement.
type callHandle struct {
	engagement runner.RunnerProtocol_EngageServer
	c          *call // the agent's internal representation of the call
	input      io.WriteCloser
	started    bool
	done       chan error // signals completion of the Submit goroutine

	// As the state can be set and checked by both goroutines handling this state, we need a mutex.
	stateMutex sync.Mutex

	// Timings, for metrics:
	receivedTime  strfmt.DateTime // When was the call received?
	allocatedTime strfmt.DateTime // When did we finish allocating capacity?

	// Last communication error on the stream (if any). This also acts as a cancellation flag.
	streamError error

	// For implementing http.ResponseWriter:
	outHeaders    http.Header
	outStatus     int
	headerWritten bool
}

// Header implements http.ResponseWriter. Headers set here are sent back to the
// LB when the response is committed by WriteHeader or the first Write.
func (ch *callHandle) Header() http.Header {
	return ch.outHeaders
}

// WriteHeader implements http.ResponseWriter, committing the status code and
// any headers accumulated so far.
func (ch *callHandle) WriteHeader(status int) {
	ch.outStatus = status
	ch.commitHeaders()
}

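// commitHeaders sends the CallResultStart message (status code plus response
// headers) over the stream. It runs at most once per call; later invocations
// are no-ops, so Write can call it unconditionally.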
func (ch *callHandle) commitHeaders() error {
	if ch.headerWritten {
		return nil
	}
	ch.headerWritten = true
	logrus.Debugf("Committing call result with status %d", ch.outStatus)

	var outHeaders []*runner.HttpHeader

	for h, vals := range ch.outHeaders {
		for _, v := range vals {
			outHeaders = append(outHeaders, &runner.HttpHeader{
				Key:   h,
				Value: v,
			})
		}
	}

	// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
	// the call: basically just blackhole the output and return the write error to cause Submit to fail properly.
	ch.stateMutex.Lock()
	defer ch.stateMutex.Unlock()
	err := ch.streamError
	if err != nil {
		return fmt.Errorf("Bailing out because of communication error: %v", err)
	}

	logrus.Debug("Sending call result start message")
	err = ch.engagement.Send(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_ResultStart{
			ResultStart: &runner.CallResultStart{
				Meta: &runner.CallResultStart_Http{
					Http: &runner.HttpRespMeta{
						Headers:    outHeaders,
						StatusCode: int32(ch.outStatus),
					},
				},
			},
		},
	})
	if err != nil {
		logrus.WithError(err).Error("Error sending call result")
		ch.streamError = err
		return err
	}
	logrus.Debug("Sent call result message")
	return nil
}

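// Write implements http.ResponseWriter. Each chunk of function output becomes
// a non-EOF DataFrame on the stream; once a stream error has been recorded,
// further writes fail so that Submit aborts cleanly.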
func (ch *callHandle) Write(data []byte) (int, error) {
	err := ch.commitHeaders()
	if err != nil {
		return 0, fmt.Errorf("Error sending data: %v", err)
	}

	// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
	// the call: basically just blackhole the output and return the write error to cause Submit to fail properly.
	ch.stateMutex.Lock()
	defer ch.stateMutex.Unlock()
	err = ch.streamError
	if err != nil {
		return 0, fmt.Errorf("Bailing out because of communication error: %v", err)
	}

	logrus.Debugf("Sending call response data %d bytes long", len(data))
	err = ch.engagement.Send(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Data{
			Data: &runner.DataFrame{
				Data: data,
				Eof:  false,
			},
		},
	})
	if err != nil {
		ch.streamError = err
		return 0, fmt.Errorf("Error sending data: %v", err)
	}
	return len(data), nil
}

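// Close terminates the response body by sending an empty DataFrame with Eof
// set, committing the headers first if nothing has been written yet.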
func (ch *callHandle) Close() error {
	err := ch.commitHeaders()
	if err != nil {
		return fmt.Errorf("Error sending close frame: %v", err)
	}

	// Only write if we are not in an error situation. If we cause a stream error, then record that but don't cancel
	// the call: basically just blackhole the output and return the write error to cause the caller to fail properly.
	ch.stateMutex.Lock()
	defer ch.stateMutex.Unlock()
	err = ch.streamError
	if err != nil {
		return fmt.Errorf("Bailing out because of communication error: %v", err)
	}
	logrus.Debug("Sending call response data end")
	err = ch.engagement.Send(&runner.RunnerMsg{
		Body: &runner.RunnerMsg_Data{
			Data: &runner.DataFrame{
				Eof: true,
			},
		},
	})

	if err != nil {
		return fmt.Errorf("Error sending close frame: %v", err)
	}
	return nil
}

// cancel cancels the execution of a call; exactly what that entails depends on
// the state recorded in the handle.
func (ch *callHandle) cancel(ctx context.Context, err error) {
	ch.stateMutex.Lock()
	defer ch.stateMutex.Unlock()

	// Do not double-cancel.
	if ch.streamError != nil {
		return
	}

	// First, record that there has been an error.
	ch.streamError = err
	// The caller may have died or disconnected. The behaviour here depends on the state of the call.
	// If the call was placed and is running, we need to handle it...
	if ch.c != nil {
		// If we've actually started the call, we're in the middle of an execution with i/o going back and
		// forth. This is hard to stop: side effects can be occurring at any point. At the very least we
		// should stop the i/o flow. Recording the stream error in the handle has already stopped the output,
		// but we also want to stop any input being sent through, so we close the input stream and let the
		// function most likely crash out. If it doesn't crash out, it means the function doesn't handle i/o
		// errors properly; it will hang there until the timeout, and then it will be killed properly by the
		// timeout handling in Submit.
		if ch.started {
			ch.input.Close()
		}
	}
}

// CapacityGate abstracts the capacity check-and-reserve/release cycle that the
// pure runner performs around every call it accepts.
type CapacityGate interface {
	// CheckAndReserveCapacity must perform an atomic check plus reservation. If an error is returned, then it is
	// guaranteed that no capacity has been committed. If nil is returned, then it is guaranteed that the provided
	// units of capacity have been committed.
	CheckAndReserveCapacity(units uint64) error

	// ReleaseCapacity must perform an atomic release of capacity. The units provided must not bring the capacity
	// under zero; implementations are free to panic in that case.
	ReleaseCapacity(units uint64)
}

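// A custom gate can swap in a different admission policy without touching the
// runner itself. As a minimal sketch (illustrative only; no such type exists
// in this package), a gate that caps the number of concurrent calls rather
// than tracking memory units could look like:
//
//	type countGate struct{ sem chan struct{} }
//
//	func newCountGate(n int) *countGate {
//		return &countGate{sem: make(chan struct{}, n)}
//	}
//
//	func (g *countGate) CheckAndReserveCapacity(units uint64) error {
//		select {
//		case g.sem <- struct{}{}:
//			return nil
//		default:
//			return models.ErrCallTimeoutServerBusy
//		}
//	}
//
//	func (g *countGate) ReleaseCapacity(units uint64) { <-g.sem }
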
// pureRunnerCapacityManager is the default CapacityGate: it tracks committed
// memory units against a fixed total, under a mutex.
type pureRunnerCapacityManager struct {
	totalCapacityUnits     uint64
	committedCapacityUnits uint64
	mtx                    sync.Mutex
}

// capacityDeallocator releases whatever capacity was reserved for a call; it
// may be a no-op if nothing was reserved.
type capacityDeallocator func()

func newPureRunnerCapacityManager(units uint64) *pureRunnerCapacityManager {
	return &pureRunnerCapacityManager{
		totalCapacityUnits:     units,
		committedCapacityUnits: 0,
	}
}

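// CheckAndReserveCapacity commits the requested units (megabytes of memory, to
// match getAvailableMemoryUnits) if they fit within the remaining capacity,
// and returns ErrCallTimeoutServerBusy otherwise.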
func (prcm *pureRunnerCapacityManager) CheckAndReserveCapacity(units uint64) error {
	prcm.mtx.Lock()
	defer prcm.mtx.Unlock()
	if prcm.totalCapacityUnits-prcm.committedCapacityUnits >= units {
		prcm.committedCapacityUnits = prcm.committedCapacityUnits + units
		return nil
	}
	return models.ErrCallTimeoutServerBusy
}

// ReleaseCapacity returns previously committed units to the pool; releasing
// more than is committed indicates an accounting bug and panics.
func (prcm *pureRunnerCapacityManager) ReleaseCapacity(units uint64) {
	prcm.mtx.Lock()
	defer prcm.mtx.Unlock()
	if units <= prcm.committedCapacityUnits {
		prcm.committedCapacityUnits = prcm.committedCapacityUnits - units
		return
	}
	panic("Fatal error in pure runner capacity calculation, getting to sub-zero capacity")
}

// pureRunner implements Agent by wrapping an internal Agent that performs the
// actual execution, and provides the gRPC server side of the LB <-> Runner
// protocol.
type pureRunner struct {
	gRPCServer *grpc.Server
	listen     string
	a          Agent
	inflight   int32
	capacity   CapacityGate
}

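// Most Agent methods below simply delegate to the wrapped agent. Submit and
// Enqueue are deliberately disabled: work reaches a pure runner only through
// the gRPC Engage stream.
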
func (pr *pureRunner) GetAppID(ctx context.Context, appName string) (string, error) {
	return pr.a.GetAppID(ctx, appName)
}

func (pr *pureRunner) GetAppByID(ctx context.Context, appID string) (*models.App, error) {
	return pr.a.GetAppByID(ctx, appID)
}

func (pr *pureRunner) GetCall(opts ...CallOpt) (Call, error) {
	return pr.a.GetCall(opts...)
}

func (pr *pureRunner) Submit(Call) error {
	return errors.New("Submit cannot be called directly in a Pure Runner.")
}

func (pr *pureRunner) Close() error {
	// First stop accepting requests
	pr.gRPCServer.GracefulStop()
	// Then let the agent finish
	err := pr.a.Close()
	if err != nil {
		return err
	}
	return nil
}

func (pr *pureRunner) AddCallListener(cl fnext.CallListener) {
	pr.a.AddCallListener(cl)
}

func (pr *pureRunner) Enqueue(context.Context, *models.Call) error {
	return errors.New("Enqueue cannot be called directly in a Pure Runner.")
}

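// ensureFunctionIsRunning submits the call to the wrapped agent, exactly once
// per handle, on its own goroutine. That goroutine reports the outcome to the
// client as a CallFinished message and then signals state.done.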
func (pr *pureRunner) ensureFunctionIsRunning(state *callHandle) {
	// Only start it once!
	state.stateMutex.Lock()
	defer state.stateMutex.Unlock()
	if !state.started {
		state.started = true
		go func() {
			err := pr.a.Submit(state.c)
			if err != nil {
				// In this case the function has failed for a legitimate reason. We send a call failed message
				// if we can. If there's a streaming error doing that then we are basically in the "double
				// exception" case and who knows what's best to do. Submit has already finished so we don't
				// need to cancel... but at least we should set streamError if it's not set.
				state.stateMutex.Lock()
				defer state.stateMutex.Unlock()
				if state.streamError == nil {
					err2 := state.engagement.Send(&runner.RunnerMsg{
						Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
							Success: false,
							Details: fmt.Sprintf("%v", err),
						}}})
					if err2 != nil {
						state.streamError = err2
					}
				}
				state.done <- err
				return
			}
			// First close the writer, then send the call finished message
			err = state.Close()
			if err != nil {
				// If we fail to close the writer we need to communicate back that the function has failed; if
				// there's a streaming error doing that then we are basically in the "double exception" case
				// and who knows what's best to do. Submit has already finished so we don't need to cancel...
				// but at least we should set streamError if it's not set.
				state.stateMutex.Lock()
				defer state.stateMutex.Unlock()
				if state.streamError == nil {
					err2 := state.engagement.Send(&runner.RunnerMsg{
						Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
							Success: false,
							Details: fmt.Sprintf("%v", err),
						}}})
					if err2 != nil {
						state.streamError = err2
					}
				}
				state.done <- err
				return
			}
			// At this point everything should have worked. Send a successful message... and if that runs afoul
			// of a stream error, well, we're in a bit of trouble. Everything has finished, so there is nothing
			// to cancel and we just give up, but at least we set streamError.
			state.stateMutex.Lock()
			defer state.stateMutex.Unlock()
			if state.streamError == nil {
				err2 := state.engagement.Send(&runner.RunnerMsg{
					Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
						Success: true,
						Details: state.c.Model().ID,
					}}})
				if err2 != nil {
					state.streamError = err2
					state.done <- err2
					return
				}
			}

			state.done <- nil
		}()
	}
}

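// handleData starts the function if it is not running yet and pipes one
// DataFrame of request body into it; a frame with Eof set closes the input.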
func (pr *pureRunner) handleData(ctx context.Context, data *runner.DataFrame, state *callHandle) error {
	pr.ensureFunctionIsRunning(state)

	// Only push the input if we're in a non-error situation
	state.stateMutex.Lock()
	defer state.stateMutex.Unlock()
	if state.streamError == nil {
		if len(data.Data) > 0 {
			_, err := state.input.Write(data.Data)
			if err != nil {
				return err
			}
		}
		if data.Eof {
			state.input.Close()
		}
	}
	return nil
}

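// handleTryCall deserializes the TryCall's JSON-encoded models.Call, reserves
// capacity for its memory requirement, and constructs the agent call with its
// input pipe and response writer. The returned deallocator releases that
// reservation and must be called in every case, including errors.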
func (pr *pureRunner) handleTryCall(ctx context.Context, tc *runner.TryCall, state *callHandle) (capacityDeallocator, error) {
	state.receivedTime = strfmt.DateTime(time.Now())
	var c models.Call
	err := json.Unmarshal([]byte(tc.ModelsCallJson), &c)
	if err != nil {
		return func() {}, err
	}

	// Capacity check first
	err = pr.capacity.CheckAndReserveCapacity(c.Memory)
	if err != nil {
		return func() {}, err
	}

	// Proceed!
	var w http.ResponseWriter = state
	inR, inW := io.Pipe()
	agentCall, err := pr.a.GetCall(FromModelAndInput(&c, inR), WithWriter(w))
	if err != nil {
		return func() { pr.capacity.ReleaseCapacity(c.Memory) }, err
	}
	state.c = agentCall.(*call)
	state.input = inW
	state.allocatedTime = strfmt.DateTime(time.Now())

	return func() { pr.capacity.ReleaseCapacity(c.Memory) }, nil
}

// Engage handles a single client engagement: the LB opens a stream and sends a
// TryCall followed by the request body as DataFrames; the runner answers with
// an acknowledgement, the response data and a final CallFinished message.
func (pr *pureRunner) Engage(engagement runner.RunnerProtocol_EngageServer) error {
	// Keep lightweight tabs on what this runner is doing: for draindown tests
	atomic.AddInt32(&pr.inflight, 1)
	defer atomic.AddInt32(&pr.inflight, -1)

	pv, ok := peer.FromContext(engagement.Context())
	logrus.Debug("Starting engagement")
	if ok {
		logrus.Debug("Peer is ", pv)
	}
	md, ok := metadata.FromIncomingContext(engagement.Context())
	if ok {
		logrus.Debug("MD is ", md)
	}

	var state = callHandle{
		engagement:    engagement,
		c:             nil,
		input:         nil,
		started:       false,
		done:          make(chan error),
		streamError:   nil,
		outHeaders:    make(http.Header),
		outStatus:     200,
		headerWritten: false,
	}

	grpc.EnableTracing = false
	logrus.Debug("Entering engagement handler")

	msg, err := engagement.Recv()
	if err != nil {
		// In this case the connection has dropped before we've even started.
		return err
	}
	switch body := msg.Body.(type) {
	case *runner.ClientMsg_Try:
		dealloc, err := pr.handleTryCall(engagement.Context(), body.Try, &state)
		defer dealloc()
		// At the TryCall stage only one goroutine is running and nothing has happened yet, so there should not
		// be a streamError. We can handle `err` by sending a message back. If we cause a stream error by sending
		// the message, we are in a "double exception" case and we might as well cancel the call with the
		// original error, so we can ignore the error from Send.
		if err != nil {
			_ = engagement.Send(&runner.RunnerMsg{
				Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
					Committed: false,
					Details:   fmt.Sprintf("%v", err),
				}}})
			state.cancel(engagement.Context(), err)
			return err
		}

		// If we succeed in creating the call, but we get a stream error sending a message back, we must cancel
		// the call because we've probably lost the connection.
		err = engagement.Send(&runner.RunnerMsg{
			Body: &runner.RunnerMsg_Acknowledged{Acknowledged: &runner.CallAcknowledged{
				Committed:             true,
				Details:               state.c.Model().ID,
				SlotAllocationLatency: time.Time(state.allocatedTime).Sub(time.Time(state.receivedTime)).String(),
			}}})
		if err != nil {
			state.cancel(engagement.Context(), err)
			return err
		}

		// At this point we start handling the data that should be being pushed to us.
		foundEof := false
		for !foundEof {
			msg, err := engagement.Recv()
			if err != nil {
				// In this case the connection has dropped or there's something bad happening. We know we
				// can't even send a message back. Cancel the call, all bets are off.
				state.cancel(engagement.Context(), err)
				return err
			}

			switch body := msg.Body.(type) {
			case *runner.ClientMsg_Data:
				err := pr.handleData(engagement.Context(), body.Data, &state)
				if err != nil {
					// If this happens, then we couldn't write into the input. The state of the function is
					// inconsistent and therefore we need to cancel. We also need to communicate back that
					// the function has failed; that could also run afoul of a stream error, but at that
					// point we don't care, just cancel the call with the original error.
					_ = state.engagement.Send(&runner.RunnerMsg{
						Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
							Success: false,
							Details: fmt.Sprintf("%v", err),
						}}})
					state.cancel(engagement.Context(), err)
					return err
				}
				// Break the loop if this was the last input data frame, i.e. eof is on
				if body.Data.Eof {
					foundEof = true
				}
			default:
				err := errors.New("Protocol failure in communication with function runner")
				// This is essentially a panic. Try to communicate back that the call has failed, and bail
				// out; that could also run afoul of a stream error, but at that point we don't care, just
				// cancel the call with the catastrophic error.
				_ = state.engagement.Send(&runner.RunnerMsg{
					Body: &runner.RunnerMsg_Finished{Finished: &runner.CallFinished{
						Success: false,
						Details: fmt.Sprintf("%v", err),
					}}})
				state.cancel(engagement.Context(), err)
				return err
			}
		}

		// Synchronize with the goroutine running the function
		select {
		case <-state.done:
		case <-engagement.Context().Done():
			return engagement.Context().Err()
		}

	default:
		// Protocol error. This should not happen.
		return errors.New("Protocol failure in communication with function runner")
	}

	return nil
}

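// Status reports how many engagements are currently in flight; draindown
// checks look at this count.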
func (pr *pureRunner) Status(ctx context.Context, _ *empty.Empty) (*runner.RunnerStatus, error) {
	return &runner.RunnerStatus{
		Active: atomic.LoadInt32(&pr.inflight),
	}, nil
}

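// Start listens on the configured address and serves gRPC until the server is
// stopped. It blocks, so callers normally run it on its own goroutine, as
// NewPureRunner does.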
func (pr *pureRunner) Start() error {
	logrus.Info("Pure Runner listening on ", pr.listen)
	lis, err := net.Listen("tcp", pr.listen)
	if err != nil {
		return fmt.Errorf("Could not listen on %s: %s", pr.listen, err)
	}

	if err := pr.gRPCServer.Serve(lis); err != nil {
		return fmt.Errorf("grpc serve error: %s", err)
	}
	return nil
}

// UnsecuredPureRunner creates a pure runner without TLS, using the default
// capacity gate.
func UnsecuredPureRunner(cancel context.CancelFunc, addr string, da DataAccess) (*pureRunner, error) {
	return NewPureRunner(cancel, addr, da, "", "", "", nil)
}

// DefaultPureRunner creates a TLS-secured pure runner using the default
// capacity gate.
func DefaultPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string) (*pureRunner, error) {
	return NewPureRunner(cancel, addr, da, cert, key, ca, nil)
}

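// NewPureRunner creates a pure runner around a freshly created agent. If cert,
// key and ca are all non-empty, the gRPC server requires mutual TLS; otherwise
// it runs unsecured. A nil gate selects the default memory-based capacity
// manager. Serving starts on a background goroutine, and cancel is invoked if
// the server fails to start.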
func NewPureRunner(cancel context.CancelFunc, addr string, da DataAccess, cert string, key string, ca string, gate CapacityGate) (*pureRunner, error) {
	a := createAgent(da, true)
	var pr *pureRunner
	var err error
	if cert != "" && key != "" && ca != "" {
		c, err := creds(cert, key, ca)
		if err != nil {
			logrus.WithField("runner_addr", addr).Warn("Failed to create credentials!")
			return nil, err
		}
		pr, err = createPureRunner(addr, a, c, gate)
		if err != nil {
			return nil, err
		}
	} else {
		logrus.Warn("Running pure runner in insecure mode!")
		pr, err = createPureRunner(addr, a, nil, gate)
		if err != nil {
			return nil, err
		}
	}

	go func() {
		err := pr.Start()
		if err != nil {
			logrus.WithError(err).Error("Failed to start pure runner")
			cancel()
		}
	}()

	return pr, nil
}

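// creds builds the mutual-TLS transport credentials for the gRPC server: the
// runner presents the cert/key pair and only accepts clients whose
// certificates verify against the given CA.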
func creds(cert string, key string, ca string) (credentials.TransportCredentials, error) {
	// Load the certificates from disk
	certificate, err := tls.LoadX509KeyPair(cert, key)
	if err != nil {
		return nil, fmt.Errorf("Could not load server key pair: %s", err)
	}

	// Create a certificate pool from the certificate authority
	certPool := x509.NewCertPool()
	authority, err := ioutil.ReadFile(ca)
	if err != nil {
		return nil, fmt.Errorf("Could not read ca certificate: %s", err)
	}

	if ok := certPool.AppendCertsFromPEM(authority); !ok {
		return nil, errors.New("Failed to append client certs")
	}

	return credentials.NewTLS(&tls.Config{
		ClientAuth:   tls.RequireAndVerifyClientCert,
		Certificates: []tls.Certificate{certificate},
		ClientCAs:    certPool,
	}), nil
}

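// createPureRunner wires together the gRPC server (secured if credentials are
// given), the capacity gate and the agent, and registers the result as the
// RunnerProtocol service.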
func createPureRunner(addr string, a Agent, creds credentials.TransportCredentials, gate CapacityGate) (*pureRunner, error) {
	var srv *grpc.Server
	if creds != nil {
		srv = grpc.NewServer(grpc.Creds(creds))
	} else {
		srv = grpc.NewServer()
	}
	if gate == nil {
		memUnits := getAvailableMemoryUnits()
		gate = newPureRunnerCapacityManager(memUnits)
	}
	pr := &pureRunner{
		gRPCServer: srv,
		listen:     addr,
		a:          a,
		capacity:   gate,
	}

	runner.RegisterRunnerProtocolServer(srv, pr)
	return pr, nil
}

const megabyte uint64 = 1024 * 1024

// getAvailableMemoryUnits returns the async memory pool size in megabytes,
// which is the unit the default capacity gate counts in.
func getAvailableMemoryUnits() uint64 {
	// Reuses the resource tracker to read system memory. A bit of a hack;
	// TODO: refactor the OS-specific get-memory funcs out of resourceTracker.
	throwawayRT := NewResourceTracker(nil).(*resourceTracker)
	return throwawayRT.ramAsyncTotal / megabyte
}