threadpool_test.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. /*
  2. * Public Domain Software
  3. *
  4. * I (Matthias Ladkau) am the author of the source code in this file.
  5. * I have placed the source code in this file in the public domain.
  6. *
  7. * For further information see: http://creativecommons.org/publicdomain/zero/1.0/
  8. */
  9. package pool
  10. import (
  11. "bytes"
  12. "errors"
  13. "fmt"
  14. "strings"
  15. "sync"
  16. "testing"
  17. "time"
  18. )
  19. type testTask struct {
  20. task func() error
  21. errorHandler func(e error)
  22. }
  23. func (t *testTask) Run(tid uint64) error {
  24. return t.task()
  25. }
  26. func (t *testTask) HandleError(e error) {
  27. t.errorHandler(e)
  28. }
  29. func TestDefaultTaskQueue(t *testing.T) {
  30. var taskFinishCounter int
  31. var tq DefaultTaskQueue
  32. if res := tq.Size(); res != 0 {
  33. t.Error("Initial size should be empty not: ", res)
  34. return
  35. }
  36. if res := tq.Pop(); res != nil {
  37. t.Error("Unexpected result: ", res)
  38. return
  39. }
  40. tq.Clear()
  41. if res := tq.Size(); res != 0 {
  42. t.Error("Initial size should be empty not: ", res)
  43. return
  44. }
  45. if res := tq.Pop(); res != nil {
  46. t.Error("Unexpected result: ", res)
  47. return
  48. }
  49. tq.Push(&testTask{func() error {
  50. taskFinishCounter++
  51. return nil
  52. }, nil})
  53. tq.Push(&testTask{func() error {
  54. taskFinishCounter++
  55. return nil
  56. }, nil})
  57. tq.Push(&testTask{func() error {
  58. taskFinishCounter++
  59. return nil
  60. }, nil})
  61. if res := tq.Size(); res != 3 {
  62. t.Error("Unexpected result: ", res)
  63. return
  64. }
  65. // Execute the functions
  66. tq.Pop().Run(0)
  67. if res := tq.Size(); res != 2 {
  68. t.Error("Unexpected result: ", res)
  69. return
  70. }
  71. tq.Pop().Run(0)
  72. if res := tq.Size(); res != 1 {
  73. t.Error("Unexpected result: ", res)
  74. return
  75. }
  76. tq.Pop().Run(0)
  77. if res := tq.Size(); res != 0 {
  78. t.Error("Unexpected result: ", res)
  79. return
  80. }
  81. if res := tq.Pop(); res != nil {
  82. t.Error("Unexpected result: ", res)
  83. return
  84. }
  85. if taskFinishCounter != 3 {
  86. t.Error("Unexpected result: ", taskFinishCounter)
  87. return
  88. }
  89. }
  90. func TestThreadPool(t *testing.T) {
  91. var taskFinishCounter int
  92. taskFinishCounterLock := &sync.Mutex{}
  93. tp := NewThreadPool()
  94. tp.SetWorkerCount(-10, true)
  95. tp.TooManyThreshold = 1
  96. if status := tp.Status(); status != StatusStopped {
  97. t.Error("Unexpected status:", status)
  98. return
  99. }
  100. tp.SetWorkerCount(3, true)
  101. if status := tp.Status(); status != StatusRunning {
  102. t.Error("Unexpected status:", status)
  103. return
  104. }
  105. if workers := len(tp.workerMap); workers != 3 {
  106. t.Error("Unepxected state:", workers)
  107. return
  108. }
  109. tp.AddTask(&testTask{func() error {
  110. taskFinishCounterLock.Lock()
  111. taskFinishCounter++
  112. taskFinishCounterLock.Unlock()
  113. return nil
  114. }, nil})
  115. tp.AddTask(&testTask{func() error {
  116. taskFinishCounterLock.Lock()
  117. taskFinishCounter++
  118. taskFinishCounterLock.Unlock()
  119. return nil
  120. }, nil})
  121. tp.AddTask(&testTask{func() error {
  122. taskFinishCounterLock.Lock()
  123. taskFinishCounter++
  124. taskFinishCounterLock.Unlock()
  125. return nil
  126. }, nil})
  127. tp.JoinAll()
  128. if workers := len(tp.workerMap); workers != 0 {
  129. t.Error("Unepxected state:", workers)
  130. return
  131. }
  132. if taskFinishCounter != 3 {
  133. t.Error("Unexpected result: ", taskFinishCounter)
  134. return
  135. }
  136. if status := tp.Status(); status != StatusStopped {
  137. t.Error("Unexpected status:", status)
  138. return
  139. }
  140. tp.AddTask(&testTask{func() error {
  141. taskFinishCounterLock.Lock()
  142. taskFinishCounter++
  143. taskFinishCounterLock.Unlock()
  144. return nil
  145. }, nil})
  146. tp.AddTask(&testTask{func() error {
  147. taskFinishCounterLock.Lock()
  148. taskFinishCounter++
  149. taskFinishCounterLock.Unlock()
  150. return nil
  151. }, nil})
  152. tp.AddTask(&testTask{func() error {
  153. taskFinishCounterLock.Lock()
  154. taskFinishCounter++
  155. taskFinishCounterLock.Unlock()
  156. return nil
  157. }, nil})
  158. tp.AddTask(&testTask{func() error {
  159. taskFinishCounterLock.Lock()
  160. taskFinishCounter++
  161. taskFinishCounterLock.Unlock()
  162. time.Sleep(10 * time.Millisecond)
  163. return nil
  164. }, nil})
  165. if status := tp.Status(); status != StatusStopped {
  166. t.Error("Unexpected status:", status)
  167. return
  168. }
  169. tp.SetWorkerCount(3, false)
  170. if workers := len(tp.workerMap); workers != 3 {
  171. t.Error("Unepxected state:", workers)
  172. return
  173. }
  174. // Let the workers go into the idle state
  175. time.Sleep(20 * time.Millisecond)
  176. // Reduce the number of workers
  177. tp.SetWorkerCount(1, true)
  178. if workers := len(tp.workerMap); workers != 1 {
  179. t.Error("Unepxected state:", workers)
  180. return
  181. }
  182. tp.AddTask(&testTask{func() error {
  183. taskFinishCounterLock.Lock()
  184. taskFinishCounter++
  185. taskFinishCounterLock.Unlock()
  186. return nil
  187. }, nil})
  188. tp.AddTask(&testTask{func() error {
  189. taskFinishCounterLock.Lock()
  190. taskFinishCounter++
  191. taskFinishCounterLock.Unlock()
  192. time.Sleep(10 * time.Millisecond)
  193. return nil
  194. }, nil})
  195. // Set the kill value
  196. tp.workerKill = -1
  197. if status := tp.Status(); status != StatusStopping {
  198. t.Error("Unexpected status:", status)
  199. return
  200. }
  201. tp.WaitAll()
  202. tp.SetWorkerCount(-5, true)
  203. if workers := len(tp.workerMap); workers != 0 {
  204. t.Error("Unepxected state:", workers)
  205. return
  206. }
  207. tp.AddTask(&testTask{func() error {
  208. taskFinishCounterLock.Lock()
  209. taskFinishCounter++
  210. taskFinishCounterLock.Unlock()
  211. return nil
  212. }, nil})
  213. tp.WaitAll()
  214. if taskFinishCounter != 9 {
  215. t.Error("Unexpected result: ", taskFinishCounter)
  216. return
  217. }
  218. tp.SetWorkerCount(1, false)
  219. tp.WaitAll()
  220. if taskFinishCounter != 10 {
  221. t.Error("Unexpected result: ", taskFinishCounter)
  222. return
  223. }
  224. tp.SetWorkerCount(0, true)
  225. if status := tp.Status(); status != StatusStopped {
  226. t.Error("Unexpected status:", status)
  227. return
  228. }
  229. }
  230. func TestThreadPoolThresholds(t *testing.T) {
  231. var taskFinishCounter int
  232. taskFinishCounterLock := &sync.Mutex{}
  233. task := &testTask{func() error {
  234. time.Sleep(time.Millisecond * 5)
  235. taskFinishCounterLock.Lock()
  236. taskFinishCounter++
  237. taskFinishCounterLock.Unlock()
  238. return nil
  239. }, nil}
  240. var buf bytes.Buffer
  241. tp := NewThreadPool()
  242. tp.TooFewThreshold = 1
  243. tp.TooManyThreshold = 5
  244. tp.TooFewCallback = func() {
  245. taskFinishCounterLock.Lock()
  246. buf.WriteString("low")
  247. taskFinishCounterLock.Unlock()
  248. }
  249. tp.TooManyCallback = func() {
  250. taskFinishCounterLock.Lock()
  251. buf.WriteString("high")
  252. taskFinishCounterLock.Unlock()
  253. }
  254. tp.SetWorkerCount(10, false)
  255. for i := 0; i < 10; i++ {
  256. tp.AddTask(task)
  257. }
  258. if wc := tp.WorkerCount(); wc != 10 {
  259. t.Error("Unexpected result:", wc)
  260. return
  261. }
  262. tp.SetWorkerCount(10, false)
  263. tp.WaitAll()
  264. if wc := tp.WorkerCount(); wc != 10 {
  265. t.Error("Unexpected result:", wc)
  266. return
  267. }
  268. tp.SetWorkerCount(10, false)
  269. for i := 0; i < 10; i++ {
  270. tp.AddTask(task)
  271. }
  272. tp.WaitAll()
  273. if state := fmt.Sprint(tp.State()); state == `` {
  274. t.Error("Unexpected state:", state)
  275. return
  276. }
  277. if wc := tp.WorkerCount(); wc != 10 {
  278. t.Error("Unexpected result:", wc)
  279. return
  280. }
  281. if taskFinishCounter != 20 {
  282. t.Error("Unexpected result:", taskFinishCounter)
  283. return
  284. }
  285. tp.JoinAll()
  286. if wc := tp.WorkerCount(); wc != 0 {
  287. t.Error("Unexpected result:", wc)
  288. return
  289. }
  290. // Check that the callbacks where triggered twice each
  291. if !strings.Contains(buf.String(), "high") {
  292. t.Error("Unexpected result:", buf.String())
  293. return
  294. }
  295. if !strings.Contains(buf.String(), "low") {
  296. t.Error("Unexpected result:", buf.String())
  297. return
  298. }
  299. }
  300. func TestThreadPoolIdleTaskPanic(t *testing.T) {
  301. defer func() {
  302. if r := recover(); r == nil {
  303. t.Error("Error handling on the idle task did not cause a panic")
  304. }
  305. }()
  306. // Run error handling function of idle task
  307. idleTask := &idleTask{}
  308. idleTask.HandleError(nil)
  309. }
  310. func TestThreadPoolErrorHandling(t *testing.T) {
  311. // Test error normal task handling
  312. var buf bytes.Buffer
  313. task := &testTask{func() error {
  314. return errors.New("testerror")
  315. }, func(e error) {
  316. buf.WriteString(e.Error())
  317. }}
  318. tp := NewThreadPool()
  319. tp.AddTask(task)
  320. if buf.String() != "" {
  321. t.Error("Unexpected result:", buf.String())
  322. }
  323. tp.SetWorkerCount(1, false)
  324. tp.JoinAll()
  325. if buf.String() != "testerror" {
  326. t.Error("Unexpected result:", buf.String())
  327. }
  328. if state := fmt.Sprint(tp.State()); state == `` {
  329. t.Error("Unexpected state:", state)
  330. return
  331. }
  332. }