@@ -19,7 +19,7 @@ connections and disconnectinos.
1919A quick example of a simple sever that broadcasts a "tick" event every second
2020
2121 func main() {
22- stream := & eventsource.Stream{}
22+ stream := eventsource.NewStream()
2323 go func(s *eventsource.Stream) {
2424 for {
2525 time.Sleep(time.Second)
@@ -33,7 +33,6 @@ A quick example of a simple sever that broadcasts a "tick" event every second
3333package eventsource
3434
3535import (
36- "container/list"
3736 "net/http"
3837 "sync"
3938)
@@ -43,52 +42,49 @@ import (
4342// A stream also implements an http.Handler to easily register incoming
4443// http requests as new clients.
4544type Stream struct {
46- clients list. List
45+ clients map [ * Client ] topicList
4746 listLock sync.RWMutex
4847 shutdownWait sync.WaitGroup
4948 clientConnectHook func (* http.Request , * Client )
5049}
5150
52- type registeredClient struct {
53- c * Client
54- topics map [string ]bool
51+ type topicList map [string ]bool
52+
53+ // NewStream creates a new stream object
54+ func NewStream () * Stream {
55+ return & Stream {
56+ clients : make (map [* Client ]topicList ),
57+ }
5558}
5659
5760// Register adds a client to the stream to receive all broadcast
5861// messages. Has no effect if the client is already registered.
5962func (s * Stream ) Register (c * Client ) {
6063
6164 // see if the client has been registered
62- if cli := s .getClient ( c ); cli != nil {
65+ if _ , found := s .clients [ c ]; found {
6366 return
6467 }
6568
6669 // append new client
67- s .addClient ( c )
70+ s .clients [ c ] = make ( topicList )
6871}
6972
7073// Remove will remove a client from this stream, but not shut the client down.
7174func (s * Stream ) Remove (c * Client ) {
7275 s .listLock .Lock ()
7376 defer s .listLock .Unlock ()
7477
75- for element := s .clients .Front (); element != nil ; element = element .Next () {
76- if regCli := element .Value .(* registeredClient ); regCli .c == c {
77- // client found
78- s .clients .Remove (element )
79- return
80- }
81- }
78+ delete (s .clients , c )
8279}
8380
8481// Broadcast sends the event to all clients registered on this stream.
8582func (s * Stream ) Broadcast (e * Event ) {
8683 s .listLock .RLock ()
8784 defer s .listLock .RUnlock ()
8885
89- for element := s .clients .Front (); element != nil ; element = element .Next () {
90- cli := element .Value .(* registeredClient )
91- cli .c .Send (e )
86+ for cli := range s .clients {
87+ cli .Send (e )
9288 }
9389}
9490
@@ -97,35 +93,35 @@ func (s *Stream) Broadcast(e *Event) {
9793// client.
9894func (s * Stream ) Subscribe (topic string , c * Client ) {
9995 // see if the client is registered
100- cli := s .getClient ( c )
96+ topics , found := s .clients [ c ]
10197
10298 // register if not
103- if cli == nil {
104- cli = s .addClient (c )
99+ if ! found {
100+ topics = make (topicList )
101+ s .clients [c ] = topics
105102 }
106103
107- cli . topics [topic ] = true
104+ topics [topic ] = true
108105}
109106
110107// Unsubscribe removes clients from the topic, but not from broadcasts.
111108func (s * Stream ) Unsubscribe (topic string , c * Client ) {
112109
113- cli := s .getClient ( c )
114- if cli == nil {
110+ topics , found := s .clients [ c ]
111+ if ! found {
115112 return
116113 }
117- cli . topics [topic ] = false
114+ topics [topic ] = false
118115}
119116
120117// Publish sends the event to clients that have subscribed to the given topic.
121118func (s * Stream ) Publish (topic string , e * Event ) {
122119 s .listLock .RLock ()
123120 defer s .listLock .RUnlock ()
124121
125- for element := s .clients .Front (); element != nil ; element = element .Next () {
126- cli := element .Value .(* registeredClient )
127- if cli .topics [topic ] {
128- cli .c .Send (e )
122+ for cli , topics := range s .clients {
123+ if topics [topic ] {
124+ cli .Send (e )
129125 }
130126 }
131127}
@@ -135,12 +131,9 @@ func (s *Stream) Shutdown() {
135131 s .listLock .Lock ()
136132 defer s .listLock .Unlock ()
137133
138- for element := s .clients .Front (); element != nil ; {
139- cli := element .Value .(* registeredClient )
140- cli .c .Shutdown ()
141- next := element .Next ()
142- s .clients .Remove (element )
143- element = next
134+ for client := range s .clients {
135+ client .Shutdown ()
136+ delete (s .clients , client )
144137 }
145138}
146139
@@ -223,32 +216,3 @@ func (s *Stream) ClientConnectHook(fn func(*http.Request, *Client)) {
223216func checkRequest (r * http.Request ) bool {
224217 return r .Header .Get ("Accept" ) == "text/event-stream"
225218}
226-
227- func (s * Stream ) getClient (c * Client ) * registeredClient {
228- s .listLock .RLock ()
229- defer s .listLock .RUnlock ()
230-
231- for element := s .clients .Front (); element != nil ; element = element .Next () {
232- if regCli := element .Value .(* registeredClient ); regCli .c == c {
233- // client found
234- return regCli
235- }
236- }
237-
238- // not found
239- return nil
240- }
241-
242- func (s * Stream ) addClient (c * Client ) * registeredClient {
243-
244- cli := & registeredClient {
245- c : c ,
246- topics : make (map [string ]bool ),
247- }
248-
249- s .listLock .Lock ()
250- s .clients .PushBack (cli )
251- s .listLock .Unlock ()
252-
253- return cli
254- }
0 commit comments