node.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*
  2. * Rufs - Remote Union File System
  3. *
  4. * Copyright 2017 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. package node
  11. import (
  12. "crypto/sha512"
  13. "crypto/tls"
  14. "fmt"
  15. "net"
  16. "net/rpc"
  17. "sync"
  18. )
  /*
RequestHandler is the callback type used to process an incoming request.

A request consists of two parts: a control map (ctrl) describing what the
payload is and how it should be used, and the raw payload itself (data).
The handler returns either the result bytes or an error.
*/
type RequestHandler func(ctrl map[string]string, data []byte) ([]byte, error)
  26. /*
  27. RufsNode is the management object for a node in the Rufs network.
  28. A RufsNode registers itself to the rpc server which is the global
  29. server object. Each node needs to have a unique name. Communication between nodes
  30. is secured by using a secret string which is never exchanged over the network
  31. and a hash generated token which identifies a member.
  32. Each RufsNode object contains a Client object which can be used to communicate
  33. with other nodes. This object should be used by pure clients - code which should
  34. communicate with the cluster without running an actual member.
  35. */
  36. type RufsNode struct {
  37. name string // Name of the node
  38. secret string // Network wide secret
  39. Client *Client // RPC client object
  40. listener net.Listener // RPC server listener
  41. wg sync.WaitGroup // RPC server Waitgroup for listener shutdown
  42. DataHandler RequestHandler // Handler function for data requests
  43. cert *tls.Certificate // Node certificate
  44. }
  45. /*
  46. NewNode create a new RufsNode object.
  47. */
  48. func NewNode(rpcInterface string, name string, secret string, clientCert *tls.Certificate,
  49. dataHandler RequestHandler) *RufsNode {
  50. // Generate node token
  51. token := &RufsNodeToken{name, fmt.Sprintf("%X", sha512.Sum512_224([]byte(name+secret)))}
  52. rn := &RufsNode{name, secret, &Client{token, rpcInterface, make(map[string]string),
  53. make(map[string]*rpc.Client), make(map[string]string), clientCert, &sync.RWMutex{}, false},
  54. nil, sync.WaitGroup{}, dataHandler, clientCert}
  55. return rn
  56. }
  57. /*
  58. NewClient create a new Client object.
  59. */
  60. func NewClient(secret string, clientCert *tls.Certificate) *Client {
  61. return NewNode("", "", secret, clientCert, nil).Client
  62. }
  63. // General node API
  64. // ================
  65. /*
  66. Name returns the name of the node.
  67. */
  68. func (rn *RufsNode) Name() string {
  69. return rn.name
  70. }
  71. /*
  72. SSLFingerprint returns the SSL fingerprint of the node.
  73. */
  74. func (rn *RufsNode) SSLFingerprint() string {
  75. var ret string
  76. if rn.cert != nil && rn.cert.Certificate[0] != nil {
  77. ret = fingerprint(rn.cert.Certificate[0])
  78. }
  79. return ret
  80. }
  81. /*
  82. LogInfo logs a node related message at info level.
  83. */
  84. func (rn *RufsNode) LogInfo(v ...interface{}) {
  85. LogInfo(rn.name, ": ", fmt.Sprint(v...))
  86. }
  87. /*
  88. Start starts process for this node.
  89. */
  90. func (rn *RufsNode) Start(serverCert *tls.Certificate) error {
  91. if _, ok := rufsServer.nodes[rn.name]; ok {
  92. return fmt.Errorf("Cannot start node %s twice", rn.name)
  93. }
  94. rn.LogInfo("Starting node ", rn.name, " rpc server on: ", rn.Client.rpc)
  95. l, err := net.Listen("tcp", rn.Client.rpc)
  96. if err != nil {
  97. return err
  98. }
  99. if serverCert != nil && serverCert.Certificate[0] != nil {
  100. rn.cert = serverCert
  101. rn.LogInfo("SSL fingerprint: ", rn.SSLFingerprint())
  102. // Wrap the listener in a TLS listener
  103. config := tls.Config{Certificates: []tls.Certificate{*serverCert}}
  104. l = tls.NewListener(l, &config)
  105. }
  106. // Kick of the rpc listener
  107. go func() {
  108. rpc.Accept(l)
  109. rn.wg.Done()
  110. rn.LogInfo("Connection closed: ", rn.Client.rpc)
  111. }()
  112. rn.listener = l
  113. // Register this node in the global server map
  114. rufsServer.nodes[rn.name] = rn
  115. return nil
  116. }
  117. /*
  118. Shutdown shuts the member manager rpc server for this cluster member down.
  119. */
  120. func (rn *RufsNode) Shutdown() error {
  121. var err error
  122. // Close socket
  123. if rn.listener != nil {
  124. rn.LogInfo("Shutdown rpc server on: ", rn.Client.rpc)
  125. rn.wg.Add(1)
  126. err = rn.listener.Close()
  127. rn.Client.Shutdown()
  128. rn.listener = nil
  129. rn.wg.Wait()
  130. delete(rufsServer.nodes, rn.name)
  131. } else {
  132. LogDebug("Node ", rn.name, " already shut down")
  133. }
  134. return err
  135. }