1
0
mirror of https://github.com/harness/drone.git synced 2022-03-12 19:30:52 +03:00

add rpc/v2 endpoint

This commit is contained in:
Brad Rydzewski
2019-05-21 11:45:42 -07:00
parent 440f192730
commit 3d4418e351
17 changed files with 494 additions and 19 deletions

View File

@@ -59,7 +59,7 @@ legal claim.**
-----------------------------------------------------------------
Waiver: Small Business
Waiver: Individual and Small Business
Contributor waives the terms of rule 1 for companies meeting all
the following criteria, counting all subsidiaries and affiliated
@@ -76,13 +76,13 @@ future versions of the software.
-----------------------------------------------------------------
Waiver: Low Volume
Waiver: Low Usage
Contributor waives the terms of rule 1 for companies meeting all
the following criteria, counting all subsidiaries and affiliated
entities as one:
1. less than 15,000 total pipelines executed using this software
1. less than 5,000 total pipelines executed using this software
in the immediately preceding, year-long period
Contributor will not revoke this waiver, but may change terms for

View File

@@ -23,6 +23,7 @@ import (
"github.com/drone/drone/metric"
"github.com/drone/drone/operator/manager"
"github.com/drone/drone/operator/manager/rpc"
"github.com/drone/drone/operator/manager/rpc2"
"github.com/drone/drone/server"
"github.com/google/wire"
@@ -38,16 +39,18 @@ var serverSet = wire.NewSet(
web.New,
provideRouter,
provideRPC,
provideRPC2,
provideServer,
provideServerOptions,
)
// provideRouter is a Wire provider function that returns a
// router that is serves the provided handlers.
func provideRouter(api api.Server, web web.Server, rpc http.Handler, metrics *metric.Server) *chi.Mux {
func provideRouter(api api.Server, web web.Server, rpc http.Handler, rpcv2 rpc2.Server, metrics *metric.Server) *chi.Mux {
r := chi.NewRouter()
r.Mount("/metrics", metrics)
r.Mount("/api", api.Handler())
r.Mount("/rpc/v2", rpcv2)
r.Mount("/rpc", rpc)
r.Mount("/", web.Handler())
return r
@@ -59,6 +62,12 @@ func provideRPC(m manager.BuildManager, config config.Config) http.Handler {
return rpc.NewServer(m, config.RPC.Secret)
}
// provideRPC2 is a Wire provider function that returns an rpc
// handler that exposes the build manager to a remote agent.
func provideRPC2(m manager.BuildManager, config config.Config) rpc2.Server {
return rpc2.NewServer(m, config.RPC.Secret)
}
// provideServer is a Wire provider function that returns an
// http server that is configured from the environment.
func provideServer(handler *chi.Mux, config config.Config) *server.Server {

View File

@@ -93,8 +93,9 @@ func InitializeApplication(config2 config.Config) (application, error) {
options := provideServerOptions(config2)
webServer := web.New(admissionService, buildStore, client, hookParser, coreLicense, licenseService, middleware, repositoryStore, session, syncer, triggerer, userStore, userService, webhookSender, options, system)
handler := provideRPC(buildManager, config2)
rpc2Server := provideRPC2(buildManager, config2)
metricServer := metric.NewServer(session)
mux := provideRouter(server, webServer, handler, metricServer)
mux := provideRouter(server, webServer, handler, rpc2Server, metricServer)
serverServer := provideServer(mux, config2)
mainApplication := newApplication(cronScheduler, datadog, runner, serverServer, userStore)
return mainApplication, nil

2
go.mod
View File

@@ -18,7 +18,7 @@ require (
github.com/drone/drone-go v1.0.5-0.20190427184118-618e4496482e
github.com/drone/drone-runtime v1.0.6
github.com/drone/drone-ui v0.0.0-20190423061913-b758d7bee2eb
github.com/drone/drone-yaml v1.0.9-0.20190424150956-115b2ff5f99e
github.com/drone/drone-yaml v1.0.9
github.com/drone/envsubst v1.0.1
github.com/drone/go-license v1.0.2
github.com/drone/go-login v1.0.4-0.20190311170324-2a4df4f242a2

2
go.sum
View File

@@ -64,6 +64,8 @@ github.com/drone/drone-yaml v1.0.8 h1:Jrnd/yC9LbMx/sMkbZcvSCKxWgMWnMaFa6HgjtpA1V
github.com/drone/drone-yaml v1.0.8/go.mod h1:1yrotgyD94qoYwgWWm71vAMbcw7Zd3gDersjeT9lYAk=
github.com/drone/drone-yaml v1.0.9-0.20190424150956-115b2ff5f99e h1:PvLIeYtS0/EXHolUF3adFiSMWArUi49knDMuclpnPH4=
github.com/drone/drone-yaml v1.0.9-0.20190424150956-115b2ff5f99e/go.mod h1:1yrotgyD94qoYwgWWm71vAMbcw7Zd3gDersjeT9lYAk=
github.com/drone/drone-yaml v1.0.9 h1:lVd8l1khAt7Ck+WMAjnRfEj+3jxLGOa8R7FITDh3jbw=
github.com/drone/drone-yaml v1.0.9/go.mod h1:1yrotgyD94qoYwgWWm71vAMbcw7Zd3gDersjeT9lYAk=
github.com/drone/envsubst v1.0.1 h1:NOOStingM2sbBwsIUeQkKUz8ShwCUzmqMxWrpXItfPE=
github.com/drone/envsubst v1.0.1/go.mod h1:bkZbnc/2vh1M12Ecn7EYScpI4YGYU0etwLJICOWi8Z0=
github.com/drone/go-license v1.0.2 h1:7OwndfYk+Lp/cGHkxe4HUn/Ysrrw3WYH2pnd99yrkok=

View File

@@ -50,7 +50,7 @@ type (
Request(ctx context.Context, args *Request) (*core.Stage, error)
// Accept accepts the build stage for execution.
Accept(ctx context.Context, stage int64, machine string) error
Accept(ctx context.Context, stage int64, machine string) (*core.Stage, error)
// Netrc returns a valid netrc for execution.
Netrc(ctx context.Context, repo int64) (*core.Netrc, error)
@@ -196,7 +196,7 @@ func (m *Manager) Request(ctx context.Context, args *Request) (*core.Stage, erro
// agents to pull the same stage from the queue. The system uses optimistic
// locking at the database-level to prevent multiple agents from executing the
// same stage.
func (m *Manager) Accept(ctx context.Context, id int64, machine string) error {
func (m *Manager) Accept(ctx context.Context, id int64, machine string) (*core.Stage, error) {
logger := logrus.WithFields(
logrus.Fields{
"stage-id": id,
@@ -209,11 +209,11 @@ func (m *Manager) Accept(ctx context.Context, id int64, machine string) error {
if err != nil {
logger = logger.WithError(err)
logger.Warnln("manager: cannot find stage")
return err
return nil, err
}
if stage.Machine != "" {
logger.Debugln("manager: stage already assigned. abort.")
return db.ErrOptimisticLock
return nil, db.ErrOptimisticLock
}
stage.Machine = machine
@@ -230,7 +230,7 @@ func (m *Manager) Accept(ctx context.Context, id int64, machine string) error {
} else {
logger.Debugln("manager: stage accepted")
}
return err
return stage, err
}
// Details fetches build details.

View File

@@ -86,9 +86,9 @@ func (s *Client) Request(ctx context.Context, args *manager.Request) (*core.Stag
}
// Accept accepts the build stage for execution.
func (s *Client) Accept(ctx context.Context, stage int64, machine string) error {
func (s *Client) Accept(ctx context.Context, stage int64, machine string) (*core.Stage, error) {
in := &acceptRequest{Stage: stage, Machine: machine}
return s.send(noContext, "/rpc/v1/accept", in, nil)
return nil, s.send(noContext, "/rpc/v1/accept", in, nil)
}
// Netrc returns a valid netrc for execution.

View File

@@ -103,12 +103,12 @@ func (s *Server) handleAccept(w http.ResponseWriter, r *http.Request) {
writeBadRequest(w, err)
return
}
err = s.manager.Accept(ctx, in.Stage, in.Machine)
out, err := s.manager.Accept(ctx, in.Stage, in.Machine)
if err != nil {
writeError(w, err)
return
}
w.WriteHeader(http.StatusNoContent)
json.NewEncoder(w).Encode(out)
}
func (s *Server) handleNetrc(w http.ResponseWriter, r *http.Request) {

View File

@@ -0,0 +1,7 @@
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Drone Non-Commercial License
// that can be found in the LICENSE file.
// +build !oss
package rpc2

View File

@@ -0,0 +1,295 @@
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Drone Non-Commercial License
// that can be found in the LICENSE file.
// +build !oss
/*
/rpc/v2/stage POST (request)
/rpc/v2/stage/{stage}?machine= POST (accept, details)
/rpc/v2/stage/{stage} PUT (beforeAll, afterAll)
/rpc/v2/stage/{stage}/steps/{step} PUT (before, after)
/rpc/v2/build/{build}/watch POST (watch)
/rpc/v2/stage/{stage}/logs/batch POST (batch)
/rpc/v2/stage/{stage}/logs/upload POST (upload)
*/
package rpc2
import (
"context"
"encoding/json"
"io"
"net/http"
"strconv"
"time"
"github.com/go-chi/chi"
"github.com/drone/drone/core"
"github.com/drone/drone/operator/manager"
"github.com/drone/drone/store/shared/db"
)
// default http request timeout
var defaultTimeout = time.Second * 30
var noContext = context.Background()
// HandleJoin returns an http.HandlerFunc that makes an
// http.Request to join the cluster.
//
// POST /rpc/v2/nodes/:machine
func HandleJoin() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200) // this is a no-op
}
}
// HandleLeave returns an http.HandlerFunc that makes an
// http.Request to leave the cluster.
//
// DELETE /rpc/v2/nodes/:machine
func HandleLeave() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200) // this is a no-op
}
}
// HandlePing returns an http.HandlerFunc that makes an
// http.Request to ping the server and confirm connectivity.
//
// GET /rpc/v2/ping
func HandlePing() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
}
}
// HandleRequest returns an http.HandlerFunc that processes an
// http.Request to reqeust a stage from the queue for execution.
//
// POST /rpc/v2/stage
func HandleRequest(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
req := new(manager.Request)
err := json.NewDecoder(r.Body).Decode(req)
if err != nil {
writeError(w, err)
return
}
stage, err := m.Request(ctx, req)
if err != nil {
writeError(w, err)
} else {
writeJSON(w, stage)
}
}
}
// HandleAccept returns an http.HandlerFunc that processes an
// http.Request to accept ownership of the stage.
//
// POST /rpc/v2/stage/{stage}?machine=
func HandleAccept(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
stage, _ := strconv.ParseInt(
chi.URLParam(r, "stage"), 10, 64)
out, err := m.Accept(noContext, stage, r.FormValue("machine"))
if err != nil {
writeError(w, err)
} else {
writeJSON(w, out)
}
}
}
// HandleInfo returns an http.HandlerFunc that processes an
// http.Request to get the build details.
//
// POST /rpc/v2/build/{build}
func HandleInfo(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
stage, _ := strconv.ParseInt(
chi.URLParam(r, "stage"), 10, 64)
res, err := m.Details(noContext, stage)
if err != nil {
writeError(w, err)
return
}
netrc, err := m.Netrc(noContext, res.Repo.ID)
if err != nil {
writeError(w, err)
return
}
writeJSON(w, &details{
Context: res,
Netrc: netrc,
Repo: &repositroy{
Repository: res.Repo,
Secret: res.Repo.Secret,
},
})
}
}
// HandleUpdateStage returns an http.HandlerFunc that processes
// an http.Request to update a stage.
//
// PUT /rpc/v2/stage/{stage}
func HandleUpdateStage(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
dst := new(core.Stage)
err := json.NewDecoder(r.Body).Decode(dst)
if err != nil {
writeError(w, err)
return
}
if dst.Status == core.StatusPending ||
dst.Status == core.StatusRunning {
err = m.BeforeAll(noContext, dst)
} else {
err = m.AfterAll(noContext, dst)
}
if err != nil {
writeError(w, err)
} else {
writeJSON(w, dst)
}
}
}
// HandleUpdateStep returns an http.HandlerFunc that processes
// an http.Request to update a step.
//
// POST /rpc/v2/step/{step}
func HandleUpdateStep(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
dst := new(core.Step)
err := json.NewDecoder(r.Body).Decode(dst)
if err != nil {
writeError(w, err)
return
}
if dst.Status == core.StatusPending ||
dst.Status == core.StatusRunning {
err = m.Before(noContext, dst)
} else {
err = m.After(noContext, dst)
}
if err != nil {
writeError(w, err)
} else {
writeJSON(w, dst)
}
}
}
// HandleWatch returns an http.HandlerFunc that accepts a
// blocking http.Request that watches a build for cancellation
// events.
//
// GET /rpc/v2/build/{build}/watch
func HandleWatch(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
ctx, cancel := context.WithTimeout(ctx, defaultTimeout)
defer cancel()
build, _ := strconv.ParseInt(
chi.URLParamFromCtx(ctx, "build"), 10, 64)
_, err := m.Watch(ctx, build)
if err != nil {
writeError(w, err)
} else {
writeOK(w)
}
}
}
// HandleLogBatch returns an http.HandlerFunc that accepts an
// http.Request to submit a stream of logs to the system.
//
// POST /rpc/v2/step/{step}/logs/batch
func HandleLogBatch(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
step, _ := strconv.ParseInt(
chi.URLParam(r, "step"), 10, 64)
lines := []*core.Line{}
err := json.NewDecoder(r.Body).Decode(&lines)
if err != nil {
writeError(w, err)
return
}
// TODO(bradrydzewski) modify the write function to
// accept a slice of lines.
for _, line := range lines {
err := m.Write(noContext, step, line)
if err != nil {
writeError(w, err)
return
}
}
writeOK(w)
}
}
// HandleLogUpload returns an http.HandlerFunc that accepts an
// http.Request to upload and persist logs for a pipeline stage.
//
// POST /rpc/v2/step/{step}/logs/upload
func HandleLogUpload(m manager.BuildManager) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
step, _ := strconv.ParseInt(
chi.URLParam(r, "step"), 10, 64)
err := m.Upload(noContext, step, r.Body)
if err != nil {
writeError(w, err)
} else {
writeOK(w)
}
}
}
// write a 200 Status OK to the response body.
func writeJSON(w http.ResponseWriter, v interface{}) {
json.NewEncoder(w).Encode(v)
}
// write a 200 Status OK to the response body.
func writeOK(w http.ResponseWriter) {
w.WriteHeader(http.StatusOK)
}
// write an error message to the response body.
func writeError(w http.ResponseWriter, err error) {
if err == context.DeadlineExceeded {
w.WriteHeader(204) // should retry
} else if err == context.Canceled {
w.WriteHeader(204) // should retry
} else if err == db.ErrOptimisticLock {
w.WriteHeader(409) // should abort
} else {
w.WriteHeader(500) // should fail
}
io.WriteString(w, err.Error())
}

View File

@@ -0,0 +1,66 @@
// Copyright 2019 Drone.IO Inc. All rights reserved.
// Use of this source code is governed by the Drone Non-Commercial License
// that can be found in the LICENSE file.
// +build !oss
/*
/stage POST (request)
/stage/{stage}?machine= POST (accept, details)
/stage/{stage} PUT (beforeAll, afterAll)
/stage/{stage}/steps/{step} PUT (before, after)
/build/{build}/watch POST (watch)
/stage/{stage}/logs/batch POST (batch)
/stage/{stage}/logs/upload POST (upload)
*/
package rpc2
import (
"net/http"
"github.com/drone/drone/operator/manager"
"github.com/go-chi/chi"
"github.com/go-chi/chi/middleware"
)
// Server wraps the chi Router in a custom type for wire
// injection purposes.
type Server http.Handler
// NewServer returns a new rpc server that enables remote
// interaction with the build controller using the http transport.
func NewServer(manager manager.BuildManager, secret string) Server {
r := chi.NewRouter()
r.Use(middleware.Recoverer)
r.Use(middleware.NoCache)
r.Use(authorization(secret))
r.Post("/nodes/:machine", HandleJoin())
r.Delete("/nodes/:machine", HandleLeave())
r.Post("/ping", HandlePing())
r.Post("/stage", HandleRequest(manager))
r.Post("/stage/{stage}", HandleAccept(manager))
r.Get("/stage/{stage}", HandleInfo(manager))
r.Put("/stage/{stage}", HandleUpdateStage(manager))
r.Put("/step/{step}", HandleUpdateStep(manager))
r.Post("/build/{build}/watch", HandleWatch(manager))
r.Post("/step/{step}/logs/batch", HandleLogBatch(manager))
r.Post("/step/{step}/logs/upload", HandleLogUpload(manager))
return Server(r)
}
func authorization(token string) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if token == r.Header.Get("X-Drone-Token") {
next.ServeHTTP(w, r)
} else {
w.WriteHeader(401)
}
})
}
}

