mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Minor naming and control flow changes to satisfy golint
This commit is contained in:
@@ -417,9 +417,9 @@ func (a *agent) ramToken(ctx context.Context, memory uint64) <-chan Token {
|
|||||||
return ch
|
return ch
|
||||||
}
|
}
|
||||||
|
|
||||||
// asyncRam will send a signal on the returned channel when at least half of
|
// asyncRAM will send a signal on the returned channel when at least half of
|
||||||
// the available RAM on this machine is free.
|
// the available RAM on this machine is free.
|
||||||
func (a *agent) asyncRam() chan struct{} {
|
func (a *agent) asyncRAM() chan struct{} {
|
||||||
ch := make(chan struct{})
|
ch := make(chan struct{})
|
||||||
|
|
||||||
c := a.cond
|
c := a.cond
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ func (a *agent) asyncDequeue() {
|
|||||||
select {
|
select {
|
||||||
case <-a.shutdown:
|
case <-a.shutdown:
|
||||||
return
|
return
|
||||||
case <-a.asyncRam():
|
case <-a.asyncRAM():
|
||||||
// TODO we _could_ return a token here to reserve the ram so that there's
|
// TODO we _could_ return a token here to reserve the ram so that there's
|
||||||
// not a race between here and Submit but we're single threaded
|
// not a race between here and Submit but we're single threaded
|
||||||
// dequeueing and retries handled gracefully inside of Submit if we run
|
// dequeueing and retries handled gracefully inside of Submit if we run
|
||||||
|
|||||||
@@ -6,14 +6,16 @@ import (
|
|||||||
"github.com/sirupsen/logrus"
|
"github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type contextKey string
|
||||||
|
|
||||||
// WithLogger stores the logger.
|
// WithLogger stores the logger.
|
||||||
func WithLogger(ctx context.Context, l logrus.FieldLogger) context.Context {
|
func WithLogger(ctx context.Context, l logrus.FieldLogger) context.Context {
|
||||||
return context.WithValue(ctx, "logger", l)
|
return context.WithValue(ctx, contextKey("logger"), l)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Logger returns the structured logger.
|
// Logger returns the structured logger.
|
||||||
func Logger(ctx context.Context) logrus.FieldLogger {
|
func Logger(ctx context.Context) logrus.FieldLogger {
|
||||||
l, ok := ctx.Value("logger").(logrus.FieldLogger)
|
l, ok := ctx.Value(contextKey("logger")).(logrus.FieldLogger)
|
||||||
if !ok {
|
if !ok {
|
||||||
return logrus.StandardLogger()
|
return logrus.StandardLogger()
|
||||||
}
|
}
|
||||||
|
|||||||
28
api/id/id.go
28
api/id/id.go
@@ -10,29 +10,29 @@ import (
|
|||||||
type Id [16]byte
|
type Id [16]byte
|
||||||
|
|
||||||
var (
|
var (
|
||||||
machineId uint64
|
machineID uint64
|
||||||
counter uint64
|
counter uint64
|
||||||
)
|
)
|
||||||
|
|
||||||
// SetMachineId may only be called by one thread before any id generation
|
// SetMachineId may only be called by one thread before any id generation
|
||||||
// is done. It must be set if multiple machines are generating ids in order
|
// is done. It must be set if multiple machines are generating ids in order
|
||||||
// to avoid collisions. Only the least significant 48 bits are used.
|
// to avoid collisions. Only the least significant 48 bits are used.
|
||||||
func SetMachineId(id uint64) {
|
func SetMachineId(ID uint64) {
|
||||||
machineId = id
|
machineID = ID
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetMachineIdHost is a convenience wrapper to hide bit twiddling of
|
// SetMachineIdHost is a convenience wrapper to hide bit twiddling of
|
||||||
// calling SetMachineId, it has the same constraints as SetMachineId
|
// calling SetMachineId, it has the same constraints as SetMachineId
|
||||||
// with an addition that net.IP must be a ipv4 address.
|
// with an addition that net.IP must be a ipv4 address.
|
||||||
func SetMachineIdHost(addr net.IP, port uint16) {
|
func SetMachineIdHost(addr net.IP, port uint16) {
|
||||||
var machineId uint64 // 48 bits
|
var machineID uint64 // 48 bits
|
||||||
machineId |= uint64(addr[0]) << 40
|
machineID |= uint64(addr[0]) << 40
|
||||||
machineId |= uint64(addr[1]) << 32
|
machineID |= uint64(addr[1]) << 32
|
||||||
machineId |= uint64(addr[2]) << 24
|
machineID |= uint64(addr[2]) << 24
|
||||||
machineId |= uint64(addr[3]) << 16
|
machineID |= uint64(addr[3]) << 16
|
||||||
machineId |= uint64(port)
|
machineID |= uint64(port)
|
||||||
|
|
||||||
SetMachineId(machineId)
|
SetMachineId(machineID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// New will generate a new Id for use. New is safe to be called from
|
// New will generate a new Id for use. New is safe to be called from
|
||||||
@@ -40,7 +40,7 @@ func SetMachineIdHost(addr net.IP, port uint16) {
|
|||||||
// New are made. 2^32 calls to New per millisecond will be unique, provided
|
// New are made. 2^32 calls to New per millisecond will be unique, provided
|
||||||
// machine id is seeded correctly across machines.
|
// machine id is seeded correctly across machines.
|
||||||
//
|
//
|
||||||
// binary format: [ [ 48 bits time ] [ 48 bits machineId ] [ 32 bits counter ] ]
|
// binary format: [ [ 48 bits time ] [ 48 bits machineID ] [ 32 bits counter ] ]
|
||||||
//
|
//
|
||||||
// Ids are sortable within (not between, thanks to clocks) each machine, with
|
// Ids are sortable within (not between, thanks to clocks) each machine, with
|
||||||
// a modified base32 encoding exposed for convenience in API usage.
|
// a modified base32 encoding exposed for convenience in API usage.
|
||||||
@@ -58,10 +58,10 @@ func New() Id {
|
|||||||
id[4] = byte(ms >> 8)
|
id[4] = byte(ms >> 8)
|
||||||
id[5] = byte(ms)
|
id[5] = byte(ms)
|
||||||
|
|
||||||
id[6] = byte(machineId >> 12)
|
id[6] = byte(machineID >> 12)
|
||||||
id[7] = byte(machineId >> 4)
|
id[7] = byte(machineID >> 4)
|
||||||
|
|
||||||
id[8] = byte(machineId<<4) | byte((count<<4)>>60)
|
id[8] = byte(machineID<<4) | byte((count<<4)>>60)
|
||||||
|
|
||||||
id[8] = byte(count >> 48)
|
id[8] = byte(count >> 48)
|
||||||
id[8] = byte(count >> 40)
|
id[8] = byte(count >> 40)
|
||||||
|
|||||||
@@ -43,11 +43,11 @@ func (c *Config) Scan(value interface{}) error {
|
|||||||
|
|
||||||
if len(b) > 0 {
|
if len(b) > 0 {
|
||||||
return json.Unmarshal(b, c)
|
return json.Unmarshal(b, c)
|
||||||
} else {
|
}
|
||||||
|
|
||||||
*c = nil
|
*c = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise, return an error
|
// otherwise, return an error
|
||||||
return fmt.Errorf("config invalid db format: %T %T value, err: %v", value, bv, err)
|
return fmt.Errorf("config invalid db format: %T %T value, err: %v", value, bv, err)
|
||||||
@@ -85,11 +85,11 @@ func (h *Headers) Scan(value interface{}) error {
|
|||||||
|
|
||||||
if len(b) > 0 {
|
if len(b) > 0 {
|
||||||
return json.Unmarshal(b, h)
|
return json.Unmarshal(b, h)
|
||||||
} else {
|
}
|
||||||
|
|
||||||
*h = nil
|
*h = nil
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// otherwise, return an error
|
// otherwise, return an error
|
||||||
return fmt.Errorf("headers invalid db format: %T %T value, err: %v", value, bv, err)
|
return fmt.Errorf("headers invalid db format: %T %T value, err: %v", value, bv, err)
|
||||||
|
|||||||
@@ -31,10 +31,8 @@ func (m Reason) validateReasonEnum(path, location string, value Reason) error {
|
|||||||
reasonEnum = append(reasonEnum, v)
|
reasonEnum = append(reasonEnum, v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := validate.Enum(path, location, value, reasonEnum); err != nil {
|
err := validate.Enum(path, location, value, reasonEnum)
|
||||||
return err
|
return err
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate validates this reason
|
// Validate validates this reason
|
||||||
|
|||||||
@@ -72,7 +72,8 @@ func (r *Route) Validate() error {
|
|||||||
|
|
||||||
if r.Path == "" {
|
if r.Path == "" {
|
||||||
return ErrRoutesMissingPath
|
return ErrRoutesMissingPath
|
||||||
} else {
|
}
|
||||||
|
|
||||||
u, err := url.Parse(r.Path)
|
u, err := url.Parse(r.Path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ErrPathMalformed
|
return ErrPathMalformed
|
||||||
@@ -85,7 +86,6 @@ func (r *Route) Validate() error {
|
|||||||
if !path.IsAbs(u.Path) {
|
if !path.IsAbs(u.Path) {
|
||||||
return ErrRoutesInvalidPath
|
return ErrRoutesInvalidPath
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if r.Image == "" {
|
if r.Image == "" {
|
||||||
return ErrRoutesMissingImage
|
return ErrRoutesMissingImage
|
||||||
|
|||||||
@@ -81,7 +81,7 @@ func (mq *RedisMQ) processPendingReservations() {
|
|||||||
logrus.WithError(err).Error("Redis command error")
|
logrus.WithError(err).Error("Redis command error")
|
||||||
}
|
}
|
||||||
|
|
||||||
reservationId, timeoutString, err := getFirstKeyValue(resp)
|
reservationID, timeoutString, err := getFirstKeyValue(resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).Error("error getting kv")
|
logrus.WithError(err).Error("error getting kv")
|
||||||
return
|
return
|
||||||
@@ -91,7 +91,7 @@ func (mq *RedisMQ) processPendingReservations() {
|
|||||||
if err != nil || timeout > time.Now().Unix() {
|
if err != nil || timeout > time.Now().Unix() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
response, err := redis.Bytes(conn.Do("HGET", mq.k("timeout_jobs"), reservationId))
|
response, err := redis.Bytes(conn.Do("HGET", mq.k("timeout_jobs"), reservationID))
|
||||||
if mq.checkNilResponse(err) {
|
if mq.checkNilResponse(err) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -108,8 +108,8 @@ func (mq *RedisMQ) processPendingReservations() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// :( because fuck atomicity right?
|
// :( because fuck atomicity right?
|
||||||
conn.Do("ZREM", mq.k("timeouts"), reservationId)
|
conn.Do("ZREM", mq.k("timeouts"), reservationID)
|
||||||
conn.Do("HDEL", mq.k("timeout_jobs"), reservationId)
|
conn.Do("HDEL", mq.k("timeout_jobs"), reservationID)
|
||||||
conn.Do("HDEL", mq.k("reservations"), job.ID)
|
conn.Do("HDEL", mq.k("reservations"), job.ID)
|
||||||
redisPush(conn, mq.queueName, &job)
|
redisPush(conn, mq.queueName, &job)
|
||||||
}
|
}
|
||||||
@@ -127,9 +127,9 @@ func (mq *RedisMQ) processDelayedCalls() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, resId := range resIds {
|
for _, resID := range resIds {
|
||||||
// Might be a good idea to do this transactionally so we do not have left over reservationIds if the delete fails.
|
// Might be a good idea to do this transactionally so we do not have left over reservationIds if the delete fails.
|
||||||
buf, err := redis.Bytes(conn.Do("HGET", mq.k("delayed_jobs"), resId))
|
buf, err := redis.Bytes(conn.Do("HGET", mq.k("delayed_jobs"), resID))
|
||||||
// If:
|
// If:
|
||||||
// a) A HSET in Push() failed, or
|
// a) A HSET in Push() failed, or
|
||||||
// b) A previous zremrangebyscore failed,
|
// b) A previous zremrangebyscore failed,
|
||||||
@@ -137,23 +137,23 @@ func (mq *RedisMQ) processDelayedCalls() {
|
|||||||
if err == redis.ErrNil {
|
if err == redis.ErrNil {
|
||||||
continue
|
continue
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resId}).Error("Error HGET delayed_jobs")
|
logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resID}).Error("Error HGET delayed_jobs")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
var job models.Call
|
var job models.Call
|
||||||
err = json.Unmarshal(buf, &job)
|
err = json.Unmarshal(buf, &job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{"buf": buf, "reservationId": resId}).Error("Error unmarshaling job")
|
logrus.WithError(err).WithFields(logrus.Fields{"buf": buf, "reservationId": resID}).Error("Error unmarshaling job")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = redisPush(conn, mq.queueName, &job)
|
_, err = redisPush(conn, mq.queueName, &job)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resId}).Error("Pushing delayed job")
|
logrus.WithError(err).WithFields(logrus.Fields{"reservationId": resID}).Error("Pushing delayed job")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
conn.Do("HDEL", mq.k("delayed_jobs"), resId)
|
conn.Do("HDEL", mq.k("delayed_jobs"), resID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove everything we processed.
|
// Remove everything we processed.
|
||||||
@@ -192,16 +192,16 @@ func (mq *RedisMQ) delayCall(conn redis.Conn, job *models.Call) (*models.Call, e
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
reservationId := strconv.FormatInt(resp, 10)
|
reservationID := strconv.FormatInt(resp, 10)
|
||||||
|
|
||||||
// Timestamp -> resID
|
// Timestamp -> resID
|
||||||
_, err = conn.Do("ZADD", mq.k("delays"), time.Now().UTC().Add(time.Duration(job.Delay)*time.Second).Unix(), reservationId)
|
_, err = conn.Do("ZADD", mq.k("delays"), time.Now().UTC().Add(time.Duration(job.Delay)*time.Second).Unix(), reservationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// resID -> Task
|
// resID -> Task
|
||||||
_, err = conn.Do("HSET", mq.k("delayed_jobs"), reservationId, buf)
|
_, err = conn.Do("HSET", mq.k("delayed_jobs"), reservationID, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -264,18 +264,18 @@ func (mq *RedisMQ) Reserve(ctx context.Context) (*models.Call, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
reservationId := strconv.FormatInt(response, 10)
|
reservationID := strconv.FormatInt(response, 10)
|
||||||
_, err = conn.Do("ZADD", "timeout:", time.Now().Add(time.Minute).Unix(), reservationId)
|
_, err = conn.Do("ZADD", "timeout:", time.Now().Add(time.Minute).Unix(), reservationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
_, err = conn.Do("HSET", "timeout", reservationId, resp)
|
_, err = conn.Do("HSET", "timeout", reservationID, resp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Map from job.ID -> reservation ID
|
// Map from job.ID -> reservation ID
|
||||||
_, err = conn.Do("HSET", "reservations", job.ID, reservationId)
|
_, err = conn.Do("HSET", "reservations", job.ID, reservationID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -292,7 +292,7 @@ func (mq *RedisMQ) Delete(ctx context.Context, job *models.Call) error {
|
|||||||
|
|
||||||
conn := mq.pool.Get()
|
conn := mq.pool.Get()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
resId, err := conn.Do("HGET", "reservations", job.ID)
|
resID, err := conn.Do("HGET", "reservations", job.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@@ -300,10 +300,10 @@ func (mq *RedisMQ) Delete(ctx context.Context, job *models.Call) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = conn.Do("ZREM", "timeout:", resId)
|
_, err = conn.Do("ZREM", "timeout:", resID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
_, err = conn.Do("HDEL", "timeout", resId)
|
_, err = conn.Do("HDEL", "timeout", resID)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -90,13 +90,12 @@ func (s *Server) ensureRoute(ctx context.Context, method string, wroute *models.
|
|||||||
return *bad, err
|
return *bad, err
|
||||||
}
|
}
|
||||||
return routeResponse{"Route successfully created", wroute.Route}, nil
|
return routeResponse{"Route successfully created", wroute.Route}, nil
|
||||||
} else {
|
}
|
||||||
err := s.changeRoute(ctx, wroute)
|
err = s.changeRoute(ctx, wroute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return *bad, err
|
return *bad, err
|
||||||
}
|
}
|
||||||
return routeResponse{"Route successfully updated", wroute.Route}, nil
|
return routeResponse{"Route successfully updated", wroute.Route}, nil
|
||||||
}
|
|
||||||
case http.MethodPatch:
|
case http.MethodPatch:
|
||||||
err := s.changeRoute(ctx, wroute)
|
err := s.changeRoute(ctx, wroute)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ func New(ctx context.Context, ds models.Datastore, mq models.MessageQueue, logDB
|
|||||||
LogDB: logDB,
|
LogDB: logDB,
|
||||||
}
|
}
|
||||||
|
|
||||||
setMachineId()
|
setMachineID()
|
||||||
s.Router.Use(loggerWrap, traceWrap, panicWrap)
|
s.Router.Use(loggerWrap, traceWrap, panicWrap)
|
||||||
s.bindHandlers(ctx)
|
s.bindHandlers(ctx)
|
||||||
|
|
||||||
@@ -167,7 +167,7 @@ func setTracer() {
|
|||||||
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
|
logrus.WithFields(logrus.Fields{"url": zipkinHTTPEndpoint}).Info("started tracer")
|
||||||
}
|
}
|
||||||
|
|
||||||
func setMachineId() {
|
func setMachineID() {
|
||||||
port := uint16(viper.GetInt(EnvPort))
|
port := uint16(viper.GetInt(EnvPort))
|
||||||
addr := whoAmI().To4()
|
addr := whoAmI().To4()
|
||||||
if addr == nil {
|
if addr == nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user