storagefile.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package file
  11. import (
  12. "bytes"
  13. "errors"
  14. "fmt"
  15. "io"
  16. "os"
  17. "devt.de/krotik/common/sortutil"
  18. )
  19. /*
  20. Common storage file related errors.
  21. */
  22. var (
  23. ErrAlreadyInUse = errors.New("Record is already in-use")
  24. ErrNotInUse = errors.New("Record was not in-use")
  25. ErrInUse = errors.New("Records are still in-use")
  26. ErrTransDisabled = errors.New("Transactions are disabled")
  27. ErrInTrans = errors.New("Records are still in a transaction")
  28. ErrNilData = errors.New("Record has nil data")
  29. )
  30. /*
  31. DefaultRecordSize is the default size of a record in bytes
  32. */
  33. const DefaultRecordSize = 4096
  34. /*
  35. DefaultFileSize is the default size of a physical file (10GB)
  36. */
  37. const DefaultFileSize = 0x2540BE401 // 10000000001 Bytes
  38. /*
  39. StorageFile data structure
  40. */
  41. type StorageFile struct {
  42. name string // Name of the storage file
  43. transDisabled bool // Flag if transactions are disabled
  44. recordSize uint32 // Size of a record
  45. maxFileSize uint64 // Max size of a storage file on disk
  46. free map[uint64]*Record // Map of records which are stored in memory
  47. inUse map[uint64]*Record // Locked records which are currently being modified
  48. inTrans map[uint64]*Record // Records which are in the transaction log but not yet written to disk
  49. dirty map[uint64]*Record // Dirty little records waiting to be written
  50. files []*os.File // List of storage files
  51. tm *TransactionManager // Manager object for transactions
  52. }
  53. /*
  54. NewDefaultStorageFile creates a new storage file with default record size and
  55. returns a reference to it.
  56. */
  57. func NewDefaultStorageFile(name string, transDisabled bool) (*StorageFile, error) {
  58. return NewStorageFile(name, DefaultRecordSize, transDisabled)
  59. }
  60. /*
  61. NewStorageFile creates a new storage file and returns a reference to it.
  62. */
  63. func NewStorageFile(name string, recordSize uint32, transDisabled bool) (*StorageFile, error) {
  64. maxFileSize := DefaultFileSize - DefaultFileSize%uint64(recordSize)
  65. ret := &StorageFile{name, transDisabled, recordSize, maxFileSize,
  66. make(map[uint64]*Record), make(map[uint64]*Record), make(map[uint64]*Record),
  67. make(map[uint64]*Record), make([]*os.File, 0), nil}
  68. if !transDisabled {
  69. tm, err := NewTransactionManager(ret, true)
  70. if err != nil {
  71. return nil, err
  72. }
  73. ret.tm = tm
  74. }
  75. _, err := ret.getFile(0)
  76. if err != nil {
  77. return nil, err
  78. }
  79. return ret, nil
  80. }
  81. /*
  82. Name returns the name of this storage file.
  83. */
  84. func (s *StorageFile) Name() string {
  85. return s.name
  86. }
  87. /*
  88. RecordSize returns the size of records which can be storerd or retrieved.
  89. */
  90. func (s *StorageFile) RecordSize() uint32 {
  91. return s.recordSize
  92. }
  93. /*
  94. Get returns a record from the file. Other components can write to this record.
  95. Any write operation should set the dirty flag on the record. Dirty records will
  96. be written back to disk when the file is flushed after which the dirty flag is
  97. cleared. Get panics if a record is requested which is still in-use.
  98. */
  99. func (s *StorageFile) Get(id uint64) (*Record, error) {
  100. var record *Record
  101. // Check if the record is in one of the caches
  102. if record, ok := s.inTrans[id]; ok {
  103. delete(s.inTrans, id)
  104. s.inUse[id] = record
  105. return record, nil
  106. }
  107. if record, ok := s.dirty[id]; ok {
  108. delete(s.dirty, id)
  109. s.inUse[id] = record
  110. return record, nil
  111. }
  112. if record, ok := s.free[id]; ok {
  113. delete(s.free, id)
  114. s.inUse[id] = record
  115. return record, nil
  116. }
  117. // Error if a record which is in-use is requested again before it is released.
  118. if _, ok := s.inUse[id]; ok {
  119. return nil, NewStorageFileError(ErrAlreadyInUse, fmt.Sprintf("Record %v", id), s.name)
  120. }
  121. // Read the record in from file
  122. record = s.createRecord(id)
  123. err := s.readRecord(record)
  124. if err != nil {
  125. return nil, err
  126. }
  127. s.inUse[id] = record
  128. return record, nil
  129. }
  130. /*
  131. getFile gets a physical file for a specific offset.
  132. */
  133. func (s *StorageFile) getFile(offset uint64) (*os.File, error) {
  134. filenumber := int(offset / s.maxFileSize)
  135. // Make sure the index exists which we want to use.
  136. // Fill all previous positions up with nil pointers if they don't exist.
  137. for i := len(s.files); i <= filenumber; i++ {
  138. s.files = append(s.files, nil)
  139. }
  140. var ret *os.File
  141. if len(s.files) > filenumber {
  142. ret = s.files[filenumber]
  143. }
  144. if ret == nil {
  145. // Important not to have os.O_APPEND since we really want
  146. // to have random access to the file.
  147. filename := fmt.Sprintf("%s.%d", s.name, filenumber)
  148. file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0660)
  149. if err != nil {
  150. return nil, err
  151. }
  152. s.files[filenumber] = file
  153. ret = file
  154. }
  155. return ret, nil
  156. }
  157. /*
  158. createRecord creates a new record - (either from the free cache or newly created).
  159. */
  160. func (s *StorageFile) createRecord(id uint64) *Record {
  161. var record *Record
  162. if len(s.free) != 0 {
  163. var rkey uint64
  164. for rkey, record = range s.free {
  165. break
  166. }
  167. delete(s.free, rkey)
  168. // NOTE At this point the free record contains
  169. // still old data. It is expected that the following
  170. // readRecord operation will overwrite the data.
  171. }
  172. if record == nil {
  173. record = NewRecord(id, make([]byte, s.recordSize, s.recordSize))
  174. }
  175. record.SetID(id)
  176. record.SetPageView(nil)
  177. record.ClearDirty()
  178. return record
  179. }
  180. /*
  181. writeRecord writes a record to disk.
  182. */
  183. func (s *StorageFile) writeRecord(record *Record) error {
  184. data := record.Data()
  185. if data != nil {
  186. offset := record.ID() * uint64(s.recordSize)
  187. file, err := s.getFile(offset)
  188. if err != nil {
  189. return err
  190. }
  191. file.WriteAt(data, int64(offset%s.maxFileSize))
  192. return nil
  193. }
  194. return NewStorageFileError(ErrNilData, fmt.Sprintf("Record %v", record.ID()), s.name)
  195. }
  196. /*
  197. readRecord fills a given record object with data.
  198. */
  199. func (s *StorageFile) readRecord(record *Record) error {
  200. if record.Data() == nil {
  201. return NewStorageFileError(ErrNilData, fmt.Sprintf("Record %v", record.ID()), s.name)
  202. }
  203. offset := record.ID() * uint64(s.recordSize)
  204. file, err := s.getFile(offset)
  205. if err != nil {
  206. return err
  207. }
  208. n, err := file.ReadAt(record.Data(), int64(offset%s.maxFileSize))
  209. if n > 0 && uint32(n) != s.recordSize {
  210. panic(fmt.Sprintf("File on disk returned unexpected length of data: %v "+
  211. "expected length was: %v", n, s.recordSize))
  212. } else if n == 0 {
  213. // We just allocate a new array here which seems to be the
  214. // quickest way to get an empty array.
  215. record.ClearData()
  216. }
  217. if err == io.EOF {
  218. return nil
  219. }
  220. return err
  221. }
  222. /*
  223. Discard a given record.
  224. */
  225. func (s *StorageFile) Discard(record *Record) {
  226. if record == nil {
  227. return
  228. }
  229. delete(s.inUse, record.ID())
  230. }
  231. /*
  232. releaseInTrans releases a record which was in a transaction. The client code
  233. may indicate if the record should be recycled.
  234. */
  235. func (s *StorageFile) releaseInTrans(record *Record, recycle bool) {
  236. if record == nil {
  237. return
  238. }
  239. _, ok := s.inTrans[record.ID()]
  240. if ok {
  241. delete(s.inTrans, record.ID())
  242. if recycle {
  243. s.free[record.ID()] = record
  244. }
  245. }
  246. }
  247. /*
  248. ReleaseInUseID releases a record given by its id from the in-use map. The
  249. client code may indicate if the record is not dirty.
  250. */
  251. func (s *StorageFile) ReleaseInUseID(id uint64, dirty bool) error {
  252. record, ok := s.inUse[id]
  253. if !ok {
  254. return NewStorageFileError(ErrNotInUse, fmt.Sprintf("Record %v", id), s.name)
  255. }
  256. if !record.Dirty() && dirty {
  257. record.SetDirty()
  258. }
  259. s.ReleaseInUse(record)
  260. return nil
  261. }
  262. /*
  263. ReleaseInUse releases a record from the in-use map. ReleaseInUse panics if
  264. the record was not in use.
  265. */
  266. func (s *StorageFile) ReleaseInUse(record *Record) {
  267. if record == nil {
  268. return
  269. }
  270. id := record.ID()
  271. // Panic if a record which is release was not in-use.
  272. if _, ok := s.inUse[id]; !ok {
  273. panic(fmt.Sprintf("Released record %d was not in-use", id))
  274. }
  275. delete(s.inUse, id)
  276. if record.Dirty() {
  277. s.dirty[id] = record
  278. } else {
  279. if !s.transDisabled && record.InTransaction() {
  280. s.inTrans[id] = record
  281. } else {
  282. s.free[id] = record
  283. }
  284. }
  285. }
  286. /*
  287. Flush commits the current transaction by flushing all dirty records to the
  288. transaction log on disk. If transactions are disabled it simply
  289. writes all dirty records to disk.
  290. */
  291. func (s *StorageFile) Flush() error {
  292. if len(s.inUse) > 0 {
  293. return NewStorageFileError(ErrInUse, fmt.Sprintf("Records %v", len(s.inUse)), s.name)
  294. }
  295. if len(s.dirty) == 0 {
  296. return nil
  297. }
  298. if !s.transDisabled {
  299. s.tm.start()
  300. }
  301. for id, record := range s.dirty {
  302. if s.transDisabled {
  303. err := s.writeRecord(record)
  304. if err != nil {
  305. return err
  306. }
  307. record.ClearDirty()
  308. delete(s.dirty, id)
  309. s.free[id] = record
  310. } else {
  311. s.tm.add(record)
  312. delete(s.dirty, id)
  313. s.inTrans[id] = record
  314. }
  315. }
  316. if !s.transDisabled {
  317. return s.tm.commit()
  318. }
  319. return nil
  320. }
  321. /*
  322. Rollback cancels the current transaction by discarding all dirty records.
  323. */
  324. func (s *StorageFile) Rollback() error {
  325. if s.transDisabled {
  326. return NewStorageFileError(ErrTransDisabled, "", s.name)
  327. }
  328. if len(s.inUse) > 0 {
  329. return NewStorageFileError(ErrInUse, fmt.Sprintf("Records %v", len(s.inUse)), s.name)
  330. }
  331. s.dirty = make(map[uint64]*Record)
  332. if err := s.tm.syncLogFromDisk(); err != nil {
  333. return err
  334. }
  335. if len(s.inTrans) > 0 {
  336. return NewStorageFileError(ErrInTrans, fmt.Sprintf("Records %v", len(s.inTrans)), s.name)
  337. }
  338. return nil
  339. }
  340. /*
  341. Sync syncs all physical files.
  342. */
  343. func (s *StorageFile) Sync() {
  344. for _, file := range s.files {
  345. if file != nil {
  346. file.Sync()
  347. }
  348. }
  349. }
  350. /*
  351. Close commits all data and closes all physical files.
  352. */
  353. func (s *StorageFile) Close() error {
  354. if len(s.dirty) > 0 {
  355. if err := s.Flush(); err != nil {
  356. return err
  357. }
  358. }
  359. if !s.transDisabled {
  360. // If something fails here we will know about it
  361. // when checking if there are records in inTrans
  362. s.tm.syncLogFromMemory()
  363. s.tm.close()
  364. }
  365. if len(s.inTrans) > 0 {
  366. return NewStorageFileError(ErrInTrans, fmt.Sprintf("Records %v", len(s.inTrans)), s.name)
  367. } else if len(s.inUse) > 0 {
  368. return NewStorageFileError(ErrInUse, fmt.Sprintf("Records %v", len(s.inUse)), s.name)
  369. }
  370. for _, file := range s.files {
  371. if file != nil {
  372. file.Close()
  373. }
  374. }
  375. s.free = make(map[uint64]*Record)
  376. s.files = make([]*os.File, 0)
  377. // If transactions are enabled then a StorageFile cannot be
  378. // reused after it was closed.
  379. s.tm = nil
  380. return nil
  381. }
  382. /*
  383. String returns a string representation of a StorageFile.
  384. */
  385. func (s *StorageFile) String() string {
  386. buf := new(bytes.Buffer)
  387. buf.WriteString(fmt.Sprintf("Storage File: %v (transDisabled:%v recordSize:%v "+
  388. "maxFileSize:%v)\n", s.name, s.transDisabled, s.recordSize, s.maxFileSize))
  389. buf.WriteString("====\n")
  390. printRecordIDMap(buf, &s.free, "Free")
  391. buf.WriteString("\n")
  392. printRecordIDMap(buf, &s.inUse, "InUse")
  393. buf.WriteString("\n")
  394. printRecordIDMap(buf, &s.inTrans, "InTrans")
  395. buf.WriteString("\n")
  396. printRecordIDMap(buf, &s.dirty, "Dirty")
  397. buf.WriteString("\n")
  398. buf.WriteString("Open files: ")
  399. l := len(s.files)
  400. for i, file := range s.files {
  401. if file != nil {
  402. buf.WriteString(file.Name())
  403. buf.WriteString(fmt.Sprintf(" (%v)", i))
  404. if i < l-1 {
  405. buf.WriteString(", ")
  406. }
  407. }
  408. }
  409. buf.WriteString("\n")
  410. buf.WriteString("====\n")
  411. if s.tm != nil {
  412. buf.WriteString(s.tm.String())
  413. }
  414. return buf.String()
  415. }
  416. /*
  417. printRecordIDMap appends the ids of a record map to a given buffer.
  418. */
  419. func printRecordIDMap(buf *bytes.Buffer, recordMap *map[uint64]*Record, name string) {
  420. buf.WriteString(name)
  421. buf.WriteString(" Records: ")
  422. var keys []uint64
  423. for k := range *recordMap {
  424. keys = append(keys, k)
  425. }
  426. sortutil.UInt64s(keys)
  427. l := len(*recordMap)
  428. for _, id := range keys {
  429. buf.WriteString(fmt.Sprintf("%v", id))
  430. if l--; l > 0 {
  431. buf.WriteString(", ")
  432. }
  433. }
  434. }
  435. /*
  436. StorageFileError is a storage file related error.
  437. */
  438. type StorageFileError struct {
  439. Type error
  440. Detail string
  441. Filename string
  442. }
  443. /*
  444. NewStorageFileError returns a new StorageFile specific error.
  445. */
  446. func NewStorageFileError(sfeType error, sfeDetail string, sfeFilename string) *StorageFileError {
  447. return &StorageFileError{sfeType, sfeDetail, sfeFilename}
  448. }
  449. /*
  450. Error returns a string representation of the error.
  451. */
  452. func (e *StorageFileError) Error() string {
  453. return fmt.Sprintf("%s (%s - %s)", e.Type.Error(), e.Filename, e.Detail)
  454. }