graphql-subscriptions.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "encoding/json"
  13. "fmt"
  14. "net/http"
  15. "sync"
  16. "time"
  17. "github.com/gorilla/websocket"
  18. "devt.de/krotik/common/stringutil"
  19. "devt.de/krotik/eliasdb/api"
  20. "devt.de/krotik/eliasdb/graphql"
  21. )
  22. /*
  23. EndpointGraphQLSubscriptions is the GraphQL endpoint URL for subscriptions (rooted). Handles websockets under graphql-subscriptions/
  24. */
  25. const EndpointGraphQLSubscriptions = api.APIRoot + APIv1 + "/graphql-subscriptions/"
  26. /*
  27. upgrader can upgrade normal requests to websocket communications
  28. */
  29. var upgrader = websocket.Upgrader{
  30. Subprotocols: []string{"graphql-subscriptions"},
  31. ReadBufferSize: 1024,
  32. WriteBufferSize: 1024,
  33. }
  34. var subscriptionCallbackError error
  35. /*
  36. GraphQLSubscriptionsEndpointInst creates a new endpoint handler.
  37. */
  38. func GraphQLSubscriptionsEndpointInst() api.RestEndpointHandler {
  39. return &graphQLSubscriptionsEndpoint{}
  40. }
  41. /*
  42. Handler object for GraphQL operations.
  43. */
  44. type graphQLSubscriptionsEndpoint struct {
  45. *api.DefaultEndpointHandler
  46. }
  47. /*
  48. HandleGET handles GraphQL subscription queries.
  49. */
  50. func (e *graphQLSubscriptionsEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
  51. // Update the incomming connection to a websocket
  52. // If the upgrade fails then the client gets an HTTP error response.
  53. conn, err := upgrader.Upgrade(w, r, nil)
  54. // Websocket connections support one concurrent reader and one concurrent writer.
  55. // See: https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
  56. connRMutex := &sync.Mutex{}
  57. connWMutex := &sync.Mutex{}
  58. if err != nil {
  59. // We give details here on what went wrong
  60. w.Write([]byte(err.Error()))
  61. return
  62. }
  63. subID := ""
  64. // Ensure we have a partition to query
  65. partition := r.URL.Query().Get("partition")
  66. if partition == "" && len(resources) > 0 {
  67. partition = resources[0]
  68. }
  69. if partition == "" {
  70. connWMutex.Lock()
  71. e.WriteError(conn, subID, "Need a 'partition' in path or as url parameter", true)
  72. connWMutex.Unlock()
  73. return
  74. }
  75. connWMutex.Lock()
  76. conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"init_success","payload":{}}`))
  77. connWMutex.Unlock()
  78. // Create the callback handler for the subscription
  79. callbackHandler := &subscriptionCallbackHandler{
  80. finished: false,
  81. publish: func(data map[string]interface{}, err error) {
  82. var res []byte
  83. // Error for unit testing
  84. err = subscriptionCallbackError
  85. // This is called if data im the datastore changes
  86. if err == nil {
  87. res, err = json.Marshal(map[string]interface{}{
  88. "id": subID,
  89. "type": "subscription_data",
  90. "payload": data,
  91. })
  92. }
  93. if err != nil {
  94. connWMutex.Lock()
  95. e.WriteError(conn, subID, err.Error(), true)
  96. connWMutex.Unlock()
  97. return
  98. }
  99. connWMutex.Lock()
  100. conn.WriteMessage(websocket.TextMessage, res)
  101. connWMutex.Unlock()
  102. },
  103. }
  104. for {
  105. // Read websocket message
  106. connRMutex.Lock()
  107. _, msg, err := conn.ReadMessage()
  108. connRMutex.Unlock()
  109. if err != nil {
  110. // Unregister the callback handler
  111. callbackHandler.finished = true
  112. // If the client is still listening write the error message
  113. // This is a NOP if the client hang up
  114. connWMutex.Lock()
  115. e.WriteError(conn, subID, err.Error(), true)
  116. connWMutex.Unlock()
  117. return
  118. }
  119. data := make(map[string]interface{})
  120. if err := json.Unmarshal(msg, &data); err != nil {
  121. connWMutex.Lock()
  122. e.WriteError(conn, subID, err.Error(), false)
  123. connWMutex.Unlock()
  124. continue
  125. }
  126. // Check we got a message with a type
  127. if msgType, ok := data["type"]; ok {
  128. // Check if the user wants to start a new subscription
  129. if _, ok := data["query"]; msgType == "subscription_start" && ok {
  130. var res []byte
  131. subID = fmt.Sprint(data["id"])
  132. if _, ok := data["variables"]; !ok {
  133. data["variables"] = nil
  134. }
  135. if _, ok := data["operationName"]; !ok {
  136. data["operationName"] = nil
  137. }
  138. resData, err := graphql.RunQuery(stringutil.CreateDisplayString(partition)+" query",
  139. partition, data, api.GM, callbackHandler, false)
  140. if err == nil {
  141. res, err = json.Marshal(map[string]interface{}{
  142. "id": subID,
  143. "type": "subscription_data",
  144. "payload": resData,
  145. })
  146. }
  147. if err != nil {
  148. connWMutex.Lock()
  149. e.WriteError(conn, subID, err.Error(), false)
  150. connWMutex.Unlock()
  151. continue
  152. }
  153. connWMutex.Lock()
  154. conn.WriteMessage(websocket.TextMessage, []byte(
  155. fmt.Sprintf(`{"id":"%s","type":"subscription_success","payload":{}}`, subID)))
  156. conn.WriteMessage(websocket.TextMessage, res)
  157. connWMutex.Unlock()
  158. }
  159. }
  160. }
  161. }
  162. /*
  163. WriteError writes an error message to the websocket.
  164. */
  165. func (e *graphQLSubscriptionsEndpoint) WriteError(conn *websocket.Conn,
  166. subID string, msg string, close bool) {
  167. // Write the error as cleartext message
  168. data, _ := json.Marshal(map[string]interface{}{
  169. "id": subID,
  170. "type": "subscription_fail",
  171. "payload": map[string]interface{}{
  172. "errors": []string{msg},
  173. },
  174. })
  175. conn.WriteMessage(websocket.TextMessage, data)
  176. if close {
  177. // Write error as closing control message
  178. conn.WriteControl(websocket.CloseMessage,
  179. websocket.FormatCloseMessage(
  180. websocket.CloseUnsupportedData, msg), time.Now().Add(10*time.Second))
  181. conn.Close()
  182. }
  183. }
  184. /*
  185. SwaggerDefs is used to describe the endpoint in swagger.
  186. */
  187. func (e *graphQLSubscriptionsEndpoint) SwaggerDefs(s map[string]interface{}) {
  188. // No swagger definitions for this endpoint as it only handles websocket requests
  189. }
  190. // Callback Handler
  191. // ================
  192. /*
  193. subscriptionCallbackHandler pushes new events to a subscription client via a websocket.
  194. */
  195. type subscriptionCallbackHandler struct {
  196. finished bool
  197. publish func(data map[string]interface{}, err error)
  198. }
  199. func (ch *subscriptionCallbackHandler) Publish(data map[string]interface{}, err error) {
  200. ch.publish(data, err)
  201. }
  202. func (ch *subscriptionCallbackHandler) IsFinished() bool {
  203. return ch.finished
  204. }