graphmanager_nodes.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "encoding/binary"
  13. "encoding/gob"
  14. "devt.de/krotik/eliasdb/graph/data"
  15. "devt.de/krotik/eliasdb/graph/util"
  16. "devt.de/krotik/eliasdb/hash"
  17. )
  18. func init() {
  19. // It is possible to store nested structures on nodes
  20. gob.Register(make(map[string]interface{}))
  21. }
  22. /*
  23. NodeCount returns the node count for a given node kind.
  24. */
  25. func (gm *Manager) NodeCount(kind string) uint64 {
  26. if val, ok := gm.gs.MainDB()[MainDBNodeCount+kind]; ok {
  27. return binary.LittleEndian.Uint64([]byte(val))
  28. }
  29. return 0
  30. }
  31. /*
  32. NodeKeyIterator iterates node keys of a certain kind.
  33. */
  34. func (gm *Manager) NodeKeyIterator(part string, kind string) (*NodeKeyIterator, error) {
  35. // Get the HTrees which stores the node
  36. tree, _, err := gm.getNodeStorageHTree(part, kind, false)
  37. if err != nil || tree == nil {
  38. return nil, err
  39. }
  40. it := hash.NewHTreeIterator(tree)
  41. if it.LastError != nil {
  42. return nil, &util.GraphError{
  43. Type: util.ErrReading,
  44. Detail: it.LastError.Error(),
  45. }
  46. }
  47. return &NodeKeyIterator{gm, it, nil}, nil
  48. }
  49. /*
  50. FetchNode fetches a single node from a partition of the graph.
  51. */
  52. func (gm *Manager) FetchNode(part string, key string, kind string) (data.Node, error) {
  53. return gm.FetchNodePart(part, key, kind, nil)
  54. }
  55. /*
  56. FetchNodePart fetches part of a single node from a partition of the graph.
  57. */
  58. func (gm *Manager) FetchNodePart(part string, key string, kind string,
  59. attrs []string) (data.Node, error) {
  60. // Get the HTrees which stores the node
  61. attht, valht, err := gm.getNodeStorageHTree(part, kind, false)
  62. if err != nil || attht == nil || valht == nil {
  63. return nil, err
  64. }
  65. // Take reader lock
  66. gm.mutex.RLock()
  67. defer gm.mutex.RUnlock()
  68. // Read the node from the datastore
  69. return gm.readNode(key, kind, attrs, attht, valht)
  70. }
  71. /*
  72. readNode reads a given node from the datastore.
  73. */
  74. func (gm *Manager) readNode(key string, kind string, attrs []string,
  75. attrTree *hash.HTree, valTree *hash.HTree) (data.Node, error) {
  76. keyAttrs := PrefixNSAttrs + key
  77. keyAttrPrefix := PrefixNSAttr + key
  78. // Check if the node exists
  79. attrList, err := attrTree.Get([]byte(keyAttrs))
  80. if err != nil {
  81. return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  82. } else if attrList == nil {
  83. return nil, nil
  84. }
  85. var node data.Node
  86. tryPopulateNode := func(encattr string, attr string) error {
  87. // Try to lookup the attribute
  88. val, err := valTree.Get([]byte(keyAttrPrefix + encattr))
  89. if err != nil {
  90. return &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  91. }
  92. if val != nil {
  93. if node == nil {
  94. node = data.NewGraphNode()
  95. }
  96. node.SetAttr(attr, val)
  97. }
  98. return nil
  99. }
  100. if len(attrs) == 0 {
  101. // Allways create a node if we fetch all attributes
  102. node = data.NewGraphNode()
  103. // Lookup all attributes
  104. for _, encattr := range attrList.([]string) {
  105. attr := gm.nm.Decode32(encattr)
  106. if err := tryPopulateNode(encattr, attr); err != nil {
  107. return nil, err
  108. }
  109. }
  110. } else {
  111. // Lookup the given attributes - it is assumed that most of the time the
  112. // queried attributes do exist
  113. for _, attr := range attrs {
  114. if (attr == data.NodeKey || attr == data.NodeKind) && node == nil {
  115. // Create node - we might only query for node key or node kind
  116. node = data.NewGraphNode()
  117. continue
  118. }
  119. // Only try to populate the attribute if it can be decoded
  120. if encattr := gm.nm.Encode32(attr, false); encattr != "" {
  121. if err := tryPopulateNode(encattr, attr); err != nil {
  122. return nil, err
  123. }
  124. }
  125. }
  126. }
  127. // Set key and kind attributes
  128. if node != nil {
  129. node.SetAttr(data.NodeKey, key)
  130. node.SetAttr(data.NodeKind, kind)
  131. }
  132. return node, nil
  133. }
  134. /*
  135. StoreNode stores a single node in a partition of the graph. This function will
  136. overwrites any existing node.
  137. */
  138. func (gm *Manager) StoreNode(part string, node data.Node) error {
  139. return gm.storeOrUpdateNode(part, node, false)
  140. }
  141. /*
  142. UpdateNode updates a single node in a partition of the graph. This function will
  143. only update the given values of the node.
  144. */
  145. func (gm *Manager) UpdateNode(part string, node data.Node) error {
  146. return gm.storeOrUpdateNode(part, node, true)
  147. }
  148. /*
  149. storeOrUpdateNode stores or updates a single node in a partition of the graph.
  150. */
  151. func (gm *Manager) storeOrUpdateNode(part string, node data.Node, onlyUpdate bool) error {
  152. // Check if the node can be stored
  153. if err := gm.checkNode(node); err != nil {
  154. return err
  155. }
  156. // Get the HTrees which stores the node index and node
  157. iht, err := gm.getNodeIndexHTree(part, node.Kind(), true)
  158. if err != nil {
  159. return err
  160. }
  161. attht, valht, err := gm.getNodeStorageHTree(part, node.Kind(), true)
  162. if err != nil || attht == nil || valht == nil {
  163. return err
  164. }
  165. // Take writer lock
  166. gm.mutex.Lock()
  167. defer gm.mutex.Unlock()
  168. // Write the node to the datastore
  169. oldnode, err := gm.writeNode(node, onlyUpdate, attht, valht, nodeAttributeFilter)
  170. if err != nil {
  171. return err
  172. }
  173. // Increase node count if the node was inserted and write the changes
  174. // to the index.
  175. if oldnode == nil {
  176. currentCount := gm.NodeCount(node.Kind())
  177. if err := gm.writeNodeCount(node.Kind(), currentCount+1, true); err != nil {
  178. return err
  179. }
  180. if iht != nil {
  181. err := util.NewIndexManager(iht).Index(node.Key(), node.IndexMap())
  182. if err != nil {
  183. // The node was written at this point and the model is
  184. // consistent only the index is missing entries
  185. return err
  186. }
  187. }
  188. } else if iht != nil {
  189. err := util.NewIndexManager(iht).Reindex(node.Key(), node.IndexMap(),
  190. oldnode.IndexMap())
  191. if err != nil {
  192. // The node was written at this point and the model is
  193. // consistent only the index is missing entries
  194. return err
  195. }
  196. }
  197. // Execute rules
  198. trans := newInternalGraphTrans(gm)
  199. trans.subtrans = true
  200. var event int
  201. if oldnode == nil {
  202. event = EventNodeCreated
  203. } else {
  204. event = EventNodeUpdated
  205. }
  206. if err := gm.gr.graphEvent(trans, event, part, node, oldnode); err != nil {
  207. return err
  208. } else if err := trans.Commit(); err != nil {
  209. return err
  210. }
  211. // Flush changes - errors only reported on the actual node storage flush
  212. gm.gs.FlushMain()
  213. gm.flushNodeIndex(part, node.Kind())
  214. return gm.flushNodeStorage(part, node.Kind())
  215. }
  216. /*
  217. writeNode writes a given node in full or part to the datastore. It is assumed
  218. that the caller holds the writer lock before calling the functions and that,
  219. after the function returns, the changes are flushed to the storage. Returns
  220. the old node if an update occurred. An attribute filter can be speified to skip
  221. specific attributes.
  222. */
  223. func (gm *Manager) writeNode(node data.Node, onlyUpdate bool, attrTree *hash.HTree,
  224. valTree *hash.HTree, attFilter func(attr string) bool) (data.Node, error) {
  225. keyAttrs := PrefixNSAttrs + node.Key()
  226. keyAttrPrefix := PrefixNSAttr + node.Key()
  227. var oldnode data.Node
  228. var attrListOld interface{}
  229. var err error
  230. // Store the node attributes
  231. attrList := make([]string, 0, len(node.IndexMap()))
  232. attrMap := make(map[string]string)
  233. for attr, val := range node.Data() {
  234. // Ignore filtered attributes
  235. if attFilter(attr) {
  236. continue
  237. }
  238. encattr := gm.nm.Encode32(attr, true)
  239. // Build up a lookup map to identify which attribute exist
  240. attrMap[encattr] = ""
  241. // Build up new attributes list
  242. attrList = append(attrList, encattr)
  243. // Store the value in the datastore
  244. oldval, err := valTree.Put([]byte(keyAttrPrefix+encattr), val)
  245. if err != nil {
  246. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  247. }
  248. // Build up old node
  249. if oldval != nil {
  250. if oldnode == nil {
  251. oldnode = data.NewGraphNode()
  252. }
  253. oldnode.SetAttr(attr, oldval)
  254. }
  255. }
  256. if onlyUpdate {
  257. // If we do only an update write the attribute list only if we added
  258. // new attributes
  259. attrListOld, err = attrTree.Get([]byte(keyAttrs))
  260. if err != nil {
  261. return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  262. }
  263. if attrListOld != nil {
  264. // Fill up the attrMap with the existing attributes
  265. for _, encattr := range attrListOld.([]string) {
  266. attrMap[encattr] = ""
  267. }
  268. // Now check if we need to write the attribute list
  269. if len(attrListOld.([]string)) != len(attrMap) {
  270. // Store the new node attributes
  271. attrList = make([]string, 0, len(attrMap))
  272. for encattr := range attrMap {
  273. attrList = append(attrList, encattr)
  274. }
  275. attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
  276. }
  277. } else {
  278. // We are actually doing an insert - just write the attribute list
  279. _, err = attrTree.Put([]byte(keyAttrs), attrList)
  280. }
  281. } else {
  282. // Store the new node attributes
  283. attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
  284. }
  285. if err != nil {
  286. // Do not try cleanup in case we updated a node - we would do more
  287. // harm than good.
  288. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  289. }
  290. // Remove deleted keys
  291. if attrListOld != nil {
  292. // Create old node if non of the new attributes were on the old node
  293. if oldnode == nil {
  294. oldnode = data.NewGraphNode()
  295. }
  296. oldnode.SetAttr(data.NodeKey, node.Key())
  297. oldnode.SetAttr(data.NodeKind, node.Kind())
  298. for _, encattrold := range attrListOld.([]string) {
  299. if _, ok := attrMap[encattrold]; !ok {
  300. oldval, err := valTree.Remove([]byte(keyAttrPrefix + encattrold))
  301. if err != nil {
  302. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  303. }
  304. oldnode.SetAttr(gm.nm.Decode32(encattrold), oldval)
  305. }
  306. }
  307. return oldnode, nil
  308. }
  309. return nil, nil
  310. }
  311. /*
  312. RemoveNode removes a single node from a partition of the graph.
  313. */
  314. func (gm *Manager) RemoveNode(part string, key string, kind string) (data.Node, error) {
  315. // Get the HTree which stores the node index and node kind
  316. iht, err := gm.getNodeIndexHTree(part, kind, false)
  317. if err != nil {
  318. return nil, err
  319. }
  320. attTree, valTree, err := gm.getNodeStorageHTree(part, kind, false)
  321. if err != nil || attTree == nil || valTree == nil {
  322. return nil, err
  323. }
  324. // Take writer lock
  325. gm.mutex.Lock()
  326. defer gm.mutex.Unlock()
  327. // Delete the node from the datastore
  328. node, err := gm.deleteNode(key, kind, attTree, valTree)
  329. if err != nil {
  330. return node, err
  331. }
  332. // Update the index
  333. if node != nil {
  334. if iht != nil {
  335. err := util.NewIndexManager(iht).Deindex(key, node.IndexMap())
  336. if err != nil {
  337. return node, err
  338. }
  339. }
  340. // Decrease the node count
  341. currentCount := gm.NodeCount(kind)
  342. if err := gm.writeNodeCount(kind, currentCount-1, true); err != nil {
  343. return node, err
  344. }
  345. // Execute rules
  346. trans := newInternalGraphTrans(gm)
  347. trans.subtrans = true
  348. if err := gm.gr.graphEvent(trans, EventNodeDeleted, part, node); err != nil {
  349. return node, err
  350. } else if err := trans.Commit(); err != nil {
  351. return node, err
  352. }
  353. // Flush changes - errors only reported on the actual node storage flush
  354. gm.gs.FlushMain()
  355. gm.flushNodeIndex(part, kind)
  356. return node, gm.flushNodeStorage(part, kind)
  357. }
  358. return nil, nil
  359. }
  360. /*
  361. deleteNode deletes a given node from the datastore. It is assumed that the caller
  362. holds the writer lock before calling the functions and that, after the function
  363. returns, the changes are flushed to the storage. Returns the deleted node.
  364. */
  365. func (gm *Manager) deleteNode(key string, kind string, attrTree *hash.HTree,
  366. valTree *hash.HTree) (data.Node, error) {
  367. keyAttrs := PrefixNSAttrs + key
  368. keyAttrPrefix := PrefixNSAttr + key
  369. // Remove the attribute list entry
  370. attrList, err := attrTree.Remove([]byte(keyAttrs))
  371. if err != nil {
  372. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  373. } else if attrList == nil {
  374. return nil, nil
  375. }
  376. // Create the node object which is returned
  377. node := data.NewGraphNode()
  378. node.SetAttr(data.NodeKey, key)
  379. node.SetAttr(data.NodeKind, kind)
  380. // Remove node attributes
  381. for _, encattr := range attrList.([]string) {
  382. attr := gm.nm.Decode32(encattr)
  383. // Try to remove the attribute
  384. val, err := valTree.Remove([]byte(keyAttrPrefix + encattr))
  385. if err != nil {
  386. return node, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  387. }
  388. node.SetAttr(attr, val)
  389. }
  390. return node, nil
  391. }
  392. /*
  393. Default filter function to filter out system node attributes.
  394. */
  395. func nodeAttributeFilter(attr string) bool {
  396. return attr == data.NodeKey || attr == data.NodeKind
  397. }