Skip to content

Commit 9079f85

Browse files
authored
[skip-changelog] Pluggable monitor fix and refactor (arduino#1482)
* Fixed race condition in pluggable monitor. Many methods held the stateLock for the entire scope of the function call, but the 'port_closed' message can be received at any moment, asynchronously, and it requires the stateLock as well. In this case the worst case is that the decode loop is blocked for 10 seconds until the timeout occurs, but this is not ideal. This bug has been fixed by removing the state, since it's not really useful.
* Improved message logging in pluggable monitor
* Refactored message processing in pluggable monitor
* Fix i18n
* fixed lint suggestion
* fix from code review
1 parent 3d5b430 commit 9079f85

File tree

3 files changed

+77
-155
lines changed

3 files changed

+77
-155
lines changed

arduino/monitor/monitor.go

+55-112
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,12 @@ import (
2121
"io"
2222
"net"
2323
"strings"
24-
"sync"
2524
"time"
2625

2726
"github.com/arduino/arduino-cli/cli/globals"
2827
"github.com/arduino/arduino-cli/executils"
2928
"github.com/arduino/arduino-cli/i18n"
3029
rpc "github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
31-
"github.com/pkg/errors"
3230
"github.com/sirupsen/logrus"
3331
)
3432

@@ -50,11 +48,10 @@ type PluggableMonitor struct {
5048
outgoingCommandsPipe io.Writer
5149
incomingMessagesChan <-chan *monitorMessage
5250
supportedProtocol string
51+
log *logrus.Entry
5352

5453
// All the following fields are guarded by statusMutex
55-
statusMutex sync.Mutex
5654
incomingMessagesError error
57-
state int
5855
}
5956

6057
type monitorMessage struct {
@@ -101,7 +98,7 @@ func New(id string, args ...string) *PluggableMonitor {
10198
return &PluggableMonitor{
10299
id: id,
103100
processArgs: args,
104-
state: Dead,
101+
log: logrus.WithField("monitor", id),
105102
}
106103
}
107104

@@ -116,54 +113,54 @@ func (mon *PluggableMonitor) String() string {
116113

117114
func (mon *PluggableMonitor) jsonDecodeLoop(in io.Reader, outChan chan<- *monitorMessage) {
118115
decoder := json.NewDecoder(in)
119-
closeAndReportError := func(err error) {
120-
mon.statusMutex.Lock()
121-
mon.state = Dead
122-
mon.incomingMessagesError = err
123-
close(outChan)
124-
mon.statusMutex.Unlock()
125-
logrus.Errorf("stopped monitor %s decode loop", mon.id)
126-
}
127116

128117
for {
129118
var msg monitorMessage
130119
if err := decoder.Decode(&msg); err != nil {
131-
closeAndReportError(err)
120+
mon.incomingMessagesError = err
121+
close(outChan)
122+
mon.log.Errorf("stopped decode loop")
132123
return
133124
}
134-
logrus.Infof("from monitor %s received message %s", mon.id, msg)
125+
mon.log.
126+
WithField("event_type", msg.EventType).
127+
WithField("message", msg.Message).
128+
WithField("error", msg.Error).
129+
Infof("received message")
135130
if msg.EventType == "port_closed" {
136-
mon.statusMutex.Lock()
137-
mon.state = Idle
138-
mon.statusMutex.Unlock()
131+
mon.log.Infof("monitor port has been closed externally")
139132
} else {
140133
outChan <- &msg
141134
}
142135
}
143136
}
144137

145-
// State returns the current state of this PluggableMonitor
146-
func (mon *PluggableMonitor) State() int {
147-
mon.statusMutex.Lock()
148-
defer mon.statusMutex.Unlock()
149-
return mon.state
150-
}
151-
152-
func (mon *PluggableMonitor) waitMessage(timeout time.Duration) (*monitorMessage, error) {
138+
func (mon *PluggableMonitor) waitMessage(timeout time.Duration, expectedEvt string) (*monitorMessage, error) {
139+
var msg *monitorMessage
153140
select {
154-
case msg := <-mon.incomingMessagesChan:
141+
case msg = <-mon.incomingMessagesChan:
155142
if msg == nil {
156143
// channel has been closed
157144
return nil, mon.incomingMessagesError
158145
}
159-
return msg, nil
160146
case <-time.After(timeout):
161-
return nil, fmt.Errorf(tr("timeout waiting for message from monitor %s"), mon.id)
147+
return nil, fmt.Errorf(tr("timeout waiting for message"))
148+
}
149+
if expectedEvt == "" {
150+
// No message processing required for this call
151+
return msg, nil
152+
}
153+
if msg.EventType != expectedEvt {
154+
return msg, fmt.Errorf(tr("communication out of sync, expected '%[1]s', received '%[2]s'"), expectedEvt, msg.EventType)
155+
}
156+
if msg.Message != "OK" || msg.Error {
157+
return msg, fmt.Errorf(tr("command '%[1]s' failed: %[2]s"), expectedEvt, msg.Message)
162158
}
159+
return msg, nil
163160
}
164161

165162
func (mon *PluggableMonitor) sendCommand(command string) error {
166-
logrus.Infof("sending command %s to monitor %s", strings.TrimSpace(command), mon)
163+
mon.log.WithField("command", strings.TrimSpace(command)).Infof("sending command")
167164
data := []byte(command)
168165
for {
169166
n, err := mon.outgoingCommandsPipe.Write(data)
@@ -178,7 +175,7 @@ func (mon *PluggableMonitor) sendCommand(command string) error {
178175
}
179176

180177
func (mon *PluggableMonitor) runProcess() error {
181-
logrus.Infof("starting monitor %s process", mon.id)
178+
mon.log.Infof("Starting monitor process")
182179
proc, err := executils.NewProcess(mon.processArgs...)
183180
if err != nil {
184181
return err
@@ -194,32 +191,27 @@ func (mon *PluggableMonitor) runProcess() error {
194191
mon.outgoingCommandsPipe = stdin
195192
mon.process = proc
196193

194+
if err := mon.process.Start(); err != nil {
195+
return err
196+
}
197+
197198
messageChan := make(chan *monitorMessage)
198199
mon.incomingMessagesChan = messageChan
199200
go mon.jsonDecodeLoop(stdout, messageChan)
200201

201-
if err := mon.process.Start(); err != nil {
202-
return err
203-
}
204-
mon.statusMutex.Lock()
205-
defer mon.statusMutex.Unlock()
206-
mon.state = Alive
207-
logrus.Infof("started monitor %s process", mon.id)
202+
mon.log.Infof("Monitor process started successfully!")
208203
return nil
209204
}
210205

211206
func (mon *PluggableMonitor) killProcess() error {
212-
logrus.Infof("killing monitor %s process", mon.id)
207+
mon.log.Infof("Killing monitor process")
213208
if err := mon.process.Kill(); err != nil {
214209
return err
215210
}
216211
if err := mon.process.Wait(); err != nil {
217212
return err
218213
}
219-
mon.statusMutex.Lock()
220-
defer mon.statusMutex.Unlock()
221-
mon.state = Dead
222-
logrus.Infof("killed monitor %s process", mon.id)
214+
mon.log.Infof("Monitor process killed successfully!")
223215
return nil
224216
}
225217

@@ -241,25 +233,18 @@ func (mon *PluggableMonitor) Run() (err error) {
241233
if killErr := mon.killProcess(); killErr != nil {
242234
// Log failure to kill the process, ideally that should never happen
243235
// but it's best to know it if it does
244-
logrus.Errorf("Killing monitor %s after unsuccessful start: %s", mon.id, killErr)
236+
mon.log.Errorf("Killing monitor after unsuccessful start: %s", killErr)
245237
}
246238
}()
247239

248240
if err = mon.sendCommand("HELLO 1 \"arduino-cli " + globals.VersionInfo.VersionString + "\"\n"); err != nil {
249241
return err
250242
}
251-
if msg, err := mon.waitMessage(time.Second * 10); err != nil {
252-
return fmt.Errorf(tr("calling %[1]s: %[2]w"), "HELLO", err)
253-
} else if msg.EventType != "hello" {
254-
return errors.Errorf(tr("communication out of sync, expected 'hello', received '%s'"), msg.EventType)
255-
} else if msg.Message != "OK" || msg.Error {
256-
return errors.Errorf(tr("command failed: %s"), msg.Message)
243+
if msg, err := mon.waitMessage(time.Second*10, "hello"); err != nil {
244+
return err
257245
} else if msg.ProtocolVersion > 1 {
258-
return errors.Errorf(tr("protocol version not supported: requested 1, got %d"), msg.ProtocolVersion)
246+
return fmt.Errorf(tr("protocol version not supported: requested %[1]d, got %[2]d"), 1, msg.ProtocolVersion)
259247
}
260-
mon.statusMutex.Lock()
261-
defer mon.statusMutex.Unlock()
262-
mon.state = Idle
263248
return nil
264249
}
265250

@@ -268,45 +253,25 @@ func (mon *PluggableMonitor) Describe() (*PortDescriptor, error) {
268253
if err := mon.sendCommand("DESCRIBE\n"); err != nil {
269254
return nil, err
270255
}
271-
if msg, err := mon.waitMessage(time.Second * 10); err != nil {
272-
return nil, fmt.Errorf("calling %s: %w", "", err)
273-
} else if msg.EventType != "describe" {
274-
return nil, errors.Errorf(tr("communication out of sync, expected 'describe', received '%s'"), msg.EventType)
275-
} else if msg.Message != "OK" || msg.Error {
276-
return nil, errors.Errorf(tr("command failed: %s"), msg.Message)
277-
} else {
278-
mon.supportedProtocol = msg.PortDescription.Protocol
279-
return msg.PortDescription, nil
256+
msg, err := mon.waitMessage(time.Second*10, "describe")
257+
if err != nil {
258+
return nil, err
280259
}
260+
mon.supportedProtocol = msg.PortDescription.Protocol
261+
return msg.PortDescription, nil
281262
}
282263

283264
// Configure sets a port configuration parameter.
284265
func (mon *PluggableMonitor) Configure(param, value string) error {
285266
if err := mon.sendCommand(fmt.Sprintf("CONFIGURE %s %s\n", param, value)); err != nil {
286267
return err
287268
}
288-
if msg, err := mon.waitMessage(time.Second * 10); err != nil {
289-
return fmt.Errorf("calling %s: %w", "", err)
290-
} else if msg.EventType != "configure" {
291-
return errors.Errorf(tr("communication out of sync, expected 'configure', received '%s'"), msg.EventType)
292-
} else if msg.Message != "OK" || msg.Error {
293-
return errors.Errorf(tr("configure failed: %s"), msg.Message)
294-
} else {
295-
return nil
296-
}
269+
_, err := mon.waitMessage(time.Second*10, "configure")
270+
return err
297271
}
298272

299273
// Open connects to the given Port. A communication channel is opened
300274
func (mon *PluggableMonitor) Open(port *rpc.Port) (io.ReadWriter, error) {
301-
mon.statusMutex.Lock()
302-
defer mon.statusMutex.Unlock()
303-
304-
if mon.state == Opened {
305-
return nil, fmt.Errorf("a port is already opened")
306-
}
307-
if mon.state != Idle {
308-
return nil, fmt.Errorf("the monitor is not started")
309-
}
310275
if port.Protocol != mon.supportedProtocol {
311276
return nil, fmt.Errorf("invalid monitor protocol '%s': only '%s' is accepted", port.Protocol, mon.supportedProtocol)
312277
}
@@ -321,58 +286,36 @@ func (mon *PluggableMonitor) Open(port *rpc.Port) (io.ReadWriter, error) {
321286
if err := mon.sendCommand(fmt.Sprintf("OPEN 127.0.0.1:%d %s\n", tcpListenerPort, port.Address)); err != nil {
322287
return nil, err
323288
}
324-
if msg, err := mon.waitMessage(time.Second * 10); err != nil {
325-
return nil, fmt.Errorf("calling %s: %w", "", err)
326-
} else if msg.EventType != "open" {
327-
return nil, errors.Errorf(tr("communication out of sync, expected 'open', received '%s'"), msg.EventType)
328-
} else if msg.Message != "OK" || msg.Error {
329-
return nil, errors.Errorf(tr("open failed: %s"), msg.Message)
289+
if _, err := mon.waitMessage(time.Second*10, "open"); err != nil {
290+
return nil, err
330291
}
331292

332293
conn, err := tcpListener.Accept()
333294
if err != nil {
334295
return nil, err // TODO
335296
}
336-
337-
mon.state = Opened
338297
return conn, nil
339298
}
340299

341300
// Close the communication port with the board.
342301
func (mon *PluggableMonitor) Close() error {
343-
mon.statusMutex.Lock()
344-
defer mon.statusMutex.Unlock()
345-
346-
if mon.state != Opened {
347-
return fmt.Errorf("monitor port already closed")
348-
}
349302
if err := mon.sendCommand("CLOSE\n"); err != nil {
350303
return err
351304
}
352-
if msg, err := mon.waitMessage(time.Second * 10); err != nil {
353-
return fmt.Errorf("calling %s: %w", "", err)
354-
} else if msg.EventType != "close" {
355-
return errors.Errorf(tr("communication out of sync, expected 'close', received '%s'"), msg.EventType)
356-
} else if msg.Message != "OK" || msg.Error {
357-
return errors.Errorf(tr("command failed: %s"), msg.Message)
358-
} else {
359-
mon.state = Idle
360-
return nil
361-
}
305+
_, err := mon.waitMessage(time.Second*10, "close")
306+
return err
362307
}
363308

364309
// Quit terminates the monitor. No more commands can be accepted by the monitor.
365310
func (mon *PluggableMonitor) Quit() error {
366311
if err := mon.sendCommand("QUIT\n"); err != nil {
367312
return err
368313
}
369-
if msg, err := mon.waitMessage(time.Second * 10); err != nil {
370-
return fmt.Errorf(tr("calling %[1]s: %[2]w"), "QUIT", err)
371-
} else if msg.EventType != "quit" {
372-
return errors.Errorf(tr("communication out of sync, expected 'quit', received '%s'"), msg.EventType)
373-
} else if msg.Message != "OK" || msg.Error {
374-
return errors.Errorf(tr("command failed: %s"), msg.Message)
314+
if _, err := mon.waitMessage(time.Second*10, "quit"); err != nil {
315+
return err
316+
}
317+
if err := mon.killProcess(); err != nil {
318+
mon.log.WithError(err).Info("error killing monitor process")
375319
}
376-
mon.killProcess()
377320
return nil
378321
}

0 commit comments

Comments
 (0)