ecal-sock.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. /*
  2. * EliasDB
  3. *
  4. * Copyright 2016 Matthias Ladkau. All rights reserved.
  5. *
  6. * This Source Code Form is subject to the terms of the Mozilla Public
  7. * License, v. 2.0. If a copy of the MPL was not distributed with this
  8. * file, You can obtain one at http://mozilla.org/MPL/2.0/.
  9. */
  10. package v1
  11. import (
  12. "encoding/json"
  13. "fmt"
  14. "io/ioutil"
  15. "net/http"
  16. "strings"
  17. "devt.de/krotik/common/cryptutil"
  18. "devt.de/krotik/common/errorutil"
  19. "devt.de/krotik/common/stringutil"
  20. "devt.de/krotik/ecal/engine"
  21. "devt.de/krotik/ecal/scope"
  22. "devt.de/krotik/eliasdb/api"
  23. "devt.de/krotik/eliasdb/ecal"
  24. "github.com/gorilla/websocket"
  25. )
  26. /*
  27. EndpointECALSock is the ECAL endpoint URL (rooted) for websocket operations. Handles everything under sock/...
  28. */
  29. const EndpointECALSock = api.APIRoot + "/sock/"
  30. /*
  31. upgrader can upgrade normal requests to websocket communications
  32. */
  33. var sockUpgrader = websocket.Upgrader{
  34. Subprotocols: []string{"ecal-sock"},
  35. ReadBufferSize: 1024,
  36. WriteBufferSize: 1024,
  37. }
  38. var sockCallbackError error
  39. /*
  40. ECALSockEndpointInst creates a new endpoint handler.
  41. */
  42. func ECALSockEndpointInst() api.RestEndpointHandler {
  43. return &ecalSockEndpoint{}
  44. }
  45. /*
  46. Handler object for ECAL websocket operations.
  47. */
  48. type ecalSockEndpoint struct {
  49. *api.DefaultEndpointHandler
  50. }
  51. /*
  52. HandleGET handles ECAL websocket operations.
  53. */
  54. func (e *ecalSockEndpoint) HandleGET(w http.ResponseWriter, r *http.Request, resources []string) {
  55. if api.SI != nil {
  56. var body []byte
  57. // Update the incomming connection to a websocket
  58. // If the upgrade fails then the client gets an HTTP error response.
  59. conn, err := sockUpgrader.Upgrade(w, r, nil)
  60. if err != nil {
  61. // We give details here on what went wrong
  62. w.Write([]byte(err.Error()))
  63. return
  64. }
  65. commID := fmt.Sprintf("%x", cryptutil.GenerateUUID())
  66. wc := ecal.NewWebsocketConnection(commID, conn)
  67. wc.Init()
  68. if body, err = ioutil.ReadAll(r.Body); err == nil {
  69. var data interface{}
  70. json.Unmarshal(body, &data)
  71. query := map[interface{}]interface{}{}
  72. for k, v := range r.URL.Query() {
  73. values := make([]interface{}, 0)
  74. for _, val := range v {
  75. values = append(values, val)
  76. }
  77. query[k] = values
  78. }
  79. header := map[interface{}]interface{}{}
  80. for k, v := range r.Header {
  81. header[k] = scope.ConvertJSONToECALObject(v)
  82. }
  83. proc := api.SI.Interpreter.RuntimeProvider.Processor
  84. event := engine.NewEvent(fmt.Sprintf("WebSocketRequest"), []string{"db", "web", "sock"},
  85. map[interface{}]interface{}{
  86. "commID": commID,
  87. "path": strings.Join(resources, "/"),
  88. "pathList": resources,
  89. "bodyString": string(body),
  90. "bodyJSON": scope.ConvertJSONToECALObject(data),
  91. "query": query,
  92. "method": r.Method,
  93. "header": header,
  94. })
  95. // Add event that the websocket has been registered
  96. if _, err = proc.AddEventAndWait(event, nil); err == nil {
  97. api.SI.RegisterECALSock(wc)
  98. defer func() {
  99. api.SI.DeregisterECALSock(wc)
  100. }()
  101. for {
  102. var fatal bool
  103. var data map[string]interface{}
  104. // Read websocket message
  105. if data, fatal, err = wc.ReadData(); err != nil {
  106. wc.WriteData(map[string]interface{}{
  107. "error": err.Error(),
  108. })
  109. if fatal {
  110. break
  111. }
  112. continue
  113. }
  114. if val, ok := data["close"]; ok && stringutil.IsTrueValue(fmt.Sprint(val)) {
  115. wc.Close("")
  116. break
  117. }
  118. event = engine.NewEvent(fmt.Sprintf("WebSocketRequest"), []string{"db", "web", "sock", "data"},
  119. map[interface{}]interface{}{
  120. "commID": commID,
  121. "path": strings.Join(resources, "/"),
  122. "pathList": resources,
  123. "query": query,
  124. "method": r.Method,
  125. "header": header,
  126. "data": scope.ConvertJSONToECALObject(data),
  127. })
  128. _, err = proc.AddEvent(event, nil)
  129. errorutil.AssertOk(err)
  130. }
  131. }
  132. }
  133. if err != nil {
  134. wc.Close(err.Error())
  135. api.SI.Interpreter.RuntimeProvider.Logger.LogDebug(err)
  136. }
  137. return
  138. }
  139. http.Error(w, "Resource was not found", http.StatusNotFound)
  140. }
  141. /*
  142. SwaggerDefs is used to describe the endpoint in swagger.
  143. */
  144. func (e *ecalSockEndpoint) SwaggerDefs(s map[string]interface{}) {
  145. // No swagger definitions for this endpoint as it only handles websocket requests
  146. }