Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

fix: use authenticated urls for pubsub#14261

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to ourterms of service andprivacy statement. We’ll occasionally send you account related emails.

Already on GitHub?Sign in to your account

Merged
f0ssel merged 25 commits intomainfromf0ssel/pubsub-conn-2
Aug 26, 2024
Merged

Conversation

f0ssel
Copy link
Contributor

@f0sself0ssel commentedAug 13, 2024
edited
Loading

Fix forhttps://github.com/coder/customers/issues/639

Thepq.Listener we use for PubSub does not expose a way to hook into the lifecycle of reconnecting and that prevents us from supporting rotating passwords on reconnection. Previously, we've used a forkcoder/pq that exposes a way to create listeners with a given connector. The newv2 branch ofcoder/pq is a copy of the1.10.9 tag fromlib/pq with our patch on top to ensure no other package changes are brought in besides ours.

This now allows us to create a*sql.DB that usesawsiamrds under the hood and allows us to make a new connector that will generate new credentials on PubSub reconnect.

I've updated theawsiamrds tests to now include testing pubsub. I've tested this on an EC2 instance with the correct roles for connecting to a test RDS instance:

~/coder$ export AWS_DEFAULT_REGION=us-east-2; export DBAWSIAMRDS_TEST_URL=postgres://coder@f0ssel-awsiamrds-instance-1.crvoyau6uypq.us-east-2.rds.amazonaws.com:5432/coder; go test -v ./coderd/database/awsiamrds/awsiamrds_test.go=== RUN   TestDriver=== PAUSE TestDriver=== CONT  TestDriver    t.go:99: 2024-08-14 17:46:23.235 [info]  pubsub dialing postgres  network=tcp  address=f0ssel-awsiamrds-instance-1.crvoyau6uypq.us-east-2.rds.amazonaws.com:5432  timeout_ms=0    t.go:99: 2024-08-14 17:46:23.237 [info]  pubsub postgres TCP connection established  network=tcp  address=f0ssel-awsiamrds-instance-1.crvoyau6uypq.us-east-2.rds.amazonaws.com:5432  timeout_ms=0  elapsed_ms=2    t.go:99: 2024-08-14 17:46:23.261 [info]  pubsub connected to postgres    t.go:99: 2024-08-14 17:46:23.261 [info]  pubsub has started    t.go:99: 2024-08-14 17:46:23.262 [debu]  started listening to event channel  event=test    t.go:99: 2024-08-14 17:46:23.263 [debu]  publish  event=test  message_len=5    t.go:99: 2024-08-14 17:46:23.293 [debu]  stopped listening to event channel  event=test--- PASS: TestDriver (0.16s)PASSok      command-line-arguments  0.189s

Pubsub reconnect tests:

➜  coder git:(f0ssel/pubsub-conn-2) DB=ci go test ./coderd/database/pubsub/ -v -run TestPGPubsubDriver=== RUN   TestPGPubsubDriver=== PAUSE TestPGPubsubDriver=== CONT  TestPGPubsubDriver    t.go:99: 2024-08-21 16:16:55.761 [info]  pubsub dialing postgres  network=tcp  address=localhost:32779  timeout_ms=0    t.go:99: 2024-08-21 16:16:55.762 [info]  pubsub postgres TCP connection established  network=tcp  address=localhost:32779  timeout_ms=0  elapsed_ms=0    t.go:99: 2024-08-21 16:16:55.766 [info]  pubsub connected to postgres    t.go:99: 2024-08-21 16:16:55.766 [info]  pubsub has started    t.go:99: 2024-08-21 16:16:55.767 [debu]  started listening to event channel  event=test    t.go:99: 2024-08-21 16:16:55.767 [debu]  publish  event=test  message_len=5    t.go:102: 2024-08-21 16:16:55.774 [erro]  pubsub disconnected from postgres  error="read tcp 127.0.0.1:53352->127.0.0.1:32779: use of closed network connection"    t.go:99: 2024-08-21 16:16:56.766 [info]  pubsub dialing postgres  network=tcp  address=localhost:32779  timeout_ms=0    t.go:99: 2024-08-21 16:16:56.767 [info]  pubsub postgres TCP connection established  network=tcp  address=localhost:32779  timeout_ms=0  elapsed_ms=0    t.go:99: 2024-08-21 16:16:56.772 [info]  pubsub reconnected to postgres    t.go:99: 2024-08-21 16:16:56.772 [debu]  notifying subscribers of a reconnection    t.go:99: 2024-08-21 16:16:57.772 [debu]  publish  event=test  message_len=11    t.go:99: 2024-08-21 16:16:57.779 [debu]  stopped listening to event channel  event=test    t.go:99: 2024-08-21 16:16:57.779 [info]  pubsub is closing    t.go:99: 2024-08-21 16:16:57.779 [info]  pubsub listen stopped receiving notify    t.go:99: 2024-08-21 16:16:57.779 [debu]  pubsub closed--- PASS: TestPGPubsubDriver (5.04s)PASSok      github.com/coder/coder/v2/coderd/database/pubsub        5.062s

