wsrpc

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 31, 2025 License: MIT Imports: 9 Imported by: 0

README

WSRPC

License Build Status Go Report Card Go Reference Version

WSRPC is a simple package that allows bidirectional RPC over a WebSocket connection.
The library already provides WebSocket adapters for Gorilla, Fiber, and FastHTTP; for all other cases, it should be quite easy to implement the Conn interface.

Installation:

go get github.com/catamat/wsrpc@latest

Examples:

Gorilla server
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"time"

	"github.com/catamat/wsrpc"
	"github.com/catamat/wsrpc/examples/demodata"
	"github.com/catamat/wsrpc/gorillaws"
	"github.com/gorilla/websocket"
)

// ServerAPI is the receiver for the RPC methods that the client can call on the server.
type ServerAPI struct{}

// Hello logs the received argument and stores a greeting that echoes it in reply.
// It always returns a nil error.
func (s *ServerAPI) Hello(args string, reply *string) error {
	const greeting = "Server says hello back to: "

	log.Printf("Server: Received ServerAPI.Hello call with args: %q\n", args)
	*reply = greeting + args
	return nil
}

// DelayedHello logs the received argument, sleeps for ten seconds, and then
// stores a greeting that echoes the argument in reply.
// It always returns a nil error.
func (s *ServerAPI) DelayedHello(args string, reply *string) error {
	const delay = 10 * time.Second

	log.Printf("Server: Received ServerAPI.DelayedHello call with args: %q - Waiting 10 seconds...\n", args)
	time.Sleep(delay)

	*reply = "Server belatedly says hello back to: " + args
	log.Println("Server: Responding to ServerAPI.DelayedHello")
	return nil
}

// LargePayload logs the size of the received payload and stores that size,
// in bytes, in reply. It always returns a nil error.
func (s *ServerAPI) LargePayload(args []byte, reply *int) error {
	size := len(args)
	log.Printf("Server: Received ServerAPI.LargePayload call with payload size: %d bytes\n", size)

	*reply = size
	return nil
}

// upgrader upgrades incoming HTTP connections to WebSocket connections.
// Its CheckOrigin callback returns true for every request.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool { return true },
}

// wsHandler upgrades the HTTP request to a WebSocket connection, creates a
// WSRPC server on top of it, registers ServerAPI for calls from the client,
// and then calls the three ClientAPI methods on the connected client.
// Every failure is logged; the handler returns once the WSRPC connection is
// closed, or early if the upgrade, server creation, or registration fails.
func wsHandler(w http.ResponseWriter, r *http.Request) {
	// Upgrade the HTTP connection to a WebSocket connection
	wsConn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Error upgrading connection to WebSocket: %v\n", err)
		return
	}
	defer wsConn.Close()

	// Wrap the WebSocket connection with the gorillaws adapter
	conn := gorillaws.NewAdapter(wsConn)
	config := wsrpc.DefaultConfig()
	config.LogOutput = io.Discard

	// Create the WSRPC server
	wsrpcServer, err := wsrpc.NewServer(conn, config)
	if err != nil {
		// Report the failure instead of dropping the connection silently
		log.Printf("Error creating server: %v\n", err)
		return
	}
	defer wsrpcServer.Close()

	// Register the service on the server for calls from the client
	err = wsrpcServer.Register(&ServerAPI{})
	if err != nil {
		log.Printf("Error registering service on server: %v\n", err)
		return
	}

	// --- Server calls Client ---
	// A failed call is only logged, so the remaining calls are still attempted.

	// 1. Hello
	log.Println("Server: Calling ClientAPI.Hello...")

	var clientReplyHello string
	err = wsrpcServer.Call("ClientAPI.Hello", "server initial call", &clientReplyHello)
	if err != nil {
		log.Printf("Server: Error calling ClientAPI.Hello: %v\n", err)
	} else {
		log.Printf("Server: Response from ClientAPI.Hello: %q\n", clientReplyHello)
	}

	// 2. DelayedHello
	log.Println("Server: Calling ClientAPI.DelayedHello (will block)...")

	var clientReplyDelayed string
	err = wsrpcServer.Call("ClientAPI.DelayedHello", "server delayed call", &clientReplyDelayed)
	if err != nil {
		log.Printf("Server: Error calling ClientAPI.DelayedHello: %v\n", err)
	} else {
		log.Printf("Server: Response from ClientAPI.DelayedHello: %q\n", clientReplyDelayed)
	}

	// 3. LargePayload
	log.Println("Server: Calling ClientAPI.LargePayload...")

	largeArgs := []byte(demodata.LargePayload)
	var clientReplyLarge int
	err = wsrpcServer.Call("ClientAPI.LargePayload", largeArgs, &clientReplyLarge)
	if err != nil {
		log.Printf("Server: Error calling ClientAPI.LargePayload: %v\n", err)
	} else {
		log.Printf("Server: Response from ClientAPI.LargePayload (client received size): %d bytes\n", clientReplyLarge)
	}

	// ------

	// Wait until the connection is closed
	<-wsrpcServer.Done()
}

