// This file is part of Deja-Vu. // // Deja-Vu is free software: you can redistribute it and/or modify it // under the terms of the GNU General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // Deja-Vu is distributed in the hope that it will be useful, but // WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // See the GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with Deja-Vu. If not, see . // use std::{ collections::VecDeque, path::PathBuf, process::{Command, Stdio}, sync::{Arc, Mutex}, thread::{self, sleep, JoinHandle}, time::Duration, }; use color_eyre::Result; use tokio::sync::mpsc; use tracing::info; use crate::{ action::Action, components::popup, system::{disk, diskpart}, }; #[derive(Clone, Debug)] pub enum TaskResult { Error(String), Output(String, String, bool), // stdout, stderr, success } #[derive(Clone, Debug)] pub enum TaskType { Command(PathBuf, Vec), // (command, args) Diskpart(String), // (script_as_string) ScanDisks, Sleep, UpdateDestDisk(usize), // (disk_index) UpdateDiskList, } #[derive(Debug)] pub struct Task { pub handle: Option>, pub result: Option, pub task_type: TaskType, } impl Task { pub fn new(task_type: TaskType) -> Task { Task { handle: None, result: None, task_type, } } } #[derive(Debug)] pub struct Tasks { action_tx: mpsc::UnboundedSender, disk_list: Arc>>, cur_handle: Option>, cur_task: Option, task_list: VecDeque, task_rx: mpsc::UnboundedReceiver, task_tx: mpsc::UnboundedSender, } impl Tasks { pub fn new( action_tx: mpsc::UnboundedSender, disk_list_arc: Arc>>, ) -> Self { let (task_tx, task_rx) = mpsc::unbounded_channel(); Tasks { action_tx, disk_list: disk_list_arc, cur_handle: None, cur_task: None, task_list: VecDeque::new(), task_rx, task_tx, } } pub fn add(&mut self, task_type: TaskType) { info!("Adding task: {:?}", &task_type); self.task_list.push_back(Task::new(task_type)); } pub fn idle(&self) -> bool { self.cur_handle.is_none() } pub fn poll(&mut self) -> Result> { let mut return_task: Option = None; // Handle task channel item(s) if let Ok(result) = self.task_rx.try_recv() { if let Some(mut task) = self.cur_task.take() { task.result.replace(result); self.cur_task.replace(task); } } // Check status of current task (if one is running). // NOTE: Action::TasksComplete is sent once all tasks are complete if let Some(task_handle) = self.cur_handle.take() { if task_handle.is_finished() { // Need to return task with handle if let Some(mut cur_task) = self.cur_task.take() { cur_task.handle = Some(task_handle); return_task = Some(cur_task); } if self.task_list.is_empty() { // No tasks remain self.action_tx.send(Action::TasksComplete)?; } else { // Start next task self.start()?; } } else { // TaskType not complete, return handle self.cur_handle.replace(task_handle); } } else if !self.task_list.is_empty() { // No current task but one is available self.start()?; } Ok(return_task) } pub fn start(&mut self) -> Result<()> { self.cur_task = self.task_list.pop_front(); if let Some(task) = self.cur_task.take() { let task_tx = self.task_tx.clone(); match task.task_type { TaskType::Command(ref cmd_path, ref cmd_args) => { self.cur_handle = Some(run_task_command( cmd_path.clone(), cmd_args.clone(), task_tx, )); } TaskType::Diskpart(ref script) => { self.cur_handle = Some(run_task_diskpart(&script, task_tx)); } TaskType::ScanDisks => { let disk_list_arc = self.disk_list.clone(); // Queue UpdateDiskList for various components self.add(TaskType::UpdateDiskList); self.cur_handle = Some(thread::spawn(move || { let mut disks = disk_list_arc.lock().unwrap(); *disks = disk::get_disks() })); } TaskType::Sleep => { self.cur_handle = Some(thread::spawn(|| sleep(Duration::from_millis(250)))); } TaskType::UpdateDestDisk(index) => { self.action_tx.send(Action::DisplayPopup( popup::Type::Info, String::from("Refreshing disk info"), ))?; // Queue UpdateDiskList for various components self.add(TaskType::Sleep); self.add(TaskType::UpdateDiskList); // Update destination disk ~in-place let disk_list_arc = self.disk_list.clone(); self.cur_handle = Some(thread::spawn(move || { let mut disks = disk_list_arc.lock().unwrap(); let old_disk = &mut disks[index]; disks[index] = disk::refresh_disk_info(old_disk); })); } TaskType::UpdateDiskList => { let disks = self.disk_list.lock().unwrap(); let disks_copy = disks.clone(); let action_tx = self.action_tx.clone(); self.cur_handle = Some(thread::spawn(move || { if let Err(err) = action_tx.send(Action::UpdateDiskList(disks_copy)) { panic!("Failed to send Action: {err:?}"); } })); } } // Done self.cur_task.replace(task); } Ok(()) } } fn parse_bytes_as_str(bytes: Vec) -> String { match String::from_utf8(bytes) { Ok(s) => s.trim().to_string(), Err(_) => String::from("Failed to parse bytes as UTF-8 text"), } } fn run_task_command( cmd_path: PathBuf, cmd_args: Vec, task_tx: mpsc::UnboundedSender, ) -> JoinHandle<()> { if cfg!(windows) { thread::spawn(move || { let result = Command::new(cmd_path) .args(cmd_args) .stdout(Stdio::piped()) .output(); match result { Err(e) => { task_tx .send(TaskResult::Error(format!("{:?}", &e))) .expect("Failed to propegate error?"); } Ok(output) => { let stderr = parse_bytes_as_str(output.stderr.to_owned()); let stdout = parse_bytes_as_str(output.stdout.to_owned()); let task_result = TaskResult::Output(stdout, stderr, output.status.success()); let err_str = format!("Failed to send TaskResult: {:?}", &task_result); task_tx.send(task_result).expect(err_str.as_str()); } } }) } else { // Simulate task if not running under Windows thread::spawn(|| sleep(Duration::from_millis(250))) } } fn run_task_diskpart(script: &str, task_tx: mpsc::UnboundedSender) -> JoinHandle<()> { if cfg!(windows) { let script = script.to_owned(); thread::spawn(move || { let output = diskpart::run_script_raw(&script); let stderr = parse_bytes_as_str(output.stderr.to_owned()); let stdout = parse_bytes_as_str(output.stdout.to_owned()); let task_result = TaskResult::Output(stdout, stderr, output.status.success()); let err_str = format!("Failed to send TaskResult: {:?}", &task_result); task_tx.send(task_result).expect(err_str.as_str()); }) } else { // Simulate task if not running under Windows thread::spawn(|| sleep(Duration::from_millis(250))) } }