Skip to content

Commit fd2ce5f

Browse files
committed
Add set test
Closes #6
1 parent f3ac116 commit fd2ce5f

File tree

3 files changed

+195
-5
lines changed

3 files changed

+195
-5
lines changed

src/tarantool/client.clj

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
(:require [clojure.string :as str]
44
[clojure.tools.logging :refer [info warn]]
55
[next.jdbc :as j]
6+
[dom-top.core :as dt]
67
[next.jdbc.connection :as connection]))
78

8-
(def max-timeout "Longest timeout, in ms" 30000)
9+
(def max-timeout "Longest timeout, in ms" 300000)
910

1011
(defn conn-spec
1112
"JDBC connection spec for a node."
@@ -41,3 +42,101 @@
4142
[conn id old new]
4243
(first (vals (first (j/execute! conn ["SELECT _CAS(?, ?, ?, 'JEPSEN')"
4344
id old new])))))
45+
46+
(defmacro with-error-handling
47+
"Common error handling for errors, including txn aborts."
48+
[op & body]
49+
`(try
50+
(with-txn-aborts ~op ~@body)
51+
52+
(catch java.sql.BatchUpdateException e#
53+
(condp re-find (.getMessage e#)
54+
#"Query timed out" (assoc ~op :type :info, :error :query-timed-out)
55+
(throw e#)))
56+
57+
(catch java.sql.SQLNonTransientConnectionException e#
58+
(condp re-find (.getMessage e#)
59+
#"Connection timed out" (assoc ~op :type :info, :error :conn-timed-out)
60+
(throw e#)))
61+
62+
(catch clojure.lang.ExceptionInfo e#
63+
(cond (= "Connection is closed" (.cause (:rollback (ex-data e#))))
64+
(assoc ~op :type :info, :error :conn-closed-rollback-failed)
65+
66+
(= "createStatement() is called on closed connection"
67+
(.cause (:rollback (ex-data e#))))
68+
(assoc ~op :type :fail, :error :conn-closed-rollback-failed)
69+
70+
true (do (info e# :caught (pr-str (ex-data e#)))
71+
(info :caught-rollback (:rollback (ex-data e#)))
72+
(info :caught-cause (.cause (:rollback (ex-data e#))))
73+
(throw e#))))))
74+
75+
(defmacro with-txn-aborts
76+
"Aborts body on rollbacks."
77+
[op & body]
78+
`(let [res# (capture-txn-abort ~@body)]
79+
(if (= ::abort res#)
80+
(assoc ~op :type :fail, :error :conflict)
81+
res#)))
82+
83+
(defmacro with-conn-failure-retry
84+
"DBMS tends to be flaky for a few seconds after starting up, which can wind
85+
up breaking our setup code. This macro adds a little bit of backoff and retry
86+
for those conditions."
87+
[conn & body]
88+
(assert (symbol? conn))
89+
(let [tries (gensym 'tries) ; try count
90+
e (gensym 'e) ; errors
91+
conn-sym (gensym 'conn) ; local conn reference
92+
retry `(do (when (zero? ~tries)
93+
(info "Out of retries!")
94+
(throw ~e))
95+
(info "Connection failure; retrying...")
96+
(Thread/sleep (rand-int 2000))
97+
(~'retry (reopen! ~conn-sym) (dec ~tries)))]
98+
`(dt/with-retry [~conn-sym ~conn
99+
~tries 32]
100+
(let [~conn ~conn-sym] ; Rebind the conn symbol to our current connection
101+
~@body)
102+
(catch org.tarantool.CommunicationException ~e ~retry)
103+
(catch java.sql.BatchUpdateException ~e ~retry)
104+
(catch java.sql.SQLTimeoutException ~e ~retry)
105+
(catch java.sql.SQLNonTransientConnectionException ~e ~retry)
106+
(catch java.sql.SQLException ~e
107+
(condp re-find (.getMessage ~e)
108+
#"Resolve lock timeout" ~retry ; high contention
109+
#"Information schema is changed" ~retry ; ???
110+
#"called on closed connection" ~retry ; definitely didn't happen
111+
#"Region is unavailable" ~retry ; okay fine
112+
(do (info "with-conn-failure-retry isn't sure how to handle SQLException with message" (pr-str (class (.getMessage ~e))) (pr-str (.getMessage ~e)))
113+
(throw ~e)))))))
114+
115+
(defn reopen!
116+
"Closes a connection and returns a new one based on the given connection."
117+
[conn]
118+
; Don't know how to close connection in next.jdbc
119+
;(close! conn)
120+
(open (::node conn) (::test conn)))
121+
122+
(defmacro capture-txn-abort
123+
"Converts aborted transactions to an ::abort keyword"
124+
[& body]
125+
`(try ~@body
126+
(catch java.sql.SQLTransactionRollbackException e#
127+
(if (= (.getMessage e#) rollback-msg)
128+
::abort
129+
(throw e#)))
130+
(catch java.sql.BatchUpdateException e#
131+
(if (= (.getMessage e#) rollback-msg)
132+
::abort
133+
(throw e#)))
134+
(catch java.sql.SQLException e#
135+
(condp re-find (.getMessage e#)
136+
#"can not retry select for update statement" ::abort
137+
#"\[try again later\]" ::abort
138+
(throw e#)))))
139+
140+
(def rollback-msg
141+
"Some drivers have a few exception classes that use this message."
142+
"Deadlock found when trying to get lock; try restarting transaction")

src/tarantool/runner.clj

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,23 @@
1919
;[knossos.model :as model]
2020
[jepsen.os.ubuntu :as ubuntu]
2121
[tarantool [db :as db]
22-
[register :as register]]))
22+
[register :as register]
23+
[sets :as sets]]))
2324

2425
(def workloads
2526
"A map of workload names to functions that can take opts and construct
26-
workloads."
27-
{;:set set/workload
27+
workloads.
28+
Each workload is a map like
29+
30+
{:generator a generator of client ops
31+
:final-generator a generator to run after the cluster recovers
32+
:client a client to execute those ops
33+
:checker a checker
34+
:model for the checker}
35+
36+
Or, for some special cases where nemeses and workloads are coupled, we return
37+
a keyword here instead."
38+
{:set sets/workload
2839
;:bank bank/workload
2940
;:bank-index bank/index-workload
3041
;:g2 g2/workload
@@ -104,7 +115,14 @@
104115
nemesis nemesis/noop
105116
gen (->> (:generator workload)
106117
(gen/nemesis (:generator nemesis))
107-
(gen/time-limit (:time-limit opts)))]
118+
(gen/time-limit (:time-limit opts)))
119+
gen (if (:final-generator workload)
120+
(gen/phases gen
121+
(gen/log "Healing cluster")
122+
(gen/nemesis (:final-generator nemesis))
123+
(gen/log "Waiting for recovery...")
124+
(gen/clients (:final-generator workload)))
125+
gen)]
108126
(merge tests/noop-test
109127
opts
110128
{:client (:client workload)

src/tarantool/sets.clj

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
(ns tarantool.sets
2+
"Set (test inserts a series of unique numbers as separate instances, one per
3+
transaction, and attempts to read them back through an index), serializability."
4+
5+
(:require [jepsen [client :as client]
6+
[checker :as checker]
7+
[generator :as gen]
8+
[independent :as independent]]
9+
[next.jdbc :as j]
10+
[next.jdbc.sql :as sql]
11+
[tarantool.client :as cl]
12+
[jepsen.core :as jepsen]
13+
[knossos.model :as model]
14+
[knossos.op :as op]))
15+
16+
(defrecord SetClient [conn]
17+
client/Client
18+
19+
(open! [this test node]
20+
(let [conn (cl/open node test)]
21+
(assert conn)
22+
(assoc this :conn conn :node node)))
23+
24+
(setup! [this test node]
25+
(let [conn (cl/open node test)]
26+
(assert conn)
27+
(cl/with-conn-failure-retry conn
28+
(j/execute! conn ["CREATE TABLE IF NOT EXISTS sets
29+
(id INT NOT NULL PRIMARY KEY AUTOINCREMENT,
30+
value INT NOT NULL)"]))
31+
(assoc this :conn conn :node node)))
32+
33+
(invoke! [this test op]
34+
(let [[k v] (:value op)]
35+
(cl/with-error-handling op
36+
(cl/with-txn-aborts op
37+
(case (:f op)
38+
:add (do (sql/insert! conn :sets {:value v})
39+
(assoc op :type :ok))
40+
41+
:read (->> (sql/query conn ["SELECT * FROM sets"])
42+
(mapv :VALUE)
43+
(assoc op :type :ok, :value)))))))
44+
45+
(teardown! [_ test])
46+
47+
(close! [_ test]))
48+
49+
(defn workload
50+
[opts]
51+
(let [max-key (atom 0) c (:concurrency opts)]
52+
{:client (SetClient. nil)
53+
:checker (independent/checker (checker/set-full {:linearizable? true}))
54+
:generator (independent/concurrent-generator
55+
c
56+
(range)
57+
(fn [k]
58+
(swap! max-key max k)
59+
(->> (range 10000)
60+
(map (fn [x] {:type :invoke, :f :add, :value x}))
61+
gen/seq
62+
(gen/stagger 1/10))))
63+
:final-generator (gen/derefer
64+
(delay
65+
(locking keys
66+
(independent/concurrent-generator
67+
c
68+
(range (inc @max-key))
69+
(fn [k]
70+
(gen/stagger 10
71+
(gen/each
72+
(gen/once {:type :invoke
73+
:f :read}))))))))}))

0 commit comments

Comments
 (0)