client.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. /*
  2. * Rufs - Remote Union File System
  3. *
  4. * Copyright 2017 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the MIT
  7. * License, If a copy of the MIT License was not distributed with this
  8. * file, You can obtain one at https://opensource.org/licenses/MIT.
  9. */
  10. /*
  11. Package node contains the network communication code for Rufs via RPC calls.
  12. */
  13. package node
  14. import (
  15. "crypto/tls"
  16. "encoding/gob"
  17. "fmt"
  18. "io"
  19. "net"
  20. "net/rpc"
  21. "os"
  22. "sort"
  23. "strings"
  24. "sync"
  25. "time"
  26. )
  27. func init() {
  28. // Make sure we can use the relevant types in a gob operation
  29. gob.Register(&RufsNodeToken{})
  30. gob.Register(map[string]string{})
  31. }
  32. /*
  33. DialTimeout is the dial timeout for RPC connections
  34. */
  35. var DialTimeout = 10 * time.Second
  36. /*
  37. RufsNodeToken is used to authenticate a node in the network to other nodes
  38. */
  39. type RufsNodeToken struct {
  40. NodeName string
  41. NodeAuth string
  42. }
  43. /*
  44. Client is the client for the RPC API of a node.
  45. */
  46. type Client struct {
  47. token *RufsNodeToken // Token to be send to other nodes for authentication
  48. rpc string // This client's rpc network interface (may be empty in case of pure clients)
  49. peers map[string]string // Map of node names to their rpc network interface
  50. conns map[string]*rpc.Client // Map of node names to network connections
  51. fingerprints map[string]string // Map of expected server certificate fingerprints
  52. cert *tls.Certificate // Client certificate
  53. maplock *sync.RWMutex // Lock for maps
  54. redial bool // Flag if this client is attempting a redial
  55. }
  56. /*
  57. SSLFingerprint returns the SSL fingerprint of the client.
  58. */
  59. func (c *Client) SSLFingerprint() string {
  60. var ret string
  61. if c.cert != nil && c.cert.Certificate[0] != nil {
  62. ret = fingerprint(c.cert.Certificate[0])
  63. }
  64. return ret
  65. }
  66. /*
  67. Shutdown closes all stored connections.
  68. */
  69. func (c *Client) Shutdown() {
  70. c.maplock.Lock()
  71. defer c.maplock.Unlock()
  72. for _, c := range c.conns {
  73. c.Close()
  74. }
  75. c.conns = make(map[string]*rpc.Client)
  76. }
  77. /*
  78. RegisterPeer registers a new peer to communicate with. An empty fingerprint
  79. means that the client will accept any certificate from the server.
  80. */
  81. func (c *Client) RegisterPeer(node string, rpc string, fingerprint string) error {
  82. if _, ok := c.peers[node]; ok {
  83. return fmt.Errorf("Peer already registered: %v", node)
  84. } else if rpc == "" {
  85. return fmt.Errorf("RPC interface must not be empty")
  86. }
  87. c.maplock.Lock()
  88. c.peers[node] = rpc
  89. delete(c.conns, node)
  90. c.fingerprints[node] = fingerprint
  91. c.maplock.Unlock()
  92. return nil
  93. }
  94. /*
  95. Peers returns all registered peers and their expected fingerprints.
  96. */
  97. func (c *Client) Peers() ([]string, []string) {
  98. ret := make([]string, 0, len(c.peers))
  99. fps := make([]string, len(c.peers))
  100. c.maplock.Lock()
  101. defer c.maplock.Unlock()
  102. for k := range c.peers {
  103. ret = append(ret, k)
  104. }
  105. sort.Strings(ret)
  106. for i, node := range ret {
  107. fps[i] = c.fingerprints[node]
  108. }
  109. return ret, fps
  110. }
  111. /*
  112. RemovePeer removes a registered peer.
  113. */
  114. func (c *Client) RemovePeer(node string) {
  115. c.maplock.Lock()
  116. delete(c.peers, node)
  117. delete(c.conns, node)
  118. delete(c.fingerprints, node)
  119. c.maplock.Unlock()
  120. }
  121. /*
  122. SendPing sends a ping to a node and returns the result. Second argument is
  123. optional if the target member is not a known peer. Should be an empty string
  124. in all other cases. Returns the answer, the fingerprint of the presented server
  125. certificate and any errors.
  126. */
  127. func (c *Client) SendPing(node string, rpc string) ([]string, string, error) {
  128. var ret []string
  129. var fp string
  130. if _, ok := c.peers[node]; !ok && rpc != "" {
  131. // Add member temporary if it was not registered
  132. c.maplock.Lock()
  133. c.peers[node] = rpc
  134. c.maplock.Unlock()
  135. defer func() {
  136. c.maplock.Lock()
  137. delete(c.peers, node)
  138. delete(c.conns, node)
  139. delete(c.fingerprints, node)
  140. c.maplock.Unlock()
  141. }()
  142. }
  143. res, err := c.SendRequest(node, RPCPing, nil)
  144. if res != nil && err == nil {
  145. ret = res.([]string)
  146. c.maplock.Lock()
  147. fp = c.fingerprints[node]
  148. c.maplock.Unlock()
  149. }
  150. return ret, fp, err
  151. }
  152. /*
  153. SendData sends a portion of data and some control information to a node and
  154. returns the result.
  155. */
  156. func (c *Client) SendData(node string, ctrl map[string]string, data []byte) ([]byte, error) {
  157. if _, ok := c.peers[node]; !ok {
  158. return nil, fmt.Errorf("Unknown peer: %v", node)
  159. }
  160. res, err := c.SendRequest(node, RPCData, map[RequestArgument]interface{}{
  161. RequestCTRL: ctrl,
  162. RequestDATA: data,
  163. })
  164. if res != nil {
  165. return res.([]byte), err
  166. }
  167. return nil, err
  168. }
  169. /*
  170. SendRequest sends a request to another node.
  171. */
  172. func (c *Client) SendRequest(node string, remoteCall RPCFunction,
  173. args map[RequestArgument]interface{}) (interface{}, error) {
  174. var err error
  175. // Function to categorize errors
  176. handleError := func(err error) error {
  177. if _, ok := err.(net.Error); ok {
  178. return &Error{ErrNodeComm, err.Error(), false}
  179. }
  180. // Wrap remote errors in a proper error object
  181. if err != nil && !strings.HasPrefix(err.Error(), "RufsError: ") {
  182. // Check if the error is known to report that a file or directory
  183. // does not exist.
  184. err = &Error{ErrRemoteAction, err.Error(), err.Error() == os.ErrNotExist.Error()}
  185. }
  186. return err
  187. }
  188. c.maplock.Lock()
  189. laddr, ok := c.peers[node]
  190. c.maplock.Unlock()
  191. if ok {
  192. // Get network connection to the node
  193. c.maplock.Lock()
  194. conn, ok := c.conns[node]
  195. c.maplock.Unlock()
  196. if !ok {
  197. // Create a new connection if necessary
  198. nconn, err := net.DialTimeout("tcp", laddr, DialTimeout)
  199. if err != nil {
  200. LogDebug(c.token.NodeName, ": ",
  201. fmt.Sprintf("- %v.%v (laddr=%v err=%v)",
  202. node, remoteCall, laddr, err))
  203. return nil, handleError(err)
  204. }
  205. if c.cert != nil && c.cert.Certificate[0] != nil {
  206. // Wrap the conn in a TLS client
  207. config := tls.Config{
  208. Certificates: []tls.Certificate{*c.cert},
  209. InsecureSkipVerify: true,
  210. }
  211. tlsconn := tls.Client(nconn, &config)
  212. // Do the handshake and look at the server certificate
  213. tlsconn.Handshake()
  214. rfp := fingerprint(tlsconn.ConnectionState().PeerCertificates[0].Raw)
  215. c.maplock.Lock()
  216. expected, _ := c.fingerprints[node]
  217. c.maplock.Unlock()
  218. if expected == "" {
  219. // Accept the certificate and store it
  220. c.maplock.Lock()
  221. c.fingerprints[node] = rfp
  222. c.maplock.Unlock()
  223. } else if expected != rfp {
  224. // Fingerprint was NOT verified
  225. LogDebug(c.token.NodeName, ": ",
  226. fmt.Sprintf("Not trusting %v (laddr=%v) presented fingerprint: %v expected fingerprint: %v", node, laddr, rfp, expected))
  227. return nil, &Error{ErrUntrustedTarget, node, false}
  228. }
  229. LogDebug(c.token.NodeName, ": ",
  230. fmt.Sprintf("%v (laddr=%v) has SSL fingerprint %v ", node, laddr, rfp))
  231. nconn = tlsconn
  232. }
  233. conn = rpc.NewClient(nconn)
  234. // Store the connection so it can be reused
  235. c.maplock.Lock()
  236. c.conns[node] = conn
  237. c.maplock.Unlock()
  238. }
  239. // Assemble the request
  240. request := map[RequestArgument]interface{}{
  241. RequestTARGET: node,
  242. RequestTOKEN: c.token,
  243. }
  244. if args != nil {
  245. for k, v := range args {
  246. request[k] = v
  247. }
  248. }
  249. var response interface{}
  250. LogDebug(c.token.NodeName, ": ",
  251. fmt.Sprintf("> %v.%v (laddr=%v)", node, remoteCall, laddr))
  252. err = conn.Call("RufsServer."+string(remoteCall), request, &response)
  253. if !c.redial && (err == rpc.ErrShutdown || err == io.EOF || err == io.ErrUnexpectedEOF) {
  254. // Delete the closed connection and retry the request
  255. c.maplock.Lock()
  256. delete(c.conns, node)
  257. c.redial = true // Set the redial flag to avoid a forever loop
  258. c.maplock.Unlock()
  259. return c.SendRequest(node, remoteCall, args)
  260. }
  261. // Reset redial flag
  262. c.maplock.Lock()
  263. c.redial = false
  264. c.maplock.Unlock()
  265. LogDebug(c.token.NodeName, ": ",
  266. fmt.Sprintf("< %v.%v (err=%v)", node, remoteCall, err))
  267. return response, handleError(err)
  268. }
  269. return nil, &Error{ErrUnknownTarget, node, false}
  270. }