import_export.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package graph
  11. import (
  12. "bufio"
  13. "bytes"
  14. "encoding/json"
  15. "fmt"
  16. "io"
  17. "strings"
  18. "devt.de/krotik/common/errorutil"
  19. "devt.de/krotik/eliasdb/graph/data"
  20. "devt.de/krotik/eliasdb/hash"
  21. )
  22. /*
  23. ExportPartition dumps the contents of a partition to an io.Writer in JSON format:
  24. {
  25. nodes : [ { <attr> : <value>, ... }, ... ]
  26. edges : [ { <attr> : <value>, ... }, ... ]
  27. }
  28. */
  29. func ExportPartition(out io.Writer, part string, gm *Manager) error {
  30. // Use a map to unique found edge keys
  31. edgeKeys := make(map[string]string)
  32. writeData := func(data map[string]interface{}) {
  33. nk := 0
  34. for k, v := range data {
  35. // JSON encode value - ignore values which cannot be JSON encoded
  36. jv, err := json.Marshal(v)
  37. // Encoding errors result in a null value
  38. if err != nil {
  39. jv = []byte("null")
  40. }
  41. // Write out the node attributes
  42. fmt.Fprintf(out, " \"%s\" : %s", k, jv)
  43. if nk < len(data)-1 {
  44. fmt.Fprint(out, ",")
  45. }
  46. fmt.Fprint(out, "\n")
  47. nk++
  48. }
  49. }
  50. // Iterate over all available node kinds
  51. fmt.Fprint(out, `{
  52. "nodes" : [
  53. `)
  54. // Loop over all available kinds and build iterators if nodes
  55. // exist in the given partition
  56. var iters []*NodeKeyIterator
  57. var kinds []string
  58. for _, k := range gm.NodeKinds() {
  59. it, err := gm.NodeKeyIterator(part, k)
  60. if err != nil {
  61. return err
  62. }
  63. if it != nil {
  64. iters = append(iters, it)
  65. kinds = append(kinds, k)
  66. }
  67. }
  68. for ik, it := range iters {
  69. // Iterate over all node keys
  70. for i := 0; it.HasNext(); i++ {
  71. key := it.Next()
  72. if it.LastError != nil {
  73. return it.LastError
  74. }
  75. node, err := gm.FetchNode(part, key, kinds[ik])
  76. if err != nil {
  77. return err
  78. }
  79. // Fetch all connected relationships and store their key and kind
  80. _, edges, err := gm.TraverseMulti(part, key, kinds[ik], ":::", false)
  81. if err != nil {
  82. return err
  83. }
  84. for _, edge := range edges {
  85. edgeKeys[edge.Kind()+edge.Key()] = edge.Kind()
  86. }
  87. // Write out JSON object
  88. fmt.Fprint(out, " {\n")
  89. writeData(node.Data())
  90. if it.HasNext() || ik < len(iters)-1 {
  91. fmt.Fprint(out, " },\n")
  92. } else {
  93. fmt.Fprint(out, " }\n")
  94. }
  95. }
  96. }
  97. fmt.Fprint(out, ` ],
  98. "edges" : [
  99. `)
  100. // Iterate over all available edge kinds
  101. ie := 0
  102. for key, kind := range edgeKeys {
  103. key = key[len(kind):]
  104. edge, err := gm.FetchEdge(part, key, kind)
  105. if err != nil {
  106. return err
  107. }
  108. // Write out JSON object
  109. fmt.Fprint(out, " {\n")
  110. writeData(edge.Data())
  111. if ie < len(edgeKeys)-1 {
  112. fmt.Fprint(out, " },\n")
  113. } else {
  114. fmt.Fprint(out, " }\n")
  115. }
  116. ie++
  117. }
  118. fmt.Fprint(out, ` ]
  119. }`)
  120. return nil
  121. }
  122. /*
  123. SortDump sorts a string result which was produced by ExportPartition.
  124. Do not use this for very large results. Panics if the input data is not valid.
  125. */
  126. func SortDump(in string) string {
  127. var nodes []data.Node
  128. var edges []data.Node
  129. dec := json.NewDecoder(bytes.NewBufferString(in))
  130. gdata := make(map[string][]map[string]interface{})
  131. errorutil.AssertOk(dec.Decode(&gdata))
  132. nDataList := gdata["nodes"]
  133. for _, n := range nDataList {
  134. nodes = append(nodes, data.NewGraphNodeFromMap(n))
  135. }
  136. data.NodeSort(nodes)
  137. for i, n := range nodes {
  138. nDataList[i] = n.Data()
  139. }
  140. eDataList := gdata["edges"]
  141. for _, n := range eDataList {
  142. edges = append(edges, data.NewGraphNodeFromMap(n))
  143. }
  144. data.NodeSort(edges)
  145. for i, e := range edges {
  146. eDataList[i] = e.Data()
  147. }
  148. res, err := json.MarshalIndent(map[string]interface{}{
  149. "nodes": nDataList,
  150. "edges": eDataList,
  151. }, "", " ")
  152. errorutil.AssertOk(err)
  153. return string(res)
  154. }
  155. /*
  156. ImportPartition imports the JSON contents of an io.Reader into a given partition.
  157. The following format is expected:
  158. {
  159. nodes : [ { <attr> : <value>, ... }, ... ]
  160. edges : [ { <attr> : <value>, ... }, ... ]
  161. }
  162. */
  163. func ImportPartition(in io.Reader, part string, gm *Manager) error {
  164. dec := json.NewDecoder(in)
  165. gdata := make(map[string][]map[string]interface{})
  166. if err := dec.Decode(&gdata); err != nil {
  167. return fmt.Errorf("Could not decode file content as object with list of nodes and edges: %s", err.Error())
  168. }
  169. nDataList := gdata["nodes"]
  170. eDataList := gdata["edges"]
  171. // Create a transaction
  172. trans := NewGraphTrans(gm)
  173. // Store nodes in transaction
  174. for _, ndata := range nDataList {
  175. node := data.NewGraphNodeFromMap(ndata)
  176. if err := trans.StoreNode(part, node); err != nil {
  177. return err
  178. }
  179. }
  180. // Store edges in transaction
  181. for _, edata := range eDataList {
  182. edge := data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(edata))
  183. if err := trans.StoreEdge(part, edge); err != nil {
  184. return err
  185. }
  186. }
  187. // Commit transaction
  188. return trans.Commit()
  189. }