// main registers wsHandler on the /ws path and serves HTTP on port 50505.
// An error returned by ListenAndServe is logged.
func main() {
	const addr = ":50505"

	http.HandleFunc("/ws", wsHandler)

	fmt.Println("Server listening on :50505")
	if err := http.ListenAndServe(addr, nil); err != nil {
		log.Println("Error starting HTTP server:", err)
	}
}
Fiber server
package main

import (
	"fmt"
	"io"
	"log"
	"time"

	"github.com/catamat/wsrpc"
	"github.com/catamat/wsrpc/examples/demodata"
	"github.com/catamat/wsrpc/fiberws"
	"github.com/gofiber/contrib/websocket"
	"github.com/gofiber/fiber/v2"
)

// ServerAPI is the receiver for the RPC methods that the client can call on the server.
type ServerAPI struct{}

// Hello logs the received argument and stores a greeting that echoes it in reply.
// It always returns a nil error.
func (s *ServerAPI) Hello(args string, reply *string) error {
	const greeting = "Server says hello back to: "

	log.Printf("Server: Received ServerAPI.Hello call with args: %q\n", args)
	*reply = greeting + args
	return nil
}

// DelayedHello logs the received argument, sleeps for ten seconds, and then
// stores a greeting that echoes the argument in reply.
// It always returns a nil error.
func (s *ServerAPI) DelayedHello(args string, reply *string) error {
	const delay = 10 * time.Second

	log.Printf("Server: Received ServerAPI.DelayedHello call with args: %q - Waiting 10 seconds...\n", args)
	time.Sleep(delay)

	*reply = "Server belatedly says hello back to: " + args
	log.Println("Server: Responding to ServerAPI.DelayedHello")
	return nil
}

// LargePayload logs the size of the received payload and stores that size,
// in bytes, in reply. It always returns a nil error.
func (s *ServerAPI) LargePayload(args []byte, reply *int) error {
	size := len(args)
	log.Printf("Server: Received ServerAPI.LargePayload call with payload size: %d bytes\n", size)

	*reply = size
	return nil
}

