- Notifications
You must be signed in to change notification settings - Fork15
dgrr/websocket
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
WebSocket library forfasthttp andnet/http.
Checkoutexamples to inspire yourself.
go get github.com/dgrr/websocket
Other WebSocket packages DON'T allow concurrent Read/Write operations on serversand they do not provide low level access to WebSocket packet crafting.Those WebSocket packages try to emulate the Golang API by implementingio.Reader and io.Writer interfaces on their connections. io.Writer might be agood idea to use it, but no io.Reader, given that WebSocket is an async protocolby nature (all protocols are (?)).
Sometimes, WebSocket servers are just cumbersome when we want to handle a lot ofclients in an async manner. For example,in other WebSocket packages to broadcasta message generated internally we'll need to do the following:
typeMyWebSocketServicestruct {clients sync.Map}typeBlockingConnstruct {lck sync.Mutexc websocketPackage.Conn}func (ws*MyWebSocketService)Broadcaster() {formsg:=rangemessageProducerChannel {ws.clients.Range(func(_,vinterface{})bool {c:=v.(*BlockingConn)c.lck.Lock()// oh, we need to block, otherwise we can break the programerr:=c.Write(msg)c.lck.Unlock()iferr!=nil {// we have an error, what can we do? Log it?// if the connection has been closed we'll receive that on// the Read call, so the connection will close automatically. }returntrue }) }}func (ws*MyWebSocketService)Handle(request,response) {c,err:=websocketPackage.Upgrade(request,response)iferr!=nil {// then it's clearly an error! Report back }bc:=&BlockingConn{c:c, }ws.clients.Store(bc,struct{}{})// even though I just want to write, I need to block somehowfor {content,err:=bc.Read()iferr!=nil {// handle the errorbreak } }ws.clients.Delete(bc)}
First, we need to store every client upon connection,and whenever we want to send data we need to iterate over a list, and send the message.If while, writing we get an error, then we need to handle that client's errorWhat if the writing operation is happening at the same time in 2 different coroutines?Then we need a sync.Mutex and block until we finish writing.
To solve most of those problemswebsocketuses channels and separated coroutines, one for reading and another one for writing.By following thesharing principle.
Do not communicate by sharing memory; instead, share memory by communicating.
Following the fasthttp philosophy this library tries to take as much advantageof the Golang's multi-threaded model as possible,while keeping your code concurrently safe.
To see an example of what this package CAN do that others DONT checkoutthe broadcast example.
It's quite easy. You only need to create aServer,set your callbacks by calling theHandle* methodsand then specify your fasthttp handler asServer.Upgrade.
package mainimport ("fmt""github.com/valyala/fasthttp""github.com/dgrr/websocket")funcmain() {ws:= websocket.Server{}ws.HandleData(OnMessage)fasthttp.ListenAndServe(":8080",ws.Upgrade)}funcOnMessage(c*websocket.Conn,isBinarybool,data []byte) {fmt.Printf("Received data from %s: %s\n",c.RemoteAddr(),data)}
package mainimport ("fmt""net/http""github.com/dgrr/websocket")funcmain() {ws:= websocket.Server{}ws.HandleData(OnMessage)http.HandleFunc("/",ws.NetUpgrade)http.ListenAndServe(":8080",nil)}funcOnMessage(c*websocket.Conn,isBinarybool,data []byte) {fmt.Printf("Received data from %s: %s\n",c.RemoteAddr(),data)}
Pings are handle automatically by the library, but you can get the content ofthose pings setting the callback usingHandlePing.
For example, let's try to get the round trip time to a client by usingthe PING frame. The websitehttp2.gofiber.iouses this method to measure the round trip time displayed at the bottom of the webpage.
package mainimport ("sync""encoding/binary""log""time""github.com/valyala/fasthttp""github.com/dgrr/websocket")// Struct to keep the clients connected//// it should be safe to access the clients concurrently from Open and Close.typeRTTMeasurestruct {clients sync.Map}// just trigger the ping senderfunc (rtt*RTTMeasure)Start() {time.AfterFunc(time.Second*2,rtt.sendPings)}func (rtt*RTTMeasure)sendPings() {vardata [8]bytebinary.BigEndian.PutUint64(data[:],uint64(time.Now().UnixNano()),)rtt.clients.Range(func(_,vinterface{})bool {c:=v.(*websocket.Conn)c.Ping(data[:])returntrue})rtt.Start()}// register a connection when it's openfunc (rtt*RTTMeasure)RegisterConn(c*websocket.Conn) {rtt.clients.Store(c.ID(),c)log.Printf("Client %s connected\n",c.RemoteAddr())}// remove the connection when receiving the closefunc (rtt*RTTMeasure)RemoveConn(c*websocket.Conn,errerror) {rtt.clients.Delete(c.ID())log.Printf("Client %s disconnected\n",c.RemoteAddr())}funcmain() {rtt:=RTTMeasure{}ws:= websocket.Server{}ws.HandleOpen(rtt.RegisterConn)ws.HandleClose(rtt.RemoveConn)ws.HandlePong(OnPong)// schedule the timerrtt.Start()fasthttp.ListenAndServe(":8080",ws.Upgrade)}// handle the pong messagefuncOnPong(c*websocket.Conn,data []byte) {iflen(data)==8 {n:=binary.BigEndian.Uint64(data)ts:=time.Unix(0,int64(n))log.Printf("RTT with %s is %s\n",c.RemoteAddr(),time.Now().Sub(ts))}}
Features | websocket | Gorilla | Nhooyr | gowabs |
---|---|---|---|---|
Concurrent R/W | Yes | No | No. Only writes | No |
Passes Autobahn Test Suite | Mostly | Yes | Yes | Mostly |
Receive fragmented message | Yes | Yes | Yes | Yes |
Send close message | Yes | Yes | Yes | Yes |
Send pings and receive pongs | Yes | Yes | Yes | Yes |
Get the type of a received data message | Yes | Yes | Yes | Yes |
Compression Extensions | No | Experimental | Yes | No (?) |
Read message using io.Reader | No | Yes | No | No (?) |
Write message using io.WriteCloser | Yes | Yes | No | No (?) |
The following stress test were performed without timeouts:
Executingtcpkali --ws -c 100 -m 'hello world!!13212312!' -r 10k localhost:8081
the tests shows the following:
Total data sent: 267.7 MiB (280678466 bytes)Total data received: 229.5 MiB (240626600 bytes)Bandwidth per channel: 4.167⇅ Mbps (520.9 kBps)Aggregate bandwidth: 192.357↓, 224.375↑ MbpsPacket rate estimate: 247050.1↓, 61842.9↑ (1↓, 1↑ TCP MSS/op)Test duration: 10.0075 s.
Total data sent: 267.3 MiB (280320124 bytes)Total data received: 228.3 MiB (239396374 bytes)Bandwidth per channel: 4.156⇅ Mbps (519.5 kBps)Aggregate bandwidth: 191.442↓, 224.168↑ MbpsPacket rate estimate: 188107.1↓, 52240.7↑ (1↓, 1↑ TCP MSS/op)Test duration: 10.0039 s.
Either for fasthttp and net/http should be quite close,the only difference is the way they both upgrade.
Total data sent: 260.2 MiB (272886768 bytes)Total data received: 109.3 MiB (114632982 bytes)Bandwidth per channel: 3.097⇅ Mbps (387.1 kBps)Aggregate bandwidth: 91.615↓, 218.092↑ MbpsPacket rate estimate: 109755.3↓, 66807.4↑ (1↓, 1↑ TCP MSS/op)Test duration: 10.01 s.
Total data sent: 224.3 MiB (235184096 bytes)Total data received: 41.2 MiB (43209780 bytes)Bandwidth per channel: 2.227⇅ Mbps (278.3 kBps)Aggregate bandwidth: 34.559↓, 188.097↑ MbpsPacket rate estimate: 88474.0↓, 55256.1↑ (1↓, 1↑ TCP MSS/op)Test duration: 10.0027 s.
Total data sent: 265.8 MiB (278718160 bytes)Total data received: 117.8 MiB (123548959 bytes)Bandwidth per channel: 3.218⇅ Mbps (402.2 kBps)Aggregate bandwidth: 98.825↓, 222.942↑ MbpsPacket rate estimate: 148231.6↓, 72106.1↑ (1↓, 1↑ TCP MSS/op)Test duration: 10.0015 s.
The source files are inthis folder.