deja-vu/core/src/tasks.rs
2025-05-11 20:20:08 -07:00

335 lines
11 KiB
Rust

// This file is part of Deja-Vu.
//
// Deja-Vu is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Deja-Vu is distributed in the hope that it will be useful, but
// WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
// See the GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with Deja-Vu. If not, see <https://www.gnu.org/licenses/>.
#![allow(clippy::missing_errors_doc)]
#![allow(clippy::missing_panics_doc)]
use std::{
collections::VecDeque,
fmt,
path::PathBuf,
process::{Command, Stdio},
sync::{Arc, Mutex},
thread::{self, JoinHandle, sleep},
time::Duration,
};
use color_eyre::Result;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::info;
use crate::{
action::Action,
components::popup,
system::{disk, diskpart},
};
/// Outcome reported by a worker thread over the task result channel.
#[derive(Clone, Debug)]
pub enum TaskResult {
/// The task could not be run (e.g. the command failed to launch); holds the
/// debug-formatted error.
Error(String),
/// The task ran to completion. Fields, in order: captured stdout, captured
/// stderr, and whether the task reported success.
Output(String, String, bool), // stdout, stderr, success
}
/// The kinds of work that can be queued on [`Tasks`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub enum TaskType {
/// Run a command in a worker thread without tracking its thread handle, so
/// the queue does not wait for it to finish.
CommandNoWait(PathBuf, Vec<String>), // (command, args)
/// Run a command in a worker thread and wait for it to finish before the
/// next task is started.
CommandWait(PathBuf, Vec<String>), // (command, args)
/// Run the given script text through `diskpart::run_script_raw`.
Diskpart(String), // (script_as_string)
/// Replace the shared disk list with the result of `disk::get_disks()`;
/// an `UpdateDiskList` task is queued as a follow-up.
ScanDisks,
/// Pause the queue for 250 ms.
Sleep,
/// Check that every listed path exists and report the missing ones.
TestPaths(Vec<PathBuf>),
/// Refresh the entry at this index of the shared disk list; `Sleep` and
/// `UpdateDiskList` tasks are queued as follow-ups.
UpdateDestDisk(usize), // (disk_index)
/// Send a copy of the shared disk list as `Action::UpdateDiskList`.
UpdateDiskList,
}
impl fmt::Display for TaskType {
    /// Writes a short, human-readable label for the task.
    ///
    /// Command variants are rendered as `Command("<name>")`, where `<name>` is the
    /// final component of the command path. Every other variant is rendered as its
    /// bare variant name, without any payload.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TaskType::CommandNoWait(cmd_path, _) | TaskType::CommandWait(cmd_path, _) => {
                // `file_name()` is `None` for paths without a final component
                // (e.g. "", "/" or ".."). Formatting must never panic, so fall
                // back to showing the whole path in that case.
                let name = cmd_path
                    .file_name()
                    .map_or_else(|| cmd_path.to_string_lossy(), |name| name.to_string_lossy());
                write!(f, "Command(\"{name}\")")
            }
            TaskType::Diskpart(_) => f.write_str("Diskpart"),
            TaskType::ScanDisks => f.write_str("ScanDisks"),
            TaskType::Sleep => f.write_str("Sleep"),
            TaskType::TestPaths(_) => f.write_str("TestPaths"),
            TaskType::UpdateDestDisk(_) => f.write_str("UpdateDestDisk"),
            TaskType::UpdateDiskList => f.write_str("UpdateDiskList"),
        }
    }
}
/// A unit of queued work together with the state gathered while running it.
#[derive(Debug)]
pub struct Task {
/// Handle of the worker thread; `None` until `Tasks::poll` attaches it when
/// handing back the finished task.
pub handle: Option<JoinHandle<()>>,
/// Result received from the worker thread, if one has been received.
pub result: Option<TaskResult>,
/// What this task does.
pub task_type: TaskType,
}
impl Task {
#[must_use]
pub fn new(task_type: TaskType) -> Task {
Task {
handle: None,
result: None,
task_type,
}
}
}
/// FIFO queue of tasks that are run one at a time on worker threads.
#[derive(Debug)]
pub struct Tasks {
/// Channel used to send `Action`s (task start, completion, popups, disk
/// list updates) to the rest of the application.
action_tx: mpsc::UnboundedSender<Action>,
/// Disk list shared with the worker threads that scan or refresh disks.
disk_list: Arc<Mutex<Vec<disk::Disk>>>,
/// Handle of the worker thread currently being waited on, if any.
cur_handle: Option<JoinHandle<()>>,
/// The task most recently started, if any.
cur_task: Option<Task>,
/// Tasks waiting to be started, oldest first.
task_list: VecDeque<Task>,
/// Receiving end of the channel on which worker threads report results.
task_rx: mpsc::UnboundedReceiver<TaskResult>,
/// Sending end of the result channel; cloned for each started task.
task_tx: mpsc::UnboundedSender<TaskResult>,
}
impl Tasks {
pub fn new(
action_tx: mpsc::UnboundedSender<Action>,
disk_list_arc: Arc<Mutex<Vec<disk::Disk>>>,
) -> Self {
let (task_tx, task_rx) = mpsc::unbounded_channel();
Tasks {
action_tx,
disk_list: disk_list_arc,
cur_handle: None,
cur_task: None,
task_list: VecDeque::new(),
task_rx,
task_tx,
}
}
pub fn add(&mut self, task_type: TaskType) {
info!("Adding task: {:?}", &task_type);
self.task_list.push_back(Task::new(task_type));
}
#[must_use]
pub fn idle(&self) -> bool {
self.cur_handle.is_none()
}
pub fn poll(&mut self) -> Result<Option<Task>> {
let mut return_task: Option<Task> = None;
// Handle task channel item(s)
if let Ok(result) = self.task_rx.try_recv() {
if let Some(mut task) = self.cur_task.take() {
task.result.replace(result);
self.cur_task.replace(task);
}
}
// Check status of current task (if one is running).
// NOTE: Action::TasksComplete is sent once all tasks are complete
if let Some(task_handle) = self.cur_handle.take() {
if task_handle.is_finished() {
// Need to return task with handle
if let Some(mut cur_task) = self.cur_task.take() {
cur_task.handle = Some(task_handle);
return_task = Some(cur_task);
}
if self.task_list.is_empty() {
// No tasks remain
self.action_tx.send(Action::TasksComplete)?;
} else {
// Start next task
self.start()?;
}
} else {
// TaskType not complete, return handle
self.cur_handle.replace(task_handle);
}
} else if !self.task_list.is_empty() {
// No current task but one is available
self.start()?;
}
Ok(return_task)
}
pub fn start(&mut self) -> Result<()> {
self.cur_task = self.task_list.pop_front();
if let Some(task) = self.cur_task.take() {
let task_tx = self.task_tx.clone();
self.action_tx
.send(Action::TaskStart(task.task_type.clone()))?;
match task.task_type {
TaskType::CommandNoWait(ref cmd_path, ref cmd_args) => {
self.cur_handle = None;
run_task_command(cmd_path.clone(), cmd_args.clone(), task_tx);
}
TaskType::CommandWait(ref cmd_path, ref cmd_args) => {
self.cur_handle = Some(run_task_command(
cmd_path.clone(),
cmd_args.clone(),
task_tx,
));
}
TaskType::Diskpart(ref script) => {
self.cur_handle = Some(run_task_diskpart(script, task_tx));
}
TaskType::ScanDisks => {
let disk_list_arc = self.disk_list.clone();
// Queue UpdateDiskList for various components
self.add(TaskType::UpdateDiskList);
self.cur_handle = Some(thread::spawn(move || {
let mut disks = disk_list_arc.lock().unwrap();
*disks = disk::get_disks();
}));
}
TaskType::Sleep => {
self.cur_handle = Some(thread::spawn(|| sleep(Duration::from_millis(250))));
}
TaskType::TestPaths(ref list) => {
self.cur_handle = Some(test_paths(list.clone(), task_tx.clone()));
}
TaskType::UpdateDestDisk(index) => {
self.action_tx.send(Action::DisplayPopup(
popup::Type::Info,
String::from("Refreshing disk info"),
))?;
// Queue UpdateDiskList for various components
self.add(TaskType::Sleep);
self.add(TaskType::UpdateDiskList);
// Update destination disk ~in-place
let disk_list_arc = self.disk_list.clone();
self.cur_handle = Some(thread::spawn(move || {
let mut disks = disk_list_arc.lock().unwrap();
let old_disk = &mut disks[index];
disks[index] = disk::refresh_disk_info(old_disk);
}));
}
TaskType::UpdateDiskList => {
let disks = self.disk_list.lock().unwrap();
let disks_copy = disks.clone();
let action_tx = self.action_tx.clone();
self.cur_handle = Some(thread::spawn(move || {
if let Err(err) = action_tx.send(Action::UpdateDiskList(disks_copy)) {
panic!("Failed to send Action: {err:?}");
}
}));
}
}
// Done
self.cur_task.replace(task);
}
Ok(())
}
}
/// Decodes captured process output as text and trims surrounding whitespace.
///
/// Valid UTF-8 is converted without copying before trimming. If the bytes are
/// not valid UTF-8 (console tools may emit text in another code page), each
/// invalid sequence is replaced with U+FFFD so the readable part of the output
/// is kept instead of being discarded.
fn parse_bytes_as_str(bytes: Vec<u8>) -> String {
    match String::from_utf8(bytes) {
        Ok(text) => text.trim().to_string(),
        Err(err) => String::from_utf8_lossy(err.as_bytes()).trim().to_string(),
    }
}
fn run_task_command(
cmd_path: PathBuf,
cmd_args: Vec<String>,
task_tx: mpsc::UnboundedSender<TaskResult>,
) -> JoinHandle<()> {
if cfg!(windows) {
thread::spawn(move || {
let result = Command::new(cmd_path)
.args(cmd_args)
.stdout(Stdio::piped())
.output();
match result {
Err(e) => {
task_tx
.send(TaskResult::Error(format!("{:?}", &e)))
.expect("Failed to propegate error?");
}
Ok(output) => {
let stderr = parse_bytes_as_str(output.stderr.clone());
let stdout = parse_bytes_as_str(output.stdout.clone());
let task_result = TaskResult::Output(stdout, stderr, output.status.success());
let err_str = format!("Failed to send TaskResult: {:?}", &task_result);
task_tx
.send(task_result)
.unwrap_or_else(|_| panic!("{}", err_str));
}
}
})
} else {
// Simulate task if not running under Windows
thread::spawn(|| sleep(Duration::from_millis(500)))
}
}
/// Runs a diskpart script on a worker thread and reports its outcome on
/// `task_tx`.
///
/// On Windows the script is passed to `diskpart::run_script_raw` and a
/// `TaskResult::Output` (trimmed stdout, trimmed stderr, exit success) is
/// sent. On other platforms nothing is executed or sent; the thread just
/// sleeps for 250 ms to simulate the task.
///
/// The worker thread panics if the result cannot be sent.
fn run_task_diskpart(script: &str, task_tx: mpsc::UnboundedSender<TaskResult>) -> JoinHandle<()> {
    if cfg!(windows) {
        // The thread outlives the borrow, so it needs its own copy.
        let script = script.to_owned();
        thread::spawn(move || {
            let output = diskpart::run_script_raw(&script);
            let stderr = parse_bytes_as_str(output.stderr.clone());
            let stdout = parse_bytes_as_str(output.stdout.clone());
            let task_result = TaskResult::Output(stdout, stderr, output.status.success());
            // The unsent value comes back inside the error, so the panic
            // message is only formatted when sending fails.
            if let Err(err) = task_tx.send(task_result) {
                panic!("Failed to send TaskResult: {:?}", err.0);
            }
        })
    } else {
        // Simulate task if not running under Windows
        thread::spawn(|| sleep(Duration::from_millis(250)))
    }
}
/// Checks on a worker thread that every path in `path_list` exists and
/// reports the outcome on `task_tx`.
///
/// Sends `TaskResult::Output("OK", "", true)` when nothing is missing (this
/// includes an empty list). Otherwise sends
/// `TaskResult::Output("Missing item(s)", <missing paths joined by ",\n">, false)`.
///
/// The worker thread panics if the result cannot be sent.
fn test_paths(
    path_list: Vec<PathBuf>,
    task_tx: mpsc::UnboundedSender<TaskResult>,
) -> JoinHandle<()> {
    thread::spawn(move || {
        let missing_paths: Vec<String> = path_list
            .iter()
            .filter(|path| !path.exists())
            .map(|path| path.to_string_lossy().into_owned())
            .collect();
        let task_result = if missing_paths.is_empty() {
            // No missing paths
            TaskResult::Output(String::from("OK"), String::new(), true)
        } else {
            TaskResult::Output(
                String::from("Missing item(s)"),
                missing_paths.join(",\n"),
                false,
            )
        };
        // The unsent value comes back inside the error, so the panic message
        // is only formatted when sending fails.
        if let Err(err) = task_tx.send(task_result) {
            panic!("Failed to send TaskResult: {:?}", err.0);
        }
    })
}