Browse Source

fix: Make cluster translationRec and transferRec gobable

Matthias Ladkau 4 years ago
parent
commit
34623d8972

+ 13 - 4
cluster/memberaddresstable.go

@@ -11,6 +11,7 @@
 package cluster
 
 import (
+	"encoding/gob"
 	"errors"
 	"fmt"
 	"sync"
@@ -22,6 +23,14 @@ import (
 	"devt.de/krotik/eliasdb/storage"
 )
 
+func init() {
+
+	// Make sure we can use the relevant types in a gob operation
+
+	gob.Register(&translationRec{})
+	gob.Register(&transferRec{})
+}
+
 /*
 rootIDTranslationTree is the root id for the translation map
 */
@@ -47,16 +56,16 @@ translationRec is a translation record which stores a local storage location wit
 version number.
 */
 type translationRec struct {
-	loc uint64 // Local storage location
-	ver uint64 // Version of the local stored data
+	Loc uint64 // Local storage location
+	Ver uint64 // Version of the local stored data
 }
 
 /*
 transferRec is a transfer record which stores a data transfer request.
 */
 type transferRec struct {
-	members []string     // Target members
-	request *DataRequest // Data request
+	Members []string     // Target members
+	Request *DataRequest // Data request
 }
 
 /*

+ 4 - 4
cluster/memberaddresstable_test.go

@@ -144,7 +144,7 @@ func TestAddressTableClusterLoc(t *testing.T) {
 
 	// Now check the translation lookup
 
-	if tr, ok, err := ms1[0].at.TransClusterLoc("test1", 50); tr.loc != 123 || tr.ver != 1 || !ok || err != nil {
+	if tr, ok, err := ms1[0].at.TransClusterLoc("test1", 50); tr.Loc != 123 || tr.Ver != 1 || !ok || err != nil {
 		t.Error("Unexpected translation:", tr, ok, err)
 		return
 	}
@@ -154,17 +154,17 @@ func TestAddressTableClusterLoc(t *testing.T) {
 		return
 	}
 
-	if tr, ok, err := ms1[0].at.SetTransClusterLoc("test1", 50, 555, 2); tr.loc != 123 || tr.ver != 1 || !ok || err != nil {
+	if tr, ok, err := ms1[0].at.SetTransClusterLoc("test1", 50, 555, 2); tr.Loc != 123 || tr.Ver != 1 || !ok || err != nil {
 		t.Error("Unexpected translation:", tr, ok, err)
 		return
 	}
 
-	if tr, ok, err := ms1[0].at.TransClusterLoc("test1", 50); tr.loc != 555 || tr.ver != 2 || !ok || err != nil {
+	if tr, ok, err := ms1[0].at.TransClusterLoc("test1", 50); tr.Loc != 555 || tr.Ver != 2 || !ok || err != nil {
 		t.Error("Unexpected translation:", tr, ok, err)
 		return
 	}
 
-	if tr, ok, err := ms1[0].at.RemoveTransClusterLoc("test1", 50); tr.loc != 555 || tr.ver != 2 || !ok || err != nil {
+	if tr, ok, err := ms1[0].at.RemoveTransClusterLoc("test1", 50); tr.Loc != 555 || tr.Ver != 2 || !ok || err != nil {
 		t.Error("Unexpected translation:", tr, ok, err)
 		return
 	}

+ 19 - 18
cluster/memberstorage.go

@@ -231,10 +231,11 @@ func (ms *memberStorage) handleInsertRequest(distTable *DistributionTable, reque
 	}
 
 	if err == nil {
+		var loc uint64
 
 		// Insert into the local storage
 
-		loc, err := sm.Insert(request.Value)
+		loc, err = sm.Insert(request.Value)
 
 		if err == nil {
 
@@ -301,14 +302,14 @@ func (ms *memberStorage) handleUpdateRequest(distTable *DistributionTable, reque
 			// Update the local storage
 
 			if !request.Transfer {
-				err = sm.Update(transRec.loc, request.Value)
-				newVersion = transRec.ver + 1
+				err = sm.Update(transRec.Loc, request.Value)
+				newVersion = transRec.Ver + 1
 
 			} else {
 				newVersion = toUInt64(request.Args[RPVer])
 
-				if newVersion >= transRec.ver {
-					err = sm.Update(transRec.loc, request.Value)
+				if newVersion >= transRec.Ver {
+					err = sm.Update(transRec.Loc, request.Value)
 
 				} else {
 
@@ -330,7 +331,7 @@ func (ms *memberStorage) handleUpdateRequest(distTable *DistributionTable, reque
 
 				// Increase the version of the translation record
 
-				_, _, err = ms.at.SetTransClusterLoc(dsname, cloc, transRec.loc, newVersion)
+				_, _, err = ms.at.SetTransClusterLoc(dsname, cloc, transRec.Loc, newVersion)
 
 				if err == nil {
 
@@ -400,7 +401,7 @@ func (ms *memberStorage) handleFreeRequest(distTable *DistributionTable, request
 
 				// Remove from the local storage
 
-				err = sm.Free(transRec.loc)
+				err = sm.Free(transRec.Loc)
 
 				if !request.Transfer {
 
@@ -461,7 +462,7 @@ func (ms *memberStorage) handleFetchRequest(distTable *DistributionTable,
 		if sm != nil {
 			var res []byte
 
-			err = sm.Fetch(transRec.loc, &res)
+			err = sm.Fetch(transRec.Loc, &res)
 
 			if err == nil {
 
@@ -546,7 +547,7 @@ func (ms *memberStorage) handleRebalanceRequest(distTable *DistributionTable, re
 
 			// Check if the version is newer and update the local record if it is
 
-			if tr.ver < ver {
+			if tr.Ver < ver {
 
 				// Local record exists and needs to be updated
 
@@ -563,11 +564,11 @@ func (ms *memberStorage) handleRebalanceRequest(distTable *DistributionTable, re
 
 					// Update the local storage
 
-					if err = sm.Update(tr.loc, res); err == nil {
+					if err = sm.Update(tr.Loc, res); err == nil {
 
 						// Update the translation
 
-						_, _, err = ms.at.SetTransClusterLoc(smname, cloc, tr.loc, ver)
+						_, _, err = ms.at.SetTransClusterLoc(smname, cloc, tr.Loc, ver)
 
 						manager.LogDebug(ms.ds.MemberManager.Name(),
 							fmt.Sprintf("(Store): Rebalance updated %v location: %v", smname, cloc))
@@ -621,7 +622,7 @@ func (ms *memberStorage) handleRebalanceRequest(distTable *DistributionTable, re
 
 				manager.LogDebug(ms.ds.MemberManager.Name(),
 					fmt.Sprintf("(Store): Rebalance removes %v location: %v from member %v",
-						smname, tr.loc, rsource))
+						smname, tr.Loc, rsource))
 
 				_, err = ms.ds.sendDataRequest(rsource, &DataRequest{RTFree, map[DataRequestArg]interface{}{
 					RPStoreName: smname,
@@ -662,15 +663,15 @@ func (ms *memberStorage) dump(smname string) string {
 			if val != nil {
 				tr := val.(*transferRec)
 
-				args, _ := json.Marshal(tr.request.Args)
+				args, _ := json.Marshal(tr.Request.Args)
 
-				vals, ok := tr.request.Value.([]byte)
+				vals, ok := tr.Request.Value.([]byte)
 				if !ok {
-					vals, _ = json.Marshal(tr.request.Value)
+					vals, _ = json.Marshal(tr.Request.Value)
 				}
 
 				buf.WriteString(fmt.Sprintf("transfer: %v - %v %v %q\n",
-					tr.members, tr.request.RequestType, string(args), vals))
+					tr.Members, tr.Request.RequestType, string(args), vals))
 			}
 		}
 	}
@@ -724,8 +725,8 @@ func (ms *memberStorage) dump(smname string) string {
 				if strings.HasPrefix(key, fmt.Sprint(transPrefix, smname, "#")) {
 					key = string(key[len(fmt.Sprint(transPrefix, smname, "#")):])
 
-					locmap[v.(*translationRec).loc] = fmt.Sprintf("%v (v:%v)",
-						key, v.(*translationRec).ver)
+					locmap[v.(*translationRec).Loc] = fmt.Sprintf("%v (v:%v)",
+						key, v.(*translationRec).Ver)
 				}
 			}
 

+ 1 - 1
cluster/rebalance.go

@@ -108,7 +108,7 @@ func (ms *memberStorage) rebalanceWorker(forceRun bool) {
 
 				maintMgmts = append(maintMgmts, smname)
 				maintLocs = append(maintLocs, cloc)
-				maintVers = append(maintVers, tr.ver)
+				maintVers = append(maintVers, tr.Ver)
 			}
 		}
 

+ 5 - 5
cluster/transfer.go

@@ -73,15 +73,15 @@ func (ms *memberStorage) transferWorker() {
 
 			manager.LogDebug(ms.ds.Name(), "(TR): ",
 				fmt.Sprintf("Processing transfer request %v for %v from %v",
-					tr.request.RequestType, tr.members, ts))
+					tr.Request.RequestType, tr.Members, ts))
 
 			// Send the request to all members
 
 			var failedMembers []string
 
-			for _, member := range tr.members {
+			for _, member := range tr.Members {
 
-				if _, err := ms.ds.sendDataRequest(member, tr.request); err != nil {
+				if _, err := ms.ds.sendDataRequest(member, tr.Request); err != nil {
 					manager.LogDebug(ms.ds.Name(), "(TR): ",
 						fmt.Sprintf("Member %v Error: %v", member, err))
 
@@ -93,8 +93,8 @@ func (ms *memberStorage) transferWorker() {
 
 			if len(failedMembers) == 0 {
 				processed = append(processed, key)
-			} else if len(failedMembers) < len(tr.members) {
-				tr.members = failedMembers
+			} else if len(failedMembers) < len(tr.Members) {
+				tr.Members = failedMembers
 				ms.at.transfer.Put(key, tr)
 			}
 		}

+ 154 - 0
graph/graphmanager_cluster_test.go

@@ -23,6 +23,160 @@ import (
 	"devt.de/krotik/eliasdb/hash"
 )
 
+func TestClusterWithPhysicalStorage(t *testing.T) {
+
+	log.SetOutput(ioutil.Discard)
+
+	dgs1, err := graphstorage.NewDiskGraphStorage(GraphManagerTestDBDir5, false)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	ds1, _ := cluster.NewDistributedStorage(dgs1, map[string]interface{}{
+		manager.ConfigRPC:           fmt.Sprintf("localhost:%v", 9021),
+		manager.ConfigMemberName:    fmt.Sprintf("TestClusterMember-1"),
+		manager.ConfigClusterSecret: "test123",
+	}, manager.NewMemStateInfo())
+
+	ds1.Start()
+	defer ds1.Close()
+
+	dgs2, err := graphstorage.NewDiskGraphStorage(GraphManagerTestDBDir6, false)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	ds2, _ := cluster.NewDistributedStorage(dgs2, map[string]interface{}{
+		manager.ConfigRPC:           fmt.Sprintf("localhost:%v", 9022),
+		manager.ConfigMemberName:    fmt.Sprintf("TestClusterMember-2"),
+		manager.ConfigClusterSecret: "test123",
+	}, manager.NewMemStateInfo())
+
+	ds2.Start()
+	defer ds2.Close()
+
+	err = ds2.MemberManager.JoinCluster(ds1.MemberManager.Name(),
+		ds1.MemberManager.NetAddr())
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	sm := ds1.StorageManager("foo", true)
+	sm2 := ds2.StorageManager("foo", true)
+
+	loc, err := sm.Insert("test123")
+	if loc != 1 || err != nil {
+		t.Error("Unexpected result:", loc, err)
+		return
+	}
+
+	loc, err = sm2.Insert("test456")
+	if loc != 2 || err != nil {
+		t.Error("Unexpected result:", loc, err)
+		return
+	}
+
+	res := ""
+
+	if err := sm2.Fetch(1, &res); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res != "test123" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if err := sm2.Fetch(2, &res); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res != "test456" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// *** HTree storage
+
+	// Use a HTree to insert to and fetch from a storage manager
+
+	sm = ds1.StorageManager("foo2", true)
+	sm2 = ds2.StorageManager("foo2", true)
+
+	htree, err := hash.NewHTree(sm)
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if valres, err := htree.Put([]byte("123"), "Test1"); err != nil || valres != nil {
+		t.Error("Unexpected result:", valres, err)
+		return
+	}
+
+	if valres, err := htree.Put([]byte("123"), "Test2"); err != nil || valres != "Test1" {
+		t.Error("Unexpected result:", valres, err)
+		return
+	}
+
+	// Try to retrieve the item again
+
+	cluster.WaitForTransfer()
+
+	if val, err := htree.Get([]byte("123")); err != nil || val != "Test2" {
+		t.Error("Unexpected result:", val, err)
+		return
+	}
+
+	htree2, err := hash.LoadHTree(sm2, 1)
+	if val, err := htree2.Get([]byte("123")); err != nil || val != "Test2" {
+		t.Error("Unexpected result:", val, err)
+		return
+	}
+
+	// *** GraphManager storage
+
+	gm1 := NewGraphManager(ds1)
+
+	if err := gm1.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "123",
+		"kind": "testnode",
+		"foo":  "bar",
+	})); err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	cluster.WaitForTransfer()
+
+	if node, err := gm1.FetchNode("main", "123", "testnode"); err != nil ||
+		node.String() != `GraphNode:
+     key : 123
+    kind : testnode
+     foo : bar
+` {
+		t.Error("Unexpected result:", node, err)
+		return
+	}
+
+	gm2 := NewGraphManager(ds2)
+
+	if node, err := gm2.FetchNode("main", "123", "testnode"); err != nil ||
+		node.String() != `GraphNode:
+     key : 123
+    kind : testnode
+     foo : bar
+` {
+		t.Error("Unexpected result:", node, err)
+		return
+	}
+}
+
 func TestClusterStorage(t *testing.T) {
 
 	cluster2 := createCluster(2)

+ 4 - 1
graph/graphmanager_test.go

@@ -30,9 +30,12 @@ const GraphManagerTestDBDir1 = "gmtest1"
 const GraphManagerTestDBDir2 = "gmtest2"
 const GraphManagerTestDBDir3 = "gmtest3"
 const GraphManagerTestDBDir4 = "gmtest4"
+const GraphManagerTestDBDir5 = "gmtest5"
+const GraphManagerTestDBDir6 = "gmtest6"
 
 var DBDIRS = []string{GraphManagerTestDBDir1, GraphManagerTestDBDir2,
-	GraphManagerTestDBDir3, GraphManagerTestDBDir4}
+	GraphManagerTestDBDir3, GraphManagerTestDBDir4, GraphManagerTestDBDir5,
+	GraphManagerTestDBDir6}
 
 const InvlaidFileName = "**" + string(0x0)
 

+ 0 - 0
graph/import_export.go


+ 0 - 0
graph/import_export_test.go