Skip to content

Removed useless goroutine in conversation handler #15

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 23, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 43 additions & 48 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type Monitor interface {
// it must be created using the NewServer function.
type Server struct {
impl Monitor
outputChan chan *message
out io.Writer
outMutex sync.Mutex
userAgent string
reqProtocolVersion int
initialized bool
Expand All @@ -82,8 +83,7 @@ type Server struct {
// use the Run method.
func NewServer(impl Monitor) *Server {
return &Server{
impl: impl,
outputChan: make(chan *message),
impl: impl,
}
}

Expand All @@ -93,21 +93,20 @@ func NewServer(impl Monitor) *Server {
// the input stream is closed. In case of IO error the error is
// returned.
func (d *Server) Run(in io.Reader, out io.Writer) error {
go d.outputProcessor(out)
defer close(d.outputChan)
d.out = out
reader := bufio.NewReader(in)
for {
fullCmd, err := reader.ReadString('\n')
if err != nil {
d.outputChan <- messageError("command_error", err.Error())
d.outputMessage(messageError("command_error", err.Error()))
return err
}
fullCmd = strings.TrimSpace(fullCmd)
split := strings.Split(fullCmd, " ")
cmd := strings.ToUpper(split[0])

if !d.initialized && cmd != "HELLO" && cmd != "QUIT" {
d.outputChan <- messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd))
d.outputMessage(messageError("command_error", fmt.Sprintf("First command must be HELLO, but got '%s'", cmd)))
continue
}

Expand All @@ -124,105 +123,105 @@ func (d *Server) Run(in io.Reader, out io.Writer) error {
d.close("")
case "QUIT":
d.impl.Quit()
d.outputChan <- messageOk("quit")
d.outputMessage(messageOk("quit"))
return nil
default:
d.outputChan <- messageError("command_error", fmt.Sprintf("Command %s not supported", cmd))
d.outputMessage(messageError("command_error", fmt.Sprintf("Command %s not supported", cmd)))
}
}
}

func (d *Server) hello(cmd string) {
if d.initialized {
d.outputChan <- messageError("hello", "HELLO already called")
d.outputMessage(messageError("hello", "HELLO already called"))
return
}
re := regexp.MustCompile(`^(\d+) "([^"]+)"$`)
matches := re.FindStringSubmatch(cmd)
if len(matches) != 3 {
d.outputChan <- messageError("hello", "Invalid HELLO command")
d.outputMessage(messageError("hello", "Invalid HELLO command"))
return
}
d.userAgent = matches[2]
v, err := strconv.ParseInt(matches[1], 10, 64)
if err != nil {
d.outputChan <- messageError("hello", "Invalid protocol version: "+matches[2])
d.outputMessage(messageError("hello", "Invalid protocol version: "+matches[2]))
return
}
d.reqProtocolVersion = int(v)
if err := d.impl.Hello(d.userAgent, 1); err != nil {
d.outputChan <- messageError("hello", err.Error())
d.outputMessage(messageError("hello", err.Error()))
return
}
d.outputChan <- &message{
d.outputMessage(&message{
EventType: "hello",
ProtocolVersion: 1, // Protocol version 1 is the only supported for now...
Message: "OK",
}
})
d.initialized = true
}

func (d *Server) describe() {
if !d.initialized {
d.outputChan <- messageError("describe", "Monitor not initialized")
d.outputMessage(messageError("describe", "Monitor not initialized"))
return
}
portDescription, err := d.impl.Describe()
if err != nil {
d.outputChan <- messageError("describe", err.Error())
d.outputMessage(messageError("describe", err.Error()))
return
}
d.outputChan <- &message{
d.outputMessage(&message{
EventType: "describe",
Message: "OK",
PortDescription: portDescription,
}
})
}

func (d *Server) configure(cmd string) {
if !d.initialized {
d.outputChan <- messageError("configure", "Monitor not initialized")
d.outputMessage(messageError("configure", "Monitor not initialized"))
return
}
re := regexp.MustCompile(`^([\w.-]+) (.+)$`)
matches := re.FindStringSubmatch(cmd)
if len(matches) != 3 {
d.outputChan <- messageError("configure", "Invalid CONFIGURE command")
d.outputMessage(messageError("configure", "Invalid CONFIGURE command"))
return
}
parameterName := matches[1]
value := matches[2]
if err := d.impl.Configure(parameterName, value); err != nil {
d.outputChan <- messageError("configure", err.Error())
d.outputMessage(messageError("configure", err.Error()))
return
}
d.outputChan <- &message{
d.outputMessage(&message{
EventType: "configure",
Message: "OK",
}
})
}

func (d *Server) open(cmd string) {
if !d.initialized {
d.outputChan <- messageError("open", "Monitor not initialized")
d.outputMessage(messageError("open", "Monitor not initialized"))
return
}
parameters := strings.SplitN(cmd, " ", 2)
if len(parameters) != 2 {
d.outputChan <- messageError("open", "Invalid OPEN command")
d.outputMessage(messageError("open", "Invalid OPEN command"))
return
}
address := parameters[0]
portName := parameters[1]
port, err := d.impl.Open(portName)
if err != nil {
d.outputChan <- messageError("open", err.Error())
d.outputMessage(messageError("open", err.Error()))
return
}
d.clientConn, err = net.Dial("tcp", address)
if err != nil {
d.impl.Close()
d.outputChan <- messageError("open", err.Error())
d.outputMessage(messageError("open", err.Error()))
return
}
// io.Copy is used to bridge the Client's TCP connection to the port one and vice versa
Expand All @@ -242,51 +241,47 @@ func (d *Server) open(cmd string) {
d.close("lost connection with the port")
}
}()
d.outputChan <- &message{
d.outputMessage(&message{
EventType: "open",
Message: "OK",
}
})
}

func (d *Server) close(messageErr string) {
d.closeFuncMutex.Lock()
defer d.closeFuncMutex.Unlock()
if d.clientConn == nil {
if messageErr == "" {
d.outputChan <- messageError("close", "port already closed")
d.outputMessage(messageError("close", "port already closed"))
}
return
}
connErr := d.clientConn.Close()
portErr := d.impl.Close()
d.clientConn = nil
if messageErr != "" {
d.outputChan <- messageError("port_closed", messageErr)
d.outputMessage(messageError("port_closed", messageErr))
return
}
if connErr != nil || portErr != nil {
var errs *multierror.Error
errs = multierror.Append(errs, connErr, portErr)
d.outputChan <- messageError("close", errs.Error())
d.outputMessage(messageError("close", errs.Error()))
return
}
d.outputChan <- &message{
d.outputMessage(&message{
EventType: "close",
Message: "OK",
}
})
}

func (d *Server) outputProcessor(outWriter io.Writer) {
// Start go routine to serialize messages printing
go func() {
for msg := range d.outputChan {
data, err := json.MarshalIndent(msg, "", " ")
if err != nil {
// We are certain that this will be marshalled correctly
// so we don't handle the error
data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ")
}
fmt.Fprintln(outWriter, string(data))
}
}()
func (d *Server) outputMessage(msg *message) {
data, err := json.MarshalIndent(msg, "", " ")
if err != nil {
// We are certain that this will be marshalled correctly so we don't handle the error
data, _ = json.MarshalIndent(messageError("command_error", err.Error()), "", " ")
}
d.outMutex.Lock()
fmt.Fprintln(d.out, string(data))
d.outMutex.Unlock()
}