i have small wrapper for concurentMap if i put data to map and then get from same "for" i can getting value by key but if i get value from other "for", map is empty! can somebody explain me, how it works? i tried many variants to get value by key from different "for", but it doesnt works This code in MainListener doesn't work as expected
_currentChannels.add(id, Ref.make(channel)) *> ZIO.debug("connection accepted") *> _currentChannels.get(id) _currentChannels.get(id) -> this function does not find the value by key type ChannelsMap = UIO[Ref[ZIO[Any, Nothing, ConcurrentMap[String, UIO[Ref[AsynchronousSocketChannel]]]]]]
trait CurrentChannels { def add(id: String, channel: UIO[Ref[AsynchronousSocketChannel]]): ZIO[Any, Throwable, Unit] def get(id: String): ZIO[Any, Throwable, AsynchronousSocketChannel] def remove(id: String): ZIO[Any, Nothing, Unit]}
object CurrentChannels { final class CurrentChannelsLive(channels: ChannelsMap) extends CurrentChannels { def add(id: String, channel: UIO[Ref[AsynchronousSocketChannel]]) = for { _r <- channels _ <- ZIO.debug(s"prepare to add key ${id}") _ref <- _r.get _option_ref <- _ref _f <- _option_ref.putIfAbsent(id, channel) _g <- _option_ref.get(id) _ <- _g match case Some(c) => ZIO.debug(s"key ${id} was successfully added") *> ZIO.succeed(c) case None => ZIO.debug(s"unable to add key ${id}") *> ZIO.fail(Throwable("unable to add key")) } yield() def get(id: String) = for { _r <- channels _ref <- _r.get _option_ref <- _ref _channel_ref <- _option_ref.get(id) _empty <- _option_ref.isEmpty _channel <- _channel_ref match case Some(c) => ZIO.debug(s"key ${id} found: ${_empty}") *> ZIO.succeed(c) case None => ZIO.debug(s"key ${id} not found: : ${_empty}") *> ZIO.fail(Throwable("socket not found")) channel <- _channel.flatMap(_.get) } yield channel def remove(id: String) = for { _r <- channels _ <- ZIO.debug(s"prepare to remove key ${id}") _ref <- _r.get _option_ref <- _ref _ <- _option_ref.remove(id) } yield() } val live: ZLayer[Any, Nothing, CurrentChannels] = ZLayer.succeed(CurrentChannelsLive(Ref.make(ConcurrentMap.empty[String, UIO[Ref[AsynchronousSocketChannel]]])))}
trait MainListener { def run(port: Int): ZIO[Any, Throwable, Nothing]}
object MainListener { final class MainListenerLive(_currentChannels: CurrentChannels) extends MainListener{ def run(port: Int): ZIO[Any, Throwable, Nothing] = ZIO.debug("Service Run") *> ZIO.scoped { AsynchronousServerSocketChannel.open .flatMap { socket => for { address <- InetSocketAddress.wildCard(port) _ <- socket.bindTo(address) _ <- ZIO.debug("Socket bind") _ <- socket.accept .flatMap(channel => { val id = UUID.randomUUID().toString() _currentChannels.add(id, Ref.make(channel)) *> ZIO.debug("connection accepted") *> _currentChannels.get(id) }) .onInterrupt({ ZIO.debug("connection closed") }) .forever .fork } yield () } *> ZIO.never } } val live: ZLayer[CurrentChannels, Nothing, MainListener] = ZLayer { for { _currentChannels <- ZIO.service[CurrentChannels] } yield MainListenerLive(_currentChannels) }}
And main object object Main extends ZIOAppDefault { def run = for { service <- ZIO.service[MainListener].provide(MainListener.live, CurrentChannels.live) fiber <- service.run(1337).fork _ <- fiber.join } yield ()}
|