Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings
This repository was archived by the owner on Feb 2, 2025. It is now read-only.

Commitac60867

Browse files
authored
Merge pull request#20 from belozierov/ConflatedChannel
Added ConflatedChannel
2 parents592d899 +f064bd1 commitac60867

File tree

76 files changed

+2526
-883
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

76 files changed

+2526
-883
lines changed

‎README.md‎

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ To create channels, use the `CoChannel` class.
200200

201201
```swift
202202
//create a channel with a buffer which can store only one element
203-
let channel= CoChannel<Int>(maxBufferSize:1)
203+
let channel= CoChannel<Int>(capacity:1)
204204

205205
DispatchQueue.global().startCoroutine {
206206
for iin0..<100 {
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
//
2+
// _BufferedChannel.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 07.06.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
internalfinalclass_BufferedChannel<T>:_Channel<T>{
10+
11+
privatetypealiasReceiveCallback=(Result<T,CoChannelError>)->Void
12+
privatestructSendBlock{letelement:T,resumeBlock:((CoChannelError?)->Void)?}
13+
14+
privateletcapacity:Int
15+
privatevarreceiveCallbacks=FifoQueue<ReceiveCallback>()
16+
privatevarsendBlocks=FifoQueue<SendBlock>()
17+
privatevaratomic=AtomicTuple()
18+
19+
internalinit(capacity:Int){
20+
self.capacity=max(0, capacity)
21+
}
22+
23+
internaloverridevarbufferType:CoChannel<T>.BufferType{
24+
switch capacity{
25+
case.max:return.unlimited
26+
case0:return.none
27+
caselet capacity:return.buffered(capacity: capacity)
28+
}
29+
}
30+
31+
// MARK: - send
32+
33+
internaloverridefunc awaitSend(_ element:T)throws{
34+
switch atomic.update({ count, statein
35+
if state!=0{return(count, state)}
36+
return(count+1,0)
37+
}).old{
38+
case(_,1):
39+
throwCoChannelError.closed
40+
case(_,2):
41+
throwCoChannelError.canceled
42+
case(let count, _)where count<0:
43+
receiveCallbacks.blockingPop()(.success(element))
44+
case(let count, _)where count< capacity:
45+
sendBlocks.push(.init(element: element, resumeBlock:nil))
46+
default:
47+
tryCoroutine.await{
48+
sendBlocks.push(.init(element: element, resumeBlock: $0))
49+
}.map{throw $0}
50+
}
51+
}
52+
53+
internaloverridefunc sendFuture(_ future:CoFuture<T>){
54+
future.whenSuccess{[weak self]in
55+
guardlet self=selfelse{return}
56+
let(count, state)=self.atomic.update{ count, statein
57+
if state!=0{return(count, state)}
58+
return(count+1,0)
59+
}.old
60+
guard state==0else{return}
61+
count<0
62+
?self.receiveCallbacks.blockingPop()(.success($0))
63+
:self.sendBlocks.push(.init(element: $0, resumeBlock:nil))
64+
}
65+
}
66+
67+
internaloverridefunc offer(_ element:T)->Bool{
68+
let(count, state)= atomic.update{ count, statein
69+
if state!=0 || count>= capacity{return(count, state)}
70+
return(count+1,0)
71+
}.old
72+
if state!=0{returnfalse}
73+
if count<0{
74+
receiveCallbacks.blockingPop()(.success(element))
75+
returntrue
76+
}elseif count< capacity{
77+
sendBlocks.push(.init(element: element, resumeBlock:nil))
78+
returntrue
79+
}
80+
returnfalse
81+
}
82+
83+
// MARK: - receive
84+
85+
internaloverridefunc awaitReceive()throws->T{
86+
switch atomic.update({ count, statein
87+
if state==0{return(count-1,0)}
88+
return(Swift.max(0, count-1), state)
89+
}).old{
90+
case(let count,let state)where count>0:
91+
defer{if count==1, state==1{finish()}}
92+
returngetValue()
93+
case(_,0):
94+
returntryCoroutine.await{ receiveCallbacks.push($0)}.get()
95+
case(_,1):
96+
throwCoChannelError.closed
97+
default:
98+
throwCoChannelError.canceled
99+
}
100+
}
101+
102+
internaloverridefunc poll()->T?{
103+
let(count, state)= atomic.update{ count, statein
104+
(Swift.max(0, count-1), state)
105+
}.old
106+
guard count>0else{returnnil}
107+
defer{if count==1, state==1{finish()}}
108+
returngetValue()
109+
}
110+
111+
internaloverridefunc whenReceive(_ callback:@escaping(Result<T,CoChannelError>)->Void){
112+
switch atomic.update({ count, statein
113+
if state==0{return(count-1,0)}
114+
return(Swift.max(0, count-1), state)
115+
}).old{
116+
case(let count,let state)where count>0:
117+
callback(.success(getValue()))
118+
if count==1, state==1{finish()}
119+
case(_,0):
120+
receiveCallbacks.push(callback)
121+
case(_,1):
122+
callback(.failure(.closed))
123+
default:
124+
callback(.failure(.canceled))
125+
}
126+
}
127+
128+
internaloverridevarcount:Int{
129+
Int(max(0, atomic.value.0))
130+
}
131+
132+
internaloverridevarisEmpty:Bool{
133+
atomic.value.0<=0
134+
}
135+
136+
privatefunc getValue()->T{
137+
letblock= sendBlocks.blockingPop()
138+
block.resumeBlock?(nil)
139+
return block.element
140+
}
141+
142+
// MARK: - close
143+
144+
internaloverridefunc close()->Bool{
145+
let(count, state)= atomic.update{ count, statein
146+
state==0?(Swift.max(0, count),1):(count, state)
147+
}.old
148+
guard state==0else{returnfalse}
149+
if count<0{
150+
for_in0..<count.magnitude{
151+
receiveCallbacks.blockingPop()(.failure(.closed))
152+
}
153+
}elseif count>0{
154+
sendBlocks.forEach{ $0.resumeBlock?(.closed)}
155+
}else{
156+
finish()
157+
}
158+
returntrue
159+
}
160+
161+
internaloverridevarisClosed:Bool{
162+
atomic.value.1==1
163+
}
164+
165+
// MARK: - cancel
166+
167+
internaloverridefunc cancel(){
168+
letcount= atomic.update{ _in(0,2)}.old.0
169+
if count<0{
170+
for_in0..<count.magnitude{
171+
receiveCallbacks.blockingPop()(.failure(.canceled))
172+
}
173+
}elseif count>0{
174+
for_in0..<count{
175+
sendBlocks.blockingPop().resumeBlock?(.canceled)
176+
}
177+
}
178+
finish()
179+
}
180+
181+
internaloverridevarisCanceled:Bool{
182+
atomic.value.1==2
183+
}
184+
185+
deinit{
186+
whilelet block= receiveCallbacks.pop(){
187+
block(.failure(.canceled))
188+
}
189+
receiveCallbacks.free()
190+
sendBlocks.free()
191+
}
192+
193+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//
2+
// _Channel.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 03.06.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
@usableFromInlineinternalclass_Channel<T>:CoChannel<T>.Receiver{
10+
11+
privatevarcompleteBlocks=CallbackStack<CoChannelError?>()
12+
13+
// MARK: - send
14+
15+
@usableFromInlineinternalfunc awaitSend(_ element:T)throws{}
16+
@usableFromInlineinternalfunc sendFuture(_ future:CoFuture<T>){}
17+
@usableFromInlineinternalfunc offer(_ element:T)->Bool{false}
18+
19+
// MARK: - close
20+
21+
@usableFromInlineinternalfunc close()->Bool{false}
22+
23+
// MARK: - complete
24+
25+
internalfinaloverridefunc whenFinished(_ callback:@escaping(CoChannelError?)->Void){
26+
if !completeBlocks.append(callback){callback(channelError)}
27+
}
28+
29+
internalfinalfunc finish(){
30+
completeBlocks.close()?.finish(with: channelError)
31+
}
32+
33+
privatevarchannelError:CoChannelError?{
34+
if isClosed{return.closed}
35+
if isCanceled{return.canceled}
36+
returnnil
37+
}
38+
39+
deinit{
40+
finish()
41+
}
42+
43+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//
2+
// _CoChannelMap.swift
3+
// SwiftCoroutine
4+
//
5+
// Created by Alex Belozierov on 07.06.2020.
6+
// Copyright © 2020 Alex Belozierov. All rights reserved.
7+
//
8+
9+
internalfinalclass_CoChannelMap<Input, Output>:CoChannel<Output>.Receiver{
10+
11+
privateletreceiver:CoChannel<Input>.Receiver
12+
privatelettransform:(Input)->Output
13+
14+
internalinit(receiver:CoChannel<Input>.Receiver, transform:@escaping(Input)->Output){
15+
self.receiver= receiver
16+
self.transform= transform
17+
}
18+
19+
internaloverridevarbufferType:CoChannel<Output>.BufferType{
20+
switch receiver.bufferType{
21+
case.buffered(let capacity):
22+
return.buffered(capacity: capacity)
23+
case.conflated:
24+
return.conflated
25+
case.none:
26+
return.none
27+
case.unlimited:
28+
return.unlimited
29+
}
30+
}
31+
32+
// MARK: - receive
33+
34+
internaloverridefunc awaitReceive()throws->Output{
35+
trytransform(receiver.awaitReceive())
36+
}
37+
38+
internaloverridefunc poll()->Output?{
39+
receiver.poll().map(transform)
40+
}
41+
42+
internaloverridefunc whenReceive(_ callback:@escaping(Result<Output,CoChannelError>)->Void){
43+
receiver.whenReceive{callback($0.map(self.transform))}
44+
}
45+
46+
internaloverridevarcount:Int{
47+
receiver.count
48+
}
49+
50+
internaloverridevarisEmpty:Bool{
51+
receiver.isEmpty
52+
}
53+
54+
// MARK: - close
55+
56+
internaloverridevarisClosed:Bool{
57+
receiver.isClosed
58+
}
59+
60+
// MARK: - cancel
61+
62+
internaloverridefunc cancel(){
63+
receiver.cancel()
64+
}
65+
66+
internaloverridevarisCanceled:Bool{
67+
receiver.isCanceled
68+
}
69+
70+
// MARK: - complete
71+
72+
internaloverridefunc whenFinished(_ callback:@escaping(CoChannelError?)->Void){
73+
receiver.whenFinished(callback)
74+
}
75+
76+
}

0 commit comments

Comments
 (0)

[8]ページ先頭

©2009-2025 Movatter.jp