Browse Source

feat: ECAL support for EliasDB

Matthias Ladkau 3 years ago
parent
commit
f5ecdb188e

+ 6 - 0
api/rest.go

@@ -16,6 +16,7 @@ import (
 
 	"devt.de/krotik/common/datautil"
 	"devt.de/krotik/eliasdb/cluster"
+	"devt.de/krotik/eliasdb/ecal"
 	"devt.de/krotik/eliasdb/graph"
 	"devt.de/krotik/eliasdb/graph/graphstorage"
 )
@@ -89,6 +90,11 @@ GM is the GraphManager instance which should be used by the REST API.
 */
 var GM *graph.Manager
 
+/*
+SI is the ScriptingInterpreter instance which is working with the api.GM GraphManager instance.
+*/
+var SI *ecal.ScriptingInterpreter
+
 /*
 GS is the GraphStorage instance which should be used by the REST API.
 */

BIN
banner.png


+ 16 - 0
config/config.go

@@ -49,6 +49,8 @@ const (
 	HTTPSPort                = "HTTPSPort"
 	CookieMaxAgeSeconds      = "CookieMaxAgeSeconds"
 	EnableReadOnly           = "EnableReadOnly"
+	EnableECALScripts        = "EnableECALScripts"
+	EnableECALDebugServer    = "EnableECALDebugServer"
 	EnableWebFolder          = "EnableWebFolder"
 	EnableAccessControl      = "EnableAccessControl"
 	EnableWebTerminal        = "EnableWebTerminal"
@@ -59,6 +61,12 @@ const (
 	ClusterStateInfoFile     = "ClusterStateInfoFile"
 	ClusterConfigFile        = "ClusterConfigFile"
 	ClusterLogHistory        = "ClusterLogHistory"
+	ECALScriptFolder         = "ECALScriptFolder"
+	ECALEntryScript          = "ECALEntryScript"
+	ECALLogLevel             = "ECALLogLevel"
+	ECALLogFile              = "ECALLogFile"
+	ECALDebugServerHost      = "ECALDebugServerHost"
+	ECALDebugServerPort      = "ECALDebugServerPort"
 )
 
 /*
@@ -67,6 +75,8 @@ DefaultConfig is the defaut configuration
 var DefaultConfig = map[string]interface{}{
 	MemoryOnlyStorage:        false,
 	EnableReadOnly:           false,
+	EnableECALScripts:        false,
+	EnableECALDebugServer:    false,
 	EnableWebFolder:          true,
 	EnableAccessControl:      false,
 	EnableWebTerminal:        true,
@@ -88,6 +98,12 @@ var DefaultConfig = map[string]interface{}{
 	ClusterStateInfoFile:     "cluster.stateinfo",
 	ClusterConfigFile:        "cluster.config.json",
 	ClusterLogHistory:        100.0,
+	ECALScriptFolder:         "ecal",
+	ECALEntryScript:          "main.ecal",
+	ECALLogLevel:             "info",
+	ECALLogFile:              "",
+	ECALDebugServerHost:      "127.0.0.1",
+	ECALDebugServerPort:      "33274",
 }
 
 /*

+ 229 - 0
ecal.md

@@ -0,0 +1,229 @@
+EliasDB Event Condition Action Language
+=======================================
+
+EliasDB supports a scripting language called [Event Condition Action Language (ECAL)](https://devt.de/krotik/ecal/) to enable rule based scripting functionality.
+
+EliasDB specific events which can be handled:
+--
+The ECAL interpreter in EliasDB receives the following events:
+
+EliasDB Graph Event | ECAL event kind | Event state contents | Description
+-|-|-|-
+graph.EventNodeCreated | `db.node.created` | part, trans, node | A node was created.
+graph.EventNodeUpdated | `db.node.updated` | part, trans, node, old_node | A node was updated.
+graph.EventNodeDeleted | `db.node.deleted` | part, trans, node | A node was deleted.
+graph.EventEdgeCreated | `db.edge.created` | part, trans, edge | An edge was created.
+graph.EventEdgeUpdated | `db.edge.updated` | part, trans, edge, old_edge | An edge was updated.
+graph.EventEdgeDeleted | `db.edge.deleted` | part, trans, edge | An edge was deleted.
+graph.EventNodeStore | `db.node.store` | part, trans, node | A node is about to be stored (always overwriting existing values).
+graph.EventNodeUpdate | `db.node.update` | part, trans, node | A node is about to be updated. Only thrown when graph.UpdateNode is called directly.
+graph.EventNodeDelete | `db.node.delete` | part, trans, key, kind | A node is about to be deleted.
+graph.EventEdgeStore | `db.edge.store` | part, trans, edge | An edge is about to be stored.
+graph.EventEdgeDelete | `db.edge.delete` | part, trans, key, kind | An edge is about to be deleted.
+
+TODO: Explaination
+Add special function to signal that an event has been handled
+
+Note: EliasDB will wait for the event cascade to be finished before performing the actual operation (e.g. inserting a node). If the event handling requires a time consuming operation then a new event cascade should be started using `addEvent` with a scope:
+```
+addEvent("request", "foo.bar.xxx", {
+   "payload" : 123
+}, {
+   "": true  # This scope allows all events
+})
+```
+
+EliasDB specific functions:
+--
+The ECAL interpreter in EliasDB supports the following EliasDB specific functions:
+
+#### `db.storeNode(partition, nodeMap, [transaction])`
+Inserts or updates a node in EliasDB.
+
+Parameter | Description
+-|-
+partition | Partition of the node
+nodeMap | Node object as a map with at least a key and a kind attribute
+transaction | Optional a transaction to group a set of changes
+
+Example:
+```
+db.storeNode("main", {
+  "key" : "foo",
+  "kind" : "bar",
+  "data" : 123,
+})
+```
+
+#### `db.removeNode(partition, nodeKey, nodeKind, [transaction])`
+Removes a node in EliasDB.
+
+Parameter | Description
+-|-
+partition | Partition of the node
+nodeKey | Key attribute of the node to remove
+nodeKind | Kind attribute of the node to remove
+transaction | Optional a transaction to group a set of changes
+
+Example:
+```
+db.removeNode("main", "foo", "bar")
+```
+
+#### `db.fetchNode(partition, nodeKey, nodeKind)`
+Fetches a node in EliasDB.
+
+Parameter | Description
+-|-
+partition | Partition of the node
+nodeKey | Key attribute of the node to fetch
+nodeKind | Kind attribute of the node to fetch
+
+Example:
+```
+db.fetchNode("main", "foo", "bar")
+```
+
+#### `db.storeEdge(partition, edgeMap, [transaction])`
+Inserts or updates an edge in EliasDB.
+
+Parameter | Description
+-|-
+partition | Partition of the edge
+edgeMap | Edge object as a map with at least the main attributes: key, kind, end1cascading, end1key, end1kind, end1role, end2cascading, end2key, end2kind, end2role
+transaction | Optional a transaction to group a set of changes
+
+Example:
+```
+db.storeEdge("main", {
+  "key":           "123",
+  "kind":          "myedges",
+  "end1cascading": true,
+  "end1key":       "foo",
+  "end1kind":      "bar",
+  "end1role":      "role1",
+  "end2cascading": false,
+  "end2key":       "key2",
+  "end2kind":      "kind2",
+  "end2role":      "role2",
+})
+```
+
+#### `db.removeEdge(partition, edgeKey, edgeKind, [transaction])`
+Removes an edge in EliasDB.
+
+Parameter | Description
+-|-
+partition | Partition of the edge
+edgeKey | Key attribute of the edge to remove
+edgeKind | Kind attribute of the edge to remove
+transaction | Optional a transaction to group a set of changes
+
+Example:
+```
+db.removeEdge("main", "123", "myedges")
+```
+
+#### `db.fetchEdge(partition, edgeKey, edgeKind)`
+Fetches an edge in EliasDB.
+
+Parameter | Description
+-|-
+partition | Partition of the edge
+edgeKey | Key attribute of the edge to fetch
+edgeKind | Kind attribute of the edge to fetch
+
+Example:
+```
+db.fetchEdge("main", "123", "myedges")
+```
+
+#### `db.traverse(partition, nodeKey, nodeKind, traversalSpec)`
+Traverses an edge in EliasDB from a given node. Returns a list of nodes which were
+reached and a list of edges which were followed.
+
+Parameter | Description
+-|-
+partition | Partition of the node
+nodeKey | Key attribute of the node to traverse from
+nodeKind | Kind attribute of the node to traverse from
+traversalSpec | Traversal spec
+
+Example:
+```
+[nodes, edges] := db.traverse("main", "foo", "bar", "role1:myedges:role2:kind2")
+```
+
+#### `db.newTrans()`
+Creates a new transaction for EliasDB.
+
+Example:
+```
+trans := db.newTrans()
+```
+
+#### `db.newRollingTrans(n)`
+Creates a new rolling transaction for EliasDB. A rolling transaction commits after n entries.
+
+Parameter | Description
+-|-
+n | Rolling threshold (number of operations before rolling)
+
+Example:
+```
+trans := db.newRollingTrans(5)
+```
+
+#### `db.commit(transaction)`
+Commits an existing transaction for EliasDB.
+
+Parameter | Description
+-|-
+transaction | Transaction to execute
+
+Example:
+```
+db.commit(trans)
+```
+
+#### `db.query(partition, query)`
+Run an EQL query.
+
+Parameter | Description
+-|-
+partition | Partition to query
+query | Query to execute
+
+Example:
+```
+db.commit("main", "get bar")
+```
+
+#### `db.graphQL(partition, query, [variables], [operationName])`
+Run a GraphQL query.
+
+Parameter | Description
+-|-
+partition | Partition to query
+query | Query to execute
+variables | Map of variables for the query
+operationName | Operation to execute (useful if the query defines more than a single operation)
+
+Example:
+```
+db.graphQL("main", "query myquery($x: string) { bar(key:$x) { data }}", {
+  "x": "foo",  
+}, "myquery")
+```
+
+#### `db.raiseGraphEventHandled()`
+When handling a graph event, notify the GraphManager of EliasDB that no further action is necessary.
+
+Example:
+```
+sink mysink
+  kindmatch [ "db.*.*" ],
+{
+  db.raiseGraphEventHandled()
+}
+```

+ 254 - 0
ecal/dbfunc/edge.go

@@ -0,0 +1,254 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+/*
+Package dbfunc contains EliasDB specific functions for the event condition action language (ECAL).
+*/
+package dbfunc
+
+import (
+	"fmt"
+
+	"devt.de/krotik/ecal/parser"
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+)
+
+/*
+StoreEdgeFunc inserts or updates an edge in EliasDB.
+*/
+type StoreEdgeFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *StoreEdgeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+
+	if arglen := len(args); arglen != 2 && arglen != 3 {
+		err = fmt.Errorf("Function requires 2 or 3 parameters: partition, edge" +
+			" map and optionally a transaction")
+	}
+
+	if err == nil {
+		var trans graph.Trans
+
+		part := fmt.Sprint(args[0])
+		nodeMap, ok := args[1].(map[interface{}]interface{})
+
+		// Check parameters
+
+		if !ok {
+			err = fmt.Errorf("Second parameter must be a map")
+		}
+
+		if err == nil && len(args) > 2 {
+			if trans, ok = args[2].(graph.Trans); !ok {
+				err = fmt.Errorf("Third parameter must be a transaction")
+			}
+		}
+
+		// Build up node to store
+
+		edge := data.NewGraphEdgeFromNode(NewGraphNodeFromECALMap(nodeMap))
+
+		// Store the edge
+
+		if err == nil {
+
+			if trans != nil {
+				err = trans.StoreEdge(part, edge)
+			} else {
+				err = f.GM.StoreEdge(part, edge)
+			}
+		}
+	}
+
+	return nil, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *StoreEdgeFunc) DocString() (string, error) {
+	return "Inserts or updates an edge in EliasDB.", nil
+}
+
+/*
+RemoveEdgeFunc removes an edge in EliasDB.
+*/
+type RemoveEdgeFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *RemoveEdgeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+
+	if arglen := len(args); arglen != 3 && arglen != 4 {
+		err = fmt.Errorf("Function requires 3 or 4 parameters: partition, edge key," +
+			" edge kind and optionally a transaction")
+	}
+
+	if err == nil {
+		var trans graph.Trans
+
+		part := fmt.Sprint(args[0])
+		key := fmt.Sprint(args[1])
+		kind := fmt.Sprint(args[2])
+
+		// Check parameters
+
+		if len(args) > 3 {
+			var ok bool
+
+			if trans, ok = args[3].(graph.Trans); !ok {
+				err = fmt.Errorf("Fourth parameter must be a transaction")
+			}
+		}
+
+		// Remove the edge
+
+		if err == nil {
+
+			if trans != nil {
+				err = trans.RemoveEdge(part, key, kind)
+			} else {
+				_, err = f.GM.RemoveEdge(part, key, kind)
+			}
+		}
+	}
+
+	return nil, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *RemoveEdgeFunc) DocString() (string, error) {
+	return "Removes an edge in EliasDB.", nil
+}
+
+/*
+FetchEdgeFunc fetches an edge in EliasDB.
+*/
+type FetchEdgeFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *FetchEdgeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var res interface{}
+	var err error
+
+	if arglen := len(args); arglen != 3 {
+		err = fmt.Errorf("Function requires 3 parameters: partition, edge key and" +
+			" edge kind")
+	}
+
+	if err == nil {
+		var node data.Node
+
+		part := fmt.Sprint(args[0])
+		key := fmt.Sprint(args[1])
+		kind := fmt.Sprint(args[2])
+
+		conv := func(m map[string]interface{}) map[interface{}]interface{} {
+			c := make(map[interface{}]interface{})
+			for k, v := range m {
+				c[k] = v
+			}
+			return c
+		}
+
+		// Fetch the node
+
+		if node, err = f.GM.FetchEdge(part, key, kind); node != nil {
+			res = conv(node.Data())
+		}
+	}
+
+	return res, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *FetchEdgeFunc) DocString() (string, error) {
+	return "Fetches an edge in EliasDB.", nil
+}
+
+/*
+TraverseFunc traverses an edge in EliasDB.
+*/
+type TraverseFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *TraverseFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var res interface{}
+	var err error
+
+	if arglen := len(args); arglen != 4 {
+		err = fmt.Errorf("Function requires 4 parameters: partition, node key," +
+			" node kind and a traversal spec")
+	}
+
+	if err == nil {
+		var nodes []data.Node
+		var edges []data.Edge
+
+		part := fmt.Sprint(args[0])
+		key := fmt.Sprint(args[1])
+		kind := fmt.Sprint(args[2])
+		spec := fmt.Sprint(args[3])
+
+		conv := func(m map[string]interface{}) map[interface{}]interface{} {
+			c := make(map[interface{}]interface{})
+			for k, v := range m {
+				c[k] = v
+			}
+			return c
+		}
+
+		// Do the traversal
+
+		if nodes, edges, err = f.GM.TraverseMulti(part, key, kind, spec, true); err == nil {
+
+			resNodes := make([]interface{}, len(nodes))
+			for i, n := range nodes {
+				resNodes[i] = conv(n.Data())
+			}
+			resEdges := make([]interface{}, len(edges))
+			for i, e := range edges {
+				resEdges[i] = conv(e.Data())
+			}
+			res = []interface{}{resNodes, resEdges}
+		}
+	}
+
+	return res, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *TraverseFunc) DocString() (string, error) {
+	return "Traverses an edge in EliasDB from a given node.", nil
+}

