Skip to content

Commit 5faa9e8

Browse files
committed
feat: Add wal_status to replication_slot
1 parent c4c90df commit 5faa9e8

File tree

2 files changed

+39
-10
lines changed

2 files changed

+39
-10
lines changed

collector/pg_replication_slot.go

+28 -2
Original file line number | Diff line number | Diff line change
@@ -72,6 +72,15 @@ var (
7272
"number of bytes that can be written to WAL such that this slot is not in danger of getting in state lost",
7373
[]string{"slot_name", "slot_type"}, nil,
7474
)
75+
pgReplicationSlotWalStatus = prometheus.NewDesc(
76+
prometheus.BuildFQName(
77+
namespace,
78+
replicationSlotSubsystem,
79+
"wal_status",
80+
),
81+
"availability of WAL files claimed by this slot",
82+
[]string{"slot_name", "slot_type"}, nil,
83+
)
7584

7685
pgReplicationSlotQuery = `SELECT
7786
slot_name,
@@ -83,7 +92,8 @@ var (
8392
END AS current_wal_lsn,
8493
COALESCE(confirmed_flush_lsn, '0/0') - '0/0' AS confirmed_flush_lsn,
8594
active,
86-
safe_wal_size
95+
safe_wal_size,
96+
wal_status
8797
FROM pg_replication_slots;`
8898
)
8999

@@ -103,7 +113,8 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
103113
var flushLSN sql.NullFloat64
104114
var isActive sql.NullBool
105115
var safeWalSize sql.NullInt64
106-
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize); err != nil {
116+
var walStatus sql.NullString
117+
if err := rows.Scan(&slotName, &slotType, &walLSN, &flushLSN, &isActive, &safeWalSize, &walStatus); err != nil {
107118
return err
108119
}
109120

@@ -149,6 +160,21 @@ func (PGReplicationSlotCollector) Update(ctx context.Context, instance *instance
149160
prometheus.GaugeValue, float64(safeWalSize.Int64), slotNameLabel, slotTypeLabel,
150161
)
151162
}
163+
164+
if walStatus.Valid {
165+
// See https://www.postgresql.org/docs/14/view-pg-replication-slots.html
166+
walStatusMap := map[string]int{
167+
"reserved": 0, // reserved means that the claimed files are within max_wal_size.
168+
"extended": 1, // extended means that max_wal_size is exceeded but the files are still retained, either by the replication slot or by wal_keep_size.
169+
"unreserved": 2, // unreserved means that the slot no longer retains the required WAL files and some of them are to be removed at the next checkpoint. This state can return to reserved or extended.
170+
"lost": 3, // lost means that some required WAL files have been removed and this slot is no longer usable.
171+
}
172+
173+
ch <- prometheus.MustNewConstMetric(
174+
pgReplicationSlotWalStatus,
175+
prometheus.GaugeValue, float64(walStatusMap[walStatus.String]), slotNameLabel, slotTypeLabel,
176+
)
177+
}
152178
}
153179
return rows.Err()
154180
}

collector/pg_replication_slot_test.go

+11 -8
Original file line number | Diff line number | Diff line change
@@ -31,9 +31,9 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
3131

3232
inst := &instance{db: db}
3333

34-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
34+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
3535
rows := sqlmock.NewRows(columns).
36-
AddRow("test_slot", "physical", 5, 3, true, 323906992)
36+
AddRow("test_slot", "physical", 5, 3, true, 323906992, "reserved")
3737
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
3838

3939
ch := make(chan prometheus.Metric)
@@ -51,6 +51,7 @@ func TestPgReplicationSlotCollectorActive(t *testing.T) {
5151
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE},
5252
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE},
5353
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 323906992, metricType: dto.MetricType_GAUGE},
54+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
5455
}
5556

5657
convey.Convey("Metrics comparison", t, func() {
@@ -73,9 +74,9 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
7374

7475
inst := &instance{db: db}
7576

76-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
77+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
7778
rows := sqlmock.NewRows(columns).
78-
AddRow("test_slot", "physical", 6, 12, false, -4000)
79+
AddRow("test_slot", "physical", 6, 12, false, -4000, "extended")
7980
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
8081

8182
ch := make(chan prometheus.Metric)
@@ -92,6 +93,7 @@ func TestPgReplicationSlotCollectorInActive(t *testing.T) {
9293
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
9394
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
9495
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: -4000, metricType: dto.MetricType_GAUGE},
96+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 1, metricType: dto.MetricType_GAUGE},
9597
}
9698

9799
convey.Convey("Metrics comparison", t, func() {
@@ -115,9 +117,9 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
115117

116118
inst := &instance{db: db}
117119

118-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
120+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
119121
rows := sqlmock.NewRows(columns).
120-
AddRow("test_slot", "physical", 6, 12, nil, nil)
122+
AddRow("test_slot", "physical", 6, 12, nil, nil, "lost")
121123
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
122124

123125
ch := make(chan prometheus.Metric)
@@ -133,6 +135,7 @@ func TestPgReplicationSlotCollectorActiveNil(t *testing.T) {
133135
expected := []MetricResult{
134136
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 6, metricType: dto.MetricType_GAUGE},
135137
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 0, metricType: dto.MetricType_GAUGE},
138+
{labels: labelMap{"slot_name": "test_slot", "slot_type": "physical"}, value: 3, metricType: dto.MetricType_GAUGE},
136139
}
137140

138141
convey.Convey("Metrics comparison", t, func() {
@@ -155,9 +158,9 @@ func TestPgReplicationSlotCollectorTestNilValues(t *testing.T) {
155158

156159
inst := &instance{db: db}
157160

158-
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size"}
161+
columns := []string{"slot_name", "slot_type", "current_wal_lsn", "confirmed_flush_lsn", "active", "safe_wal_size", "wal_status"}
159162
rows := sqlmock.NewRows(columns).
160-
AddRow(nil, nil, nil, nil, true, nil)
163+
AddRow(nil, nil, nil, nil, true, nil, nil)
161164
mock.ExpectQuery(sanitizeQuery(pgReplicationSlotQuery)).WillReturnRows(rows)
162165

163166
ch := make(chan prometheus.Metric)

0 commit comments

Comments
 (0)