Skip to content

Commit 7292932

Browse files
drakkangopherbot
authored andcommitted
ssh: limit the size of the internal packet queue while waiting for KEX
In the SSH protocol, clients and servers execute the key exchange to generate one-time session keys used for encryption and authentication. The key exchange is performed initially after the connection is established and then periodically after a configurable amount of data. While a key exchange is in progress, we add the received packets to an internal queue until we receive SSH_MSG_KEXINIT from the other side. This can result in high memory usage if the other party is slow to respond to the SSH_MSG_KEXINIT packet, or memory exhaustion if a malicious client never responds to an SSH_MSG_KEXINIT packet during a large file transfer. We now limit the internal queue to 64 packets: this means 2MB with the typical 32KB packet size. When the internal queue is full we block further writes until the pending key exchange is completed or there is a read or write error. Thanks to Yuichi Watanabe for reporting this issue. Change-Id: I1ce2214cc16e08b838d4bc346c74c72addafaeec Reviewed-on: https://go-review.googlesource.com/c/crypto/+/652135 Reviewed-by: Neal Patel <[email protected]> Auto-Submit: Gopher Robot <[email protected]> Reviewed-by: Roland Shoemaker <[email protected]> LUCI-TryBot-Result: Go LUCI <[email protected]>
1 parent f66f74b commit 7292932

File tree

2 files changed

+257
-10
lines changed

2 files changed

+257
-10
lines changed

ssh/handshake.go

+37-10
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ const debugHandshake = false
2525
// quickly.
2626
const chanSize = 16
2727

28+
// maxPendingPackets sets the maximum number of packets to queue while waiting
29+
// for KEX to complete. This limits the total pending data to maxPendingPackets
30+
// * maxPacket bytes, which is ~16.8MB.
31+
const maxPendingPackets = 64
32+
2833
// keyingTransport is a packet based transport that supports key
2934
// changes. It need not be thread-safe. It should pass through
3035
// msgNewKeys in both directions.
@@ -73,11 +78,19 @@ type handshakeTransport struct {
7378
incoming chan []byte
7479
readError error
7580

76-
mu sync.Mutex
77-
writeError error
78-
sentInitPacket []byte
79-
sentInitMsg *kexInitMsg
80-
pendingPackets [][]byte // Used when a key exchange is in progress.
81+
mu sync.Mutex
82+
// Condition for the above mutex. It is used to notify a completed key
83+
// exchange or a write failure. Writes can wait for this condition while a
84+
// key exchange is in progress.
85+
writeCond *sync.Cond
86+
writeError error
87+
sentInitPacket []byte
88+
sentInitMsg *kexInitMsg
89+
// Used to queue writes when a key exchange is in progress. The length is
90+
// limited by pendingPacketsSize. Once full, writes will block until the key
91+
// exchange is completed or an error occurs. If not empty, it is emptied
92+
// all at once when the key exchange is completed in kexLoop.
93+
pendingPackets [][]byte
8194
writePacketsLeft uint32
8295
writeBytesLeft int64
8396
userAuthComplete bool // whether the user authentication phase is complete
@@ -134,6 +147,7 @@ func newHandshakeTransport(conn keyingTransport, config *Config, clientVersion,
134147

135148
config: config,
136149
}
150+
t.writeCond = sync.NewCond(&t.mu)
137151
t.resetReadThresholds()
138152
t.resetWriteThresholds()
139153

@@ -260,6 +274,7 @@ func (t *handshakeTransport) recordWriteError(err error) {
260274
defer t.mu.Unlock()
261275
if t.writeError == nil && err != nil {
262276
t.writeError = err
277+
t.writeCond.Broadcast()
263278
}
264279
}
265280

@@ -363,6 +378,8 @@ write:
363378
}
364379
}
365380
t.pendingPackets = t.pendingPackets[:0]
381+
// Unblock writePacket if waiting for KEX.
382+
t.writeCond.Broadcast()
366383
t.mu.Unlock()
367384
}
368385

@@ -577,11 +594,20 @@ func (t *handshakeTransport) writePacket(p []byte) error {
577594
}
578595