+ 351 - 0
ecal/dbfunc/edge_test.go

@@ -0,0 +1,351 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+	"testing"
+
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+	"devt.de/krotik/eliasdb/graph/graphstorage"
+)
+
+func TestStoreAndRemoveEdge(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	se := &StoreEdgeFunc{gm}
+
+	if _, err := se.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := se.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 2 or 3 parameters: partition, edge map and optionally a transaction" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := se.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key": "foo",
+	}}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Edge is missing a kind value)" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := se.Run("", nil, nil, 0, []interface{}{"main", "x"}); err == nil ||
+		err.Error() != "Second parameter must be a map" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := se.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key": "foo",
+	}, "x"}); err == nil ||
+		err.Error() != "Third parameter must be a transaction" {
+		t.Error(err)
+		return
+	}
+
+	gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "a",
+		"kind": "b",
+	}))
+	gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "c",
+		"kind": "d",
+	}))
+
+	if _, err := se.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":           "123",
+		"kind":          "e",
+		"end1cascading": true,
+		"end1key":       "a",
+		"end1kind":      "b",
+		"end1role":      "role1",
+		"end2cascading": false,
+		"end2key":       "c",
+		"end2kind":      "d",
+		"end2role":      "role2",
+	}}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	_, err := se.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":           "123",
+		"kind":          "e",
+		"end1cascading": true,
+		"end1key":       "a",
+		"end1kind":      "b1",
+		"end1role":      "role1",
+		"end2cascading": false,
+		"end2key":       "c",
+		"end2kind":      "d",
+		"end2role":      "role2",
+	}})
+
+	if err == nil || err.Error() != "GraphError: Invalid data (Can't store edge to non-existing node kind: b1)" {
+		t.Error(err)
+		return
+	}
+
+	fe := &FetchEdgeFunc{gm}
+
+	if _, err := fe.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := fe.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 3 parameters: partition, edge key and edge kind" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := fe.Run("", nil, nil, 0, []interface{}{"mai n", "123", "e"}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Partition name mai n is not alphanumeric - can only contain [a-zA-Z0-9_])" {
+		t.Error(err)
+		return
+	}
+
+	res, err := fe.Run("", nil, nil, 0, []interface{}{"main", "123", "e"})
+
+	if fmt.Sprint(data.NewGraphEdgeFromNode(NewGraphNodeFromECALMap(res.(map[interface{}]interface{})))) != `
+GraphEdge:
+              key : 123
+             kind : e
+    end1cascading : true
+          end1key : a
+         end1kind : b
+         end1role : role1
+    end2cascading : false
+          end2key : c
+         end2kind : d
+         end2role : role2
+`[1:] || err != nil {
+		t.Error("Unexpected result:", fmt.Sprint(data.NewGraphEdgeFromNode(NewGraphNodeFromECALMap(res.(map[interface{}]interface{})))), err)
+		return
+	}
+
+	tr := &TraverseFunc{gm}
+
+	if _, err := tr.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := tr.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 4 parameters: partition, node key, node kind and a traversal spec" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := tr.Run("", nil, nil, 0, []interface{}{"main", "c", "d", "::"}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Invalid spec: ::)" {
+		t.Error(err)
+		return
+	}
+
+	res, err = tr.Run("", nil, nil, 0, []interface{}{"main", "c", "d", ":::"})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if fmt.Sprint(data.NewGraphEdgeFromNode(NewGraphNodeFromECALMap(res.([]interface{})[1].([]interface{})[0].(map[interface{}]interface{})))) != `
+GraphEdge:
+              key : 123
+             kind : e
+    end1cascading : false
+          end1key : c
+         end1kind : d
+         end1role : role2
+    end2cascading : true
+          end2key : a
+         end2kind : b
+         end2role : role1
+`[1:] || err != nil {
+		t.Error("Unexpected result:", fmt.Sprint(data.NewGraphEdgeFromNode(NewGraphNodeFromECALMap(res.([]interface{})[1].([]interface{})[0].(map[interface{}]interface{})))), err)
+		return
+	}
+
+	if fmt.Sprint(NewGraphNodeFromECALMap(res.([]interface{})[0].([]interface{})[0].(map[interface{}]interface{}))) != `
+GraphNode:
+     key : a
+    kind : b
+`[1:] || err != nil {
+		t.Error("Unexpected result:", fmt.Sprint(NewGraphNodeFromECALMap(res.([]interface{})[0].([]interface{})[0].(map[interface{}]interface{}))), err)
+		return
+	}
+
+	re := &RemoveEdgeFunc{gm}
+
+	if _, err := re.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := re.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 3 or 4 parameters: partition, edge key, edge kind and optionally a transaction" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := re.Run("", nil, nil, 0, []interface{}{"mai n", "123", "e"}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Partition name mai n is not alphanumeric - can only contain [a-zA-Z0-9_])" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := re.Run("", nil, nil, 0, []interface{}{"mai n", "123", "e", "bla"}); err == nil ||
+		err.Error() != "Fourth parameter must be a transaction" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := re.Run("", nil, nil, 0, []interface{}{"main", "123", "e"}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	res, err = fe.Run("", nil, nil, 0, []interface{}{"main", "123", "e"})
+
+	if res != nil || err != nil {
+		t.Error("Unexpected result:", res, err)
+		return
+	}
+}
+
+func TestStoreEdgeTrans(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	sn := &StoreNodeFunc{gm}
+	se := &StoreEdgeFunc{gm}
+	tc := &CommitTransFunc{gm}
+
+	if _, err := tc.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	trans := graph.NewGraphTrans(gm)
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "a",
+		"kind": "b",
+	}, trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "c",
+		"kind": "d",
+	}, trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	_, err := se.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":           "123",
+		"kind":          "e",
+		"end1cascading": true,
+		"end1key":       "a",
+		"end1kind":      "b",
+		"end1role":      "role1",
+		"end2cascading": false,
+		"end2key":       "c",
+		"end2kind":      "d",
+		"end2role":      "role2",
+	}, trans})
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res := fmt.Sprint(trans.Counts()); res != "2 1 0 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Check that the nodes have been committed
+
+	if res := fmt.Sprint(trans.Counts()); res != "0 0 0 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := gm.EdgeCount("e"); res != 1 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	se.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":           "123",
+		"kind":          "e",
+		"end1cascading": true,
+		"end1key":       "a",
+		"end1kind":      "b",
+		"end1role":      "role1",
+		"end2cascading": false,
+		"end2key":       "c1",
+		"end2kind":      "d",
+		"end2role":      "role2",
+	}, trans})
+
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{trans}); err == nil || err.Error() !=
+		"GraphError: Invalid data (Can't find edge endpoint: c1 (d))" {
+		t.Error(err)
+		return
+	}
+
+	re := &RemoveEdgeFunc{}
+
+	if _, err := re.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := re.Run("", nil, nil, 0, []interface{}{"main", "123", "e", trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res := fmt.Sprint(trans.Counts()); res != "0 0 0 1" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res := fmt.Sprint(trans.Counts()); res != "0 0 0 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := gm.EdgeCount("e"); res != 0 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}

+ 77 - 0
ecal/dbfunc/eql.go

@@ -0,0 +1,77 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+
+	"devt.de/krotik/ecal/parser"
+	"devt.de/krotik/eliasdb/eql"
+	"devt.de/krotik/eliasdb/graph"
+)
+
+/*
+QueryFunc runs an EQL query.
+*/
+type QueryFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *QueryFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+	var cols, rows []interface{}
+
+	if arglen := len(args); arglen != 2 {
+		err = fmt.Errorf("Function requires 2 parameters: partition and a query string")
+	}
+
+	if err == nil {
+		var res eql.SearchResult
+
+		part := fmt.Sprint(args[0])
+		query := fmt.Sprint(args[1])
+
+		res, err = eql.RunQuery("db.query", part, query, f.GM)
+
+		if err != nil {
+			return nil, err
+		}
+
+		// Convert result to rumble data structure
+
+		labels := res.Header().Labels()
+		cols = make([]interface{}, len(labels))
+		for i, v := range labels {
+			cols[i] = v
+		}
+
+		rrows := res.Rows()
+		rows = make([]interface{}, len(rrows))
+		for i, v := range rrows {
+			rows[i] = v
+		}
+	}
+
+	return map[interface{}]interface{}{
+		"cols": cols,
+		"rows": rows,
+	}, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *QueryFunc) DocString() (string, error) {
+	return "Run an EQL query.", nil
+}

+ 71 - 0
ecal/dbfunc/eql_test.go

@@ -0,0 +1,71 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+	"testing"
+
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+	"devt.de/krotik/eliasdb/graph/graphstorage"
+)
+
+func TestQuery(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "a",
+		"kind": "b",
+	}))
+	gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "c",
+		"kind": "d",
+	}))
+
+	q := &QueryFunc{gm}
+
+	if _, err := q.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := q.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 2 parameters: partition and a query string" {
+		t.Error(err)
+		return
+	}
+
+	res, err := q.Run("", nil, nil, 0, []interface{}{"main", "get b"})
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res := res.(map[interface{}]interface{})["rows"]; fmt.Sprint(res) != "[[a]]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := res.(map[interface{}]interface{})["cols"]; fmt.Sprint(res) != "[B Key]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	_, err = q.Run("", nil, nil, 0, []interface{}{"main", "got b"})
+
+	if err == nil || err.Error() != "EQL error in db.query: Invalid construct (Unknown query type: got) (Line:1 Pos:1)" {
+		t.Error(err)
+		return
+	}
+}

+ 85 - 0
ecal/dbfunc/graphql.go

@@ -0,0 +1,85 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+
+	"devt.de/krotik/ecal/parser"
+	"devt.de/krotik/ecal/scope"
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graphql"
+)
+
+/*
+GraphQLFunc runs a GraphQL query.
+*/
+type GraphQLFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *GraphQLFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+	var ret interface{}
+
+	if arglen := len(args); arglen < 2 {
+		err = fmt.Errorf("Function requires at least 2 parameters: partition and query with optionally a map of variables and an operation name.")
+	}
+
+	if err == nil {
+		var res, varMap map[string]interface{}
+
+		part := fmt.Sprint(args[0])
+		query := fmt.Sprint(args[1])
+		opname := ""
+
+		if err == nil && len(args) > 2 {
+			varECALMap, ok := args[2].(map[interface{}]interface{})
+
+			if !ok {
+				err = fmt.Errorf("Third parameter must be a map")
+			} else {
+				varMap = make(map[string]interface{})
+				for k, v := range varECALMap {
+					varMap[fmt.Sprint(k)] = v
+				}
+			}
+		}
+
+		if err == nil && len(args) > 3 {
+			opname = fmt.Sprint(args[3])
+		}
+
+		if err == nil {
+			res, err = graphql.RunQuery("db.query", part, map[string]interface{}{
+				"operationName": opname,
+				"query":         query,
+				"variables":     varMap,
+			}, f.GM, nil, false)
+
+			if err == nil {
+				ret = scope.ConvertJSONToECALObject(res)
+			}
+		}
+	}
+
+	return ret, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *GraphQLFunc) DocString() (string, error) {
+	return "Run a GraphQL query.", nil
+}

+ 77 - 0
ecal/dbfunc/graphql_test.go

@@ -0,0 +1,77 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+	"testing"
+
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+	"devt.de/krotik/eliasdb/graph/graphstorage"
+)
+
+func TestGraphQL(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "a",
+		"kind": "b",
+		"foo":  "bar1",
+	}))
+	gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "c",
+		"kind": "b",
+		"foo":  "bar2",
+	}))
+
+	q := &GraphQLFunc{gm}
+
+	if _, err := q.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := q.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires at least 2 parameters: partition and query with optionally a map of variables and an operation name." {
+		t.Error(err)
+		return
+	}
+
+	if _, err := q.Run("", nil, nil, 0, []interface{}{"", "", ""}); err == nil ||
+		err.Error() != "Third parameter must be a map" {
+		t.Error(err)
+		return
+	}
+
+	res, err := q.Run("", nil, nil, 0, []interface{}{"main",
+		`query foo($x: string) { b(key:$x) { foo }}`, map[interface{}]interface{}{
+			"x": "c",
+		}, "foo"})
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	if fmt.Sprint(res) != "map[data:map[b:[map[foo:bar2]]]]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	_, err = q.Run("", nil, nil, 0, []interface{}{"main", "aaaaa"})
+
+	if err == nil || err.Error() != "Fatal GraphQL operation error in db.query: Missing operation (No executable expression found) (Line:1 Pos:1)" {
+		t.Error(err)
+		return
+	}
+}

+ 202 - 0
ecal/dbfunc/node.go

@@ -0,0 +1,202 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+
+	"devt.de/krotik/ecal/parser"
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+)
+
+/*
+StoreNodeFunc inserts or updates a node in EliasDB.
+*/
+type StoreNodeFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *StoreNodeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+
+	if arglen := len(args); arglen != 2 && arglen != 3 {
+		err = fmt.Errorf("Function requires 2 or 3 parameters: partition, node" +
+			" map and optionally a transaction")
+	}
+
+	if err == nil {
+		var trans graph.Trans
+
+		part := fmt.Sprint(args[0])
+		nodeMap, ok := args[1].(map[interface{}]interface{})
+
+		// Check parameters
+
+		if !ok {
+			err = fmt.Errorf("Second parameter must be a map")
+		}
+
+		if err == nil && len(args) > 2 {
+			if trans, ok = args[2].(graph.Trans); !ok {
+				err = fmt.Errorf("Third parameter must be a transaction")
+			}
+		}
+
+		// Store the node
+
+		if err == nil {
+			node := NewGraphNodeFromECALMap(nodeMap)
+
+			if trans != nil {
+				err = trans.StoreNode(part, node)
+			} else {
+				err = f.GM.StoreNode(part, node)
+			}
+		}
+	}
+
+	return nil, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *StoreNodeFunc) DocString() (string, error) {
+	return "Inserts or updates a node in EliasDB.", nil
+}
+
+/*
+RemoveNodeFunc removes a node in EliasDB.
+*/
+type RemoveNodeFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *RemoveNodeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+
+	if arglen := len(args); arglen != 3 && arglen != 4 {
+		err = fmt.Errorf("Function requires 3 or 4 parameters: partition, node key" +
+			" node kind and optionally a transaction")
+	}
+
+	if err == nil {
+		var trans graph.Trans
+
+		part := fmt.Sprint(args[0])
+		key := fmt.Sprint(args[1])
+		kind := fmt.Sprint(args[2])
+
+		// Check parameters
+
+		if len(args) > 3 {
+			var ok bool
+
+			if trans, ok = args[3].(graph.Trans); !ok {
+				err = fmt.Errorf("Fourth parameter must be a transaction")
+			}
+		}
+
+		// Remove the node
+
+		if err == nil {
+
+			if trans != nil {
+				err = trans.RemoveNode(part, key, kind)
+			} else {
+				_, err = f.GM.RemoveNode(part, key, kind)
+			}
+		}
+	}
+
+	return nil, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *RemoveNodeFunc) DocString() (string, error) {
+	return "Removes a node in EliasDB.", nil
+}
+
+/*
+FetchNodeFunc fetches a node in EliasDB.
+*/
+type FetchNodeFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *FetchNodeFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var res interface{}
+	var err error
+
+	if arglen := len(args); arglen != 3 {
+		err = fmt.Errorf("Function requires 3 parameters: partition, node key" +
+			" node kind")
+	}
+
+	if err == nil {
+		var node data.Node
+
+		part := fmt.Sprint(args[0])
+		key := fmt.Sprint(args[1])
+		kind := fmt.Sprint(args[2])
+
+		conv := func(m map[string]interface{}) map[interface{}]interface{} {
+			c := make(map[interface{}]interface{})
+			for k, v := range m {
+				c[k] = v
+			}
+			return c
+		}
+
+		// Fetch the node
+
+		if node, err = f.GM.FetchNode(part, key, kind); node != nil {
+			res = conv(node.Data())
+		}
+	}
+
+	return res, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *FetchNodeFunc) DocString() (string, error) {
+	return "Fetches a node in EliasDB.", nil
+}
+
+// Helper functions
+// ================
+
+/*
+NewGraphNodeFromECALMap creates a new Node instance from a given map.
+*/
+func NewGraphNodeFromECALMap(d map[interface{}]interface{}) data.Node {
+	node := data.NewGraphNode()
+
+	for k, v := range d {
+		node.SetAttr(fmt.Sprint(k), v)
+	}
+
+	return node
+}

+ 321 - 0
ecal/dbfunc/node_test.go

@@ -0,0 +1,321 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+	"testing"
+
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/graphstorage"
+)
+
+func TestStoreAndRemoveNode(t *testing.T) {
+
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	sn := &StoreNodeFunc{gm}
+
+	if _, err := sn.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 2 or 3 parameters: partition, node map and optionally a transaction" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"", "bla"}); err == nil ||
+		err.Error() != "Second parameter must be a map" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{}, "bla"}); err == nil ||
+		err.Error() != "Third parameter must be a transaction" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key": "foo",
+	}}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Node is missing a kind value)" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "foo",
+		"kind": "bar",
+	}}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if res := gm.NodeCount("bar"); res != 1 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	fn := &FetchNodeFunc{gm}
+
+	if _, err := fn.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := fn.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 3 parameters: partition, node key node kind" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := fn.Run("", nil, nil, 0, []interface{}{"main", "foo", "ba r"}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Node kind ba r is not alphanumeric - can only contain [a-zA-Z0-9_])" {
+		t.Error(err)
+		return
+	}
+
+	res, err := fn.Run("", nil, nil, 0, []interface{}{"main", "foo", "bar"})
+	if fmt.Sprint(NewGraphNodeFromECALMap(res.(map[interface{}]interface{}))) != `
+GraphNode:
+     key : foo
+    kind : bar
+`[1:] || err != nil {
+		t.Error("Unexpected result:", res, err)
+		return
+	}
+
+	rn := &RemoveNodeFunc{gm}
+
+	if _, err := rn.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := rn.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function requires 3 or 4 parameters: partition, node key node kind and optionally a transaction" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := rn.Run("", nil, nil, 0, []interface{}{"mai n", "foo", "bar"}); err == nil ||
+		err.Error() != "GraphError: Invalid data (Partition name mai n is not alphanumeric - can only contain [a-zA-Z0-9_])" {
+		t.Error(err)
+		return
+	}
+
+	_, err = rn.Run("", nil, nil, 0, []interface{}{"main", "foo", "bar"})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	res, err = fn.Run("", nil, nil, 0, []interface{}{"main", "foo", "bar"})
+	if res != nil || err != nil {
+		t.Error("Unexpected result:", res, err)
+		return
+	}
+}
+
+func TestStoreNodeTrans(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	tn := &NewTransFunc{gm}
+
+	if _, err := tn.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	tn2 := &NewRollingTransFunc{gm}
+
+	if _, err := tn2.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	tc := &CommitTransFunc{gm}
+
+	if _, err := tc.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := tn.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Function does not require any parameters" {
+		t.Error(err)
+		return
+	}
+	if _, err := tn2.Run("", nil, nil, 0, []interface{}{"", ""}); err == nil ||
+		err.Error() != "Function requires the rolling threshold (number of operations before rolling)" {
+		t.Error(err)
+		return
+	}
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{"", ""}); err == nil ||
+		err.Error() != "Function requires the transaction to commit as parameter" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{""}); err == nil ||
+		err.Error() != "Parameter must be a transaction" {
+		t.Error(err)
+		return
+	}
+
+	trans, err := tn.Run("", nil, nil, 0, []interface{}{})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	_, err = tn2.Run("", nil, nil, 0, []interface{}{"foo"})
+	if err == nil || err.Error() != "Rolling threshold must be a number not: foo" {
+		t.Error(err)
+		return
+	}
+
+	_, err = tn2.Run("", nil, nil, 0, []interface{}{1})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	sn := &StoreNodeFunc{gm}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "foo1",
+		"kind": "bar",
+	}, trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "foo2",
+		"kind": "bar",
+	}, trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "foo3",
+		"kind": "bar",
+	}, trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Check that the nodes are in the transaction
+
+	if res := fmt.Sprint(trans.(graph.Trans).Counts()); res != "3 0 0 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := gm.NodeCount("bar"); res != 0 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Commit the nodes
+
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{"main", map[interface{}]interface{}{
+		"key":  "foo3",
+		"kind": "bar",
+	}, trans}); err == nil || err.Error() != "Function requires the transaction to commit as parameter" {
+		t.Error(err)
+		return
+	}
+
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Check that the nodes have been committed
+
+	if res := fmt.Sprint(trans.(graph.Trans).Counts()); res != "0 0 0 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := gm.NodeCount("bar"); res != 3 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Remove the nodes
+
+	rn := &RemoveNodeFunc{gm}
+
+	_, err = rn.Run("", nil, nil, 0, []interface{}{"main", "foo1", "bar", nil})
+	if err == nil || err.Error() != "Fourth parameter must be a transaction" {
+		t.Error(err)
+		return
+	}
+
+	_, err = rn.Run("", nil, nil, 0, []interface{}{"main", "foo1", "bar", trans})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	_, err = rn.Run("", nil, nil, 0, []interface{}{"main", "foo2", "bar", trans})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	_, err = rn.Run("", nil, nil, 0, []interface{}{"main", "foo3", "bar", trans})
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Check that the nodes are in the transaction
+
+	if res := fmt.Sprint(trans.(graph.Trans).Counts()); res != "0 0 3 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := gm.NodeCount("bar"); res != 3 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Commit the nodes
+
+	if _, err := tc.Run("", nil, nil, 0, []interface{}{trans}); err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Check that the nodes have been committed
+
+	if res := fmt.Sprint(trans.(graph.Trans).Counts()); res != "0 0 0 0" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := gm.NodeCount("bar"); res != 0 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}

