tree.go 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356
  1. /*
  2. * Rufs - Remote Union File System
  3. *
  4. * Copyright 2017 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package rufs
  11. import (
  12. "bytes"
  13. "crypto/tls"
  14. "encoding/gob"
  15. "encoding/json"
  16. "fmt"
  17. "io"
  18. "os"
  19. "path"
  20. "regexp"
  21. "sort"
  22. "strings"
  23. "sync"
  24. "time"
  25. "unicode/utf8"
  26. "devt.de/krotik/common/bitutil"
  27. "devt.de/krotik/common/fileutil"
  28. "devt.de/krotik/common/stringutil"
  29. "devt.de/krotik/rufs/config"
  30. "devt.de/krotik/rufs/node"
  31. )
  32. /*
  33. Tree models a Rufs client which combines several branches.
  34. */
  35. type Tree struct {
  36. client *node.Client // RPC client
  37. treeLock *sync.RWMutex // Lock for maps
  38. root *treeItem // Tree root item
  39. branches []map[string]string // Added working branches
  40. branchesAll []map[string]string // All added branches also not working
  41. mapping []map[string]interface{} // Mappings from working branches
  42. mappingAll []map[string]interface{} // All used mappings
  43. }
  44. /*
  45. NewTree creates a new tree.
  46. */
  47. func NewTree(cfg map[string]interface{}, cert *tls.Certificate) (*Tree, error) {
  48. var err error
  49. var t *Tree
  50. // Make sure the given config is ok
  51. if err = config.CheckTreeConfig(cfg); err == nil {
  52. // Create RPC client
  53. c := node.NewClient(fileutil.ConfStr(cfg, config.TreeSecret), cert)
  54. // Create the tree
  55. t = &Tree{c, &sync.RWMutex{}, &treeItem{make(map[string]*treeItem),
  56. []string{}, []bool{}}, []map[string]string{},
  57. []map[string]string{}, []map[string]interface{}{},
  58. []map[string]interface{}{}}
  59. }
  60. return t, err
  61. }
  62. /*
  63. Config returns the current tree configuration as a JSON string.
  64. */
  65. func (t *Tree) Config() string {
  66. t.treeLock.RLock()
  67. defer t.treeLock.RUnlock()
  68. out, _ := json.MarshalIndent(map[string]interface{}{
  69. "branches": t.branches,
  70. "tree": t.mapping,
  71. }, "", " ")
  72. return string(out)
  73. }
  74. /*
  75. SetMapping adds a given tree mapping configuration in a JSON string.
  76. */
  77. func (t *Tree) SetMapping(config string) error {
  78. var err error
  79. var conf map[string][]map[string]interface{}
  80. // Unmarshal the config
  81. if err = json.Unmarshal([]byte(config), &conf); err == nil {
  82. // Reset the whole tree
  83. t.Reset(true)
  84. if branches, ok := conf["branches"]; ok {
  85. for _, b := range branches {
  86. t.AddBranch(b["branch"].(string), b["rpc"].(string), b["fingerprint"].(string))
  87. }
  88. }
  89. if mounts, ok := conf["tree"]; ok {
  90. for _, m := range mounts {
  91. t.AddMapping(m["path"].(string), m["branch"].(string), m["writeable"].(bool))
  92. }
  93. }
  94. }
  95. return err
  96. }
  97. /*
  98. KnownBranches returns a map of all known branches (active or not reachable).
  99. Caution: This map contains also the map of active branches with their fingerprints
  100. it should only be used for read operations.
  101. */
  102. func (t *Tree) KnownBranches() map[string]map[string]string {
  103. ret := make(map[string]map[string]string)
  104. t.treeLock.RLock()
  105. t.treeLock.RUnlock()
  106. for _, b := range t.branchesAll {
  107. ret[b["branch"]] = b
  108. }
  109. return ret
  110. }
  111. /*
  112. ActiveBranches returns a list of all known active branches and their fingerprints.
  113. */
  114. func (t *Tree) ActiveBranches() ([]string, []string) {
  115. return t.client.Peers()
  116. }
  117. /*
  118. NotReachableBranches returns a map of all known branches which couldn't be
  119. reached. The map contains the name and the definition of the branch.
  120. */
  121. func (t *Tree) NotReachableBranches() map[string]map[string]string {
  122. ret := make(map[string]map[string]string)
  123. t.treeLock.RLock()
  124. t.treeLock.RUnlock()
  125. activeBranches := make(map[string]map[string]string)
  126. for _, b := range t.branches {
  127. activeBranches[b["branch"]] = b
  128. }
  129. for _, b := range t.branchesAll {
  130. name := b["branch"]
  131. if _, ok := activeBranches[name]; !ok {
  132. ret[name] = b
  133. }
  134. }
  135. return ret
  136. }
  137. /*
  138. PingBranch sends a ping to a remote branch and returns its fingerprint or an error.
  139. */
  140. func (t *Tree) PingBranch(node string, rpc string) (string, error) {
  141. _, fp, err := t.client.SendPing(node, rpc)
  142. return fp, err
  143. }
  144. /*
  145. Reset either resets only all mounts or if the branches flag is specified
  146. also all known branches.
  147. */
  148. func (t *Tree) Reset(branches bool) {
  149. if branches {
  150. peers, _ := t.client.Peers()
  151. for _, p := range peers {
  152. t.client.RemovePeer(p)
  153. }
  154. t.branches = []map[string]string{}
  155. t.branchesAll = []map[string]string{}
  156. }
  157. t.treeLock.Lock()
  158. defer t.treeLock.Unlock()
  159. t.mapping = []map[string]interface{}{}
  160. t.mappingAll = []map[string]interface{}{}
  161. t.root = &treeItem{make(map[string]*treeItem), []string{}, []bool{}}
  162. }
  163. /*
  164. Refresh refreshes all known branches and mappings. Only reachable branches will
  165. be mapped into the tree.
  166. */
  167. func (t *Tree) Refresh() {
  168. addBranches := make(map[string]map[string]string)
  169. delBranches := make(map[string]map[string]string)
  170. nrBranches := t.NotReachableBranches()
  171. // Check all known branches and decide if they should be added or removed
  172. t.treeLock.RLock()
  173. for _, data := range t.branchesAll {
  174. branchName := data["branch"]
  175. branchRPC := data["rpc"]
  176. _, knownAsNotWorking := nrBranches[branchName]
  177. // Ping the branch
  178. _, _, err := t.client.SendPing(branchName, branchRPC)
  179. if err == nil && knownAsNotWorking {
  180. // Success branch can now be reached
  181. addBranches[branchName] = data
  182. } else if err != nil && !knownAsNotWorking {
  183. // Failure branch can no longer be reached
  184. delBranches[branchName] = data
  185. }
  186. }
  187. t.treeLock.RUnlock()
  188. // Now lock the tree and add/remove branches
  189. t.treeLock.Lock()
  190. for i, b := range t.branches {
  191. branchName := b["branch"]
  192. if _, ok := delBranches[branchName]; ok {
  193. t.client.RemovePeer(branchName)
  194. t.branches = append(t.branches[:i], t.branches[i+1:]...)
  195. }
  196. }
  197. for _, b := range addBranches {
  198. branchName := b["branch"]
  199. branchRPC := b["rpc"]
  200. branchFingerprint := b["fingerprint"]
  201. t.client.RegisterPeer(branchName, branchRPC, branchFingerprint)
  202. t.branches = append(t.branches, b)
  203. }
  204. // Rebuild all mappings
  205. mappings := t.mappingAll
  206. t.mapping = []map[string]interface{}{}
  207. t.mappingAll = []map[string]interface{}{}
  208. t.root = &treeItem{make(map[string]*treeItem), []string{}, []bool{}}
  209. t.treeLock.Unlock()
  210. for _, m := range mappings {
  211. t.AddMapping(fmt.Sprint(m["path"]), fmt.Sprint(m["branch"]), m["writeable"].(bool))
  212. }
  213. }
  214. /*
  215. AddBranch adds a branch to the tree.
  216. */
  217. func (t *Tree) AddBranch(branchName string, branchRPC string, branchFingerprint string) error {
  218. branchMap := map[string]string{
  219. "branch": branchName,
  220. "rpc": branchRPC,
  221. "fingerprint": branchFingerprint,
  222. }
  223. t.branchesAll = append(t.branchesAll, branchMap)
  224. // First ping the branch and see if we get a response
  225. _, fp, err := t.client.SendPing(branchName, branchRPC)
  226. // Only add the branch as active if we've seen it
  227. if err == nil {
  228. if branchFingerprint != "" && branchFingerprint != fp {
  229. err = fmt.Errorf("Remote branch has an unexpected fingerprint\nPresented fingerprint: %s\nExpected fingerprint : %s", branchFingerprint, fp)
  230. } else {
  231. t.treeLock.Lock()
  232. defer t.treeLock.Unlock()
  233. if err = t.client.RegisterPeer(branchName, branchRPC, fp); err == nil {
  234. // Once we know and accepted the fingerprint we change it
  235. //
  236. // Remote branches will never change their fingerprint
  237. // during a single network session
  238. branchMap["fingerprint"] = fp
  239. t.branches = append(t.branches, branchMap) // Store the added branch
  240. }
  241. }
  242. }
  243. return err
  244. }
  245. /*
  246. AddMapping adds a mapping from tree path to a branch.
  247. */
  248. func (t *Tree) AddMapping(dir, branchName string, writable bool) error {
  249. t.treeLock.Lock()
  250. defer t.treeLock.Unlock()
  251. err := node.ErrUnknownTarget
  252. mappingMap := map[string]interface{}{
  253. "path": dir,
  254. "branch": branchName,
  255. "writeable": writable,
  256. }
  257. t.mappingAll = append(t.mappingAll, mappingMap)
  258. peers, _ := t.client.Peers()
  259. for _, p := range peers {
  260. if p == branchName {
  261. // Split the given path and add the mapping
  262. t.root.addMapping(createMappingPath(dir), branchName, writable)
  263. t.mapping = append(t.mapping, mappingMap)
  264. err = nil
  265. }
  266. }
  267. return err
  268. }
  269. /*
  270. String returns a string representation of this tree.
  271. */
  272. func (t *Tree) String() string {
  273. if t.treeLock != nil {
  274. t.treeLock.RLock()
  275. defer t.treeLock.RUnlock()
  276. }
  277. var buf bytes.Buffer
  278. buf.WriteString("/: ")
  279. if t != nil && t.root != nil {
  280. t.root.String(1, &buf)
  281. }
  282. return buf.String()
  283. }
  284. // Client API
  285. // ==========
  286. /*
  287. Dir returns file listings matching a given pattern of one or more directories.
  288. The contents of the given path is returned. Optionally, also the contents of
  289. all subdirectories can be returned if the recursive flag is set. The return
  290. values is a list of traversed directories and their corresponding contents.
  291. */
  292. func (t *Tree) Dir(dir string, pattern string, recursive bool, checksums bool) ([]string, [][]os.FileInfo, error) {
  293. var err error
  294. var dirs []string
  295. var fis [][]os.FileInfo
  296. // Compile pattern
  297. re, err := regexp.Compile(pattern)
  298. if err != nil {
  299. return nil, nil, err
  300. }
  301. t.treeLock.RLock()
  302. defer t.treeLock.RUnlock()
  303. // Stip off trailing slashes to normalize the input
  304. if strings.HasSuffix(dir, "/") {
  305. dir = dir[:len(dir)-1]
  306. }
  307. treeVisitor := func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {
  308. for _, b := range branches {
  309. var res []byte
  310. if err == nil {
  311. res, err = t.client.SendData(b, map[string]string{
  312. ParamAction: OpDir,
  313. ParamPath: path.Join(branchPath...),
  314. ParamPattern: fmt.Sprint(pattern),
  315. ParamRecursive: fmt.Sprint(recursive),
  316. ParamChecksums: fmt.Sprint(checksums),
  317. }, nil)
  318. if err == nil {
  319. var dest []interface{}
  320. // Unpack the result
  321. if err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&dest); err == nil {
  322. bdirs := dest[0].([]string)
  323. bfis := dest[1].([][]os.FileInfo)
  324. // Construct the actual tree path for the returned directories
  325. for i, d := range bdirs {
  326. bdirs[i] = path.Join(treePath, d)
  327. // Merge these results into the overall results
  328. found := false
  329. for j, dir := range dirs {
  330. // Check if a directory from the result is already
  331. // in the overall result
  332. if dir == bdirs[i] {
  333. found = true
  334. // Create a map of existing names to avoid duplicates
  335. existing := make(map[string]bool)
  336. for _, fi := range fis[j] {
  337. existing[fi.Name()] = true
  338. }
  339. // Only add new files to the overall result
  340. for _, fi := range bfis[i] {
  341. if _, ok := existing[fi.Name()]; !ok {
  342. fis[j] = append(fis[j], fi)
  343. }
  344. }
  345. }
  346. }
  347. if !found {
  348. // Just append if the directory is not in the
  349. // overall results yet
  350. dirs = append(dirs, bdirs[i])
  351. fis = append(fis, bfis[i])
  352. }
  353. }
  354. }
  355. }
  356. }
  357. }
  358. }
  359. t.root.findPathBranches("/", createMappingPath(dir), recursive, treeVisitor)
  360. // Add pseudo directories for mapping components which have no corresponding
  361. // real directories
  362. dirsMap := make(map[string]int)
  363. for i, d := range dirs {
  364. dirsMap[d] = i
  365. }
  366. t.root.findPathBranches("/", createMappingPath(dir), recursive,
  367. func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {
  368. if !strings.HasPrefix(treePath, dir) {
  369. return
  370. }
  371. idx, ok := dirsMap[treePath]
  372. if !ok {
  373. // Create the entry if it does not exist
  374. dirs = append(dirs, treePath)
  375. idx = len(dirs) - 1
  376. dirsMap[treePath] = idx
  377. fis = append(fis, []os.FileInfo{})
  378. }
  379. // Add pseudo dirs if a physical directory is not present
  380. for n := range item.children {
  381. found := false
  382. for _, fi := range fis[idx] {
  383. if fi.Name() == n {
  384. found = true
  385. break
  386. }
  387. }
  388. if found {
  389. continue
  390. }
  391. if re.MatchString(n) {
  392. // Append if it matches the pattern
  393. fis[idx] = append(fis[idx], &FileInfo{
  394. FiName: n,
  395. FiSize: 0,
  396. FiMode: os.FileMode(os.ModeDir | 0777),
  397. FiModTime: time.Time{},
  398. })
  399. }
  400. }
  401. })
  402. return dirs, fis, err
  403. }
  404. /*
  405. Stat returns information about a given item. Use this function to find out
  406. if a given path is a file or directory.
  407. */
  408. func (t *Tree) Stat(item string) (os.FileInfo, error) {
  409. dir, file := path.Split(item)
  410. _, fis, err := t.Dir(dir, file, false, true)
  411. if len(fis) == 1 {
  412. for _, fi := range fis[0] {
  413. if fi.Name() == file {
  414. return fi, err
  415. }
  416. }
  417. }
  418. if err == nil {
  419. err = &node.Error{
  420. Type: node.ErrRemoteAction,
  421. Detail: os.ErrNotExist.Error(),
  422. IsNotExist: true,
  423. }
  424. }
  425. return nil, err
  426. }
  427. /*
  428. Copy is a general purpose copy function which creates files and directories.
  429. Destination must be a directory. A non-existing destination
  430. directory will be created.
  431. */
  432. func (t *Tree) Copy(src []string, dst string,
  433. updFunc func(file string, writtenBytes, totalBytes, currentFile, totalFiles int64)) error {
  434. var err error
  435. var relPaths []string
  436. files := make(map[string]os.FileInfo) // Make sure any file is only copied once
  437. paths := make(map[string]string)
  438. // Add files to be copied to items
  439. for _, s := range src {
  440. var fi os.FileInfo
  441. fi, err = t.Stat(s)
  442. if fi, err = t.Stat(s); fi != nil {
  443. if fi.IsDir() {
  444. // Find all files inside directories
  445. if dirs, fis, err := t.Dir(s, "", true, false); err == nil {
  446. for i, d := range dirs {
  447. for _, fi2 := range fis[i] {
  448. if !fi2.IsDir() {
  449. // Calculate the relative path by removing
  450. // source path from the absolute path
  451. relPath := path.Join(d, fi2.Name())[len(s):]
  452. relPath = path.Join("/"+fi.Name(), relPath)
  453. relPaths = append(relPaths, relPath)
  454. files[relPath] = fi2
  455. paths[relPath] = path.Join(d, fi2.Name())
  456. }
  457. }
  458. }
  459. }
  460. } else {
  461. // Single files are just added - these files will always
  462. // be at the root of the destination
  463. relPath := "/" + fi.Name()
  464. relPaths = append(relPaths, relPath)
  465. files[relPath] = fi
  466. paths[relPath] = s
  467. }
  468. }
  469. if err != nil {
  470. err = fmt.Errorf("Cannot stat %v: %v", s, err.Error())
  471. break
  472. }
  473. }
  474. if err == nil {
  475. var allFiles, cnt int64
  476. // Copy all found files
  477. allFiles = int64(len(files))
  478. for _, k := range relPaths {
  479. var totalSize, totalTransferred int64
  480. cnt++
  481. fi := files[k]
  482. totalSize = fi.Size()
  483. srcFile := paths[k]
  484. err = t.CopyFile(srcFile, path.Join(dst, k), func(b int) {
  485. if b >= 0 {
  486. totalTransferred += int64(b)
  487. updFunc(k, totalTransferred, totalSize, cnt, allFiles)
  488. } else {
  489. updFunc(k, int64(b), totalSize, cnt, allFiles)
  490. }
  491. })
  492. if err != nil {
  493. err = fmt.Errorf("Cannot copy %v to %v: %v", srcFile, dst, err.Error())
  494. break
  495. }
  496. }
  497. }
  498. return err
  499. }
  500. /*
  501. Sync operations
  502. */
  503. const (
  504. SyncCreateDirectory = "Create directory"
  505. SyncCopyFile = "Copy file"
  506. SyncRemoveDirectory = "Remove directory"
  507. SyncRemoveFile = "Remove file"
  508. )
  509. /*
  510. Sync a given destination with a given source directory. After this command has
  511. finished the dstDir will have the same files and directories as the srcDir.
  512. */
  513. func (t *Tree) Sync(srcDir string, dstDir string, recursive bool,
  514. updFunc func(op, srcFile, dstFile string, writtenBytes, totalBytes, currentFile, totalFiles int64)) error {
  515. var currentFile, totalFiles int64
  516. t.treeLock.RLock()
  517. defer t.treeLock.RUnlock()
  518. // doSync syncs a given src directory
  519. doSync := func(dir string, finfos []os.FileInfo) error {
  520. sdir := path.Join(srcDir, dir)
  521. ddir := path.Join(dstDir, dir)
  522. // Query the corresponding destination to see what is there
  523. _, dstFis, err := t.Dir(ddir, "", false, true)
  524. if err == nil {
  525. fileMap := make(map[string]string) // Map to quickly lookup destination files
  526. dirMap := make(map[string]bool) // Map to quickly lookup destination directories
  527. if len(dstFis) > 0 {
  528. for _, fi := range dstFis[0] {
  529. if fi.IsDir() {
  530. dirMap[fi.Name()] = true
  531. } else {
  532. fileMap[fi.Name()] = fi.(*FileInfo).Checksum()
  533. }
  534. }
  535. }
  536. // Go through the given source file infos and see what needs to be copied
  537. for _, fi := range finfos {
  538. currentFile++
  539. // Check if we have a directory or a file
  540. if fi.IsDir() {
  541. if _, ok := dirMap[fi.Name()]; !ok {
  542. // Create all directories which aren't there
  543. if updFunc != nil {
  544. updFunc(SyncCreateDirectory, "", path.Join(ddir, fi.Name()), 0, 0, currentFile, totalFiles)
  545. }
  546. _, err = t.ItemOp(ddir, map[string]string{
  547. ItemOpAction: ItemOpActMkDir,
  548. ItemOpName: fi.Name(),
  549. })
  550. }
  551. // Remove existing directories from the map so we can
  552. // use the map to remove directories which shouldn't
  553. // be there
  554. delete(dirMap, fi.Name())
  555. } else {
  556. fsum, ok := fileMap[fi.Name()]
  557. if !ok || fsum != fi.(*FileInfo).Checksum() {
  558. var u func(b int)
  559. s := path.Join(sdir, fi.Name())
  560. d := path.Join(ddir, fi.Name())
  561. // Copy the file if it does not exist or the checksum
  562. // is not matching
  563. if updFunc != nil {
  564. var totalTransferred, totalSize int64
  565. totalSize = fi.Size()
  566. u = func(b int) {
  567. if b >= 0 {
  568. totalTransferred += int64(b)
  569. updFunc(SyncCopyFile, s, d, totalTransferred, totalSize, currentFile, totalFiles)
  570. } else {
  571. updFunc(SyncCopyFile, s, d, int64(b), totalSize, currentFile, totalFiles)
  572. }
  573. }
  574. }
  575. if err = t.CopyFile(s, d, u); err != nil && updFunc != nil {
  576. // Note at which point the error message was produced
  577. updFunc(SyncCopyFile, s, d, 0, fi.Size(), currentFile, totalFiles)
  578. }
  579. }
  580. // Remove existing files from the map so we can
  581. // use the map to remove files which shouldn't
  582. // be there
  583. delete(fileMap, fi.Name())
  584. }
  585. if err != nil {
  586. break
  587. }
  588. }
  589. if err == nil {
  590. // Remove files and directories which are in the destination but
  591. // not in the source
  592. for d := range dirMap {
  593. if err == nil {
  594. if updFunc != nil {
  595. p := path.Join(ddir, d)
  596. updFunc(SyncRemoveDirectory, "", p, 0, 0, currentFile, totalFiles)
  597. }
  598. _, err = t.ItemOp(ddir, map[string]string{
  599. ItemOpAction: ItemOpActDelete,
  600. ItemOpName: d,
  601. })
  602. }
  603. }
  604. for f := range fileMap {
  605. if err == nil {
  606. if updFunc != nil {
  607. p := path.Join(ddir, f)
  608. updFunc(SyncRemoveFile, "", p, 0, 0, currentFile, totalFiles)
  609. }
  610. _, err = t.ItemOp(ddir, map[string]string{
  611. ItemOpAction: ItemOpActDelete,
  612. ItemOpName: f,
  613. })
  614. }
  615. }
  616. }
  617. }
  618. return err
  619. }
  620. // We only query the source once otherwise we might end up in an
  621. // endless loop if for example the dstDir is a subdirectory of srcDir
  622. srcDirs, srcFis, err := t.Dir(srcDir, "", recursive, true)
  623. if err == nil {
  624. for _, fis := range srcFis {
  625. totalFiles += int64(len(fis))
  626. }
  627. for i, dir := range srcDirs {
  628. if err = doSync(relPath(dir, srcDir), srcFis[i]); err != nil {
  629. break
  630. }
  631. }
  632. }
  633. return err
  634. }
  635. /*
  636. CopyFile copies a given file using a simple io.Pipe.
  637. */
  638. func (t *Tree) CopyFile(srcPath, dstPath string, updFunc func(writtenBytes int)) error {
  639. var pw io.WriteCloser
  640. var err, rerr error
  641. t.treeLock.RLock()
  642. defer t.treeLock.RUnlock()
  643. // Use a pipe to stream the contents of the source file to the destination file
  644. pr, pw := io.Pipe()
  645. if updFunc != nil {
  646. // Wrap the writer of the pipe
  647. pw = &statusUpdatingWriter{pw, updFunc}
  648. }
  649. // Make sure the src exists
  650. if _, rerr = t.ReadFile(srcPath, []byte{}, 0); rerr == nil {
  651. // Read the source in a go routine
  652. go func() {
  653. rerr = t.ReadFileToBuffer(srcPath, pw)
  654. pw.Close()
  655. }()
  656. // Write the destination file - this will return once the
  657. // writer is closed
  658. err = t.WriteFileFromBuffer(dstPath, pr)
  659. }
  660. if rerr != nil {
  661. // Check if we got an empty file
  662. if IsEOF(rerr) {
  663. _, err = t.WriteFile(dstPath, nil, 0)
  664. updFunc(0) // Report the creation of the empty file
  665. rerr = nil
  666. } else {
  667. // Read errors are reported before write errors
  668. err = rerr
  669. }
  670. }
  671. pr.Close()
  672. return err
  673. }
  674. /*
  675. ReadFileToBuffer reads a complete file into a given buffer which implements
  676. io.Writer.
  677. */
  678. func (t *Tree) ReadFileToBuffer(spath string, buf io.Writer) error {
  679. var n int
  680. var err error
  681. var offset int64
  682. readBuf := make([]byte, DefaultReadBufferSize)
  683. for err == nil {
  684. n, err = t.ReadFile(spath, readBuf, offset)
  685. if err == nil {
  686. _, err = buf.Write(readBuf[:n])
  687. offset += int64(n)
  688. } else if IsEOF(err) {
  689. // We reached the end of the file
  690. err = nil
  691. break
  692. }
  693. }
  694. return err
  695. }
  696. /*
  697. ReadFile reads up to len(p) bytes into p from the given offset. It
  698. returns the number of bytes read (0 <= n <= len(p)) and any error
  699. encountered.
  700. */
  701. func (t *Tree) ReadFile(spath string, p []byte, offset int64) (int, error) {
  702. var err error
  703. var n int
  704. var success bool
  705. t.treeLock.RLock()
  706. defer t.treeLock.RUnlock()
  707. err = &node.Error{
  708. Type: node.ErrRemoteAction,
  709. Detail: os.ErrNotExist.Error(),
  710. IsNotExist: true,
  711. }
  712. dir, file := path.Split(spath)
  713. t.root.findPathBranches(dir, createMappingPath(dir), false,
  714. func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {
  715. for _, b := range branches {
  716. if !success { // Only try other branches if we didn't have a success before
  717. var res []byte
  718. rpath := path.Join(branchPath...)
  719. rpath = path.Join(rpath, file)
  720. if res, err = t.client.SendData(b, map[string]string{
  721. ParamAction: OpRead,
  722. ParamPath: rpath,
  723. ParamOffset: fmt.Sprint(offset),
  724. ParamSize: fmt.Sprint(len(p)),
  725. }, nil); err == nil {
  726. var dest []interface{}
  727. // Unpack the result
  728. if err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&dest); err == nil {
  729. n = dest[0].(int)
  730. buf := dest[1].([]byte)
  731. copy(p, buf)
  732. }
  733. }
  734. success = err == nil
  735. // Special case EOF
  736. if IsEOF(err) {
  737. success = true
  738. }
  739. }
  740. }
  741. })
  742. return n, err
  743. }
  744. /*
  745. WriteFileFromBuffer writes a complete file from a given buffer which implements
  746. io.Reader.
  747. */
  748. func (t *Tree) WriteFileFromBuffer(spath string, buf io.Reader) error {
  749. var err error
  750. var offset int64
  751. writeBuf := make([]byte, DefaultReadBufferSize)
  752. for err == nil {
  753. var n int
  754. if n, err = buf.Read(writeBuf); err == nil {
  755. _, err = t.WriteFile(spath, writeBuf[:n], offset)
  756. offset += int64(n)
  757. } else if IsEOF(err) {
  758. // We reached the end of the file
  759. t.WriteFile(spath, []byte{}, offset)
  760. err = nil
  761. break
  762. }
  763. }
  764. return err
  765. }
  766. /*
  767. WriteFile writes p into the given file from the given offset. It
  768. returns the number of written bytes and any error encountered.
  769. */
  770. func (t *Tree) WriteFile(spath string, p []byte, offset int64) (int, error) {
  771. var err error
  772. var n, totalCount, ignoreCount int
  773. t.treeLock.RLock()
  774. defer t.treeLock.RUnlock()
  775. dir, file := path.Split(spath)
  776. t.root.findPathBranches(dir, createMappingPath(dir), false,
  777. func(item *treeItem, treePath string, branchPath []string, branches []string, writable []bool) {
  778. for i, b := range branches {
  779. var res []byte
  780. if err == nil {
  781. totalCount++
  782. if !writable[i] {
  783. // Ignore all non-writable branches
  784. ignoreCount++
  785. continue
  786. }
  787. rpath := path.Join(branchPath...)
  788. rpath = path.Join(rpath, file)
  789. if res, err = t.client.SendData(b, map[string]string{
  790. ParamAction: OpWrite,
  791. ParamPath: rpath,
  792. ParamOffset: fmt.Sprint(offset),
  793. }, p); err == nil {
  794. err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&n)
  795. }
  796. }
  797. }
  798. })
  799. if err == nil && totalCount == ignoreCount {
  800. err = fmt.Errorf("All applicable branches for the requested path were mounted as not writable")
  801. }
  802. return n, err
  803. }
  804. /*
  805. ItemOp executes a file or directory specific operation which can either
  806. succeed or fail (e.g. rename or delete). Actions and parameters should
  807. be given in the opdata map.
  808. */
  809. func (t *Tree) ItemOp(dir string, opdata map[string]string) (bool, error) {
  810. var err error
  811. var ret, recurse bool
  812. var totalCount, ignoreCount, notFoundCount int
  813. t.treeLock.RLock()
  814. defer t.treeLock.RUnlock()
  815. data := make(map[string]string)
  816. for k, v := range opdata {
  817. data[k] = v
  818. }
  819. data[ParamAction] = OpItemOp
  820. // Check if we should recurse
  821. if r, ok := data[ItemOpName]; ok {
  822. recurse = strings.HasSuffix(r, "**")
  823. }
  824. t.root.findPathBranches(dir, createMappingPath(dir), recurse,
  825. func(item *treeItem, treePath string, branchPath []string,
  826. branches []string, writable []bool) {
  827. for i, b := range branches {
  828. var res []byte
  829. totalCount++
  830. if !writable[i] {
  831. // Ignore all non-writable branches
  832. ignoreCount++
  833. continue
  834. }
  835. if err == nil {
  836. data[ParamPath] = path.Join(branchPath...)
  837. res, err = t.client.SendData(b, data, nil)
  838. if rerr, ok := err.(*node.Error); ok && rerr.IsNotExist {
  839. // Only count the not exist errors as this might only
  840. // be true for some branches
  841. notFoundCount++
  842. err = nil
  843. } else if err == nil {
  844. var bres bool
  845. // Execute the OpItem function
  846. err = gob.NewDecoder(bytes.NewBuffer(res)).Decode(&bres)
  847. ret = ret || bres // One positive result is enough
  848. }
  849. }
  850. }
  851. })
  852. if totalCount == ignoreCount {
  853. err = fmt.Errorf("All applicable branches for the requested path were mounted as not writable")
  854. } else if totalCount == notFoundCount+ignoreCount {
  855. err = &node.Error{
  856. Type: node.ErrRemoteAction,
  857. Detail: os.ErrNotExist.Error(),
  858. IsNotExist: true,
  859. }
  860. }
  861. return ret, err
  862. }
  863. // Util functions
  864. // ==============
  865. /*
  866. createMappingPath properly splits a given path into a mapping path.
  867. */
  868. func createMappingPath(path string) []string {
  869. var ret []string
  870. for _, i := range strings.Split(path, "/") {
  871. if i == "" {
  872. // Ignore empty child names
  873. continue
  874. }
  875. ret = append(ret, i)
  876. }
  877. return ret
  878. }
  879. /*
  880. DirResultToString formats a given Dir result into a human-readable string.
  881. */
  882. func DirResultToString(paths []string, infos [][]os.FileInfo) string {
  883. var buf bytes.Buffer
  884. // Sort the paths
  885. sort.Sort(&dirResult{paths, infos})
  886. // Sort the FileInfos within the paths
  887. for _, fis := range infos {
  888. sort.Sort(fileInfoSlice(fis))
  889. }
  890. for i, p := range paths {
  891. var maxlen int
  892. fis := infos[i]
  893. buf.WriteString(p)
  894. buf.WriteString("\n")
  895. sizeStrings := make([]string, 0, len(fis))
  896. for _, fi := range fis {
  897. sizeString := bitutil.ByteSizeString(fi.Size(), false)
  898. if strings.HasSuffix(sizeString, " B") {
  899. sizeString += " " // Unit should always be 3 runes
  900. }
  901. if l := utf8.RuneCountInString(sizeString); l > maxlen {
  902. maxlen = l
  903. }
  904. sizeStrings = append(sizeStrings, sizeString)
  905. }
  906. for j, fi := range fis {
  907. sizeString := sizeStrings[j]
  908. sizePrefix := stringutil.GenerateRollingString(" ",
  909. maxlen-utf8.RuneCountInString(sizeString))
  910. if rfi, ok := fi.(*FileInfo); ok && rfi.FiChecksum != "" {
  911. buf.WriteString(fmt.Sprintf("%v %v%v %v [%s]\n", fi.Mode(), sizePrefix,
  912. sizeString, fi.Name(), rfi.Checksum()))
  913. } else {
  914. buf.WriteString(fmt.Sprintf("%v %v%v %v\n", fi.Mode(), sizePrefix,
  915. sizeString, fi.Name()))
  916. }
  917. }
  918. if i < len(paths)-1 {
  919. buf.WriteString("\n")
  920. }
  921. }
  922. return buf.String()
  923. }
  924. // Helper functions
  925. // ================
  926. // Helper function to normalise relative paths
  927. /*
  928. relPath create a normalized relative path by removing a given path prefix.
  929. */
  930. func relPath(path, prefix string) string {
  931. norm := func(path string) string {
  932. if !strings.HasPrefix(path, "/") {
  933. path = "/" + path
  934. }
  935. if strings.HasSuffix(path, "/") {
  936. path = path[:len(path)-1]
  937. }
  938. return path
  939. }
  940. path = norm(path)
  941. prefix = norm(prefix)
  942. if strings.HasPrefix(path, prefix) {
  943. path = path[len(prefix):]
  944. if path == "" {
  945. path = "/"
  946. }
  947. }
  948. return path
  949. }
  950. // Helper objects to sort dir results
  951. type dirResult struct {
  952. paths []string
  953. infos [][]os.FileInfo
  954. }
  955. func (r *dirResult) Len() int { return len(r.paths) }
  956. func (r *dirResult) Less(i, j int) bool { return r.paths[i] < r.paths[j] }
  957. func (r *dirResult) Swap(i, j int) {
  958. r.paths[i], r.paths[j] = r.paths[j], r.paths[i]
  959. r.infos[i], r.infos[j] = r.infos[j], r.infos[i]
  960. }
  961. type fileInfoSlice []os.FileInfo
  962. func (f fileInfoSlice) Len() int { return len(f) }
  963. func (f fileInfoSlice) Less(i, j int) bool { return f[i].Name() < f[j].Name() }
  964. func (f fileInfoSlice) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
  965. // Helper object to given status updates when copying files
  966. /*
  967. statusUpdatingWriter is an internal io.WriteCloser which is used for status
  968. updates.
  969. */
  970. type statusUpdatingWriter struct {
  971. io.WriteCloser
  972. statusUpdate func(writtenBytes int)
  973. }
  974. /*
  975. Write writes len(p) bytes from p to the writer.
  976. */
  977. func (w *statusUpdatingWriter) Write(p []byte) (int, error) {
  978. n, err := w.WriteCloser.Write(p)
  979. w.statusUpdate(n)
  980. return n, err
  981. }
  982. /*
  983. Close closes the writer.
  984. */
  985. func (w *statusUpdatingWriter) Close() error {
  986. w.statusUpdate(-1)
  987. return w.WriteCloser.Close()
  988. }