Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Commitd87e8cf

Browse files
feature: Added FanOut/FanIn Pattern (iluwatar#1800)
* Added FanOut/FanIn Pattern (iluwatar#8)*iluwatar#1627 adding fanout-fanin pattern*iluwatar#1627 adding class diagram image*iluwatar#1627 adding readme*iluwatar#1627 adding license*iluwatar#1627 updating relations*iluwatar#1627 interrupting the thread*iluwatar#1627 fixing sonar issues*iluwatar#1627 fixing sonar issues*iluwatar#1627 adding more info in README.md* Added FanOut/FanIn (iluwatar#9)*iluwatar#1627 adding fanout-fanin pattern*iluwatar#1627 adding class diagram image*iluwatar#1627 adding readme*iluwatar#1627 adding license*iluwatar#1627 updating relations*iluwatar#1627 interrupting the thread*iluwatar#1627 fixing sonar issues*iluwatar#1627 fixing sonar issues*iluwatar#1627 adding more info in README.md*iluwatar#1627 adding programmatic examples in README.md
1 parentc5a4068 commitd87e8cf

File tree

14 files changed

+646
-0
lines changed

14 files changed

+646
-0
lines changed

‎fanout-fanin/README.md

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
---
2+
layout:pattern
3+
title:Fan-Out/Fan-In
4+
folder:fanout-fanin
5+
permalink:/patterns/fanout-fanin/
6+
categories:Integration
7+
language:en
8+
tags:
9+
-Microservices
10+
---
11+
12+
##Intent
13+
The pattern is used when a source system needs to run one or more long-running processes that will fetch some data.
14+
The source will not block itself waiting for the reply. <br> The pattern will run the same function in multiple
15+
services or machines to fetch the data. This is equivalent to invoking the function multiple times on different chunks of data.
16+
17+
##Explanation
18+
The FanOut/FanIn service will take in a list of requests and a consumer. Each request might complete at a different time.
19+
FanOut/FanIn service will accept the input params and returns the initial system an ID to acknowledge that the pattern
20+
service has received the requests. Now the caller will not wait or expect the result in the same connection.
21+
22+
Meanwhile, the pattern service will invoke the requests that have come. The requests might complete at different time.
23+
These requests will be processed in different instances of the same function in different machines or services. As the
24+
requests get completed, a callback service everytime is called that transforms the result into a common single object format
25+
that gets pushed to a consumer. The caller will be at the other end of the consumer receiving the result.
26+
27+
**Programmatic Example**
28+
29+
The implementation provided has a list of numbers and end goal is to square the numbers and add them to a single result.
30+
`FanOutFanIn` class receives the list of numbers in the form of list of`SquareNumberRequest` and a`Consumer` instance
31+
that collects the results as the requests get over.`SquareNumberRequest` will square the number with a random delay
32+
to give the impression of a long-running process that can complete at any time.`Consumer` instance will add the results from
33+
different`SquareNumberRequest` that will come random time instances.
34+
35+
Let's look at`FanOutFanIn` class that fans out the requests in async processes.
36+
37+
```java
38+
publicclassFanOutFanIn {
39+
publicstaticLongfanOutFanIn(
40+
finalList<SquareNumberRequest>requests,finalConsumerconsumer) {
41+
42+
ExecutorService service=Executors.newFixedThreadPool(requests.size());
43+
44+
// fanning out
45+
List<CompletableFuture<Void>> futures=
46+
requests.stream()
47+
.map(
48+
request->
49+
CompletableFuture.runAsync(()-> request.delayedSquaring(consumer), service))
50+
.collect(Collectors.toList());
51+
52+
CompletableFuture.allOf(futures.toArray(newCompletableFuture[0])).join();
53+
54+
return consumer.getSumOfSquaredNumbers().get();
55+
}
56+
}
57+
```
58+
59+
`Consumer` is used a callback class that will be called when a request is completed. This will aggregate
60+
the result from all requests.
61+
62+
```java
63+
publicclassConsumer {
64+
65+
privatefinalAtomicLong sumOfSquaredNumbers;
66+
67+
Consumer(Longinit) {
68+
sumOfSquaredNumbers=newAtomicLong(init);
69+
}
70+
71+
publicLongadd(finalLongnum) {
72+
return sumOfSquaredNumbers.addAndGet(num);
73+
}
74+
}
75+
```
76+
77+
Request is represented as a`SquareNumberRequest` that squares the number with random delay and calls the
78+
`Consumer` once it is squared.
79+
80+
```java
81+
publicclassSquareNumberRequest {
82+
83+
privatefinalLong number;
84+
publicvoiddelayedSquaring(finalConsumerconsumer) {
85+
86+
var minTimeOut=5000L;
87+
88+
SecureRandom secureRandom=newSecureRandom();
89+
var randomTimeOut= secureRandom.nextInt(2000);
90+
91+
try {
92+
// this will make the thread sleep from 5-7s.
93+
Thread.sleep(minTimeOut+ randomTimeOut);
94+
}catch (InterruptedException e) {
95+
LOGGER.error("Exception while sleep", e);
96+
Thread.currentThread().interrupt();
97+
}finally {
98+
consumer.add(number* number);
99+
}
100+
}
101+
}
102+
```
103+
104+
##Class diagram
105+
![alt-text](./etc/fanout-fanin.png)
106+
107+
##Applicability
108+
109+
Use this pattern when you can divide the workload into multiple chunks that can be dealt with separately.
110+
111+
##Related patterns
112+
113+
*[Aggregator Microservices](https://java-design-patterns.com/patterns/aggregator-microservices/)
114+
*[API Gateway](https://java-design-patterns.com/patterns/api-gateway/)
115+
116+
##Credits
117+
118+
*[Understanding Azure Durable Functions - Part 8: The Fan Out/Fan In Pattern](http://dontcodetired.com/blog/post/Understanding-Azure-Durable-Functions-Part-8-The-Fan-OutFan-In-Pattern)
119+
*[Fan-out/fan-in scenario in Durable Functions - Cloud backup example](https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-cloud-backup)
120+
*[Understanding the Fan-Out/Fan-In API Integration Pattern](https://dzone.com/articles/understanding-the-fan-out-fan-in-api-integration-p)

‎fanout-fanin/etc/fanout-fanin.png

38.8 KB
Loading
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
@startuml
2+
packagecom.iluwatar.fanout.fanin {
3+
classApp {
4+
-LOGGER : Logger {static}
5+
+App()
6+
+main(args : String[]) {static}
7+
}
8+
classConsumer {
9+
-sumOfSquaredNumbers :AtomicLong
10+
~Consumer(init : Long)
11+
+add(num : Long) : Long
12+
+getSumOfSquaredNumbers() :AtomicLong
13+
}
14+
classFanOutFanIn {
15+
+FanOutFanIn()
16+
+fanOutFanIn(requests : List<SquareNumberRequest>, consumer : Consumer) : Long {static}
17+
}
18+
classSquareNumberRequest {
19+
-LOGGER : Logger {static}
20+
-number :Long
21+
+SquareNumberRequest(number : Long)
22+
+delayedSquaring(consumer : Consumer)
23+
}
24+
25+
objectSquareNumberRequest1
26+
objectSquareNumberRequest2
27+
objectSquareNumberRequest3
28+
diamonddia
29+
}
30+
31+
App-->FanOutFanIn
32+
FanOutFanIn--> "fanout-runninginparallel"SquareNumberRequest1
33+
FanOutFanIn--> "fanout"SquareNumberRequest2
34+
FanOutFanIn--> "fanout"SquareNumberRequest3
35+
SquareNumberRequest1--> "fanin-aggregateusingcallback"dia
36+
SquareNumberRequest2--> "fanin"dia
37+
SquareNumberRequest3--> "fanin"dia
38+
dia-->Consumer
39+
@enduml

‎fanout-fanin/pom.xml

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
The MIT License (MIT)
5+
6+
Copyright © 2014-2021 Ilkka Seppälä
7+
8+
Permission is hereby granted, free of charge, to any person obtaining a copy
9+
of this software and associated documentation files (the "Software"), to deal
10+
in the Software without restriction, including without limitation the rights
11+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
copies of the Software, and to permit persons to whom the Software is
13+
furnished to do so, subject to the following conditions:
14+
15+
The above copyright notice and this permission notice shall be included in all
16+
copies or substantial portions of the Software.
17+
18+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
19+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
20+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
21+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
22+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
23+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24+
SOFTWARE.
25+
26+
Module Model-view-viewmodel is using ZK framework
27+
ZK framework is licensed under LGPL and the license can be found at lgpl-3.0.txt
28+
29+
-->
30+
<projectxmlns="http://maven.apache.org/POM/4.0.0"
31+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
32+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
33+
<parent>
34+
<artifactId>java-design-patterns</artifactId>
35+
<groupId>com.iluwatar</groupId>
36+
<version>1.25.0-SNAPSHOT</version>
37+
</parent>
38+
<modelVersion>4.0.0</modelVersion>
39+
40+
<artifactId>fanout-fanin</artifactId>
41+
42+
<dependencies>
43+
<dependency>
44+
<groupId>org.junit.jupiter</groupId>
45+
<artifactId>junit-jupiter-engine</artifactId>
46+
<scope>test</scope>
47+
</dependency>
48+
</dependencies>
49+
<build>
50+
<plugins>
51+
<plugin>
52+
<groupId>org.apache.maven.plugins</groupId>
53+
<artifactId>maven-assembly-plugin</artifactId>
54+
<executions>
55+
<execution>
56+
<configuration>
57+
<archive>
58+
<manifest>
59+
<mainClass>com.iluwatar.fanout.fanin.App</mainClass>
60+
</manifest>
61+
</archive>
62+
</configuration>
63+
</execution>
64+
</executions>
65+
</plugin>
66+
</plugins>
67+
</build>
68+
69+
</project>
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* The MIT License
3+
* Copyright © 2014-2021 Ilkka Seppälä
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to deal
7+
* in the Software without restriction, including without limitation the rights
8+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
* copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
* THE SOFTWARE.
22+
*/
23+
24+
packagecom.iluwatar.fanout.fanin;
25+
26+
importjava.util.Arrays;
27+
importjava.util.List;
28+
importjava.util.stream.Collectors;
29+
30+
importlombok.extern.slf4j.Slf4j;
31+
32+
33+
34+
/**
35+
* FanOut/FanIn pattern is a concurrency pattern that refers to executing multiple instances of the
36+
* activity function concurrently. The "fan out" part is essentially splitting the data into
37+
* multiple chunks and then calling the activity function multiple times, passing the chunks.
38+
*
39+
* <p>When each chunk has been processed, the "fan in" takes place that aggregates results from each
40+
* instance of function and forms a single final result.
41+
*
42+
* <p>This pattern is only really useful if you can “chunk” the workload in a meaningful way for
43+
* splitting up to be processed in parallel.
44+
*/
45+
@Slf4j
46+
publicclassApp {
47+
48+
/**
49+
* Entry point.
50+
*
51+
* <p>Implementation provided has a list of numbers that has to be squared and added. The list can
52+
* be chunked in any way and the "activity function" {@link
53+
* SquareNumberRequest#delayedSquaring(Consumer)} i.e. squaring the number ca be done
54+
* concurrently. The "fan in" part is handled by the {@link Consumer} that takes in the result
55+
* from each instance of activity and aggregates it whenever that particular activity function
56+
* gets over.
57+
*/
58+
publicstaticvoidmain(String[]args) {
59+
finalList<Long>numbers =Arrays.asList(1L,3L,4L,7L,8L);
60+
61+
LOGGER.info("Numbers to be squared and get sum --> {}",numbers);
62+
63+
finalList<SquareNumberRequest>requests =
64+
numbers.stream().map(SquareNumberRequest::new).collect(Collectors.toList());
65+
66+
varconsumer =newConsumer(0L);
67+
68+
// Pass the request and the consumer to fanOutFanIn or sometimes referred as Orchestrator
69+
// function
70+
finalLongsumOfSquaredNumbers =FanOutFanIn.fanOutFanIn(requests,consumer);
71+
72+
LOGGER.info("Sum of all squared numbers --> {}",sumOfSquaredNumbers);
73+
}
74+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* The MIT License
3+
* Copyright © 2014-2021 Ilkka Seppälä
4+
*
5+
* Permission is hereby granted, free of charge, to any person obtaining a copy
6+
* of this software and associated documentation files (the "Software"), to deal
7+
* in the Software without restriction, including without limitation the rights
8+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
* copies of the Software, and to permit persons to whom the Software is
10+
* furnished to do so, subject to the following conditions:
11+
*
12+
* The above copyright notice and this permission notice shall be included in
13+
* all copies or substantial portions of the Software.
14+
*
15+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
21+
* THE SOFTWARE.
22+
*/
23+
24+
packagecom.iluwatar.fanout.fanin;
25+
26+
importjava.util.concurrent.atomic.AtomicLong;
27+
28+
importlombok.Getter;
29+
30+
31+
32+
/**
33+
* Consumer or callback class that will be called everytime a request is complete This will
34+
* aggregate individual result to form a final result.
35+
*/
36+
@Getter
37+
publicclassConsumer {
38+
39+
privatefinalAtomicLongsumOfSquaredNumbers;
40+
41+
Consumer(Longinit) {
42+
sumOfSquaredNumbers =newAtomicLong(init);
43+
}
44+
45+
publicLongadd(finalLongnum) {
46+
returnsumOfSquaredNumbers.addAndGet(num);
47+
}
48+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp