graphmanager_nodes.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "encoding/binary"
  13. "encoding/gob"
  14. "devt.de/krotik/eliasdb/graph/data"
  15. "devt.de/krotik/eliasdb/graph/util"
  16. "devt.de/krotik/eliasdb/hash"
  17. )
  18. func init() {
  19. // It is possible to store nested structures on nodes
  20. gob.Register(make(map[string]interface{}))
  21. }
  22. /*
  23. NodeCount returns the node count for a given node kind.
  24. */
  25. func (gm *Manager) NodeCount(kind string) uint64 {
  26. if val, ok := gm.gs.MainDB()[MainDBNodeCount+kind]; ok {
  27. return binary.LittleEndian.Uint64([]byte(val))
  28. }
  29. return 0
  30. }
  31. /*
  32. NodeKeyIterator iterates node keys of a certain kind.
  33. */
  34. func (gm *Manager) NodeKeyIterator(part string, kind string) (*NodeKeyIterator, error) {
  35. // Get the HTrees which stores the node
  36. tree, _, err := gm.getNodeStorageHTree(part, kind, false)
  37. if err != nil || tree == nil {
  38. return nil, err
  39. }
  40. it := hash.NewHTreeIterator(tree)
  41. if it.LastError != nil {
  42. return nil, &util.GraphError{
  43. Type: util.ErrReading,
  44. Detail: it.LastError.Error(),
  45. }
  46. }
  47. return &NodeKeyIterator{gm, it, nil}, nil
  48. }
  49. /*
  50. FetchNode fetches a single node from a partition of the graph.
  51. */
  52. func (gm *Manager) FetchNode(part string, key string, kind string) (data.Node, error) {
  53. return gm.FetchNodePart(part, key, kind, nil)
  54. }
  55. /*
  56. FetchNodePart fetches part of a single node from a partition of the graph.
  57. */
  58. func (gm *Manager) FetchNodePart(part string, key string, kind string,
  59. attrs []string) (data.Node, error) {
  60. // Get the HTrees which stores the node
  61. attht, valht, err := gm.getNodeStorageHTree(part, kind, false)
  62. if err != nil || attht == nil || valht == nil {
  63. return nil, err
  64. }
  65. // Take reader lock
  66. gm.mutex.RLock()
  67. defer gm.mutex.RUnlock()
  68. // Read the node from the datastore
  69. return gm.readNode(key, kind, attrs, attht, valht)
  70. }
  71. /*
  72. readNode reads a given node from the datastore.
  73. */
  74. func (gm *Manager) readNode(key string, kind string, attrs []string,
  75. attrTree *hash.HTree, valTree *hash.HTree) (data.Node, error) {
  76. keyAttrs := PrefixNSAttrs + key
  77. keyAttrPrefix := PrefixNSAttr + key
  78. // Check if the node exists
  79. attrList, err := attrTree.Get([]byte(keyAttrs))
  80. if err != nil {
  81. return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  82. } else if attrList == nil {
  83. return nil, nil
  84. }
  85. var node data.Node
  86. tryPopulateNode := func(encattr string, attr string) error {
  87. // Try to lookup the attribute
  88. val, err := valTree.Get([]byte(keyAttrPrefix + encattr))
  89. if err != nil {
  90. return &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  91. }
  92. if val != nil {
  93. if node == nil {
  94. node = data.NewGraphNode()
  95. }
  96. node.SetAttr(attr, val)
  97. }
  98. return nil
  99. }
  100. if len(attrs) == 0 {
  101. // Allways create a node if we fetch all attributes
  102. node = data.NewGraphNode()
  103. // Lookup all attributes
  104. for _, encattr := range attrList.([]string) {
  105. attr := gm.nm.Decode32(encattr)
  106. if err := tryPopulateNode(encattr, attr); err != nil {
  107. return nil, err
  108. }
  109. }
  110. } else {
  111. // Lookup the given attributes - it is assumed that most of the time the
  112. // queried attributes do exist
  113. for _, attr := range attrs {
  114. if (attr == data.NodeKey || attr == data.NodeKind) && node == nil {
  115. // Create node - we might only query for node key or node kind
  116. node = data.NewGraphNode()
  117. continue
  118. }
  119. // Only try to populate the attribute if it can be decoded
  120. if encattr := gm.nm.Encode32(attr, false); encattr != "" {
  121. if err := tryPopulateNode(encattr, attr); err != nil {
  122. return nil, err
  123. }
  124. }
  125. }
  126. }
  127. // Set key and kind attributes
  128. if node != nil {
  129. node.SetAttr(data.NodeKey, key)
  130. node.SetAttr(data.NodeKind, kind)
  131. }
  132. return node, nil
  133. }
  134. /*
  135. StoreNode stores a single node in a partition of the graph. This function will
  136. overwrites any existing node.
  137. */
  138. func (gm *Manager) StoreNode(part string, node data.Node) error {
  139. trans := newInternalGraphTrans(gm)
  140. trans.subtrans = true
  141. err := gm.gr.graphEvent(trans, EventNodeStore, part, node)
  142. if err != nil {
  143. if err == ErrEventHandled {
  144. err = nil
  145. }
  146. return err
  147. }
  148. if err = trans.Commit(); err == nil {
  149. err = gm.storeOrUpdateNode(part, node, false)
  150. }
  151. return err
  152. }
  153. /*
  154. UpdateNode updates a single node in a partition of the graph. This function will
  155. only update the given values of the node.
  156. */
  157. func (gm *Manager) UpdateNode(part string, node data.Node) error {
  158. trans := newInternalGraphTrans(gm)
  159. trans.subtrans = true
  160. err := gm.gr.graphEvent(trans, EventNodeUpdate, part, node)
  161. if err != nil {
  162. if err == ErrEventHandled {
  163. err = nil
  164. }
  165. return err
  166. }
  167. if err = trans.Commit(); err == nil {
  168. err = gm.storeOrUpdateNode(part, node, true)
  169. }
  170. return err
  171. }
  172. /*
  173. storeOrUpdateNode stores or updates a single node in a partition of the graph.
  174. */
  175. func (gm *Manager) storeOrUpdateNode(part string, node data.Node, onlyUpdate bool) error {
  176. // Check if the node can be stored
  177. if err := gm.checkNode(node); err != nil {
  178. return err
  179. }
  180. // Get the HTrees which stores the node index and node
  181. iht, err := gm.getNodeIndexHTree(part, node.Kind(), true)
  182. if err != nil {
  183. return err
  184. }
  185. attht, valht, err := gm.getNodeStorageHTree(part, node.Kind(), true)
  186. if err != nil || attht == nil || valht == nil {
  187. return err
  188. }
  189. // Take writer lock
  190. gm.mutex.Lock()
  191. defer gm.mutex.Unlock()
  192. // Write the node to the datastore
  193. oldnode, err := gm.writeNode(node, onlyUpdate, attht, valht, nodeAttributeFilter)
  194. if err != nil {
  195. return err
  196. }
  197. // Increase node count if the node was inserted and write the changes
  198. // to the index.
  199. if oldnode == nil {
  200. currentCount := gm.NodeCount(node.Kind())
  201. if err := gm.writeNodeCount(node.Kind(), currentCount+1, true); err != nil {
  202. return err
  203. }
  204. if iht != nil {
  205. err := util.NewIndexManager(iht).Index(node.Key(), node.IndexMap())
  206. if err != nil {
  207. // The node was written at this point and the model is
  208. // consistent only the index is missing entries
  209. return err
  210. }
  211. }
  212. } else if iht != nil {
  213. err := util.NewIndexManager(iht).Reindex(node.Key(), node.IndexMap(),
  214. oldnode.IndexMap())
  215. if err != nil {
  216. // The node was written at this point and the model is
  217. // consistent only the index is missing entries
  218. return err
  219. }
  220. }
  221. defer func() {
  222. // Flush changes
  223. gm.gs.FlushMain()
  224. gm.flushNodeIndex(part, node.Kind())
  225. gm.flushNodeStorage(part, node.Kind())
  226. }()
  227. // Execute rules
  228. trans := newInternalGraphTrans(gm)
  229. trans.subtrans = true
  230. var event int
  231. if oldnode == nil {
  232. event = EventNodeCreated
  233. } else {
  234. event = EventNodeUpdated
  235. }
  236. if err := gm.gr.graphEvent(trans, event, part, node, oldnode); err != nil && err != ErrEventHandled {
  237. return err
  238. } else if err := trans.Commit(); err != nil {
  239. return err
  240. }
  241. return nil
  242. }
  243. /*
  244. writeNode writes a given node in full or part to the datastore. It is assumed
  245. that the caller holds the writer lock before calling the functions and that,
  246. after the function returns, the changes are flushed to the storage. Returns
  247. the old node if an update occurred. An attribute filter can be speified to skip
  248. specific attributes.
  249. */
  250. func (gm *Manager) writeNode(node data.Node, onlyUpdate bool, attrTree *hash.HTree,
  251. valTree *hash.HTree, attFilter func(attr string) bool) (data.Node, error) {
  252. keyAttrs := PrefixNSAttrs + node.Key()
  253. keyAttrPrefix := PrefixNSAttr + node.Key()
  254. var oldnode data.Node
  255. var attrListOld interface{}
  256. var err error
  257. // Store the node attributes
  258. attrList := make([]string, 0, len(node.IndexMap()))
  259. attrMap := make(map[string]string)
  260. for attr, val := range node.Data() {
  261. // Ignore filtered attributes
  262. if attFilter(attr) {
  263. continue
  264. }
  265. encattr := gm.nm.Encode32(attr, true)
  266. // Build up a lookup map to identify which attribute exist
  267. attrMap[encattr] = ""
  268. // Build up new attributes list
  269. attrList = append(attrList, encattr)
  270. // Store the value in the datastore
  271. oldval, err := valTree.Put([]byte(keyAttrPrefix+encattr), val)
  272. if err != nil {
  273. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  274. }
  275. // Build up old node
  276. if oldval != nil {
  277. if oldnode == nil {
  278. oldnode = data.NewGraphNode()
  279. }
  280. oldnode.SetAttr(attr, oldval)
  281. }
  282. }
  283. if onlyUpdate {
  284. // If we do only an update write the attribute list only if we added
  285. // new attributes
  286. attrListOld, err = attrTree.Get([]byte(keyAttrs))
  287. if err != nil {
  288. return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
  289. }
  290. if attrListOld != nil {
  291. // Fill up the attrMap with the existing attributes
  292. for _, encattr := range attrListOld.([]string) {
  293. attrMap[encattr] = ""
  294. }
  295. // Now check if we need to write the attribute list
  296. if len(attrListOld.([]string)) != len(attrMap) {
  297. // Store the new node attributes
  298. attrList = make([]string, 0, len(attrMap))
  299. for encattr := range attrMap {
  300. attrList = append(attrList, encattr)
  301. }
  302. attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
  303. }
  304. } else {
  305. // We are actually doing an insert - just write the attribute list
  306. _, err = attrTree.Put([]byte(keyAttrs), attrList)
  307. }
  308. } else {
  309. // Store the new node attributes
  310. attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
  311. }
  312. if err != nil {
  313. // Do not try cleanup in case we updated a node - we would do more
  314. // harm than good.
  315. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  316. }
  317. // Remove deleted keys
  318. if attrListOld != nil {
  319. // Create old node if non of the new attributes were on the old node
  320. if oldnode == nil {
  321. oldnode = data.NewGraphNode()
  322. }
  323. oldnode.SetAttr(data.NodeKey, node.Key())
  324. oldnode.SetAttr(data.NodeKind, node.Kind())
  325. for _, encattrold := range attrListOld.([]string) {
  326. if _, ok := attrMap[encattrold]; !ok {
  327. oldval, err := valTree.Remove([]byte(keyAttrPrefix + encattrold))
  328. if err != nil {
  329. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  330. }
  331. oldnode.SetAttr(gm.nm.Decode32(encattrold), oldval)
  332. }
  333. }
  334. return oldnode, nil
  335. }
  336. return nil, nil
  337. }
  338. /*
  339. RemoveNode removes a single node from a partition of the graph.
  340. */
  341. func (gm *Manager) RemoveNode(part string, key string, kind string) (data.Node, error) {
  342. var err error
  343. trans := newInternalGraphTrans(gm)
  344. trans.subtrans = true
  345. if err = gm.gr.graphEvent(trans, EventNodeDelete, part, key, kind); err != nil {
  346. if err == ErrEventHandled {
  347. err = nil
  348. }
  349. return nil, err
  350. }
  351. err = trans.Commit()
  352. if err == nil {
  353. // Get the HTree which stores the node index and node kind
  354. iht, err := gm.getNodeIndexHTree(part, kind, false)
  355. if err != nil {
  356. return nil, err
  357. }
  358. attTree, valTree, err := gm.getNodeStorageHTree(part, kind, false)
  359. if err != nil || attTree == nil || valTree == nil {
  360. return nil, err
  361. }
  362. // Take writer lock
  363. gm.mutex.Lock()
  364. defer gm.mutex.Unlock()
  365. // Delete the node from the datastore
  366. node, err := gm.deleteNode(key, kind, attTree, valTree)
  367. if err != nil {
  368. return node, err
  369. }
  370. // Update the index
  371. if node != nil {
  372. if iht != nil {
  373. err := util.NewIndexManager(iht).Deindex(key, node.IndexMap())
  374. if err != nil {
  375. return node, err
  376. }
  377. }
  378. // Decrease the node count
  379. currentCount := gm.NodeCount(kind)
  380. if err := gm.writeNodeCount(kind, currentCount-1, true); err != nil {
  381. return node, err
  382. }
  383. defer func() {
  384. // Flush changes
  385. gm.gs.FlushMain()
  386. gm.flushNodeIndex(part, kind)
  387. gm.flushNodeStorage(part, kind)
  388. }()
  389. // Execute rules
  390. trans := newInternalGraphTrans(gm)
  391. trans.subtrans = true
  392. if err := gm.gr.graphEvent(trans, EventNodeDeleted, part, node); err != nil && err != ErrEventHandled {
  393. return node, err
  394. } else if err := trans.Commit(); err != nil {
  395. return node, err
  396. }
  397. return node, nil
  398. }
  399. }
  400. return nil, err
  401. }
  402. /*
  403. deleteNode deletes a given node from the datastore. It is assumed that the caller
  404. holds the writer lock before calling the functions and that, after the function
  405. returns, the changes are flushed to the storage. Returns the deleted node.
  406. */
  407. func (gm *Manager) deleteNode(key string, kind string, attrTree *hash.HTree,
  408. valTree *hash.HTree) (data.Node, error) {
  409. keyAttrs := PrefixNSAttrs + key
  410. keyAttrPrefix := PrefixNSAttr + key
  411. // Remove the attribute list entry
  412. attrList, err := attrTree.Remove([]byte(keyAttrs))
  413. if err != nil {
  414. return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  415. } else if attrList == nil {
  416. return nil, nil
  417. }
  418. // Create the node object which is returned
  419. node := data.NewGraphNode()
  420. node.SetAttr(data.NodeKey, key)
  421. node.SetAttr(data.NodeKind, kind)
  422. // Remove node attributes
  423. for _, encattr := range attrList.([]string) {
  424. attr := gm.nm.Decode32(encattr)
  425. // Try to remove the attribute
  426. val, err := valTree.Remove([]byte(keyAttrPrefix + encattr))
  427. if err != nil {
  428. return node, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
  429. }
  430. node.SetAttr(attr, val)
  431. }
  432. return node, nil
  433. }
  434. /*
  435. Default filter function to filter out system node attributes.
  436. */
  437. func nodeAttributeFilter(attr string) bool {
  438. return attr == data.NodeKey || attr == data.NodeKind
  439. }