# aerospike-top-10-aggregation
## Problem

You want to create a leaderboard of the top 10 scores, or the 10 most recent events, using Aerospike as the data store.
## Solution

The solution is to use an aggregation that processes the stream of tuples flowing from a query on a secondary index. The aggregation is done in code on each node in the cluster and finally aggregated, or reduced, in the client.
The source code for this solution is available on GitHub: https://github.com/helipilot50/aerospike-top-10-aggregation.git
This example requires a working Java development environment (Java 6 or later) including Maven (Maven 2 or later). The Aerospike Java client will be downloaded from Maven Central as part of the build.
After cloning the repository, use Maven to build the JAR files. From the root directory of the project, issue the following command:

```
mvn clean package
```

A JAR file, `aerospike-top-10-1.0-full.jar`, will be produced in the `target` directory.
### Running the solution

This is a runnable JAR complete with all the dependencies packaged.
To load data, use this command:

```
java -jar aerospike-top-10-1.0-full.jar -l
```
It will generate 100,000 `Event` records with an event name and a time stamp.
To verify you have loaded data, use this command:

```
java -jar aerospike-top-10-1.0-full.jar -S
```
You can run the aggregation with the following command:

```
java -jar aerospike-top-10-1.0-full.jar -q
```
This program will load a User Defined Function (UDF) module when it starts. It will look for the UDF module at the location `udf/leaderboard.lua`. Be sure you place it there.
#### Options

```
-a,--all               Aggregate all using ScanAggregate.
-h,--host <arg>        Server hostname (default: 127.0.0.1)
-l,--load              Load data.
-n,--namespace <arg>   Namespace (default: test)
-p,--port <arg>        Server port (default: 3000)
-q,--query             Aggregate with query.
-s,--set <arg>         Set (default: demo)
-S,--scan              Scan all for testing.
-u,--usage             Print usage.
```
The output is a list of 10 maps, in highest-to-lowest order:

```
{eventid=Event:100000, time=1421955197267}
{eventid=Event:99999, time=1421955197266}
{eventid=Event:99998, time=1421955197265}
{eventid=Event:99996, time=1421955197259}
{eventid=Event:99997, time=1421955197259}
{eventid=Event:99995, time=1421955197259}
{eventid=Event:99994, time=1421955197258}
{eventid=Event:99993, time=1421955197258}
{eventid=Event:99992, time=1421955197256}
{eventid=Event:99991, time=1421955197255}
```
## Discussion
The Java code is very simple: in the `main()` method, a secondary index is created on the `time` bin and the UDF module is registered with the cluster.
```java
/*
 * Create index for query.
 * Index creation only needs to be done once and can be done using AQL or ASCLI also.
 */
IndexTask it = as.client.createIndex(null, as.namespace, as.set, "top-10", TIME_BIN, IndexType.NUMERIC);
it.waitTillComplete();

/*
 * Register UDF module.
 * Registration only needs to be done after a change in the UDF module.
 */
RegisterTask rt = as.client.register(null, "udf/leaderboard.lua", "leaderboard.lua", Language.LUA);
rt.waitTillComplete();
```
Based on the option from the command line, the code will either load data or run the aggregation.

```java
if (cl.hasOption("l")) {
    as.populateData();
    return;
} else if (cl.hasOption("q")) {
    as.queryAggregate();
    return;
} else if (cl.hasOption("a")) {
    as.scanAggregate();
    return;
} else {
    logUsage(options);
}
```
Note the `-a` option: it performs an aggregation by scanning the whole set rather than by using a secondary index, and it is only supported on the latest version of Aerospike.
The `queryAggregate()` method creates the `Statement` for the query and then calls the `aggregate()` method, which uses the Aerospike `queryAggregate()` operation to query the data and invoke the Stream UDF `top()` in the module `leaderboard`.
```java
public void queryAggregate() {
    long now = System.currentTimeMillis();
    long yesterday = now - 24 * 60 * 60 * 1000;
    Statement stmt = new Statement();
    stmt.setNamespace(this.namespace);
    stmt.setSetName(this.set);
    stmt.setBinNames(EVENT_ID_BIN, TIME_BIN);
    stmt.setFilters(Filter.range(TIME_BIN, yesterday, now));
    aggregate(stmt);
}

private void aggregate(Statement stmt) {
    ResultSet rs = this.client.queryAggregate(null, stmt, "leaderboard", "top", Value.get(10));
    while (rs.next()) {
        List<Map<String, Object>> result = (List<Map<String, Object>>) rs.getObject();
        for (Map<String, Object> element : result) {
            System.out.println(element);
        }
    }
}
```
The heavy lifting is done in the Stream UDF, which builds a list of the top 10 (latest) events as they come from the query stream. The stream is processed with a `map()` function, then an `aggregate()` function and finally a `reduce()` function. Let's look at each one, then string them together to process the stream.
The purpose of a `map()` function is to transform the current element in the stream into a new form. In this example we are transforming a `Record` into a `Map`. While the result looks almost the same, the `transformer()` function discards the metadata associated with the `Record` and retains only the information we are interested in.
```lua
local function transformer(rec)
  -- info("rec:" .. tostring(rec))
  local touple = map()
  touple["eventid"] = rec["eventid"]
  touple["time"] = rec["time"]
  -- info("touple:" .. tostring(touple))
  return touple
end
```
The `map()` function is invoked on each node in the cluster for every element in the stream.
The purpose of the `aggregate()` function is to accumulate a result from the elements in the stream. In this example, `accumulate()` uses a `List` of 10 elements; as each new element arrives, it is inserted into the list in the correct order. The local function `movedown()` aids in this.
```lua
local function movedown(theList, size, at, element)
  -- info("List:" .. tostring(theList) .. ":" .. tostring(size) .. ":" .. tostring(at))
  if at > size then
    info("movedown: index out of range")
    return
  end
  index = size - 1
  while (index > at) do
    theList[index + 1] = theList[index]
    index = index - 1
  end
  theList[at] = element
end

local function accumulate(aggregate, nextitem)
  local aggregate_size = list.size(aggregate)
  -- info("Item:" .. tostring(nextitem))
  index = 1
  for value in list.iterator(aggregate) do
    -- info(tostring(nextitem.time) .. " > " .. tostring(value.time))
    if nextitem.time > value.time then
      movedown(aggregate, top_size, index, nextitem)
      break
    end
    index = index + 1
  end
  return aggregate
end
```
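The same insert-in-descending-order idea can be sketched in plain, standalone Java. This is an illustration only, not code from the repository; the `TopNAccumulator` class and its `(eventId, time)` pairs are invented for the example, and it trims the list to capacity rather than shifting within a fixed-size list as `movedown()` does:

```java
import java.util.ArrayList;
import java.util.List;

public class TopNAccumulator {
    private final int capacity;
    private final List<long[]> top = new ArrayList<>(); // each entry is {eventId, time}

    public TopNAccumulator(int capacity) {
        this.capacity = capacity;
    }

    // Insert the element before the first entry with a smaller time,
    // then trim back to capacity -- the role played by accumulate()/movedown().
    public void accumulate(long eventId, long time) {
        int index = 0;
        while (index < top.size() && top.get(index)[1] >= time) {
            index++;
        }
        top.add(index, new long[]{eventId, time});
        if (top.size() > capacity) {
            top.remove(top.size() - 1);
        }
    }

    public List<long[]> result() {
        return top;
    }

    public static void main(String[] args) {
        TopNAccumulator acc = new TopNAccumulator(10);
        for (long t = 1; t <= 100; t++) {
            acc.accumulate(t, t); // event id equals timestamp for simplicity
        }
        for (long[] e : acc.result()) {
            System.out.println("eventid=Event:" + e[0] + ", time=" + e[1]);
        }
    }
}
```

Feeding 100 events with increasing timestamps leaves the list holding the 10 most recent, newest first.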
The `aggregate()` function is invoked for every element in the stream on each node in the cluster. Note: the `aggregate` variable is held in RAM, so watch for high memory usage with large elements.
The `reduce()` function combines all of the results from the stream into one complete result. It will be invoked on each node in the cluster, with a final reduce in the client. The function `reducer()` simply combines two elements -- in this case two ordered `List`s that are the output of two `aggregate()` functions. The code uses a simple merge technique to take the two ordered lists and return a new ordered list of the top 10.
```lua
local function reducer(this, that)
  local merged_list = list()
  local this_index = 1
  local that_index = 1
  while this_index <= 10 do
    while that_index <= 10 do
      if this[this_index].time >= that[that_index].time then
        list.append(merged_list, this[this_index])
        this_index = this_index + 1
      else
        list.append(merged_list, that[that_index])
        that_index = that_index + 1
      end
      if list.size(merged_list) == 10 then
        break
      end
    end
    if list.size(merged_list) == 10 then
      break
    end
  end
  -- info("This:" .. tostring(this) .. " that:" .. tostring(that))
  return merged_list
end
```
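The merge performed by `reducer()` is the classic merge step from merge sort, truncated at 10 elements. Here is a standalone Java sketch of that step (the `TopNReducer` class is hypothetical, and bare timestamps stand in for the `{eventid, time}` maps for brevity):

```java
import java.util.ArrayList;
import java.util.List;

public class TopNReducer {
    // Merge two lists already sorted by descending time, keeping the 10 largest.
    public static List<Long> reduce(List<Long> left, List<Long> right) {
        List<Long> merged = new ArrayList<>();
        int i = 0, j = 0;
        while (merged.size() < 10 && (i < left.size() || j < right.size())) {
            // Take from the left list when the right is exhausted, or when
            // the left's head timestamp is at least as large.
            if (j >= right.size() || (i < left.size() && left.get(i) >= right.get(j))) {
                merged.add(left.get(i++));
            } else {
                merged.add(right.get(j++));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Long> a = List.of(100L, 80L, 60L, 40L, 20L, 15L, 10L, 8L, 6L, 4L);
        List<Long> b = List.of(90L, 85L, 70L, 50L, 30L, 25L, 12L, 9L, 7L, 5L);
        System.out.println(reduce(a, b)); // the ten largest timestamps, descending
    }
}
```

Because both inputs are already ordered, the merge is linear in the output size; no re-sorting is needed at any reduce step.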
The stream function `top()` is the UDF called by the client. It takes a stream object as a parameter and configures a `map()` function, an `aggregate()` function and a `reduce()` function. The functions that we have written to implement these stereotypes are passed in as function pointers.

NOTE: the `aggregate()` function also takes an additional parameter, `list{}`. This is an initial `List` to be populated by the `accumulate()` function.
```lua
function top(flow, top_size)
  . . .
  return flow : map(transformer)
              : aggregate(list{
                  map{eventid = "ten",   time = 0},
                  map{eventid = "nine",  time = 0},
                  map{eventid = "eight", time = 0},
                  map{eventid = "seven", time = 0},
                  map{eventid = "six",   time = 0},
                  map{eventid = "five",  time = 0},
                  map{eventid = "four",  time = 0},
                  map{eventid = "three", time = 0},
                  map{eventid = "two",   time = 0},
                  map{eventid = "one",   time = 0}
                }, accumulate)
              : reduce(reducer)
end
```
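To see how the three stages fit together end to end, here is a hypothetical single-process Java simulation of the dataflow (the class and method names are invented; in the real solution the map and aggregate steps run on every cluster node, and a sort-then-limit stands in here for the incremental `accumulate()`/`reducer()` logic):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

public class LeaderboardSimulation {
    // Single-process stand-in for aggregate + reduce: order the mapped
    // tuples by time descending and keep the first 10.
    public static List<Map<String, Object>> top10(List<Map<String, Object>> stream) {
        return stream.stream()
            .sorted(Comparator.comparingLong(
                (Map<String, Object> m) -> (Long) m.get("time")).reversed())
            .limit(10)
            .collect(Collectors.toList());
    }

    // Build 100 query-stream records in the shape produced by the Lua
    // transformer() -- a map with "eventid" and "time" -- then rank them.
    public static List<Map<String, Object>> simulate() {
        List<Map<String, Object>> stream = LongStream.rangeClosed(1, 100)
            .mapToObj(t -> Map.<String, Object>of("eventid", "Event:" + t, "time", t))
            .collect(Collectors.toList());
        return top10(stream);
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

The printed result has the same shape as the program's output shown earlier: 10 maps ordered from the newest timestamp down.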