Skip to content

Commit 2d7e152

Browse files
Sticksman, SuperQ, sysadmind
authored
Gitlab Collector: Wal Receiver Collector and Test (#844)
* Wal Receiver Collector and Test
  Signed-off-by: Felix Yuan <[email protected]>
* Add more escapes
  Signed-off-by: Felix Yuan <[email protected]>
* Corrections to wal_receiver
  Signed-off-by: Felix Yuan <[email protected]>
* Continue on null labels
  Signed-off-by: Felix Yuan <[email protected]>
* Skip nulls and log a message
  Signed-off-by: Felix Yuan <[email protected]>
* Redundant breaks
  Signed-off-by: Felix Yuan <[email protected]>
* Fix up walreceiver
  Signed-off-by: Felix Yuan <[email protected]>
* Remove extra label
  Signed-off-by: Felix Yuan <[email protected]>
* Update collector/pg_stat_walreceiver.go
  Co-authored-by: Ben Kochie <[email protected]>
  Signed-off-by: Felix Yuan <[email protected]>
* Clean up the extra assignments
  Signed-off-by: Felix Yuan <[email protected]>
* Update collector/pg_stat_walreceiver.go
  Co-authored-by: Joe Adams <[email protected]>
  Signed-off-by: Felix Yuan <[email protected]>
---------
Signed-off-by: Felix Yuan <[email protected]>
Co-authored-by: Ben Kochie <[email protected]>
Co-authored-by: Joe Adams <[email protected]>
1 parent dc3e813 commit 2d7e152

File tree

3 files changed

+456
-0
lines changed

3 files changed

+456
-0
lines changed

collector/collector_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ func readMetric(m prometheus.Metric) MetricResult {
4949
func sanitizeQuery(q string) string {
5050
q = strings.Join(strings.Fields(q), " ")
5151
q = strings.Replace(q, "(", "\\(", -1)
52+
q = strings.Replace(q, "?", "\\?", -1)
5253
q = strings.Replace(q, ")", "\\)", -1)
5354
q = strings.Replace(q, "[", "\\[", -1)
5455
q = strings.Replace(q, "]", "\\]", -1)

collector/pg_stat_walreceiver.go

+269
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,269 @@
1+
// Copyright 2023 The Prometheus Authors
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
//
6+
// http://www.apache.org/licenses/LICENSE-2.0
7+
//
8+
// Unless required by applicable law or agreed to in writing, software
9+
// distributed under the License is distributed on an "AS IS" BASIS,
10+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
package collector
14+
15+
import (
16+
"context"
17+
"database/sql"
18+
"fmt"
19+
20+
"github.com/go-kit/log"
21+
"github.com/go-kit/log/level"
22+
"github.com/prometheus/client_golang/prometheus"
23+
)
24+
25+
func init() {
26+
registerCollector(statWalReceiverSubsystem, defaultDisabled, NewPGStatWalReceiverCollector)
27+
}
28+
29+
type PGStatWalReceiverCollector struct {
30+
log log.Logger
31+
}
32+
33+
// statWalReceiverSubsystem is the subsystem component of every metric name
// built in this file and the name the collector is registered under.
const statWalReceiverSubsystem = "stat_wal_receiver"
34+
35+
func NewPGStatWalReceiverCollector(config collectorConfig) (Collector, error) {
36+
return &PGStatWalReceiverCollector{log: config.logger}, nil
37+
}
38+
39+
var (
40+
labelCats = []string{"upstream_host", "slot_name", "status"}
41+
statWalReceiverReceiveStartLsn = prometheus.NewDesc(
42+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_lsn"),
43+
"First write-ahead log location used when WAL receiver is started represented as a decimal",
44+
labelCats,
45+
prometheus.Labels{},
46+
)
47+
statWalReceiverReceiveStartTli = prometheus.NewDesc(
48+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "receive_start_tli"),
49+
"First timeline number used when WAL receiver is started",
50+
labelCats,
51+
prometheus.Labels{},
52+
)
53+
statWalReceiverFlushedLSN = prometheus.NewDesc(
54+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "flushed_lsn"),
55+
"Last write-ahead log location already received and flushed to disk, the initial value of this field being the first log location used when WAL receiver is started represented as a decimal",
56+
labelCats,
57+
prometheus.Labels{},
58+
)
59+
statWalReceiverReceivedTli = prometheus.NewDesc(
60+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "received_tli"),
61+
"Timeline number of last write-ahead log location received and flushed to disk",
62+
labelCats,
63+
prometheus.Labels{},
64+
)
65+
statWalReceiverLastMsgSendTime = prometheus.NewDesc(
66+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_send_time"),
67+
"Send time of last message received from origin WAL sender",
68+
labelCats,
69+
prometheus.Labels{},
70+
)
71+
statWalReceiverLastMsgReceiptTime = prometheus.NewDesc(
72+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "last_msg_receipt_time"),
73+
"Send time of last message received from origin WAL sender",
74+
labelCats,
75+
prometheus.Labels{},
76+
)
77+
statWalReceiverLatestEndLsn = prometheus.NewDesc(
78+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_lsn"),
79+
"Last write-ahead log location reported to origin WAL sender as integer",
80+
labelCats,
81+
prometheus.Labels{},
82+
)
83+
statWalReceiverLatestEndTime = prometheus.NewDesc(
84+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "latest_end_time"),
85+
"Time of last write-ahead log location reported to origin WAL sender",
86+
labelCats,
87+
prometheus.Labels{},
88+
)
89+
statWalReceiverUpstreamNode = prometheus.NewDesc(
90+
prometheus.BuildFQName(namespace, statWalReceiverSubsystem, "upstream_node"),
91+
"Node ID of the upstream node",
92+
labelCats,
93+
prometheus.Labels{},
94+
)
95+
96+
pgStatWalColumnQuery = `
97+
SELECT
98+
column_name
99+
FROM information_schema.columns
100+
WHERE
101+
table_name = 'pg_stat_wal_receiver' and
102+
column_name = 'flushed_lsn'
103+
`
104+
105+
pgStatWalReceiverQueryTemplate = `
106+
SELECT
107+
trim(both '''' from substring(conninfo from 'host=([^ ]*)')) as upstream_host,
108+
slot_name,
109+
status,
110+
(receive_start_lsn- '0/0') % (2^52)::bigint as receive_start_lsn,
111+
%s
112+
receive_start_tli,
113+
received_tli,
114+
extract(epoch from last_msg_send_time) as last_msg_send_time,
115+
extract(epoch from last_msg_receipt_time) as last_msg_receipt_time,
116+
(latest_end_lsn - '0/0') % (2^52)::bigint as latest_end_lsn,
117+
extract(epoch from latest_end_time) as latest_end_time,
118+
substring(slot_name from 'repmgr_slot_([0-9]*)') as upstream_node
119+
FROM pg_catalog.pg_stat_wal_receiver
120+
`
121+
)
122+
123+
func (c *PGStatWalReceiverCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
124+
db := instance.getDB()
125+
hasFlushedLSNRows, err := db.QueryContext(ctx, pgStatWalColumnQuery)
126+
if err != nil {
127+
return err
128+
}
129+
130+
defer hasFlushedLSNRows.Close()
131+
hasFlushedLSN := hasFlushedLSNRows.Next()
132+
var query string
133+
if hasFlushedLSN {
134+
query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "(flushed_lsn - '0/0') % (2^52)::bigint as flushed_lsn,\n")
135+
} else {
136+
query = fmt.Sprintf(pgStatWalReceiverQueryTemplate, "")
137+
}
138+
rows, err := db.QueryContext(ctx, query)
139+
if err != nil {
140+
return err
141+
}
142+
defer rows.Close()
143+
for rows.Next() {
144+
var upstreamHost, slotName, status sql.NullString
145+
var receiveStartLsn, receiveStartTli, flushedLsn, receivedTli, latestEndLsn, upstreamNode sql.NullInt64
146+
var lastMsgSendTime, lastMsgReceiptTime, latestEndTime sql.NullFloat64
147+
148+
if hasFlushedLSN {
149+
if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &flushedLsn, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil {
150+
return err
151+
}
152+
} else {
153+
if err := rows.Scan(&upstreamHost, &slotName, &status, &receiveStartLsn, &receiveStartTli, &receivedTli, &lastMsgSendTime, &lastMsgReceiptTime, &latestEndLsn, &latestEndTime, &upstreamNode); err != nil {
154+
return err
155+
}
156+
}
157+
if !upstreamHost.Valid {
158+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream host is null")
159+
continue
160+
}
161+
162+
if !slotName.Valid {
163+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because slotname host is null")
164+
continue
165+
}
166+
167+
if !status.Valid {
168+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because status is null")
169+
continue
170+
}
171+
labels := []string{upstreamHost.String, slotName.String, status.String}
172+
173+
if !receiveStartLsn.Valid {
174+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_lsn is null")
175+
continue
176+
}
177+
if !receiveStartTli.Valid {
178+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because receive_start_tli is null")
179+
continue
180+
}
181+
if hasFlushedLSN && !flushedLsn.Valid {
182+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because flushed_lsn is null")
183+
continue
184+
}
185+
if !receivedTli.Valid {
186+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because received_tli is null")
187+
continue
188+
}
189+
if !lastMsgSendTime.Valid {
190+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_send_time is null")
191+
continue
192+
}
193+
if !lastMsgReceiptTime.Valid {
194+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because last_msg_receipt_time is null")
195+
continue
196+
}
197+
if !latestEndLsn.Valid {
198+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_lsn is null")
199+
continue
200+
}
201+
if !latestEndTime.Valid {
202+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because latest_end_time is null")
203+
continue
204+
}
205+
if !upstreamNode.Valid {
206+
level.Debug(c.log).Log("msg", "Skipping wal receiver stats because upstream_node is null")
207+
continue
208+
}
209+
ch <- prometheus.MustNewConstMetric(
210+
statWalReceiverReceiveStartLsn,
211+
prometheus.CounterValue,
212+
float64(receiveStartLsn.Int64),
213+
labels...)
214+
215+
ch <- prometheus.MustNewConstMetric(
216+
statWalReceiverReceiveStartTli,
217+
prometheus.GaugeValue,
218+
float64(receiveStartTli.Int64),
219+
labels...)
220+
221+
if hasFlushedLSN {
222+
ch <- prometheus.MustNewConstMetric(
223+
statWalReceiverFlushedLSN,
224+
prometheus.CounterValue,
225+
float64(flushedLsn.Int64),
226+
labels...)
227+
}
228+
229+
ch <- prometheus.MustNewConstMetric(
230+
statWalReceiverReceivedTli,
231+
prometheus.GaugeValue,
232+
float64(receivedTli.Int64),
233+
labels...)
234+
235+
ch <- prometheus.MustNewConstMetric(
236+
statWalReceiverLastMsgSendTime,
237+
prometheus.CounterValue,
238+
float64(lastMsgSendTime.Float64),
239+
labels...)
240+
241+
ch <- prometheus.MustNewConstMetric(
242+
statWalReceiverLastMsgReceiptTime,
243+
prometheus.CounterValue,
244+
float64(lastMsgReceiptTime.Float64),
245+
labels...)
246+
247+
ch <- prometheus.MustNewConstMetric(
248+
statWalReceiverLatestEndLsn,
249+
prometheus.CounterValue,
250+
float64(latestEndLsn.Int64),
251+
labels...)
252+
253+
ch <- prometheus.MustNewConstMetric(
254+
statWalReceiverLatestEndTime,
255+
prometheus.CounterValue,
256+
latestEndTime.Float64,
257+
labels...)
258+
259+
ch <- prometheus.MustNewConstMetric(
260+
statWalReceiverUpstreamNode,
261+
prometheus.GaugeValue,
262+
float64(upstreamNode.Int64),
263+
labels...)
264+
}
265+
if err := rows.Err(); err != nil {
266+
return err
267+
}
268+
return nil
269+
}

0 commit comments

Comments
 (0)