manager.go

/*
 * EliasDB
 *
 * Copyright 2016 Matthias Ladkau. All rights reserved.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

package manager

import (
	"crypto/sha512"
	"fmt"
	"math/rand"
	"net"
	"net/rpc"
	"sort"
	"strconv"
	"sync"
	"time"

	"devt.de/krotik/common/datautil"
)

/*
MemberManager is the management object for a cluster member.

This is the main object of the clustering code; it contains the main API.
A member registers itself with the rpc server which is the global
ManagerServer (server) object. Each cluster member needs to have a unique name.
Communication between members is secured by using a secret string which
is never exchanged over the network and a hash-generated token which
identifies a member.

Each MemberManager object contains a Client object which can be used to
communicate with other cluster members. This object should be used by pure
clients - code which should communicate with the cluster without running an
actual member.
*/

type MemberManager struct {
	name               string                                // Name of the cluster member
	secret             string                                // Cluster secret
	stateInfo          StateInfo                             // StateInfo object which can persist runtime configuration
	memberInfo         map[string]interface{}                // Static info about this member
	housekeeping       bool                                  // Housekeeping thread running
	housekeepingLock   *sync.Mutex                           // Lock for housekeeping (prevent housekeeping from running)
	StopHousekeeping   bool                                  // Flag to temporarily stop housekeeping
	handleDataRequest  func(interface{}, *interface{}) error // Handler for cluster data requests
	notifyStateUpdate  func()                                // Handler which is called when the state info is updated
	notifyHouseKeeping func()                                // Handler which is called each time the housekeeping thread has finished
	Client             *Client                               // RPC client object
	listener           net.Listener                          // RPC server listener
	wg                 sync.WaitGroup                        // RPC server WaitGroup for listener shutdown
}

/*
NewMemberManager creates a new MemberManager object.
*/
func NewMemberManager(rpcInterface string, name string, secret string, stateInfo StateInfo) *MemberManager {

	// Generate member token

	token := &MemberToken{name, fmt.Sprintf("%X", sha512.Sum512_224([]byte(name+secret)))}

	// By default a client can hold a lock for up to 30 seconds before it is cleared.

	mm := &MemberManager{name, secret, stateInfo, make(map[string]interface{}),
		false, &sync.Mutex{}, false, func(interface{}, *interface{}) error { return nil }, func() {}, func() {},
		&Client{token, rpcInterface, make(map[string]string), make(map[string]*rpc.Client),
			make(map[string]string), &sync.RWMutex{}, datautil.NewMapCache(0, 30)},
		nil, sync.WaitGroup{}}

	// Check if given state info should be initialized or applied

	if _, ok := stateInfo.Get(StateInfoTS); !ok {
		mm.updateStateInfo(true)
	} else {
		mm.applyStateInfo(stateInfo.Map())
	}

	return mm
}
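
/*
Example (not part of the original source): a minimal usage sketch for setting up a
member. The interface address, member name, secret and the NewMemStateInfo
constructor for an in-memory StateInfo are assumptions for illustration only.

	mm := NewMemberManager("localhost:9030", "member1", "secret123", NewMemStateInfo())

	if err := mm.Start(); err != nil {
		// Handle the listener error
	}
	defer mm.Shutdown()
*/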

// General cluster member API
// ==========================

/*
Start starts the manager process for this cluster member.
*/
func (mm *MemberManager) Start() error {

	mm.LogInfo("Starting member manager ", mm.name, " rpc server on: ", mm.Client.rpc)

	l, err := net.Listen("tcp", mm.Client.rpc)
	if err != nil {
		return err
	}

	go func() {
		rpc.Accept(l)
		mm.wg.Done()
		mm.LogInfo("Connection closed: ", mm.Client.rpc)
	}()

	mm.listener = l
	server.managers[mm.name] = mm

	if runHousekeeping {
		s1 := rand.NewSource(time.Now().UnixNano())
		r1 := rand.New(s1)

		// Start housekeeping thread which will check for configuration changes

		mm.housekeeping = true

		go func() {
			for mm.housekeeping {
				mm.HousekeepingWorker()
				time.Sleep(time.Duration(FreqHousekeeping*(1+r1.Float64())) * time.Millisecond)
			}
			mm.wg.Done()
		}()
	}

	return nil
}

/*
Shutdown shuts the member manager rpc server for this cluster member down.
*/
func (mm *MemberManager) Shutdown() error {

	// Stop housekeeping

	if mm.housekeeping {
		mm.wg.Add(1)
		mm.housekeeping = false
		mm.wg.Wait()
		mm.LogInfo("Housekeeping stopped")
	}

	// Close socket

	if mm.listener != nil {
		mm.LogInfo("Shutdown rpc server on: ", mm.Client.rpc)
		mm.wg.Add(1)
		mm.listener.Close()
		mm.listener = nil
		mm.wg.Wait()
	} else {
		LogDebug("Member manager ", mm.name, " already shut down")
	}

	return nil
}

/*
LogInfo logs a member related message at info level.
*/
func (mm *MemberManager) LogInfo(v ...interface{}) {
	LogInfo(mm.name, ": ", fmt.Sprint(v...))
}

/*
Name returns the member name.
*/
func (mm *MemberManager) Name() string {
	return mm.name
}

/*
NetAddr returns the network address of the member.
*/
func (mm *MemberManager) NetAddr() string {
	return mm.Client.rpc
}

/*
Members returns a list of all cluster members.
*/
func (mm *MemberManager) Members() []string {
	var ret []string

	siMembers, _ := mm.stateInfo.Get(StateInfoMEMBERS)
	members := siMembers.([]string)

	for i := 0; i < len(members); i += 2 {
		ret = append(ret, members[i])
	}

	sort.Strings(ret)

	return ret
}
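
/*
Note (added for clarity, not in the original source): the StateInfoMEMBERS entry is
a flat string slice of alternating name/address pairs, which is why Members steps
through it in increments of two. For a two member cluster the entry could look like
the following (values are purely illustrative):

	[]string{"member1", "localhost:9030", "member2", "localhost:9031"}

Members then returns the sorted names ["member1", "member2"].
*/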

/*
StateInfo returns the current state info.
*/
func (mm *MemberManager) StateInfo() StateInfo {
	return mm.stateInfo
}

/*
MemberInfo returns the current static member info. Clients may modify the
returned map. Member info can be used to store additional information
on every member (e.g. a member specific URL).
*/
func (mm *MemberManager) MemberInfo() map[string]interface{} {
	return mm.memberInfo
}

/*
SetEventHandler sets event handler functions which are called when the state info
is updated or when housekeeping has been done.
*/
func (mm *MemberManager) SetEventHandler(notifyStateUpdate func(), notifyHouseKeeping func()) {
	mm.notifyStateUpdate = notifyStateUpdate
	mm.notifyHouseKeeping = notifyHouseKeeping
}

/*
SetHandleDataRequest sets the data request handler.
*/
func (mm *MemberManager) SetHandleDataRequest(handleDataRequest func(interface{}, *interface{}) error) {
	mm.handleDataRequest = handleDataRequest
}
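
/*
Example (not part of the original source): a sketch of registering the available
callbacks. The handler bodies and the response value are placeholders for
illustration only.

	mm.SetEventHandler(func() {
		// State info was updated - e.g. persist or log the new member list
	}, func() {
		// A housekeeping cycle has finished
	})

	mm.SetHandleDataRequest(func(req interface{}, resp *interface{}) error {
		// Interpret the cluster data request and fill the response
		*resp = "ok"
		return nil
	})
*/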

/*
MemberInfoCluster returns the current static member info for every known
cluster member. This calls every member in the cluster.
*/
func (mm *MemberManager) MemberInfoCluster() map[string]map[string]interface{} {

	clusterMemberInfo := make(map[string]map[string]interface{})

	clusterMemberInfo[mm.name] = mm.MemberInfo()

	for p := range mm.Client.peers {
		mi, err := mm.Client.SendMemberInfoRequest(p)

		if err != nil {
			clusterMemberInfo[p] = map[string]interface{}{MemberInfoError: err.Error()}
		} else {
			clusterMemberInfo[p] = mi
		}
	}

	return clusterMemberInfo
}
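
/*
Example (not part of the original source): member info is a free-form map, so a
member could for instance publish an endpoint which other members then collect.
The key "url" and the address are assumptions for illustration only.

	mm.MemberInfo()["url"] = "https://localhost:9090"

	for member, info := range mm.MemberInfoCluster() {
		fmt.Println(member, info["url"])
	}
*/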

// Cluster membership functions
// ============================

/*
JoinCluster lets this member try to join an existing cluster. The secret must
be correct otherwise the member will be rejected.
*/
func (mm *MemberManager) JoinCluster(newMemberName string, newMemberRPC string) error {

	// Housekeeping should not be running while joining a cluster

	mm.housekeepingLock.Lock()
	defer mm.housekeepingLock.Unlock()

	res, err := mm.Client.SendJoinCluster(newMemberName, newMemberRPC)

	if err == nil {

		// Update the state info of this member if the join was successful

		mm.applyStateInfo(res)
	}

	return err
}
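
/*
Example (not part of the original source): a sketch of a member asking an existing
cluster member to let it join. The contacted member's name and rpc address are
illustrative assumptions; the call fails if the secrets of the two members differ.

	if err := mm.JoinCluster("member1", "localhost:9030"); err != nil {
		// Join was rejected or the member could not be reached
	}
*/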

/*
JoinNewMember joins a new member to the current cluster. It is assumed that
the new member's token has already been verified.
*/
func (mm *MemberManager) JoinNewMember(newMemberName string, newMemberRPC string) error {

	// Acquire cluster lock for updating the state info

	if err := mm.Client.SendAcquireClusterLock(ClusterLockUpdateStateInfo); err != nil {
		return err
	}

	// Get operational peers (operational cluster is NOT required - other members should
	// update eventually)

	peers, _ := mm.Client.OperationalPeers()

	mm.LogInfo("Adding member ", newMemberName, " with rpc ", newMemberRPC, " to the cluster")

	// Add member to local state info

	if err := mm.addMember(newMemberName, newMemberRPC, nil); err != nil {

		// Try to release the cluster lock if something went wrong at this point

		mm.Client.SendReleaseClusterLock(ClusterLockUpdateStateInfo)

		return err
	}

	// Add member to all other cluster members (ignore failures - failed members
	// should be updated eventually by the BackgroundWorker)

	for _, p := range peers {
		mm.Client.SendRequest(p, RPCAddMember, map[RequestArgument]interface{}{
			RequestMEMBERNAME:   newMemberName,
			RequestMEMBERRPC:    newMemberRPC,
			RequestSTATEINFOMAP: mapToBytes(mm.stateInfo.Map()),
		})
	}

	// Release cluster lock for updating the state info

	return mm.Client.SendReleaseClusterLock(ClusterLockUpdateStateInfo)
}

/*
EjectMember ejects a member from the current cluster. Trying to remove a non-existent
member has no effect.
*/
func (mm *MemberManager) EjectMember(memberToEject string) error {
	var err error

	// Get operational peers (operational cluster is NOT required - other members should
	// update eventually)

	peers, _ := mm.Client.OperationalPeers()

	// Check if the given member name is valid - it must be a peer or this member

	if memberToEjectRPC, ok := mm.Client.peers[memberToEject]; ok {

		// Acquire cluster lock for updating the state info

		if err := mm.Client.SendAcquireClusterLock(ClusterLockUpdateStateInfo); err != nil {
			return err
		}

		mm.LogInfo("Ejecting member ", memberToEject, " from the cluster")

		mm.Client.maplock.Lock()
		delete(mm.Client.peers, memberToEject)
		delete(mm.Client.conns, memberToEject)
		delete(mm.Client.failed, memberToEject)
		mm.Client.maplock.Unlock()

		if err := mm.updateStateInfo(true); err != nil {

			// Put the member to eject back into the peers map

			mm.Client.peers[memberToEject] = memberToEjectRPC

			// Try to release the cluster lock if something went wrong at this point

			mm.Client.SendReleaseClusterLock(ClusterLockUpdateStateInfo)

			return err
		}

		// Send the state info to all other cluster members (ignore failures - failed members
		// should be updated eventually by the BackgroundWorker)

		for _, k := range peers {
			mm.Client.SendRequest(k, RPCUpdateStateInfo, map[RequestArgument]interface{}{
				RequestSTATEINFOMAP: mapToBytes(mm.stateInfo.Map()),
			})
		}

		// Release cluster lock for updating the state info

		err = mm.Client.SendReleaseClusterLock(ClusterLockUpdateStateInfo)

	} else if mm.name == memberToEject {

		// If we should eject ourselves then forward the request

		mm.LogInfo("Ejecting this member from the cluster")

		if len(peers) > 0 {
			if err := mm.Client.SendEjectMember(peers[0], mm.name); err != nil {
				return err
			}
		}

		// Clear peer maps and update the cluster state

		mm.Client.maplock.Lock()
		mm.Client.peers = make(map[string]string)
		mm.Client.conns = make(map[string]*rpc.Client)
		mm.Client.failed = make(map[string]string)
		mm.Client.maplock.Unlock()

		err = mm.updateStateInfo(true)
	}

	return err
}
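
/*
Example (not part of the original source): a sketch of removing a peer and of a
member removing itself; the member name used here is an illustrative assumption.

	// Eject another member from the cluster
	if err := mm.EjectMember("member3"); err != nil {
		// Lock could not be acquired or the state update failed
	}

	// Eject this member itself - the request is forwarded to a peer
	_ = mm.EjectMember(mm.Name())
*/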

// StateInfo functions
// ===================

/*
UpdateClusterStateInfo updates the member's state info and sends it to all members in
the cluster.
*/
func (mm *MemberManager) UpdateClusterStateInfo() error {

	// Get operational peers - fail if the cluster is not operational

	peers, err := mm.Client.OperationalPeers()
	if err != nil {
		return err
	}

	// Acquire cluster lock for updating the state info

	if err := mm.Client.SendAcquireClusterLock(ClusterLockUpdateStateInfo); err != nil {
		return err
	}

	mm.LogInfo("Updating cluster state info")

	if err := mm.updateStateInfo(true); err != nil {

		// Try to release the cluster lock if something went wrong at this point

		mm.Client.SendReleaseClusterLock(ClusterLockUpdateStateInfo)

		return err
	}

	// Send the state info to all other cluster members (ignore failures - failed members
	// should be updated eventually by the BackgroundWorker)

	for _, k := range peers {
		mm.Client.SendRequest(k, RPCUpdateStateInfo, map[RequestArgument]interface{}{
			RequestSTATEINFOMAP: mapToBytes(mm.stateInfo.Map()),
		})
	}

	// Release cluster lock for updating the state info

	return mm.Client.SendReleaseClusterLock(ClusterLockUpdateStateInfo)
}
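
/*
Example (not part of the original source): a sketch of changing a cluster-wide
setting and pushing it out to all members. Raising the replication factor to 2 is
purely an illustrative assumption about how the StateInfoREPFAC entry might be used.

	mm.StateInfo().Put(StateInfoREPFAC, 2)

	if err := mm.UpdateClusterStateInfo(); err != nil {
		// The cluster was not operational or the lock could not be acquired
	}
*/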

// Helper functions
// ================

/*
addMember adds a new member to the local state info.
*/
func (mm *MemberManager) addMember(newMemberName string, newMemberRPC string,
	newStateInfo map[string]interface{}) error {

	// Check if member exists already

	if _, ok := mm.Client.peers[newMemberName]; ok {
		return &Error{ErrClusterConfig,
			fmt.Sprintf("Cannot add member %v as a member with the same name exists already",
				newMemberName)}
	}

	// Add new peer to peer map - member.Client.conns will be updated on the
	// first connection

	mm.Client.maplock.Lock()
	mm.Client.peers[newMemberName] = newMemberRPC
	mm.Client.maplock.Unlock()

	// Store the new state or just update the state

	if newStateInfo != nil {
		return mm.applyStateInfo(newStateInfo)
	}

	return mm.updateStateInfo(true)
}

/*
updateStateInfo updates the StateInfo from the current runtime state.
Only updates the timestamp if newTS is true.
*/
func (mm *MemberManager) updateStateInfo(newTS bool) error {

	sortMapKeys := func(m map[string]string) []string {
		var ks []string
		for k := range m {
			ks = append(ks, k)
		}
		sort.Strings(ks)
		return ks
	}

	// Populate members entry

	members := make([]string, 0, len(mm.Client.peers)*2)

	// Add this member to the state info

	members = append(members, mm.name)
	members = append(members, mm.Client.rpc)

	// Add other known members to the state info

	mm.Client.maplock.Lock()

	for _, name := range sortMapKeys(mm.Client.peers) {
		rpc := mm.Client.peers[name]

		members = append(members, name)
		members = append(members, rpc)
	}

	mm.stateInfo.Put(StateInfoMEMBERS, members)

	failed := make([]string, 0, len(mm.Client.failed)*2)

	// Add all known failed members to the state info

	for _, name := range sortMapKeys(mm.Client.failed) {
		errstr := mm.Client.failed[name]

		failed = append(failed, name)
		failed = append(failed, errstr)
	}

	mm.Client.maplock.Unlock()

	mm.stateInfo.Put(StateInfoFAILED, failed)

	// Check for replication factor entry - don't touch if it is set

	if _, ok := mm.stateInfo.Get(StateInfoREPFAC); !ok {
		mm.stateInfo.Put(StateInfoREPFAC, 1)
	}

	if newTS {

		// Populate old timestamp and timestamp

		newOldTS, ok := mm.stateInfo.Get(StateInfoTS)
		if !ok {
			newOldTS = []string{"", "0"}
		}

		mm.stateInfo.Put(StateInfoTSOLD, newOldTS)

		v, _ := strconv.ParseInt(newOldTS.([]string)[1], 10, 64)

		mm.stateInfo.Put(StateInfoTS, []string{mm.name, fmt.Sprint(v + 1)})
	}

	err := mm.stateInfo.Flush()

	if err == nil {

		// Notify others of the state update

		mm.notifyStateUpdate()
	}

	return err
}
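
/*
Note (added for clarity, not in the original source): after updateStateInfo has run,
the StateInfo map contains roughly the following entries; the concrete values are
illustrative only.

	StateInfoMEMBERS: []string{"member1", "localhost:9030", "member2", "localhost:9031"}
	StateInfoFAILED:  []string{"member2", "connection refused"}
	StateInfoREPFAC:  1
	StateInfoTS:      []string{"member1", "42"} // member which wrote the state and a counter
	StateInfoTSOLD:   []string{"member1", "41"} // previous timestamp
*/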

/*
applyStateInfo sets the runtime state from the given StateInfo map.
*/
func (mm *MemberManager) applyStateInfo(stateInfoMap map[string]interface{}) error {

	// Set peers entry

	mm.applyStateInfoPeers(stateInfoMap, true)

	// Set failed entry

	mm.Client.maplock.Lock()
	mm.Client.failed = make(map[string]string)

	siFailed, _ := stateInfoMap[StateInfoFAILED]
	failed := siFailed.([]string)

	for i := 0; i < len(failed); i += 2 {
		mm.Client.failed[failed[i]] = failed[i+1]
	}

	mm.Client.maplock.Unlock()

	// Set given replication factor entry

	mm.stateInfo.Put(StateInfoREPFAC, stateInfoMap[StateInfoREPFAC])

	// Set given timestamp

	mm.stateInfo.Put(StateInfoTS, stateInfoMap[StateInfoTS])
	mm.stateInfo.Put(StateInfoTSOLD, stateInfoMap[StateInfoTSOLD])

	// Set state info

	return mm.updateStateInfo(false)
}

/*
applyStateInfoPeers sets the peer related runtime state from the given StateInfo map.
*/
func (mm *MemberManager) applyStateInfoPeers(stateInfoMap map[string]interface{}, replaceExisting bool) {

	// Set peers entry

	if replaceExisting {
		mm.Client.maplock.Lock()
		mm.Client.peers = make(map[string]string)
		mm.Client.maplock.Unlock()
	}

	siMembers, _ := stateInfoMap[StateInfoMEMBERS]
	members := siMembers.([]string)

	for i := 0; i < len(members); i += 2 {

		// Do not add this member as peer

		if members[i] != mm.name {
			mm.Client.maplock.Lock()
			mm.Client.peers[members[i]] = members[i+1]
			mm.Client.maplock.Unlock()
		}
	}
}