graphmanager_nodes.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "encoding/binary"
  13. "encoding/gob"
  14. "devt.de/krotik/eliasdb/graph/data"
  15. "devt.de/krotik/eliasdb/graph/util"
  16. "devt.de/krotik/eliasdb/hash"
  17. )
  18. func init() {
  19. // It is possible to store nested structures on nodes
  20. gob.Register(make(map[string]interface{}))
  21. }
  22. /*
  23. NodeCount returns the node count for a given node kind.
  24. */
  25. func (gm *Manager) NodeCount(kind string) uint64 {
  26. if val, ok := gm.gs.MainDB()[MainDBNodeCount+kind]; ok {
  27. return binary.LittleEndian.Uint64([]byte(val))
  28. }
  29. return 0
  30. }
  31. /*
  32. NodeKeyIterator iterates node keys of a certain kind.
  33. */
  34. func (gm *Manager) NodeKeyIterator(part string, kind string) (*NodeKeyIterator, error) {
  35. // Get the HTrees which stores the node
  36. tree, _, err := gm.getNodeStorageHTree(part, kind, false)
  37. if err != nil || tree == nil {
  38. return nil, err
  39. }
  40. it := hash.NewHTreeIterator(tree)
  41. if it.LastError != nil {
  42. return nil, &util.GraphError{
  43. Type: util.ErrReading,
  44. Detail: it.LastError.Error(),
  45. }
  46. }
  47. return &NodeKeyIterator{gm, it, nil}, nil
  48. }
  49. /*
  50. FetchNode fetches a single node from a partition of the graph.
  51. */
  52. func (gm *Manager) FetchNode(part string, key string, kind string) (data.Node, error) {
  53. return gm.FetchNodePart(part, key, kind, nil)
  54. }
  55. /*
  56. FetchNodePart fetches part of a single node from a partition of the graph.
  57. */
  58. func (gm *Manager) FetchNodePart(part string, key string, kind string,
  59. attrs []string) (data.Node, error) {
  60. // Get the HTrees which stores the node
  61. attht, valht, err := gm.getNodeStorageHTree(part, kind, false)
  62. if err != nil || attht == nil || valht == nil {
  63. return nil, err
  64. }
  65. // Take reader lock
  66. gm.mutex.RLock()
  67. defer gm.mutex.RUnlock()
  68. // Read the node from the datastore
  69. return gm.readNode(key, kind, attrs, attht, valht)
  70. }
  71. /*
  72. readNode reads a given node from the datastore.
  73. */
  74. func (gm *Manager) readNode(key string, kind string, attrs []string,
  75. attrTree *hash.HTree, valTree *hash.HTree) (data.Node, error) {
  76. keyAttrs := PrefixNSAttrs + key
  77. keyAttrPrefix := PrefixNSAttr + key
  78. // Check if the node exists
  79. attrList, err := attrTree.Get([]byte(keyAttrs))
  80. if err != nil {
  81. return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  82. } else if attrList == nil {
  83. return nil, nil
  84. }
  85. var node data.Node
  86. tryPopulateNode := func(encattr string, attr string) error {
  87. // Try to lookup the attribute
  88. val, err := valTree.Get([]byte(keyAttrPrefix + encattr))
  89. if err != nil {
  90. return &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  91. }
  92. if val != nil {
  93. if node == nil {
  94. node = data.NewGraphNode()
  95. }
  96. node.SetAttr(attr, val)
  97. }
  98. return nil
  99. }
  100. if len(attrs) == 0 {
  101. // Allways create a node if we fetch all attributes
  102. node = data.NewGraphNode()
  103. // Lookup all attributes
  104. for _, encattr := range attrList.([]string) {
  105. attr := gm.nm.Decode32(encattr)
  106. if err := tryPopulateNode(encattr, attr); err != nil {
  107. return nil, err
  108. }
  109. }
  110. } else {
  111. // Lookup the given attributes - it is assumed that most of the time the
  112. // queried attributes do exist
  113. for _, attr := range attrs {
  114. if (attr == data.NodeKey || attr == data.NodeKind) && node == nil {
  115. // Create node - we might only query for node key or node kind
  116. node = data.NewGraphNode()
  117. continue
  118. }
  119. // Only try to populate the attribute if it can be decoded
  120. if encattr := gm.nm.Encode32(attr, false); encattr != "" {
  121. if err := tryPopulateNode(encattr, attr); err != nil {
  122. return nil, err
  123. }
  124. }
  125. }
  126. }
  127. // Set key and kind attributes
  128. if node != nil {
  129. node.SetAttr(data.NodeKey, key)
  130. node.SetAttr(data.NodeKind, kind)
  131. }
  132. return node, nil
  133. }
  134. /*
  135. StoreNode stores a single node in a partition of the graph. This function will
  136. overwrites any existing node.
  137. */
  138. func (gm *Manager) StoreNode(part string, node data.Node) error {
  139. trans := newInternalGraphTrans(gm)
  140. trans.subtrans = true
  141. err := gm.gr.graphEvent(trans, EventNodeStore, part, node)
  142. if err != nil {
  143. if err == ErrEventHandled {
  144. err = nil
  145. }
  146. return err
  147. }
  148. if err = trans.Commit(); err == nil {
  149. err = gm.storeOrUpdateNode(part, node, false)
  150. }
  151. return err
  152. }
  153. /*
  154. UpdateNode updates a single node in a partition of the graph. This function will
  155. only update the given values of the node.
  156. */
  157. func (gm *Manager) UpdateNode(part string, node data.Node) error {
  158. trans := newInternalGraphTrans(gm)
  159. trans.subtrans = true
  160. err := gm.gr.graphEvent(trans, EventNodeUpdate, part, node)
  161. if err != nil {
  162. if err == ErrEventHandled {
  163. err = nil
  164. }
  165. return err
  166. }
  167. if err = trans.Commit(); err == nil {
  168. err = gm.storeOrUpdateNode(part, node, true)
  169. }
  170. return err
  171. }
  172. /*
  173. storeOrUpdateNode stores or updates a single node in a partition of the graph.
  174. */
  175. func (gm *Manager) storeOrUpdateNode(part string, node data.Node, onlyUpdate bool) error {
  176. // Check if the node can be stored
  177. if err := gm.checkNode(node); err != nil {
  178. return err
  179. }
  180. // Get the HTrees which stores the node index and node
  181. iht, err := gm.getNodeIndexHTree(part, node.Kind(), true)
  182. if err != nil {
  183. return err
  184. }
  185. attht, valht, err := gm.getNodeStorageHTree(part, node.Kind(), true)
  186. if err != nil || attht == nil || valht == nil {
  187. return err
  188. }
  189. // Take writer lock
  190. gm.mutex.Lock()
  191. defer gm.mutex.Unlock()
  192. // Write the node to the datastore
  193. oldnode, err := gm.writeNode(node, onlyUpdate, attht, valht, nodeAttributeFilter)
  194. if err != nil {
  195. return err
  196. }
  197. // Increase node count if the node was inserted and write the changes
  198. // to the index.
  199. if oldnode == nil {
  200. currentCount := gm.NodeCount(node.Kind())
  201. if err := gm.writeNodeCount(node.Kind(), currentCount+1, true); err != nil {
  202. return err
  203. }
  204. if iht != nil {
  205. err := util.NewIndexManager(iht).Index(node.Key(), node.IndexMap())
  206. if err != nil {
  207. // The node was written at this point and the model is
  208. // consistent only the index is missing entries
  209. return err
  210. }
  211. }
  212. } else if iht != nil {
  213. err := util.NewIndexManager(iht).Reindex(node.Key(), node.IndexMap(),
  214. oldnode.IndexMap())
  215. if err != nil {
  216. // The node was written at this point and the model is
  217. // consistent only the index is missing entries
  218. return err
  219. }
  220. }
  221. // Execute rules
  222. trans := newInternalGraphTrans(gm)
  223. trans.subtrans = true
  224. var event int
  225. if oldnode == nil {
  226. event = EventNodeCreated
  227. } else {
  228. event = EventNodeUpdated
  229. }
  230. if err := gm.gr.graphEvent(trans, event, part, node, oldnode); err != nil && err != ErrEventHandled {
  231. return err
  232. } else if err := trans.Commit(); err != nil {
  233. return err
  234. }
  235. // Flush changes - errors only reported on the actual node storage flush
  236. gm.gs.FlushMain()
  237. gm.flushNodeIndex(part, node.Kind())
  238. return gm.flushNodeStorage(part, node.Kind())
  239. }
  240. /*
  241. writeNode writes a given node in full or part to the datastore. It is assumed
  242. that the caller holds the writer lock before calling the functions and that,
  243. after the function returns, the changes are flushed to the storage. Returns
  244. the old node if an update occurred. An attribute filter can be speified to skip
  245. specific attributes.
  246. */
  247. func (gm *Manager) writeNode(node data.Node, onlyUpdate bool, attrTree *hash.HTree,
  248. valTree *hash.HTree, attFilter func(attr string) bool) (data.Node, error) {
  249. keyAttrs := PrefixNSAttrs + node.Key()
  250. keyAttrPrefix := PrefixNSAttr + node.Key()
  251. var oldnode data.Node
  252. var attrListOld interface{}
  253. var err error
  254. // Store the node attributes
  255. attrList := make([]string, 0, len(node.IndexMap()))
  256. attrMap := make(map[string]string)
  257. for attr, val := range node.Data() {
  258. // Ignore filtered attributes
  259. if attFilter(attr) {
  260. continue
  261. }
  262. encattr := gm.nm.Encode32(attr, true)
  263. // Build up a lookup map to identify which attribute exist
  264. attrMap[encattr] = ""
  265. // Build up new attributes list
  266. attrList = append(attrList, encattr)
  267. // Store the value in the datastore
  268. oldval, err := valTree.Put([]byte(keyAttrPrefix+encattr), val)
  269. if err != nil {
  270. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  271. }
  272. // Build up old node
  273. if oldval != nil {
  274. if oldnode == nil {
  275. oldnode = data.NewGraphNode()
  276. }
  277. oldnode.SetAttr(attr, oldval)
  278. }
  279. }
  280. if onlyUpdate {
  281. // If we do only an update write the attribute list only if we added
  282. // new attributes
  283. attrListOld, err = attrTree.Get([]byte(keyAttrs))
  284. if err != nil {
  285. return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  286. }
  287. if attrListOld != nil {
  288. // Fill up the attrMap with the existing attributes
  289. for _, encattr := range attrListOld.([]string) {
  290. attrMap[encattr] = ""
  291. }
  292. // Now check if we need to write the attribute list
  293. if len(attrListOld.([]string)) != len(attrMap) {
  294. // Store the new node attributes
  295. attrList = make([]string, 0, len(attrMap))
  296. for encattr := range attrMap {
  297. attrList = append(attrList, encattr)
  298. }
  299. attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
  300. }
  301. } else {
  302. // We are actually doing an insert - just write the attribute list
  303. _, err = attrTree.Put([]byte(keyAttrs), attrList)
  304. }
  305. } else {
  306. // Store the new node attributes
  307. attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
  308. }
  309. if err != nil {
  310. // Do not try cleanup in case we updated a node - we would do more
  311. // harm than good.
  312. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  313. }
  314. // Remove deleted keys
  315. if attrListOld != nil {
  316. // Create old node if non of the new attributes were on the old node
  317. if oldnode == nil {
  318. oldnode = data.NewGraphNode()
  319. }
  320. oldnode.SetAttr(data.NodeKey, node.Key())
  321. oldnode.SetAttr(data.NodeKind, node.Kind())
  322. for _, encattrold := range attrListOld.([]string) {
  323. if _, ok := attrMap[encattrold]; !ok {
  324. oldval, err := valTree.Remove([]byte(keyAttrPrefix + encattrold))
  325. if err != nil {
  326. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  327. }
  328. oldnode.SetAttr(gm.nm.Decode32(encattrold), oldval)
  329. }
  330. }
  331. return oldnode, nil
  332. }
  333. return nil, nil
  334. }
  335. /*
  336. RemoveNode removes a single node from a partition of the graph.
  337. */
  338. func (gm *Manager) RemoveNode(part string, key string, kind string) (data.Node, error) {
  339. var err error
  340. trans := newInternalGraphTrans(gm)
  341. trans.subtrans = true
  342. if err = gm.gr.graphEvent(trans, EventNodeDelete, part, key, kind); err != nil {
  343. if err == ErrEventHandled {
  344. err = nil
  345. }
  346. return nil, err
  347. }
  348. err = trans.Commit()
  349. if err == nil {
  350. // Get the HTree which stores the node index and node kind
  351. iht, err := gm.getNodeIndexHTree(part, kind, false)
  352. if err != nil {
  353. return nil, err
  354. }
  355. attTree, valTree, err := gm.getNodeStorageHTree(part, kind, false)
  356. if err != nil || attTree == nil || valTree == nil {
  357. return nil, err
  358. }
  359. // Take writer lock
  360. gm.mutex.Lock()
  361. defer gm.mutex.Unlock()
  362. // Delete the node from the datastore
  363. node, err := gm.deleteNode(key, kind, attTree, valTree)
  364. if err != nil {
  365. return node, err
  366. }
  367. // Update the index
  368. if node != nil {
  369. if iht != nil {
  370. err := util.NewIndexManager(iht).Deindex(key, node.IndexMap())
  371. if err != nil {
  372. return node, err
  373. }
  374. }
  375. // Decrease the node count
  376. currentCount := gm.NodeCount(kind)
  377. if err := gm.writeNodeCount(kind, currentCount-1, true); err != nil {
  378. return node, err
  379. }
  380. // Execute rules
  381. trans := newInternalGraphTrans(gm)
  382. trans.subtrans = true
  383. if err := gm.gr.graphEvent(trans, EventNodeDeleted, part, node); err != nil && err != ErrEventHandled {
  384. return node, err
  385. } else if err := trans.Commit(); err != nil {
  386. return node, err
  387. }
  388. // Flush changes - errors only reported on the actual node storage flush
  389. gm.gs.FlushMain()
  390. gm.flushNodeIndex(part, kind)
  391. return node, gm.flushNodeStorage(part, kind)
  392. }
  393. }
  394. return nil, err
  395. }
  396. /*
  397. deleteNode deletes a given node from the datastore. It is assumed that the caller
  398. holds the writer lock before calling the functions and that, after the function
  399. returns, the changes are flushed to the storage. Returns the deleted node.
  400. */
  401. func (gm *Manager) deleteNode(key string, kind string, attrTree *hash.HTree,
  402. valTree *hash.HTree) (data.Node, error) {
  403. keyAttrs := PrefixNSAttrs + key
  404. keyAttrPrefix := PrefixNSAttr + key
  405. // Remove the attribute list entry
  406. attrList, err := attrTree.Remove([]byte(keyAttrs))
  407. if err != nil {
  408. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  409. } else if attrList == nil {
  410. return nil, nil
  411. }
  412. // Create the node object which is returned
  413. node := data.NewGraphNode()
  414. node.SetAttr(data.NodeKey, key)
  415. node.SetAttr(data.NodeKind, kind)
  416. // Remove node attributes
  417. for _, encattr := range attrList.([]string) {
  418. attr := gm.nm.Decode32(encattr)
  419. // Try to remove the attribute
  420. val, err := valTree.Remove([]byte(keyAttrPrefix + encattr))
  421. if err != nil {
  422. return node, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  423. }
  424. node.SetAttr(attr, val)
  425. }
  426. return node, nil
  427. }
  428. /*
  429. Default filter function to filter out system node attributes.
  430. */
  431. func nodeAttributeFilter(attr string) bool {
  432. return attr == data.NodeKey || attr == data.NodeKind
  433. }