distributedstorage.go

/*
 * EliasDB
 *
 * Copyright 2016 Matthias Ladkau. All rights reserved.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

/*
Package cluster contains EliasDB's clustering code.

The clustering code provides an abstraction layer to EliasDB's graphstorage.Storage.
This means the actual storage of a cluster can be entirely memory based or use
any other backend as long as it satisfies the graphstorage.Storage interface.

DistributedStorage wraps a graphstorage.Storage and has a manager.MemberManager
object.

Members are identified by a unique name. Calling Start() on manager.MemberManager
registers and starts the RPC server for the member. Cluster internal RPC requests
are served by manager.Server. It is a singleton object which routes RPC calls
to registered MemberManagers - this architecture makes it easy to unit test
the cluster code. The manager.MemberManager has a manager.Client object which
can be used to send messages to the cluster.

The integrity of the cluster is protected by a shared secret (string) among
all members of the cluster. A new member can only join and communicate with
the cluster if it has the secret string. The secret string is never transferred
directly over the network - it is only used for generating a member specific
token which can be verified by all other members.

The clustering code was inspired by Amazon DynamoDB
http://www.allthingsdistributed.com/2012/01/amazon-dynamodb.html
*/
package cluster

import (
	"fmt"
	"math"
	"sync"

	"devt.de/krotik/common/datautil"
	"devt.de/krotik/eliasdb/cluster/manager"
	"devt.de/krotik/eliasdb/graph/graphstorage"
	"devt.de/krotik/eliasdb/storage"
)
/*
DistributedStorageError is an error related to the distributed storage. This
error is returned when the data distribution fails, for example when too many
cluster members have failed.
*/
type DistributedStorageError struct {
	err error // Wrapped error
}

/*
newError creates a new DistributedStorageError.
*/
func newError(err error) error {
	return &DistributedStorageError{err}
}

/*
Error returns a string representation of a DistributedStorageError.
*/
func (dse *DistributedStorageError) Error() string {
	return fmt.Sprint("Storage disabled: ", dse.err.Error())
}

/*
DistributedStorage data structure
*/
type DistributedStorage struct {
	MemberManager *manager.MemberManager // Manager object

	distributionTableLock  *sync.Mutex        // Mutex to access the distribution table
	distributionTable      *DistributionTable // Distribution table for the cluster - may be nil
	distributionTableError error              // Error detail if the storage is disabled

	localName         string                                // Name of the local graph storage
	localDRHandler    func(interface{}, *interface{}) error // Local data request handler
	localFlushHandler func() error                          // Handler to flush the local storage
	localCloseHandler func() error                          // Handler to close the local storage

	mainDB      map[string]string // Local main copy (only set when requested)
	mainDBError error             // Last error when the main db was requested
}

/*
NewDistributedStorage creates a new cluster graph storage. The distributed storage
wraps around a local graphstorage.Storage. The configuration of the distributed
storage consists of two parts: a normal config map which defines static information
like the RPC port, the secret string, etc. and a StateInfo object which is used for
dynamic information like cluster members, member status, etc. An empty StateInfo
means that the cluster has only one member.
*/
func NewDistributedStorage(gs graphstorage.Storage, config map[string]interface{},
	stateInfo manager.StateInfo) (*DistributedStorage, error) {

	ds, _, err := newDistributedAndMemberStorage(gs, config, stateInfo)
	return ds, err
}
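
/*
The following is a minimal usage sketch (not part of the original code) showing
how a single cluster member might be configured and started. The concrete
graphstorage.Storage backend and manager.StateInfo implementation are left to
the caller since their constructors are not defined in this file; the RPC
address, member name and secret below are purely illustrative. Note that all
members of a cluster must be configured with the same manager.ConfigClusterSecret
value to be able to join and communicate.
*/
func exampleStartSingleMember(gs graphstorage.Storage, si manager.StateInfo) (*DistributedStorage, error) {

	// Static configuration for this member (illustrative values)

	config := map[string]interface{}{
		manager.ConfigRPC:           "localhost:9030",
		manager.ConfigMemberName:    "member1",
		manager.ConfigClusterSecret: "secret123",
	}

	// Create the distributed storage wrapper around the local storage

	ds, err := NewDistributedStorage(gs, config, si)
	if err != nil {
		return nil, err
	}

	// Start (defined below) registers the member and brings up its RPC server

	if err := ds.Start(); err != nil {
		return nil, err
	}

	return ds, nil
}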
/*
DSRetNew is the return value on successfully creating a distributed storage
(used for testing).
*/
var DSRetNew error

/*
newDistributedAndMemberStorage creates a new cluster graph storage but also returns a
reference to the internal memberStorage object.
*/
func newDistributedAndMemberStorage(gs graphstorage.Storage, config map[string]interface{},
	stateInfo manager.StateInfo) (*DistributedStorage, *memberStorage, error) {

	var repFac int

	// Merge given configuration with default configuration

	clusterconfig := datautil.MergeMaps(manager.DefaultConfig, config)

	// Make 100% sure there is a secret string

	if clusterconfig[manager.ConfigClusterSecret] == "" {
		clusterconfig[manager.ConfigClusterSecret] = manager.DefaultConfig[manager.ConfigClusterSecret]
	}

	// Set replication factor

	if f, ok := stateInfo.Get(manager.StateInfoREPFAC); !ok {
		repFac = int(math.Max(clusterconfig[manager.ConfigReplicationFactor].(float64), 1))
		stateInfo.Put(manager.StateInfoREPFAC, repFac)
		stateInfo.Flush()
	} else {
		repFac = f.(int)
	}

	// Create member objects - these calls will initialise this member's state info

	mm := manager.NewMemberManager(clusterconfig[manager.ConfigRPC].(string),
		clusterconfig[manager.ConfigMemberName].(string),
		clusterconfig[manager.ConfigClusterSecret].(string), stateInfo)

	dt, err := NewDistributionTable(mm.Members(), repFac)
	if err != nil {
		mm.LogInfo("Storage disabled:", err)
	}

	ds := &DistributedStorage{mm, &sync.Mutex{}, dt, err, gs.Name(), nil, nil, nil, nil, nil}

	// Create the memberStorage instance which is not exposed - the object will
	// only be used by the RPC server and called during start and stop. It is
	// the only instance which has access to the wrapped graphstorage.Storage.

	memberStorage, err := newMemberStorage(ds, gs)
	if err != nil {
		return nil, nil, err
	}

	// Register handler functions for RPC calls and for closing the local storage

	mm.SetHandleDataRequest(memberStorage.handleDataRequest)
	ds.localDRHandler = memberStorage.handleDataRequest
	ds.localFlushHandler = memberStorage.gs.FlushAll
	ds.localCloseHandler = memberStorage.gs.Close

	// Set update handler

	ds.MemberManager.SetEventHandler(func() {

		// Handler for state info updates (this handler is called once the state
		// info object has been updated from the current state)

		si := mm.StateInfo()

		rfo, ok := si.Get(manager.StateInfoREPFAC)
		members, ok2 := si.Get(manager.StateInfoMEMBERS)

		if ok && ok2 {
			rf := rfo.(int)

			distTable, distTableErr := ds.DistributionTable()

			numMembers := len(members.([]string)) / 2
			numFailedPeers := len(mm.Client.FailedPeers())

			// Check if the cluster is operational

			if distTableErr == nil && rf > 0 && numFailedPeers > rf-1 {

				// Cluster is not operational

				if distTable != nil {

					err := fmt.Errorf("Too many members failed (total: %v, failed: %v, replication: %v)",
						numMembers, numFailedPeers, rf)

					mm.LogInfo("Storage disabled:", err.Error())

					ds.SetDistributionTableError(err)
				}

				return
			}

			// Check if the replication factor or the number of members has changed

			if distTable == nil ||
				numMembers != len(distTable.Members()) ||
				rf != distTable.repFac {

				// Try to renew the distribution table

				if dt, err := NewDistributionTable(mm.Members(), rf); err == nil {
					ds.SetDistributionTable(dt)
				}
			}
		}

	}, memberStorage.transferWorker)

	return ds, memberStorage, DSRetNew
}
/*
Start starts the distributed storage.
*/
func (ds *DistributedStorage) Start() error {
	return ds.MemberManager.Start()
}

/*
Close closes the distributed storage.
*/
func (ds *DistributedStorage) Close() error {
	ds.MemberManager.Shutdown()
	return ds.localCloseHandler()
}

/*
IsOperational returns whether this distributed storage is operational.
*/
func (ds *DistributedStorage) IsOperational() bool {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	return ds.distributionTableError == nil && ds.distributionTable != nil
}

/*
DistributionTable returns the current distribution table or an error if the
storage is not available.
*/
func (ds *DistributedStorage) DistributionTable() (*DistributionTable, error) {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	return ds.distributionTable, ds.distributionTableError
}
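
/*
The following is a small sketch (not part of the original code) of how calling
code might guard an operation on the cluster: check IsOperational first and,
if the storage is disabled, surface the error detail recorded alongside the
distribution table.
*/
func exampleGuardClusterOperation(ds *DistributedStorage) error {

	if !ds.IsOperational() {

		// The returned error explains why the storage is currently disabled

		_, err := ds.DistributionTable()
		return err
	}

	// ... perform cluster operations here ...

	return nil
}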
/*
SetDistributionTable sets the distribution table and clears any error.
*/
func (ds *DistributedStorage) SetDistributionTable(dt *DistributionTable) {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	ds.distributionTable = dt
	ds.distributionTableError = nil
}

/*
SetDistributionTableError records a distribution table related error. This
clears the current distribution table.
*/
func (ds *DistributedStorage) SetDistributionTableError(err error) {
	ds.distributionTableLock.Lock()
	defer ds.distributionTableLock.Unlock()

	ds.distributionTable = nil
	ds.distributionTableError = newError(err)
}

/*
sendDataRequest is used to send data requests into the cluster.
*/
func (ds *DistributedStorage) sendDataRequest(member string, request *DataRequest) (interface{}, error) {

	// Check if the request should be handled locally

	if member == ds.MemberManager.Name() {

		// Make sure to copy the request value for local insert or update requests.
		// This is necessary since the serialization buffers are pooled and never
		// dismissed. Locally the values are just passed around.

		if request.RequestType == RTInsert || request.RequestType == RTUpdate {
			var val []byte

			datautil.CopyObject(request.Value, &val)
			request.Value = val
		}

		var response interface{}

		err := ds.localDRHandler(request, &response)

		return response, err
	}

	return ds.MemberManager.Client.SendDataRequest(member, request)
}
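
/*
The following is a sketch (not part of the original code) of the routing
pattern which MainDB and FlushMain below build on top of sendDataRequest: send
a request to a target member and, on failure, fall back to that member's
replicas as listed in the distribution table.
*/
func exampleSendWithReplicaFallback(ds *DistributedStorage, request *DataRequest) (interface{}, error) {

	distTable, distTableErr := ds.DistributionTable()
	if distTableErr != nil {
		return nil, distTableErr
	}

	// Pick a target member - main db requests in this package always go to the first member

	member := distTable.Members()[0]

	res, err := ds.sendDataRequest(member, request)
	if err != nil {

		// Cycle through the replicating members until one accepts the request

		for _, rmember := range distTable.Replicas(member) {
			if res, err = ds.sendDataRequest(rmember, request); err == nil {
				break
			}
		}
	}

	return res, err
}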
/*
Name returns the name of the cluster DistributedStorage instance.
*/
func (ds *DistributedStorage) Name() string {
	return ds.MemberManager.Name()
}

/*
LocalName returns the name of the wrapped local graph storage.
*/
func (ds *DistributedStorage) LocalName() string {
	return ds.localName
}

/*
ReplicationFactor returns the replication factor of this cluster member. A
value of 0 means the cluster is not operational at the moment.
*/
func (ds *DistributedStorage) ReplicationFactor() int {

	// Do not do anything if the cluster is not operational

	distTable, distTableErr := ds.DistributionTable()
	if distTableErr != nil {
		return 0
	}

	return distTable.repFac
}
/*
MainDB returns the main database. The main database is a quick
lookup map for meta data which is always kept in memory.
*/
func (ds *DistributedStorage) MainDB() map[string]string {
	ret := make(map[string]string)

	// Clear the current mainDB cache

	ds.mainDB = nil

	// Do not do anything if the cluster is not operational

	distTable, distTableErr := ds.DistributionTable()
	if distTableErr != nil {
		ds.mainDBError = distTableErr
		return ret
	}

	// Main db requests always go to member 1

	member := distTable.Members()[0]

	request := &DataRequest{RTGetMain, nil, nil, false}

	mainDB, err := ds.sendDataRequest(member, request)
	if err != nil {

		// Cycle through all replicating members if there was an error.
		// (as long as the cluster is considered operational there must be a
		// replicating member available to accept the request)

		for _, rmember := range distTable.Replicas(member) {
			mainDB, err = ds.sendDataRequest(rmember, request)
			if err == nil {
				break
			}
		}
	}

	ds.mainDBError = err

	if mainDB != nil {
		ds.mainDB = mainDB.(map[string]string)
		ret = ds.mainDB
	}

	// If we failed to get the main db then any subsequent flush will fail

	return ret
}

/*
RollbackMain rolls back the main database.
*/
func (ds *DistributedStorage) RollbackMain() error {

	// Nothing to do here - the main db will be updated next time it is requested

	ds.mainDB = nil
	ds.mainDBError = nil

	return nil
}

/*
FlushMain writes the main database to the storage.
*/
func (ds *DistributedStorage) FlushMain() error {

	// Make sure there is no error

	distTable, distTableErr := ds.DistributionTable()

	if ds.mainDBError != nil {
		return ds.mainDBError
	} else if distTableErr != nil {
		return distTableErr
	}

	// Main db requests always go to member 1

	member := distTable.Members()[0]

	request := &DataRequest{RTSetMain, nil, ds.mainDB, false}

	_, err := ds.sendDataRequest(member, request)
	if err != nil {

		// Cycle through all replicating members if there was an error.
		// (as long as the cluster is considered operational there must be a
		// replicating member available to accept the request)

		for _, rmember := range distTable.Replicas(member) {
			_, err = ds.sendDataRequest(rmember, request)
			if err == nil {
				break
			}
		}
	}

	return err
}
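
/*
The following is a minimal sketch (not part of the original code) of the
intended read-modify-write cycle for the main database: fetch the in-memory
copy via MainDB, change it, then persist it with FlushMain. The key and value
used below are purely illustrative.
*/
func exampleUpdateMainDB(ds *DistributedStorage) error {

	// Fetch the current main database (always retrieved from the cluster)

	main := ds.MainDB()

	// Modify the meta data map (illustrative key/value)

	main["example_meta_key"] = "example_value"

	// Write the modified main database back to the cluster

	return ds.FlushMain()
}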
/*
FlushAll writes all pending local changes to the storage.
*/
func (ds *DistributedStorage) FlushAll() error {
	return ds.localFlushHandler()
}

/*
StorageManager gets a storage manager with a certain name. A non-existing
StorageManager is not created automatically if the create flag is set to false.
*/
func (ds *DistributedStorage) StorageManager(smname string, create bool) storage.Manager {

	// Make sure there is no error

	distTable, distTableErr := ds.DistributionTable()

	if ds.mainDBError != nil {
		return nil
	} else if distTableErr != nil {
		return nil
	}

	if !create {

		// Make sure the storage manager exists if it should not be created.
		// Try to get its 1st root value. If nil is returned then the storage
		// manager does not exist.

		// Root ids always go to member 1 as well as the first insert request for data.

		member := distTable.Members()[0]

		request := &DataRequest{RTGetRoot, map[DataRequestArg]interface{}{
			RPStoreName: smname,
			RPRoot:      1,
		}, nil, false}

		res, err := ds.sendDataRequest(member, request)

		if res == nil && err == nil {
			return nil
		}
	}

	return &DistributedStorageManager{smname, 0, ds, nil}
}
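
/*
The following is a small sketch (not part of the original code) showing how a
caller might look up an existing storage manager without creating it. The
storage manager name is purely illustrative; nil is returned both when the
manager does not exist and when the cluster is currently not operational.
*/
func exampleLookupStorageManager(ds *DistributedStorage) (storage.Manager, bool) {

	// Request the storage manager without creating it if it is missing

	sm := ds.StorageManager("examplepartition.nodes", false)

	return sm, sm != nil
}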