Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit049aa9d

Browse files
stepanchegslandelle
authored andcommitted
Simplify semaphore logic in ChannelManager (AsyncHttpClient#1372)
* avoid using `java.util.concurrent.Semaphore` which is blocking, and its blocking part is not used, which is confusing.* Use `NonBlockingSemaphoreInfinite` to avoid nulls and ifs when connection count is not limited.
1 parent672160b commit049aa9d

File tree

5 files changed

+200
-18
lines changed

5 files changed

+200
-18
lines changed

‎client/src/main/java/org/asynchttpclient/netty/channel/ChannelManager.java‎

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
importjava.io.IOException;
4444
importjava.util.Map.Entry;
4545
importjava.util.concurrent.ConcurrentHashMap;
46-
importjava.util.concurrent.Semaphore;
4746
importjava.util.concurrent.ThreadFactory;
4847
importjava.util.concurrent.TimeUnit;
4948

@@ -103,9 +102,9 @@ public class ChannelManager {
103102
privatefinalChannelPoolchannelPool;
104103
privatefinalChannelGroupopenChannels;
105104
privatefinalbooleanmaxTotalConnectionsEnabled;
106-
privatefinalSemaphorefreeChannels;
105+
privatefinalNonBlockingSemaphoreLikefreeChannels;
107106
privatefinalbooleanmaxConnectionsPerHostEnabled;
108-
privatefinalConcurrentHashMap<Object,Semaphore>freeChannelsPerHost =newConcurrentHashMap<>();
107+
privatefinalConcurrentHashMap<Object,NonBlockingSemaphore>freeChannelsPerHost =newConcurrentHashMap<>();
109108

110109
privateAsyncHttpClientHandlerwsHandler;
111110

@@ -134,18 +133,21 @@ public ChannelManager(final AsyncHttpClientConfig config, Timer nettyTimer) {
134133
maxTotalConnectionsEnabled =config.getMaxConnections() >0;
135134
maxConnectionsPerHostEnabled =config.getMaxConnectionsPerHost() >0;
136135

136+
freeChannels =maxTotalConnectionsEnabled ?
137+
newNonBlockingSemaphore(config.getMaxConnections()) :
138+
NonBlockingSemaphoreInfinite.INSTANCE;
139+
137140
if (maxTotalConnectionsEnabled ||maxConnectionsPerHostEnabled) {
138141
openChannels =newDefaultChannelGroup("asyncHttpClient",GlobalEventExecutor.INSTANCE) {
139142
@Override
140143
publicbooleanremove(Objecto) {
141144
booleanremoved =super.remove(o);
142145
if (removed) {
143-
if (maxTotalConnectionsEnabled)
144-
freeChannels.release();
146+
freeChannels.release();
145147
if (maxConnectionsPerHostEnabled) {
146148
ObjectpartitionKey =Channel.class.cast(o).attr(partitionKeyAttr).getAndSet(null);
147149
if (partitionKey !=null) {
148-
SemaphorehostFreeChannels =freeChannelsPerHost.get(partitionKey);
150+
NonBlockingSemaphorehostFreeChannels =freeChannelsPerHost.get(partitionKey);
149151
if (hostFreeChannels !=null)
150152
hostFreeChannels.release();
151153
}
@@ -154,10 +156,8 @@ public boolean remove(Object o) {
154156
returnremoved;
155157
}
156158
};
157-
freeChannels =newSemaphore(config.getMaxConnections());
158159
}else {
159160
openChannels =newDefaultChannelGroup("asyncHttpClient",GlobalEventExecutor.INSTANCE);
160-
freeChannels =null;
161161
}
162162

163163
handshakeTimeout =config.getHandshakeTimeout();
@@ -331,15 +331,17 @@ public boolean removeAll(Channel connection) {
331331
}
332332

333333
privatebooleantryAcquireGlobal() {
334-
return!maxTotalConnectionsEnabled ||freeChannels.tryAcquire();
334+
returnfreeChannels.tryAcquire();
335335
}
336336

337-
privateSemaphoregetFreeConnectionsForHost(ObjectpartitionKey) {
338-
returnfreeChannelsPerHost.computeIfAbsent(partitionKey,pk ->newSemaphore(config.getMaxConnectionsPerHost()));
337+
privateNonBlockingSemaphoreLikegetFreeConnectionsForHost(ObjectpartitionKey) {
338+
returnmaxConnectionsPerHostEnabled ?
339+
freeChannelsPerHost.computeIfAbsent(partitionKey,pk ->newNonBlockingSemaphore(config.getMaxConnectionsPerHost())) :
340+
NonBlockingSemaphoreInfinite.INSTANCE;
339341
}
340342

341343
privatebooleantryAcquirePerHost(ObjectpartitionKey) {
342-
return!maxConnectionsPerHostEnabled ||getFreeConnectionsForHost(partitionKey).tryAcquire();
344+
returngetFreeConnectionsForHost(partitionKey).tryAcquire();
343345
}
344346

345347
publicvoidacquireChannelLock(ObjectpartitionKey)throwsIOException {
@@ -348,8 +350,7 @@ public void acquireChannelLock(Object partitionKey) throws IOException {
348350
if (!tryAcquireGlobal())
349351
throwtooManyConnections;
350352
if (!tryAcquirePerHost(partitionKey)) {
351-
if (maxTotalConnectionsEnabled)
352-
freeChannels.release();
353+
freeChannels.release();
353354

354355
throwtooManyConnectionsPerHost;
355356
}
@@ -376,10 +377,8 @@ public void closeChannel(Channel channel) {
376377
}
377378

378379
publicvoidreleaseChannelLock(ObjectpartitionKey) {
379-
if (maxTotalConnectionsEnabled)
380-
freeChannels.release();
381-
if (maxConnectionsPerHostEnabled)
382-
getFreeConnectionsForHost(partitionKey).release();
380+
freeChannels.release();
381+
getFreeConnectionsForHost(partitionKey).release();
383382
}
384383

385384
publicvoidregisterOpenChannel(Channelchannel,ObjectpartitionKey) {
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
packageorg.asynchttpclient.netty.channel;
15+
16+
importjava.util.concurrent.atomic.AtomicInteger;
17+
18+
/**
19+
* Semaphore-like API, but without blocking.
20+
*
21+
* @author Stepan Koltsov
22+
*/
23+
classNonBlockingSemaphoreimplementsNonBlockingSemaphoreLike {
24+
25+
privatefinalAtomicIntegerpermits;
26+
27+
publicNonBlockingSemaphore(intpermits) {
28+
this.permits =newAtomicInteger(permits);
29+
}
30+
31+
@Override
32+
publicvoidrelease() {
33+
permits.incrementAndGet();
34+
}
35+
36+
@Override
37+
publicbooleantryAcquire() {
38+
for (;;) {
39+
intcount =permits.get();
40+
if (count <=0) {
41+
returnfalse;
42+
}
43+
if (permits.compareAndSet(count,count -1)) {
44+
returntrue;
45+
}
46+
}
47+
}
48+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
packageorg.asynchttpclient.netty.channel;
15+
16+
/**
17+
* Non-blocking semaphore-like object with infinite permits.
18+
*
19+
* So try-acquire always succeeds.
20+
*
21+
* @author Stepan Koltsov
22+
*/
23+
enumNonBlockingSemaphoreInfiniteimplementsNonBlockingSemaphoreLike {
24+
INSTANCE;
25+
26+
@Override
27+
publicvoidrelease() {
28+
}
29+
30+
@Override
31+
publicbooleantryAcquire() {
32+
returntrue;
33+
}
34+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
packageorg.asynchttpclient.netty.channel;
15+
16+
/**
17+
* Non-blocking semaphore API.
18+
*
19+
* @author Stepan Koltsov
20+
*/
21+
interfaceNonBlockingSemaphoreLike {
22+
voidrelease();
23+
24+
booleantryAcquire();
25+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright (c) 2017 AsyncHttpClient Project. All rights reserved.
3+
*
4+
* This program is licensed to you under the Apache License Version 2.0,
5+
* and you may not use this file except in compliance with the Apache License Version 2.0.
6+
* You may obtain a copy of the Apache License Version 2.0 at
7+
* http://www.apache.org/licenses/LICENSE-2.0.
8+
*
9+
* Unless required by applicable law or agreed to in writing,
10+
* software distributed under the Apache License Version 2.0 is distributed on an
11+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
13+
*/
14+
packageorg.asynchttpclient.netty.channel;
15+
16+
importjava.util.concurrent.Semaphore;
17+
18+
importorg.testng.annotations.Test;
19+
20+
importstaticorg.testng.Assert.*;
21+
22+
/**
23+
* @author Stepan Koltsov
24+
*/
25+
publicclassNonBlockingSemaphoreTest {
26+
27+
privatestaticclassMirror {
28+
privatefinalSemaphorereal;
29+
privatefinalNonBlockingSemaphorenonBlocking;
30+
31+
publicMirror(intpermits) {
32+
real =newSemaphore(permits);
33+
nonBlocking =newNonBlockingSemaphore(permits);
34+
}
35+
36+
publicbooleantryAcquire() {
37+
booleana =real.tryAcquire();
38+
booleanb =nonBlocking.tryAcquire();
39+
assertEquals(a,b);
40+
returna;
41+
}
42+
43+
publicvoidrelease() {
44+
real.release();
45+
nonBlocking.release();
46+
}
47+
}
48+
49+
@Test
50+
publicvoidtest0() {
51+
Mirrormirror =newMirror(0);
52+
assertFalse(mirror.tryAcquire());
53+
}
54+
55+
@Test
56+
publicvoidthree() {
57+
Mirrormirror =newMirror(3);
58+
for (inti =0;i <3; ++i) {
59+
assertTrue(mirror.tryAcquire());
60+
}
61+
assertFalse(mirror.tryAcquire());
62+
mirror.release();
63+
assertTrue(mirror.tryAcquire());
64+
}
65+
66+
@Test
67+
publicvoidnegative() {
68+
Mirrormirror =newMirror(-1);
69+
assertFalse(mirror.tryAcquire());
70+
mirror.release();
71+
assertFalse(mirror.tryAcquire());
72+
mirror.release();
73+
assertTrue(mirror.tryAcquire());
74+
}
75+
76+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp