Skip to content

Commit 30b2bd1

Browse files
committed
control: Add concurrency tests, semaphores to clj-ssh and sshj
Both of these remotes now have concurrency safety tests designed to stress their behavior under rapid-fire multithreaded calls. Both were susceptible to issues related, I think, to a server-side constraint on concurrent channels. I've added semaphores to both remotes, which seems to fix the issue!
1 parent c7f311d commit 30b2bd1

File tree

3 files changed

+109
-40
lines changed

3 files changed

+109
-40
lines changed

jepsen/src/jepsen/control.clj

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
66
Note that a whole bunch of this namespace refers to things as 'ssh',
77
although they really can apply to any remote, not just SSH."
8-
(:import java.io.File)
98
(:require [clj-ssh.ssh :as ssh]
109
[jepsen.util :as util :refer [real-pmap with-thread-name]]
1110
[dom-top.core :refer [with-retry]]
1211
[jepsen.reconnect :as rc]
1312
[clojure.string :as str]
1413
[clojure.java.io :as io]
1514
[clojure.tools.logging :refer [warn info debug error]]
16-
[slingshot.slingshot :refer [try+ throw+]]))
15+
[slingshot.slingshot :refer [try+ throw+]])
16+
(:import java.io.File
17+
(java.util.concurrent Semaphore)))
1718

1819
(defprotocol Remote
1920
(connect [this host]
@@ -349,7 +350,9 @@
349350
(ssh/connect))))
350351

351352

352-
(defrecord SSHRemote [session]
353+
(defrecord SSHRemote [concurrency-limit
354+
session
355+
semaphore]
353356
Remote
354357
(connect [this host]
355358
(assoc this :session (if *dummy*
@@ -360,13 +363,19 @@
360363
(throw+ (merge (debug-data)
361364
{:type ::session-error
362365
:message "Error opening SSH session. Verify username, password, and node hostnames are correct."
363-
:host host})))))))
366+
:host host})))))
367+
:semaphore (Semaphore. concurrency-limit true)))
364368

365369
(disconnect! [_]
366370
(when-not (:dummy session) (ssh/disconnect session)))
367371

368372
(execute! [_ action]
369-
(when-not (:dummy session) (ssh/ssh session action)))
373+
(when-not (:dummy session)
374+
(.acquire semaphore)
375+
(try
376+
(ssh/ssh session action)
377+
(finally
378+
(.release semaphore)))))
370379

371380
(upload! [_ local-paths remote-path rest]
372381
(when-not (:dummy session)
@@ -376,7 +385,17 @@
376385
(when-not (:dummy session)
377386
(apply ssh/scp-from session remote-paths local-path rest))))
378387

379-
(def ssh "A remote that does things via clj-ssh." (SSHRemote. nil))
388+
(def concurrency-limit
389+
"OpenSSH has a standard limit of 10 concurrent channels per connection.
390+
However, commands run in quick succession with 10 concurrent *also* seem to
391+
blow out the channel limit--perhaps there's an asynchronous channel teardown
392+
process. We set the limit a bit lower here. This is experimentally determined
393+
for clj-ssh by running jepsen.control-test's integration test... <sigh>"
394+
8)
395+
396+
(def ssh
397+
"A remote that does things via clj-ssh."
398+
(SSHRemote. concurrency-limit nil nil))
380399

381400
(defn session
382401
"Wraps control session in a wrapper for reconnection."

jepsen/src/jepsen/control/sshj.clj

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
jepsen.control's use of clj-ssh with this instead."
44
(:require [byte-streams :as bs]
55
[clojure.tools.logging :refer [info warn]]
6-
[jepsen [control :refer :all]]
6+
[jepsen [control :as c]]
77
[slingshot.slingshot :refer [try+ throw+]])
88
(:import (com.jcraft.jsch.agentproxy AgentProxy
99
ConnectorFactory)
@@ -15,7 +15,8 @@
1515
(net.schmizz.sshj.userauth.method AuthMethod)
1616
(net.schmizz.sshj.xfer FileSystemFile)
1717
(java.io IOException)
18-
(java.util.concurrent TimeUnit)))
18+
(java.util.concurrent Semaphore
19+
TimeUnit)))
1920

2021
(defn auth-methods
2122
"Returns a list of AuthMethods we can use for logging in via an AgentProxy."
@@ -36,38 +37,42 @@
3637
to username/password."
3738
[^SSHClient c]
3839
(or ; Try given key
39-
(when-let [k *private-key-path*]
40-
(.authPublickey c *username* (into-array [k]))
40+
(when-let [k c/*private-key-path*]
41+
(.authPublickey c c/*username* (into-array [k]))
4142
true)
4243

4344
; Try agent
4445
(try
4546
(let [agent-proxy (agent-proxy)
4647
methods (auth-methods agent-proxy)]
47-
(.auth c *username* methods)
48+
(.auth c c/*username* methods)
4849
true)
4950
(catch UserAuthException e
5051
false))
5152

5253
; Fall back to standard id_rsa/id_dsa keys
53-
(try (.authPublickey c ^String *username*)
54+
(try (.authPublickey c ^String c/*username*)
5455
true
5556
(catch UserAuthException e
5657
false))
5758

5859
; OK, standard keys didn't work, try username+password
59-
(.authPassword c *username* *password*)))
60+
(.authPassword c c/*username* c/*password*)))
6061

61-
(defrecord SSHJRemote [^SSHClient client]
62+
(defrecord SSHJRemote [concurrency-limit
63+
^SSHClient client
64+
^Semaphore semaphore]
6265
jepsen.control/Remote
6366
(connect [this host]
6467
(try+ (let [c (doto (SSHClient.)
6568
(.loadKnownHosts)
66-
(.connect host *port*)
69+
(.connect host c/*port*)
6770
auth!)]
68-
(assoc this :client c))
71+
(assoc this
72+
:client c
73+
:semaphore (Semaphore. concurrency-limit true)))
6974
(catch Exception e
70-
(throw+ (assoc (debug-data)
75+
(throw+ (assoc (c/debug-data)
7176
:type :jepsen.control/session-error
7277
:message "Error opening SSH session. Verify username, password, and node hostnames are correct."
7378
:host host)))))
@@ -77,21 +82,26 @@
7782
(.close c)))
7883

7984
(execute! [this action]
80-
(with-open [session (.startSession client)]
81-
(let [cmd (.exec session (:cmd action))]
82-
; Feed it input
83-
(when-let [input (:in action)]
84-
(let [stream (.getOutputStream cmd)]
85-
(bs/transfer input stream)))
86-
; Wait on command
87-
(.join cmd)
88-
; Return completion
89-
(assoc action
90-
:out (.toString (IOUtils/readFully (.getInputStream cmd)))
91-
:err (.toString (IOUtils/readFully (.getErrorStream cmd)))
92-
; There's also a .getExitErrorMessage that might be interesting
93-
; here?
94-
:exit (.getExitStatus cmd)))))
85+
; (info :permits (.availablePermits semaphore))
86+
(.acquire semaphore)
87+
(try
88+
(with-open [session (.startSession client)]
89+
(let [cmd (.exec session (:cmd action))]
90+
; Feed it input
91+
(when-let [input (:in action)]
92+
(let [stream (.getOutputStream cmd)]
93+
(bs/transfer input stream)))
94+
; Wait on command
95+
(.join cmd)
96+
; Return completion
97+
(assoc action
98+
:out (.toString (IOUtils/readFully (.getInputStream cmd)))
99+
:err (.toString (IOUtils/readFully (.getErrorStream cmd)))
100+
; There's also a .getExitErrorMessage that might be interesting
101+
; here?
102+
:exit (.getExitStatus cmd))))
103+
(finally
104+
(.release semaphore))))
95105

96106
(upload! [this local-paths remote-path more]
97107
(with-open [sftp (.newSFTPClient client)]
@@ -101,7 +111,15 @@
101111
(with-open [sftp (.newSFTPClient client)]
102112
(.get sftp remote-paths (FileSystemFile. local-path)))))
103113

114+
(def concurrency-limit
115+
"OpenSSH has a standard limit of 10 concurrent channels per connection.
116+
However, commands run in quick succession with 10 concurrent *also* seem to
117+
blow out the channel limit--perhaps there's an asynchronous channel teardown
118+
process. We set the limit a bit lower here. This is experimentally determined
119+
by running jepsen.control-test's integration test... <sigh>"
120+
6)
121+
104122
(defn remote
105123
"Constructs an SSHJ remote."
106124
[]
107-
(SSHJRemote. nil))
125+
(SSHJRemote. concurrency-limit nil nil))

jepsen/test/jepsen/control_test.clj

Lines changed: 39 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
(ns jepsen.control-test
2-
(:require [jepsen [control :as c]
2+
(:require [clojure [string :as str]
3+
[test :refer :all]]
4+
[clojure.java.io :as io]
5+
[clojure.tools.logging :refer [info warn]]
6+
[jepsen [control :as c]
37
[common-test :refer [quiet-logging]]
4-
[util :refer [contains-many?]]]
8+
[util :refer [contains-many? real-pmap]]]
59
[jepsen.control.sshj :as sshj]
6-
[slingshot.slingshot :refer [try+ throw+]]
7-
[clojure.java.io :as io]
8-
[clojure.test :refer :all])
10+
[slingshot.slingshot :refer [try+ throw+]])
911
(:import (java.io File)))
1012

1113
(use-fixtures :once quiet-logging)
@@ -21,7 +23,8 @@
2123
(testing "on failure, session throws debug data"
2224
(try+
2325
(c/on "thishostshouldnotresolve"
24-
(c/exec :echo "hello"))
26+
(c/exec :echo "hello")
27+
(is false))
2528
(catch [:type :jepsen.control/session-error] e
2629
(is (= "Error opening SSH session. Verify username, password, and node hostnames are correct." (:message e)))
2730
(is (= "thishostshouldnotresolve" (:host e)))
@@ -61,10 +64,39 @@
6164
(finally
6265
(.delete tmp)))))
6366

64-
(c/exec :rm :-f remote-path))))))
67+
(c/exec :rm :-f remote-path))
68+
69+
(testing "thread safety"
70+
(let [; Which nodes are we going to act over?
71+
nodes ["n1" "n2" "n3" "n4" "n5"]
72+
; Concurrency per node
73+
concurrency 10
74+
; Makes the string we echo bigger or smaller
75+
str-count 10
76+
; How many times do we run each command?
77+
rep-count 5
78+
; What strings are we going to echo on each channel?
79+
xs (->> (range concurrency)
80+
(map (fn [i]
81+
(str/join "," (repeat str-count i)))))
82+
t0 (System/nanoTime)]
83+
; On each each node, and on concurrency channels, that string
84+
; several times, making sure it comes back properly.
85+
(->> (for [node nodes, x xs]
86+
(future
87+
(dotimes [i rep-count]
88+
;(info :call node :i i :x x)
89+
(is (= x (c/exec :echo x))))))
90+
vec
91+
(mapv deref))
92+
;(info :time (-> (System/nanoTime) (- t0) (/ 1e6)) "ms")
93+
))))))
94+
6595

6696
(deftest ^:integration ssh-remote-test
97+
(info :clj-ssh)
6798
(test-remote c/ssh))
6899

69100
(deftest ^:integration sshj-remote-test
101+
(info :sshj)
70102
(test-remote (sshj/remote)))

0 commit comments

Comments
 (0)