123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package graph
- import (
- "errors"
- "fmt"
- "strings"
- "sync"
- "testing"
- "devt.de/krotik/eliasdb/graph/data"
- "devt.de/krotik/eliasdb/graph/graphstorage"
- "devt.de/krotik/eliasdb/storage"
- )
- /*
- lockTrans is a special transaction for testing which can lock on commit.
- */
- type lockTrans struct {
- Trans
- Lock *sync.Mutex
- }
- func (gt *lockTrans) Commit() error {
- gt.Lock.Lock()
- defer gt.Lock.Unlock()
- return gt.Trans.Commit()
- }
- func newLockTrans(gm *Manager) Trans {
- return &lockTrans{NewConcurrentGraphTrans(gm), &sync.Mutex{}}
- }
- func TestRollingTrans(t *testing.T) {
- var err error
- constructNode := func(key string) data.Node {
- node := data.NewGraphNode()
- node.SetAttr("key", key)
- node.SetAttr("kind", "testnode")
- node.SetAttr(data.NodeName, key)
- return node
- }
- constructEdge := func(key1, key2 string) data.Edge {
- edge := data.NewGraphEdge()
- edge.SetAttr("key", key1+key2)
- edge.SetAttr("kind", "testedge")
- edge.SetAttr(data.NodeName, "testedge")
- edge.SetAttr(data.EdgeEnd1Key, key1)
- edge.SetAttr(data.EdgeEnd1Kind, "testnode")
- edge.SetAttr(data.EdgeEnd1Role, "node1")
- edge.SetAttr(data.EdgeEnd1Cascading, false)
- edge.SetAttr(data.EdgeEnd2Key, key2)
- edge.SetAttr(data.EdgeEnd2Kind, "testnode")
- edge.SetAttr(data.EdgeEnd2Role, "node2")
- edge.SetAttr(data.EdgeEnd2Cascading, false)
- return edge
- }
- // Creeate storage and insert test nodes
- mgs := graphstorage.NewMemoryGraphStorage("mystorage")
- gm := newGraphManagerNoRules(mgs)
- var trans Trans
- // Create a new Rolling Trans which rolls over after 3 operations
- trans = NewRollingTrans(newLockTrans(gm), 3, gm, newLockTrans)
- trans.(*rollingTrans).id = "42"
- if trans.ID() != "42" {
- t.Error("Unexpected result:", trans.ID())
- return
- }
- if !trans.IsEmpty() {
- t.Error("Unexpected result")
- return
- }
- trans.StoreNode("main", constructNode("1"))
- trans.StoreNode("main", constructNode("2"))
- if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:2 R:0 - Edges: I:0 R:0 - Threshold: 3 - In-flight: 0" {
- t.Error("Unexpected result:", res)
- return
- }
- // Straight case - we commit an operation over the threshold; the current
- // transaction is committed and a new one is put in place.
- trans.StoreEdge("main", constructEdge("1", "2"))
- trans.(*rollingTrans).wg.Wait() // The wait here ensures that wewait until the transaction was committed
- // The output should show a clean transaction since all changes
- // have been committed to the datastore
- if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:0 R:0 - Edges: I:0 R:0 - Threshold: 3 - In-flight: 0" {
- t.Error("Unexpected result:", res)
- return
- }
- trans.StoreNode("main", constructNode("1"))
- trans.StoreNode("main", constructNode("2"))
- // Lock the commit of the current transaction - this lets us examine
- // what happens duing the commit of the current transaction
- currentTransLock := trans.(*rollingTrans).currentTrans.(*lockTrans).Lock
- currentTransLock.Lock()
- trans.StoreEdge("main", constructEdge("1", "2"))
- // The committing go routine hangs now on the commit lock and we'll see that one transaction is in flight
- if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:2 R:0 - Edges: I:1 R:0 - Threshold: 3 - In-flight: 1" {
- t.Error("Unexpected result:", res)
- return
- }
- if res := fmt.Sprint(trans.Counts()); res != "2 1 0 0" {
- t.Error("Unexpected result:", res)
- return
- }
- // Release the lock and wait until the commit has finished
- currentTransLock.Unlock()
- trans.(*rollingTrans).wg.Wait() // The wait here ensures that wewait until the transaction was committed
- // The transaction should be empty again
- if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:0 R:0 - Edges: I:0 R:0 - Threshold: 3 - In-flight: 0" {
- t.Error("Unexpected result:", res)
- return
- }
- // Do now some proper operations
- if err = trans.StoreNode("main", constructNode("test")); err != nil {
- t.Error(err)
- return
- }
- if err = trans.UpdateNode("main", constructNode("test")); err != nil {
- t.Error(err)
- return
- }
- if err = trans.RemoveNode("main", "test", "testnode"); err != nil {
- t.Error(err)
- return
- }
- if err = trans.StoreEdge("main", constructEdge("1", "3")); err != nil {
- t.Error(err)
- return
- }
- if err = trans.RemoveEdge("main", "13", "testedge"); err != nil {
- t.Error(err)
- return
- }
- if err = trans.Commit(); err != nil {
- t.Error(err)
- return
- }
- // Test error cases
- trans = NewRollingTrans(newLockTrans(gm), 5, gm, newLockTrans)
- if err = trans.StoreEdge("main", constructEdge("1", "13")); err != nil {
- t.Error(err)
- return
- }
- if err = trans.Commit(); err == nil || err.Error() != "GraphError: Invalid data (Can't find edge endpoint: 13 (testnode))" {
- t.Error(err)
- return
- }
- trans = NewRollingTrans(newLockTrans(gm), 0, gm, newLockTrans)
- if err = trans.StoreEdge("main", constructEdge("1", "13")); err != nil {
- t.Error(err)
- return
- }
- if err = trans.Commit(); err == nil || err.Error() != "GraphError: Invalid data (Can't find edge endpoint: 13 (testnode))" {
- t.Error(err)
- return
- }
- }
- func TestNormalTrans(t *testing.T) {
- if !RunDiskStorageTests {
- return
- }
- constructEdge := func(node1 data.Node, kind string, node2 data.Node) data.Edge {
- edge := data.NewGraphEdge()
- edge.SetAttr("key", "abc"+node1.Key()+node2.Key())
- edge.SetAttr("kind", kind)
- edge.SetAttr(data.EdgeEnd1Key, node1.Key())
- edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
- edge.SetAttr(data.EdgeEnd1Role, "node1")
- edge.SetAttr(data.EdgeEnd1Cascading, true)
- edge.SetAttr(data.EdgeEnd2Key, node2.Key())
- edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
- edge.SetAttr(data.EdgeEnd2Role, "node2")
- edge.SetAttr(data.EdgeEnd2Cascading, false)
- edge.SetAttr(data.NodeName, "Edge "+kind)
- return edge
- }
- dgs, err := graphstorage.NewDiskGraphStorage(GraphManagerTestDBDir4, false)
- if err != nil {
- t.Error(err)
- return
- }
- gm := newGraphManagerNoRules(dgs)
- node1 := data.NewGraphNode()
- node1.SetAttr("key", "123")
- node1.SetAttr("kind", "mynode")
- node1.SetAttr("Name", "Node1")
- node2 := data.NewGraphNode()
- node2.SetAttr("key", "456")
- node2.SetAttr("kind", "mynewnode")
- node2.SetAttr("Name", "Node2")
- trans := NewConcurrentGraphTrans(gm)
- // Trivial functions
- if trans.ID() == "" {
- t.Error("ID should not be empty")
- return
- }
- if err := trans.UpdateNode("", data.NewGraphNode()); err == nil || err.Error() != "GraphError: Invalid data (Node is missing a key value)" {
- t.Error(err)
- return
- }
- if !trans.IsEmpty() {
- t.Error("New transaction should be empty")
- return
- }
- // Store some nodes
- if err := trans.StoreNode("main", node1); err != nil {
- t.Error(err)
- return
- }
- if err := trans.StoreNode("main", node2); err != nil {
- t.Error(err)
- return
- }
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
- t.Error(err)
- return
- }
- if c := gm.NodeCount("mynode"); c != 0 {
- t.Error("Unexpected node count:", c)
- return
- }
- if err := trans.Commit(); err != nil {
- t.Error(err)
- return
- }
- if c := gm.NodeCount("mynode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.NodeCount("mynewnode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.EdgeCount("myedge"); c != 1 {
- t.Error("Unexpected edge count:", c)
- return
- }
- trans2 := NewConcurrentGraphTrans(gm)
- trans3 := NewConcurrentGraphTrans(gm)
- node3 := data.NewGraphNode()
- node3.SetAttr("key", "789")
- node3.SetAttr("kind", "mynode")
- node3.SetAttr("Name", "Node3")
- node4 := data.NewGraphNode()
- node4.SetAttr("key", "abc")
- node4.SetAttr("kind", "mynode")
- node4.SetAttr("Name", "Node4")
- if err := trans2.StoreNode("main", node3); err != nil {
- t.Error(err)
- return
- }
- if err := trans2.StoreEdge("main", constructEdge(node3, "myedge", node4)); err != nil {
- t.Error(err)
- return
- }
- if err := trans3.StoreNode("main", node4); err != nil {
- t.Error(err)
- return
- }
- if res := fmt.Sprint(trans2.Counts()); res != "1 1 0 0" {
- t.Error("Unexpected result:", res)
- return
- }
- trans2.(*concurrentTrans).Trans.(*baseTrans).id = "1"
- if res := fmt.Sprint(trans2); res != "Transaction 1 - Nodes: I:1 R:0 - Edges: I:1 R:0" {
- t.Error("Unexpected result:", res)
- return
- }
- // This should fail since node 4 is not there
- if err := trans2.Commit(); err.Error() != "GraphError: Invalid data (Can't find edge endpoint: abc (mynode))" {
- t.Error(err)
- return
- }
- // Transaction should have rolled back
- if c := gm.NodeCount("mynode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.NodeCount("mynewnode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.EdgeCount("myedge"); c != 1 {
- t.Error("Unexpected edge count:", c)
- return
- }
- // Now commit transaction 3 to make transaction 2 work
- if err := trans3.Commit(); err != nil {
- t.Error(err)
- return
- }
- if c := gm.NodeCount("mynode"); c != 2 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.NodeCount("mynewnode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.EdgeCount("myedge"); c != 1 {
- t.Error("Unexpected edge count:", c)
- return
- }
- if err := trans2.StoreNode("main", node3); err != nil {
- t.Error(err)
- return
- }
- if err := trans2.StoreEdge("main", constructEdge(node3, "myedge", node4)); err != nil {
- t.Error(err)
- return
- }
- if err := trans2.Commit(); err != nil {
- t.Error(err)
- return
- }
- if c := gm.NodeCount("mynode"); c != 3 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.NodeCount("mynewnode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.EdgeCount("myedge"); c != 2 {
- t.Error("Unexpected edge count:", c)
- return
- }
- // Check that we commit the transactions again - the inserts become
- // updates but the numbers won't change
- transUpdate := NewConcurrentGraphTrans(gm)
- transUpdate.StoreNode("main", node1)
- transUpdate.StoreNode("main", node2)
- transUpdate.StoreNode("main", node3)
- transUpdate.StoreNode("main", node4)
- transUpdate.StoreEdge("main", constructEdge(node1, "myedge", node2))
- transUpdate.StoreEdge("main", constructEdge(node3, "myedge", node4))
- if err := transUpdate.Commit(); err != nil {
- t.Error(err)
- return
- }
- // Test commit of empty transaction
- if err := NewConcurrentGraphTrans(gm).Commit(); err != nil {
- t.Error(err)
- return
- }
- if c := gm.NodeCount("mynode"); c != 3 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.NodeCount("mynewnode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.EdgeCount("myedge"); c != 2 {
- t.Error("Unexpected edge count:", c)
- return
- }
- // Test search index updates
- q, _ := gm.NodeIndexQuery("main", node4.Kind())
- res, _ := q.LookupWord("Name", "Node4")
- if fmt.Sprint(res) != "map[abc:[1]]" {
- t.Error("Unexpected index lookup result:", res)
- return
- }
- // Test removal of stuff
- trans4 := NewConcurrentGraphTrans(gm)
- trans4.RemoveEdge("main", "abc789abc", "myedge")
- trans4.RemoveNode("main", node4.Key(), node4.Kind())
- if err := trans4.Commit(); err != nil {
- t.Error(err)
- return
- }
- q, _ = gm.NodeIndexQuery("main", node4.Kind())
- res, _ = q.LookupWord("Name", "Node4")
- if fmt.Sprint(res) != "map[]" {
- t.Error("Unexpected index lookup result:", res)
- return
- }
- if c := gm.NodeCount("mynode"); c != 2 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.NodeCount("mynewnode"); c != 1 {
- t.Error("Unexpected node count:", c)
- return
- }
- if c := gm.EdgeCount("myedge"); c != 1 {
- t.Error("Unexpected edge count:", c)
- return
- }
- dgs.Close()
- }
- func TestTransBuilding(t *testing.T) {
- node1 := data.NewGraphNode()
- node1.SetAttr("key", "123")
- node1.SetAttr("kind", "mykind")
- node1.SetAttr("Name", "Node1")
- updnode1 := data.NewGraphNode()
- updnode1.SetAttr("key", "123")
- updnode1.SetAttr("kind", "mykind")
- updnode1.SetAttr("Update", "ok")
- node2 := data.NewGraphNode()
- node2.SetAttr("key", "456")
- node2.SetAttr("kind", "mynewkind")
- node2.SetAttr("Name", "Node2")
- constructEdge := func(kind string) data.Edge {
- edge := data.NewGraphEdge()
- edge.SetAttr("key", "abc")
- edge.SetAttr("kind", kind)
- edge.SetAttr(data.EdgeEnd1Key, node1.Key())
- edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
- edge.SetAttr(data.EdgeEnd1Role, "node1")
- edge.SetAttr(data.EdgeEnd1Cascading, true)
- edge.SetAttr(data.EdgeEnd2Key, node2.Key())
- edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
- edge.SetAttr(data.EdgeEnd2Role, "node2")
- edge.SetAttr(data.EdgeEnd2Cascading, false)
- edge.SetAttr(data.NodeName, "Edge "+kind)
- return edge
- }
- // Creeate storage and insert test nodes
- mgs := graphstorage.NewMemoryGraphStorage("mystorage")
- gm := newGraphManagerNoRules(mgs)
- trans := newInternalGraphTrans(gm)
- if err := trans.StoreNode("main", node1); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- countMaps(t, trans, 1, 0, 0, 0)
- if err := trans.StoreNode("main", node2); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- countMaps(t, trans, 2, 0, 0, 0)
- edge1 := constructEdge("myedge")
- if err := trans.StoreEdge("main", edge1); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 2, 0, 1, 0)
- // Check that updating will not remove anything
- if err := trans.UpdateNode("main", updnode1); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 2, 0, 1, 0)
- // Check that the node was updated
- test := trans.storeNodes[trans.createKey("main", node1.Key(), node1.Kind())]
- if !data.NodeCompare(test, data.NodeMerge(node1, updnode1), nil) {
- t.Error("Unexpected update result:", test)
- return
- }
- // Remove a node
- if err := trans.RemoveNode("main", node1.Key(), node1.Kind()); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), false, true, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 1, 1, 1, 0)
- // Check that the update does an insert
- if err := trans.UpdateNode("main", updnode1); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 2, 0, 1, 0)
- // Check that the node was inserted
- test = trans.storeNodes[trans.createKey("main", node1.Key(), node1.Kind())]
- if !data.NodeCompare(test, updnode1, nil) {
- t.Error("Unexpected update result:", test)
- return
- }
- // Remove a node
- if err := trans.RemoveNode("main", node1.Key(), node1.Kind()); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), false, true, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 1, 1, 1, 0)
- // Store node again
- if err := trans.StoreNode("main", node1); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 2, 0, 1, 0)
- // Remove edge
- if err := trans.RemoveEdge("main", edge1.Key(), edge1.Kind()); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, false, true)
- countMaps(t, trans, 2, 0, 0, 1)
- // Store the edge again
- if err := trans.StoreEdge("main", edge1); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 2, 0, 1, 0)
- // Test trivial errors using broken nodes and partitions
- brokenNode := data.NewGraphNode()
- if trans.StoreNode("b b", brokenNode) == nil || trans.StoreNode("main", brokenNode) == nil {
- t.Error("Error result was expected")
- return
- }
- if trans.UpdateNode("b b", brokenNode) == nil || trans.UpdateNode("main", brokenNode) == nil {
- t.Error("Error result was expected")
- return
- }
- if trans.RemoveNode("b b", "123", "bla") == nil {
- t.Error("Error result was expected")
- return
- }
- brokenEdge := data.NewGraphEdge()
- if trans.StoreEdge("b b", brokenEdge) == nil || trans.StoreEdge("main", brokenEdge) == nil {
- t.Error("Error result was expected")
- return
- }
- if trans.RemoveEdge("b b", "123", "bla") == nil {
- t.Error("Error result was expected")
- return
- }
- // Test update case if a node exists already in the datastore
- node3instore := data.NewGraphNode()
- node3instore.SetAttr("key", "789")
- node3instore.SetAttr("kind", "mynewkind")
- node3instore.SetAttr("Existing", "TestNode3")
- gm.StoreNode("main", node3instore)
- node3 := data.NewGraphNode()
- node3.SetAttr("key", "789")
- node3.SetAttr("kind", "mynewkind")
- node3.SetAttr("Name", "Node3")
- sm := gm.gs.StorageManager("main"+node3.Kind()+StorageSuffixNodes, false)
- sm.(*storage.MemoryStorageManager).AccessMap[1] = storage.AccessCacheAndFetchError
- // Check that the update fails
- if err := trans.UpdateNode("main", node3); err.Error() !=
- "GraphError: Failed to access graph storage component (Slot not found (mystorage/mainmynewkind.nodes - Location:1))" {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 2, 0, 1, 0)
- delete(sm.(*storage.MemoryStorageManager).AccessMap, 1)
- // Check that the update is converted to an insert with the updated node
- if err := trans.UpdateNode("main", node3); err != nil {
- t.Error(err)
- return
- }
- checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", node2.Key(), node3.Kind(), true, false, false, false)
- checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
- countMaps(t, trans, 3, 0, 1, 0)
- test = trans.storeNodes[trans.createKey("main", node3.Key(), node3.Kind())]
- if !data.NodeCompare(test, data.NodeMerge(node3instore, node3), nil) {
- t.Error("Unexpected update result:", test)
- return
- }
- }
- func checkMaps(t *testing.T, trans *baseTrans, part string, ikey string, ikind string,
- nodeStore bool, nodeRemove bool, edgeStore bool, edgeRemove bool) {
- key := trans.createKey(part, ikey, ikind)
- if _, ok := trans.storeNodes[key]; ok != nodeStore {
- t.Error("Expected element is not in storesNodes:", key)
- }
- if _, ok := trans.removeNodes[key]; ok != nodeRemove {
- t.Error("Expected element is not in removeNodes:", key)
- }
- if _, ok := trans.storeEdges[key]; ok != edgeStore {
- t.Error("Expected element is not in storesEdges:", key)
- }
- if _, ok := trans.removeEdges[key]; ok != edgeRemove {
- t.Error("Expected element is not in removeEdges:", key)
- }
- }
- func countMaps(t *testing.T, trans *baseTrans, nodeStore int, nodeRemove int,
- edgeStore int, edgeRemove int) {
- if c := len(trans.storeNodes); c != nodeStore {
- t.Error("Unexpected storeNodes count:", c, " expected:", nodeStore)
- }
- if c := len(trans.removeNodes); c != nodeRemove {
- t.Error("Unexpected storeNodes count:", c, " expected:", nodeRemove)
- }
- if c := len(trans.storeEdges); c != edgeStore {
- t.Error("Unexpected storeNodes count:", c, " expected:", edgeStore)
- }
- if c := len(trans.removeEdges); c != edgeRemove {
- t.Error("Unexpected storeNodes count:", c, " expected:", edgeRemove)
- }
- }
- func TestTransErrors(t *testing.T) {
- testTransPanic(t)
- constructEdge := func(node1 data.Node, kind string, node2 data.Node) data.Edge {
- edge := data.NewGraphEdge()
- edge.SetAttr("key", "abc"+node1.Key()+node2.Key())
- edge.SetAttr("kind", kind)
- edge.SetAttr(data.EdgeEnd1Key, node1.Key())
- edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
- edge.SetAttr(data.EdgeEnd1Role, "node1")
- edge.SetAttr(data.EdgeEnd1Cascading, true)
- edge.SetAttr(data.EdgeEnd2Key, node2.Key())
- edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
- edge.SetAttr(data.EdgeEnd2Role, "node2")
- edge.SetAttr(data.EdgeEnd2Cascading, false)
- edge.SetAttr(data.NodeName, "Edge "+kind)
- return edge
- }
- mgs := graphstorage.NewMemoryGraphStorage("mystorage")
- gm := newGraphManagerNoRules(mgs)
- trans := NewConcurrentGraphTrans(gm)
- resetTrans := func(namesuffix string) {
- trans = NewConcurrentGraphTrans(gm)
- node1 := data.NewGraphNode()
- node1.SetAttr("key", "123")
- node1.SetAttr("kind", "mynode")
- node1.SetAttr("Name", "Node1"+namesuffix)
- node2 := data.NewGraphNode()
- node2.SetAttr("key", "456")
- node2.SetAttr("kind", "mynewnode")
- node2.SetAttr("Name", "Node2"+namesuffix)
- if err := trans.StoreNode("main", node1); err != nil {
- t.Error(err)
- return
- }
- if err := trans.StoreNode("main", node2); err != nil {
- t.Error(err)
- return
- }
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
- t.Error(err)
- return
- }
- }
- resetTransAndStorage := func() {
- mgs = graphstorage.NewMemoryGraphStorage("mystorage")
- gm = newGraphManagerNoRules(mgs)
- resetTrans("")
- }
- resetTransAndStorage()
- // Test an inaccessible edge index
- storage.MsmCallNumRollback = 0
- sm := mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, true).(*storage.MemoryStorageManager)
- sm.AccessMap[3] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
- t.Error("Unexpected error return:", err)
- return
- }
- if storage.MsmCallNumRollback != 6 {
- t.Error("Unexpected number of rollback calls:", storage.MsmCallNumRollback)
- }
- delete(sm.AccessMap, 3)
- // Test node commit failures
- resetTransAndStorage()
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, true).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTrans("")
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, true).(*storage.MemoryStorageManager)
- sm.AccessMap[4] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
- fmt.Println(sm)
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 4)
- resetTransAndStorage()
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, true).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTransAndStorage()
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, true).(*storage.MemoryStorageManager)
- sm.AccessMap[2] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 2)
- resetTransAndStorage()
- if err := trans.Commit(); err != nil {
- t.Error(err)
- }
- resetTrans("123")
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, false).(*storage.MemoryStorageManager)
- sm.AccessMap[2] = storage.AccessCacheAndFetchError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 2)
- trans2 := NewConcurrentGraphTrans(gm)
- trans2.RemoveNode("main", "123", "mynode")
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTransAndStorage()
- if err := trans.Commit(); err != nil {
- t.Error(err)
- }
- trans2 = NewConcurrentGraphTrans(gm)
- trans2.RemoveNode("main", "123", "mynode")
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, false).(*storage.MemoryStorageManager)
- sm.AccessMap[2] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 2)
- resetTransAndStorage()
- if err := trans.Commit(); err != nil {
- t.Error(err)
- }
- trans2 = NewConcurrentGraphTrans(gm)
- trans2.RemoveNode("main", "123", "mynode")
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- trans2.RemoveNode("main", "123", "mynode")
- sm.AccessMap[3] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 3)
- // Test edge errors
- resetTransAndStorage()
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, true).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTrans("")
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdges, true).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTrans("")
- sm.AccessMap[2] = storage.AccessInsertError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 2)
- node1 := data.NewGraphNode()
- node1.SetAttr("key", "123")
- node1.SetAttr("kind", "mynode")
- node1.SetAttr("Name", "Node1")
- node2 := data.NewGraphNode()
- node2.SetAttr("key", "456")
- node2.SetAttr("kind", "mynewnode")
- node2.SetAttr("Name", "Node2")
- node3 := data.NewGraphNode()
- node3.SetAttr("key", "xxx")
- node3.SetAttr("kind", "mynode2")
- node3.SetAttr("Name", "Node3")
- if err := trans.StoreEdge("main", constructEdge(node3, "myedge", node3)); err != nil {
- t.Error(err)
- return
- }
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
- t.Error("Unexpected error return:", err)
- return
- }
- node3 = data.NewGraphNode()
- node3.SetAttr("key", "xxx")
- node3.SetAttr("kind", "mynode")
- node3.SetAttr("Name", "Node3")
- if err := trans.StoreEdge("main", constructEdge(node3, "myedge", node3)); err != nil {
- t.Error(err)
- return
- }
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
- t.Error("Unexpected error return:", err)
- return
- }
- resetTransAndStorage()
- trans.Commit()
- resetTrans("")
- node3 = data.NewGraphNode()
- node3.SetAttr("key", "xxx")
- node3.SetAttr("kind", "mynode2")
- node3.SetAttr("Name", "Node3")
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node3)); err != nil {
- t.Error(err)
- return
- }
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
- t.Error("Unexpected error return:", err)
- return
- }
- node3 = data.NewGraphNode()
- node3.SetAttr("key", "xxx")
- node3.SetAttr("kind", "mynode")
- node3.SetAttr("Name", "Node3")
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node3)); err != nil {
- t.Error(err)
- return
- }
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
- t.Error("Unexpected error return:", err)
- return
- }
- resetTransAndStorage()
- trans.Commit()
- trans = NewConcurrentGraphTrans(gm)
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTransAndStorage()
- trans.Commit()
- trans = NewConcurrentGraphTrans(gm)
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"mynewnode"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTransAndStorage()
- trans.Commit()
- trans = NewConcurrentGraphTrans(gm)
- if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, false).(*storage.MemoryStorageManager)
- sm.AccessMap[4] = storage.AccessCacheAndFetchError
- if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 4)
- // Test edge deletion errors
- deleteEdge := constructEdge(node1, "myedge", node2)
- resetTransAndStorage()
- trans.Commit()
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, false).(*storage.MemoryStorageManager)
- sm.AccessMap[2] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 2)
- resetTransAndStorage()
- trans.Commit()
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdges, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdges, false).(*storage.MemoryStorageManager)
- sm.AccessMap[2] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 2)
- resetTransAndStorage()
- trans.Commit()
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+deleteEdge.End1Kind()+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTransAndStorage()
- trans.Commit()
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+deleteEdge.End2Kind()+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
- sm.AccessMap[1] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 1)
- resetTransAndStorage()
- trans.Commit()
- trans2 = NewConcurrentGraphTrans(gm)
- if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
- t.Error(err)
- return
- }
- sm = mgs.StorageManager("main"+deleteEdge.End2Kind()+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
- sm.AccessMap[5] = storage.AccessCacheAndFetchError
- if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not read graph information") {
- t.Error("Unexpected error return:", err)
- return
- }
- delete(sm.AccessMap, 5)
- resetTransAndStorage()
- // Delete non-existing node
- if err := trans.RemoveNode("main", "nonexist", "nonexist"); err != nil {
- t.Error(err)
- return
- }
- if err := trans.Commit(); err != nil {
- t.Error(err)
- return
- }
- }
- func testTransPanic(t *testing.T) {
- defer func() {
- graphstorage.MgsRetFlushMain = nil
- if r := recover(); r == nil {
- t.Error("Transaction with a serious write error (during flushing) did not cause a panic.")
- }
- }()
- mgs := graphstorage.NewMemoryGraphStorage("mystorage")
- gm := newGraphManagerNoRules(mgs)
- gm.getNodeStorageHTree("main", "mynode", true)
- trans := NewConcurrentGraphTrans(gm)
- node1 := data.NewGraphNode()
- node1.SetAttr("key", "123")
- node1.SetAttr("kind", "mynode")
- trans.StoreNode("main", node1)
- graphstorage.MgsRetFlushMain = errors.New("test")
- trans.Commit()
- }
|