Fixing bug when consecutively sending 2 large files

This commit is contained in:
Matéo Duparc 2021-05-05 18:00:03 +02:00
parent a988c537e2
commit ce246c2234
Signed by: hardcoresushi
GPG Key ID: 007F84120107191E
3 changed files with 31 additions and 19 deletions

View File

@ -165,7 +165,14 @@ document.getElementById("logout").onclick = function() {
document.getElementById("attach_file").onchange = function(event) { document.getElementById("attach_file").onchange = function(event) {
let file = event.target.files[0]; let file = event.target.files[0];
if (file.size > 32760000) { if (file.size > 32760000) {
if (!pendingFiles.has(currentSessionId)) { if (pendingFiles.has(currentSessionId)) {
let mainDiv = document.createElement("div");
mainDiv.appendChild(generatePopupWarningTitle());
let p = document.createElement("p");
p.textContent = "Another file transfer is already in progress.";
mainDiv.appendChild(p);
showPopup(mainDiv);
} else {
pendingFiles.set(currentSessionId, { pendingFiles.set(currentSessionId, {
"file": file, "file": file,
"name": file.name, "name": file.name,

View File

@ -363,10 +363,11 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
print_error!(e); print_error!(e);
return HttpResponse::InternalServerError().finish(); return HttpResponse::InternalServerError().finish();
} }
let last_chunk = chunk_buffer.len() < constants::FILE_CHUNK_SIZE;
if !match ack_receiver.recv().await { if !match ack_receiver.recv().await {
Some(should_continue) => { Some(should_continue) => {
//send previous encrypted chunk even if transfert is aborted to keep PSEC nonces syncrhonized //send previous encrypted chunk even if transfert is aborted to keep PSEC nonces syncrhonized
if let Err(e) = global_vars_read.session_manager.send_encrypted_file_chunk(&session_id, ack_sender.clone()).await { if let Err(e) = global_vars_read.session_manager.send_encrypted_file_chunk(&session_id, ack_sender.clone(), last_chunk).await {
print_error!(e); print_error!(e);
false false
} else { } else {
@ -377,7 +378,7 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
} { } {
return HttpResponse::InternalServerError().finish() return HttpResponse::InternalServerError().finish()
} }
if chunk_buffer.len() < constants::FILE_CHUNK_SIZE { if last_chunk {
break; break;
} else { } else {
chunk_buffer.truncate(1); chunk_buffer.truncate(1);

View File

@ -30,6 +30,7 @@ enum SessionCommand {
}, },
SendEncryptedFileChunk { SendEncryptedFileChunk {
sender: Sender<bool>, sender: Sender<bool>,
last_chunk: bool,
}, },
EncryptFileChunk { EncryptFileChunk {
plain_text: Vec<u8>, plain_text: Vec<u8>,
@ -134,10 +135,11 @@ impl SessionManager {
} }
} }
pub async fn send_encrypted_file_chunk(&self, session_id: &usize, ack_sender: Sender<bool>) -> Result<(), SessionError> { pub async fn send_encrypted_file_chunk(&self, session_id: &usize, ack_sender: Sender<bool>, last_chunk: bool) -> Result<(), SessionError> {
if let Some(sender) = self.get_session_sender(session_id) { if let Some(sender) = self.get_session_sender(session_id) {
match sender.send(SessionCommand::SendEncryptedFileChunk { match sender.send(SessionCommand::SendEncryptedFileChunk {
sender: ack_sender, sender: ack_sender,
last_chunk
}).await { }).await {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(e) => { Err(e) => {
@ -175,13 +177,13 @@ impl SessionManager {
self.not_seen.write().unwrap().retain(|x| x != session_id); self.not_seen.write().unwrap().retain(|x| x != session_id);
} }
async fn send_msg(&self, session_id: usize, session_write: &mut SessionWrite, buff: &[u8], aborted: &mut bool, file_ack_sender: Option<&Sender<bool>>) -> Result<(), SessionError> { async fn send_msg(&self, session_id: usize, session_write: &mut SessionWrite, buff: &[u8], is_sending: &mut bool, file_ack_sender: Option<&Sender<bool>>) -> Result<(), SessionError> {
session_write.encrypt_and_send(&buff).await?; session_write.encrypt_and_send(&buff).await?;
if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE { if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download.as_mut().unwrap().state = FileState::ACCEPTED; self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download.as_mut().unwrap().state = FileState::ACCEPTED;
} else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER { } else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
*aborted = true; *is_sending = false;
if let Some(sender) = file_ack_sender { if let Some(sender) = file_ack_sender {
if let Err(e) = sender.send(false).await { if let Err(e) = sender.send(false).await {
print_error!(e); print_error!(e);
@ -202,7 +204,7 @@ impl SessionManager {
let mut next_chunk: Option<Vec<u8>> = None; let mut next_chunk: Option<Vec<u8>> = None;
let mut file_ack_sender: Option<Sender<bool>> = None; let mut file_ack_sender: Option<Sender<bool>> = None;
let mut msg_queue = Vec::new(); let mut msg_queue = Vec::new();
let mut aborted = false; let mut is_sending = false;
let (session_read, mut session_write) = session.into_spit().unwrap(); let (session_read, mut session_write) = session.into_spit().unwrap();
let receiving = session_read.receive_and_decrypt(); let receiving = session_read.receive_and_decrypt();
@ -245,7 +247,7 @@ impl SessionManager {
} }
} }
protocol::Headers::ASK_LARGE_FILE => { protocol::Headers::ASK_LARGE_FILE => {
if self.sessions.read().unwrap().get(&session_id).unwrap().file_download.is_none() { //don't accept 2 downloads at the same time if self.sessions.read().unwrap().get(&session_id).unwrap().file_download.is_none() && !is_sending { //don't accept 2 file transfers at the same time
if let Some((file_size, file_name)) = protocol::parse_ask_file(&buffer) { if let Some((file_size, file_name)) = protocol::parse_ask_file(&buffer) {
let download_dir = UserDirs::new().unwrap().download_dir; let download_dir = UserDirs::new().unwrap().download_dir;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = Some(LargeFileDownload{ self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = Some(LargeFileDownload{
@ -341,7 +343,7 @@ impl SessionManager {
}); });
} }
if sender.send(true).await.is_err() { if sender.send(true).await.is_err() {
aborted = true; is_sending = false;
} }
} }
} }
@ -350,7 +352,7 @@ impl SessionManager {
if let Err(e) = sender.send(false).await { if let Err(e) = sender.send(false).await {
print_error!(e); print_error!(e);
} }
aborted = true; is_sending = false;
} }
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None; self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
local_file_path = None; local_file_path = None;
@ -385,9 +387,8 @@ impl SessionManager {
let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE; let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE;
if is_classical_message { if is_classical_message {
self.set_seen(session_id, false); self.set_seen(session_id, false);
} } else if header == protocol::Headers::ACCEPT_LARGE_FILE {
if header == protocol::Headers::ACCEPT_LARGE_FILE { is_sending = true;
aborted = false;
} }
self.with_ui_connection(|ui_connection| { self.with_ui_connection(|ui_connection| {
ui_connection.on_received(&session_id, buffer.as_ref().unwrap()); ui_connection.on_received(&session_id, buffer.as_ref().unwrap());
@ -411,17 +412,17 @@ impl SessionManager {
match command.unwrap() { match command.unwrap() {
SessionCommand::Send { buff } => { SessionCommand::Send { buff } => {
//don't send msg if we already encrypted a file chunk (keep PSEC nonces synchronized) //don't send msg if we already encrypted a file chunk (keep PSEC nonces synchronized)
if next_chunk.is_none() || aborted { if is_sending {
if let Err(e) = self.send_msg(session_id, &mut session_write, &buff, &mut aborted, file_ack_sender.as_ref()).await { msg_queue.push(buff);
} else {
if let Err(e) = self.send_msg(session_id, &mut session_write, &buff, &mut is_sending, file_ack_sender.as_ref()).await {
print_error!(e); print_error!(e);
break; break;
} }
} else {
msg_queue.push(buff);
} }
} }
SessionCommand::EncryptFileChunk { plain_text } => next_chunk = Some(session_write.encrypt(&plain_text)), SessionCommand::EncryptFileChunk { plain_text } => next_chunk = Some(session_write.encrypt(&plain_text)),
SessionCommand::SendEncryptedFileChunk { sender } => { SessionCommand::SendEncryptedFileChunk { sender, last_chunk } => {
if let Some(chunk) = next_chunk.as_ref() { if let Some(chunk) = next_chunk.as_ref() {
match session_write.socket_write(chunk).await { match session_write.socket_write(chunk).await {
Ok(_) => { Ok(_) => {
@ -429,11 +430,14 @@ impl SessionManager {
//once the pre-encrypted chunk is sent, we can send the pending messages //once the pre-encrypted chunk is sent, we can send the pending messages
while msg_queue.len() > 0 { while msg_queue.len() > 0 {
let msg = msg_queue.remove(0); let msg = msg_queue.remove(0);
if let Err(e) = self.send_msg(session_id, &mut session_write, &msg, &mut aborted, file_ack_sender.as_ref()).await { if let Err(e) = self.send_msg(session_id, &mut session_write, &msg, &mut is_sending, file_ack_sender.as_ref()).await {
print_error!(e); print_error!(e);
break; break;
} }
} }
if last_chunk {
is_sending = false;
}
} }
Err(e) => { Err(e) => {
print_error!(e); print_error!(e);