Browse Source

feat: Initial commit

Matthias Ladkau 10 months ago
commit
e32772917e
82 changed files with 23022 additions and 0 deletions
  1. 10 0
      .gitignore
  2. 153 0
      Jenkinsfile
  3. 7 0
      LICENSE
  4. 43 0
      Makefile
  5. 2 0
      NOTICE
  6. 33 0
      README.md
  7. 601 0
      api/api.go
  8. 344 0
      api/api_test.go
  9. 146 0
      api/eventsource.go
  10. 108 0
      api/eventsource_test.go
  11. 77 0
      api/web/about.go
  12. 187 0
      api/web/rest.go
  13. 364 0
      api/web/rest_test.go
  14. 173 0
      api/web/swagger.go
  15. 64 0
      api/web/util.go
  16. 680 0
      api/web/v1/engine.go
  17. 280 0
      api/web/v1/engine_test.go
  18. 53 0
      api/web/v1/rest.go
  19. 149 0
      api/web/v1/rest_test.go
  20. 140 0
      api/web/v1/rumble.go
  21. 93 0
      api/web/v1/rumble_test.go
  22. 22 0
      attach_webzip.sh
  23. 53 0
      brawler_design.md
  24. 88 0
      cli/brawler.go
  25. 278 0
      cli/debug_server.go
  26. 262 0
      cli/interpreter.go
  27. 124 0
      engine/debug.go
  28. 60 0
      engine/event.go
  29. 468 0
      engine/monitor.go
  30. 436 0
      engine/processor.go
  31. 742 0
      engine/processor_test.go
  32. 804 0
      engine/rule.go
  33. 509 0
      engine/rule_test.go
  34. 236 0
      engine/taskqueue.go
  35. 223 0
      engine/taskqueue_test.go
  36. 161 0
      engine/util.go
  37. 77 0
      engine/util_test.go
  38. 5 0
      go.mod
  39. 2 0
      go.sum
  40. 302 0
      rumble.md
  41. 69 0
      rumble/api/func.go
  42. 80 0
      rumble/api/func_test.go
  43. 162 0
      rumble/interpreter/arithmetic.go
  44. 294 0
      rumble/interpreter/arithmetic_test.go
  45. 237 0
      rumble/interpreter/boolean.go
  46. 259 0
      rumble/interpreter/boolean_test.go
  47. 152 0
      rumble/interpreter/cond_op.go
  48. 143 0
      rumble/interpreter/cond_op_test.go
  49. 702 0
      rumble/interpreter/func.go
  50. 782 0
      rumble/interpreter/func_test.go
  51. 924 0
      rumble/interpreter/general.go
  52. 834 0
      rumble/interpreter/general_test.go
  53. 134 0
      rumble/interpreter/provider.go
  54. 179 0
      rumble/interpreter/provider_test.go
  55. 63 0
      rumble/interpreter/runtimeerror.go
  56. 334 0
      rumble/interpreter/sink.go
  57. 367 0
      rumble/interpreter/statements.go
  58. 1013 0
      rumble/interpreter/statements_test.go
  59. 461 0
      rumble/interpreter/varscope.go
  60. 303 0
      rumble/interpreter/varscope_test.go
  61. 238 0
      rumble/parser/const.go
  62. 556 0
      rumble/parser/lexer.go
  63. 418 0
      rumble/parser/lexer_test.go
  64. 882 0
      rumble/parser/parser.go
  65. 1175 0
      rumble/parser/parser_test.go
  66. 65 0
      rumble/parser/parsererror.go
  67. 316 0
      rumble/parser/prettyprinter.go
  68. 745 0
      rumble/parser/prettyprinter_test.go
  69. 69 0
      rumble/parser/runtime.go
  70. 69 0
      rumble/stdlib/util/file.go
  71. 66 0
      rumble/stdlib/util/path.go
  72. 198 0
      rumble/stdlib/util/string.go
  73. 154 0
      rumble/stdlib/util/time.go
  74. 306 0
      rumble/stdlib/util/util_test.go
  75. 118 0
      rumble/util.go
  76. 142 0
      rumble/util_test.go
  77. BIN
      web.zip
  78. 156 0
      web/brawler/brawler.css
  79. 200 0
      web/brawler/sinks.js
  80. 397 0
      web/brawler/term.js
  81. 678 0
      web/brawler/util.js
  82. 23 0
      web/index.html

+ 10 - 0
.gitignore

@@ -0,0 +1,10 @@
+.cache
+.cover
+coverage.txt
+coverage.out
+coverage.html
+test
+/ssl
+/dist
+build
+/brawler

+ 153 - 0
Jenkinsfile

@@ -0,0 +1,153 @@
+pipeline {
+    agent any
+
+    /**
+     * Build file for Brawler
+     *
+     * Each build happens with 2 commits. The first commit is the actual
+     * feature or fix commit. The commit message should follow conventional
+     * commit messages (https://www.conventionalcommits.org/en/v1.0.0-beta.4/).
+     * In a second commit a program called standard version
+     * (https://github.com/conventional-changelog/standard-version) calculates
+     * a new product version. The versioning will be according to the rules
+     * of “Semantic Versioning” (https://semver.org/).
+     *
+     * Building is done using simple make.
+     *
+     * Testing produces code coverage badges which can be embedded web pages.
+     */
+
+    stages {
+        stage('Commit Analysis') {
+            steps {
+
+                // Read the commit message into a variable
+                //
+                script {
+                  commit_msg = sh(returnStdout: true, script: 'git log -1')
+                }
+            }
+        }
+        stage('Prepare Release Build') {
+
+            // Check for a release build (a commit by standard-version)
+            //
+            when { expression { return commit_msg =~ /chore\(release\)\:/ } }
+            steps {
+
+                // Find out the tagged version
+                //
+                script {
+                  version = sh(returnStdout: true, script: 'git log -1 | grep chore | tr -d "\\n" | sed "s/.*chore(release): \\([0-9\\.]*\\)/\\1/"')
+                }
+
+                echo "Building version: ${version} ..."
+            }
+        }
+        stage('Build') {
+            when { expression { return commit_msg =~ /chore\(release\)\:/ } }
+            steps {
+
+                // Fetch all git tags and run goreleaser
+                //
+                checkout scm
+                sshagent (credentials: ['Gogs']) {
+                    sh 'git fetch --tags'
+                }
+
+                sh '/opt/env-go/bin/env-go make dist'
+            }
+        }
+        stage('Test') {
+
+            // The tests are run in both stages - no release commit is made if the tests fail.
+            // The output is the coverage data and the badge.
+            //
+            steps {
+                echo 'Running tests ...'
+
+                sh """echo '<svg width="88" height="20" xmlns="http://www.w3.org/2000/svg"><g shape-rendering="crispEdges"><path fill="#555" d="M0 0h41v20H0z"/><path fill="#fc1" d="M41 0h40v20H41z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11"><text x="20.5" y="14">tests</text><text x="60" y="14">fail</text></g></svg>' > test_result.svg"""
+
+                sh 'CGO_ENABLED=0 /opt/env-go/bin/env-go go test -p 1 --coverprofile=coverage.out ./...'
+                sh '/opt/env-go/bin/env-go go tool cover --html=coverage.out -o coverage.html'
+
+                echo 'Determine overall coverage and writing badge'
+                script {
+                  coverage = sh(returnStdout: true, script: '/opt/env-go/bin/env-go go tool cover -func=coverage.out | tee coverage.txt | tail -1 | grep -o "[0-9]*.[0-9]*%$" | tr -d "\\n"')
+
+                  echo "Overall coverage is: ${coverage}"
+
+                  if (coverage.equals("100.0%")) {
+                    sh """echo '<svg width="110" height="20" xmlns="http://www.w3.org/2000/svg"><g shape-rendering="crispEdges"><path fill="#555" d="M0 0h61v20H0z"/><path fill="#4c1" d="M61 0h50v20H61z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11"><text x="30.5" y="14">coverage</text><text x="85" y="14">$coverage</text></g></svg>' > test_result.svg"""
+                  } else {
+                    sh """echo '<svg width="110" height="20" xmlns="http://www.w3.org/2000/svg"><g shape-rendering="crispEdges"><path fill="#555" d="M0 0h61v20H0z"/><path fill="#fc1" d="M61 0h50v20H61z"/></g><g fill="#fff" text-anchor="middle" font-family="DejaVu Sans,Verdana,Geneva,sans-serif" font-size="11"><text x="30.5" y="14">coverage</text><text x="85" y="14">$coverage</text></g></svg>' > test_result.svg"""
+                  }
+                }
+            }
+        }
+        stage('Create Release Build Commit') {
+
+            // Check for a non-release build to avoid a commit loop
+            //
+            when { not { expression { return commit_msg =~ /chore\(release\)\:/ } } }
+            steps {
+
+                // Before running standard-version it is important to fetch
+                // the existing tags so next version can be calculated
+                //
+                echo 'Running standard version ...'
+                sshagent (credentials: ['Gogs']) {
+                    sh 'git fetch --tags'
+                }
+                sh 'standard-version'
+
+                // The new version is inserted into the code
+                //
+                script {
+                  new_version = sh(returnStdout: true, script: 'git tag | tail -1 | tr -d "\\n"')
+                }
+                echo "Inserting version $new_version into the code"
+                sh "find . -name '*.go' -exec sed -i -e 's/ProductVersion\\ =\\ \\\".*\\\"/ProductVersion = \\\"${new_version.substring(1)}\\\"/g' {} \\;"
+
+                // The commit is amended to include the code change
+                //
+                echo "Tagging the build and push the changes into the origin repository"
+                sshagent (credentials: ['Gogs']) {
+                    sh 'git config user.name "Matthias Ladkau"'
+                    sh 'git config user.email "webmaster@devt.de"'
+
+                    sh 'git commit -a --amend --no-edit'
+                    sh "git tag --force $new_version"
+
+                    sh 'git push --tags origin master'
+                }
+            }
+        }
+        stage('Upload Release Build Commit') {
+            when { expression { return commit_msg =~ /chore\(release\)\:/ } }
+            steps {
+                echo "Uploading release build ..."
+
+                // After a successful build the resulting artifacts are
+                // uploaded for publication
+                //
+                sshagent (credentials: ['Gogs']) {
+
+                  // Clear distribution folder
+                  sh 'ssh -o StrictHostKeyChecking=no -p 7000 krotik@devt.de rm -fR pub/brawler'
+                  sh 'ssh -o StrictHostKeyChecking=no -p 7000 krotik@devt.de mkdir -p pub/brawler'
+
+                  // Copy distribution packages in place
+                  sh 'scp -P 7000 -o StrictHostKeyChecking=no dist/*.tar.gz krotik@devt.de:~/pub/brawler'
+                  sh 'scp -P 7000 -o StrictHostKeyChecking=no dist/checksums.txt krotik@devt.de:~/pub/brawler'
+
+                  // Copy coverage in place
+                  sh 'scp -P 7000 -o StrictHostKeyChecking=no coverage.* krotik@devt.de:~/pub/brawler'
+
+                  // Copy test result in place
+                  sh 'scp -P 7000 -o StrictHostKeyChecking=no test_result.svg krotik@devt.de:~/pub/brawler'
+                }
+            }
+        }
+    }
+}

+ 7 - 0
LICENSE

@@ -0,0 +1,7 @@
+Copyright 2019 Matthias Ladkau
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

+ 43 - 0
Makefile

@@ -0,0 +1,43 @@
+export NAME=brawler
+export TAG=`git describe --abbrev=0 --tags`
+export CGO_ENABLED=0
+export GOOS=linux
+
+all: build
+clean:
+	rm -f brawler
+
+mod:
+	go mod init || true
+	go mod tidy
+test:
+	go test -p 1 ./...
+
+fmt:
+	gofmt -l -w -s .
+
+vet:
+	go vet ./...
+
+build: clean mod fmt vet
+	go build -o $(NAME) cli/*
+
+build-win: clean mod fmt vet
+	GOOS=windows GOARCH=amd64 go build -o $(NAME).exe cli/*
+
+dist: build build-win
+	rm -fR dist
+
+	mkdir -p dist/$(NAME)_linux_amd64
+	mv $(NAME) dist/$(NAME)_linux_amd64
+	cp LICENSE dist/$(NAME)_linux_amd64
+	cp NOTICE dist/$(NAME)_linux_amd64
+	tar --directory=dist -cz $(NAME)_linux_amd64 > dist/$(NAME)_$(TAG)_linux_amd64.tar.gz
+
+	mkdir -p dist/$(NAME)_windows_amd64
+	mv $(NAME).exe dist/$(NAME)_windows_amd64
+	cp LICENSE dist/$(NAME)_windows_amd64
+	cp NOTICE dist/$(NAME)_windows_amd64
+	tar --directory=dist -cz $(NAME)_windows_amd64 > dist/$(NAME)_$(TAG)_windows_amd64.tar.gz
+
+	sh -c 'cd dist; sha256sum *.tar.gz' > dist/checksums.txt

+ 2 - 0
NOTICE

@@ -0,0 +1,2 @@
+Brawler - Event Condition Action Engine
+Copyright (c) 2019 Matthias Ladkau

+ 33 - 0
README.md

@@ -0,0 +1,33 @@
+Brawler
+====
+Brawler is a ECA (Event Condition Action) engine for concurrent event processing using. Brawler uses the event-based language Rumble to define event-based systems featuring events and rules which are triggered by events.
+
+<p>
+<a href="https://void.devt.de/pub/brawler/coverage.txt"><img src="https://void.devt.de/pub/brawler/test_result.svg" alt="Code coverage"></a>
+<a href="https://goreportcard.com/report/devt.de/krotik/brawler">
+<img src="https://goreportcard.com/badge/devt.de/krotik/brawler?style=flat-square" alt="Go Report Card"></a>
+<a href="https://godoc.org/devt.de/krotik/brawler">
+<img src="https://godoc.org/devt.de/krotik/brawler?status.svg" alt="Go Doc"></a>
+</p>
+
+Features
+--------
+- Simple but powerful concurrent event-based processing.
+- Priorities for control flow.
+- Event cascades can be traced with monitors.
+- Rules which can match on event state.
+- Rules can suppress each other.
+
+### REST API:
+
+The terminal uses a REST API to communicate with the backend. The REST API can be browsed using a dynamically generated swagger.json definition (https://localhost:9090/fs/swagger.json). You can browse the API of Rufs's latest version [here](http://petstore.swagger.io/?url=https://devt.de/krotik/brawler/raw/master/swagger.json).
+
+### Further Reading:
+
+- [Brawler Design](brawler_design.md)
+- [Rumble Language](rumble.md)
+
+
+License
+-------
+Rufs source code is available under the [MIT License](/LICENSE).

+ 601 - 0
api/api.go

@@ -0,0 +1,601 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+/*
+Package api contains the main API to Brawler.
+*/
+package api
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sort"
+	"strings"
+	"sync"
+
+	"devt.de/krotik/brawler/engine"
+	"devt.de/krotik/brawler/rumble"
+	"devt.de/krotik/brawler/rumble/interpreter"
+	"devt.de/krotik/brawler/rumble/parser"
+	"devt.de/krotik/common/datautil"
+	rumbledefs "devt.de/krotik/common/defs/rumble"
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/fileutil"
+)
+
+// API parameters
+// ==============
+
+/*
+ProcessorWorkerCount is the worker count for each created processor
+*/
+var ProcessorWorkerCount = 4
+
+/*
+LogSize is the log size for each created processor
+*/
+var LogSize = 50
+
+/*
+EngineSinkDirectories contains initialisation sinks for new engines.
+All .ram files will be loaded from <EngineSinkDirectories>/<engine name>
+*/
+var EngineSinkDirectories = "sinks"
+
+/*
+DefaultLogLevel is the default log level which is used by every new engine
+*/
+var DefaultLogLevel = rumble.LogLevel(rumble.Info)
+
+// Constants and definitions
+// =========================
+
+/*
+Error types for Brawler API
+*/
+const (
+	ErrorRestAPI       = "Rest API Error"
+	ErrorRumbleParser  = "Rumble Parser Error"
+	ErrorRumbleRuntime = "Rumble Runtime Error"
+)
+
+// BrawlerEngine
+// ================
+
+/*
+BrawlerEngine is used to define a processing unit in the API.
+*/
+type BrawlerEngine struct {
+	name           string                     // Name of the engine
+	Proc           engine.Processor           // Actual processor which load rules and processes events
+	Log            LogWriter                  // Log of the processor
+	LogLevel       rumble.LogLevel            // LogLevel of this engine
+	RootVars       parser.VarsScope           // Root variable scope of the engine
+	EchoNilResults bool                       // Flag if nil results should be echoed (used for consoles)
+	Sinks          map[string]*parser.ASTNode // Loaded Rumble sinks
+}
+
+/*
+String returns a string representation of the engine.
+*/
+func (re *BrawlerEngine) String() string {
+	return fmt.Sprintf("BrawlerEngine: %v (EchoNilResults: %v, Sinks: %v, Processor: %v Status: %v)",
+		re.name, re.EchoNilResults, len(re.Sinks), re.Proc.String(), re.Proc.Status())
+}
+
+/*
+AddLogMessage adds a log message to the log if it meets the current log
+level of the engine.
+*/
+func (re *BrawlerEngine) AddLogMessage(m *LogMessage) {
+	if m.Level == Result || rumble.LogLevelMatch(re.LogLevel, m.Level) {
+		re.Log.Add(m)
+	}
+}
+
+/*
+DefaultRootMonitorErrorObserver is the default observer for root monitor errors.
+The observer logs all encountered errors in the engine's log.
+*/
+func (re *BrawlerEngine) DefaultRootMonitorErrorObserver(rm *engine.RootMonitor) {
+	terrs := rm.AllErrors()
+
+	for _, te := range terrs {
+
+		// Collect all errors and sort them by name
+
+		errNames := make([]string, 0, len(te.ErrorMap))
+
+		for name := range te.ErrorMap {
+			errNames = append(errNames, name)
+		}
+
+		sort.Strings(errNames)
+
+		eventPath := te.Monitor.EventPathString()
+		for _, name := range errNames {
+			re.AddLogMessage(&LogMessage{rm.ID(), rumble.Error,
+				fmt.Sprintf("%v -> %v: %v", eventPath, name, te.ErrorMap[name])})
+		}
+	}
+}
+
+/*
+LogWriter models the log writer for a BrawlerEngine
+*/
+type LogWriter interface {
+
+	/*
+	   Add adds a new log message.
+	*/
+	Add(*LogMessage)
+
+	/*
+	   Slice returns the contents of the current log as a slice.
+	*/
+	Slice() []*LogMessage
+
+	/*
+		Resets the current log.
+	*/
+	Reset()
+
+	/*
+		Size returns the current log size.
+	*/
+	Size() int
+
+	/*
+		String returns the current log as a string.
+	*/
+	String() string
+}
+
+/*
+NewLogWriter returns a new log writer instance and is called when a new
+engine is created.
+*/
+var NewLogWriter = func(engine string) LogWriter {
+	return &MemoryLogWriter{datautil.NewRingBuffer(LogSize)}
+}
+
+/*
+MemoryLogWriter is a log writer which writes into a ring buffer in memory.
+*/
+type MemoryLogWriter struct {
+	*datautil.RingBuffer
+}
+
+/*
+Add adds a new log message.
+*/
+func (lw *MemoryLogWriter) Add(m *LogMessage) {
+	lw.RingBuffer.Add(m)
+}
+
+/*
+Slice returns the contents of the current log as a slice.
+*/
+func (lw *MemoryLogWriter) Slice() []*LogMessage {
+	sl := lw.RingBuffer.Slice()
+	ret := make([]*LogMessage, len(sl))
+	for i, lm := range sl {
+		ret[i] = lm.(*LogMessage)
+	}
+	return ret
+}
+
+/*
+Reset resets the current log.
+*/
+func (lw *MemoryLogWriter) Reset() {
+	lw.RingBuffer.Reset()
+}
+
+/*
+Size returns the current log size.
+*/
+func (lw *MemoryLogWriter) Size() int {
+	return lw.RingBuffer.Size()
+}
+
+/*
+String returns the current log as a string.
+*/
+func (lw *MemoryLogWriter) String() string {
+	return lw.RingBuffer.String()
+}
+
+/*
+Result is a special log level indicating a result of a manually entered command
+*/
+var Result rumble.LogLevel = "Result"
+
+/*
+LogMessage models a single log message
+*/
+type LogMessage struct {
+	MonitorID uint64
+	Level     rumble.LogLevel
+	Msg       string
+}
+
+func (lm *LogMessage) String() string {
+	if lm.Level == Result {
+		return fmt.Sprintf("> %v", lm.Msg)
+	}
+	return fmt.Sprintf("LogMsg (Monitor:%v Level:%v): %v", lm.MonitorID, lm.Level, lm.Msg)
+}
+
+// API functions
+// =============
+
+/*
+GetEngineNames returns all available engine names.
+*/
+func GetEngineNames() []string {
+	enginesLock.Lock()
+	defer enginesLock.Unlock()
+
+	ret := make([]string, 0, len(engines))
+
+	for n := range engines {
+		ret = append(ret, n)
+	}
+
+	sort.Strings(ret)
+
+	return ret
+}
+
+/*
+EngineExists checks if a given engine exists.
+*/
+func EngineExists(name string) bool {
+	enginesLock.Lock()
+	defer enginesLock.Unlock()
+
+	_, ok := engines[name]
+
+	return ok
+}
+
+/*
+GetEngine returns a BrawlerEngine of a certain namme. The engine is
+created if it does not exist. If the reset flag is set then all current
+loaded sinks are discarded and the default sinks are loaded again.
+*/
+func GetEngine(name string, reset bool) *BrawlerEngine {
+	var loadDefaultSinks bool
+
+	enginesLock.Lock()
+	re, ok := engines[name]
+	enginesLock.Unlock()
+
+	if !ok {
+
+		re = &BrawlerEngine{name, wrapProc(engine.NewProcessor(ProcessorWorkerCount)),
+			NewLogWriter(name), DefaultLogLevel, nil, true, make(map[string]*parser.ASTNode)}
+
+		enginesLock.Lock()
+		engines[name] = re
+		enginesLock.Unlock()
+
+		// Register logger which logs into the ringbuffer of the engine
+
+		rumble.RegisterLogListener(name, func(processorID uint64, rootMonitorID uint64,
+			level rumble.LogLevel, v ...interface{}) {
+
+			if processorID == re.Proc.ID() {
+				for i, vv := range v {
+					v[i] = rumble.EvalToString(vv)
+				}
+				re.AddLogMessage(&LogMessage{rootMonitorID, level, fmt.Sprint(v...)})
+			}
+		})
+
+		// Default error observer just logs the errors
+
+		re.Proc.SetRootMonitorErrorObserver(re.DefaultRootMonitorErrorObserver)
+
+		loadDefaultSinks = true
+
+	} else if reset {
+
+		re.Proc.Finish()
+		re.Proc.Reset()
+		re.Proc.Start()
+
+		loadDefaultSinks = true
+	}
+
+	if loadDefaultSinks {
+
+		// Reset the root variable scope
+
+		re.RootVars = interpreter.NewRootVariableScope("")
+
+		// Load default sinks for this engine
+
+		sinkPath := filepath.Join(EngineSinkDirectories, name)
+
+		if ok, _ := fileutil.PathExists(sinkPath); ok {
+
+			checkLoadError := func(err error) {
+				if err != nil {
+					re.AddLogMessage(&LogMessage{0, rumble.Error,
+						fmt.Sprintf("Can't load initial sinks for %v: %v",
+							name, err.Error())})
+				}
+			}
+
+			origEchoNilResults := re.EchoNilResults
+			re.EchoNilResults = false
+
+			err := filepath.Walk(sinkPath, func(path string, info os.FileInfo, err error) error {
+				if !info.IsDir() && strings.HasSuffix(info.Name(), ".ram") {
+					var code []byte
+					var err error
+
+					if code, err = ioutil.ReadFile(path); err == nil {
+						var wg sync.WaitGroup
+
+						wg.Add(1)
+						_, err = RunRumbleCode(name, info.Name(), string(code), &wg)
+
+						// Wait until the default sinks have loaded and executed
+
+						wg.Wait()
+					}
+					checkLoadError(err)
+				}
+
+				return nil
+			})
+
+			re.EchoNilResults = origEchoNilResults
+
+			checkLoadError(err)
+		}
+	}
+
+	return re
+}
+
+/*
+RunRumbleCode executes a given piece of Rumble code on a named processor. Can
+receive an optional WaitGroup which is called once all code has finished processing.
+*/
+func RunRumbleCode(procName, codeName, code string, wg *sync.WaitGroup) (string, error) {
+	var result string
+
+	// Get / create the processor
+
+	re := GetEngine(procName, false)
+
+	// Get the root variable scope
+
+	vs := re.RootVars
+
+	// Set the processor in the root variable scope
+
+	vs.SetValue(rumbledefs.VarProcessor, re.Proc)
+
+	// Parse and validate
+
+	ast, err := parser.ParseWithRuntime(codeName, code,
+		interpreter.NewRumbleRuntimeProvider(codeName))
+
+	if err == nil {
+
+		if err = ast.Runtime.Validate(); err == nil {
+			var res interface{}
+
+			// Get a reference to the processor wrapper
+
+			wp := re.Proc.(*procWrapper)
+
+			// Start the execution transaction
+
+			wp.transLock.Lock()
+
+			// Tell the processor to queue any adding of rules and events
+
+			wp.queueCalls = true
+
+			// Do the code execution
+
+			res, err = ast.Runtime.Eval(vs, make(map[string]interface{}))
+			result = fmt.Sprint(res)
+
+			if res != nil || re.EchoNilResults {
+
+				// Add the result to the log
+
+				re.AddLogMessage(&LogMessage{0, Result, result})
+			}
+
+			// After all calls have been recorded activate the call-pass-through again
+
+			wp.queueCalls = false
+
+			// Kick off the execution thread
+
+			if err == nil {
+
+				go func() {
+					var err error
+
+					// Check if rules should be added
+
+					if len(wp.rulesToAdd) > 0 {
+
+						wp.Finish()
+
+						for _, r := range wp.rulesToAdd {
+							if err = wp.AddRule(r); err != nil {
+								break
+							}
+						}
+					}
+
+					// Check if events should be added
+
+					if err == nil && len(wp.eventsToAdd) > 0 {
+
+						wp.Start()
+
+						for i, e := range wp.eventsToAdd {
+
+							// AddEvent should always succeed since the processor
+							// was just started
+
+							_, err = wp.AddEvent(e, wp.eventParentMonitors[i])
+							errorutil.AssertOk(err)
+						}
+					}
+
+					if err != nil {
+
+						rumble.Log(wp.ID(), 0, rumble.Error, err.Error())
+					}
+
+					// All remaining rules and events are discarded
+
+					wp.rulesToAdd = []*engine.Rule{}
+					wp.eventsToAdd = []*engine.Event{}
+					wp.eventParentMonitors = []engine.Monitor{}
+
+					if wg != nil {
+
+						// If a  wait group was given wait until all processing has finished
+
+						wp.Finish()
+						wg.Done()
+					}
+
+					// Finish the execution transaction
+
+					wp.transLock.Unlock()
+				}()
+
+			} else {
+
+				wp.transLock.Unlock()
+			}
+		}
+	}
+
+	if err != nil && wg != nil {
+		wg.Done()
+	}
+
+	// Store all loaded sinks
+
+	if err == nil {
+		storeSinks(re, ast)
+	}
+
+	return result, err
+}
+
+/*
+Run through a given AST and store all the found sinks on the given engine.
+*/
+func storeSinks(engine *BrawlerEngine, ast *parser.ASTNode) {
+	var visitAst func(node *parser.ASTNode)
+
+	vs := interpreter.NewRootVariableScope("")
+
+	visitAst = func(node *parser.ASTNode) {
+
+		if node.Name == parser.NodeSINK {
+
+			// Getting the name of the sink should always succeed
+
+			name, err := node.Children[0].Runtime.Eval(vs, make(map[string]interface{}))
+			errorutil.AssertOk(err)
+
+			// Store the sink node
+
+			engine.Sinks[fmt.Sprint(name)] = node
+		}
+
+		// Visit children
+
+		for _, cnode := range node.Children {
+			visitAst(cnode)
+		}
+	}
+
+	visitAst(ast)
+}
+
+// Internal data structures
+// ========================
+
+/*
+engines is a map of all defined Brawler engines
+*/
+var engines = make(map[string]*BrawlerEngine)
+var enginesLock = &sync.Mutex{}
+
+/*
+procWrapper is a special processor wrapper which can intercept calls to add events
+and rules and execute them later.
+*/
+type procWrapper struct {
+	engine.Processor
+
+	queueCalls bool        // Flag to queue calls
+	transLock  *sync.Mutex // Lock for code execution transactions
+
+	rulesToAdd          []*engine.Rule   // List of rules to add
+	eventsToAdd         []*engine.Event  // List of events to add
+	eventParentMonitors []engine.Monitor // List of monitors for events to add
+}
+
+/*
+wrapProc wraps a given processor in a procWrapper.
+*/
+func wrapProc(proc engine.Processor) *procWrapper {
+	return &procWrapper{proc, false, &sync.Mutex{}, []*engine.Rule{},
+		[]*engine.Event{}, []engine.Monitor{}}
+}
+
+/*
+AddRule adds a new rule to the processor.
+*/
+func (p *procWrapper) AddRule(rule *engine.Rule) error {
+
+	if p.queueCalls {
+		p.rulesToAdd = append(p.rulesToAdd, rule)
+		return nil
+	}
+
+	return p.Processor.AddRule(rule)
+}
+
+/*
+AddEvent adds a new event to the processor. Returns the monitor if the event
+triggered a rule and nil if the event was skipped.
+*/
+func (p *procWrapper) AddEvent(event *engine.Event, parentMonitor engine.Monitor) (engine.Monitor, error) {
+
+	if p.queueCalls {
+		p.eventsToAdd = append(p.eventsToAdd, event)
+		p.eventParentMonitors = append(p.eventParentMonitors, parentMonitor)
+		return nil, nil
+	}
+
+	return p.Processor.AddEvent(event, parentMonitor)
+}

+ 344 - 0
api/api_test.go

@@ -0,0 +1,344 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package api
+
+import (
+	"flag"
+	"fmt"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"sync"
+	"testing"
+
+	"devt.de/krotik/brawler/engine"
+	"devt.de/krotik/brawler/rumble/interpreter"
+	"devt.de/krotik/brawler/rumble/parser"
+	"devt.de/krotik/common/fileutil"
+)
+
+func TestMemoryLogWriter(t *testing.T) {
+
+	lw := NewLogWriter("testlogger")
+
+	lw.Add(&LogMessage{})
+
+	if res := fmt.Sprint(lw.Slice()); res != "[LogMsg (Monitor:0 Level:): ]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := lw.Size(); res != 1 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}
+
+func TestSinkAutoLoad(t *testing.T) {
+	var wg sync.WaitGroup
+
+	engines = make(map[string]*BrawlerEngine)
+	engine.UnitTestResetIDs()
+
+	os.Mkdir(filepath.Join(EngineSinkDirectories, "foo"), 0755)
+
+	ioutil.WriteFile(filepath.Join(EngineSinkDirectories, "foo", "bla.ram"),
+		[]byte(`
+sink "rule1" 
+    kindmatch [ "core.event1" ],
+	{
+        @log("Info", "rule1 < ", event)
+	}
+`), 0666)
+
+	os.Mkdir(filepath.Join(EngineSinkDirectories, "foo2"), 0755)
+
+	ioutil.WriteFile(filepath.Join(EngineSinkDirectories, "foo2", "bla.ram"),
+		[]byte(`
+sink "rule1" 
+    kindmatch [ "core.event1" ],
+	{
+        @log("Info", "rule1 < ", event)
+
+`), 0666)
+
+	ioutil.WriteFile(filepath.Join(EngineSinkDirectories, "foo2", "bla2.ram"),
+		[]byte(`
+"Hallo" := 1	
+`), 0666)
+
+	wg.Add(1)
+
+	re := GetEngine("foo", false)
+
+	RunRumbleCode("foo", "bar", `
+@rootevent("the_event", "core.event1", { "foo"  : "bar" })
+`, &wg)
+
+	wg.Wait()
+
+	if res := fmt.Sprint(re.Log); res != `> <nil>
+LogMsg (Monitor:1 Level:Info): rule1 < {"kind":"core.event1","name":"the_event","state":{"foo":"bar"}}` {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	wg.Add(1)
+
+	RunRumbleCode("foo", "bar", `
+"Hallo"
+`, &wg)
+
+	wg.Wait()
+
+	if res := fmt.Sprint(re.Log); res != `> <nil>
+LogMsg (Monitor:1 Level:Info): rule1 < {"kind":"core.event1","name":"the_event","state":{"foo":"bar"}}
+> Hallo` {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Test error cases
+
+	re = GetEngine("foo2", false)
+
+	if res := fmt.Sprint(re.Log); res != `LogMsg (Monitor:0 Level:Error): Can't load initial sinks for foo2: Parse error in bla.ram: Unexpected end (Line:7 Pos:-2)
+LogMsg (Monitor:0 Level:Error): Can't load initial sinks for foo2: Rumble error in bla2.ram: Invalid construct (Must have a variable on the left side of the assignment) (Line:2 Pos:9)` {
+		t.Error("Unexpected result:", res)
+		return
+	}
+}
+
+func TestBasicAPIFunc(t *testing.T) {
+	var wg sync.WaitGroup
+
+	engines = make(map[string]*BrawlerEngine)
+	engine.UnitTestResetIDs()
+
+	wg.Add(1)
+	_, err := RunRumbleCode("testproc", "testcode", `
+
+sink "rule1" 
+    kindmatch [ "core.event1" ],
+	{
+        @log("Info", "rule1 < ", event)
+	}
+
+@rootevent("the_event", "core.event1", { "foo"  : "bar" })
+`, &wg)
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	wg.Wait()
+
+	if !EngineExists("testproc") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if EngineExists("testproc1") {
+		t.Error("Unexpected result")
+		return
+	}
+
+	if res := fmt.Sprint(GetEngineNames()); res != "[testproc]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	re := GetEngine("testproc", false)
+
+	if res := fmt.Sprint(re.Log); res != `> <nil>
+LogMsg (Monitor:1 Level:Info): rule1 < {"kind":"core.event1","name":"the_event","state":{"foo":"bar"}}` {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	re.Log.Reset()
+
+	wg.Add(1)
+	_, err = RunRumbleCode("testproc", "testcode", `
+
+sink "rule1" 
+    kindmatch [ "core.event1" ],
+	{
+	    @testerror1()
+	}
+
+@rootevent("the_event", "core.event1", { "foo"  : "bar" })
+`, &wg)
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	wg.Wait()
+
+	re = GetEngine("testproc", false)
+
+	if res := fmt.Sprint(re.Log); res != `> <nil>
+LogMsg (Monitor:0 Level:Error): Cannot add rule rule1 twice` {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	re.Proc.Reset()
+	re.Log.Reset()
+
+	re.EchoNilResults = false
+
+	wg.Add(1)
+	_, err = RunRumbleCode("testproc", "testcode", `
+
+sink "rule1" 
+    kindmatch [ "core.event1" ],
+	{
+	    @testerror2()
+	}
+@rootevent("the_event", "core.event1", { "foo"  : "bar" })
+`, &wg)
+
+	if err.Error() != "testerror" {
+		t.Error(err)
+		return
+	}
+
+	wg.Wait()
+
+	wg.Add(1)
+	_, err = RunRumbleCode("testproc", "testcode", `
+
+sink "rule1" 
+    kindmatch [ "core.event1" ],
+	{
+	    @testerror1()
+	}
+
+sink "rule2" 
+    kindmatch [ "core.event1" ],
+	{
+	    @testerror1()
+	}
+
+sink "rule3" 
+    kindmatch [ "core.event1" ],
+	{
+	    @testerror1()
+	}
+
+@rootevent("the_event", "core.event1", { "foo"  : "bar" })
+`, &wg)
+
+	if err != nil {
+		t.Error(err)
+		return
+	}
+
+	wg.Wait()
+
+	re = GetEngine("testproc", false)
+
+	if res := fmt.Sprint(re.Log); res != `LogMsg (Monitor:2 Level:Error): the_event -> rule1: testerror
+Environment:
+sink: rule1 {
+    event (map[interface {}]interface {}) : {"kind":"core.event1","name":"the_event","state":{"foo":"bar"}}
+    monitor (*engine.RootMonitor) : Monitor 2 (parent: <nil> priority: 0 activated: true finished: false)
+    processor (*api.procWrapper) : RumbleProcessor 1 (workers:4)
+}
+LogMsg (Monitor:2 Level:Error): the_event -> rule2: testerror
+Environment:
+sink: rule2 {
+    event (map[interface {}]interface {}) : {"kind":"core.event1","name":"the_event","state":{"foo":"bar"}}
+    monitor (*engine.RootMonitor) : Monitor 2 (parent: <nil> priority: 0 activated: true finished: false)
+    processor (*api.procWrapper) : RumbleProcessor 1 (workers:4)
+}
+LogMsg (Monitor:2 Level:Error): the_event -> rule3: testerror
+Environment:
+sink: rule3 {
+    event (map[interface {}]interface {}) : {"kind":"core.event1","name":"the_event","state":{"foo":"bar"}}
+    monitor (*engine.RootMonitor) : Monitor 2 (parent: <nil> priority: 0 activated: true finished: false)
+    processor (*api.procWrapper) : RumbleProcessor 1 (workers:4)
+}` {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	re = GetEngine("testproc", true)
+
+	if len(re.Proc.Rules()) != 0 {
+		t.Error("Unexpected result:", re.Proc.Rules())
+		return
+	}
+
+	// Test errors during validation and eval
+
+	wg.Add(1)
+	_, err = RunRumbleCode("testproc", "testcode", `@testerror1()`, &wg)
+	if err.Error() != "testerror" {
+		t.Error(err)
+		return
+	}
+
+	wg.Add(1)
+	_, err = RunRumbleCode("testproc", "testcode", `@testerror2()`, &wg)
+	if err.Error() != "testerror" {
+		t.Error(err)
+		return
+	}
+}
+
+func TestMain(m *testing.M) {
+	flag.Parse()
+
+	// Register a special error test functions
+
+	interpreter.RegisterRumbleFunc("testerror1",
+		nil,
+		func(string, *parser.ASTNode, parser.VarsScope, map[string]interface{},
+			*interpreter.RumbleRuntimeProvider) (interface{}, error) {
+			return nil, fmt.Errorf("testerror")
+		})
+
+	interpreter.RegisterRumbleFunc("testerror2",
+		func(*parser.ASTNode, *interpreter.RumbleRuntimeProvider) error {
+			return fmt.Errorf("testerror")
+		},
+		func(string, *parser.ASTNode, parser.VarsScope, map[string]interface{},
+			*interpreter.RumbleRuntimeProvider) (interface{}, error) {
+			return nil, nil
+		})
+
+	if res, _ := fileutil.PathExists(EngineSinkDirectories); res {
+		os.RemoveAll(EngineSinkDirectories)
+	}
+
+	if err := os.Mkdir(EngineSinkDirectories, 0770); err != nil {
+		fmt.Print("Could not create test directory:", err.Error())
+		os.Exit(1)
+	}
+
+	// Run the tests
+
+	res := m.Run()
+
+	// Teardown
+
+	if err := os.RemoveAll(EngineSinkDirectories); err != nil {
+		fmt.Print("Could not remove test directory:", err.Error())
+		os.Exit(1)
+	}
+
+	os.Exit(res)
+}

+ 146 - 0
api/eventsource.go

@@ -0,0 +1,146 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package api
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"devt.de/krotik/brawler/engine"
+	"devt.de/krotik/common/defs/brawler"
+)
+
+/*
+RegisterEventSource registers a new event source which can inject external
+events into a Brawler engine. If the engine name is empty then the
+events are injected into all known engines. It returns an EventPublisher
+object which should be used by the event source to inject events.
+*/
+func RegisterEventSource(name string, engine string) brawler.EventPublisher {
+	sourcesLock.Lock()
+	defer sourcesLock.Unlock()
+
+	ret := &eventSource{engine, nil, 0}
+	sources[name] = ret
+
+	return ret
+}
+
+/*
+GetEventSourceNames returns all registered event sources.
+*/
+func GetEventSourceNames() []string {
+	sourcesLock.Lock()
+	defer sourcesLock.Unlock()
+
+	ret := make([]string, 0, len(sources))
+
+	for n := range sources {
+		ret = append(ret, n)
+	}
+
+	sort.Strings(ret)
+
+	return ret
+}
+
+// API parameters
+// ==============
+
+/*
+EventSourceEngineCheckTime is the time in seconds an event source waits until
+it checks again for available engines.
+*/
+var EventSourceEngineCheckTime int64 = 5
+
+// Internal data structures
+// ========================
+
+/*
+sources is a map of all external event sources for Brawler engines
+*/
+var sources = make(map[string]*eventSource)
+var sourcesLock = &sync.Mutex{}
+
+/*
+eventSource models the external event source. This object injects the
+events into one or more Brawler engines.
+*/
+type eventSource struct {
+	engine          string           // Engine which should receive events (empty means all engines)
+	enginesCache    []*BrawlerEngine // Cache of engines which received events in the past
+	enginesCacheAge int64            // Age of the cache
+}
+
+/*
+getEngines returns all engines into which the event should be injected.
+*/
+func (sw *eventSource) getEngines() []*BrawlerEngine {
+
+	// Check if we can use the cache
+
+	now := time.Now().Unix()
+
+	if (sw.engine != "" && len(sw.enginesCache) == 1) ||
+		(sw.enginesCacheAge != 0 &&
+			now-sw.enginesCacheAge < EventSourceEngineCheckTime) {
+
+		return sw.enginesCache
+	}
+
+	enginesLock.Lock()
+	defer enginesLock.Unlock()
+
+	var ret []*BrawlerEngine
+
+	if sw.engine == "" {
+		ret = make([]*BrawlerEngine, len(engines))
+		i := 0
+
+		for _, e := range engines {
+			ret[i] = e
+			i++
+		}
+
+	} else if e, ok := engines[sw.engine]; ok {
+
+		ret = append(ret, e)
+	}
+
+	sw.enginesCache = ret
+	sw.enginesCacheAge = now
+
+	return ret
+}
+
+/*
+AddEvent adds a new event to one or more Brawler engines.
+*/
+func (sw *eventSource) AddEvent(name string, kind []string, state map[interface{}]interface{}) error {
+
+	engines := sw.getEngines()
+
+	for _, e := range engines {
+
+		// Get a reference to the actual processor
+
+		wp := e.Proc.(*procWrapper).Processor
+
+		// Inject the event and return on the first error
+
+		if _, err := wp.AddEvent(&engine.Event{}, nil); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}

+ 108 - 0
api/eventsource_test.go

@@ -0,0 +1,108 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package api
+
+import (
+	"fmt"
+	"testing"
+
+	"devt.de/krotik/brawler/engine"
+)
+
+func TestEventSource(t *testing.T) {
+
+	engines = make(map[string]*BrawlerEngine)
+	engine.UnitTestResetIDs()
+
+	es1 := RegisterEventSource("source1", "").(*eventSource)
+
+	if err := es1.AddEvent("foo", nil, nil); err != nil {
+		t.Error("Unexpected result")
+		return
+	}
+
+	es2 := RegisterEventSource("source2", "eventtest1").(*eventSource)
+
+	if res := GetEventSourceNames(); fmt.Sprint(res) != "[source1 source2]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// No engines there to return
+
+	if res := GetEngineNames(); fmt.Sprint(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := es1.getEngines(); fmt.Sprint(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := es2.getEngines(); fmt.Sprint(res) != "[]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	GetEngine("eventtest1", false)
+
+	EventSourceEngineCheckTime = 0
+
+	if res := es1.getEngines(); fmt.Sprint(res) != "[BrawlerEngine: eventtest1 (EchoNilResults: true, Sinks: 0, Processor: RumbleProcessor 1 (workers:4) Status: Stopped)]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := es2.getEngines(); fmt.Sprint(res) != "[BrawlerEngine: eventtest1 (EchoNilResults: true, Sinks: 0, Processor: RumbleProcessor 1 (workers:4) Status: Stopped)]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	GetEngine("eventtest2", false)
+	EventSourceEngineCheckTime = 40
+
+	if res := es1.getEngines(); fmt.Sprint(res) !=
+		"[BrawlerEngine: eventtest1 (EchoNilResults: true, Sinks: 0, "+
+			"Processor: RumbleProcessor 1 (workers:4) Status: Stopped)]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := es2.getEngines(); fmt.Sprint(res) !=
+		"[BrawlerEngine: eventtest1 (EchoNilResults: true, Sinks: 0, "+
+			"Processor: RumbleProcessor 1 (workers:4) Status: Stopped)]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	EventSourceEngineCheckTime = 0
+
+	if res := es1.getEngines(); len(res) != 2 {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	if res := es2.getEngines(); fmt.Sprint(res) !=
+		"[BrawlerEngine: eventtest1 (EchoNilResults: true, Sinks: 0, "+
+			"Processor: RumbleProcessor 1 (workers:4) Status: Stopped)]" {
+		t.Error("Unexpected result:", res)
+		return
+	}
+
+	// Now add an event (with error)
+
+	if err := es1.AddEvent("foo", nil, nil); err == nil {
+		t.Error("Unexpected result")
+		return
+	}
+
+}

+ 77 - 0
api/web/about.go

@@ -0,0 +1,77 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+/*
+Package web contains general REST API definitions.
+
+The REST API provides an interface to Brawler. It allows querying and control
+of the event engine. The API responds to GET, POST, PUT and DELETE requests in JSON.
+
+Common API definitions
+
+/about
+
+Endpoint which returns an object with version information.
+
+	api_versions : List of available API versions e.g. [ "v1" ]
+	product      : Name of the API provider (Brawler)
+	version:     : Version of the API provider
+	revision:    : Revision of the API provider
+
+/swagger.json
+
+Dynamically generated swagger definition file. See: http://swagger.io
+*/
+package web
+
+import (
+	"encoding/json"
+	"net/http"
+
+	"devt.de/krotik/brawler/engine"
+)
+
+/*
+EndpointAbout is the about endpoint URL (rooted). Handles about/
+*/
+const EndpointAbout = APIRoot + "/about/"
+
+/*
+AboutEndpointInst creates a new endpoint handler.
+*/
+func AboutEndpointInst() RestEndpointHandler {
+	return &aboutEndpoint{}
+}
+
+/*
+Handler object for about operations.
+*/
+type aboutEndpoint struct {
+	*DefaultEndpointHandler
+}
+
+/*
+HandleGET returns about data for the REST API.
+*/
+func (a *aboutEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
+
+	data := map[string]interface{}{
+		"api_versions": []string{"v1"},
+		"product":      "Brawler",
+		"version":      engine.ProductVersion,
+	}
+
+	// Write data
+
+	w.Header().Set("content-type", "application/json; charset=utf-8")
+
+	ret := json.NewEncoder(w)
+	ret.Encode(data)
+}

+ 187 - 0
api/web/rest.go

@@ -0,0 +1,187 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package web
+
+import (
+	"net/http"
+	"strings"
+
+	"devt.de/krotik/brawler/api"
+)
+
+/*
+APIVersion is the version of the REST API
+*/
+const APIVersion = "1.0.0"
+
+/*
+APIRoot is the root directory for the REST API
+*/
+const APIRoot = "/rum"
+
+/*
+APISchemes is a list of supported protocol schemes
+*/
+var APISchemes = []string{"https"}
+
+/*
+APIHost is the host definition for the REST API
+*/
+var APIHost = "localhost:9030"
+
+/*
+GeneralEndpointMap contains general endpoints which should always be available
+*/
+var GeneralEndpointMap = map[string]RestEndpointInst{
+	EndpointAbout:   AboutEndpointInst,
+	EndpointSwagger: SwaggerEndpointInst,
+}
+
+/*
+RestEndpointInst models a factory function for REST endpoint handlers.
+*/
+type RestEndpointInst func() RestEndpointHandler
+
+/*
+RestEndpointHandler models a REST endpoint handler.
+*/
+type RestEndpointHandler interface {
+
+	/*
+		HandleGET handles a GET request.
+	*/
+	HandleGET(w http.ResponseWriter, r *http.Request, resources []string)
+
+	/*
+		HandlePOST handles a POST request.
+	*/
+	HandlePOST(w http.ResponseWriter, r *http.Request, resources []string)
+
+	/*
+		HandlePUT handles a PUT request.
+	*/
+	HandlePUT(w http.ResponseWriter, r *http.Request, resources []string)
+
+	/*
+		HandleDELETE handles a DELETE request.
+	*/
+	HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string)
+
+	/*
+		SwaggerDefs is used to describe the endpoint in swagger.
+	*/
+	SwaggerDefs(s map[string]interface{})
+}
+
+/*
+Map of all registered endpoint handlers.
+*/
+var registered = map[string]RestEndpointInst{}
+
+/*
+HandleFunc to use for registering handlers
+
+Should be of type: func(pattern string, handler func(http.ResponseWriter, *http.Request))
+*/
+var HandleFunc = http.HandleFunc
+
+/*
+RegisterRestEndpoints registers all given REST endpoint handlers.
+*/
+func RegisterRestEndpoints(endpointInsts map[string]RestEndpointInst) {
+
+	for url, endpointInst := range endpointInsts {
+		registered[url] = endpointInst
+
+		HandleFunc(url, func() func(w http.ResponseWriter, r *http.Request) {
+
+			var handlerURL = url
+			var handlerInst = endpointInst
+
+			return func(w http.ResponseWriter, r *http.Request) {
+
+				// Create a new handler instance
+
+				handler := handlerInst()
+
+				// Handle request in appropriate method
+
+				res := strings.TrimSpace(r.URL.Path[len(handlerURL):])
+
+				if len(res) > 0 && res[len(res)-1] == '/' {
+					res = res[:len(res)-1]
+				}
+
+				var resources []string
+
+				if res != "" {
+					resources = strings.Split(res, "/")
+				}
+
+				switch r.Method {
+				case "GET":
+					handler.HandleGET(w, r, resources)
+
+				case "POST":
+					handler.HandlePOST(w, r, resources)
+
+				case "PUT":
+					handler.HandlePUT(w, r, resources)
+
+				case "DELETE":
+					handler.HandleDELETE(w, r, resources)
+
+				default:
+					Error(w, api.ErrorRestAPI, http.StatusText(http.StatusMethodNotAllowed),
+						"", http.StatusMethodNotAllowed)
+				}
+			}
+		}())
+	}
+}
+
+/*
+DefaultEndpointHandler represents the default endpoint handler.
+*/
+type DefaultEndpointHandler struct {
+}
+
+/*
+HandleGET is a method stub returning an error.
+*/
+func (de *DefaultEndpointHandler) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
+	Error(w, api.ErrorRestAPI, http.StatusText(http.StatusMethodNotAllowed),
+		"", http.StatusMethodNotAllowed)
+}
+
+/*
+HandlePOST is a method stub returning an error.
+*/
+func (de *DefaultEndpointHandler) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {
+	Error(w, api.ErrorRestAPI, http.StatusText(http.StatusMethodNotAllowed),
+		"", http.StatusMethodNotAllowed)
+}
+
+/*
+HandlePUT is a method stub returning an error.
+*/
+func (de *DefaultEndpointHandler) HandlePUT(w http.ResponseWriter, r *http.Request, resources []string) {
+	Error(w, api.ErrorRestAPI, http.StatusText(http.StatusMethodNotAllowed),
+		"", http.StatusMethodNotAllowed)
+}
+
+/*
+HandleDELETE is a method stub returning an error.
+*/
+func (de *DefaultEndpointHandler) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {
+	Error(w, api.ErrorRestAPI, http.StatusText(http.StatusMethodNotAllowed),
+		"", http.StatusMethodNotAllowed)
+}

+ 364 - 0
api/web/rest_test.go

@@ -0,0 +1,364 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package web
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"sync"
+	"testing"
+
+	"devt.de/krotik/brawler/engine"
+	"devt.de/krotik/brawler/rumble/interpreter"
+	"devt.de/krotik/brawler/rumble/parser"
+	"devt.de/krotik/common/errorutil"
+	"devt.de/krotik/common/httputil"
+)
+
+const TESTPORT = ":9030"
+
+var lastRes []string
+var resperr interface{}
+
+type testEndpoint struct {
+	*DefaultEndpointHandler
+}
+
+func (te *testEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
+	lastRes = resources
+	if resperr != nil {
+		Error(w, "", "foo", resperr, 500)
+		return
+	}
+	te.DefaultEndpointHandler.HandleGET(w, r, resources)
+}
+
+func (te *testEndpoint) SwaggerDefs(s map[string]interface{}) {
+}
+
+var testEndpointMap = map[string]RestEndpointInst{
+	"/": func() RestEndpointHandler {
+		return &testEndpoint{}
+	},
+}
+
+func TestEndpointHandling(t *testing.T) {
+
+	hs, wg := startServer()
+	if hs == nil {
+		return
+	}
+
+	queryURL := "http://localhost" + TESTPORT
+
+	RegisterRestEndpoints(testEndpointMap)
+	RegisterRestEndpoints(GeneralEndpointMap)
+
+	// Test rumble errors
+
+	resperr = &parser.Error{}
+
+	if res := sendTestRequest(queryURL, "GET", nil); res != `{
+  "detail": "Parse error in : \u003cnil\u003e",
+  "error": "foo",
+  "line": 0,
+  "parser_error": "",
+  "pos": 0,
+  "source": "",
+  "type": "Rumble Parser Error"
+}` {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	resperr = &interpreter.RuntimeError{}
+	if res := sendTestRequest(queryURL, "GET", nil); res != `{
+  "detail": "Rumble error in : \u003cnil\u003e ()",
+  "error": "foo",
+  "line": 0,
+  "pos": 0,
+  "runtime_error": "",
+  "source": "",
+  "type": "Rumble Runtime Error"
+}` {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	resperr = nil
+
+	lastRes = nil
+	methodNotAllowed := `{
+  "detail": "",
+  "error": "Method Not Allowed",
+  "type": "Rest API Error"
+}`
+
+	if res := sendTestRequest(queryURL, "GET", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if lastRes != nil {
+		t.Error("Unexpected lastRes:", lastRes)
+	}
+
+	lastRes = nil
+
+	if res := sendTestRequest(queryURL+"/foo/bar", "GET", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if fmt.Sprint(lastRes) != "[foo bar]" {
+		t.Error("Unexpected lastRes:", lastRes)
+	}
+
+	lastRes = nil
+
+	if res := sendTestRequest(queryURL+"/foo/bar/", "GET", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if fmt.Sprint(lastRes) != "[foo bar]" {
+		t.Error("Unexpected lastRes:", lastRes)
+	}
+
+	if res := sendTestRequest(queryURL, "POST", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if res := sendTestRequest(queryURL, "PUT", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if res := sendTestRequest(queryURL, "DELETE", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if res := sendTestRequest(queryURL, "UPDATE", nil); res != methodNotAllowed {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	// Test about endpoints
+
+	if res := sendTestRequest(queryURL+APIRoot+"/about", "GET", nil); res != fmt.Sprintf(`
+{
+  "api_versions": [
+    "v1"
+  ],
+  "product": "Brawler",
+  "version": "%v"
+}`[1:], engine.ProductVersion) {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	if res := sendTestRequest(queryURL+APIRoot+"/swagger.json", "GET", nil); res != `
+{
+  "basePath": "/rum",
+  "definitions": {
+    "Error": {
+      "description": "Object describing the error.",
+      "properties": {
+        "detail": {
+          "description": "Error related details.",
+          "type": "string"
+        },
+        "error": {
+          "description": "A human readable error message.",
+          "type": "string"
+        },
+        "line": {
+          "description": "Line where the error occurred.",
+          "type": "number"
+        },
+        "parser_error": {
+          "description": "Error related details.",
+          "type": "string"
+        },
+        "pos": {
+          "description": "Position in line where the error occurred.",
+          "type": "number"
+        },
+        "runtime_error": {
+          "description": "Error related details.",
+          "type": "string"
+        },
+        "source": {
+          "description": "Error source (e.g. file).",
+          "type": "string"
+        },
+        "type": {
+          "description": "A human readable error type.",
+          "type": "string"
+        }
+      },
+      "required": [
+        "type",
+        "error",
+        "detail"
+      ]
+    }
+  },
+  "host": "localhost:9030",
+  "info": {
+    "description": "Query and control the Brawler event engine.",
+    "title": "Brawler API",
+    "version": "1.0.0"
+  },
+  "paths": {
+    "/about": {
+      "get": {
+        "description": "Returns available API versions, product name and product version.",
+        "produces": [
+          "application/json"
+        ],
+        "responses": {
+          "200": {
+            "description": "About info object",
+            "schema": {
+              "properties": {
+                "api_versions": {
+                  "description": "List of available API versions.",
+                  "items": {
+                    "description": "Available API version.",
+                    "type": "string"
+                  },
+                  "type": "array"
+                },
+                "product": {
+                  "description": "Product name of the REST API provider.",
+                  "type": "string"
+                },
+                "version": {
+                  "description": "Version of the REST API provider.",
+                  "type": "string"
+                }
+              },
+              "type": "object"
+            }
+          },
+          "default": {
+            "description": "Error response",
+            "schema": {
+              "$ref": "#/definitions/Error"
+            }
+          }
+        },
+        "summary": "Return information about the REST API provider."
+      }
+    }
+  },
+  "produces": [
+    "application/json"
+  ],
+  "schemes": [
+    "https"
+  ],
+  "swagger": "2.0"
+}`[1:] {
+		t.Error("Unexpected response:", res)
+		return
+	}
+
+	stopServer(hs, wg)
+}
+
+/*
+Send a request to a HTTP test server
+*/
+func sendTestRequest(url string, method string, content []byte) string {
+	var req *http.Request
+	var err error
+
+	if content != nil {
+		req, err = http.NewRequest(method, url, bytes.NewBuffer(content))
+	} else {
+		req, err = http.NewRequest(method, url, nil)
+	}
+	errorutil.AssertOk(err)
+	req.Header.Set("Content-Type", "application/json")
+
+	client := &http.Client{}
+	resp, err := client.Do(req)
+	if err != nil {
+		panic(err)
+	}
+	defer resp.Body.Close()
+
+	body, _ := ioutil.ReadAll(resp.Body)
+	bodyStr := strings.Trim(string(body), " \n")
+
+	// Try json decoding first
+
+	out := bytes.Buffer{}
+	err = json.Indent(&out, []byte(bodyStr), "", "  ")
+	if err == nil {
+		return out.String()
+	}
+
+	// Just return the body
+
+	return bodyStr
+}
+
+/*
+Start a HTTP test server.
+*/
+func startServer() (*httputil.HTTPServer, *sync.WaitGroup) {
+	hs := &httputil.HTTPServer{}
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+
+	go hs.RunHTTPServer(TESTPORT, &wg)
+
+	wg.Wait()
+
+	// Server is started
+
+	if hs.LastError != nil {
+		panic(hs.LastError)
+	}
+
+	return hs, &wg
+}
+
+/*
+Stop a started HTTP test server.
+*/
+func stopServer(hs *httputil.HTTPServer, wg *sync.WaitGroup) {
+
+	if hs.Running == true {
+
+		wg.Add(1)
+
+		// Server is shut down
+
+		hs.Shutdown()
+
+		wg.Wait()
+
+	} else {
+
+		panic("Server was not running as expected")
+	}
+}

+ 173 - 0
api/web/swagger.go

@@ -0,0 +1,173 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package web
+
+import (
+	"encoding/json"
+	"net/http"
+)
+
+/*
+SwaggerDefs is used to describe the endpoint in swagger.
+*/
+func (a *aboutEndpoint) SwaggerDefs(s map[string]interface{}) {
+
+	// Add query paths
+
+	s["paths"].(map[string]interface{})["/about"] = map[string]interface{}{
+		"get": map[string]interface{}{
+			"summary":     "Return information about the REST API provider.",
+			"description": "Returns available API versions, product name and product version.",
+			"produces": []string{
+				"application/json",
+			},
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "About info object",
+					"schema": map[string]interface{}{
+						"type": "object",
+						"properties": map[string]interface{}{
+							"api_versions": map[string]interface{}{
+								"description": "List of available API versions.",
+								"type":        "array",
+								"items": map[string]interface{}{
+									"description": "Available API version.",
+									"type":        "string",
+								},
+							},
+							"product": map[string]interface{}{
+								"description": "Product name of the REST API provider.",
+								"type":        "string",
+							},
+							"version": map[string]interface{}{
+								"description": "Version of the REST API provider.",
+								"type":        "string",
+							},
+						},
+					},
+				},
+				"default": map[string]interface{}{
+					"description": "Error response",
+					"schema": map[string]interface{}{
+						"$ref": "#/definitions/Error",
+					},
+				},
+			},
+		},
+	}
+
+	// Add generic error object to definition
+
+	s["definitions"].(map[string]interface{})["Error"] = map[string]interface{}{
+		"description": "Object describing the error.",
+		"required":    []string{"type", "error", "detail"},
+		"properties": map[string]interface{}{
+			"type": map[string]interface{}{
+				"description": "A human readable error type.",
+				"type":        "string",
+			},
+			"error": map[string]interface{}{
+				"description": "A human readable error message.",
+				"type":        "string",
+			},
+			"detail": map[string]interface{}{
+				"description": "Error related details.",
+				"type":        "string",
+			},
+			"source": map[string]interface{}{
+				"description": "Error source (e.g. file).",
+				"type":        "string",
+			},
+			"parser_error": map[string]interface{}{
+				"description": "Error related details.",
+				"type":        "string",
+			},
+			"runtime_error": map[string]interface{}{
+				"description": "Error related details.",
+				"type":        "string",
+			},
+			"line": map[string]interface{}{
+				"description": "Line where the error occurred.",
+				"type":        "number",
+			},
+			"pos": map[string]interface{}{
+				"description": "Position in line where the error occurred.",
+				"type":        "number",
+			},
+		},
+	}
+}
+
+/*
+EndpointSwagger is the swagger endpoint URL (rooted). Handles swagger.json/
+*/
+const EndpointSwagger = APIRoot + "/swagger.json/"
+
+/*
+SwaggerEndpointInst creates a new endpoint handler.
+*/
+func SwaggerEndpointInst() RestEndpointHandler {
+	return &swaggerEndpoint{}
+}
+
+/*
+Handler object for swagger operations.
+*/
+type swaggerEndpoint struct {
+	*DefaultEndpointHandler
+}
+
+/*
+HandleGET returns the swagger definition of the REST API.
+*/
+func (a *swaggerEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
+
+	// Add general sections
+
+	data := map[string]interface{}{
+		"swagger":     "2.0",
+		"host":        APIHost,
+		"schemes":     APISchemes,
+		"basePath":    APIRoot,
+		"produces":    []string{"application/json"},
+		"paths":       map[string]interface{}{},
+		"definitions": map[string]interface{}{},
+	}
+
+	// Go through all registered components and let them add their definitions
+
+	a.SwaggerDefs(data)
+
+	for _, inst := range registered {
+		inst().SwaggerDefs(data)
+	}
+
+	// Write data
+
+	w.Header().Set("content-type", "application/json; charset=utf-8")
+
+	ret := json.NewEncoder(w)
+	ret.Encode(data)
+}
+
+/*
+SwaggerDefs is used to describe the endpoint in swagger.
+*/
+func (a *swaggerEndpoint) SwaggerDefs(s map[string]interface{}) {
+
+	// Add general application information
+
+	s["info"] = map[string]interface{}{
+		"title":       "Brawler API",
+		"description": "Query and control the Brawler event engine.",
+		"version":     APIVersion,
+	}
+}

+ 64 - 0
api/web/util.go

@@ -0,0 +1,64 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+package web
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"devt.de/krotik/brawler/api"
+	"devt.de/krotik/brawler/rumble/interpreter"
+	"devt.de/krotik/brawler/rumble/parser"
+	"devt.de/krotik/common/errorutil"
+)
+
+/*
+Error writes an error response to a given response writer.
+*/
+func Error(w http.ResponseWriter, errortype string, msg string, detail interface{}, httpcode int) {
+	w.Header().Set("content-type", "application/json; charset=utf-8")
+
+	w.WriteHeader(httpcode)
+
+	data := map[string]interface{}{
+		"type":   errortype,
+		"error":  msg,
+		"detail": fmt.Sprint(detail),
+	}
+
+	// Extended error info depending on the error type
+
+	if perr, ok := detail.(*parser.Error); ok {
+
+		data["type"] = api.ErrorRumbleParser // Error type
+		data["source"] = perr.Source         // Name of the source which was given to the parser
+		data["parser_error"] = perr.Detail   // Details of this error
+		data["line"] = perr.Line             // Line of the error
+		data["pos"] = perr.Pos               // Position of the error
+
+	} else if rerr, ok := detail.(*interpreter.RuntimeError); ok {
+
+		data["type"] = api.ErrorRumbleRuntime // Error type
+		data["source"] = rerr.Source          // Name of the source which was given to the parser
+		data["runtime_error"] = rerr.Detail   // Details of this error
+		data["line"] = rerr.Line              // Line of the error
+		data["pos"] = rerr.Pos                // Position of the error
+
+	}
+
+	errorutil.AssertTrue(data["type"] != "", "Invalid error type")
+
+	// Write data
+
+	ret := json.NewEncoder(w)
+	ret.Encode(data)
+}

+ 680 - 0
api/web/v1/engine.go

@@ -0,0 +1,680 @@
+/*
+ * Brawler
+ *
+ * Copyright 2019 Matthias Ladkau. All rights reserved.
+ *
+ * This Source Code Form is subject to the terms of the MIT
+ * License, If a copy of the MIT License was not distributed with this
+ * file, You can obtain one at https://opensource.org/licenses/MIT.
+ */
+
+/*
+Package v1 contains Brawler REST API Version 1.
+
+Engine endpoint
+
+/engine
+
+The engine endpoint is used to control named event engines. A GET request
+gives an overview over all available engines. The response should
+have the following structure:
+
+	[ <engine name>, <engine name>, <engine name> ]
+
+/engine/<engine name>
+
+A GET request to a specific engine returns the current engine state. The
+response should have the following structure:
+
+	{
+		id      : <ID of the engine>
+		workers : <Number of worker threads>
+		status  : <Status of the engine (Running / Stopping / Stopped)>
+	}
+
+A POST request to a specific engine runs the given rumble code on the engine.
+The body should contain the rumble script in plain text. The engine is created
+if it does not exist. Can optimally be posted to a subpath which is interpreted
+as code name.
+
+The response is the result of the code:
+
+	{
+		result: <Result of the code>
+	}
+
+/engine/<engine name>/log
+
+A GET request to the log path returns all log messages for the given engine.
+The response should be a list of log message objects of the following structure:
+
+	{
+		mid   : <Root monitor ID for the log message>
+		level : <Message level>
+		msg   : <Log message>
+	}
+
+A DELETE to the log path clears all log messages.
+
+/engine/<engine name>/sinks
+
+A DElETE to the sinks path clears all current sinks from an engine and reloads
+its standard sinks.
+
+/engine/<engine name>/sinks?prefix=<prefix>
+
+A GET request to the sinks path returns all available sinks. The response
+should have the following structure:
+
+	{
+		<name> : {
+			description      : <Sink description>,
+			kind_match       : <List of matched event kinds>
+			scope_match      : <List of matched scopes>
+			state_match      : { <Required state key> : <Required state value / nil for any> }
+			priority         : <Priority of the sink>
+			suppression_list : <List of suppressed sinks>
+		}
+	}
+
+The search can be narrowed down by providing an optional name prefix.
+
+/engine/<engine name>/sink/<sink name>
+
+A GET request to the sink path returns the code of the given sink in plain text.
+
+
+Rumble endpoint
+
+The rumble endpoint can be used to retrieve information about Rumble.
+
+/rumble/functions
+
+A GET request to the functions endpoint returns a list of all available
+Rumble functions.
+
+/rumble/keywords
+
+A GET request to the keywords endpoint returns a list of all available
+Rumble keywords.
+*/
+package v1
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"net/http"
+
+	"devt.de/krotik/brawler/api"
+	"devt.de/krotik/brawler/api/web"
+	"devt.de/krotik/brawler/rumble/parser"
+	"devt.de/krotik/common/errorutil"
+)
+
+/*
+EndpointEngine is the engine endpoint URL (rooted). Handles everything under engine/...
+*/
+const EndpointEngine = web.APIRoot + APIv1 + "/engine/"
+
+/*
+EngineEndpointInst creates a new endpoint handler.
+*/
+func EngineEndpointInst() web.RestEndpointHandler {
+	return &engineEndpoint{}
+}
+
+/*
+Handler object for engine operations.
+*/
+type engineEndpoint struct {
+	*web.DefaultEndpointHandler
+}
+
+/*
+HandleGET handles REST calls to retrieve engine information.
+*/
+func (ee *engineEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
+	var data interface{}
+
+	if !checkResources(w, resources, 0, 3, "") {
+		return
+	}
+
+	reslen := len(resources)
+
+	if reslen == 0 {
+
+		// No engine name given just return all available engines
+
+		data = api.GetEngineNames()
+
+	} else if reslen > 0 {
+
+		name := resources[0]
+
+		if !api.EngineExists(name) {
+			web.Error(w, api.ErrorRestAPI, http.StatusText(http.StatusBadRequest),
+				fmt.Sprintf("Engine %v does not exist", name), http.StatusBadRequest)
+			return
+		}
+
+		engine := api.GetEngine(name, false)
+
+		if reslen == 1 {
+
+			// Return processor status
+
+			proc := engine.Proc
+
+			data = map[string]interface{}{
+				"id":      proc.ID(),
+				"workers": proc.Workers(),
+				"status":  proc.Status(),
+			}
+
+		} else if reslen == 2 && resources[1] == "log" {
+
+			// Return log messages
+
+			logMsgs := engine.Log.Slice()
+
+			msgs := make([]map[string]interface{}, 0, len(logMsgs))
+
+			for _, msg := range logMsgs {
+
+				msgs = append(msgs, map[string]interface{}{
+					"mid":   msg.MonitorID,
+					"level": msg.Level,
+					"msg":   msg.Msg,
+				})
+			}
+
+			data = msgs
+
+		} else if reslen == 2 && resources[1] == "sinks" {
+
+			// Return log messages
+
+			rules := engine.Proc.Rules()
+
+			sinks := make(map[string]interface{})
+
+			for name, rule := range rules {
+
+				sinks[name] = map[string]interface{}{
+					"desc":             rule.Desc,
+					"kind_match":       rule.KindMatch,
+					"scope_match":      rule.ScopeMatch,
+					"state_match":      rule.StateMatch,
+					"priority":         rule.Priority,
+					"suppression_list": rule.SuppressionList,
+				}
+			}
+
+			data = sinks
+
+		} else if reslen == 3 && resources[1] == "sink" {
+
+			if sinkAST, ok := engine.Sinks[resources[2]]; ok {
+
+				// It should be possible to pretty print all parsed sinks
+
+				code, err := parser.PrettyPrint(sinkAST)
+				errorutil.AssertOk(err)
+
+				w.Header().Set("content-type", "text/plain; charset=utf-8")
+				w.Write([]byte(code))
+
+			} else {
+				web.Error(w, api.ErrorRestAPI, http.StatusText(http.StatusBadRequest),
+					"Unknown sink", http.StatusBadRequest)
+			}
+
+			return
+
+		} else {
+
+			web.Error(w, api.ErrorRestAPI, http.StatusText(http.StatusBadRequest),
+				"Unknown resource request", http.StatusBadRequest)
+
+			return
+		}
+	}
+
+	// Write data
+
+	w.Header().Set("content-type", "application/json; charset=utf-8")
+
+	ret := json.NewEncoder(w)
+	ret.Encode(data)
+}
+
+/*
+HandleDELETE handles a REST call to delete log messages.
+*/
+func (ee *engineEndpoint) HandleDELETE(w http.ResponseWriter, r *http.Request, resources []string) {
+
+	// Check parameters
+
+	if !checkResources(w, resources, 2, 2, "Need an engine name and a resource") {
+		return
+	}
+
+	name := resources[0]
+	resource := resources[1]
+
+	if !api.EngineExists(name) {
+		web.Error(w, api.ErrorRestAPI, http.StatusText(http.StatusBadRequest),
+			fmt.Sprintf("Engine %v does not exist", name), http.StatusBadRequest)
+		return
+	}
+
+	engine := api.GetEngine(name, false)
+
+	if resource == "log" {
+
+		// Reset the log of the engine
+
+		engine.Log.Reset()
+
+	} else if resource == "sinks" {
+
+		// Reset the engine and reload all default sinks
+
+		api.GetEngine(name, true)
+
+	} else {
+
+		web.Error(w, api.ErrorRestAPI, http.StatusText(http.StatusBadRequest),
+			"Unknown resource request", http.StatusBadRequest)
+
+		return
+	}
+}
+
+/*
+HandlePOST handles a REST call to run rumble code.
+*/
+func (ee *engineEndpoint) HandlePOST(w http.ResponseWriter, r *http.Request, resources []string) {
+	var buf bytes.Buffer
+
+	// Check parameters
+
+	if !checkResources(w, resources, 1, 2, "Need an engine name") {
+		return
+	}
+
+	codename := "code"
+	if len(resources) > 1 {
+		codename = resources[1]
+	}
+
+	buf.ReadFrom(r.Body)
+
+	result, err := api.RunRumbleCode(resources[0], codename, buf.String(), nil)
+
+	if err != nil {
+		web.Error(w, "", http.StatusText(http.StatusBadRequest),
+			err, http.StatusBadRequest)
+	} else {
+		w.Header().Set("content-type", "application/json; charset=utf-8")
+
+		ret := json.NewEncoder(w)
+		ret.Encode(map[string]interface{}{
+			"result": result,
+		})
+	}
+}
+
+/*
+SwaggerDefs is used to describe the endpoint in swagger.
+*/
+func (ee *engineEndpoint) SwaggerDefs(s map[string]interface{}) {
+
+	engineParams := []map[string]interface{}{
+		{
+			"name":        "engine_name",
+			"in":          "path",
+			"description": "Name of engine.",
+			"required":    true,
+			"type":        "string",
+		},
+	}
+
+	sinkParams := []map[string]interface{}{
+		{
+			"name":        "sink_name",
+			"in":          "path",
+			"description": "Name of sink.",
+			"required":    true,
+			"type":        "string",
+		},
+	}
+
+	codenameParams := []map[string]interface{}{
+		{
+			"name":        "code_name",
+			"in":          "path",
+			"description": "Name of code.",
+			"required":    true,
+			"type":        "string",
+		},
+	}
+
+	optionalQueryParams := []map[string]interface{}{
+		{
+			"name":        "prefix",
+			"in":          "query",
+			"description": "Only return items which start with the given prefix.",
+			"required":    false,
+			"type":        "string",
+		},
+	}
+
+	s["paths"].(map[string]interface{})["/v1/engine"] = map[string]interface{}{
+		"get": map[string]interface{}{
+			"summary":     "Return engine information.",
+			"description": "Returns a list of all available engines.",
+			"produces": []string{
+				"application/json",
+			},
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "List of all available engines",
+					"schema": map[string]interface{}{
+						"type": "array",
+						"items": map[string]interface{}{
+							"type": "string",
+						},
+					},
+				},
+				"default": map[string]interface{}{
+					"description": "Error response",
+					"schema": map[string]interface{}{
+						"$ref": "#/definitions/Error",
+					},
+				},
+			},
+		},
+	}
+
+	s["paths"].(map[string]interface{})["/v1/engine/{engine_name}"] = map[string]interface{}{
+		"get": map[string]interface{}{
+			"summary":     "Get information on a specific engine.",
+			"description": "The engine endpoint returns the current engine state.",
+			"produces": []string{
+				"application/json",
+			},
+			"parameters": engineParams,
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "The operation was successful.",
+					"schema": map[string]interface{}{
+						"type": "object",
+						"properties": map[string]interface{}{
+							"id": map[string]interface{}{
+								"description": "ID of the engine.",
+								"type":        "number",
+							},
+							"workers": map[string]interface{}{
+								"description": "Number of worker threads.",
+								"type":        "number",
+							},
+							"status": map[string]interface{}{
+								"description": "Status of the engine.",
+								"type":        "string",
+							},
+						},
+					},
+				},
+				"default": map[string]interface{}{
+					"description": "Error response",
+					"schema": map[string]interface{}{
+						"$ref": "#/definitions/Error",
+					},
+				},
+			},
+		},
+	}
+
+	s["paths"].(map[string]interface{})["/v1/engine/{engine_name}"] = map[string]interface{}{
+		"post": map[string]interface{}{
+			"summary": "Runs rumble code on the specified engine.",
+			"description": "The body of the request should contain rumble code. " +
+				"The engine runs the given rumble code on the specified engine. " +
+				"All specified sinks are loaded before any events are added.",
+			"consumes": []string{
+				"text/plain",
+			},
+			"produces": []string{
+				"application/json",
+			},
+			"parameters": engineParams,
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "The result of the code.",
+				},
+				"default": map[string]interface{}{
+					"description": "Error response",
+					"schema": map[string]interface{}{
+						"$ref": "#/definitions/Error",
+					},
+				},
+			},
+		},
+	}
+
+	s["paths"].(map[string]interface{})["/v1/engine/{engine_name}/{code_name}"] = map[string]interface{}{
+		"post": map[string]interface{}{
+			"summary": "Runs named rumble code on the specified engine.",
+			"description": "The body of the request should contain rumble code. " +
+				"The engine runs the given rumble code on the specified engine. " +
+				"All specified sinks are loaded before any events are added.",
+			"consumes": []string{
+				"text/plain",
+			},
+			"produces": []string{
+				"application/json",
+			},
+			"parameters": append(engineParams, codenameParams...),
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "The result of the code.",
+				},
+				"default": map[string]interface{}{
+					"description": "Error response",
+					"schema": map[string]interface{}{
+						"$ref": "#/definitions/Error",
+					},
+				},
+			},
+		},
+	}
+
+	s["paths"].(map[string]interface{})["/v1/engine/{engine_name}/log"] = map[string]interface{}{
+		"get": map[string]interface{}{
+			"summary":     "Get the log messages of a specific engine.",
+			"description": "The engine log endpoint returns all log messages for the given engine.",
+			"produces": []string{
+				"application/json",
+			},
+			"parameters": engineParams,
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "List of log message objects.",
+					"schema": map[string]interface{}{
+						"type": "array",
+						"items": map[string]interface{}{
+							"type": "object",
+							"properties": map[string]interface{}{
+								"mid": map[string]interface{}{
+									"description": "Root monitor ID for the log message.",
+									"type":        "string",
+								},
+								"level": map[string]interface{}{
+									"description": "Message level.",
+									"type":        "string",
+								},
+								"msg": map[string]interface{}{
+									"description": "Log message.",
+									"type":        "string",
+								},
+							},
+						},
+					},
+				},
+				"default": map[string]interface{}{
+					"description": "Error response",
+					"schema": map[string]interface{}{
+						"$ref": "#/definitions/Error",
+					},
+				},
+			},
+		},
+	}
+
+	s["paths"].(map[string]interface{})["/v1/engine/{engine_name}/sink"] = map[string]interface{}{
+		"get": map[string]interface{}{
+			"summary":     "Get the available sinks of a specific engine.",
+			"description": "The engine sink endpoint returns all sinks for the given engine.",
+			"produces": []string{
+				"application/json",
+			},
+			"parameters": append(engineParams, optionalQueryParams...),
+			"responses": map[string]interface{}{
+				"200": map[string]interface{}{
+					"description": "Object of sinks.",
+					"schema": map[string]interface{}{
+						"type": "object",
+						"properties": map[string]interface{}{
+							"<Sink name>": map[string]interface{}{
+								"type": "object",
+								"properties": map[string]interface{}{
+									"description": map[string]interface{}{
+										"description": "Sink description.",
+										"type":        "string",
+									},
+									"kind_match": map[string]interface{}{
+										"description": "List of matched event kinds.",
+										"type":        "array",
+										"items": map[string]interface{}{
+											"type": "string",
+										},
+									},
+									"scope_match": map[string]interface{}{
+										"description": "List of matched scopes.",
+										"type":        "array",
+										"items": map[string]interface{}{
+											"type": "string",
+										},