Movatterモバイル変換


[0]ホーム

URL:


Skip to content

Navigation Menu

Sign in
Appearance settings

Search code, repositories, users, issues, pull requests...

Provide feedback

We read every piece of feedback, and take your input very seriously.

Saved searches

Use saved searches to filter your results more quickly

Sign up
Appearance settings

Monad/MonadIO, Handler, Coroutine/doNotation, Functional Programming features for Rust

License

NotificationsYou must be signed in to change notification settings

TeaEntityLab/fpRust

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

tag · Crates.io · Travis CI Build Status · docs

license · stars · forks

Monad, Functional Programming features for Rust

Why

I love functional programming, Rx-style coding.

However, it's hard to implement them in Rust, and there are few libraries that provide even parts of them.

Thus I implemented fpRust. I hope you would like it :)

Features

  • MonadIO, Rx-like (fp_rust::monadio::MonadIO)

    • map/fmap/subscribe
    • async/sync
    • Support Future (to_future()) with *feature: for_futures
  • Publisher (fp_rust::publisher::Publisher)

    • Support Stream implementation (subscribe_as_stream()) with *feature: for_futures
  • Fp functions (fp_rust::fp)

    • compose!(), pipe!()
    • map!(), reduce!(), filter!(), foldl!(), foldr!()
    • contains!(), reverse!()
  • Async (fp_rust::sync & fp_rust::handler::HandlerThread)

    • simple BlockingQueue (inspired by Java BlockingQueue, implemented by built-in std::sync::mpsc::channel)
    • HandlerThread (inspired by Android Handler, implemented by built-in std::thread)
    • WillAsync (inspired by Java Future)
      • Support as a Future with *feature: for_futures
    • CountDownLatch (inspired by Java CountDownLatch, implemented by built-in std::sync::Mutex)
      • Support as a Future with *feature: for_futures
  • Cor (fp_rust::cor::Cor)

    • Pythonic Generator-like Coroutine
    • yield/yieldFrom
    • async/sync
  • Actor (fp_rust::actor::ActorAsync)

    • Pure simple Actor model (receive/send/spawn)
    • Context for keeping internal states
    • Able to communicate with Parent/Children Actors
  • DoNotation (fp_rust::cor::Cor)

    • Haskell DoNotation-like, macro

* Pattern matching

Usage

MonadIO (RxObserver-like)

Example:

externcrate fp_rust;use std::{    thread,    time,    sync::{Arc,Mutex,Condvar,}};use fp_rust::handler::{Handler,HandlerThread,};use fp_rust::common::SubscriptionFunc;use fp_rust::monadio::{MonadIO,    of,};use fp_rust::sync::CountDownLatch;// fmap & map (sync)letmut _subscription =Arc::new(SubscriptionFunc::new(move |x:Arc<u16>|{println!("monadio_sync {:?}", x);// monadio_sync 36assert_eq!(36,*Arc::make_mut(&mut x.clone()));}));let subscription = _subscription.clone();let monadio_sync =MonadIO::just(1).fmap(|x|MonadIO::new(move || x*4)).map(|x| x*3).map(|x| x*3);monadio_sync.subscribe(subscription);// fmap & map (async)letmut _handler_observe_on =HandlerThread::new_with_mutex();letmut _handler_subscribe_on =HandlerThread::new_with_mutex();let monadio_async =MonadIO::new_with_handlers(    ||{println!("In string");String::from("ok")},Some(_handler_observe_on.clone()),Some(_handler_subscribe_on.clone()),);let latch =CountDownLatch::new(1);let latch2 = latch.clone();thread::sleep(time::Duration::from_millis(1));let subscription =Arc::new(SubscriptionFunc::new(move |x:Arc<String>|{println!("monadio_async {:?}", x);// monadio_async ok    latch2.countdown();// Unlock here}));monadio_async.subscribe(subscription);monadio_async.subscribe(Arc::new(SubscriptionFunc::new(move |x:Arc<String>|{println!("monadio_async sub2 {:?}", x);// monadio_async sub2 ok})));{letmut handler_observe_on = _handler_observe_on.lock().unwrap();letmut handler_subscribe_on = _handler_subscribe_on.lock().unwrap();println!("hh2");    handler_observe_on.start();    handler_subscribe_on.start();println!("hh2 running");    handler_observe_on.post(RawFunc::new(move ||{}));    handler_observe_on.post(RawFunc::new(move ||{}));    handler_observe_on.post(RawFunc::new(move ||{}));    handler_observe_on.post(RawFunc::new(move ||{}));    handler_observe_on.post(RawFunc::new(move ||{}));}thread::sleep(time::Duration::from_millis(1));// Waiting for being unlcokedlatch.clone().wait();

Publisher (PubSub-like)

Example:

externcrate fp_rust;use fp_rust::common::{SubscriptionFunc,RawFunc};use fp_rust::handler::{Handler,HandlerThread};use fp_rust::publisher::Publisher;use std::sync::Arc;use fp_rust::sync::CountDownLatch;letmut pub1 =Publisher::new();pub1.subscribe_fn(|x:Arc<u16>|{println!("pub1 {:?}", x);assert_eq!(9,*Arc::make_mut(&mut x.clone()));});pub1.publish(9);letmut _h =HandlerThread::new_with_mutex();letmut pub2 =Publisher::new_with_handlers(Some(_h.clone()));let latch =CountDownLatch::new(1);let latch2 = latch.clone();let s =Arc::new(SubscriptionFunc::new(move |x:Arc<String>|{println!("pub2-s1 I got {:?}", x);    latch2.countdown();}));pub2.subscribe(s.clone());pub2.map(move |x:Arc<String>|{println!("pub2-s2 I got {:?}", x);});{let h =&mut _h.lock().unwrap();println!("hh2");    h.start();println!("hh2 running");    h.post(RawFunc::new(move ||{}));    h.post(RawFunc::new(move ||{}));    h.post(RawFunc::new(move ||{}));    h.post(RawFunc::new(move ||{}));    h.post(RawFunc::new(move ||{}));}pub2.publish(String::from("OKOK"));pub2.publish(String::from("OKOK2"));pub2.unsubscribe(s.clone());pub2.publish(String::from("OKOK3"));latch.clone().wait();

Cor (PythonicGenerator-like)

Example:

#[macro_use]externcrate fp_rust;use std::time;use std::thread;use fp_rust::cor::Cor;println!("test_cor_new");let _cor1 =cor_newmutex!(    |this|{        println!("cor1 started");let s = cor_yield!(this,Some(String::from("given_to_outside")));        println!("cor1 {:?}", s);},String,i16);let cor1 = _cor1.clone();let _cor2 =cor_newmutex!(    move |this|{        println!("cor2 started");        println!("cor2 yield_from before");let s = cor_yield_from!(this, cor1,Some(3));        println!("cor2 {:?}", s);},i16,i16);{let cor1 = _cor1.clone();    cor1.lock().unwrap().set_async(true);// NOTE Cor default async// NOTE cor1 should keep async to avoid deadlock waiting.(waiting for each other)}{let cor2 = _cor2.clone();    cor2.lock().unwrap().set_async(false);// NOTE cor2 is the entry point, so it could be sync without any deadlock.}cor_start!(_cor1);cor_start!(_cor2);thread::sleep(time::Duration::from_millis(1));

Do Notation (Haskell DoNotation-like)

Example:

#[macro_use]externcrate fp_rust;use std::time;use std::thread;use fp_rust::cor::Cor;let v =Arc::new(Mutex::new(String::from("")));let _v = v.clone();do_m!(move |this|{    println!("test_cor_do_m started");let cor_inner1 = cor_newmutex_and_start!(        |this|{let s = cor_yield!(this,Some(String::from("1")));            println!("cor_inner1 {:?}", s);},String,i16);let cor_inner2 = cor_newmutex_and_start!(        |this|{let s = cor_yield!(this,Some(String::from("2")));            println!("cor_inner2 {:?}", s);},String,i16);let cor_inner3 = cor_newmutex_and_start!(        |this|{let s = cor_yield!(this,Some(String::from("3")));            println!("cor_inner3 {:?}", s);},String,i16);{(*_v.lock().unwrap()) =[            cor_yield_from!(this, cor_inner1,Some(1)).unwrap(),            cor_yield_from!(this, cor_inner2,Some(2)).unwrap(),            cor_yield_from!(this, cor_inner3,Some(3)).unwrap(),].join("");}});let _v = v.clone();{assert_eq!("123",*_v.lock().unwrap());}

Fp Functions (Compose, Pipe, Map, Reduce, Filter)

Example:

#[macro_use]externcrate fp_rustuse fp_rust::fp::{  compose_two,  map, reduce, filter,};let add = |x| x +2;let multiply = |x| x*3;let divide = |x| x /2;let result =(compose!(add, multiply, divide))(10);assert_eq!(17, result);println!("Composed FnOnce Result is {}", result);let result =(pipe!(add, multiply, divide))(10);assert_eq!(18, result);println!("Piped FnOnce Result is {}", result);let result =(compose!(reduce!(|a, b| a* b), filter!(|x|*x <6), map!(|x| x*2)))(vec![1,2,3,4]);assert_eq!(Some(8), result);println!("test_map_reduce_filter Result is {:?}", result);

Actor

Actor common (send/receive/spawn/states)

Example:

use std::time::Duration;use fp_rust::common::LinkedListAsync;#[derive(Clone,Debug)]enumValue{// Str(String),Int(i32),VecStr(Vec<String>),Spawn,Shutdown,}let result_i32 =LinkedListAsync::<i32>::new();let result_i32_thread = result_i32.clone();let result_string =LinkedListAsync::<Vec<String>>::new();let result_string_thread = result_string.clone();letmut root =ActorAsync::new(move |this:&mutActorAsync<_,_>,msg:Value,context:&mutHashMap<String,Value>|{match msg{Value::Spawn =>{println!("Actor Spawn");let result_i32_thread = result_i32_thread.clone();let spawned = this.spawn_with_handle(Box::new(move |this:&mutActorAsync<_,_>,msg:Value, _|{match msg{Value::Int(v) =>{println!("Actor Child Int");                                result_i32_thread.push_back(v*10);}Value::Shutdown =>{println!("Actor Child Shutdown");                                this.stop();}                            _ =>{}};},));let list = context.get("children_ids").cloned();letmut list =match list{Some(Value::VecStr(list)) => list,                    _ =>Vec::new(),};                list.push(spawned.get_id());                context.insert("children_ids".into(),Value::VecStr(list));}Value::Shutdown =>{println!("Actor Shutdown");ifletSome(Value::VecStr(ids)) = context.get("children_ids"){                    result_string_thread.push_back(ids.clone());}                this.for_each_child(move |id, handle|{println!("Actor Shutdown id {:?}", id);                    handle.send(Value::Shutdown);});                this.stop();}Value::Int(v) =>{println!("Actor Int");ifletSome(Value::VecStr(ids)) = context.get("children_ids"){for idin ids{println!("Actor Int id {:?}", id);ifletSome(mut handle) = this.get_handle_child(id){                            handle.send(Value::Int(v));}}}}            _ =>{}}},);letmut root_handle = root.get_handle();root.start();// One childroot_handle.send(Value::Spawn);root_handle.send(Value::Int(10));// Two childrenroot_handle.send(Value::Spawn);root_handle.send(Value::Int(20));// 
Three childrenroot_handle.send(Value::Spawn);root_handle.send(Value::Int(30));// Send Shutdownroot_handle.send(Value::Shutdown);thread::sleep(Duration::from_millis(1));// 3 children Actorsassert_eq!(3, result_string.pop_front().unwrap().len());letmut v =Vec::<Option<i32>>::new();for _in1..7{let i = result_i32.pop_front();println!("Actor {:?}", i);    v.push(i);}v.sort();assert_eq!([Some(100),Some(200),Some(200),Some(300),Some(300),Some(300)],    v.as_slice())

Actor Ask (inspired by Akka/Erlang)

Example:

use std::time::Duration;use fp_rust::common::LinkedListAsync;#[derive(Clone,Debug)]enumValue{AskIntByLinkedListAsync((i32,LinkedListAsync<i32>)),AskIntByBlockingQueue((i32,BlockingQueue<i32>)),}letmut root =ActorAsync::new(move |_:&mutActorAsync<_,_>,msg:Value, _:&mutHashMap<String,Value>|match msg{Value::AskIntByLinkedListAsync(v) =>{println!("Actor AskIntByLinkedListAsync");            v.1.push_back(v.0*10);}Value::AskIntByBlockingQueue(mut v) =>{println!("Actor AskIntByBlockingQueue");// NOTE If negative, hanging for testing timeoutif v.0 <0{return;}// NOTE General Cases            v.1.offer(v.0*10);}// _ => {}},);letmut root_handle = root.get_handle();root.start();// LinkedListAsync<i32>let result_i32 =LinkedListAsync::<i32>::new();root_handle.send(Value::AskIntByLinkedListAsync((1, result_i32.clone())));root_handle.send(Value::AskIntByLinkedListAsync((2, result_i32.clone())));root_handle.send(Value::AskIntByLinkedListAsync((3, result_i32.clone())));thread::sleep(Duration::from_millis(1));let i = result_i32.pop_front();assert_eq!(Some(10), i);let i = result_i32.pop_front();assert_eq!(Some(20), i);let i = result_i32.pop_front();assert_eq!(Some(30), i);// BlockingQueue<i32>letmut result_i32 =BlockingQueue::<i32>::new();result_i32.timeout =Some(Duration::from_millis(1));root_handle.send(Value::AskIntByBlockingQueue((4, result_i32.clone())));root_handle.send(Value::AskIntByBlockingQueue((5, result_i32.clone())));root_handle.send(Value::AskIntByBlockingQueue((6, result_i32.clone())));thread::sleep(Duration::from_millis(1));let i = result_i32.take();assert_eq!(Some(40), i);let i = result_i32.take();assert_eq!(Some(50), i);let i = result_i32.take();assert_eq!(Some(60), i);// Timeout case:root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));let i = result_i32.take();assert_eq!(None, i);

[8]ページ先頭

©2009-2025 Movatter.jp