From b1d201306b4e50fa19ae28be0756590b68b6b34a Mon Sep 17 00:00:00 2001 From: moxitech Date: Tue, 8 Oct 2024 11:17:08 +0700 Subject: [PATCH] first commit --- Readme.md | 25 +++++++++++++ go.mod | 5 +++ go.sum | 2 + wsstomp.go | 105 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+) create mode 100644 Readme.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 wsstomp.go diff --git a/Readme.md b/Readme.md new file mode 100644 index 0000000..5d4eaf8 --- /dev/null +++ b/Readme.md @@ -0,0 +1,25 @@ +Real stomp - websocket impl stomp library in go + +> How to use? +>> go get "git.sis.ski/moxitech/realstomp-go" + +Example of create connection: + +``` +// Stomper interface defines the basic methods +type Stomper interface { + Send(topic string, id int, data []byte) error +} + +// StompClient implements the Stomper interface +type StompClient struct { + conn *stomp.Conn + state bool + brokerURL string + waitMessages map[string]*time.Timer + mutex sync.Mutex +} + +// make client from StompClient and invoke connect method + +``` \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8f46813 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module git.sis.ski/moxitech/realstomp-go + +go 1.22.6 + +require github.com/coder/websocket v1.8.12 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..029cf47 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/coder/websocket v1.8.12 h1:5bUXkEPPIbewrnkU8LTCLVaxi4N4J8ahufH2vlo4NAo= +github.com/coder/websocket v1.8.12/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs= diff --git a/wsstomp.go b/wsstomp.go new file mode 100644 index 0000000..ce79443 --- /dev/null +++ b/wsstomp.go @@ -0,0 +1,105 @@ +package wsstomp + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/coder/websocket" +) + +type WebsocketSTOMP struct { + connection *websocket.Conn + readerBuffer []byte + writeBuffer []byte +} + +const ( + NullByte = 0x00 + LineFeedByte = 0x0a + writeTimeout = 10 * time.Second +) + +// Read reads messages from the websocket connection until the provided array is full. +// Any surplus data is preserved for the next Read call. +func (w *WebsocketSTOMP) Read(p []byte) (int, error) { + if len(w.readerBuffer) == 0 { + _, msg, err := w.connection.Read(context.Background()) + if err != nil { + return 0, err + } + w.readerBuffer = msg + } + + n := copy(p, w.readerBuffer) + w.readerBuffer = w.readerBuffer[n:] + return n, nil +} + +// Write sends data to the websocket. +// The data is buffered until a full STOMP frame is written, then sent in a WS message. +func (w *WebsocketSTOMP) Write(p []byte) (int, error) { + w.writeBuffer = append(w.writeBuffer, p...) + + // Send if we reach a null byte or the message is a single heartbeat (linefeed). + if p[len(p)-1] == NullByte || (len(w.writeBuffer) == 1 && len(p) == 1 && p[0] == LineFeedByte) { + err := w.sendMessage(context.Background()) + if err != nil { + return 0, err + } + } + return len(p), nil +} + +// sendMessage sends the accumulated writeBuffer data via the websocket. +func (w *WebsocketSTOMP) sendMessage(ctx context.Context) error { + ctx, cancel := context.WithTimeout(ctx, writeTimeout) + defer cancel() + + err := w.connection.Write(ctx, websocket.MessageText, w.writeBuffer) + if err != nil { + // Preserve the buffer in case of a write failure. + return fmt.Errorf("failed to write message: %w", err) + } + w.writeBuffer = nil + return nil +} + +// Close closes the websocket connection with a normal closure status. +func (w *WebsocketSTOMP) Close() error { + return w.connection.Close(websocket.StatusNormalClosure, "terminating connection") +} + +// Connect establishes a websocket connection with the provided URL. +// The context is used only for the connection handshake. +func Connect(ctx context.Context, url string, options *websocket.DialOptions) (*WebsocketSTOMP, error) { + if options == nil { + options = &websocket.DialOptions{} + } + if options.HTTPClient == nil { + options.HTTPClient = &http.Client{ + CheckRedirect: func(req *http.Request, via []*http.Request) error { + switch req.URL.Scheme { + case "ws": + req.URL.Scheme = "http" + case "wss": + req.URL.Scheme = "https" + default: + return fmt.Errorf("unexpected url scheme: %q", req.URL.Scheme) + } + return nil + }, + Timeout: 30 * time.Second, + } + } + + con, _, err := websocket.Dial(ctx, url, options) + if err != nil { + return nil, fmt.Errorf("failed to dial websocket: %w", err) + } + + return &WebsocketSTOMP{ + connection: con, + }, nil +}