mirror of
https://github.com/rqlite/rqlite.git
synced 2022-10-30 02:37:32 +03:00
Leader redirection almost complete
This commit is contained in:
@@ -33,6 +33,7 @@ var raftAddr string
|
||||
var joinAddr string
|
||||
var dsn string
|
||||
var inMem bool
|
||||
var disRedirect bool
|
||||
var cpuprofile string
|
||||
|
||||
const desc = `rqlite is a distributed system that provides a replicated SQLite database.`
|
||||
@@ -43,6 +44,7 @@ func init() {
|
||||
flag.StringVar(&joinAddr, "join", "", "host:port of leader to join")
|
||||
flag.StringVar(&dsn, "dsn", "", `SQLite DSN parameters. E.g. "cache=shared&mode=memory"`)
|
||||
flag.BoolVar(&inMem, "mem", false, "Use an in-memory database")
|
||||
flag.BoolVar(&disRedirect, "noredir", false, "Disable leader-redirect")
|
||||
flag.StringVar(&cpuprofile, "cpuprofile", "", "Write CPU profile to file")
|
||||
flag.Usage = func() {
|
||||
fmt.Fprintf(os.Stderr, "\n%s\n\n", desc)
|
||||
@@ -101,6 +103,7 @@ func main() {
|
||||
|
||||
// Create the HTTP query server.
|
||||
s := httpd.New(httpAddr, store)
|
||||
s.DisableRedirect = disRedirect
|
||||
if err := s.Start(); err != nil {
|
||||
log.Fatalf("failed to start HTTP server: %s", err.Error())
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
sql "github.com/otoolep/rqlite/db"
|
||||
"github.com/otoolep/rqlite/store"
|
||||
)
|
||||
|
||||
// Store is the interface the Raft-driven database must implement.
|
||||
@@ -62,6 +63,8 @@ type Service struct {
|
||||
store Store // The Raft-backed database store.
|
||||
|
||||
lastBackup time.Time // Time of last successful backup.
|
||||
|
||||
DisableRedirect bool // Disable leader-redirection.
|
||||
}
|
||||
|
||||
// New returns an uninitialized HTTP service.
|
||||
@@ -248,8 +251,11 @@ func (s *Service) handleExecute(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
results, err := s.store.Execute(queries, isTx)
|
||||
if err != nil {
|
||||
if err == store.ErrNotLeader && !s.DisableRedirect {
|
||||
http.Redirect(w, r, s.store.Leader(), http.StatusTemporaryRedirect)
|
||||
return
|
||||
}
|
||||
resp.Error = err.Error()
|
||||
|
||||
} else {
|
||||
resp.Results = results
|
||||
}
|
||||
@@ -309,8 +315,11 @@ func (s *Service) handleQuery(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
results, err := s.store.Query(queries, isTx, !noLeader, verify)
|
||||
if err != nil {
|
||||
if err == store.ErrNotLeader && !s.DisableRedirect {
|
||||
http.Redirect(w, r, s.store.Leader(), http.StatusTemporaryRedirect)
|
||||
return
|
||||
}
|
||||
resp.Error = err.Error()
|
||||
|
||||
} else {
|
||||
resp.Results = results
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ package store
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
@@ -224,7 +225,7 @@ func (s *Store) Stats() (map[string]interface{}, error) {
|
||||
// Execute executes queries that return no rows, but do modify the database.
|
||||
func (s *Store) Execute(queries []string, tx bool) ([]*sql.Result, error) {
|
||||
if s.raft.State() != raft.Leader {
|
||||
return nil, fmt.Errorf("not leader")
|
||||
return nil, ErrNotLeader
|
||||
}
|
||||
|
||||
c := &command{
|
||||
@@ -276,7 +277,7 @@ func (s *Store) Query(queries []string, tx, leader, verify bool) ([]*sql.Rows, e
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
if leader && s.raft.State() != raft.Leader {
|
||||
return nil, fmt.Errorf("not leader")
|
||||
return nil, ErrNotLeader
|
||||
}
|
||||
|
||||
if verify {
|
||||
|
||||
Reference in New Issue
Block a user