|
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package graph
- import (
- "encoding/binary"
- "encoding/gob"
- "fmt"
- "sort"
- "strings"
- "devt.de/krotik/eliasdb/graph/data"
- "devt.de/krotik/eliasdb/graph/util"
- "devt.de/krotik/eliasdb/hash"
- )
- /*
- edgeTargetInfo is an internal structure which stores edge information
- */
- type edgeTargetInfo struct {
- CascadeToTarget bool // Flag if delete operations should be cascaded to the target
- CascadeLastToTarget bool // Flag if delete operations should be cascaded to the target after the last edge was deleted
- CascadeFromTarget bool // Flag if delete operations should be cascaded from the target after the last edge was deleted
- CascadeLastFromTarget bool // Flag if delete operations should be cascaded from the target
- TargetNodeKey string // Key of the target node
- TargetNodeKind string // Kind of the target ndoe
- }
- func init() {
- // Make sure we can use the relevant types in a gob operation
- gob.Register(make(map[string]string))
- gob.Register(make(map[string]*edgeTargetInfo))
- gob.Register(&edgeTargetInfo{})
- }
- /*
- EdgeCount returns the edge count for a given edge kind.
- */
- func (gm *Manager) EdgeCount(kind string) uint64 {
- if val, ok := gm.gs.MainDB()[MainDBEdgeCount+kind]; ok {
- return binary.LittleEndian.Uint64([]byte(val))
- }
- return 0
- }
- /*
- FetchNodeEdgeSpecs returns all possible edge specs for a certain node.
- */
- func (gm *Manager) FetchNodeEdgeSpecs(part string, key string, kind string) ([]string, error) {
- _, tree, err := gm.getNodeStorageHTree(part, kind, false)
- if err != nil || tree == nil {
- return nil, err
- }
- // Take reader lock
- gm.mutex.RLock()
- defer gm.mutex.RUnlock()
- specsNodeKey := PrefixNSSpecs + key
- obj, err := tree.Get([]byte(specsNodeKey))
- if err != nil {
- return nil, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
- } else if obj == nil {
- return nil, nil
- }
- specsNodeMap := obj.(map[string]string)
- specsNode := make([]string, 0, len(specsNodeMap))
- for spec := range specsNodeMap {
- role1 := gm.nm.Decode16(spec[:2])
- relKind := gm.nm.Decode16(spec[2:4])
- role2 := gm.nm.Decode16(spec[4:6])
- end2Kind := gm.nm.Decode16(spec[6:])
- specsNode = append(specsNode,
- role1+":"+relKind+":"+role2+":"+end2Kind)
- }
- // Ensure the output is deterministic
- sort.StringSlice(specsNode).Sort()
- return specsNode, nil
- }
- /*
- TraverseMulti traverses from a given node to other nodes following a given
- partial edge spec. Since the edge spec can be partial it is possible to
- traverse multiple edge kinds. A spec with the value ":::" would follow
- all relationships. The last parameter allData specifies if all data
- should be retrieved for the connected nodes and edges. If set to false only
- the minimal set of attributes will be populated.
- */
- func (gm *Manager) TraverseMulti(part string, key string, kind string,
- spec string, allData bool) ([]data.Node, []data.Edge, error) {
- sspec := strings.Split(spec, ":")
- if len(sspec) != 4 {
- return nil, nil, &util.GraphError{Type: util.ErrInvalidData, Detail: "Invalid spec: " + spec}
- } else if IsFullSpec(spec) {
- return gm.Traverse(part, key, kind, spec, allData)
- }
- // Get all specs for the given node
- specs, err := gm.FetchNodeEdgeSpecs(part, key, kind)
- if err != nil || specs == nil {
- return nil, nil, err
- }
- matchSpec := func(spec string) bool {
- mspec := strings.Split(spec, ":")
- // Check spec components
- if (sspec[0] != "" && mspec[0] != sspec[0]) ||
- (sspec[1] != "" && mspec[1] != sspec[1]) ||
- (sspec[2] != "" && mspec[2] != sspec[2]) ||
- (sspec[3] != "" && mspec[3] != sspec[3]) {
- return false
- }
- return true
- }
- // Match specs and collect the results
- var nodes []data.Node
- var edges []data.Edge
- for _, rspec := range specs {
- if spec == ":::" || matchSpec(rspec) {
- sn, se, err := gm.Traverse(part, key, kind, rspec, allData)
- if err != nil {
- return nil, nil, err
- }
- nodes = append(nodes, sn...)
- edges = append(edges, se...)
- }
- }
- return nodes, edges, nil
- }
- /*
- Traverse traverses from a given node to other nodes following a given edge spec.
- The last parameter allData specifies if all data should be retrieved for
- the connected nodes and edges. If set to false only the minimal set of
- attributes will be populated.
- */
- func (gm *Manager) Traverse(part string, key string, kind string,
- spec string, allData bool) ([]data.Node, []data.Edge, error) {
- _, tree, err := gm.getNodeStorageHTree(part, kind, false)
- if err != nil || tree == nil {
- return nil, nil, err
- }
- // Take reader lock
- gm.mutex.RLock()
- defer gm.mutex.RUnlock()
- sspec := strings.Split(spec, ":")
- if len(sspec) != 4 {
- return nil, nil, &util.GraphError{Type: util.ErrInvalidData, Detail: "Invalid spec: " + spec}
- } else if !IsFullSpec(spec) {
- return nil, nil, &util.GraphError{Type: util.ErrInvalidData, Detail: "Invalid spec: " + spec +
- " - spec needs to be fully specified for direct traversal"}
- }
- encspec := gm.nm.Encode16(sspec[0], false) + gm.nm.Encode16(sspec[1], false) +
- gm.nm.Encode16(sspec[2], false) + gm.nm.Encode16(sspec[3], false)
- edgeInfoKey := PrefixNSEdge + key + encspec
- // Lookup the target map containing edgeTargetInfo objects
- obj, err := tree.Get([]byte(edgeInfoKey))
- if err != nil || obj == nil {
- return nil, nil, err
- }
- targetMap := obj.(map[string]*edgeTargetInfo)
- nodes := make([]data.Node, 0, len(targetMap))
- edges := make([]data.Edge, 0, len(targetMap))
- if !allData {
- // Populate nodes and edges with the minimal set of attributes
- // no further lookups required
- for k, v := range targetMap {
- edge := data.NewGraphEdge()
- edge.SetAttr(data.NodeKey, k)
- edge.SetAttr(data.NodeKind, sspec[1])
- edge.SetAttr(data.EdgeEnd1Key, key)
- edge.SetAttr(data.EdgeEnd1Kind, kind)
- edge.SetAttr(data.EdgeEnd1Role, sspec[0])
- edge.SetAttr(data.EdgeEnd1Cascading, v.CascadeToTarget)
- edge.SetAttr(data.EdgeEnd1CascadingLast, v.CascadeLastToTarget)
- edge.SetAttr(data.EdgeEnd2Key, v.TargetNodeKey)
- edge.SetAttr(data.EdgeEnd2Kind, v.TargetNodeKind)
- edge.SetAttr(data.EdgeEnd2Role, sspec[2])
- edge.SetAttr(data.EdgeEnd2Cascading, v.CascadeFromTarget)
- edge.SetAttr(data.EdgeEnd2CascadingLast, v.CascadeLastFromTarget)
- edges = append(edges, edge)
- node := data.NewGraphNode()
- node.SetAttr(data.NodeKey, v.TargetNodeKey)
- node.SetAttr(data.NodeKind, v.TargetNodeKind)
- nodes = append(nodes, node)
- }
- } else {
- // Get the HTrees which stores the edges
- edgeht, err := gm.getEdgeStorageHTree(part, sspec[1], false)
- if err != nil || edgeht == nil {
- return nil, nil, err
- }
- for k, v := range targetMap {
- // Read the edge from the datastore
- edgenode, err := gm.readNode(k, sspec[1], nil, edgeht, edgeht)
- if err != nil || edgenode == nil {
- return nil, nil, err
- }
- edge := data.NewGraphEdgeFromNode(edgenode)
- // Exchange ends if necessary
- if edge.End2Key() == key && edge.End2Kind() == kind {
- swap := func(attr1 string, attr2 string) {
- tmp := edge.Attr(attr1)
- edge.SetAttr(attr1, edge.Attr(attr2))
- edge.SetAttr(attr2, tmp)
- }
- swap(data.EdgeEnd1Key, data.EdgeEnd2Key)
- swap(data.EdgeEnd1Kind, data.EdgeEnd2Kind)
- swap(data.EdgeEnd1Role, data.EdgeEnd2Role)
- swap(data.EdgeEnd1Cascading, data.EdgeEnd2Cascading)
- }
- edges = append(edges, edge)
- // Get the HTrees which stores the node
- attht, valht, err := gm.getNodeStorageHTree(part, v.TargetNodeKind, false)
- if err != nil || attht == nil || valht == nil {
- return nil, nil, err
- }
- node, err := gm.readNode(v.TargetNodeKey, v.TargetNodeKind, nil, attht, valht)
- if err != nil {
- return nil, nil, err
- }
- nodes = append(nodes, node)
- }
- }
- return nodes, edges, nil
- }
- /*
- FetchEdge fetches a single edge from a partition of the graph.
- */
- func (gm *Manager) FetchEdge(part string, key string, kind string) (data.Edge, error) {
- return gm.FetchEdgePart(part, key, kind, nil)
- }
- /*
- FetchEdgePart fetches part of a single edge from a partition of the graph.
- */
- func (gm *Manager) FetchEdgePart(part string, key string, kind string,
- attrs []string) (data.Edge, error) {
- // Get the HTrees which stores the edge
- edgeht, err := gm.getEdgeStorageHTree(part, kind, true)
- if err != nil || edgeht == nil {
- return nil, err
- }
- // Take reader lock
- gm.mutex.RLock()
- defer gm.mutex.RUnlock()
- // Read the edge from the datastore
- node, err := gm.readNode(key, kind, attrs, edgeht, edgeht)
- return data.NewGraphEdgeFromNode(node), err
- }
- /*
- StoreEdge stores a single edge in a partition of the graph. This function will
- overwrites any existing edge.
- */
- func (gm *Manager) StoreEdge(part string, edge data.Edge) error {
- // Check if the edge can be stored
- if err := gm.checkEdge(edge); err != nil {
- return err
- }
- // Get the HTrees which stores the edges and the edge index
- iht, err := gm.getEdgeIndexHTree(part, edge.Kind(), true)
- if err != nil {
- return err
- }
- edgeht, err := gm.getEdgeStorageHTree(part, edge.Kind(), true)
- if err != nil {
- return err
- }
- // Get the HTrees which stores the edge endpoints and make sure the endpoints
- // do exist
- end1nodeht, end1ht, err := gm.getNodeStorageHTree(part, edge.End1Kind(), false)
- if err != nil {
- return err
- } else if end1ht == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: "Can't store edge to non-existend node kind: " + edge.End1Kind(),
- }
- } else if end1, err := end1nodeht.Get([]byte(PrefixNSAttrs + edge.End1Key())); err != nil || end1 == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End1Key(), edge.End1Kind()),
- }
- }
- end2nodeht, end2ht, err := gm.getNodeStorageHTree(part, edge.End2Kind(), false)
- if err != nil {
- return err
- } else if end2ht == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: "Can't store edge to non-existend node kind: " + edge.End2Kind(),
- }
- } else if end2, err := end2nodeht.Get([]byte(PrefixNSAttrs + edge.End2Key())); err != nil || end2 == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End2Key(), edge.End2Kind()),
- }
- }
- // Take writer lock
- gm.mutex.Lock()
- defer gm.mutex.Unlock()
- // Write edge to the datastore
- oldedge, err := gm.writeEdge(edge, edgeht, end1ht, end2ht)
- if err != nil {
- return err
- }
- // Increase edge count if the edge was inserted and write the changes
- // to the index.
- if oldedge == nil {
- // Increase edge count
- currentCount := gm.EdgeCount(edge.Kind())
- if err := gm.writeEdgeCount(edge.Kind(), currentCount+1, true); err != nil {
- return err
- }
- // Write edge data to the index
- if iht != nil {
- if err := util.NewIndexManager(iht).Index(edge.Key(), edge.IndexMap()); err != nil {
- // The edge was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- } else if iht != nil {
- err := util.NewIndexManager(iht).Reindex(edge.Key(), edge.IndexMap(),
- oldedge.IndexMap())
- if err != nil {
- // The edge was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- // Execute rules
- trans := newInternalGraphTrans(gm)
- trans.subtrans = true
- var event int
- if oldedge == nil {
- event = EventEdgeCreated
- } else {
- event = EventEdgeUpdated
- }
- if err := gm.gr.graphEvent(trans, event, part, edge, oldedge); err != nil {
- return err
- } else if err := trans.Commit(); err != nil {
- return err
- }
- // Flush changes - errors only reported on the actual node storage flush
- gm.gs.FlushMain()
- gm.flushEdgeIndex(part, edge.Kind())
- gm.flushNodeStorage(part, edge.End1Kind())
- gm.flushNodeStorage(part, edge.End2Kind())
- return gm.flushEdgeStorage(part, edge.Kind())
- }
- /*
- writeEdge writes a given edge to the datastore. It is assumed that the caller
- holds the writer lock before calling the functions and that, after the function
- returns, the changes are flushed to the storage. The caller has also to ensure
- that the endpoints of the edge do exist. Returns the old edge if an
- update occurred.
- */
- func (gm *Manager) writeEdge(edge data.Edge, edgeTree *hash.HTree,
- end1Tree *hash.HTree, end2Tree *hash.HTree) (data.Edge, error) {
- // Create lookup keys
- spec1 := gm.nm.Encode16(edge.End1Role(), true) + gm.nm.Encode16(edge.Kind(), true) +
- gm.nm.Encode16(edge.End2Role(), true) + gm.nm.Encode16(edge.End2Kind(), true)
- spec2 := gm.nm.Encode16(edge.End2Role(), true) + gm.nm.Encode16(edge.Kind(), true) +
- gm.nm.Encode16(edge.End1Role(), true) + gm.nm.Encode16(edge.End1Kind(), true)
- specsNode1Key := PrefixNSSpecs + edge.End1Key()
- edgeInfo1Key := PrefixNSEdge + edge.End1Key() + spec1
- specsNode2Key := PrefixNSSpecs + edge.End2Key()
- edgeInfo2Key := PrefixNSEdge + edge.End2Key() + spec2
- // Function to insert a new spec into a specs map
- updateSpecMap := func(key string, spec string, tree *hash.HTree) error {
- var specsNode map[string]string
- obj, err := tree.Get([]byte(key))
- if err != nil {
- return err
- } else if obj == nil {
- specsNode = make(map[string]string)
- } else {
- specsNode = obj.(map[string]string)
- }
- specsNode[spec] = ""
- if _, err = tree.Put([]byte(key), specsNode); err != nil {
- return err
- }
- return nil
- }
- // Function to update the edgeTargetInfo entry
- updateTargetInfo := func(key string, endkey string, endkind string,
- cascadeToTarget bool, cascadeLastToTarget bool, cascadeFromTarget bool, cascadeLastFromTarget bool, tree *hash.HTree) error {
- var targetMap map[string]*edgeTargetInfo
- obj, err := tree.Get([]byte(key))
- if err != nil {
- return err
- } else if obj == nil {
- targetMap = make(map[string]*edgeTargetInfo)
- } else {
- targetMap = obj.(map[string]*edgeTargetInfo)
- }
- // Update the target info
- targetMap[edge.Key()] = &edgeTargetInfo{cascadeToTarget, cascadeLastToTarget,
- cascadeFromTarget, cascadeLastFromTarget, endkey, endkind}
- if _, err = tree.Put([]byte(key), targetMap); err != nil {
- return err
- }
- return nil
- }
- // Write node data for edge - if the data is incorrect we write the old
- // data back later. It is assumed that most of the time the data is correct
- // so we can avoid an extra read lookup
- var oldedge data.Edge
- if oldedgenode, err := gm.writeNode(edge, false, edgeTree, edgeTree, edgeAttributeFilter); err != nil {
- return nil, err
- } else if oldedgenode != nil {
- oldedge = data.NewGraphEdgeFromNode(oldedgenode)
- // Do a sanity check that the endpoints were not updated.
- if !data.NodeCompare(oldedge, edge, []string{data.EdgeEnd1Key,
- data.EdgeEnd1Kind, data.EdgeEnd1Role, data.EdgeEnd2Key,
- data.EdgeEnd2Kind, data.EdgeEnd2Role}) {
- // If the check fails then write back the old data and return
- // no error checking when writing back
- gm.writeNode(oldedge, false, edgeTree, edgeTree, edgeAttributeFilter)
- return nil, &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: "Cannot update endpoints or spec of existing edge: " + edge.Key(),
- }
- }
- return oldedge, nil
- }
- // Create / update specs map on the nodes
- if err := updateSpecMap(specsNode1Key, spec1, end1Tree); err != nil {
- return nil, err
- }
- if err := updateSpecMap(specsNode2Key, spec2, end2Tree); err != nil {
- return nil, err
- }
- // Create / update the edgeInfo entries
- if err := updateTargetInfo(edgeInfo1Key, edge.End2Key(), edge.End2Kind(),
- edge.End1IsCascading(), edge.End1IsCascadingLast(), edge.End2IsCascading(),
- edge.End2IsCascadingLast(), end1Tree); err != nil {
- return nil, err
- }
- if err := updateTargetInfo(edgeInfo2Key, edge.End1Key(), edge.End1Kind(),
- edge.End2IsCascading(), edge.End2IsCascadingLast(),
- edge.End1IsCascading(), edge.End1IsCascadingLast(), end2Tree); err != nil {
- return nil, err
- }
- return nil, nil
- }
- /*
- RemoveEdge removes a single edge from a partition of the graph.
- */
- func (gm *Manager) RemoveEdge(part string, key string, kind string) (data.Edge, error) {
- // Get the HTrees which stores the edges and the edge index
- iht, err := gm.getEdgeIndexHTree(part, kind, true)
- if err != nil {
- return nil, err
- }
- edgeht, err := gm.getEdgeStorageHTree(part, kind, true)
- if err != nil {
- return nil, err
- }
- // Take writer lock
- gm.mutex.Lock()
- defer gm.mutex.Unlock()
- // Delete the node from the datastore
- node, err := gm.deleteNode(key, kind, edgeht, edgeht)
- edge := data.NewGraphEdgeFromNode(node)
- if err != nil {
- return edge, err
- }
- if node != nil {
- // Get the HTrees which stores the edge endpoints
- _, end1ht, err := gm.getNodeStorageHTree(part, edge.End1Kind(), false)
- if err != nil {
- return edge, err
- }
- _, end2ht, err := gm.getNodeStorageHTree(part, edge.End2Kind(), false)
- if err != nil {
- return edge, err
- }
- // Delete edge info from node storage
- if err := gm.deleteEdge(edge, end1ht, end2ht); err != nil {
- return edge, err
- }
- if iht != nil {
- err := util.NewIndexManager(iht).Deindex(key, edge.IndexMap())
- if err != nil {
- return edge, err
- }
- }
- // Decrease edge count
- currentCount := gm.EdgeCount(edge.Kind())
- if err := gm.writeEdgeCount(edge.Kind(), currentCount-1, true); err != nil {
- return edge, err
- }
- // Execute rules
- trans := newInternalGraphTrans(gm)
- trans.subtrans = true
- if err := gm.gr.graphEvent(trans, EventEdgeDeleted, part, edge); err != nil {
- return edge, err
- } else if err := trans.Commit(); err != nil {
- return edge, err
- }
- // Flush changes - errors only reported on the actual node storage flush
- gm.gs.FlushMain()
- gm.flushEdgeIndex(part, edge.Kind())
- gm.flushNodeStorage(part, edge.End1Kind())
- gm.flushNodeStorage(part, edge.End2Kind())
- return edge, gm.flushEdgeStorage(part, edge.Kind())
- }
- return nil, nil
- }
- /*
- Delete edge information from a given node storage
- */
- func (gm *Manager) deleteEdge(edge data.Edge, end1Tree *hash.HTree, end2Tree *hash.HTree) error {
- // Create lookup keys
- spec1 := gm.nm.Encode16(edge.End1Role(), true) + gm.nm.Encode16(edge.Kind(), true) +
- gm.nm.Encode16(edge.End2Role(), true) + gm.nm.Encode16(edge.End2Kind(), true)
- spec2 := gm.nm.Encode16(edge.End2Role(), true) + gm.nm.Encode16(edge.Kind(), true) +
- gm.nm.Encode16(edge.End1Role(), true) + gm.nm.Encode16(edge.End1Kind(), true)
- specsNode1Key := PrefixNSSpecs + edge.End1Key()
- edgeInfo1Key := PrefixNSEdge + edge.End1Key() + spec1
- specsNode2Key := PrefixNSSpecs + edge.End2Key()
- edgeInfo2Key := PrefixNSEdge + edge.End2Key() + spec2
- // Function to delete a spec from a specs map
- updateSpecMap := func(key string, spec string, tree *hash.HTree) error {
- var specsNode map[string]string
- obj, err := tree.Get([]byte(key))
- if err != nil {
- return &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
- } else if obj == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Expected spec entry is missing: %v", key),
- }
- } else {
- specsNode = obj.(map[string]string)
- }
- delete(specsNode, spec)
- if len(specsNode) == 0 {
- if _, err = tree.Remove([]byte(key)); err != nil {
- return err
- }
- } else if _, err = tree.Put([]byte(key), specsNode); err != nil {
- return err
- }
- return nil
- }
- // Function to delete the edgeTargetInfo entry
- updateTargetInfo := func(key string, tree *hash.HTree) (bool, error) {
- var targetMap map[string]*edgeTargetInfo
- obj, err := tree.Get([]byte(key))
- if err != nil {
- return false, &util.GraphError{Type: util.ErrReading, Detail: err.Error()}
- } else if obj == nil {
- return false, &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Expected edgeTargetInfo entry is missing: %v", key),
- }
- } else {
- targetMap = obj.(map[string]*edgeTargetInfo)
- }
- delete(targetMap, edge.Key())
- if len(targetMap) == 0 {
- if _, err = tree.Remove([]byte(key)); err != nil {
- return false, err
- }
- return true, nil
- } else if _, err = tree.Put([]byte(key), targetMap); err != nil {
- return false, err
- }
- return false, nil
- }
- // Remove the edgeInfo entries
- end1TargetInfoRemoved, err := updateTargetInfo(edgeInfo1Key, end1Tree)
- if err != nil {
- return err
- }
- end2TargetInfoRemoved, err := updateTargetInfo(edgeInfo2Key, end2Tree)
- if err != nil {
- return err
- }
- // Remove specs map on the nodes if the target info structure was removed
- if end1TargetInfoRemoved {
- if err := updateSpecMap(specsNode1Key, spec1, end1Tree); err != nil {
- return err
- }
- }
- if end2TargetInfoRemoved {
- if err := updateSpecMap(specsNode2Key, spec2, end2Tree); err != nil {
- return err
- }
- }
- return nil
- }
- /*
- Default filter function to filter out system edge attributes.
- */
- func edgeAttributeFilter(attr string) bool {
- return attr == data.NodeKey || attr == data.NodeKind
- }
|