@@ -69,7 +69,8 @@ type Monitor interface {
69
69
// it must be created using the NewServer function.
70
70
type Server struct {
71
71
impl Monitor
72
- outputChan chan * message
72
+ out io.Writer
73
+ outMutex sync.Mutex
73
74
userAgent string
74
75
reqProtocolVersion int
75
76
initialized bool
@@ -82,8 +83,7 @@ type Server struct {
82
83
// use the Run method.
83
84
func NewServer (impl Monitor ) * Server {
84
85
return & Server {
85
- impl : impl ,
86
- outputChan : make (chan * message ),
86
+ impl : impl ,
87
87
}
88
88
}
89
89
@@ -93,21 +93,20 @@ func NewServer(impl Monitor) *Server {
93
93
// the input stream is closed. In case of IO error the error is
94
94
// returned.
95
95
func (d * Server ) Run (in io.Reader , out io.Writer ) error {
96
- go d .outputProcessor (out )
97
- defer close (d .outputChan )
96
+ d .out = out
98
97
reader := bufio .NewReader (in )
99
98
for {
100
99
fullCmd , err := reader .ReadString ('\n' )
101
100
if err != nil {
102
- d .outputChan <- messageError ("command_error" , err .Error ())
101
+ d .outputMessage ( messageError ("command_error" , err .Error () ))
103
102
return err
104
103
}
105
104
fullCmd = strings .TrimSpace (fullCmd )
106
105
split := strings .Split (fullCmd , " " )
107
106
cmd := strings .ToUpper (split [0 ])
108
107
109
108
if ! d .initialized && cmd != "HELLO" && cmd != "QUIT" {
110
- d .outputChan <- messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ))
109
+ d .outputMessage ( messageError ("command_error" , fmt .Sprintf ("First command must be HELLO, but got '%s'" , cmd ) ))
111
110
continue
112
111
}
113
112
@@ -124,105 +123,105 @@ func (d *Server) Run(in io.Reader, out io.Writer) error {
124
123
d .close ("" )
125
124
case "QUIT" :
126
125
d .impl .Quit ()
127
- d .outputChan <- messageOk ("quit" )
126
+ d .outputMessage ( messageOk ("quit" ) )
128
127
return nil
129
128
default :
130
- d .outputChan <- messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ))
129
+ d .outputMessage ( messageError ("command_error" , fmt .Sprintf ("Command %s not supported" , cmd ) ))
131
130
}
132
131
}
133
132
}
134
133
135
134
func (d * Server ) hello (cmd string ) {
136
135
if d .initialized {
137
- d .outputChan <- messageError ("hello" , "HELLO already called" )
136
+ d .outputMessage ( messageError ("hello" , "HELLO already called" ) )
138
137
return
139
138
}
140
139
re := regexp .MustCompile (`^(\d+) "([^"]+)"$` )
141
140
matches := re .FindStringSubmatch (cmd )
142
141
if len (matches ) != 3 {
143
- d .outputChan <- messageError ("hello" , "Invalid HELLO command" )
142
+ d .outputMessage ( messageError ("hello" , "Invalid HELLO command" ) )
144
143
return
145
144
}
146
145
d .userAgent = matches [2 ]
147
146
v , err := strconv .ParseInt (matches [1 ], 10 , 64 )
148
147
if err != nil {
149
- d .outputChan <- messageError ("hello" , "Invalid protocol version: " + matches [2 ])
148
+ d .outputMessage ( messageError ("hello" , "Invalid protocol version: " + matches [2 ]) )
150
149
return
151
150
}
152
151
d .reqProtocolVersion = int (v )
153
152
if err := d .impl .Hello (d .userAgent , 1 ); err != nil {
154
- d .outputChan <- messageError ("hello" , err .Error ())
153
+ d .outputMessage ( messageError ("hello" , err .Error () ))
155
154
return
156
155
}
157
- d .outputChan <- & message {
156
+ d .outputMessage ( & message {
158
157
EventType : "hello" ,
159
158
ProtocolVersion : 1 , // Protocol version 1 is the only supported for now...
160
159
Message : "OK" ,
161
- }
160
+ })
162
161
d .initialized = true
163
162
}
164
163
165
164
func (d * Server ) describe () {
166
165
if ! d .initialized {
167
- d .outputChan <- messageError ("describe" , "Monitor not initialized" )
166
+ d .outputMessage ( messageError ("describe" , "Monitor not initialized" ) )
168
167
return
169
168
}
170
169
portDescription , err := d .impl .Describe ()
171
170
if err != nil {
172
- d .outputChan <- messageError ("describe" , err .Error ())
171
+ d .outputMessage ( messageError ("describe" , err .Error () ))
173
172
return
174
173
}
175
- d .outputChan <- & message {
174
+ d .outputMessage ( & message {
176
175
EventType : "describe" ,
177
176
Message : "OK" ,
178
177
PortDescription : portDescription ,
179
- }
178
+ })
180
179
}
181
180
182
181
func (d * Server ) configure (cmd string ) {
183
182
if ! d .initialized {
184
- d .outputChan <- messageError ("configure" , "Monitor not initialized" )
183
+ d .outputMessage ( messageError ("configure" , "Monitor not initialized" ) )
185
184
return
186
185
}
187
186
re := regexp .MustCompile (`^([\w.-]+) (.+)$` )
188
187
matches := re .FindStringSubmatch (cmd )
189
188
if len (matches ) != 3 {
190
- d .outputChan <- messageError ("configure" , "Invalid CONFIGURE command" )
189
+ d .outputMessage ( messageError ("configure" , "Invalid CONFIGURE command" ) )
191
190
return
192
191
}
193
192
parameterName := matches [1 ]
194
193
value := matches [2 ]
195
194
if err := d .impl .Configure (parameterName , value ); err != nil {
196
- d .outputChan <- messageError ("configure" , err .Error ())
195
+ d .outputMessage ( messageError ("configure" , err .Error () ))
197
196
return
198
197
}
199
- d .outputChan <- & message {
198
+ d .outputMessage ( & message {
200
199
EventType : "configure" ,
201
200
Message : "OK" ,
202
- }
201
+ })
203
202
}
204
203
205
204
func (d * Server ) open (cmd string ) {
206
205
if ! d .initialized {
207
- d .outputChan <- messageError ("open" , "Monitor not initialized" )
206
+ d .outputMessage ( messageError ("open" , "Monitor not initialized" ) )
208
207
return
209
208
}
210
209
parameters := strings .SplitN (cmd , " " , 2 )
211
210
if len (parameters ) != 2 {
212
- d .outputChan <- messageError ("open" , "Invalid OPEN command" )
211
+ d .outputMessage ( messageError ("open" , "Invalid OPEN command" ) )
213
212
return
214
213
}
215
214
address := parameters [0 ]
216
215
portName := parameters [1 ]
217
216
port , err := d .impl .Open (portName )
218
217
if err != nil {
219
- d .outputChan <- messageError ("open" , err .Error ())
218
+ d .outputMessage ( messageError ("open" , err .Error () ))
220
219
return
221
220
}
222
221
d .clientConn , err = net .Dial ("tcp" , address )
223
222
if err != nil {
224
223
d .impl .Close ()
225
- d .outputChan <- messageError ("open" , err .Error ())
224
+ d .outputMessage ( messageError ("open" , err .Error () ))
226
225
return
227
226
}
228
227
// io.Copy is used to bridge the Client's TCP connection to the port one and vice versa
@@ -242,51 +241,47 @@ func (d *Server) open(cmd string) {
242
241
d .close ("lost connection with the port" )
243
242
}
244
243
}()
245
- d .outputChan <- & message {
244
+ d .outputMessage ( & message {
246
245
EventType : "open" ,
247
246
Message : "OK" ,
248
- }
247
+ })
249
248
}
250
249
251
250
func (d * Server ) close (messageErr string ) {
252
251
d .closeFuncMutex .Lock ()
253
252
defer d .closeFuncMutex .Unlock ()
254
253
if d .clientConn == nil {
255
254
if messageErr == "" {
256
- d .outputChan <- messageError ("close" , "port already closed" )
255
+ d .outputMessage ( messageError ("close" , "port already closed" ) )
257
256
}
258
257
return
259
258
}
260
259
connErr := d .clientConn .Close ()
261
260
portErr := d .impl .Close ()
262
261
d .clientConn = nil
263
262
if messageErr != "" {
264
- d .outputChan <- messageError ("port_closed" , messageErr )
263
+ d .outputMessage ( messageError ("port_closed" , messageErr ) )
265
264
return
266
265
}
267
266
if connErr != nil || portErr != nil {
268
267
var errs * multierror.Error
269
268
errs = multierror .Append (errs , connErr , portErr )
270
- d .outputChan <- messageError ("close" , errs .Error ())
269
+ d .outputMessage ( messageError ("close" , errs .Error () ))
271
270
return
272
271
}
273
- d .outputChan <- & message {
272
+ d .outputMessage ( & message {
274
273
EventType : "close" ,
275
274
Message : "OK" ,
276
- }
275
+ })
277
276
}
278
277
279
- func (d * Server ) outputProcessor (outWriter io.Writer ) {
280
- // Start go routine to serialize messages printing
281
- go func () {
282
- for msg := range d .outputChan {
283
- data , err := json .MarshalIndent (msg , "" , " " )
284
- if err != nil {
285
- // We are certain that this will be marshalled correctly
286
- // so we don't handle the error
287
- data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
288
- }
289
- fmt .Fprintln (outWriter , string (data ))
290
- }
291
- }()
278
+ func (d * Server ) outputMessage (msg * message ) {
279
+ data , err := json .MarshalIndent (msg , "" , " " )
280
+ if err != nil {
281
+ // We are certain that this will be marshalled correctly so we don't handle the error
282
+ data , _ = json .MarshalIndent (messageError ("command_error" , err .Error ()), "" , " " )
283
+ }
284
+ d .outMutex .Lock ()
285
+ fmt .Fprintln (d .out , string (data ))
286
+ d .outMutex .Unlock ()
292
287
}
0 commit comments