trans_test.go 35 KB


  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "errors"
  13. "fmt"
  14. "strings"
  15. "sync"
  16. "testing"
  17. "devt.de/krotik/eliasdb/graph/data"
  18. "devt.de/krotik/eliasdb/graph/graphstorage"
  19. "devt.de/krotik/eliasdb/storage"
  20. )
/*
lockTrans is a special transaction for testing which can lock on commit.

It embeds a regular transaction and acquires Lock for the duration of Commit.
A test can hold Lock itself to keep a commit blocked while it inspects the
state of the surrounding code.
*/
type lockTrans struct {
	// Trans is the wrapped transaction to which all operations are delegated.
	Trans
	// Lock is acquired by Commit before the wrapped transaction is committed.
	Lock *sync.Mutex
}
  28. func (gt *lockTrans) Commit() error {
  29. gt.Lock.Lock()
  30. defer gt.Lock.Unlock()
  31. return gt.Trans.Commit()
  32. }
  33. func newLockTrans(gm *Manager) Trans {
  34. return &lockTrans{NewConcurrentGraphTrans(gm), &sync.Mutex{}}
  35. }
  36. func TestRollingTrans(t *testing.T) {
  37. var err error
  38. constructNode := func(key string) data.Node {
  39. node := data.NewGraphNode()
  40. node.SetAttr("key", key)
  41. node.SetAttr("kind", "testnode")
  42. node.SetAttr(data.NodeName, key)
  43. return node
  44. }
  45. constructEdge := func(key1, key2 string) data.Edge {
  46. edge := data.NewGraphEdge()
  47. edge.SetAttr("key", key1+key2)
  48. edge.SetAttr("kind", "testedge")
  49. edge.SetAttr(data.NodeName, "testedge")
  50. edge.SetAttr(data.EdgeEnd1Key, key1)
  51. edge.SetAttr(data.EdgeEnd1Kind, "testnode")
  52. edge.SetAttr(data.EdgeEnd1Role, "node1")
  53. edge.SetAttr(data.EdgeEnd1Cascading, false)
  54. edge.SetAttr(data.EdgeEnd2Key, key2)
  55. edge.SetAttr(data.EdgeEnd2Kind, "testnode")
  56. edge.SetAttr(data.EdgeEnd2Role, "node2")
  57. edge.SetAttr(data.EdgeEnd2Cascading, false)
  58. return edge
  59. }
  60. // Creeate storage and insert test nodes
  61. mgs := graphstorage.NewMemoryGraphStorage("mystorage")
  62. gm := newGraphManagerNoRules(mgs)
  63. var trans Trans
  64. // Create a new Rolling Trans which rolls over after 3 operations
  65. trans = NewRollingTrans(newLockTrans(gm), 3, gm, newLockTrans)
  66. trans.(*rollingTrans).id = "42"
  67. if trans.ID() != "42" {
  68. t.Error("Unexpected result:", trans.ID())
  69. return
  70. }
  71. if !trans.IsEmpty() {
  72. t.Error("Unexpected result")
  73. return
  74. }
  75. trans.StoreNode("main", constructNode("1"))
  76. trans.StoreNode("main", constructNode("2"))
  77. if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:2 R:0 - Edges: I:0 R:0 - Threshold: 3 - In-flight: 0" {
  78. t.Error("Unexpected result:", res)
  79. return
  80. }
  81. // Straight case - we commit an operation over the threshold; the current
  82. // transaction is committed and a new one is put in place.
  83. trans.StoreEdge("main", constructEdge("1", "2"))
  84. trans.(*rollingTrans).wg.Wait() // The wait here ensures that wewait until the transaction was committed
  85. // The output should show a clean transaction since all changes
  86. // have been committed to the datastore
  87. if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:0 R:0 - Edges: I:0 R:0 - Threshold: 3 - In-flight: 0" {
  88. t.Error("Unexpected result:", res)
  89. return
  90. }
  91. trans.StoreNode("main", constructNode("1"))
  92. trans.StoreNode("main", constructNode("2"))
  93. // Lock the commit of the current transaction - this lets us examine
  94. // what happens duing the commit of the current transaction
  95. currentTransLock := trans.(*rollingTrans).currentTrans.(*lockTrans).Lock
  96. currentTransLock.Lock()
  97. trans.StoreEdge("main", constructEdge("1", "2"))
  98. // The committing go routine hangs now on the commit lock and we'll see that one transaction is in flight
  99. if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:2 R:0 - Edges: I:1 R:0 - Threshold: 3 - In-flight: 1" {
  100. t.Error("Unexpected result:", res)
  101. return
  102. }
  103. if res := fmt.Sprint(trans.Counts()); res != "2 1 0 0" {
  104. t.Error("Unexpected result:", res)
  105. return
  106. }
  107. // Release the lock and wait until the commit has finished
  108. currentTransLock.Unlock()
  109. trans.(*rollingTrans).wg.Wait() // The wait here ensures that wewait until the transaction was committed
  110. // The transaction should be empty again
  111. if res := trans.String(); res != "Rolling transaction 42 - Nodes: I:0 R:0 - Edges: I:0 R:0 - Threshold: 3 - In-flight: 0" {
  112. t.Error("Unexpected result:", res)
  113. return
  114. }
  115. // Do now some proper operations
  116. if err = trans.StoreNode("main", constructNode("test")); err != nil {
  117. t.Error(err)
  118. return
  119. }
  120. if err = trans.UpdateNode("main", constructNode("test")); err != nil {
  121. t.Error(err)
  122. return
  123. }
  124. if err = trans.RemoveNode("main", "test", "testnode"); err != nil {
  125. t.Error(err)
  126. return
  127. }
  128. if err = trans.StoreEdge("main", constructEdge("1", "3")); err != nil {
  129. t.Error(err)
  130. return
  131. }
  132. if err = trans.RemoveEdge("main", "13", "testedge"); err != nil {
  133. t.Error(err)
  134. return
  135. }
  136. if err = trans.Commit(); err != nil {
  137. t.Error(err)
  138. return
  139. }
  140. // Test error cases
  141. trans = NewRollingTrans(newLockTrans(gm), 5, gm, newLockTrans)
  142. if err = trans.StoreEdge("main", constructEdge("1", "13")); err != nil {
  143. t.Error(err)
  144. return
  145. }
  146. if err = trans.Commit(); err == nil || err.Error() != "GraphError: Invalid data (Can't find edge endpoint: 13 (testnode))" {
  147. t.Error(err)
  148. return
  149. }
  150. trans = NewRollingTrans(newLockTrans(gm), 0, gm, newLockTrans)
  151. if err = trans.StoreEdge("main", constructEdge("1", "13")); err != nil {
  152. t.Error(err)
  153. return
  154. }
  155. if err = trans.Commit(); err == nil || err.Error() != "GraphError: Invalid data (Can't find edge endpoint: 13 (testnode))" {
  156. t.Error(err)
  157. return
  158. }
  159. }
  160. func TestNormalTrans(t *testing.T) {
  161. if !RunDiskStorageTests {
  162. return
  163. }
  164. constructEdge := func(node1 data.Node, kind string, node2 data.Node) data.Edge {
  165. edge := data.NewGraphEdge()
  166. edge.SetAttr("key", "abc"+node1.Key()+node2.Key())
  167. edge.SetAttr("kind", kind)
  168. edge.SetAttr(data.EdgeEnd1Key, node1.Key())
  169. edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
  170. edge.SetAttr(data.EdgeEnd1Role, "node1")
  171. edge.SetAttr(data.EdgeEnd1Cascading, true)
  172. edge.SetAttr(data.EdgeEnd2Key, node2.Key())
  173. edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
  174. edge.SetAttr(data.EdgeEnd2Role, "node2")
  175. edge.SetAttr(data.EdgeEnd2Cascading, false)
  176. edge.SetAttr(data.NodeName, "Edge "+kind)
  177. return edge
  178. }
  179. dgs, err := graphstorage.NewDiskGraphStorage(GraphManagerTestDBDir4, false)
  180. if err != nil {
  181. t.Error(err)
  182. return
  183. }
  184. gm := newGraphManagerNoRules(dgs)
  185. node1 := data.NewGraphNode()
  186. node1.SetAttr("key", "123")
  187. node1.SetAttr("kind", "mynode")
  188. node1.SetAttr("Name", "Node1")
  189. node2 := data.NewGraphNode()
  190. node2.SetAttr("key", "456")
  191. node2.SetAttr("kind", "mynewnode")
  192. node2.SetAttr("Name", "Node2")
  193. trans := NewConcurrentGraphTrans(gm)
  194. // Trivial functions
  195. if trans.ID() == "" {
  196. t.Error("ID should not be empty")
  197. return
  198. }
  199. if err := trans.UpdateNode("", data.NewGraphNode()); err == nil || err.Error() != "GraphError: Invalid data (Node is missing a key value)" {
  200. t.Error(err)
  201. return
  202. }
  203. if !trans.IsEmpty() {
  204. t.Error("New transaction should be empty")
  205. return
  206. }
  207. // Store some nodes
  208. if err := trans.StoreNode("main", node1); err != nil {
  209. t.Error(err)
  210. return
  211. }
  212. if err := trans.StoreNode("main", node2); err != nil {
  213. t.Error(err)
  214. return
  215. }
  216. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
  217. t.Error(err)
  218. return
  219. }
  220. if c := gm.NodeCount("mynode"); c != 0 {
  221. t.Error("Unexpected node count:", c)
  222. return
  223. }
  224. if err := trans.Commit(); err != nil {
  225. t.Error(err)
  226. return
  227. }
  228. if c := gm.NodeCount("mynode"); c != 1 {
  229. t.Error("Unexpected node count:", c)
  230. return
  231. }
  232. if c := gm.NodeCount("mynewnode"); c != 1 {
  233. t.Error("Unexpected node count:", c)
  234. return
  235. }
  236. if c := gm.EdgeCount("myedge"); c != 1 {
  237. t.Error("Unexpected edge count:", c)
  238. return
  239. }
  240. trans2 := NewConcurrentGraphTrans(gm)
  241. trans3 := NewConcurrentGraphTrans(gm)
  242. node3 := data.NewGraphNode()
  243. node3.SetAttr("key", "789")
  244. node3.SetAttr("kind", "mynode")
  245. node3.SetAttr("Name", "Node3")
  246. node4 := data.NewGraphNode()
  247. node4.SetAttr("key", "abc")
  248. node4.SetAttr("kind", "mynode")
  249. node4.SetAttr("Name", "Node4")
  250. if err := trans2.StoreNode("main", node3); err != nil {
  251. t.Error(err)
  252. return
  253. }
  254. if err := trans2.StoreEdge("main", constructEdge(node3, "myedge", node4)); err != nil {
  255. t.Error(err)
  256. return
  257. }
  258. if err := trans3.StoreNode("main", node4); err != nil {
  259. t.Error(err)
  260. return
  261. }
  262. if res := fmt.Sprint(trans2.Counts()); res != "1 1 0 0" {
  263. t.Error("Unexpected result:", res)
  264. return
  265. }
  266. trans2.(*concurrentTrans).Trans.(*baseTrans).id = "1"
  267. if res := fmt.Sprint(trans2); res != "Transaction 1 - Nodes: I:1 R:0 - Edges: I:1 R:0" {
  268. t.Error("Unexpected result:", res)
  269. return
  270. }
  271. // This should fail since node 4 is not there
  272. if err := trans2.Commit(); err.Error() != "GraphError: Invalid data (Can't find edge endpoint: abc (mynode))" {
  273. t.Error(err)
  274. return
  275. }
  276. // Transaction should have rolled back
  277. if c := gm.NodeCount("mynode"); c != 1 {
  278. t.Error("Unexpected node count:", c)
  279. return
  280. }
  281. if c := gm.NodeCount("mynewnode"); c != 1 {
  282. t.Error("Unexpected node count:", c)
  283. return
  284. }
  285. if c := gm.EdgeCount("myedge"); c != 1 {
  286. t.Error("Unexpected edge count:", c)
  287. return
  288. }
  289. // Now commit transaction 3 to make transaction 2 work
  290. if err := trans3.Commit(); err != nil {
  291. t.Error(err)
  292. return
  293. }
  294. if c := gm.NodeCount("mynode"); c != 2 {
  295. t.Error("Unexpected node count:", c)
  296. return
  297. }
  298. if c := gm.NodeCount("mynewnode"); c != 1 {
  299. t.Error("Unexpected node count:", c)
  300. return
  301. }
  302. if c := gm.EdgeCount("myedge"); c != 1 {
  303. t.Error("Unexpected edge count:", c)
  304. return
  305. }
  306. if err := trans2.StoreNode("main", node3); err != nil {
  307. t.Error(err)
  308. return
  309. }
  310. if err := trans2.StoreEdge("main", constructEdge(node3, "myedge", node4)); err != nil {
  311. t.Error(err)
  312. return
  313. }
  314. if err := trans2.Commit(); err != nil {
  315. t.Error(err)
  316. return
  317. }
  318. if c := gm.NodeCount("mynode"); c != 3 {
  319. t.Error("Unexpected node count:", c)
  320. return
  321. }
  322. if c := gm.NodeCount("mynewnode"); c != 1 {
  323. t.Error("Unexpected node count:", c)
  324. return
  325. }
  326. if c := gm.EdgeCount("myedge"); c != 2 {
  327. t.Error("Unexpected edge count:", c)
  328. return
  329. }
  330. // Check that we commit the transactions again - the inserts become
  331. // updates but the numbers won't change
  332. transUpdate := NewConcurrentGraphTrans(gm)
  333. transUpdate.StoreNode("main", node1)
  334. transUpdate.StoreNode("main", node2)
  335. transUpdate.StoreNode("main", node3)
  336. transUpdate.StoreNode("main", node4)
  337. transUpdate.StoreEdge("main", constructEdge(node1, "myedge", node2))
  338. transUpdate.StoreEdge("main", constructEdge(node3, "myedge", node4))
  339. if err := transUpdate.Commit(); err != nil {
  340. t.Error(err)
  341. return
  342. }
  343. // Test commit of empty transaction
  344. if err := NewConcurrentGraphTrans(gm).Commit(); err != nil {
  345. t.Error(err)
  346. return
  347. }
  348. if c := gm.NodeCount("mynode"); c != 3 {
  349. t.Error("Unexpected node count:", c)
  350. return
  351. }
  352. if c := gm.NodeCount("mynewnode"); c != 1 {
  353. t.Error("Unexpected node count:", c)
  354. return
  355. }
  356. if c := gm.EdgeCount("myedge"); c != 2 {
  357. t.Error("Unexpected edge count:", c)
  358. return
  359. }
  360. // Test search index updates
  361. q, _ := gm.NodeIndexQuery("main", node4.Kind())
  362. res, _ := q.LookupWord("Name", "Node4")
  363. if fmt.Sprint(res) != "map[abc:[1]]" {
  364. t.Error("Unexpected index lookup result:", res)
  365. return
  366. }
  367. // Test removal of stuff
  368. trans4 := NewConcurrentGraphTrans(gm)
  369. trans4.RemoveEdge("main", "abc789abc", "myedge")
  370. trans4.RemoveNode("main", node4.Key(), node4.Kind())
  371. if err := trans4.Commit(); err != nil {
  372. t.Error(err)
  373. return
  374. }
  375. q, _ = gm.NodeIndexQuery("main", node4.Kind())
  376. res, _ = q.LookupWord("Name", "Node4")
  377. if fmt.Sprint(res) != "map[]" {
  378. t.Error("Unexpected index lookup result:", res)
  379. return
  380. }
  381. if c := gm.NodeCount("mynode"); c != 2 {
  382. t.Error("Unexpected node count:", c)
  383. return
  384. }
  385. if c := gm.NodeCount("mynewnode"); c != 1 {
  386. t.Error("Unexpected node count:", c)
  387. return
  388. }
  389. if c := gm.EdgeCount("myedge"); c != 1 {
  390. t.Error("Unexpected edge count:", c)
  391. return
  392. }
  393. dgs.Close()
  394. }
  395. func TestTransBuilding(t *testing.T) {
  396. node1 := data.NewGraphNode()
  397. node1.SetAttr("key", "123")
  398. node1.SetAttr("kind", "mykind")
  399. node1.SetAttr("Name", "Node1")
  400. updnode1 := data.NewGraphNode()
  401. updnode1.SetAttr("key", "123")
  402. updnode1.SetAttr("kind", "mykind")
  403. updnode1.SetAttr("Update", "ok")
  404. node2 := data.NewGraphNode()
  405. node2.SetAttr("key", "456")
  406. node2.SetAttr("kind", "mynewkind")
  407. node2.SetAttr("Name", "Node2")
  408. constructEdge := func(kind string) data.Edge {
  409. edge := data.NewGraphEdge()
  410. edge.SetAttr("key", "abc")
  411. edge.SetAttr("kind", kind)
  412. edge.SetAttr(data.EdgeEnd1Key, node1.Key())
  413. edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
  414. edge.SetAttr(data.EdgeEnd1Role, "node1")
  415. edge.SetAttr(data.EdgeEnd1Cascading, true)
  416. edge.SetAttr(data.EdgeEnd2Key, node2.Key())
  417. edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
  418. edge.SetAttr(data.EdgeEnd2Role, "node2")
  419. edge.SetAttr(data.EdgeEnd2Cascading, false)
  420. edge.SetAttr(data.NodeName, "Edge "+kind)
  421. return edge
  422. }
  423. // Creeate storage and insert test nodes
  424. mgs := graphstorage.NewMemoryGraphStorage("mystorage")
  425. gm := newGraphManagerNoRules(mgs)
  426. trans := newInternalGraphTrans(gm)
  427. if err := trans.StoreNode("main", node1); err != nil {
  428. t.Error(err)
  429. return
  430. }
  431. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  432. countMaps(t, trans, 1, 0, 0, 0)
  433. if err := trans.StoreNode("main", node2); err != nil {
  434. t.Error(err)
  435. return
  436. }
  437. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  438. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  439. countMaps(t, trans, 2, 0, 0, 0)
  440. edge1 := constructEdge("myedge")
  441. if err := trans.StoreEdge("main", edge1); err != nil {
  442. t.Error(err)
  443. return
  444. }
  445. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  446. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  447. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  448. countMaps(t, trans, 2, 0, 1, 0)
  449. // Check that updating will not remove anything
  450. if err := trans.UpdateNode("main", updnode1); err != nil {
  451. t.Error(err)
  452. return
  453. }
  454. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  455. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  456. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  457. countMaps(t, trans, 2, 0, 1, 0)
  458. // Check that the node was updated
  459. test := trans.storeNodes[trans.createKey("main", node1.Key(), node1.Kind())]
  460. if !data.NodeCompare(test, data.NodeMerge(node1, updnode1), nil) {
  461. t.Error("Unexpected update result:", test)
  462. return
  463. }
  464. // Remove a node
  465. if err := trans.RemoveNode("main", node1.Key(), node1.Kind()); err != nil {
  466. t.Error(err)
  467. return
  468. }
  469. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), false, true, false, false)
  470. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  471. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  472. countMaps(t, trans, 1, 1, 1, 0)
  473. // Check that the update does an insert
  474. if err := trans.UpdateNode("main", updnode1); err != nil {
  475. t.Error(err)
  476. return
  477. }
  478. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  479. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  480. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  481. countMaps(t, trans, 2, 0, 1, 0)
  482. // Check that the node was inserted
  483. test = trans.storeNodes[trans.createKey("main", node1.Key(), node1.Kind())]
  484. if !data.NodeCompare(test, updnode1, nil) {
  485. t.Error("Unexpected update result:", test)
  486. return
  487. }
  488. // Remove a node
  489. if err := trans.RemoveNode("main", node1.Key(), node1.Kind()); err != nil {
  490. t.Error(err)
  491. return
  492. }
  493. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), false, true, false, false)
  494. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  495. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  496. countMaps(t, trans, 1, 1, 1, 0)
  497. // Store node again
  498. if err := trans.StoreNode("main", node1); err != nil {
  499. t.Error(err)
  500. return
  501. }
  502. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  503. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  504. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  505. countMaps(t, trans, 2, 0, 1, 0)
  506. // Remove edge
  507. if err := trans.RemoveEdge("main", edge1.Key(), edge1.Kind()); err != nil {
  508. t.Error(err)
  509. return
  510. }
  511. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  512. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  513. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, false, true)
  514. countMaps(t, trans, 2, 0, 0, 1)
  515. // Store the edge again
  516. if err := trans.StoreEdge("main", edge1); err != nil {
  517. t.Error(err)
  518. return
  519. }
  520. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  521. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  522. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  523. countMaps(t, trans, 2, 0, 1, 0)
  524. // Test trivial errors using broken nodes and partitions
  525. brokenNode := data.NewGraphNode()
  526. if trans.StoreNode("b b", brokenNode) == nil || trans.StoreNode("main", brokenNode) == nil {
  527. t.Error("Error result was expected")
  528. return
  529. }
  530. if trans.UpdateNode("b b", brokenNode) == nil || trans.UpdateNode("main", brokenNode) == nil {
  531. t.Error("Error result was expected")
  532. return
  533. }
  534. if trans.RemoveNode("b b", "123", "bla") == nil {
  535. t.Error("Error result was expected")
  536. return
  537. }
  538. brokenEdge := data.NewGraphEdge()
  539. if trans.StoreEdge("b b", brokenEdge) == nil || trans.StoreEdge("main", brokenEdge) == nil {
  540. t.Error("Error result was expected")
  541. return
  542. }
  543. if trans.RemoveEdge("b b", "123", "bla") == nil {
  544. t.Error("Error result was expected")
  545. return
  546. }
  547. // Test update case if a node exists already in the datastore
  548. node3instore := data.NewGraphNode()
  549. node3instore.SetAttr("key", "789")
  550. node3instore.SetAttr("kind", "mynewkind")
  551. node3instore.SetAttr("Existing", "TestNode3")
  552. gm.StoreNode("main", node3instore)
  553. node3 := data.NewGraphNode()
  554. node3.SetAttr("key", "789")
  555. node3.SetAttr("kind", "mynewkind")
  556. node3.SetAttr("Name", "Node3")
  557. sm := gm.gs.StorageManager("main"+node3.Kind()+StorageSuffixNodes, false)
  558. sm.(*storage.MemoryStorageManager).AccessMap[1] = storage.AccessCacheAndFetchError
  559. // Check that the update fails
  560. if err := trans.UpdateNode("main", node3); err.Error() !=
  561. "GraphError: Failed to access graph storage component (Slot not found (mystorage/mainmynewkind.nodes - Location:1))" {
  562. t.Error(err)
  563. return
  564. }
  565. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  566. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  567. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  568. countMaps(t, trans, 2, 0, 1, 0)
  569. delete(sm.(*storage.MemoryStorageManager).AccessMap, 1)
  570. // Check that the update is converted to an insert with the updated node
  571. if err := trans.UpdateNode("main", node3); err != nil {
  572. t.Error(err)
  573. return
  574. }
  575. checkMaps(t, trans, "main", node1.Key(), node1.Kind(), true, false, false, false)
  576. checkMaps(t, trans, "main", node2.Key(), node2.Kind(), true, false, false, false)
  577. checkMaps(t, trans, "main", node2.Key(), node3.Kind(), true, false, false, false)
  578. checkMaps(t, trans, "main", edge1.Key(), edge1.Kind(), false, false, true, false)
  579. countMaps(t, trans, 3, 0, 1, 0)
  580. test = trans.storeNodes[trans.createKey("main", node3.Key(), node3.Kind())]
  581. if !data.NodeCompare(test, data.NodeMerge(node3instore, node3), nil) {
  582. t.Error("Unexpected update result:", test)
  583. return
  584. }
  585. }
  586. func checkMaps(t *testing.T, trans *baseTrans, part string, ikey string, ikind string,
  587. nodeStore bool, nodeRemove bool, edgeStore bool, edgeRemove bool) {
  588. key := trans.createKey(part, ikey, ikind)
  589. if _, ok := trans.storeNodes[key]; ok != nodeStore {
  590. t.Error("Expected element is not in storesNodes:", key)
  591. }
  592. if _, ok := trans.removeNodes[key]; ok != nodeRemove {
  593. t.Error("Expected element is not in removeNodes:", key)
  594. }
  595. if _, ok := trans.storeEdges[key]; ok != edgeStore {
  596. t.Error("Expected element is not in storesEdges:", key)
  597. }
  598. if _, ok := trans.removeEdges[key]; ok != edgeRemove {
  599. t.Error("Expected element is not in removeEdges:", key)
  600. }
  601. }
  602. func countMaps(t *testing.T, trans *baseTrans, nodeStore int, nodeRemove int,
  603. edgeStore int, edgeRemove int) {
  604. if c := len(trans.storeNodes); c != nodeStore {
  605. t.Error("Unexpected storeNodes count:", c, " expected:", nodeStore)
  606. }
  607. if c := len(trans.removeNodes); c != nodeRemove {
  608. t.Error("Unexpected storeNodes count:", c, " expected:", nodeRemove)
  609. }
  610. if c := len(trans.storeEdges); c != edgeStore {
  611. t.Error("Unexpected storeNodes count:", c, " expected:", edgeStore)
  612. }
  613. if c := len(trans.removeEdges); c != edgeRemove {
  614. t.Error("Unexpected storeNodes count:", c, " expected:", edgeRemove)
  615. }
  616. }
  617. func TestTransErrors(t *testing.T) {
  618. testTransPanic(t)
  619. constructEdge := func(node1 data.Node, kind string, node2 data.Node) data.Edge {
  620. edge := data.NewGraphEdge()
  621. edge.SetAttr("key", "abc"+node1.Key()+node2.Key())
  622. edge.SetAttr("kind", kind)
  623. edge.SetAttr(data.EdgeEnd1Key, node1.Key())
  624. edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
  625. edge.SetAttr(data.EdgeEnd1Role, "node1")
  626. edge.SetAttr(data.EdgeEnd1Cascading, true)
  627. edge.SetAttr(data.EdgeEnd2Key, node2.Key())
  628. edge.SetAttr(data.EdgeEnd2Kind, node2.Kind())
  629. edge.SetAttr(data.EdgeEnd2Role, "node2")
  630. edge.SetAttr(data.EdgeEnd2Cascading, false)
  631. edge.SetAttr(data.NodeName, "Edge "+kind)
  632. return edge
  633. }
  634. mgs := graphstorage.NewMemoryGraphStorage("mystorage")
  635. gm := newGraphManagerNoRules(mgs)
  636. trans := NewConcurrentGraphTrans(gm)
  637. resetTrans := func(namesuffix string) {
  638. trans = NewConcurrentGraphTrans(gm)
  639. node1 := data.NewGraphNode()
  640. node1.SetAttr("key", "123")
  641. node1.SetAttr("kind", "mynode")
  642. node1.SetAttr("Name", "Node1"+namesuffix)
  643. node2 := data.NewGraphNode()
  644. node2.SetAttr("key", "456")
  645. node2.SetAttr("kind", "mynewnode")
  646. node2.SetAttr("Name", "Node2"+namesuffix)
  647. if err := trans.StoreNode("main", node1); err != nil {
  648. t.Error(err)
  649. return
  650. }
  651. if err := trans.StoreNode("main", node2); err != nil {
  652. t.Error(err)
  653. return
  654. }
  655. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
  656. t.Error(err)
  657. return
  658. }
  659. }
  660. resetTransAndStorage := func() {
  661. mgs = graphstorage.NewMemoryGraphStorage("mystorage")
  662. gm = newGraphManagerNoRules(mgs)
  663. resetTrans("")
  664. }
  665. resetTransAndStorage()
  666. // Test an inaccessible edge index
  667. storage.MsmCallNumRollback = 0
  668. sm := mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, true).(*storage.MemoryStorageManager)
  669. sm.AccessMap[3] = storage.AccessInsertError
  670. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
  671. t.Error("Unexpected error return:", err)
  672. return
  673. }
  674. if storage.MsmCallNumRollback != 6 {
  675. t.Error("Unexpected number of rollback calls:", storage.MsmCallNumRollback)
  676. }
  677. delete(sm.AccessMap, 3)
  678. // Test node commit failures
  679. resetTransAndStorage()
  680. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, true).(*storage.MemoryStorageManager)
  681. sm.AccessMap[1] = storage.AccessInsertError
  682. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  683. t.Error("Unexpected error return:", err)
  684. return
  685. }
  686. delete(sm.AccessMap, 1)
  687. resetTrans("")
  688. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, true).(*storage.MemoryStorageManager)
  689. sm.AccessMap[4] = storage.AccessInsertError
  690. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
  691. fmt.Println(sm)
  692. t.Error("Unexpected error return:", err)
  693. return
  694. }
  695. delete(sm.AccessMap, 4)
  696. resetTransAndStorage()
  697. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, true).(*storage.MemoryStorageManager)
  698. sm.AccessMap[1] = storage.AccessInsertError
  699. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  700. t.Error("Unexpected error return:", err)
  701. return
  702. }
  703. delete(sm.AccessMap, 1)
  704. resetTransAndStorage()
  705. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, true).(*storage.MemoryStorageManager)
  706. sm.AccessMap[2] = storage.AccessInsertError
  707. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
  708. t.Error("Unexpected error return:", err)
  709. return
  710. }
  711. delete(sm.AccessMap, 2)
  712. resetTransAndStorage()
  713. if err := trans.Commit(); err != nil {
  714. t.Error(err)
  715. }
  716. resetTrans("123")
  717. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, false).(*storage.MemoryStorageManager)
  718. sm.AccessMap[2] = storage.AccessCacheAndFetchError
  719. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
  720. t.Error("Unexpected error return:", err)
  721. return
  722. }
  723. delete(sm.AccessMap, 2)
  724. trans2 := NewConcurrentGraphTrans(gm)
  725. trans2.RemoveNode("main", "123", "mynode")
  726. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, false).(*storage.MemoryStorageManager)
  727. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  728. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  729. t.Error("Unexpected error return:", err)
  730. return
  731. }
  732. delete(sm.AccessMap, 1)
  733. resetTransAndStorage()
  734. if err := trans.Commit(); err != nil {
  735. t.Error(err)
  736. }
  737. trans2 = NewConcurrentGraphTrans(gm)
  738. trans2.RemoveNode("main", "123", "mynode")
  739. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodesIndex, false).(*storage.MemoryStorageManager)
  740. sm.AccessMap[2] = storage.AccessCacheAndFetchError
  741. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
  742. t.Error("Unexpected error return:", err)
  743. return
  744. }
  745. delete(sm.AccessMap, 2)
  746. resetTransAndStorage()
  747. if err := trans.Commit(); err != nil {
  748. t.Error(err)
  749. }
  750. trans2 = NewConcurrentGraphTrans(gm)
  751. trans2.RemoveNode("main", "123", "mynode")
  752. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
  753. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  754. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  755. t.Error("Unexpected error return:", err)
  756. return
  757. }
  758. delete(sm.AccessMap, 1)
  759. trans2.RemoveNode("main", "123", "mynode")
  760. sm.AccessMap[3] = storage.AccessCacheAndFetchError
  761. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
  762. t.Error("Unexpected error return:", err)
  763. return
  764. }
  765. delete(sm.AccessMap, 3)
  766. // Test edge errors
  767. resetTransAndStorage()
  768. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, true).(*storage.MemoryStorageManager)
  769. sm.AccessMap[1] = storage.AccessInsertError
  770. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  771. t.Error("Unexpected error return:", err)
  772. return
  773. }
  774. delete(sm.AccessMap, 1)
  775. resetTrans("")
  776. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdges, true).(*storage.MemoryStorageManager)
  777. sm.AccessMap[1] = storage.AccessInsertError
  778. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  779. t.Error("Unexpected error return:", err)
  780. return
  781. }
  782. delete(sm.AccessMap, 1)
  783. resetTrans("")
  784. sm.AccessMap[2] = storage.AccessInsertError
  785. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
  786. t.Error("Unexpected error return:", err)
  787. return
  788. }
  789. delete(sm.AccessMap, 2)
  790. node1 := data.NewGraphNode()
  791. node1.SetAttr("key", "123")
  792. node1.SetAttr("kind", "mynode")
  793. node1.SetAttr("Name", "Node1")
  794. node2 := data.NewGraphNode()
  795. node2.SetAttr("key", "456")
  796. node2.SetAttr("kind", "mynewnode")
  797. node2.SetAttr("Name", "Node2")
  798. node3 := data.NewGraphNode()
  799. node3.SetAttr("key", "xxx")
  800. node3.SetAttr("kind", "mynode2")
  801. node3.SetAttr("Name", "Node3")
  802. if err := trans.StoreEdge("main", constructEdge(node3, "myedge", node3)); err != nil {
  803. t.Error(err)
  804. return
  805. }
  806. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
  807. t.Error("Unexpected error return:", err)
  808. return
  809. }
  810. node3 = data.NewGraphNode()
  811. node3.SetAttr("key", "xxx")
  812. node3.SetAttr("kind", "mynode")
  813. node3.SetAttr("Name", "Node3")
  814. if err := trans.StoreEdge("main", constructEdge(node3, "myedge", node3)); err != nil {
  815. t.Error(err)
  816. return
  817. }
  818. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
  819. t.Error("Unexpected error return:", err)
  820. return
  821. }
  822. resetTransAndStorage()
  823. trans.Commit()
  824. resetTrans("")
  825. node3 = data.NewGraphNode()
  826. node3.SetAttr("key", "xxx")
  827. node3.SetAttr("kind", "mynode2")
  828. node3.SetAttr("Name", "Node3")
  829. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node3)); err != nil {
  830. t.Error(err)
  831. return
  832. }
  833. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
  834. t.Error("Unexpected error return:", err)
  835. return
  836. }
  837. node3 = data.NewGraphNode()
  838. node3.SetAttr("key", "xxx")
  839. node3.SetAttr("kind", "mynode")
  840. node3.SetAttr("Name", "Node3")
  841. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node3)); err != nil {
  842. t.Error(err)
  843. return
  844. }
  845. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Invalid data") {
  846. t.Error("Unexpected error return:", err)
  847. return
  848. }
  849. resetTransAndStorage()
  850. trans.Commit()
  851. trans = NewConcurrentGraphTrans(gm)
  852. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
  853. t.Error(err)
  854. return
  855. }
  856. sm = mgs.StorageManager("main"+"mynode"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
  857. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  858. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  859. t.Error("Unexpected error return:", err)
  860. return
  861. }
  862. delete(sm.AccessMap, 1)
  863. resetTransAndStorage()
  864. trans.Commit()
  865. trans = NewConcurrentGraphTrans(gm)
  866. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
  867. t.Error(err)
  868. return
  869. }
  870. sm = mgs.StorageManager("main"+"mynewnode"+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
  871. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  872. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  873. t.Error("Unexpected error return:", err)
  874. return
  875. }
  876. delete(sm.AccessMap, 1)
  877. resetTransAndStorage()
  878. trans.Commit()
  879. trans = NewConcurrentGraphTrans(gm)
  880. if err := trans.StoreEdge("main", constructEdge(node1, "myedge", node2)); err != nil {
  881. t.Error(err)
  882. return
  883. }
  884. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, false).(*storage.MemoryStorageManager)
  885. sm.AccessMap[4] = storage.AccessCacheAndFetchError
  886. if err := trans.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
  887. t.Error("Unexpected error return:", err)
  888. return
  889. }
  890. delete(sm.AccessMap, 4)
  891. // Test edge deletion errors
  892. deleteEdge := constructEdge(node1, "myedge", node2)
  893. resetTransAndStorage()
  894. trans.Commit()
  895. trans2 = NewConcurrentGraphTrans(gm)
  896. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  897. t.Error(err)
  898. return
  899. }
  900. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, false).(*storage.MemoryStorageManager)
  901. sm.AccessMap[2] = storage.AccessCacheAndFetchError
  902. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Index error") {
  903. t.Error("Unexpected error return:", err)
  904. return
  905. }
  906. delete(sm.AccessMap, 2)
  907. resetTransAndStorage()
  908. trans.Commit()
  909. trans2 = NewConcurrentGraphTrans(gm)
  910. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  911. t.Error(err)
  912. return
  913. }
  914. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdgesIndex, false).(*storage.MemoryStorageManager)
  915. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  916. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  917. t.Error("Unexpected error return:", err)
  918. return
  919. }
  920. delete(sm.AccessMap, 1)
  921. trans2 = NewConcurrentGraphTrans(gm)
  922. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  923. t.Error(err)
  924. return
  925. }
  926. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdges, false).(*storage.MemoryStorageManager)
  927. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  928. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  929. t.Error("Unexpected error return:", err)
  930. return
  931. }
  932. delete(sm.AccessMap, 1)
  933. trans2 = NewConcurrentGraphTrans(gm)
  934. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  935. t.Error(err)
  936. return
  937. }
  938. sm = mgs.StorageManager("main"+"myedge"+StorageSuffixEdges, false).(*storage.MemoryStorageManager)
  939. sm.AccessMap[2] = storage.AccessCacheAndFetchError
  940. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not write graph information") {
  941. t.Error("Unexpected error return:", err)
  942. return
  943. }
  944. delete(sm.AccessMap, 2)
  945. resetTransAndStorage()
  946. trans.Commit()
  947. trans2 = NewConcurrentGraphTrans(gm)
  948. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  949. t.Error(err)
  950. return
  951. }
  952. sm = mgs.StorageManager("main"+deleteEdge.End1Kind()+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
  953. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  954. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  955. t.Error("Unexpected error return:", err)
  956. return
  957. }
  958. delete(sm.AccessMap, 1)
  959. resetTransAndStorage()
  960. trans.Commit()
  961. trans2 = NewConcurrentGraphTrans(gm)
  962. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  963. t.Error(err)
  964. return
  965. }
  966. sm = mgs.StorageManager("main"+deleteEdge.End2Kind()+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
  967. sm.AccessMap[1] = storage.AccessCacheAndFetchError
  968. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Failed to access graph storage component") {
  969. t.Error("Unexpected error return:", err)
  970. return
  971. }
  972. delete(sm.AccessMap, 1)
  973. resetTransAndStorage()
  974. trans.Commit()
  975. trans2 = NewConcurrentGraphTrans(gm)
  976. if err := trans2.RemoveEdge("main", deleteEdge.Key(), deleteEdge.Kind()); err != nil {
  977. t.Error(err)
  978. return
  979. }
  980. sm = mgs.StorageManager("main"+deleteEdge.End2Kind()+StorageSuffixNodes, false).(*storage.MemoryStorageManager)
  981. sm.AccessMap[5] = storage.AccessCacheAndFetchError
  982. if err := trans2.Commit(); !strings.Contains(fmt.Sprint(err), "GraphError: Could not read graph information") {
  983. t.Error("Unexpected error return:", err)
  984. return
  985. }
  986. delete(sm.AccessMap, 5)
  987. resetTransAndStorage()
  988. // Delete non-existing node
  989. if err := trans.RemoveNode("main", "nonexist", "nonexist"); err != nil {
  990. t.Error(err)
  991. return
  992. }
  993. if err := trans.Commit(); err != nil {
  994. t.Error(err)
  995. return
  996. }
  997. }
  998. func testTransPanic(t *testing.T) {
  999. defer func() {
  1000. graphstorage.MgsRetFlushMain = nil
  1001. if r := recover(); r == nil {
  1002. t.Error("Transaction with a serious write error (during flushing) did not cause a panic.")
  1003. }
  1004. }()
  1005. mgs := graphstorage.NewMemoryGraphStorage("mystorage")
  1006. gm := newGraphManagerNoRules(mgs)
  1007. gm.getNodeStorageHTree("main", "mynode", true)
  1008. trans := NewConcurrentGraphTrans(gm)
  1009. node1 := data.NewGraphNode()
  1010. node1.SetAttr("key", "123")
  1011. node1.SetAttr("kind", "mynode")
  1012. trans.StoreNode("main", node1)
  1013. graphstorage.MgsRetFlushMain = errors.New("test")
  1014. trans.Commit()
  1015. }