From 809ce34bff18e8396a023da16284d6a1cd2e3146 Mon Sep 17 00:00:00 2001 From: Hardcore Sushi Date: Thu, 6 May 2021 18:24:29 +0200 Subject: [PATCH] Multiple Files Transfers --- Cargo.toml | 2 +- src/frontend/index.css | 18 ++- src/frontend/index.html | 2 +- src/frontend/index.js | 275 +++++++++++++++++++------------- src/main.rs | 64 ++++++-- src/session_manager/mod.rs | 198 ++++++++++------------- src/session_manager/protocol.rs | 49 ++++-- src/ui_interface.rs | 76 +++++---- 8 files changed, 384 insertions(+), 300 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index feb3a01..fab25a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ hex = "0.4.3" strum_macros = "0.20.1" #display enums actix-web = "3" actix-multipart = "0.3" +time = "0.2.25" #needed for actix cookies futures = "0.3" tungstenite = "0.13.0" #websocket serde = "1.0.124" #serialization @@ -34,6 +35,5 @@ libmdns = "0.6" #mDNS advertiser multicast_dns = "0.5" #mDNS browser pnet_datalink = "0.27.2" base64 = "0.13.0" -time = "0.2.25" scrypt = "0.7.0" zeroize = "1.2.0" \ No newline at end of file diff --git a/src/frontend/index.css b/src/frontend/index.css index 71c6c75..9503270 100644 --- a/src/frontend/index.css +++ b/src/frontend/index.css @@ -54,12 +54,20 @@ input[type="file"] { right: 0; margin: auto; width: 40%; + max-height: 90vh; + overflow: auto; box-sizing: border-box; - padding: 20px 70px; + padding: 20px 70px 10px; background-color: #2B2F31; border-radius: 10px; font-size: 1.2em; } +.popup:last-child::after { + content: ""; + display: block; + height: 20px; + width: 100%; +} .popup_background { height: 100%; width: 100%; @@ -198,12 +206,6 @@ input[type="file"] { } #left_panel ul li:hover, #left_panel ul li.current { background-color: #333940; -} -#left_panel ul li.output { - -} -#left_panel ul li.input { - } #left_panel ul li p::after { content: url("/static/imgs/icons/warning/FF3C00"); @@ -250,7 +252,7 @@ input[type="file"] { background-color: unset; content: url("/static/imgs/icons/info/FF3C00"); } -ul.ips { +.popup ul { list-style-type: unset; } #chat_header { diff --git a/src/frontend/index.html b/src/frontend/index.html index f6592f5..3f0a46c 100644 --- a/src/frontend/index.html +++ b/src/frontend/index.html @@ -57,7 +57,7 @@
diff --git a/src/frontend/index.js b/src/frontend/index.js index 0026ce8..b2d3662 100644 --- a/src/frontend/index.js +++ b/src/frontend/index.js @@ -7,7 +7,7 @@ let localIps = []; let currentSessionId = -1; let sessionsData = new Map(); let msgHistory = new Map(); -let pendingFiles = new Map(); +let pendingFilesTransfers = new Map(); function onClickSession(event) { let sessionId = event.currentTarget.getAttribute("data-sessionId"); @@ -163,9 +163,16 @@ document.getElementById("logout").onclick = function() { showPopup(mainDiv); } document.getElementById("attach_file").onchange = function(event) { - let file = event.target.files[0]; - if (file.size > 32760000) { - if (pendingFiles.has(currentSessionId)) { + let files = event.target.files; + let useLargeFileTransfer = false; + for (let i=0; i 32760000) { + useLargeFileTransfer = true; + break; + } + } + if (useLargeFileTransfer) { + if (pendingFilesTransfers.has(currentSessionId)) { let mainDiv = document.createElement("div"); mainDiv.appendChild(generatePopupWarningTitle()); let p = document.createElement("p"); @@ -173,28 +180,39 @@ document.getElementById("attach_file").onchange = function(event) { mainDiv.appendChild(p); showPopup(mainDiv); } else { - pendingFiles.set(currentSessionId, { - "file": file, - "name": file.name, - "size": file.size, + let fileTransfers = []; + let fileInfo = ""; + for (let i=0; i { - if (response.ok) { - response.text().then(uuid => onFileSent(currentSessionId, uuid, file.name)); - } else { - console.log(response); - } - }); + for (let i=0; i { + if (response.ok) { + response.text().then(uuid => onFileSent(currentSessionId, uuid, files[i].name)); + } else { + console.log(response); + } + }); + }; } } document.getElementById("file_cancel").onclick = function() { @@ -270,10 +288,10 @@ profile_div.onclick = function() { if (isIdentityProtected || newPassword_set) { //don't change password if identity is not protected and new password is blank let msg = "change_password"; if (isIdentityProtected) { - msg += " "+btoa(inputs[0].value); + msg += " "+b64EncodeUnicode(inputs[0].value); } if (newPassword_set) { - msg += " "+btoa(newPassword.value); + msg += " "+b64EncodeUnicode(newPassword.value); } socket.send(msg); } else { @@ -357,9 +375,9 @@ document.querySelector("#refresher button").onclick = function() { function humanFileSize(bytes, dp=1) { const thresh = 1000; if (Math.abs(bytes) < thresh) { - return bytes + ' B'; + return bytes + " B"; } - const units = ['kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB']; + const units = ["kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"]; let u = -1; const r = 10**dp; do { @@ -368,6 +386,18 @@ function humanFileSize(bytes, dp=1) { } while (Math.round(Math.abs(bytes) * r) / r >= thresh && u < units.length - 1); return bytes.toFixed(dp) + ' ' + units[u]; } +//source: https://stackoverflow.com/a/30106551 +function b64EncodeUnicode(str) { + return btoa(encodeURIComponent(str).replace(/%([0-9A-F]{2})/g, + function toSolidBytes(match, p1) { + return String.fromCharCode('0x' + p1); + })); +} +function b64DecodeUnicode(str) { + return decodeURIComponent(atob(str).split('').map(function(c) { + return '%' + ("00" + c.charCodeAt(0).toString(16)).slice(-2); + }).join('')); +} //source: https://www.w3schools.com/js/js_cookies.asp function getCookie(cname) { var name = cname + "="; @@ -421,20 +451,20 @@ socket.onmessage = function(msg) { case "file": onFileReceived(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3)); break; - case "file_transfer": - onNewFileTransfer(args[1], args[2], args[3], args[4], args[5], args[6]); + case "files_transfer": + onNewFilesTransfer(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3)); break; - case "ask_large_file": - onAskLargeFile(args[1], args[2], args[3], args[4]); + case "ask_large_files": + onAskLargeFiles(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3)); break; - case "file_accepted": - onFileAccepted(args[1]); + case "files_accepted": + onFilesAccepted(args[1]); break; case "aborted": - onFileAborted(args[1]); + onFilesTransferAborted(args[1]); break; case "inc_file_transfer": - onIncFileTransfer(args[1], parseInt(args[2])); + onIncFilesTransfer(args[1], parseInt(args[2])); break; case "load_sent_msg": onMsgLoad(args[1], args[2] === "true", msg.data.slice(args[0].length+args[1].length+args[2].length+3)); @@ -532,33 +562,47 @@ function onNewMessage(sessionId, outgoing, msg) { msgHistory.get(sessionId).push([outgoing, false, msg]); onMsgOrFileReceived(sessionId, outgoing, msg); } -function onNewFileTransfer(sessionId, encodedFileName, fileSize, state, transferred, lastChunk) { - pendingFiles.set(sessionId, { - "file": undefined, - "name": atob(encodedFileName), - "size": parseInt(fileSize), - "state": state, - "transferred": parseInt(transferred), - "lastChunk": parseInt(lastChunk) +function onNewFilesTransfer(sessionId, index, filesInfo) { + let split = filesInfo.split(' '); + let files = []; + for (let i=0; i { - if (!response.ok) { - console.log(response); - } - }); +function onFilesAccepted(sessionId) { + if (pendingFilesTransfers.has(sessionId)) { + sendNextLargeFile(sessionId); } } -function onFileAborted(sessionId) { - if (pendingFiles.has(sessionId)) { - pendingFiles.get(sessionId).state = "aborted"; +function onFilesTransferAborted(sessionId) { + if (pendingFilesTransfers.has(sessionId)) { + pendingFilesTransfers.get(sessionId).state = "aborted"; if (sessionId == currentSessionId) { displayChatBottom(); } } } -function onIncFileTransfer(sessionId, chunkSize) { - if (pendingFiles.has(sessionId)) { - let file = pendingFiles.get(sessionId); - file.transferred += chunkSize; +function onIncFilesTransfer(sessionId, chunkSize) { + if (pendingFilesTransfers.has(sessionId)) { + let filesTransfer = pendingFilesTransfers.get(sessionId); + let fileTransfer = filesTransfer.files[filesTransfer.index]; + fileTransfer.transferred += chunkSize; let now = Date.now(); - let speed = chunkSize/(now-file.lastChunk)*1000; - file.lastChunk = now; - if (file.transferred >= file.size) { - file.state = "completed"; - } else { - file.state = "transferring"; + let speed = chunkSize/(now-fileTransfer.lastChunk)*1000; + fileTransfer.lastChunk = now; + if (fileTransfer.transferred >= fileTransfer.size) { + if (filesTransfer.index == filesTransfer.files.length-1) { + filesTransfer.state = "completed"; + socket.send("sending_ended "+sessionId); + } else { + filesTransfer.index += 1; + if (typeof fileTransfer.file !== "undefined") { + sendNextLargeFile(sessionId); + } + } } if (currentSessionId == sessionId) { displayChatBottom(speed); @@ -653,7 +698,7 @@ function onFileLoad(sessionId, outgoing, uuid, fileName) { } } function onDisconnected(sessionId) { - pendingFiles.delete(sessionId); + pendingFilesTransfers.delete(sessionId); let session = sessionsData.get(sessionId); if (session.isContact) { session.isOnline = false; @@ -696,6 +741,23 @@ function onPasswordChanged(success, isProtected) { } } +function sendNextLargeFile(sessionId) { + let filesTransfer = pendingFilesTransfers.get(sessionId); + filesTransfer.state = "transferring"; + let fileTransfer = filesTransfer.files[filesTransfer.index]; + fileTransfer.lastChunk = Date.now(); + if (currentSessionId == sessionId) { + displayChatBottom(); + } + let formData = new FormData(); + formData.append("session_id", currentSessionId); + formData.append("", fileTransfer.file); + fetch("/send_large_file", {method: "POST", body: formData}).then(response => { + if (!response.ok) { + console.log(response); + } + }); +} function beautifyFingerprint(f) { for (let i=4; idiv").style.width = 0; - switch (file.state) { + switch (filesTransfer.state) { case "transferring": fileCancel.removeAttribute("style"); //show fileStatus.style.display = "none"; @@ -921,19 +980,13 @@ function displayChatBottom(speed = undefined) { case "waiting": fileStatus.textContent = "Waiting for peer confirmation..."; break; - case "accepted": - fileStatus.textContent = "Downloading file..."; - break; case "aborted": fileStatus.textContent = "Transfer aborted."; - pendingFiles.delete(currentSessionId); - break; - case "sending": - fileStatus.textContent = "Sending file..."; + pendingFilesTransfers.delete(currentSessionId); break; case "completed": fileStatus.textContent = "Transfer completed."; - pendingFiles.delete(currentSessionId); + pendingFilesTransfers.delete(currentSessionId); } fileTransfer.classList.add("active"); } else { diff --git a/src/main.rs b/src/main.rs index 709b453..4b81f63 100644 --- a/src/main.rs +++ b/src/main.rs @@ -20,7 +20,7 @@ use platform_dirs::AppDirs; use zeroize::Zeroize; use utils::escape_double_quote; use identity::Identity; -use session_manager::{SessionManager, protocol}; +use session_manager::{SessionManager, SessionCommand, protocol}; use ui_interface::UiConnection; async fn start_websocket_server(global_vars: Arc>) -> u16 { @@ -104,7 +104,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc { let session_id: usize = args[1].parse().unwrap(); let buffer = protocol::new_message(msg[args[0].len()+args[1].len()+2..].to_string()); - match session_manager.send_to(&session_id, buffer.clone()).await { + match session_manager.send_command(&session_id, SessionCommand::Send { + buff: buffer.clone() + }).await { Ok(_) => session_manager.store_msg(&session_id, true, buffer), Err(e) => print_error!(e) } } - "large_file" => { + "large_files" => { let session_id: usize = args[1].parse().unwrap(); - let file_size: u64 = args[2].parse().unwrap(); - let file_name = &msg[args[0].len()+args[1].len()+args[2].len()+3..]; - if let Err(e) = session_manager.send_to(&session_id, protocol::ask_large_file(file_size, file_name)).await { + let mut file_info = Vec::new(); + for n in (2..args.len()).step_by(2) { + file_info.push((args[n].parse::().unwrap(), base64::decode(args[n+1]).unwrap())); + } + if let Err(e) = session_manager.send_command(&session_id, SessionCommand::Send { + buff: protocol::ask_large_files(file_info) + }).await { print_error!(e); } } "download" => { let session_id: usize = args[1].parse().unwrap(); - if let Err(e) = session_manager.send_to(&session_id, vec![protocol::Headers::ACCEPT_LARGE_FILE]).await { + if let Err(e) = session_manager.send_command(&session_id, SessionCommand::Send { + buff: vec![protocol::Headers::ACCEPT_LARGE_FILES] + }).await { print_error!(e); } } "abort" => { let session_id: usize = args[1].parse().unwrap(); - if let Err(e) = session_manager.send_to(&session_id, vec![protocol::Headers::ABORT_FILE_TRANSFER]).await { + if let Err(e) = session_manager.send_command(&session_id, SessionCommand::Send { + buff: vec![protocol::Headers::ABORT_FILES_TRANSFER] + }).await { + print_error!(e); + } + } + "sending_ended" => { + let session_id: usize = args[1].parse().unwrap(); + if let Err(e) = session_manager.send_command(&session_id, SessionCommand::SendingEnded).await { print_error!(e); } } @@ -324,7 +340,9 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo while let Some(Ok(chunk)) = field.next().await { buffer.extend(chunk); } - match global_vars_read.session_manager.send_to(&session_id, protocol::file(filename, &buffer)).await { + match global_vars_read.session_manager.send_command(&session_id, SessionCommand::Send { + buff: protocol::file(filename, &buffer) + }).await { Ok(_) => { match global_vars_read.session_manager.store_file(&session_id, &buffer) { Ok(file_uuid) => { @@ -359,15 +377,18 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo break; } } - if let Err(e) = global_vars_read.session_manager.encrypt_file_chunk(&session_id, chunk_buffer.clone()).await { + if let Err(e) = global_vars_read.session_manager.send_command(&session_id, SessionCommand::EncryptFileChunk{ + plain_text: chunk_buffer.clone() + }).await { print_error!(e); return HttpResponse::InternalServerError().finish(); } - let last_chunk = chunk_buffer.len() < constants::FILE_CHUNK_SIZE; if !match ack_receiver.recv().await { Some(should_continue) => { //send previous encrypted chunk even if transfert is aborted to keep PSEC nonces syncrhonized - if let Err(e) = global_vars_read.session_manager.send_encrypted_file_chunk(&session_id, ack_sender.clone(), last_chunk).await { + if let Err(e) = global_vars_read.session_manager.send_command(&session_id, SessionCommand::SendEncryptedFileChunk { + ack_sender: ack_sender.clone() + }).await { print_error!(e); false } else { @@ -378,7 +399,7 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo } { return HttpResponse::InternalServerError().finish() } - if last_chunk { + if chunk_buffer.len() < constants::FILE_CHUNK_SIZE { break; } else { chunk_buffer.truncate(1); @@ -540,8 +561,19 @@ fn handle_static(req: HttpRequest) -> HttpResponse { if splits[0] == "static" { let mut response_builder = HttpResponse::Ok(); match splits[1] { - "index.js" => return response_builder.content_type(JS_CONTENT_TYPE).body(include_str!("frontend/index.js")), - "index.css" => return response_builder.body(include_str!("frontend/index.css")), + "index.js" => { + response_builder.content_type(JS_CONTENT_TYPE); + #[cfg(debug_assertions)] + return response_builder.body(fs::read_to_string("src/frontend/index.js").unwrap()); + #[cfg(not(debug_assertions))] + return response_builder.body(include_str!("frontend/index.js")); + } + "index.css" => { + #[cfg(debug_assertions)] + return response_builder.body(fs::read_to_string("src/frontend/index.css").unwrap()); + #[cfg(not(debug_assertions))] + return response_builder.body(include_str!("frontend/index.css")); + } "imgs" => { if splits[2] == "icons" && splits.len() <= 5 { let color = if splits.len() == 5 { diff --git a/src/session_manager/mod.rs b/src/session_manager/mod.rs index 5c570df..6b98d4c 100644 --- a/src/session_manager/mod.rs +++ b/src/session_manager/mod.rs @@ -1,7 +1,7 @@ mod session; pub mod protocol; -use std::{collections::HashMap, net::{IpAddr, SocketAddr}, io::{self, Write}, str::from_utf8, fs::OpenOptions, sync::{Mutex, RwLock, Arc}}; +use std::{collections::HashMap, fs::OpenOptions, io::{self, Write}, net::{IpAddr, SocketAddr}, path::PathBuf, str::from_utf8, sync::{Mutex, RwLock, Arc}}; use tokio::{net::{TcpListener, TcpStream}, sync::mpsc::{self, Sender, Receiver}}; use libmdns::Service; use strum_macros::Display; @@ -24,36 +24,36 @@ pub enum SessionError { Unknown, } -enum SessionCommand { +pub enum SessionCommand { Send { buff: Vec, }, SendEncryptedFileChunk { - sender: Sender, - last_chunk: bool, + ack_sender: Sender, }, EncryptFileChunk { plain_text: Vec, }, + SendingEnded, Close, } -#[derive(Copy, Clone, PartialEq, Eq)] -pub enum FileState { - ASKING, - ACCEPTED, - TRANSFERRING, -} #[derive(Clone)] pub struct LargeFileDownload { pub file_name: String, - pub download_location: String, pub file_size: u64, - pub state: FileState, pub transferred: u64, pub last_chunk: u128, } +#[derive(Clone)] +pub struct LargeFilesDownload { + pub download_location: PathBuf, + pub accepted: bool, + pub index: usize, + pub files: Vec, +} + #[derive(Clone)] pub struct SessionData { pub name: String, @@ -61,7 +61,7 @@ pub struct SessionData { pub peer_public_key: [u8; PUBLIC_KEY_LENGTH], pub ip: IpAddr, sender: Sender, - pub file_download: Option, + pub files_download: Option, } pub struct SessionManager { @@ -79,7 +79,7 @@ pub struct SessionManager { impl SessionManager { - fn with_ui_connection(&self, f: F) where F: Fn(&mut UiConnection) { + fn with_ui_connection(&self, f: F) where F: FnOnce(&mut UiConnection) { let mut ui_connection_opt = self.ui_connection.lock().unwrap(); match ui_connection_opt.as_mut() { Some(ui_connection) => if ui_connection.is_valid { @@ -119,44 +119,9 @@ impl SessionManager { } } - pub async fn encrypt_file_chunk(&self, session_id: &usize, plain_text: Vec) -> Result<(), SessionError> { + pub async fn send_command(&self, session_id: &usize, session_command: SessionCommand) -> Result<(), SessionError> { if let Some(sender) = self.get_session_sender(session_id) { - match sender.send(SessionCommand::EncryptFileChunk { - plain_text, - }).await { - Ok(_) => Ok(()), - Err(e) => { - print_error!(e); - Err(SessionError::BrokenPipe) - } - } - } else { - Err(SessionError::InvalidSessionId) - } - } - - pub async fn send_encrypted_file_chunk(&self, session_id: &usize, ack_sender: Sender, last_chunk: bool) -> Result<(), SessionError> { - if let Some(sender) = self.get_session_sender(session_id) { - match sender.send(SessionCommand::SendEncryptedFileChunk { - sender: ack_sender, - last_chunk - }).await { - Ok(_) => Ok(()), - Err(e) => { - print_error!(e); - Err(SessionError::BrokenPipe) - } - } - } else { - Err(SessionError::InvalidSessionId) - } - } - - pub async fn send_to(&self, session_id: &usize, message: Vec) -> Result<(), SessionError> { - if let Some(sender) = self.get_session_sender(session_id) { - match sender.send(SessionCommand::Send { - buff: message - }).await { + match sender.send(session_command).await { Ok(_) => Ok(()), Err(e) => { print_error!(e); @@ -179,10 +144,10 @@ impl SessionManager { async fn send_msg(&self, session_id: usize, session_write: &mut SessionWrite, buff: &[u8], is_sending: &mut bool, file_ack_sender: Option<&Sender>) -> Result<(), SessionError> { session_write.encrypt_and_send(&buff).await?; - if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download.as_mut().unwrap().state = FileState::ACCEPTED; - } else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; + if buff[0] == protocol::Headers::ACCEPT_LARGE_FILES { + self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download.as_mut().unwrap().accepted = true; + } else if buff[0] == protocol::Headers::ABORT_FILES_TRANSFER { + self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None; *is_sending = false; if let Some(sender) = file_ack_sender { if let Err(e) = sender.send(false).await { @@ -198,10 +163,10 @@ impl SessionManager { async fn session_worker(&self, session_id: usize, mut receiver: Receiver, session: Session) { //used when we receive large file - let mut local_file_path = None; let mut local_file_handle = None; //used when we send large file let mut next_chunk: Option> = None; + let mut last_chunks_sizes: Option> = None; let mut file_ack_sender: Option> = None; let mut msg_queue = Vec::new(); let mut is_sending = false; @@ -230,13 +195,14 @@ impl SessionManager { protocol::Headers::TELL_NAME => { match from_utf8(&buffer[1..]) { Ok(new_name) => { + let new_name = new_name.replace('\n', " "); self.with_ui_connection(|ui_connection| { - ui_connection.on_name_told(&session_id, new_name); + ui_connection.on_name_told(&session_id, &new_name); }); let mut loaded_contacts = self.loaded_contacts.write().unwrap(); if let Some(contact) = loaded_contacts.get_mut(&session_id) { contact.name = new_name.to_string(); - if let Err(e) = self.identity.read().unwrap().as_ref().unwrap().change_contact_name(&contact.uuid, new_name) { + if let Err(e) = self.identity.read().unwrap().as_ref().unwrap().change_contact_name(&contact.uuid, &new_name) { print_error!(e); } } else { @@ -246,53 +212,52 @@ impl SessionManager { Err(e) => print_error!(e) } } - protocol::Headers::ASK_LARGE_FILE => { - if self.sessions.read().unwrap().get(&session_id).unwrap().file_download.is_none() && !is_sending { //don't accept 2 file transfers at the same time - if let Some((file_size, file_name)) = protocol::parse_ask_file(&buffer) { - let download_dir = UserDirs::new().unwrap().download_dir; - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = Some(LargeFileDownload{ - file_name: file_name.clone(), - download_location: download_dir.to_str().unwrap().to_string(), - file_size, - state: FileState::ASKING, - transferred: 0, - last_chunk: get_unix_timestamp(), + protocol::Headers::ASK_LARGE_FILES => { + if self.sessions.read().unwrap().get(&session_id).unwrap().files_download.is_none() && !is_sending { //don't accept 2 file transfers at the same time + if let Some(files_info) = protocol::parse_ask_files(&buffer) { + let download_location = UserDirs::new().unwrap().download_dir; + let files: Vec = files_info.into_iter().map(|info| { + LargeFileDownload { + file_name: info.1, + file_size: info.0, + transferred: 0, + last_chunk: get_unix_timestamp(), + } + }).collect(); + self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = Some(LargeFilesDownload { + download_location: download_location.clone(), + accepted: false, + index: 0, + files: files.clone(), }); - local_file_path = Some(get_not_used_path(&file_name, &download_dir)); self.with_ui_connection(|ui_connection| { - ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap()); + ui_connection.on_ask_large_files(&session_id, &files, download_location.to_str().unwrap()); }) } - } else if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await { + } else if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILES_TRANSFER]).await { print_error!(e); break; } } protocol::Headers::LARGE_FILE_CHUNK => { - let state = { - let sessions = self.sessions.read().unwrap(); - match sessions.get(&session_id).unwrap().file_download.as_ref() { - Some(file_transfer) => Some(file_transfer.state), - None => None - } - }; let mut should_accept_chunk = false; - if let Some(state) = state { - if state == FileState::ACCEPTED { - if let Some(file_path) = local_file_path.as_ref() { - match OpenOptions::new().append(true).create(true).open(file_path) { - Ok(file) => { - local_file_handle = Some(file); - let mut sessions = self.sessions.write().unwrap(); - let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap(); - file_transfer.state = FileState::TRANSFERRING; - should_accept_chunk = true; + { + let sessions = self.sessions.read().unwrap(); + if let Some(files_transfer) = sessions.get(&session_id).unwrap().files_download.as_ref() { + if files_transfer.accepted { + if local_file_handle.is_some() { + should_accept_chunk = true; + } else { + let local_file_path = get_not_used_path(&files_transfer.files[files_transfer.index].file_name, &files_transfer.download_location); + match OpenOptions::new().append(true).create(true).open(local_file_path) { + Ok(file) => { + local_file_handle = Some(file); + should_accept_chunk = true; + } + Err(e) => print_error!(e) } - Err(e) => print_error!(e) } } - } else if state == FileState::TRANSFERRING { - should_accept_chunk = true; } } if should_accept_chunk { @@ -303,12 +268,16 @@ impl SessionManager { let chunk_size = (buffer.len()-1) as u64; { let mut sessions = self.sessions.write().unwrap(); - let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap(); + let files_transfer = sessions.get_mut(&session_id).unwrap().files_download.as_mut().unwrap(); + let file_transfer = &mut files_transfer.files[files_transfer.index]; file_transfer.last_chunk = get_unix_timestamp(); file_transfer.transferred += chunk_size; if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file - sessions.get_mut(&session_id).unwrap().file_download = None; - local_file_path = None; + if files_transfer.index+1 == files_transfer.files.len() { + sessions.get_mut(&session_id).unwrap().files_download = None; + } else { + files_transfer.index += 1; + } local_file_handle = None; } } @@ -317,7 +286,7 @@ impl SessionManager { break; } self.with_ui_connection(|ui_connection| { - ui_connection.inc_file_transfer(&session_id, chunk_size); + ui_connection.inc_files_transfer(&session_id, chunk_size); }); is_success = true; } @@ -325,10 +294,9 @@ impl SessionManager { } } if !is_success { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; - local_file_path = None; + self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None; local_file_handle = None; - if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await { + if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILES_TRANSFER]).await { print_error!(e); break; } @@ -337,9 +305,10 @@ impl SessionManager { } protocol::Headers::ACK_CHUNK => { if let Some(sender) = file_ack_sender.clone() { - if let Some(next_chunk) = next_chunk.as_ref() { + if let Some(last_chunks_sizes) = last_chunks_sizes.as_mut() { + let chunk_size = last_chunks_sizes.remove(0); self.with_ui_connection(|ui_connection| { - ui_connection.inc_file_transfer(&session_id, next_chunk.len() as u64); + ui_connection.inc_files_transfer(&session_id, chunk_size.into()); }); } if sender.send(true).await.is_err() { @@ -347,15 +316,14 @@ impl SessionManager { } } } - protocol::Headers::ABORT_FILE_TRANSFER => { + protocol::Headers::ABORT_FILES_TRANSFER => { if let Some(sender) = file_ack_sender.clone() { if let Err(e) = sender.send(false).await { print_error!(e); } is_sending = false; } - self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; - local_file_path = None; + self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None; local_file_handle = None; self.with_ui_connection(|ui_connection| { ui_connection.on_received(&session_id, &buffer); @@ -387,8 +355,9 @@ impl SessionManager { let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE; if is_classical_message { self.set_seen(session_id, false); - } else if header == protocol::Headers::ACCEPT_LARGE_FILE { + } else if header == protocol::Headers::ACCEPT_LARGE_FILES { is_sending = true; + last_chunks_sizes = Some(Vec::new()); } self.with_ui_connection(|ui_connection| { ui_connection.on_received(&session_id, buffer.as_ref().unwrap()); @@ -421,12 +390,15 @@ impl SessionManager { } } } - SessionCommand::EncryptFileChunk { plain_text } => next_chunk = Some(session_write.encrypt(&plain_text)), - SessionCommand::SendEncryptedFileChunk { sender, last_chunk } => { + SessionCommand::EncryptFileChunk { plain_text } => { + last_chunks_sizes.as_mut().unwrap().push(plain_text.len() as u32); + next_chunk = Some(session_write.encrypt(&plain_text)); + } + SessionCommand::SendEncryptedFileChunk { ack_sender } => { if let Some(chunk) = next_chunk.as_ref() { match session_write.socket_write(chunk).await { Ok(_) => { - file_ack_sender = Some(sender); + file_ack_sender = Some(ack_sender); //once the pre-encrypted chunk is sent, we can send the pending messages while msg_queue.len() > 0 { let msg = msg_queue.remove(0); @@ -435,9 +407,6 @@ impl SessionManager { break; } } - if last_chunk { - is_sending = false; - } } Err(e) => { print_error!(e); @@ -446,13 +415,10 @@ impl SessionManager { } } } + SessionCommand::SendingEnded => is_sending = false, SessionCommand::Close => break } } - else => { - println!("{} dead", session_id); - break; - } } } } @@ -508,7 +474,7 @@ impl SessionManager { peer_public_key, ip, sender: sender, - file_download: None, + files_download: None, }; let mut session_id = None; for (i, contact) in session_manager.loaded_contacts.read().unwrap().iter() { diff --git a/src/session_manager/protocol.rs b/src/session_manager/protocol.rs index 260f578..24c61b9 100644 --- a/src/session_manager/protocol.rs +++ b/src/session_manager/protocol.rs @@ -8,11 +8,11 @@ impl Headers { pub const ASK_NAME: u8 = 0x01; pub const TELL_NAME: u8 = 0x02; pub const FILE: u8 = 0x03; - pub const ASK_LARGE_FILE: u8 = 0x04; - pub const ACCEPT_LARGE_FILE: u8 = 0x05; + pub const ASK_LARGE_FILES: u8 = 0x04; + pub const ACCEPT_LARGE_FILES: u8 = 0x05; pub const LARGE_FILE_CHUNK: u8 = 0x06; pub const ACK_CHUNK: u8 = 0x07; - pub const ABORT_FILE_TRANSFER: u8 = 0x08; + pub const ABORT_FILES_TRANSFER: u8 = 0x08; } pub fn new_message(message: String) -> Vec { @@ -42,20 +42,41 @@ pub fn parse_file<'a>(buffer: &'a [u8]) -> Option<(&'a [u8], &'a [u8])> { None } -pub fn ask_large_file(file_size: u64, file_name: &str) -> Vec { - [&[Headers::ASK_LARGE_FILE], &file_size.to_be_bytes()[..], file_name.as_bytes()].concat() +pub fn ask_large_files(file_info: Vec<(u64, Vec)>) -> Vec { + let mut buff = vec![Headers::ASK_LARGE_FILES]; + file_info.into_iter().for_each(|info| { + buff.extend(&info.0.to_be_bytes()); + buff.extend(&(info.1.len() as u16).to_be_bytes()); + buff.extend(info.1); + }); + buff } -pub fn parse_ask_file(buffer: &[u8]) -> Option<(u64, String)> { - if buffer.len() > 9 { - let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap()); - match from_utf8(&buffer[9..]) { - Ok(file_name) => { - let file_name = sanitize_filename::sanitize(file_name); - return Some((file_size, file_name)); +pub fn parse_ask_files(buffer: &[u8]) -> Option> { + let mut files_info = Vec::new(); + let mut n = 1; + while n < buffer.len() { + if buffer[n..].len() > 10 { //8 + 2 + let file_size = u64::from_be_bytes(buffer[n..n+8].try_into().unwrap()); + let file_name_len = u16::from_be_bytes(buffer[n+8..n+10].try_into().unwrap()) as usize; + if buffer.len() >= n+10+file_name_len { + match from_utf8(&buffer[n+10..n+10+file_name_len]) { + Ok(file_name) => { + let file_name = sanitize_filename::sanitize(file_name); + files_info.push((file_size, file_name)); + n += 10+file_name_len; + } + Err(e) => { + print_error!(e); + return None + } + } + } else { + return None } - Err(e) => print_error!(e), + } else { + return None } } - None + Some(files_info) } \ No newline at end of file diff --git a/src/ui_interface.rs b/src/ui_interface.rs index 40956cb..eba2ea2 100644 --- a/src/ui_interface.rs +++ b/src/ui_interface.rs @@ -1,12 +1,12 @@ use std::net::{IpAddr, TcpStream}; use tungstenite::{WebSocket, protocol::Role, Message}; -use crate::{protocol, session_manager::LargeFileDownload}; +use crate::{protocol, session_manager::{LargeFileDownload, LargeFilesDownload}}; mod ui_messages { use std::{fmt::Display, iter::FromIterator, net::IpAddr, str::from_utf8}; use tungstenite::Message; use uuid::Uuid; - use crate::{print_error, session_manager::{protocol, LargeFileDownload, FileState}, utils::to_uuid_bytes}; + use crate::{print_error, session_manager::{LargeFileDownload, LargeFilesDownload, protocol}, utils::to_uuid_bytes}; const ON_NEW_MESSAGE: &str = "new_message"; const LOAD_SENT_MESSAGE: &str = "load_sent_msg"; @@ -45,30 +45,40 @@ mod ui_messages { } } } - pub fn new_file_transfer(session_id: &usize, file_transfer: &LargeFileDownload) -> Message { - if file_transfer.state == FileState::ASKING { - on_ask_large_file(session_id, file_transfer.file_size, &file_transfer.file_name, &file_transfer.download_location) - } else { - Message::from(format!( - "file_transfer {} {} {} {} {} {}", + pub fn new_files_transfer(session_id: &usize, files_transfer: &LargeFilesDownload) -> Message { + if files_transfer.accepted { + let mut s = format!( + "files_transfer {} {}", session_id, - base64::encode(&file_transfer.file_name), - file_transfer.file_size, - if file_transfer.state == FileState::ACCEPTED { - "accepted" - } else { - "transferring" - }, - file_transfer.transferred, - file_transfer.last_chunk, - )) + files_transfer.index + ); + files_transfer.files.iter().for_each(|file| { + s.push_str(&format!( + " {} {} {} {}", + base64::encode(&file.file_name), + file.file_size, + file.transferred, + file.last_chunk, + )); + }); + Message::from(s) + } else { + on_ask_large_files(session_id, &files_transfer.files, files_transfer.download_location.to_str().unwrap()) } } - pub fn on_ask_large_file(session_id: &usize, file_size: u64, file_name: &str, download_location: &str) -> Message { - Message::from(format!("ask_large_file {} {} {} {}", session_id, file_size, base64::encode(file_name), base64::encode(download_location))) + pub fn on_ask_large_files(session_id: &usize, files: &Vec, download_location: &str) -> Message { + let mut s = format!("ask_large_files {} {}", session_id, base64::encode(download_location)); + files.into_iter().for_each(|file| { + s.push_str(&format!( + " {} {}", + base64::encode(&file.file_name), + file.file_size, + )); + }); + Message::from(s) } - pub fn on_large_file_accepted(session_id: &usize) -> Message { - simple_event("file_accepted", session_id) + pub fn on_large_files_accepted(session_id: &usize) -> Message { + simple_event("files_accepted", session_id) } pub fn on_file_transfer_aborted(session_id: &usize) -> Message { simple_event("aborted", session_id) @@ -76,7 +86,7 @@ mod ui_messages { pub fn on_new_message(session_id: &usize, outgoing: bool, buffer: &[u8]) -> Option { new_message(ON_NEW_MESSAGE, session_id, outgoing, &buffer[1..]) } - pub fn inc_file_transfer(session_id: &usize, chunk_size: u64) -> Message { + pub fn inc_files_transfer(session_id: &usize, chunk_size: u64) -> Message { Message::from(format!("inc_file_transfer {} {}", session_id, chunk_size)) } pub fn load_msg(session_id: &usize, outgoing: bool, buffer: &[u8]) -> Option { @@ -138,16 +148,16 @@ impl UiConnection { let ui_message = match buffer[0] { protocol::Headers::MESSAGE => ui_messages::on_new_message(session_id, false, buffer), protocol::Headers::FILE => ui_messages::on_file_received(session_id, buffer), - protocol::Headers::ACCEPT_LARGE_FILE => Some(ui_messages::on_large_file_accepted(session_id)), - protocol::Headers::ABORT_FILE_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)), + protocol::Headers::ACCEPT_LARGE_FILES => Some(ui_messages::on_large_files_accepted(session_id)), + protocol::Headers::ABORT_FILES_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)), _ => None }; if ui_message.is_some() { self.write_message(ui_message.unwrap()) } } - pub fn on_ask_large_file(&mut self, session_id: &usize, file_size: u64, file_name: &str, download_location: &str) { - self.write_message(ui_messages::on_ask_large_file(session_id, file_size, file_name, download_location)) + pub fn on_ask_large_files(&mut self, session_id: &usize, files: &Vec, download_location: &str) { + self.write_message(ui_messages::on_ask_large_files(session_id, files, download_location)) } pub fn on_msg_sent(&mut self, session_id: usize, buffer: &[u8]) { match buffer[0] { @@ -155,14 +165,14 @@ impl UiConnection { Some(msg) => self.write_message(msg), None => {} } - protocol::Headers::ABORT_FILE_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)), + protocol::Headers::ABORT_FILES_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)), _ => {} } } - pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr, file_transfer: Option<&LargeFileDownload>) { + pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr, files_transfer: Option<&LargeFilesDownload>) { self.write_message(ui_messages::on_new_session(session_id, name, outgoing, fingerprint, ip)); - if let Some(file_transfer) = file_transfer { - self.write_message(ui_messages::new_file_transfer(session_id, file_transfer)); + if let Some(files_transfer) = files_transfer { + self.write_message(ui_messages::new_files_transfer(session_id, files_transfer)); } } pub fn on_disconnected(&mut self, session_id: &usize) { @@ -172,8 +182,8 @@ impl UiConnection { self.write_message(ui_messages::on_name_told(session_id, name)); } - pub fn inc_file_transfer(&mut self, session_id: &usize, chunk_size: u64) { - self.write_message(ui_messages::inc_file_transfer(session_id, chunk_size)); + pub fn inc_files_transfer(&mut self, session_id: &usize, chunk_size: u64) { + self.write_message(ui_messages::inc_files_transfer(session_id, chunk_size)); } pub fn set_as_contact(&mut self, session_id: usize, name: &str, verified: bool, fingerprint: &str) { self.write_message(ui_messages::set_as_contact(session_id, name, verified, fingerprint));