distributedstoragemanager.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package cluster
  11. import (
  12. "bytes"
  13. "encoding/gob"
  14. "fmt"
  15. "devt.de/krotik/common/errorutil"
  16. "devt.de/krotik/eliasdb/storage"
  17. )
  18. /*
  19. DistributedStorageManager is a storage.Manager which sends requests to the
  20. distributed storage.
  21. */
  22. type DistributedStorageManager struct {
  23. name string // Name of the storage manager
  24. rrc int // Round robin counter
  25. ds *DistributedStorage // Distributed storage which created the instance
  26. rootError error // Last error when root values were handled
  27. }
  28. /*
  29. Name returns the name of the StorageManager instance.
  30. */
  31. func (dsm *DistributedStorageManager) Name() string {
  32. return fmt.Sprint("DistributedStorageManager: ", dsm.name)
  33. }
  34. /*
  35. Root returns a root value.
  36. */
  37. func (dsm *DistributedStorageManager) Root(root int) uint64 {
  38. var ret uint64
  39. // Do not do anything if the cluster is not operational
  40. distTable, distTableErr := dsm.ds.DistributionTable()
  41. if distTableErr != nil {
  42. return 0
  43. }
  44. // Root ids always go to member 1
  45. member := distTable.Members()[0]
  46. request := &DataRequest{RTGetRoot, map[DataRequestArg]interface{}{
  47. RPStoreName: dsm.name,
  48. RPRoot: root,
  49. }, nil, false}
  50. res, err := dsm.ds.sendDataRequest(member, request)
  51. if err != nil {
  52. // Cycle through all replicating members if there was an error.
  53. // (as long as the cluster is considered operational there must be a
  54. // replicating member available to accept the request)
  55. for _, rmember := range distTable.Replicas(member) {
  56. res, err = dsm.ds.sendDataRequest(rmember, request)
  57. if err == nil {
  58. break
  59. }
  60. }
  61. }
  62. dsm.rootError = err
  63. if res != nil {
  64. ret = toUInt64(res)
  65. }
  66. return ret
  67. }
  68. /*
  69. SetRoot writes a root value.
  70. */
  71. func (dsm *DistributedStorageManager) SetRoot(root int, val uint64) {
  72. // Do not do anything is the cluster is not operational
  73. distTable, distTableErr := dsm.ds.DistributionTable()
  74. if distTableErr != nil {
  75. return
  76. }
  77. // Root ids always go to member 1
  78. member := distTable.Members()[0]
  79. request := &DataRequest{RTSetRoot, map[DataRequestArg]interface{}{
  80. RPStoreName: dsm.name,
  81. RPRoot: root,
  82. }, val, false}
  83. _, err := dsm.ds.sendDataRequest(member, request)
  84. if err != nil {
  85. // Cycle through all replicating members if there was an error.
  86. // (as long as the cluster is considered operational there must be a
  87. // replicating member available to accept the request)
  88. for _, rmember := range distTable.Replicas(member) {
  89. _, err = dsm.ds.sendDataRequest(rmember, request)
  90. if err == nil {
  91. break
  92. }
  93. }
  94. }
  95. dsm.rootError = err
  96. }
  97. /*
  98. Insert inserts an object and return its storage location.
  99. */
  100. func (dsm *DistributedStorageManager) Insert(o interface{}) (uint64, error) {
  101. return dsm.insertOrUpdate(true, 0, o)
  102. }
  103. /*
  104. Update updates a storage location.
  105. */
  106. func (dsm *DistributedStorageManager) Update(loc uint64, o interface{}) error {
  107. _, err := dsm.insertOrUpdate(false, loc, o)
  108. return err
  109. }
  110. /*
  111. insertOrUpdate stores an object and returns its storage location and any error.
  112. */
  113. func (dsm *DistributedStorageManager) insertOrUpdate(insert bool, loc uint64, o interface{}) (uint64, error) {
  114. var member string
  115. var replicatingMembers []string
  116. var rtype RequestType
  117. var ret uint64
  118. // Do not do anything is the cluster is not operational
  119. distTable, distTableErr := dsm.ds.DistributionTable()
  120. if distTableErr != nil {
  121. return 0, distTableErr
  122. }
  123. // Choose the instance this request should be routed to
  124. if insert {
  125. members := distTable.Members()
  126. member = members[(dsm.rrc)%len(members)]
  127. rtype = RTInsert
  128. } else {
  129. member, replicatingMembers = distTable.LocationHome(loc)
  130. rtype = RTUpdate
  131. }
  132. // Serialize the object into a gob bytes stream
  133. bb := storage.BufferPool.Get().(*bytes.Buffer)
  134. defer func() {
  135. bb.Reset()
  136. storage.BufferPool.Put(bb)
  137. }()
  138. errorutil.AssertOk(gob.NewEncoder(bb).Encode(o))
  139. request := &DataRequest{rtype, map[DataRequestArg]interface{}{
  140. RPStoreName: dsm.name,
  141. RPLoc: loc,
  142. }, bb.Bytes(), false}
  143. cloc, err := dsm.ds.sendDataRequest(member, request)
  144. if err == nil {
  145. return toUInt64(cloc), err
  146. }
  147. // An error has occurred we need to use another member
  148. if rtype == RTInsert {
  149. // Cycle through all members and see which one accepts first
  150. members := distTable.Members()
  151. lenMembers := len(members)
  152. for i := 1; i < lenMembers; i++ {
  153. member = members[(dsm.rrc+i)%lenMembers]
  154. cloc, nerr := dsm.ds.sendDataRequest(member, request)
  155. if nerr == nil {
  156. ret = toUInt64(cloc)
  157. err = nil
  158. break
  159. }
  160. }
  161. } else {
  162. // Cycle through all replicating members and see which one accepts first
  163. // (as long as the cluster is considered operational there must be a
  164. // replicating member available to accept the request)
  165. for _, member := range replicatingMembers {
  166. cloc, nerr := dsm.ds.sendDataRequest(member, request)
  167. if nerr == nil {
  168. ret = toUInt64(cloc)
  169. err = nil
  170. break
  171. }
  172. }
  173. }
  174. return ret, err
  175. }
  176. /*
  177. Free frees a storage location.
  178. */
  179. func (dsm *DistributedStorageManager) Free(loc uint64) error {
  180. // Do not do anything is the cluster is not operational
  181. distTable, distTableErr := dsm.ds.DistributionTable()
  182. if distTableErr != nil {
  183. return distTableErr
  184. }
  185. // Choose the instance this request should be routed to
  186. member, replicatingMembers := distTable.LocationHome(loc)
  187. request := &DataRequest{RTFree, map[DataRequestArg]interface{}{
  188. RPStoreName: dsm.name,
  189. RPLoc: loc,
  190. }, nil, false}
  191. _, err := dsm.ds.sendDataRequest(member, request)
  192. if err != nil {
  193. // Cycle through all replicating members and see which one accepts first
  194. // (as long as the cluster is considered operational there must be a
  195. // replicating member available to accept the request)
  196. for _, member := range replicatingMembers {
  197. _, nerr := dsm.ds.sendDataRequest(member, request)
  198. if nerr == nil {
  199. err = nil
  200. break
  201. }
  202. }
  203. }
  204. return err
  205. }
  206. /*
  207. Exists checks if an object exists in a given storage location.
  208. */
  209. func (dsm *DistributedStorageManager) Exists(loc uint64) (bool, error) {
  210. var ret bool
  211. err := dsm.lookupData(loc, &ret, false)
  212. return ret, err
  213. }
  214. /*
  215. Fetch fetches an object from a given storage location and writes it to
  216. a given data container.
  217. */
  218. func (dsm *DistributedStorageManager) Fetch(loc uint64, o interface{}) error {
  219. return dsm.lookupData(loc, o, true)
  220. }
  221. /*
  222. lookupData fetches or checks for an object in a given storage location.
  223. */
  224. func (dsm *DistributedStorageManager) lookupData(loc uint64, o interface{}, fetch bool) error {
  225. var rt RequestType
  226. // Do not do anything if the cluster is not operational
  227. distTable, distTableErr := dsm.ds.DistributionTable()
  228. if distTableErr != nil {
  229. return distTableErr
  230. }
  231. // Choose the instance this request should be routed to
  232. primaryMember, secondaryMembers := distTable.LocationHome(loc)
  233. if fetch {
  234. rt = RTFetch
  235. } else {
  236. rt = RTExists
  237. }
  238. request := &DataRequest{rt, map[DataRequestArg]interface{}{
  239. RPStoreName: dsm.name,
  240. RPLoc: loc,
  241. }, nil, false}
  242. res, err := dsm.ds.sendDataRequest(primaryMember, request)
  243. if err != nil || (!fetch && !res.(bool)) {
  244. // Try secondary members if the primary member failed or the data didn't exist there
  245. var serr error
  246. for _, member := range secondaryMembers {
  247. res, serr = dsm.ds.sendDataRequest(member, request)
  248. if serr == nil {
  249. err = nil
  250. break
  251. }
  252. }
  253. }
  254. if err == nil {
  255. if !fetch {
  256. *o.(*bool) = res.(bool)
  257. } else {
  258. gob.NewDecoder(bytes.NewReader(res.([]byte))).Decode(o)
  259. }
  260. }
  261. return err
  262. }
  263. /*
  264. FetchCached is not implemented for a DistributedStorageManager. Only defined to satisfy
  265. the StorageManager interface.
  266. */
  267. func (dsm *DistributedStorageManager) FetchCached(loc uint64) (interface{}, error) {
  268. return nil, storage.NewStorageManagerError(storage.ErrNotInCache, "", dsm.Name())
  269. }
  270. /*
  271. Flush is not implemented for a DistributedStorageManager. All changes are immediately
  272. written to disk in a cluster.
  273. */
  274. func (dsm *DistributedStorageManager) Flush() error {
  275. _, distTableErr := dsm.ds.DistributionTable()
  276. // Do not do anything if the cluster is not operational
  277. if distTableErr != nil {
  278. return distTableErr
  279. }
  280. // Increase round robin counter - things which belond together should be
  281. // stored on the same members
  282. dsm.rrc++
  283. return nil
  284. }
  285. /*
  286. Rollback is not implemented for a DistributedStorageManager. All changes are immediately
  287. written to disk in a cluster.
  288. */
  289. func (dsm *DistributedStorageManager) Rollback() error {
  290. return nil
  291. }
  292. /*
  293. Close is not implemented for a DistributedStorageManager. Only the local storage must
  294. be closed which is done when the DistributedStore is shut down.
  295. */
  296. func (dsm *DistributedStorageManager) Close() error {
  297. if _, distTableErr := dsm.ds.DistributionTable(); distTableErr != nil {
  298. return distTableErr
  299. }
  300. return dsm.rootError
  301. }