mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
Merge pull request #168 from fnproject/fix-async-task-reserving
Fix async task reserving
This commit is contained in:
3
Makefile
3
Makefile
@@ -13,6 +13,9 @@ build:
|
||||
test:
|
||||
./test.sh
|
||||
|
||||
fmt:
|
||||
./go-fmt.sh
|
||||
|
||||
test-datastore:
|
||||
cd api/datastore && go test -v ./...
|
||||
|
||||
|
||||
@@ -3,9 +3,9 @@ package datastoreutil
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/jmoiron/sqlx"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
)
|
||||
|
||||
func MetricDS(ds models.Datastore) models.Datastore {
|
||||
|
||||
@@ -2,8 +2,8 @@ package logs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type mock struct {
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/go-openapi/strfmt"
|
||||
)
|
||||
|
||||
var testApp = &models.App{
|
||||
|
||||
@@ -10,9 +10,9 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
mq_config "github.com/iron-io/iron_go3/config"
|
||||
ironmq "github.com/iron-io/iron_go3/mq"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
)
|
||||
|
||||
type assoc struct {
|
||||
|
||||
@@ -8,9 +8,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/google/btree"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/google/btree"
|
||||
)
|
||||
|
||||
type MemoryMQ struct {
|
||||
|
||||
@@ -7,8 +7,8 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
// New will parse the URL and return the correct MQ implementation.
|
||||
|
||||
@@ -10,9 +10,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/garyburd/redigo/redis"
|
||||
)
|
||||
|
||||
type RedisMQ struct {
|
||||
|
||||
@@ -14,20 +14,37 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
var client = &http.Client{
|
||||
Transport: &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
Dial: (&net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 120 * time.Second,
|
||||
}).Dial,
|
||||
MaxIdleConnsPerHost: 512,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
TLSClientConfig: &tls.Config{
|
||||
ClientSessionCache: tls.NewLRUClientSessionCache(4096),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func getTask(ctx context.Context, url string) (*models.Task, error) {
|
||||
// TODO shove this ctx into the request?
|
||||
span, _ := opentracing.StartSpanFromContext(ctx, "get_task")
|
||||
defer span.Finish()
|
||||
|
||||
// TODO uh, make a better http client :facepalm:
|
||||
resp, err := http.Get(url)
|
||||
req, _ := http.NewRequest(http.MethodGet, url, nil)
|
||||
resp, err := client.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -35,6 +52,9 @@ func getTask(ctx context.Context, url string) (*models.Task, error) {
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}()
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, errors.New(fmt.Sprintf("Unable to get task. Reason: %v", resp.Status))
|
||||
}
|
||||
|
||||
var task models.Task
|
||||
err = json.NewDecoder(resp.Body).Decode(&task)
|
||||
@@ -86,10 +106,17 @@ func deleteTask(ctx context.Context, url string, task *models.Task) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c := &http.Client{}
|
||||
if resp, err := c.Do(req); err != nil {
|
||||
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if resp.StatusCode != http.StatusAccepted {
|
||||
}
|
||||
defer func() {
|
||||
io.Copy(ioutil.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
}()
|
||||
|
||||
if resp.StatusCode != http.StatusAccepted {
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -14,13 +14,13 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func setLogBuffer() *bytes.Buffer {
|
||||
@@ -63,7 +63,7 @@ func getTestServer(mockTasks []*models.Task) *httptest.Server {
|
||||
c.JSON(http.StatusInternalServerError, err)
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusAccepted, task)
|
||||
c.JSON(http.StatusOK, task)
|
||||
}
|
||||
|
||||
delHandler := func(c *gin.Context) {
|
||||
@@ -120,7 +120,7 @@ func TestGetTaskError(t *testing.T) {
|
||||
{
|
||||
"url": "/invalid",
|
||||
"task": getMockTask(),
|
||||
"error": "json: cannot unmarshal number into Go value of type models.Task", // TODO WTF!
|
||||
"error": "Unable to get task. Reason: 404 Not Found",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -15,11 +15,11 @@ import (
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
manifest "github.com/docker/distribution/manifest/schema1"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/fsouza/go-dockerclient"
|
||||
"github.com/heroku/docker-registry-client/registry"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
)
|
||||
|
||||
const hubURL = "https://registry.hub.docker.com"
|
||||
|
||||
@@ -13,10 +13,10 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fsouza/go-dockerclient"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/log"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
@@ -9,9 +9,9 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/vrischmann/envconfig"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/vrischmann/envconfig"
|
||||
)
|
||||
|
||||
type taskDockerTest struct {
|
||||
|
||||
@@ -14,14 +14,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/log"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/fnproject/fn/api/runner/drivers/docker"
|
||||
"github.com/fnproject/fn/api/runner/drivers/mock"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/opentracing/opentracing-go/log"
|
||||
)
|
||||
|
||||
// TODO clean all of this up, the exposed API is huge and incohesive,
|
||||
|
||||
@@ -10,9 +10,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/docker/cli/cli/config/configfile"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
)
|
||||
|
||||
var registries dockerRegistries
|
||||
|
||||
@@ -9,13 +9,13 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/drivers"
|
||||
"github.com/fnproject/fn/api/runner/protocol"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
"github.com/go-openapi/strfmt"
|
||||
"github.com/opentracing/opentracing-go"
|
||||
)
|
||||
|
||||
// hot functions - theory of operation
|
||||
|
||||
@@ -3,8 +3,8 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleAppCreate(c *gin.Context) {
|
||||
|
||||
@@ -3,10 +3,10 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleAppDelete(c *gin.Context) {
|
||||
|
||||
@@ -3,8 +3,8 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleAppGet(c *gin.Context) {
|
||||
|
||||
@@ -3,8 +3,8 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleAppList(c *gin.Context) {
|
||||
|
||||
@@ -8,11 +8,11 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/logs"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func setLogBuffer() *bytes.Buffer {
|
||||
|
||||
@@ -3,9 +3,9 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleAppUpdate(c *gin.Context) {
|
||||
|
||||
@@ -3,8 +3,8 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleCallGet(c *gin.Context) {
|
||||
|
||||
@@ -3,8 +3,8 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleCallLogGet(c *gin.Context) {
|
||||
|
||||
@@ -4,9 +4,9 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
type ApiHandlerFunc func(w http.ResponseWriter, r *http.Request)
|
||||
|
||||
@@ -6,9 +6,9 @@ import (
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
/* handleRouteCreateOrUpdate is used to handle POST PUT and PATCH for routes.
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleRouteDelete(c *gin.Context) {
|
||||
|
||||
@@ -4,8 +4,8 @@ import (
|
||||
"net/http"
|
||||
"path"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleRouteGet(c *gin.Context) {
|
||||
|
||||
@@ -3,9 +3,9 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func (s *Server) handleRouteList(c *gin.Context) {
|
||||
|
||||
@@ -12,15 +12,15 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/go-openapi/strfmt"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
"github.com/fnproject/fn/api"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/fnproject/fn/api/runner/common"
|
||||
"github.com/fnproject/fn/api/runner/task"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/go-openapi/strfmt"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
type runnerResponse struct {
|
||||
|
||||
@@ -9,12 +9,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
func testRouterAsync(ds models.Datastore, mq models.MessageQueue, rnr *runner.Runner, enqueue models.Enqueue) *gin.Engine {
|
||||
|
||||
@@ -267,7 +267,7 @@ func (s *Server) handleTaskRequest(c *gin.Context) {
|
||||
handleErrorResponse(c, err)
|
||||
return
|
||||
}
|
||||
c.JSON(http.StatusAccepted, task)
|
||||
c.JSON(http.StatusOK, task)
|
||||
case "DELETE":
|
||||
body, err := ioutil.ReadAll(c.Request.Body)
|
||||
if err != nil {
|
||||
|
||||
@@ -12,12 +12,12 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
"github.com/fnproject/fn/api/datastore"
|
||||
"github.com/fnproject/fn/api/models"
|
||||
"github.com/fnproject/fn/api/mqs"
|
||||
"github.com/fnproject/fn/api/runner"
|
||||
"github.com/gin-gonic/gin"
|
||||
cache "github.com/patrickmn/go-cache"
|
||||
)
|
||||
|
||||
var tmpDatastoreTests = "/tmp/func_test_datastore.db"
|
||||
|
||||
@@ -3,8 +3,8 @@ package server
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/fnproject/fn/api/version"
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func handleVersion(c *gin.Context) {
|
||||
|
||||
@@ -27,6 +27,8 @@ dependencies:
|
||||
|
||||
test:
|
||||
override:
|
||||
- make fmt:
|
||||
pwd: $GO_PROJECT
|
||||
- make test:
|
||||
pwd: $GO_PROJECT
|
||||
- make test-build-arm:
|
||||
|
||||
@@ -10,10 +10,10 @@ import (
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
client "github.com/fnproject/fn/cli/client"
|
||||
functions "github.com/funcy/functions_go"
|
||||
"github.com/funcy/functions_go/models"
|
||||
"github.com/urfave/cli"
|
||||
client "github.com/fnproject/fn/cli/client"
|
||||
)
|
||||
|
||||
func deploy() cli.Command {
|
||||
|
||||
@@ -17,8 +17,8 @@ import (
|
||||
|
||||
"strings"
|
||||
|
||||
"github.com/urfave/cli"
|
||||
"github.com/fnproject/fn/cli/langs"
|
||||
"github.com/urfave/cli"
|
||||
)
|
||||
|
||||
var (
|
||||
|
||||
@@ -11,12 +11,12 @@ import (
|
||||
"strings"
|
||||
"text/tabwriter"
|
||||
|
||||
client "github.com/fnproject/fn/cli/client"
|
||||
fnclient "github.com/funcy/functions_go/client"
|
||||
apiroutes "github.com/funcy/functions_go/client/routes"
|
||||
fnmodels "github.com/funcy/functions_go/models"
|
||||
"github.com/jmoiron/jsonq"
|
||||
"github.com/urfave/cli"
|
||||
client "github.com/fnproject/fn/cli/client"
|
||||
)
|
||||
|
||||
type routesCmd struct {
|
||||
|
||||
@@ -12,10 +12,10 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/fnproject/fn/cli/client"
|
||||
functions "github.com/funcy/functions_go"
|
||||
"github.com/onsi/gomega"
|
||||
"github.com/urfave/cli"
|
||||
"github.com/fnproject/fn/cli/client"
|
||||
)
|
||||
|
||||
type testStruct struct {
|
||||
|
||||
@@ -4,8 +4,8 @@ package tests
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/funcy/functions_go/models"
|
||||
"github.com/fnproject/fn/api/id"
|
||||
"github.com/funcy/functions_go/models"
|
||||
)
|
||||
|
||||
func TestRoutes(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user