|
- /*
- * EliasDB
- *
- * Copyright 2016 Matthias Ladkau. All rights reserved.
- *
- * This Source Code Form is subject to the terms of the Mozilla Public
- * License, v. 2.0. If a copy of the MPL was not distributed with this
- * file, You can obtain one at http://mozilla.org/MPL/2.0/.
- */
- package graph
- import (
- "fmt"
- "strings"
- "sync"
- "devt.de/krotik/common/errorutil"
- "devt.de/krotik/eliasdb/graph/data"
- "devt.de/krotik/eliasdb/graph/util"
- )
- /*
- Trans is a transaction object which should be used to group node and edge operations.
- */
- type Trans interface {
- /*
- ID returns a unique transaction ID.
- */
- ID() string
- /*
- String returns a string representation of this transatction.
- */
- String() string
- /*
- Counts returns the transaction size in terms of objects. Returned values
- are nodes to store, edges to store, nodes to remove and edges to remove.
- */
- Counts() (int, int, int, int)
- /*
- IsEmpty returns if this transaction is empty.
- */
- IsEmpty() bool
- /*
- Commit writes the transaction to the graph database. An automatic rollback is done if
- any non-fatal error occurs. Failed transactions cannot be committed again.
- Serious write errors which may corrupt the database will cause a panic.
- */
- Commit() error
- /*
- StoreNode stores a single node in a partition of the graph. This function will
- overwrites any existing node.
- */
- StoreNode(part string, node data.Node) error
- /*
- UpdateNode updates a single node in a partition of the graph. This function will
- only update the given values of the node.
- */
- UpdateNode(part string, node data.Node) error
- /*
- RemoveNode removes a single node from a partition of the graph.
- */
- RemoveNode(part string, nkey string, nkind string) error
- /*
- StoreEdge stores a single edge in a partition of the graph. This function will
- overwrites any existing edge.
- */
- StoreEdge(part string, edge data.Edge) error
- /*
- RemoveEdge removes a single edge from a partition of the graph.
- */
- RemoveEdge(part string, ekey string, ekind string) error
- }
- /*
- NewGraphTrans creates a new graph transaction. This object is not thread safe
- and should only be used for non-concurrent use cases; use NewConcurrentGraphTrans
- for concurrent use cases.
- */
- func NewGraphTrans(gm *Manager) Trans {
- return newInternalGraphTrans(gm)
- }
- /*
- NewConcurrentGraphTrans creates a new thread-safe graph transaction.
- */
- func NewConcurrentGraphTrans(gm *Manager) Trans {
- return &concurrentTrans{NewGraphTrans(gm), &sync.RWMutex{}}
- }
- /*
- NewRollingTrans wraps an existing transaction into a rolling transaction.
- Rolling transactions can be used for VERY large datasets and will commit
- themselves after n operations. Rolling transactions are always thread-safe.
- */
- func NewRollingTrans(t Trans, n int, gm *Manager, newTrans func(*Manager) Trans) Trans {
- idCounterLock.Lock()
- defer idCounterLock.Unlock()
- idCounter++
- // Smallest commit threshold is 1
- if n < 1 {
- n = 1
- }
- return &rollingTrans{
- id: fmt.Sprint(idCounter),
- gm: gm,
- currentTrans: t,
- newTransFunc: newTrans,
- transErrors: errorutil.NewCompositeError(),
- opThreshold: n,
- opCount: 0,
- inFlightCount: 0,
- wg: &sync.WaitGroup{},
- countNodeIns: 0,
- countNodeRem: 0,
- countEdgeIns: 0,
- countEdgeRem: 0,
- transLock: &sync.RWMutex{},
- }
- }
- /*
- newInternalGraphTrans is used for internal transactions. The returned object
- contains extra fields which are only for internal use.
- */
- func newInternalGraphTrans(gm *Manager) *baseTrans {
- idCounterLock.Lock()
- defer idCounterLock.Unlock()
- idCounter++
- return &baseTrans{fmt.Sprint(idCounter), gm, false, make(map[string]data.Node), make(map[string]data.Node),
- make(map[string]data.Edge), make(map[string]data.Edge)}
- }
- /*
- idCounter is a simple counter for ids
- */
- var idCounter uint64
- var idCounterLock = &sync.Mutex{}
- /*
- baseTrans is the main data structure for a graph transaction
- */
- type baseTrans struct {
- id string // Unique transaction ID - not used by EliasDB
- gm *Manager // Graph manager which created this transaction
- subtrans bool // Flag if the transaction is a subtransaction
- storeNodes map[string]data.Node // Nodes which should be stored
- removeNodes map[string]data.Node // Nodes which should be removed
- storeEdges map[string]data.Edge // Edges which should be stored
- removeEdges map[string]data.Edge // Edges which should be removed
- }
- /*
- ID returns a unique transaction ID.
- */
- func (gt *baseTrans) ID() string {
- return gt.id
- }
- /*
- IsEmpty returns if this transaction is empty.
- */
- func (gt *baseTrans) IsEmpty() bool {
- sn, se, rn, re := gt.Counts()
- return sn == 0 && se == 0 && rn == 0 && re == 0
- }
- /*
- Counts returns the transaction size in terms of objects. Returned values
- are nodes to store, edges to store, nodes to remove and edges to remove.
- */
- func (gt *baseTrans) Counts() (int, int, int, int) {
- return len(gt.storeNodes), len(gt.storeEdges), len(gt.removeNodes), len(gt.removeEdges)
- }
- /*
- String returns a string representation of this transatction.
- */
- func (gt *baseTrans) String() string {
- sn, se, rn, re := gt.Counts()
- return fmt.Sprintf("Transaction %v - Nodes: I:%v R:%v - Edges: I:%v R:%v",
- gt.id, sn, rn, se, re)
- }
- /*
- Commit writes the transaction to the graph database. An automatic rollback is done if
- any non-fatal error occurs. Failed transactions cannot be committed again.
- Serious write errors which may corrupt the database will cause a panic.
- */
- func (gt *baseTrans) Commit() error {
- // Take writer lock if we are not in a subtransaction
- if !gt.subtrans {
- gt.gm.mutex.Lock()
- defer gt.gm.mutex.Unlock()
- }
- // Return if there is nothing to do
- if gt.IsEmpty() {
- return nil
- }
- doRollback := func(nodePartsAndKinds map[string]string,
- edgePartsAndKinds map[string]string) {
- // Rollback main database
- gt.gm.gs.RollbackMain()
- // Rollback node storages
- for kkey := range nodePartsAndKinds {
- partAndKind := strings.Split(kkey, "#")
- gt.gm.rollbackNodeIndex(partAndKind[0], partAndKind[1])
- gt.gm.rollbackNodeStorage(partAndKind[0], partAndKind[1])
- }
- gt.storeNodes = make(map[string]data.Node)
- gt.removeNodes = make(map[string]data.Node)
- // Rollback edge storages
- if edgePartsAndKinds != nil {
- for kkey := range edgePartsAndKinds {
- partAndKind := strings.Split(kkey, "#")
- gt.gm.rollbackEdgeIndex(partAndKind[0], partAndKind[1])
- gt.gm.rollbackEdgeStorage(partAndKind[0], partAndKind[1])
- }
- }
- gt.storeEdges = make(map[string]data.Edge)
- gt.removeEdges = make(map[string]data.Edge)
- }
- // Write nodes and edges until everything has been written
- nodePartsAndKinds := make(map[string]string)
- edgePartsAndKinds := make(map[string]string)
- for !gt.IsEmpty() {
- // Write the nodes first
- if err := gt.commitNodes(nodePartsAndKinds, edgePartsAndKinds); err != nil {
- doRollback(nodePartsAndKinds, nil)
- return err
- }
- // After the nodes write the edges
- if err := gt.commitEdges(nodePartsAndKinds, edgePartsAndKinds); err != nil {
- doRollback(nodePartsAndKinds, edgePartsAndKinds)
- return err
- }
- }
- // Flush changes - panic instead of error reporting since the database
- // may be inconsistent
- panicIfError := func(err error) {
- if err != nil {
- panic("Fatal GraphError:" + err.Error())
- }
- }
- panicIfError(gt.gm.gs.FlushMain())
- for kkey := range nodePartsAndKinds {
- partAndKind := strings.Split(kkey, "#")
- panicIfError(gt.gm.flushNodeIndex(partAndKind[0], partAndKind[1]))
- panicIfError(gt.gm.flushNodeStorage(partAndKind[0], partAndKind[1]))
- }
- for kkey := range edgePartsAndKinds {
- partAndKind := strings.Split(kkey, "#")
- panicIfError(gt.gm.flushEdgeIndex(partAndKind[0], partAndKind[1]))
- panicIfError(gt.gm.flushEdgeStorage(partAndKind[0], partAndKind[1]))
- }
- return nil
- }
- /*
- commitNodes tries to commit all transaction nodes.
- */
- func (gt *baseTrans) commitNodes(nodePartsAndKinds map[string]string, edgePartsAndKinds map[string]string) error {
- // First insert nodes
- for tkey, node := range gt.storeNodes {
- // Get partition and kind
- partAndKind := strings.Split(tkey, "#")
- nodePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
- part := partAndKind[0]
- // Get the HTrees which stores the node index and node
- iht, err := gt.gm.getNodeIndexHTree(part, node.Kind(), true)
- if err != nil {
- return err
- }
- attht, valht, err := gt.gm.getNodeStorageHTree(part, node.Kind(), true)
- if err != nil {
- return err
- }
- // Write the node to the datastore
- oldnode, err := gt.gm.writeNode(node, false, attht, valht, nodeAttributeFilter)
- if err != nil {
- return err
- }
- // Increase node count if the node was inserted and write the changes
- // to the index.
- if oldnode == nil {
- currentCount := gt.gm.NodeCount(node.Kind())
- gt.gm.writeNodeCount(node.Kind(), currentCount+1, false)
- if iht != nil {
- err := util.NewIndexManager(iht).Index(node.Key(), node.IndexMap())
- if err != nil {
- // The node was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- } else if iht != nil {
- err := util.NewIndexManager(iht).Reindex(node.Key(), node.IndexMap(),
- oldnode.IndexMap())
- if err != nil {
- // The node was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- // Execute rules
- var event int
- if oldnode == nil {
- event = EventNodeCreated
- } else {
- event = EventNodeUpdated
- }
- if err := gt.gm.gr.graphEvent(gt, event, part, node, oldnode); err != nil {
- return err
- }
- delete(gt.storeNodes, tkey)
- }
- // Then remove nodes
- for tkey, node := range gt.removeNodes {
- // Get partition and kind
- partAndKind := strings.Split(tkey, "#")
- nodePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
- part := partAndKind[0]
- // Get the HTree which stores the node index and node kind
- iht, err := gt.gm.getNodeIndexHTree(part, node.Kind(), false)
- if err != nil {
- return err
- }
- attTree, valTree, err := gt.gm.getNodeStorageHTree(part, node.Kind(), false)
- if err != nil {
- return err
- }
- if attTree == nil || valTree == nil {
- // Kind does not exist - continue
- delete(gt.removeNodes, tkey)
- continue
- }
- // Delete the node from the datastore
- oldnode, err := gt.gm.deleteNode(node.Key(), node.Kind(), attTree, valTree)
- if err != nil {
- return err
- }
- // Update the index
- if oldnode != nil {
- if iht != nil {
- err := util.NewIndexManager(iht).Deindex(node.Key(), oldnode.IndexMap())
- if err != nil {
- return err
- }
- }
- // Decrease the node count
- currentCount := gt.gm.NodeCount(node.Kind())
- gt.gm.writeNodeCount(node.Kind(), currentCount-1, false)
- // Execute rules
- if err := gt.gm.gr.graphEvent(gt, EventNodeDeleted, part, oldnode); err != nil {
- return err
- }
- }
- delete(gt.removeNodes, tkey)
- }
- return nil
- }
- /*
- commitEdges tries to commit all transaction edges.
- */
- func (gt *baseTrans) commitEdges(nodePartsAndKinds map[string]string, edgePartsAndKinds map[string]string) error {
- // First insert edges
- for tkey, edge := range gt.storeEdges {
- // Get partition and kind
- partAndKind := strings.Split(tkey, "#")
- edgePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
- nodePartsAndKinds[partAndKind[0]+"#"+edge.End1Kind()] = ""
- nodePartsAndKinds[partAndKind[0]+"#"+edge.End2Kind()] = ""
- part := partAndKind[0]
- // Get the HTrees which stores the edges and the edge index
- iht, err := gt.gm.getEdgeIndexHTree(part, edge.Kind(), true)
- if err != nil {
- return err
- }
- edgeht, err := gt.gm.getEdgeStorageHTree(part, edge.Kind(), true)
- if err != nil {
- return err
- }
- // Get the HTrees which stores the edge endpoints and make sure the endpoints
- // do exist
- end1nodeht, end1ht, err := gt.gm.getNodeStorageHTree(part, edge.End1Kind(), false)
- if err != nil {
- return err
- } else if end1ht == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Can't store edge to non-existend node kind: %v", edge.End1Kind()),
- }
- } else if end1, err := end1nodeht.Get([]byte(PrefixNSAttrs + edge.End1Key())); err != nil || end1 == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End1Key(), edge.End1Kind()),
- }
- }
- end2nodeht, end2ht, err := gt.gm.getNodeStorageHTree(part, edge.End2Kind(), false)
- if err != nil {
- return err
- } else if end2ht == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: "Can't store edge to non-existend node kind: " + edge.End2Kind()}
- } else if end2, err := end2nodeht.Get([]byte(PrefixNSAttrs + edge.End2Key())); err != nil || end2 == nil {
- return &util.GraphError{
- Type: util.ErrInvalidData,
- Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End2Key(), edge.End2Kind()),
- }
- }
- // Write edge to the datastore
- oldedge, err := gt.gm.writeEdge(edge, edgeht, end1ht, end2ht)
- if err != nil {
- return err
- }
- // Increase edge count if the edge was inserted and write the changes
- // to the index.
- if oldedge == nil {
- // Increase edge count
- currentCount := gt.gm.EdgeCount(edge.Kind())
- gt.gm.writeEdgeCount(edge.Kind(), currentCount+1, false)
- // Write edge data to the index
- if iht != nil {
- if err := util.NewIndexManager(iht).Index(edge.Key(), edge.IndexMap()); err != nil {
- // The edge was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- } else if iht != nil {
- err := util.NewIndexManager(iht).Reindex(edge.Key(), edge.IndexMap(),
- oldedge.IndexMap())
- if err != nil {
- // The edge was written at this point and the model is
- // consistent only the index is missing entries
- return err
- }
- }
- // Execute rules
- var event int
- if oldedge == nil {
- event = EventEdgeCreated
- } else {
- event = EventEdgeUpdated
- }
- if err := gt.gm.gr.graphEvent(gt, event, part, edge, oldedge); err != nil {
- return err
- }
- delete(gt.storeEdges, tkey)
- }
- // Then remove edges
- for tkey, edge := range gt.removeEdges {
- // Get partition and kind
- partAndKind := strings.Split(tkey, "#")
- edgePartsAndKinds[partAndKind[0]+"#"+partAndKind[1]] = ""
- nodePartsAndKinds[partAndKind[0]+"#"+edge.End1Kind()] = ""
- nodePartsAndKinds[partAndKind[0]+"#"+edge.End2Kind()] = ""
- part := partAndKind[0]
- // Get the HTrees which stores the edges and the edge index
- iht, err := gt.gm.getEdgeIndexHTree(part, edge.Kind(), true)
- if err != nil {
- return err
- }
- edgeht, err := gt.gm.getEdgeStorageHTree(part, edge.Kind(), true)
- if err != nil {
- return err
- }
- // Delete the node from the datastore
- node, err := gt.gm.deleteNode(edge.Key(), edge.Kind(), edgeht, edgeht)
- oldedge := data.NewGraphEdgeFromNode(node)
- if err != nil {
- return err
- }
- if node != nil {
- // Get the HTrees which stores the edge endpoints
- _, end1ht, err := gt.gm.getNodeStorageHTree(part, oldedge.End1Kind(), false)
- if err != nil {
- return err
- }
- _, end2ht, err := gt.gm.getNodeStorageHTree(part, oldedge.End2Kind(), false)
- if err != nil {
- return err
- }
- // Delete edge info from node storage
- if err := gt.gm.deleteEdge(oldedge, end1ht, end2ht); err != nil {
- return err
- }
- if iht != nil {
- err := util.NewIndexManager(iht).Deindex(edge.Key(), oldedge.IndexMap())
- if err != nil {
- return err
- }
- }
- // Decrease edge count
- currentCount := gt.gm.EdgeCount(oldedge.Kind())
- gt.gm.writeEdgeCount(oldedge.Kind(), currentCount-1, false)
- // Execute rules
- if err := gt.gm.gr.graphEvent(gt, EventEdgeDeleted, part, oldedge); err != nil {
- return err
- }
- }
- delete(gt.removeEdges, tkey)
- }
- return nil
- }
- /*
- StoreNode stores a single node in a partition of the graph. This function will
- overwrites any existing node.
- */
- func (gt *baseTrans) StoreNode(part string, node data.Node) error {
- if err := gt.gm.checkPartitionName(part); err != nil {
- return err
- } else if err := gt.gm.checkNode(node); err != nil {
- return err
- }
- key := gt.createKey(part, node.Key(), node.Kind())
- if _, ok := gt.removeNodes[key]; ok {
- delete(gt.removeNodes, key)
- }
- gt.storeNodes[key] = node
- return nil
- }
- /*
- UpdateNode updates a single node in a partition of the graph. This function will
- only update the given values of the node.
- */
- func (gt *baseTrans) UpdateNode(part string, node data.Node) error {
- if err := gt.gm.checkPartitionName(part); err != nil {
- return err
- } else if err := gt.gm.checkNode(node); err != nil {
- return err
- }
- key := gt.createKey(part, node.Key(), node.Kind())
- if _, ok := gt.removeNodes[key]; ok {
- delete(gt.removeNodes, key)
- } else if storeNode, ok := gt.storeNodes[key]; ok {
- node = data.NodeMerge(storeNode, node)
- } else {
- // Check the actual database if the node exists
- storeNode, err := gt.gm.FetchNode(part, node.Key(), node.Kind())
- if err != nil {
- return err
- } else if storeNode != nil {
- node = data.NodeMerge(storeNode, node)
- }
- }
- gt.storeNodes[key] = node
- return nil
- }
- /*
- RemoveNode removes a single node from a partition of the graph.
- */
- func (gt *baseTrans) RemoveNode(part string, nkey string, nkind string) error {
- if err := gt.gm.checkPartitionName(part); err != nil {
- return err
- }
- key := gt.createKey(part, nkey, nkind)
- if _, ok := gt.storeNodes[key]; ok {
- delete(gt.storeNodes, key)
- }
- node := data.NewGraphNode()
- node.SetAttr(data.NodeKey, nkey)
- node.SetAttr(data.NodeKind, nkind)
- gt.removeNodes[key] = node
- return nil
- }
- /*
- StoreEdge stores a single edge in a partition of the graph. This function will
- overwrites any existing edge.
- */
- func (gt *baseTrans) StoreEdge(part string, edge data.Edge) error {
- if err := gt.gm.checkPartitionName(part); err != nil {
- return err
- } else if err := gt.gm.checkEdge(edge); err != nil {
- return err
- }
- key := gt.createKey(part, edge.Key(), edge.Kind())
- if _, ok := gt.removeEdges[key]; ok {
- delete(gt.removeEdges, key)
- }
- gt.storeEdges[key] = edge
- return nil
- }
- /*
- RemoveEdge removes a single edge from a partition of the graph.
- */
- func (gt *baseTrans) RemoveEdge(part string, ekey string, ekind string) error {
- if err := gt.gm.checkPartitionName(part); err != nil {
- return err
- }
- key := gt.createKey(part, ekey, ekind)
- if _, ok := gt.storeEdges[key]; ok {
- delete(gt.storeEdges, key)
- }
- edge := data.NewGraphEdge()
- edge.SetAttr(data.NodeKey, ekey)
- edge.SetAttr(data.NodeKind, ekind)
- gt.removeEdges[key] = edge
- return nil
- }
- /*
- Create a key for the transaction storage.
- */
- func (gt *baseTrans) createKey(part string, key string, kind string) string {
- return part + "#" + kind + "#" + key
- }
- /*
- concurrentTrans is a lock-wrapper around baseTrans which allows concurrent use.
- */
- type concurrentTrans struct {
- Trans
- transLock *sync.RWMutex
- }
- /*
- ID returns a unique transaction ID.
- */
- func (gt *concurrentTrans) ID() string {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- return gt.Trans.ID()
- }
- /*
- String returns a string representation of this transatction.
- */
- func (gt *concurrentTrans) String() string {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- return gt.Trans.String()
- }
- /*
- Counts returns the transaction size in terms of objects. Returned values
- are nodes to store, edges to store, nodes to remove and edges to remove.
- */
- func (gt *concurrentTrans) Counts() (int, int, int, int) {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- return gt.Trans.Counts()
- }
- /*
- IsEmpty returns if this transaction is empty.
- */
- func (gt *concurrentTrans) IsEmpty() bool {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- return gt.Trans.IsEmpty()
- }
- /*
- Commit writes the transaction to the graph database. An automatic rollback is done if
- any non-fatal error occurs. Failed transactions cannot be committed again.
- Serious write errors which may corrupt the database will cause a panic.
- */
- func (gt *concurrentTrans) Commit() error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- return gt.Trans.Commit()
- }
- /*
- StoreNode stores a single node in a partition of the graph. This function will
- overwrites any existing node.
- */
- func (gt *concurrentTrans) StoreNode(part string, node data.Node) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- return gt.Trans.StoreNode(part, node)
- }
- /*
- UpdateNode updates a single node in a partition of the graph. This function will
- only update the given values of the node.
- */
- func (gt *concurrentTrans) UpdateNode(part string, node data.Node) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- return gt.Trans.UpdateNode(part, node)
- }
- /*
- RemoveNode removes a single node from a partition of the graph.
- */
- func (gt *concurrentTrans) RemoveNode(part string, nkey string, nkind string) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- return gt.Trans.RemoveNode(part, nkey, nkind)
- }
- /*
- StoreEdge stores a single edge in a partition of the graph. This function will
- overwrites any existing edge.
- */
- func (gt *concurrentTrans) StoreEdge(part string, edge data.Edge) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- return gt.Trans.StoreEdge(part, edge)
- }
- /*
- RemoveEdge removes a single edge from a partition of the graph.
- */
- func (gt *concurrentTrans) RemoveEdge(part string, ekey string, ekind string) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- return gt.Trans.RemoveEdge(part, ekey, ekind)
- }
- /*
- rollingTrans is a rolling transaction which will commit itself after
- n operations.
- */
- type rollingTrans struct {
- id string // ID of this transaction
- gm *Manager // Graph manager which created this transaction
- currentTrans Trans // Current transaction which is build up
- newTransFunc func(*Manager) Trans // Function to create a new transaction
- transErrors *errorutil.CompositeError // Collected transaction errors
- opThreshold int // Operation threshold
- opCount int // Operation count
- inFlightCount int // Previous transactions which are still committing
- wg *sync.WaitGroup // WaitGroup which releases after all in-flight transactions
- countNodeIns int // Count for inserted nodes
- countNodeRem int // Count for removed nodes
- countEdgeIns int // Count for inserted edges
- countEdgeRem int // Count for removed edges
- transLock *sync.RWMutex // Lock for this transaction
- }
- /*
- ID returns a unique transaction ID.
- */
- func (gt *rollingTrans) ID() string {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- return gt.id
- }
- /*
- IsEmpty returns if this transaction is empty.
- */
- func (gt *rollingTrans) IsEmpty() bool {
- sn, se, rn, re := gt.Counts()
- return sn == 0 && se == 0 && rn == 0 && re == 0
- }
- /*
- Counts returns the transaction size in terms of objects. Returned values
- are nodes to store, edges to store, nodes to remove and edges to remove.
- */
- func (gt *rollingTrans) Counts() (int, int, int, int) {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- // Count current trans
- ns, es, nr, er := gt.currentTrans.Counts()
- return ns + gt.countNodeIns, es + gt.countEdgeIns,
- nr + gt.countNodeRem, er + gt.countEdgeRem
- }
- /*
- String returns a string representation of this transatction.
- */
- func (gt *rollingTrans) String() string {
- gt.transLock.RLock()
- defer gt.transLock.RUnlock()
- ns, es, nr, er := gt.currentTrans.Counts()
- return fmt.Sprintf("Rolling transaction %v - Nodes: I:%v R:%v - "+
- "Edges: I:%v R:%v - Threshold: %v - In-flight: %v",
- gt.id, ns+gt.countNodeIns, nr+gt.countNodeRem, es+gt.countEdgeIns,
- er+gt.countEdgeRem, gt.opThreshold, gt.inFlightCount)
- }
- /*
- Commit writes the remaining operations of this rolling transaction to
- the graph database.
- */
- func (gt *rollingTrans) Commit() error {
- // Commit current transaction
- gt.transLock.Lock()
- if err := gt.currentTrans.Commit(); err != nil {
- gt.transErrors.Add(err)
- }
- gt.transLock.Unlock()
- // Wait for other transactions
- gt.wg.Wait()
- // Return any errors
- if gt.transErrors.HasErrors() {
- return gt.transErrors
- }
- return nil
- }
- /*
- checkNewSubTrans checks if a new sub-transaction should be started.
- */
- func (gt *rollingTrans) checkNewSubTrans() {
- if gt.opCount++; gt.opCount >= gt.opThreshold {
- // Reset the op counter
- gt.opCount = 0
- // Start a new transaction and add the counts to the overall counts
- cTrans := gt.currentTrans
- gt.currentTrans = gt.newTransFunc(gt.gm)
- ns, es, nr, er := cTrans.Counts()
- gt.countNodeIns += ns
- gt.countNodeRem += nr
- gt.countEdgeIns += es
- gt.countEdgeRem += er
- // Start go routine which commits the current transaction
- gt.wg.Add(1) // Add to WaitGroup so we can wait for all in-flight transactions
- gt.inFlightCount++ // Count the new in-flight transaction
- go func() {
- defer gt.wg.Done()
- err := cTrans.Commit()
- gt.transLock.Lock()
- if err != nil {
- // Store errors
- gt.transErrors.Add(err)
- }
- // Reduce the counts (do this even if there were errors)
- gt.countNodeIns -= ns
- gt.countNodeRem -= nr
- gt.countEdgeIns -= es
- gt.countEdgeRem -= er
- gt.inFlightCount--
- gt.transLock.Unlock()
- }()
- }
- }
- /*
- StoreNode stores a single node in a partition of the graph. This function will
- overwrites any existing node.
- */
- func (gt *rollingTrans) StoreNode(part string, node data.Node) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- err := gt.currentTrans.StoreNode(part, node)
- if err == nil {
- gt.checkNewSubTrans()
- }
- return err
- }
- /*
- UpdateNode updates a single node in a partition of the graph. This function will
- only update the given values of the node.
- */
- func (gt *rollingTrans) UpdateNode(part string, node data.Node) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- err := gt.currentTrans.UpdateNode(part, node)
- if err == nil {
- gt.checkNewSubTrans()
- }
- return err
- }
- /*
- RemoveNode removes a single node from a partition of the graph.
- */
- func (gt *rollingTrans) RemoveNode(part string, nkey string, nkind string) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- err := gt.currentTrans.RemoveNode(part, nkey, nkind)
- if err == nil {
- gt.checkNewSubTrans()
- }
- return err
- }
- /*
- StoreEdge stores a single edge in a partition of the graph. This function will
- overwrites any existing edge.
- */
- func (gt *rollingTrans) StoreEdge(part string, edge data.Edge) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- err := gt.currentTrans.StoreEdge(part, edge)
- if err == nil {
- gt.checkNewSubTrans()
- }
- return err
- }
- /*
- RemoveEdge removes a single edge from a partition of the graph.
- */
- func (gt *rollingTrans) RemoveEdge(part string, ekey string, ekind string) error {
- gt.transLock.Lock()
- defer gt.transLock.Unlock()
- err := gt.currentTrans.RemoveEdge(part, ekey, ekind)
- if err == nil {
- gt.checkNewSubTrans()
- }
- return err
- }
|