mirror of
https://github.com/rqlite/rqlite.git
synced 2022-10-30 02:37:32 +03:00
Client-level unit testing of Load
This commit is contained in:
@@ -385,6 +385,87 @@ func (c *Client) Backup(br *command.BackupRequest, nodeAddr string, creds *Crede
|
||||
return nil
|
||||
}
|
||||
|
||||
// Load loads a SQLite file into the database.
|
||||
func (c *Client) Load(lr *command.LoadRequest, nodeAddr string, creds *Credentials, timeout time.Duration) error {
|
||||
conn, err := c.dial(nodeAddr, c.timeout)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Create the request.
|
||||
command := &Command{
|
||||
Type: Command_COMMAND_TYPE_LOAD,
|
||||
Request: &Command_LoadRequest{
|
||||
LoadRequest: lr,
|
||||
},
|
||||
Credentials: creds,
|
||||
}
|
||||
|
||||
p, err := proto.Marshal(command)
|
||||
if err != nil {
|
||||
return fmt.Errorf("command marshal: %s", err)
|
||||
}
|
||||
|
||||
// Write length of Protobuf
|
||||
b := make([]byte, 4)
|
||||
binary.LittleEndian.PutUint16(b[0:], uint16(len(p)))
|
||||
|
||||
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
_, err = conn.Write(b)
|
||||
if err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
_, err = conn.Write(p)
|
||||
if err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
|
||||
// Read length of response.
|
||||
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
_, err = io.ReadFull(conn, b)
|
||||
if err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
sz := binary.LittleEndian.Uint32(b[0:])
|
||||
|
||||
// Read in the actual response.
|
||||
p = make([]byte, sz)
|
||||
if err := conn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
_, err = io.ReadFull(conn, p)
|
||||
if err != nil {
|
||||
handleConnError(conn)
|
||||
return err
|
||||
}
|
||||
|
||||
a := &CommandLoadResponse{}
|
||||
err = proto.Unmarshal(p, a)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if a.Error != "" {
|
||||
return errors.New(a.Error)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stats returns stats on the Client instance
|
||||
func (c *Client) Stats() (map[string]interface{}, error) {
|
||||
c.mu.RLock()
|
||||
|
||||
@@ -28,6 +28,7 @@ const (
|
||||
numExecuteRequest = "num_execute_req"
|
||||
numQueryRequest = "num_query_req"
|
||||
numBackupRequest = "num_backup_req"
|
||||
numLoadRequest = "num_backup_req"
|
||||
|
||||
// Client stats for this package.
|
||||
numGetNodeAPIRequestLocal = "num_get_node_api_req_local"
|
||||
@@ -48,6 +49,7 @@ func init() {
|
||||
stats.Add(numExecuteRequest, 0)
|
||||
stats.Add(numQueryRequest, 0)
|
||||
stats.Add(numBackupRequest, 0)
|
||||
stats.Add(numLoadRequest, 0)
|
||||
stats.Add(numGetNodeAPIRequestLocal, 0)
|
||||
}
|
||||
|
||||
@@ -69,6 +71,9 @@ type Database interface {
|
||||
|
||||
// Backup writes a backup of the database to the writer.
|
||||
Backup(br *command.BackupRequest, dst io.Writer) error
|
||||
|
||||
// Loads an entire SQLite file into the database
|
||||
Load(lr *command.LoadRequest) error
|
||||
}
|
||||
|
||||
// CredentialStore is the interface credential stores must support.
|
||||
@@ -337,6 +342,32 @@ func (s *Service) handleConn(conn net.Conn) {
|
||||
binary.LittleEndian.PutUint32(b[0:], uint32(len(p)))
|
||||
conn.Write(b)
|
||||
conn.Write(p)
|
||||
|
||||
case Command_COMMAND_TYPE_LOAD:
|
||||
stats.Add(numLoadRequest, 1)
|
||||
|
||||
resp := &CommandLoadResponse{}
|
||||
|
||||
lr := c.GetLoadRequest()
|
||||
if lr == nil {
|
||||
resp.Error = "LoadRequest is nil"
|
||||
} else if !s.checkCommandPerm(c, auth.PermLoad) {
|
||||
resp.Error = "unauthorized"
|
||||
} else {
|
||||
if err := s.db.Load(lr); err != nil {
|
||||
resp.Error = err.Error()
|
||||
}
|
||||
}
|
||||
|
||||
p, err = proto.Marshal(resp)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// Write length of Protobuf first, then write the actual Protobuf.
|
||||
b = make([]byte, 4)
|
||||
binary.LittleEndian.PutUint32(b[0:], uint32(len(p)))
|
||||
conn.Write(b)
|
||||
conn.Write(p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -306,6 +306,46 @@ func Test_ServiceBackup(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ServiceLoad(t *testing.T) {
|
||||
ln, mux := mustNewMux()
|
||||
go mux.Serve()
|
||||
tn := mux.Listen(1) // Could be any byte value.
|
||||
db := mustNewMockDatabase()
|
||||
cred := mustNewMockCredentialStore()
|
||||
s := New(tn, db, cred)
|
||||
if s == nil {
|
||||
t.Fatalf("failed to create cluster service")
|
||||
}
|
||||
|
||||
c := NewClient(mustNewDialer(1, false, false), 30*time.Second)
|
||||
|
||||
if err := s.Open(); err != nil {
|
||||
t.Fatalf("failed to open cluster service: %s", err.Error())
|
||||
}
|
||||
|
||||
// Ready for Load tests now.
|
||||
testData := []byte("this is SQLite data")
|
||||
db.loadFn = func(lr *command.LoadRequest) error {
|
||||
if bytes.Compare(lr.Data, testData) != 0 {
|
||||
t.Fatalf("load data is not as expected, exp: %s, got: %s", testData, lr.Data)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := c.Load(loadRequest(testData), s.Addr(), NO_CREDS, longWait)
|
||||
if err != nil {
|
||||
t.Fatalf("failed to load database: %s", err.Error())
|
||||
}
|
||||
|
||||
// Clean up resources.
|
||||
if err := ln.Close(); err != nil {
|
||||
t.Fatalf("failed to close Mux's listener: %s", err)
|
||||
}
|
||||
if err := s.Close(); err != nil {
|
||||
t.Fatalf("failed to close cluster service")
|
||||
}
|
||||
}
|
||||
|
||||
// Test_BinaryEncoding_Backwards ensures that software earlier than v6.6.2
|
||||
// can communicate with v6.6.2+ releases. v6.6.2 increased the maximum size
|
||||
// of cluster responses.
|
||||
@@ -376,6 +416,12 @@ func backupRequestBinary(leader bool) *command.BackupRequest {
|
||||
}
|
||||
}
|
||||
|
||||
func loadRequest(b []byte) *command.LoadRequest {
|
||||
return &command.LoadRequest{
|
||||
Data: b,
|
||||
}
|
||||
}
|
||||
|
||||
func asJSON(v interface{}) string {
|
||||
b, err := encoding.JSONMarshal(v)
|
||||
if err != nil {
|
||||
|
||||
@@ -312,6 +312,7 @@ type mockDatabase struct {
|
||||
executeFn func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error)
|
||||
queryFn func(qr *command.QueryRequest) ([]*command.QueryRows, error)
|
||||
backupFn func(br *command.BackupRequest, dst io.Writer) error
|
||||
loadFn func(lr *command.LoadRequest) error
|
||||
}
|
||||
|
||||
func (m *mockDatabase) Execute(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
|
||||
@@ -329,6 +330,13 @@ func (m *mockDatabase) Backup(br *command.BackupRequest, dst io.Writer) error {
|
||||
return m.backupFn(br, dst)
|
||||
}
|
||||
|
||||
func (m *mockDatabase) Load(lr *command.LoadRequest) error {
|
||||
if m.loadFn == nil {
|
||||
return nil
|
||||
}
|
||||
return m.loadFn(lr)
|
||||
}
|
||||
|
||||
func mustNewMockDatabase() *mockDatabase {
|
||||
e := func(er *command.ExecuteRequest) ([]*command.ExecuteResult, error) {
|
||||
return []*command.ExecuteResult{}, nil
|
||||
|
||||
Reference in New Issue
Block a user