From dda30bf5afb6b1c472290cbcc15ed7fb92ddb7e8 Mon Sep 17 00:00:00 2001 From: Hardcore Sushi Date: Fri, 23 Jul 2021 12:04:29 +0200 Subject: [PATCH] Pending messages --- src/frontend/index.css | 136 +++++++++++++++++++++++++- src/frontend/index.html | 21 +++- src/frontend/index.js | 155 +++++++++++++++++++++-------- src/identity.rs | 2 +- src/main.rs | 59 ++++++----- src/protocol.rs | 12 ++- src/session_manager.rs | 212 +++++++++++++++++++++++++++------------- src/ui_interface.rs | 83 ++++++++-------- 8 files changed, 481 insertions(+), 199 deletions(-) diff --git a/src/frontend/index.css b/src/frontend/index.css index c6e4c15..1aeda44 100644 --- a/src/frontend/index.css +++ b/src/frontend/index.css @@ -402,10 +402,6 @@ button:hover::after { height: 100%; background-color: var(--accent); } -#message_box { - border-top: 2px solid var(--accent); - margin-bottom: 0; -} #msg_log { overflow-y: auto; white-space: pre-wrap; @@ -472,9 +468,139 @@ button:hover::after { width: 2em; margin-left: 15px; } -#message_box, #chat_header, #msg_log, #file_transfer { +#message_box, #message_box.online #offline_warning, #chat_header, #msg_log, #file_transfer { display: none; } +#message_box.active { + display: block; +} +#message_box { + border-top: 2px solid red; + margin-bottom: 0; +} +#message_box>div:nth-child(2) { + display: flex; +} +#message_box.online { + border-top-color: var(--accent); +} +#offline_warning { + margin-left: 20px; + display: flex; + align-items: center; + gap: 25px; +} +#offline_warning::before { + content: url("/static/imgs/icons/warning/ff0000"); + display: block; + width: 2em; +} +#offline_warning h3 { + color: red; + display: inline-block; + margin-bottom: .3em; +} +#offline_warning p { + margin-top: 0; +} +#msg_log li.pending_msgs_divider { + border-top: 1px solid grey; + padding-top: 10px; + margin-top: 30px; + margin-left: 100px; + margin-right: 100px; +} +#msg_log li.pending_msgs_divider h4 { + margin: auto; + opacity: .5; +} +.lds-spinner { + color: official; + position: relative; + width: 82px; + height: 82px; +} +.lds-spinner div { + transform-origin: 40px 40px; + animation: lds-spinner 1.2s linear infinite; +} +.lds-spinner div:after { + content: " "; + display: block; + position: absolute; + top: 3px; + left: 37px; + width: 6px; + height: 18px; + border-radius: 20%; + background: #fff; +} +.lds-spinner div:nth-child(1) { + transform: rotate(0deg); + animation-delay: -1.1s; +} +.lds-spinner div:nth-child(2) { + transform: rotate(30deg); + animation-delay: -1s; +} +.lds-spinner div:nth-child(3) { + transform: rotate(60deg); + animation-delay: -0.9s; +} +.lds-spinner div:nth-child(4) { + transform: rotate(90deg); + animation-delay: -0.8s; +} +.lds-spinner div:nth-child(5) { + transform: rotate(120deg); + animation-delay: -0.7s; +} +.lds-spinner div:nth-child(6) { + transform: rotate(150deg); + animation-delay: -0.6s; +} +.lds-spinner div:nth-child(7) { + transform: rotate(180deg); + animation-delay: -0.5s; +} +.lds-spinner div:nth-child(8) { + transform: rotate(210deg); + animation-delay: -0.4s; +} +.lds-spinner div:nth-child(9) { + transform: rotate(240deg); + animation-delay: -0.3s; +} +.lds-spinner div:nth-child(10) { + transform: rotate(270deg); + animation-delay: -0.2s; +} +.lds-spinner div:nth-child(11) { + transform: rotate(300deg); + animation-delay: -0.1s; +} +.lds-spinner div:nth-child(12) { + transform: rotate(330deg); + animation-delay: 0s; +} +@keyframes lds-spinner { + 0% { + opacity: 1; + } + 100% { + opacity: 0; + } +} +#pending_msgs_indicator { + display: none; + align-items: center; + justify-content: center; + gap: 15px; + margin-bottom: 20px; +} +#pending_msgs_indicator.sending { + display: flex; +} #disconnected { display: none; height: 100%; diff --git a/src/frontend/index.html b/src/frontend/index.html index 1beaa55..59749e4 100644 --- a/src/frontend/index.html +++ b/src/frontend/index.html @@ -41,6 +41,10 @@ +
+
+

Sending pending messages...

+
@@ -56,10 +60,18 @@
- - +
+
+

Your contact seems to be offline.

+

Sent messages will be stored until a connection is established.

+
+
+
+ + +
@@ -80,4 +92,3 @@ - diff --git a/src/frontend/index.js b/src/frontend/index.js index 07c4b71..eb05ee2 100644 --- a/src/frontend/index.js +++ b/src/frontend/index.js @@ -7,6 +7,7 @@ let localIps = []; let currentSessionId = -1; let sessionsData = new Map(); let msgHistory = new Map(); +let pendingMsgs = new Map(); let pendingFilesTransfers = new Map(); let avatarTimestamps = new Map([ ["self", Date.now()] @@ -80,6 +81,7 @@ document.getElementById("delete_conversation").onclick = function() { document.getElementById("add_contact").onclick = function() { socket.send("contact "+currentSessionId); sessionsData.get(currentSessionId).isContact = true; + pendingMsgs.set(currentSessionId, []); displayHeader(); displaySessions(); }; @@ -103,7 +105,9 @@ document.getElementById("remove_contact").onclick = function() { if (!session.isOnline) { sessionsData.delete(currentSessionId); msgHistory.get(currentSessionId).length = 0; + displayChatBottom(); } + pendingMsgs.delete(currentSessionId); displayHeader(); displaySessions(); displayHistory(); @@ -215,7 +219,11 @@ document.getElementById("attach_file").onchange = function(event) { formData.append("", files[i]); fetch("/send_file", {method: "POST", body: formData}).then(response => { if (response.ok) { - response.text().then(uuid => onFileSent(currentSessionId, new Date(), uuid, files[i].name)); + response.text().then(text => { + if (text === "pending") { + newPendingMsg(currentSessionId, true, files[i].name); + } + }); } else { console.log(response); } @@ -226,16 +234,19 @@ document.getElementById("attach_file").onchange = function(event) { document.getElementById("file_cancel").onclick = function() { socket.send("abort "+currentSessionId); }; -let msg_log = document.getElementById("msg_log"); -msg_log.onscroll = function() { - if (sessionsData.get(currentSessionId).isContact) { - if (msg_log.scrollTop < 30) { - socket.send("load_msgs "+currentSessionId); +let msgLog = document.getElementById("msg_log"); +msgLog.onscroll = function() { + let session = sessionsData.get(currentSessionId); + if (typeof sessions !== "undefined") { + if (session.isContact) { + if (msgLog.scrollTop < 30) { + socket.send("load_msgs "+currentSessionId); + } } } }; -let profile_div = document.querySelector("#me>div"); -profile_div.onclick = function() { +let profileDiv = document.querySelector("#me>div"); +profileDiv.onclick = function() { let mainDiv = document.createElement("div"); mainDiv.id = "profile_info"; let avatarContainer = document.createElement("div"); @@ -480,7 +491,7 @@ socket.onmessage = function(msg) { onNewMessage(args[1], args[2] === "true", parseTimestamp(args[3]), msg.data.slice(args[0].length+args[1].length+args[2].length+args[3].length+4)); break; case "file": - onFileReceived(args[1], parseTimestamp(args[2]), args[3], msg.data.slice(args[0].length+args[1].length+args[2].length+args[3].length+4)); + onNewFile(args[1], args[2] === "true", parseTimestamp(args[3]), args[4], msg.data.slice(args[0].length+args[1].length+args[2].length+args[3].length+args[4].length+5)); break; case "files_transfer": onNewFilesTransfer(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3)); @@ -509,6 +520,15 @@ socket.onmessage = function(msg) { case "not_seen": setNotSeen(msg.data.slice(args[0].length+1)); break; + case "pending": + newPendingMsg(args[1], args[2] === "true", msg.data.slice(args[0].length+args[1].length+args[2].length+3)); + break; + case "sending_pending_msgs": + onSendingPendingMsgs(args[1]); + break; + case "pending_msgs_sent": + onPendingMsgsSent(args[1]); + break; case "local_ips": setLocalIps(msg.data.slice(args[0].length+1)); break; @@ -579,6 +599,22 @@ function setNotSeen(strSessionIds) { } displaySessions(); } +function newPendingMsg(sessionId, isFile, data) { + pendingMsgs.get(sessionId).push([isFile, data]); + if (sessionId == currentSessionId) { + displayHistory(); + } +} +function onSendingPendingMsgs(sessionId) { + document.getElementById("pending_msgs_indicator").classList.add("sending"); + pendingMsgs.get(sessionId).length = 0; + if (sessionId == currentSessionId) { + displayHistory(); + } +} +function onPendingMsgsSent(sessionId) { + document.getElementById("pending_msgs_indicator").classList.remove("sending"); +} function setLocalIps(strIPs) { localIps = strIPs.split(' '); } @@ -591,6 +627,7 @@ function onIsContact(sessionId, verified, fingerprint, name) { } else { addSession(sessionId, name, undefined, fingerprint, undefined, true, verified, false); } + pendingMsgs.set(sessionId, []); } function onMsgOrFileReceived(sessionId, outgoing, body) { if (currentSessionId == sessionId) { @@ -616,6 +653,10 @@ function onNewMessage(sessionId, outgoing, timestamp, msg) { msgHistory.get(sessionId).push([outgoing, timestamp, false, msg]); onMsgOrFileReceived(sessionId, outgoing, msg); } +function onNewFile(sessionId, outgoing, timestamp, uuid, filename) { + msgHistory.get(sessionId).push([outgoing, timestamp, true, [uuid, filename]]); + onMsgOrFileReceived(sessionId, outgoing, filename); +} function onNewFilesTransfer(sessionId, index, filesInfo) { let split = filesInfo.split(' '); let files = []; @@ -763,12 +804,12 @@ function onMsgsLoad(sessionId, strMsgs) { } } if (currentSessionId == sessionId) { - if (msg_log.scrollHeight - msg_log.scrollTop === msg_log.clientHeight) { + if (msgLog.scrollHeight - msgLog.scrollTop === msgLog.clientHeight) { displayHistory(); } else { - let backupHeight = msg_log.scrollHeight; + let backupHeight = msgLog.scrollHeight; displayHistory(false); - msg_log.scrollTop = msg_log.scrollHeight-backupHeight; + msgLog.scrollTop = msgLog.scrollHeight-backupHeight; } } } @@ -783,6 +824,7 @@ function onDisconnected(sessionId) { } if (currentSessionId == sessionId) { displayChatBottom(); + scrollHistoryToBottom(); } if (currentSessionId == sessionId && !session.isContact) { currentSessionId = -1; @@ -790,16 +832,6 @@ function onDisconnected(sessionId) { } displaySessions(); } -function onFileReceived(sessionId, timestamp, uuid, file_name) { - msgHistory.get(sessionId).push([false, timestamp, true, [uuid, file_name]]); - onMsgOrFileReceived(sessionId, false, file_name); -} -function onFileSent(sessionId, timestamp, uuid, file_name) { - msgHistory.get(sessionId).push([true, timestamp, true, [uuid, file_name]]); - if (currentSessionId == sessionId) { - displayHistory(); - } -} function onNameSet(newName) { removePopup(); identityName = newName; @@ -946,11 +978,11 @@ function logout() { window.location = "/logout"; } function displayProfile() { - profile_div.innerHTML = ""; - profile_div.appendChild(generateSelfAvatar(avatarTimestamps.get("self"))); + profileDiv.innerHTML = ""; + profileDiv.appendChild(generateSelfAvatar(avatarTimestamps.get("self"))); let p = document.createElement("p"); p.textContent = identityName; - profile_div.appendChild(p); + profileDiv.appendChild(p); } function displayHeader() { chatHeader.children[0].innerHTML = ""; @@ -1066,7 +1098,7 @@ function generateMessage(name, sessionId, msg) { divContainer.appendChild(div); return divContainer; } -function generateFile(name, sessionId, outgoing, file_info) { +function generateFile(name, sessionId, outgoing, fileInfo) { let div1 = document.createElement("div"); div1.classList.add("file"); div1.classList.add("content"); @@ -1078,14 +1110,18 @@ function generateFile(name, sessionId, outgoing, file_info) { h4.textContent = "File received:"; } div2.appendChild(h4); - let p = document.createElement("p"); - p.textContent = file_info[1]; - div2.appendChild(p); div1.appendChild(div2); - let a = document.createElement("a"); - a.href = "/load_file?uuid="+file_info[0]+"&file_name="+encodeURIComponent(file_info[1]); - a.target = "_blank"; - div1.appendChild(a); + let p = document.createElement("p"); + if (typeof fileInfo === "string") { //pending + p.textContent = fileInfo; + } else { + p.textContent = fileInfo[1]; + let a = document.createElement("a"); + a.href = "/load_file?uuid="+fileInfo[0]+"&file_name="+encodeURIComponent(fileInfo[1]); + a.target = "_blank"; + div1.appendChild(a); + } + div2.appendChild(p); let divContainer = document.createElement("div"); if (typeof name !== "undefined") { divContainer.appendChild(generateMsgHeader(name, sessionId)); @@ -1104,13 +1140,16 @@ function displayChatBottom(speed = undefined) { let fileTransfer = document.getElementById("file_transfer"); let session = sessionsData.get(currentSessionId); if (typeof session === "undefined") { - msgBox.removeAttribute("style"); + msgBox.classList.remove("active"); fileTransfer.classList.remove("active"); } else { + if (session.isContact || session.isOnline) { + msgBox.classList.add("active"); + } if (session.isOnline) { - msgBox.style.display = "flex"; + msgBox.classList.add("online"); } else { - msgBox.removeAttribute("style"); + msgBox.classList.remove("online"); } if (pendingFilesTransfers.has(currentSessionId)) { let fileInfo = document.getElementById("file_info"); @@ -1155,13 +1194,16 @@ function displayChatBottom(speed = undefined) { } } } +function scrollHistoryToBottom() { + msgLog.scrollTop = msgLog.scrollHeight; +} function displayHistory(scrollToBottom = true) { - msg_log.innerHTML = ""; + msgLog.innerHTML = ""; let session = sessionsData.get(currentSessionId); if (typeof session === "undefined") { - msg_log.style.display = "none"; + msgLog.style.display = "none"; } else { - msg_log.style.display = "block"; + msgLog.style.display = "block"; let previousOutgoing = undefined; msgHistory.get(currentSessionId).forEach(entry => { let name = undefined; @@ -1184,12 +1226,39 @@ function displayHistory(scrollToBottom = true) { let li = document.createElement("li"); li.appendChild(div); li.appendChild(generateMessageTimestamp(entry[1])); - msg_log.appendChild(li); + msgLog.appendChild(li); }); - if (scrollToBottom) { - msg_log.scrollTop = msg_log.scrollHeight; + if (session.isContact) { + let msgs = pendingMsgs.get(currentSessionId); + if (msgs.length > 0) { + let li = document.createElement("li"); + li.classList.add("pending_msgs_divider"); + let h4 = document.createElement("h4"); + h4.textContent = "Pending messages:"; + li.appendChild(h4); + msgLog.appendChild(li); + msgs.forEach(entry => { + let name = undefined; + if (previousOutgoing != true) { + previousOutgoing = true; + name = identityName; + } + let div; + if (entry[0]) { //is file + div = generateFile(name, currentSessionId, true, entry[1]); + } else { + div = generateMessage(name, currentSessionId, entry[1]); + } + let li = document.createElement("li"); + li.appendChild(div); + msgLog.appendChild(li); + }); + } } - if (msg_log.scrollHeight <= msg_log.clientHeight && session.isContact) { + if (scrollToBottom) { + scrollHistoryToBottom(); + } + if (msgLog.scrollHeight <= msgLog.clientHeight && session.isContact) { socket.send("load_msgs "+currentSessionId); } } diff --git a/src/identity.rs b/src/identity.rs index ddc28cc..08ae660 100644 --- a/src/identity.rs +++ b/src/identity.rs @@ -249,7 +249,7 @@ impl Identity { Ok(file_uuid) } - pub fn store_msg(&self, contact_uuid: &Uuid, message: Message) -> Result { + pub fn store_msg(&self, contact_uuid: &Uuid, message: &Message) -> Result { let db = Connection::open(get_database_path())?; db.execute(&format!("CREATE TABLE IF NOT EXISTS \"{}\" (outgoing BLOB, timestamp BLOB, data BLOB)", contact_uuid), [])?; let outgoing_byte: u8 = bool_to_byte(message.outgoing); diff --git a/src/main.rs b/src/main.rs index 7449730..c1d1ebd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,6 @@ use utils::escape_double_quote; use identity::Identity; use session_manager::{SessionManager, SessionCommand}; use ui_interface::UiConnection; -use crate::{identity::Message, utils::get_unix_timestamp_sec}; async fn start_websocket_server(global_vars: Arc>) -> u16 { let websocket_bind_addr = env::var("AIRA_WEBSOCKET_ADDR").unwrap_or("127.0.0.1".to_owned()); @@ -120,6 +119,20 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc match from_utf8(&buff[1..]) { + Ok(msg) => ui_connection.new_pending_msg(entry.0, false, msg), + Err(e) => print_error!(e) + } + protocol::Headers::FILE => if let Some(filename) = protocol::get_file_name(buff) { + ui_connection.new_pending_msg(entry.0, true, filename); + } + _ => {} + } + }); + }); let mut ips = Vec::new(); match if_addrs::get_if_addrs() { Ok(ifaces) => for iface in ifaces { @@ -160,15 +173,11 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc discover_peers(session_manager.clone()), "send" => { let session_id: usize = args[1].parse().unwrap(); - let buffer = protocol::new_message(msg[args[0].len()+args[1].len()+2..].to_string()); - let timestamp = get_unix_timestamp_sec(); - if session_manager.send_command(&session_id, SessionCommand::Send { - buff: buffer.clone() - }).await { - session_manager.store_msg(&session_id, Message { - outgoing: true, - timestamp, - data: buffer, + let msg_content = &msg[args[0].len()+args[1].len()+2..]; + let buffer = protocol::new_message(msg_content); + #[allow(unused_must_use)] { + session_manager.send_or_add_to_pending(&session_id, buffer).await.map(|sent| if !sent { + ui_connection.new_pending_msg(&session_id, false, msg_content); }); } } @@ -178,9 +187,9 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc().unwrap(), base64::decode(args[n+1]).unwrap())); } - session_manager.send_command(&session_id, SessionCommand::Send { - buff: protocol::ask_large_files(file_info) - }).await; + #[allow(unused_must_use)] { + session_manager.send_or_add_to_pending(&session_id, protocol::ask_large_files(file_info)).await; + } } "download" => { let session_id: usize = args[1].parse().unwrap(); @@ -211,7 +220,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc { let session_id: usize = args[1].parse().unwrap(); - match session_manager.remove_contact(session_id) { + match session_manager.remove_contact(&session_id) { Ok(_) => {}, Err(e) => print_error!(e) } @@ -440,22 +449,12 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo while let Some(Ok(chunk)) = field.next().await { buffer.extend(chunk); } - let timestamp = get_unix_timestamp_sec(); - if global_vars_read.session_manager.send_command(&session_id, SessionCommand::Send { - buff: protocol::file(filename, &buffer) - }).await { - match global_vars_read.session_manager.store_file(&session_id, &buffer) { - Ok(file_uuid) => { - let msg = [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat(); - global_vars_read.session_manager.store_msg(&session_id, Message { - outgoing: true, - timestamp, - data: msg, - }); - return HttpResponse::Ok().body(file_uuid.to_string()); - } - Err(e) => print_error!(e) - } + if let Ok(sent) = global_vars_read.session_manager.send_or_add_to_pending(&session_id, protocol::file(filename, &buffer)).await { + return if sent { + HttpResponse::Ok().finish() + } else { + HttpResponse::Ok().body("pending") + }; } } else { let (ack_sender, mut ack_receiver) = mpsc::channel(1); diff --git a/src/protocol.rs b/src/protocol.rs index 86b73c5..c60543a 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -17,7 +17,7 @@ impl Headers { pub const ABORT_FILES_TRANSFER: u8 = 0x0a; } -pub fn new_message(message: String) -> Vec { +pub fn new_message(message: &str) -> Vec { [&[Headers::MESSAGE], message.as_bytes()].concat() } @@ -33,17 +33,21 @@ pub fn file(file_name: &str, buffer: &[u8]) -> Vec { [&[Headers::FILE], &(file_name.len() as u16).to_be_bytes()[..], file_name.as_bytes(), buffer].concat() } -pub fn parse_file<'a>(buffer: &'a [u8]) -> Option<(&'a [u8], &'a [u8])> { +pub fn get_file_name<'a>(buffer: &'a [u8]) -> Option<&'a str> { if buffer.len() > 3 { let file_name_len = u16::from_be_bytes([buffer[1], buffer[2]]) as usize; if buffer.len() > 3+file_name_len { - let file_name = &buffer[3..3+file_name_len]; - return Some((file_name, &buffer[3+file_name_len..])); + return from_utf8(&buffer[3..3+file_name_len]).ok(); } } None } +pub fn parse_file<'a>(buffer: &'a [u8]) -> Option<(&'a str, &'a [u8])> { + let file_name = get_file_name(buffer)?; + Some((file_name, &buffer[3+file_name.len()..])) +} + pub fn ask_large_files(file_info: Vec<(u64, Vec)>) -> Vec { let mut buff = vec![Headers::ASK_LARGE_FILES]; file_info.into_iter().for_each(|info| { diff --git a/src/session_manager.rs b/src/session_manager.rs index 3fb3c6f..437f6c6 100644 --- a/src/session_manager.rs +++ b/src/session_manager.rs @@ -54,7 +54,8 @@ pub struct SessionManager { ui_connection: Mutex>, loaded_contacts: RwLock>, pub last_loaded_msg_offsets: RwLock>, - pub saved_msgs: RwLock>>, + saved_msgs: RwLock>>, + pub pending_msgs: Mutex>>>, pub not_seen: RwLock>, mdns_service: Mutex>, listener_stop_signal: Mutex>>, @@ -91,7 +92,7 @@ impl SessionManager { let mut msg_saved = false; if let Some(contact) = self.loaded_contacts.read().unwrap().get(session_id) { let mut offsets = self.last_loaded_msg_offsets.write().unwrap(); //locking mutex before modifying the DB to prevent race conditions with load_msgs() - match self.identity.read().unwrap().as_ref().unwrap().store_msg(&contact.uuid, message.clone()) { + match self.identity.read().unwrap().as_ref().unwrap().store_msg(&contact.uuid, &message) { Ok(_) => { *offsets.get_mut(session_id).unwrap() += 1; msg_saved = true; @@ -108,8 +109,8 @@ impl SessionManager { } fn get_session_sender(&self, session_id: &usize) -> Option> { - let mut sessions = self.sessions.write().unwrap(); - match sessions.get_mut(session_id) { + let sessions = self.sessions.read().unwrap(); + match sessions.get(session_id) { Some(session_data) => Some(session_data.sender.clone()), None => None } @@ -129,6 +130,21 @@ impl SessionManager { } } + pub async fn send_or_add_to_pending(&self, session_id: &usize, buff: Vec) -> Result { + if let Some(sender) = self.get_session_sender(session_id) { + match sender.send(SessionCommand::Send { buff }).await { + Ok(_) => Ok(true), + Err(e) => { + print_error!(e); + Err(()) + } + } + } else { + self.pending_msgs.lock().unwrap().get_mut(session_id).unwrap().push(buff); + Ok(false) + } + } + fn remove_session(&self, session_id: &usize) { self.with_ui_connection(|ui_connection| { ui_connection.on_disconnected(&session_id); @@ -153,23 +169,66 @@ impl SessionManager { }); } - async fn send_msg(&self, session_id: usize, session_write: &mut SessionWriteHalf, buff: Vec, is_sending: &mut bool, file_ack_sender: &mut Option>) -> Result<(), PsecError> { - self.encrypt_and_send(session_write, &buff).await?; - if buff[0] == protocol::Headers::ACCEPT_LARGE_FILES { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download.as_mut().unwrap().accepted = true; - } else if buff[0] == protocol::Headers::ABORT_FILES_TRANSFER { - self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None; - *is_sending = false; - if let Some(ack_sender) = file_ack_sender { - if let Err(e) = ack_sender.send(false).await { - print_error!(e); + async fn send_store_and_inform(&self, session_id: usize, session_writer: &mut T, buff: Vec) -> Result>, PsecError> { + self.encrypt_and_send(session_writer, &buff).await?; + let timestamp = get_unix_timestamp_sec(); + Ok(match buff[0] { + protocol::Headers::MESSAGE => { + let msg = Message { + outgoing: true, + timestamp, + data: buff, + }; + self.with_ui_connection(|ui_connection| { + ui_connection.on_new_msg(&session_id, &msg); + }); + self.store_msg(&session_id, msg); + None + } + protocol::Headers::FILE => { + if let Some((filename, content)) = protocol::parse_file(&buff) { + match self.store_file(&session_id, content) { + Ok(file_uuid) => { + let msg = [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat(); + self.store_msg(&session_id, Message { + outgoing: true, + timestamp, + data: msg, + }); + self.with_ui_connection(|ui_connection| { + ui_connection.on_new_file(&session_id, true, timestamp, filename, file_uuid); + }); + } + Err(e) => print_error!(e) + } } - *file_ack_sender = None; + None + } + _ => Some(buff) + }) + } + + async fn send_msg(&self, session_id: usize, session_write: &mut SessionWriteHalf, buff: Vec, is_sending: &mut bool, file_ack_sender: &mut Option>) -> Result<(), PsecError> { + if let Some(buff) = self.send_store_and_inform(session_id, session_write, buff).await? { + //not a message or a file + match buff[0] { + protocol::Headers::ACCEPT_LARGE_FILES => self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download.as_mut().unwrap().accepted = true, + protocol::Headers::ABORT_FILES_TRANSFER => { + self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None; + *is_sending = false; + if let Some(ack_sender) = file_ack_sender { + if let Err(e) = ack_sender.send(false).await { + print_error!(e); + } + *file_ack_sender = None; + } + self.with_ui_connection(|ui_connection| { + ui_connection.on_file_transfer_aborted(&session_id); + }); + } + _ => {} } } - self.with_ui_connection(|ui_connection| { - ui_connection.on_msg_sent(session_id, get_unix_timestamp_sec(), buff); - }); Ok(()) } @@ -283,7 +342,7 @@ impl SessionManager { self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None; local_file_handle = None; self.with_ui_connection(|ui_connection| { - ui_connection.on_received(&session_id, get_unix_timestamp_sec(), buffer); + ui_connection.on_file_transfer_aborted(&session_id); }); } protocol::Headers::ASK_LARGE_FILES => { @@ -371,45 +430,47 @@ impl SessionManager { protocol::Headers::REMOVE_AVATAR => self.set_avatar_uuid(&session_id, None), _ => { let header = buffer[0]; - let buffer = match header { - protocol::Headers::FILE => { - if let Some((file_name, content)) = protocol::parse_file(&buffer) { - match self.store_file(&session_id, content) { - Ok(file_uuid) => { - Some([&[protocol::Headers::FILE][..], file_uuid.as_bytes(), file_name].concat()) - } - Err(e) => { - print_error!(e); - None - } - } - } else { - None - } - } - _ => { - Some(buffer) - } - }; - if let Some(buffer) = buffer { - let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE; - let timestamp = get_unix_timestamp_sec(); - if is_classical_message { - self.set_seen(session_id, false); - } else if header == protocol::Headers::ACCEPT_LARGE_FILES { - is_sending = true; - last_chunks_sizes = Some(Vec::new()); - } - self.with_ui_connection(|ui_connection| { - ui_connection.on_received(&session_id, timestamp, buffer.clone()); - }); - if is_classical_message { - self.store_msg(&session_id, Message { + let timestamp = get_unix_timestamp_sec(); + match header { + protocol::Headers::MESSAGE => { + let msg = Message { outgoing: false, timestamp, data: buffer, + }; + self.with_ui_connection(|ui_connection| { + ui_connection.on_new_msg(&session_id, &msg); }); + self.store_msg(&session_id, msg); } + protocol::Headers::FILE => { + if let Some((filename, content)) = protocol::parse_file(&buffer) { + match self.store_file(&session_id, content) { + Ok(file_uuid) => { + self.with_ui_connection(|ui_connection| { + ui_connection.on_new_file(&session_id, false, timestamp, filename, file_uuid); + }); + self.store_msg(&session_id, Message { + outgoing: false, + timestamp, + data: [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat(), + }); + } + Err(e) => print_error!(e) + } + } + } + protocol::Headers::ACCEPT_LARGE_FILES => { + is_sending = true; + last_chunks_sizes = Some(Vec::new()); + self.with_ui_connection(|ui_connection| { + ui_connection.on_large_files_accepted(&session_id); + }) + } + _ => {} + } + if header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE { + self.set_seen(session_id, false); } } } @@ -468,6 +529,24 @@ impl SessionManager { } } + async fn on_session_initialized(&self, session: &mut Session, session_id: usize, is_contact: bool) -> Result<(), PsecError> { + if is_contact { + let pending_msgs = self.pending_msgs.lock().unwrap().get_mut(&session_id).unwrap().split_off(0); + self.with_ui_connection(|ui_connection| { + ui_connection.on_sending_pending_msgs(&session_id); + }); + for buff in pending_msgs { + self.send_store_and_inform(session_id, session, buff).await?; + } + self.with_ui_connection(|ui_connection| { + ui_connection.on_pending_msgs_sent(&session_id); + }); + Ok(()) + } else { + self.encrypt_and_send(session, &protocol::ask_profile_info()).await + } + } + fn handle_new_session(session_manager: Arc, mut session: Session, outgoing: bool) { tokio::spawn(async move { let mut peer_public_key = [0; PUBLIC_KEY_LENGTH]; @@ -546,17 +625,10 @@ impl SessionManager { session_manager.with_ui_connection(|ui_connection| { ui_connection.on_new_session(&session_id, &ip.to_string(), outgoing, &crypto::generate_fingerprint(&peer_public_key), ip, None); }); - if !is_contact { - match session_manager.encrypt_and_send(&mut session, &protocol::ask_profile_info()).await { - Ok(_) => {} - Err(e) => { - print_error!(e); - session_manager.remove_session(&session_id); - return; - } - } + match session_manager.on_session_initialized(&mut session, session_id, is_contact).await { + Ok(_) => session_manager.session_worker(session_id, receiver, session).await, + Err(e) => print_error!(e) } - session_manager.session_worker(session_id, receiver, session).await; session_manager.remove_session(&session_id); } } @@ -623,20 +695,22 @@ impl SessionManager { let contact = self.identity.read().unwrap().as_ref().unwrap().add_contact(session.name.clone(), session.avatar, session.peer_public_key)?; self.loaded_contacts.write().unwrap().insert(session_id, contact); self.last_loaded_msg_offsets.write().unwrap().insert(session_id, 0); + self.pending_msgs.lock().unwrap().insert(session_id, Vec::new()); Ok(()) } - pub fn remove_contact(&self, session_id: usize) -> Result { + pub fn remove_contact(&self, session_id: &usize) -> Result { let mut loaded_contacts = self.loaded_contacts.write().unwrap(); - let result = Identity::remove_contact(&loaded_contacts.get(&session_id).unwrap().uuid); + let result = Identity::remove_contact(&loaded_contacts.get(session_id).unwrap().uuid); if result.is_ok() { - if let Some(contact) = loaded_contacts.remove(&session_id) { - if let Some(session) = self.sessions.write().unwrap().get_mut(&session_id) { + if let Some(contact) = loaded_contacts.remove(session_id) { + if let Some(session) = self.sessions.write().unwrap().get_mut(session_id) { session.name = contact.name; session.avatar = contact.avatar; } } - self.last_loaded_msg_offsets.write().unwrap().remove(&session_id); + self.last_loaded_msg_offsets.write().unwrap().remove(session_id); + self.pending_msgs.lock().unwrap().remove(session_id); } result } @@ -761,6 +835,7 @@ impl SessionManager { not_seen.push(*session_counter); } loaded_contacts.insert(*session_counter, contact); + self.pending_msgs.lock().unwrap().insert(*session_counter, Vec::new()); *session_counter += 1; }) } @@ -782,6 +857,7 @@ impl SessionManager { loaded_contacts: RwLock::new(HashMap::new()), last_loaded_msg_offsets: RwLock::new(HashMap::new()), saved_msgs: RwLock::new(HashMap::new()), + pending_msgs: Mutex::new(HashMap::new()), not_seen: RwLock::new(Vec::new()), mdns_service: Mutex::new(None), listener_stop_signal: Mutex::new(None), diff --git a/src/ui_interface.rs b/src/ui_interface.rs index aff0f35..8c045ef 100644 --- a/src/ui_interface.rs +++ b/src/ui_interface.rs @@ -1,6 +1,7 @@ use std::{net::{IpAddr, TcpStream}}; use tungstenite::{WebSocket, protocol::Role, Message}; -use crate::{identity, protocol, session_manager::{LargeFileDownload, LargeFilesDownload}}; +use uuid::Uuid; +use crate::{identity, session_manager::{LargeFileDownload, LargeFilesDownload}}; mod ui_messages { use std::{fmt::Display, iter::FromIterator, net::IpAddr, str::from_utf8}; @@ -23,15 +24,8 @@ mod ui_messages { pub fn on_new_session(session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr) -> Message { Message::from(format!("new_session {} {} {} {} {}", session_id, outgoing, fingerprint, ip, name)) } - pub fn on_file_received(session_id: &usize, timestamp: u64, buffer: &[u8]) -> Option { - let uuid = Uuid::from_bytes(to_uuid_bytes(&buffer[1..17])?); - match from_utf8(&buffer[17..]) { - Ok(file_name) => Some(Message::from(format!("file {} {} {} {}", session_id, timestamp, uuid.to_string(), file_name))), - Err(e) => { - print_error!(e); - None - } - } + pub fn on_new_file(session_id: &usize, outgoing: bool, timestamp: u64, filename: &str, uuid: Uuid) -> Message { + Message::from(format!("file {} {} {} {} {}", session_id, outgoing, timestamp, uuid.to_string(), filename)) } pub fn new_files_transfer(session_id: &usize, files_transfer: &LargeFilesDownload) -> Message { if files_transfer.accepted { @@ -68,10 +62,7 @@ mod ui_messages { pub fn on_large_files_accepted(session_id: &usize) -> Message { simple_event("files_accepted", session_id) } - pub fn on_file_transfer_aborted(session_id: &usize) -> Message { - simple_event("aborted", session_id) - } - pub fn on_new_message(session_id: &usize, message: identity::Message) -> Option { + pub fn on_new_message(session_id: &usize, message: &identity::Message) -> Option { match from_utf8(&message.data[1..]) { Ok(msg) => Some(Message::from(format!("new_message {} {} {} {}", session_id, message.outgoing, message.timestamp, msg))), Err(e) => { @@ -83,6 +74,9 @@ mod ui_messages { pub fn inc_files_transfer(session_id: &usize, chunk_size: u64) -> Message { Message::from(format!("inc_file_transfer {} {}", session_id, chunk_size)) } + pub fn on_file_transfer_aborted(session_id: &usize) -> Message { + simple_event("aborted", session_id) + } pub fn load_msgs(session_id: &usize, msgs: &Vec) -> Message { let mut s = format!("load_msgs {}", session_id); msgs.into_iter().rev().for_each(|message| { @@ -106,6 +100,15 @@ mod ui_messages { pub fn set_not_seen(session_ids: Vec) -> Message { data_list("not_seen", session_ids) } + pub fn new_pending_msg(session_id: &usize, is_file: bool, data: &str) -> Message { + Message::from(format!("pending {} {} {}", session_id, is_file, data)) + } + pub fn on_sending_pending_msgs(session_id: &usize) -> Message { + simple_event("sending_pending_msgs", session_id) + } + pub fn on_pending_msgs_sent(session_id: &usize) -> Message { + simple_event("pending_msgs_sent", session_id) + } pub fn set_local_ips(ips: Vec) -> Message { data_list("local_ips", ips) } @@ -148,39 +151,24 @@ impl UiConnection { } } - pub fn on_received(&mut self, session_id: &usize, timestamp: u64, buffer: Vec) { - let ui_message = match buffer[0] { - protocol::Headers::MESSAGE => ui_messages::on_new_message(session_id, identity::Message { - outgoing: false, - timestamp, - data: buffer - }), - protocol::Headers::FILE => ui_messages::on_file_received(session_id, timestamp, &buffer), - protocol::Headers::ACCEPT_LARGE_FILES => Some(ui_messages::on_large_files_accepted(session_id)), - protocol::Headers::ABORT_FILES_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)), - _ => None - }; - if ui_message.is_some() { - self.write_message(ui_message.unwrap()) - } - } pub fn on_ask_large_files(&mut self, session_id: &usize, files: &Vec, download_location: &str) { - self.write_message(ui_messages::on_ask_large_files(session_id, files, download_location)) + self.write_message(ui_messages::on_ask_large_files(session_id, files, download_location)); } - pub fn on_msg_sent(&mut self, session_id: usize, timestamp: u64, buffer: Vec) { - match buffer[0] { - protocol::Headers::MESSAGE => match ui_messages::on_new_message(&session_id, identity::Message { - outgoing: true, - timestamp, - data: buffer - }) { - Some(msg) => self.write_message(msg), - None => {} - } - protocol::Headers::ABORT_FILES_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)), - _ => {} + pub fn on_large_files_accepted(&mut self, session_id: &usize) { + self.write_message(ui_messages::on_large_files_accepted(session_id)); + } + pub fn on_file_transfer_aborted(&mut self, session_id: &usize) { + self.write_message(ui_messages::on_file_transfer_aborted(&session_id)); + } + pub fn on_new_msg(&mut self, session_id: &usize, message: &identity::Message) { + match ui_messages::on_new_message(session_id, message) { + Some(msg) => self.write_message(msg), + None => {} } } + pub fn on_new_file(&mut self, session_id: &usize, outgoing: bool, timestamp: u64, filename: &str, uuid: Uuid) { + self.write_message(ui_messages::on_new_file(session_id, outgoing, timestamp, filename, uuid)); + } pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr, files_transfer: Option<&LargeFilesDownload>) { self.write_message(ui_messages::on_new_session(session_id, name, outgoing, fingerprint, ip)); if let Some(files_transfer) = files_transfer { @@ -209,6 +197,15 @@ impl UiConnection { pub fn set_not_seen(&mut self, session_ids: Vec) { self.write_message(ui_messages::set_not_seen(session_ids)); } + pub fn new_pending_msg(&mut self, session_id: &usize, is_file: bool, data: &str) { + self.write_message(ui_messages::new_pending_msg(session_id, is_file, data)); + } + pub fn on_sending_pending_msgs(&mut self, session_id: &usize) { + self.write_message(ui_messages::on_sending_pending_msgs(session_id)); + } + pub fn on_pending_msgs_sent(&mut self, session_id: &usize) { + self.write_message(ui_messages::on_pending_msgs_sent(session_id)); + } pub fn set_local_ips(&mut self, ips: Vec) { self.write_message(ui_messages::set_local_ips(ips)); }