diff --git a/src/lib.rs b/src/lib.rs index c66bd00..da0d762 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -65,6 +65,7 @@ pub extern "C" fn julios_main(multiboot_info_addr: usize) -> ! { executor.spawn(Task::new(drivers::atapi::init())); executor.spawn(Task::new(keyboard::print_keypresses())); executor.spawn(Task::new(get_file())); + executor.spawn(Task::new(proc::scheduler::scheduler_run())); executor.run(); } @@ -89,14 +90,10 @@ async fn get_file() { fd.borrow_mut().close().await; let thread = Arc::new(RefCell::new(proc::thread::Thread::new( - proc::thread::exit as u64, + proc::thread::routine as u64, ))); proc::scheduler::SCHEDULER .lock() .await .register(thread.clone()); - - unsafe { - (&mut*thread.as_ptr()).run(); - } } diff --git a/src/proc/scheduler/mod.rs b/src/proc/scheduler/mod.rs index d621167..17e4084 100644 --- a/src/proc/scheduler/mod.rs +++ b/src/proc/scheduler/mod.rs @@ -4,7 +4,13 @@ use super::thread::{Thread, ThreadId}; use alloc::{collections::BTreeMap, sync::Arc}; use core::cell::RefCell; +use core::{ + pin::Pin, + task::{Context, Poll}, +}; use crossbeam_queue::ArrayQueue; +use futures_util::stream::{Stream, StreamExt}; +use futures_util::task::AtomicWaker; use lazy_static::lazy_static; lazy_static! { @@ -13,16 +19,50 @@ lazy_static! { pub type Threadt = Arc>; +struct ThreadStream { + ids: ArrayQueue, + waker: AtomicWaker +} + +impl ThreadStream { + pub fn new() -> Self { + ThreadStream { + ids: ArrayQueue::new(100), + waker: AtomicWaker::new(), + } + } +} + +impl Stream for ThreadStream { + type Item = ThreadId; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if let Ok(id) = self.ids.pop() { + return Poll::Ready(Some(id)); + } + + self.waker.register(&cx.waker()); + + match self.ids.pop() { + Ok(id) => { + self.waker.take(); + Poll::Ready(Some(id)) + } + Err(crossbeam_queue::PopError) => Poll::Pending, + } + } +} + pub struct Scheduler { pub threads: BTreeMap, - thread_queue: Arc>, + thread_queue: ThreadStream, } impl Scheduler { pub fn new() -> Self { let mut res = Scheduler { threads: BTreeMap::new(), - thread_queue: Arc::new(ArrayQueue::new(100)), + thread_queue: ThreadStream::new(), }; let k_thread: Thread = Thread { id: ThreadId(0), @@ -34,16 +74,13 @@ impl Scheduler { res } - pub fn schedule(&mut self) -> Option { - if let Ok(thread_id) = self.thread_queue.pop() { - self.thread_queue.push(thread_id); - let thread = match self.threads.get_mut(&thread_id) { - Some(thread) => thread, - None => return None, - }; - Some(thread.clone()) - } else { - None + + pub async fn run(&mut self) { + while let Some(id) = self.thread_queue.next().await { + let thread = self.get_thread(id).unwrap(); + unsafe { + (&mut*thread.as_ptr()).run(); + } } } @@ -53,8 +90,10 @@ impl Scheduler { panic!("Duplicate thread ID") } self.thread_queue + .ids .push(thread_id) .expect("Thread queue full"); + self.thread_queue.waker.wake(); } pub fn get_thread(&mut self, id: ThreadId) -> Option { @@ -65,3 +104,9 @@ impl Scheduler { } } } + +pub async fn scheduler_run() { + let mut scheduler = SCHEDULER.lock().await; + SCHEDULER.force_unlock(); + scheduler.run().await; +} \ No newline at end of file diff --git a/src/proc/thread/mod.rs b/src/proc/thread/mod.rs index 6d2570d..60f513b 100644 --- a/src/proc/thread/mod.rs +++ b/src/proc/thread/mod.rs @@ -27,20 +27,25 @@ impl ThreadId { pub fn exit() { println!("Exiting thread"); - let thread: *mut Thread; + let k_thread: *mut Thread; { let mut scheduler = SCHEDULER.try_lock().unwrap(); - thread = scheduler + k_thread = scheduler .get_thread(ThreadId(0)) .unwrap() .as_ptr(); } // Drop scheduler mutex guard unsafe { - (&mut* thread).run(); + (&mut* k_thread).run(); } } +pub fn routine() { + println!("Routine executed"); + exit(); +} + pub struct Thread { pub id: ThreadId, pub entry_point: u64, @@ -73,11 +78,12 @@ impl Thread { ); let mut scheduler = SCHEDULER.try_lock().unwrap(); + // TODO: check if the thread still exists let current_thread = scheduler.get_thread(*current_thread_guard).unwrap(); current_thread.borrow_mut().rsp = current_rsp; *current_thread_guard = self.id; // change running thread - } // The scheduler and running thread guards is dropped here + } // The scheduler and running thread guards are dropped here unsafe { if self.started {