mirror of
https://github.com/fnproject/fn.git
synced 2022-10-28 21:29:17 +03:00
refactoring API and added dbs: postgres, bolt
This commit is contained in:
15
api/server/config.go
Normal file
15
api/server/config.go
Normal file
@@ -0,0 +1,15 @@
|
||||
package server
|
||||
|
||||
type Config struct {
|
||||
DatabaseURL string `json:"db"`
|
||||
Logging struct {
|
||||
To string `json:"to"`
|
||||
Level string `json:"level"`
|
||||
Prefix string `json:"prefix"`
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Config) Validate() error {
|
||||
// TODO:
|
||||
return nil
|
||||
}
|
||||
259
api/server/datastore/bolt/bolt.go
Normal file
259
api/server/datastore/bolt/bolt.go
Normal file
@@ -0,0 +1,259 @@
|
||||
package bolt
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/boltdb/bolt"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
)
|
||||
|
||||
type BoltDatastore struct {
|
||||
routesBucket []byte
|
||||
appsBucket []byte
|
||||
logsBucket []byte
|
||||
db *bolt.DB
|
||||
log logrus.FieldLogger
|
||||
}
|
||||
|
||||
func New(url *url.URL) (models.Datastore, error) {
|
||||
dir := filepath.Dir(url.Path)
|
||||
log := logrus.WithFields(logrus.Fields{"db": url.Scheme, "dir": dir})
|
||||
err := os.MkdirAll(dir, 0777)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Could not create data directory for db")
|
||||
return nil, err
|
||||
}
|
||||
log.Infoln("Creating bolt db at ", url.Path)
|
||||
db, err := bolt.Open(url.Path, 0600, nil)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error on bolt.Open")
|
||||
return nil, err
|
||||
}
|
||||
bucketPrefix := "fns-"
|
||||
if url.Query()["bucket"] != nil {
|
||||
bucketPrefix = url.Query()["bucket"][0]
|
||||
}
|
||||
routesBucketName := []byte(bucketPrefix + "routes")
|
||||
appsBucketName := []byte(bucketPrefix + "apps")
|
||||
logsBucketName := []byte(bucketPrefix + "logs")
|
||||
err = db.Update(func(tx *bolt.Tx) error {
|
||||
for _, name := range [][]byte{routesBucketName, appsBucketName, logsBucketName} {
|
||||
_, err := tx.CreateBucketIfNotExists(name)
|
||||
if err != nil {
|
||||
log.WithError(err).WithFields(logrus.Fields{"name": name}).Error("create bucket")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
log.WithError(err).Errorln("Error creating bolt buckets")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ds := &BoltDatastore{
|
||||
routesBucket: routesBucketName,
|
||||
appsBucket: appsBucketName,
|
||||
logsBucket: logsBucketName,
|
||||
db: db,
|
||||
log: log,
|
||||
}
|
||||
log.WithFields(logrus.Fields{"prefix": bucketPrefix, "file": url.Path}).Info("BoltDB initialized")
|
||||
|
||||
return ds, nil
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) StoreApp(app *models.App) (*models.App, error) {
|
||||
err := ds.db.Update(func(tx *bolt.Tx) error {
|
||||
bIm := tx.Bucket(ds.appsBucket)
|
||||
buf, err := json.Marshal(app)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = bIm.Put([]byte(app.Name), buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bjParent := tx.Bucket(ds.routesBucket)
|
||||
_, err = bjParent.CreateBucketIfNotExists([]byte(app.Name))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return app, err
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) RemoveApp(appName string) error {
|
||||
err := ds.db.Update(func(tx *bolt.Tx) error {
|
||||
bIm := tx.Bucket(ds.appsBucket)
|
||||
err := bIm.Delete([]byte(appName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
bjParent := tx.Bucket(ds.routesBucket)
|
||||
err = bjParent.DeleteBucket([]byte(appName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) GetApps(filter *models.AppFilter) ([]*models.App, error) {
|
||||
res := []*models.App{}
|
||||
err := ds.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(ds.appsBucket)
|
||||
err2 := b.ForEach(func(key, v []byte) error {
|
||||
app := &models.App{}
|
||||
err := json.Unmarshal(v, app)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res = append(res, app)
|
||||
return nil
|
||||
})
|
||||
if err2 != nil {
|
||||
logrus.WithError(err2).Errorln("Couldn't get apps!")
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) GetApp(name string) (*models.App, error) {
|
||||
var res *models.App
|
||||
err := ds.db.View(func(tx *bolt.Tx) error {
|
||||
b := tx.Bucket(ds.appsBucket)
|
||||
v := b.Get([]byte(name))
|
||||
if v != nil {
|
||||
app := &models.App{}
|
||||
err := json.Unmarshal(v, app)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
res = app
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) getRouteBucketForApp(tx *bolt.Tx, appName string) (*bolt.Bucket, error) {
|
||||
var err error
|
||||
bp := tx.Bucket(ds.routesBucket)
|
||||
b := bp.Bucket([]byte(appName))
|
||||
if b == nil {
|
||||
b, err = bp.CreateBucket([]byte(appName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return b, nil
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) StoreRoute(route *models.Route) (*models.Route, error) {
|
||||
err := ds.db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := ds.getRouteBucketForApp(tx, route.AppName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
buf, err := json.Marshal(route)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Put([]byte(route.Name), buf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return route, nil
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) RemoveRoute(appName, routeName string) error {
|
||||
err := ds.db.Update(func(tx *bolt.Tx) error {
|
||||
b, err := ds.getRouteBucketForApp(tx, appName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = b.Delete([]byte(routeName))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) GetRoute(appName, routeName string) (*models.Route, error) {
|
||||
var route models.Route
|
||||
err := ds.db.View(func(tx *bolt.Tx) error {
|
||||
b, err := ds.getRouteBucketForApp(tx, appName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
v := b.Get([]byte(routeName))
|
||||
if v == nil {
|
||||
return models.ErrRoutesNotFound
|
||||
}
|
||||
err = json.Unmarshal(v, &route)
|
||||
return err
|
||||
})
|
||||
return &route, err
|
||||
}
|
||||
|
||||
func (ds *BoltDatastore) GetRoutes(filter *models.RouteFilter) ([]*models.Route, error) {
|
||||
res := []*models.Route{}
|
||||
err := ds.db.View(func(tx *bolt.Tx) error {
|
||||
b, err := ds.getRouteBucketForApp(tx, filter.AppName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
i := 0
|
||||
c := b.Cursor()
|
||||
|
||||
var k, v []byte
|
||||
k, v = c.Last()
|
||||
|
||||
// Iterate backwards, newest first
|
||||
for ; k != nil; k, v = c.Prev() {
|
||||
var route models.Route
|
||||
err := json.Unmarshal(v, &route)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if models.ApplyRouteFilter(&route, filter) {
|
||||
i++
|
||||
res = append(res, &route)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
27
api/server/datastore/datastore.go
Normal file
27
api/server/datastore/datastore.go
Normal file
@@ -0,0 +1,27 @@
|
||||
package datastore
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
"github.com/iron-io/functions/api/server/datastore/bolt"
|
||||
"github.com/iron-io/functions/api/server/datastore/postgres"
|
||||
)
|
||||
|
||||
func New(dbURL string) (models.Datastore, error) {
|
||||
u, err := url.Parse(dbURL)
|
||||
if err != nil {
|
||||
logrus.WithFields(logrus.Fields{"url": dbURL}).Fatal("bad DB URL")
|
||||
}
|
||||
logrus.WithFields(logrus.Fields{"db": u.Scheme}).Info("creating new datastore")
|
||||
switch u.Scheme {
|
||||
case "bolt":
|
||||
return bolt.New(u)
|
||||
case "postgres":
|
||||
return postgres.New(u)
|
||||
default:
|
||||
return nil, fmt.Errorf("db type not supported %v", u.Scheme)
|
||||
}
|
||||
}
|
||||
262
api/server/datastore/postgres/postgres.go
Normal file
262
api/server/datastore/postgres/postgres.go
Normal file
@@ -0,0 +1,262 @@
|
||||
package postgres
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/iron-io/functions/api/models"
|
||||
_ "github.com/lib/pq"
|
||||
)
|
||||
|
||||
const routesTableCreate = `CREATE TABLE IF NOT EXISTS routes (
|
||||
name character varying(256) NOT NULL PRIMARY KEY,
|
||||
path text NOT NULL,
|
||||
app_name character varying(256) NOT NULL,
|
||||
image character varying(256) NOT NULL,
|
||||
type character varying(256) NOT NULL,
|
||||
container_path text NOT NULL,
|
||||
headers text NOT NULL,
|
||||
);`
|
||||
|
||||
const appsTableCreate = `CREATE TABLE IF NOT EXISTS apps (
|
||||
name character varying(256) NOT NULL PRIMARY KEY,
|
||||
);`
|
||||
|
||||
const routeSelector = `SELECT path, app_name, image, type, container_path, headers FROM routes`
|
||||
|
||||
// Tries to read in properties into `route` from `scanner`. Bubbles up errors.
|
||||
// Capture sql.Row and sql.Rows
|
||||
type rowScanner interface {
|
||||
Scan(dest ...interface{}) error
|
||||
}
|
||||
|
||||
// Capture sql.DB and sql.Tx
|
||||
type rowQuerier interface {
|
||||
QueryRow(query string, args ...interface{}) *sql.Row
|
||||
}
|
||||
|
||||
func scanRoute(scanner rowScanner, route *models.Route) error {
|
||||
err := scanner.Scan(
|
||||
&route.Name,
|
||||
&route.Path,
|
||||
&route.AppName,
|
||||
&route.Image,
|
||||
&route.Type,
|
||||
&route.ContainerPath,
|
||||
&route.Headers,
|
||||
)
|
||||
return err
|
||||
}
|
||||
|
||||
type PostgresDatastore struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func New(url *url.URL) (models.Datastore, error) {
|
||||
db, err := sql.Open("postgres", url.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = db.Ping()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
maxIdleConns := 30 // c.MaxIdleConnections
|
||||
db.SetMaxIdleConns(maxIdleConns)
|
||||
logrus.WithFields(logrus.Fields{"max_idle_connections": maxIdleConns}).Info("Postgres dialed")
|
||||
|
||||
pg := &PostgresDatastore{
|
||||
db: db,
|
||||
}
|
||||
|
||||
for _, v := range []string{routesTableCreate, appsTableCreate} {
|
||||
_, err = db.Exec(v)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return pg, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) StoreRoute(route *models.Route) (*models.Route, error) {
|
||||
var headers string
|
||||
|
||||
hbyte, err := json.Marshal(route.Headers)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
headers = string(hbyte)
|
||||
|
||||
err = ds.db.QueryRow(`
|
||||
INSERT INTO routes (
|
||||
name, app_name, path, image,
|
||||
type, container_path, headers
|
||||
)
|
||||
VALUES ($1, $2, $3, $4, $5, $6, $7)`,
|
||||
route.Name,
|
||||
route.AppName,
|
||||
route.Path,
|
||||
route.Image,
|
||||
route.Type,
|
||||
route.ContainerPath,
|
||||
headers,
|
||||
).Scan(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return route, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) RemoveRoute(appName, routeName string) error {
|
||||
err := ds.db.QueryRow(`
|
||||
DELETE FROM routes
|
||||
WHERE name = $1`,
|
||||
routeName,
|
||||
).Scan(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRoute(qr rowQuerier, routeName string) (*models.Route, error) {
|
||||
var route models.Route
|
||||
|
||||
row := qr.QueryRow(fmt.Sprintf("%s WHERE name=$1", routeSelector), routeName)
|
||||
err := scanRoute(row, &route)
|
||||
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, models.ErrRoutesNotFound
|
||||
} else if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &route, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) GetRoute(appName, routeName string) (*models.Route, error) {
|
||||
return getRoute(ds.db, routeName)
|
||||
}
|
||||
|
||||
// TODO: Add pagination
|
||||
func (ds *PostgresDatastore) GetRoutes(filter *models.RouteFilter) ([]*models.Route, error) {
|
||||
res := []*models.Route{}
|
||||
filterQuery := buildFilterQuery(filter)
|
||||
rows, err := ds.db.Query(fmt.Sprintf("%s %s", routeSelector, filterQuery))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var route models.Route
|
||||
err := scanRoute(rows, &route)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, &route)
|
||||
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) GetApp(name string) (*models.App, error) {
|
||||
row := ds.db.QueryRow("SELECT * FROM groups WHERE name=$1", name)
|
||||
|
||||
var resName string
|
||||
err := row.Scan(&resName)
|
||||
|
||||
res := &models.App{
|
||||
Name: resName,
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) GetApps(filter *models.AppFilter) ([]*models.App, error) {
|
||||
res := []*models.App{}
|
||||
|
||||
rows, err := ds.db.Query(`
|
||||
SELECT DISTINCT *
|
||||
FROM apps
|
||||
ORDER BY name`,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
for rows.Next() {
|
||||
var app models.App
|
||||
err := rows.Scan(&app)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res = append(res, &app)
|
||||
}
|
||||
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) StoreApp(app *models.App) (*models.App, error) {
|
||||
err := ds.db.QueryRow(`
|
||||
INSERT INTO apps (name)
|
||||
VALUES ($1)
|
||||
`, app.Name).Scan(&app.Name)
|
||||
// ON CONFLICT (name) DO UPDATE SET created_at = $2;
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return app, nil
|
||||
}
|
||||
|
||||
func (ds *PostgresDatastore) RemoveApp(appName string) error {
|
||||
err := ds.db.QueryRow(`
|
||||
DELETE FROM apps
|
||||
WHERE name = $1
|
||||
`, appName).Scan(nil)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildFilterQuery(filter *models.RouteFilter) string {
|
||||
filterQuery := ""
|
||||
|
||||
filterQueries := []string{}
|
||||
if filter.AppName != "" {
|
||||
filterQueries = append(filterQueries, fmt.Sprintf("app_name = '%s'", filter.AppName))
|
||||
}
|
||||
|
||||
for i, field := range filterQueries {
|
||||
if i == 0 {
|
||||
filterQuery = fmt.Sprintf("WHERE %s ", field)
|
||||
} else {
|
||||
filterQuery = fmt.Sprintf("%s AND %s", filterQuery, field)
|
||||
}
|
||||
}
|
||||
|
||||
return filterQuery
|
||||
}
|
||||
58
api/server/server.go
Normal file
58
api/server/server.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/iron-io/functions/api/server/datastore"
|
||||
"github.com/iron-io/functions/api/server/router"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
router *gin.Engine
|
||||
cfg *Config
|
||||
}
|
||||
|
||||
func New(config *Config) *Server {
|
||||
return &Server{
|
||||
router: gin.Default(),
|
||||
cfg: config,
|
||||
}
|
||||
}
|
||||
|
||||
func extractFields(c *gin.Context) logrus.Fields {
|
||||
fields := logrus.Fields{"action": path.Base(c.HandlerName())}
|
||||
for _, param := range c.Params {
|
||||
fields[param.Key] = param.Value
|
||||
}
|
||||
return fields
|
||||
}
|
||||
|
||||
func (s *Server) Start() {
|
||||
if s.cfg.DatabaseURL == "" {
|
||||
cwd, _ := os.Getwd()
|
||||
s.cfg.DatabaseURL = fmt.Sprintf("bolt://%s/bolt.db?bucket=fns", cwd)
|
||||
}
|
||||
|
||||
ds, err := datastore.New(s.cfg.DatabaseURL)
|
||||
if err != nil {
|
||||
logrus.WithError(err).Fatalln("Invalid DB url.")
|
||||
}
|
||||
|
||||
logrus.SetOutput(os.Stdout)
|
||||
logrus.SetLevel(logrus.DebugLevel)
|
||||
|
||||
s.router.Use(func(c *gin.Context) {
|
||||
c.Set("store", ds)
|
||||
c.Set("log", logrus.WithFields(extractFields(c)))
|
||||
c.Next()
|
||||
})
|
||||
|
||||
router.Start(s.router)
|
||||
|
||||
// Default to :8080
|
||||
s.router.Run()
|
||||
}
|
||||
Reference in New Issue
Block a user