From 77a7f1229ceb572f63040f73e9c53488901aa88f Mon Sep 17 00:00:00 2001 From: Julien CLEMENT Date: Fri, 6 Jan 2023 18:42:12 +0100 Subject: [PATCH] add basic yield executor Signed-off-by: Julien CLEMENT --- src/syscalls/mod.rs | 19 ++---- src/syscalls/proc.rs | 5 +- src/task/mod.rs | 1 + src/task/yield_executor.rs | 119 +++++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 17 deletions(-) create mode 100644 src/task/yield_executor.rs diff --git a/src/syscalls/mod.rs b/src/syscalls/mod.rs index b5dd281..4449d58 100644 --- a/src/syscalls/mod.rs +++ b/src/syscalls/mod.rs @@ -1,7 +1,7 @@ use crate::println; use crate::proc::thread::{resume_k_thread, RUNNING_THREAD}; use crate::proc::scheduler::SCHEDULER; -use crate::task::executor::EXECUTOR; +use crate::task::yield_executor::YieldExecutor; use crate::task::Task; pub use ids::*; @@ -25,8 +25,6 @@ impl SyscallContext { pub async fn run(&mut self) { println!("Running async syscall runner for {:?}", self.id); self.dispatch().await; - println!("Syscall {:?} end, unblocking thread", self.id); - SCHEDULER.lock().await.unblock(self.thread_id); } pub async fn dispatch(&mut self) { @@ -51,18 +49,9 @@ pub fn syscall_routine(syscall_id: SyscallId) -> u64 { })); println!("Spawning async syscall runner"); - EXECUTOR - .try_lock() - .unwrap() - .spawn(Task::new(syscall_runner(context.clone()))); - - println!("Blocking thread"); - SCHEDULER - .try_lock() - .unwrap() - .block(context.borrow().thread_id); - println!("Returning to scheduler"); - resume_k_thread(); + let mut executor = YieldExecutor::new(context.borrow().thread_id); + executor.spawn(Task::new(syscall_runner(context.clone()))); + executor.run(); let res = context.borrow().res; res diff --git a/src/syscalls/proc.rs b/src/syscalls/proc.rs index 4148585..b594043 100644 --- a/src/syscalls/proc.rs +++ b/src/syscalls/proc.rs @@ -1,10 +1,11 @@ use crate::println; use crate::proc::scheduler::SCHEDULER; +use crate::proc::thread::resume_k_thread; use super::SyscallContext; pub async fn exit(context: &SyscallContext) { println!("Running exit(2)"); - let mut scheduler = SCHEDULER.lock().await; - scheduler.exit(context.thread_id); + SCHEDULER.lock().await.exit(context.thread_id); + resume_k_thread(); } diff --git a/src/task/mod.rs b/src/task/mod.rs index 078e6b8..f66a11e 100644 --- a/src/task/mod.rs +++ b/src/task/mod.rs @@ -5,6 +5,7 @@ use core::sync::atomic::{AtomicU64, Ordering}; use core::task::{Context, Poll}; pub mod executor; +pub mod yield_executor; pub mod keyboard; #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)] diff --git a/src/task/yield_executor.rs b/src/task/yield_executor.rs new file mode 100644 index 0000000..6ad1d25 --- /dev/null +++ b/src/task/yield_executor.rs @@ -0,0 +1,119 @@ + +use crate::println; +use crate::proc::thread::{ThreadId, resume_k_thread}; +use crate::proc::scheduler::SCHEDULER; + +use super::{Task, TaskId}; + +use alloc::{collections::BTreeMap, sync::Arc, task::Wake}; +use core::task::{Context, Poll, Waker}; +use crossbeam_queue::ArrayQueue; + +pub struct YieldExecutor { + tasks: BTreeMap, + task_queue: Arc>, + waker_cache: BTreeMap, + thread_id: ThreadId, +} + +impl YieldExecutor { + pub fn new(thread_id: ThreadId) -> Self { + YieldExecutor { + tasks: BTreeMap::new(), + task_queue: Arc::new(ArrayQueue::new(100)), + waker_cache: BTreeMap::new(), + thread_id: thread_id, + } + } + + pub fn run(&mut self) { + loop { + self.run_ready_tasks(); + if self.done() { + break; + } else { + println!("Blocking thread"); + SCHEDULER + .try_lock() + .unwrap() + .block(self.thread_id); + println!("Returning to scheduler"); + resume_k_thread(); + } + } + } + + fn done(&self) -> bool { + self.tasks.is_empty() + } + + pub fn spawn(&mut self, task: Task) { + let task_id = task.id; + if self.tasks.insert(task.id, task).is_some() { + panic!("Duplicate task ID"); + } + self.task_queue.push(task_id).expect("Task queue full"); + } + + fn run_ready_tasks(&mut self) { + let Self { + tasks, + task_queue, + waker_cache, + thread_id, + } = self; // Executor destructuring + + while let Ok(task_id) = task_queue.pop() { + let task = match tasks.get_mut(&task_id) { + Some(task) => task, + None => continue, // Task does not exist anymore + }; + let waker = waker_cache + .entry(task_id) + .or_insert_with(|| YieldTaskWaker::new(task_id, task_queue.clone(), *thread_id)); + let mut context = Context::from_waker(waker); + match task.poll(&mut context) { + Poll::Ready(()) => { + // task is done + tasks.remove(&task_id); + waker_cache.remove(&task_id); + } + Poll::Pending => {} + } + } + } +} + +struct YieldTaskWaker { + task_id: TaskId, + task_queue: Arc>, + thread_id: ThreadId, +} + +impl YieldTaskWaker { + fn new(task_id: TaskId, task_queue: Arc>, thread_id: ThreadId) -> Waker { + Waker::from(Arc::new(YieldTaskWaker { + task_id, + task_queue, + thread_id + })) + } + + fn wake_task(&self) { + self.task_queue.push(self.task_id).expect("Task queue full"); + SCHEDULER + .try_lock() + .unwrap() + .unblock(self.thread_id); + } +} + +impl Wake for YieldTaskWaker { + fn wake(self: Arc) { + self.wake_task(); + } + + fn wake_by_ref(self: &Arc) { + self.wake_task(); + } +}