Define a data type consisting of a fixed number of 'buckets', each containing a nonnegative integer value, which supports operations to:
In order to exercise this data type, create one set of buckets, and start three concurrent tasks:
The display task need not be explicit; use of e.g. a debugger or trace tool is acceptable provided it is simple to set up to provide the display.
This task is intended as an exercise inatomic operations. The sum of the bucket values must be preserved even if the two tasks attempt to perform transfers simultaneously, and a straightforward solution is to ensure that at any time, only one transfer is actually occurring — that the transfer operation isatomic.
varbucketvarbucket-size\ The 'bucket' will be a simple array of some values::genbucket\ n --a:newswap(\ make a random int up to 1000rand-pcgn:abs1000n:moda:push)swaptimesbucket!;\ display bucket and its total::.bucketbucketlock@dup.space'n:+0a:reduce.crbucketunlockdrop;\ Get current value of bucket #x:bucket@\ n -- bucket[n]bucket@swapa:@nip;\ Transfer x from bucket n to bucket m:bucket-xfer\ m n x -->rbucket@\ m n bucketovera:@r@n:-rotswapa:!\ m bucketovera:@r>n:+rotswapa:!drop;\ Get two random indices to check (ensure they're not the same)::pick2rand-pcgn:absbucket-size@n:moddup>rrepeatdroprand-pcgn:absbucket-size@n:modr@overn:=while!r>;\ Pick two buckets and make them more equal (by a quarter of their difference)::make-equalrepeatpick2bucketlock@thirda:@>rovera:@r>n:-\ if they are equal, do nothingdupnotif\ equal, so do nothingdrop-rot2dropelse4n:/n:int>r-rotr>bucket-xferthendropbucketunlockdropagain;\ Moves a quarter of the smaller value from one (random) bucket to another::make-redistrepeatpick2bucketlock@\ n m bucketovera:@>r\ n m b b[m]thirda:@r>\ n m b b[n]n:min4n:/n:intnipbucket-xferbucketunlockdropagain;:app:main\ create 10 buckets with random positive integer values:10genbucketbucket@a:lenbucket-size!drop\ print the bucket.bucket\ the problem's tasks:'make-equalt:task'make-redistt:task\ the print-the-bucket task. We'll do it just 10 times and then quit:( 1 sleep .bucket )10timesbye;
[941,654,311,605,332,822,62,658,9,348] 4742[289,98,710,698,183,490,675,688,793,118] 4742[269,51,141,11,3,1284,1371,436,344,832] 4742[1097,229,1097,307,421,25,85,676,188,617] 4742[503,475,459,467,458,477,451,488,460,504] 4742[480,498,484,460,481,464,467,488,481,439] 4742[442,491,511,446,540,487,424,489,524,388] 4742[3,306,114,88,185,366,2331,202,1138,9] 4742[312,187,212,616,698,790,551,572,568,236] 4742[473,474,475,474,474,473,476,475,473,475] 4742[466,457,448,468,454,501,479,490,469,510] 4742
withAda.Text_IO;useAda.Text_IO;withAda.Numerics.Discrete_Random;procedureTest_UpdatesistypeBucket_Indexisrange1..13;packageRandom_Indexis newAda.Numerics.Discrete_Random(Bucket_Index);useRandom_Index;typeBucketsisarray(Bucket_Index)ofNatural;protectedtypeSafe_BucketsisprocedureInitialize(Value:Buckets);functionGet(I:Bucket_Index)returnNatural;procedureTransfer(I,J:Bucket_Index;Amount:Integer);functionSnapshotreturnBuckets;privateData:Buckets:=(others=>0);endSafe_Buckets;protectedbodySafe_BucketsisprocedureInitialize(Value:Buckets)isbeginData:=Value;endInitialize;functionGet(I:Bucket_Index)returnNaturalisbeginreturnData(I);endGet;procedureTransfer(I,J:Bucket_Index;Amount:Integer)isIncrement:constantInteger:=Integer'Max(-Data(J),Integer'Min(Data(I),Amount));beginData(I):=Data(I)-Increment;Data(J):=Data(J)+Increment;endTransfer;functionSnapshotreturnBucketsisbeginreturnData;endSnapshot;endSafe_Buckets;Data:Safe_Buckets;taskEqualize;taskMess_Up;taskbodyEqualizeisDice:Generator;I,J:Bucket_Index;beginloopI:=Random(Dice);J:=Random(Dice);Data.Transfer(I,J,(Data.Get(I)-Data.Get(J))/2);endloop;endEqualize;taskbodyMess_UpisDice:Generator;beginloopData.Transfer(Random(Dice),Random(Dice),100);endloop;endMess_Up;beginData.Initialize((1,2,3,4,5,6,7,8,9,10,11,12,13));loopdelay1.0;declareState:Buckets:=Data.Snapshot;Sum:Natural:=0;beginforIndexinState'RangeloopSum:=Sum+State(Index);Put(Integer'Image(State(Index)));endloop;Put(" ="&Integer'Image(Sum));New_Line;end;endloop;endTest_Updates;
The array of buckets is a protected object which controls access to its state. The task Equalize averages pairs of buckets. The task Mess_Up moves content of one bucket to another. The main task performs monitoring of the buckets state. Sample output:
18 0 0 0 36 16 0 0 0 2 0 19 0 = 91 0 0 0 6 0 0 37 0 6 23 19 0 0 = 91 1 0 7 66 4 0 0 4 0 0 0 0 9 = 91 0 1 0 2 28 0 17 0 0 22 1 0 20 = 91 2 0 0 11 0 37 17 0 0 0 8 0 16 = 91 0 10 0 59 0 2 0 13 0 2 0 5 0 = 91 0 1 0 10 0 0 0 0 0 0 80 0 0 = 91 16 0 0 0 13 0 9 8 14 16 0 15 0 = 91 0 1 2 0 1 0 42 1 0 42 2 0 0 = 91 0 16 0 0 0 19 28 0 0 0 0 0 28 = 91...
Bucket:=[],Buckets:=10,Originaltotal=0loop,%Buckets%{Random,rnd,0,50Bucket[A_Index]:=rnd,Originaltotal+=rnd}loop100{total:=0Randomize(B1,B2,Buckets)temp:=(Bucket[B1]+Bucket[B2])/2Bucket[B1]:=floor(temp),Bucket[B2]:=Ceil(temp); values closer to equalRandomize(B1,B2,Buckets)temp:=Bucket[B1]+Bucket[B2]Random,value,0,%temp%Bucket[B1]:=value,Bucket[B2]:=temp-value; redistribute values arbitrarilyVisualTip:="Original Total = "Originaltotal"`n"loop,%Buckets%VisualTip.=SubStr("0"Bucket[A_Index],-1)" : "x(Bucket[A_Index])"`n",total+=Bucket[A_Index]ToolTip%VisualTip"Current Total = "totalif(total<>Originaltotal)MsgBox"Error"Sleep,100}returnRandomize(ByRefB1,ByRefB2,Buckets){Random,B1,1,%Buckets%LoopRandom,B2,1,%Buckets%until(B1<>B2)}x(n){loop,%nRes.=">"returnRes}
The BBC BASIC interpreter is single-threaded so the 'concurrent' tasks are implemented by timer events. In this context an 'atomic' update means one which takes place within a single BASIC statement, so it cannot be 'interrupted'. Two (or more) buckets can be updated atomically by making them RETURN parameters of a procedure.
INSTALL@lib$+"TIMERLIB"DIMBuckets%(100)FORi%=1TO100:Buckets%(i%)=RND(10):NEXTtid0%=FN_ontimer(10,PROCdisplay,1)tid1%=FN_ontimer(11,PROCflatten,1)tid2%=FN_ontimer(12,PROCroughen,1)ONERRORPROCcleanup:REPORT:PRINT:ENDONCLOSEPROCcleanup:QUITREPEATWAIT0UNTILFALSEENDDEFPROCdisplayPRINTSUM(Buckets%())" ",MOD(Buckets%())ENDPROCDEFPROCflattenLOCALd%,i%,j%REPEATi%=RND(100)j%=RND(100)UNTILi%<>j%d%=Buckets%(i%)-Buckets%(j%)PROCatomicupdate(Buckets%(i%),Buckets%(j%),d%DIV4)ENDPROCDEFPROCroughenLOCALi%,j%REPEATi%=RND(100)j%=RND(100)UNTILi%<>j%PROCatomicupdate(Buckets%(i%),Buckets%(j%),RND(10))ENDPROCDEFPROCatomicupdate(RETURNsrc%,RETURNdst%,amt%)IFamt%>src%amt%=src%IFamt%<-dst%amt%=-dst%src%-=amt%dst%+=amt%ENDPROCDEFPROCcleanupPROC_killtimer(tid0%)PROC_killtimer(tid1%)PROC_killtimer(tid2%)ENDPROC
#include<stdio.h>#include<stdlib.h>#include<stdbool.h>#include<unistd.h>#include<time.h>#include<pthread.h>#define N_BUCKETS 15pthread_mutex_tbucket_mutex[N_BUCKETS];intbuckets[N_BUCKETS];pthread_tequalizer;pthread_trandomizer;voidtransfer_value(intfrom,intto,inthowmuch){boolswapped=false;if((from==to)||(howmuch<0)||(from<0)||(to<0)||(from>=N_BUCKETS)||(to>=N_BUCKETS))return;if(from>to){inttemp1=from;from=to;to=temp1;swapped=true;howmuch=-howmuch;}pthread_mutex_lock(&bucket_mutex[from]);pthread_mutex_lock(&bucket_mutex[to]);if(howmuch>buckets[from]&&!swapped)howmuch=buckets[from];if(-howmuch>buckets[to]&&swapped)howmuch=-buckets[to];buckets[from]-=howmuch;buckets[to]+=howmuch;pthread_mutex_unlock(&bucket_mutex[from]);pthread_mutex_unlock(&bucket_mutex[to]);}voidprint_buckets(){inti;intsum=0;for(i=0;i<N_BUCKETS;i++)pthread_mutex_lock(&bucket_mutex[i]);for(i=0;i<N_BUCKETS;i++){printf("%3d ",buckets[i]);sum+=buckets[i];}printf("= %d\n",sum);for(i=0;i<N_BUCKETS;i++)pthread_mutex_unlock(&bucket_mutex[i]);}void*equalizer_start(void*t){for(;;){intb1=rand()%N_BUCKETS;intb2=rand()%N_BUCKETS;intdiff=buckets[b1]-buckets[b2];if(diff<0)transfer_value(b2,b1,-diff/2);elsetransfer_value(b1,b2,diff/2);}returnNULL;}void*randomizer_start(void*t){for(;;){intb1=rand()%N_BUCKETS;intb2=rand()%N_BUCKETS;intdiff=rand()%(buckets[b1]+1);transfer_value(b1,b2,diff);}returnNULL;}intmain(){inti,total=0;for(i=0;i<N_BUCKETS;i++)pthread_mutex_init(&bucket_mutex[i],NULL);for(i=0;i<N_BUCKETS;i++){buckets[i]=rand()%100;total+=buckets[i];printf("%3d ",buckets[i]);}printf("= %d\n",total);// we should check if these succeededpthread_create(&equalizer,NULL,equalizer_start,NULL);pthread_create(&randomizer,NULL,randomizer_start,NULL);for(;;){sleep(1);print_buckets();}// we do not provide a "good" way to stop this run, so the following// is never reached indeed...for(i=0;i<N_BUCKETS;i++)pthread_mutex_destroy(bucket_mutex+i);returnEXIT_SUCCESS;}
Compiled withgcc -std=c99 -fopenmp
. The#pragma omp critical
ensures the following block is entered by one thread at a time.
#include<stdio.h>#include<stdlib.h>#include<omp.h>#define irand(n) (n * (double)rand()/(RAND_MAX + 1.0))intbucket[10];intmain(){inti;for(i=0;i<10;i++)bucket[i]=1000;omp_set_num_threads(3);#pragma omp parallel private(i)for(i=0;i<10000;i++){intfrom,to,mode,diff=0,sum;from=irand(10);do{to=irand(10);}while(from==to);mode=irand(10);switch(mode){case0:case1:case2:/* equalize */diff=(bucket[from]-bucket[to])/2;break;case3:/* report */sum=0;for(intj=0;j<10;j++){printf("%d ",bucket[j]);sum+=bucket[j];}printf(" Sum: %d\n",sum);continue;default:/* random transfer */diff=irand(bucket[from]);break;}#pragma omp critical{bucket[from]-=diff;bucket[to]+=diff;}}return0;}
Output:
1000 1000 1000 1798 1000 1000 1000 1000 202 1000 Sum: 10000595 800 2508 2750 470 1209 283 314 601 470 Sum: 100005 521 3339 1656 351 1038 1656 54 508 872 Sum: 10000...752 490 385 2118 1503 508 384 509 1110 2241 Sum: 10000752 823 385 2118 1544 508 10 509 1110 2241 Sum: 10000
This C# implementation uses a class to hold the buckets and data associated with them. The ThreadSafeBuckets class implements thread-stability, and ensures that two threads cannot operate on the same data at the same time. Additionally, the class uses a seperate mutex for each bucket, allowing multiple operations to occur at once if they do not alter the same buckets.
I updated the original class for a few things:
- Changed to using object locks and Montor.Enter rather than Mutexes. This allows use of the cleaner "lock" statement, and also has lower runtime overhead for in process locks- The previous implementation tracked a "swapped" state - which seems a harder way to tackle the problem. You need to acquire the locks in the correct order, not swap i and j
usingSystem;//Rand classusingSystem.Threading;//Thread, Mutex classespublicclassThreadSafeBuckets{//This class is thread safe, and ensures that all operations on it are atomic.//Calling threads do not need to ensure safety.Randomrand=newRandom();int[]Buckets;object[]locks;//Mutexes for each bucket so they can lock individuallypublicintBucketCount{get;privateset;}publicThreadSafeBuckets(intbucketcount){//Create buckets+mutexes and fill them with a random amountBucketCount=bucketcount;Buckets=newint[bucketcount];locks=newobject[bucketcount];intstartingtotal=0;for(inti=0;i<BucketCount;i++){locks[i]=newobject();Buckets[i]=rand.Next(30);startingtotal+=Buckets[i];}//Print the starting totalConsole.WriteLine("Starting total: "+startingtotal);}publicintGetBucketValue(inti){returnBuckets[i];}publicvoidTransfer(inti,intj,intamount){//Transfer amount from bucket i to bucket jif(i>BucketCount||j>BucketCount||i<0||j<0||i==j||amount<0)return;//To prevent deadlock, always lock the lower bucket firstlock(locks[Math.Min(i,j)])lock(locks[Math.Max(i,j)]){//Make sure don't transfer out more than is in the bucketamount=Math.Min(amount,Buckets[i]);//Do the transferBuckets[i]-=amount;Buckets[j]+=amount;}}publicvoidPrintBuckets(){intcounter=0;//Lock all the buckets in sequential order and print their contentsfor(inti=0;i<BucketCount;i++){Monitor.Enter(locks[i]);Console.Write(Buckets[i]+" ");counter+=Buckets[i];}//Print the bucket total, then unlock all the mutexesConsole.Write("= "+counter);Console.WriteLine();foreach(varlinlocks)Monitor.Exit(l);}}classProgram{staticThreadSafeBucketsTSBs;publicstaticvoidMain(){//Create the thread-safe bucket listTSBs=newThreadSafeBuckets(10);TSBs.PrintBuckets();//Create and start the Equalizing ThreadnewThread(newThreadStart(EqualizerThread)).Start();Thread.Sleep(1);//Create and start the Randamizing ThreadnewThread(newThreadStart(RandomizerThread)).Start();//Use this thread to do the printingPrinterThread();}//EqualizerThread runs on it's own thread and randomly averages two bucketsstaticvoidEqualizerThread(){Randomrand=newRandom();while(true){//Pick two bucketsintb1=rand.Next(TSBs.BucketCount);intb2=rand.Next(TSBs.BucketCount);//Get the differenceintdiff=TSBs.GetBucketValue(b1)-TSBs.GetBucketValue(b2);//Transfer to equalizeif(diff<0)TSBs.Transfer(b2,b1,-diff/2);elseTSBs.Transfer(b1,b2,diff/2);}}//RandomizerThread redistributes the values between two bucketsstaticvoidRandomizerThread(){Randomrand=newRandom();while(true){intb1=rand.Next(TSBs.BucketCount);intb2=rand.Next(TSBs.BucketCount);intdiff=rand.Next(TSBs.GetBucketValue(b1));TSBs.Transfer(b1,b2,diff);}}//PrinterThread prints the current bucket contentsstaticvoidPrinterThread(){while(true){Thread.Sleep(50);//Only print every few milliseconds to let the other threads workTSBs.PrintBuckets();}}}
Sample Output:
Starting total: 15615 15 12 27 6 21 19 18 16 7 = 15617 13 15 15 18 14 18 15 14 17 = 15612 9 22 15 9 8 23 10 16 32 = 1560 6 28 4 21 10 28 11 34 14 = 15635 14 30 11 32 1 26 4 3 0 = 15611 17 19 1 18 1 12 35 26 16 = 156
#include<algorithm>#include<array>#include<chrono>#include<iomanip>#include<iostream>#include<mutex>#include<random>#include<thread>usingnamespacestd;constexprintbucket_count=15;voidequalizer(array<int,bucket_count>&buckets,array<mutex,bucket_count>&bucket_mutex){random_devicerd;mt19937gen(rd());uniform_int_distribution<>dist_bucket(0,bucket_count-1);while(true){intfrom=dist_bucket(gen);intto=dist_bucket(gen);if(from!=to){lock_guard<mutex>lock_first(bucket_mutex[min(from,to)]);lock_guard<mutex>lock_second(bucket_mutex[max(from,to)]);intdiff=buckets[from]-buckets[to];intamount=abs(diff/2);if(diff<0){swap(from,to);}buckets[from]-=amount;buckets[to]+=amount;}}}voidrandomizer(array<int,bucket_count>&buckets,array<mutex,bucket_count>&bucket_mutex){random_devicerd;mt19937gen(rd());uniform_int_distribution<>dist_bucket(0,bucket_count-1);while(true){intfrom=dist_bucket(gen);intto=dist_bucket(gen);if(from!=to){lock_guard<mutex>lock_first(bucket_mutex[min(from,to)]);lock_guard<mutex>lock_second(bucket_mutex[max(from,to)]);uniform_int_distribution<>dist_amount(0,buckets[from]);intamount=dist_amount(gen);buckets[from]-=amount;buckets[to]+=amount;}}}voidprint_buckets(constarray<int,bucket_count>&buckets){inttotal=0;for(constint&bucket:buckets){total+=bucket;cout<<setw(3)<<bucket<<' ';}cout<<"= "<<setw(3)<<total<<endl;}intmain(){random_devicerd;mt19937gen(rd());uniform_int_distribution<>dist(0,99);array<int,bucket_count>buckets;array<mutex,bucket_count>bucket_mutex;for(int&bucket:buckets){bucket=dist(gen);}print_buckets(buckets);threadt_eq(equalizer,ref(buckets),ref(bucket_mutex));threadt_rd(randomizer,ref(buckets),ref(bucket_mutex));while(true){this_thread::sleep_for(chrono::seconds(1));for(mutex&mutex:bucket_mutex){mutex.lock();}print_buckets(buckets);for(mutex&mutex:bucket_mutex){mutex.unlock();}}return0;}
Function returning a new map containing altered values:
(defnxfer[mfromtoamt](let[{f-balfromt-balto}mf-bal(-f-balamt)t-bal(+t-balamt)](if(or(neg?f-bal)(neg?t-bal))(throw(IllegalArgumentException."Call results in negative balance."))(assocmfromf-baltot-bal))))
Since clojure data structures are immutable, atomic mutability occurs via a reference, in this case an atom:
(def*data*(atom{:a100:b100}));; *data* is an atom holding a map(swap!*data*xfer:a:b50);; atomically results in *data* holding {:a 50 :b 150}
Now for the test:
(defnequalize[mab](let[{a-valab-valb}mdiff(-a-valb-val)amt(/diff2)](xfermabamt)))(defnrandomize[mab](let[{a-valab-valb}mmin-val(mina-valb-val)amt(rand-int(-min-val)min-val)](xfermabamt)))(defntest-conc[fdataabnname](dotimes[in](swap!datafab)(println(str"total is "(reduce+(vals@data))" after "name" iteration "i))))(defthread-eq(Thread.#(test-concequalize*data*:a:b1000"equalize")))(defthread-rand(Thread.#(test-concrandomize*data*:a:b1000"randomize")))(.startthread-eq)(.startthread-rand)
Depends on libraries in Quicklisp. STMX is a library that provides Software Transactional Memory.
(ql:quickload'(:alexandria:stmx:bordeaux-threads))(defpackage:atomic-updates(:use:cl))(in-package:atomic-updates)(defvar*buckets*nil)(defvar*running*nil)(defundistribute(ratioab)"Atomically redistribute the values of buckets A and B by RATIO."(stmx:atomic(let*((sum(+(stmx:$a)(stmx:$b)))(a2(truncate(*ratiosum))))(setf(stmx:$a)a2)(setf(stmx:$b)(-suma2)))))(defunrunner(ratio-func)"Continously distribute to two different elements in *BUCKETS* with thevalue returned from RATIO-FUNC."(loopwhile*running*do(let((a(alexandria:random-elt*buckets*))(b(alexandria:random-elt*buckets*)))(unless(eqab)(distribute(funcallratio-func)ab)))))(defunprint-buckets()"Atomically get the bucket values and print out their metrics."(let((buckets(stmx:atomic(map'vector'stmx:$*buckets*))))(formatt"Buckets: ~a~%Sum: ~a~%"buckets(reduce'+buckets))))(defunscenario()(setf*buckets*(coerce(looprepeat20collect(stmx:tvar10))'vector))(setf*running*t)(bt:make-thread(lambda()(runner(constantly0.5))))(bt:make-thread(lambda()(runner(lambda()(random1.0))))))
ATOMIC-UPDATES>(scenario)#<SB-THREAD:THREAD"Anonymous thread"RUNNING{10058441D3}>ATOMIC-UPDATES>(looprepeat3do(print-buckets)(sleep1))Buckets:#(84121712105109104114151620118410)Sum:200Buckets:#(2122478313683109711128812154)Sum:200Buckets:#(12332833230841124235328226)Sum:200NIL
This implements a more scalable version than most of the other languages, by using a lock per bucket instead of a single lock for the whole array.
importstd.stdio:writeln;importstd.conv:text;importstd.random:uniform,Xorshift;importstd.algorithm:min,max;importstd.parallelism:task;importcore.thread:Thread;importcore.sync.mutex:Mutex;importcore.time:seconds;__gshareduinttransfersCount;finalclassBuckets(size_tnBuckets)if(nBuckets>0){aliasTBucketValue=uint;// The trailing padding avoids cache line contention// when run with two or more cores.align(128)privatestaticstructBucket{TBucketValuevalue;Mutexmtx;aliasvaluethis;}privateBucket[nBuckets]buckets;privateboolrunning;publicthis(){this.running=true;foreach(refb;buckets)b=Bucket(uniform(0,100),newMutex);}publicTBucketValueopIndex(insize_tindex)constpurenothrow{returnbuckets[index];}publicvoidtransfer(insize_tfrom,insize_tto,inTBucketValueamount){immutablelow=min(from,to);immutablehigh=max(from,to);buckets[low].mtx.lock();buckets[high].mtx.lock();scope(exit){buckets[low].mtx.unlock();buckets[high].mtx.unlock();}immutablerealAmount=min(buckets[from].value,amount);buckets[from]-=realAmount;buckets[to]+=realAmount;transfersCount++;}@propertysize_tlength()constpurenothrow{returnthis.buckets.length;}voidtoString(invoiddelegate(const(char)[])sink){TBucketValuetotal=0;foreach(refb;buckets){b.mtx.lock();total+=b;}scope(exit)foreach(refb;buckets)b.mtx.unlock();sink(text(buckets));sink(" ");sink(text(total));}}voidrandomize(size_tN)(Buckets!Ndata){autorng=Xorshift(1);while(data.running){immutablei=uniform(0,data.length,rng);immutablej=uniform(0,data.length,rng);immutableamount=uniform!"[]"(0,20,rng);data.transfer(i,j,amount);}}voidequalize(size_tN)(Buckets!Ndata){autorng=Xorshift(1);while(data.running){immutablei=uniform(0,data.length,rng);immutablej=uniform(0,data.length,rng);immutablea=data[i];immutableb=data[j];if(a>b)data.transfer(i,j,(a-b)/2);elsedata.transfer(j,i,(b-a)/2);}}voiddisplay(size_tN)(Buckets!Ndata){foreach(immutable_;0..10){writeln(transfersCount," ",data);transfersCount=0;Thread.sleep(1.seconds);}data.running=false;}voidmain(){writeln("N. transfers, buckets, buckets sum:");autodata=newBuckets!20();task!randomize(data).executeInNewThread();task!equalize(data).executeInNewThread();task!display(data).executeInNewThread();}
N. transfers, buckets, buckets sum:445977 [0, 175, 33, 18, 26, 61, 34, 13, 181, 8, 28, 12, 28, 47, 4, 12, 3, 76, 46, 59] 8644863591 [32, 18, 45, 12, 69, 29, 98, 64, 108, 28, 54, 16, 15, 93, 56, 0, 4, 16, 48, 59] 8644872790 [46, 162, 6, 2, 42, 70, 77, 34, 78, 99, 19, 0, 10, 59, 61, 13, 0, 27, 0, 59] 8645102493 [1, 10, 120, 159, 108, 0, 51, 0, 35, 74, 0, 7, 14, 5, 6, 23, 53, 99, 40, 59] 8645139426 [42, 43, 42, 42, 42, 42, 43, 42, 42, 42, 42, 43, 43, 42, 42, 43, 43, 43, 42, 59] 8644853088 [12, 108, 18, 53, 25, 62, 37, 86, 141, 0, 45, 18, 0, 30, 0, 129, 11, 0, 30, 59] 8644739723 [84, 12, 105, 80, 140, 0, 6, 53, 17, 86, 55, 0, 0, 41, 14, 51, 25, 11, 25, 59] 8645295588 [43, 43, 42, 42, 57, 53, 43, 34, 42, 66, 61, 49, 10, 39, 29, 24, 48, 50, 30, 59] 8645137883 [42, 43, 42, 42, 43, 43, 43, 42, 42, 42, 43, 42, 42, 43, 42, 43, 49, 42, 35, 59] 8645143735 [42, 42, 43, 43, 43, 42, 43, 42, 42, 43, 42, 43, 42, 42, 42, 42, 42, 43, 42, 59] 864
In E, any computation occurs in a particularvat. Over its lifetime, a vat executes many individual computations,turns, which are taken from a queue of pending events. Theeventual send operator<-
puts message-sends on the queue.
Since a vat executes only one turn at a time, each turn is atomic; since the below implementation of the transfer operation does not invoke any other code, the transfer operation is itself automatically atomic and will always preserve the total value provided that it does not have any bugs.
In this example, the tasks are in the same vat as the buckets, but it would be straightforward to write them to live in separate vats.
This example uses a Java AWT window to display the current state of the buckets.
#!/usr/bin/env runepragma.syntax("0.9")def pi := (-1.0).acos()def makeEPainter := <unsafe:com.zooko.tray.makeEPainter>def colors := <awt:makeColor># --------------------------------------------------------------# --- Definitions/** Execute 'task' repeatedly as long 'indicator' is unresolved. */def doWhileUnresolved(indicator, task) { def loop() { if (!Ref.isResolved(indicator)) { task() loop <- () } } loop <- ()}/** The data structure specified for the task. */def makeBuckets(size) { def values := ([100] * size).diverge() # storage def buckets { to size() :int { return size } /** get current quantity in bucket 'i' */ to get(i :int) { return values[i] } /** transfer 'amount' units, as much as possible, from bucket 'i' to bucket 'j' or vice versa if 'amount' is negative */ to transfer(i :int, j :int, amount :int) { def amountLim := amount.min(values[i]).max(-(values[j])) values[i] -= amountLim values[j] += amountLim } } return buckets}/** A view of the current state of the buckets. */def makeDisplayComponent(buckets) { def c := makeEPainter(def paintCallback { to paintComponent(g) { def pixelsW := c.getWidth() def pixelsH := c.getHeight() def bucketsW := buckets.size() g.setColor(colors.getWhite()) g.fillRect(0, 0, pixelsW, pixelsH) g.setColor(colors.getDarkGray()) var sum := 0 for i in 0..!bucketsW { sum += def value := buckets[i] def x0 := (i * pixelsW / bucketsW).floor() def x1 := ((i + 1) * pixelsW / bucketsW).floor() g.fillRect(x0 + 1, pixelsH - value, x1 - x0 - 1, value) } g.setColor(colors.getBlack()) g."drawString(String, int, int)"(`Total: $sum`, 2, 20) } }) c.setPreferredSize(<awt:makeDimension>(500, 300)) return c}# --------------------------------------------------------------# --- Application setupdef buckets := makeBuckets(100)def done # Promise indicating when the window is closed# Create the windowdef frame := <unsafe:javax.swing.makeJFrame>("Atomic transfers")frame.setContentPane(def display := makeDisplayComponent(buckets))frame.addWindowListener(def mainWindowListener { to windowClosing(event) :void { bind done := null } match _ {}})frame.setLocation(50, 50)frame.pack()# --------------------------------------------------------------# --- Tasks# Neatens up bucketsvar ni := 0doWhileUnresolved(done, fn { def i := ni def j := (ni + 1) %% buckets.size() buckets.transfer(i, j, (buckets[i] - buckets[j]) // 4) ni := j}) # Messes up bucketsvar mi := 0doWhileUnresolved(done, fn { def i := (mi + entropy.nextInt(3)) %% buckets.size() def j := (i + entropy.nextInt(3)) %% buckets.size() #entropy.nextInt(buckets.size()) buckets.transfer(i, j, (buckets[i] / pi).floor()) mi := j})# Updates display at fixed 10 Hz# (Note: tries to catch up; on slow systems slow this down or it will starve the other tasks)def clock := timer.every(100, def _(_) { if (Ref.isResolved(done)) { clock.stop() } else { display.repaint() }})clock.start()# --------------------------------------------------------------# --- All ready, go visible and waitframe.show()interp.waitAtTop(done)
Erlang has a built in database (Mnesia) with atomic operations. This is another way.Instead of deleting the Buckets process manually I use spawn_link(). That way Buckets goes away with the user. Output is:
[1,2,3,4,5,6,7,8,9,10] = 55[1,3,4,7,2,5,13,8,4,8] = 55[6,13,6,0,8,3,1,0,8,10] = 55[8,0,9,0,5,9,8,8,8,0] = 55[8,11,9,3,1,12,8,0,0,3] = 55[13,4,3,8,1,5,10,4,5,2] = 55[6,6,9,5,6,5,6,1,5,6] = 55[20,7,5,0,5,0,0,10,8,0] = 55[2,10,0,10,0,4,8,3,15,3] = 55[0,11,7,0,4,16,7,0,10,0] = 55
-module(atomic_updates).-export([buckets/1,buckets_get/2,buckets_get_all/1,buckets_move_contents/4,task/0]).buckets(N)->Buckets=erlang:list_to_tuple(lists:seq(1,N)),erlang:spawn_link(fun()->buckets_loop(Buckets)end).buckets_get(N,Buckets_pid)->{is_buckets_alive,true}={is_buckets_alive,erlang:is_process_alive(Buckets_pid)},Buckets_pid!{get,N,erlang:self()},receive{value,Buckets_pid,Value}->Valueend.buckets_get_all(Buckets_pid)->{is_buckets_alive,true}={is_buckets_alive,erlang:is_process_alive(Buckets_pid)},Buckets_pid!{get_all,erlang:self()},receive{values,Buckets_pid,Values}->Valuesend.buckets_move_contents(Amount,From,To,Buckets_pid)->{is_buckets_alive,true}={is_buckets_alive,erlang:is_process_alive(Buckets_pid)},Buckets_pid!{move_contents,Amount,From,To,erlang:self()},receive{move_contents_done,Buckets_pid}->okend.task()->erlang:spawn(fun()->N=10,Buckets=buckets(N),erlang:spawn_link(fun()->closer_loop(N,Buckets)end),erlang:spawn_link(fun()->redistribute_loop(N,Buckets)end),display_loop(0,N,Buckets),erlang:exit(stop)end).closer_loop(N,Buckets)->One=random:uniform(N),Two=random:uniform(N),Difference=buckets_get(One,Buckets)-buckets_get(Two,Buckets),{Amount,From,To}=closer_loop_how_to_move(Difference,One,Two),buckets_move_contents(Amount,From,To,Buckets),closer_loop(N,Buckets).closer_loop_how_to_move(Difference,One,Two)whenDifference<0->{-1*Differencediv2,Two,One};closer_loop_how_to_move(Difference,One,Two)->{Differencediv2,One,Two}.buckets_loop(Buckets)->receive{get,N,Pid}->Pid!{value,erlang:self(),erlang:element(N,Buckets)},buckets_loop(Buckets);{get_all,Pid}->Pid!{values,erlang:self(),erlang:tuple_to_list(Buckets)},buckets_loop(Buckets);{move_contents,Amount,From,To,Pid}->Pid!{move_contents_done,erlang:self()},buckets_loop(buckets_loop_move_contents(Amount,From,To,Buckets))end.buckets_loop_move_contents(_Amount,Same,Same,Buckets)->Buckets;buckets_loop_move_contents(Amount,From,To,Buckets)->Amount_from=erlang:element(From,Buckets),Clamped_amount=erlang:min(Amount,Amount_from),Removed=erlang:setelement(From,Buckets,Amount_from-Clamped_amount),Amount_to=erlang:element(To,Buckets)+Clamped_amount,erlang:setelement(To,Removed,Amount_to).display_loop(N,N,_Buckets)->ok;display_loop(Counter,N,Buckets)->Contents=buckets_get_all(Buckets),io:fwrite("~p =~p~n",[Contents,lists:sum(Contents)]),timer:sleep(100),display_loop(Counter+1,N,Buckets).redistribute_loop(N,Buckets)->Amount=random:uniform(N),From=random:uniform(N),To=random:uniform(N),buckets_move_contents(Amount,From,To,Buckets),redistribute_loop(N,Buckets).
function move(sequence s, integer amount, integer src, integer dest) if src < 1 or src > length(s) or dest < 1 or dest > length(s) or amount < 0 then return -1 else if src != dest and amount then if amount > s[src] then amount = s[src] end if s[src] -= amount s[dest] += amount end if return s end ifend functionsequence bucketsbuckets = repeat(100,10)procedure equalize() integer i, j, diff while 1 do i = rand(length(buckets)) j = rand(length(buckets)) diff = buckets[i] - buckets[j] if diff >= 2 then buckets = move(buckets, floor(diff / 2), i, j) elsif diff <= -2 then buckets = move(buckets, -floor(diff / 2), j, i) end if task_yield() end whileend procedureprocedure redistribute() integer i, j while 1 do i = rand(length(buckets)) j = rand(length(buckets)) if buckets[i] then buckets = move(buckets, rand(buckets[i]), i, j) end if task_yield() end whileend procedurefunction sum(sequence s) integer sum sum = 0 for i = 1 to length(s) do sum += s[i] end for return sumend functionatom tasktask = task_create(routine_id("equalize"), {})task_schedule(task, 1)task = task_create(routine_id("redistribute"), {})task_schedule(task, 1)task_schedule(0, {0.5, 0.5})for i = 1 to 24 do print(1,buckets) printf(1," sum: %d\n", {sum(buckets)}) task_yield()end for
Output:
{100,100,100,100,100,100,100,100,100,100} sum: 1000{150,77,68,150,113,126,14,192,68,42} sum: 1000{46,64,58,117,139,59,143,114,130,130} sum: 1000{82,99,13,99,58,117,10,191,194,137} sum: 1000{72,65,68,193,67,65,112,106,128,124} sum: 1000{43,43,42,31,234,104,105,234,30,134} sum: 1000{83,106,31,82,174,62,254,71,106,31} sum: 1000{145,102,247,86,159,30,87,35,102,7} sum: 1000{93,102,114,40,126,48,243,101,10,123} sum: 1000{160,38,9,89,182,240,116,15,61,90} sum: 1000{31,45,123,31,308,189,71,0,79,123} sum: 1000{9,86,198,87,72,194,168,148,38,0} sum: 1000{122,99,42,99,140,128,106,68,155,41} sum: 1000{223,45,0,220,220,50,153,6,82,1} sum: 1000{171,68,192,100,78,31,100,0,31,229} sum: 1000{47,70,108,253,66,113,70,92,157,24} sum: 1000{113,85,147,84,97,21,93,180,99,81} sum: 1000{82,35,8,75,166,342,48,79,99,66} sum: 1000{65,53,71,36,72,108,127,146,116,206} sum: 1000{154,15,107,47,50,204,82,177,107,57} sum: 1000{63,127,62,126,261,57,127,95,70,12} sum: 1000{25,50,0,39,55,105,586,54,47,39} sum: 1000{31,86,137,66,117,116,157,121,110,59} sum: 1000{129,65,27,38,135,54,175,129,135,113} sum: 1000
The Buckets class is thread safe and its private higher-order Lock function ensures that locks are taken out in order (to avoid deadlocks):
openSystem.ThreadingtypeBuckets(n)=letrand=System.Random()letmutex=Array.initn(fun_->newMutex())letbucket=Array.initn(fun_->100)memberthis.Count=nmemberthis.Itemn=bucket.[n]memberprivatethis.Lockisk=letis=Seq.sortisforiinisdomutex.[i].WaitOne()|>ignoretryk()finallyforiinisdomutex.[i].ReleaseMutex()memberthis.Transferijd=ifi<>j&&d<>0thenleti,j,d=ifd>0theni,j,delsej,i,-dthis.Lock[i;j](fun()->letd=mindbucket.[i]bucket.[i]<-bucket.[i]-dbucket.[j]<-bucket.[j]+d)memberthis.Read=this.Lock[0..n-1](fun()->Array.copybucket)memberthis.Print()=letxs=this.Readprintf"%A = %d\n"xs(Seq.sumxs)interfaceSystem.IDisposablewithmemberthis.Dispose()=forminmutexdo(m:>System.IDisposable).Dispose()lettransfers=ref0letmax_transfers=1000000letrand_pair(rand:System.Random)n=leti,j=rand.Nextn,rand.Next(n-1)i,ifj<ithenjelsej+1letequalizer(bucket:Buckets)()=letrand=System.Random()whileSystem.Threading.Interlocked.Incrementtransfers<max_transfersdoleti,j=rand_pairrandbucket.Countletd=(bucket.[i]-bucket.[j])/2ifd>0thenbucket.Transferijdelsebucket.Transferji-dletrandomizer(bucket:Buckets)()=letrand=System.Random()whileSystem.Threading.Interlocked.Incrementtransfers<max_transfersdoleti,j=rand_pairrandbucket.Countletd=1+rand.Nextbucket.[i]bucket.Transferijddousebucket=newBuckets(10)letequalizer=Thread(equalizerbucket)letrandomizer=Thread(randomizerbucket)bucket.Print()equalizer.Start()randomizer.Start()while!transfers<max_transfersdoThread.Sleep100bucket.Print()
This program performs a million concurrent transfers. Typical output is:
[|100;100;100;100;100;100;100;100;100;100|]=1000[|119;61;138;115;157;54;82;58;157;59|]=1000[|109;90;78;268;55;104;91;46;105;54|]=1000[|101;75;38;114;161;160;2;234;14;101|]=1000[|104;30;114;37;32;117;50;236;127;153|]=1000[|102;32;6;55;367;69;157;80;77;55|]=1000[|211;12;319;18;11;25;73;154;154;23|]=1000[|23;373;110;108;64;33;109;8;63;109|]=1000[|72;106;174;99;115;141;98;63;123;9|]=1000[|188;67;271;30;76;134;1;74;91;68|]=1000[|2;46;240;198;63;63;113;57;136;82|]=1000[|5;151;11;191;88;236;14;0;152;152|]=1000[|162;97;102;97;122;123;0;86;84;127|]=1000[|9;11;204;50;169;206;137;26;137;51|]=1000[|175;55;157;150;116;54;10;168;114;1|]=1000[|73;85;124;3;63;62;189;115;172;114|]=1000[|112;102;253;124;39;67;197;77;20;9|]=1000[|139;172;102;1;101;64;127;55;92;147|]=1000[|54;72;130;31;99;99;130;38;186;161|]=1000[|90;0;43;46;84;335;77;79;90;156|]=1000[|20;7;128;115;24;26;128;105;240;207|]=1000[|42;79;45;60;312;37;26;61;47;291|]=1000[|176;25;10;44;126;268;78;94;46;133|]=1000[|117;153;74;63;214;44;43;93;96;103|]=1000[|56;11;106;54;1;135;174;140;174;149|]=1000[|84;153;108;77;118;140;96;102;103;19|]=1000[|59;64;85;118;215;127;42;42;120;128|]=1000[|147;95;175;116;117;0;74;116;117;43|]=1000[|131;24;128;140;45;139;155;23;68;147|]=1000[|63;184;70;24;64;84;254;14;184;59|]=1000[|119;0;234;0;98;130;94;53;99;173|]=1000[|101;0;114;129;162;176;86;84;64;84|]=1000[|95;49;57;38;73;153;276;10;147;102|]=1000[|109;182;3;147;81;107;2;142;147;80|]=1000[|45;2;103;43;103;79;65;314;57;189|]=1000[|86;86;202;47;69;11;31;246;157;65|]=1000[|82;27;107;86;106;182;64;120;82;144|]=1000[|32;158;248;50;83;109;85;16;134;85|]=1000[|49;15;246;68;69;13;219;123;130;68|]=1000[|125;133;70;23;266;30;30;44;44;235|]=1000[|18;40;174;145;146;131;62;46;138;100|]=1000[|24;128;64;104;65;109;231;101;87;87|]=1000[|107;82;40;8;133;110;180;82;102;156|]=1000[|129;122;122;52;22;143;45;49;217;99|]=1000[|15;13;71;55;55;120;115;192;192;172|]=1000[|3;95;136;76;74;37;309;44;137;89|]=1000[|14;185;47;47;97;164;180;74;98;94|]=1000[|152;145;148;83;27;35;35;77;289;9|]=1000[|78;133;147;148;83;84;142;21;141;23|]=1000[|101;63;94;168;63;90;55;94;209;63|]=1000[|73;131;182;172;130;43;102;102;5;60|]=1000[|84;61;102;9;164;175;56;4;266;79|]=1000[|89;95;29;78;200;82;152;87;101;87|]=1000[|32;33;100;7;132;75;134;234;85;168|]=1000[|197;53;81;27;1;264;100;130;34;113|]=1000[|120;198;102;51;102;64;178;45;64;76|]=1000[|208;147;18;25;178;159;23;170;36;36|]=1000Pressanykeytocontinue...
RandomizeTimerDimSharedAsUintegercubo(1To10),a,iForiAsUinteger=1To10cubo(i)=Int(Rnd*90)NextiFunctionDisplay(cadenaAsString)AsUintegerDimAsUintegervalorPrintcadena;Spc(2);ForiAsUinteger=1To10valor+=cubo(i)PrintUsing"###";cubo(i);NextiPrint" Total:";valorReturnvalorEndFunctionSubFlatten(fAsUinteger)DimAsUintegerf1=Int((f/10)+.5),f2ForiAsUinteger=1To10cubo(i)=f1f2+=f1Nexticubo(10)+=f-f2EndSubSubTransfer(a1AsUinteger,a2AsUinteger)DimAsUintegertemp=Int(Rnd*cubo(a1))cubo(a1)-=tempcubo(a2)+=tempEndSuba=Display(" Display:")' show original arrayFlatten(a)' flatten the arraya=Display(" Flatten:")' show flattened arrayTransfer(3,5)' transfer some amount from 3 to 5Display(" 19 from 3 to 5:")' show transfer arraySleep
Display: 8 77 51 38 76 47 43 16 1 1 Total: 358 Flatten: 36 36 36 36 36 36 36 36 36 34 Total: 358 19 from 3 to 5: 36 36 21 36 51 36 36 36 36 34 Total: 358
local fn PopulateArrayWithRandomNumbers NSUInteger i for i = 0 to 9 mda (i) = rnd(90) nextend fnlocal fn Display( title as CFStringRef ) as NSUInteger NSUInteger i, worth = 0 CFStringRef comma = @"," printf @"%@ [\b", title for i = 0 to 9 worth += mda_integer (i) if i == 9 then comma = @"" printf @"%2lu%@\b", mda_integer (i), comma next printf @"] Sum = %lu", worthend fn = worthlocal fn Flatten( f as NSUInteger ) NSUInteger i, f1 = int((f / 10) + .5 ), f2 = 0, temp for i = 0 to 9 mda (i) = f1 f2 += f1 next temp = mda_integer (9) mda (9) = temp + f - f2end fnlocal fn Transfer( a1 as NSUInteger, a2 as NSUInteger ) NSUInteger t, temp = int( rnd( mda_integer ( a1 ) ) ) t = mda_integer ( a1 ) : mda ( a1 ) = t -temp t = mda_integer ( a2 ) : mda ( a2 ) = t +tempend fnNSUInteger a, irandomfn PopulateArrayWithRandomNumbersa = fn Display( @" Initial array:" )fn Flatten( a )a = fn Display( @" Current values:" )fn Transfer( 3, 5 )fn Display( @" 19 from 3 to 5:" )HandleEvents
Initial array: [28,73,90, 1,75,51,69,35,70,28] Sum = 520 Current values: [52,52,52,52,52,52,52,52,52,52] Sum = 520 19 from 3 to 5: [52,52,52,34,52,70,52,52,52,52] Sum = 520
packagemainimport("fmt""math/rand""sync""time")constnBuckets=10typebucketListstruct{b[nBuckets]int// bucket data specified by task// transfer counts for each updater, not strictly required by task but// useful to show that the two updaters get fair chances to run.tc[2]intsync.Mutex// synchronization}// Updater ids, to track number of transfers by updater.// these can index bucketlist.tc for example.const(idOrder=iotaidChaos)constinitialSum=1000// sum of all bucket values// Constructor.funcnewBucketList()*bucketList{varblbucketList// Distribute initialSum across buckets.fori,dist:=nBuckets,initialSum;i>0;{v:=dist/ii--bl.b[i]=vdist-=v}return&bl}// method 1 required by task, get current value of a bucketfunc(bl*bucketList)bucketValue(bint)int{bl.Lock()// lock before accessing datar:=bl.b[b]bl.Unlock()returnr}// method 2 required by taskfunc(bl*bucketList)transfer(b1,b2,aint,uxint){// Get access.bl.Lock()// Clamping maintains invariant that bucket values remain nonnegative.ifa>bl.b[b1]{a=bl.b[b1]}// Transfer.bl.b[b1]-=abl.b[b2]+=abl.tc[ux]++// increment transfer countbl.Unlock()}// additional useful methodfunc(bl*bucketList)snapshot(s*[nBuckets]int,tc*[2]int){bl.Lock()*s=bl.b*tc=bl.tcbl.tc=[2]int{}// clear transfer countsbl.Unlock()}varbl=newBucketList()funcmain(){// Three concurrent tasks.goorder()// make values closer to equalgochaos()// arbitrarily redistribute valuesbuddha()// display total value and individual values of each bucket}// The concurrent tasks exercise the data operations by calling bucketList// methods. The bucketList methods are "threadsafe", by which we really mean// goroutine-safe. The conconcurrent tasks then do no explicit synchronization// and are not responsible for maintaining invariants.// Exercise 1 required by task: make values more equal.funcorder(){r:=rand.New(rand.NewSource(time.Now().UnixNano()))for{b1:=r.Intn(nBuckets)b2:=r.Intn(nBuckets-1)ifb2>=b1{b2++}v1:=bl.bucketValue(b1)v2:=bl.bucketValue(b2)ifv1>v2{bl.transfer(b1,b2,(v1-v2)/2,idOrder)}else{bl.transfer(b2,b1,(v2-v1)/2,idOrder)}}}// Exercise 2 required by task: redistribute values.funcchaos(){r:=rand.New(rand.NewSource(time.Now().Unix()))for{b1:=r.Intn(nBuckets)b2:=r.Intn(nBuckets-1)ifb2>=b1{b2++}bl.transfer(b1,b2,r.Intn(bl.bucketValue(b1)+1),idChaos)}}// Exercise 3 requred by task: display total.funcbuddha(){vars[nBuckets]intvartc[2]intvartotal,nTicksintfmt.Println("sum ---updates--- mean buckets")tr:=time.Tick(time.Second/10)for{<-trbl.snapshot(&s,&tc)varsumintfor_,l:=ranges{ifl<0{panic("sob")// invariant not preserved}sum+=l}// Output number of updates per tick and cummulative mean// updates per tick to demonstrate "as often as possible"// of task exercises 1 and 2.total+=tc[0]+tc[1]nTicks++fmt.Printf("%d %6d %6d %7d %3d\n",sum,tc[0],tc[1],total/nTicks,s)ifsum!=initialSum{panic("weep")// invariant not preserved}}}
sum ---updates--- mean buckets1000 317832 137235 455067 [100 100 100 100 100 100 100 100 100 100]1000 391239 339389 592847 [ 85 266 81 85 131 37 62 80 111 62]1000 509436 497362 730831 [ 70 194 194 62 16 193 10 16 126 119]1000 512065 499038 800899 [100 100 100 100 100 100 100 100 100 100]1000 250590 121947 715226 [ 47 271 78 61 34 199 73 58 100 79]...
Solution:
classBuckets{defcells=[]finalnBuckets(n,limit=1000,random=newRandom()){this.n=n(0..<n).each{cells<<random.nextInt(limit)}}synchronizedgetAt(i){cells[i]}synchronizedtransfer(from,to,amount){assertfromin(0..<n)&&toin(0..<n)defcappedAmt=[cells[from],amount].min()cells[from]-=cappedAmtcells[to]+=cappedAmt}synchronizedStringtoString(){cells.toString()}}defrandom=newRandom()defbuckets=newBuckets(5)defmakeCloser={i,j->synchronized(buckets){deftargetDiff=(buckets[i]-buckets[j]).intdiv(2)if(targetDiff<0){buckets.transfer(j,i,-targetDiff)}else{buckets.transfer(i,j,targetDiff)}}}defrandomize={i,j->synchronized(buckets){deftargetLimit=buckets[i]+buckets[j]deftargetI=random.nextInt(targetLimit+1)if(targetI<buckets[i]){buckets.transfer(i,j,buckets[i]-targetI)}else{buckets.transfer(j,i,targetI-buckets[i])}}}Thread.start{defstart=System.currentTimeMillis()while(start+10000>System.currentTimeMillis()){defi=random.nextInt(buckets.n)defj=random.nextInt(buckets.n)makeCloser(i,j)}}Thread.start{defstart=System.currentTimeMillis()while(start+10000>System.currentTimeMillis()){defi=random.nextInt(buckets.n)defj=random.nextInt(buckets.n)randomize(i,j)}}defstart=System.currentTimeMillis()while(start+10000>System.currentTimeMillis()){synchronized(buckets){defsum=buckets.cells.sum()println"${new Date()}: checksum: ${sum} buckets: ${buckets}"}Thread.sleep(500)}
Output:
Sat Jan 07 02:24:45 CST 2012: checksum: 2161 buckets: [227, 700, 635, 299, 300]Sat Jan 07 02:24:46 CST 2012: checksum: 2161 buckets: [477, 365, 364, 478, 477]Sat Jan 07 02:24:46 CST 2012: checksum: 2161 buckets: [432, 434, 429, 434, 432]Sat Jan 07 02:24:47 CST 2012: checksum: 2161 buckets: [432, 428, 434, 432, 435]Sat Jan 07 02:24:48 CST 2012: checksum: 2161 buckets: [432, 433, 432, 432, 432]Sat Jan 07 02:24:48 CST 2012: checksum: 2161 buckets: [433, 432, 432, 432, 432]Sat Jan 07 02:24:49 CST 2012: checksum: 2161 buckets: [359, 425, 254, 868, 255]Sat Jan 07 02:24:49 CST 2012: checksum: 2161 buckets: [433, 432, 432, 432, 432]Sat Jan 07 02:24:50 CST 2012: checksum: 2161 buckets: [432, 431, 430, 430, 438]Sat Jan 07 02:24:50 CST 2012: checksum: 2161 buckets: [466, 404, 388, 466, 437]Sat Jan 07 02:24:51 CST 2012: checksum: 2161 buckets: [476, 569, 365, 386, 365]Sat Jan 07 02:24:51 CST 2012: checksum: 2161 buckets: [35, 111, 1038, 387, 590]Sat Jan 07 02:24:52 CST 2012: checksum: 2161 buckets: [423, 341, 341, 423, 633]Sat Jan 07 02:24:52 CST 2012: checksum: 2161 buckets: [141, 1295, 102, 370, 253]Sat Jan 07 02:24:53 CST 2012: checksum: 2161 buckets: [683, 188, 345, 638, 307]Sat Jan 07 02:24:53 CST 2012: checksum: 2161 buckets: [379, 275, 354, 240, 913]Sat Jan 07 02:24:54 CST 2012: checksum: 2161 buckets: [894, 515, 455, 234, 63]Sat Jan 07 02:24:54 CST 2012: checksum: 2161 buckets: [306, 507, 793, 507, 48]Sat Jan 07 02:24:55 CST 2012: checksum: 2161 buckets: [463, 462, 240, 632, 364]Sat Jan 07 02:24:55 CST 2012: checksum: 2161 buckets: [204, 162, 223, 996, 576]
This uses MVar as its concurrency protection. An MVar is a container that may have a value or not; trying totake the value when it is absent blocks until a value is provided, at which point it is atomically taken again. modifyMVar_ is a shortcut to take the value, then put a modified value; readMVar takes the value and puts back the same value while returning it.
So, at any given time, the current value map is either in the MVar or being examined or replaced by one thread, but not both. The IntMap held by the MVar is a pure immutable data structure (adjust
returns a modified version), so there is no problem from that thedisplay task puts the value back before it is done printing.
moduleAtomicUpdates(main)whereimportControl.Concurrent(forkIO,threadDelay)importControl.Concurrent.MVar(MVar,newMVar,readMVar,modifyMVar_)importControl.Monad(forever,forM_)importData.IntMap(IntMap,(!),toAscList,fromList,adjust)importSystem.Random(randomRIO)importText.Printf(printf)-------------------------------------------------------------------------------typeIndex=InttypeValue=IntegerdataBuckets=BucketsIndex(MVar(IntMapValue))makeBuckets::Int->IOBucketssize::Buckets->IndexcurrentValue::Buckets->Index->IOValuecurrentValues::Buckets->IO(IntMapValue)transfer::Buckets->Index->Index->Value->IO()-------------------------------------------------------------------------------makeBucketsn=dov<-newMVar(fromList[(i,100)|i<-[1..n]])return(Bucketsnv)size(Bucketsn_)=ncurrentValue(Buckets_v)i=fmap(!i)(readMVarv)currentValues(Buckets_v)=readMVarvtransferb@(Bucketsnv)ijamt|amt<0=transferbji(-amt)|otherwise=domodifyMVar_v$\map->letamt'=minamt(map!i)inreturn$adjust(subtractamt')i$adjust(+amt')j$map-------------------------------------------------------------------------------roughen,smooth,display::Buckets->IO()pickbuckets=randomRIO(1,sizebuckets)roughenbuckets=foreverloopwhereloop=doi<-pickbucketsj<-pickbucketsiv<-currentValuebucketsitransferbucketsij(iv`div`3)smoothbuckets=foreverloopwhereloop=doi<-pickbucketsj<-pickbucketsiv<-currentValuebucketsijv<-currentValuebucketsjtransferbucketsij((iv-jv)`div`4)displaybuckets=foreverloopwhereloop=dothreadDelay1000000bmap<-currentValuesbucketsputStrLn(report$mapsnd$toAscListbmap)reportlist="\nTotal: "++show(sumlist)++"\n"++barswherebars=concatMaprow$map(*40)$reverse[1..5]rowlim=printf"%3d "lim++[ifx>=limthen'*'else' '|x<-list]++"\n"main=dobuckets<-makeBuckets100forkIO(roughenbuckets)forkIO(smoothbuckets)displaybuckets
Sample output:
Total: 10000200 * * * 160 * * * * * * * * * * 120 ** ** * *** **** ** * * * ** * * ** * * * * * * * ** 80 *** ** ** ***** **** ****** ****** *** ** ** ***** * * ***** * * ** * *** ** * ** 40 ********** ******************************* ***** ****** ******************* ******* ***************Total: 10000200 * 160 * * * * * * *120 * ** *** * * ** * ** * ** * * * ** * * * * * * * ** * * ** 80 ***** ** ******** *** * *** ** ** *** * * ***** **** *** *** * ** *** *** * *** * * ** 40 ******** ****************** ************************************************************** *******
The following only works in Unicon:
globalmtxproceduremain(A)nBuckets:=integer(A[1])|10nShows:=integer(A[2])|4showBuckets:=A[3]mtx:=mutex()every!(buckets:=list(nBuckets)):=?100threadrepeat{every(b1|b2):=?nBuckets# OK if same!criticalmtx:xfer((buckets[b1]-buckets[b2])/2,b1,b2)}threadrepeat{every(b1|b2):=?nBuckets# OK if same!criticalmtx:xfer(integer(?buckets[b1]),b1,b2)}wait(threadrepeat{delay(500)criticalmtx:{every(sum:=0)+:=!bucketswrites("Sum: ",sum)if\showBucketstheneverywrites(" -> "|right(!buckets,4))}write()if(nShows-:=1)<=0thenbreak})endprocedurexfer(x,b1,b2)buckets[b1]-:=xbuckets[b2]+:=xend
Sample run:
->au 20 10 yesSum: 973 -> 48 49 48 49 49 49 48 48 49 49 49 49 48 49 48 49 48 49 49 49Sum: 973 -> 49 49 48 49 49 48 49 49 49 48 48 49 49 48 49 49 48 48 49 49Sum: 973 -> 49 49 49 48 49 48 48 49 49 49 49 49 49 48 49 48 48 48 49 49Sum: 973 -> 49 49 49 48 48 48 48 48 49 49 49 49 49 49 49 49 49 48 49 48Sum: 973 -> 48 49 48 49 49 49 49 48 49 49 48 48 48 49 48 49 49 49 49 49Sum: 973 -> 70 51 49 31 87 51 53 51 48 50 88 12 43 39 50 46 50 0 53 51Sum: 973 -> 11 15 83 95 3 53 145 0 8 120 9 9 10 5 45 122 38 70 2 130Sum: 973 -> 12 260 17 3 45 13 9 4 46 71 18 41 15 68 104 53 18 104 44 28Sum: 973 -> 49 48 49 49 49 48 49 48 49 49 49 49 49 48 49 48 49 48 48 49Sum: 973 -> 140 47 32 47 32 60 227 0 48 32 78 15 36 135 8 16 0 8 11 1->
importjava.util.Arrays;importjava.util.concurrent.ThreadLocalRandom;publicclassAtomicUpdates{privatestaticfinalintNUM_BUCKETS=10;publicstaticclassBuckets{privatefinalint[]data;publicBuckets(int[]data){this.data=data.clone();}publicintgetBucket(intindex){synchronized(data){returndata[index];}}publicinttransfer(intsrcIndex,intdstIndex,intamount){if(amount<0)thrownewIllegalArgumentException("negative amount: "+amount);if(amount==0)return0;synchronized(data){if(data[srcIndex]-amount<0)amount=data[srcIndex];if(data[dstIndex]+amount<0)amount=Integer.MAX_VALUE-data[dstIndex];if(amount<0)thrownewIllegalStateException();data[srcIndex]-=amount;data[dstIndex]+=amount;returnamount;}}publicint[]getBuckets(){synchronized(data){returndata.clone();}}}privatestaticlonggetTotal(int[]values){longtotal=0;for(intvalue:values){total+=value;}returntotal;}publicstaticvoidmain(String[]args){ThreadLocalRandomrnd=ThreadLocalRandom.current();int[]values=newint[NUM_BUCKETS];for(inti=0;i<values.length;i++)values[i]=rnd.nextInt()&Integer.MAX_VALUE;System.out.println("Initial Array: "+getTotal(values)+" "+Arrays.toString(values));Bucketsbuckets=newBuckets(values);newThread(()->equalize(buckets),"equalizer").start();newThread(()->transferRandomAmount(buckets),"transferrer").start();newThread(()->print(buckets),"printer").start();}privatestaticvoidtransferRandomAmount(Bucketsbuckets){ThreadLocalRandomrnd=ThreadLocalRandom.current();while(true){intsrcIndex=rnd.nextInt(NUM_BUCKETS);intdstIndex=rnd.nextInt(NUM_BUCKETS);intamount=rnd.nextInt()&Integer.MAX_VALUE;buckets.transfer(srcIndex,dstIndex,amount);}}privatestaticvoidequalize(Bucketsbuckets){ThreadLocalRandomrnd=ThreadLocalRandom.current();while(true){intsrcIndex=rnd.nextInt(NUM_BUCKETS);intdstIndex=rnd.nextInt(NUM_BUCKETS);intamount=(buckets.getBucket(srcIndex)-buckets.getBucket(dstIndex))/2;if(amount>=0)buckets.transfer(srcIndex,dstIndex,amount);}}privatestaticvoidprint(Bucketsbuckets){while(true){longnextPrintTime=System.currentTimeMillis()+3000;longnow;while((now=System.currentTimeMillis())<nextPrintTime){try{Thread.sleep(nextPrintTime-now);}catch(InterruptedExceptione){return;}}int[]bucketValues=buckets.getBuckets();System.out.println("Current values: "+getTotal(bucketValues)+" "+Arrays.toString(bucketValues));}}}
Initial Array: 8345792262 [143255168, 196076270, 933397723, 1556699232, 1050802212, 538674858, 1196357020, 738586704, 726124301, 1265818774]Current values: 8345792262 [0, 1874588555, 1422104978, 1646554792, 272895092, 0, 1100055274, 562892928, 0, 1466700643]Current values: 8345792262 [0, 938536756, 1022153269, 802097042, 834165196, 893056852, 1022153268, 985683168, 985683168, 862263543]Current values: 8345792262 [828663081, 828663080, 800738961, 653833491, 926105549, 856587200, 1235929058, 653833491, 780719176, 780719175]Current values: 8345792262 [834986940, 835010170, 833752099, 835010170, 834668841, 834620567, 833370581, 835083486, 834620567, 834668841]Current values: 8345792262 [0, 249877205, 1201027166, 2147483647, 0, 966988101, 725353114, 107211829, 2147483647, 800367553]Current values: 8345792262 [789241957, 389741912, 898370461, 1824723292, 389741912, 898370462, 1824723293, 434230374, 896648599, 0]Current values: 8345792262 [290197046, 76068608, 2147483647, 185783029, 646610948, 187523099, 1387188383, 0, 2147483647, 1277453855]Current values: 8345792262 [0, 0, 1594297983, 1972188797, 0, 2147483647, 0, 2147483647, 92403769, 391934419]Current values: 8345792262 [330331828, 330331828, 2147483647, 515452290, 2010486407, 0, 515452290, 0, 348770325, 2147483647]...
Commenting out either of the threads mutating the buckets shows that they work properly.
usingStatsBasefunctionrunall()nbuckets=16unfinish=truespinner=ReentrantLock()buckets=rand(1:99,nbuckets)totaltrans=0bucketsum()=sum(buckets)smallpause()=sleep(rand()/2000)picktwo()=(samplepair(nbuckets)...)functionequalizer()whileunfinishsmallpause()iftrylock(spinner)i,j=picktwo()sm=buckets[i]+buckets[j]m=fld(sm+1,2)buckets[i],buckets[j]=m,sm-mtotaltrans+=1unlock(spinner)endendendfunctionredistributor()whileunfinishsmallpause()iftrylock(spinner)i,j=picktwo()sm=buckets[i]+buckets[j]buckets[i]=rand(0:sm)buckets[j]=sm-buckets[i]totaltrans+=1unlock(spinner)endendendfunctionaccountant()count=0whilecount<16smallpause()iftrylock(spinner)println("Current state of buckets:$buckets. Total in buckets:$(bucketsum())")unlock(spinner)count+=1sleep(1)endendunfinish=falseendt=time()@asyncequalizer()@asyncredistributor()@asyncaccountant()whileunfinishsleep(0.25)endprintln("Total transactions:$totaltrans ($(round(Int,totaltrans/(time()-t))) unlocks per second).")endrunall()
Current state of buckets: [56, 26, 34, 57, 26, 25, 39, 91, 53, 46, 96, 67, 86, 49, 2, 85]. Total in buckets: 838Current state of buckets: [62, 32, 90, 50, 9, 43, 16, 71, 67, 99, 22, 44, 63, 85, 78, 7]. Total in buckets: 838Current state of buckets: [58, 25, 41, 30, 9, 79, 42, 43, 32, 66, 110, 123, 90, 35, 13, 42]. Total in buckets: 838Current state of buckets: [86, 63, 70, 21, 41, 69, 30, 29, 38, 40, 12, 28, 85, 13, 127, 86]. Total in buckets: 838Current state of buckets: [45, 32, 26, 30, 45, 9, 86, 200, 31, 45, 9, 23, 60, 64, 79, 54]. Total in buckets: 838Current state of buckets: [68, 16, 89, 104, 15, 35, 15, 23, 91, 92, 29, 27, 33, 21, 136, 44]. Total in buckets: 838Current state of buckets: [13, 72, 8, 25, 27, 62, 134, 33, 78, 79, 7, 22, 132, 73, 12, 61]. Total in buckets: 838Current state of buckets: [97, 78, 16, 90, 90, 69, 0, 22, 26, 84, 23, 22, 78, 69, 32, 42]. Total in buckets: 838Current state of buckets: [3, 105, 99, 69, 70, 8, 50, 32, 17, 69, 53, 1, 68, 66, 64, 64]. Total in buckets: 838Current state of buckets: [27, 181, 9, 5, 66, 16, 60, 56, 66, 140, 43, 29, 51, 59, 1, 29]. Total in buckets: 838Current state of buckets: [45, 108, 45, 28, 58, 108, 86, 41, 45, 29, 57, 11, 54, 23, 52, 48]. Total in buckets: 838Current state of buckets: [76, 45, 47, 75, 62, 34, 73, 27, 102, 64, 32, 51, 55, 32, 43, 20]. Total in buckets: 838Current state of buckets: [35, 69, 41, 34, 29, 79, 82, 72, 71, 65, 34, 67, 68, 14, 33, 45]. Total in buckets: 838Current state of buckets: [85, 53, 53, 26, 45, 53, 84, 99, 48, 50, 27, 52, 60, 79, 13, 11]. Total in buckets: 838Current state of buckets: [49, 63, 24, 38, 64, 79, 75, 70, 69, 68, 50, 74, 12, 60, 6, 37]. Total in buckets: 838Current state of buckets: [32, 20, 82, 70, 54, 41, 87, 15, 15, 44, 82, 55, 17, 33, 87, 104]. Total in buckets: 838Total transactions: 26751 (1639 unlocks per second).
// version 1.2.0importjava.util.concurrent.ThreadLocalRandomimportkotlin.concurrent.threadconstvalNUM_BUCKETS=10classBuckets(data:IntArray){privatevaldata=data.copyOf()operatorfunget(index:Int)=synchronized(data){data[index]}funtransfer(srcIndex:Int,dstIndex:Int,amount:Int):Int{if(amount<0){throwIllegalArgumentException("Negative amount: $amount")}if(amount==0)return0synchronized(data){vara=amountif(data[srcIndex]-a<0)a=data[srcIndex]if(data[dstIndex]+a<0)a=Int.MAX_VALUE-data[dstIndex]if(a<0)throwIllegalStateException()data[srcIndex]-=adata[dstIndex]+=areturna}}valbucketsget()=synchronized(data){data.copyOf()}funtransferRandomAmount(){valrnd=ThreadLocalRandom.current()while(true){valsrcIndex=rnd.nextInt(NUM_BUCKETS)valdstIndex=rnd.nextInt(NUM_BUCKETS)valamount=rnd.nextInt()andInt.MAX_VALUEtransfer(srcIndex,dstIndex,amount)}}funequalize(){valrnd=ThreadLocalRandom.current()while(true){valsrcIndex=rnd.nextInt(NUM_BUCKETS)valdstIndex=rnd.nextInt(NUM_BUCKETS)valamount=(this[srcIndex]-this[dstIndex])/2if(amount>=0)transfer(srcIndex,dstIndex,amount)}}funprint(){while(true){valnextPrintTime=System.currentTimeMillis()+3000while(true){valnow=System.currentTimeMillis()if(now>=nextPrintTime)breaktry{Thread.sleep(nextPrintTime-now)}catch(e:InterruptedException){return}}valbucketValues=bucketsprintln("Current values: ${bucketValues.total} ${bucketValues.asList()}")}}}valIntArray.total:Longget(){varsum=0Lfor(dinthis)sum+=dreturnsum}funmain(args:Array<String>){valrnd=ThreadLocalRandom.current()valvalues=IntArray(NUM_BUCKETS){rnd.nextInt()andInt.MAX_VALUE}println("Initial array: ${values.total} ${values.asList()}")valbuckets=Buckets(values)thread(name="equalizer"){buckets.equalize()}thread(name="transferrer"){buckets.transferRandomAmount()}thread(name="printer"){buckets.print()}}
Sample output:
Initial array: 9412276676 [1252597313, 1908616225, 824662669, 972947315, 2126883821, 405179067, 693458796, 481375538, 396750085, 349805847]Current values: 9412276676 [2147483647, 844064584, 983174119, 580879514, 1073741823, 666808378, 2147483647, 0, 484320482, 484320482]Current values: 9412276676 [941221423, 941207347, 941304553, 941221422, 941235585, 941235585, 941225242, 941242321, 941157955, 941225243]Current values: 9412276676 [941656114, 941197476, 941190372, 941203044, 941187119, 941177701, 941208610, 941038975, 941226893, 941190372]Current values: 9412276676 [0, 202110459, 2147483647, 1901203310, 2147483647, 1489083519, 0, 234363721, 1290548373, 0]Current values: 9412276676 [695300460, 2147483647, 1452183187, 2147483647, 0, 0, 0, 570277505, 252064583, 2147483647]Current values: 9412276676 [941219147, 941226725, 941226725, 941238796, 941219147, 941247715, 941238795, 941234946, 941189734, 941234946]Current values: 9412276676 [941306524, 941145153, 941241743, 940668167, 942314400, 941491117, 940668168, 941145153, 941306524, 940989727]Current values: 9412276676 [945548859, 939149475, 935477311, 941294057, 944294715, 940031668, 940860151, 940662863, 940662862, 944294715]Current values: 9412276676 [941254898, 941342907, 941188859, 941250824, 941250825, 940973864, 941078878, 941373381, 941373381, 941188859]Current values: 9412276676 [941147294, 941232689, 941132597, 941330728, 941281708, 941236213, 941147294, 941265970, 941236214, 941265969]......
Lasso thread objects are thread-safe by design.
defineatomic=>thread{dataprivatebuckets=staticarray_join(10,void),privatelock=0publiconCreate=>{loop(.buckets->size)=>{.`buckets`->get(loop_count)=math_random(0,1000)}}publicbuckets=>.`buckets`publicbucket(index::integer)=>.`buckets`->get(#index)publictransfer(source::integer,dest::integer,amount::integer)=>{#source==#dest?return#amount=math_min(#amount,.`buckets`->get(#source)).`buckets`->get(#source)-=#amount.`buckets`->get(#dest)+=#amount}publicnumBuckets=>.`buckets`->sizepubliclock=>{.`lock`==1?returnfalse.`lock`=1returntrue}publicunlock=>{.`lock`=0}}local(initial_total)=(withbinatomic->bucketssum#b)local(total)=#initial_total// Make 2 buckets close to equallocal(_)=split_thread=>{local(bucket1)=math_random(1,atomic->numBuckets)local(bucket2)=math_random(1,atomic->numBuckets)local(value1)=atomic->bucket(#bucket1)local(value2)=atomic->bucket(#bucket2)if(#value1>=#value2)=>{atomic->transfer(#bucket1,#bucket2,(#value1-#value2)/2)elseatomic->transfer(#bucket2,#bucket1,(#value2-#value1)/2)}currentCapture->restart}// Randomly distribute 2 bucketslocal(_)=split_thread=>{local(bucket1)=math_random(1,atomic->numBuckets)local(bucket2)=math_random(1,atomic->numBuckets)local(value1)=atomic->bucket(#bucket1)atomic->transfer(#bucket1,#bucket2,math_random(1,#value1))currentCapture->restart}local(buckets)while(#initial_total==#total)=>{sleep(2000)#buckets=atomic->buckets#total=withbin#bucketssum#bstdoutnl(#buckets->asString+" -- total: "+#total)}stdoutnl(`ERROR: totals no longer match: `+#initial_total+', '+#total)
staticarray(130, 399, 339, 0, 444, 444, 618, 872, 390, 23) -- total: 3659staticarray(233, 538, 461, 117, 389, 110, 232, 517, 633, 429) -- total: 3659staticarray(129, 648, 494, 809, 823, 132, 425, 131, 58, 10) -- total: 3659staticarray(255, 484, 53, 261, 484, 264, 336, 521, 211, 790) -- total: 3659staticarray(464, 16, 463, 1043, 470, 177, 369, 486, 41, 130) -- total: 3659staticarray(281, 717, 341, 716, 50, 17, 129, 247, 964, 197) -- total: 3659staticarray(423, 509, 51, 458, 265, 423, 292, 458, 661, 119) -- total: 3659
The following example can be found in the Logtalk distribution and is used here with permission. Works when using SWI-Prolog, XSB, or YAP as the backend compiler.
:-object(buckets). :-threaded. :-public([start/0, start/4]).% bucket representation :-private(bucket_/2). :-dynamic(bucket_/2).% use the same mutex for all the predicates that access the buckets :-private([bucket/2, buckets/1, transfer/3]). :-synchronized([bucket/2, buckets/1, transfer/3]). start:-% by default, create ten buckets with initial random integer values% in the interval [0, 10[ and print their contents ten times start(10,0,10,10). start(N,Min,Max,Samples):-% create the buckets with random values in the% interval [Min, Max[ and return their sum create_buckets(N,Min,Max,Sum),write('Sum of all bucket values: '),write(Sum),nl,nl,% use competitive or-parallelism for the three loops such that% the computations terminate when the display loop terminatesthreaded(( display_loop(Samples); match_loop(N); redistribute_loop(N) )). create_buckets(N,Min,Max,Sum):-% remove all exisiting bucketsretractall(bucket_(_,_)),% create the new buckets create_buckets(N,Min,Max,0,Sum). create_buckets(0,_,_,Sum,Sum):-!. create_buckets(N,Min,Max,Sum0,Sum):- random::random(Min,Max,Value),asserta(bucket_(N,Value)),MisN-1,Sum1isSum0+Value, create_buckets(M,Min,Max,Sum1,Sum). bucket(Bucket,Value):- bucket_(Bucket,Value). buckets(Values):-findall(Value, bucket_(_,Value),Values). transfer(Origin,_,Origin):-!. transfer(Origin,Delta,Destin):-retract(bucket_(Origin,OriginValue)),retract(bucket_(Destin,DestinValue)),% the buckets may have changed between the access to its% values and the calling of this transfer predicate; thus,% we must ensure that we're transfering a legal amountAmountismin(Delta,OriginValue),NewOriginValueisOriginValue-Amount,NewDestinValueisDestinValue+Amount,assertz(bucket_(Origin,NewOriginValue)),assertz(bucket_(Destin,NewDestinValue)). match_loop(N):-% randomly select two bucketsMisN+1, random::random(1,M,Bucket1), random::random(1,M,Bucket2),% access their contents bucket(Bucket1,Value1), bucket(Bucket2,Value2),% make their new values approximately equalDeltaistruncate(abs(Value1-Value2)/2), (Value1>Value2-> transfer(Bucket1,Delta,Bucket2);Value1<Value2-> transfer(Bucket2,Delta,Bucket1);true ), match_loop(N). redistribute_loop(N):-% randomly select two bucketsMisN+1, random::random(1,M,FromBucket), random::random(1,M,ToBucket),% access bucket from where we transfer bucket(FromBucket,Current),LimitisCurrent+1, random::random(0,Limit,Delta), transfer(FromBucket,Delta,ToBucket), redistribute_loop(N). display_loop(0):-!. display_loop(N):- buckets(Values),write(Values),nl, thread_sleep(2),MisN-1, display_loop(M).:-end_object.
Sample output:
?- buckets::start.Sum of all bucket values:52[4,6,9,5,3,5,9,7,4,0][5,3,6,3,9,5,5,6,2,8][2,2,3,13,5,5,2,8,6,6][7,4,7,1,1,1,5,11,8,7][8,5,8,4,4,3,4,1,3,12][2,4,8,6,11,6,6,7,1,1][2,12,3,2,6,5,0,9,7,6][2,6,3,3,16,3,2,3,7,7][6,0,4,0,23,1,1,4,2,11][11,6,10,4,0,4,5,5,4,3]true.
transfer[bucks_,src_,dest_,n_]:=ReplacePart[bucks,{src->Max[bucks[[src]]-n,0],dest->bucks[[dest]]+Min[bucks[[src]],n]}];DistributeDefinitions[transfer];SetSharedVariable[bucks,comp];bucks=RandomInteger[10,20];comp=True;Print["Original sum: "<>IntegerString[Plus@@bucks]];Print[Dynamic["Current sum: "<>IntegerString[Plus@@bucks]]];WaitAll[{ParallelSubmit[While[True,While[!comp,Null];comp=False;Module[{a=RandomInteger[{1,20}],b=RandomInteger[{1,20}]},bucks=transfer[bucks,Max[a,b],Min[a,b],Floor[Abs[bucks[[a]]-bucks[[b]]]/2]]];comp=True]],ParallelSubmit[While[True,While[!comp,Null];comp=False;Module[{src=RandomInteger[{1,20}],dest=RandomInteger[{1,20}]},bucks=transfer[bucks,src,dest,RandomInteger[{1,bucks[[src]]}]]];comp=True]]}];
Original sum: <number>Current sum: <same number, stays fixed>
This simply uses a variable namedcomp to determine whether or not it is currently computing something.
We use Threads objects which are mapped to system threads. Access to buckets is protected by locks (one lock per bucket). We use also a lock to protect the random number generator which is not thread-safe.
The main thread sleeps during 10 seconds, then ask the threads to terminate. For this purpose, we could have used a simple boolean but we have rather chosen to send the termination message via a channel. So each thread receives the number of the channel to listen to and checks regularly if a message ask it to terminate.
The program must be compiled with option--threads:on
.
importlocksimportmathimportosimportrandomconstN=10# Number of buckets.constMaxInit=99# Maximum initial value for buckets.varbuckets:array[1..N,Natural]# Array of buckets.varbucketLocks:array[1..N,Lock]# Array of bucket locks.varrandomLock:Lock# Lock to protect the random number generator.varterminate:array[3,Channel[bool]]# Used to ask threads to terminate.#---------------------------------------------------------------------------------------------------procgetTwoIndexes():tuple[a,b:int]=## Get two indexes from the random number generator.result.a=rand(1..N)result.b=rand(2..N)ifresult.b==result.a:result.b=1#---------------------------------------------------------------------------------------------------procequalize(num:int){.thread.}=## Try to equalize two buckets.varb1,b2:int# Bucket indexes.whiletrue:# Select the two buckets to "equalize".withLockrandomLock:(b1,b2)=getTwoIndexes()ifb1>b2:swapb1,b2# We want "b1 < b2" to avoid deadlocks.# Perform equalization.withLockbucketLocks[b1]:withLockbucketLocks[b2]:lettarget=(buckets[b1]+buckets[b2])div2letdelta=target-buckets[b1]incbuckets[b1],deltadecbuckets[b2],delta# Check termination.let(available,stop)=tryRecvterminate[num]ifavailableandstop:break#---------------------------------------------------------------------------------------------------procdistribute(num:int){.thread.}=## Redistribute contents of two buckets.varb1,b2:int# Bucket indexes.varfactor:float# Ratio used to compute the new value for "b1".whiletrue:# Select the two buckets for redistribution and the redistribution factor.withLockrandomLock:(b1,b2)=getTwoIndexes()factor=rand(0.0..1.0)ifb1>b2:swapb1,b2# We want "b1 < b2" to avoid deadlocks..# Perform redistribution.withLockbucketLocks[b1]:withLockbucketLocks[b2]:letsum=buckets[b1]+buckets[b2]letvalue=(sum.toFloat*factor).toIntbuckets[b1]=valuebuckets[b2]=sum-value# Check termination.let(available,stop)=tryRecvterminate[num]ifavailableandstop:break#---------------------------------------------------------------------------------------------------procdisplay(num:int){.thread.}=## Display the content of buckets and the sum (which should be constant).whiletrue:foriin1..N:acquirebucketLocks[i]echobuckets," Total = ",sum(buckets)foriincountdown(N,1):releasebucketLocks[i]os.sleep(1000)# Check termination.let(available,stop)=tryRecvterminate[num]ifavailableandstop:break#———————————————————————————————————————————————————————————————————————————————————————————————————randomize()# Initialize the buckets with a random value.forbucketinbuckets.mitems:bucket=rand(1..MaxInit)# Initialize the locks.randomLock.initLock()forlockinbucketLocks.mitems:lock.initLock()# Open the channels.forcinterminate.mitems:c.open()# Create and launch the threads.vartequal,tdist,tdisp:Thread[int]tequal.createThread(equalize,0)tdist.createThread(distribute,1)tdisp.createThread(display,2)sleep(10000)# Ask the threads to stop.forcinterminate.mitems:c.send(true)joinThreads([tequal,tdist,tdisp])# Free resources.randomLock.deinitLock()forlockinbucketLocks.mitems:lock.deinitLock()forcinterminate.mitems:c.close()
Total = 588 [92, 63, 33, 68, 66, 37, 26, 66, 77, 60]Total = 588 [91, 3, 41, 126, 34, 3, 25, 92, 13, 160]Total = 588 [129, 9, 80, 6, 68, 8, 73, 45, 69, 101]Total = 588 [87, 71, 144, 20, 11, 54, 72, 48, 63, 18]Total = 588 [158, 71, 110, 51, 19, 60, 27, 31, 10, 51]Total = 588 [97, 43, 5, 70, 71, 104, 25, 17, 112, 44]Total = 588 [68, 50, 12, 51, 128, 8, 21, 143, 53, 54]Total = 588 [31, 47, 156, 81, 69, 5, 28, 76, 66, 29]Total = 588 [97, 3, 27, 82, 42, 120, 72, 74, 39, 32]Total = 588 [30, 39, 79, 109, 62, 62, 13, 14, 54, 126]
Uses a lock for every bucket. Enforces a locking order to avoid deadlocks.
declare %% %% INIT %% NBuckets = 100 StartVal = 50 ExpectedSum = NBuckets * StartVal %% Makes a tuple and calls Fun for every field fun {Make Label N Fun} R = {Tuple.make Label N} in for I in 1..N do R.I = {Fun} end R end Buckets = {Make buckets NBuckets fun {$} {Cell.new StartVal} end} Locks = {Make locks NBuckets Lock.new} LockList = {Record.toList Locks} %% %% DISPLAY %% proc {Display} Snapshot = {WithLocks LockList fun {$} {Record.map Buckets Cell.access} end } Sum = {Record.foldL Snapshot Number.'+' 0} in {Print Snapshot} {System.showInfo " sum: "#Sum} Sum = ExpectedSum %% assert end %% Calls Fun with multiple locks locked and returns the result of Fun. fun {WithLocks Ls Fun} case Ls of L|Lr then lock L then {WithLocks Lr Fun} end [] nil then {Fun} end end %% %% MANIPULATE %% proc {Smooth I J} Diff = @(Buckets.I) - @(Buckets.J) %% reading without lock: by design Amount = Diff div 4 in {Transfer I J Amount} end proc {Roughen I J} Amount = @(Buckets.I) div 3 %% reading without lock: by design in {Transfer I J Amount} end %% Atomically transfer an amount from From to To. %% Negative amounts are allowed; %% will never make a bucket negative. proc {Transfer From To Amount} if From \= To then %% lock in order (to avoid deadlocks) Smaller = {Min From To} Bigger = {Max From To} in lock Locks.Smaller then lock Locks.Bigger then FromBucket = Buckets.From ToBucket = Buckets.To NewFromValue = @FromBucket - Amount NewToValue = @ToBucket + Amount in if NewFromValue >= 0 andthen NewToValue >= 0 then FromBucket := NewFromValue ToBucket := NewToValue end end end end end %% Returns a random bucket index. fun {Pick} {OS.rand} mod NBuckets + 1 endin %% %% START %% thread for do {Smooth {Pick} {Pick}} end end thread for do {Roughen {Pick} {Pick}} end end for do {Display} {Time.delay 50} end
Sample output:
buckets(50 50 50 50 50 50 50 50 50 50 ,,,) sum: 5000buckets(24 68 58 43 78 85 43 66 14 48 ,,,) sum: 5000buckets(36 33 59 38 39 23 55 51 43 45 ,,,) sum: 5000buckets(64 32 62 26 50 82 38 70 16 43 ,,,) sum: 5000buckets(51 51 49 50 51 51 51 49 49 49 ,,,) sum: 5000buckets(43 28 27 60 77 41 36 48 72 70 ,,,) sum: 5000...
GP is not able to do atomic updates. PARI does atomic updates just likeC.
usestrict;use5.10.0;usethreads'yield';usethreads::shared;my@a:shared=(100)x10;my$stop:shared=0;subpick2{my$i=int(rand(10));my$j;$j=int(rand(10))until$j!=$i;($i,$j)}subeven{lock@a;my($i,$j)=pick2;my$sum=$a[$i]+$a[$j];$a[$i]=int($sum/2);$a[$j]=$sum-$a[$i];}subrand_move{lock@a;my($i,$j)=pick2;my$x=int(rand$a[$i]);$a[$i]-=$x;$a[$j]+=$x;}subshow{lock@a;my$sum=0;$sum+=$_for(@a);printf"%4d",$_for@a;print" total $sum\n";}my$t1=async{evenuntil$stop}my$t2=async{rand_moveuntil$stop}my$t3=async{for(1..10){show;sleep(1);}$stop=1;};$t1->join;$t2->join;$t3->join;
withoutjs-- (no threads or critical sections in JavaScript)constantnBuckets=20sequencebuckets=tagset(nBuckets)-- {1,2,3,..,20}constantbucket_cs=init_cs()-- critical sectionatomequals=0,rands=0-- operation countsintegerterminate=0-- control flagproceduremythreads(integereq)-- if eq then equalise else randomiseintegerb1,b2,amtwhilenotterminatedob1=rand(nBuckets)b2=rand(nBuckets)ifb1!=b2then-- (test not actually needed)enter_cs(bucket_cs)ifeqthenamt=floor((buckets[b1]-buckets[b2])/2)equals+=1elseamt=rand(buckets[b1]+1)-1rands+=1endifbuckets[b1]-=amtbuckets[b2]+=amtleave_cs(bucket_cs)endifendwhileexit_thread(0)endprocedureproceduredisplay()enter_cs(bucket_cs)?{sum(buckets),equals,rands,buckets}leave_cs(bucket_cs)endproceduredisplay()constantthreads={create_thread(routine_id("mythreads"),{1}),-- equalisecreate_thread(routine_id("mythreads"),{0})}-- randomiseconstantESC=#1Bwhilenotfind(get_key(),{ESC,'q','Q'})dosleep(1)display()endwhileterminate=1wait_thread(threads)delete_cs(bucket_cs)
{210,0,0,{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}}{210,1326977,1619458,{14,17,10,7,9,6,8,5,9,7,6,10,14,12,12,14,13,15,12,10}}{210,2637987,3137483,{10,7,4,31,1,11,5,6,16,11,9,15,9,13,15,5,5,10,6,21}}{210,3973762,4619906,{22,38,9,17,9,10,12,0,3,0,13,11,2,39,4,11,9,0,0,1}}{210,5327923,6082436,{1,0,0,9,23,1,33,7,1,43,8,17,1,6,30,0,24,2,3,1}}{210,6671482,7561288,{12,11,2,9,11,4,11,9,13,9,9,20,19,10,10,8,11,11,12,9}}{210,7950733,9131581,{7,9,10,8,11,8,13,12,11,5,6,8,11,16,15,14,15,11,11,9}}{210,9272164,10625022,{4,8,28,2,13,13,6,32,12,5,10,4,28,1,12,9,4,9,4,6}}{210,10615451,12117282,{10,17,18,2,7,13,10,2,12,4,19,10,18,12,9,5,12,11,8,11}}{210,11912322,13610386,{10,7,15,11,12,8,12,10,15,14,10,7,9,10,8,11,8,10,13,10}}{210,13243566,15099214,{8,12,11,7,12,13,13,8,9,9,16,10,10,8,10,10,8,10,13,13}}
We use database objects (persistent symbols) for the buckets, andchild processes to handle the tasks, as this is the standard wayfor general PicoLisp applications.
(seed (in "/dev/urandom" (rd 8)))(de *Buckets . 15) # Number of buckets# E/R model(class +Bucket +Entity)(rel key (+Key +Number)) # Key 1 .. *Buckets(rel val (+Number)) # Value 1 .. 999# Create new DB file(pool (tmp "buckets.db"))# Create *Buckets buckets with values between 1 and 999(for K *Buckets (new T '(+Bucket) 'key K 'val (rand 1 999)) )(commit)# Pick a random bucket(de pickBucket () (db 'key '+Bucket (rand 1 *Buckets)) )# First process(unless (fork) (seed *Pid) # Ensure local random sequence (loop (let (B1 (pickBucket) B2 (pickBucket)) # Pick two buckets 'B1' and 'B2' (dbSync) # Atomic DB operation (let (V1 (; B1 val) V2 (; B2 val)) # Get current values (cond ((> V1 V2) (dec> B1 'val) # Make them closer to equal (inc> B2 'val) ) ((> V2 V1) (dec> B2 'val) (inc> B1 'val) ) ) ) (commit 'upd) # Close transaction (wait 1) ) ) )# Second process(unless (fork) (seed *Pid) # Ensure local random sequence (loop (let (B1 (pickBucket) B2 (pickBucket)) # Pick two buckets 'B1' and 'B2' (unless (== B1 B2) # Found two different ones? (dbSync) # Atomic DB operation (let (V1 (; B1 val) V2 (; B2 val)) # Get current values (cond ((> V1 V2 0) (inc> B1 'val) # Redistribute them (dec> B2 'val) ) ((> V2 V1 0) (inc> B2 'val) (dec> B1 'val) ) ) ) (commit 'upd) # Close transaction (wait 1) ) ) ) )# Third process(unless (fork) (loop (let Lst (collect 'key '+Bucket) # Get all buckets (for This Lst # Print current values (printsp (: val)) ) (prinl # and total sum "-- Total: " (sum '((This) (: val)) Lst) ) ) (wait 2000) ) ) # Sleep two seconds(wait)
Output:
70 236 582 30 395 215 525 653 502 825 129 769 722 440 708 -- Total: 68010 156 566 352 198 263 0 743 0 1316 58 1180 897 0 1072 -- Total: 68010 0 424 101 0 0 0 682 0 1809 0 1549 961 0 1275 -- Total: 68010 0 0 0 0 0 0 452 0 2226 0 1838 884 0 1401 -- Total: 680154 55 56 55 54 55 54 102 54 2363 54 1816 666 55 1308 -- Total: 6801198 198 197 196 198 198 197 197 196 1903 197 1438 345 197 946 -- Total: 6801342 344 343 344 344 342 344 343 343 1278 343 992 343 343 413 -- Total: 6801^C
#Buckets=9#TotalAmount=200GlobalDimBuckets(#Buckets)GlobalBMutex=CreateMutex()GlobalQuit=#FalseProceduremax(x,y)Ifx>=y:ProcedureReturnxElse:ProcedureReturnyEndIfEndProcedureProcedureMove(WantedAmount,From,Dest)ProtectedRealAmountIffrom<>DestLockMutex(BMutex)RealAmount=max(0,Buckets(from)-WantedAmount)Buckets(From)-RealAmountBuckets(Dest)+RealAmountUnlockMutex(BMutex)EndIfProcedureReturnRealAmountEndProcedureProcedureLevel(A,B)Protectedi,j,tIfA<>BLockMutex(BMutex)t=Buckets(A)+Buckets(B)i=t/2:j=t-iBuckets(A)=iBuckets(B)=jUnlockMutex(BMutex)EndIfEndProcedureProcedureDoInvent(ArrayA(1))Protectedi,sumLockMutex(BMutex)Fori=0ToArraySize(Buckets())A(i)=Buckets(i)sum+A(i)NextiUnlockMutex(BMutex)ProcedureReturnsumEndProcedureProcedureMixingThread(arg)RepeatMove(Random(#TotalAmount),Random(#Buckets),Random(#Buckets))UntilQuitEndProcedureProcedureLevelingThread(arg)RepeatLevel(Random(#Buckets),Random(#Buckets))UntilQuitEndProcedureIfOpenWindow(0,0,0,100,150,"Atomic updates",#PB_Window_SystemMenu)DefineThread1=CreateThread(@MixingThread(),0)DefineThread2=CreateThread(@MixingThread(),0)Definei,EventDimInventory(#Buckets);SetupasmallGUIFori=0To9TextGadget(i,0,i*15,50,15,"Bucket #"+Str(i))NextiTextGadget(10,55,135,40,15,"=")AddWindowTimer(0,0,500)Buckets(0)=#TotalAmountRepeatEvent=WaitWindowEvent()IfEvent=#PB_Event_Timeri=DoInvent(Inventory())SetGadgetText(10,"="+Str(i))Fori=0To#BucketsSetGadgetText(i,Str(Inventory(i)))NextiEndIfUntilEvent=#PB_Event_CloseWindowQuit=#True;TellthreadstoshutdownWaitThread(Thread1):WaitThread(Thread2)EndIf
This code uses athreading.Lock to serialize access to the bucket set.
from__future__importwith_statement# required for Python 2.5importthreadingimportrandomimporttimeterminate=threading.Event()classBuckets:def__init__(self,nbuckets):self.nbuckets=nbucketsself.values=[random.randrange(10)foriinrange(nbuckets)]self.lock=threading.Lock()def__getitem__(self,i):returnself.values[i]deftransfer(self,src,dst,amount):withself.lock:amount=min(amount,self.values[src])self.values[src]-=amountself.values[dst]+=amountdefsnapshot(self):# copy of the current state (synchronized)withself.lock:returnself.values[:]defrandomize(buckets):nbuckets=buckets.nbucketswhilenotterminate.isSet():src=random.randrange(nbuckets)dst=random.randrange(nbuckets)ifdst!=src:amount=random.randrange(20)buckets.transfer(src,dst,amount)defequalize(buckets):nbuckets=buckets.nbucketswhilenotterminate.isSet():src=random.randrange(nbuckets)dst=random.randrange(nbuckets)ifdst!=src:amount=(buckets[src]-buckets[dst])//2ifamount>=0:buckets.transfer(src,dst,amount)else:buckets.transfer(dst,src,-amount)defprint_state(buckets):snapshot=buckets.snapshot()forvalueinsnapshot:print'%2d'%value,print'=',sum(snapshot)# create 15 bucketsbuckets=Buckets(15)# the randomize threadt1=threading.Thread(target=randomize,args=[buckets])t1.start()# the equalize threadt2=threading.Thread(target=equalize,args=[buckets])t2.start()# main thread, displaytry:whileTrue:print_state(buckets)time.sleep(1)exceptKeyboardInterrupt:# ^C to finishterminate.set()# wait until all worker threads finisht1.join()t2.join()
Sample Output:
5 5 11 5 5 5 5 5 5 0 6 5 5 6 5 = 78 9 0 0 0 20 5 0 21 10 0 0 8 5 0 0 = 78 4 0 4 12 4 4 9 2 14 0 11 2 0 12 0 = 78 5 5 6 5 5 5 6 5 6 5 5 5 5 5 5 = 78 2 0 3 0 0 0 0 4 13 4 9 0 1 9 33 = 78 0 0 0 22 11 0 13 12 0 0 0 20 0 0 0 = 78
#langracket(structbucket(value[lock#:auto])#:auto-value#f#:mutable#:transparent)(define*buckets*(build-vector10(λ(i)(bucket100))))(define(show-buckets)(let*([values(for/list([b*buckets*])(bucket-valueb))][total(apply+values)])(appendvalues(list'-total))))(define*equalizations*0)(define*randomizations*0)(define*blocks*0)(define(show-stats)(let([n(length*log*)][log(reverse*log*)])(printf"Equalizations ~a, Randomizations ~a, Transfers: ~a, Blocks ~a\n"*equalizations**randomizations*n*blocks*)(for([i(in-range10)])(definej(min(floor(*i(/n9)))(sub1n)))(printf"~a (~a). "(add1i)(add1j))(displayln(list-reflogj)))))(define*log*(list(show-buckets)))(define-syntax-rule(inc!x)(set!x(add1x)))(define(get-bucketi)(vector-ref*buckets*i))(define(get-valuei)(bucket-value(get-bucketi)))(define(set-value!iv)(set-bucket-value!(get-bucketi)v))(define(locked?i)(bucket-lock(vector-ref*buckets*i)))(define(lock!iv)(set-bucket-lock!(get-bucketi)v))(define(unlock!i)(lock!i#f))(define*clamp-lock*#f)(define(clampij)(cond[*clamp-lock*(inc!*blocks*)#f][else(set!*clamp-lock*#t)(let([result#f][g(gensym)])(unless(locked?i)(lock!ig)(cond[(locked?j)(unlock!i)][else(lock!jg)(set!result#t)]))(unlessresult(inc!*blocks*))(set!*clamp-lock*#f)result)]))(define(unclampij)(unlock!i)(unlock!j))(define(transferijamount)(let*([lock1(locked?i)][lock2(locked?j)][a(get-valuei)][b(get-valuej)][c(-aamount)][d(+bamount)])(cond[(<c0)(error'transfer"Removing too much.")][(<d0)(error'transfer"Stealing too much.")][(andlock1(equal?lock1lock2))(set-value!ic)(set-value!jd)(set!*log*(cons(show-buckets)*log*))][else(error'transfer"Lock problem")])))(define(equalizeij)(when(clampij)(let([a(get-valuei)][b(get-valuej)])(unless(=ab)(transferij(if(>ab)(floor(/(-ab)2))(-(floor(/(-ba)2)))))(inc!*equalizations*)))(unclampij)))(define(randomizeij)(when(clampij)(let*([a(get-valuei)][b(get-valuej)][t(+ab)][r(if(=t0)0(randomt))])(unless(=r0)(transferij(-ar))(inc!*randomizations*)))(unclampij)))(thread(λ()(for([_(in-range500000)])(equalize(random10)(random10)))))(thread(λ()(for([_(in-range500000)])(randomize(random10)(random10)))))
Sample output:
> (show-stats)Equalizations 33616, Randomizations 159240, Transfers: 192857, Blocks 5790351 (1). (100 100 100 100 100 100 100 100 100 100 - 1000)2 (21429). (100 238 23 36 153 111 86 100 38 115 - 1000)3 (42858). (162 26 127 39 459 5 40 23 90 29 - 1000)4 (64286). (16 80 41 307 117 38 251 36 29 85 - 1000)5 (85715). (96 62 142 7 102 48 150 80 57 256 - 1000)6 (107143). (69 70 69 69 69 69 298 69 69 149 - 1000)7 (128572). (56 66 99 23 328 99 116 117 78 18 - 1000)8 (150000). (23 128 108 110 56 232 69 25 33 216 - 1000)9 (171429). (27 169 298 9 26 184 134 27 110 16 - 1000)10 (192857). (54 80 38 52 29 14 42 173 246 272 - 1000)
(formerly Perl 6)
#| A collection of non-negative integers, with atomic operations.classBucketStore {has$.elemsisrequired;has@!buckets = ^1024 .pickxx$!elems;has$lock =Lock.new;#| Returns an array with the contents of all buckets.methodbuckets {$lock.protect: { [@!buckets] } }#| Transfers $amount from bucket at index $from, to bucket at index $to.methodtransfer ($amount, :$from!, :$to!) {returnif$from ==$to;$lock.protect: {my$clamped =$amountmin@!buckets[$from];@!buckets[$from] -=$clamped;@!buckets[$to] +=$clamped; } }}# Create bucket storemy$bucket-store =BucketStore.new:elems =>8;my$initial-sum =$bucket-store.buckets.sum;# Start a thread to equalize bucketsThread.start: {loop {my@buckets =$bucket-store.buckets;# Pick 2 buckets, so that $to has not more than $frommy ($to,$from) =@buckets.keys.pick(2).sort({@buckets[$_] });# Transfer half of the difference, rounded down$bucket-store.transfer: ([-]@buckets[$from,$to])div2, :$from, :$to; }}# Start a thread to distribute values among bucketsThread.start: {loop {my@buckets =$bucket-store.buckets;# Pick 2 bucketsmy ($to,$from) =@buckets.keys.pick(2);# Transfer a random portion$bucket-store.transfer: ^@buckets[$from] .pick, :$from, :$to; }}# Loop to display bucketsloop {sleep1;my@buckets =$bucket-store.buckets;my$sum =@buckets.sum;say"{@buckets.fmt: '%4d'}, total $sum";if$sum !=$initial-sum {note"ERROR: Total changed from $initial-sum to $sum";exit1; }}
23 52 831 195 1407 809 813 20, total 41501172 83 336 306 751 468 615 419, total 4150 734 103 1086 88 313 136 1252 438, total 4150 512 323 544 165 200 3 2155 248, total 4150...
# Project : Atomic updatesbucket = list(10)f2 = 0for i = 1 to 10 bucket[i] = floor(random(9)*10)next a = display("display:")see nla = flatten(a)see "" + a + nla = display("flatten:")see nla = transfer(3,5)see a + nlsee "19 from 3 to 5: "a = display(a)see nl func display(a) display = 0 see "" + a + " " + char(9) for i = 1 to 10 display = display + bucket[i] see "" + bucket[i] + " " next see " total:" + display return display func flatten(f) f1 = floor((f / 10) + 0.5) for i = 1 to 10 bucket[i] = f1 f2 = f2 + f1 next bucket[10] = bucket[10] + f - f2 func transfer(a1,a2) transfer = floor(random(9)/10 * bucket[a1]) bucket[a1] = bucket[a1] - transfer bucket[a2] = bucket[a2] + transfer
Output:
display: 60 10 70 60 40 80 90 20 90 20 total:540flatten: 54 54 54 54 54 54 54 54 54 54 total:54019 from 3 to 5: 54 54 33 54 75 54 54 54 54 54 total:540
require'thread'# A collection of buckets, filled with random non-negative integers.# There are atomic operations to look at the bucket contents, and# to move amounts between buckets.classBucketStore# Creates a BucketStore with +nbuckets+ buckets. Fills each bucket# with a random non-negative integer.definitializenbuckets# Create an array for the buckets@buckets=(0...nbuckets).map{rand(1024)}# Mutex used to make operations atomic@mutex=Mutex.newend# Returns an array with the contents of all buckets.defbuckets@mutex.synchronize{Array.new(@buckets)}end# Transfers _amount_ to bucket at array index _destination_,# from bucket at array index _source_.deftransferdestination,source,amount# Do nothing if both buckets are samereturnnilifdestination==source@mutex.synchronizedo# Clamp amount to prevent negative value in bucketamount=[amount,@buckets[source]].min@buckets[source]-=amount@buckets[destination]+=amountendnilendend# Create bucket storebucket_store=BucketStore.new8# Get total amount in the storeTOTAL=bucket_store.buckets.inject{|a,b|a+=b}# Start a thread to equalize bucketsThread.newdoloopdo# Pick 2 bucketsbuckets=bucket_store.bucketsfirst=randbuckets.lengthsecond=randbuckets.length# Swap buckets so that _first_ has not more than _second_first,second=second,firstifbuckets[first]>buckets[second]# Transfer half of the difference, rounded downbucket_store.transferfirst,second,(buckets[second]-buckets[first])/2endend# Start a thread to distribute values among bucketsThread.newdoloopdo# Pick 2 bucketsbuckets=bucket_store.bucketsfirst=randbuckets.lengthsecond=randbuckets.length# Transfer random amount to _first_ from _second_bucket_store.transferfirst,second,rand(buckets[second])endend# Loop to display bucketsloopdosleep1buckets=bucket_store.buckets# Compute the total value in all buckets.# We calculate this outside BucketStore so BucketStore can't cheat by# always reporting the same value.n=buckets.inject{|a,b|a+=b}# Display buckets and totalprintf"%s, total %d\n",(buckets.map{|v|sprintf"%4d",v}.join" "),nifn!=TOTAL# This should never happen$stderr.puts"ERROR: Total changed from#{TOTAL} to#{n}"exit1endend
Sample Output:
221 521 331 1186 654 185 521 19, total 3638 455 455 455 455 454 454 455 455, total 3638 455 455 455 455 454 454 455 455, total 3638 755 3 115 10 598 1326 515 316, total 3638
DIM bucket(10)FOR i = 1 TO 10 : bucket(i) = int(RND(0)*100) : NEXTa = display(" Display:")' show original arraya = flatten(a)' flatten the arraya = display(" Flatten:")' show flattened arraya = transfer(3,5)' transfer some amount from 3 to 5a = display(a;" from 3 to 5:")' Show transfer arrayendFUNCTION display(a$) print a$;" ";chr$(9); for i = 1 to 10 display = display + bucket(i) print bucket(i);chr$(9); next i print " Total:";displayEND FUNCTION FUNCTION flatten(f) f1 = int((f / 10) + .5) for i = 1 to 10 bucket(i)= f1 f2= f2 + f1 next i bucket(10)= bucket(10) + f - f2END FUNCTIONFUNCTION transfer(a1,a2)transfer= int(rnd(0) * bucket(a1))bucket(a1)= bucket(a1) - transferbucket(a2)= bucket(a2) + transferEND FUNCTION
Display: 2450508563495091102 Total:474 Flatten: 47474747474747474751 Total:47419 from 3 to 5: 47472847664747474751 Total:474
externcraterand;usestd::sync::{Arc,Mutex};usestd::thread;usestd::cmp;usestd::time::Duration;userand::Rng;userand::distributions::{IndependentSample,Range};traitBuckets{fnequalize<R:Rng>(&mutself,rng:&mutR);fnrandomize<R:Rng>(&mutself,rng:&mutR);fnprint_state(&self);}implBucketsfor[i32]{fnequalize<R:Rng>(&mutself,rng:&mutR){letrange=Range::new(0,self.len()-1);letsrc=range.ind_sample(rng);letdst=range.ind_sample(rng);ifdst!=src{letamount=cmp::min(((dst+src)/2)asi32,self[src]);letmultiplier=ifamount>=0{-1}else{1};self[src]+=amount*multiplier;self[dst]-=amount*multiplier;}}fnrandomize<R:Rng>(&mutself,rng:&mutR){letind_range=Range::new(0,self.len()-1);letsrc=ind_range.ind_sample(rng);letdst=ind_range.ind_sample(rng);ifdst!=src{letamount=cmp::min(Range::new(0,20).ind_sample(rng),self[src]);self[src]-=amount;self[dst]+=amount;}}fnprint_state(&self){println!("{:?} = {}",self,self.iter().sum::<i32>());}}fnmain(){lete_buckets=Arc::new(Mutex::new([10;10]));letr_buckets=e_buckets.clone();letp_buckets=e_buckets.clone();thread::spawn(move||{letmutrng=rand::thread_rng();loop{letmutbuckets=e_buckets.lock().unwrap();buckets.equalize(&mutrng);}});thread::spawn(move||{letmutrng=rand::thread_rng();loop{letmutbuckets=r_buckets.lock().unwrap();buckets.randomize(&mutrng);}});letsleep_time=Duration::new(1,0);loop{{letbuckets=p_buckets.lock().unwrap();buckets.print_state();}thread::sleep(sleep_time);}}
objectAtomicUpdates{classBuckets(ns:Int*){importscala.actors.Actor._valbuckets=ns.toArraycaseclassGet(index:Int)caseclassTransfer(fromIndex:Int,toIndex:Int,amount:Int)caseobjectGetAllvalhandler=actor{loop{react{caseGet(index)=>reply(buckets(index))caseTransfer(fromIndex,toIndex,amount)=>assert(amount>=0)valactualAmount=Math.min(amount,buckets(fromIndex))buckets(fromIndex)-=actualAmountbuckets(toIndex)+=actualAmountcaseGetAll=>reply(buckets.toList)}}}defget(index:Int):Int=(handler!?Get(index)).asInstanceOf[Int]deftransfer(fromIndex:Int,toIndex:Int,amount:Int)=handler!Transfer(fromIndex,toIndex,amount)defgetAll:List[Int]=(handler!?GetAll).asInstanceOf[List[Int]]}defrandomPair(n:Int):(Int,Int)={importscala.util.Random._valpair=(nextInt(n),nextInt(n))if(pair._1==pair._2)randomPair(n)elsepair}defmain(args:Array[String]){importscala.actors.Scheduler._valbuckets=newBuckets(List.range(1,11):_*)valstop=newjava.util.concurrent.atomic.AtomicBoolean(false)vallatch=newjava.util.concurrent.CountDownLatch(3)execute{while(!stop.get){val(i1,i2)=randomPair(10)val(n1,n2)=(buckets.get(i1),buckets.get(i2))valm=(n1+n2)/2if(n1<n2)buckets.transfer(i2,i1,n2-m)elsebuckets.transfer(i1,i2,n1-m)}latch.countDown}execute{while(!stop.get){val(i1,i2)=randomPair(10)valn=buckets.get(i1)buckets.transfer(i1,i2,if(n==0)0elsescala.util.Random.nextInt(n))}latch.countDown}execute{for(i<-1to20){valall=buckets.getAllprintln(all.sum+":"+all)Thread.sleep(500)}stop.set(true)latch.countDown}latch.awaitshutdown}}
NUM_BUCKETS:=10."create and preset with random data"buckets:= (1to:NUM_BUCKETS)collect:[:i|RandomnextIntegerBetween:0and:10000]as:Array.count_randomizations:=0.count_equalizations:=0.printSum:= ["the sum must be computed and printed while noone fiddles around"|snapshot|snapshot:=bucketssynchronized:[bucketscopy ].TranscriptshowCR:e' {snapshot}sum={snapshotsum}'. ].pickTwo:= [:action|"pick two pockets and eval action on it"|p1 p2|p1:=RandomnextIntegerBetween:1and:NUM_BUCKETS.p2:=RandomnextIntegerBetween:1and:NUM_BUCKETS.bucketssynchronized:[actionvalue:p1value:p2 ]. ].randomize:= [pickTwovalue:[:p1:p2|"take a random value from p1 and add to p2"|howMuch|howMuch:=RandomnextIntegerBetween:0and:(bucketsat:p1).bucketsat:p1put:(bucketsat:p1)-howMuch.bucketsat:p2put:(bucketsat:p2)+howMuch. ].count_randomizations:=count_randomizations+1. ].equalize:= [pickTwovalue:[:p1:p2|"average them"|diff|diff:= ((bucketsat:p1)- (bucketsat:p2))//2.bucketsat:p1put:(bucketsat:p1)-diff.bucketsat:p2put:(bucketsat:p2)+diff. ].count_equalizations:=count_equalizations+1. ]."start the show"randomizer:= [randomizeloop ]fork.equalizer:= [equalizeloop ]fork."every 2 seconds, print the sum"monitor:= [ [printSumvalue.DelaywaitFor:2seconds. ]loop. ]fork."let it run for 10 seconds, then kill them all"DelaywaitFor:20seconds.randomizerterminate.equalizerterminate.monitorterminate.StdoutprintCR:e'performed {count_equalizations}equalizationsand {count_randomizations}randomizations'.
#(3940 3940 3940 3940 3939 3940 3940 3940 3940 3939) sum=39398 #(3940 3939 3940 3940 3940 3940 3940 3940 3940 3939) sum=39398 #(3940 3939 3940 3939 3940 3940 3940 3940 3940 3940) sum=39398 #(326 90 19490 831 2668 4840 37 6285 441 4390) sum=39398 #(3940 3940 3939 3940 3940 3940 3940 3940 3940 3939) sum=39398 #(3940 3940 3939 3940 3940 3939 3940 3940 3940 3940) sum=39398 #(1073 499 8808 1094 457 7380 4447 12307 1526 1807) sum=39398 #(10073 494 3913 284 286 105 18599 437 1332 3875) sum=39398 #(7938 7 9691 1853 1709 3566 12374 459 1062 739) sum=39398 #(327 1185 790 9606 5667 477 1260 178 18474 1434) sum=39398performed 3360635 equalizations and 3060706 randomizations
Due to the way the CPU is scheduled, there are periods where the equalizer is way ahead, and others, where the randomizer is. Thus, depending on when sampled, the buckets are well equalized at times (running a single core).
importFoundationfinalclassAtomicBuckets:CustomStringConvertible{varcount:Int{returnbuckets.count}vardescription:String{returnwithBucketsLocked{"\(buckets)"}}vartotal:Int{returnwithBucketsLocked{buckets.reduce(0,+)}}privateletlock=DispatchSemaphore(value:1)privatevarbuckets:[Int]subscript(n:Int)->Int{returnwithBucketsLocked{buckets[n]}}init(withbuckets:[Int]){self.buckets=buckets}functransfer(amount:Int,from:Int,to:Int){withBucketsLocked{lettransferAmount=buckets[from]>=amount?amount:buckets[from]buckets[from]-=transferAmountbuckets[to]+=transferAmount}}privatefuncwithBucketsLocked<T>(do:()->T)->T{letret:Tlock.wait()ret=`do`()lock.signal()returnret}}letbucks=AtomicBuckets(with:[21,39,40,20])letorder=DispatchSource.makeTimerSource()letchaos=DispatchSource.makeTimerSource()letprinter=DispatchSource.makeTimerSource()printer.setEventHandler{print("\(bucks) =\(bucks.total)")}printer.schedule(deadline:.now(),repeating:.seconds(1))printer.activate()order.setEventHandler{let(b1,b2)=(Int.random(in:0..<bucks.count),Int.random(in:0..<bucks.count))let(v1,v2)=(bucks[b1],bucks[b2])guardv1!=v2else{return}ifv1>v2{bucks.transfer(amount:(v1-v2)/2,from:b1,to:b2)}else{bucks.transfer(amount:(v2-v1)/2,from:b2,to:b1)}}order.schedule(deadline:.now(),repeating:.milliseconds(5))order.activate()chaos.setEventHandler{let(b1,b2)=(Int.random(in:0..<bucks.count),Int.random(in:0..<bucks.count))bucks.transfer(amount:Int.random(in:0..<(bucks[b1]+1)),from:b1,to:b2)}chaos.schedule(deadline:.now(),repeating:.milliseconds(5))chaos.activate()dispatchMain()
[21, 39, 40, 20] = 120[14, 28, 46, 32] = 120[25, 17, 38, 40] = 120[5, 46, 69, 0] = 120[22, 52, 24, 22] = 120[11, 70, 20, 19] = 120[18, 19, 46, 37] = 120
In Tcl, you need to explicitly hold a mutex if you want to reliably access multiple shared variables; single shared variable accesses use a built-in lock.
packagerequireThreadpackagerequireTk# Make the shared statecanvas.c;# So we can allocate the display lines in one loopsetm[thread::mutexcreate]for{seti0}{$i<100}{incri}{setbucketb$i;# A handle for every bucket...tsv::setbuckets$bucket50lappendbuckets$bucketlappendlines[.ccreateline0000]}tsv::setstillgoing1# Make the "make more equal" tasklappendtasks[thread::create{# Perform an atomic update of two cellsproctransfer{b1b2val}{variablemthread::mutexlock$msetv[tsv::getbuckets$b1]if{$val>$v}{setval$v}tsv::incrbuckets$b1[expr{-$val}]tsv::incrbuckets$b2$valthread::mutexunlock$m}# The task itself; we loop this round frequentlyproctask{mutexbuckets}{variablem$mutexb$bucketsi0while{[tsv::getstillgoing]}{setb1[lindex$b$i]if{[incri]==[llength$b]}{seti0}setb2[lindex$b$i]if{[tsv::getbuckets$b1]>[tsv::getbuckets$b2]}{transfer$b1$b21}else{transfer$b1$b2-1}}}thread::wait}]# Make the "mess things up" tasklappendtasks[thread::create{# Utility to pick a random item from a listprocpicklist{lindex$list[expr{int(rand()*[llength$list])}]}proctransfer{b1b2val}{variablemthread::mutexlock$msetv[tsv::getbuckets$b1]if{$val>$v}{setval$v}tsv::incrbuckets$b1[expr{-$val}]tsv::incrbuckets$b2$valthread::mutexunlock$m}# The task to move a large amount between two random bucketsproctask{mutexbuckets}{variablem$mutexb$bucketswhile{[tsv::getstillgoing]}{setb1[pick$b]setb2[pick$b]transfer$b1$b2[expr{[tsv::getbuckets$b1]/3}]}}thread::wait}]# The "main" task; we keep GUI operations in the main threadprocredisplay{}{globalmbucketslinesthread::mutexlock$mseti1foreachb$bucketsl$lines{.ccoords$l$i0$i[tsv::getbuckets$b]incri2}thread::mutexunlock$mafter100redisplay}# Start tasks and display.cconfigure-width201-height120pack.credisplayforeacht$tasks{thread::send-async$t[listtask$m$buckets]}# Wait for user to close window, then tidy uptkwaitwindow.tsv::setstillgoing0thread::broadcastthread::exit
This is based on the Kotlin entry but has been modified somewhat mainly due to the following factors.
Wren-cli doesn't have threads but uses Fibers for concurrent operations in combination with the Scheduler/Timer classes for asynchronous operations.
Fibers are cooperatively (rather than preemptively) scheduled and only one fiber can run at a time. Consequently, simultaneous operations are impossible and all operations are therefore atomic by their nature.
import"random"forRandomimport"scheduler"forSchedulerimport"timer"forTimerimport"./math"forNumsvarRnd=Random.new()varNUM_BUCKETS=10varMAX_VALUE=9999classBuckets{constructnew(data){_data=data.toList_running=true}[index]{_data[index]}transfer(srcIndex,dstIndex,amount){if(amount<0)Fiber.abort("Negative amount:%(amount)")if(amount==0)return0vara=amountif(_data[srcIndex]-a<0)a=_data[srcIndex]if(_data[dstIndex]+a<0)a=MAX_VALUE-_data[dstIndex]if(a<0)Fiber.abort("Negative amount:%(a)")_data[srcIndex]=_data[srcIndex]-a_data[dstIndex]=_data[dstIndex]+areturna}buckets{_data.toList}transferRandomAmount(){while(_running){varsrcIndex=Rnd.int(NUM_BUCKETS)vardstIndex=Rnd.int(NUM_BUCKETS)varamount=Rnd.int(MAX_VALUE+1)transfer(srcIndex,dstIndex,amount)Timer.sleep(1)}}equalize(){while(_running){varsrcIndex=Rnd.int(NUM_BUCKETS)vardstIndex=Rnd.int(NUM_BUCKETS)varamount=((this[srcIndex]-this[dstIndex])/2).truncateif(amount>=0)transfer(srcIndex,dstIndex,amount)Timer.sleep(1)}}stop(){_running=false}print(){Timer.sleep(1000)// one second delay between printsvarbucketValues=bucketsSystem.print("Current values:%(Nums.sum(bucketValues))%(bucketValues)")}}varvalues=List.filled(NUM_BUCKETS,0)for(iin0...NUM_BUCKETS)values[i]=Rnd.int(MAX_VALUE+1)System.print("Initial array :%(Nums.sum(values))%(values)")varbuckets=Buckets.new(values)varcount=0while(true){Scheduler.add{buckets.equalize()}buckets.print()Scheduler.add{buckets.transferRandomAmount()}buckets.print()count=count+2if(count==10){// stop after 10 prints, saybuckets.stop()break}}
Sample run:
Initial array : 54458 [3795, 7333, 1896, 7813, 5336, 3828, 7832, 2001, 4856, 9768]Current values: 54458 [5446, 5446, 5445, 5445, 5446, 5446, 5446, 5446, 5446, 5446]Current values: 54458 [7868, 0, 25620, 7867, 0, 6511, 0, 0, 3318, 3274]Current values: 54458 [5290, 4692, 3744, 7575, 5290, 7575, 1804, 11419, 0, 7069]Current values: 54458 [0, 0, 2285, 4534, 16194, 11666, 16195, 2608, 976, 0]Current values: 54458 [3435, 9208, 0, 9054, 4589, 5898, 4866, 0, 12819, 4589]Current values: 54458 [4122, 8125, 6552, 415, 5845, 8125, 415, 7159, 0, 13700]Current values: 54458 [5134, 0, 0, 17435, 5135, 9417, 7871, 4299, 5167, 0]Current values: 54458 [2592, 4867, 5708, 6754, 12923, 0, 10013, 6377, 2532, 2692]Current values: 54458 [1574, 9609, 8627, 0, 1575, 229, 4157, 12565, 12973, 3149]Current values: 54458 [4146, 213, 5928, 11141, 2865, 6928, 12598, 2286, 2052, 6301]
Threads and thread safe objects (locks, lists, ints, etc) are built in.
class B{ const N=10; var [const] buckets=(1).pump(N,List).copy(), //(1,2,3...) lock=Atomic.Lock(), cnt=Atomic.Int(); fcn init{ "Initial sum: ".println(values().sum()); } fcn transferArb{ // transfer arbitary amount from 1 bucket to another b1:=(0).random(N); b2:=(0).random(N); critical(lock){ t:=(0).random(buckets[b1]); buckets[b1]=buckets[b1]-t; buckets[b2]=buckets[b2]+t; } cnt.inc(); } fcn transferEq{ // try to make two buckets equal b1:=(0).random(N); b2:=(0).random(N); critical(lock){ v1:=buckets[b1]; v2:=buckets[b2]; t:=(v1-v2).abs()/2; if (v1<v2) t = -t; buckets[b1]=v1-t; buckets[b2]=v2+t; } cnt.inc(); } fcn values{ critical(lock){buckets.copy()} }}fcn threadA(b){ while(1) { b.transferArb(); } }fcn threadE(b){ while(1) { b.transferEq(); } }
b:=B();do(10){ threadA.launch(b); } do(10){ threadE.launch(b); }while(1){ v:=b.values(); v.println("-->",v.sum()," ", b.cnt.value," transfers ", vm.numThreads," threads"); Atomic.sleep(2.5); }
Initial sum: 55L(8,8,7,4,2,2,6,4,4,10)-->55 24 transfers 20 threadsL(1,3,5,6,8,8,1,8,10,5)-->55 33755 transfers 20 threadsL(6,5,4,2,7,6,11,1,7,6)-->55 67616 transfers 20 threadsL(5,8,5,5,9,4,4,4,5,6)-->55 101434 transfers 20 threadsL(4,1,6,9,10,4,5,5,4,7)-->55 135013 transfers 20 threadsL(7,6,5,4,5,4,4,7,7,6)-->55 168516 transfers 20 threadsL(2,4,5,3,4,14,1,5,11,6)-->55 202241 transfers 20 threadsL(7,5,2,3,14,8,6,6,1,3)-->55 235660 transfers 20 threadsL(8,7,9,8,7,6,1,1,6,2)-->55 269039 transfers 20 threadsL(7,4,8,17,3,2,1,5,5,3)-->55 302837 transfers 20 threadsL(4,5,4,5,10,5,5,5,3,9)-->55 336642 transfers 20 threads
Another solution, using a Pipe as a "holding tank". Pipes are thread safe queues. This code just moves the values to and from the pipe to synchronize changes. The use of this class is the same as above, just change b:=B() to b:=C();
class C{ const N=10; var [const] buckets=(1).pump(N,List).copy(), //(1,2,3...) pipe = Thread.Pipe(), cnt=Atomic.Int(); fcn init{ pipe.write(buckets); "Initial sum: ".println(values().sum()); } fcn transferArb{ // transfer arbitary amount from 1 bucket to another b1:=(0).random(N); b2:=(0).random(N); v:=pipe.read(); t:=(0).random(v[b1]); v[b1]=v[b1]-t; v[b2]=v[b2]+t; pipe.write(v); cnt.inc(); } fcn transferEq{ // try to make two buckets equal b1:=(0).random(N); b2:=(0).random(N); v:=pipe.read(); v1:=v[b1]; v2:=v[b2]; t:=(v1-v2).abs()/2; if (v1<v2) t = -t; v[b1]=v1-t; v[b2]=v2+t; pipe.write(v); cnt.inc(); } fcn values{ v:=pipe.read(); v2:=v.copy(); pipe.write(v); v2; }}