graphql-subscriptions.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "encoding/json"
  13. "fmt"
  14. "net/http"
  15. "time"
  16. "github.com/gorilla/websocket"
  17. "devt.de/krotik/common/stringutil"
  18. "devt.de/krotik/eliasdb/api"
  19. "devt.de/krotik/eliasdb/graphql"
  20. )
  21. /*
  22. EndpointGraphQLSubscriptions is the GraphQL endpoint URL for subscriptions (rooted). Handles websockets under graphql-subscriptions/
  23. */
  24. const EndpointGraphQLSubscriptions = api.APIRoot + APIv1 + "/graphql-subscriptions/"
  25. /*
  26. upgrader can upgrade normal requests to websocket communications
  27. */
  28. var upgrader = websocket.Upgrader{
  29. Subprotocols: []string{"graphql-subscriptions"},
  30. ReadBufferSize: 1024,
  31. WriteBufferSize: 1024,
  32. }
  33. var subscriptionCallbackError error
  34. /*
  35. GraphQLSubscriptionsEndpointInst creates a new endpoint handler.
  36. */
  37. func GraphQLSubscriptionsEndpointInst() api.RestEndpointHandler {
  38. return &graphQLSubscriptionsEndpoint{}
  39. }
  40. /*
  41. Handler object for GraphQL operations.
  42. */
  43. type graphQLSubscriptionsEndpoint struct {
  44. *api.DefaultEndpointHandler
  45. }
  46. /*
  47. HandleGET handles GraphQL subscription queries.
  48. */
  49. func (e *graphQLSubscriptionsEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
  50. // Update the incomming connection to a websocket
  51. // If the upgrade fails then the client gets an HTTP error response.
  52. conn, err := upgrader.Upgrade(w, r, nil)
  53. if err != nil {
  54. // We give details here on what went wrong
  55. w.Write([]byte(err.Error()))
  56. return
  57. }
  58. subID := ""
  59. // Ensure we have a partition to query
  60. partition := r.URL.Query().Get("partition")
  61. if partition == "" && len(resources) > 0 {
  62. partition = resources[0]
  63. }
  64. if partition == "" {
  65. e.WriteError(conn, subID, "Need a 'partition' in path or as url parameter", true)
  66. return
  67. }
  68. conn.WriteMessage(websocket.TextMessage, []byte(`{"type":"init_success","payload":{}}`))
  69. // Create the callback handler for the subscription
  70. callbackHandler := &subscriptionCallbackHandler{
  71. finished: false,
  72. publish: func(data map[string]interface{}, err error) {
  73. var res []byte
  74. // Error for unit testing
  75. err = subscriptionCallbackError
  76. // This is called if data im the datastore changes
  77. if err == nil {
  78. res, err = json.Marshal(map[string]interface{}{
  79. "id": subID,
  80. "type": "subscription_data",
  81. "payload": data,
  82. })
  83. }
  84. if err != nil {
  85. e.WriteError(conn, subID, err.Error(), true)
  86. return
  87. }
  88. conn.WriteMessage(websocket.TextMessage, res)
  89. },
  90. }
  91. for {
  92. // Read websocket message
  93. _, msg, err := conn.ReadMessage()
  94. if err != nil {
  95. // Unregister the callback handler
  96. callbackHandler.finished = true
  97. // If the client is still listening write the error message
  98. // This is a NOP if the client hang up
  99. e.WriteError(conn, subID, err.Error(), true)
  100. return
  101. }
  102. data := make(map[string]interface{})
  103. if err := json.Unmarshal(msg, &data); err != nil {
  104. e.WriteError(conn, subID, err.Error(), false)
  105. continue
  106. }
  107. // Check we got a message with a type
  108. if msgType, ok := data["type"]; ok {
  109. // Check if the user wants to start a new subscription
  110. if _, ok := data["query"]; msgType == "subscription_start" && ok {
  111. var res []byte
  112. subID = fmt.Sprint(data["id"])
  113. if _, ok := data["variables"]; !ok {
  114. data["variables"] = nil
  115. }
  116. if _, ok := data["operationName"]; !ok {
  117. data["operationName"] = nil
  118. }
  119. resData, err := graphql.RunQuery(stringutil.CreateDisplayString(partition)+" query",
  120. partition, data, api.GM, callbackHandler, false)
  121. if err == nil {
  122. res, err = json.Marshal(map[string]interface{}{
  123. "id": subID,
  124. "type": "subscription_data",
  125. "payload": resData,
  126. })
  127. }
  128. if err != nil {
  129. e.WriteError(conn, subID, err.Error(), false)
  130. continue
  131. }
  132. conn.WriteMessage(websocket.TextMessage, []byte(
  133. fmt.Sprintf(`{"id":"%s","type":"subscription_success","payload":{}}`, subID)))
  134. conn.WriteMessage(websocket.TextMessage, res)
  135. }
  136. }
  137. }
  138. }
  139. /*
  140. WriteError writes an error message to the websocket.
  141. */
  142. func (e *graphQLSubscriptionsEndpoint) WriteError(conn *websocket.Conn,
  143. subID string, msg string, close bool) {
  144. // Write the error as cleartext message
  145. data, _ := json.Marshal(map[string]interface{}{
  146. "id": subID,
  147. "type": "subscription_fail",
  148. "payload": map[string]interface{}{
  149. "errors": []string{msg},
  150. },
  151. })
  152. conn.WriteMessage(websocket.TextMessage, data)
  153. if close {
  154. // Write error as closing control message
  155. conn.WriteControl(websocket.CloseMessage,
  156. websocket.FormatCloseMessage(
  157. websocket.CloseUnsupportedData, msg), time.Now().Add(10*time.Second))
  158. conn.Close()
  159. }
  160. }
  161. /*
  162. SwaggerDefs is used to describe the endpoint in swagger.
  163. */
  164. func (e *graphQLSubscriptionsEndpoint) SwaggerDefs(s map[string]interface{}) {
  165. // No swagger definitions for this endpoint as it only handles websocket requests
  166. }
  167. // Callback Handler
  168. // ================
  169. /*
  170. subscriptionCallbackHandler pushes new events to a subscription client via a websocket.
  171. */
  172. type subscriptionCallbackHandler struct {
  173. finished bool
  174. publish func(data map[string]interface{}, err error)
  175. }
  176. func (ch *subscriptionCallbackHandler) Publish(data map[string]interface{}, err error) {
  177. ch.publish(data, err)
  178. }
  179. func (ch *subscriptionCallbackHandler) IsFinished() bool {
  180. return ch.finished
  181. }