trans.go 26 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "fmt"
  13. "strings"
  14. "sync"
  15. "devt.de/krotik/common/errorutil"
  16. "devt.de/krotik/eliasdb/graph/data"
  17. "devt.de/krotik/eliasdb/graph/util"
  18. )
  19. /*
  20. Trans is a transaction object which should be used to group node and edge operations.
  21. */
  22. type Trans interface {
  23. /*
  24. ID returns a unique transaction ID.
  25. */
  26. ID() string
  27. /*
  28. String returns a string representation of this transatction.
  29. */
  30. String() string
  31. /*
  32. Counts returns the transaction size in terms of objects. Returned values
  33. are nodes to store, edges to store, nodes to remove and edges to remove.
  34. */
  35. Counts() (int, int, int, int)
  36. /*
  37. IsEmpty returns if this transaction is empty.
  38. */
  39. IsEmpty() bool
  40. /*
  41. Commit writes the transaction to the graph database. An automatic rollback is done if
  42. any non-fatal error occurs. Failed transactions cannot be committed again.
  43. Serious write errors which may corrupt the database will cause a panic.
  44. */
  45. Commit() error
  46. /*
  47. StoreNode stores a single node in a partition of the graph. This function will
  48. overwrites any existing node.
  49. */
  50. StoreNode(part string, node data.Node) error
  51. /*
  52. UpdateNode updates a single node in a partition of the graph. This function will
  53. only update the given values of the node.
  54. */
  55. UpdateNode(part string, node data.Node) error
  56. /*
  57. RemoveNode removes a single node from a partition of the graph.
  58. */
  59. RemoveNode(part string, nkey string, nkind string) error
  60. /*
  61. StoreEdge stores a single edge in a partition of the graph. This function will
  62. overwrites any existing edge.
  63. */
  64. StoreEdge(part string, edge data.Edge) error
  65. /*
  66. RemoveEdge removes a single edge from a partition of the graph.
  67. */
  68. RemoveEdge(part string, ekey string, ekind string) error
  69. }
  70. /*
  71. NewGraphTrans creates a new graph transaction. This object is not thread safe
  72. and should only be used for non-concurrent use cases; use NewConcurrentGraphTrans
  73. for concurrent use cases.
  74. */
  75. func NewGraphTrans(gm *Manager) Trans {
  76. return newInternalGraphTrans(gm)
  77. }
  78. /*
  79. NewConcurrentGraphTrans creates a new thread-safe graph transaction.
  80. */
  81. func NewConcurrentGraphTrans(gm *Manager) Trans {
  82. return &concurrentTrans{NewGraphTrans(gm), &sync.RWMutex{}}
  83. }
  84. /*
  85. NewRollingTrans wraps an existing transaction into a rolling transaction.
  86. Rolling transactions can be used for VERY large datasets and will commit
  87. themselves after n operations. Rolling transactions are always thread-safe.
  88. */
  89. func NewRollingTrans(t Trans, n int, gm *Manager, newTrans func(*Manager) Trans) Trans {
  90. idCounterLock.Lock()
  91. defer idCounterLock.Unlock()
  92. idCounter++
  93. // Smallest commit threshold is 1
  94. if n < 1 {
  95. n = 1
  96. }
  97. return &rollingTrans{
  98. id: fmt.Sprint(idCounter),
  99. gm: gm,
  100. currentTrans: t,
  101. newTransFunc: newTrans,
  102. transErrors: errorutil.NewCompositeError(),
  103. opThreshold: n,
  104. opCount: 0,
  105. inFlightCount: 0,
  106. wg: &sync.WaitGroup{},
  107. countNodeIns: 0,
  108. countNodeRem: 0,
  109. countEdgeIns: 0,
  110. countEdgeRem: 0,
  111. transLock: &sync.RWMutex{},
  112. }
  113. }
  114. /*
  115. newInternalGraphTrans is used for internal transactions. The returned object
  116. contains extra fields which are only for internal use.
  117. */
  118. func newInternalGraphTrans(gm *Manager) *baseTrans {
  119. idCounterLock.Lock()
  120. defer idCounterLock.Unlock()
  121. idCounter++
  122. return &baseTrans{fmt.Sprint(idCounter), gm, false, make(map[string]data.Node), make(map[string]data.Node),
  123. make(map[string]data.Edge), make(map[string]data.Edge)}
  124. }
  125. /*
  126. idCounter is a simple counter for ids
  127. */
  128. var idCounter uint64
  129. var idCounterLock = &sync.Mutex{}
  130. /*
  131. baseTrans is the main data structure for a graph transaction
  132. */
  133. type baseTrans struct {
  134. id string // Unique transaction ID - not used by EliasDB
  135. gm *Manager // Graph manager which created this transaction
  136. subtrans bool // Flag if the transaction is a subtransaction
  137. storeNodes map[string]data.Node // Nodes which should be stored
  138. removeNodes map[string]data.Node // Nodes which should be removed
  139. storeEdges map[string]data.Edge // Edges which should be stored
  140. removeEdges map[string]data.Edge // Edges which should be removed
  141. }
  142. /*
  143. ID returns a unique transaction ID.
  144. */
  145. func (gt *baseTrans) ID() string {
  146. return gt.id
  147. }
  148. /*
  149. IsEmpty returns if this transaction is empty.
  150. */
  151. func (gt *baseTrans) IsEmpty() bool {
  152. sn, se, rn, re := gt.Counts()
  153. return sn == 0 && se == 0 && rn == 0 && re == 0
  154. }
  155. /*
  156. Counts returns the transaction size in terms of objects. Returned values
  157. are nodes to store, edges to store, nodes to remove and edges to remove.
  158. */
  159. func (gt *baseTrans) Counts() (int, int, int, int) {
  160. return len(gt.storeNodes), len(gt.storeEdges), len(gt.removeNodes), len(gt.removeEdges)
  161. }
  162. /*
  163. String returns a string representation of this transatction.
  164. */
  165. func (gt *baseTrans) String() string {
  166. sn, se, rn, re := gt.Counts()
  167. return fmt.Sprintf("Transaction %v - Nodes: I:%v R:%v - Edges: I:%v R:%v",
  168. gt.id, sn, rn, se, re)
  169. }
  170. /*
  171. Commit writes the transaction to the graph database. An automatic rollback is done if
  172. any non-fatal error occurs. Failed transactions cannot be committed again.
  173. Serious write errors which may corrupt the database will cause a panic.
  174. */
  175. func (gt *baseTrans) Commit() error {
  176. // Take writer lock if we are not in a subtransaction
  177. if !gt.subtrans {
  178. gt.gm.mutex.Lock()
  179. defer gt.gm.mutex.Unlock()
  180. }
  181. // Return if there is nothing to do
  182. if gt.IsEmpty() {
  183. return nil
  184. }
  185. doRollback := func(nodePartsAndKinds map[string]string,
  186. edgePartsAndKinds map[string]string) {
  187. // Rollback main database
  188. gt.gm.gs.RollbackMain()
  189. // Rollback node storages
  190. for kkey := range nodePartsAndKinds {
  191. partAndKind := strings.Split(kkey, "#")
  192. gt.gm.rollbackNodeIndex(partAndKind[0], partAndKind[1])
  193. gt.gm.rollbackNodeStorage(partAndKind[0], partAndKind[1])
  194. }
  195. gt.storeNodes = make(map[string]data.Node)
  196. gt.removeNodes = make(map[string]data.Node)
  197. // Rollback edge storages
  198. if edgePartsAndKinds != nil {
  199. for kkey := range edgePartsAndKinds {
  200. partAndKind := strings.Split(kkey, "#")
  201. gt.gm.rollbackEdgeIndex(partAndKind[0], partAndKind[1])
  202. gt.gm.rollbackEdgeStorage(partAndKind[0], partAndKind[1])
  203. }
  204. }
  205. gt.storeEdges = make(map[string]data.Edge)
  206. gt.removeEdges = make(map[string]data.Edge)
  207. }
  208. // Write nodes and edges until everything has been written
  209. nodePartsAndKinds := make(map[string]string)
  210. edgePartsAndKinds := make(map[string]string)
  211. for !gt.IsEmpty() {
  212. // Write the nodes first
  213. if err := gt.commitNodes(nodePartsAndKinds, edgePartsAndKinds); err != nil {
  214. doRollback(nodePartsAndKinds, nil)
  215. return err
  216. }
  217. // After the nodes write the edges
  218. if err := gt.commitEdges(nodePartsAndKinds, edgePartsAndKinds); err != nil {
  219. doRollback(nodePartsAndKinds, edgePartsAndKinds)
  220. return err
  221. }
  222. }
  223. // Flush changes - panic instead of error reporting since the database
  224. // may be inconsistent
  225. panicIfError := func(err error) {
  226. if err != nil {
  227. panic("Fatal GraphError:" + err.Error())
  228. }
  229. }
  230. panicIfError(gt.gm.gs.FlushMain())
  231. for kkey := range nodePartsAndKinds {
  232. partAndKind := strings.Split(kkey, "#")
  233. panicIfError(gt.gm.flushNodeIndex(partAndKind[0], partAndKind[1]))
  234. panicIfError(gt.gm.flushNodeStorage(partAndKind[0], partAndKind[1]))
  235. }
  236. for kkey := range edgePartsAndKinds {
  237. partAndKind := strings.Split(kkey, "#")
  238. panicIfError(gt.gm.flushEdgeIndex(partAndKind[0], partAndKind[1]))
  239. panicIfError(gt.gm.flushEdgeStorage(partAndKind[0], partAndKind[1]))
  240. }
  241. return nil
  242. }
  243. /*
  244. commitNodes tries to commit all transaction nodes.
  245. */
  246. func (gt *baseTrans) commitNodes(nodePartsAndKinds map[string]string, edgePartsAndKinds map[string]string) error {
  247. // First insert nodes
  248. for tkey, node := range gt.storeNodes {
  249. // Get partition and kind
  250. partAndKind := strings.Split(tkey, "#")
  251. nodePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
  252. part := partAndKind[0]
  253. // Get the HTrees which stores the node index and node
  254. iht, err := gt.gm.getNodeIndexHTree(part, node.Kind(), true)
  255. if err != nil {
  256. return err
  257. }
  258. attht, valht, err := gt.gm.getNodeStorageHTree(part, node.Kind(), true)
  259. if err != nil {
  260. return err
  261. }
  262. // Write the node to the datastore
  263. oldnode, err := gt.gm.writeNode(node, false, attht, valht, nodeAttributeFilter)
  264. if err != nil {
  265. return err
  266. }
  267. // Increase node count if the node was inserted and write the changes
  268. // to the index.
  269. if oldnode == nil {
  270. currentCount := gt.gm.NodeCount(node.Kind())
  271. gt.gm.writeNodeCount(node.Kind(), currentCount+1, false)
  272. if iht != nil {
  273. err := util.NewIndexManager(iht).Index(node.Key(), node.IndexMap())
  274. if err != nil {
  275. // The node was written at this point and the model is
  276. // consistent only the index is missing entries
  277. return err
  278. }
  279. }
  280. } else if iht != nil {
  281. err := util.NewIndexManager(iht).Reindex(node.Key(), node.IndexMap(),
  282. oldnode.IndexMap())
  283. if err != nil {
  284. // The node was written at this point and the model is
  285. // consistent only the index is missing entries
  286. return err
  287. }
  288. }
  289. // Execute rules
  290. var event int
  291. if oldnode == nil {
  292. event = EventNodeCreated
  293. } else {
  294. event = EventNodeUpdated
  295. }
  296. if err := gt.gm.gr.graphEvent(gt, event, part, node, oldnode); err != nil {
  297. return err
  298. }
  299. delete(gt.storeNodes, tkey)
  300. }
  301. // Then remove nodes
  302. for tkey, node := range gt.removeNodes {
  303. // Get partition and kind
  304. partAndKind := strings.Split(tkey, "#")
  305. nodePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
  306. part := partAndKind[0]
  307. // Get the HTree which stores the node index and node kind
  308. iht, err := gt.gm.getNodeIndexHTree(part, node.Kind(), false)
  309. if err != nil {
  310. return err
  311. }
  312. attTree, valTree, err := gt.gm.getNodeStorageHTree(part, node.Kind(), false)
  313. if err != nil {
  314. return err
  315. }
  316. if attTree == nil || valTree == nil {
  317. // Kind does not exist - continue
  318. delete(gt.removeNodes, tkey)
  319. continue
  320. }
  321. // Delete the node from the datastore
  322. oldnode, err := gt.gm.deleteNode(node.Key(), node.Kind(), attTree, valTree)
  323. if err != nil {
  324. return err
  325. }
  326. // Update the index
  327. if oldnode != nil {
  328. if iht != nil {
  329. err := util.NewIndexManager(iht).Deindex(node.Key(), oldnode.IndexMap())
  330. if err != nil {
  331. return err
  332. }
  333. }
  334. // Decrease the node count
  335. currentCount := gt.gm.NodeCount(node.Kind())
  336. gt.gm.writeNodeCount(node.Kind(), currentCount-1, false)
  337. // Execute rules
  338. if err := gt.gm.gr.graphEvent(gt, EventNodeDeleted, part, oldnode); err != nil {
  339. return err
  340. }
  341. }
  342. delete(gt.removeNodes, tkey)
  343. }
  344. return nil
  345. }
  346. /*
  347. commitEdges tries to commit all transaction edges.
  348. */
  349. func (gt *baseTrans) commitEdges(nodePartsAndKinds map[string]string, edgePartsAndKinds map[string]string) error {
  350. // First insert edges
  351. for tkey, edge := range gt.storeEdges {
  352. // Get partition and kind
  353. partAndKind := strings.Split(tkey, "#")
  354. edgePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
  355. nodePartsAndKinds[partAndKind[0]+"#"+edge.End1Kind()] = ""
  356. nodePartsAndKinds[partAndKind[0]+"#"+edge.End2Kind()] = ""
  357. part := partAndKind[0]
  358. // Get the HTrees which stores the edges and the edge index
  359. iht, err := gt.gm.getEdgeIndexHTree(part, edge.Kind(), true)
  360. if err != nil {
  361. return err
  362. }
  363. edgeht, err := gt.gm.getEdgeStorageHTree(part, edge.Kind(), true)
  364. if err != nil {
  365. return err
  366. }
  367. // Get the HTrees which stores the edge endpoints and make sure the endpoints
  368. // do exist
  369. end1nodeht, end1ht, err := gt.gm.getNodeStorageHTree(part, edge.End1Kind(), false)
  370. if err != nil {
  371. return err
  372. } else if end1ht == nil {
  373. return &util.GraphError{
  374. Type: util.ErrInvalidData,
  375. Detail: fmt.Sprintf("Can't store edge to non-existing node kind: %v", edge.End1Kind()),
  376. }
  377. } else if end1, err := end1nodeht.Get([]byte(PrefixNSAttrs + edge.End1Key())); err != nil || end1 == nil {
  378. return &util.GraphError{
  379. Type: util.ErrInvalidData,
  380. Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End1Key(), edge.End1Kind()),
  381. }
  382. }
  383. end2nodeht, end2ht, err := gt.gm.getNodeStorageHTree(part, edge.End2Kind(), false)
  384. if err != nil {
  385. return err
  386. } else if end2ht == nil {
  387. return &util.GraphError{
  388. Type: util.ErrInvalidData,
  389. Detail: "Can't store edge to non-existing node kind: " + edge.End2Kind()}
  390. } else if end2, err := end2nodeht.Get([]byte(PrefixNSAttrs + edge.End2Key())); err != nil || end2 == nil {
  391. return &util.GraphError{
  392. Type: util.ErrInvalidData,
  393. Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End2Key(), edge.End2Kind()),
  394. }
  395. }
  396. // Write edge to the datastore
  397. oldedge, err := gt.gm.writeEdge(edge, edgeht, end1ht, end2ht)
  398. if err != nil {
  399. return err
  400. }
  401. // Increase edge count if the edge was inserted and write the changes
  402. // to the index.
  403. if oldedge == nil {
  404. // Increase edge count
  405. currentCount := gt.gm.EdgeCount(edge.Kind())
  406. gt.gm.writeEdgeCount(edge.Kind(), currentCount+1, false)
  407. // Write edge data to the index
  408. if iht != nil {
  409. if err := util.NewIndexManager(iht).Index(edge.Key(), edge.IndexMap()); err != nil {
  410. // The edge was written at this point and the model is
  411. // consistent only the index is missing entries
  412. return err
  413. }
  414. }
  415. } else if iht != nil {
  416. err := util.NewIndexManager(iht).Reindex(edge.Key(), edge.IndexMap(),
  417. oldedge.IndexMap())
  418. if err != nil {
  419. // The edge was written at this point and the model is
  420. // consistent only the index is missing entries
  421. return err
  422. }
  423. }
  424. // Execute rules
  425. var event int
  426. if oldedge == nil {
  427. event = EventEdgeCreated
  428. } else {
  429. event = EventEdgeUpdated
  430. }
  431. if err := gt.gm.gr.graphEvent(gt, event, part, edge, oldedge); err != nil {
  432. return err
  433. }
  434. delete(gt.storeEdges, tkey)
  435. }
  436. // Then remove edges
  437. for tkey, edge := range gt.removeEdges {
  438. // Get partition and kind
  439. partAndKind := strings.Split(tkey, "#")
  440. edgePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
  441. nodePartsAndKinds[partAndKind[0]+"#"+edge.End1Kind()] = ""
  442. nodePartsAndKinds[partAndKind[0]+"#"+edge.End2Kind()] = ""
  443. part := partAndKind[0]
  444. // Get the HTrees which stores the edges and the edge index
  445. iht, err := gt.gm.getEdgeIndexHTree(part, edge.Kind(), true)
  446. if err != nil {
  447. return err
  448. }
  449. edgeht, err := gt.gm.getEdgeStorageHTree(part, edge.Kind(), true)
  450. if err != nil {
  451. return err
  452. }
  453. // Delete the node from the datastore
  454. node, err := gt.gm.deleteNode(edge.Key(), edge.Kind(), edgeht, edgeht)
  455. oldedge := data.NewGraphEdgeFromNode(node)
  456. if err != nil {
  457. return err
  458. }
  459. if node != nil {
  460. // Get the HTrees which stores the edge endpoints
  461. _, end1ht, err := gt.gm.getNodeStorageHTree(part, oldedge.End1Kind(), false)
  462. if err != nil {
  463. return err
  464. }
  465. _, end2ht, err := gt.gm.getNodeStorageHTree(part, oldedge.End2Kind(), false)
  466. if err != nil {
  467. return err
  468. }
  469. // Delete edge info from node storage
  470. if err := gt.gm.deleteEdge(oldedge, end1ht, end2ht); err != nil {
  471. return err
  472. }
  473. if iht != nil {
  474. err := util.NewIndexManager(iht).Deindex(edge.Key(), oldedge.IndexMap())
  475. if err != nil {
  476. return err
  477. }
  478. }
  479. // Decrease edge count
  480. currentCount := gt.gm.EdgeCount(oldedge.Kind())
  481. gt.gm.writeEdgeCount(oldedge.Kind(), currentCount-1, false)
  482. // Execute rules
  483. if err := gt.gm.gr.graphEvent(gt, EventEdgeDeleted, part, oldedge); err != nil {
  484. return err
  485. }
  486. }
  487. delete(gt.removeEdges, tkey)
  488. }
  489. return nil
  490. }
  491. /*
  492. StoreNode stores a single node in a partition of the graph. This function will
  493. overwrites any existing node.
  494. */
  495. func (gt *baseTrans) StoreNode(part string, node data.Node) error {
  496. if err := gt.gm.checkPartitionName(part); err != nil {
  497. return err
  498. } else if err := gt.gm.checkNode(node); err != nil {
  499. return err
  500. }
  501. key := gt.createKey(part, node.Key(), node.Kind())
  502. if _, ok := gt.removeNodes[key]; ok {
  503. delete(gt.removeNodes, key)
  504. }
  505. gt.storeNodes[key] = node
  506. return nil
  507. }
  508. /*
  509. UpdateNode updates a single node in a partition of the graph. This function will
  510. only update the given values of the node.
  511. */
  512. func (gt *baseTrans) UpdateNode(part string, node data.Node) error {
  513. if err := gt.gm.checkPartitionName(part); err != nil {
  514. return err
  515. } else if err := gt.gm.checkNode(node); err != nil {
  516. return err
  517. }
  518. key := gt.createKey(part, node.Key(), node.Kind())
  519. if _, ok := gt.removeNodes[key]; ok {
  520. delete(gt.removeNodes, key)
  521. } else if storeNode, ok := gt.storeNodes[key]; ok {
  522. node = data.NodeMerge(storeNode, node)
  523. } else {
  524. // Check the actual database if the node exists
  525. storeNode, err := gt.gm.FetchNode(part, node.Key(), node.Kind())
  526. if err != nil {
  527. return err
  528. } else if storeNode != nil {
  529. node = data.NodeMerge(storeNode, node)
  530. }
  531. }
  532. gt.storeNodes[key] = node
  533. return nil
  534. }
  535. /*
  536. RemoveNode removes a single node from a partition of the graph.
  537. */
  538. func (gt *baseTrans) RemoveNode(part string, nkey string, nkind string) error {
  539. if err := gt.gm.checkPartitionName(part); err != nil {
  540. return err
  541. }
  542. key := gt.createKey(part, nkey, nkind)
  543. if _, ok := gt.storeNodes[key]; ok {
  544. delete(gt.storeNodes, key)
  545. }
  546. node := data.NewGraphNode()
  547. node.SetAttr(data.NodeKey, nkey)
  548. node.SetAttr(data.NodeKind, nkind)
  549. gt.removeNodes[key] = node
  550. return nil
  551. }
  552. /*
  553. StoreEdge stores a single edge in a partition of the graph. This function will
  554. overwrites any existing edge.
  555. */
  556. func (gt *baseTrans) StoreEdge(part string, edge data.Edge) error {
  557. if err := gt.gm.checkPartitionName(part); err != nil {
  558. return err
  559. } else if err := gt.gm.checkEdge(edge); err != nil {
  560. return err
  561. }
  562. key := gt.createKey(part, edge.Key(), edge.Kind())
  563. if _, ok := gt.removeEdges[key]; ok {
  564. delete(gt.removeEdges, key)
  565. }
  566. gt.storeEdges[key] = edge
  567. return nil
  568. }
  569. /*
  570. RemoveEdge removes a single edge from a partition of the graph.
  571. */
  572. func (gt *baseTrans) RemoveEdge(part string, ekey string, ekind string) error {
  573. if err := gt.gm.checkPartitionName(part); err != nil {
  574. return err
  575. }
  576. key := gt.createKey(part, ekey, ekind)
  577. if _, ok := gt.storeEdges[key]; ok {
  578. delete(gt.storeEdges, key)
  579. }
  580. edge := data.NewGraphEdge()
  581. edge.SetAttr(data.NodeKey, ekey)
  582. edge.SetAttr(data.NodeKind, ekind)
  583. gt.removeEdges[key] = edge
  584. return nil
  585. }
  586. /*
  587. Create a key for the transaction storage.
  588. */
  589. func (gt *baseTrans) createKey(part string, key string, kind string) string {
  590. return part + "#" + kind + "#" + key
  591. }
  592. /*
  593. concurrentTrans is a lock-wrapper around baseTrans which allows concurrent use.
  594. */
  595. type concurrentTrans struct {
  596. Trans
  597. transLock *sync.RWMutex
  598. }
  599. /*
  600. ID returns a unique transaction ID.
  601. */
  602. func (gt *concurrentTrans) ID() string {
  603. gt.transLock.RLock()
  604. defer gt.transLock.RUnlock()
  605. return gt.Trans.ID()
  606. }
  607. /*
  608. String returns a string representation of this transatction.
  609. */
  610. func (gt *concurrentTrans) String() string {
  611. gt.transLock.RLock()
  612. defer gt.transLock.RUnlock()
  613. return gt.Trans.String()
  614. }
  615. /*
  616. Counts returns the transaction size in terms of objects. Returned values
  617. are nodes to store, edges to store, nodes to remove and edges to remove.
  618. */
  619. func (gt *concurrentTrans) Counts() (int, int, int, int) {
  620. gt.transLock.RLock()
  621. defer gt.transLock.RUnlock()
  622. return gt.Trans.Counts()
  623. }
  624. /*
  625. IsEmpty returns if this transaction is empty.
  626. */
  627. func (gt *concurrentTrans) IsEmpty() bool {
  628. gt.transLock.RLock()
  629. defer gt.transLock.RUnlock()
  630. return gt.Trans.IsEmpty()
  631. }
  632. /*
  633. Commit writes the transaction to the graph database. An automatic rollback is done if
  634. any non-fatal error occurs. Failed transactions cannot be committed again.
  635. Serious write errors which may corrupt the database will cause a panic.
  636. */
  637. func (gt *concurrentTrans) Commit() error {
  638. gt.transLock.Lock()
  639. defer gt.transLock.Unlock()
  640. return gt.Trans.Commit()
  641. }
  642. /*
  643. StoreNode stores a single node in a partition of the graph. This function will
  644. overwrites any existing node.
  645. */
  646. func (gt *concurrentTrans) StoreNode(part string, node data.Node) error {
  647. gt.transLock.Lock()
  648. defer gt.transLock.Unlock()
  649. return gt.Trans.StoreNode(part, node)
  650. }
  651. /*
  652. UpdateNode updates a single node in a partition of the graph. This function will
  653. only update the given values of the node.
  654. */
  655. func (gt *concurrentTrans) UpdateNode(part string, node data.Node) error {
  656. gt.transLock.Lock()
  657. defer gt.transLock.Unlock()
  658. return gt.Trans.UpdateNode(part, node)
  659. }
  660. /*
  661. RemoveNode removes a single node from a partition of the graph.
  662. */
  663. func (gt *concurrentTrans) RemoveNode(part string, nkey string, nkind string) error {
  664. gt.transLock.Lock()
  665. defer gt.transLock.Unlock()
  666. return gt.Trans.RemoveNode(part, nkey, nkind)
  667. }
  668. /*
  669. StoreEdge stores a single edge in a partition of the graph. This function will
  670. overwrites any existing edge.
  671. */
  672. func (gt *concurrentTrans) StoreEdge(part string, edge data.Edge) error {
  673. gt.transLock.Lock()
  674. defer gt.transLock.Unlock()
  675. return gt.Trans.StoreEdge(part, edge)
  676. }
  677. /*
  678. RemoveEdge removes a single edge from a partition of the graph.
  679. */
  680. func (gt *concurrentTrans) RemoveEdge(part string, ekey string, ekind string) error {
  681. gt.transLock.Lock()
  682. defer gt.transLock.Unlock()
  683. return gt.Trans.RemoveEdge(part, ekey, ekind)
  684. }
  685. /*
  686. rollingTrans is a rolling transaction which will commit itself after
  687. n operations.
  688. */
  689. type rollingTrans struct {
  690. id string // ID of this transaction
  691. gm *Manager // Graph manager which created this transaction
  692. currentTrans Trans // Current transaction which is build up
  693. newTransFunc func(*Manager) Trans // Function to create a new transaction
  694. transErrors *errorutil.CompositeError // Collected transaction errors
  695. opThreshold int // Operation threshold
  696. opCount int // Operation count
  697. inFlightCount int // Previous transactions which are still committing
  698. wg *sync.WaitGroup // WaitGroup which releases after all in-flight transactions
  699. countNodeIns int // Count for inserted nodes
  700. countNodeRem int // Count for removed nodes
  701. countEdgeIns int // Count for inserted edges
  702. countEdgeRem int // Count for removed edges
  703. transLock *sync.RWMutex // Lock for this transaction
  704. }
  705. /*
  706. ID returns a unique transaction ID.
  707. */
  708. func (gt *rollingTrans) ID() string {
  709. gt.transLock.RLock()
  710. defer gt.transLock.RUnlock()
  711. return gt.id
  712. }
  713. /*
  714. IsEmpty returns if this transaction is empty.
  715. */
  716. func (gt *rollingTrans) IsEmpty() bool {
  717. sn, se, rn, re := gt.Counts()
  718. return sn == 0 && se == 0 && rn == 0 && re == 0
  719. }
  720. /*
  721. Counts returns the transaction size in terms of objects. Returned values
  722. are nodes to store, edges to store, nodes to remove and edges to remove.
  723. */
  724. func (gt *rollingTrans) Counts() (int, int, int, int) {
  725. gt.transLock.RLock()
  726. defer gt.transLock.RUnlock()
  727. // Count current trans
  728. ns, es, nr, er := gt.currentTrans.Counts()
  729. return ns + gt.countNodeIns, es + gt.countEdgeIns,
  730. nr + gt.countNodeRem, er + gt.countEdgeRem
  731. }
  732. /*
  733. String returns a string representation of this transatction.
  734. */
  735. func (gt *rollingTrans) String() string {
  736. gt.transLock.RLock()
  737. defer gt.transLock.RUnlock()
  738. ns, es, nr, er := gt.currentTrans.Counts()
  739. return fmt.Sprintf("Rolling transaction %v - Nodes: I:%v R:%v - "+
  740. "Edges: I:%v R:%v - Threshold: %v - In-flight: %v",
  741. gt.id, ns+gt.countNodeIns, nr+gt.countNodeRem, es+gt.countEdgeIns,
  742. er+gt.countEdgeRem, gt.opThreshold, gt.inFlightCount)
  743. }
  744. /*
  745. Commit writes the remaining operations of this rolling transaction to
  746. the graph database.
  747. */
  748. func (gt *rollingTrans) Commit() error {
  749. // Commit current transaction
  750. gt.transLock.Lock()
  751. if err := gt.currentTrans.Commit(); err != nil {
  752. gt.transErrors.Add(err)
  753. }
  754. gt.transLock.Unlock()
  755. // Wait for other transactions
  756. gt.wg.Wait()
  757. // Return any errors
  758. if gt.transErrors.HasErrors() {
  759. return gt.transErrors
  760. }
  761. return nil
  762. }
  763. /*
  764. checkNewSubTrans checks if a new sub-transaction should be started.
  765. */
  766. func (gt *rollingTrans) checkNewSubTrans() {
  767. if gt.opCount++; gt.opCount >= gt.opThreshold {
  768. // Reset the op counter
  769. gt.opCount = 0
  770. // Start a new transaction and add the counts to the overall counts
  771. cTrans := gt.currentTrans
  772. gt.currentTrans = gt.newTransFunc(gt.gm)
  773. ns, es, nr, er := cTrans.Counts()
  774. gt.countNodeIns += ns
  775. gt.countNodeRem += nr
  776. gt.countEdgeIns += es
  777. gt.countEdgeRem += er
  778. // Start go routine which commits the current transaction
  779. gt.wg.Add(1) // Add to WaitGroup so we can wait for all in-flight transactions
  780. gt.inFlightCount++ // Count the new in-flight transaction
  781. go func() {
  782. defer gt.wg.Done()
  783. err := cTrans.Commit()
  784. gt.transLock.Lock()
  785. if err != nil {
  786. // Store errors
  787. gt.transErrors.Add(err)
  788. }
  789. // Reduce the counts (do this even if there were errors)
  790. gt.countNodeIns -= ns
  791. gt.countNodeRem -= nr
  792. gt.countEdgeIns -= es
  793. gt.countEdgeRem -= er
  794. gt.inFlightCount--
  795. gt.transLock.Unlock()
  796. }()
  797. }
  798. }
  799. /*
  800. StoreNode stores a single node in a partition of the graph. This function will
  801. overwrites any existing node.
  802. */
  803. func (gt *rollingTrans) StoreNode(part string, node data.Node) error {
  804. gt.transLock.Lock()
  805. defer gt.transLock.Unlock()
  806. err := gt.currentTrans.StoreNode(part, node)
  807. if err == nil {
  808. gt.checkNewSubTrans()
  809. }
  810. return err
  811. }
  812. /*
  813. UpdateNode updates a single node in a partition of the graph. This function will
  814. only update the given values of the node.
  815. */
  816. func (gt *rollingTrans) UpdateNode(part string, node data.Node) error {
  817. gt.transLock.Lock()
  818. defer gt.transLock.Unlock()
  819. err := gt.currentTrans.UpdateNode(part, node)
  820. if err == nil {
  821. gt.checkNewSubTrans()
  822. }
  823. return err
  824. }
  825. /*
  826. RemoveNode removes a single node from a partition of the graph.
  827. */
  828. func (gt *rollingTrans) RemoveNode(part string, nkey string, nkind string) error {
  829. gt.transLock.Lock()
  830. defer gt.transLock.Unlock()
  831. err := gt.currentTrans.RemoveNode(part, nkey, nkind)
  832. if err == nil {
  833. gt.checkNewSubTrans()
  834. }
  835. return err
  836. }
  837. /*
  838. StoreEdge stores a single edge in a partition of the graph. This function will
  839. overwrites any existing edge.
  840. */
  841. func (gt *rollingTrans) StoreEdge(part string, edge data.Edge) error {
  842. gt.transLock.Lock()
  843. defer gt.transLock.Unlock()
  844. err := gt.currentTrans.StoreEdge(part, edge)
  845. if err == nil {
  846. gt.checkNewSubTrans()
  847. }
  848. return err
  849. }
  850. /*
  851. RemoveEdge removes a single edge from a partition of the graph.
  852. */
  853. func (gt *rollingTrans) RemoveEdge(part string, ekey string, ekind string) error {
  854. gt.transLock.Lock()
  855. defer gt.transLock.Unlock()
  856. err := gt.currentTrans.RemoveEdge(part, ekey, ekind)
  857. if err == nil {
  858. gt.checkNewSubTrans()
  859. }
  860. return err
  861. }