Start HTTP server ASAP

This commit is contained in:
Philip O'Toole
2022-10-28 11:27:36 -04:00
parent 5bbd2214fd
commit e158ae8a72
3 changed files with 107 additions and 13 deletions

View File

@@ -76,6 +76,12 @@ func main() {
// Start requested profiling.
startProfile(cfg.CPUProfile, cfg.MemProfile)
// Get any credential store.
credStr, err := credentialStore(cfg)
if err != nil {
log.Fatalf("failed to get credential store: %s", err.Error())
}
// Create internode network mux and configure.
muxLn, err := net.Listen("tcp", cfg.RaftAddr)
if err != nil {
@@ -94,17 +100,6 @@ func main() {
log.Fatalf("failed to create store: %s", err.Error())
}
// Now, open store.
if err := str.Open(); err != nil {
log.Fatalf("failed to open store: %s", err.Error())
}
// Get any credential store.
credStr, err := credentialStore(cfg)
if err != nil {
log.Fatalf("failed to get credential store: %s", err.Error())
}
// Create cluster service now, so nodes will be able to learn information about each other.
clstr, err := clusterService(cfg, mux.Listen(cluster.MuxClusterHeader), str, credStr)
if err != nil {
@@ -112,7 +107,9 @@ func main() {
}
log.Printf("cluster TCP mux Listener registered with byte header %d", cluster.MuxClusterHeader)
// Start the HTTP API server.
// We want to start the HTTP server as soon as possible, so the node is responsive and external
// systems can see that it's running. We still have to open the Store though, so the node won't
// be able to do much until that happens however.
clstrDialer := tcp.NewDialer(cluster.MuxClusterHeader, cfg.NodeEncrypt, cfg.NoNodeVerify)
clstrClient := cluster.NewClient(clstrDialer, cfg.ClusterConnectTimeout)
if err := clstrClient.SetLocal(cfg.RaftAdv, clstr); err != nil {
@@ -122,6 +119,12 @@ func main() {
if err != nil {
log.Fatalf("failed to start HTTP server: %s", err.Error())
}
log.Printf("HTTP server started")
// Now, open store. How long this takes does depend on how much data is being stored by rqlite.
if err := str.Open(); err != nil {
log.Fatalf("failed to open store: %s", err.Error())
}
// Register remaining status providers.
httpServ.RegisterStatus("cluster", clstr)

View File

@@ -29,6 +29,9 @@ import (
)
var (
// ErrNotOpen is returned when a Store is not open.
ErrNotOpen = errors.New("store not open")
// ErrNotLeader is returned when a node attempts to execute a leader-only
// operation.
ErrNotLeader = errors.New("not leader")
@@ -517,8 +520,12 @@ func (s *Store) ID() string {
}
// LeaderAddr returns the address of the current leader. Returns a
// blank string if there is no leader.
// blank string if there is no leader or if the Store is not open.
func (s *Store) LeaderAddr() (string, error) {
if !s.open {
return "", nil
}
return string(s.raft.Leader()), nil
}
@@ -545,6 +552,10 @@ func (s *Store) LeaderID() (string, error) {
// Nodes returns the slice of nodes in the cluster, sorted by ID ascending.
func (s *Store) Nodes() ([]*Server, error) {
if !s.open {
return nil, ErrNotOpen
}
f := s.raft.GetConfiguration()
if f.Error() != nil {
return nil, f.Error()
@@ -620,6 +631,12 @@ func (s *Store) WaitForFSMIndex(idx uint64, timeout time.Duration) (uint64, erro
// Stats returns stats for the store.
func (s *Store) Stats() (map[string]interface{}, error) {
if !s.open {
return map[string]interface{}{
"open": false,
}, nil
}
fsmIdx := func() uint64 {
s.fsmIndexMu.RLock()
defer s.fsmIndexMu.RUnlock()
@@ -669,6 +686,7 @@ func (s *Store) Stats() (map[string]interface{}, error) {
return nil, err
}
status := map[string]interface{}{
"open": s.open,
"node_id": s.raftID,
"raft": raftStats,
"fsm_index": fsmIdx,
@@ -702,6 +720,10 @@ func (s *Store) Stats() (map[string]interface{}, error) {
// Execute executes queries that return no rows, but do modify the database.
func (s *Store) Execute(ex *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
if !s.open {
return nil, ErrNotOpen
}
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
}
@@ -747,6 +769,10 @@ func (s *Store) execute(ex *command.ExecuteRequest) ([]*command.ExecuteResult, e
// Query executes queries that return rows, and do not modify the database.
func (s *Store) Query(qr *command.QueryRequest) ([]*command.QueryRows, error) {
if !s.open {
return nil, ErrNotOpen
}
if qr.Level == command.QueryRequest_QUERY_REQUEST_LEVEL_STRONG {
if s.raft.State() != raft.Leader {
return nil, ErrNotLeader
@@ -813,6 +839,10 @@ func (s *Store) Query(qr *command.QueryRequest) ([]*command.QueryRows, error) {
// level equivalent to "weak". Otherwise, no guarantees are made about the read consistency
// level. This function is safe to call while the database is being changed.
func (s *Store) Backup(br *command.BackupRequest, dst io.Writer) (retErr error) {
if !s.open {
return ErrNotOpen
}
startT := time.Now()
defer func() {
if retErr == nil {
@@ -856,6 +886,10 @@ func (s *Store) Backup(br *command.BackupRequest, dst io.Writer) (retErr error)
// Loads an entire SQLite file into the database, sending the request
// through the Raft log.
func (s *Store) Load(lr *command.LoadRequest) error {
if !s.open {
return ErrNotOpen
}
startT := time.Now()
b, err := command.MarshalLoadRequest(lr)
@@ -899,6 +933,10 @@ func (s *Store) Load(lr *command.LoadRequest) error {
//
// Notifying is idempotent. A node may repeatedly notify the Store without issue.
func (s *Store) Notify(id, addr string) error {
if !s.open {
return ErrNotOpen
}
s.notifyMu.Lock()
defer s.notifyMu.Unlock()
@@ -940,6 +978,10 @@ func (s *Store) Notify(id, addr string) error {
// Join joins a node, identified by id and located at addr, to this store.
// The node must be ready to respond to Raft communications at that address.
func (s *Store) Join(id, addr string, voter bool) error {
if !s.open {
return ErrNotOpen
}
if s.raft.State() != raft.Leader {
return ErrNotLeader
}
@@ -995,6 +1037,10 @@ func (s *Store) Join(id, addr string, voter bool) error {
// Remove removes a node from the store, specified by ID.
func (s *Store) Remove(id string) error {
if !s.open {
return ErrNotOpen
}
s.logger.Printf("received request to remove node %s", id)
if err := s.remove(id); err != nil {
s.logger.Printf("failed to remove node %s: %s", id, err.Error())

View File

@@ -23,6 +23,51 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
func Test_StoreSingleNodeNotOpen(t *testing.T) {
s, ln := mustNewStore(t, true)
defer s.Close(true)
defer ln.Close()
a, err := s.LeaderAddr()
if err != nil {
t.Fatalf("failed to get leader address: %s", err.Error())
}
if a != "" {
t.Fatalf("non-empty Leader return for non-open store: %s", a)
}
// Check key methods handle being called when Store is not open.
if err := s.Join("id", "localhost", true); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if err := s.Notify("id", "localhost"); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if err := s.Remove("id"); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if _, err := s.Stats(); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if _, err := s.Nodes(); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if err := s.Backup(nil, nil); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if _, err := s.Execute(nil); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if _, err := s.Query(nil); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
if err := s.Load(nil); err != ErrNotOpen {
t.Fatalf("wrong error received for non-open store: %s", err)
}
}
func Test_OpenStoreSingleNode(t *testing.T) {
s, ln := mustNewStore(t, true)
defer s.Close(true)