transactionmanager.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package file
  11. import (
  12. "bytes"
  13. "encoding/binary"
  14. "fmt"
  15. "io"
  16. "os"
  17. )
  18. /*
  19. Common TransactionManager related errors
  20. */
  21. var (
  22. ErrBadMagic = fmt.Errorf("Bad magic for transaction log")
  23. )
  24. /*
  25. LogFileSuffix is the file suffix for transaction log files
  26. */
  27. const LogFileSuffix = "tlg"
  28. /*
  29. DefaultTransInLog is the default number of transactions which should be kept in memory
  30. (affects how often we sync the log from memory)
  31. */
  32. const DefaultTransInLog = 10
  33. /*
  34. DefaultTransSize is the default number of records in a single transaction
  35. (affects how many record pointers are allocated at first
  36. per transaction)
  37. */
  38. const DefaultTransSize = 10
  39. /*
  40. TransactionLogHeader is the magic number to identify transaction log files
  41. */
  42. var TransactionLogHeader = []byte{0x66, 0x42}
  43. /*
  44. LogFile is the abstract interface for an transaction log file.
  45. */
  46. type LogFile interface {
  47. io.Writer
  48. io.Closer
  49. Sync() error
  50. }
  51. /*
  52. TransactionManager data structure
  53. */
  54. type TransactionManager struct {
  55. name string // Name of this transaction manager
  56. logFile LogFile // Log file for transactions
  57. curTrans int // Current transaction pointer
  58. transList [][]*Record // List of storage files
  59. maxTrans int // Maximal number of transaction before log is written
  60. owner *StorageFile // Owner of this manager
  61. }
  62. /*
  63. String returns a string representation of a TransactionManager.
  64. */
  65. func (t *TransactionManager) String() string {
  66. buf := new(bytes.Buffer)
  67. hasLog := t.logFile != nil
  68. buf.WriteString(fmt.Sprintf("Transaction Manager: %v (logFile:%v curTrans:%v "+
  69. "maxTrans:%v)\n", t.name, hasLog, t.curTrans, t.maxTrans))
  70. buf.WriteString("====\n")
  71. buf.WriteString("transList:\n")
  72. for i := 0; i < len(t.transList); i++ {
  73. buf.WriteString(fmt.Sprint(i, ": "))
  74. for _, record := range t.transList[i] {
  75. buf.WriteString(fmt.Sprint(record.ID(), " "))
  76. }
  77. buf.WriteString("\n")
  78. }
  79. buf.WriteString("====\n")
  80. return buf.String()
  81. }
  82. /*
  83. NewTransactionManager creates a new transaction manager and returns a reference to it.
  84. */
  85. func NewTransactionManager(owner *StorageFile, doRecover bool) (*TransactionManager, error) {
  86. name := fmt.Sprintf("%s.%s", owner.Name(), LogFileSuffix)
  87. ret := &TransactionManager{name, nil, -1, make([][]*Record, DefaultTransInLog),
  88. DefaultTransInLog, owner}
  89. if doRecover {
  90. if err := ret.recover(); err != nil {
  91. if sfe, ok := err.(*StorageFileError); !ok || sfe.Type != ErrBadMagic {
  92. return nil, err
  93. }
  94. }
  95. // If we have a bad magic just overwrite the transaction file
  96. }
  97. if err := ret.open(); err != nil {
  98. return nil, err
  99. }
  100. return ret, nil
  101. }
  102. /*
  103. recover tries to recover pending transactions from the physical transaction log.
  104. */
  105. func (t *TransactionManager) recover() error {
  106. file, err := os.OpenFile(t.name, os.O_RDONLY, 0660)
  107. if err != nil {
  108. if os.IsNotExist(err) {
  109. return nil
  110. }
  111. return err
  112. }
  113. defer file.Close()
  114. // Read and verify magic
  115. magic := make([]byte, 2)
  116. i, _ := file.Read(magic)
  117. if i != 2 || magic[0] != TransactionLogHeader[0] ||
  118. magic[1] != TransactionLogHeader[1] {
  119. return NewStorageFileError(ErrBadMagic, "", t.owner.name)
  120. }
  121. for true {
  122. var numRecords int64
  123. if err := binary.Read(file, binary.LittleEndian, &numRecords); err != nil {
  124. if err == io.EOF {
  125. break
  126. }
  127. return err
  128. }
  129. recMap := make(map[uint64]*Record)
  130. for i := int64(0); i < numRecords; i++ {
  131. record, err := ReadRecord(file)
  132. if err != nil {
  133. return err
  134. }
  135. // Any duplicated records will only be synced once
  136. // using the latest version
  137. recMap[record.ID()] = record
  138. }
  139. // If something goes wrong here ignore and try to do the rest
  140. t.syncRecords(recMap, false)
  141. }
  142. return nil
  143. }
  144. /*
  145. Open opens the transaction log for writing.
  146. */
  147. func (t *TransactionManager) open() error {
  148. // Always create a new empty transaction log file
  149. file, err := os.OpenFile(t.name, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0660)
  150. if err != nil {
  151. return err
  152. }
  153. t.logFile = file
  154. t.logFile.Write(TransactionLogHeader)
  155. t.logFile.Sync()
  156. t.curTrans = -1
  157. return nil
  158. }
  159. /*
  160. Start starts a new transaction.
  161. */
  162. func (t *TransactionManager) start() {
  163. t.curTrans++
  164. if t.curTrans >= t.maxTrans {
  165. t.syncLogFromMemory()
  166. t.curTrans = 0
  167. }
  168. t.transList[t.curTrans] = make([]*Record, 0, DefaultTransSize)
  169. }
  170. /*
  171. Add adds a record to the current transaction.
  172. */
  173. func (t *TransactionManager) add(record *Record) {
  174. record.IncTransCount()
  175. t.transList[t.curTrans] = append(t.transList[t.curTrans], record)
  176. }
  177. /*
  178. Commit commits the memory transaction log to the physical transaction log.
  179. */
  180. func (t *TransactionManager) commit() error {
  181. // Write how many records will be stored
  182. if err := binary.Write(t.logFile, binary.LittleEndian,
  183. int64(len(t.transList[t.curTrans]))); err != nil {
  184. return err
  185. }
  186. // Write records to log file
  187. for _, record := range t.transList[t.curTrans] {
  188. if err := record.WriteRecord(t.logFile); err != nil {
  189. return err
  190. }
  191. }
  192. t.syncFile()
  193. // Clear all dirty flags
  194. for _, record := range t.transList[t.curTrans] {
  195. record.ClearDirty()
  196. }
  197. return nil
  198. }
  199. /*
  200. syncFile syncs the transaction log file with the disk.
  201. */
  202. func (t *TransactionManager) syncFile() {
  203. t.logFile.Sync()
  204. }
  205. /*
  206. close closes the trasaction log file.
  207. */
  208. func (t *TransactionManager) close() {
  209. t.syncFile()
  210. // If something went wrong with closing the handle
  211. // we don't care as we release the reference
  212. t.logFile.Close()
  213. t.logFile = nil
  214. }
  215. /*
  216. syncLogFromMemory syncs the transaction log from memory to disk.
  217. */
  218. func (t *TransactionManager) syncLogFromMemory() error {
  219. t.close()
  220. recMap := make(map[uint64]*Record)
  221. for i, transList := range t.transList {
  222. if transList == nil {
  223. continue
  224. }
  225. // Add each record to the record map, decreasing the transaction count
  226. // if the same record is listed twice.
  227. for _, record := range transList {
  228. _, ok := recMap[record.ID()]
  229. if ok {
  230. record.DecTransCount()
  231. } else {
  232. recMap[record.ID()] = record
  233. }
  234. }
  235. t.transList[i] = nil
  236. }
  237. // Write the records from the record list to disk
  238. if err := t.syncRecords(recMap, true); err != nil {
  239. return err
  240. }
  241. t.owner.Sync()
  242. return t.open()
  243. }
  244. /*
  245. syncLogFromDisk syncs the log from disk and clears the memory transaction log.
  246. This is used for the rollback operation.
  247. */
  248. func (t *TransactionManager) syncLogFromDisk() error {
  249. t.close()
  250. for i, transList := range t.transList {
  251. if transList == nil {
  252. continue
  253. }
  254. // Discard all records which are held in memory
  255. for _, record := range transList {
  256. record.DecTransCount()
  257. if !record.InTransaction() {
  258. t.owner.releaseInTrans(record, false)
  259. }
  260. }
  261. t.transList[i] = nil
  262. }
  263. if err := t.recover(); err != nil {
  264. return err
  265. }
  266. return t.open()
  267. }
  268. /*
  269. syncRecords writes a list of records to the pysical disk file.
  270. */
  271. func (t *TransactionManager) syncRecords(records map[uint64]*Record, clearMemTransLog bool) error {
  272. for _, record := range records {
  273. if err := t.owner.writeRecord(record); err != nil {
  274. return err
  275. }
  276. if clearMemTransLog {
  277. record.DecTransCount()
  278. if !record.InTransaction() {
  279. t.owner.releaseInTrans(record, true)
  280. }
  281. }
  282. }
  283. return nil
  284. }