Skip to content

Commit ac3e8bd

Browse files
committed
add shutdown timeout, use context while running queries;
better naming for db interface;
1 parent 32f0475 commit ac3e8bd

File tree

3 files changed

+26
-17
lines changed

3 files changed

+26
-17
lines changed

main.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"os/signal"
1111
"syscall"
12+
"time"
1213

1314
"github.com/prometheus/client_golang/prometheus"
1415
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -18,7 +19,8 @@ import (
1819
)
1920

2021
const (
21-
indexHTML = `
22+
shutdownTimeout = 10 * time.Second
23+
indexHTML = `
2224
<html>
2325
<head>
2426
<title>Postgresql Exporter</title>
@@ -56,7 +58,9 @@ func main() {
5658
if err := cfg.Load(); err != nil {
5759
log.Fatalf("could not load config: %v", err)
5860
}
59-
collector := pgcollector.New()
61+
ctx, cancel := context.WithCancel(context.Background())
62+
63+
collector := pgcollector.New(ctx)
6064
collector.LoadConfig(cfg)
6165

6266
if err := prometheus.Register(collector); err != nil {
@@ -98,7 +102,10 @@ loop:
98102
log.Printf("received signal: %v", sig)
99103
}
100104
}
101-
if err := srv.Shutdown(context.Background()); err != nil {
105+
cancel()
106+
107+
shutdownCtx, _ := context.WithTimeout(context.Background(), shutdownTimeout)
108+
if err := srv.Shutdown(shutdownCtx); err != nil {
102109
log.Printf("could not shutdown http server: %v", err)
103110
}
104111

pkg/db/db.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ import (
1515
"github.com/adjust/postgresql_exporter/pkg/config"
1616
)
1717

18-
//DbInterface describes Db methods
19-
type DbInterface interface {
18+
//Interface describes Db methods
19+
type Interface interface {
2020
SetStatementTimeout(time.Duration) error
2121
Exec(string) ([]map[string]interface{}, error)
2222
PgVersion() config.PgVersion
@@ -30,12 +30,13 @@ var ErrQueryTimeout = errors.New("canceled due to statement timeout")
3030

3131
// Db describes database
3232
type Db struct {
33+
ctx context.Context
3334
version config.PgVersion
3435
db *pgx.Conn
3536
}
3637

3738
// New creates new instance of database connection
38-
func New(dbConfig config.DbConfig) (*Db, error) {
39+
func New(ctx context.Context, dbConfig config.DbConfig) (*Db, error) {
3940
var version config.PgVersion
4041

4142
cfg := pgx.ConnConfig{
@@ -76,17 +77,14 @@ func New(dbConfig config.DbConfig) (*Db, error) {
7677
version = config.NoVersion
7778
}
7879

79-
if err != nil {
80-
return nil, fmt.Errorf("could not open connection: %v", err)
81-
}
82-
8380
if !dbConfig.IsNotPg {
8481
if err := dbConn.Ping(context.Background()); err != nil {
8582
return nil, fmt.Errorf("could not ping db: %v", err)
8683
}
8784
}
8885

8986
return &Db{
87+
ctx: ctx,
9088
db: dbConn,
9189
version: version,
9290
}, nil
@@ -96,7 +94,7 @@ func New(dbConfig config.DbConfig) (*Db, error) {
9694
func (d *Db) Exec(query string) ([]map[string]interface{}, error) {
9795
values := make([]map[string]interface{}, 0)
9896

99-
rows, err := d.db.Query(query)
97+
rows, err := d.db.QueryEx(d.ctx, query, nil)
10098
if err != nil {
10199
return nil, fmt.Errorf("query error: %v", err)
102100
}

pkg/pgcollector/pgcollector.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pgcollector
22

33
import (
4+
"context"
45
"fmt"
56
"log"
67
"sync"
@@ -32,6 +33,7 @@ type PgCollector struct {
3233
config config.Interface
3334
timeOuts uint32
3435
errors uint32
36+
ctx context.Context
3537
}
3638

3739
type workerJob struct {
@@ -40,8 +42,10 @@ type workerJob struct {
4042
}
4143

4244
// New create new instance of the PostgreSQL metrics collector
43-
func New() *PgCollector {
44-
return &PgCollector{}
45+
func New(ctx context.Context) *PgCollector {
46+
return &PgCollector{
47+
ctx: ctx,
48+
}
4549
}
4650

4751
// LoadConfig loads config
@@ -84,7 +88,7 @@ func createMetric(job *workerJob, name string, constLabels prometheus.Labels, ra
8488
return nil, nil
8589
}
8690

87-
func (p *PgCollector) worker(conn db.DbInterface, jobs chan *workerJob, res chan<- prometheus.Metric, wg *sync.WaitGroup) {
91+
func (p *PgCollector) worker(conn db.Interface, jobs chan *workerJob, res chan<- prometheus.Metric, wg *sync.WaitGroup) {
8892
defer wg.Done()
8993

9094
jobs:
@@ -204,16 +208,16 @@ func (p *PgCollector) Collect(metricsCh chan<- prometheus.Metric) {
204208

205209
wg := &sync.WaitGroup{}
206210

207-
dbPool := make(map[string][]db.DbInterface)
211+
dbPool := make(map[string][]db.Interface)
208212
dbJobs := make(map[string]chan *workerJob)
209213

210214
for _, dbName := range p.config.DbList() {
211215
dbConf := p.config.Db(dbName)
212216
workersCnt := dbConf.Workers()
213217

214-
dbPool[dbName] = make([]db.DbInterface, 0)
218+
dbPool[dbName] = make([]db.Interface, 0)
215219
for i := 0; i < workersCnt; i++ {
216-
conn, err := db.New(dbConf)
220+
conn, err := db.New(p.ctx, dbConf)
217221
if err != nil {
218222
log.Printf("could not create db instance %q: %v", dbName, err)
219223
atomic.AddUint32(&p.errors, 1)

0 commit comments

Comments
 (0)