579596
if t.sentInitMsg != nil {
580-
// Copy the packet so the writer can reuse the buffer.
581-
cp := make([]byte, len(p))
582-
copy(cp, p)
583-
t.pendingPackets = append(t.pendingPackets, cp)
584-
return nil
597+
if len(t.pendingPackets) < maxPendingPackets {
598+
// Copy the packet so the writer can reuse the buffer.
599+
cp := make([]byte, len(p))
600+
copy(cp, p)
601+
t.pendingPackets = append(t.pendingPackets, cp)
602+
return nil
603+
}
604+
for t.sentInitMsg != nil {
605+
// Block and wait for KEX to complete or an error.
606+
t.writeCond.Wait()
607+
if t.writeError != nil {
608+
return t.writeError
609+
}
610+
}
585611
}
586612

587613
if t.writeBytesLeft > 0 {
@@ -598,6 +624,7 @@ func (t *handshakeTransport) writePacket(p []byte) error {
598624

599625
if err := t.pushPacket(p); err != nil {
600626
t.writeError = err
627+
t.writeCond.Broadcast()
601628
}
602629

603630
return nil

ssh/handshake_test.go

+220
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,226 @@ func TestDisconnect(t *testing.T) {
539539
}
540540
}
541541

542+
type mockKeyingTransport struct {
543+
packetConn
544+
kexInitAllowed chan struct{}
545+
kexInitSent chan struct{}
546+
}
547+
548+
func (n *mockKeyingTransport) prepareKeyChange(*algorithms, *kexResult) error {
549+
return nil
550+
}
551+
552+
func (n *mockKeyingTransport) writePacket(packet []byte) error {
553+
if packet[0] == msgKexInit {
554+
<-n.kexInitAllowed
555+
n.kexInitSent <- struct{}{}
556+
}
557+
return n.packetConn.writePacket(packet)
558+
}
559+
560+
func (n *mockKeyingTransport) readPacket() ([]byte, error) {
561+
return n.packetConn.readPacket()
562+
}
563+
564+
func (n *mockKeyingTransport) setStrictMode() error { return nil }
565+
566+
func (n *mockKeyingTransport) setInitialKEXDone() {}
567+
568+
func TestHandshakePendingPacketsWait(t *testing.T) {
569+
a, b := memPipe()
570+
571+
trS := &mockKeyingTransport{
572+
packetConn: a,
573+
kexInitAllowed: make(chan struct{}, 2),
574+
kexInitSent: make(chan struct{}, 2),
575+
}
576+
// Allow the first KEX.
577+
trS.kexInitAllowed <- struct{}{}
578+
579+
trC := &mockKeyingTransport{
580+
packetConn: b,
581+
kexInitAllowed: make(chan struct{}, 2),
582+
kexInitSent: make(chan struct{}, 2),
583+
}
584+
// Allow the first KEX.
585+
trC.kexInitAllowed <- struct{}{}
586+
587+
clientConf := &ClientConfig{
588+
HostKeyCallback: InsecureIgnoreHostKey(),
589+
}
590+
clientConf.SetDefaults()
591+
592+
v := []byte("version")
593+
client := newClientTransport(trC, v, v, clientConf, "addr", nil)
594+
595+
serverConf := &ServerConfig{}
596+
serverConf.AddHostKey(testSigners["ecdsa"])
597+
serverConf.AddHostKey(testSigners["rsa"])
598+
serverConf.SetDefaults()
599+
server := newServerTransport(trS, v, v, serverConf)
600+
601+
if err := server.waitSession(); err != nil {
602+
t.Fatalf("server.waitSession: %v", err)
603+
}
604+
if err := client.waitSession(); err != nil {
605+
t.Fatalf("client.waitSession: %v", err)
606+
}
607+
608+
<-trC.kexInitSent
609+
<-trS.kexInitSent
610+
611+
// Allow and request new KEX server side.
612+
trS.kexInitAllowed <- struct{}{}
613+
server.requestKeyExchange()
614+
// Wait until the KEX init is sent.
615+
<-trS.kexInitSent
616+
// The client is not allowed to respond to the KEX, so writes will be
617+
// blocked on the server side once the packets queue is full.
618+
for i := 0; i < maxPendingPackets; i++ {
619+
p := []byte{msgRequestSuccess, byte(i)}
620+
if err := server.writePacket(p); err != nil {
621+
t.Errorf("unexpected write error: %v", err)
622+
}
623+
}
624+
// The packets queue is now full, the next write will block.
625+
server.mu.Lock()
626+
if len(server.pendingPackets) != maxPendingPackets {
627+
t.Errorf("unexpected pending packets size; got: %d, want: %d", len(server.pendingPackets), maxPendingPackets)
628+
}
629+
server.mu.Unlock()
630+
631+
writeDone := make(chan struct{})
632+
go func() {
633+
defer close(writeDone)
634+
635+
p := []byte{msgRequestSuccess, byte(65)}
636+
// This write will block until KEX completes.
637+
err := server.writePacket(p)
638+
if err != nil {
639+
t.Errorf("unexpected write error: %v", err)
640+
}
641+
}()
642+
643+
// Consume packets on the client side
644+
readDone := make(chan bool)
645+
go func() {
646+
defer close(readDone)
647+
648+
for {
649+
if _, err := client.readPacket(); err != nil {
650+
if err != io.EOF {
651+
t.Errorf("unexpected read error: %v", err)
652+
}
653+
break
654+
}
655+
}
656+
}()
657+
658+
// Allow the client to reply to the KEX and so unblock the write goroutine.
659+
trC.kexInitAllowed <- struct{}{}
660+
<-trC.kexInitSent
661+
<-writeDone
662+
// Close the client to unblock the read goroutine.
663+
client.Close()
664+
<-readDone
665+
server.Close()
666+
}
667+
668+
func TestHandshakePendingPacketsError(t *testing.T) {
669+
a, b := memPipe()
670+
671+
trS := &mockKeyingTransport{
672+
packetConn: a,
673+
kexInitAllowed: make(chan struct{}, 2),
674+
kexInitSent: make(chan struct{}, 2),
675+
}
676+
// Allow the first KEX.
677+
trS.kexInitAllowed <- struct{}{}
678+
679+
trC := &mockKeyingTransport{
680+
packetConn: b,
681+
kexInitAllowed: make(chan struct{}, 2),
682+
kexInitSent: make(chan struct{}, 2),
683+
}
684+
// Allow the first KEX.
685+
trC.kexInitAllowed <- struct{}{}
686+
687+
clientConf := &ClientConfig{
688+
HostKeyCallback: InsecureIgnoreHostKey(),
689+
}
690+
clientConf.SetDefaults()
691+
692+
v := []byte("version")
693+
client := newClientTransport(trC, v, v, clientConf, "addr", nil)
694+
695+
serverConf := &ServerConfig{}
696+
serverConf.AddHostKey(testSigners["ecdsa"])
697+
serverConf.AddHostKey(testSigners["rsa"])
698+
serverConf.SetDefaults()
699+
server := newServerTransport(trS, v, v, serverConf)
700+
701+
if err := server.waitSession(); err != nil {
702+
t.Fatalf("server.waitSession: %v", err)
703+
}
704+
if err := client.waitSession(); err != nil {
705+
t.Fatalf("client.waitSession: %v", err)
706+
}
707+
708+
<-trC.kexInitSent
709+
<-trS.kexInitSent
710+
711+
// Allow and request new KEX server side.
712+
trS.kexInitAllowed <- struct{}{}
713+
server.requestKeyExchange()
714+
// Wait until the KEX init is sent.
715+
<-trS.kexInitSent
716+
// The client is not allowed to respond to the KEX, so writes will be
717+
// blocked on the server side once the packets queue is full.
718+
for i := 0; i < maxPendingPackets; i++ {
719+
p := []byte{msgRequestSuccess, byte(i)}
720+
if err := server.writePacket(p); err != nil {
721+
t.Errorf("unexpected write error: %v", err)
722+
}
723+
}
724+
// The packets queue is now full, the next write will block.
725+
writeDone := make(chan struct{})
726+
go func() {
727+
defer close(writeDone)
728+
729+
p := []byte{msgRequestSuccess, byte(65)}
730+
// This write will block until KEX completes.
731+
err := server.writePacket(p)
732+
if err != io.EOF {
733+
t.Errorf("unexpected write error: %v", err)
734+
}
735+
}()
736+
737+
// Consume packets on the client side
738+
readDone := make(chan bool)
739+
go func() {
740+
defer close(readDone)
741+
742+
for {
743+
if _, err := client.readPacket(); err != nil {
744+
if err != io.EOF {
745+
t.Errorf("unexpected read error: %v", err)
746+
}
747+
break
748+
}
749+
}
750+
}()
751+
752+
// Close the server to unblock the write after an error
753+
server.Close()
754+
<-writeDone
755+
// Unblock the pending write and close the client to unblock the read
756+
// goroutine.
757+
trC.kexInitAllowed <- struct{}{}
758+
client.Close()
759+
<-readDone
760+
}
761+
542762
func TestHandshakeRekeyDefault(t *testing.T) {
543763
clientConf := &ClientConfig{
544764
Config: Config{

0 commit comments

Comments
 (0)