+ 127 - 0
ecal/dbfunc/trans.go

@@ -0,0 +1,127 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"fmt"
+	"strconv"
+
+	"devt.de/krotik/ecal/parser"
+	"devt.de/krotik/eliasdb/graph"
+)
+
+/*
+NewTransFunc creates a new transaction for EliasDB.
+*/
+type NewTransFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *NewTransFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+
+	if len(args) != 0 {
+		err = fmt.Errorf("Function does not require any parameters")
+	}
+
+	return graph.NewConcurrentGraphTrans(f.GM), err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *NewTransFunc) DocString() (string, error) {
+	return "Creates a new transaction for EliasDB.", nil
+}
+
+/*
+NewRollingTransFunc creates a new rolling transaction for EliasDB.
+A rolling transaction commits after n entries.
+*/
+type NewRollingTransFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *NewRollingTransFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+	var trans graph.Trans
+
+	if arglen := len(args); arglen != 1 {
+		err = fmt.Errorf(
+			"Function requires the rolling threshold (number of operations before rolling)")
+	}
+
+	if err == nil {
+		var i int
+
+		if i, err = strconv.Atoi(fmt.Sprint(args[0])); err != nil {
+			err = fmt.Errorf("Rolling threshold must be a number not: %v", args[0])
+		} else {
+			trans = graph.NewRollingTrans(graph.NewConcurrentGraphTrans(f.GM),
+				i, f.GM, graph.NewConcurrentGraphTrans)
+		}
+	}
+
+	return trans, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *NewRollingTransFunc) DocString() (string, error) {
+	return "Creates a new rolling transaction for EliasDB. A rolling transaction commits after n entries.", nil
+}
+
+/*
+CommitTransFunc commits an existing transaction for EliasDB.
+*/
+type CommitTransFunc struct {
+	GM *graph.Manager
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *CommitTransFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	var err error
+
+	if arglen := len(args); arglen != 1 {
+		err = fmt.Errorf(
+			"Function requires the transaction to commit as parameter")
+	}
+
+	if err == nil {
+		trans, ok := args[0].(graph.Trans)
+
+		// Check parameters
+
+		if !ok {
+			err = fmt.Errorf("Parameter must be a transaction")
+		} else {
+			err = trans.Commit()
+		}
+	}
+
+	return nil, err
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *CommitTransFunc) DocString() (string, error) {
+	return "Commits an existing transaction for EliasDB.", nil
+}

+ 38 - 0
ecal/dbfunc/util.go

@@ -0,0 +1,38 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"devt.de/krotik/ecal/parser"
+	"devt.de/krotik/eliasdb/graph"
+)
+
+/*
+RaiseGraphEventHandledFunc returns the special graph.ErrEventHandled error which a sink,
+handling graph events, can return to notify the GraphManager that no further
+action is necessary.
+*/
+type RaiseGraphEventHandledFunc struct {
+}
+
+/*
+Run executes the ECAL function.
+*/
+func (f *RaiseGraphEventHandledFunc) Run(instanceID string, vs parser.Scope, is map[string]interface{}, tid uint64, args []interface{}) (interface{}, error) {
+	return nil, graph.ErrEventHandled
+}
+
+/*
+DocString returns a descriptive string.
+*/
+func (f *RaiseGraphEventHandledFunc) DocString() (string, error) {
+	return "When handling a graph event, notify the GraphManager of EliasDB that no further action is necessary.", nil
+}

+ 32 - 0
ecal/dbfunc/util_test.go

@@ -0,0 +1,32 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package dbfunc
+
+import (
+	"testing"
+
+	"devt.de/krotik/eliasdb/graph"
+)
+
+func TestRaiseGraphEventHandled(t *testing.T) {
+
+	sn := &RaiseGraphEventHandledFunc{}
+
+	if _, err := sn.DocString(); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := sn.Run("", nil, nil, 0, []interface{}{}); err != graph.ErrEventHandled {
+		t.Error("Unexpected result:", err)
+		return
+	}
+}

+ 241 - 0
ecal/eventbridge.go

@@ -0,0 +1,241 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+/*
+Package ecal contains the main API for the event condition action language (ECAL).
+*/
+package ecal
+
+import (
+	"fmt"
+	"strings"
+
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/ecal/engine"
+	"devt.de/krotik/ecal/scope"
+	"devt.de/krotik/ecal/util"
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+)
+
+/*
+EventMapping is a mapping between EliasDB event types to EliasDB specific event kinds in ECAL.
+*/
+var EventMapping = map[int]string{
+
+	/*
+	   EventNodeCreated is thrown when a node was created.
+
+	   Parameters: partition of created node, created node
+	*/
+	graph.EventNodeCreated: "db.node.created",
+
+	/*
+	   EventNodeUpdated is thrown when a node was updated.
+
+	   Parameters: partition of updated node, updated node, old node
+	*/
+	graph.EventNodeUpdated: "db.node.updated",
+
+	/*
+	   EventNodeDeleted is thrown when a node was deleted.
+
+	   Parameters: partition of deleted node, deleted node
+	*/
+	graph.EventNodeDeleted: "db.node.deleted",
+
+	/*
+	   EventEdgeCreated is thrown when an edge was created.
+
+	   Parameters: partition of created edge, created edge
+	*/
+	graph.EventEdgeCreated: "db.edge.created",
+
+	/*
+	   EventEdgeUpdated is thrown when an edge was updated.
+
+	   Parameters: partition of updated edge, updated edge, old edge
+	*/
+	graph.EventEdgeUpdated: "db.edge.updated",
+
+	/*
+	   EventEdgeDeleted is thrown when an edge was deleted.
+
+	   Parameters: partition of deleted edge, deleted edge
+	*/
+	graph.EventEdgeDeleted: "db.edge.deleted",
+
+	/*
+	   EventNodeStore is thrown before a node is stored (always overwriting existing values).
+
+	   Parameters: partition of node to store, node to store
+	*/
+	graph.EventNodeStore: "db.node.store",
+
+	/*
+	   EventNodeUpdate is thrown before a node is updated.
+
+	   Parameters: partition of node to update, node to update
+	*/
+	graph.EventNodeUpdate: "db.node.update",
+
+	/*
+	   EventNodeDelete is thrown before a node is deleted.
+
+	   Parameters: partition of node to delete, key of node to delete, kind of node to delete
+	*/
+	graph.EventNodeDelete: "db.node.delete",
+
+	/*
+	   EventEdgeStore is thrown before an edge is stored (always overwriting existing values).
+
+	   Parameters: partition of stored edge, stored edge
+	*/
+	graph.EventEdgeStore: "db.edge.store",
+
+	/*
+	   EventEdgeDelete is thrown before an edge is deleted.
+
+	   Parameters: partition of deleted edge, deleted edge
+	*/
+	graph.EventEdgeDelete: "db.edge.delete",
+}
+
+/*
+EventBridge is a rule for a graph manager to forward all graph events to ECAL.
+*/
+type EventBridge struct {
+	Processor engine.Processor
+	Logger    util.Logger
+}
+
+/*
+Name returns the name of the rule.
+*/
+func (eb *EventBridge) Name() string {
+	return "ecal.eventbridge"
+}
+
+/*
+Handles returns a list of events which are handled by this rule.
+*/
+func (eb *EventBridge) Handles() []int {
+	return []int{
+		graph.EventNodeCreated,
+		graph.EventNodeUpdated,
+		graph.EventNodeDeleted,
+		graph.EventEdgeCreated,
+		graph.EventEdgeUpdated,
+		graph.EventEdgeDeleted,
+		graph.EventNodeStore,
+		graph.EventNodeUpdate,
+		graph.EventNodeDelete,
+		graph.EventEdgeStore,
+		graph.EventEdgeDelete,
+	}
+}
+
+/*
+Handle handles an event.
+*/
+func (eb *EventBridge) Handle(gm *graph.Manager, trans graph.Trans, event int, ed ...interface{}) error {
+	var err error
+
+	if name, ok := EventMapping[event]; ok {
+		eventName := fmt.Sprintf("EliasDB: %v", name)
+		eventKind := strings.Split(name, ".")
+
+		// Construct an event which can be used to check if any rule will trigger.
+		// This is to avoid the relative costly state construction below for events
+		// which would not trigger any rules.
+
+		triggerCheckEvent := engine.NewEvent(eventName, eventKind, nil)
+
+		if !eb.Processor.IsTriggering(triggerCheckEvent) {
+			return nil
+		}
+
+		// Build up state
+
+		state := map[interface{}]interface{}{
+			"part":  fmt.Sprint(ed[0]),
+			"trans": trans,
+		}
+
+		// Include the right arguments into the state
+
+		switch event {
+		case graph.EventNodeCreated, graph.EventNodeUpdate, graph.EventNodeDeleted, graph.EventNodeStore:
+			state["node"] = scope.ConvertJSONToECALObject(ed[1].(data.Node).Data())
+
+		case graph.EventNodeUpdated:
+			state["node"] = scope.ConvertJSONToECALObject(ed[1].(data.Node).Data())
+			state["old_node"] = scope.ConvertJSONToECALObject(ed[2].(data.Node).Data())
+
+		case graph.EventEdgeCreated, graph.EventEdgeDeleted, graph.EventEdgeStore:
+			state["edge"] = scope.ConvertJSONToECALObject(ed[1].(data.Edge).Data())
+
+		case graph.EventEdgeUpdated:
+			state["edge"] = scope.ConvertJSONToECALObject(ed[1].(data.Edge).Data())
+			state["old_edge"] = scope.ConvertJSONToECALObject(ed[2].(data.Edge).Data())
+
+		case graph.EventNodeDelete, graph.EventEdgeDelete:
+			state["key"] = fmt.Sprint(ed[1])
+			state["kind"] = fmt.Sprint(ed[2])
+		}
+
+		// Try to inject the event
+
+		event := engine.NewEvent(fmt.Sprintf("EliasDB: %v", name), strings.Split(name, "."), state)
+
+		var m engine.Monitor
+		m, err = eb.Processor.AddEventAndWait(event, nil)
+
+		if err == nil {
+
+			// If there was no direct error adding the event then check if an error was
+			// raised in a sink
+
+			if errs := m.(*engine.RootMonitor).AllErrors(); len(errs) > 0 {
+				var errList []error
+
+				for _, e := range errs {
+
+					addError := true
+
+					for _, se := range e.ErrorMap {
+
+						// Check if the sink returned a special graph.ErrEventHandled error
+
+						if re, ok := se.(*util.RuntimeErrorWithDetail); ok && re.Detail == graph.ErrEventHandled.Error() {
+							addError = false
+						}
+					}
+
+					if addError {
+						errList = append(errList, e)
+					}
+				}
+
+				if len(errList) > 0 {
+					err = &errorutil.CompositeError{errList}
+				} else {
+					err = graph.ErrEventHandled
+				}
+			}
+		}
+
+		if err != nil {
+			eb.Logger.LogDebug(fmt.Sprintf("EliasDB event %v was handled by ECAL and returned: %v", name, err))
+		}
+	}
+
+	return err
+}

