Deploy function on bare docker. (#286)

This commit is contained in:
Minghe
2019-10-08 11:08:01 +08:00
committed by GitHub
parent f70ef1c406
commit 3454ec0fdb
29 changed files with 376 additions and 155 deletions

View File

@@ -33,7 +33,7 @@ jobs:
DOCKER_USERNAME: ${{ secrets.DOCKER_USERNAME }}
run: |
export KUBECONFIG=/home/runner/.kube/kind-config-fx-test
DEBUG=true go test -v ./image/... ./container/...
DEBUG=true go test -v ./container_runtimes/... ./deploy/...
- name: build fx
run: |
@@ -49,6 +49,8 @@ jobs:
- name: test fx cli
run: |
echo $KUBECONFIG
unset KUBECONFIG
make cli-test
- name: test AKS

View File

@@ -1,30 +0,0 @@
package docker
import (
"context"
"github.com/metrue/fx/container"
)
type Docker struct {
}
func (d *Docker) Deploy(ctx context.Context, name string, image string, port []int32) error {
return nil
}
func (d *Docker) Update(ctx context.Context, name string) error {
return nil
}
func (d *Docker) Destroy(ctx context.Context, name string) error {
return nil
}
func (d *Docker) GetStatus(ctx context.Context, name string) error {
return nil
}
var (
_ container.Runner = &Docker{}
)

View File

@@ -1,6 +1,7 @@
package docker
import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
@@ -11,9 +12,12 @@ import (
"github.com/apex/log"
dockerTypes "github.com/docker/docker/api/types"
dockerTypesContainer "github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/google/uuid"
"github.com/metrue/fx/image"
containerruntimes "github.com/metrue/fx/container_runtimes"
"github.com/metrue/fx/types"
"github.com/metrue/fx/utils"
)
@@ -23,8 +27,7 @@ type Docker struct {
}
// CreateClient create a docker instance
func CreateClient() (*Docker, error) {
ctx := context.Background()
func CreateClient(ctx context.Context) (*Docker, error) {
cli, err := client.NewClientWithOpts(client.FromEnv)
if err != nil {
return nil, err
@@ -33,8 +36,8 @@ func CreateClient() (*Docker, error) {
return &Docker{cli}, nil
}
// Build a directory to be a image
func (d *Docker) Build(workdir string, name string) error {
// BuildImage a directory to be a image
func (d *Docker) BuildImage(ctx context.Context, workdir string, name string) error {
tarDir, err := ioutil.TempDir("/tmp", "fx-tar")
if err != nil {
return err
@@ -62,7 +65,7 @@ func (d *Docker) Build(workdir string, name string) error {
},
}
resp, err := d.ImageBuild(context.Background(), dockerBuildContext, options)
resp, err := d.ImageBuild(ctx, dockerBuildContext, options)
if err != nil {
return err
}
@@ -80,10 +83,8 @@ func (d *Docker) Build(workdir string, name string) error {
return nil
}
// Push image to hub.docker.com
func (d *Docker) Push(name string) (string, error) {
ctx := context.Background()
// PushImage push image to hub.docker.com
func (d *Docker) PushImage(ctx context.Context, name string) (string, error) {
username := os.Getenv("DOCKER_USERNAME")
password := os.Getenv("DOCKER_PASSWORD")
if username == "" || password == "" {
@@ -124,6 +125,67 @@ func (d *Docker) Push(name string) (string, error) {
return nameWithTag, nil
}
// InspectImage inspect a image
func (d *Docker) InspectImage(ctx context.Context, name string, img interface{}) error {
_, body, err := d.ImageInspectWithRaw(ctx, name)
if err != nil {
return err
}
rdr := bytes.NewReader(body)
return json.NewDecoder(rdr).Decode(&img)
}
// StartContainer create and start a container from given image
func (d *Docker) StartContainer(ctx context.Context, name string, image string, ports []int32) error {
config := &dockerTypesContainer.Config{
Image: image,
ExposedPorts: nat.PortSet{
"3000/tcp": struct{}{},
},
}
bindings := []nat.PortBinding{}
for _, port := range ports {
bindings = append(bindings, nat.PortBinding{
HostIP: types.DefaultHost,
HostPort: fmt.Sprintf("%d", port),
})
}
hostConfig := &dockerTypesContainer.HostConfig{
AutoRemove: true,
PortBindings: nat.PortMap{
"3000/tcp": bindings,
},
}
resp, err := d.ContainerCreate(ctx, config, hostConfig, nil, name)
if os.Getenv("DEBUG") != "" {
body, err := json.Marshal(resp)
if err != nil {
return err
}
log.Info(string(body))
}
if err != nil {
return err
}
if err := d.ContainerStart(ctx, resp.ID, dockerTypes.ContainerStartOptions{}); err != nil {
return err
}
return nil
}
// StopContainer stop and remove container
func (d *Docker) StopContainer(ctx context.Context, name string) error {
return d.ContainerStop(ctx, name, nil)
}
// InspectContainer inspect a container
func (d *Docker) InspectContainer(ctx context.Context, name string, container interface{}) error {
return nil
}
var (
_ image.Builder = &Docker{}
_ containerruntimes.ContainerRuntime = &Docker{}
)

View File

@@ -0,0 +1,59 @@
package docker
import (
"context"
"os"
"strings"
"testing"
"time"
dockerTypes "github.com/docker/docker/api/types"
)
func TestDocker(t *testing.T) {
ctx := context.Background()
cli, err := CreateClient(ctx)
if err != nil {
t.Fatal(err)
}
workdir := "./fixture"
name := "fx-test-docker-image"
if err := cli.BuildImage(ctx, workdir, name); err != nil {
t.Fatal(err)
}
// wait a while for image to be tagged successfully after build
time.Sleep(2 * time.Second)
var imgInfo dockerTypes.ImageInspect
if err := cli.InspectImage(ctx, name, &imgInfo); err != nil {
t.Fatal(err)
}
found := false
for _, t := range imgInfo.RepoTags {
slice := strings.Split(t, ":")
if slice[0] == name {
found = true
break
}
}
if !found {
t.Fatalf("should have built image with tag %s", name)
}
username := os.Getenv("DOCKER_USERNAME")
password := os.Getenv("DOCKER_PASSWORD")
if username == "" || password == "" {
t.Skip("Skip push image test since DOCKER_USERNAME and DOCKER_PASSWORD not set in enviroment variable")
}
img, err := cli.PushImage(ctx, name)
if err != nil {
t.Fatal(err)
}
expect := username + "/" + name
if img != expect {
t.Fatalf("should get %s but got %s", expect, img)
}
}

View File

@@ -0,0 +1,13 @@
package containerruntimes
import "context"
// ContainerRuntime interface
type ContainerRuntime interface {
BuildImage(ctx context.Context, workdir string, name string) error
PushImage(ctx context.Context, name string) (string, error)
InspectImage(ct context.Context, name string, img interface{}) error
StartContainer(ctx context.Context, name string, image string, ports []int32) error
StopContainer(ctx context.Context, name string) error
InspectContainer(ctx context.Context, name string, container interface{}) error
}

View File

@@ -1,10 +1,10 @@
package container
package deploy
import "context"
// Runner make a image a service
type Runner interface {
Deploy(ctx context.Context, name string, image string, ports []int32) error
// Deployer make a image a service
type Deployer interface {
Deploy(ctx context.Context, workdir string, name string, ports []int32) error
Destroy(ctx context.Context, name string) error
Update(ctx context.Context, name string) error
GetStatus(ctx context.Context, name string) error

70
deploy/docker/docker.go Normal file
View File

@@ -0,0 +1,70 @@
package docker
import (
"context"
"time"
dockerTypes "github.com/docker/docker/api/types"
runtime "github.com/metrue/fx/container_runtimes/docker"
"github.com/metrue/fx/deploy"
"github.com/metrue/fx/utils"
)
// Docker manage container
type Docker struct {
client *runtime.Docker
}
// CreateClient create a docker instance
func CreateClient(ctx context.Context) (*Docker, error) {
cli, err := runtime.CreateClient(ctx)
if err != nil {
return nil, err
}
return &Docker{client: cli}, nil
}
// Deploy create a Docker container from given image, and bind the constants.FxContainerExposePort to given port
func (d *Docker) Deploy(ctx context.Context, workdir string, name string, ports []int32) error {
if err := d.client.BuildImage(ctx, workdir, name); err != nil {
return err
}
// config := &dockerTypesContainer.Config{
// Image: image,
// ExposedPorts: nat.PortSet{
// "3000/tcp": struct{}{},
// },
// }
// when deploy a function on a bare Docker running without Kubernetes,
// image would be built on-demand on host locally, so there is no need to
// pull image from remote.
// But it takes some times waiting image ready after image built, we retry to make sure it ready here
var imgInfo dockerTypes.ImageInspect
if err := utils.RunWithRetry(func() error {
return d.client.InspectImage(ctx, name, &imgInfo)
}, time.Second*1, 5); err != nil {
return err
}
return d.client.StartContainer(ctx, name, name, ports)
}
// Update a container
func (d *Docker) Update(ctx context.Context, name string) error {
return nil
}
// Destroy stop and remove container
func (d *Docker) Destroy(ctx context.Context, name string) error {
return d.client.ContainerStop(ctx, name, nil)
}
// GetStatus get status of container
func (d *Docker) GetStatus(ctx context.Context, name string) error {
return nil
}
var (
_ deploy.Deployer = &Docker{}
)

View File

@@ -0,0 +1,28 @@
package docker
import (
"context"
"testing"
"time"
)
func TestDocker(t *testing.T) {
ctx := context.Background()
cli, err := CreateClient(ctx)
if err != nil {
t.Fatal(err)
}
workdir := "./fixture"
name := "helloworld"
ports := []int32{12345, 12346}
if err := cli.Deploy(ctx, workdir, name, ports); err != nil {
t.Fatal(err)
}
time.Sleep(1 * time.Second)
if err := cli.Destroy(ctx, name); err != nil {
t.Fatal(err)
}
}

View File

@@ -0,0 +1,5 @@
FROM metrue/fx-node-base
COPY . .
EXPOSE 3000
CMD ["node", "app.js"]

View File

@@ -0,0 +1,9 @@
const Koa = require('koa');
const bodyParser = require('koa-bodyparser');
const fx = require('./fx');
const app = new Koa();
app.use(bodyParser());
app.use(fx);
app.listen(3000);

View File

@@ -0,0 +1,3 @@
module.exports = (ctx) => {
ctx.body = 'hello world'
}

View File

@@ -0,0 +1,5 @@
FROM metrue/fx-node-base
COPY . .
EXPOSE 3000
CMD ["node", "app.js"]

View File

@@ -0,0 +1,9 @@
const Koa = require('koa');
const bodyParser = require('koa-bodyparser');
const fx = require('./fx');
const app = new Koa();
app.use(bodyParser());
app.use(fx);
app.listen(3000);

View File

@@ -0,0 +1,3 @@
module.exports = (ctx) => {
ctx.body = 'hello world'
}

View File

@@ -6,7 +6,8 @@ import (
"os"
"github.com/google/uuid"
"github.com/metrue/fx/container"
runtime "github.com/metrue/fx/container_runtimes/docker"
"github.com/metrue/fx/deploy"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
@@ -38,12 +39,24 @@ func Create() (*K8S, error) {
// Deploy a image to be a service
func (k *K8S) Deploy(
ctx context.Context,
workdir string,
name string,
image string,
ports []int32,
) error {
namespace := "default"
dockerClient, err := runtime.CreateClient(ctx)
if err != nil {
return err
}
if err := dockerClient.BuildImage(ctx, workdir, name); err != nil {
return err
}
image, err := dockerClient.PushImage(ctx, name)
if err != nil {
return err
}
// By using a label selector between Pod and Service, we can link Service and Pod directly, it means a Endpoint will
// be created automatically, then incoming traffic to Service will be forward to Pod.
// Then we have no need to create Endpoint manually anymore.
@@ -101,5 +114,5 @@ func (k *K8S) GetStatus(ctx context.Context, name string) error {
}
var (
_ container.Runner = &K8S{}
_ deploy.Deployer = &K8S{}
)

View File

@@ -7,9 +7,8 @@ import (
)
func TestK8SRunner(t *testing.T) {
// TODO image is ready on hub.docker.com
name := "fx-test-func"
image := "metrue/kube-hello"
workdir := "./fixture"
name := "hello"
ports := []int32{32300}
kubeconfig := os.Getenv("KUBECONFIG")
if kubeconfig == "" {
@@ -21,7 +20,7 @@ func TestK8SRunner(t *testing.T) {
}
ctx := context.Background()
if err := k8s.Deploy(ctx, name, image, ports); err != nil {
if err := k8s.Deploy(ctx, workdir, name, ports); err != nil {
t.Fatal(err)
}

View File

@@ -10,9 +10,9 @@ import (
"github.com/metrue/fx/api"
"github.com/metrue/fx/config"
"github.com/metrue/fx/constants"
"github.com/metrue/fx/container"
"github.com/metrue/fx/container/kubernetes"
"github.com/metrue/fx/image/docker"
"github.com/metrue/fx/deploy"
dockerDeployer "github.com/metrue/fx/deploy/docker"
k8sDeployer "github.com/metrue/fx/deploy/kubernetes"
"github.com/metrue/fx/packer"
"github.com/metrue/fx/utils"
"github.com/pkg/errors"
@@ -69,40 +69,33 @@ func Deploy(cfg config.Configer) HandleFunc {
}
lang := utils.GetLangFromFileName(funcFile)
// TODO refactor to runner manager stuff
var runner container.Runner
// TODO support docker also
if os.Getenv("KUBECONFIG") != "" {
runner, err = kubernetes.Create()
if err != nil {
return err
}
wd, err := ioutil.TempDir("/tmp", "fx-wd")
if err != nil {
return err
}
if err := packer.PackIntoDir(lang, string(body), wd); err != nil {
return err
}
imageBuilder, err := docker.CreateClient()
if err != nil {
return err
}
if err := imageBuilder.Build(wd, name); err != nil {
return err
}
imageName, err := imageBuilder.Push(name)
if err != nil {
return err
}
if err := runner.Deploy(context.Background(), name, imageName, []int32{int32(port)}); err != nil {
return err
}
return nil
workdir, err := ioutil.TempDir("/tmp", "fx-wd")
if err != nil {
return err
}
if err := packer.PackIntoDir(lang, string(body), workdir); err != nil {
return err
}
return fmt.Errorf("KUBECONFIG not ready")
var deployer deploy.Deployer
if os.Getenv("KUBECONFIG") != "" {
deployer, err = k8sDeployer.Create()
if err != nil {
return err
}
} else {
bctx := context.Background()
deployer, err = dockerDeployer.CreateClient(bctx)
if err != nil {
return err
}
}
// TODO multiple ports support
return deployer.Deploy(
context.Background(),
workdir,
name,
[]int32{int32(port)},
)
}
}

View File

@@ -2,30 +2,36 @@ package handlers
import (
"context"
"fmt"
"os"
"github.com/metrue/fx/config"
"github.com/metrue/fx/container/kubernetes"
"github.com/metrue/fx/deploy"
dockerDeployer "github.com/metrue/fx/deploy/docker"
k8sDeployer "github.com/metrue/fx/deploy/kubernetes"
"github.com/urfave/cli"
)
// Destroy command handle
func Destroy(cfg config.Configer) HandleFunc {
return func(ctx *cli.Context) error {
return func(ctx *cli.Context) (err error) {
services := ctx.Args()
c := context.Background()
var runner deploy.Deployer
if os.Getenv("KUBECONFIG") != "" {
runner, err := kubernetes.Create()
runner, err = k8sDeployer.Create()
if err != nil {
return err
}
for _, svc := range services {
if err := runner.Destroy(context.Background(), svc); err != nil {
return err
}
}
} else {
return fmt.Errorf("no KUBECONFIG set in environment variables")
runner, err = dockerDeployer.CreateClient(c)
if err != nil {
return err
}
}
for _, svc := range services {
if err := runner.Destroy(c, svc); err != nil {
return err
}
}
return nil
}

View File

@@ -1,33 +0,0 @@
package docker
import (
"os"
"testing"
)
func TestDocker(t *testing.T) {
cli, err := CreateClient()
if err != nil {
t.Fatal(err)
}
workdir := "./fixture"
name := "fx-test-docker-image"
if err := cli.Build(workdir, name); err != nil {
t.Fatal(err)
}
username := os.Getenv("DOCKER_USERNAME")
password := os.Getenv("DOCKER_PASSWORD")
if username == "" || password == "" {
t.Skip("Skip push image test since DOCKER_USERNAME and DOCKER_PASSWORD not set in enviroment variable")
}
img, err := cli.Push(name)
if err != nil {
t.Fatal(err)
}
expect := username + "/" + name
if img != expect {
t.Fatalf("should get %s but got %s", expect, img)
}
}

View File

@@ -1,6 +0,0 @@
package image
// Builder image builder
type Builder interface {
Build(workdir string, name string) error
}

View File

@@ -1,17 +0,0 @@
package rkt
import (
"github.com/metrue/fx/image"
)
// Rkt rkt as a image builder
type Rkt struct{}
// Build build a directory to be a image
func (r *Rkt) Build(workdir string, name string) error {
return nil
}
var (
_ image.Builder = &Rkt{}
)

View File

@@ -13,6 +13,14 @@ run() {
$fx down ${service}_${lang} # | grep "Down Service ${service}"
}
deploy() {
local lang=$1
local port=$2
$fx deploy --name ${service}_${lang} --port ${port} test/functions/func.${lang}
docker ps
$fx destroy ${service}_${lang}
}
build_image() {
local lang=$1
local tag=$2
@@ -35,6 +43,8 @@ for lang in 'js' 'rb' 'py' 'go' 'php' 'jl' 'java' 'd'; do
run $lang $port
((port++))
deploy $lang $port
build_image $lang "test-fx-image-build-${lang}"
mkdir -p /tmp/${lang}/images
export_image ${lang} /tmp/${lang}/images

18
utils/retry.go Normal file
View File

@@ -0,0 +1,18 @@
package utils
import "time"
// RunWithRetry run a closure and retry for times when it fails
func RunWithRetry(f func() error, interval time.Duration, maxRetries int) (err error) {
var times int
for times < maxRetries {
if err = f(); err != nil {
times++
time.Sleep(interval)
} else {
break
}
}
return
}