Fix large file transfers bugs

This commit is contained in:
Matéo Duparc 2021-04-30 18:54:48 +02:00
parent 33a107d347
commit a765c2565f
Signed by: hardcoresushi
GPG Key ID: 007F84120107191E
6 changed files with 188 additions and 154 deletions

View File

@ -524,15 +524,15 @@ function onFileAborted(sessionId) {
}
}
}
function onIncFileTransfer(sessionId, chunk_size) {
function onIncFileTransfer(sessionId, chunkSize) {
if (pendingFiles.has(sessionId)) {
let file = pendingFiles.get(sessionId);
file.transferred += chunk_size;
file.transferred += chunkSize;
let now = Date.now();
let speed = chunk_size/(now-file.lastChunk)*1000;
let speed = chunkSize/(now-file.lastChunk)*1000;
file.lastChunk = now;
if (file.transferred >= file.size) {
file.state = "finished";
file.state = "completed";
} else {
file.state = "transferring";
}
@ -547,25 +547,26 @@ function onMsgLoad(sessionId, outgoing, msg) {
dislayHistory(false);
}
}
function onFileLoad(sessionId, outgoing, uuid, file_name) {
msgHistory.get(sessionId).unshift([outgoing, true, [uuid, file_name]]);
function onFileLoad(sessionId, outgoing, uuid, fileName) {
msgHistory.get(sessionId).unshift([outgoing, true, [uuid, fileName]]);
if (currentSessionId == sessionId) {
dislayHistory(false);
}
}
function onDisconnected(sessionId) {
if (currentSessionId == sessionId) {
displayChatBottom();
}
pendingFiles.delete(sessionId);
let session = sessionsData.get(sessionId);
if (session.is_contact) {
session.is_online = false;
} else {
sessionsData.delete(sessionId);
if (currentSessionId == sessionId) {
currentSessionId = -1;
document.getElementById("chat_header").classList.add("offline");
}
}
if (currentSessionId == sessionId) {
displayChatBottom();
}
if (currentSessionId == sessionId && !session.is_contact) {
currentSessionId = -1;
document.getElementById("chat_header").classList.add("offline");
}
displaySessions();
}
@ -808,57 +809,62 @@ function generateFileInfo(fileName, fileSize, p) {
p.appendChild(document.createTextNode(" ("+humanFileSize(fileSize)+")"));
}
function displayChatBottom(speed = undefined) {
let msgBox = document.getElementById("message_box");
let session = sessionsData.get(currentSessionId);
if (session.is_online) {
document.getElementById("message_box").style.display = "flex";
if (typeof session === "undefined") {
msgBox.removeAttribute("style");
} else {
document.getElementById("message_box").removeAttribute("style");
}
let fileTransfer = document.getElementById("file_transfer");
if (pendingFiles.has(currentSessionId)) {
let file = pendingFiles.get(currentSessionId);
let fileInfo = document.getElementById("file_info");
fileInfo.innerHTML = "";
generateFileInfo(file.name, file.size, fileInfo);
let fileProgress = document.getElementById("file_progress");
fileProgress.style.display = "none"; //hide by default
let fileStatus = document.getElementById("file_status");
fileStatus.removeAttribute("style"); //show by default
let fileCancel = document.getElementById("file_cancel");
fileCancel.style.display = "none"; //hide by default
document.querySelector("#file_progress_bar>div").style.width = 0;
switch (file.state) {
case "transferring":
fileCancel.removeAttribute("style"); //show
fileStatus.style.display = "none";
fileProgress.removeAttribute("style"); //show
let percent = (file.transferred/file.size)*100;
document.getElementById("file_percent").textContent = percent.toFixed(2)+"%";
if (typeof speed !== "undefined") {
document.getElementById("file_speed").textContent = humanFileSize(speed)+"/s";
}
document.querySelector("#file_progress_bar>div").style.width = Math.round(percent)+"%";
break;
case "waiting":
fileStatus.textContent = "Waiting for peer confirmation...";
break;
case "accepted":
fileStatus.textContent = "Downloading file...";
break;
case "aborted":
fileStatus.textContent = "Transfer aborted.";
pendingFiles.delete(currentSessionId);
break;
case "sending":
fileStatus.textContent = "Sending file...";
break;
case "finished":
fileStatus.textContent = "Transfer finished.";
pendingFiles.delete(currentSessionId);
if (session.is_online) {
msgBox.style.display = "flex";
} else {
msgBox.removeAttribute("style");
}
let fileTransfer = document.getElementById("file_transfer");
if (pendingFiles.has(currentSessionId)) {
let file = pendingFiles.get(currentSessionId);
let fileInfo = document.getElementById("file_info");
fileInfo.innerHTML = "";
generateFileInfo(file.name, file.size, fileInfo);
let fileProgress = document.getElementById("file_progress");
fileProgress.style.display = "none"; //hide by default
let fileStatus = document.getElementById("file_status");
fileStatus.removeAttribute("style"); //show by default
let fileCancel = document.getElementById("file_cancel");
fileCancel.style.display = "none"; //hide by default
document.querySelector("#file_progress_bar>div").style.width = 0;
switch (file.state) {
case "transferring":
fileCancel.removeAttribute("style"); //show
fileStatus.style.display = "none";
fileProgress.removeAttribute("style"); //show
let percent = (file.transferred/file.size)*100;
document.getElementById("file_percent").textContent = percent.toFixed(2)+"%";
if (typeof speed !== "undefined") {
document.getElementById("file_speed").textContent = humanFileSize(speed)+"/s";
}
document.querySelector("#file_progress_bar>div").style.width = Math.round(percent)+"%";
break;
case "waiting":
fileStatus.textContent = "Waiting for peer confirmation...";
break;
case "accepted":
fileStatus.textContent = "Downloading file...";
break;
case "aborted":
fileStatus.textContent = "Transfer aborted.";
pendingFiles.delete(currentSessionId);
break;
case "sending":
fileStatus.textContent = "Sending file...";
break;
case "completed":
fileStatus.textContent = "Transfer completed.";
pendingFiles.delete(currentSessionId);
}
fileTransfer.classList.add("active");
} else {
fileTransfer.classList.remove("active");
}
fileTransfer.classList.add("active");
} else {
fileTransfer.classList.remove("active");
}
}
function dislayHistory(scrollToBottom = true) {
@ -880,4 +886,4 @@ function dislayHistory(scrollToBottom = true) {
if (scrollToBottom) {
msg_log.scrollTop = msg_log.scrollHeight;
}
}
}

View File

@ -98,7 +98,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
load_msgs(session_manager.clone(), &mut ui_connection, &contact.0);
});
session_manager.sessions.read().unwrap().iter().for_each(|session| {
ui_connection.on_new_session(session.0, &session.1.name, session.1.outgoing, session.1.file_transfer.as_ref());
ui_connection.on_new_session(session.0, &session.1.name, session.1.outgoing, session.1.file_download.as_ref());
});
let not_seen = session_manager.list_not_seen();
if not_seen.len() > 0 {

View File

@ -9,7 +9,7 @@ use session::Session;
use ed25519_dalek::PUBLIC_KEY_LENGTH;
use uuid::Uuid;
use platform_dirs::UserDirs;
use crate::{constants, discovery, identity::{Contact, Identity}, utils::get_unix_timestamp, print_error};
use crate::{constants, discovery, identity::{Contact, Identity}, utils::{get_unix_timestamp, get_not_used_path}, print_error};
use crate::ui_interface::UiConnection;
#[derive(Display, Debug, PartialEq, Eq)]
@ -34,7 +34,7 @@ enum SessionCommand {
},
Close,
}
#[derive(Clone, PartialEq, Eq)]
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileState {
ASKING,
ACCEPTED,
@ -57,7 +57,7 @@ pub struct SessionData {
pub outgoing: bool,
peer_public_key: [u8; PUBLIC_KEY_LENGTH],
sender: Sender<SessionCommand>,
pub file_transfer: Option<LargeFileDownload>,
pub file_download: Option<LargeFileDownload>,
}
pub struct SessionManager {
@ -164,6 +164,9 @@ impl SessionManager {
}
fn remove_session(&self, session_id: &usize) {
self.with_ui_connection(|ui_connection| {
ui_connection.on_disconnected(&session_id);
});
self.sessions.write().unwrap().remove(session_id);
self.saved_msgs.lock().unwrap().remove(session_id);
self.not_seen.write().unwrap().retain(|x| x != session_id);
@ -172,9 +175,9 @@ impl SessionManager {
async fn send_msg(&self, session_id: usize, session: &mut Session, buff: &[u8], aborted: &mut bool, file_ack_sender: Option<&Sender<bool>>) -> Result<(), SessionError> {
session.encrypt_and_send(&buff).await?;
if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer.as_mut().unwrap().state = FileState::ACCEPTED;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download.as_mut().unwrap().state = FileState::ACCEPTED;
} else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
*aborted = true;
if let Some(sender) = file_ack_sender {
if let Err(e) = sender.send(false).await {
@ -234,86 +237,95 @@ impl SessionManager {
}
}
protocol::Headers::ASK_LARGE_FILE => {
let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap());
match from_utf8(&buffer[9..]) {
Ok(file_name) => {
let file_name = sanitize_filename::sanitize(file_name);
let download_dir = UserDirs::new().unwrap().download_dir;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = Some(LargeFileDownload{
file_name: file_name.clone(),
download_location: download_dir.to_str().unwrap().to_string(),
file_size,
state: FileState::ASKING,
transferred: 0,
last_chunk: get_unix_timestamp(),
});
let mut test_file_path = download_dir.join(&file_name);
let mut n = 1;
while test_file_path.exists() {
let splits: Vec<&str> = file_name.split('.').collect();
test_file_path = download_dir.join(format!("{} ({}).{}", splits[..splits.len()-1].join("."), n, splits[splits.len()-1]));
n += 1;
if self.sessions.read().unwrap().get(&session_id).unwrap().file_download.is_none() { //don't accept 2 downloads at the same time
let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap());
match from_utf8(&buffer[9..]) {
Ok(file_name) => {
let file_name = sanitize_filename::sanitize(file_name);
let download_dir = UserDirs::new().unwrap().download_dir;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = Some(LargeFileDownload{
file_name: file_name.clone(),
download_location: download_dir.to_str().unwrap().to_string(),
file_size,
state: FileState::ASKING,
transferred: 0,
last_chunk: get_unix_timestamp(),
});
local_file_path = Some(get_not_used_path(&file_name, &download_dir));
self.with_ui_connection(|ui_connection| {
ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap());
})
}
local_file_path = Some(test_file_path);
self.with_ui_connection(|ui_connection| {
ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap());
})
Err(e) => print_error!(e),
}
Err(e) => print_error!(e),
} else if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await {
print_error!(e);
break;
}
}
protocol::Headers::LARGE_FILE_CHUNK => {
let file_transfer_opt = {
self.sessions.read().unwrap().get(&session_id).unwrap().file_transfer.clone()
let state = {
let sessions = self.sessions.read().unwrap();
match sessions.get(&session_id).unwrap().file_download.as_ref() {
Some(file_transfer) => Some(file_transfer.state),
None => None
}
};
if let Some(file_transfer) = file_transfer_opt {
if file_transfer.state == FileState::ACCEPTED || file_transfer.state == FileState::TRANSFERRING {
if local_file_handle.is_none() {
if let Some(file_path) = local_file_path.as_ref() {
match OpenOptions::new().append(true).create(true).open(file_path) {
Ok(file) => local_file_handle = Some(file),
Err(e) => print_error!(e)
}
}
}
let mut is_success = false;
if let Some(file_handle) = local_file_handle.as_mut() {
match file_handle.write_all(&buffer[1..]) {
Ok(_) => {
let chunk_size = (buffer.len()-1) as u64;
{
let mut sessions = self.sessions.write().unwrap();
let file_transfer = sessions.get_mut(&session_id).unwrap().file_transfer.as_mut().unwrap();
file_transfer.last_chunk = get_unix_timestamp();
file_transfer.transferred += chunk_size;
if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file
sessions.get_mut(&session_id).unwrap().file_transfer = None;
local_file_path = None;
local_file_handle = None;
} else if file_transfer.state != FileState::TRANSFERRING {
file_transfer.state = FileState::TRANSFERRING;
}
}
if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ACK_CHUNK]).await {
print_error!(e);
break;
}
self.with_ui_connection(|ui_connection| {
ui_connection.inc_file_transfer(&session_id, chunk_size);
});
is_success = true;
let mut should_accept_chunk = false;
if let Some(state) = state {
if state == FileState::ACCEPTED {
if let Some(file_path) = local_file_path.as_ref() {
match OpenOptions::new().append(true).create(true).open(file_path) {
Ok(file) => {
local_file_handle = Some(file);
let mut sessions = self.sessions.write().unwrap();
let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap();
file_transfer.state = FileState::TRANSFERRING;
should_accept_chunk = true;
}
Err(e) => print_error!(e)
}
}
if !is_success {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None;
local_file_path = None;
local_file_handle = None;
if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await {
print_error!(e);
break;
} else if state == FileState::TRANSFERRING {
should_accept_chunk = true;
}
}
if should_accept_chunk {
let mut is_success = false;
if let Some(file_handle) = local_file_handle.as_mut() {
match file_handle.write_all(&buffer[1..]) {
Ok(_) => {
let chunk_size = (buffer.len()-1) as u64;
{
let mut sessions = self.sessions.write().unwrap();
let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap();
file_transfer.last_chunk = get_unix_timestamp();
file_transfer.transferred += chunk_size;
if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file
sessions.get_mut(&session_id).unwrap().file_download = None;
local_file_path = None;
local_file_handle = None;
}
}
if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ACK_CHUNK]).await {
print_error!(e);
break;
}
self.with_ui_connection(|ui_connection| {
ui_connection.inc_file_transfer(&session_id, chunk_size);
});
is_success = true;
}
Err(e) => print_error!(e)
}
}
if !is_success {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
local_file_path = None;
local_file_handle = None;
if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await {
print_error!(e);
break;
}
}
}
@ -337,7 +349,7 @@ impl SessionManager {
}
aborted = true;
}
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
local_file_path = None;
local_file_handle = None;
self.with_ui_connection(|ui_connection| {
@ -383,12 +395,9 @@ impl SessionManager {
}
}
Err(e) => {
if e != SessionError::BrokenPipe && e != SessionError::ConnectionReset {
if e != SessionError::BrokenPipe && e != SessionError::ConnectionReset && e != SessionError::BufferTooLarge {
print_error!(e);
}
self.with_ui_connection(|ui_connection| {
ui_connection.on_disconnected(&session_id);
});
break;
}
}
@ -488,7 +497,7 @@ impl SessionManager {
outgoing,
peer_public_key,
sender: sender,
file_transfer: None,
file_download: None,
};
let mut session_id = None;
for (i, contact) in session_manager.loaded_contacts.read().unwrap().iter() {
@ -610,7 +619,9 @@ impl SessionManager {
let result = Identity::remove_contact(&loaded_contacts.get(&session_id).unwrap().uuid);
if result.is_ok() {
if let Some(contact) = loaded_contacts.remove(&session_id) {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().name = contact.name;
if let Some(session) = self.sessions.write().unwrap().get_mut(&session_id) {
session.name = contact.name;
}
}
self.last_loaded_msg_offsets.write().unwrap().remove(&session_id);
}

View File

@ -1,15 +1,15 @@
pub struct Headers;
impl Headers {
pub const MESSAGE: u8 = 0x01;
pub const ASK_NAME: u8 = 0x02;
pub const TELL_NAME: u8 = 0x03;
pub const FILE: u8 = 0x04;
pub const ASK_LARGE_FILE: u8 = 0x05;
pub const ACCEPT_LARGE_FILE: u8 = 0x06;
pub const LARGE_FILE_CHUNK: u8 = 0x07;
pub const ACK_CHUNK: u8 = 0x08;
pub const ABORT_FILE_TRANSFER: u8 = 0x09;
pub const MESSAGE: u8 = 0x00;
pub const ASK_NAME: u8 = 0x01;
pub const TELL_NAME: u8 = 0x02;
pub const FILE: u8 = 0x03;
pub const ASK_LARGE_FILE: u8 = 0x04;
pub const ACCEPT_LARGE_FILE: u8 = 0x05;
pub const LARGE_FILE_CHUNK: u8 = 0x06;
pub const ACK_CHUNK: u8 = 0x07;
pub const ABORT_FILE_TRANSFER: u8 = 0x08;
}
pub fn new_message(message: String) -> Vec<u8> {

View File

@ -252,6 +252,7 @@ impl Session {
Err(_) => Err(SessionError::TransmissionCorrupted)
}
} else {
print_error!("Buffer too large: {} B", recv_len);
Err(SessionError::BufferTooLarge)
}
}

View File

@ -1,4 +1,4 @@
use std::{convert::TryInto, time::{SystemTime, UNIX_EPOCH}};
use std::{convert::TryInto, time::{SystemTime, UNIX_EPOCH}, path::PathBuf};
use uuid::Bytes;
use crate::print_error;
@ -28,6 +28,22 @@ pub fn get_unix_timestamp() -> u128 {
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
}
pub fn get_not_used_path(file_name: &str, parent_directory: &PathBuf) -> String {
let has_extension = file_name.matches('.').count() > 0;
let mut path = parent_directory.join(&file_name);
let mut n = 1;
while path.exists() {
path = if has_extension {
let splits: Vec<&str> = file_name.split('.').collect();
parent_directory.join(format!("{} ({}).{}", splits[..splits.len()-1].join("."), n, splits[splits.len()-1]))
} else {
parent_directory.join(format!("{} ({})", file_name, n))
};
n += 1;
}
path.to_str().unwrap().to_owned()
}
#[macro_export]
macro_rules! print_error {
($arg:tt) => ({