// main starts a Fiber application that serves a WebSocket endpoint on /ws and
// listens on port 50505. For each WebSocket connection it creates a WSRPC
// server, registers ServerAPI for calls from the client, calls the three
// ClientAPI methods on the client, and then waits for the connection to close.
func main() {
	app := fiber.New(fiber.Config{DisableStartupMessage: true})

	// Only WebSocket upgrade requests are passed on to the /ws handler;
	// any other request gets fiber.ErrUpgradeRequired.
	app.Use("/ws", func(c *fiber.Ctx) error {
		if !websocket.IsWebSocketUpgrade(c) {
			return fiber.ErrUpgradeRequired
		}
		return c.Next()
	})

	// Handler for the WebSocket connection.
	app.Get("/ws", websocket.New(func(c *websocket.Conn) {
		defer c.Close()

		// Wrap the WebSocket connection with the fiberws adapter
		adapter := fiberws.NewAdapter(c)
		cfg := wsrpc.DefaultConfig()
		cfg.LogOutput = io.Discard

		// Create the WSRPC server
		rpc, err := wsrpc.NewServer(adapter, cfg)
		if err != nil {
			log.Printf("Error creating server: %v\n", err)
			return
		}
		defer rpc.Close()

		// Register the service on the server for calls from the client
		if err := rpc.Register(&ServerAPI{}); err != nil {
			log.Printf("Error registering service on server: %v\n", err)
			return
		}

		// --- Server calls Client ---
		// A failed call is only logged, so the remaining calls are still attempted.

		// 1. Hello
		log.Println("Server: Calling ClientAPI.Hello...")

		var helloReply string
		if err := rpc.Call("ClientAPI.Hello", "server initial call", &helloReply); err != nil {
			log.Printf("Server: Error calling ClientAPI.Hello: %v\n", err)
		} else {
			log.Printf("Server: Response from ClientAPI.Hello: %q\n", helloReply)
		}

		// 2. DelayedHello
		log.Println("Server: Calling ClientAPI.DelayedHello (will block)...")

		var delayedReply string
		if err := rpc.Call("ClientAPI.DelayedHello", "server delayed call", &delayedReply); err != nil {
			log.Printf("Server: Error calling ClientAPI.DelayedHello: %v\n", err)
		} else {
			log.Printf("Server: Response from ClientAPI.DelayedHello: %q\n", delayedReply)
		}

		// 3. LargePayload
		log.Println("Server: Calling ClientAPI.LargePayload...")

		payload := []byte(demodata.LargePayload)
		var receivedSize int
		if err := rpc.Call("ClientAPI.LargePayload", payload, &receivedSize); err != nil {
			log.Printf("Server: Error calling ClientAPI.LargePayload: %v\n", err)
		} else {
			log.Printf("Server: Response from ClientAPI.LargePayload (client received size): %d bytes\n", receivedSize)
		}

		// ------

		// Wait until the connection is closed
		<-rpc.Done()
	}))

	fmt.Println("Server listening on :50505")
	if err := app.Listen(":50505"); err != nil {
		log.Println("Error starting HTTP server:", err)
	}
}
Gorilla client
package main

import (
	"fmt"
	"io"
	"log"
	"time"

	"github.com/catamat/wsrpc"
	"github.com/catamat/wsrpc/examples/demodata"
	"github.com/catamat/wsrpc/gorillaws"
	"github.com/gorilla/websocket"
)

// ClientAPI is the receiver for the RPC methods that the server can call on the client.
type ClientAPI struct{}

// Hello logs the received argument and stores a greeting that echoes it in reply.
// It always returns a nil error.
func (c *ClientAPI) Hello(args string, reply *string) error {
	const greeting = "Client says hello back to: "

	log.Printf("Client: Received ClientAPI.Hello call with args: %q\n", args)
	*reply = greeting + args
	return nil
}

// DelayedHello logs the received argument, sleeps for ten seconds, and then
// stores a greeting that echoes the argument in reply.
// It always returns a nil error.
func (c *ClientAPI) DelayedHello(args string, reply *string) error {
	const delay = 10 * time.Second

	log.Printf("Client: Received ClientAPI.DelayedHello call with args: %q - Waiting 10 seconds...\n", args)
	time.Sleep(delay)

	*reply = "Client belatedly says hello back to: " + args
	log.Println("Client: Responding to ClientAPI.DelayedHello")
	return nil
}

// LargePayload logs the size of the received payload and stores that size,
// in bytes, in reply. It always returns a nil error.
func (c *ClientAPI) LargePayload(args []byte, reply *int) error {
	size := len(args)
	log.Printf("Client: Received ClientAPI.LargePayload call with payload size: %d bytes\n", size)

	*reply = size
	return nil
}

// main calls connect in an endless loop. When connect returns an error, the
// error is logged and the loop sleeps for ten seconds before the next attempt;
// when connect returns nil, the next attempt starts immediately.
func main() {
	const retryDelay = 10 * time.Second

	for {
		if err := connect(); err != nil {
			log.Println("Error connecting to server:", err)

			// Wait before attempting to reconnect
			time.Sleep(retryDelay)
		}
	}
}

// connect dials the WebSocket server at ws://localhost:50505/ws, creates a
// WSRPC client on the connection, registers ClientAPI for calls from the
// server, and calls the three ServerAPI methods. It then blocks until the
// WSRPC connection is closed.
//
// It returns the dial, client-creation, or registration error unchanged, a
// wrapped error as soon as one of the ServerAPI calls fails, and nil once the
// connection has been closed after all calls succeeded.
func connect() error {
	// Connect to the server
	wsConn, _, err := websocket.DefaultDialer.Dial("ws://localhost:50505/ws", nil)
	if err != nil {
		return err
	}
	defer wsConn.Close()
	log.Println("Successfully connected to the server")

	// Wrap the WebSocket connection with the gorillaws adapter
	conn := gorillaws.NewAdapter(wsConn)
	config := wsrpc.DefaultConfig()
	config.LogOutput = io.Discard

	// Create the WSRPC client
	wsrpcClient, err := wsrpc.NewClient(conn, config)
	if err != nil {
		log.Printf("Error creating client: %v\n", err)
		return err
	}
	defer wsrpcClient.Close()

	// Register the service on the client for calls from the server
	err = wsrpcClient.Register(&ClientAPI{})
	if err != nil {
		log.Printf("Error registering service on client: %v\n", err)
		return err
	}

	// --- Client calls Server ---
	// The first failed call aborts the sequence and is returned to the caller.

	// 1. Hello
	log.Println("Client: Calling ServerAPI.Hello...")

	var serverReplyHello string
	err = wsrpcClient.Call("ServerAPI.Hello", "client initial call", &serverReplyHello)
	if err != nil {
		log.Printf("Client: Error calling ServerAPI.Hello: %v\n", err)
		return fmt.Errorf("ServerAPI.Hello call error: %w", err)
	}
	log.Printf("Client: Response from ServerAPI.Hello: %q\n", serverReplyHello)

	// 2. DelayedHello
	log.Println("Client: Calling ServerAPI.DelayedHello (will block)...")

	var serverReplyDelayed string
	err = wsrpcClient.Call("ServerAPI.DelayedHello", "client delayed call", &serverReplyDelayed)
	if err != nil {
		log.Printf("Client: Error calling ServerAPI.DelayedHello: %v\n", err)
		return fmt.Errorf("ServerAPI.DelayedHello call error: %w", err)
	}
	log.Printf("Client: Response from ServerAPI.DelayedHello: %q\n", serverReplyDelayed)

	// 3. LargePayload
	log.Println("Client: Calling ServerAPI.LargePayload...")

	largeArgs := []byte(demodata.LargePayload)
	var serverReplyLarge int
	err = wsrpcClient.Call("ServerAPI.LargePayload", largeArgs, &serverReplyLarge)
	if err != nil {
		log.Printf("Client: Error calling ServerAPI.LargePayload: %v\n", err)
		return fmt.Errorf("ServerAPI.LargePayload call error: %w", err)
	}
	log.Printf("Client: Response from ServerAPI.LargePayload (server received size): %d bytes\n", serverReplyLarge)

	// ------

	// Wait until the connection is closed
	<-wsrpcClient.Done()

	return nil
}
FastHTTP client
package main

import (
	"fmt"
	"io"
	"log"
	"time"

	"github.com/catamat/wsrpc"
	"github.com/catamat/wsrpc/examples/demodata"
	"github.com/catamat/wsrpc/fasthttpws"
	"github.com/fasthttp/websocket"
)

// ClientAPI is the receiver for the RPC methods that the server can call on the client.
type ClientAPI struct{}

// Hello logs the received argument and stores a greeting that echoes it in reply.
// It always returns a nil error.
func (c *ClientAPI) Hello(args string, reply *string) error {
	const greeting = "Client says hello back to: "

	log.Printf("Client: Received ClientAPI.Hello call with args: %q\n", args)
	*reply = greeting + args
	return nil
}

// DelayedHello logs the received argument, sleeps for ten seconds, and then
// stores a greeting that echoes the argument in reply.
// It always returns a nil error.
func (c *ClientAPI) DelayedHello(args string, reply *string) error {
	const delay = 10 * time.Second

	log.Printf("Client: Received ClientAPI.DelayedHello call with args: %q - Waiting 10 seconds...\n", args)
	time.Sleep(delay)

	*reply = "Client belatedly says hello back to: " + args
	log.Println("Client: Responding to ClientAPI.DelayedHello")
	return nil
}

// LargePayload logs the size of the received payload and stores that size,
// in bytes, in reply. It always returns a nil error.
func (c *ClientAPI) LargePayload(args []byte, reply *int) error {
	size := len(args)
	log.Printf("Client: Received ClientAPI.LargePayload call with payload size: %d bytes\n", size)

	*reply = size
	return nil
}

// main calls connect in an endless loop. When connect returns an error, the
// error is logged and the loop sleeps for ten seconds before the next attempt;
// when connect returns nil, the next attempt starts immediately.
func main() {
	const retryDelay = 10 * time.Second

	for {
		if err := connect(); err != nil {
			log.Println("Error connecting to server:", err)

			// Wait before attempting to reconnect
			time.Sleep(retryDelay)
		}
	}
}

// connect dials the WebSocket server at ws://localhost:50505/ws, creates a
// WSRPC client on the connection, registers ClientAPI for calls from the
// server, and calls the three ServerAPI methods. It then blocks until the
// WSRPC connection is closed.
//
// It returns the dial, client-creation, or registration error unchanged, a
// wrapped error as soon as one of the ServerAPI calls fails, and nil once the
// connection has been closed after all calls succeeded.
func connect() error {
	// Connect to the server
	wsConn, _, err := websocket.DefaultDialer.Dial("ws://localhost:50505/ws", nil)
	if err != nil {
		return err
	}
	defer wsConn.Close()
	log.Println("Successfully connected to the server")

	// Wrap the WebSocket connection with the fasthttpws adapter
	conn := fasthttpws.NewAdapter(wsConn)
	config := wsrpc.DefaultConfig()
	config.LogOutput = io.Discard

	// Create the WSRPC client
	wsrpcClient, err := wsrpc.NewClient(conn, config)
	if err != nil {
		log.Printf("Error creating client: %v\n", err)
		return err
	}
	defer wsrpcClient.Close()

	// Register the service on the client for calls from the server
	err = wsrpcClient.Register(&ClientAPI{})
	if err != nil {
		log.Printf("Error registering service on client: %v\n", err)
		return err
	}

	// --- Client calls Server ---
	// The first failed call aborts the sequence and is returned to the caller.

	// 1. Hello
	log.Println("Client: Calling ServerAPI.Hello...")

	var serverReplyHello string
	err = wsrpcClient.Call("ServerAPI.Hello", "client initial call", &serverReplyHello)
	if err != nil {
		log.Printf("Client: Error calling ServerAPI.Hello: %v\n", err)
		return fmt.Errorf("ServerAPI.Hello call error: %w", err)
	}
	log.Printf("Client: Response from ServerAPI.Hello: %q\n", serverReplyHello)

	// 2. DelayedHello
	log.Println("Client: Calling ServerAPI.DelayedHello (will block)...")

	var serverReplyDelayed string
	err = wsrpcClient.Call("ServerAPI.DelayedHello", "client delayed call", &serverReplyDelayed)
	if err != nil {
		log.Printf("Client: Error calling ServerAPI.DelayedHello: %v\n", err)
		return fmt.Errorf("ServerAPI.DelayedHello call error: %w", err)
	}
	log.Printf("Client: Response from ServerAPI.DelayedHello: %q\n", serverReplyDelayed)

	// 3. LargePayload
	log.Println("Client: Calling ServerAPI.LargePayload...")

	largeArgs := []byte(demodata.LargePayload)
	var serverReplyLarge int
	err = wsrpcClient.Call("ServerAPI.LargePayload", largeArgs, &serverReplyLarge)
	if err != nil {
		log.Printf("Client: Error calling ServerAPI.LargePayload: %v\n", err)
		return fmt.Errorf("ServerAPI.LargePayload call error: %w", err)
	}
	log.Printf("Client: Response from ServerAPI.LargePayload (server received size): %d bytes\n", serverReplyLarge)

	// ------

	// Wait until the connection is closed
	<-wsrpcClient.Done()

	return nil
}