/*
ExportWriterFactory produces named writers. LargeScaleExportPartition asks
the factory for two writers per partition: "<partition>-nodes" for the node
data and "<partition>-edges" for the edge data.
*/
type ExportWriterFactory interface {

	/*
	   CreateWriter returns the writer which should receive the data for the
	   given name or an error if no writer could be created.
	*/
	CreateWriter(name string) (io.Writer, error)
}
  196. /*
  197. LargeScaleExportPartition dumps the contents of a partition into multiple io.Writer in line-delimited JSON format:
  198. { <attr> : <value>, ... },
  199. { <attr> : <value>, ... },
  200. ...
  201. */
  202. func LargeScaleExportPartition(ewf ExportWriterFactory, gm *Manager) error {
  203. var out io.Writer
  204. var err error
  205. for _, part := range gm.Partitions() {
  206. if out, err = ewf.CreateWriter(fmt.Sprintf("%v-nodes", part)); err == nil {
  207. for _, k := range gm.NodeKinds() {
  208. var it *NodeKeyIterator
  209. if it, err = gm.NodeKeyIterator(part, k); err == nil {
  210. for it.HasNext() {
  211. key := it.Next()
  212. if err = it.Error(); err == nil {
  213. var node data.Node
  214. if node, err = gm.FetchNode(part, key, k); err == nil {
  215. var jsonBytes []byte
  216. if jsonBytes, err = json.Marshal(node.Data()); err == nil {
  217. _, err = out.Write(jsonBytes)
  218. fmt.Fprintln(out)
  219. }
  220. }
  221. }
  222. if err != nil {
  223. break
  224. }
  225. }
  226. }
  227. if err != nil {
  228. break
  229. }
  230. }
  231. }
  232. if err == nil {
  233. if out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part)); err == nil {
  234. for _, k := range gm.EdgeKinds() {
  235. var tree *hash.HTree
  236. if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
  237. gm.mutex.RLock()
  238. it := hash.NewHTreeIterator(tree)
  239. gm.mutex.RUnlock()
  240. if err = it.LastError; err == nil {
  241. for it.HasNext() {
  242. gm.mutex.RLock()
  243. binaryKey, _ := it.Next()
  244. gm.mutex.RUnlock()
  245. if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
  246. continue
  247. }
  248. key := string(binaryKey[len(PrefixNSAttrs):])
  249. if err = it.LastError; err == nil {
  250. var node data.Node
  251. if node, err = gm.FetchEdge(part, key, k); err == nil {
  252. var jsonBytes []byte
  253. if jsonBytes, err = json.Marshal(node.Data()); err == nil {
  254. if _, err = out.Write(jsonBytes); err == nil {
  255. _, err = fmt.Fprintln(out)
  256. }
  257. }
  258. }
  259. }
  260. if err != nil {
  261. break
  262. }
  263. }
  264. }
  265. }
  266. if err != nil {
  267. break
  268. }
  269. }
  270. }
  271. }
  272. if err != nil {
  273. break
  274. }
  275. }
  276. return err
  277. }
/*
ImportReaderFactory produces named readers. LargeScaleImportPartition
requests a reader for every name returned by Readers. Names ending with
"-nodes" are read as node data, all other names are read as edge data.
*/
type ImportReaderFactory interface {

	/*
	   Readers returns the names of all available readers.
	*/
	Readers() ([]string, error)

	/*
	   CreateReader returns the reader for a given name or an error if no
	   reader could be created.
	*/
	CreateReader(name string) (io.Reader, error)
}
  285. /*
  286. LargeScaleImportPartition dumps the contents of a partition into multiple io.Writer in line-delimited JSON format:
  287. { <attr> : <value>, ... },
  288. { <attr> : <value>, ... },
  289. ...
  290. */
  291. func LargeScaleImportPartition(irf ImportReaderFactory, gm *Manager) error {
  292. readers, err := irf.Readers()
  293. if err == nil {
  294. trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
  295. for _, r := range readers {
  296. var in io.Reader
  297. if in, err = irf.CreateReader(r); err == nil {
  298. isNode := strings.HasSuffix(r, "-nodes")
  299. scanner := bufio.NewScanner(in)
  300. for scanner.Scan() {
  301. var nodeData map[string]interface{}
  302. if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
  303. if isNode {
  304. err = trans.StoreNode(strings.TrimSuffix(r, "-nodes"), data.NewGraphNodeFromMap(nodeData))
  305. } else {
  306. err = trans.StoreEdge(strings.TrimSuffix(r, "-edges"), data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
  307. }
  308. }
  309. if err != nil {
  310. break
  311. }
  312. }
  313. }
  314. if err != nil {
  315. break
  316. }
  317. }
  318. if err == nil {
  319. err = trans.Commit()
  320. }
  321. }
  322. return err
  323. }