@f0sself0ssel marked this pull request as ready for reviewAugust 14, 2024 17:15
}

// Set the dialer if the connector supports it.
if dc, ok := connector.(database.DialerConnector); ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

We need to drop a CRITICAL log if this type assertion fails, since it means we've introduced a new driver that doesn't support a Dialer and our logs will be incomplete.

f0ssel reacted with thumbs up emoji
// Create a custom connector if the database driver supports it.
connectorCreator, ok := p.db.Driver().(database.ConnectorCreator)
if ok {
connector, err = connectorCreator.Connector(connectURL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This code doesn't get hit in the package unit tests. A good way to test it would be to create a pq Driver wrapper that we can control.

I'd like to see a test where we start pubsub with the wrapped driver, do some pub'ing and sub'ing, then kill the connection and verify that the pubsub / pq.Listener reconnects automatically. That would give a nice test of the pq changes you made as well.

f0ssel reacted with thumbs up emoji
}

// DialerConnector can create a driver.Connector and set a pq.Dialer.
type DialerConnector interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

👍

f0ssel reacted with thumbs up emoji
name string
inner driver.Driver
connections []driver.Conn
listeners map[chan struct{}]chan struct{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This connections and listeners business is not threadsafe, and so can cause races in our tests.

A better design would be to just have achan driver.Conn that we pass each new connection thru. Tests will have to know how many connections they expect, and can read from the channel. For thePubSub, it's the listener we want to test, which should use a connection when it first connects, then another one when it reconnects.

That way you don't need methods likeAddConnection,WaitForConnection, orDropConnections -- the test code reads from the channel to wait for the connection, and can directly close the connection when it wants to interrupt it.

There is some complexity around thesql.DB, which we use for publishing and has a pool of connections. I suggest you sidestep that complexity by just using a secondPubSub for publishing with a regularpq driver.

}

func Register() (*Driver, error) {
db, err := sql.Open("postgres", "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This is a roundabout way to get apq driver, which is justpq.Driver{}

listeners: make(map[chan struct{}]chan struct{}),
}

sql.Register(d.name, d)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

It seems like you're doing this so that you can get asql.DB to pass to the PubSub with this instrumented driver. But, registering it and usingsql.Open() with the name is more complex than it needs to be.

If you allow test code to directly instantiate the instrumented connector, then you can get asql.DB as:

db := sql.ConnectDB(connector)

That avoids registering the driver and worrying about a unique name.

case <-gotChan:
case <-ctx.Done():
t.Fatal("timeout waiting for message")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This construction comes up a lot, so we have

_ = testutil.RequireRecvContext(ctx, t, gotChan)

for this purpose.

case <-reconnectChan:
case <-ctx.Done():
t.Fatal("timeout waiting for reconnect")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

_ = testutil.RequireRecvChan(ctx, t, reconnectChan)

case <-gotChan:
case <-ctx.Done():
t.Fatal("timeout waiting for message after reconnect")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

_ = testutil.RequireRecvCtx(ctx, t, gotChan)

@@ -119,3 +120,79 @@ func TestPGPubsub_Metrics(t *testing.T) {
!testutil.PromCounterGathered(t, metrics, "coder_pubsub_latency_measure_errs_total")
}, testutil.WaitShort, testutil.IntervalFast)
}

func TestPGPubsubDriver(t *testing.T) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This is a good test, but the design of the instrumented driver is racy -- see comments ondbtestutil/driver.go for how to address.

@f0ssel
Copy link
ContributorAuthor

@spikecurtis Thanks for the feedback on the test structure, I've updated it with your suggestions and I think things are looking good.

Copy link
Contributor

@spikecurtisspikecurtis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

One problem inline that needs fixing, but I don't need to review again.

return conn, nil
}

conn, err := c.driver.Open(c.name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others.Learn more.

This is circular.Driver.Open() creates a newConnector and calls into it. Fortunately, the pubsub creates a dialer, but anyone else who uses this might not and will overflow their stack.

You need to usepq.Driver{} directly here.

f0ssel reacted with thumbs up emoji
@f0sself0sselenabled auto-merge (squash)August 23, 2024 18:31
@f0sself0ssel merged commitded612d intomainAug 26, 2024
29 checks passed
@f0sself0ssel deleted the f0ssel/pubsub-conn-2 branchAugust 26, 2024 15:04
@github-actionsgithub-actionsbot locked and limited conversation to collaboratorsAug 26, 2024
Sign up for freeto subscribe to this conversation on GitHub. Already have an account?Sign in.
Reviewers

@spikecurtisspikecurtisspikecurtis approved these changes

Assignees

@f0sself0ssel

Labels
None yet
Projects
None yet
Milestone
No milestone
Development

Successfully merging this pull request may close these issues.

2 participants
@f0ssel@spikecurtis

[8]ページ先頭

©2009-2025 Movatter.jp