- Notifications
You must be signed in to change notification settings - Fork926
fix: use authenticated urls for pubsub#14261
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.
Already on GitHub?Sign in to your account
Uh oh!
There was an error while loading.Please reload this page.
Conversation
coderd/database/pubsub/pubsub.go Outdated
} | ||
// Set the dialer if the connector supports it. | ||
if dc, ok := connector.(database.DialerConnector); ok { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
We need to drop a CRITICAL log if this type assertion fails, since it means we've introduced a new driver that doesn't support a Dialer and our logs will be incomplete.
// Create a custom connector if the database driver supports it. | ||
connectorCreator, ok := p.db.Driver().(database.ConnectorCreator) | ||
if ok { | ||
connector, err = connectorCreator.Connector(connectURL) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This code doesn't get hit in the package unit tests. A good way to test it would be to create a pq Driver wrapper that we can control.
I'd like to see a test where we start pubsub with the wrapped driver, do some pub'ing and sub'ing, then kill the connection and verify that the pubsub / pq.Listener reconnects automatically. That would give a nice test of the pq changes you made as well.
} | ||
// DialerConnector can create a driver.Connector and set a pq.Dialer. | ||
type DialerConnector interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
👍
coderd/database/dbtestutil/driver.go Outdated
name string | ||
inner driver.Driver | ||
connections []driver.Conn | ||
listeners map[chan struct{}]chan struct{} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This connections and listeners business is not threadsafe, and so can cause races in our tests.
A better design would be to just have achan driver.Conn
that we pass each new connection thru. Tests will have to know how many connections they expect, and can read from the channel. For thePubSub
, it's the listener we want to test, which should use a connection when it first connects, then another one when it reconnects.
That way you don't need methods likeAddConnection
,WaitForConnection
, orDropConnections
-- the test code reads from the channel to wait for the connection, and can directly close the connection when it wants to interrupt it.
There is some complexity around thesql.DB
, which we use for publishing and has a pool of connections. I suggest you sidestep that complexity by just using a secondPubSub
for publishing with a regularpq
driver.
coderd/database/dbtestutil/driver.go Outdated
} | ||
func Register() (*Driver, error) { | ||
db, err := sql.Open("postgres", "") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This is a roundabout way to get apq
driver, which is justpq.Driver{}
coderd/database/dbtestutil/driver.go Outdated
listeners: make(map[chan struct{}]chan struct{}), | ||
} | ||
sql.Register(d.name, d) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
It seems like you're doing this so that you can get asql.DB
to pass to the PubSub with this instrumented driver. But, registering it and usingsql.Open()
with the name is more complex than it needs to be.
If you allow test code to directly instantiate the instrumented connector, then you can get asql.DB
as:
db := sql.ConnectDB(connector)
That avoids registering the driver and worrying about a unique name.
case <-gotChan: | ||
case <-ctx.Done(): | ||
t.Fatal("timeout waiting for message") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This construction comes up a lot, so we have
_ = testutil.RequireRecvContext(ctx, t, gotChan)
for this purpose.
case <-reconnectChan: | ||
case <-ctx.Done(): | ||
t.Fatal("timeout waiting for reconnect") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
_ = testutil.RequireRecvChan(ctx, t, reconnectChan)
case <-gotChan: | ||
case <-ctx.Done(): | ||
t.Fatal("timeout waiting for message after reconnect") | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
_ = testutil.RequireRecvCtx(ctx, t, gotChan)
@@ -119,3 +120,79 @@ func TestPGPubsub_Metrics(t *testing.T) { | |||
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total") | |||
}, testutil.WaitShort, testutil.IntervalFast) | |||
} | |||
func TestPGPubsubDriver(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This is a good test, but the design of the instrumented driver is racy -- see comments ondbtestutil/driver.go
for how to address.
@spikecurtis Thanks for the feedback on the test structure, I've updated it with your suggestions and I think things are looking good. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
One problem inline that needs fixing, but I don't need to review again.
coderd/database/dbtestutil/driver.go Outdated
return conn, nil | ||
} | ||
conn, err := c.driver.Open(c.name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others.Learn more.
This is circular.Driver.Open()
creates a newConnector
and calls into it. Fortunately, the pubsub creates a dialer, but anyone else who uses this might not and will overflow their stack.
You need to usepq.Driver{}
directly here.
ded612d
intomainUh oh!
There was an error while loading.Please reload this page.
Uh oh!
There was an error while loading.Please reload this page.
Fix forhttps://github.com/coder/customers/issues/639
The
pq.Listener
we use for PubSub does not expose a way to hook into the lifecycle of reconnecting and that prevents us from supporting rotating passwords on reconnection. Previously, we've used a forkcoder/pq
that exposes a way to create listeners with a given connector. The newv2 branch ofcoder/pq
is a copy of the1.10.9
tag fromlib/pq
with our patch on top to ensure no other package changes are brought in besides ours.This now allows us to create a
*sql.DB
that usesawsiamrds
under the hood and allows us to make a new connector that will generate new credentials on PubSub reconnect.I've updated the
awsiamrds
tests to now include testing pubsub. I've tested this on an EC2 instance with the correct roles for connecting to a test RDS instance:Pubsub reconnect tests: