config.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. /*
  11. Package manager contains the management code for EliasDB's clustering feature.
  12. The management code deals with cluster building, general communication between cluster
  13. members, verification of communicating peers and monitoring of members.
  14. The cluster structure is a pure peer-to-peer design with no single point of failure. All
  15. members of the cluster share a versioned cluster state which is persisted. Members have
  16. to manually be added or removed from the cluster. Each member also has a member info object
  17. which can be used by the application which uses the cluster to store additional member
  18. related information.
  19. Temporary failures are detected automatically. Every member of the cluster monitors the
  20. state of all its peers by sending ping requests to them on a regular schedule.
  21. */
  22. package manager
  23. import (
  24. "bytes"
  25. "encoding/gob"
  26. "fmt"
  27. "sync"
  28. "devt.de/krotik/common/datautil"
  29. "devt.de/krotik/common/errorutil"
  30. "devt.de/krotik/common/fileutil"
  31. "devt.de/krotik/eliasdb/storage"
  32. )
  33. // Cluster config
  34. // ==============
  35. /*
  36. ConfigRPC is the PRC network interface for the local cluster manager
  37. */
  38. const ConfigRPC = "ClusterMemberRPC"
  39. /*
  40. ConfigMemberName is the name of the cluster member
  41. */
  42. const ConfigMemberName = "ClusterMemberName"
  43. /*
  44. ConfigClusterSecret is the secret which authorizes a cluster member
  45. (the secret must never be send directly over the network)
  46. */
  47. const ConfigClusterSecret = "ClusterSecret"
  48. /*
  49. ConfigReplicationFactor is the number of times a given datum must be stored
  50. redundently. The cluster can suffer n-1 member losses before it becomes
  51. inoperational. The value is set once in the configuration and becomes afterwards
  52. part of the global cluster state info (once this is there the config value is ignored).
  53. */
  54. const ConfigReplicationFactor = "ReplicationFactor"
  55. /*
  56. DefaultConfig is the defaut configuration
  57. */
  58. var DefaultConfig = map[string]interface{}{
  59. ConfigRPC: "localhost:9030",
  60. ConfigMemberName: "member1",
  61. ConfigClusterSecret: "secret123",
  62. ConfigReplicationFactor: 1.0,
  63. }
  64. // Cluster state info
  65. // ==================
  66. /*
  67. Known StateInfo entries
  68. */
  69. const (
  70. StateInfoTS = "ts" // Timestamp of state info
  71. StateInfoTSOLD = "tsold" // Previous timestamp of state info
  72. StateInfoMEMBERS = "members" // List of known cluster members
  73. StateInfoFAILED = "failed" // List of failed peers
  74. StateInfoREPFAC = "replication" // Replication factor of the cluster
  75. )
  76. /*
  77. Known MemberInfo entries
  78. */
  79. const (
  80. MemberInfoError = "error" // Error message if a member was not reachable
  81. MemberInfoTermURL = "termurl" // URL to the cluster terminal of the member
  82. )
  83. /*
  84. StateInfo models a state object which stores cluster related data. This
  85. information is exchanged between cluster members. It is not expected that
  86. the info changes frequently.
  87. */
  88. type StateInfo interface {
  89. /*
  90. Put stores some data in the state info.
  91. */
  92. Put(key string, value interface{})
  93. /*
  94. Get retrievtes some data from the state info.
  95. */
  96. Get(key string) (interface{}, bool)
  97. /*
  98. Map returns the state info as a map.
  99. */
  100. Map() map[string]interface{}
  101. /*
  102. Flush persists the state info.
  103. */
  104. Flush() error
  105. }
  106. /*
  107. DefaultStateInfo is the default state info which uses a file to persist its data.
  108. */
  109. type DefaultStateInfo struct {
  110. *datautil.PersistentMap
  111. datalock *sync.RWMutex
  112. }
  113. /*
  114. NewDefaultStateInfo creates a new DefaultStateInfo.
  115. */
  116. func NewDefaultStateInfo(filename string) (StateInfo, error) {
  117. var pm *datautil.PersistentMap
  118. var err error
  119. if res, _ := fileutil.PathExists(filename); !res {
  120. pm, err = datautil.NewPersistentMap(filename)
  121. if err != nil {
  122. return nil, &Error{ErrClusterConfig,
  123. fmt.Sprintf("Cannot create state info file %v: %v",
  124. filename, err.Error())}
  125. }
  126. } else {
  127. pm, err = datautil.LoadPersistentMap(filename)
  128. if err != nil {
  129. return nil, &Error{ErrClusterConfig,
  130. fmt.Sprintf("Cannot load state info file %v: %v",
  131. filename, err.Error())}
  132. }
  133. }
  134. return &DefaultStateInfo{pm, &sync.RWMutex{}}, nil
  135. }
  136. /*
  137. Map returns the state info as a map.
  138. */
  139. func (dsi *DefaultStateInfo) Map() map[string]interface{} {
  140. var ret map[string]interface{}
  141. datautil.CopyObject(dsi.Data, &ret)
  142. return ret
  143. }
  144. /*
  145. Get retrieves some data from the state info.
  146. */
  147. func (dsi *DefaultStateInfo) Get(key string) (interface{}, bool) {
  148. dsi.datalock.RLock()
  149. defer dsi.datalock.RUnlock()
  150. v, ok := dsi.Data[key]
  151. return v, ok
  152. }
  153. /*
  154. Put stores some data in the state info.
  155. */
  156. func (dsi *DefaultStateInfo) Put(key string, value interface{}) {
  157. dsi.datalock.Lock()
  158. defer dsi.datalock.Unlock()
  159. dsi.Data[key] = value
  160. }
  161. /*
  162. Flush persists the state info.
  163. */
  164. func (dsi *DefaultStateInfo) Flush() error {
  165. if err := dsi.PersistentMap.Flush(); err != nil {
  166. return &Error{ErrClusterConfig,
  167. fmt.Sprintf("Cannot persist state info: %v",
  168. err.Error())}
  169. }
  170. return nil
  171. }
  172. /*
  173. MsiRetFlush nil or the error which should be returned by a Flush call
  174. */
  175. var MsiRetFlush error
  176. /*
  177. MemStateInfo is a state info object which does not persist its data.
  178. */
  179. type MemStateInfo struct {
  180. data map[string]interface{}
  181. datalock *sync.RWMutex
  182. }
  183. /*
  184. NewMemStateInfo creates a new MemStateInfo.
  185. */
  186. func NewMemStateInfo() StateInfo {
  187. return &MemStateInfo{make(map[string]interface{}), &sync.RWMutex{}}
  188. }
  189. /*
  190. Map returns the state info as a map.
  191. */
  192. func (msi *MemStateInfo) Map() map[string]interface{} {
  193. var ret map[string]interface{}
  194. datautil.CopyObject(msi.data, &ret)
  195. return ret
  196. }
  197. /*
  198. Get retrieves some data from the state info.
  199. */
  200. func (msi *MemStateInfo) Get(key string) (interface{}, bool) {
  201. msi.datalock.RLock()
  202. defer msi.datalock.RUnlock()
  203. v, ok := msi.data[key]
  204. return v, ok
  205. }
  206. /*
  207. Put stores some data in the state info.
  208. */
  209. func (msi *MemStateInfo) Put(key string, value interface{}) {
  210. msi.datalock.Lock()
  211. defer msi.datalock.Unlock()
  212. msi.data[key] = value
  213. }
  214. /*
  215. Flush does not do anything :-)
  216. */
  217. func (msi *MemStateInfo) Flush() error {
  218. return MsiRetFlush
  219. }
  220. // Helper functions to properly serialize maps
  221. // ===========================================
  222. /*
  223. mapToBytes converts a given map to bytes. This method panics on errors.
  224. */
  225. func mapToBytes(m map[string]interface{}) []byte {
  226. bb := storage.BufferPool.Get().(*bytes.Buffer)
  227. defer func() {
  228. bb.Reset()
  229. storage.BufferPool.Put(bb)
  230. }()
  231. errorutil.AssertOk(gob.NewEncoder(bb).Encode(m))
  232. return bb.Bytes()
  233. }
  234. /*
  235. bytesToMap tries to convert a given byte array into a map. This method panics on errors.
  236. */
  237. func bytesToMap(b []byte) map[string]interface{} {
  238. var ret map[string]interface{}
  239. errorutil.AssertOk(gob.NewDecoder(bytes.NewReader(b)).Decode(&ret))
  240. return ret
  241. }