|
@@ -269,91 +269,99 @@ LargeScaleExportPartition dumps the contents of a partition into multiple io.Wri
|
|
{ <attr> : <value>, ... },
|
|
{ <attr> : <value>, ... },
|
|
...
|
|
...
|
|
*/
|
|
*/
|
|
-func LargeScaleExportPartition(ewf ExportWriterFactory, part string, gm *Manager) error {
|
|
|
|
- out, err := ewf.CreateWriter(fmt.Sprintf("%v-nodes", part))
|
|
|
|
|
|
+func LargeScaleExportPartition(ewf ExportWriterFactory, gm *Manager) error {
|
|
|
|
+ var out io.Writer
|
|
|
|
+ var err error
|
|
|
|
|
|
- if err == nil {
|
|
|
|
- for _, k := range gm.NodeKinds() {
|
|
|
|
- var it *NodeKeyIterator
|
|
|
|
|
|
+ for _, part := range gm.Partitions() {
|
|
|
|
|
|
- if it, err = gm.NodeKeyIterator(part, k); err == nil {
|
|
|
|
|
|
+ if out, err = ewf.CreateWriter(fmt.Sprintf("%v-nodes", part)); err == nil {
|
|
|
|
+ for _, k := range gm.NodeKinds() {
|
|
|
|
+ var it *NodeKeyIterator
|
|
|
|
|
|
- for it.HasNext() {
|
|
|
|
- key := it.Next()
|
|
|
|
|
|
+ if it, err = gm.NodeKeyIterator(part, k); err == nil {
|
|
|
|
|
|
- if err = it.Error(); err == nil {
|
|
|
|
- var node data.Node
|
|
|
|
|
|
+ for it.HasNext() {
|
|
|
|
+ key := it.Next()
|
|
|
|
|
|
- if node, err = gm.FetchNode(part, key, k); err == nil {
|
|
|
|
- var jsonBytes []byte
|
|
|
|
|
|
+ if err = it.Error(); err == nil {
|
|
|
|
+ var node data.Node
|
|
|
|
|
|
- if jsonBytes, err = json.Marshal(node.Data()); err == nil {
|
|
|
|
- _, err = out.Write(jsonBytes)
|
|
|
|
- fmt.Fprintln(out)
|
|
|
|
|
|
+ if node, err = gm.FetchNode(part, key, k); err == nil {
|
|
|
|
+ var jsonBytes []byte
|
|
|
|
+ if jsonBytes, err = json.Marshal(node.Data()); err == nil {
|
|
|
|
+ _, err = out.Write(jsonBytes)
|
|
|
|
+ fmt.Fprintln(out)
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- break
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- break
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part))
|
|
|
|
|
|
+ if err == nil {
|
|
|
|
+ if out, err = ewf.CreateWriter(fmt.Sprintf("%v-edges", part)); err == nil {
|
|
|
|
+ for _, k := range gm.EdgeKinds() {
|
|
|
|
+ var tree *hash.HTree
|
|
|
|
|
|
- if err == nil {
|
|
|
|
- for _, k := range gm.EdgeKinds() {
|
|
|
|
- var tree *hash.HTree
|
|
|
|
|
|
+ if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
|
|
|
|
+ gm.mutex.RLock()
|
|
|
|
+ it := hash.NewHTreeIterator(tree)
|
|
|
|
+ gm.mutex.RUnlock()
|
|
|
|
|
|
- if tree, err = gm.getEdgeStorageHTree(part, k, false); err == nil {
|
|
|
|
- gm.mutex.RLock()
|
|
|
|
- it := hash.NewHTreeIterator(tree)
|
|
|
|
- gm.mutex.RUnlock()
|
|
|
|
|
|
+ if err = it.LastError; err == nil {
|
|
|
|
|
|
- if err = it.LastError; err == nil {
|
|
|
|
|
|
+ for it.HasNext() {
|
|
|
|
+ gm.mutex.RLock()
|
|
|
|
+ binaryKey, _ := it.Next()
|
|
|
|
+ gm.mutex.RUnlock()
|
|
|
|
|
|
- for it.HasNext() {
|
|
|
|
- gm.mutex.RLock()
|
|
|
|
- binaryKey, _ := it.Next()
|
|
|
|
- gm.mutex.RUnlock()
|
|
|
|
|
|
+ if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
|
|
|
|
+ continue
|
|
|
|
+ }
|
|
|
|
|
|
- if prefix := binaryKey[:len(PrefixNSAttrs)]; string(prefix) != PrefixNSAttrs {
|
|
|
|
- continue
|
|
|
|
- }
|
|
|
|
|
|
+ key := string(binaryKey[len(PrefixNSAttrs):])
|
|
|
|
|
|
- key := string(binaryKey[len(PrefixNSAttrs):])
|
|
|
|
|
|
+ if err = it.LastError; err == nil {
|
|
|
|
+ var node data.Node
|
|
|
|
|
|
- if err = it.LastError; err == nil {
|
|
|
|
- var node data.Node
|
|
|
|
|
|
+ if node, err = gm.FetchEdge(part, key, k); err == nil {
|
|
|
|
+ var jsonBytes []byte
|
|
|
|
|
|
- if node, err = gm.FetchEdge(part, key, k); err == nil {
|
|
|
|
- var jsonBytes []byte
|
|
|
|
|
|
+ if jsonBytes, err = json.Marshal(node.Data()); err == nil {
|
|
|
|
+ if _, err = out.Write(jsonBytes); err == nil {
|
|
|
|
+ _, err = fmt.Fprintln(out)
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- if jsonBytes, err = json.Marshal(node.Data()); err == nil {
|
|
|
|
- _, err = out.Write(jsonBytes)
|
|
|
|
- fmt.Fprintln(out)
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
- }
|
|
|
|
- }
|
|
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- break
|
|
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -364,7 +372,7 @@ func LargeScaleExportPartition(ewf ExportWriterFactory, part string, gm *Manager
|
|
ImportReaderFactory produces a named reader.
|
|
ImportReaderFactory produces a named reader.
|
|
*/
|
|
*/
|
|
type ImportReaderFactory interface {
|
|
type ImportReaderFactory interface {
|
|
- Readers() []string
|
|
|
|
|
|
+ Readers() ([]string, error)
|
|
CreateReader(name string) (io.Reader, error)
|
|
CreateReader(name string) (io.Reader, error)
|
|
}
|
|
}
|
|
|
|
|
|
@@ -375,45 +383,45 @@ LargeScaleImportPartition dumps the contents of a partition into multiple io.Wri
|
|
{ <attr> : <value>, ... },
|
|
{ <attr> : <value>, ... },
|
|
...
|
|
...
|
|
*/
|
|
*/
|
|
-func LargeScaleImportPartition(irf ImportReaderFactory, part string, gm *Manager) error {
|
|
|
|
- var err error
|
|
|
|
|
|
+func LargeScaleImportPartition(irf ImportReaderFactory, gm *Manager) error {
|
|
|
|
+ readers, err := irf.Readers()
|
|
|
|
|
|
- readers := irf.Readers()
|
|
|
|
|
|
+ if err == nil {
|
|
|
|
+ trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
|
|
|
|
|
|
- trans := NewRollingTrans(NewGraphTrans(gm), 1000, gm, NewGraphTrans)
|
|
|
|
|
|
+ for _, r := range readers {
|
|
|
|
+ var in io.Reader
|
|
|
|
|
|
- for _, r := range readers {
|
|
|
|
- var in io.Reader
|
|
|
|
|
|
+ if in, err = irf.CreateReader(r); err == nil {
|
|
|
|
+ isNode := strings.HasSuffix(r, "-nodes")
|
|
|
|
|
|
- if in, err = irf.CreateReader(r); err == nil {
|
|
|
|
- isNode := strings.HasSuffix(r, "-nodes")
|
|
|
|
|
|
+ scanner := bufio.NewScanner(in)
|
|
|
|
+ for scanner.Scan() {
|
|
|
|
+ var nodeData map[string]interface{}
|
|
|
|
|
|
- scanner := bufio.NewScanner(in)
|
|
|
|
- for scanner.Scan() {
|
|
|
|
- var nodeData map[string]interface{}
|
|
|
|
|
|
+ if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
|
|
|
|
+ if isNode {
|
|
|
|
+ err = trans.StoreNode(strings.TrimSuffix(r, "-nodes"), data.NewGraphNodeFromMap(nodeData))
|
|
|
|
+ } else {
|
|
|
|
+ err = trans.StoreEdge(strings.TrimSuffix(r, "-edges"), data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
|
|
- if err = json.Unmarshal(scanner.Bytes(), &nodeData); err == nil {
|
|
|
|
- if isNode {
|
|
|
|
- err = gm.StoreNode(part, data.NewGraphNodeFromMap(nodeData))
|
|
|
|
- } else {
|
|
|
|
- err = gm.StoreEdge(part, data.NewGraphEdgeFromNode(data.NewGraphNodeFromMap(nodeData)))
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ }
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
|
|
+ if err != nil {
|
|
|
|
+ break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if err != nil {
|
|
|
|
- break
|
|
|
|
|
|
+ if err == nil {
|
|
|
|
+ err = trans.Commit()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- if err == nil {
|
|
|
|
- err = trans.Commit()
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
return err
|
|
return err
|
|
}
|
|
}
|