123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package v1
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "os"
- "strings"
- "testing"
- "devt.de/krotik/common/datautil"
- "devt.de/krotik/eliasdb/api"
- "devt.de/krotik/eliasdb/cluster"
- "devt.de/krotik/eliasdb/cluster/manager"
- "devt.de/krotik/eliasdb/graph/graphstorage"
- )
- func TestClusterQuery(t *testing.T) {
- queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
- st, _, res := sendTestRequest(queryURL, "GET", nil)
- // We should get a failure back if clustering is not available
- if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
- t.Error("Unexpected response:", st, res)
- return
- }
- st, _, res = sendTestRequest(queryURL, "DELETE", nil)
- // We should get a failure back if clustering is not available
- if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Create now a small cluster
- cluster2 := createCluster(2)
- api.DD = cluster2[0]
- api.DDLog = datautil.NewRingBuffer(10)
- // We should now get back a state
- st, _, res = sendTestRequest(queryURL, "GET", nil)
- if st != "200 OK" || res != `
- {
- "failed": null,
- "members": [
- "TestClusterMember-0",
- "localhost:9020"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-0",
- "1"
- ],
- "tsold": [
- "",
- "0"
- ]
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
- if st != "200 OK" || res != `
- {
- "TestClusterMember-0": {}
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- api.DDLog.Add("test cluster message1")
- api.DDLog.Add("test cluster message2")
- st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
- if st != "200 OK" || res != `
- [
- "test cluster message1",
- "test cluster message2"
- ]`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- st, _, _ = sendTestRequest(queryURL+"log", "DELETE", nil)
- if st != "200 OK" {
- t.Error("Unexpected response:", st)
- return
- }
- st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
- if st != "200 OK" || res != `
- []`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- st, _, _ = sendTestRequest(queryURL+"bla", "DELETE", nil)
- if st != "400 Bad Request" {
- t.Error("Unexpected response:", st)
- return
- }
- log.SetOutput(ioutil.Discard)
- cluster2[0].MemberManager.Start()
- cluster2[1].MemberManager.Start()
- defer func() {
- cluster2[0].MemberManager.Shutdown()
- cluster2[1].MemberManager.Shutdown()
- log.SetOutput(os.Stdout)
- }()
- jsonString, err := json.Marshal(map[string]interface{}{
- "name": cluster2[1].MemberManager.Name(),
- "netaddr": cluster2[1].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
- if st != "200 OK" || res != `[
- "Pong"
- ]` {
- t.Error("Unexpected response:", st, res)
- return
- }
- manager.MemberErrors = make(map[string]error)
- manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
- st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
- st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
- if st != "403 Forbidden" || res != "Ping returned an error: ClusterError: Member error (testerror)" {
- t.Error("Unexpected response:", st, res)
- return
- }
- manager.MemberErrors = nil
- st, _, res = sendTestRequest(queryURL, "PUT", nil)
- if st != "400 Bad Request" || res != "Need a command either: join or eject" {
- t.Error("Unexpected response:", st, res)
- return
- }
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": "bla",
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "400 Bad Request" || res != "Required argument netaddr missing in body arguments" {
- t.Error("Unexpected response:", st, res)
- return
- }
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": "bla",
- "netaddr": cluster2[1].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "403 Forbidden" || res != "Could not join the cluster: ClusterError: Member error (Unknown target member)" {
- t.Error("Unexpected response:", st, res)
- return
- }
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster2[1].MemberManager.Name(),
- "netaddr": cluster2[1].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "200 OK" || res != "" {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Check state info
- if err := checkStateInfo(cluster2[1].MemberManager, `
- {
- "failed": null,
- "members": [
- "TestClusterMember-1",
- "localhost:9021",
- "TestClusterMember-0",
- "localhost:9020"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-1",
- "2"
- ],
- "tsold": [
- "TestClusterMember-1",
- "1"
- ]
- }
- `[1:]); err != nil {
- t.Error(err)
- return
- }
- // Set some member info and read it back
- cluster2[1].MemberManager.MemberInfo()["test123"] = "123"
- st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
- if st != "200 OK" || res != `
- {
- "TestClusterMember-0": {},
- "TestClusterMember-1": {
- "test123": "123"
- }
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Eject member from cluster
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster2[0].MemberManager.Name(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- manager.MemberErrors = make(map[string]error)
- manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
- st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
- if st != "403 Forbidden" || res != "Could not eject TestClusterMember-0 from cluster: ClusterError: Member error (testerror)" {
- t.Error("Unexpected response:", st, res)
- return
- }
- manager.MemberErrors = nil
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster2[0].MemberManager.Name(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
- if st != "200 OK" || res != "" {
- t.Error("Unexpected response:", st, res)
- return
- }
- if err := checkStateInfo(cluster2[1].MemberManager, `
- {
- "failed": null,
- "members": [
- "TestClusterMember-1",
- "localhost:9021"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-1",
- "3"
- ],
- "tsold": [
- "TestClusterMember-1",
- "2"
- ]
- }
- `[1:]); err != nil {
- t.Error(err)
- return
- }
- // Some error cases
- st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString[2:])
- if st != "400 Bad Request" || !strings.HasPrefix(res, "Could not decode arguments") {
- t.Error("Unexpected response:", st, res)
- return
- }
- st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString)
- if st != "400 Bad Request" || res != "Unknown command: bla" {
- t.Error("Unexpected response:", st, res)
- return
- }
- }
- /*
- Create a cluster with n members (all storage is in memory)
- */
- func createCluster(n int) []*cluster.DistributedStorage {
- var mgs []*graphstorage.MemoryGraphStorage
- var cs []*cluster.DistributedStorage
- for i := 0; i < n; i++ {
- mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
- }
- for i := 0; i < n; i++ {
- ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
- manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
- manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
- manager.ConfigClusterSecret: "test123",
- }, manager.NewMemStateInfo())
- cs = append(cs, ds)
- }
- return cs
- }
- func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
- var w bytes.Buffer
- ret := json.NewEncoder(&w)
- ret.Encode(mm.StateInfo().Map())
- out := bytes.Buffer{}
- err := json.Indent(&out, w.Bytes(), "", " ")
- if err != nil {
- return err
- }
- if out.String() != expectedStateInfo {
- return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
- out.String(), expectedStateInfo)
- }
- return nil
- }
|