123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package v1
- import (
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "os"
- "strings"
- "testing"
- "devt.de/krotik/common/datautil"
- "devt.de/krotik/eliasdb/api"
- "devt.de/krotik/eliasdb/cluster"
- "devt.de/krotik/eliasdb/cluster/manager"
- "devt.de/krotik/eliasdb/graph"
- "devt.de/krotik/eliasdb/graph/graphstorage"
- )
// TestClusterStorage runs the cluster query and graph endpoints against a
// two-member cluster: it checks the reported cluster state, stores a node
// with a POST request and reads that node back through api.GM.
- func TestClusterStorage(t *testing.T) {
- clusterQueryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
- graphURL := "http://localhost" + TESTPORT + EndpointGraph
// Build two members and join them (see createCluster and joinCluster).
- cluster2 := createCluster(2)
- joinCluster(cluster2, t)
// Route the API through member 0. The deferred function puts the previous
// api.GM and api.GS back and sets api.DD and api.DDLog to nil.
- oldGM := api.GM
- oldGS := api.GS
- api.GS = cluster2[0]
- api.GM = graph.NewGraphManager(cluster2[0])
- api.DD = cluster2[0]
- api.DDLog = datautil.NewRingBuffer(10)
- defer func() {
- api.GM = oldGM
- api.GS = oldGS
- api.DD = nil
- api.DDLog = nil
- }()
- // We should now get back a state
// Expected: both members in the member list, "ts" at 2 and "tsold" at 1.
- st, _, res := sendTestRequest(clusterQueryURL, "GET", nil)
- if st != "200 OK" || res != `
- {
- "failed": null,
- "members": [
- "TestClusterMember-0",
- "localhost:9020",
- "TestClusterMember-1",
- "localhost:9021"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-0",
- "2"
- ],
- "tsold": [
- "TestClusterMember-0",
- "1"
- ]
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Insert some data
// The response of this POST is not inspected; the node is verified below.
- sendTestRequest(graphURL+"i41health/n", "POST", []byte(`
- [{
- "key":"3",
- "kind":"Upload",
- "parcel": "12345"
- }]
- `[1:]))
// NOTE(review): presumably blocks until pending cluster transfers are
// finished - confirm in the cluster package.
- cluster.WaitForTransfer()
// Read the node back through the graph manager and compare its string form.
- n, err := api.GM.FetchNode("i41health", "3", "Upload")
- if err != nil || n.String() != `GraphNode:
- key : 3
- kind : Upload
- parcel : 12345
- ` {
- t.Error("Unexpected result:", n, err)
- return
- }
- }
// TestClusterQuery exercises the cluster query endpoint: the 503 responses
// while no cluster is configured, the state, memberinfos and log resources,
// and the ping, join and eject commands including their error responses.
- func TestClusterQuery(t *testing.T) {
- queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
- st, _, res := sendTestRequest(queryURL, "GET", nil)
- // We should get a failure back if clustering is not available
- if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
- t.Error("Unexpected response:", st, res)
- return
- }
- st, _, res = sendTestRequest(queryURL, "DELETE", nil)
- // We should get a failure back if clustering is not available
- if st != "503 Service Unavailable" || res != "Clustering is not enabled on this instance" {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Create now a small cluster
// The members are only created here; their member managers are started
// further down, right before the ping request.
- cluster2 := createCluster(2)
// Route the API through member 0. The deferred function puts the previous
// api.GM and api.GS back and sets api.DD and api.DDLog to nil.
- oldGM := api.GM
- oldGS := api.GS
- api.GS = cluster2[0]
- api.GM = graph.NewGraphManager(cluster2[0])
- api.DD = cluster2[0]
- api.DDLog = datautil.NewRingBuffer(10)
- defer func() {
- api.GM = oldGM
- api.GS = oldGS
- api.DD = nil
- api.DDLog = nil
- }()
- // We should now get back a state
// Expected: only member 0 is listed, "ts" is at 1 and "tsold" is still empty.
- st, _, res = sendTestRequest(queryURL, "GET", nil)
- if st != "200 OK" || res != `
- {
- "failed": null,
- "members": [
- "TestClusterMember-0",
- "localhost:9020"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-0",
- "1"
- ],
- "tsold": [
- "",
- "0"
- ]
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
// The memberinfos resource reports an empty info object for member 0.
- st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
- if st != "200 OK" || res != `
- {
- "TestClusterMember-0": {}
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
// Messages added to api.DDLog must be returned by the log resource in order.
- api.DDLog.Add("test cluster message1")
- api.DDLog.Add("test cluster message2")
- st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
- if st != "200 OK" || res != `
- [
- "test cluster message1",
- "test cluster message2"
- ]`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
// A DELETE on the log resource succeeds and the next GET returns an empty list.
- st, _, _ = sendTestRequest(queryURL+"log", "DELETE", nil)
- if st != "200 OK" {
- t.Error("Unexpected response:", st)
- return
- }
- st, _, res = sendTestRequest(queryURL+"log", "GET", nil)
- if st != "200 OK" || res != `
- []`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
// A DELETE on an unknown resource is answered with 400.
- st, _, _ = sendTestRequest(queryURL+"bla", "DELETE", nil)
- if st != "400 Bad Request" {
- t.Error("Unexpected response:", st)
- return
- }
// Discard log output and start both member managers. The deferred function
// shuts them down again and sets the log output to os.Stdout.
- log.SetOutput(ioutil.Discard)
- cluster2[0].MemberManager.Start()
- cluster2[1].MemberManager.Start()
- defer func() {
- cluster2[0].MemberManager.Shutdown()
- cluster2[1].MemberManager.Shutdown()
- log.SetOutput(os.Stdout)
- }()
// Ping member 1 by name and network address; the expected answer is "Pong".
- jsonString, err := json.Marshal(map[string]interface{}{
- "name": cluster2[1].MemberManager.Name(),
- "netaddr": cluster2[1].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
- if st != "200 OK" || res != `[
- "Pong"
- ]` {
- t.Error("Unexpected response:", st, res)
- return
- }
// Register a test error for member 1 in manager.MemberErrors. The result of
// the eject request is ignored; the following ping must fail with that error.
// NOTE(review): MemberErrors looks like a fault-injection hook of the manager
// package - confirm there.
- manager.MemberErrors = make(map[string]error)
- manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
- sendTestRequest(queryURL+"eject", "PUT", jsonString)
- st, _, res = sendTestRequest(queryURL+"ping", "PUT", jsonString)
- if st != "403 Forbidden" || res != "Ping returned an error: ClusterError: Member error (testerror)" {
- t.Error("Unexpected response:", st, res)
- return
- }
// Remove the registered test error again.
- manager.MemberErrors = nil
// A PUT without a command in the URL is answered with 400.
- st, _, res = sendTestRequest(queryURL, "PUT", nil)
- if st != "400 Bad Request" || res != "Need a command either: join or eject" {
- t.Error("Unexpected response:", st, res)
- return
- }
// A join request without a netaddr argument is answered with 400.
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": "bla",
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "400 Bad Request" || res != "Required argument netaddr missing in body arguments" {
- t.Error("Unexpected response:", st, res)
- return
- }
// A join request that uses the name "bla" together with the address of
// member 1 is refused with 403.
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": "bla",
- "netaddr": cluster2[1].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "403 Forbidden" || res != "Could not join the cluster: ClusterError: Member error (Unknown target member)" {
- t.Error("Unexpected response:", st, res)
- return
- }
// A join request with the real name and address of member 1 succeeds and
// returns an empty body.
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster2[1].MemberManager.Name(),
- "netaddr": cluster2[1].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "200 OK" || res != "" {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Check state info
// The state is read from member 1: both members are listed and "ts" is at 2.
- if err := checkStateInfo(cluster2[1].MemberManager, `
- {
- "failed": null,
- "members": [
- "TestClusterMember-1",
- "localhost:9021",
- "TestClusterMember-0",
- "localhost:9020"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-1",
- "2"
- ],
- "tsold": [
- "TestClusterMember-1",
- "1"
- ]
- }
- `[1:]); err != nil {
- t.Error(err)
- return
- }
- // Set some member info and read it back
- cluster2[1].MemberManager.MemberInfo()["test123"] = "123"
- st, _, res = sendTestRequest(queryURL+"memberinfos", "GET", nil)
- if st != "200 OK" || res != `
- {
- "TestClusterMember-0": {},
- "TestClusterMember-1": {
- "test123": "123"
- }
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Eject member from cluster
// The first attempt runs while a test error is registered for member 1 and
// must be refused with 403.
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster2[0].MemberManager.Name(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- manager.MemberErrors = make(map[string]error)
- manager.MemberErrors[cluster2[1].Name()] = errors.New("testerror")
- st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
- if st != "403 Forbidden" || res != "Could not eject TestClusterMember-0 from cluster: ClusterError: Member error (testerror)" {
- t.Error("Unexpected response:", st, res)
- return
- }
// With the test error removed the same eject request succeeds.
- manager.MemberErrors = nil
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster2[0].MemberManager.Name(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"eject", "PUT", jsonString)
- if st != "200 OK" || res != "" {
- t.Error("Unexpected response:", st, res)
- return
- }
// Member 1 now lists only itself and its "ts" value is at 3.
- if err := checkStateInfo(cluster2[1].MemberManager, `
- {
- "failed": null,
- "members": [
- "TestClusterMember-1",
- "localhost:9021"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-1",
- "3"
- ],
- "tsold": [
- "TestClusterMember-1",
- "2"
- ]
- }
- `[1:]); err != nil {
- t.Error(err)
- return
- }
- // Some error cases
// jsonString[2:] drops the first two bytes of the body, so decoding must fail.
- st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString[2:])
- if st != "400 Bad Request" || !strings.HasPrefix(res, "Could not decode arguments") {
- t.Error("Unexpected response:", st, res)
- return
- }
// A complete body sent with the unknown command "bla" is answered with 400.
- st, _, res = sendTestRequest(queryURL+"bla", "PUT", jsonString)
- if st != "400 Bad Request" || res != "Unknown command: bla" {
- t.Error("Unexpected response:", st, res)
- return
- }
- }
// TestClusterQueryBigCluster joins three members one after another through
// the join command of the cluster query endpoint and checks the cluster state
// which is reported after each step.
- func TestClusterQueryBigCluster(t *testing.T) {
- queryURL := "http://localhost" + TESTPORT + EndpointClusterQuery
- // Create a big cluster
- cluster3 := createCluster(3)
// Start every member. The deferred Close calls run when this test function
// returns, not at the end of each loop iteration.
- for _, dd := range cluster3 {
- dd.Start()
- defer dd.Close()
- }
// Route the API through member 0. The deferred function puts the previous
// api.GM and api.GS back and sets api.DD and api.DDLog to nil.
- oldGM := api.GM
- oldGS := api.GS
- api.GS = cluster3[0]
- api.GM = graph.NewGraphManager(cluster3[0])
- api.DD = cluster3[0]
- api.DDLog = datautil.NewRingBuffer(10)
- defer func() {
- api.GM = oldGM
- api.GS = oldGS
- api.DD = nil
- api.DDLog = nil
- }()
- // We should now get back a state
// Expected: only member 0 is listed, "ts" is at 1 and "tsold" is still empty.
- st, _, res := sendTestRequest(queryURL, "GET", nil)
- if st != "200 OK" || res != `
- {
- "failed": null,
- "members": [
- "TestClusterMember-0",
- "localhost:9020"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-0",
- "1"
- ],
- "tsold": [
- "",
- "0"
- ]
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- // Join more cluster members
// Serve the cluster endpoint from member 1 (with a fresh log buffer) and send
// a join request which names member 0 as the target.
- api.DD = cluster3[1]
- api.DDLog = datautil.NewRingBuffer(10)
- jsonString, err := json.Marshal(map[string]interface{}{
- "name": cluster3[0].MemberManager.Name(),
- "netaddr": cluster3[0].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "200 OK" || res != "" {
- t.Error("Unexpected response:", st, res)
- return
- }
// The state served from member 1 lists member 1 first; "ts" carries the name
// of member 0 and is at 2.
- st, _, res = sendTestRequest(queryURL, "GET", nil)
- if st != "200 OK" || res != `
- {
- "failed": null,
- "members": [
- "TestClusterMember-1",
- "localhost:9021",
- "TestClusterMember-0",
- "localhost:9020"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-0",
- "2"
- ],
- "tsold": [
- "TestClusterMember-0",
- "1"
- ]
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
// Repeat the step with member 2: serve the endpoint from it and join member 0.
- api.DD = cluster3[2]
- api.DDLog = datautil.NewRingBuffer(10)
- jsonString, err = json.Marshal(map[string]interface{}{
- "name": cluster3[0].MemberManager.Name(),
- "netaddr": cluster3[0].MemberManager.NetAddr(),
- })
- if err != nil {
- t.Error(err)
- return
- }
- st, _, res = sendTestRequest(queryURL+"join", "PUT", jsonString)
- if st != "200 OK" || res != "" {
- t.Error("Unexpected response:", st, res)
- return
- }
// All three members are listed now (member 2 first) and "ts" is at 3.
- st, _, res = sendTestRequest(queryURL, "GET", nil)
- if st != "200 OK" || res != `
- {
- "failed": null,
- "members": [
- "TestClusterMember-2",
- "localhost:9022",
- "TestClusterMember-0",
- "localhost:9020",
- "TestClusterMember-1",
- "localhost:9021"
- ],
- "replication": 1,
- "ts": [
- "TestClusterMember-0",
- "3"
- ],
- "tsold": [
- "TestClusterMember-0",
- "2"
- ]
- }`[1:] {
- t.Error("Unexpected response:", st, res)
- return
- }
- }
- /*
- Create a cluster with n members (all storage is in memory)
- */
- func createCluster(n int) []*cluster.DistributedStorage {
- // By default no log output
- log.SetOutput(ioutil.Discard)
- var mgs []*graphstorage.MemoryGraphStorage
- var cs []*cluster.DistributedStorage
- cluster.ClearMSMap()
- for i := 0; i < n; i++ {
- mgs = append(mgs, graphstorage.NewMemoryGraphStorage(fmt.Sprintf("mgs%v", i+1)).(*graphstorage.MemoryGraphStorage))
- }
- for i := 0; i < n; i++ {
- ds, _ := cluster.NewDistributedStorage(mgs[i], map[string]interface{}{
- manager.ConfigRPC: fmt.Sprintf("localhost:%v", 9020+i),
- manager.ConfigMemberName: fmt.Sprintf("TestClusterMember-%v", i),
- manager.ConfigClusterSecret: "test123",
- }, manager.NewMemStateInfo())
- cs = append(cs, ds)
- }
- return cs
- }
- /*
- joinCluster joins up a given cluster.
- */
- func joinCluster(cluster []*cluster.DistributedStorage, t *testing.T) {
- for i, dd := range cluster {
- dd.Start()
- defer dd.Close()
- if i > 0 {
- err := dd.MemberManager.JoinCluster(cluster[0].MemberManager.Name(),
- cluster[0].MemberManager.NetAddr())
- if err != nil {
- t.Error(err)
- return
- }
- }
- }
- }
- func checkStateInfo(mm *manager.MemberManager, expectedStateInfo string) error {
- var w bytes.Buffer
- ret := json.NewEncoder(&w)
- ret.Encode(mm.StateInfo().Map())
- out := bytes.Buffer{}
- err := json.Indent(&out, w.Bytes(), "", " ")
- if err != nil {
- return err
- }
- if out.String() != expectedStateInfo {
- return fmt.Errorf("Unexpected state info: %v\nexpected: %v",
- out.String(), expectedStateInfo)
- }
- return nil
- }
|