/*
 * EliasDB
 *
 * Copyright 2016 Matthias Ladkau. All rights reserved.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */
/*
Package cluster contains EliasDB's clustering code.

The clustering code provides an abstraction layer to EliasDB's graphstorage.Storage.
This means the actual storage of a cluster can be entirely memory based or use
any other backend as long as it satisfies the graphstorage.Storage interface.

DistributedStorage wraps a graphstorage.Storage and has a manager.MemberManager
object.

Members are identified by a unique name. Calling Start() on manager.MemberManager
registers and starts the RPC server for the member. Cluster internal RPC requests
are served by manager.Server. It is a singleton object which routes RPC calls
to registered MemberManagers - this architecture makes it easy to unit test
the cluster code. The manager.MemberManager has a manager.Client object which
can be used to send messages to the cluster.

The integrity of the cluster is protected by a shared secret (string) among
all members of the cluster. A new member can only join and communicate with
the cluster if it has the secret string. The secret string is never transferred
directly over the network - it is only used for generating a member specific
token which can be verified by all other members.

The clustering code was inspired by Amazon DynamoDB
http://www.allthingsdistributed.com/2012/01/amazon-dynamodb.html
*/
package cluster
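/*
Purely as an illustration of the shared secret mechanism described above, a
member specific token could be derived with an HMAC over the member name. This
is NOT the actual token scheme (that is implemented in the manager package) and
the function below is hypothetical:

	// memberToken derives a verifiable token for a member without ever
	// sending the shared secret itself over the network.
	func memberToken(memberName, clusterSecret string) string {
		mac := hmac.New(sha256.New, []byte(clusterSecret)) // crypto/hmac, crypto/sha256
		mac.Write([]byte(memberName))
		return hex.EncodeToString(mac.Sum(nil)) // encoding/hex
	}

Every member knows the secret and can therefore recompute and verify the token
for any claimed member name.
*/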
import (
	"fmt"
	"math"
	"sync"

	"devt.de/krotik/common/datautil"
	"devt.de/krotik/eliasdb/cluster/manager"
	"devt.de/krotik/eliasdb/graph/graphstorage"
	"devt.de/krotik/eliasdb/storage"
)
/*
DistributedStorageError is an error related to the distributed storage. This
error is returned when the data distribution fails, for example when too many
cluster members have failed.
*/
type DistributedStorageError struct {
	err error // Wrapped error
}
/*
newError creates a new DistributedStorageError.
*/
func newError(err error) error {
	return &DistributedStorageError{err}
}

/*
Error returns a string representation of a DistributedStorageError.
*/
func (dse *DistributedStorageError) Error() string {
	return fmt.Sprint("Storage disabled: ", dse.err.Error())
}
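/*
Callers can distinguish the disabled-storage condition from other failures with
a type assertion - a minimal sketch:

	if _, ok := err.(*DistributedStorageError); ok {
		// The distributed storage is currently disabled, e.g. because too
		// many cluster members have failed.
	}
*/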
/*
DistributedStorage data structure
*/
type DistributedStorage struct {
	MemberManager *manager.MemberManager // Manager object

	distributionTableLock  *sync.Mutex        // Mutex to access the distribution table
	distributionTable      *DistributionTable // Distribution table for the cluster - may be nil
	distributionTableError error              // Error detail if the storage is disabled

	localName         string                                // Name of the local graph storage
	localDRHandler    func(interface{}, *interface{}) error // Local data request handler
	localFlushHandler func() error                          // Handler to flush the local storage
	localCloseHandler func() error                          // Handler to close the local storage

	mainDB      map[string]string // Local main copy (only set when requested)
	mainDBError error             // Last error when main db was requested
}
/*
NewDistributedStorage creates a new cluster graph storage. The distributed storage
wraps around a local graphstorage.Storage. The configuration of the distributed
storage consists of two parts: a normal config map which defines static information
like rpc port, secret string, etc. and a StateInfo object which is used for dynamic
information like cluster members, member status, etc. An empty StateInfo means
that the cluster has only one member.
*/
func NewDistributedStorage(gs graphstorage.Storage, config map[string]interface{},
	stateInfo manager.StateInfo) (*DistributedStorage, error) {

	ds, ms, err := newDistributedAndMemberStorage(gs, config, stateInfo)

	if _, ok := gs.(*graphstorage.MemoryGraphStorage); ok {
		msmap[ds] = ms // Keep track of memory storages for debugging
	}

	return ds, err
}
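/*
A minimal usage sketch for a single member cluster. It assumes the constructors
graphstorage.NewMemoryGraphStorage and manager.NewMemStateInfo from the wider
EliasDB code base; member name, rpc address and secret are placeholder values:

	gs := graphstorage.NewMemoryGraphStorage("member1 storage")

	ds, err := NewDistributedStorage(gs, map[string]interface{}{
		manager.ConfigRPC:           "localhost:9030",
		manager.ConfigMemberName:    "member1",
		manager.ConfigClusterSecret: "secret123",
	}, manager.NewMemStateInfo())

	if err == nil {
		err = ds.Start()
		defer ds.Close()
	}
*/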
/*
DSRetNew is the return value on successfully creating a distributed storage
(used for testing).
*/
var DSRetNew error
/*
newDistributedAndMemberStorage creates a new cluster graph storage but also returns a
reference to the internal memberStorage object.
*/
func newDistributedAndMemberStorage(gs graphstorage.Storage, config map[string]interface{},
	stateInfo manager.StateInfo) (*DistributedStorage, *memberStorage, error) {

	var repFac int

	// Merge given configuration with default configuration

	clusterconfig := datautil.MergeMaps(manager.DefaultConfig, config)

	// Make 100% sure there is a secret string

	if clusterconfig[manager.ConfigClusterSecret] == "" {
		clusterconfig[manager.ConfigClusterSecret] = manager.DefaultConfig[manager.ConfigClusterSecret]
	}

	// Set replication factor

	if f, ok := stateInfo.Get(manager.StateInfoREPFAC); !ok {
		repFac = int(math.Max(clusterconfig[manager.ConfigReplicationFactor].(float64), 1))
		stateInfo.Put(manager.StateInfoREPFAC, repFac)
		stateInfo.Flush()
	} else {
		repFac = f.(int)
	}

	// Create member objects - these calls will initialise this member's state info

	mm := manager.NewMemberManager(clusterconfig[manager.ConfigRPC].(string),
		clusterconfig[manager.ConfigMemberName].(string),
		clusterconfig[manager.ConfigClusterSecret].(string), stateInfo)

	dt, err := NewDistributionTable(mm.Members(), repFac)
	if err != nil {
		mm.LogInfo("Storage disabled:", err)
	}

	ds := &DistributedStorage{mm, &sync.Mutex{}, dt, err, gs.Name(), nil, nil, nil, nil, nil}

	// Create MemberStorage instance which is not exposed - the object will
	// only be used by the RPC server and called during start and stop. It is
	// the only instance which has access to the wrapped graphstorage.Storage.

	memberStorage, err := newMemberStorage(ds, gs)
	if err != nil {
		return nil, nil, err
	}

	// Register handler function for RPC calls and for closing the local storage

	mm.SetHandleDataRequest(memberStorage.handleDataRequest)
	ds.localDRHandler = memberStorage.handleDataRequest
	ds.localFlushHandler = memberStorage.gs.FlushAll
	ds.localCloseHandler = memberStorage.gs.Close

	// Set update handler

	ds.MemberManager.SetEventHandler(func() {

		// Handler for state info updates (this handler is called once the state
		// info object has been updated from the current state)

		si := mm.StateInfo()

		rfo, ok := si.Get(manager.StateInfoREPFAC)
		members, ok2 := si.Get(manager.StateInfoMEMBERS)

		if ok && ok2 {
			rf := rfo.(int)

			distTable, distTableErr := ds.DistributionTable()

			numMembers := len(members.([]string)) / 2 // State info stores pairs of member name and rpc address
			numFailedPeers := len(mm.Client.FailedPeers())

			// Check if the cluster is operational

			if distTableErr == nil && rf > 0 && numFailedPeers > rf-1 {

				// Cluster is not operational

				if distTable != nil {
					err := fmt.Errorf("Too many members failed (total: %v, failed: %v, replication: %v)",
						numMembers, numFailedPeers, rf)

					mm.LogInfo("Storage disabled:", err.Error())

					ds.SetDistributionTableError(err)
				}

				return
			}

			// Check if the replication factor or the number of members has changed

			if distTable == nil ||
				numMembers != len(distTable.Members()) ||
				rf != distTable.repFac {

				// Try to renew the distribution table

				if dt, err := NewDistributionTable(mm.Members(), rf); err == nil {
					ds.SetDistributionTable(dt)
				}
			}
		}

	}, memberStorage.transferWorker)

	return ds, memberStorage, DSRetNew
}
/*
Start starts the distributed storage.
*/
func (ds *DistributedStorage) Start() error {
	return ds.MemberManager.Start()
}

/*
Close closes the distributed storage.
*/
func (ds *DistributedStorage) Close() error {
	ds.MemberManager.Shutdown()
	return ds.localCloseHandler()
}
/*
IsOperational returns whether this distributed storage is operational.
*/
func (ds *DistributedStorage) IsOperational() bool {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	return ds.distributionTableError == nil && ds.distributionTable != nil
}
/*
DistributionTable returns the current distribution table or an error if the
storage is not available.
*/
func (ds *DistributedStorage) DistributionTable() (*DistributionTable, error) {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	return ds.distributionTable, ds.distributionTableError
}

/*
SetDistributionTable sets the distribution table and clears any error.
*/
func (ds *DistributedStorage) SetDistributionTable(dt *DistributionTable) {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	ds.distributionTable = dt
	ds.distributionTableError = nil
}
/*
SetDistributionTableError records a distribution table related error. This
clears the current distribution table.
*/
func (ds *DistributedStorage) SetDistributionTableError(err error) {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	ds.distributionTable = nil
	ds.distributionTableError = newError(err)
}
/*
sendDataRequest is used to send data requests into the cluster.
*/
func (ds *DistributedStorage) sendDataRequest(member string, request *DataRequest) (interface{}, error) {

	// Check if the request should be handled locally

	if member == ds.MemberManager.Name() {

		// Make sure to copy the request value for local insert or update requests.
		// This is necessary since the serialization buffers are pooled and never
		// dismissed. Locally the values are just passed around.

		if request.RequestType == RTInsert || request.RequestType == RTUpdate {
			var val []byte
			datautil.CopyObject(request.Value, &val)
			request.Value = val
		}

		var response interface{}
		err := ds.localDRHandler(request, &response)
		return response, err
	}

	return ds.MemberManager.Client.SendDataRequest(member, request)
}
/*
Name returns the name of the cluster DistributedStorage instance.
*/
func (ds *DistributedStorage) Name() string {
	return ds.MemberManager.Name()
}

/*
LocalName returns the name of the wrapped local graphstorage.Storage instance.
*/
func (ds *DistributedStorage) LocalName() string {
	return ds.localName
}
/*
ReplicationFactor returns the replication factor of this cluster member. A
value of 0 means the cluster is not operational at the moment.
*/
func (ds *DistributedStorage) ReplicationFactor() int {

	// Do not do anything if the cluster is not operational

	distTable, distTableErr := ds.DistributionTable()

	if distTableErr != nil {
		return 0
	}

	return distTable.repFac
}
/*
MainDB returns the main database. The main database is a quick
lookup map for meta data which is always kept in memory.
*/
func (ds *DistributedStorage) MainDB() map[string]string {
	ret := make(map[string]string)

	// Clear the current mainDB cache

	ds.mainDB = nil

	// Do not do anything if the cluster is not operational

	distTable, distTableErr := ds.DistributionTable()

	if distTableErr != nil {
		ds.mainDBError = distTableErr
		return ret
	}

	// Main db requests always go to member 1

	member := distTable.Members()[0]

	request := &DataRequest{RTGetMain, nil, nil, false}

	mainDB, err := ds.sendDataRequest(member, request)

	if err != nil {

		// Cycle through all replicating members if there was an error.
		// (as long as the cluster is considered operational there must be a
		// replicating member available to accept the request)

		for _, rmember := range distTable.Replicas(member) {
			mainDB, err = ds.sendDataRequest(rmember, request)

			if err == nil {
				break
			}
		}
	}

	ds.mainDBError = err

	if mainDB != nil {
		ds.mainDB = mainDB.(map[string]string)
		ret = ds.mainDB
	}

	// If we failed to get the main db then any subsequent flush will fail.

	return ret
}
/*
RollbackMain rolls back the main database.
*/
func (ds *DistributedStorage) RollbackMain() error {

	// Nothing to do here - the main db will be updated next time it is requested

	ds.mainDB = nil
	ds.mainDBError = nil

	return nil
}
/*
FlushMain writes the main database to the storage.
*/
func (ds *DistributedStorage) FlushMain() error {

	// Make sure there is no error

	distTable, distTableErr := ds.DistributionTable()

	if ds.mainDBError != nil {
		return ds.mainDBError
	} else if distTableErr != nil {
		return distTableErr
	}

	// Main db requests always go to member 1

	member := distTable.Members()[0]

	request := &DataRequest{RTSetMain, nil, ds.mainDB, false}

	_, err := ds.sendDataRequest(member, request)

	if err != nil {

		// Cycle through all replicating members if there was an error.
		// (as long as the cluster is considered operational there must be a
		// replicating member available to accept the request)

		for _, rmember := range distTable.Replicas(member) {
			_, err = ds.sendDataRequest(rmember, request)

			if err == nil {
				break
			}
		}
	}

	return err
}
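/*
A short sketch of the intended read-modify-flush cycle for the main database
(the key name is hypothetical):

	main := ds.MainDB()
	main["some meta key"] = "some value"

	if err := ds.FlushMain(); err != nil {
		// Either the main db could not be fetched or no member accepted the write
	}
*/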
/*
FlushAll writes all pending local changes to the storage.
*/
func (ds *DistributedStorage) FlushAll() error {
	return ds.localFlushHandler()
}
/*
StorageManager gets a storage manager with a certain name. A non-existing StorageManager
is not created automatically if the create flag is set to false.
*/
func (ds *DistributedStorage) StorageManager(smname string, create bool) storage.Manager {

	// Make sure there is no error

	distTable, distTableErr := ds.DistributionTable()

	if ds.mainDBError != nil {
		return nil
	} else if distTableErr != nil {
		return nil
	}

	if !create {

		// Make sure the storage manager exists if it should not be created.
		// Try to get its 1st root value. If nil is returned then the storage
		// manager does not exist.

		// Root ids always go to member 1 as well as the first insert request for data.

		member := distTable.Members()[0]

		request := &DataRequest{RTGetRoot, map[DataRequestArg]interface{}{
			RPStoreName: smname,
			RPRoot:      1,
		}, nil, false}

		res, err := ds.sendDataRequest(member, request)

		if res == nil && err == nil {
			return nil
		}
	}

	return &DistributedStorageManager{smname, 0, ds, nil}
}
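/*
A short sketch of how a storage manager is typically obtained (the manager name
is hypothetical):

	sm := ds.StorageManager("my nodes storage", false)

	if sm == nil {
		// Either the storage manager does not exist yet or the cluster is
		// not operational - creating it requires the create flag.
		sm = ds.StorageManager("my nodes storage", true)
	}
*/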