123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557 |
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package graph
- import (
- "encoding/binary"
- "encoding/gob"
- "devt.de/krotik/eliasdb/graph/data"
- "devt.de/krotik/eliasdb/graph/util"
- "devt.de/krotik/eliasdb/hash"
- )
/*
init registers the generic string-keyed map type with gob so that nested
structures can be stored as attribute values on nodes.
*/
func init() {
	gob.Register(map[string]interface{}{})
}
- /*
- NodeCount returns the node count for a given node kind.
- */
- func (gm *Manager) NodeCount(kind string) uint64 {
- if val, ok := gm.gs.MainDB()[MainDBNodeCount+kind]; ok {
- return binary.LittleEndian.Uint64([]byte(val))
- }
- return 0
- }
- /*
- NodeKeyIterator iterates node keys of a certain kind.
- */
- func (gm *Manager) NodeKeyIterator(part string, kind string) (*NodeKeyIterator, error) {
- // Get the HTrees which stores the node
- tree, _, err := gm.getNodeStorageHTree(part, kind, false)
- if err != nil || tree == nil {
- return nil, err
- }
- it := hash.NewHTreeIterator(tree)
- if it.LastError != nil {
- return nil, &util.GraphError{
- Type: util.ErrReading,
- Detail: it.LastError.Error(),
- }
- }
- return &NodeKeyIterator{gm, it, nil}, nil
- }
- /*
- FetchNode fetches a single node from a partition of the graph.
- */
- func (gm *Manager) FetchNode(part string, key string, kind string) (data.Node, error) {
- return gm.FetchNodePart(part, key, kind, nil)
- }
- /*
- FetchNodePart fetches part of a single node from a partition of the graph.
- */
- func (gm *Manager) FetchNodePart(part string, key string, kind string,
- attrs []string) (data.Node, error) {
- // Get the HTrees which stores the node
- attht, valht, err := gm.getNodeStorageHTree(part, kind, false)
- if err != nil || attht == nil || valht == nil {
- return nil, err
- }
- // Take reader lock
- gm.mutex.RLock()
- defer gm.mutex.RUnlock()
- // Read the node from the datastore
- return gm.readNode(key, kind, attrs, attht, valht)
- }
- /*
- readNode reads a given node from the datastore.
- */
- func (gm *Manager) readNode(key string, kind string, attrs []string,
- attrTree *hash.HTree, valTree *hash.HTree) (data.Node, error) {
- keyAttrs := PrefixNSAttrs + key
- keyAttrPrefix := PrefixNSAttr + key
- // Check if the node exists
- attrList, err := attrTree.Get([]byte(keyAttrs))
- if err != nil {
- return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
- } else if attrList == nil {
- return nil, nil
- }
- var node data.Node
- tryPopulateNode := func(encattr string, attr string) error {
- // Try to lookup the attribute
- val, err := valTree.Get([]byte(keyAttrPrefix + encattr))
- if err != nil {
- return &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
- }
- if val != nil {
- if node == nil {
- node = data.NewGraphNode()
- }
- node.SetAttr(attr, val)
- }
- return nil
- }
- if len(attrs) == 0 {
- // Allways create a node if we fetch all attributes
- node = data.NewGraphNode()
- // Lookup all attributes
- for _, encattr := range attrList.([]string) {
- attr := gm.nm.Decode32(encattr)
- if err := tryPopulateNode(encattr, attr); err != nil {
- return nil, err
- }
- }
- } else {
- // Lookup the given attributes - it is assumed that most of the time the
- // queried attributes do exist
- for _, attr := range attrs {
- if (attr == data.NodeKey || attr == data.NodeKind) && node == nil {
- // Create node - we might only query for node key or node kind
- node = data.NewGraphNode()
- continue
- }
- // Only try to populate the attribute if it can be decoded
- if encattr := gm.nm.Encode32(attr, false); encattr != "" {
- if err := tryPopulateNode(encattr, attr); err != nil {
- return nil, err
- }
- }
- }
- }
- // Set key and kind attributes
- if node != nil {
- node.SetAttr(data.NodeKey, key)
- node.SetAttr(data.NodeKind, kind)
- }
- return node, nil
- }
- /*
- StoreNode stores a single node in a partition of the graph. This function will
- overwrites any existing node.
- */
- func (gm *Manager) StoreNode(part string, node data.Node) error {
- return gm.storeOrUpdateNode(part, node, false)
- }
- /*
- UpdateNode updates a single node in a partition of the graph. This function will
- only update the given values of the node.
- */
- func (gm *Manager) UpdateNode(part string, node data.Node) error {
- return gm.storeOrUpdateNode(part, node, true)
- }
- /*
- storeOrUpdateNode stores or updates a single node in a partition of the graph.
- */
- func (gm *Manager) storeOrUpdateNode(part string, node data.Node, onlyUpdate bool) error {
- // Check if the node can be stored
- if err := gm.checkNode(node); err != nil {
- return err
- }
- // Get the HTrees which stores the node index and node
- iht, err := gm.getNodeIndexHTree(part, node.Kind(), true)
- if err != nil {
- return err
- }
- attht, valht, err := gm.getNodeStorageHTree(part, node.Kind(), true)
- if err != nil || attht == nil || valht == nil {
- return err
- }
- // Take writer lock
- gm.mutex.Lock()
- defer gm.mutex.Unlock()
- // Write the node to the datastore
- oldnode, err := gm.writeNode(node, onlyUpdate, attht, valht, nodeAttributeFilter)
- if err != nil {
- return err
- }
- // Increase node count if the node was inserted and write the changes
- // to the index.
- if oldnode == nil {
- currentCount := gm.NodeCount(node.Kind())
- if err := gm.writeNodeCount(node.Kind(), currentCount+1, true); err != nil {
- return err
- }
- if iht != nil {
- err := util.NewIndexManager(iht).Index(node.Key(), node.IndexMap())
- if err != nil {
- // The node was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- } else if iht != nil {
- err := util.NewIndexManager(iht).Reindex(node.Key(), node.IndexMap(),
- oldnode.IndexMap())
- if err != nil {
- // The node was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- // Execute rules
- trans := newInternalGraphTrans(gm)
- trans.subtrans = true
- var event int
- if oldnode == nil {
- event = EventNodeCreated
- } else {
- event = EventNodeUpdated
- }
- if err := gm.gr.graphEvent(trans, event, part, node, oldnode); err != nil {
- return err
- } else if err := trans.Commit(); err != nil {
- return err
- }
- // Flush changes - errors only reported on the actual node storage flush
- gm.gs.FlushMain()
- gm.flushNodeIndex(part, node.Kind())
- return gm.flushNodeStorage(part, node.Kind())
- }
- /*
- writeNode writes a given node in full or part to the datastore. It is assumed
- that the caller holds the writer lock before calling the functions and that,
- after the function returns, the changes are flushed to the storage. Returns
- the old node if an update occurred. An attribute filter can be speified to skip
- specific attributes.
- */
- func (gm *Manager) writeNode(node data.Node, onlyUpdate bool, attrTree *hash.HTree,
- valTree *hash.HTree, attFilter func(attr string) bool) (data.Node, error) {
- keyAttrs := PrefixNSAttrs + node.Key()
- keyAttrPrefix := PrefixNSAttr + node.Key()
- var oldnode data.Node
- var attrListOld interface{}
- var err error
- // Store the node attributes
- attrList := make([]string, 0, len(node.IndexMap()))
- attrMap := make(map[string]string)
- for attr, val := range node.Data() {
- // Ignore filtered attributes
- if attFilter(attr) {
- continue
- }
- encattr := gm.nm.Encode32(attr, true)
- // Build up a lookup map to identify which attribute exist
- attrMap[encattr] = ""
- // Build up new attributes list
- attrList = append(attrList, encattr)
- // Store the value in the datastore
- oldval, err := valTree.Put([]byte(keyAttrPrefix+encattr), val)
- if err != nil {
- return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
- }
- // Build up old node
- if oldval != nil {
- if oldnode == nil {
- oldnode = data.NewGraphNode()
- }
- oldnode.SetAttr(attr, oldval)
- }
- }
- if onlyUpdate {
- // If we do only an update write the attribute list only if we added
- // new attributes
- attrListOld, err = attrTree.Get([]byte(keyAttrs))
- if err != nil {
- return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
- }
- if attrListOld != nil {
- // Fill up the attrMap with the existing attributes
- for _, encattr := range attrListOld.([]string) {
- attrMap[encattr] = ""
- }
- // Now check if we need to write the attribute list
- if len(attrListOld.([]string)) != len(attrMap) {
- // Store the new node attributes
- attrList = make([]string, 0, len(attrMap))
- for encattr := range attrMap {
- attrList = append(attrList, encattr)
- }
- attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
- }
- } else {
- // We are actually doing an insert - just write the attribute list
- _, err = attrTree.Put([]byte(keyAttrs), attrList)
- }
- } else {
- // Store the new node attributes
- attrListOld, err = attrTree.Put([]byte(keyAttrs), attrList)
- }
- if err != nil {
- // Do not try cleanup in case we updated a node - we would do more
- // harm than good.
- return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
- }
- // Remove deleted keys
- if attrListOld != nil {
- // Create old node if non of the new attributes were on the old node
- if oldnode == nil {
- oldnode = data.NewGraphNode()
- }
- oldnode.SetAttr(data.NodeKey, node.Key())
- oldnode.SetAttr(data.NodeKind, node.Kind())
- for _, encattrold := range attrListOld.([]string) {
- if _, ok := attrMap[encattrold]; !ok {
- oldval, err := valTree.Remove([]byte(keyAttrPrefix + encattrold))
- if err != nil {
- return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
- }
- oldnode.SetAttr(gm.nm.Decode32(encattrold), oldval)
- }
- }
- return oldnode, nil
- }
- return nil, nil
- }
- /*
- RemoveNode removes a single node from a partition of the graph.
- */
- func (gm *Manager) RemoveNode(part string, key string, kind string) (data.Node, error) {
- // Get the HTree which stores the node index and node kind
- iht, err := gm.getNodeIndexHTree(part, kind, false)
- if err != nil {
- return nil, err
- }
- attTree, valTree, err := gm.getNodeStorageHTree(part, kind, false)
- if err != nil || attTree == nil || valTree == nil {
- return nil, err
- }
- // Take writer lock
- gm.mutex.Lock()
- defer gm.mutex.Unlock()
- // Delete the node from the datastore
- node, err := gm.deleteNode(key, kind, attTree, valTree)
- if err != nil {
- return node, err
- }
- // Update the index
- if node != nil {
- if iht != nil {
- err := util.NewIndexManager(iht).Deindex(key, node.IndexMap())
- if err != nil {
- return node, err
- }
- }
- // Decrease the node count
- currentCount := gm.NodeCount(kind)
- if err := gm.writeNodeCount(kind, currentCount-1, true); err != nil {
- return node, err
- }
- // Execute rules
- trans := newInternalGraphTrans(gm)
- trans.subtrans = true
- if err := gm.gr.graphEvent(trans, EventNodeDeleted, part, node); err != nil {
- return node, err
- } else if err := trans.Commit(); err != nil {
- return node, err
- }
- // Flush changes - errors only reported on the actual node storage flush
- gm.gs.FlushMain()
- gm.flushNodeIndex(part, kind)
- return node, gm.flushNodeStorage(part, kind)
- }
- return nil, nil
- }
- /*
- deleteNode deletes a given node from the datastore. It is assumed that the caller
- holds the writer lock before calling the functions and that, after the function
- returns, the changes are flushed to the storage. Returns the deleted node.
- */
- func (gm *Manager) deleteNode(key string, kind string, attrTree *hash.HTree,
- valTree *hash.HTree) (data.Node, error) {
- keyAttrs := PrefixNSAttrs + key
- keyAttrPrefix := PrefixNSAttr + key
- // Remove the attribute list entry
- attrList, err := attrTree.Remove([]byte(keyAttrs))
- if err != nil {
- return nil, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
- } else if attrList == nil {
- return nil, nil
- }
- // Create the node object which is returned
- node := data.NewGraphNode()
- node.SetAttr(data.NodeKey, key)
- node.SetAttr(data.NodeKind, kind)
- // Remove node attributes
- for _, encattr := range attrList.([]string) {
- attr := gm.nm.Decode32(encattr)
- // Try to remove the attribute
- val, err := valTree.Remove([]byte(keyAttrPrefix + encattr))
- if err != nil {
- return node, &util.GraphError{Type: util.ErrWriting, Detail: err.Error()}
- }
- node.SetAttr(attr, val)
- }
- return node, nil
- }
- /*
- Default filter function to filter out system node attributes.
- */
- func nodeAttributeFilter(attr string) bool {
- return attr == data.NodeKey || attr == data.NodeKind
- }
|