Pending messages

This commit is contained in:
Matéo Duparc 2021-07-23 12:04:29 +02:00
parent 5081f9dbf4
commit dda30bf5af
Signed by: hardcoresushi
GPG Key ID: 007F84120107191E
8 changed files with 481 additions and 199 deletions

View File

@ -402,10 +402,6 @@ button:hover::after {
height: 100%;
background-color: var(--accent);
}
#message_box {
border-top: 2px solid var(--accent);
margin-bottom: 0;
}
#msg_log {
overflow-y: auto;
white-space: pre-wrap;
@ -472,9 +468,139 @@ button:hover::after {
width: 2em;
margin-left: 15px;
}
#message_box, #chat_header, #msg_log, #file_transfer {
#message_box, #message_box.online #offline_warning, #chat_header, #msg_log, #file_transfer {
display: none;
}
#message_box.active {
display: block;
}
#message_box {
border-top: 2px solid red;
margin-bottom: 0;
}
#message_box>div:nth-child(2) {
display: flex;
}
#message_box.online {
border-top-color: var(--accent);
}
#offline_warning {
margin-left: 20px;
display: flex;
align-items: center;
gap: 25px;
}
#offline_warning::before {
content: url("/static/imgs/icons/warning/ff0000");
display: block;
width: 2em;
}
#offline_warning h3 {
color: red;
display: inline-block;
margin-bottom: .3em;
}
#offline_warning p {
margin-top: 0;
}
#msg_log li.pending_msgs_divider {
border-top: 1px solid grey;
padding-top: 10px;
margin-top: 30px;
margin-left: 100px;
margin-right: 100px;
}
#msg_log li.pending_msgs_divider h4 {
margin: auto;
opacity: .5;
}
.lds-spinner {
color: official;
position: relative;
width: 82px;
height: 82px;
}
.lds-spinner div {
transform-origin: 40px 40px;
animation: lds-spinner 1.2s linear infinite;
}
.lds-spinner div:after {
content: " ";
display: block;
position: absolute;
top: 3px;
left: 37px;
width: 6px;
height: 18px;
border-radius: 20%;
background: #fff;
}
.lds-spinner div:nth-child(1) {
transform: rotate(0deg);
animation-delay: -1.1s;
}
.lds-spinner div:nth-child(2) {
transform: rotate(30deg);
animation-delay: -1s;
}
.lds-spinner div:nth-child(3) {
transform: rotate(60deg);
animation-delay: -0.9s;
}
.lds-spinner div:nth-child(4) {
transform: rotate(90deg);
animation-delay: -0.8s;
}
.lds-spinner div:nth-child(5) {
transform: rotate(120deg);
animation-delay: -0.7s;
}
.lds-spinner div:nth-child(6) {
transform: rotate(150deg);
animation-delay: -0.6s;
}
.lds-spinner div:nth-child(7) {
transform: rotate(180deg);
animation-delay: -0.5s;
}
.lds-spinner div:nth-child(8) {
transform: rotate(210deg);
animation-delay: -0.4s;
}
.lds-spinner div:nth-child(9) {
transform: rotate(240deg);
animation-delay: -0.3s;
}
.lds-spinner div:nth-child(10) {
transform: rotate(270deg);
animation-delay: -0.2s;
}
.lds-spinner div:nth-child(11) {
transform: rotate(300deg);
animation-delay: -0.1s;
}
.lds-spinner div:nth-child(12) {
transform: rotate(330deg);
animation-delay: 0s;
}
@keyframes lds-spinner {
0% {
opacity: 1;
}
100% {
opacity: 0;
}
}
#pending_msgs_indicator {
display: none;
align-items: center;
justify-content: center;
gap: 15px;
margin-bottom: 20px;
}
#pending_msgs_indicator.sending {
display: flex;
}
#disconnected {
display: none;
height: 100%;

View File

@ -41,6 +41,10 @@
</div>
<ul id="msg_log">
</ul>
<div id="pending_msgs_indicator">
<div class="lds-spinner"><div></div><div></div><div></div><div></div><div></div><div></div><div></div><div></div><div></div><div></div><div></div><div></div></div>
<h3>Sending pending messages...</h3>
</div>
<div id="file_transfer">
<div id="file_control">
<button id="file_cancel" title="Cancel"></button>
@ -56,10 +60,18 @@
</div>
</div>
<div id="message_box">
<input type="text" id="message_input" placeholder="Send a message...">
<label title="Send file" class="file_picker">
<input type="file" id="attach_file" multiple>
</label>
<div id="offline_warning">
<div>
<h3>Your contact seems to be offline.</h3>
<p>Sent messages will be stored until a connection is established.</p>
</div>
</div>
<div>
<input type="text" id="message_input" placeholder="Send a message...">
<label title="Send file" class="file_picker">
<input type="file" id="attach_file" multiple>
</label>
</div>
</div>
<div id="disconnected">
<img src="/static/imgs/icons/warning/ff0000">
@ -80,4 +92,3 @@
<script src="/static/index.js"></script>
</body>
</html>

View File

@ -7,6 +7,7 @@ let localIps = [];
let currentSessionId = -1;
let sessionsData = new Map();
let msgHistory = new Map();
let pendingMsgs = new Map();
let pendingFilesTransfers = new Map();
let avatarTimestamps = new Map([
["self", Date.now()]
@ -80,6 +81,7 @@ document.getElementById("delete_conversation").onclick = function() {
document.getElementById("add_contact").onclick = function() {
socket.send("contact "+currentSessionId);
sessionsData.get(currentSessionId).isContact = true;
pendingMsgs.set(currentSessionId, []);
displayHeader();
displaySessions();
};
@ -103,7 +105,9 @@ document.getElementById("remove_contact").onclick = function() {
if (!session.isOnline) {
sessionsData.delete(currentSessionId);
msgHistory.get(currentSessionId).length = 0;
displayChatBottom();
}
pendingMsgs.delete(currentSessionId);
displayHeader();
displaySessions();
displayHistory();
@ -215,7 +219,11 @@ document.getElementById("attach_file").onchange = function(event) {
formData.append("", files[i]);
fetch("/send_file", {method: "POST", body: formData}).then(response => {
if (response.ok) {
response.text().then(uuid => onFileSent(currentSessionId, new Date(), uuid, files[i].name));
response.text().then(text => {
if (text === "pending") {
newPendingMsg(currentSessionId, true, files[i].name);
}
});
} else {
console.log(response);
}
@ -226,16 +234,19 @@ document.getElementById("attach_file").onchange = function(event) {
document.getElementById("file_cancel").onclick = function() {
socket.send("abort "+currentSessionId);
};
let msg_log = document.getElementById("msg_log");
msg_log.onscroll = function() {
if (sessionsData.get(currentSessionId).isContact) {
if (msg_log.scrollTop < 30) {
socket.send("load_msgs "+currentSessionId);
let msgLog = document.getElementById("msg_log");
msgLog.onscroll = function() {
let session = sessionsData.get(currentSessionId);
if (typeof sessions !== "undefined") {
if (session.isContact) {
if (msgLog.scrollTop < 30) {
socket.send("load_msgs "+currentSessionId);
}
}
}
};
let profile_div = document.querySelector("#me>div");
profile_div.onclick = function() {
let profileDiv = document.querySelector("#me>div");
profileDiv.onclick = function() {
let mainDiv = document.createElement("div");
mainDiv.id = "profile_info";
let avatarContainer = document.createElement("div");
@ -480,7 +491,7 @@ socket.onmessage = function(msg) {
onNewMessage(args[1], args[2] === "true", parseTimestamp(args[3]), msg.data.slice(args[0].length+args[1].length+args[2].length+args[3].length+4));
break;
case "file":
onFileReceived(args[1], parseTimestamp(args[2]), args[3], msg.data.slice(args[0].length+args[1].length+args[2].length+args[3].length+4));
onNewFile(args[1], args[2] === "true", parseTimestamp(args[3]), args[4], msg.data.slice(args[0].length+args[1].length+args[2].length+args[3].length+args[4].length+5));
break;
case "files_transfer":
onNewFilesTransfer(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3));
@ -509,6 +520,15 @@ socket.onmessage = function(msg) {
case "not_seen":
setNotSeen(msg.data.slice(args[0].length+1));
break;
case "pending":
newPendingMsg(args[1], args[2] === "true", msg.data.slice(args[0].length+args[1].length+args[2].length+3));
break;
case "sending_pending_msgs":
onSendingPendingMsgs(args[1]);
break;
case "pending_msgs_sent":
onPendingMsgsSent(args[1]);
break;
case "local_ips":
setLocalIps(msg.data.slice(args[0].length+1));
break;
@ -579,6 +599,22 @@ function setNotSeen(strSessionIds) {
}
displaySessions();
}
function newPendingMsg(sessionId, isFile, data) {
pendingMsgs.get(sessionId).push([isFile, data]);
if (sessionId == currentSessionId) {
displayHistory();
}
}
function onSendingPendingMsgs(sessionId) {
document.getElementById("pending_msgs_indicator").classList.add("sending");
pendingMsgs.get(sessionId).length = 0;
if (sessionId == currentSessionId) {
displayHistory();
}
}
function onPendingMsgsSent(sessionId) {
document.getElementById("pending_msgs_indicator").classList.remove("sending");
}
function setLocalIps(strIPs) {
localIps = strIPs.split(' ');
}
@ -591,6 +627,7 @@ function onIsContact(sessionId, verified, fingerprint, name) {
} else {
addSession(sessionId, name, undefined, fingerprint, undefined, true, verified, false);
}
pendingMsgs.set(sessionId, []);
}
function onMsgOrFileReceived(sessionId, outgoing, body) {
if (currentSessionId == sessionId) {
@ -616,6 +653,10 @@ function onNewMessage(sessionId, outgoing, timestamp, msg) {
msgHistory.get(sessionId).push([outgoing, timestamp, false, msg]);
onMsgOrFileReceived(sessionId, outgoing, msg);
}
function onNewFile(sessionId, outgoing, timestamp, uuid, filename) {
msgHistory.get(sessionId).push([outgoing, timestamp, true, [uuid, filename]]);
onMsgOrFileReceived(sessionId, outgoing, filename);
}
function onNewFilesTransfer(sessionId, index, filesInfo) {
let split = filesInfo.split(' ');
let files = [];
@ -763,12 +804,12 @@ function onMsgsLoad(sessionId, strMsgs) {
}
}
if (currentSessionId == sessionId) {
if (msg_log.scrollHeight - msg_log.scrollTop === msg_log.clientHeight) {
if (msgLog.scrollHeight - msgLog.scrollTop === msgLog.clientHeight) {
displayHistory();
} else {
let backupHeight = msg_log.scrollHeight;
let backupHeight = msgLog.scrollHeight;
displayHistory(false);
msg_log.scrollTop = msg_log.scrollHeight-backupHeight;
msgLog.scrollTop = msgLog.scrollHeight-backupHeight;
}
}
}
@ -783,6 +824,7 @@ function onDisconnected(sessionId) {
}
if (currentSessionId == sessionId) {
displayChatBottom();
scrollHistoryToBottom();
}
if (currentSessionId == sessionId && !session.isContact) {
currentSessionId = -1;
@ -790,16 +832,6 @@ function onDisconnected(sessionId) {
}
displaySessions();
}
function onFileReceived(sessionId, timestamp, uuid, file_name) {
msgHistory.get(sessionId).push([false, timestamp, true, [uuid, file_name]]);
onMsgOrFileReceived(sessionId, false, file_name);
}
function onFileSent(sessionId, timestamp, uuid, file_name) {
msgHistory.get(sessionId).push([true, timestamp, true, [uuid, file_name]]);
if (currentSessionId == sessionId) {
displayHistory();
}
}
function onNameSet(newName) {
removePopup();
identityName = newName;
@ -946,11 +978,11 @@ function logout() {
window.location = "/logout";
}
function displayProfile() {
profile_div.innerHTML = "";
profile_div.appendChild(generateSelfAvatar(avatarTimestamps.get("self")));
profileDiv.innerHTML = "";
profileDiv.appendChild(generateSelfAvatar(avatarTimestamps.get("self")));
let p = document.createElement("p");
p.textContent = identityName;
profile_div.appendChild(p);
profileDiv.appendChild(p);
}
function displayHeader() {
chatHeader.children[0].innerHTML = "";
@ -1066,7 +1098,7 @@ function generateMessage(name, sessionId, msg) {
divContainer.appendChild(div);
return divContainer;
}
function generateFile(name, sessionId, outgoing, file_info) {
function generateFile(name, sessionId, outgoing, fileInfo) {
let div1 = document.createElement("div");
div1.classList.add("file");
div1.classList.add("content");
@ -1078,14 +1110,18 @@ function generateFile(name, sessionId, outgoing, file_info) {
h4.textContent = "File received:";
}
div2.appendChild(h4);
let p = document.createElement("p");
p.textContent = file_info[1];
div2.appendChild(p);
div1.appendChild(div2);
let a = document.createElement("a");
a.href = "/load_file?uuid="+file_info[0]+"&file_name="+encodeURIComponent(file_info[1]);
a.target = "_blank";
div1.appendChild(a);
let p = document.createElement("p");
if (typeof fileInfo === "string") { //pending
p.textContent = fileInfo;
} else {
p.textContent = fileInfo[1];
let a = document.createElement("a");
a.href = "/load_file?uuid="+fileInfo[0]+"&file_name="+encodeURIComponent(fileInfo[1]);
a.target = "_blank";
div1.appendChild(a);
}
div2.appendChild(p);
let divContainer = document.createElement("div");
if (typeof name !== "undefined") {
divContainer.appendChild(generateMsgHeader(name, sessionId));
@ -1104,13 +1140,16 @@ function displayChatBottom(speed = undefined) {
let fileTransfer = document.getElementById("file_transfer");
let session = sessionsData.get(currentSessionId);
if (typeof session === "undefined") {
msgBox.removeAttribute("style");
msgBox.classList.remove("active");
fileTransfer.classList.remove("active");
} else {
if (session.isContact || session.isOnline) {
msgBox.classList.add("active");
}
if (session.isOnline) {
msgBox.style.display = "flex";
msgBox.classList.add("online");
} else {
msgBox.removeAttribute("style");
msgBox.classList.remove("online");
}
if (pendingFilesTransfers.has(currentSessionId)) {
let fileInfo = document.getElementById("file_info");
@ -1155,13 +1194,16 @@ function displayChatBottom(speed = undefined) {
}
}
}
function scrollHistoryToBottom() {
msgLog.scrollTop = msgLog.scrollHeight;
}
function displayHistory(scrollToBottom = true) {
msg_log.innerHTML = "";
msgLog.innerHTML = "";
let session = sessionsData.get(currentSessionId);
if (typeof session === "undefined") {
msg_log.style.display = "none";
msgLog.style.display = "none";
} else {
msg_log.style.display = "block";
msgLog.style.display = "block";
let previousOutgoing = undefined;
msgHistory.get(currentSessionId).forEach(entry => {
let name = undefined;
@ -1184,12 +1226,39 @@ function displayHistory(scrollToBottom = true) {
let li = document.createElement("li");
li.appendChild(div);
li.appendChild(generateMessageTimestamp(entry[1]));
msg_log.appendChild(li);
msgLog.appendChild(li);
});
if (scrollToBottom) {
msg_log.scrollTop = msg_log.scrollHeight;
if (session.isContact) {
let msgs = pendingMsgs.get(currentSessionId);
if (msgs.length > 0) {
let li = document.createElement("li");
li.classList.add("pending_msgs_divider");
let h4 = document.createElement("h4");
h4.textContent = "Pending messages:";
li.appendChild(h4);
msgLog.appendChild(li);
msgs.forEach(entry => {
let name = undefined;
if (previousOutgoing != true) {
previousOutgoing = true;
name = identityName;
}
let div;
if (entry[0]) { //is file
div = generateFile(name, currentSessionId, true, entry[1]);
} else {
div = generateMessage(name, currentSessionId, entry[1]);
}
let li = document.createElement("li");
li.appendChild(div);
msgLog.appendChild(li);
});
}
}
if (msg_log.scrollHeight <= msg_log.clientHeight && session.isContact) {
if (scrollToBottom) {
scrollHistoryToBottom();
}
if (msgLog.scrollHeight <= msgLog.clientHeight && session.isContact) {
socket.send("load_msgs "+currentSessionId);
}
}

View File

@ -249,7 +249,7 @@ impl Identity {
Ok(file_uuid)
}
pub fn store_msg(&self, contact_uuid: &Uuid, message: Message) -> Result<usize, rusqlite::Error> {
pub fn store_msg(&self, contact_uuid: &Uuid, message: &Message) -> Result<usize, rusqlite::Error> {
let db = Connection::open(get_database_path())?;
db.execute(&format!("CREATE TABLE IF NOT EXISTS \"{}\" (outgoing BLOB, timestamp BLOB, data BLOB)", contact_uuid), [])?;
let outgoing_byte: u8 = bool_to_byte(message.outgoing);

View File

@ -23,7 +23,6 @@ use utils::escape_double_quote;
use identity::Identity;
use session_manager::{SessionManager, SessionCommand};
use ui_interface::UiConnection;
use crate::{identity::Message, utils::get_unix_timestamp_sec};
async fn start_websocket_server(global_vars: Arc<RwLock<GlobalVars>>) -> u16 {
let websocket_bind_addr = env::var("AIRA_WEBSOCKET_ADDR").unwrap_or("127.0.0.1".to_owned());
@ -120,6 +119,20 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
ui_connection.load_msgs(&msgs.0, &msgs.1);
}
});
session_manager.pending_msgs.lock().unwrap().iter().for_each(|entry| {
entry.1.iter().for_each(|buff| {
match buff[0] {
protocol::Headers::MESSAGE => match from_utf8(&buff[1..]) {
Ok(msg) => ui_connection.new_pending_msg(entry.0, false, msg),
Err(e) => print_error!(e)
}
protocol::Headers::FILE => if let Some(filename) = protocol::get_file_name(buff) {
ui_connection.new_pending_msg(entry.0, true, filename);
}
_ => {}
}
});
});
let mut ips = Vec::new();
match if_addrs::get_if_addrs() {
Ok(ifaces) => for iface in ifaces {
@ -160,15 +173,11 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
"refresh" => discover_peers(session_manager.clone()),
"send" => {
let session_id: usize = args[1].parse().unwrap();
let buffer = protocol::new_message(msg[args[0].len()+args[1].len()+2..].to_string());
let timestamp = get_unix_timestamp_sec();
if session_manager.send_command(&session_id, SessionCommand::Send {
buff: buffer.clone()
}).await {
session_manager.store_msg(&session_id, Message {
outgoing: true,
timestamp,
data: buffer,
let msg_content = &msg[args[0].len()+args[1].len()+2..];
let buffer = protocol::new_message(msg_content);
#[allow(unused_must_use)] {
session_manager.send_or_add_to_pending(&session_id, buffer).await.map(|sent| if !sent {
ui_connection.new_pending_msg(&session_id, false, msg_content);
});
}
}
@ -178,9 +187,9 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
for n in (2..args.len()).step_by(2) {
file_info.push((args[n].parse::<u64>().unwrap(), base64::decode(args[n+1]).unwrap()));
}
session_manager.send_command(&session_id, SessionCommand::Send {
buff: protocol::ask_large_files(file_info)
}).await;
#[allow(unused_must_use)] {
session_manager.send_or_add_to_pending(&session_id, protocol::ask_large_files(file_info)).await;
}
}
"download" => {
let session_id: usize = args[1].parse().unwrap();
@ -211,7 +220,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
}
"uncontact" => {
let session_id: usize = args[1].parse().unwrap();
match session_manager.remove_contact(session_id) {
match session_manager.remove_contact(&session_id) {
Ok(_) => {},
Err(e) => print_error!(e)
}
@ -440,22 +449,12 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
while let Some(Ok(chunk)) = field.next().await {
buffer.extend(chunk);
}
let timestamp = get_unix_timestamp_sec();
if global_vars_read.session_manager.send_command(&session_id, SessionCommand::Send {
buff: protocol::file(filename, &buffer)
}).await {
match global_vars_read.session_manager.store_file(&session_id, &buffer) {
Ok(file_uuid) => {
let msg = [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat();
global_vars_read.session_manager.store_msg(&session_id, Message {
outgoing: true,
timestamp,
data: msg,
});
return HttpResponse::Ok().body(file_uuid.to_string());
}
Err(e) => print_error!(e)
}
if let Ok(sent) = global_vars_read.session_manager.send_or_add_to_pending(&session_id, protocol::file(filename, &buffer)).await {
return if sent {
HttpResponse::Ok().finish()
} else {
HttpResponse::Ok().body("pending")
};
}
} else {
let (ack_sender, mut ack_receiver) = mpsc::channel(1);

View File

@ -17,7 +17,7 @@ impl Headers {
pub const ABORT_FILES_TRANSFER: u8 = 0x0a;
}
pub fn new_message(message: String) -> Vec<u8> {
pub fn new_message(message: &str) -> Vec<u8> {
[&[Headers::MESSAGE], message.as_bytes()].concat()
}
@ -33,17 +33,21 @@ pub fn file(file_name: &str, buffer: &[u8]) -> Vec<u8> {
[&[Headers::FILE], &(file_name.len() as u16).to_be_bytes()[..], file_name.as_bytes(), buffer].concat()
}
pub fn parse_file<'a>(buffer: &'a [u8]) -> Option<(&'a [u8], &'a [u8])> {
pub fn get_file_name<'a>(buffer: &'a [u8]) -> Option<&'a str> {
if buffer.len() > 3 {
let file_name_len = u16::from_be_bytes([buffer[1], buffer[2]]) as usize;
if buffer.len() > 3+file_name_len {
let file_name = &buffer[3..3+file_name_len];
return Some((file_name, &buffer[3+file_name_len..]));
return from_utf8(&buffer[3..3+file_name_len]).ok();
}
}
None
}
pub fn parse_file<'a>(buffer: &'a [u8]) -> Option<(&'a str, &'a [u8])> {
let file_name = get_file_name(buffer)?;
Some((file_name, &buffer[3+file_name.len()..]))
}
pub fn ask_large_files(file_info: Vec<(u64, Vec<u8>)>) -> Vec<u8> {
let mut buff = vec![Headers::ASK_LARGE_FILES];
file_info.into_iter().for_each(|info| {

View File

@ -54,7 +54,8 @@ pub struct SessionManager {
ui_connection: Mutex<Option<UiConnection>>,
loaded_contacts: RwLock<HashMap<usize, Contact>>,
pub last_loaded_msg_offsets: RwLock<HashMap<usize, usize>>,
pub saved_msgs: RwLock<HashMap<usize, Vec<Message>>>,
saved_msgs: RwLock<HashMap<usize, Vec<Message>>>,
pub pending_msgs: Mutex<HashMap<usize, Vec<Vec<u8>>>>,
pub not_seen: RwLock<Vec<usize>>,
mdns_service: Mutex<Option<Service>>,
listener_stop_signal: Mutex<Option<Sender<()>>>,
@ -91,7 +92,7 @@ impl SessionManager {
let mut msg_saved = false;
if let Some(contact) = self.loaded_contacts.read().unwrap().get(session_id) {
let mut offsets = self.last_loaded_msg_offsets.write().unwrap(); //locking mutex before modifying the DB to prevent race conditions with load_msgs()
match self.identity.read().unwrap().as_ref().unwrap().store_msg(&contact.uuid, message.clone()) {
match self.identity.read().unwrap().as_ref().unwrap().store_msg(&contact.uuid, &message) {
Ok(_) => {
*offsets.get_mut(session_id).unwrap() += 1;
msg_saved = true;
@ -108,8 +109,8 @@ impl SessionManager {
}
fn get_session_sender(&self, session_id: &usize) -> Option<Sender<SessionCommand>> {
let mut sessions = self.sessions.write().unwrap();
match sessions.get_mut(session_id) {
let sessions = self.sessions.read().unwrap();
match sessions.get(session_id) {
Some(session_data) => Some(session_data.sender.clone()),
None => None
}
@ -129,6 +130,21 @@ impl SessionManager {
}
}
pub async fn send_or_add_to_pending(&self, session_id: &usize, buff: Vec<u8>) -> Result<bool, ()> {
if let Some(sender) = self.get_session_sender(session_id) {
match sender.send(SessionCommand::Send { buff }).await {
Ok(_) => Ok(true),
Err(e) => {
print_error!(e);
Err(())
}
}
} else {
self.pending_msgs.lock().unwrap().get_mut(session_id).unwrap().push(buff);
Ok(false)
}
}
fn remove_session(&self, session_id: &usize) {
self.with_ui_connection(|ui_connection| {
ui_connection.on_disconnected(&session_id);
@ -153,23 +169,66 @@ impl SessionManager {
});
}
async fn send_msg(&self, session_id: usize, session_write: &mut SessionWriteHalf, buff: Vec<u8>, is_sending: &mut bool, file_ack_sender: &mut Option<Sender<bool>>) -> Result<(), PsecError> {
self.encrypt_and_send(session_write, &buff).await?;
if buff[0] == protocol::Headers::ACCEPT_LARGE_FILES {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download.as_mut().unwrap().accepted = true;
} else if buff[0] == protocol::Headers::ABORT_FILES_TRANSFER {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None;
*is_sending = false;
if let Some(ack_sender) = file_ack_sender {
if let Err(e) = ack_sender.send(false).await {
print_error!(e);
async fn send_store_and_inform<T: PsecWriter>(&self, session_id: usize, session_writer: &mut T, buff: Vec<u8>) -> Result<Option<Vec<u8>>, PsecError> {
self.encrypt_and_send(session_writer, &buff).await?;
let timestamp = get_unix_timestamp_sec();
Ok(match buff[0] {
protocol::Headers::MESSAGE => {
let msg = Message {
outgoing: true,
timestamp,
data: buff,
};
self.with_ui_connection(|ui_connection| {
ui_connection.on_new_msg(&session_id, &msg);
});
self.store_msg(&session_id, msg);
None
}
protocol::Headers::FILE => {
if let Some((filename, content)) = protocol::parse_file(&buff) {
match self.store_file(&session_id, content) {
Ok(file_uuid) => {
let msg = [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat();
self.store_msg(&session_id, Message {
outgoing: true,
timestamp,
data: msg,
});
self.with_ui_connection(|ui_connection| {
ui_connection.on_new_file(&session_id, true, timestamp, filename, file_uuid);
});
}
Err(e) => print_error!(e)
}
}
*file_ack_sender = None;
None
}
_ => Some(buff)
})
}
async fn send_msg(&self, session_id: usize, session_write: &mut SessionWriteHalf, buff: Vec<u8>, is_sending: &mut bool, file_ack_sender: &mut Option<Sender<bool>>) -> Result<(), PsecError> {
if let Some(buff) = self.send_store_and_inform(session_id, session_write, buff).await? {
//not a message or a file
match buff[0] {
protocol::Headers::ACCEPT_LARGE_FILES => self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download.as_mut().unwrap().accepted = true,
protocol::Headers::ABORT_FILES_TRANSFER => {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None;
*is_sending = false;
if let Some(ack_sender) = file_ack_sender {
if let Err(e) = ack_sender.send(false).await {
print_error!(e);
}
*file_ack_sender = None;
}
self.with_ui_connection(|ui_connection| {
ui_connection.on_file_transfer_aborted(&session_id);
});
}
_ => {}
}
}
self.with_ui_connection(|ui_connection| {
ui_connection.on_msg_sent(session_id, get_unix_timestamp_sec(), buff);
});
Ok(())
}
@ -283,7 +342,7 @@ impl SessionManager {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None;
local_file_handle = None;
self.with_ui_connection(|ui_connection| {
ui_connection.on_received(&session_id, get_unix_timestamp_sec(), buffer);
ui_connection.on_file_transfer_aborted(&session_id);
});
}
protocol::Headers::ASK_LARGE_FILES => {
@ -371,45 +430,47 @@ impl SessionManager {
protocol::Headers::REMOVE_AVATAR => self.set_avatar_uuid(&session_id, None),
_ => {
let header = buffer[0];
let buffer = match header {
protocol::Headers::FILE => {
if let Some((file_name, content)) = protocol::parse_file(&buffer) {
match self.store_file(&session_id, content) {
Ok(file_uuid) => {
Some([&[protocol::Headers::FILE][..], file_uuid.as_bytes(), file_name].concat())
}
Err(e) => {
print_error!(e);
None
}
}
} else {
None
}
}
_ => {
Some(buffer)
}
};
if let Some(buffer) = buffer {
let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE;
let timestamp = get_unix_timestamp_sec();
if is_classical_message {
self.set_seen(session_id, false);
} else if header == protocol::Headers::ACCEPT_LARGE_FILES {
is_sending = true;
last_chunks_sizes = Some(Vec::new());
}
self.with_ui_connection(|ui_connection| {
ui_connection.on_received(&session_id, timestamp, buffer.clone());
});
if is_classical_message {
self.store_msg(&session_id, Message {
let timestamp = get_unix_timestamp_sec();
match header {
protocol::Headers::MESSAGE => {
let msg = Message {
outgoing: false,
timestamp,
data: buffer,
};
self.with_ui_connection(|ui_connection| {
ui_connection.on_new_msg(&session_id, &msg);
});
self.store_msg(&session_id, msg);
}
protocol::Headers::FILE => {
if let Some((filename, content)) = protocol::parse_file(&buffer) {
match self.store_file(&session_id, content) {
Ok(file_uuid) => {
self.with_ui_connection(|ui_connection| {
ui_connection.on_new_file(&session_id, false, timestamp, filename, file_uuid);
});
self.store_msg(&session_id, Message {
outgoing: false,
timestamp,
data: [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat(),
});
}
Err(e) => print_error!(e)
}
}
}
protocol::Headers::ACCEPT_LARGE_FILES => {
is_sending = true;
last_chunks_sizes = Some(Vec::new());
self.with_ui_connection(|ui_connection| {
ui_connection.on_large_files_accepted(&session_id);
})
}
_ => {}
}
if header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE {
self.set_seen(session_id, false);
}
}
}
@ -468,6 +529,24 @@ impl SessionManager {
}
}
async fn on_session_initialized(&self, session: &mut Session, session_id: usize, is_contact: bool) -> Result<(), PsecError> {
if is_contact {
let pending_msgs = self.pending_msgs.lock().unwrap().get_mut(&session_id).unwrap().split_off(0);
self.with_ui_connection(|ui_connection| {
ui_connection.on_sending_pending_msgs(&session_id);
});
for buff in pending_msgs {
self.send_store_and_inform(session_id, session, buff).await?;
}
self.with_ui_connection(|ui_connection| {
ui_connection.on_pending_msgs_sent(&session_id);
});
Ok(())
} else {
self.encrypt_and_send(session, &protocol::ask_profile_info()).await
}
}
fn handle_new_session(session_manager: Arc<SessionManager>, mut session: Session, outgoing: bool) {
tokio::spawn(async move {
let mut peer_public_key = [0; PUBLIC_KEY_LENGTH];
@ -546,17 +625,10 @@ impl SessionManager {
session_manager.with_ui_connection(|ui_connection| {
ui_connection.on_new_session(&session_id, &ip.to_string(), outgoing, &crypto::generate_fingerprint(&peer_public_key), ip, None);
});
if !is_contact {
match session_manager.encrypt_and_send(&mut session, &protocol::ask_profile_info()).await {
Ok(_) => {}
Err(e) => {
print_error!(e);
session_manager.remove_session(&session_id);
return;
}
}
match session_manager.on_session_initialized(&mut session, session_id, is_contact).await {
Ok(_) => session_manager.session_worker(session_id, receiver, session).await,
Err(e) => print_error!(e)
}
session_manager.session_worker(session_id, receiver, session).await;
session_manager.remove_session(&session_id);
}
}
@ -623,20 +695,22 @@ impl SessionManager {
let contact = self.identity.read().unwrap().as_ref().unwrap().add_contact(session.name.clone(), session.avatar, session.peer_public_key)?;
self.loaded_contacts.write().unwrap().insert(session_id, contact);
self.last_loaded_msg_offsets.write().unwrap().insert(session_id, 0);
self.pending_msgs.lock().unwrap().insert(session_id, Vec::new());
Ok(())
}
pub fn remove_contact(&self, session_id: usize) -> Result<usize, rusqlite::Error> {
pub fn remove_contact(&self, session_id: &usize) -> Result<usize, rusqlite::Error> {
let mut loaded_contacts = self.loaded_contacts.write().unwrap();
let result = Identity::remove_contact(&loaded_contacts.get(&session_id).unwrap().uuid);
let result = Identity::remove_contact(&loaded_contacts.get(session_id).unwrap().uuid);
if result.is_ok() {
if let Some(contact) = loaded_contacts.remove(&session_id) {
if let Some(session) = self.sessions.write().unwrap().get_mut(&session_id) {
if let Some(contact) = loaded_contacts.remove(session_id) {
if let Some(session) = self.sessions.write().unwrap().get_mut(session_id) {
session.name = contact.name;
session.avatar = contact.avatar;
}
}
self.last_loaded_msg_offsets.write().unwrap().remove(&session_id);
self.last_loaded_msg_offsets.write().unwrap().remove(session_id);
self.pending_msgs.lock().unwrap().remove(session_id);
}
result
}
@ -761,6 +835,7 @@ impl SessionManager {
not_seen.push(*session_counter);
}
loaded_contacts.insert(*session_counter, contact);
self.pending_msgs.lock().unwrap().insert(*session_counter, Vec::new());
*session_counter += 1;
})
}
@ -782,6 +857,7 @@ impl SessionManager {
loaded_contacts: RwLock::new(HashMap::new()),
last_loaded_msg_offsets: RwLock::new(HashMap::new()),
saved_msgs: RwLock::new(HashMap::new()),
pending_msgs: Mutex::new(HashMap::new()),
not_seen: RwLock::new(Vec::new()),
mdns_service: Mutex::new(None),
listener_stop_signal: Mutex::new(None),

View File

@ -1,6 +1,7 @@
use std::{net::{IpAddr, TcpStream}};
use tungstenite::{WebSocket, protocol::Role, Message};
use crate::{identity, protocol, session_manager::{LargeFileDownload, LargeFilesDownload}};
use uuid::Uuid;
use crate::{identity, session_manager::{LargeFileDownload, LargeFilesDownload}};
mod ui_messages {
use std::{fmt::Display, iter::FromIterator, net::IpAddr, str::from_utf8};
@ -23,15 +24,8 @@ mod ui_messages {
pub fn on_new_session(session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr) -> Message {
Message::from(format!("new_session {} {} {} {} {}", session_id, outgoing, fingerprint, ip, name))
}
pub fn on_file_received(session_id: &usize, timestamp: u64, buffer: &[u8]) -> Option<Message> {
let uuid = Uuid::from_bytes(to_uuid_bytes(&buffer[1..17])?);
match from_utf8(&buffer[17..]) {
Ok(file_name) => Some(Message::from(format!("file {} {} {} {}", session_id, timestamp, uuid.to_string(), file_name))),
Err(e) => {
print_error!(e);
None
}
}
pub fn on_new_file(session_id: &usize, outgoing: bool, timestamp: u64, filename: &str, uuid: Uuid) -> Message {
Message::from(format!("file {} {} {} {} {}", session_id, outgoing, timestamp, uuid.to_string(), filename))
}
pub fn new_files_transfer(session_id: &usize, files_transfer: &LargeFilesDownload) -> Message {
if files_transfer.accepted {
@ -68,10 +62,7 @@ mod ui_messages {
pub fn on_large_files_accepted(session_id: &usize) -> Message {
simple_event("files_accepted", session_id)
}
pub fn on_file_transfer_aborted(session_id: &usize) -> Message {
simple_event("aborted", session_id)
}
pub fn on_new_message(session_id: &usize, message: identity::Message) -> Option<Message> {
pub fn on_new_message(session_id: &usize, message: &identity::Message) -> Option<Message> {
match from_utf8(&message.data[1..]) {
Ok(msg) => Some(Message::from(format!("new_message {} {} {} {}", session_id, message.outgoing, message.timestamp, msg))),
Err(e) => {
@ -83,6 +74,9 @@ mod ui_messages {
pub fn inc_files_transfer(session_id: &usize, chunk_size: u64) -> Message {
Message::from(format!("inc_file_transfer {} {}", session_id, chunk_size))
}
pub fn on_file_transfer_aborted(session_id: &usize) -> Message {
simple_event("aborted", session_id)
}
pub fn load_msgs(session_id: &usize, msgs: &Vec<identity::Message>) -> Message {
let mut s = format!("load_msgs {}", session_id);
msgs.into_iter().rev().for_each(|message| {
@ -106,6 +100,15 @@ mod ui_messages {
pub fn set_not_seen(session_ids: Vec<usize>) -> Message {
data_list("not_seen", session_ids)
}
pub fn new_pending_msg(session_id: &usize, is_file: bool, data: &str) -> Message {
Message::from(format!("pending {} {} {}", session_id, is_file, data))
}
pub fn on_sending_pending_msgs(session_id: &usize) -> Message {
simple_event("sending_pending_msgs", session_id)
}
pub fn on_pending_msgs_sent(session_id: &usize) -> Message {
simple_event("pending_msgs_sent", session_id)
}
pub fn set_local_ips(ips: Vec<IpAddr>) -> Message {
data_list("local_ips", ips)
}
@ -148,39 +151,24 @@ impl UiConnection {
}
}
pub fn on_received(&mut self, session_id: &usize, timestamp: u64, buffer: Vec<u8>) {
let ui_message = match buffer[0] {
protocol::Headers::MESSAGE => ui_messages::on_new_message(session_id, identity::Message {
outgoing: false,
timestamp,
data: buffer
}),
protocol::Headers::FILE => ui_messages::on_file_received(session_id, timestamp, &buffer),
protocol::Headers::ACCEPT_LARGE_FILES => Some(ui_messages::on_large_files_accepted(session_id)),
protocol::Headers::ABORT_FILES_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)),
_ => None
};
if ui_message.is_some() {
self.write_message(ui_message.unwrap())
}
}
pub fn on_ask_large_files(&mut self, session_id: &usize, files: &Vec<LargeFileDownload>, download_location: &str) {
self.write_message(ui_messages::on_ask_large_files(session_id, files, download_location))
self.write_message(ui_messages::on_ask_large_files(session_id, files, download_location));
}
pub fn on_msg_sent(&mut self, session_id: usize, timestamp: u64, buffer: Vec<u8>) {
match buffer[0] {
protocol::Headers::MESSAGE => match ui_messages::on_new_message(&session_id, identity::Message {
outgoing: true,
timestamp,
data: buffer
}) {
Some(msg) => self.write_message(msg),
None => {}
}
protocol::Headers::ABORT_FILES_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)),
_ => {}
pub fn on_large_files_accepted(&mut self, session_id: &usize) {
self.write_message(ui_messages::on_large_files_accepted(session_id));
}
pub fn on_file_transfer_aborted(&mut self, session_id: &usize) {
self.write_message(ui_messages::on_file_transfer_aborted(&session_id));
}
pub fn on_new_msg(&mut self, session_id: &usize, message: &identity::Message) {
match ui_messages::on_new_message(session_id, message) {
Some(msg) => self.write_message(msg),
None => {}
}
}
pub fn on_new_file(&mut self, session_id: &usize, outgoing: bool, timestamp: u64, filename: &str, uuid: Uuid) {
self.write_message(ui_messages::on_new_file(session_id, outgoing, timestamp, filename, uuid));
}
pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr, files_transfer: Option<&LargeFilesDownload>) {
self.write_message(ui_messages::on_new_session(session_id, name, outgoing, fingerprint, ip));
if let Some(files_transfer) = files_transfer {
@ -209,6 +197,15 @@ impl UiConnection {
pub fn set_not_seen(&mut self, session_ids: Vec<usize>) {
self.write_message(ui_messages::set_not_seen(session_ids));
}
pub fn new_pending_msg(&mut self, session_id: &usize, is_file: bool, data: &str) {
self.write_message(ui_messages::new_pending_msg(session_id, is_file, data));
}
pub fn on_sending_pending_msgs(&mut self, session_id: &usize) {
self.write_message(ui_messages::on_sending_pending_msgs(session_id));
}
pub fn on_pending_msgs_sent(&mut self, session_id: &usize) {
self.write_message(ui_messages::on_pending_msgs_sent(session_id));
}
pub fn set_local_ips(&mut self, ips: Vec<IpAddr>) {
self.write_message(ui_messages::set_local_ips(ips));
}