storagefile.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package file
  11. import (
  12. "bytes"
  13. "fmt"
  14. "io"
  15. "os"
  16. "devt.de/krotik/common/sortutil"
  17. )
/*
Common storage file related errors. Every error is a single package-level
instance which makes error comparison easy: a returned error can be compared
directly against one of these variables.

The downside is a potential race-condition. fireError stores the file name
and the info text in the shared instance before returning it, so if two
StorageFile objects throw the same error at the same time both errors
will appear to come from the same instance.
*/
var (
	ErrAlreadyInUse  = newStorageFileError("Record is already in-use")
	ErrNotInUse      = newStorageFileError("Record was not in-use")
	ErrInUse         = newStorageFileError("Records are still in-use")
	ErrTransDisabled = newStorageFileError("Transactions are disabled")
	ErrInTrans       = newStorageFileError("Records are still in a transaction")
	ErrNilData       = newStorageFileError("Record has nil data")
)
/*
DefaultRecordSize is the default size of a record in bytes. It is the record
size which NewDefaultStorageFile passes to NewStorageFile.
*/
const DefaultRecordSize = 4096
/*
DefaultFileSize is the default size of a physical file (10GB - the exact
value is 10000000001 bytes). NewStorageFile rounds this value down to a
multiple of the record size to get the actual maximum size of a physical file.
*/
const DefaultFileSize = 0x2540BE401 // 10000000001 Bytes
/*
StorageFile data structure. A StorageFile hands out records of a fixed size.
The record data is stored in one or more physical files on disk which are
named <name>.<file number>. Records which are held in memory are tracked in
the maps free, inUse, inTrans and dirty.

NOTE: NewStorageFile initialises this struct with a positional literal - the
order of the fields must not be changed without adjusting it.
*/
type StorageFile struct {
	name          string // Name of the storage file
	transDisabled bool   // Flag if transactions are disabled
	recordSize    uint32 // Size of a record
	maxFileSize   uint64 // Max size of a storage file on disk

	free    map[uint64]*Record // Map of records which are stored in memory
	inUse   map[uint64]*Record // Locked records which are currently being modified
	inTrans map[uint64]*Record // Records which are in the transaction log but not yet written to disk
	dirty   map[uint64]*Record // Dirty little records waiting to be written

	files []*os.File          // List of storage files (index is the file number; entries may be nil)
	tm    *TransactionManager // Manager object for transactions (nil if transactions are disabled)
}
  55. /*
  56. NewDefaultStorageFile creates a new storage file with default record size and
  57. returns a reference to it.
  58. */
  59. func NewDefaultStorageFile(name string, transDisabled bool) (*StorageFile, error) {
  60. return NewStorageFile(name, DefaultRecordSize, transDisabled)
  61. }
  62. /*
  63. NewStorageFile creates a new storage file and returns a reference to it.
  64. */
  65. func NewStorageFile(name string, recordSize uint32, transDisabled bool) (*StorageFile, error) {
  66. maxFileSize := DefaultFileSize - DefaultFileSize%uint64(recordSize)
  67. ret := &StorageFile{name, transDisabled, recordSize, maxFileSize,
  68. make(map[uint64]*Record), make(map[uint64]*Record), make(map[uint64]*Record),
  69. make(map[uint64]*Record), make([]*os.File, 0), nil}
  70. if !transDisabled {
  71. tm, err := NewTransactionManager(ret, true)
  72. if err != nil {
  73. return nil, err
  74. }
  75. ret.tm = tm
  76. }
  77. _, err := ret.getFile(0)
  78. if err != nil {
  79. return nil, err
  80. }
  81. return ret, nil
  82. }