+ 151 - 0
ecal/interpreter.go

@@ -0,0 +1,151 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package ecal
+
+import (
+	"fmt"
+	"io/ioutil"
+	"path/filepath"
+	"strings"
+
+	"devt.de/krotik/common/fileutil"
+	"devt.de/krotik/ecal/cli/tool"
+	"devt.de/krotik/ecal/stdlib"
+	"devt.de/krotik/ecal/util"
+	"devt.de/krotik/eliasdb/config"
+	"devt.de/krotik/eliasdb/ecal/dbfunc"
+	"devt.de/krotik/eliasdb/graph"
+)
+
+/*
+ScriptingInterpreter models a ECAL script interpreter instance.
+*/
+type ScriptingInterpreter struct {
+	GM        *graph.Manager // GraphManager for the interpreter
+	Dir       string         // Root dir for interpreter
+	EntryFile string         // Entry file for the program
+	LogLevel  string         // Log level string (Debug, Info, Error)
+	LogFile   string         // Logfile (blank for stdout)
+
+	RunDebugServer  bool   // Run a debug server
+	DebugServerHost string // Debug server host
+	DebugServerPort string // Debug server port
+}
+
+/*
+NewScriptingInterpreter returns a new ECAL scripting interpreter.
+*/
+func NewScriptingInterpreter(scriptFolder string, gm *graph.Manager) *ScriptingInterpreter {
+	return &ScriptingInterpreter{
+		GM:              gm,
+		Dir:             scriptFolder,
+		EntryFile:       filepath.Join(scriptFolder, config.Str(config.ECALEntryScript)),
+		LogLevel:        config.Str(config.ECALLogLevel),
+		LogFile:         config.Str(config.ECALLogFile),
+		RunDebugServer:  config.Bool(config.EnableECALDebugServer),
+		DebugServerHost: config.Str(config.ECALDebugServerHost),
+		DebugServerPort: config.Str(config.ECALDebugServerPort),
+	}
+}
+
+/*
+dummyEntryFile is a small valid ECAL which does not do anything. It is used
+as the default entry file if no entry file exists.
+*/
+const dummyEntryFile = `0 # Write your ECAL code here
+`
+
+/*
+Run runs the ECAL scripting interpreter.
+*/
+func (si *ScriptingInterpreter) Run() error {
+	var err error
+
+	// Ensure we have a dummy entry point
+
+	if ok, _ := fileutil.PathExists(si.EntryFile); !ok {
+		err = ioutil.WriteFile(si.EntryFile, []byte(dummyEntryFile), 0600)
+	}
+
+	if err == nil {
+		i := tool.NewCLIInterpreter()
+
+		i.Dir = &si.Dir
+		i.LogFile = &si.LogFile
+		i.LogLevel = &si.LogLevel
+
+		i.EntryFile = si.EntryFile
+		i.LoadPlugins = true
+
+		i.CreateRuntimeProvider("eliasdb-runtime")
+
+		// Adding functions
+
+		stdlib.AddStdlibPkg("db", "EliasDB related functions")
+
+		stdlib.AddStdlibFunc("db", "storeNode", &dbfunc.StoreNodeFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "removeNode", &dbfunc.RemoveNodeFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "fetchNode", &dbfunc.FetchNodeFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "storeEdge", &dbfunc.StoreEdgeFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "removeEdge", &dbfunc.RemoveEdgeFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "fetchEdge", &dbfunc.FetchEdgeFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "traverse", &dbfunc.TraverseFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "newTrans", &dbfunc.NewTransFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "newRollingTrans", &dbfunc.NewRollingTransFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "commit", &dbfunc.CommitTransFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "query", &dbfunc.QueryFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "graphQL", &dbfunc.GraphQLFunc{GM: si.GM})
+		stdlib.AddStdlibFunc("db", "raiseGraphEventHandled", &dbfunc.RaiseGraphEventHandledFunc{})
+
+		if err == nil {
+
+			if si.RunDebugServer {
+				di := tool.NewCLIDebugInterpreter(i)
+
+				addr := fmt.Sprintf("%v:%v", si.DebugServerHost, si.DebugServerPort)
+				di.DebugServerAddr = &addr
+				di.RunDebugServer = &si.RunDebugServer
+				falseFlag := false
+				di.EchoDebugServer = &falseFlag
+				di.Interactive = &falseFlag
+				di.BreakOnStart = &falseFlag
+				trueFlag := true
+				di.BreakOnError = &trueFlag
+
+				err = di.Interpret()
+
+			} else {
+
+				err = i.Interpret(false)
+			}
+
+			// Rules for the GraphManager are loaded after the code has been
+			// executed and all rules have been added. The ECA processor can now be started.
+
+			i.RuntimeProvider.Processor.Start()
+
+			// EliasDB graph events can not be forwarded to ECAL via the eventbridge.
+
+			si.GM.SetGraphRule(&EventBridge{
+				Processor: i.RuntimeProvider.Processor,
+				Logger:    i.RuntimeProvider.Logger,
+			})
+		}
+	}
+
+	// Include a traceback if possible
+
+	if ss, ok := err.(util.TraceableRuntimeError); ok {
+		err = fmt.Errorf("%v\n  %v", err.Error(), strings.Join(ss.GetTraceString(), "\n  "))
+	}
+
+	return err
+}

+ 308 - 0
ecal/interpreter_test.go

@@ -0,0 +1,308 @@
+/*
+ * EliasDB
+ *
+ * Copyright 2016 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0. If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ */
+
+package ecal
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/fileutil"
+	"devt.de/krotik/eliasdb/config"
+	"devt.de/krotik/eliasdb/graph"
+	"devt.de/krotik/eliasdb/graph/data"
+	"devt.de/krotik/eliasdb/graph/graphstorage"
+)
+
+const testScriptDir = "testscripts"
+
+func TestMain(m *testing.M) {
+	flag.Parse()
+
+	defer func() {
+		if res, _ := fileutil.PathExists(testScriptDir); res {
+			if err := os.RemoveAll(testScriptDir); err != nil {
+				fmt.Print("Could not remove test directory:", err.Error())
+			}
+		}
+	}()
+
+	if res, _ := fileutil.PathExists(testScriptDir); res {
+		if err := os.RemoveAll(testScriptDir); err != nil {
+			fmt.Print("Could not remove test directory:", err.Error())
+		}
+	}
+
+	ensurePath(testScriptDir)
+
+	data := make(map[string]interface{})
+	for k, v := range config.DefaultConfig {
+		data[k] = v
+	}
+
+	config.Config = data
+
+	config.Config[config.EnableECALScripts] = true
+	config.Config[config.ECALScriptFolder] = testScriptDir
+	config.Config[config.ECALLogFile] = filepath.Join(testScriptDir, "interpreter.log")
+
+	// Run the tests
+
+	m.Run()
+}
+
+/*
+ensurePath ensures that a given relative path exists.
+*/
+func ensurePath(path string) {
+	if res, _ := fileutil.PathExists(path); !res {
+		if err := os.Mkdir(path, 0770); err != nil {
+			fmt.Print("Could not create directory:", err.Error())
+			return
+		}
+	}
+}
+
+func writeScript(content string) {
+	filename := filepath.Join(testScriptDir, config.Str(config.ECALEntryScript))
+	err := ioutil.WriteFile(
+		filename,
+		[]byte(content), 0600)
+	errorutil.AssertOk(err)
+	os.Remove(config.Str(config.ECALLogFile))
+}
+
+func checkLog(expected string) error {
+	var err error
+
+	content, err := ioutil.ReadFile(config.Str(config.ECALLogFile))
+	errorutil.AssertOk(err)
+
+	logtext := string(content)
+
+	if logtext != expected {
+		err = fmt.Errorf("Unexpected log text:\n%v", logtext)
+	}
+
+	return err
+}
+
+func TestDebugInterpreter(t *testing.T) {
+
+	config.Config[config.EnableECALDebugServer] = true
+	defer func() {
+		config.Config[config.EnableECALDebugServer] = false
+		errorutil.AssertOk(os.Remove(config.Str(config.ECALLogFile)))
+
+	}()
+
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	ds := NewScriptingInterpreter(testScriptDir, gm)
+
+	filename := filepath.Join(testScriptDir, config.Str(config.ECALEntryScript))
+	os.Remove(filename)
+
+	if err := ds.Run(); err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+}
+
+func TestInterpreter(t *testing.T) {
+
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	ds := NewScriptingInterpreter(testScriptDir, gm)
+
+	writeScript(`
+log("test insert")
+`)
+
+	if err := ds.Run(); err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	if err := checkLog(`test insert
+`); err != nil {
+		t.Error(err)
+	}
+
+	writeScript(`
+db.storeNode("main", {
+  "key" : "foo",
+  "kind" : "bar",
+  "data" : 123,
+})
+
+db.storeNode("main", {
+  "key" : "key2",
+  "kind" : "kind2",
+  "data" : 456,
+})
+
+db.storeEdge("main", {
+  "key":           "123",
+  "kind":          "myedges",
+  "end1cascading": true,
+  "end1key":       "foo",
+  "end1kind":      "bar",
+  "end1role":      "role1",
+  "end2cascading": false,
+  "end2key":       "key2",
+  "end2kind":      "kind2",
+  "end2role":      "role2",
+})
+
+[n, e] := db.traverse("main", "key2", "kind2", "role2:myedges:role1:bar")
+
+log("nodes: ", n, " edges: ", e)
+`)
+
+	// The store statements should trigger the triggerCheck shortcut in the eventbridge
+	// because no rules are defined to handle the events.
+
+	if err := ds.Run(); err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	if err := checkLog(`nodes: [
+  {
+    "data": 123,
+    "key": "foo",
+    "kind": "bar"
+  }
+] edges: [
+  {
+    "end1cascading": false,
+    "end1key": "key2",
+    "end1kind": "kind2",
+    "end1role": "role2",
+    "end2cascading": true,
+    "end2key": "foo",
+    "end2kind": "bar",
+    "end2role": "role1",
+    "key": "123",
+    "kind": "myedges"
+  }
+]
+`); err != nil {
+		t.Error(err)
+	}
+}
+
+func TestEvents(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := graph.NewGraphManager(mgs)
+
+	ds := NewScriptingInterpreter(testScriptDir, gm)
+
+	writeScript(`
+sink mysink
+  kindmatch [ "db.*.*" ],
+{
+  log("Got event: ", event)
+  if event.state.node.key == "foo2" {
+    raise("Oh no")
+  }
+  if event.state.node.key == "foo3" {
+    db.raiseGraphEventHandled()
+  }
+}
+`)
+
+	if err := ds.Run(); err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	err := gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "foo",
+		"kind": "bar",
+		"data": 123,
+	}))
+	errorutil.AssertOk(err)
+
+	if err := checkLog(`Got event: {
+  "kind": "db.node.store",
+  "name": "EliasDB: db.node.store",
+  "state": {
+    "node": {
+      "data": 123,
+      "key": "foo",
+      "kind": "bar"
+    },
+    "part": "main",
+    "trans": {}
+  }
+}
+Got event: {
+  "kind": "db.node.created",
+  "name": "EliasDB: db.node.created",
+  "state": {
+    "node": {
+      "data": 123,
+      "key": "foo",
+      "kind": "bar"
+    },
+    "part": "main",
+    "trans": {}
+  }
+}
+`); err != nil {
+		t.Error(err)
+	}
+
+	// Test raising an error before node storage
+
+	err = gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "foo2",
+		"kind": "bar",
+		"data": 123,
+	}))
+
+	if err == nil || err.Error() != `GraphError: Graph rule error (Taskerror:
+EliasDB: db.node.store -> mysink : ECAL error in eliasdb-runtime: Oh no () (Line:7 Pos:5))` {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	if res, err := gm.FetchNode("main", "foo2", "bar"); res != nil || err != nil {
+		t.Error("Unexpected result:", res, err)
+		return
+	}
+
+	// Test preventing node storage without raising an error
+
+	err = gm.StoreNode("main", data.NewGraphNodeFromMap(map[string]interface{}{
+		"key":  "foo3",
+		"kind": "bar",
+		"data": 123,
+	}))
+
+	if err != nil {
+		t.Error("Unexpected result:", err)
+		return
+	}
+
+	if res, err := gm.FetchNode("main", "foo2", "bar"); res != nil || err != nil {
+		t.Error("Unexpected result:", res, err)
+		return
+	}
+}

+ 2 - 1
go.mod

@@ -3,6 +3,7 @@ module devt.de/krotik/eliasdb
 go 1.12
 
 require (
-	devt.de/krotik/common v1.4.0
+	devt.de/krotik/common v1.4.1
+	devt.de/krotik/ecal v1.2.0
 	github.com/gorilla/websocket v1.4.1
 )

+ 4 - 0
go.sum

@@ -1,4 +1,8 @@
 devt.de/krotik/common v1.4.0 h1:chZihshmuv1yehyujrYyW7Yg4cgRqqIWEG2IAzhfFkA=
 devt.de/krotik/common v1.4.0/go.mod h1:X4nsS85DAxyHkwSg/Tc6+XC2zfmGeaVz+37F61+eSaI=
+devt.de/krotik/common v1.4.1 h1:gsZ9OrV+Eo4ar8Y5iLs1lAdWd8aRIcPQJ0CVxLp0uys=
+devt.de/krotik/common v1.4.1/go.mod h1:X4nsS85DAxyHkwSg/Tc6+XC2zfmGeaVz+37F61+eSaI=
+devt.de/krotik/ecal v1.2.0 h1:gaGUNVpx9YInNHctQ7Ciur53Wd8qdCwn5CKJoVWZgJ8=
+devt.de/krotik/ecal v1.2.0/go.mod h1:0qIx3h+EjUnStgdEUnwAeO44UluTSLcpBWXA5zEw0hQ=
 github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM=
 github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

+ 50 - 6
graph/globals.go

@@ -100,6 +100,8 @@ the full text search index.
 */
 package graph
 
+import "errors"
+
 /*
 VERSION of the GraphManager
 */
@@ -225,43 +227,85 @@ const PrefixNSEdge = "\x04"
 //=============
 
 /*
-EventNodeCreated is thrown when a node gets created.
+EventNodeCreated is thrown when a node was created.
 
 Parameters: partition of created node, created node
 */
 const EventNodeCreated = 0x01
 
 /*
-EventNodeUpdated is thrown when a node gets updated.
+EventNodeUpdated is thrown when a node was updated.
 
 Parameters: partition of updated node, updated node, old node
 */
 const EventNodeUpdated = 0x02
 
 /*
-EventNodeDeleted is thrown when a node gets deleted.
+EventNodeDeleted is thrown when a node was deleted.
 
 Parameters: partition of deleted node, deleted node
 */
 const EventNodeDeleted = 0x03
 
 /*
-EventEdgeCreated is thrown when an edge gets created.
+EventEdgeCreated is thrown when an edge was created.
 
 Parameters: partition of created edge, created edge
 */
 const EventEdgeCreated = 0x04
 
 /*
-EventEdgeUpdated is thrown when an edge gets updated.
+EventEdgeUpdated is thrown when an edge was updated.
 
 Parameters: partition of updated edge, updated edge, old edge
 */
 const EventEdgeUpdated = 0x05
 
 /*
-EventEdgeDeleted is thrown when an edge gets deleted.
+EventEdgeDeleted is thrown when an edge was deleted.
 
 Parameters: partition of deleted edge, deleted edge
 */
 const EventEdgeDeleted = 0x06
+
+/*
+EventNodeStore is thrown before a node is stored (always overwriting existing values).
+
+Parameters: partition of node to store, node to store
+*/
+const EventNodeStore = 0x07
+
+/*
+EventNodeUpdate is thrown before a node is updated.
+
+Parameters: partition of node to update, node to update
+*/
+const EventNodeUpdate = 0x08
+
+/*
+EventNodeDelete is thrown before a node is deleted.
+
+Parameters: partition of node to delete, key of node to delete, kind of node to delete
+*/
+const EventNodeDelete = 0x09
+
+/*
+EventEdgeStore is thrown before an edge is stored (always overwriting existing values).
+
+Parameters: partition of stored edge, stored edge
+*/
+const EventEdgeStore = 0x0A
+
+/*
+EventEdgeDelete is thrown before an edge is deleted.
+
+Parameters: partition of deleted edge, key of edge to delete, kind of edge to delete
+*/
+const EventEdgeDelete = 0x0B
+
+/*
+ErrEventHandled is a special error which an event handler can return to
+notify the GraphManager that no further action is necessary. No error will
+be returned by the GraphManager operation.
+*/
+var ErrEventHandled = errors.New("Event handled upstream")

+ 169 - 137
graph/graphmanager_edges.go

@@ -330,89 +330,116 @@ StoreEdge stores a single edge in a partition of the graph. This function will
 overwrites any existing edge.
 */
 func (gm *Manager) StoreEdge(part string, edge data.Edge) error {
+	trans := newInternalGraphTrans(gm)
+	trans.subtrans = true
 
-	// Check if the edge can be stored
+	err := gm.gr.graphEvent(trans, EventEdgeStore, part, edge)
 
-	if err := gm.checkEdge(edge); err != nil {
+	if err != nil {
+		if err == ErrEventHandled {
+			err = nil
+		}
 		return err
 	}
 
-	// Get the HTrees which stores the edges and the edge index
+	if err = trans.Commit(); err == nil {
 
-	iht, err := gm.getEdgeIndexHTree(part, edge.Kind(), true)
-	if err != nil {
-		return err
-	}
+		// Check if the edge can be stored
 
-	edgeht, err := gm.getEdgeStorageHTree(part, edge.Kind(), true)
-	if err != nil {
-		return err
-	}
+		if err := gm.checkEdge(edge); err != nil {
+			return err
+		}
 
-	// Get the HTrees which stores the edge endpoints and make sure the endpoints
-	// do exist
+		// Get the HTrees which stores the edges and the edge index
 
-	end1nodeht, end1ht, err := gm.getNodeStorageHTree(part, edge.End1Kind(), false)
+		iht, err := gm.getEdgeIndexHTree(part, edge.Kind(), true)
+		if err != nil {
+			return err
+		}
 
-	if err != nil {
-		return err
-	} else if end1ht == nil {
-		return &util.GraphError{
-			Type:   util.ErrInvalidData,
-			Detail: "Can't store edge to non-existing node kind: " + edge.End1Kind(),
+		edgeht, err := gm.getEdgeStorageHTree(part, edge.Kind(), true)
+		if err != nil {
+			return err
 		}
-	} else if end1, err := end1nodeht.Get([]byte(PrefixNSAttrs + edge.End1Key())); err != nil || end1 == nil {
-		return &util.GraphError{
-			Type:   util.ErrInvalidData,
-			Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End1Key(), edge.End1Kind()),
+
+		// Get the HTrees which stores the edge endpoints and make sure the endpoints
+		// do exist
+
+		end1nodeht, end1ht, err := gm.getNodeStorageHTree(part, edge.End1Kind(), false)
+
+		if err != nil {
+			return err
+		} else if end1ht == nil {
+			return &util.GraphError{
+				Type:   util.ErrInvalidData,
+				Detail: "Can't store edge to non-existing node kind: " + edge.End1Kind(),
+			}
+		} else if end1, err := end1nodeht.Get([]byte(PrefixNSAttrs + edge.End1Key())); err != nil || end1 == nil {
+			return &util.GraphError{
+				Type:   util.ErrInvalidData,
+				Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End1Key(), edge.End1Kind()),
+			}
 		}
-	}
 
-	end2nodeht, end2ht, err := gm.getNodeStorageHTree(part, edge.End2Kind(), false)
+		end2nodeht, end2ht, err := gm.getNodeStorageHTree(part, edge.End2Kind(), false)
 
-	if err != nil {
-		return err
-	} else if end2ht == nil {
-		return &util.GraphError{
-			Type:   util.ErrInvalidData,
-			Detail: "Can't store edge to non-existing node kind: " + edge.End2Kind(),
+		if err != nil {
+			return err
+		} else if end2ht == nil {
+			return &util.GraphError{
+				Type:   util.ErrInvalidData,
+				Detail: "Can't store edge to non-existing node kind: " + edge.End2Kind(),
+			}
+		} else if end2, err := end2nodeht.Get([]byte(PrefixNSAttrs + edge.End2Key())); err != nil || end2 == nil {
+			return &util.GraphError{
+				Type:   util.ErrInvalidData,
+				Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End2Key(), edge.End2Kind()),
+			}
 		}
-	} else if end2, err := end2nodeht.Get([]byte(PrefixNSAttrs + edge.End2Key())); err != nil || end2 == nil {
-		return &util.GraphError{
-			Type:   util.ErrInvalidData,
-			Detail: fmt.Sprintf("Can't find edge endpoint: %s (%s)", edge.End2Key(), edge.End2Kind()),
+
+		// Take writer lock
+
+		gm.mutex.Lock()
+		defer gm.mutex.Unlock()
+
+		// Write edge to the datastore
+
+		oldedge, err := gm.writeEdge(edge, edgeht, end1ht, end2ht)
+		if err != nil {
+			return err
 		}
-	}
 
-	// Take writer lock
+		// Increase edge count if the edge was inserted and write the changes
+		// to the index.
 
-	gm.mutex.Lock()
-	defer gm.mutex.Unlock()
+		if oldedge == nil {
 
-	// Write edge to the datastore
+			// Increase edge count
 
-	oldedge, err := gm.writeEdge(edge, edgeht, end1ht, end2ht)
-	if err != nil {
-		return err
-	}
+			currentCount := gm.EdgeCount(edge.Kind())
+			if err := gm.writeEdgeCount(edge.Kind(), currentCount+1, true); err != nil {
+				return err
+			}
 
-	// Increase edge count if the edge was inserted and write the changes
-	// to the index.
+			// Write edge data to the index
 
-	if oldedge == nil {
+			if iht != nil {
 
-		// Increase edge count
+				if err := util.NewIndexManager(iht).Index(edge.Key(), edge.IndexMap()); err != nil {
 
-		currentCount := gm.EdgeCount(edge.Kind())
-		if err := gm.writeEdgeCount(edge.Kind(), currentCount+1, true); err != nil {
-			return err
-		}
+					// The edge was written at this point and the model is
+					// consistent only the index is missing entries
 
-		// Write edge data to the index
+					return err
+				}
+			}
 
-		if iht != nil {
+		} else if iht != nil {
 
-			if err := util.NewIndexManager(iht).Index(edge.Key(), edge.IndexMap()); err != nil {
+			err := util.NewIndexManager(iht).Reindex(edge.Key(), edge.IndexMap(),
+				oldedge.IndexMap())
+
+			if err != nil {
 
 				// The edge was written at this point and the model is
 				// consistent only the index is missing entries
@@ -421,49 +448,38 @@ func (gm *Manager) StoreEdge(part string, edge data.Edge) error {
 			}
 		}
 
-	} else if iht != nil {
-
-		err := util.NewIndexManager(iht).Reindex(edge.Key(), edge.IndexMap(),
-			oldedge.IndexMap())
+		// Execute rules
 
-		if err != nil {
+		trans := newInternalGraphTrans(gm)
+		trans.subtrans = true
 
-			// The edge was written at this point and the model is
-			// consistent only the index is missing entries
+		var event int
+		if oldedge == nil {
+			event = EventEdgeCreated
+		} else {
+			event = EventEdgeUpdated
+		}
 
+		if err := gm.gr.graphEvent(trans, event, part, edge, oldedge); err != nil && err != ErrEventHandled {
+			return err
+		} else if err := trans.Commit(); err != nil {
 			return err
 		}
-	}
 
-	// Execute rules
-
-	trans := newInternalGraphTrans(gm)
-	trans.subtrans = true
-
-	var event int
-	if oldedge == nil {
-		event = EventEdgeCreated
-	} else {
-		event = EventEdgeUpdated
-	}
-
-	if err := gm.gr.graphEvent(trans, event, part, edge, oldedge); err != nil {
-		return err
-	} else if err := trans.Commit(); err != nil {
-		return err
-	}
+		// Flush changes - errors only reported on the actual node storage flush
 
-	// Flush changes - errors only reported on the actual node storage flush
+		gm.gs.FlushMain()
 
-	gm.gs.FlushMain()
+		gm.flushEdgeIndex(part, edge.Kind())
 
-	gm.flushEdgeIndex(part, edge.Kind())
+		gm.flushNodeStorage(part, edge.End1Kind())
 
-	gm.flushNodeStorage(part, edge.End1Kind())
+		gm.flushNodeStorage(part, edge.End2Kind())
 
-	gm.flushNodeStorage(part, edge.End2Kind())
+		err = gm.flushEdgeStorage(part, edge.Kind())
+	}
 
-	return gm.flushEdgeStorage(part, edge.Kind())
+	return err
 }
 
 /*
@@ -604,91 +620,107 @@ func (gm *Manager) writeEdge(edge data.Edge, edgeTree *hash.HTree,
 RemoveEdge removes a single edge from a partition of the graph.
 */
 func (gm *Manager) RemoveEdge(part string, key string, kind string) (data.Edge, error) {
+	var err error
 
-	// Get the HTrees which stores the edges and the edge index
+	trans := newInternalGraphTrans(gm)
+	trans.subtrans = true
 
-	iht, err := gm.getEdgeIndexHTree(part, kind, true)
-	if err != nil {
+	if err = gm.gr.graphEvent(trans, EventEdgeDelete, part, key, kind); err != nil {
+		if err == ErrEventHandled {
+			err = nil
+		}
 		return nil, err
 	}
 
-	edgeht, err := gm.getEdgeStorageHTree(part, kind, true)
-	if err != nil {
-		return nil, err
-	}
+	err = trans.Commit()
 
-	// Take writer lock
+	if err == nil {
 
-	gm.mutex.Lock()
-	defer gm.mutex.Unlock()
+		// Get the HTrees which stores the edges and the edge index
 
-	// Delete the node from the datastore
+		iht, err := gm.getEdgeIndexHTree(part, kind, true)
+		if err != nil {
+			return nil, err
+		}
 
-	node, err := gm.deleteNode(key, kind, edgeht, edgeht)
-	edge := data.NewGraphEdgeFromNode(node)
-	if err != nil {
-		return edge, err
-	}
+		edgeht, err := gm.getEdgeStorageHTree(part, kind, true)
+		if err != nil {
+			return nil, err
+		}
 
-	if node != nil {
+		// Take writer lock
 
-		// Get the HTrees which stores the edge endpoints
+		gm.mutex.Lock()
+		defer gm.mutex.Unlock()
 
-		_, end1ht, err := gm.getNodeStorageHTree(part, edge.End1Kind(), false)
-		if err != nil {
-			return edge, err
-		}
+		// Delete the node from the datastore
 
-		_, end2ht, err := gm.getNodeStorageHTree(part, edge.End2Kind(), false)
+		node, err := gm.deleteNode(key, kind, edgeht, edgeht)
+		edge := data.NewGraphEdgeFromNode(node)
 		if err != nil {
 			return edge, err
 		}
 
-		// Delete edge info from node storage
+		if node != nil {
 
-		if err := gm.deleteEdge(edge, end1ht, end2ht); err != nil {
-			return edge, err
-		}
+			// Get the HTrees which stores the edge endpoints
 
-		if iht != nil {
-			err := util.NewIndexManager(iht).Deindex(key, edge.IndexMap())
+			_, end1ht, err := gm.getNodeStorageHTree(part, edge.End1Kind(), false)
 			if err != nil {
 				return edge, err
 			}
-		}
 
-		// Decrease edge count
+			_, end2ht, err := gm.getNodeStorageHTree(part, edge.End2Kind(), false)
+			if err != nil {
+				return edge, err
+			}
 
-		currentCount := gm.EdgeCount(edge.Kind())
-		if err := gm.writeEdgeCount(edge.Kind(), currentCount-1, true); err != nil {
-			return edge, err
-		}
+			// Delete edge info from node storage
 
-		// Execute rules
+			if err := gm.deleteEdge(edge, end1ht, end2ht); err != nil {
+				return edge, err
+			}
 
-		trans := newInternalGraphTrans(gm)
-		trans.subtrans = true
+			if iht != nil {
+				err := util.NewIndexManager(iht).Deindex(key, edge.IndexMap())
+				if err != nil {
+					return edge, err
+				}
+			}
 
-		if err := gm.gr.graphEvent(trans, EventEdgeDeleted, part, edge); err != nil {
-			return edge, err
-		} else if err := trans.Commit(); err != nil {
-			return edge, err
-		}
+			// Decrease edge count
 
-		// Flush changes - errors only reported on the actual node storage flush
+			currentCount := gm.EdgeCount(edge.Kind())
+			if err := gm.writeEdgeCount(edge.Kind(), currentCount-1, true); err != nil {
+				return edge, err
+			}
 
-		gm.gs.FlushMain()
+			// Execute rules
 
-		gm.flushEdgeIndex(part, edge.Kind())
+			trans := newInternalGraphTrans(gm)
+			trans.subtrans = true
 
-		gm.flushNodeStorage(part, edge.End1Kind())
+			if err := gm.gr.graphEvent(trans, EventEdgeDeleted, part, edge); err != nil && err != ErrEventHandled {
+				return edge, err
+			} else if err := trans.Commit(); err != nil {
+				return edge, err
+			}
 
-		gm.flushNodeStorage(part, edge.End2Kind())
+			// Flush changes - errors only reported on the actual node storage flush
+
+			gm.gs.FlushMain()
+
+			gm.flushEdgeIndex(part, edge.Kind())
 
-		return edge, gm.flushEdgeStorage(part, edge.Kind())
+			gm.flushNodeStorage(part, edge.End1Kind())
+
+			gm.flushNodeStorage(part, edge.End2Kind())
+
+			return edge, gm.flushEdgeStorage(part, edge.Kind())
+		}
 	}
 
-	return nil, nil
+	return nil, err
 }
 
 /*

+ 91 - 43
graph/graphmanager_nodes.go

@@ -184,7 +184,23 @@ StoreNode stores a single node in a partition of the graph. This function will
 overwrites any existing node.
 */
 func (gm *Manager) StoreNode(part string, node data.Node) error {
-	return gm.storeOrUpdateNode(part, node, false)
+	trans := newInternalGraphTrans(gm)
+	trans.subtrans = true
+
+	err := gm.gr.graphEvent(trans, EventNodeStore, part, node)
+
+	if err != nil {
+		if err == ErrEventHandled {
+			err = nil
+		}
+		return err
+	}
+
+	if err = trans.Commit(); err == nil {
+		err = gm.storeOrUpdateNode(part, node, false)
+	}
+
+	return err
 }
 
 /*
@@ -192,7 +208,23 @@ UpdateNode updates a single node in a partition of the graph. This function will
 only update the given values of the node.
 */
 func (gm *Manager) UpdateNode(part string, node data.Node) error {
-	return gm.storeOrUpdateNode(part, node, true)
+	trans := newInternalGraphTrans(gm)
+	trans.subtrans = true
+
+	err := gm.gr.graphEvent(trans, EventNodeUpdate, part, node)
+
+	if err != nil {
+		if err == ErrEventHandled {
+			err = nil
+		}
+		return err
+	}
+
+	if err = trans.Commit(); err == nil {
+		err = gm.storeOrUpdateNode(part, node, true)
+	}
+
+	return err
 }
 
 /*
@@ -276,7 +308,7 @@ func (gm *Manager) storeOrUpdateNode(part string, node data.Node, onlyUpdate boo
 		event = EventNodeUpdated
 	}
 
-	if err := gm.gr.graphEvent(trans, event, part, node, oldnode); err != nil {
+	if err := gm.gr.graphEvent(trans, event, part, node, oldnode); err != nil && err != ErrEventHandled {
 		return err
 	} else if err := trans.Commit(); err != nil {
 		return err
@@ -438,70 +470,86 @@ func (gm *Manager) writeNode(node data.Node, onlyUpdate bool, attrTree *hash.HTr
 RemoveNode removes a single node from a partition of the graph.
 */
 func (gm *Manager) RemoveNode(part string, key string, kind string) (data.Node, error) {
+	var err error
 
-	// Get the HTree which stores the node index and node kind
+	trans := newInternalGraphTrans(gm)
+	trans.subtrans = true
 
-	iht, err := gm.getNodeIndexHTree(part, kind, false)
-	if err != nil {
+	if err = gm.gr.graphEvent(trans, EventNodeDelete, part, key, kind); err != nil {
+		if err == ErrEventHandled {
+			err = nil
+		}
 		return nil, err
 	}
 
-	attTree, valTree, err := gm.getNodeStorageHTree(part, kind, false)
-	if err != nil || attTree == nil || valTree == nil {
-		return nil, err
-	}
+	err = trans.Commit()
 
-	// Take writer lock
+	if err == nil {
 
-	gm.mutex.Lock()
-	defer gm.mutex.Unlock()
+		// Get the HTree which stores the node index and node kind
 
-	// Delete the node from the datastore
-
-	node, err := gm.deleteNode(key, kind, attTree, valTree)
-	if err != nil {
-		return node, err
-	}
+		iht, err := gm.getNodeIndexHTree(part, kind, false)
+		if err != nil {
+			return nil, err
+		}
 
-	// Update the index
+		attTree, valTree, err := gm.getNodeStorageHTree(part, kind, false)
+		if err != nil || attTree == nil || valTree == nil {
+			return nil, err
+		}
 
-	if node != nil {
+		// Take writer lock
 
-		if iht != nil {
-			err := util.NewIndexManager(iht).Deindex(key, node.IndexMap())
-			if err != nil {
-				return node, err
-			}
-		}
+		gm.mutex.Lock()
+		defer gm.mutex.Unlock()
 
-		// Decrease the node count
+		// Delete the node from the datastore
 
-		currentCount := gm.NodeCount(kind)
-		if err := gm.writeNodeCount(kind, currentCount-1, true); err != nil {
+		node, err := gm.deleteNode(key, kind, attTree, valTree)
+		if err != nil {
 			return node, err
 		}
 
-		// Execute rules
+		// Update the index
 
-		trans := newInternalGraphTrans(gm)
-		trans.subtrans = true
+		if node != nil {
 
-		if err := gm.gr.graphEvent(trans, EventNodeDeleted, part, node); err != nil {
-			return node, err
-		} else if err := trans.Commit(); err != nil {
-			return node, err
-		}
+			if iht != nil {
+				err := util.NewIndexManager(iht).Deindex(key, node.IndexMap())
+				if err != nil {
+					return node, err
+				}
+			}
+
+			// Decrease the node count
 
-		// Flush changes - errors only reported on the actual node storage flush
+			currentCount := gm.NodeCount(kind)
+			if err := gm.writeNodeCount(kind, currentCount-1, true); err != nil {
+				return node, err
+			}
 
-		gm.gs.FlushMain()
+			// Execute rules
 
-		gm.flushNodeIndex(part, kind)
+			trans := newInternalGraphTrans(gm)
+			trans.subtrans = true
 
-		return node, gm.flushNodeStorage(part, kind)
+			if err := gm.gr.graphEvent(trans, EventNodeDeleted, part, node); err != nil && err != ErrEventHandled {
+				return node, err
+			} else if err := trans.Commit(); err != nil {
+				return node, err
+			}
+
+			// Flush changes - errors only reported on the actual node storage flush
+
+			gm.gs.FlushMain()
+
+			gm.flushNodeIndex(part, kind)
+
+			return node, gm.flushNodeStorage(part, kind)
+		}
 	}
 
-	return nil, nil
+	return nil, err
 }
 
 /*

+ 15 - 4
graph/rules.go

@@ -54,10 +54,13 @@ type Rule interface {
 graphEvent main event handler which receives all graph related events.
 */
 func (gr *graphRulesManager) graphEvent(trans Trans, event int, data ...interface{}) error {
+	var result error
 	var errors []string
 
 	rules, ok := gr.eventMap[event]
 
+	handled := false // Flag to return a special handled error if no other error occured
+
 	if ok {
 
 		for _, rule := range rules {
@@ -73,10 +76,14 @@ func (gr *graphRulesManager) graphEvent(trans Trans, event int, data ...interfac
 			err := rule.Handle(gmclone, trans, event, data...)
 
 			if err != nil {
-				if errors == nil {
-					errors = make([]string, 0)
+				if err == ErrEventHandled {
+					handled = true
+				} else {
+					if errors == nil {
+						errors = make([]string, 0)
+					}
+					errors = append(errors, err.Error())
 				}
-				errors = append(errors, err.Error())
 			}
 		}
 	}
@@ -85,7 +92,11 @@ func (gr *graphRulesManager) graphEvent(trans Trans, event int, data ...interfac
 		return &util.GraphError{Type: util.ErrRule, Detail: strings.Join(errors, ";")}
 	}
 
-	return nil
+	if handled {
+		result = ErrEventHandled
+	}
+
+	return result
 }
 
 /*

+ 117 - 32
graph/rules_test.go

@@ -20,8 +20,10 @@ import (
 )
 
 type TestRule struct {
-	handleError bool
-	commitError bool
+	handledError    bool
+	processingError bool
+	commitError     bool
+	handles         []int
 }
 
 func (r *TestRule) Name() string {
@@ -29,12 +31,14 @@ func (r *TestRule) Name() string {
 }
 
 func (r *TestRule) Handles() []int {
-	return []int{EventNodeCreated, EventNodeUpdated, EventNodeDeleted,
-		EventEdgeCreated, EventEdgeUpdated, EventEdgeDeleted}
+	return r.handles
 }
 
 func (r *TestRule) Handle(gm *Manager, trans Trans, event int, ed ...interface{}) error {
-	if r.handleError {
+	if r.handledError {
+		return ErrEventHandled
+	}
+	if r.processingError {
 		return &util.GraphError{Type: util.ErrAccessComponent, Detail: "Test error"}
 	}
 
@@ -436,7 +440,8 @@ func TestRulesErrors(t *testing.T) {
 	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
 	gm := NewGraphManager(mgs)
 
-	tr := &TestRule{false, false}
+	tr := &TestRule{false, false, false, []int{EventNodeCreated, EventNodeUpdated, EventNodeDeleted,
+		EventEdgeCreated, EventEdgeUpdated, EventEdgeDeleted}}
 
 	gm.SetGraphRule(tr)
 
@@ -453,7 +458,7 @@ func TestRulesErrors(t *testing.T) {
 	node1.SetAttr("kind", "mynode")
 	node1.SetAttr("Name", "Node1")
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	if err := gm.StoreNode("main", node1); err.Error() !=
@@ -462,7 +467,14 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	// Make sure the node was still added
+
+	if res, err := gm.FetchNode("main", "456", "mynode"); res == nil || err != nil {
+		t.Error("Unexpeccted result:", res, err)
+		return
+	}
+
+	tr.processingError = false
 	tr.commitError = true
 
 	if err := gm.StoreNode("main", node1); err.Error() !=
@@ -471,7 +483,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreNode("main", node1); err != nil {
@@ -479,7 +491,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	if _, err := gm.RemoveNode("main", node1.Key(), node1.Kind()); err.Error() !=
@@ -488,7 +500,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreNode("main", node1); err != nil {
@@ -496,7 +508,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	if _, err := gm.RemoveNode("main", node1.Key(), node1.Kind()); err.Error() !=
@@ -505,7 +517,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreNode("main", node1); err != nil {
@@ -513,7 +525,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	edge := data.NewGraphEdge()
@@ -536,7 +548,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	if err := gm.StoreEdge("main", edge); err.Error() !=
@@ -545,7 +557,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreEdge("main", edge); err != nil {
@@ -553,7 +565,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	if _, err := gm.RemoveEdge("main", edge.Key(), edge.Kind()); err.Error() !=
@@ -562,7 +574,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreEdge("main", edge); err != nil {
@@ -570,7 +582,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	if _, err := gm.RemoveEdge("main", edge.Key(), edge.Kind()); err.Error() !=
@@ -579,7 +591,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreEdge("main", edge); err != nil {
@@ -591,7 +603,7 @@ func TestRulesErrors(t *testing.T) {
 
 	trans := NewConcurrentGraphTrans(gm)
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	trans.StoreNode("main", node1)
@@ -602,7 +614,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	trans.StoreNode("main", node1)
@@ -613,7 +625,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	trans.RemoveNode("main", node1.Key(), node1.Kind())
@@ -624,7 +636,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreNode("main", node1); err != nil {
@@ -632,7 +644,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	trans.RemoveNode("main", node1.Key(), node1.Kind())
@@ -643,7 +655,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreNode("main", node1); err != nil {
@@ -651,7 +663,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	trans.StoreEdge("main", edge)
@@ -662,7 +674,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	trans.StoreEdge("main", edge)
@@ -673,7 +685,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreEdge("main", edge); err != nil {
@@ -681,7 +693,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = true
 
 	trans.RemoveEdge("main", edge.Key(), edge.Kind())
@@ -692,7 +704,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = false
+	tr.processingError = false
 	tr.commitError = false
 
 	if err := gm.StoreEdge("main", edge); err != nil {
@@ -700,7 +712,7 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 
-	tr.handleError = true
+	tr.processingError = true
 	tr.commitError = false
 
 	trans.RemoveEdge("main", edge.Key(), edge.Kind())
@@ -711,3 +723,76 @@ func TestRulesErrors(t *testing.T) {
 		return
 	}
 }
+
+func TestRulesErrorsPreChange(t *testing.T) {
+	mgs := graphstorage.NewMemoryGraphStorage("mystorage")
+	gm := NewGraphManager(mgs)
+
+	tr := &TestRule{false, false, false, []int{EventNodeStore, EventNodeUpdate, EventNodeDelete,
+		EventEdgeStore, EventEdgeDelete}}
+
+	gm.SetGraphRule(tr)
+
+	// Check that the test rule was added
+
+	if rules := fmt.Sprint(gm.GraphRules()); rules !=
+		"[system.deletenodeedges system.updatenodestats testrule]" {
+		t.Error("unexpected graph rule list:", rules)
+		return
+	}
+
+	// Test that we can stop the storage of a node with custom processing
+
+	node1 := data.NewGraphNode()
+	node1.SetAttr("key", "456")
+	node1.SetAttr("kind", "mynode")
+	node1.SetAttr("Name", "Node1")
+
+	tr.handledError = true
+
+	if err := gm.StoreNode("main", node1); err != nil {
+		t.Error(err)
+		return
+	}
+
+	// Make sure the node was not added
+
+	if res, err := gm.FetchNode("main", "456", "mynode"); res != nil || err != nil {
+		t.Error("Unexpeccted result:", res, err)
+		return
+	}
+
+	if err := gm.UpdateNode("main", node1); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := gm.RemoveNode("main", node1.Key(), node1.Kind()); err != nil {
+		t.Error(err)
+		return
+	}
+
+	edge := data.NewGraphEdge()
+	edge.SetAttr("key", "123")
+	edge.SetAttr("kind", "myedge")
+
+	edge.SetAttr(data.EdgeEnd1Key, node1.Key())
+	edge.SetAttr(data.EdgeEnd1Kind, node1.Kind())
+	edge.SetAttr(data.EdgeEnd1Role, "node1")
+	edge.SetAttr(data.EdgeEnd1Cascading, false)
+
+	edge.SetAttr(data.EdgeEnd2Key, node1.Key())
+	edge.SetAttr(data.EdgeEnd2Kind, node1.Kind())
+	edge.SetAttr(data.EdgeEnd2Role, "node2")
+	edge.SetAttr(data.EdgeEnd2Cascading, false)
+
+	if err := gm.StoreEdge("main", edge); err != nil {
+		t.Error(err)
+		return
+	}
+
+	if _, err := gm.RemoveEdge("main", edge.Key(), edge.Kind()); err != nil {
+		t.Error(err)
+		return
+	}
+}

BIN
old/old_brawler_integration.tar


+ 21 - 2
server/server.go

@@ -36,10 +36,11 @@ import (
 	"devt.de/krotik/common/timeutil"
 	"devt.de/krotik/eliasdb/api"
 	"devt.de/krotik/eliasdb/api/ac"
-	"devt.de/krotik/eliasdb/api/v1"
+	v1 "devt.de/krotik/eliasdb/api/v1"
 	"devt.de/krotik/eliasdb/cluster"
 	"devt.de/krotik/eliasdb/cluster/manager"
 	"devt.de/krotik/eliasdb/config"
+	"devt.de/krotik/eliasdb/ecal"
 	"devt.de/krotik/eliasdb/graph"
 	"devt.de/krotik/eliasdb/graph/graphstorage"
 )
@@ -196,6 +197,24 @@ func StartServerWithSingleOp(singleOperation func(*graph.Manager) bool) {
 		os.RemoveAll(filepath.Join(basepath, config.Str(config.LockFile)))
 	}()
 
+	// Create ScriptingInterpreter instance and run ECAL scripts
+
+	if config.Bool(config.EnableECALScripts) {
+
+		// Make sure the script directory exists
+
+		loc := filepath.Join(basepath, config.Str(config.ECALScriptFolder))
+		ensurePath(loc)
+
+		print("Loading ECAL scripts in ", loc)
+
+		api.SI = ecal.NewScriptingInterpreter(loc, api.GM)
+		if err := api.SI.Run(); err != nil {
+			fatal("Failed to start ECAL scripting interpreter:", err)
+			return
+		}
+	}
+
 	// Handle single operation - these are operations which work on the GraphManager
 	// and then exit.
 
@@ -413,7 +432,7 @@ func StartServerWithSingleOp(singleOperation func(*graph.Manager) bool) {
 
 	port := config.Str(config.HTTPSPort)
 
-	print("Starting server on: ", api.APIHost)
+	print("Starting HTTPS server on: ", api.APIHost)
 
 	go hs.RunHTTPSServer(basepath+config.Str(config.LocationHTTPS), config.Str(config.HTTPSCertificate),
 		config.Str(config.HTTPSKey), ":"+port, &wg)

+ 30 - 1
server/server_test.go

@@ -126,6 +126,10 @@ func TestMainNormalCase(t *testing.T) {
 
 	config.LoadDefaultConfig()
 
+	// Start ECAL scripting by default
+
+	config.Config[config.EnableECALScripts] = true
+
 	// Start cluster by default
 
 	config.Config[config.EnableCluster] = true
@@ -206,12 +210,13 @@ Opening cluster state info
 Starting cluster (log history: 100)
 [Cluster] member1: Starting member manager member1 rpc server on: 127.0.0.1:9030
 Creating GraphManager instance
+Loading ECAL scripts in testdb/ecal
 Creating key (key.pem) and certificate (cert.pem) in: ssl
 Ensuring web folder: testdb/web
 Ensuring login page: testdb/web/login.html
 Ensuring web terminal: testdb/web/db/term.html
 Ensuring cluster terminal: testdb/web/db/cluster.html
-Starting server on: 127.0.0.1:9090
+Starting HTTPS server on: 127.0.0.1:9090
 Writing fingerprint file: testdb/web/fingerprint.json
 Waiting for shutdown
 Lockfile was modified
@@ -259,6 +264,30 @@ func TestMainErrorCases(t *testing.T) {
 
 	// Test db access error
 
+	config.Config[config.MemoryOnlyStorage] = true
+	config.Config[config.EnableReadOnly] = true
+	config.Config[config.EnableECALScripts] = true
+	config.Config[config.ECALEntryScript] = invalidFileName
+
+	runServer()
+
+	// Check that an error happened
+
+	if len(errorLog) != 1 ||
+		!strings.Contains(errorLog[0], "Failed to start ECAL scripting interpreter") {
+		t.Error("Unexpected error:", errorLog)
+		return
+	}
+
+	// Set back logs
+
+	printLog = []string{}
+	errorLog = []string{}
+
+	// Test db access error
+
+	config.Config[config.EnableECALScripts] = false
+	config.Config[config.MemoryOnlyStorage] = false
 	config.Config[config.LocationDatastore] = invalidFileName
 	config.Config[config.EnableReadOnly] = true