View File

@@ -0,0 +1,27 @@
// Use of this source code is governed by the Drone Non-Commercial License
// that can be found in the LICENSE file.
// +build !oss
package rpc2
// Copyright 2019 Drone.IO Inc. All rights reserved.
import (
"github.com/drone/drone/core"
"github.com/drone/drone/operator/manager"
)
// details provides the runner with the build details and
// includes all environment data required to execute the build.
type details struct {
*manager.Context
Netrc *core.Netrc `json:"netrc"`
Repo *repositroy `json:"repository"`
}
// repository wraps a repository object to include the secret
// when the repository is marshaled to json.
type repositroy struct {
*core.Repository
Secret string `json:"secret"`
}

View File

@@ -575,7 +575,7 @@ func (r *Runner) poll(ctx context.Context) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
err = r.Manager.Accept(ctx, p.ID, r.Machine)
_, err = r.Manager.Accept(ctx, p.ID, r.Machine)
if err == db.ErrOptimisticLock {
return nil
} else if err != nil {

View File

@@ -82,6 +82,8 @@ func (q *queue) Resume(ctx context.Context) error {
func (q *queue) Request(ctx context.Context, params core.Filter) (*core.Stage, error) {
w := &worker{
kind: params.Kind,
typ: params.Type,
os: params.OS,
arch: params.Arch,
kernel: params.Kernel,
@@ -144,6 +146,11 @@ func (q *queue) signal(ctx context.Context) error {
loop:
for w := range q.workers {
// the worker must match the resource kind and type
if !matchResource(w.kind, w.typ, item.Kind, item.Type) {
continue
}
// the worker is platform-specific. check to ensure
// the queue item matches the worker platform.
if w.os != item.OS {
@@ -205,6 +212,8 @@ func (q *queue) start() error {
}
type worker struct {
kind string
typ string
os string
arch string
kernel string
@@ -250,3 +259,20 @@ func withinLimits(stage *core.Stage, siblings []*core.Stage) bool {
}
return count < stage.Limit
}
// matchResource is a helper function that returns
func matchResource(kinda, typea, kindb, typeb string) bool {
if kinda == "" {
kinda = "pipeline"
}
if kindb == "" {
kindb = "pipeline"
}
if typea == "" {
typea = "docker"
}
if typeb == "" {
typeb = "docker"
}
return kinda == kindb && typea == typeb
}

View File

@@ -149,3 +149,41 @@ func TestWithinLimits(t *testing.T) {
}
}
}
func TestMatchResource(t *testing.T) {
tests := []struct {
kinda, typea, kindb, typeb string
want bool
}{
// unspecified in yaml, unspecified by agent
{"", "", "", "", true},
// unspecified in yaml, specified by agent
{"pipeline", "docker", "", "", true},
{"pipeline", "", "", "", true},
{"", "docker", "", "", true},
// specified in yaml, unspecified by agent
{"", "", "pipeline", "docker", true},
{"", "", "pipeline", "", true},
{"", "", "", "docker", true},
// specified in yaml, specified by agent
{"pipeline", "docker", "pipeline", "docker", true},
{"pipeline", "exec", "pipeline", "docker", false},
{"approval", "slack", "pipeline", "docker", false},
// misc
{"", "docker", "pipeline", "docker", true},
{"pipeline", "", "pipeline", "docker", true},
{"pipeline", "docker", "", "docker", true},
{"pipeline", "docker", "pipeline", "", true},
}
for i, test := range tests {
got, want := matchResource(test.kinda, test.typea, test.kindb, test.typeb), test.want
if got != want {
t.Errorf("Unexpectd results at index %d", i)
}
}
}

View File

@@ -333,8 +333,8 @@ func (t *triggerer) Trigger(ctx context.Context, repo *core.Repository, base *co
RepoID: repo.ID,
Number: i + 1,
Name: match.Name,
Kind: "",
Type: "",
Kind: match.Kind,
Type: match.Type,
OS: match.Platform.OS,
Arch: match.Platform.Arch,
Variant: match.Platform.Variant,
@@ -348,7 +348,9 @@ func (t *triggerer) Trigger(ctx context.Context, repo *core.Repository, base *co
Created: time.Now().Unix(),
Updated: time.Now().Unix(),
}
if stage.Kind == "pipeline" && stage.Type == "" {
stage.Type = "docker"
}
if stage.OS == "" {
stage.OS = "linux"
}

View File

@@ -451,6 +451,8 @@ var (
}
dummyStage = &core.Stage{
Kind: "pipeline",
Type: "docker",
RepoID: 1,
Name: "default",
Number: 1,