/*
Name returns the name of this storage file. This is the name which was given
when the StorageFile was created.
*/
func (s *StorageFile) Name() string {
	return s.name
}
/*
RecordSize returns the size (in bytes) of records which can be stored or
retrieved.
*/
func (s *StorageFile) RecordSize() uint32 {
	return s.recordSize
}
  95. /*
  96. Get returns a record from the file. Other components can write to this record.
  97. Any write operation should set the dirty flag on the record. Dirty records will
  98. be written back to disk when the file is flushed after which the dirty flag is
  99. cleared. Get panics if a record is requested which is still in-use.
  100. */
  101. func (s *StorageFile) Get(id uint64) (*Record, error) {
  102. var record *Record
  103. // Check if the record is in one of the caches
  104. if record, ok := s.inTrans[id]; ok {
  105. delete(s.inTrans, id)
  106. s.inUse[id] = record
  107. return record, nil
  108. }
  109. if record, ok := s.dirty[id]; ok {
  110. delete(s.dirty, id)
  111. s.inUse[id] = record
  112. return record, nil
  113. }
  114. if record, ok := s.free[id]; ok {
  115. delete(s.free, id)
  116. s.inUse[id] = record
  117. return record, nil
  118. }
  119. // Error if a record which is in-use is requested again before it is released.
  120. if _, ok := s.inUse[id]; ok {
  121. return nil, ErrAlreadyInUse.fireError(s, fmt.Sprintf("Record %v", id))
  122. }
  123. // Read the record in from file
  124. record = s.createRecord(id)
  125. err := s.readRecord(record)
  126. if err != nil {
  127. return nil, err
  128. }
  129. s.inUse[id] = record
  130. return record, nil
  131. }
  132. /*
  133. getFile gets a physical file for a specific offset.
  134. */
  135. func (s *StorageFile) getFile(offset uint64) (*os.File, error) {
  136. filenumber := int(offset / s.maxFileSize)
  137. // Make sure the index exists which we want to use.
  138. // Fill all previous positions up with nil pointers if they don't exist.
  139. for i := len(s.files); i <= filenumber; i++ {
  140. s.files = append(s.files, nil)
  141. }
  142. var ret *os.File
  143. if len(s.files) > filenumber {
  144. ret = s.files[filenumber]
  145. }
  146. if ret == nil {
  147. // Important not to have os.O_APPEND since we really want
  148. // to have random access to the file.
  149. filename := fmt.Sprintf("%s.%d", s.name, filenumber)
  150. file, err := os.OpenFile(filename, os.O_CREATE|os.O_RDWR, 0660)
  151. if err != nil {
  152. return nil, err
  153. }
  154. s.files[filenumber] = file
  155. ret = file
  156. }
  157. return ret, nil
  158. }
  159. /*
  160. createRecord creates a new record - (either from the free cache or newly created).
  161. */
  162. func (s *StorageFile) createRecord(id uint64) *Record {
  163. var record *Record
  164. if len(s.free) != 0 {
  165. var rkey uint64
  166. for rkey, record = range s.free {
  167. break
  168. }
  169. delete(s.free, rkey)
  170. // NOTE At this point the free record contains
  171. // still old data. It is expected that the following
  172. // readRecord operation will overwrite the data.
  173. }
  174. if record == nil {
  175. record = NewRecord(id, make([]byte, s.recordSize, s.recordSize))
  176. }
  177. record.SetID(id)
  178. record.SetPageView(nil)
  179. record.ClearDirty()
  180. return record
  181. }
  182. /*
  183. writeRecord writes a record to disk.
  184. */
  185. func (s *StorageFile) writeRecord(record *Record) error {
  186. data := record.Data()
  187. if data != nil {
  188. offset := record.ID() * uint64(s.recordSize)
  189. file, err := s.getFile(offset)
  190. if err != nil {
  191. return err
  192. }
  193. file.WriteAt(data, int64(offset%s.maxFileSize))
  194. return nil
  195. }
  196. return ErrNilData.fireError(s, fmt.Sprintf("Record %v", record.ID()))
  197. }
  198. /*
  199. readRecord fills a given record object with data.
  200. */
  201. func (s *StorageFile) readRecord(record *Record) error {
  202. if record.Data() == nil {
  203. return ErrNilData.fireError(s, fmt.Sprintf("Record %v", record.ID()))
  204. }
  205. offset := record.ID() * uint64(s.recordSize)
  206. file, err := s.getFile(offset)
  207. if err != nil {
  208. return err
  209. }
  210. n, err := file.ReadAt(record.Data(), int64(offset%s.maxFileSize))
  211. if n > 0 && uint32(n) != s.recordSize {
  212. panic(fmt.Sprintf("File on disk returned unexpected length of data: %v "+
  213. "expected length was: %v", n, s.recordSize))
  214. } else if n == 0 {
  215. // We just allocate a new array here which seems to be the
  216. // quickest way to get an empty array.
  217. record.ClearData()
  218. }
  219. if err == io.EOF {
  220. return nil
  221. }
  222. return err
  223. }
  224. /*
  225. Discard a given record.
  226. */
  227. func (s *StorageFile) Discard(record *Record) {
  228. if record == nil {
  229. return
  230. }
  231. delete(s.inUse, record.ID())
  232. }
  233. /*
  234. releaseInTrans releases a record which was in a transaction. The client code
  235. may indicate if the record should be recycled.
  236. */
  237. func (s *StorageFile) releaseInTrans(record *Record, recycle bool) {
  238. if record == nil {
  239. return
  240. }
  241. _, ok := s.inTrans[record.ID()]
  242. if ok {
  243. delete(s.inTrans, record.ID())
  244. if recycle {
  245. s.free[record.ID()] = record
  246. }
  247. }
  248. }
  249. /*
  250. ReleaseInUseID releases a record given by its id from the in-use map. The
  251. client code may indicate if the record is not dirty.
  252. */
  253. func (s *StorageFile) ReleaseInUseID(id uint64, dirty bool) error {
  254. record, ok := s.inUse[id]
  255. if !ok {
  256. return ErrNotInUse.fireError(s, fmt.Sprintf("Record %v", id))
  257. }
  258. if !record.Dirty() && dirty {
  259. record.SetDirty()
  260. }
  261. s.ReleaseInUse(record)
  262. return nil
  263. }
  264. /*
  265. ReleaseInUse releases a record from the in-use map. ReleaseInUse panics if
  266. the record was not in use.
  267. */
  268. func (s *StorageFile) ReleaseInUse(record *Record) {
  269. if record == nil {
  270. return
  271. }
  272. id := record.ID()
  273. // Panic if a record which is release was not in-use.
  274. if _, ok := s.inUse[id]; !ok {
  275. panic(fmt.Sprintf("Released record %d was not in-use", id))
  276. }
  277. delete(s.inUse, id)
  278. if record.Dirty() {
  279. s.dirty[id] = record
  280. } else {
  281. if !s.transDisabled && record.InTransaction() {
  282. s.inTrans[id] = record
  283. } else {
  284. s.free[id] = record
  285. }
  286. }
  287. }
  288. /*
  289. Flush commits the current transaction by flushing all dirty records to the
  290. transaction log on disk. If transactions are disabled it simply
  291. writes all dirty records to disk.
  292. */
  293. func (s *StorageFile) Flush() error {
  294. if len(s.inUse) > 0 {
  295. return ErrInUse.fireError(s, fmt.Sprintf("Records %v", len(s.inUse)))
  296. }
  297. if len(s.dirty) == 0 {
  298. return nil
  299. }
  300. if !s.transDisabled {
  301. s.tm.start()
  302. }
  303. for id, record := range s.dirty {
  304. if s.transDisabled {
  305. err := s.writeRecord(record)
  306. if err != nil {
  307. return err
  308. }
  309. record.ClearDirty()
  310. delete(s.dirty, id)
  311. s.free[id] = record
  312. } else {
  313. s.tm.add(record)
  314. delete(s.dirty, id)
  315. s.inTrans[id] = record
  316. }
  317. }
  318. if !s.transDisabled {
  319. return s.tm.commit()
  320. }
  321. return nil
  322. }
  323. /*
  324. Rollback cancels the current transaction by discarding all dirty records.
  325. */
  326. func (s *StorageFile) Rollback() error {
  327. if s.transDisabled {
  328. return ErrTransDisabled.fireError(s, "")
  329. }
  330. if len(s.inUse) > 0 {
  331. return ErrInUse.fireError(s, fmt.Sprintf("Records %v", len(s.inUse)))
  332. }
  333. s.dirty = make(map[uint64]*Record)
  334. if err := s.tm.syncLogFromDisk(); err != nil {
  335. return err
  336. }
  337. if len(s.inTrans) > 0 {
  338. return ErrInTrans.fireError(s, fmt.Sprintf("Records %v", len(s.inTrans)))
  339. }
  340. return nil
  341. }
  342. /*
  343. Sync syncs all physical files.
  344. */
  345. func (s *StorageFile) Sync() {
  346. for _, file := range s.files {
  347. if file != nil {
  348. file.Sync()
  349. }
  350. }
  351. }
  352. /*
  353. Close commits all data and closes all physical files.
  354. */
  355. func (s *StorageFile) Close() error {
  356. if len(s.dirty) > 0 {
  357. if err := s.Flush(); err != nil {
  358. return err
  359. }
  360. }
  361. if !s.transDisabled {
  362. // If something fails here we will know about it
  363. // when checking if there are records in inTrans
  364. s.tm.syncLogFromMemory()
  365. s.tm.close()
  366. }
  367. if len(s.inTrans) > 0 {
  368. return ErrInTrans.fireError(s, fmt.Sprintf("Records %v", len(s.inTrans)))
  369. } else if len(s.inUse) > 0 {
  370. return ErrInUse.fireError(s, fmt.Sprintf("Records %v", len(s.inUse)))
  371. }
  372. for _, file := range s.files {
  373. if file != nil {
  374. file.Close()
  375. }
  376. }
  377. s.free = make(map[uint64]*Record)
  378. s.files = make([]*os.File, 0)
  379. // If transactions are enabled then a StorageFile cannot be
  380. // reused after it was closed.
  381. s.tm = nil
  382. return nil
  383. }
  384. /*
  385. String returns a string representation of a StorageFile.
  386. */
  387. func (s *StorageFile) String() string {
  388. buf := new(bytes.Buffer)
  389. buf.WriteString(fmt.Sprintf("Storage File: %v (transDisabled:%v recordSize:%v "+
  390. "maxFileSize:%v)\n", s.name, s.transDisabled, s.recordSize, s.maxFileSize))
  391. buf.WriteString("====\n")
  392. printRecordIDMap(buf, &s.free, "Free")
  393. buf.WriteString("\n")
  394. printRecordIDMap(buf, &s.inUse, "InUse")
  395. buf.WriteString("\n")
  396. printRecordIDMap(buf, &s.inTrans, "InTrans")
  397. buf.WriteString("\n")
  398. printRecordIDMap(buf, &s.dirty, "Dirty")
  399. buf.WriteString("\n")
  400. buf.WriteString("Open files: ")
  401. l := len(s.files)
  402. for i, file := range s.files {
  403. if file != nil {
  404. buf.WriteString(file.Name())
  405. buf.WriteString(fmt.Sprintf(" (%v)", i))
  406. if i < l-1 {
  407. buf.WriteString(", ")
  408. }
  409. }
  410. }
  411. buf.WriteString("\n")
  412. buf.WriteString("====\n")
  413. if s.tm != nil {
  414. buf.WriteString(s.tm.String())
  415. }
  416. return buf.String()
  417. }
  418. /*
  419. printRecordIDMap appends the ids of a record map to a given buffer.
  420. */
  421. func printRecordIDMap(buf *bytes.Buffer, recordMap *map[uint64]*Record, name string) {
  422. buf.WriteString(name)
  423. buf.WriteString(" Records: ")
  424. var keys []uint64
  425. for k := range *recordMap {
  426. keys = append(keys, k)
  427. }
  428. sortutil.UInt64s(keys)
  429. l := len(*recordMap)
  430. for _, id := range keys {
  431. buf.WriteString(fmt.Sprintf("%v", id))
  432. if l--; l > 0 {
  433. buf.WriteString(", ")
  434. }
  435. }
  436. }
  437. /*
  438. newStorageFileError returns a new StorageFile specific error.
  439. */
  440. func newStorageFileError(text string) *storagefileError {
  441. return &storagefileError{text, "?", ""}
  442. }
/*
StorageFile specific error datastructure. The message is set when the error
object is created; filename and info are overwritten each time the error
is fired by a StorageFile.
*/
type storagefileError struct {
	msg      string // Error message
	filename string // Name of the StorageFile which fired the error last ("?" if never fired)
	info     string // Additional information given when the error was fired last
}
  451. /*
  452. fireError returns the error instance from a specific StorageFile instance.
  453. */
  454. func (e *storagefileError) fireError(s *StorageFile, info string) error {
  455. e.filename = s.name
  456. e.info = info
  457. return e
  458. }
  459. /*
  460. Error returns a string representation of the error.
  461. */
  462. func (e *storagefileError) Error() string {
  463. return fmt.Sprintf("%s (%s - %s)", e.msg, e.filename, e.info)
  464. }