transactionmanager.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package file
  11. import (
  12. "bytes"
  13. "encoding/binary"
  14. "fmt"
  15. "io"
  16. "os"
  17. )
/*
Common TransactionManager related errors
*/
var (
	// ErrBadMagic is returned by recover if a transaction log does not
	// start with the bytes of TransactionLogHeader. NewTransactionManager
	// tolerates this error and overwrites the log.
	ErrBadMagic = newStorageFileError("Bad magic for transaction log")
)
/*
LogFileSuffix is the file suffix for transaction log files. The log file name
is built as "<owner name>.<LogFileSuffix>" by NewTransactionManager.
*/
const LogFileSuffix = "tlg"
/*
DefaultTransInLog is the default number of transactions which should be kept in memory
(affects how often we sync the log from memory). NewTransactionManager uses it
both as the number of slots in the in-memory transaction list and as the
threshold at which start triggers a sync.
*/
const DefaultTransInLog = 10
/*
DefaultTransSize is the default number of records in a single transaction
(affects how many record pointers are allocated at first
per transaction). It is only the initial capacity - a transaction may
grow beyond it.
*/
const DefaultTransSize = 10
/*
TransactionLogHeader is the magic number to identify transaction log files.
It is written at the start of every new log by open and verified by recover.
*/
var TransactionLogHeader = []byte{0x66, 0x42}
/*
LogFile is the abstract interface for a transaction log file. It covers the
operations the TransactionManager needs on its log: writing, syncing to disk
and closing.
*/
type LogFile interface {
	io.Writer
	io.Closer
	Sync() error
}
/*
TransactionManager data structure
*/
type TransactionManager struct {
	name      string      // Name of this transaction manager (also the file name of its log)
	logFile   LogFile     // Log file for transactions (nil while the log is closed)
	curTrans  int         // Current transaction pointer (index into transList, -1 before the first start)
	transList [][]*Record // In-memory transactions - each slot holds the records of one transaction
	maxTrans  int         // Maximal number of transactions before log is written
	owner     *StorageFile // Owner of this manager
}
  62. /*
  63. String returns a string representation of a TransactionManager.
  64. */
  65. func (t *TransactionManager) String() string {
  66. buf := new(bytes.Buffer)
  67. hasLog := t.logFile != nil
  68. buf.WriteString(fmt.Sprintf("Transaction Manager: %v (logFile:%v curTrans:%v "+
  69. "maxTrans:%v)\n", t.name, hasLog, t.curTrans, t.maxTrans))
  70. buf.WriteString("====\n")
  71. buf.WriteString("transList:\n")
  72. for i := 0; i < len(t.transList); i++ {
  73. buf.WriteString(fmt.Sprint(i, ": "))
  74. for _, record := range t.transList[i] {
  75. buf.WriteString(fmt.Sprint(record.ID(), " "))
  76. }
  77. buf.WriteString("\n")
  78. }
  79. buf.WriteString("====\n")
  80. return buf.String()
  81. }
  82. /*
  83. NewTransactionManager creates a new transaction manager and returns a reference to it.
  84. */
  85. func NewTransactionManager(owner *StorageFile, doRecover bool) (*TransactionManager, error) {
  86. name := fmt.Sprintf("%s.%s", owner.Name(), LogFileSuffix)
  87. ret := &TransactionManager{name, nil, -1, make([][]*Record, DefaultTransInLog),
  88. DefaultTransInLog, owner}
  89. if doRecover {
  90. err := ret.recover()
  91. if err != nil && err != ErrBadMagic {
  92. return nil, err
  93. }
  94. // If we have a bad magic just overwrite the transaction file
  95. }
  96. if err := ret.open(); err != nil {
  97. return nil, err
  98. }
  99. return ret, nil
  100. }
  101. /*
  102. recover tries to recover pending transactions from the physical transaction log.
  103. */
  104. func (t *TransactionManager) recover() error {
  105. file, err := os.OpenFile(t.name, os.O_RDONLY, 0660)
  106. if err != nil {
  107. if os.IsNotExist(err) {
  108. return nil
  109. }
  110. return err
  111. }
  112. defer file.Close()
  113. // Read and verify magic
  114. magic := make([]byte, 2)
  115. i, _ := file.Read(magic)
  116. if i != 2 || magic[0] != TransactionLogHeader[0] ||
  117. magic[1] != TransactionLogHeader[1] {
  118. return ErrBadMagic.fireError(t.owner, "")
  119. }
  120. for true {
  121. var numRecords int64
  122. if err := binary.Read(file, binary.LittleEndian, &numRecords); err != nil {
  123. if err == io.EOF {
  124. break
  125. }
  126. return err
  127. }
  128. recMap := make(map[uint64]*Record)
  129. for i := int64(0); i < numRecords; i++ {
  130. record, err := ReadRecord(file)
  131. if err != nil {
  132. return err
  133. }
  134. // Any duplicated records will only be synced once
  135. // using the latest version
  136. recMap[record.ID()] = record
  137. }
  138. // If something goes wrong here ignore and try to do the rest
  139. t.syncRecords(recMap, false)
  140. }
  141. return nil
  142. }
  143. /*
  144. Open opens the transaction log for writing.
  145. */
  146. func (t *TransactionManager) open() error {
  147. // Always create a new empty transaction log file
  148. file, err := os.OpenFile(t.name, os.O_CREATE|os.O_TRUNC|os.O_RDWR, 0660)
  149. if err != nil {
  150. return err
  151. }
  152. t.logFile = file
  153. t.logFile.Write(TransactionLogHeader)
  154. t.logFile.Sync()
  155. t.curTrans = -1
  156. return nil
  157. }
  158. /*
  159. Start starts a new transaction.
  160. */
  161. func (t *TransactionManager) start() {
  162. t.curTrans++
  163. if t.curTrans >= t.maxTrans {
  164. t.syncLogFromMemory()
  165. t.curTrans = 0
  166. }
  167. t.transList[t.curTrans] = make([]*Record, 0, DefaultTransSize)
  168. }
  169. /*
  170. Add adds a record to the current transaction.
  171. */
  172. func (t *TransactionManager) add(record *Record) {
  173. record.IncTransCount()
  174. t.transList[t.curTrans] = append(t.transList[t.curTrans], record)
  175. }
  176. /*
  177. Commit commits the memory transaction log to the physical transaction log.
  178. */
  179. func (t *TransactionManager) commit() error {
  180. // Write how many records will be stored
  181. if err := binary.Write(t.logFile, binary.LittleEndian,
  182. int64(len(t.transList[t.curTrans]))); err != nil {
  183. return err
  184. }
  185. // Write records to log file
  186. for _, record := range t.transList[t.curTrans] {
  187. if err := record.WriteRecord(t.logFile); err != nil {
  188. return err
  189. }
  190. }
  191. t.syncFile()
  192. // Clear all dirty flags
  193. for _, record := range t.transList[t.curTrans] {
  194. record.ClearDirty()
  195. }
  196. return nil
  197. }
  198. /*
  199. syncFile syncs the transaction log file with the disk.
  200. */
  201. func (t *TransactionManager) syncFile() {
  202. t.logFile.Sync()
  203. }
  204. /*
  205. close closes the trasaction log file.
  206. */
  207. func (t *TransactionManager) close() {
  208. t.syncFile()
  209. // If something went wrong with closing the handle
  210. // we don't care as we release the reference
  211. t.logFile.Close()
  212. t.logFile = nil
  213. }
  214. /*
  215. syncLogFromMemory syncs the transaction log from memory to disk.
  216. */
  217. func (t *TransactionManager) syncLogFromMemory() error {
  218. t.close()
  219. recMap := make(map[uint64]*Record)
  220. for i, transList := range t.transList {
  221. if transList == nil {
  222. continue
  223. }
  224. // Add each record to the record map, decreasing the transaction count
  225. // if the same record is listed twice.
  226. for _, record := range transList {
  227. _, ok := recMap[record.ID()]
  228. if ok {
  229. record.DecTransCount()
  230. } else {
  231. recMap[record.ID()] = record
  232. }
  233. }
  234. t.transList[i] = nil
  235. }
  236. // Write the records from the record list to disk
  237. if err := t.syncRecords(recMap, true); err != nil {
  238. return err
  239. }
  240. t.owner.Sync()
  241. return t.open()
  242. }
  243. /*
  244. syncLogFromDisk syncs the log from disk and clears the memory transaction log.
  245. This is used for the rollback operation.
  246. */
  247. func (t *TransactionManager) syncLogFromDisk() error {
  248. t.close()
  249. for i, transList := range t.transList {
  250. if transList == nil {
  251. continue
  252. }
  253. // Discard all records which are held in memory
  254. for _, record := range transList {
  255. record.DecTransCount()
  256. if !record.InTransaction() {
  257. t.owner.releaseInTrans(record, false)
  258. }
  259. }
  260. t.transList[i] = nil
  261. }
  262. if err := t.recover(); err != nil {
  263. return err
  264. }
  265. return t.open()
  266. }
  267. /*
  268. syncRecords writes a list of records to the pysical disk file.
  269. */
  270. func (t *TransactionManager) syncRecords(records map[uint64]*Record, clearMemTransLog bool) error {
  271. for _, record := range records {
  272. if err := t.owner.writeRecord(record); err != nil {
  273. return err
  274. }
  275. if clearMemTransLog {
  276. record.DecTransCount()
  277. if !record.InTransaction() {
  278. t.owner.releaseInTrans(record, true)
  279. }
  280. }
  281. }
  282. return nil
  283. }