Skip to content

Instantly share code, notes, and snippets.

@pka
Last active January 13, 2019 05:01
Show Gist options
  • Save pka/d5e96951f54dde1d376a27151191ff4e to your computer and use it in GitHub Desktop.
Save pka/d5e96951f54dde1d376a27151191ff4e to your computer and use it in GitHub Desktop.
extern crate actix;
extern crate futures;
use actix::dev::{MessageResponse, ResponseChannel};
use actix::*;
use futures::{future, Future};
use std::env;
/// Newtype around a list of integers.
///
/// Used both as the payload carried by `ApplyFilter` and as the value a
/// handler sends back as its reply.
#[derive(Debug)]
struct Values(Vec<i64>);
/// Lets a handler return `Values` directly as its reply.
///
/// The value is pushed into the response channel when one is supplied;
/// without a channel the reply is simply dropped.
impl<A: Actor, M: Message<Result = Values>> MessageResponse<A, M> for Values {
    fn handle<R: ResponseChannel<M>>(self, _: &mut A::Context, tx: Option<R>) {
        match tx {
            Some(channel) => {
                channel.send(self);
            }
            // No channel was supplied, so there is nowhere to deliver the reply.
            None => {}
        }
    }
}
/// Message asking an actor to filter the wrapped `Values`.
struct ApplyFilter(Values);
// Declares the reply type of `ApplyFilter`: handlers answer with a `Values`.
impl Message for ApplyFilter {
type Result = Values;
}
/// First filter actor; a unit struct with no state of its own.
struct Filter1;
// Uses `Context<Self>` as its execution context.
impl Actor for Filter1 {
type Context = Context<Self>;
}
// now we need to define `MessageHandler` for the `ApplyFilter` message.
impl Handler<ApplyFilter> for Filter1 {
type Result = Values; // <- Message response type
fn handle(&mut self, msg: ApplyFilter, _ctx: &mut Context<Self>) -> Self::Result {
Values((msg.0).0.into_iter().filter(|&x| x % 2 == 0).collect())
}
}
/// Second filter actor; a unit struct with no state of its own.
struct Filter2;
// Uses `Context<Self>` as its execution context.
impl Actor for Filter2 {
type Context = Context<Self>;
}
impl Handler<ApplyFilter> for Filter2 {
type Result = Values;
fn handle(&mut self, msg: ApplyFilter, _ctx: &mut Context<Self>) -> Self::Result {
Values((msg.0).0.into_iter().filter(|&x| x % 3 == 0).collect())
}
}
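/* HOW CAN WE CREATE OR START A CONFIGURABLE ACTOR?
   The sketch below does not compile as written: the match arms return
   different concrete types (`Filter1` vs `Filter2`), but `-> impl Actor`
   must resolve to a single concrete type.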
fn filter_from_config(config: &str) -> impl Actor {
match config {
"filter1" => Filter1,
"filter2" => Filter2,
_ => Filter1,
}
}
fn userfilter(config: &str) {
let sys = System::new("userfilter");
let addr = filter_from_config(config).start();
let res = addr.send(ApplyFilter(Values((0..10).collect())));
Arbiter::spawn(res.then(|res| {
match res {
Ok(result) => println!("Result: {:?}", result.0),
_ => println!("Something wrong"),
}
System::current().stop();
future::result(Ok(()))
}));
sys.run();
}
*/
fn filterchain() {
let sys = System::new("filterchain");
let addr1 = Filter1.start();
let addr2 = Filter2.start();
let res1 = addr1.send(ApplyFilter(Values((0..10).collect()))); // <- send message and get future for result
Arbiter::spawn(
res1.and_then(move |res| addr2.send(ApplyFilter(res)))
.then(|res| {
match res {
Ok(result) => println!("Result: {:?}", result.0),
_ => println!("Something wrong"),
}
System::current().stop();
future::result(Ok(()))
}),
);
sys.run();
}
/// Entry point: runs the two-stage filter chain demo.
fn main() {
filterchain();
// Disabled along with the commented-out `userfilter` sketch above:
// would take the filter name from the first command-line argument.
// let config = env::args().nth(1).expect("filter missing");
// userfilter(&config);
}
@pka
Copy link
Author

pka commented Dec 30, 2018

extern crate actix;
extern crate futures;

use actix::dev::{MessageResponse, ResponseChannel};
use actix::*;
use futures::{future, Future};
use std::env;

/// Newtype around a list of integers.
///
/// Used both as the payload carried by `ApplyFilter` and as the value a
/// handler sends back as its reply.
#[derive(Debug)]
struct Values(Vec<i64>);

/// Lets a handler return `Values` directly as its reply.
///
/// The value is pushed into the response channel when one is supplied;
/// without a channel the reply is simply dropped.
impl<A: Actor, M: Message<Result = Values>> MessageResponse<A, M> for Values {
    fn handle<R: ResponseChannel<M>>(self, _: &mut A::Context, tx: Option<R>) {
        match tx {
            Some(channel) => {
                channel.send(self);
            }
            // No channel was supplied, so there is nowhere to deliver the reply.
            None => {}
        }
    }
}

/// Message asking an actor to filter the wrapped `Values`.
struct ApplyFilter(Values);

// Declares the reply type of `ApplyFilter`: handlers answer with a `Values`.
impl Message for ApplyFilter {
    type Result = Values;
}

/// First filter actor; a unit struct with no state of its own.
struct Filter1;

// Uses `Context<Self>` as its execution context.
impl Actor for Filter1 {
    type Context = Context<Self>;
}

// now we need to define `MessageHandler` for the `ApplyFilter` message.
impl Handler<ApplyFilter> for Filter1 {
    type Result = Values; // <- Message response type

    fn handle(&mut self, msg: ApplyFilter, _ctx: &mut Context<Self>) -> Self::Result {
        Values((msg.0).0.into_iter().filter(|&x| x % 2 == 0).collect())
    }
}

/// Second filter actor; a unit struct with no state of its own.
struct Filter2;

// Uses `Context<Self>` as its execution context.
impl Actor for Filter2 {
    type Context = Context<Self>;
}

impl Handler<ApplyFilter> for Filter2 {
    type Result = Values;

    fn handle(&mut self, msg: ApplyFilter, _ctx: &mut Context<Self>) -> Self::Result {
        Values((msg.0).0.into_iter().filter(|&x| x % 3 == 0).collect())
    }
}

/// Starts the filter actor named by `config` and returns a type-erased
/// `Recipient` for it, so callers need not know the concrete actor type.
///
/// `"filter2"` selects `Filter2`; `"filter1"` and every other name
/// select `Filter1`.
fn filter_from_config(config: &str) -> Recipient<ApplyFilter> {
    if config == "filter2" {
        Filter2.start().recipient()
    } else {
        // Covers "filter1" as well as any unrecognised name.
        Filter1.start().recipient()
    }
}

fn userfilter(config: &str) {
    let sys = System::new("userfilter");

    let addr = filter_from_config(config);
    let res = addr.send(ApplyFilter(Values((0..10).collect())));

    Arbiter::spawn(res.then(|res| {
        match res {
            Ok(result) => println!("Result: {:?}", result.0),
            _ => println!("Something wrong"),
        }

        System::current().stop();
        future::result(Ok(()))
    }));

    sys.run();
}

fn filterchain() {
    let sys = System::new("filterchain");

    let addr1 = Filter1.start();
    let addr2 = Filter2.start();
    let res1 = addr1.send(ApplyFilter(Values((0..10).collect()))); // <- send message and get future for result

    Arbiter::spawn(
        res1.and_then(move |res| addr2.send(ApplyFilter(res)))
            .then(|res| {
                match res {
                    Ok(result) => println!("Result: {:?}", result.0),
                    _ => println!("Something wrong"),
                }

                System::current().stop();
                future::result(Ok(()))
            }),
    );

    sys.run();
}

/// Entry point: runs the fixed two-stage chain, then the single filter
/// named by the first command-line argument.
///
/// If the argument is missing, a usage message goes to stderr and the
/// process exits with status 1 instead of panicking.
fn main() {
    filterchain();

    let config = match env::args().nth(1) {
        Some(name) => name,
        None => {
            // A missing argument is a user error, not a broken invariant.
            eprintln!("filter missing: pass `filter1` or `filter2` as the first argument");
            std::process::exit(1);
        }
    };
    userfilter(&config);
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment