|
| 1 | +(ns tarantool.counter |
| 2 | + "Incrementing and decrementing a counter." |
| 3 | + (:require [jepsen [client :as client] |
| 4 | + [checker :as checker] |
| 5 | + [generator :as gen] |
| 6 | + [independent :as independent]] |
| 7 | + [jepsen.checker.timeline :as timeline] |
| 8 | + [clojure.tools.logging :refer [debug info warn]] |
| 9 | + [next.jdbc :as j] |
| 10 | + [next.jdbc.sql :as sql] |
| 11 | + [tarantool.client :as cl] |
| 12 | + [jepsen.core :as jepsen] |
| 13 | + [knossos.model :as model] |
| 14 | + [knossos.op :as op])) |
| 15 | + |
| 16 | +(def table-name "counter") |
| 17 | + |
| 18 | +(defrecord CounterClient [conn] |
| 19 | + client/Client |
| 20 | + |
| 21 | + (open! [this test node] |
| 22 | + (let [conn (cl/open node test)] |
| 23 | + (assert conn) |
| 24 | + (assoc this :conn conn :node node))) |
| 25 | + |
| 26 | + (setup! [this test node] |
| 27 | + (let [conn (cl/open node test)] |
| 28 | + (assert conn) |
| 29 | + (when (= node (jepsen/primary test)) |
| 30 | + (cl/with-conn-failure-retry conn |
| 31 | + (j/execute! conn [(str "CREATE TABLE IF NOT EXISTS " table-name |
| 32 | + " (id INT NOT NULL PRIMARY KEY, |
| 33 | + count INT NOT NULL)")])) |
| 34 | + (sql/insert! conn table-name {:id 0 :count 0})) |
| 35 | + (assoc this :conn conn :node node))) |
| 36 | + |
| 37 | + (invoke! [this test op] |
| 38 | + (cl/with-error-handling op |
| 39 | + (cl/with-txn-aborts op |
| 40 | + (case (:f op) |
| 41 | + :add (do (j/execute! conn |
| 42 | + [(str "UPDATE " table-name " SET count = count + ? WHERE id = 0") (:value op)]) |
| 43 | + (assoc op :type :ok)) |
| 44 | + |
| 45 | + :read (let [value (:COUNT (first (sql/query conn [(str "SELECT count FROM " table-name " WHERE id = 0")])))] |
| 46 | + (assoc op :type :ok :value value)))))) |
| 47 | + |
| 48 | + (teardown! [_ test] |
| 49 | + (cl/with-conn-failure-retry conn |
| 50 | + (j/execute! conn [(str "DROP TABLE IF EXISTS " table-name)]))) |
| 51 | + |
| 52 | + (close! [_ test])) |
| 53 | + |
| 54 | +(def add {:type :invoke :f :add :value 1}) |
| 55 | +(def sub {:type :invoke :f :add :value -1}) |
| 56 | +(def r {:type :invoke :f :read}) |
| 57 | + |
| 58 | +(defn with-op-index |
| 59 | + "Append :op-index integer to every operation emitted by the given generator. |
| 60 | + Value starts at 1 and increments by 1 for every subsequent emitted operation." |
| 61 | + [gen] |
| 62 | + (let [ctr (atom 0)] |
| 63 | + (gen/map (fn add-op-index [op] |
| 64 | + (assoc op :op-index (swap! ctr inc))) |
| 65 | + gen))) |
| 66 | + |
| 67 | +(defn workload-inc |
| 68 | + [opts] |
| 69 | + {:client (CounterClient. nil) |
| 70 | + :generator (->> (repeat 100 add) |
| 71 | + (cons r) |
| 72 | + gen/mix |
| 73 | + (gen/delay 1/10) |
| 74 | + (with-op-index)) |
| 75 | + :checker (checker/compose |
| 76 | + {:timeline (timeline/html) |
| 77 | + :counter (checker/counter)})}) |
| 78 | + |
| 79 | +(defn workload-dec |
| 80 | + [opts] |
| 81 | + (assoc (workload-inc opts) |
| 82 | + :generator (->> (take 100 (cycle [add sub])) |
| 83 | + (cons r) |
| 84 | + gen/mix |
| 85 | + (gen/delay 1/10) |
| 86 | + (with-op-index)))) |
0 commit comments