Java Programming Guide

Dmytro Vyazelenko edited this page Jan 31, 2025 · 53 revisions

The Aeron API is designed to be as simple as possible and no simpler. In this guide, we walk through a set of applications, demonstrating specific points along the way. The complete applications can be found in the locations below.

Note: The javadoc is the definitive source of documentation. Please consider this guide as only a starting point.

Embedded Media Driver

The Aeron Media Driver can be run standalone and handle many applications. However, in some situations, it is desirable to run the media driver within the application.

In this case, a MediaDriver can be instantiated in the process. Only a single one is needed, but it does require some resources as discussed here.

When running an embedded Media Driver, it is recommended to set the following via system properties or directly via the MediaDriver.Context passed into MediaDriver.launch:

  • Log Buffer Locations, specified by MediaDriver.Context.aeronDirectoryName(), should point to a specific location so as not to interfere with other Media Driver instances.
  • Threading Modes should be considered carefully, as the driver threads will be spawned within the parent process.

An example of starting up an embedded Media Driver:

final MediaDriver driver = MediaDriver.launch();

To guarantee that an embedded Media Driver does not interfere with other Media Drivers, one can use the following launch method:

final MediaDriver driver = MediaDriver.launchEmbedded();

The difference is that the latter launches a Media Driver with a randomly generated aeronDirectoryName if it detects that the default value has not been changed. This is enough to isolate it from other instances of a Media Driver.

Aeron

Aeron client applications need to coordinate operation with a running Media Driver, either an embedded one or a standalone one. This interaction handles creating Publications and Subscriptions, as well as housekeeping. The interaction point for the application is the Aeron class.

final Aeron aeron = Aeron.connect(new Aeron.Context());

Settings for the instance may be changed via an Aeron.Context instance that is passed into the Aeron.connect method, as mentioned here.

To establish a connection with a Media Driver, Aeron must know the Aeron directory name used by the Media Driver. This can be left unspecified (the default value is then used), passed as a system property, or set manually. When the Media Driver is launched in embedded mode and the directory is randomly generated, the convenience method MediaDriver.aeronDirectoryName() provides the directory name of the Media Driver. This value can then be set via Aeron.Context.aeronDirectoryName() and the context passed to the Aeron.connect method, as shown below.

final MediaDriver driver = MediaDriver.launchEmbedded();
Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(driver.aeronDirectoryName()));

Event Handling

Aeron instances have a set of handlers that might be called for some events. The application can specify these handlers via the Aeron.Context instance used to create the instance.

  • Aeron.Context.errorHandler lets the application specify a lambda to call when errors/exceptions occur.
  • Aeron.Context.availableImageHandler specifies a lambda to call when images are available. An image is the replication of the publication stream on the subscription side.
  • Aeron.Context.unavailableImageHandler specifies a lambda to call when an image becomes unavailable.

These handlers are called from the ClientConductor thread.

From BasicSubscriber:

final Aeron.Context ctx = new Aeron.Context()
    .availableImageHandler(SamplesUtil::printAvailableImage)
    .unavailableImageHandler(SamplesUtil::printUnavailableImage);

DirectBuffer

Accessing and modifying the buffers that Aeron uses for sending and receiving messages is done via a set of interfaces.

The methods should look familiar to anyone who uses ByteBuffer regularly. However, the API extends this and provides a more appropriate implementation for efficient handling of data layout.

In many cases, the use of UnsafeBuffer will allow for the most efficient operation. To be useful, a ByteBuffer, byte[], etc. must be wrapped. Once wrapped, the underlying data can be accessed or mutated.

From BasicPublisher, putting some bytes into a buffer:

private static final UnsafeBuffer BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
...
final String message = "Hello World!";
BUFFER.putBytes(0, message.getBytes());

For a subscriber, grabbing some bytes from a buffer:

(buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);
    ...
}

Subscription

An application that wants to receive data must specify a channel and stream to listen on. A Subscription aggregates zero or more Images for the same channel and stream id. Images are identified by session id and come from unique sources; the source is encoded in the opaque Image.sourceIdentity().

From BasicSubscriber, listen on a channel and a stream:

final Aeron aeron = Aeron.connect(new Aeron.Context());
final Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);

Note: The Aeron.addSubscription method will block until the Media Driver acknowledges the request or a timeout occurs.

Polling

Subscribing applications fully control when data is delivered to the FragmentHandler methods via the Subscription.poll or Image.poll methods. Subscriptions delegate polling to the matching Images. When called, this method determines whether there are any messages to deliver and delivers them via the FragmentHandler interface, up to the fragment limit, before returning.

Example of polling for new messages with a per-poll limit of 10 fragments and an Idle Strategy:

final FragmentHandler fragmentHandler = ... // defined below
final IdleStrategy idleStrategy = new BackoffIdleStrategy(
    100, 10, TimeUnit.MICROSECONDS.toNanos(1), TimeUnit.MICROSECONDS.toNanos(100));

while (...)
{
    final int fragmentsRead = subscription.poll(fragmentHandler, 10);
    idleStrategy.idle(fragmentsRead);
}
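Passing fragmentsRead to idleStrategy.idle lets the strategy back off only while polls keep coming back empty. The following is a simplified, JDK-only model of a spin, then yield, then park progression that takes the same four parameters as the BackoffIdleStrategy constructor above. It is a sketch of the idea and not Agrona's implementation; the BackoffModel class and its string return values are hypothetical and exist only for illustration.

```java
import java.util.concurrent.locks.LockSupport;

/** Simplified model of a spin -> yield -> park backoff. Not Agrona's implementation. */
final class BackoffModel
{
    private final long maxSpins;
    private final long maxYields;
    private final long minParkNs;
    private final long maxParkNs;
    private long spins;
    private long yields;
    private long parkNs;

    BackoffModel(final long maxSpins, final long maxYields, final long minParkNs, final long maxParkNs)
    {
        this.maxSpins = maxSpins;
        this.maxYields = maxYields;
        this.minParkNs = minParkNs;
        this.maxParkNs = maxParkNs;
        this.parkNs = minParkNs;
    }

    /** Returns the step taken on this call: "none", "spin", "yield" or "park:<nanos>". */
    String idle(final int workCount)
    {
        if (workCount > 0)
        {
            // Work was done, so start again from the cheapest waiting step.
            spins = 0;
            yields = 0;
            parkNs = minParkNs;
            return "none";
        }

        if (spins < maxSpins)
        {
            spins++;
            Thread.onSpinWait();
            return "spin";
        }

        if (yields < maxYields)
        {
            yields++;
            Thread.yield();
            return "yield";
        }

        // Park for the current period, then double it up to the configured maximum.
        final long currentParkNs = parkNs;
        LockSupport.parkNanos(currentParkNs);
        parkNs = Math.min(parkNs * 2, maxParkNs);
        return "park:" + currentParkNs;
    }
}
```

A busy subscriber therefore never leaves the "none" step, while an idle one quickly settles into parking and stops burning a core.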

FragmentHandler

Messages are read from Image instances via FragmentHandler callbacks. FragmentHandler is a functional interface. Its arguments are:

  • buffer holding the data
  • offset indicating the offset in the buffer that starts the message
  • length of the message
  • header holding the metadata of the message

Example of printing the contents of a message as a string along with some metadata:

final FragmentHandler fragmentHandler = (buffer, offset, length, header) ->
{
    final byte[] data = new byte[length];
    buffer.getBytes(offset, data);

    System.out.println(String.format(
        "Message to stream %d from session %d (%d@%d) <<%s>>",
        streamId, header.sessionId(), length, offset, new String(data)));
};

Message Reassembly

Publication instances automatically fragment large messages into data frames that Aeron sends. Subscription instances that desire these fragments to be reassembled prior to delivery to the FragmentHandler can chain an instance of FragmentAssembler to do this by composition.

FragmentHandler fragmentAssembler = new FragmentAssembler(fragmentHandler);
final int fragmentsRead = subscription.poll(fragmentAssembler, 10);

Note: Reassembly has been shown to have minimal impact on latency, but the impact is not totally negligible. If the lowest latency is desired, then limiting message sizes to the MTU size is a good practice.
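To make the reassembly step concrete, here is a toy, JDK-only model of how fragments can be stitched back together using begin and end flags. It is only a sketch: the ToyAssembler class is hypothetical, the flag values 0x80 and 0x40 are assumed to mirror the begin and end fragment flags carried in the Aeron frame header, and it handles a single session, whereas a real assembler needs one buffer per session id.

```java
import java.io.ByteArrayOutputStream;

/** Toy single-session reassembler. Illustrative only, not Aeron's FragmentAssembler. */
final class ToyAssembler
{
    // Assumed to mirror the begin/end fragment flags in the frame header.
    static final int BEGIN_FLAG = 0x80;
    static final int END_FLAG = 0x40;

    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
    private boolean inProgress;

    /** Returns the complete message once its last fragment arrives, otherwise null. */
    byte[] onFragment(final int flags, final byte[] payload)
    {
        final boolean begin = (flags & BEGIN_FLAG) != 0;
        final boolean end = (flags & END_FLAG) != 0;

        if (begin && end)
        {
            // Unfragmented message: deliver directly without copying into the builder.
            return payload.clone();
        }

        if (begin)
        {
            pending.reset();
            inProgress = true;
        }

        if (!inProgress)
        {
            // A middle or end fragment without a preceding begin cannot be assembled.
            return null;
        }

        pending.write(payload, 0, payload.length);

        if (end)
        {
            inProgress = false;
            return pending.toByteArray();
        }

        return null;
    }
}
```

The extra copy into the pending buffer is where the small latency cost of reassembly comes from, which is why messages that fit in a single frame avoid it entirely.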

Note: There is a maximum length allowed for messages, which is the minimum of 1/8th of a term length or 16MB. Messages larger than this should be chunked using an application-level chunking protocol. Chunking has better recovery properties from failure and streams with mechanical sympathy.
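The limit in the note above is simple arithmetic, sketched below using only the JDK. The MessageLimits class and the chunkCount helper are hypothetical names for illustration, and 16MB is assumed to mean 16 * 1024 * 1024 bytes.

```java
/** Arithmetic for the message length limit described in the note. Illustrative only. */
final class MessageLimits
{
    // Assumption: "16MB" means 16 MiB.
    static final int MAX_MESSAGE_LENGTH_CAP = 16 * 1024 * 1024;

    /** The smaller of one eighth of the term length and the 16MB cap. */
    static int maxMessageLength(final int termLength)
    {
        return Math.min(termLength / 8, MAX_MESSAGE_LENGTH_CAP);
    }

    /** Number of chunks needed to carry a payload when each chunk holds at most maxChunkLength bytes. */
    static int chunkCount(final long payloadLength, final int maxChunkLength)
    {
        return (int)((payloadLength + maxChunkLength - 1) / maxChunkLength);
    }

    public static void main(final String[] args)
    {
        final int termLength = 64 * 1024;
        final int limit = maxMessageLength(termLength);
        System.out.println("max message length = " + limit);                  // 8192
        System.out.println("chunks for 20000 bytes = " + chunkCount(20_000, limit)); // 3
    }
}
```

With a 64 KiB term, a 20,000 byte payload would need three application-level chunks, each small enough to be offered as its own message.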

Advanced Polling

At times you may wish to take more control over how a Subscription/Image is polled. For example, if you wish to archive a stream of messages in parallel, then Image.blockPoll or Image.rawPoll can be used to efficiently copy available ranges of messages in a stream to another location.

It is also possible to control the polling action with the Image.controlledPoll or Subscription.controlledPoll method. This method takes a ControlledFragmentHandler that returns the action which should be taken after the message fragment is handled.

When handling a fragment with the ControlledFragmentHandler, the following return codes can be used to control the polling action:

  • ABORT the current polling operation and do not advance the position for this fragment.
  • BREAK from the current polling operation and commit the position as of the end of the current fragment being handled.
  • COMMIT Continue processing but commit the position as of the end of the current fragment so that flow control is applied to this point.
  • CONTINUE Continue processing, taking the same approach as in the standard FragmentHandler.
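The effect of the four actions on the consumed position can be seen in a toy, JDK-only model. This is a sketch of the semantics listed above rather than Aeron's polling code: the ControlledPollModel class, its Handler interface, and the idea of describing a stream as an array of fragment lengths are all hypothetical.

```java
/** Toy model of how the four actions move the consumed position. Not Aeron's implementation. */
final class ControlledPollModel
{
    enum Action { ABORT, BREAK, COMMIT, CONTINUE }

    interface Handler
    {
        Action onFragment(int index, int length);
    }

    private final int[] fragmentLengths;
    private int nextIndex;
    private long committedPosition;

    ControlledPollModel(final int[] fragmentLengths)
    {
        this.fragmentLengths = fragmentLengths.clone();
    }

    long committedPosition()
    {
        return committedPosition;
    }

    /** Delivers up to fragmentLimit fragments and returns how many were consumed. */
    int poll(final Handler handler, final int fragmentLimit)
    {
        int fragmentsRead = 0;
        long position = committedPosition;

        while (nextIndex < fragmentLengths.length && fragmentsRead < fragmentLimit)
        {
            final int length = fragmentLengths[nextIndex];
            final Action action = handler.onFragment(nextIndex, length);

            if (Action.ABORT == action)
            {
                break; // position not advanced: the same fragment is delivered again next poll
            }

            nextIndex++;
            fragmentsRead++;
            position += length;

            if (Action.BREAK == action)
            {
                break; // stop now; the position up to this fragment is committed below
            }

            if (Action.COMMIT == action)
            {
                committedPosition = position; // publish progress before handling the next fragment
            }
        }

        committedPosition = position; // CONTINUE: progress becomes visible when the poll ends
        return fragmentsRead;
    }
}
```

ABORT is the one action that causes redelivery, which makes it a natural fit when a downstream resource is temporarily unable to accept the fragment.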

Publication

An application that desires to send data needs to specify a channel and stream to send to.

From BasicPublisher, send to a channel and a stream:

final Aeron aeron = Aeron.connect(new Aeron.Context());
final Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);

Note: The Aeron.addPublication method will block until the Media Driver acknowledges the request or a timeout occurs.

Afterwards, the application is free to send data via the Publication.offer method.

private static final UnsafeBuffer BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(256, 64));
...
final String message = "Hello World!";
BUFFER.putBytes(0, message.getBytes());
final long resultingPosition = publication.offer(BUFFER, 0, message.getBytes().length);

Handling Back Pressure

Aeron has built-in back pressure for a publisher. It will not allow a publisher to send data that exceeds prescribed flow control limits.

When calling Publication.offer, a return value greater than 0 indicates the message was sent. Negative values indicate that the message has not been enqueued for sending. Constants for negative values are as follows:

  • NOT_CONNECTED means no subscriber is connected to the publication. This can be a transient state as subscribers come and go.
  • BACK_PRESSURED indicates the message was not sent due to back pressure from Subscribers, but can be retried if desired.
  • ADMIN_ACTION indicates the message was not sent due to an administration action, such as log rotation, but can be retried if desired.
  • CLOSED indicates the Publication has been closed either by another client thread, or if the channel is invalid, or if the client has timed out.
  • MAX_POSITION_EXCEEDED indicates that the Publication has reached the maximum possible position given the term-length. This is possible with a small term-length. Max position is 2^31 * term-length for a Publication.
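The MAX_POSITION_EXCEEDED bound follows directly from the formula in the last bullet. A minimal, JDK-only sketch of the arithmetic is shown below; the PositionLimits class is a hypothetical name used only for illustration.

```java
/** Arithmetic for the maximum publication position, 2^31 * term-length. Illustrative only. */
final class PositionLimits
{
    static long maxPosition(final int termLength)
    {
        // Widen to long before shifting so the multiplication by 2^31 cannot overflow an int.
        return (long)termLength << 31;
    }

    public static void main(final String[] args)
    {
        System.out.println(maxPosition(64 * 1024));          // 140737488355328 (2^47 bytes)
        System.out.println(maxPosition(1024 * 1024 * 1024)); // 2305843009213693952 (2^61 bytes)
    }
}
```

With a 64 KiB term length the stream can carry 2^47 bytes (128 TiB) before the limit is hit, so a long-lived, high-throughput publication with a small term length can plausibly reach it.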

The ways that an application may handle back pressure are, by necessity, dependent on the application semantics. Here are a few options. This is not an exhaustive list.

  • Retry until success. Keep calling Publication.offer until it succeeds. This may spin or have some sort of idle strategy. Many examples do this.
  • Ignore failure and continue. Ignore that the data didn't send and move on. This is usually appropriate for situations where the data being sent has some lifetime and it would be better to not send stale data.
  • Retry until success or timeout. Retry as normal, with or without some sort of idle strategy, but with a timeout attached.
  • Retry asynchronously. Retry periodically, but instead of idling, do some other work.
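As one illustration of the "retry until success or timeout" option, the sketch below retries an offer until it succeeds, fails in a way that retrying cannot fix, or a deadline passes. It uses only the JDK: the LongSupplier stands in for a call such as () -> publication.offer(buffer, 0, length), the OfferRetry class is hypothetical, and the constant values are assumed to mirror the negative result codes defined on Publication, so check them against the javadoc before relying on them.

```java
import java.util.concurrent.locks.LockSupport;
import java.util.function.LongSupplier;

/** Retry-with-timeout sketch around an offer call. Illustrative only. */
final class OfferRetry
{
    // Assumed to mirror the result codes on Publication; verify against the javadoc.
    static final long NOT_CONNECTED = -1;
    static final long BACK_PRESSURED = -2;
    static final long ADMIN_ACTION = -3;
    static final long CLOSED = -4;
    static final long MAX_POSITION_EXCEEDED = -5;

    /** Returns the first positive result, a non-retryable failure code, or the last code at timeout. */
    static long offerWithTimeout(final LongSupplier offer, final long timeoutNs)
    {
        final long deadlineNs = System.nanoTime() + timeoutNs;
        long result;

        do
        {
            result = offer.getAsLong();

            if (result > 0 || CLOSED == result || MAX_POSITION_EXCEEDED == result)
            {
                // Either the message was accepted or retrying can never succeed.
                return result;
            }

            // NOT_CONNECTED, BACK_PRESSURED and ADMIN_ACTION may clear: idle briefly and retry.
            LockSupport.parkNanos(1_000);
        }
        while (System.nanoTime() - deadlineNs < 0);

        return result;
    }
}
```

Returning the last result code at timeout lets the caller decide whether to drop the message, log it, or hand it to an asynchronous retry path.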

The needs of an application, or system, can be quite complex. The common use case is one of non-blocking offer, though. From this, more complex scenarios may be developed.

Monitoring

The Aeron Media Driver and the status of various buffers may be monitored outside of the driver via the counter files in use by the driver. Below is an example application that reads this data and prints it periodically. Full source can be found here.

/**
 * Tool for printing out Aeron counters. A command-and-control (CnC) file is maintained by media driver
 * in shared memory. This application reads the cnc file and prints the counters. Layout of the cnc file is
 * described in {@link CncFileDescriptor}.
 * <p>
 * This tool accepts filters on the command line, e.g. for connections only see example below:
 * <p>
 * <code>
 * java -cp aeron-samples/build/libs/samples.jar io.aeron.samples.AeronStat type=[1-9] identity=12345
 * </code>
 */
public class AeronStat
{
    private static final String ANSI_CLS = "\u001b[2J";
    private static final String ANSI_HOME = "\u001b[H";

    /**
     * The delay in seconds between each update.
     */
    private static final String DELAY = "delay";

    /**
     * Whether to watch for updates or run once.
     */
    private static final String WATCH = "watch";

    /**
     * Types of the counters.
     * <ul>
     * <li>0: System Counters</li>
     * <li>1 - 5, 9, 10, 11: Stream Positions and Indicators</li>
     * <li>6 - 7: Channel Endpoint Status</li>
     * </ul>
     */
    private static final String COUNTER_TYPE_ID = "type";

    /**
     * The identity of each counter that can either be the system counter id or registration id for positions.
     */
    private static final String COUNTER_IDENTITY = "identity";

    /**
     * Session id filter to be used for position counters.
     */
    private static final String COUNTER_SESSION_ID = "session";

    /**
     * Stream id filter to be used for position counters.
     */
    private static final String COUNTER_STREAM_ID = "stream";

    /**
     * Channel filter to be used for position counters.
     */
    private static final String COUNTER_CHANNEL = "channel";

    public static void main(final String[] args) throws Exception
    {
        long delayMs = 1000L;
        boolean watch = true;
        Pattern typeFilter = null;
        Pattern identityFilter = null;
        Pattern sessionFilter = null;
        Pattern streamFilter = null;
        Pattern channelFilter = null;

        if (0 != args.length)
        {
            checkForHelp(args);

            for (final String arg : args)
            {
                final int equalsIndex = arg.indexOf('=');
                if (-1 == equalsIndex)
                {
                    System.out.println("Arguments must be in name=pattern format: Invalid '" + arg + "'");
                    return;
                }

                final String argName = arg.substring(0, equalsIndex);
                final String argValue = arg.substring(equalsIndex + 1);

                switch (argName)
                {
                    case WATCH:
                        watch = Boolean.parseBoolean(argValue);
                        break;

                    case DELAY:
                        delayMs = Long.parseLong(argValue) * 1000L;
                        break;

                    case COUNTER_TYPE_ID:
                        typeFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_IDENTITY:
                        identityFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_SESSION_ID:
                        sessionFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_STREAM_ID:
                        streamFilter = Pattern.compile(argValue);
                        break;

                    case COUNTER_CHANNEL:
                        channelFilter = Pattern.compile(argValue);
                        break;

                    default:
                        System.out.println("Unrecognised argument: '" + arg + "'");
                        return;
                }
            }
        }

        final CncFileReader cncFileReader = CncFileReader.map();
        final CounterFilter counterFilter = new CounterFilter(
            typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter);

        if (watch)
        {
            workLoop(delayMs, () -> printOutput(cncFileReader, counterFilter));
        }
        else
        {
            printOutput(cncFileReader, counterFilter);
        }
    }

    private static void workLoop(final long delayMs, final Runnable outputPrinter) throws Exception
    {
        final AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));

        do
        {
            clearScreen();
            outputPrinter.run();
            Thread.sleep(delayMs);
        }
        while (running.get());
    }

    private static void printOutput(final CncFileReader cncFileReader, final CounterFilter counterFilter)
    {
        final SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss");

        System.out.print(dateFormat.format(new Date()));
        System.out.println(
            " - Aeron Stat (CnC v" + cncFileReader.semanticVersion() + ")" +
            ", pid " + SystemUtil.getPid() +
            ", heartbeat age " + cncFileReader.driverHeartbeatAgeMs() + "ms");
        System.out.println("======================================================================");

        final CountersReader counters = cncFileReader.countersReader();

        counters.forEach(
            (counterId, typeId, keyBuffer, label) ->
            {
                if (counterFilter.filter(typeId, keyBuffer))
                {
                    final long value = counters.getCounterValue(counterId);
                    System.out.format("%3d: %,20d - %s%n", counterId, value, label);
                }
            }
        );

        System.out.println("--");
    }

    private static void checkForHelp(final String[] args)
    {
        for (final String arg : args)
        {
            if ("-?".equals(arg) || "-h".equals(arg) || "-help".equals(arg))
            {
                System.out.format(
                    "Usage: [-Daeron.dir=<directory containing CnC file>] AeronStat%n" +
                    "\t[delay=<seconds between updates>]%n" +
                    "\t[watch=<true|false>]%n" +
                    "filter by optional regex patterns:%n" +
                    "\t[type=<pattern>]%n" +
                    "\t[identity=<pattern>]%n" +
                    "\t[sessionId=<pattern>]%n" +
                    "\t[streamId=<pattern>]%n" +
                    "\t[channel=<pattern>]%n");

                System.exit(0);
            }
        }
    }

    private static void clearScreen() throws Exception
    {
        if (SystemUtil.isWindows())
        {
            new ProcessBuilder("cmd", "/c", "cls").inheritIO().start().waitFor();
        }
        else
        {
            System.out.print(ANSI_CLS + ANSI_HOME);
        }
    }

    static class CounterFilter
    {
        private final Pattern typeFilter;
        private final Pattern identityFilter;
        private final Pattern sessionFilter;
        private final Pattern streamFilter;
        private final Pattern channelFilter;

        CounterFilter(
            final Pattern typeFilter,
            final Pattern identityFilter,
            final Pattern sessionFilter,
            final Pattern streamFilter,
            final Pattern channelFilter)
        {
            this.typeFilter = typeFilter;
            this.identityFilter = identityFilter;
            this.sessionFilter = sessionFilter;
            this.streamFilter = streamFilter;
            this.channelFilter = channelFilter;
        }

        private static boolean match(final Pattern pattern, final Supplier<String> supplier)
        {
            return null == pattern || pattern.matcher(supplier.get()).find();
        }

        boolean filter(final int typeId, final DirectBuffer keyBuffer)
        {
            if (!match(typeFilter, () -> Integer.toString(typeId)))
            {
                return false;
            }

            if (SYSTEM_COUNTER_TYPE_ID == typeId && !match(identityFilter, () -> Integer.toString(keyBuffer.getInt(0))))
            {
                return false;
            }
            else if ((typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= RECEIVER_POS_TYPE_ID) ||
                typeId == SENDER_LIMIT_TYPE_ID || typeId == PER_IMAGE_TYPE_ID || typeId == PUBLISHER_POS_TYPE_ID)
            {
                return
                    match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) &&
                    match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) &&
                    match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) &&
                    match(channelFilter, () -> keyBuffer.getStringAscii(CHANNEL_OFFSET));
            }
            else if (typeId >= SEND_CHANNEL_STATUS_TYPE_ID && typeId <= RECEIVE_CHANNEL_STATUS_TYPE_ID)
            {
                return match(channelFilter, () -> keyBuffer.getStringAscii(ChannelEndpointStatus.CHANNEL_OFFSET));
            }

            return true;
        }
    }
}

The AeronStat application above does the following:

  1. Find labels and values files in the file system
  2. Map the files into MappedByteBuffer instances
  3. Use an UnsafeBuffer to read the values
  4. Use a CountersReader to grab context for the values and labels.
  5. Set up a SigInt to handle control-C out of the application
  6. While running, in a loop do the following:
    1. Grab the time
    2. For each counter, grab its value and print out a line with the timestamp, label, and value.
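One detail of the filters accepted by AeronStat is worth a small illustration: the match helper in CounterFilter uses Matcher.find(), so a pattern matches if it occurs anywhere in the value, and a missing (null) pattern matches everything. The JDK-only sketch below reproduces that logic in a hypothetical FilterDemo class, with a plain String in place of the Supplier used in the listing.

```java
import java.util.regex.Pattern;

/** Demonstrates the find()-based filter matching used by the counter filters. Illustrative only. */
final class FilterDemo
{
    /** Same logic as the match helper in the listing, with the value passed directly. */
    static boolean match(final Pattern pattern, final String value)
    {
        return null == pattern || pattern.matcher(value).find();
    }

    public static void main(final String[] args)
    {
        System.out.println(match(null, "anything"));                   // true: no filter supplied
        System.out.println(match(Pattern.compile("[1-9]"), "0"));      // false: no digit 1-9 present
        System.out.println(match(Pattern.compile("[1-9]"), "10"));     // true: find() matches the "1"
        System.out.println(match(Pattern.compile("^[1-9]$"), "10"));   // false: anchored to one digit
    }
}
```

So a filter such as type=[1-9] also selects type ids 10 and 11; anchoring the pattern, for example type=^[1-9]$, restricts it to single-digit ids.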
