Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: use authenticated urls for pubsub#14261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
f0ssel merged 25 commits intomainfromf0ssel/pubsub-conn-2
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes fromall commits
Commits
Show all changes
25 commits
Select commitHold shift + click to select a range
b89ff23
fix: use authenticated urls for pubsub
f0sselAug 13, 2024
88d50b8
comments
f0sselAug 13, 2024
8be0524
flake
f0sselAug 13, 2024
3be05c2
fmt
f0sselAug 13, 2024
c7964e4
temp add personal fork
f0sselAug 13, 2024
6ff2ec7
update coder fork
f0sselAug 14, 2024
02258ee
test pubsub
f0sselAug 14, 2024
9d8681b
test pubsub
f0sselAug 14, 2024
85f6cfe
t log
f0sselAug 14, 2024
dae69c7
add customer driver tests
f0sselAug 21, 2024
55df464
merge main
f0sselAug 21, 2024
2019fec
update flake
f0sselAug 21, 2024
ff2784c
lint
f0sselAug 21, 2024
c18963b
move to defer
f0sselAug 21, 2024
1600abf
fix tests
f0sselAug 22, 2024
839a52e
remove log
f0sselAug 22, 2024
428f3f5
merge main
f0sselAug 22, 2024
cd61cc7
update flake
f0sselAug 22, 2024
c946cdd
fmt
f0sselAug 22, 2024
1b139e4
close chan
f0sselAug 22, 2024
8c96f6e
use pq
f0sselAug 23, 2024
a645a76
merge main
f0sselAug 23, 2024
ed009f7
update flake
f0sselAug 23, 2024
c8a7c9a
fix flake
f0sselAug 23, 2024
05cbc3c
sleep to prevent flake
f0sselAug 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletioncoderd/database/awsiamrds/awsiamrds.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -10,15 +10,21 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/feature/rds/auth"
"github.com/lib/pq"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database"
)

type awsIamRdsDriver struct {
parent driver.Driver
cfg aws.Config
}

var _ driver.Driver = &awsIamRdsDriver{}
var (
_ driver.Driver = &awsIamRdsDriver{}
_ database.ConnectorCreator = &awsIamRdsDriver{}
)

// Register initializes and registers our aws iam rds wrapped database driver.
func Register(ctx context.Context, parentName string) (string, error) {
Expand DownExpand Up@@ -65,6 +71,16 @@ func (d *awsIamRdsDriver) Open(name string) (driver.Conn, error) {
return conn, nil
}

// Connector returns a driver.Connector that fetches a new authentication token for each connection.
func (d *awsIamRdsDriver) Connector(name string) (driver.Connector, error) {
connector := &connector{
url: name,
cfg: d.cfg,
}

return connector, nil
}

func getAuthenticatedURL(cfg aws.Config, dbURL string) (string, error) {
nURL, err := url.Parse(dbURL)
if err != nil {
Expand All@@ -82,3 +98,37 @@ func getAuthenticatedURL(cfg aws.Config, dbURL string) (string, error) {

return nURL.String(), nil
}

type connector struct {
url string
cfg aws.Config
dialer pq.Dialer
}

var _ database.DialerConnector = &connector{}

func (c *connector) Connect(ctx context.Context) (driver.Conn, error) {
nURL, err := getAuthenticatedURL(c.cfg, c.url)
if err != nil {
return nil, xerrors.Errorf("assigning authentication token to url: %w", err)
}

nc, err := pq.NewConnector(nURL)
if err != nil {
return nil, xerrors.Errorf("creating new connector: %w", err)
}

if c.dialer != nil {
nc.Dialer(c.dialer)
}

return nc.Connect(ctx)
}

func (*connector) Driver() driver.Driver {
return &pq.Driver{}
}

func (c *connector) Dialer(dialer pq.Dialer) {
c.dialer = dialer
}
28 changes: 25 additions & 3 deletionscoderd/database/awsiamrds/awsiamrds_test.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -7,10 +7,11 @@ import (

"github.com/stretchr/testify/require"

"cdr.dev/slog"
"cdr.dev/slog/sloggers/slogtest"

"github.com/coder/coder/v2/cli"
awsrdsiam "github.com/coder/coder/v2/coderd/database/awsiamrds"
"github.com/coder/coder/v2/coderd/database/awsiamrds"
"github.com/coder/coder/v2/coderd/database/pubsub"
"github.com/coder/coder/v2/testutil"
)

Expand All@@ -22,13 +23,15 @@ func TestDriver(t *testing.T) {
// export DBAWSIAMRDS_TEST_URL="postgres://user@host:5432/dbname";
url := os.Getenv("DBAWSIAMRDS_TEST_URL")
if url == "" {
t.Log("skipping test; no DBAWSIAMRDS_TEST_URL set")
t.Skip()
}

logger := slogtest.Make(t, nil).Leveled(slog.LevelDebug)
ctx, cancel := context.WithTimeout(context.Background(), testutil.WaitShort)
defer cancel()

sqlDriver, err :=awsrdsiam.Register(ctx, "postgres")
sqlDriver, err :=awsiamrds.Register(ctx, "postgres")
require.NoError(t, err)

db, err := cli.ConnectToPostgres(ctx, slogtest.Make(t, nil), sqlDriver, url)
Expand All@@ -47,4 +50,23 @@ func TestDriver(t *testing.T) {
var one int
require.NoError(t, i.Scan(&one))
require.Equal(t, 1, one)

ps, err := pubsub.New(ctx, logger, db, url)
require.NoError(t, err)

gotChan := make(chan struct{})
subCancel, err := ps.Subscribe("test", func(_ context.Context, _ []byte) {
close(gotChan)
})
defer subCancel()
require.NoError(t, err)

err = ps.Publish("test", []byte("hello"))
require.NoError(t, err)

select {
case <-gotChan:
case <-ctx.Done():
require.Fail(t, "timed out waiting for message")
}
}
19 changes: 19 additions & 0 deletionscoderd/database/connector.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
package database

import (
"database/sql/driver"

"github.com/lib/pq"
)

// ConnectorCreator is a driver.Driver that can create a driver.Connector.
type ConnectorCreator interface {
driver.Driver
Connector(name string) (driver.Connector, error)
}

// DialerConnector is a driver.Connector that can set a pq.Dialer.
type DialerConnector interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

👍

f0ssel reacted with thumbs up emoji
driver.Connector
Dialer(dialer pq.Dialer)
}
79 changes: 79 additions & 0 deletionscoderd/database/dbtestutil/driver.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
package dbtestutil

import (
"context"
"database/sql/driver"

"github.com/lib/pq"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database"
)

var _ database.DialerConnector = &Connector{}

type Connector struct {
name string
driver *Driver
dialer pq.Dialer
}

func (c *Connector) Connect(_ context.Context) (driver.Conn, error) {
if c.dialer != nil {
conn, err := pq.DialOpen(c.dialer, c.name)
if err != nil {
return nil, xerrors.Errorf("failed to dial open connection: %w", err)
}

c.driver.Connections <- conn

return conn, nil
}

conn, err := pq.Driver{}.Open(c.name)
if err != nil {
return nil, xerrors.Errorf("failed to open connection: %w", err)
}

c.driver.Connections <- conn

return conn, nil
}

func (c *Connector) Driver() driver.Driver {
return c.driver
}

func (c *Connector) Dialer(dialer pq.Dialer) {
c.dialer = dialer
}

type Driver struct {
Connections chan driver.Conn
}

func NewDriver() *Driver {
return &Driver{
Connections: make(chan driver.Conn, 1),
}
}

func (d *Driver) Connector(name string) (driver.Connector, error) {
return &Connector{
name: name,
driver: d,
}, nil
}

func (d *Driver) Open(name string) (driver.Conn, error) {
c, err := d.Connector(name)
if err != nil {
return nil, err
}

return c.Connect(context.Background())
}

func (d *Driver) Close() {
close(d.Connections)
}
39 changes: 34 additions & 5 deletionscoderd/database/pubsub/pubsub.go
View file
Open in desktop
Original file line numberDiff line numberDiff line change
Expand Up@@ -3,6 +3,7 @@ package pubsub
import (
"context"
"database/sql"
"database/sql/driver"
"errors"
"io"
"net"
Expand All@@ -15,6 +16,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/xerrors"

"github.com/coder/coder/v2/coderd/database"

"cdr.dev/slog"
)

Expand DownExpand Up@@ -432,9 +435,35 @@ func (p *PGPubsub) startListener(ctx context.Context, connectURL string) error {
// pq.defaultDialer uses a zero net.Dialer as well.
d: net.Dialer{},
}
connector driver.Connector
err error
)

// Create a custom connector if the database driver supports it.
connectorCreator, ok := p.db.Driver().(database.ConnectorCreator)
if ok {
connector, err = connectorCreator.Connector(connectURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This code doesn't get hit in the package unit tests. A good way to test it would be to create a pq Driver wrapper that we can control.

I'd like to see a test where we start pubsub with the wrapped driver, do some pub'ing and sub'ing, then kill the connection and verify that the pubsub / pq.Listener reconnects automatically. That would give a nice test of the pq changes you made as well.

f0ssel reacted with thumbs up emoji
if err != nil {
return xerrors.Errorf("create custom connector: %w", err)
}
} else {
// use the default pq connector otherwise
connector, err = pq.NewConnector(connectURL)
if err != nil {
return xerrors.Errorf("create pq connector: %w", err)
}
}

// Set the dialer if the connector supports it.
dc, ok := connector.(database.DialerConnector)
if !ok {
p.logger.Critical(ctx, "connector does not support setting log dialer, database connection debug logs will be missing")
} else {
dc.Dialer(dialer)
}

p.pgListener = pqListenerShim{
Listener: pq.NewDialListener(dialer, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
Listener: pq.NewConnectorListener(connector, connectURL, time.Second, time.Minute, func(t pq.ListenerEventType, err error) {
switch t {
case pq.ListenerEventConnected:
p.logger.Info(ctx, "pubsub connected to postgres")
Expand DownExpand Up@@ -583,8 +612,8 @@ func (p *PGPubsub) Collect(metrics chan<- prometheus.Metric) {
}

// New creates a new Pubsub implementation using a PostgreSQL connection.
func New(startCtx context.Context, logger slog.Logger,database *sql.DB, connectURL string) (*PGPubsub, error) {
p := newWithoutListener(logger,database)
func New(startCtx context.Context, logger slog.Logger,db *sql.DB, connectURL string) (*PGPubsub, error) {
p := newWithoutListener(logger,db)
if err := p.startListener(startCtx, connectURL); err != nil {
return nil, err
}
Expand All@@ -594,11 +623,11 @@ func New(startCtx context.Context, logger slog.Logger, database *sql.DB, connect
}

// newWithoutListener creates a new PGPubsub without creating the pqListener.
func newWithoutListener(logger slog.Logger,database *sql.DB) *PGPubsub {
func newWithoutListener(logger slog.Logger,db *sql.DB) *PGPubsub {
return &PGPubsub{
logger: logger,
listenDone: make(chan struct{}),
db:database,
db:db,
queues: make(map[string]map[uuid.UUID]*msgQueue),
latencyMeasurer: NewLatencyMeasurer(logger.Named("latency-measurer")),

Expand Down
Loading
Loading

[8]ページ先頭

©2009-2025 Movatter.jp