Documentation

Index

Constants

View Source
// WebSocket message types, as defined in RFC 6455, section 11.8.
const (
	// TextMessage denotes a text data message.
	// The text message payload is interpreted as UTF-8 encoded text data.
	TextMessage = 1

	// BinaryMessage denotes a binary data message.
	BinaryMessage = 2
)

The message types are defined in RFC 6455, section 11.8.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config added in v0.2.0

// Config defines the configuration options.
//
// NOTE(review): most field names match those of hashicorp/yamux's Config, and
// WSRPC is documented as running over a Yamux session — presumably these
// values are forwarded to Yamux; confirm against the implementation.
type Config struct {
	AcceptBacklog          int
	EnableKeepAlive        bool
	KeepAliveInterval      time.Duration
	ConnectionWriteTimeout time.Duration
	MaxStreamWindowSize    uint32
	StreamOpenTimeout      time.Duration
	StreamCloseTimeout     time.Duration
	// LogOutput is an io.Writer; the bundled examples set it to io.Discard.
	LogOutput              io.Writer
	// NOTE(review): presumably an upper bound, in bytes, on a single
	// WebSocket message written by the package — confirm.
	MaxWebSocketChunkSize  int
}

Config defines the configuration options.

func DefaultConfig added in v0.2.0

func DefaultConfig() *Config

DefaultConfig is used to return a default configuration.

type Conn added in v0.2.0

// Conn defines the interface that must be implemented by a WebSocket connection.
//
// NOTE(review): the method set looks like that of gorilla/websocket's Conn;
// confirm that the adapters only forward to the underlying connection.
type Conn interface {
	// NOTE(review): messageType is presumably TextMessage or BinaryMessage — confirm.
	ReadMessage() (messageType int, p []byte, err error)
	WriteMessage(messageType int, data []byte) error
	Close() error
	LocalAddr() net.Addr
	RemoteAddr() net.Addr
	SetReadDeadline(t time.Time) error
	SetWriteDeadline(t time.Time) error
}

Conn defines the interface that must be implemented by a WebSocket connection.

type WSRPC

// WSRPC defines a bidirectional RPC connection over a Yamux session.
type WSRPC struct {
	// contains filtered or unexported fields
}

WSRPC defines a bidirectional RPC connection over a Yamux session.

func NewClient

func NewClient(conn Conn, config *Config) (*WSRPC, error)

NewClient creates a new client on top of the given WebSocket connection (Conn).

func NewServer

func NewServer(conn Conn, config *Config) (*WSRPC, error)

NewServer creates a new server on top of the given WebSocket connection (Conn).

func (*WSRPC) Call

func (wsrpc *WSRPC) Call(serviceMethod string, args interface{}, reply interface{}) error

Call calls the named function, waits for it to complete, and returns its error status.

func (*WSRPC) Close

func (wsrpc *WSRPC) Close() error

Close closes the connection.

func (*WSRPC) Done

func (wsrpc *WSRPC) Done() <-chan struct{}

Done returns a channel that is closed when the connection is terminated.

func (*WSRPC) Register

func (wsrpc *WSRPC) Register(rcvr interface{}) error

Register registers the set of methods of the RPC service.

Directories

Path Synopsis
examples
fasthttp_client command
fiber_server command
gorilla_client command
gorilla_server command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL