From c09fcba77c0bf5c890e9bd19171de555e7ed9241 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Mon, 4 Apr 2022 23:30:19 +0200 Subject: [PATCH] Removed useless goroutine --- monitor.go | 91 ++++++++++++++++++++++++++---------------------------- 1 file changed, 43 insertions(+), 48 deletions(-) diff --git a/monitor.go b/monitor.go index 5833216..81ace3b 100644 --- a/monitor.go +++ b/monitor.go @@ -69,7 +69,8 @@ type Monitor interface { // it must be created using the NewServer function. type Server struct { impl Monitor - outputChan chan *message + out io.Writer + outMutex sync.Mutex userAgent string reqProtocolVersion int initialized bool @@ -82,8 +83,7 @@ type Server struct { // use the Run method. func NewServer(impl Monitor) *Server { return &Server{ - impl: impl, - outputChan: make(chan *message), + impl: impl, } } @@ -93,13 +93,12 @@ func NewServer(impl Monitor) *Server { // the input stream is closed. In case of IO error the error is // returned. func (d *Server) Run(in io.Reader, out io.Writer) error { - go d.outputProcessor(out) - defer close(d.outputChan) + d.out = out reader := bufio.NewReader(in) for { fullCmd, err := reader.ReadString('\n') if err != nil { - d.outputChan <- messageError("command_error", err.Error()) + d.outputMessage(messageError("command_error", err.Error())) return err } fullCmd = strings.TrimSpace(fullCmd) @@ -107,7 +106,7 @@ func (d *Server) Run(in io.Reader, out io.Writer) error { cmd := strings.ToUpper(split[0]) if !d.initialized && cmd != "HELLO" && cmd != "QUIT" { - d.outputChan <- messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd)) + d.outputMessage(messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd))) continue } @@ -124,105 +123,105 @@ func (d *Server) Run(in io.Reader, out io.Writer) error { d.close("") case "QUIT": d.impl.Quit() - d.outputChan <- messageOk("quit") + d.outputMessage(messageOk("quit")) return nil default: - d.outputChan <- messageError("command_error", fmt.Sprintf("Command %s not supported", cmd)) + d.outputMessage(messageError("command_error", fmt.Sprintf("Command %s not supported", cmd))) } } } func (d *Server) hello(cmd string) { if d.initialized { - d.outputChan <- messageError("hello", "HELLO already called") + d.outputMessage(messageError("hello", "HELLO already called")) return } re := regexp.MustCompile(`^(\d+) "([^"]+)"$`) matches := re.FindStringSubmatch(cmd) if len(matches) != 3 { - d.outputChan <- messageError("hello", "Invalid HELLO command") + d.outputMessage(messageError("hello", "Invalid HELLO command")) return } d.userAgent = matches[2] v, err := strconv.ParseInt(matches[1], 10, 64) if err != nil { - d.outputChan <- messageError("hello", "Invalid protocol version: "+matches[2]) + d.outputMessage(messageError("hello", "Invalid protocol version: "+matches[2])) return } d.reqProtocolVersion = int(v) if err := d.impl.Hello(d.userAgent, 1); err != nil { - d.outputChan <- messageError("hello", err.Error()) + d.outputMessage(messageError("hello", err.Error())) return } - d.outputChan <- &message{ + d.outputMessage(&message{ EventType: "hello", ProtocolVersion: 1, // Protocol version 1 is the only supported for now... Message: "OK", - } + }) d.initialized = true } func (d *Server) describe() { if !d.initialized { - d.outputChan <- messageError("describe", "Monitor not initialized") + d.outputMessage(messageError("describe", "Monitor not initialized")) return } portDescription, err := d.impl.Describe() if err != nil { - d.outputChan <- messageError("describe", err.Error()) + d.outputMessage(messageError("describe", err.Error())) return } - d.outputChan <- &message{ + d.outputMessage(&message{ EventType: "describe", Message: "OK", PortDescription: portDescription, - } + }) } func (d *Server) configure(cmd string) { if !d.initialized { - d.outputChan <- messageError("configure", "Monitor not initialized") + d.outputMessage(messageError("configure", "Monitor not initialized")) return } re := regexp.MustCompile(`^([\w.-]+) (.+)$`) matches := re.FindStringSubmatch(cmd) if len(matches) != 3 { - d.outputChan <- messageError("configure", "Invalid CONFIGURE command") + d.outputMessage(messageError("configure", "Invalid CONFIGURE command")) return } parameterName := matches[1] value := matches[2] if err := d.impl.Configure(parameterName, value); err != nil { - d.outputChan <- messageError("configure", err.Error()) + d.outputMessage(messageError("configure", err.Error())) return } - d.outputChan <- &message{ + d.outputMessage(&message{ EventType: "configure", Message: "OK", - } + }) } func (d *Server) open(cmd string) { if !d.initialized { - d.outputChan <- messageError("open", "Monitor not initialized") + d.outputMessage(messageError("open", "Monitor not initialized")) return } parameters := strings.SplitN(cmd, " ", 2) if len(parameters) != 2 { - d.outputChan <- messageError("open", "Invalid OPEN command") + d.outputMessage(messageError("open", "Invalid OPEN command")) return } address := parameters[0] portName := parameters[1] port, err := d.impl.Open(portName) if err != nil { - d.outputChan <- messageError("open", err.Error()) + d.outputMessage(messageError("open", err.Error())) return } d.clientConn, err = net.Dial("tcp", address) if err != nil { d.impl.Close() - d.outputChan <- messageError("open", err.Error()) + d.outputMessage(messageError("open", err.Error())) return } // io.Copy is used to bridge the Client's TCP connection to the port one and vice versa @@ -242,10 +241,10 @@ func (d *Server) open(cmd string) { d.close("lost connection with the port") } }() - d.outputChan <- &message{ + d.outputMessage(&message{ EventType: "open", Message: "OK", - } + }) } func (d *Server) close(messageErr string) { @@ -253,7 +252,7 @@ func (d *Server) close(messageErr string) { defer d.closeFuncMutex.Unlock() if d.clientConn == nil { if messageErr == "" { - d.outputChan <- messageError("close", "port already closed") + d.outputMessage(messageError("close", "port already closed")) } return } @@ -261,32 +260,28 @@ func (d *Server) close(messageErr string) { portErr := d.impl.Close() d.clientConn = nil if messageErr != "" { - d.outputChan <- messageError("port_closed", messageErr) + d.outputMessage(messageError("port_closed", messageErr)) return } if connErr != nil || portErr != nil { var errs *multierror.Error errs = multierror.Append(errs, connErr, portErr) - d.outputChan <- messageError("close", errs.Error()) + d.outputMessage(messageError("close", errs.Error())) return } - d.outputChan <- &message{ + d.outputMessage(&message{ EventType: "close", Message: "OK", - } + }) } -func (d *Server) outputProcessor(outWriter io.Writer) { - // Start go routine to serialize messages printing - go func() { - for msg := range d.outputChan { - data, err := json.MarshalIndent(msg, "", " ") - if err != nil { - // We are certain that this will be marshalled correctly - // so we don't handle the error - data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ") - } - fmt.Fprintln(outWriter, string(data)) - } - }() +func (d *Server) outputMessage(msg *message) { + data, err := json.MarshalIndent(msg, "", " ") + if err != nil { + // We are certain that this will be marshalled correctly so we don't handle the error + data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ") + } + d.outMutex.Lock() + fmt.Fprintln(d.out, string(data)) + d.outMutex.Unlock() }