Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commit41d06f0

Browse files
committed
Add ChainedThreadPoolExecutor: tasks marked with the same owner will be
run one by one, tasks with different owners will be run in differentthreadsFixing bug that HttpRequest has the wrong response code.
1 parentdf98c14 commit41d06f0

File tree

4 files changed

+333
-2
lines changed

4 files changed

+333
-2
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
packagenet.sf.j2s.ajax;
2+
3+
importjava.util.LinkedList;
4+
importjava.util.Queue;
5+
6+
publicclassChainedRunnableimplementsRunnable {
7+
8+
privateRunnabletask;
9+
10+
privateChainedRunnablenext;
11+
12+
privateObjectowner;
13+
14+
privatevolatilebooleandone;
15+
16+
publicChainedRunnable(Objectowner,Runnabletask) {
17+
super();
18+
this.owner =owner;
19+
this.task =task;
20+
done =false;
21+
}
22+
23+
publicvoidrunTask() {
24+
if (task !=null) {
25+
task.run();
26+
}
27+
}
28+
29+
publicvoidrun() {
30+
runTask();
31+
// May run into stack overflow!
32+
//if (next != null) {
33+
//next.run();
34+
//}
35+
ChainedRunnablen =next;
36+
Queue<ChainedRunnable>queue =newLinkedList<ChainedRunnable>();
37+
while (n !=null) {
38+
queue.add(n);
39+
n.runTask();
40+
n =n.next;
41+
}
42+
// mark task done one by one
43+
while (!queue.isEmpty()) {
44+
ChainedRunnabler =queue.poll();
45+
if (r !=null)r.done =true;
46+
}
47+
done =true;
48+
}
49+
50+
publicbooleanisDone() {
51+
returndone;
52+
}
53+
54+
publicChainedRunnablegetNext() {
55+
returnnext;
56+
}
57+
58+
publicRunnablegetTask() {
59+
returntask;
60+
}
61+
62+
publicvoidaddNext(ChainedRunnabletask) {
63+
ChainedRunnableoThis =this;
64+
do {
65+
if (oThis.next ==null) {
66+
oThis.next =task;
67+
return;
68+
}else {
69+
oThis =oThis.next;
70+
}
71+
}while (true);
72+
}
73+
74+
publicObjectgetOwner() {
75+
returnowner;
76+
}
77+
78+
}
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
packagenet.sf.j2s.ajax;
2+
3+
importjava.lang.reflect.Field;
4+
importjava.util.concurrent.ConcurrentHashMap;
5+
importjava.util.concurrent.RejectedExecutionHandler;
6+
importjava.util.concurrent.ThreadFactory;
7+
importjava.util.concurrent.TimeUnit;
8+
importjava.util.concurrent.locks.*;
9+
importjava.util.concurrent.*;
10+
importjava.util.*;
11+
12+
publicclassChainedThreadPoolExecutorextendsSimpleThreadPoolExecutor {
13+
14+
privateReentrantLockinternalMainLock =null;
15+
16+
privateSet<Runnable>internalWorkers =null;
17+
18+
privateFieldfieldWorkerThread =null;
19+
20+
privateFieldfieldWorkerFirstTask =null;
21+
22+
privateMap<Runnable,ChainedRunnable>runningTasks =newConcurrentHashMap<Runnable,ChainedRunnable>();
23+
24+
privateMap<Object,ChainedRunnable>lastTasks =newConcurrentHashMap<Object,ChainedRunnable>();
25+
26+
@SuppressWarnings("unchecked")
27+
privatevoidfetchInternalFields() {
28+
try {
29+
FieldfieldWorkers =ThreadPoolExecutor.class.getDeclaredField("workers");
30+
if (fieldWorkers !=null) {
31+
fieldWorkers.setAccessible(true);
32+
Objectvalue =fieldWorkers.get(this);
33+
if (valueinstanceofSet) {
34+
internalWorkers = (Set<Runnable>)value;
35+
}
36+
}
37+
FieldfieldMainLock =ThreadPoolExecutor.class.getDeclaredField("mainLock");
38+
if (fieldMainLock !=null) {
39+
fieldMainLock.setAccessible(true);
40+
Objectvalue =fieldMainLock.get(this);
41+
if (valueinstanceofReentrantLock) {
42+
internalMainLock = (ReentrantLock)value;
43+
}
44+
}
45+
}catch (SecurityExceptione) {
46+
e.printStackTrace();
47+
}catch (IllegalArgumentExceptione) {
48+
e.printStackTrace();
49+
}catch (NoSuchFieldExceptione) {
50+
e.printStackTrace();
51+
}catch (IllegalAccessExceptione) {
52+
e.printStackTrace();
53+
}
54+
}
55+
56+
publicChainedThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,intidlePoolSize,longkeepAliveTime,
57+
TimeUnitunit,intqueueSize,RejectedExecutionHandlerhandler) {
58+
super(corePoolSize,maximumPoolSize,idlePoolSize,keepAliveTime,unit,queueSize,handler);
59+
fetchInternalFields();
60+
}
61+
62+
publicChainedThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,intidlePoolSize,longkeepAliveTime,
63+
TimeUnitunit,intqueueSize,StringpoolName) {
64+
super(corePoolSize,maximumPoolSize,idlePoolSize,keepAliveTime,unit,queueSize,poolName);
65+
fetchInternalFields();
66+
}
67+
68+
publicChainedThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,intidlePoolSize,longkeepAliveTime,
69+
TimeUnitunit,intqueueSize,ThreadFactorythreadFactory,RejectedExecutionHandlerhandler) {
70+
super(corePoolSize,maximumPoolSize,idlePoolSize,keepAliveTime,unit,queueSize,threadFactory,handler);
71+
fetchInternalFields();
72+
}
73+
74+
publicChainedThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,intidlePoolSize,longkeepAliveTime,
75+
TimeUnitunit,intqueueSize,ThreadFactorythreadFactory) {
76+
super(corePoolSize,maximumPoolSize,idlePoolSize,keepAliveTime,unit,queueSize,threadFactory);
77+
fetchInternalFields();
78+
}
79+
80+
publicChainedThreadPoolExecutor(intcorePoolSize,intmaximumPoolSize,intidlePoolSize,longkeepAliveTime,
81+
TimeUnitunit,intqueueSize) {
82+
super(corePoolSize,maximumPoolSize,idlePoolSize,keepAliveTime,unit,queueSize);
83+
fetchInternalFields();
84+
}
85+
86+
87+
privatebooleanaddIfInQueue(ChainedRunnabletask) {
88+
Objectowner =task.getOwner();
89+
finalReentrantLockmainLock =this.internalMainLock;
90+
mainLock.lock();
91+
try {
92+
//System.out.println("Worker size = " + internalWorkers.size());
93+
for (Iterator<Runnable>itr =internalWorkers.iterator();itr.hasNext();) {
94+
Runnableworker = (Runnable)itr.next();
95+
ChainedRunnablerunningTask =runningTasks.get(worker);
96+
if (runningTask !=null &&runningTask.getOwner() ==owner) {
97+
//System.out.println("Appending task " + task + " to running task " + runningTask + " on worker " + worker);
98+
runningTask.addNext(task);
99+
lastTasks.put(owner,task);
100+
returntrue;
101+
}
102+
ChainedRunnablefirstTask =null;// worker.firstTask;
103+
try {
104+
if (fieldWorkerFirstTask ==null) {
105+
Fieldf =worker.getClass().getDeclaredField("firstTask");
106+
if (f !=null) {
107+
f.setAccessible(true);
108+
}
109+
fieldWorkerFirstTask =f;
110+
}
111+
if (fieldWorkerFirstTask !=null) {
112+
Objectvalue =fieldWorkerFirstTask.get(worker);
113+
if (valueinstanceofChainedRunnable) {
114+
firstTask = (ChainedRunnable)value;
115+
}
116+
//System.out.println("Checking firstTask " + value + " for worker " + worker);
117+
}
118+
}catch (SecurityExceptione) {
119+
e.printStackTrace();
120+
}catch (IllegalArgumentExceptione) {
121+
e.printStackTrace();
122+
}catch (NoSuchFieldExceptione) {
123+
e.printStackTrace();
124+
}catch (IllegalAccessExceptione) {
125+
e.printStackTrace();
126+
}
127+
if (firstTask !=null &&firstTask.getOwner() ==owner) {
128+
//System.out.println("Appending task " + task + " to first task " + runningTask + " on worker " + worker);
129+
firstTask.addNext(task);
130+
lastTasks.put(owner,task);
131+
returntrue;
132+
}
133+
}
134+
for (Iterator<Runnable>itr =getQueue().iterator();itr.hasNext();) {
135+
Runnablenext =itr.next();
136+
if (nextinstanceofChainedRunnable) {
137+
ChainedRunnabler = (ChainedRunnable)next;
138+
if (r.getOwner() ==owner) {
139+
//System.out.println("Appending task " + task + " to queued task " + r);
140+
r.addNext(task);
141+
lastTasks.put(owner,task);
142+
returntrue;
143+
}
144+
}
145+
}
146+
ChainedRunnablelast =lastTasks.get(owner);
147+
if (last !=null && !last.isDone()) {
148+
//System.out.println("Appending task " + task + " to last task " + last);
149+
last.addNext(task);
150+
lastTasks.put(owner,task);
151+
returntrue;
152+
}
153+
154+
//System.out.println("Not in queue, starting new worker for " + task);
155+
lastTasks.put(owner,task);
156+
returnfalse;
157+
}finally {
158+
mainLock.unlock();
159+
}
160+
}
161+
162+
publicvoidexecute(Objectowner,Runnablecommand) {
163+
ChainedRunnabletask =newChainedRunnable(owner,command);
164+
execute(task);
165+
}
166+
167+
@Override
168+
publicvoidexecute(Runnablecommand) {
169+
if (command ==null)
170+
thrownewNullPointerException();
171+
if (!(commandinstanceofChainedRunnable)) {
172+
thrownewRuntimeException("Not a chained runnable task");
173+
}
174+
ChainedRunnablechainCommand = (ChainedRunnable)command;
175+
if (addIfInQueue(chainCommand)) {
176+
return;
177+
}
178+
super.execute(command);
179+
}
180+
181+
@Override
182+
protectedvoidafterExecute(Runnabler,Throwablet) {
183+
finalReentrantLockmainLock =this.internalMainLock;
184+
booleanremoveError =false;
185+
mainLock.lock();
186+
try {
187+
for (Map.Entry<Runnable,ChainedRunnable>entry :runningTasks.entrySet()) {
188+
if (entry.getValue() ==r) {
189+
runningTasks.remove(entry.getKey());
190+
break;
191+
}
192+
}
193+
if (rinstanceofChainedRunnable) {
194+
ChainedRunnabletask = (ChainedRunnable)r;
195+
Objectowner =task.getOwner();
196+
ChainedRunnablelastTask =lastTasks.get(owner);
197+
if (lastTask ==task) {
198+
ChainedRunnablelast =lastTasks.remove(owner);
199+
removeError =last !=task;
200+
}
201+
}
202+
}finally {
203+
mainLock.unlock();
204+
}
205+
if (removeError) {
206+
System.out.println("Removed updated last task " +r);
207+
}
208+
super.afterExecute(r,t);
209+
}
210+
211+
@Override
212+
protectedvoidbeforeExecute(Threadt,Runnabler) {
213+
finalReentrantLockmainLock =this.internalMainLock;
214+
mainLock.lock();
215+
try {
216+
for (Runnableworker :internalWorkers) {
217+
try {
218+
if (fieldWorkerThread ==null) {
219+
Fieldf =worker.getClass().getDeclaredField("thread");
220+
if (f !=null) {
221+
f.setAccessible(true);
222+
}
223+
fieldWorkerThread =f;
224+
}
225+
if (fieldWorkerThread !=null) {
226+
Objectvalue =fieldWorkerThread.get(worker);
227+
if (t ==value) {
228+
if (rinstanceofChainedRunnable) {
229+
runningTasks.put(worker, (ChainedRunnable)r);
230+
break;
231+
}// else dummy task
232+
}
233+
}
234+
}catch (SecurityExceptione) {
235+
e.printStackTrace();
236+
}catch (IllegalArgumentExceptione) {
237+
e.printStackTrace();
238+
}catch (NoSuchFieldExceptione) {
239+
e.printStackTrace();
240+
}catch (IllegalAccessExceptione) {
241+
e.printStackTrace();
242+
}
243+
}
244+
}finally {
245+
mainLock.unlock();
246+
}
247+
super.beforeExecute(t,r);
248+
}
249+
250+
}

‎sources/net.sf.j2s.ajax/ajaxcore/net/sf/j2s/ajax/HttpRequest.java‎

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ public void setRequestHeader(String key, String value) {
337337
* @return String the all response header value.
338338
*/
339339
publicStringgetAllResponseHeaders() {
340+
if (connection !=null)returnnull;
340341
StringBuilderbuilder =newStringBuilder();
341342
inti =1;
342343
while (true) {
@@ -362,6 +363,7 @@ public String getAllResponseHeaders() {
362363
* @return String the response header value.
363364
*/
364365
publicStringgetResponseHeader(Stringkey) {
366+
if (connection ==null)returnnull;
365367
Map<String,List<String>>headerFields =connection.getHeaderFields();
366368
List<String>list =headerFields.get(key);
367369
if (list ==null) {
@@ -549,6 +551,7 @@ private void request() {
549551
}catch (IOExceptione) {
550552
if (checkAbort())return;// exception caused by abort action
551553
//e.printStackTrace();
554+
status =connection.getResponseCode();
552555
readyState =4;
553556
if (onreadystatechange !=null) {
554557
onreadystatechange.onLoaded();

‎sources/net.sf.j2s.ajax/ajaxrpc/net/sf/j2s/ajax/SimpleSerializable.java‎

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@
4242
*/
4343
publicclassSimpleSerializableimplementsCloneable {
4444

45-
publicstaticSimpleSerializableUNKNOWN =newSimpleSerializable();
45+
publicstaticfinalSimpleSerializableUNKNOWN =newSimpleSerializable();
4646

47-
publicstaticSimpleSerializableERROR =newSimpleSerializable();// Used to indicate that format error!
47+
publicstaticfinalSimpleSerializableERROR =newSimpleSerializable();// Used to indicate that format error!
4848

4949
@J2SIgnore
5050
publicstaticSimpleFactoryfallbackFactory =null;

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp