Browse Source

fix: Fix race condition for thread pool with one thread - fix race condition for error reporting

Matthias Ladkau 3 years ago
parent
commit
cb65914e8e
5 changed files with 12 additions and 34 deletions
  1. 0 1
      engine/monitor.go
  2. 6 0
      engine/pool/threadpool.go
  3. 0 2
      engine/processor.go
  4. 0 31
      engine/pubsub/eventpump.go
  5. 6 0
      engine/taskqueue.go

+ 0 - 1
engine/monitor.go

@@ -231,7 +231,6 @@ func (mb *monitorBase) Errors() *TaskError {
 SetErrors adds an error object to this monitor.
 */
 func (mb *monitorBase) SetErrors(e *TaskError) {
-	errorutil.AssertTrue(mb.finished, "Cannot set errors on an unfinished monitor")
 	mb.Err = e
 	mb.rootMonitor.descendantFailed(mb)
 }

+ 6 - 0
engine/pool/threadpool.go

@@ -315,6 +315,12 @@ func (tp *ThreadPool) SetWorkerCount(count int, wait bool) {
 			}
 		}
 	}
+
+	// If a count was set wait until at least one worker is idle
+
+	for count > 0 && len(tp.workerIdleMap) == 0 {
+		time.Sleep(5 * time.Nanosecond)
+	}
 }
 
 /*

+ 0 - 2
engine/processor.go

@@ -467,8 +467,6 @@ func (p *eventProcessor) ProcessEvent(event *Event, parent Monitor) map[string]e
 		}
 	}
 
-	parent.Finish()
-
 	return errors
 }
 

+ 0 - 31
engine/pubsub/eventpump.go

@@ -68,37 +68,6 @@ func (ep *EventPump) AddObserver(event string, eventSource interface{}, callback
 	}
 }
 
-/*
-PostEvent posts an event to this event pump from a given event source.
-*/
-func (ep *EventPump) PostEvent2(event string, eventSource interface{}) {
-	if event == "" || eventSource == nil {
-		panic("Posting an event requires the event and its source")
-	}
-
-	ep.eventsObserversLock.Lock()
-	defer ep.eventsObserversLock.Unlock()
-
-	postEvent := func(event string, eventSource interface{}) {
-
-		if sources, ok := ep.eventsObservers[event]; ok {
-			for source, callbacks := range sources {
-				if source == eventSource || source == nil {
-					for _, callback := range callbacks {
-						ep.eventsObserversLock.Unlock()
-						callback(event, eventSource)
-						ep.eventsObserversLock.Lock()
-					}
-				}
-			}
-		}
-
-	}
-
-	postEvent(event, eventSource)
-	postEvent("", eventSource)
-}
-
 /*
 PostEvent posts an event to this event pump from a given event source.
 */

+ 6 - 0
engine/taskqueue.go

@@ -79,10 +79,15 @@ func (t *Task) Run() error {
 	errors := t.p.ProcessEvent(t.e, t.m)
 
 	if len(errors) > 0 {
+
+		// Monitor is not declared finished until the errors have been handled
+
 		EventTracer.record(t.e, "Task.Run", fmt.Sprint("Task had errors:", errors))
 		return &TaskError{errors, t.e, t.m}
 	}
 
+	t.m.Finish()
+
 	return nil
 }
 
@@ -98,6 +103,7 @@ HandleError handles an error which occurred during the run method.
 */
 func (t *Task) HandleError(e error) {
 	t.m.SetErrors(e.(*TaskError))
+	t.m.Finish()
 	t.p.(*eventProcessor).notifyRootMonitorErrors(t.m.RootMonitor())
 }