Skip to content

Commit c81201e

Browse files
authored
Bring TestTx helper into rivershared for use in other projects (#814)
I found in another demo project that I wanted to have access to the `TestTx` helper for tests, but that we weren't exporting it anywhere. Here, bring it into `rivershared` for easy access everywhere. I put two variants in for now, the simpler `TestTx` that uses a pool to `TEST_DATABASE_URL` or `river_test`, and a `TestTxPool` that lets you inject your own pool. The former is in use for the time being by `riverinternaltest` so it can do its existing connection customization logic, but as we fully migrate it to `rivershared`, we should see if we can drop that along with `TestTxPool` maybe. Even if we don't drop the latter, it's not the worst thing to have around because it definitely increases flexibility. I also added some code that uses `context.WithoutCancel` to be able to issue a full rollback in `TestTx` cleanup. This isn't needed by the main River project yet, but probably will be soon. When switching to the new Go `t.Context` in tests, the context is cancelled before cleanup phases are called, so without these changes the rollback will fail.
1 parent 125d93a commit c81201e

File tree

5 files changed

+171
-39
lines changed

5 files changed

+171
-39
lines changed

internal/riverinternaltest/riverinternaltest.go

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ package riverinternaltest
44

55
import (
66
"context"
7-
"errors"
87
"fmt"
98
"log"
109
"net/url"
@@ -219,44 +218,7 @@ func TestTx(ctx context.Context, tb testing.TB) pgx.Tx {
219218
return dbPool
220219
}
221220

222-
tx, err := getPool().Begin(ctx)
223-
require.NoError(tb, err)
224-
225-
tb.Cleanup(func() {
226-
err := tx.Rollback(ctx)
227-
228-
if err == nil {
229-
return
230-
}
231-
232-
// Try to look for an error on rollback because it does occasionally
233-
// reveal a real problem in the way a test is written. However, allow
234-
// tests to roll back their transaction early if they like, so ignore
235-
// `ErrTxClosed`.
236-
if errors.Is(err, pgx.ErrTxClosed) {
237-
return
238-
}
239-
240-
// In case of a cancelled context during a database operation, which
241-
// happens in many tests, pgx seems to not only roll back the
242-
// transaction, but closes the connection, and returns this error on
243-
// rollback. Allow this error since it's hard to prevent it in our flows
244-
// that use contexts heavily.
245-
if err.Error() == "conn closed" {
246-
return
247-
}
248-
249-
// Similar to the above, but a newly appeared error that wraps the
250-
// above. As far as I can tell, no error variables are available to use
251-
// with `errors.Is`.
252-
if err.Error() == "failed to deallocate cached statement(s): conn closed" {
253-
return
254-
}
255-
256-
require.NoError(tb, err)
257-
})
258-
259-
return tx
221+
return riversharedtest.TestTxPool(ctx, tb, getPool())
260222
}
261223

262224
// TruncateRiverTables truncates River tables in the target database. This is

rivershared/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ require (
1515

1616
require (
1717
github.com/davecgh/go-spew v1.1.1 // indirect
18+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
1819
github.com/pmezard/go-difflib v1.0.0 // indirect
1920
gopkg.in/yaml.v3 v3.0.1 // indirect
2021
)

rivershared/go.sum

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
22
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 h1:Dj0L5fhJ9F82ZJyVOmBx6msDp/kfd1t9GRfny/mfJA0=
4+
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438/go.mod h1:a/s9Lp5W7n/DD0VrVoyJ00FbP2ytTPDVOivvn2bMlds=
35
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
46
github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk=
57
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
@@ -8,10 +10,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
810
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
911
github.com/riverqueue/river v0.18.0 h1:sGHeTOL9MR8+pMIVHRm59fzet8Ron/xjF3Yq/PSGb78=
1012
github.com/riverqueue/river v0.18.0/go.mod h1:oapX5xb/L2YnkE801QubDZ0COHxVxEGVY37icPzghhU=
13+
github.com/riverqueue/river v0.19.0/go.mod h1:YJ7LA2uBdqFHQJzKyYc+X6S04KJeiwsS1yU5a1rynlk=
1114
github.com/riverqueue/river/riverdriver v0.18.0 h1:a2haR5I0MQLHjLCSVFpUEeJALCLemRl5zCztucysm1E=
1215
github.com/riverqueue/river/riverdriver v0.18.0/go.mod h1:Mj45PbHabEnBv/nSah0J1/tg6hrX/SNeXtcYcSqMzxQ=
16+
github.com/riverqueue/river/riverdriver v0.19.0/go.mod h1:Soxi08hHkEvopExAp6ADG2437r4coSiB4QpuIL5E28k=
1317
github.com/riverqueue/river/rivertype v0.18.0 h1:YsXR5NbLAzniurGO0+zcISWMKq7Y71xkIe2oi86OAsE=
1418
github.com/riverqueue/river/rivertype v0.18.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE=
19+
github.com/riverqueue/river/rivertype v0.19.0/go.mod h1:DETcejveWlq6bAb8tHkbgJqmXWVLiFhTiEm8j7co1bE=
1520
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
1621
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
1722
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=

rivershared/riversharedtest/riversharedtest.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,18 @@
11
package riversharedtest
22

33
import (
4+
"cmp"
5+
"context"
6+
"errors"
47
"fmt"
58
"log/slog"
69
"os"
710
"sync"
811
"testing"
912
"time"
1013

14+
"github.com/jackc/pgx/v5"
15+
"github.com/jackc/pgx/v5/pgxpool"
1116
"github.com/stretchr/testify/require"
1217
"go.uber.org/goleak"
1318

@@ -27,6 +32,47 @@ func BaseServiceArchetype(tb testing.TB) *baseservice.Archetype {
2732
}
2833
}
2934

35+
// A pool and mutex to protect it, lazily initialized by TestTx. Once open, this
36+
// pool is never explicitly closed, instead closing implicitly as the package
37+
// tests finish.
38+
var (
39+
dbPool *pgxpool.Pool //nolint:gochecknoglobals
40+
dbPoolMu sync.RWMutex //nolint:gochecknoglobals
41+
)
42+
43+
// DBPool gets a lazily initialized database pool for `TEST_DATABASE_URL` or
44+
// `river_test` if the former isn't specified.
45+
func DBPool(ctx context.Context, tb testing.TB) *pgxpool.Pool {
46+
tb.Helper()
47+
48+
tryPool := func() *pgxpool.Pool {
49+
dbPoolMu.RLock()
50+
defer dbPoolMu.RUnlock()
51+
return dbPool
52+
}
53+
54+
if dbPool := tryPool(); dbPool != nil {
55+
return dbPool
56+
}
57+
58+
dbPoolMu.Lock()
59+
defer dbPoolMu.Unlock()
60+
61+
// Multiple goroutines may have passed the initial `nil` check on start
62+
// up, so check once more to make sure pool hasn't been set yet.
63+
if dbPool != nil {
64+
return dbPool
65+
}
66+
67+
dbPool, err := pgxpool.New(ctx, cmp.Or(
68+
os.Getenv("TEST_DATABASE_URL"),
69+
"postgres://localhost:5432/river_test",
70+
))
71+
require.NoError(tb, err)
72+
73+
return dbPool
74+
}
75+
3076
// Logger returns a logger suitable for use in tests.
3177
//
3278
// Defaults to informational verbosity. If env is set with `RIVER_DEBUG=true`,
@@ -48,6 +94,69 @@ func LoggerWarn(tb testing.TB) *slog.Logger {
4894
return slogtest.NewLogger(tb, &slog.HandlerOptions{Level: slog.LevelWarn})
4995
}
5096

97+
// TestTx starts a test transaction that's rolled back automatically as the test
98+
// case is cleaning itself up.
99+
//
100+
// This variant uses the default database pool from DBPool that points to
101+
// `TEST_DATABASE_URL` or `river_test` if the former wasn't specified.
102+
func TestTx(ctx context.Context, tb testing.TB) pgx.Tx {
103+
tb.Helper()
104+
return TestTxPool(ctx, tb, DBPool(ctx, tb))
105+
}
106+
107+
// TestTxPool starts a test transaction that's rolled back automatically as the
108+
// test case is cleaning itself up.
109+
//
110+
// This variant starts the test transaction on the specified database pool.
111+
func TestTxPool(ctx context.Context, tb testing.TB, dbPool *pgxpool.Pool) pgx.Tx {
112+
tb.Helper()
113+
114+
tx, err := dbPool.Begin(ctx)
115+
require.NoError(tb, err)
116+
117+
tb.Cleanup(func() {
118+
// Tests may inerit context from `t.Context()` which is cancelled after
119+
// tests run and before calling clean up. We need a non-cancelled
120+
// context to issue rollback here, so use a bit of a bludgeon to do so
121+
// with `context.WithoutCancel()`.
122+
ctx := context.WithoutCancel(ctx)
123+
124+
err := tx.Rollback(ctx)
125+
126+
if err == nil {
127+
return
128+
}
129+
130+
// Try to look for an error on rollback because it does occasionally
131+
// reveal a real problem in the way a test is written. However, allow
132+
// tests to roll back their transaction early if they like, so ignore
133+
// `ErrTxClosed`.
134+
if errors.Is(err, pgx.ErrTxClosed) {
135+
return
136+
}
137+
138+
// In case of a cancelled context during a database operation, which
139+
// happens in many tests, pgx seems to not only roll back the
140+
// transaction, but closes the connection, and returns this error on
141+
// rollback. Allow this error since it's hard to prevent it in our flows
142+
// that use contexts heavily.
143+
if err.Error() == "conn closed" {
144+
return
145+
}
146+
147+
// Similar to the above, but a newly appeared error that wraps the
148+
// above. As far as I can tell, no error variables are available to use
149+
// with `errors.Is`.
150+
if err.Error() == "failed to deallocate cached statement(s): conn closed" {
151+
return
152+
}
153+
154+
require.NoError(tb, err)
155+
})
156+
157+
return tx
158+
}
159+
51160
// TimeStub implements baseservice.TimeGeneratorWithStub to allow time to be
52161
// stubbed in tests.
53162
//

rivershared/riversharedtest/riversharedtest_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,67 @@
11
package riversharedtest
22

33
import (
4+
"context"
45
"testing"
56
"time"
67

8+
"github.com/jackc/pgerrcode"
9+
"github.com/jackc/pgx/v5/pgconn"
710
"github.com/stretchr/testify/require"
811
)
912

13+
func TestDBPool(t *testing.T) {
14+
t.Parallel()
15+
16+
ctx := context.Background()
17+
18+
pool := DBPool(ctx, t)
19+
_, err := pool.Exec(ctx, "SELECT 1")
20+
require.NoError(t, err)
21+
}
22+
23+
func TestTestTx(t *testing.T) {
24+
t.Parallel()
25+
26+
ctx := context.Background()
27+
28+
type PoolOrTx interface {
29+
Exec(ctx context.Context, sql string, arguments ...any) (commandTag pgconn.CommandTag, err error)
30+
}
31+
32+
checkTestTable := func(ctx context.Context, poolOrTx PoolOrTx) error {
33+
_, err := poolOrTx.Exec(ctx, "SELECT * FROM river_shared_test_tx_table")
34+
return err
35+
}
36+
37+
// Test cleanups are invoked in the order of last added, first called. When
38+
// TestTx is called below it adds a cleanup, so we want to make sure that
39+
// this cleanup, which checks that the database remains pristine, is invoked
40+
// after the TestTx cleanup, so we add it first.
41+
t.Cleanup(func() {
42+
// Tests may inherit context from `t.Context()` which is cancelled after
43+
// tests run and before calling clean up. We need a non-cancelled
44+
// context to issue rollback here, so use a bit of a bludgeon to do so
45+
// with `context.WithoutCancel()`.
46+
ctx := context.WithoutCancel(ctx)
47+
48+
err := checkTestTable(ctx, DBPool(ctx, t))
49+
require.Error(t, err)
50+
51+
var pgErr *pgconn.PgError
52+
require.ErrorAs(t, err, &pgErr)
53+
require.Equal(t, pgerrcode.UndefinedTable, pgErr.Code)
54+
})
55+
56+
tx := TestTx(ctx, t)
57+
58+
_, err := tx.Exec(ctx, "CREATE TABLE river_shared_test_tx_table (id bigint)")
59+
require.NoError(t, err)
60+
61+
err = checkTestTable(ctx, tx)
62+
require.NoError(t, err)
63+
}
64+
1065
func TestWaitOrTimeout(t *testing.T) {
1166
t.Parallel()
1267

0 commit comments

Comments
 (0)