// Automatically invalidate on release.varpipe:Pipe<String>?// Automatically invalidate on release.vartoken1:PipeSourceToken?vartoken2:PipeSourceToken?foo(){ pipe=Pipe<String>() // Source1 token1= pipe?.sourceChannel.read{ valueinprint("source 1:\(value)")}DispatchQueue.main.asyncAfter(deadline:.now()+1){ // Broadcast a value.self.pipe?.sinkChannel.write("郑念真 520") // Source2: Receive value on main queue, and replay latest value. token2= pipe?.sourceChannel.read(onQueue:.main, replay:true){ valueinprint("source 2:\(value)")}DispatchQueue.main.asyncAfter(deadline:.now()+1){ // Manual invalidation, Source1 will not receive value again.self.token1?.invalidate()self.pipe?.sinkChannel.write("1314")self.pipe=nil}}}foo()// print// source 1: 郑念真 520// source 2: 郑念真 520// source 2: 1314