Skip to content

Commit 05e90a9

Browse files
committed
enable new behavior with cli flag
Signed-off-by: Max Englander <[email protected]>
1 parent 0f7b6ed commit 05e90a9

26 files changed

+100
-69
lines changed

cmd/postgres_exporter/main.go

Lines changed: 63 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@ package main
1515

1616
import (
1717
"fmt"
18+
"log/slog"
1819
"net/http"
1920
"os"
2021
"strings"
22+
"time"
2123

2224
"github.com/alecthomas/kingpin/v2"
2325
"github.com/prometheus-community/postgres_exporter/collector"
@@ -32,6 +34,65 @@ import (
3234
"github.com/prometheus/exporter-toolkit/web/kingpinflag"
3335
)
3436

37+
func registerPostgresCollector(dsn string, exporter *Exporter, logger *slog.Logger, excludedDatabases []string, scrapeTimeout time.Duration, concurrentScrape bool) {
38+
if dsn == "" {
39+
return
40+
}
41+
42+
var instance *collector.Instance
43+
var opts []collector.Option
44+
45+
if concurrentScrape {
46+
// Original behavior: dedicated instance for collector, creates new connection per scrape
47+
inst, err := collector.NewInstance(dsn)
48+
if err != nil {
49+
logger.Warn("Failed to create instance", "err", err.Error())
50+
return
51+
}
52+
instance = inst
53+
// Add option to create new instance per collect
54+
opts = append(opts, collector.WithInstancePerCollect())
55+
} else {
56+
// New optimized behavior: share connection from server
57+
server, err := exporter.servers.GetServer(dsn)
58+
if err != nil {
59+
logger.Warn("Failed to get server for collectors", "err", err.Error())
60+
return
61+
}
62+
63+
inst, err := collector.NewInstance(dsn)
64+
if err != nil {
65+
logger.Warn("Failed to create instance", "err", err.Error())
66+
return
67+
}
68+
69+
err = inst.SetupWithConnection(server.db)
70+
if err != nil {
71+
logger.Warn("Failed to setup shared instance", "err", err.Error())
72+
return
73+
}
74+
instance = inst
75+
}
76+
77+
// Add timeout option
78+
opts = append(opts, collector.WithTimeout(scrapeTimeout))
79+
80+
// Create collector
81+
pe, err := collector.NewPostgresCollector(
82+
logger,
83+
excludedDatabases,
84+
instance,
85+
[]string{},
86+
opts...,
87+
)
88+
if err != nil {
89+
logger.Warn("Failed to create PostgresCollector", "err", err.Error())
90+
return
91+
}
92+
93+
prometheus.MustRegister(pe)
94+
}
95+
3596
var (
3697
c = config.Handler{
3798
Config: &config.Config{},
@@ -50,6 +111,7 @@ var (
50111
includeDatabases = kingpin.Flag("include-databases", "A list of databases to include when autoDiscoverDatabases is enabled (DEPRECATED)").Default("").Envar("PG_EXPORTER_INCLUDE_DATABASES").String()
51112
metricPrefix = kingpin.Flag("metric-prefix", "A metric prefix can be used to have non-default (not \"pg\") prefixes for each of the metrics").Default("pg").Envar("PG_EXPORTER_METRIC_PREFIX").String()
52113
scrapeTimeout = kingpin.Flag("scrape-timeout", "Maximum time for a scrape to complete before timing out (0 = no timeout)").Default("0").Envar("PG_EXPORTER_SCRAPE_TIMEOUT").Duration()
114+
concurrentScrape = kingpin.Flag("concurrent-scrape", "Use dedicated instance for collector allowing concurrent scrapes (default: true for backward compatibility)").Default("true").Envar("PG_EXPORTER_CONCURRENT_SCRAPE").Bool()
53115
logger = promslog.NewNopLogger()
54116
)
55117

@@ -133,38 +195,7 @@ func main() {
133195
dsn = dsns[0]
134196
}
135197

136-
if dsn != "" {
137-
// Get the server connection to share with collectors
138-
server, err := exporter.servers.GetServer(dsn)
139-
if err != nil {
140-
logger.Warn("Failed to get server for collectors", "err", err.Error())
141-
} else {
142-
// Create instance with shared connection from server
143-
instance, err := collector.NewInstance(dsn)
144-
if err != nil {
145-
logger.Warn("Failed to create instance", "err", err.Error())
146-
} else {
147-
err = instance.SetupWithConnection(server.db)
148-
if err != nil {
149-
logger.Warn("Failed to setup shared instance", "err", err.Error())
150-
} else {
151-
// Create collector with shared instance (instancePerCollect defaults to false)
152-
pe, err := collector.NewPostgresCollector(
153-
logger,
154-
excludedDatabases,
155-
instance,
156-
[]string{},
157-
collector.WithTimeout(*scrapeTimeout),
158-
)
159-
if err != nil {
160-
logger.Warn("Failed to create PostgresCollector", "err", err.Error())
161-
} else {
162-
prometheus.MustRegister(pe)
163-
}
164-
}
165-
}
166-
}
167-
}
198+
registerPostgresCollector(dsn, exporter, logger, excludedDatabases, *scrapeTimeout, *concurrentScrape)
168199

169200
http.Handle(*metricsPath, promhttp.Handler())
170201

collector/collector.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var (
5959
)
6060

6161
type Collector interface {
62-
Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error
62+
Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error
6363
}
6464

6565
type collectorConfig struct {
@@ -93,7 +93,7 @@ type PostgresCollector struct {
9393
logger *slog.Logger
9494
scrapeTimeout time.Duration
9595

96-
instance *instance
96+
instance *Instance
9797
instancePerCollect bool
9898
}
9999

@@ -116,7 +116,7 @@ func WithInstancePerCollect() Option {
116116
}
117117

118118
// NewPostgresCollector creates a new PostgresCollector.
119-
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *instance, filters []string, options ...Option) (*PostgresCollector, error) {
119+
func NewPostgresCollector(logger *slog.Logger, excludeDatabases []string, instance *Instance, filters []string, options ...Option) (*PostgresCollector, error) {
120120
p := &PostgresCollector{
121121
logger: logger,
122122
instance: instance,
@@ -184,7 +184,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
184184
ctx = context.Background()
185185
}
186186

187-
var inst *instance
187+
var inst *Instance
188188

189189
if p.instancePerCollect {
190190
// copy the instance so that concurrent scrapes have independent instances
@@ -212,7 +212,7 @@ func (p PostgresCollector) Collect(ch chan<- prometheus.Metric) {
212212
wg.Wait()
213213
}
214214

215-
func execute(ctx context.Context, name string, c Collector, instance *instance, ch chan<- prometheus.Metric, logger *slog.Logger) {
215+
func execute(ctx context.Context, name string, c Collector, instance *Instance, ch chan<- prometheus.Metric, logger *slog.Logger) {
216216
begin := time.Now()
217217
err := c.Update(ctx, instance, ch)
218218
duration := time.Since(begin)

collector/instance.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@ import (
2121
"github.com/blang/semver/v4"
2222
)
2323

24-
type instance struct {
24+
type Instance struct {
2525
dsn string
2626
db *sql.DB
2727
version semver.Version
2828
}
2929

30-
func NewInstance(dsn string) (*instance, error) {
31-
i := &instance{
30+
func NewInstance(dsn string) (*Instance, error) {
31+
i := &Instance{
3232
dsn: dsn,
3333
}
3434

@@ -44,13 +44,13 @@ func NewInstance(dsn string) (*instance, error) {
4444
}
4545

4646
// copy returns a copy of the instance.
47-
func (i *instance) copy() *instance {
48-
return &instance{
47+
func (i *Instance) copy() *Instance {
48+
return &Instance{
4949
dsn: i.dsn,
5050
}
5151
}
5252

53-
func (i *instance) setup() error {
53+
func (i *Instance) setup() error {
5454
db, err := sql.Open("postgres", i.dsn)
5555
if err != nil {
5656
return err
@@ -69,7 +69,7 @@ func (i *instance) setup() error {
6969
}
7070

7171
// SetupWithConnection sets up the instance with an existing database connection.
72-
func (i *instance) SetupWithConnection(db *sql.DB) error {
72+
func (i *Instance) SetupWithConnection(db *sql.DB) error {
7373
i.db = db
7474

7575
version, err := queryVersion(i.db)
@@ -80,11 +80,11 @@ func (i *instance) SetupWithConnection(db *sql.DB) error {
8080
return nil
8181
}
8282

83-
func (i *instance) getDB() *sql.DB {
83+
func (i *Instance) getDB() *sql.DB {
8484
return i.db
8585
}
8686

87-
func (i *instance) Close() error {
87+
func (i *Instance) Close() error {
8888
return i.db.Close()
8989
}
9090

collector/pg_buffercache_summary.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ var (
9191

9292
// Update implements Collector
9393
// It is called by the Prometheus registry when collecting metrics.
94-
func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
94+
func (c BuffercacheSummaryCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
9595
// pg_buffercache_summary is only in v16, and we don't need support for earlier currently.
9696
if !instance.version.GE(semver.MustParse("16.0.0")) {
9797
return nil

collector/pg_database.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ var (
7676
// each database individually. This is because we can't filter the
7777
// list of databases in the query because the list of excluded
7878
// databases is dynamic.
79-
func (c PGDatabaseCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
79+
func (c PGDatabaseCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
8080
db := instance.getDB()
8181
// Query the list of databases
8282
rows, err := db.QueryContext(ctx,

collector/pg_database_wraparound.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ var (
6161
`
6262
)
6363

64-
func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
64+
func (c *PGDatabaseWraparoundCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
6565
db := instance.getDB()
6666
rows, err := db.QueryContext(ctx,
6767
databaseWraparoundQuery)

collector/pg_locks.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ var (
8888

8989
// Update implements Collector and exposes database locks.
9090
// It is called by the Prometheus registry when collecting metrics.
91-
func (c PGLocksCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
91+
func (c PGLocksCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
9292
db := instance.getDB()
9393
// Query the list of databases
9494
rows, err := db.QueryContext(ctx,

collector/pg_long_running_transactions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ AND pid <> pg_backend_pid();
6161
`
6262
)
6363

64-
func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
64+
func (PGLongRunningTransactionsCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
6565
db := instance.getDB()
6666
rows, err := db.QueryContext(ctx,
6767
longRunningTransactionsQuery)

collector/pg_postmaster.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ var (
4747
pgPostmasterQuery = "SELECT extract(epoch from pg_postmaster_start_time) from pg_postmaster_start_time();"
4848
)
4949

50-
func (c *PGPostmasterCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
50+
func (c *PGPostmasterCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
5151
db := instance.getDB()
5252
row := db.QueryRowContext(ctx,
5353
pgPostmasterQuery)

collector/pg_process_idle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ var pgProcessIdleSeconds = prometheus.NewDesc(
4444
prometheus.Labels{},
4545
)
4646

47-
func (PGProcessIdleCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
47+
func (PGProcessIdleCollector) Update(ctx context.Context, instance *Instance, ch chan<- prometheus.Metric) error {
4848
db := instance.getDB()
4949
row := db.QueryRowContext(ctx,
5050
`WITH

0 commit comments

Comments
 (0)