From a765c2565fb31448e9b52bd5e082a7b771b83d6b Mon Sep 17 00:00:00 2001 From: Hardcore Sushi Date: Fri, 30 Apr 2021 18:54:48 +0200 Subject: [PATCH] Fix large file transfers bugs --- src/frontend/index.js | 130 ++++++++++++------------ src/main.rs | 2 +- src/session_manager/mod.rs | 173 +++++++++++++++++--------------- src/session_manager/protocol.rs | 18 ++-- src/session_manager/session.rs | 1 + src/utils.rs | 18 +++- 6 files changed, 188 insertions(+), 154 deletions(-) diff --git a/src/frontend/index.js b/src/frontend/index.js index 7c2d31e..2fd139c 100644 --- a/src/frontend/index.js +++ b/src/frontend/index.js @@ -524,15 +524,15 @@ function onFileAborted(sessionId) { } } } -function onIncFileTransfer(sessionId, chunk_size) { +function onIncFileTransfer(sessionId, chunkSize) { if (pendingFiles.has(sessionId)) { let file = pendingFiles.get(sessionId); - file.transferred += chunk_size; + file.transferred += chunkSize; let now = Date.now(); - let speed = chunk_size/(now-file.lastChunk)*1000; + let speed = chunkSize/(now-file.lastChunk)*1000; file.lastChunk = now; if (file.transferred >= file.size) { - file.state = "finished"; + file.state = "completed"; } else { file.state = "transferring"; } @@ -547,25 +547,26 @@ function onMsgLoad(sessionId, outgoing, msg) { dislayHistory(false); } } -function onFileLoad(sessionId, outgoing, uuid, file_name) { - msgHistory.get(sessionId).unshift([outgoing, true, [uuid, file_name]]); +function onFileLoad(sessionId, outgoing, uuid, fileName) { + msgHistory.get(sessionId).unshift([outgoing, true, [uuid, fileName]]); if (currentSessionId == sessionId) { dislayHistory(false); } } function onDisconnected(sessionId) { - if (currentSessionId == sessionId) { - displayChatBottom(); - } + pendingFiles.delete(sessionId); let session = sessionsData.get(sessionId); if (session.is_contact) { session.is_online = false; } else { sessionsData.delete(sessionId); - if (currentSessionId == sessionId) { - currentSessionId = -1; - document.getElementById("chat_header").classList.add("offline"); - } + } + if (currentSessionId == sessionId) { + displayChatBottom(); + } + if (currentSessionId == sessionId && !session.is_contact) { + currentSessionId = -1; + document.getElementById("chat_header").classList.add("offline"); } displaySessions(); } @@ -808,57 +809,62 @@ function generateFileInfo(fileName, fileSize, p) { p.appendChild(document.createTextNode(" ("+humanFileSize(fileSize)+")")); } function displayChatBottom(speed = undefined) { + let msgBox = document.getElementById("message_box"); let session = sessionsData.get(currentSessionId); - if (session.is_online) { - document.getElementById("message_box").style.display = "flex"; + if (typeof session === "undefined") { + msgBox.removeAttribute("style"); } else { - document.getElementById("message_box").removeAttribute("style"); - } - let fileTransfer = document.getElementById("file_transfer"); - if (pendingFiles.has(currentSessionId)) { - let file = pendingFiles.get(currentSessionId); - let fileInfo = document.getElementById("file_info"); - fileInfo.innerHTML = ""; - generateFileInfo(file.name, file.size, fileInfo); - let fileProgress = document.getElementById("file_progress"); - fileProgress.style.display = "none"; //hide by default - let fileStatus = document.getElementById("file_status"); - fileStatus.removeAttribute("style"); //show by default - let fileCancel = document.getElementById("file_cancel"); - fileCancel.style.display = "none"; //hide by default - document.querySelector("#file_progress_bar>div").style.width = 0; - switch (file.state) { - case "transferring": - fileCancel.removeAttribute("style"); //show - fileStatus.style.display = "none"; - fileProgress.removeAttribute("style"); //show - let percent = (file.transferred/file.size)*100; - document.getElementById("file_percent").textContent = percent.toFixed(2)+"%"; - if (typeof speed !== "undefined") { - document.getElementById("file_speed").textContent = humanFileSize(speed)+"/s"; - } - document.querySelector("#file_progress_bar>div").style.width = Math.round(percent)+"%"; - break; - case "waiting": - fileStatus.textContent = "Waiting for peer confirmation..."; - break; - case "accepted": - fileStatus.textContent = "Downloading file..."; - break; - case "aborted": - fileStatus.textContent = "Transfer aborted."; - pendingFiles.delete(currentSessionId); - break; - case "sending": - fileStatus.textContent = "Sending file..."; - break; - case "finished": - fileStatus.textContent = "Transfer finished."; - pendingFiles.delete(currentSessionId); + if (session.is_online) { + msgBox.style.display = "flex"; + } else { + msgBox.removeAttribute("style"); + } + let fileTransfer = document.getElementById("file_transfer"); + if (pendingFiles.has(currentSessionId)) { + let file = pendingFiles.get(currentSessionId); + let fileInfo = document.getElementById("file_info"); + fileInfo.innerHTML = ""; + generateFileInfo(file.name, file.size, fileInfo); + let fileProgress = document.getElementById("file_progress"); + fileProgress.style.display = "none"; //hide by default + let fileStatus = document.getElementById("file_status"); + fileStatus.removeAttribute("style"); //show by default + let fileCancel = document.getElementById("file_cancel"); + fileCancel.style.display = "none"; //hide by default + document.querySelector("#file_progress_bar>div").style.width = 0; + switch (file.state) { + case "transferring": + fileCancel.removeAttribute("style"); //show + fileStatus.style.display = "none"; + fileProgress.removeAttribute("style"); //show + let percent = (file.transferred/file.size)*100; + document.getElementById("file_percent").textContent = percent.toFixed(2)+"%"; + if (typeof speed !== "undefined") { + document.getElementById("file_speed").textContent = humanFileSize(speed)+"/s"; + } + document.querySelector("#file_progress_bar>div").style.width = Math.round(percent)+"%"; + break; + case "waiting": + fileStatus.textContent = "Waiting for peer confirmation..."; + break; + case "accepted": + fileStatus.textContent = "Downloading file..."; + break; + case "aborted": + fileStatus.textContent = "Transfer aborted."; + pendingFiles.delete(currentSessionId); + break; + case "sending": + fileStatus.textContent = "Sending file..."; + break; + case "completed": + fileStatus.textContent = "Transfer completed."; + pendingFiles.delete(currentSessionId); + } + fileTransfer.classList.add("active"); + } else { + fileTransfer.classList.remove("active"); } - fileTransfer.classList.add("active"); - } else { - fileTransfer.classList.remove("active"); } } function dislayHistory(scrollToBottom = true) { @@ -880,4 +886,4 @@ function dislayHistory(scrollToBottom = true) { if (scrollToBottom) { msg_log.scrollTop = msg_log.scrollHeight; } -} \ No newline at end of file +} diff --git a/src/main.rs b/src/main.rs index 3c3747e..8c543ee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -98,7 +98,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc 0 { diff --git a/src/session_manager/mod.rs b/src/session_manager/mod.rs index 193ec00..4672a0f 100644 --- a/src/session_manager/mod.rs +++ b/src/session_manager/mod.rs @@ -9,7 +9,7 @@ use session::Session; use ed25519_dalek::PUBLIC_KEY_LENGTH; use uuid::Uuid; use platform_dirs::UserDirs; -use crate::{constants, discovery, identity::{Contact, Identity}, utils::get_unix_timestamp, print_error}; +use crate::{constants, discovery, identity::{Contact, Identity}, utils::{get_unix_timestamp, get_not_used_path}, print_error}; use crate::ui_interface::UiConnection; #[derive(Display, Debug, PartialEq, Eq)] @@ -34,7 +34,7 @@ enum SessionCommand { }, Close, } -#[derive(Clone, PartialEq, Eq)] +#[derive(Copy, Clone, PartialEq, Eq)] pub enum FileState { ASKING, ACCEPTED, @@ -57,7 +57,7 @@ pub struct SessionData { pub outgoing: bool, peer_public_key: [u8; PUBLIC_KEY_LENGTH], sender: Sender, - pub file_transfer: Option, + pub file_download: Option, } pub struct SessionManager { @@ -164,6 +164,9 @@ impl SessionManager { } fn remove_session(&self, session_id: &usize) { + self.with_ui_connection(|ui_connection| { + ui_connection.on_disconnected(&session_id); + }); self.sessions.write().unwrap().remove(session_id); self.saved_msgs.lock().unwrap().remove(session_id); self.not_seen.write().unwrap().retain(|x| x != session_id); @@ -172,9 +175,9 @@ impl SessionManager { async fn send_msg(&self, session_id: usize, session: &mut Session, buff: &[u8], aborted: &mut bool, file_ack_sender: Option<&Sender>) -> Result<(), SessionError> { session.encrypt_and_send(&buff).await?; if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer.as_mut().unwrap().state = FileState::ACCEPTED; + self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download.as_mut().unwrap().state = FileState::ACCEPTED; } else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None; + self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; *aborted = true; if let Some(sender) = file_ack_sender { if let Err(e) = sender.send(false).await { @@ -234,86 +237,95 @@ impl SessionManager { } } protocol::Headers::ASK_LARGE_FILE => { - let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap()); - match from_utf8(&buffer[9..]) { - Ok(file_name) => { - let file_name = sanitize_filename::sanitize(file_name); - let download_dir = UserDirs::new().unwrap().download_dir; - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = Some(LargeFileDownload{ - file_name: file_name.clone(), - download_location: download_dir.to_str().unwrap().to_string(), - file_size, - state: FileState::ASKING, - transferred: 0, - last_chunk: get_unix_timestamp(), - }); - let mut test_file_path = download_dir.join(&file_name); - let mut n = 1; - while test_file_path.exists() { - let splits: Vec<&str> = file_name.split('.').collect(); - test_file_path = download_dir.join(format!("{} ({}).{}", splits[..splits.len()-1].join("."), n, splits[splits.len()-1])); - n += 1; + if self.sessions.read().unwrap().get(&session_id).unwrap().file_download.is_none() { //don't accept 2 downloads at the same time + let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap()); + match from_utf8(&buffer[9..]) { + Ok(file_name) => { + let file_name = sanitize_filename::sanitize(file_name); + let download_dir = UserDirs::new().unwrap().download_dir; + self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = Some(LargeFileDownload{ + file_name: file_name.clone(), + download_location: download_dir.to_str().unwrap().to_string(), + file_size, + state: FileState::ASKING, + transferred: 0, + last_chunk: get_unix_timestamp(), + }); + local_file_path = Some(get_not_used_path(&file_name, &download_dir)); + self.with_ui_connection(|ui_connection| { + ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap()); + }) } - local_file_path = Some(test_file_path); - self.with_ui_connection(|ui_connection| { - ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap()); - }) + Err(e) => print_error!(e), } - Err(e) => print_error!(e), + } else if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await { + print_error!(e); + break; } } protocol::Headers::LARGE_FILE_CHUNK => { - let file_transfer_opt = { - self.sessions.read().unwrap().get(&session_id).unwrap().file_transfer.clone() + let state = { + let sessions = self.sessions.read().unwrap(); + match sessions.get(&session_id).unwrap().file_download.as_ref() { + Some(file_transfer) => Some(file_transfer.state), + None => None + } }; - if let Some(file_transfer) = file_transfer_opt { - if file_transfer.state == FileState::ACCEPTED || file_transfer.state == FileState::TRANSFERRING { - if local_file_handle.is_none() { - if let Some(file_path) = local_file_path.as_ref() { - match OpenOptions::new().append(true).create(true).open(file_path) { - Ok(file) => local_file_handle = Some(file), - Err(e) => print_error!(e) - } - } - } - let mut is_success = false; - if let Some(file_handle) = local_file_handle.as_mut() { - match file_handle.write_all(&buffer[1..]) { - Ok(_) => { - let chunk_size = (buffer.len()-1) as u64; - { - let mut sessions = self.sessions.write().unwrap(); - let file_transfer = sessions.get_mut(&session_id).unwrap().file_transfer.as_mut().unwrap(); - file_transfer.last_chunk = get_unix_timestamp(); - file_transfer.transferred += chunk_size; - if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file - sessions.get_mut(&session_id).unwrap().file_transfer = None; - local_file_path = None; - local_file_handle = None; - } else if file_transfer.state != FileState::TRANSFERRING { - file_transfer.state = FileState::TRANSFERRING; - } - } - if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ACK_CHUNK]).await { - print_error!(e); - break; - } - self.with_ui_connection(|ui_connection| { - ui_connection.inc_file_transfer(&session_id, chunk_size); - }); - is_success = true; + let mut should_accept_chunk = false; + if let Some(state) = state { + if state == FileState::ACCEPTED { + if let Some(file_path) = local_file_path.as_ref() { + match OpenOptions::new().append(true).create(true).open(file_path) { + Ok(file) => { + local_file_handle = Some(file); + let mut sessions = self.sessions.write().unwrap(); + let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap(); + file_transfer.state = FileState::TRANSFERRING; + should_accept_chunk = true; } Err(e) => print_error!(e) } } - if !is_success { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None; - local_file_path = None; - local_file_handle = None; - if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await { - print_error!(e); - break; + } else if state == FileState::TRANSFERRING { + should_accept_chunk = true; + } + } + if should_accept_chunk { + let mut is_success = false; + if let Some(file_handle) = local_file_handle.as_mut() { + match file_handle.write_all(&buffer[1..]) { + Ok(_) => { + let chunk_size = (buffer.len()-1) as u64; + { + let mut sessions = self.sessions.write().unwrap(); + let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap(); + file_transfer.last_chunk = get_unix_timestamp(); + file_transfer.transferred += chunk_size; + if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file + sessions.get_mut(&session_id).unwrap().file_download = None; + local_file_path = None; + local_file_handle = None; + } + } + if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ACK_CHUNK]).await { + print_error!(e); + break; + } + self.with_ui_connection(|ui_connection| { + ui_connection.inc_file_transfer(&session_id, chunk_size); + }); + is_success = true; } + Err(e) => print_error!(e) + } + } + if !is_success { + self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; + local_file_path = None; + local_file_handle = None; + if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await { + print_error!(e); + break; } } } @@ -337,7 +349,7 @@ impl SessionManager { } aborted = true; } - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None; + self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; local_file_path = None; local_file_handle = None; self.with_ui_connection(|ui_connection| { @@ -383,12 +395,9 @@ impl SessionManager { } } Err(e) => { - if e != SessionError::BrokenPipe && e != SessionError::ConnectionReset { + if e != SessionError::BrokenPipe && e != SessionError::ConnectionReset && e != SessionError::BufferTooLarge { print_error!(e); } - self.with_ui_connection(|ui_connection| { - ui_connection.on_disconnected(&session_id); - }); break; } } @@ -488,7 +497,7 @@ impl SessionManager { outgoing, peer_public_key, sender: sender, - file_transfer: None, + file_download: None, }; let mut session_id = None; for (i, contact) in session_manager.loaded_contacts.read().unwrap().iter() { @@ -610,7 +619,9 @@ impl SessionManager { let result = Identity::remove_contact(&loaded_contacts.get(&session_id).unwrap().uuid); if result.is_ok() { if let Some(contact) = loaded_contacts.remove(&session_id) { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().name = contact.name; + if let Some(session) = self.sessions.write().unwrap().get_mut(&session_id) { + session.name = contact.name; + } } self.last_loaded_msg_offsets.write().unwrap().remove(&session_id); } diff --git a/src/session_manager/protocol.rs b/src/session_manager/protocol.rs index ebf66ba..0f26d9d 100644 --- a/src/session_manager/protocol.rs +++ b/src/session_manager/protocol.rs @@ -1,15 +1,15 @@ pub struct Headers; impl Headers { - pub const MESSAGE: u8 = 0x01; - pub const ASK_NAME: u8 = 0x02; - pub const TELL_NAME: u8 = 0x03; - pub const FILE: u8 = 0x04; - pub const ASK_LARGE_FILE: u8 = 0x05; - pub const ACCEPT_LARGE_FILE: u8 = 0x06; - pub const LARGE_FILE_CHUNK: u8 = 0x07; - pub const ACK_CHUNK: u8 = 0x08; - pub const ABORT_FILE_TRANSFER: u8 = 0x09; + pub const MESSAGE: u8 = 0x00; + pub const ASK_NAME: u8 = 0x01; + pub const TELL_NAME: u8 = 0x02; + pub const FILE: u8 = 0x03; + pub const ASK_LARGE_FILE: u8 = 0x04; + pub const ACCEPT_LARGE_FILE: u8 = 0x05; + pub const LARGE_FILE_CHUNK: u8 = 0x06; + pub const ACK_CHUNK: u8 = 0x07; + pub const ABORT_FILE_TRANSFER: u8 = 0x08; } pub fn new_message(message: String) -> Vec { diff --git a/src/session_manager/session.rs b/src/session_manager/session.rs index ae53ef3..c619791 100644 --- a/src/session_manager/session.rs +++ b/src/session_manager/session.rs @@ -252,6 +252,7 @@ impl Session { Err(_) => Err(SessionError::TransmissionCorrupted) } } else { + print_error!("Buffer too large: {} B", recv_len); Err(SessionError::BufferTooLarge) } } diff --git a/src/utils.rs b/src/utils.rs index a46b8cb..83389be 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,4 @@ -use std::{convert::TryInto, time::{SystemTime, UNIX_EPOCH}}; +use std::{convert::TryInto, time::{SystemTime, UNIX_EPOCH}, path::PathBuf}; use uuid::Bytes; use crate::print_error; @@ -28,6 +28,22 @@ pub fn get_unix_timestamp() -> u128 { SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis() } +pub fn get_not_used_path(file_name: &str, parent_directory: &PathBuf) -> String { + let has_extension = file_name.matches('.').count() > 0; + let mut path = parent_directory.join(&file_name); + let mut n = 1; + while path.exists() { + path = if has_extension { + let splits: Vec<&str> = file_name.split('.').collect(); + parent_directory.join(format!("{} ({}).{}", splits[..splits.len()-1].join("."), n, splits[splits.len()-1])) + } else { + parent_directory.join(format!("{} ({})", file_name, n)) + }; + n += 1; + } + path.to_str().unwrap().to_owned() +} + #[macro_export] macro_rules! print_error { ($arg:tt) => ({