Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commita06b8a1

Browse files
author
RongtongJin
committed
Merge remote-tracking branch 'origin/develop' into dev_aaafasdfasdfnasl
2 parents278f195 +e87f9cb commita06b8a1

File tree

32 files changed

+401
-150
lines changed

32 files changed

+401
-150
lines changed

‎auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@ public class LocalAuthenticationMetadataProvider implements AuthenticationMetada
4646

4747
privateLoadingCache<String,User>userCache;
4848

49+
protectedThreadPoolExecutorcacheRefreshExecutor;
50+
4951
@Override
5052
publicvoidinitialize(AuthConfigauthConfig,Supplier<?>metadataService) {
5153
this.storage =ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() +File.separator +"users",false);
5254
if (!this.storage.start()) {
5355
thrownewRuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied");
5456
}
5557

56-
ThreadPoolExecutorcacheRefreshExecutor =ThreadPoolMonitor.createAndMonitor(
58+
this.cacheRefreshExecutor =ThreadPoolMonitor.createAndMonitor(
5759
1,
5860
1,
5961
1000 *60,
@@ -144,6 +146,9 @@ public void shutdown() {
144146
if (this.storage !=null) {
145147
this.storage.shutdown();
146148
}
149+
if (this.cacheRefreshExecutor !=null) {
150+
this.cacheRefreshExecutor.shutdown();
151+
}
147152
}
148153

149154
privatestaticclassUserCacheLoaderimplementsCacheLoader<String,User> {

‎auth/src/main/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerImpl.java‎

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -176,19 +176,26 @@ public CompletableFuture<Void> deleteAcl(Subject subject, PolicyType policyType,
176176

177177
@Override
178178
publicCompletableFuture<Acl>getAcl(Subjectsubject) {
179-
CompletableFuture<?extendsSubject>subjectFuture;
180-
if (subject.isSubject(SubjectType.USER)) {
181-
Useruser = (User)subject;
182-
subjectFuture =this.getAuthenticationMetadataProvider().getUser(user.getUsername());
183-
}else {
184-
subjectFuture =CompletableFuture.completedFuture(subject);
185-
}
186-
returnsubjectFuture.thenCompose(sub -> {
187-
if (sub ==null) {
188-
thrownewAuthorizationException("The subject is not exist.");
179+
try {
180+
if (subject ==null) {
181+
thrownewAuthorizationException("The subject is null.");
189182
}
190-
returnthis.getAuthorizationMetadataProvider().getAcl(subject);
191-
});
183+
CompletableFuture<?extendsSubject>subjectFuture;
184+
if (subject.isSubject(SubjectType.USER)) {
185+
Useruser = (User)subject;
186+
subjectFuture =this.getAuthenticationMetadataProvider().getUser(user.getUsername());
187+
}else {
188+
subjectFuture =CompletableFuture.completedFuture(subject);
189+
}
190+
returnsubjectFuture.thenCompose(sub -> {
191+
if (sub ==null) {
192+
thrownewAuthorizationException("The subject is not exist.");
193+
}
194+
returnthis.getAuthorizationMetadataProvider().getAcl(sub);
195+
});
196+
}catch (Exceptione) {
197+
returnthis.handleException(e);
198+
}
192199
}
193200

194201
@Override

‎auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java‎

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ public class LocalAuthorizationMetadataProvider implements AuthorizationMetadata
5151

5252
privateLoadingCache<String,Acl>aclCache;
5353

54+
protectedThreadPoolExecutorcacheRefreshExecutor;
55+
5456
@Override
5557
publicvoidinitialize(AuthConfigauthConfig,Supplier<?>metadataService) {
5658
this.storage =ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() +File.separator +"acls",false);
5759
if (!this.storage.start()) {
5860
thrownewRuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied.");
5961
}
60-
ThreadPoolExecutorcacheRefreshExecutor =ThreadPoolMonitor.createAndMonitor(
62+
this.cacheRefreshExecutor =ThreadPoolMonitor.createAndMonitor(
6163
1,
6264
1,
6365
1000 *60,
@@ -172,6 +174,9 @@ public void shutdown() {
172174
if (this.storage !=null) {
173175
this.storage.shutdown();
174176
}
177+
if (this.cacheRefreshExecutor !=null) {
178+
this.cacheRefreshExecutor.shutdown();
179+
}
175180
}
176181

177182
privatestaticclassAclCacheLoaderimplementsCacheLoader<String,Acl> {

‎auth/src/main/java/org/apache/rocketmq/auth/authorization/strategy/AbstractAuthorizationStrategy.java‎

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
*/
1717
packageorg.apache.rocketmq.auth.authorization.strategy;
1818

19-
importjava.util.ArrayList;
20-
importjava.util.List;
19+
importjava.util.HashSet;
20+
importjava.util.Set;
2121
importjava.util.function.Supplier;
2222
importorg.apache.commons.lang3.StringUtils;
2323
importorg.apache.rocketmq.auth.authorization.context.AuthorizationContext;
@@ -30,7 +30,7 @@
3030
publicabstractclassAbstractAuthorizationStrategyimplementsAuthorizationStrategy {
3131

3232
protectedfinalAuthConfigauthConfig;
33-
protectedfinalList<String>authorizationWhitelist =newArrayList<>();
33+
protectedfinalSet<String>authorizationWhiteSet =newHashSet<>();
3434
protectedfinalAuthorizationProvider<AuthorizationContext>authorizationProvider;
3535

3636
publicAbstractAuthorizationStrategy(AuthConfigauthConfig,Supplier<?>metadataService) {
@@ -42,7 +42,7 @@ public AbstractAuthorizationStrategy(AuthConfig authConfig, Supplier<?> metadata
4242
if (StringUtils.isNotBlank(authConfig.getAuthorizationWhitelist())) {
4343
String[]whitelist =StringUtils.split(authConfig.getAuthorizationWhitelist(),",");
4444
for (StringrpcCode :whitelist) {
45-
this.authorizationWhitelist.add(StringUtils.trim(rpcCode));
45+
this.authorizationWhiteSet.add(StringUtils.trim(rpcCode));
4646
}
4747
}
4848
}
@@ -57,7 +57,7 @@ public void doEvaluate(AuthorizationContext context) {
5757
if (this.authorizationProvider ==null) {
5858
return;
5959
}
60-
if (this.authorizationWhitelist.contains(context.getRpcCode())) {
60+
if (this.authorizationWhiteSet.contains(context.getRpcCode())) {
6161
return;
6262
}
6363
try {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
packageorg.apache.rocketmq.auth.authentication.provider;
18+
19+
importorg.apache.rocketmq.auth.config.AuthConfig;
20+
importorg.apache.rocketmq.auth.helper.AuthTestHelper;
21+
importorg.junit.Assert;
22+
importorg.junit.Rule;
23+
importorg.junit.Test;
24+
importorg.junit.rules.TemporaryFolder;
25+
26+
publicclassLocalAuthenticationMetadataProviderTest {
27+
28+
@Rule
29+
publicTemporaryFoldertempFolder =newTemporaryFolder();
30+
31+
@Test
32+
publicvoidtestShutdownReleasesCacheExecutor()throwsException {
33+
AuthConfigauthConfig =AuthTestHelper.createDefaultConfig();
34+
authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath());
35+
36+
LocalAuthenticationMetadataProviderprovider =newLocalAuthenticationMetadataProvider();
37+
// Initialize provider to create the internal cache refresh executor
38+
provider.initialize(authConfig, () ->null);
39+
40+
// After initialization, the executor should exist and not be shutdown
41+
Assert.assertNotNull(provider.cacheRefreshExecutor);
42+
Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown());
43+
44+
// Shutdown provider should also shutdown its executor to release resources
45+
provider.shutdown();
46+
47+
// Verify that the cache refresh executor has been shutdown
48+
Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown());
49+
}
50+
}

‎auth/src/test/java/org/apache/rocketmq/auth/authorization/manager/AuthorizationMetadataManagerTest.java‎

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,21 @@ public void getAcl() {
203203
});
204204
}
205205

206+
@Test
207+
publicvoidtestGetAclWithNullSubject() {
208+
if (MixAll.isMac()) {
209+
return;
210+
}
211+
AuthorizationExceptionauthorizationException =Assert.assertThrows(AuthorizationException.class, () -> {
212+
try {
213+
this.authorizationMetadataManager.getAcl(null).join();
214+
}catch (Exceptione) {
215+
AuthTestHelper.handleException(e);
216+
}
217+
});
218+
Assert.assertEquals("The subject is null.",authorizationException.getMessage());
219+
}
220+
206221
@Test
207222
publicvoidlistAcl() {
208223
if (MixAll.isMac()) {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
packageorg.apache.rocketmq.auth.authorization.provider;
18+
19+
importorg.apache.rocketmq.auth.config.AuthConfig;
20+
importorg.apache.rocketmq.auth.helper.AuthTestHelper;
21+
importorg.junit.Assert;
22+
importorg.junit.Rule;
23+
importorg.junit.Test;
24+
importorg.junit.rules.TemporaryFolder;
25+
26+
publicclassLocalAuthorizationMetadataProviderTest {
27+
28+
@Rule
29+
publicTemporaryFoldertempFolder =newTemporaryFolder();
30+
31+
@Test
32+
publicvoidtestShutdownReleasesCacheExecutor()throwsException {
33+
AuthConfigauthConfig =AuthTestHelper.createDefaultConfig();
34+
authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath());
35+
36+
LocalAuthorizationMetadataProviderprovider =newLocalAuthorizationMetadataProvider();
37+
// Initialize provider to create the internal cache refresh executor
38+
provider.initialize(authConfig, () ->null);
39+
40+
// After initialization, the executor should exist and not be shutdown
41+
Assert.assertNotNull(provider.cacheRefreshExecutor);
42+
Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown());
43+
44+
// Shutdown provider should also shutdown its executor to release resources
45+
provider.shutdown();
46+
47+
// Verify that the cache refresh executor has been shutdown
48+
Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown());
49+
}
50+
}

‎broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConfigManager.java‎

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class RocksDBConfigManager {
3838

3939
publicstaticfinalCharsetCHARSET =StandardCharsets.UTF_8;
4040

41-
publicvolatilebooleanisStop =false;
4241
publicConfigRocksDBStorageconfigRocksDBStorage =null;
4342
privateFlushOptionsflushOptions =null;
4443
privatevolatilelonglastFlushMemTableMicroSecond =0;
@@ -72,11 +71,14 @@ public RocksDBConfigManager(String filePath, long memTableFlushInterval, Compres
7271
}
7372

7473
publicbooleaninit(booleanreadOnly) {
75-
this.isStop =false;
7674
this.configRocksDBStorage =ConfigRocksDBStorage.getStore(filePath,readOnly,compressionType);
7775
returnthis.configRocksDBStorage.start();
7876
}
7977

78+
publicbooleanisLoaded() {
79+
returnthis.configRocksDBStorage !=null &&this.configRocksDBStorage.isLoaded();
80+
}
81+
8082
publicbooleaninit() {
8183
returnthis.init(false);
8284
}
@@ -113,7 +115,6 @@ public void start() {
113115
}
114116

115117
publicbooleanstop() {
116-
this.isStop =true;
117118
ConfigRocksDBStorage.shutdown(filePath);
118119
if (this.flushOptions !=null) {
119120
this.flushOptions.close();
@@ -123,7 +124,7 @@ public boolean stop() {
123124

124125
publicvoidflushWAL() {
125126
try {
126-
if (this.isStop) {
127+
if (!isLoaded()) {
127128
return;
128129
}
129130
if (this.configRocksDBStorage !=null) {
@@ -183,4 +184,5 @@ public Statistics getStatistics() {
183184

184185
returnconfigRocksDBStorage.getStatistics();
185186
}
187+
186188
}

‎broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public String configFilePath() {
157157

158158
@Override
159159
publicsynchronizedvoidpersist() {
160-
if (!rocksDBConfigManager.isStop) {
160+
if (rocksDBConfigManager.isLoaded()) {
161161
try (WriteBatchwriteBatch =newWriteBatch()) {
162162
for (Entry<String,ConcurrentMap<Integer,Long>>entry :this.offsetTable.entrySet()) {
163163
putWriteBatch(writeBatch,entry.getKey(),entry.getValue());

‎broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java‎

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,31 @@
1717

1818
packageorg.apache.rocketmq.broker.longpolling;
1919

20+
importjava.util.concurrent.CompletableFuture;
2021
importjava.util.function.BiConsumer;
21-
importjava.util.function.Consumer;
2222
importorg.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
2323
importorg.apache.rocketmq.remoting.CommandCallback;
2424

2525
publicclassPopCommandCallbackimplementsCommandCallback {
2626

2727
privatefinalBiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
28-
Consumer<ConsumerLagCalculator.CalculateLagResult>>biConsumer;
29-
28+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult>>biConsumer;
3029
privatefinalConsumerLagCalculator.ProcessGroupInfoinfo;
31-
privatefinalConsumer<ConsumerLagCalculator.CalculateLagResult>lagRecorder;
32-
30+
privatefinalCompletableFuture<ConsumerLagCalculator.CalculateLagResult>future;
3331

3432
publicPopCommandCallback(
3533
BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
36-
Consumer<ConsumerLagCalculator.CalculateLagResult>>biConsumer,
34+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult>>biConsumer,
3735
ConsumerLagCalculator.ProcessGroupInfoinfo,
38-
Consumer<ConsumerLagCalculator.CalculateLagResult>lagRecorder) {
36+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult>future) {
3937

4038
this.biConsumer =biConsumer;
4139
this.info =info;
42-
this.lagRecorder =lagRecorder;
40+
this.future =future;
4341
}
4442

4543
@Override
4644
publicvoidaccept() {
47-
biConsumer.accept(info,lagRecorder);
45+
biConsumer.accept(info,future);
4846
}
4947
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp