1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
use std::sync::{Arc, Mutex, RwLock};
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::thread;

use actors::{ActorPath, ActorRef, Message, Props};
use actors::actor_cell::{ActorCell, SystemMessage};
use actors::cthulhu::Cthulhu;
use actors::future::{Future, FutureExtractor};
use actors::name_resolver::NameResolver;
use actors::props::ActorFactory;
use actors::root_actor::RootActor;

/// This is failsafe used to relaunch consumer threads if they panic!.
struct Relauncher {
    actor_system: ActorSystem,
    active: bool,
}

impl Relauncher {
    fn new(actor_system: ActorSystem) -> Relauncher {
        Relauncher {
            actor_system: actor_system,
            active: true,
        }
    }

    fn cancel(mut self) {
        self.active = false;
    }
}

impl Drop for Relauncher {
    fn drop(&mut self) {
        if self.active {
            self.actor_system.spawn_thread();
        }
    }
}

/// The actor system is the struct that manages:
///
///   * The creation of the root actors.
///   * The consumer threads.
///   * Scheduling the actors.
///
/// It needs to be instantiated once at the beggining of the application. Then we need to specify
/// the number of consumer threads that will be allocated.
///
/// Calling `shutdown`, will drop all the actors and terminate the consumer threads.
/// Note that it will shut down the system even if some actors have still messages to handle.
pub struct ActorSystem {
    inner: Arc<InnerActorSystem>,
}

impl ActorSystem {
    /// Creates a new ActorSystem.
    ///
    /// Note that one thread is started.
    pub fn new(name: String) -> ActorSystem {
        let actor_system = ActorSystem { inner: Arc::new(InnerActorSystem::new(name)) };
        let cthulhu = Cthulhu::new(actor_system.clone());
        let cthulhu = ActorRef::with_cthulhu(cthulhu);
        info!("Created cthulhu");
        *actor_system.inner.cthulhu.write().unwrap() = Some(cthulhu.clone());
        let user_actor_path = ActorPath::new_local("/user".to_owned());
        let user_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
                                                actor_system.clone(),
                                                cthulhu.clone(),
                                                user_actor_path.clone());
        let user_actor = ActorRef::with_cell(user_actor_cell, user_actor_path);
        user_actor.receive_system_message(SystemMessage::Start);
        info!("Created /user actor");
        *actor_system.inner.user_actor.write().unwrap() = Some(user_actor);
        let system_actor_path = ActorPath::new_local("/system".to_owned());
        let system_actor_cell = ActorCell::new(Props::new(Arc::new(RootActor::new), ()),
                                                actor_system.clone(),
                                                cthulhu.clone(),
                                                system_actor_path.clone());
        let system_actor = ActorRef::with_cell(system_actor_cell, system_actor_path);
        system_actor.receive_system_message(SystemMessage::Start);
        info!("Created /system actor");
        *actor_system.inner.system_actor.write().unwrap() = Some(system_actor);
        actor_system.spawn_threads(1);
        info!("Launched the first thread");
        let name_resolver = actor_system.system_actor_of(Props::new(Arc::new(NameResolver::new), ()), "name_resolver".to_owned());
        info!("Created the /system/name_resolver actor");
        *actor_system.inner.name_resolver.write().unwrap() = Some(name_resolver);
        actor_system
    }

    /// Spawns an Actor created using the Props given for the user.
    pub fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
        self.inner.actor_of(props, name)
    }

    /// Spawns an Actor created using the Props given for the system.
    pub fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
        self.inner.system_actor_of(props, name)
    }

    /// Shuts the actor system down.
    ///
    /// It will terminate all the actors (whether they still have messages to handle or not) and
    /// then terminate the consumer threads.
    pub fn shutdown(&self) {
        self.inner.shutdown();
    }

    /// Enqueues the given ActorRef in the queue of ActorRef with message to handle.
    pub fn enqueue_actor(&self, actor_ref: ActorRef) {
        self.inner.enqueue_actor(actor_ref);
    }

    /// Spawns a thread that will have ActorRef handle their messages.
    ///
    /// This thread can be terminated by calling `terminate_thread`.
    pub fn spawn_thread(&self) {
        let actors_queue = self.inner.actors_queue_receiver.clone();
        let rx = self.inner.consumer_threads_receiver.clone();
        let actor_system = self.clone();
        let _ = thread::spawn(move || {
            // This is a failsafe used to relaunch a consumer thread if it panic!
            let relauncher = Relauncher::new(actor_system.clone());
            loop {
                // We check if we received a termination request.
                match rx.lock().unwrap().try_recv() {
                    Ok(_) | Err(TryRecvError::Disconnected) => {
                        relauncher.cancel();
                        break;
                    }
                    Err(TryRecvError::Empty) => {}
                };

                // We try to get an ActorRef with a message to handle.
                let actor_ref = {
                    let lock = actors_queue.lock().unwrap();
                    lock.try_recv()
                };

                match actor_ref {
                    Ok(actor_ref) => actor_ref.handle(),
                    Err(TryRecvError::Empty) => continue,
                    Err(TryRecvError::Disconnected) => {
                        relauncher.cancel();
                        actor_system.shutdown();
                        panic!("The actors queue failed, something is very wrong");
                    }
                }
            }
        });
        *self.inner.n_threads.lock().unwrap() += 1;
    }

    /// Kills a consumer thread.
    pub fn terminate_thread(&self) {
        self.inner.terminate_thread();
    }

    /// Spawns n consumer threads.
    pub fn spawn_threads(&self, n: u32) {
        for _ in 0..n {
            self.spawn_thread();
        }
    }

    /// Kills n consumer threads.
    pub fn terminate_threads(&self, n: u32) {
        self.inner.terminate_threads(n);
    }

    /// Gives the ActorRef of the name resolver actor.
    pub fn name_resolver(&self) -> ActorRef {
        match self.inner.name_resolver.read().unwrap().as_ref() {
            None => panic!("The name resolver is not initialized."),
            Some(resolver) => resolver.clone(),
        }
    }

    /// Sends a message to the given actor.
    ///
    /// The sender of the message is the user_actor, thus this expects that no answer will be
    /// given.
    pub fn tell<M: Message>(&self, to: ActorRef, message: M) {
        match self.inner.user_actor.read().unwrap().as_ref() {
            Some(user_actor) => user_actor.tell_to(to, message),
            None => unreachable!(),
        }
    }

    /// Creates a Future that will send the message to the targetted actor.
    ///
    /// The father of this Future is the user_actor.
    pub fn ask<M: Message>(&self, to: ActorRef, message: M, name: String) -> ActorRef {
        let future = self.actor_of(Props::new(Arc::new(Future::new), ()), name);
        future.tell_to(to, message);
        future
    }

    /// Extracts the result from a Future.
    ///
    /// This is not supposed to be used a lot as this is a synchronous call (if an actor wants to
    /// get the result of a fututure it should use forward_result instead).
    ///
    /// The extraction creates an Extractor actor whose father is the user_actor.
    pub fn extract_result<M: Message>(&self, future: ActorRef) -> M {
        // NOTE: this creates a lot of things but this is not meant to be used outside of
        // tests or examples so this is fine by my book.
        let (tx, rx) = channel();
        let _extractor = self.actor_of(Props::new(Arc::new(FutureExtractor::new), (future, Arc::new(Mutex::new(tx)))), "extractor".to_owned());
        rx.recv().unwrap()
    }
}

impl Clone for ActorSystem {
    fn clone(&self) -> ActorSystem {
        ActorSystem { inner: self.inner.clone() }
    }
}

struct InnerActorSystem {
    _name: String,
    // Communication channels to the co,sumer threads.
    consumer_threads_sender: Mutex<Sender<()>>,
    consumer_threads_receiver: Arc<Mutex<Receiver<()>>>,
    n_threads: Mutex<u32>,
    // Sends ActorRefs to be handled on that channel.
    actors_queue_sender: Mutex<Sender<ActorRef>>,
    // Receiving end to give to the thread pool.
    actors_queue_receiver: Arc<Mutex<Receiver<ActorRef>>>,
    cthulhu: RwLock<Option<ActorRef >>,
    user_actor: RwLock<Option<ActorRef>>,
    system_actor: RwLock<Option<ActorRef>>,
    // ActorRef to the name resolver.
    name_resolver: RwLock<Option<ActorRef>>,
}

impl InnerActorSystem {
    fn new(name: String) -> InnerActorSystem {
        let (tx_queue, rx_queue) = channel();
        let (tx_thread, rx_thread) = channel();
        InnerActorSystem {
            _name: name,
            consumer_threads_sender: Mutex::new(tx_thread),
            consumer_threads_receiver: Arc::new(Mutex::new(rx_thread)),
            n_threads: Mutex::new(0u32),
            actors_queue_sender: Mutex::new(tx_queue),
            actors_queue_receiver: Arc::new(Mutex::new(rx_queue)),
            cthulhu: RwLock::new(None),
            user_actor: RwLock::new(None),
            system_actor: RwLock::new(None),
            name_resolver: RwLock::new(None),
        }
    }

    /// Spawns an Actor for the user with the given ActorFactory.
    ///
    /// This will be part of the user cator hierarchy.
    fn actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
        // Not having the user actor in a Mutex is ok because the actor_of function already has
        // mutual exclusion, so we are in the clear.
        match self.user_actor.read().unwrap().clone() {
            Some(user_actor) => {
                // NOTE: this creates a lot of things but this is not meant to be used outside of
                // the initialisation of the system so this is fine by my book.
                let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
                info!("Created the channel to get an ActorRef from a root actor");
                self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(user_actor, (props, name, Arc::new(Mutex::new(tx))));
                rx.recv().unwrap().unwrap()
            },
            None => panic!("The user actor is not initialised"),
        }
    }

    fn system_actor_of(&self, props: Arc<ActorFactory>, name: String) -> ActorRef {
        // Not having the user actor in a Mutex is ok because the actor_of function already has
        // mutual exclusion, so we are in the clear.
        match self.system_actor.read().unwrap().clone() {
            Some(system_actor) => {
                // NOTE: this creates a lot of things but this is not meant to be used outside of
                // the initialisation of the system so this is fine by my book.
                let (tx, rx) = channel::<Result<ActorRef, &'static str>>();
                info!("Created the channel to get an ActorRef from a root actor");
                self.cthulhu.read().unwrap().as_ref().unwrap().tell_to(system_actor, (props, name, Arc::new(Mutex::new(tx))));
                rx.recv().unwrap().unwrap()
            },
            None => panic!("The system actor is not initialised"),
        }
    }

    /// Shuts the actor system down.
    fn shutdown(&self) {
        // We have to get this out of the mutex, because terminate_threads would deadlock on
        // n_thread.
        let n = {*self.n_threads.lock().unwrap()};
        self.terminate_threads(n);
        *self.user_actor.write().unwrap() = None;
        *self.system_actor.write().unwrap() = None;
        *self.cthulhu.write().unwrap() = None;
    }

    /// Enqueues the given ActorRef in the list of ActorRef with messages to be handled.
    fn enqueue_actor(&self, actor_ref: ActorRef) {
        match self.actors_queue_sender.lock().unwrap().send(actor_ref) {
            Ok(_) => return,
            Err(_) => {
                self.shutdown();
                panic!("The communication channel for messages is disconnected, this is bad!");
            }
        }
    }

    /// Kills a consumer thread.
    fn terminate_thread(&self) {
        let _ = self.consumer_threads_sender.lock().unwrap().send(());
        *self.n_threads.lock().unwrap() -= 1;
    }

    /// Kills n consumer threads.
    fn terminate_threads(&self, n: u32) {
        for _ in 0..n {
            self.terminate_thread();
        }
    }
}

impl Drop for InnerActorSystem {
    fn drop(&mut self) { }
}