Multiple Files Transfers

This commit is contained in:
Matéo Duparc 2021-05-06 18:24:29 +02:00
parent ce246c2234
commit 809ce34bff
Signed by: hardcoresushi
GPG Key ID: 007F84120107191E
8 changed files with 384 additions and 300 deletions

View File

@ -22,6 +22,7 @@ hex = "0.4.3"
strum_macros = "0.20.1" #display enums
actix-web = "3"
actix-multipart = "0.3"
time = "0.2.25" #needed for actix cookies
futures = "0.3"
tungstenite = "0.13.0" #websocket
serde = "1.0.124" #serialization
@ -34,6 +35,5 @@ libmdns = "0.6" #mDNS advertiser
multicast_dns = "0.5" #mDNS browser
pnet_datalink = "0.27.2"
base64 = "0.13.0"
time = "0.2.25"
scrypt = "0.7.0"
zeroize = "1.2.0"

View File

@ -54,12 +54,20 @@ input[type="file"] {
right: 0;
margin: auto;
width: 40%;
max-height: 90vh;
overflow: auto;
box-sizing: border-box;
padding: 20px 70px;
padding: 20px 70px 10px;
background-color: #2B2F31;
border-radius: 10px;
font-size: 1.2em;
}
.popup:last-child::after {
content: "";
display: block;
height: 20px;
width: 100%;
}
.popup_background {
height: 100%;
width: 100%;
@ -198,12 +206,6 @@ input[type="file"] {
}
#left_panel ul li:hover, #left_panel ul li.current {
background-color: #333940;
}
#left_panel ul li.output {
}
#left_panel ul li.input {
}
#left_panel ul li p::after {
content: url("/static/imgs/icons/warning/FF3C00");
@ -250,7 +252,7 @@ input[type="file"] {
background-color: unset;
content: url("/static/imgs/icons/info/FF3C00");
}
ul.ips {
.popup ul {
list-style-type: unset;
}
#chat_header {

View File

@ -57,7 +57,7 @@
<div id="message_box">
<input type="text" id="message_input" placeholder="Send a message...">
<label title="Send file" class="file_picker">
<input type="file" id="attach_file">
<input type="file" id="attach_file" multiple>
</label>
</div>
</div>

View File

@ -7,7 +7,7 @@ let localIps = [];
let currentSessionId = -1;
let sessionsData = new Map();
let msgHistory = new Map();
let pendingFiles = new Map();
let pendingFilesTransfers = new Map();
function onClickSession(event) {
let sessionId = event.currentTarget.getAttribute("data-sessionId");
@ -163,9 +163,16 @@ document.getElementById("logout").onclick = function() {
showPopup(mainDiv);
}
document.getElementById("attach_file").onchange = function(event) {
let file = event.target.files[0];
if (file.size > 32760000) {
if (pendingFiles.has(currentSessionId)) {
let files = event.target.files;
let useLargeFileTransfer = false;
for (let i=0; i<files.length; ++i) {
if (files[i].size > 32760000) {
useLargeFileTransfer = true;
break;
}
}
if (useLargeFileTransfer) {
if (pendingFilesTransfers.has(currentSessionId)) {
let mainDiv = document.createElement("div");
mainDiv.appendChild(generatePopupWarningTitle());
let p = document.createElement("p");
@ -173,28 +180,39 @@ document.getElementById("attach_file").onchange = function(event) {
mainDiv.appendChild(p);
showPopup(mainDiv);
} else {
pendingFiles.set(currentSessionId, {
"file": file,
"name": file.name,
"size": file.size,
"state": "waiting",
let fileTransfers = [];
let fileInfo = "";
for (let i=0; i<files.length; ++i) {
fileTransfers.push({
"file": files[i],
"name": files[i].name,
"size": files[i].size,
"transferred": 0,
"lastChunk": Date.now()
});
socket.send("large_file "+currentSessionId+" "+file.size+" "+file.name);
fileInfo += ' '+files[i].size+' '+b64EncodeUnicode(files[i].name);
};
pendingFilesTransfers.set(currentSessionId, {
"files": fileTransfers,
"index": 0,
"state": "waiting",
});
socket.send("large_files "+currentSessionId+fileInfo);
displayChatBottom();
}
} else {
for (let i=0; i<files.length; ++i) {
let formData = new FormData();
formData.append("session_id", currentSessionId);
formData.append("", file);
formData.append("", files[i]);
fetch("/send_file", {method: "POST", body: formData}).then(response => {
if (response.ok) {
response.text().then(uuid => onFileSent(currentSessionId, uuid, file.name));
response.text().then(uuid => onFileSent(currentSessionId, uuid, files[i].name));
} else {
console.log(response);
}
});
};
}
}
document.getElementById("file_cancel").onclick = function() {
@ -270,10 +288,10 @@ profile_div.onclick = function() {
if (isIdentityProtected || newPassword_set) { //don't change password if identity is not protected and new password is blank
let msg = "change_password";
if (isIdentityProtected) {
msg += " "+btoa(inputs[0].value);
msg += " "+b64EncodeUnicode(inputs[0].value);
}
if (newPassword_set) {
msg += " "+btoa(newPassword.value);
msg += " "+b64EncodeUnicode(newPassword.value);
}
socket.send(msg);
} else {
@ -357,9 +375,9 @@ document.querySelector("#refresher button").onclick = function() {
function humanFileSize(bytes, dp=1) {
const thresh = 1000;
if (Math.abs(bytes) < thresh) {
return bytes + ' B';
return bytes + " B";
}
const units = ['kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB'];
const units = ["kB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"];
let u = -1;
const r = 10**dp;
do {
@ -368,6 +386,18 @@ function humanFileSize(bytes, dp=1) {
} while (Math.round(Math.abs(bytes) * r) / r >= thresh && u < units.length - 1);
return bytes.toFixed(dp) + ' ' + units[u];
}
//source: https://stackoverflow.com/a/30106551
function b64EncodeUnicode(str) {
return btoa(encodeURIComponent(str).replace(/%([0-9A-F]{2})/g,
function toSolidBytes(match, p1) {
return String.fromCharCode('0x' + p1);
}));
}
function b64DecodeUnicode(str) {
return decodeURIComponent(atob(str).split('').map(function(c) {
return '%' + ("00" + c.charCodeAt(0).toString(16)).slice(-2);
}).join(''));
}
//source: https://www.w3schools.com/js/js_cookies.asp
function getCookie(cname) {
var name = cname + "=";
@ -421,20 +451,20 @@ socket.onmessage = function(msg) {
case "file":
onFileReceived(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3));
break;
case "file_transfer":
onNewFileTransfer(args[1], args[2], args[3], args[4], args[5], args[6]);
case "files_transfer":
onNewFilesTransfer(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3));
break;
case "ask_large_file":
onAskLargeFile(args[1], args[2], args[3], args[4]);
case "ask_large_files":
onAskLargeFiles(args[1], args[2], msg.data.slice(args[0].length+args[1].length+args[2].length+3));
break;
case "file_accepted":
onFileAccepted(args[1]);
case "files_accepted":
onFilesAccepted(args[1]);
break;
case "aborted":
onFileAborted(args[1]);
onFilesTransferAborted(args[1]);
break;
case "inc_file_transfer":
onIncFileTransfer(args[1], parseInt(args[2]));
onIncFilesTransfer(args[1], parseInt(args[2]));
break;
case "load_sent_msg":
onMsgLoad(args[1], args[2] === "true", msg.data.slice(args[0].length+args[1].length+args[2].length+3));
@ -532,33 +562,47 @@ function onNewMessage(sessionId, outgoing, msg) {
msgHistory.get(sessionId).push([outgoing, false, msg]);
onMsgOrFileReceived(sessionId, outgoing, msg);
}
function onNewFileTransfer(sessionId, encodedFileName, fileSize, state, transferred, lastChunk) {
pendingFiles.set(sessionId, {
function onNewFilesTransfer(sessionId, index, filesInfo) {
let split = filesInfo.split(' ');
let files = [];
for (let i=0; i<split.length; i += 4) {
files.push({
"file": undefined,
"name": atob(encodedFileName),
"size": parseInt(fileSize),
"state": state,
"transferred": parseInt(transferred),
"lastChunk": parseInt(lastChunk)
"name": b64DecodeUnicode(split[i]),
"size": parseInt(split[i+1]),
"transferred": parseInt(split[i+2]),
"lastChunk": parseInt(split[i+3])
});
}
pendingFilesTransfers.set(sessionId, {
"files": files,
"index": parseInt(index),
"state": "transferring"
});
if (currentSessionId == sessionId) {
displayChatBottom();
}
}
function onAskLargeFile(sessionId, fileSize, encodedFileName, encodedDownloadLocation) {
function onAskLargeFiles(sessionId, encodedDownloadLocation, filesInfo) {
let sessionName = sessionsData.get(sessionId).name;
let mainDiv = document.createElement("div");
let h2 = document.createElement("h2");
h2.textContent = sessionName+" wants to send you a file:";
h2.textContent = sessionName+" wants to send you some files:";
mainDiv.appendChild(h2);
let fileName = atob(encodedFileName);
let fileInfo = document.createElement("p");
generateFileInfo(fileName, fileSize, fileInfo);
mainDiv.appendChild(fileInfo);
let ul = document.createElement("ul");
let split = filesInfo.split(' ');
for (let i=0; i<split.length; i += 2) {
let p = document.createElement("p");
generateFileInfo(b64DecodeUnicode(split[i]), parseInt(split[i+1]), p);
let li = document.createElement("li");
li.appendChild(p);
ul.appendChild(li);
}
mainDiv.appendChild(ul);
let spanDownloadLocation = document.createElement("span");
spanDownloadLocation.textContent = atob(encodedDownloadLocation);
spanDownloadLocation.textContent = b64DecodeUnicode(encodedDownloadLocation);
let pQuestion = document.createElement("p");
pQuestion.appendChild(document.createTextNode("Download it in "));
pQuestion.appendChild(document.createTextNode("Download them in "));
pQuestion.appendChild(spanDownloadLocation);
pQuestion.appendChild(document.createTextNode(" ?"));
mainDiv.appendChild(pQuestion);
@ -568,14 +612,21 @@ function onAskLargeFile(sessionId, fileSize, encodedFileName, encodedDownloadLoc
buttonDownload.textContent = "Download";
buttonDownload.onclick = function() {
removePopup();
pendingFiles.set(sessionId, {
let files = [];
for (let i=0; i<split.length; i += 2) {
files.push({
"file": undefined,
"name": fileName,
"size": fileSize,
"state": "accepted",
"name": b64DecodeUnicode(split[i]),
"size": parseInt(split[i+1]),
"transferred": 0,
"lastChunk": Date.now()
});
}
pendingFilesTransfers.set(sessionId, {
"files": files,
"index": 0,
"state": "transferring"
});
socket.send("download "+sessionId);
if (currentSessionId == sessionId) {
displayChatBottom();
@ -593,47 +644,41 @@ function onAskLargeFile(sessionId, fileSize, encodedFileName, encodedDownloadLoc
showPopup(mainDiv, false);
if (document.hidden && notificationAllowed) {
new Notification(sessionName, {
"body": fileName
"body": "Files download request"
});
}
}
function onFileAccepted(sessionId) {
if (pendingFiles.has(sessionId)) {
let file = pendingFiles.get(sessionId);
file.state = "sending";
file.lastChunk = Date.now();
if (currentSessionId == sessionId) {
displayChatBottom();
}
let formData = new FormData();
formData.append("session_id", currentSessionId);
formData.append("", file.file);
fetch("/send_large_file", {method: "POST", body: formData}).then(response => {
if (!response.ok) {
console.log(response);
}
});
function onFilesAccepted(sessionId) {
if (pendingFilesTransfers.has(sessionId)) {
sendNextLargeFile(sessionId);
}
}
function onFileAborted(sessionId) {
if (pendingFiles.has(sessionId)) {
pendingFiles.get(sessionId).state = "aborted";
function onFilesTransferAborted(sessionId) {
if (pendingFilesTransfers.has(sessionId)) {
pendingFilesTransfers.get(sessionId).state = "aborted";
if (sessionId == currentSessionId) {
displayChatBottom();
}
}
}
function onIncFileTransfer(sessionId, chunkSize) {
if (pendingFiles.has(sessionId)) {
let file = pendingFiles.get(sessionId);
file.transferred += chunkSize;
function onIncFilesTransfer(sessionId, chunkSize) {
if (pendingFilesTransfers.has(sessionId)) {
let filesTransfer = pendingFilesTransfers.get(sessionId);
let fileTransfer = filesTransfer.files[filesTransfer.index];
fileTransfer.transferred += chunkSize;
let now = Date.now();
let speed = chunkSize/(now-file.lastChunk)*1000;
file.lastChunk = now;
if (file.transferred >= file.size) {
file.state = "completed";
let speed = chunkSize/(now-fileTransfer.lastChunk)*1000;
fileTransfer.lastChunk = now;
if (fileTransfer.transferred >= fileTransfer.size) {
if (filesTransfer.index == filesTransfer.files.length-1) {
filesTransfer.state = "completed";
socket.send("sending_ended "+sessionId);
} else {
file.state = "transferring";
filesTransfer.index += 1;
if (typeof fileTransfer.file !== "undefined") {
sendNextLargeFile(sessionId);
}
}
}
if (currentSessionId == sessionId) {
displayChatBottom(speed);
@ -653,7 +698,7 @@ function onFileLoad(sessionId, outgoing, uuid, fileName) {
}
}
function onDisconnected(sessionId) {
pendingFiles.delete(sessionId);
pendingFilesTransfers.delete(sessionId);
let session = sessionsData.get(sessionId);
if (session.isContact) {
session.isOnline = false;
@ -696,6 +741,23 @@ function onPasswordChanged(success, isProtected) {
}
}
function sendNextLargeFile(sessionId) {
let filesTransfer = pendingFilesTransfers.get(sessionId);
filesTransfer.state = "transferring";
let fileTransfer = filesTransfer.files[filesTransfer.index];
fileTransfer.lastChunk = Date.now();
if (currentSessionId == sessionId) {
displayChatBottom();
}
let formData = new FormData();
formData.append("session_id", currentSessionId);
formData.append("", fileTransfer.file);
fetch("/send_large_file", {method: "POST", body: formData}).then(response => {
if (!response.ok) {
console.log(response);
}
});
}
function beautifyFingerprint(f) {
for (let i=4; i<f.length; i+=5) {
f = f.slice(0, i)+" "+f.slice(i);
@ -758,17 +820,17 @@ function displayHeader() {
}
}
}
function showPopup(content, closeButton = true) {
function showPopup(content, cancelable = true) {
let popup_background = document.createElement("div");
popup_background.classList.add("popup_background");
let popup = document.createElement("div");
popup.classList.add("popup");
if (cancelable) {
popup_background.onclick = function(e) {
if (e.target == popup_background) {
removePopup();
}
};
let popup = document.createElement("div");
popup.classList.add("popup");
if (closeButton) {
let close = document.createElement("button");
close.classList.add("close");
close.onclick = removePopup;
@ -805,11 +867,6 @@ function generateSession(sessionId, session) {
li.setAttribute("data-sessionId", sessionId);
li.appendChild(generateAvatar(session.name));
li.appendChild(generateName(session.name));
if (session.outgoing) {
li.classList.add("outgoing");
} else {
li.classList.add("incomming");
}
if (session.isContact) {
li.classList.add("is_contact");
}
@ -884,20 +941,22 @@ function generateFileInfo(fileName, fileSize, p) {
}
function displayChatBottom(speed = undefined) {
let msgBox = document.getElementById("message_box");
let fileTransfer = document.getElementById("file_transfer");
let session = sessionsData.get(currentSessionId);
if (typeof session === "undefined") {
msgBox.removeAttribute("style");
fileTransfer.classList.remove("active");
} else {
if (session.isOnline) {
msgBox.style.display = "flex";
} else {
msgBox.removeAttribute("style");
}
let fileTransfer = document.getElementById("file_transfer");
if (pendingFiles.has(currentSessionId)) {
let file = pendingFiles.get(currentSessionId);
if (pendingFilesTransfers.has(currentSessionId)) {
let fileInfo = document.getElementById("file_info");
fileInfo.innerHTML = "";
let filesTransfer = pendingFilesTransfers.get(currentSessionId);
let file = filesTransfer.files[filesTransfer.index];
generateFileInfo(file.name, file.size, fileInfo);
let fileProgress = document.getElementById("file_progress");
fileProgress.style.display = "none"; //hide by default
@ -906,7 +965,7 @@ function displayChatBottom(speed = undefined) {
let fileCancel = document.getElementById("file_cancel");
fileCancel.style.display = "none"; //hide by default
document.querySelector("#file_progress_bar>div").style.width = 0;
switch (file.state) {
switch (filesTransfer.state) {
case "transferring":
fileCancel.removeAttribute("style"); //show
fileStatus.style.display = "none";
@ -921,19 +980,13 @@ function displayChatBottom(speed = undefined) {
case "waiting":
fileStatus.textContent = "Waiting for peer confirmation...";
break;
case "accepted":
fileStatus.textContent = "Downloading file...";
break;
case "aborted":
fileStatus.textContent = "Transfer aborted.";
pendingFiles.delete(currentSessionId);
break;
case "sending":
fileStatus.textContent = "Sending file...";
pendingFilesTransfers.delete(currentSessionId);
break;
case "completed":
fileStatus.textContent = "Transfer completed.";
pendingFiles.delete(currentSessionId);
pendingFilesTransfers.delete(currentSessionId);
}
fileTransfer.classList.add("active");
} else {

View File

@ -20,7 +20,7 @@ use platform_dirs::AppDirs;
use zeroize::Zeroize;
use utils::escape_double_quote;
use identity::Identity;
use session_manager::{SessionManager, protocol};
use session_manager::{SessionManager, SessionCommand, protocol};
use ui_interface::UiConnection;
async fn start_websocket_server(global_vars: Arc<RwLock<GlobalVars>>) -> u16 {
@ -104,7 +104,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
session.1.outgoing,
&crypto::generate_fingerprint(&session.1.peer_public_key),
session.1.ip,
session.1.file_download.as_ref()
session.1.files_download.as_ref()
);
});
{
@ -156,28 +156,44 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
"send" => {
let session_id: usize = args[1].parse().unwrap();
let buffer = protocol::new_message(msg[args[0].len()+args[1].len()+2..].to_string());
match session_manager.send_to(&session_id, buffer.clone()).await {
match session_manager.send_command(&session_id, SessionCommand::Send {
buff: buffer.clone()
}).await {
Ok(_) => session_manager.store_msg(&session_id, true, buffer),
Err(e) => print_error!(e)
}
}
"large_file" => {
"large_files" => {
let session_id: usize = args[1].parse().unwrap();
let file_size: u64 = args[2].parse().unwrap();
let file_name = &msg[args[0].len()+args[1].len()+args[2].len()+3..];
if let Err(e) = session_manager.send_to(&session_id, protocol::ask_large_file(file_size, file_name)).await {
let mut file_info = Vec::new();
for n in (2..args.len()).step_by(2) {
file_info.push((args[n].parse::<u64>().unwrap(), base64::decode(args[n+1]).unwrap()));
}
if let Err(e) = session_manager.send_command(&session_id, SessionCommand::Send {
buff: protocol::ask_large_files(file_info)
}).await {
print_error!(e);
}
}
"download" => {
let session_id: usize = args[1].parse().unwrap();
if let Err(e) = session_manager.send_to(&session_id, vec![protocol::Headers::ACCEPT_LARGE_FILE]).await {
if let Err(e) = session_manager.send_command(&session_id, SessionCommand::Send {
buff: vec![protocol::Headers::ACCEPT_LARGE_FILES]
}).await {
print_error!(e);
}
}
"abort" => {
let session_id: usize = args[1].parse().unwrap();
if let Err(e) = session_manager.send_to(&session_id, vec![protocol::Headers::ABORT_FILE_TRANSFER]).await {
if let Err(e) = session_manager.send_command(&session_id, SessionCommand::Send {
buff: vec![protocol::Headers::ABORT_FILES_TRANSFER]
}).await {
print_error!(e);
}
}
"sending_ended" => {
let session_id: usize = args[1].parse().unwrap();
if let Err(e) = session_manager.send_command(&session_id, SessionCommand::SendingEnded).await {
print_error!(e);
}
}
@ -324,7 +340,9 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
while let Some(Ok(chunk)) = field.next().await {
buffer.extend(chunk);
}
match global_vars_read.session_manager.send_to(&session_id, protocol::file(filename, &buffer)).await {
match global_vars_read.session_manager.send_command(&session_id, SessionCommand::Send {
buff: protocol::file(filename, &buffer)
}).await {
Ok(_) => {
match global_vars_read.session_manager.store_file(&session_id, &buffer) {
Ok(file_uuid) => {
@ -359,15 +377,18 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
break;
}
}
if let Err(e) = global_vars_read.session_manager.encrypt_file_chunk(&session_id, chunk_buffer.clone()).await {
if let Err(e) = global_vars_read.session_manager.send_command(&session_id, SessionCommand::EncryptFileChunk{
plain_text: chunk_buffer.clone()
}).await {
print_error!(e);
return HttpResponse::InternalServerError().finish();
}
let last_chunk = chunk_buffer.len() < constants::FILE_CHUNK_SIZE;
if !match ack_receiver.recv().await {
Some(should_continue) => {
//send previous encrypted chunk even if transfert is aborted to keep PSEC nonces syncrhonized
if let Err(e) = global_vars_read.session_manager.send_encrypted_file_chunk(&session_id, ack_sender.clone(), last_chunk).await {
if let Err(e) = global_vars_read.session_manager.send_command(&session_id, SessionCommand::SendEncryptedFileChunk {
ack_sender: ack_sender.clone()
}).await {
print_error!(e);
false
} else {
@ -378,7 +399,7 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
} {
return HttpResponse::InternalServerError().finish()
}
if last_chunk {
if chunk_buffer.len() < constants::FILE_CHUNK_SIZE {
break;
} else {
chunk_buffer.truncate(1);
@ -540,8 +561,19 @@ fn handle_static(req: HttpRequest) -> HttpResponse {
if splits[0] == "static" {
let mut response_builder = HttpResponse::Ok();
match splits[1] {
"index.js" => return response_builder.content_type(JS_CONTENT_TYPE).body(include_str!("frontend/index.js")),
"index.css" => return response_builder.body(include_str!("frontend/index.css")),
"index.js" => {
response_builder.content_type(JS_CONTENT_TYPE);
#[cfg(debug_assertions)]
return response_builder.body(fs::read_to_string("src/frontend/index.js").unwrap());
#[cfg(not(debug_assertions))]
return response_builder.body(include_str!("frontend/index.js"));
}
"index.css" => {
#[cfg(debug_assertions)]
return response_builder.body(fs::read_to_string("src/frontend/index.css").unwrap());
#[cfg(not(debug_assertions))]
return response_builder.body(include_str!("frontend/index.css"));
}
"imgs" => {
if splits[2] == "icons" && splits.len() <= 5 {
let color = if splits.len() == 5 {

View File

@ -1,7 +1,7 @@
mod session;
pub mod protocol;
use std::{collections::HashMap, net::{IpAddr, SocketAddr}, io::{self, Write}, str::from_utf8, fs::OpenOptions, sync::{Mutex, RwLock, Arc}};
use std::{collections::HashMap, fs::OpenOptions, io::{self, Write}, net::{IpAddr, SocketAddr}, path::PathBuf, str::from_utf8, sync::{Mutex, RwLock, Arc}};
use tokio::{net::{TcpListener, TcpStream}, sync::mpsc::{self, Sender, Receiver}};
use libmdns::Service;
use strum_macros::Display;
@ -24,36 +24,36 @@ pub enum SessionError {
Unknown,
}
enum SessionCommand {
pub enum SessionCommand {
Send {
buff: Vec<u8>,
},
SendEncryptedFileChunk {
sender: Sender<bool>,
last_chunk: bool,
ack_sender: Sender<bool>,
},
EncryptFileChunk {
plain_text: Vec<u8>,
},
SendingEnded,
Close,
}
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum FileState {
ASKING,
ACCEPTED,
TRANSFERRING,
}
#[derive(Clone)]
pub struct LargeFileDownload {
pub file_name: String,
pub download_location: String,
pub file_size: u64,
pub state: FileState,
pub transferred: u64,
pub last_chunk: u128,
}
#[derive(Clone)]
pub struct LargeFilesDownload {
pub download_location: PathBuf,
pub accepted: bool,
pub index: usize,
pub files: Vec<LargeFileDownload>,
}
#[derive(Clone)]
pub struct SessionData {
pub name: String,
@ -61,7 +61,7 @@ pub struct SessionData {
pub peer_public_key: [u8; PUBLIC_KEY_LENGTH],
pub ip: IpAddr,
sender: Sender<SessionCommand>,
pub file_download: Option<LargeFileDownload>,
pub files_download: Option<LargeFilesDownload>,
}
pub struct SessionManager {
@ -79,7 +79,7 @@ pub struct SessionManager {
impl SessionManager {
fn with_ui_connection<F>(&self, f: F) where F: Fn(&mut UiConnection) {
fn with_ui_connection<F>(&self, f: F) where F: FnOnce(&mut UiConnection) {
let mut ui_connection_opt = self.ui_connection.lock().unwrap();
match ui_connection_opt.as_mut() {
Some(ui_connection) => if ui_connection.is_valid {
@ -119,44 +119,9 @@ impl SessionManager {
}
}
pub async fn encrypt_file_chunk(&self, session_id: &usize, plain_text: Vec<u8>) -> Result<(), SessionError> {
pub async fn send_command(&self, session_id: &usize, session_command: SessionCommand) -> Result<(), SessionError> {
if let Some(sender) = self.get_session_sender(session_id) {
match sender.send(SessionCommand::EncryptFileChunk {
plain_text,
}).await {
Ok(_) => Ok(()),
Err(e) => {
print_error!(e);
Err(SessionError::BrokenPipe)
}
}
} else {
Err(SessionError::InvalidSessionId)
}
}
pub async fn send_encrypted_file_chunk(&self, session_id: &usize, ack_sender: Sender<bool>, last_chunk: bool) -> Result<(), SessionError> {
if let Some(sender) = self.get_session_sender(session_id) {
match sender.send(SessionCommand::SendEncryptedFileChunk {
sender: ack_sender,
last_chunk
}).await {
Ok(_) => Ok(()),
Err(e) => {
print_error!(e);
Err(SessionError::BrokenPipe)
}
}
} else {
Err(SessionError::InvalidSessionId)
}
}
pub async fn send_to(&self, session_id: &usize, message: Vec<u8>) -> Result<(), SessionError> {
if let Some(sender) = self.get_session_sender(session_id) {
match sender.send(SessionCommand::Send {
buff: message
}).await {
match sender.send(session_command).await {
Ok(_) => Ok(()),
Err(e) => {
print_error!(e);
@ -179,10 +144,10 @@ impl SessionManager {
async fn send_msg(&self, session_id: usize, session_write: &mut SessionWrite, buff: &[u8], is_sending: &mut bool, file_ack_sender: Option<&Sender<bool>>) -> Result<(), SessionError> {
session_write.encrypt_and_send(&buff).await?;
if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download.as_mut().unwrap().state = FileState::ACCEPTED;
} else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
if buff[0] == protocol::Headers::ACCEPT_LARGE_FILES {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download.as_mut().unwrap().accepted = true;
} else if buff[0] == protocol::Headers::ABORT_FILES_TRANSFER {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None;
*is_sending = false;
if let Some(sender) = file_ack_sender {
if let Err(e) = sender.send(false).await {
@ -198,10 +163,10 @@ impl SessionManager {
async fn session_worker(&self, session_id: usize, mut receiver: Receiver<SessionCommand>, session: Session) {
//used when we receive large file
let mut local_file_path = None;
let mut local_file_handle = None;
//used when we send large file
let mut next_chunk: Option<Vec<u8>> = None;
let mut last_chunks_sizes: Option<Vec<u32>> = None;
let mut file_ack_sender: Option<Sender<bool>> = None;
let mut msg_queue = Vec::new();
let mut is_sending = false;
@ -230,13 +195,14 @@ impl SessionManager {
protocol::Headers::TELL_NAME => {
match from_utf8(&buffer[1..]) {
Ok(new_name) => {
let new_name = new_name.replace('\n', " ");
self.with_ui_connection(|ui_connection| {
ui_connection.on_name_told(&session_id, new_name);
ui_connection.on_name_told(&session_id, &new_name);
});
let mut loaded_contacts = self.loaded_contacts.write().unwrap();
if let Some(contact) = loaded_contacts.get_mut(&session_id) {
contact.name = new_name.to_string();
if let Err(e) = self.identity.read().unwrap().as_ref().unwrap().change_contact_name(&contact.uuid, new_name) {
if let Err(e) = self.identity.read().unwrap().as_ref().unwrap().change_contact_name(&contact.uuid, &new_name) {
print_error!(e);
}
} else {
@ -246,53 +212,52 @@ impl SessionManager {
Err(e) => print_error!(e)
}
}
protocol::Headers::ASK_LARGE_FILE => {
if self.sessions.read().unwrap().get(&session_id).unwrap().file_download.is_none() && !is_sending { //don't accept 2 file transfers at the same time
if let Some((file_size, file_name)) = protocol::parse_ask_file(&buffer) {
let download_dir = UserDirs::new().unwrap().download_dir;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = Some(LargeFileDownload{
file_name: file_name.clone(),
download_location: download_dir.to_str().unwrap().to_string(),
file_size,
state: FileState::ASKING,
protocol::Headers::ASK_LARGE_FILES => {
if self.sessions.read().unwrap().get(&session_id).unwrap().files_download.is_none() && !is_sending { //don't accept 2 file transfers at the same time
if let Some(files_info) = protocol::parse_ask_files(&buffer) {
let download_location = UserDirs::new().unwrap().download_dir;
let files: Vec<LargeFileDownload> = files_info.into_iter().map(|info| {
LargeFileDownload {
file_name: info.1,
file_size: info.0,
transferred: 0,
last_chunk: get_unix_timestamp(),
}
}).collect();
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = Some(LargeFilesDownload {
download_location: download_location.clone(),
accepted: false,
index: 0,
files: files.clone(),
});
local_file_path = Some(get_not_used_path(&file_name, &download_dir));
self.with_ui_connection(|ui_connection| {
ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap());
ui_connection.on_ask_large_files(&session_id, &files, download_location.to_str().unwrap());
})
}
} else if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await {
} else if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILES_TRANSFER]).await {
print_error!(e);
break;
}
}
protocol::Headers::LARGE_FILE_CHUNK => {
let state = {
let sessions = self.sessions.read().unwrap();
match sessions.get(&session_id).unwrap().file_download.as_ref() {
Some(file_transfer) => Some(file_transfer.state),
None => None
}
};
let mut should_accept_chunk = false;
if let Some(state) = state {
if state == FileState::ACCEPTED {
if let Some(file_path) = local_file_path.as_ref() {
match OpenOptions::new().append(true).create(true).open(file_path) {
{
let sessions = self.sessions.read().unwrap();
if let Some(files_transfer) = sessions.get(&session_id).unwrap().files_download.as_ref() {
if files_transfer.accepted {
if local_file_handle.is_some() {
should_accept_chunk = true;
} else {
let local_file_path = get_not_used_path(&files_transfer.files[files_transfer.index].file_name, &files_transfer.download_location);
match OpenOptions::new().append(true).create(true).open(local_file_path) {
Ok(file) => {
local_file_handle = Some(file);
let mut sessions = self.sessions.write().unwrap();
let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap();
file_transfer.state = FileState::TRANSFERRING;
should_accept_chunk = true;
}
Err(e) => print_error!(e)
}
}
} else if state == FileState::TRANSFERRING {
should_accept_chunk = true;
}
}
}
if should_accept_chunk {
@ -303,12 +268,16 @@ impl SessionManager {
let chunk_size = (buffer.len()-1) as u64;
{
let mut sessions = self.sessions.write().unwrap();
let file_transfer = sessions.get_mut(&session_id).unwrap().file_download.as_mut().unwrap();
let files_transfer = sessions.get_mut(&session_id).unwrap().files_download.as_mut().unwrap();
let file_transfer = &mut files_transfer.files[files_transfer.index];
file_transfer.last_chunk = get_unix_timestamp();
file_transfer.transferred += chunk_size;
if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file
sessions.get_mut(&session_id).unwrap().file_download = None;
local_file_path = None;
if files_transfer.index+1 == files_transfer.files.len() {
sessions.get_mut(&session_id).unwrap().files_download = None;
} else {
files_transfer.index += 1;
}
local_file_handle = None;
}
}
@ -317,7 +286,7 @@ impl SessionManager {
break;
}
self.with_ui_connection(|ui_connection| {
ui_connection.inc_file_transfer(&session_id, chunk_size);
ui_connection.inc_files_transfer(&session_id, chunk_size);
});
is_success = true;
}
@ -325,10 +294,9 @@ impl SessionManager {
}
}
if !is_success {
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
local_file_path = None;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None;
local_file_handle = None;
if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await {
if let Err(e) = session_write.encrypt_and_send(&[protocol::Headers::ABORT_FILES_TRANSFER]).await {
print_error!(e);
break;
}
@ -337,9 +305,10 @@ impl SessionManager {
}
protocol::Headers::ACK_CHUNK => {
if let Some(sender) = file_ack_sender.clone() {
if let Some(next_chunk) = next_chunk.as_ref() {
if let Some(last_chunks_sizes) = last_chunks_sizes.as_mut() {
let chunk_size = last_chunks_sizes.remove(0);
self.with_ui_connection(|ui_connection| {
ui_connection.inc_file_transfer(&session_id, next_chunk.len() as u64);
ui_connection.inc_files_transfer(&session_id, chunk_size.into());
});
}
if sender.send(true).await.is_err() {
@ -347,15 +316,14 @@ impl SessionManager {
}
}
}
protocol::Headers::ABORT_FILE_TRANSFER => {
protocol::Headers::ABORT_FILES_TRANSFER => {
if let Some(sender) = file_ack_sender.clone() {
if let Err(e) = sender.send(false).await {
print_error!(e);
}
is_sending = false;
}
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_download = None;
local_file_path = None;
self.sessions.write().unwrap().get_mut(&session_id).unwrap().files_download = None;
local_file_handle = None;
self.with_ui_connection(|ui_connection| {
ui_connection.on_received(&session_id, &buffer);
@ -387,8 +355,9 @@ impl SessionManager {
let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE;
if is_classical_message {
self.set_seen(session_id, false);
} else if header == protocol::Headers::ACCEPT_LARGE_FILE {
} else if header == protocol::Headers::ACCEPT_LARGE_FILES {
is_sending = true;
last_chunks_sizes = Some(Vec::new());
}
self.with_ui_connection(|ui_connection| {
ui_connection.on_received(&session_id, buffer.as_ref().unwrap());
@ -421,12 +390,15 @@ impl SessionManager {
}
}
}
SessionCommand::EncryptFileChunk { plain_text } => next_chunk = Some(session_write.encrypt(&plain_text)),
SessionCommand::SendEncryptedFileChunk { sender, last_chunk } => {
SessionCommand::EncryptFileChunk { plain_text } => {
last_chunks_sizes.as_mut().unwrap().push(plain_text.len() as u32);
next_chunk = Some(session_write.encrypt(&plain_text));
}
SessionCommand::SendEncryptedFileChunk { ack_sender } => {
if let Some(chunk) = next_chunk.as_ref() {
match session_write.socket_write(chunk).await {
Ok(_) => {
file_ack_sender = Some(sender);
file_ack_sender = Some(ack_sender);
//once the pre-encrypted chunk is sent, we can send the pending messages
while msg_queue.len() > 0 {
let msg = msg_queue.remove(0);
@ -435,9 +407,6 @@ impl SessionManager {
break;
}
}
if last_chunk {
is_sending = false;
}
}
Err(e) => {
print_error!(e);
@ -446,13 +415,10 @@ impl SessionManager {
}
}
}
SessionCommand::SendingEnded => is_sending = false,
SessionCommand::Close => break
}
}
else => {
println!("{} dead", session_id);
break;
}
}
}
}
@ -508,7 +474,7 @@ impl SessionManager {
peer_public_key,
ip,
sender: sender,
file_download: None,
files_download: None,
};
let mut session_id = None;
for (i, contact) in session_manager.loaded_contacts.read().unwrap().iter() {

View File

@ -8,11 +8,11 @@ impl Headers {
pub const ASK_NAME: u8 = 0x01;
pub const TELL_NAME: u8 = 0x02;
pub const FILE: u8 = 0x03;
pub const ASK_LARGE_FILE: u8 = 0x04;
pub const ACCEPT_LARGE_FILE: u8 = 0x05;
pub const ASK_LARGE_FILES: u8 = 0x04;
pub const ACCEPT_LARGE_FILES: u8 = 0x05;
pub const LARGE_FILE_CHUNK: u8 = 0x06;
pub const ACK_CHUNK: u8 = 0x07;
pub const ABORT_FILE_TRANSFER: u8 = 0x08;
pub const ABORT_FILES_TRANSFER: u8 = 0x08;
}
pub fn new_message(message: String) -> Vec<u8> {
@ -42,20 +42,41 @@ pub fn parse_file<'a>(buffer: &'a [u8]) -> Option<(&'a [u8], &'a [u8])> {
None
}
pub fn ask_large_file(file_size: u64, file_name: &str) -> Vec<u8> {
[&[Headers::ASK_LARGE_FILE], &file_size.to_be_bytes()[..], file_name.as_bytes()].concat()
pub fn ask_large_files(file_info: Vec<(u64, Vec<u8>)>) -> Vec<u8> {
let mut buff = vec![Headers::ASK_LARGE_FILES];
file_info.into_iter().for_each(|info| {
buff.extend(&info.0.to_be_bytes());
buff.extend(&(info.1.len() as u16).to_be_bytes());
buff.extend(info.1);
});
buff
}
pub fn parse_ask_file(buffer: &[u8]) -> Option<(u64, String)> {
if buffer.len() > 9 {
let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap());
match from_utf8(&buffer[9..]) {
pub fn parse_ask_files(buffer: &[u8]) -> Option<Vec<(u64, String)>> {
let mut files_info = Vec::new();
let mut n = 1;
while n < buffer.len() {
if buffer[n..].len() > 10 { //8 + 2
let file_size = u64::from_be_bytes(buffer[n..n+8].try_into().unwrap());
let file_name_len = u16::from_be_bytes(buffer[n+8..n+10].try_into().unwrap()) as usize;
if buffer.len() >= n+10+file_name_len {
match from_utf8(&buffer[n+10..n+10+file_name_len]) {
Ok(file_name) => {
let file_name = sanitize_filename::sanitize(file_name);
return Some((file_size, file_name));
files_info.push((file_size, file_name));
n += 10+file_name_len;
}
Err(e) => print_error!(e),
Err(e) => {
print_error!(e);
return None
}
}
None
} else {
return None
}
} else {
return None
}
}
Some(files_info)
}

View File

@ -1,12 +1,12 @@
use std::net::{IpAddr, TcpStream};
use tungstenite::{WebSocket, protocol::Role, Message};
use crate::{protocol, session_manager::LargeFileDownload};
use crate::{protocol, session_manager::{LargeFileDownload, LargeFilesDownload}};
mod ui_messages {
use std::{fmt::Display, iter::FromIterator, net::IpAddr, str::from_utf8};
use tungstenite::Message;
use uuid::Uuid;
use crate::{print_error, session_manager::{protocol, LargeFileDownload, FileState}, utils::to_uuid_bytes};
use crate::{print_error, session_manager::{LargeFileDownload, LargeFilesDownload, protocol}, utils::to_uuid_bytes};
const ON_NEW_MESSAGE: &str = "new_message";
const LOAD_SENT_MESSAGE: &str = "load_sent_msg";
@ -45,30 +45,40 @@ mod ui_messages {
}
}
}
pub fn new_file_transfer(session_id: &usize, file_transfer: &LargeFileDownload) -> Message {
if file_transfer.state == FileState::ASKING {
on_ask_large_file(session_id, file_transfer.file_size, &file_transfer.file_name, &file_transfer.download_location)
} else {
Message::from(format!(
"file_transfer {} {} {} {} {} {}",
pub fn new_files_transfer(session_id: &usize, files_transfer: &LargeFilesDownload) -> Message {
if files_transfer.accepted {
let mut s = format!(
"files_transfer {} {}",
session_id,
base64::encode(&file_transfer.file_name),
file_transfer.file_size,
if file_transfer.state == FileState::ACCEPTED {
"accepted"
files_transfer.index
);
files_transfer.files.iter().for_each(|file| {
s.push_str(&format!(
" {} {} {} {}",
base64::encode(&file.file_name),
file.file_size,
file.transferred,
file.last_chunk,
));
});
Message::from(s)
} else {
"transferring"
},
file_transfer.transferred,
file_transfer.last_chunk,
))
on_ask_large_files(session_id, &files_transfer.files, files_transfer.download_location.to_str().unwrap())
}
}
pub fn on_ask_large_file(session_id: &usize, file_size: u64, file_name: &str, download_location: &str) -> Message {
Message::from(format!("ask_large_file {} {} {} {}", session_id, file_size, base64::encode(file_name), base64::encode(download_location)))
pub fn on_ask_large_files(session_id: &usize, files: &Vec<LargeFileDownload>, download_location: &str) -> Message {
let mut s = format!("ask_large_files {} {}", session_id, base64::encode(download_location));
files.into_iter().for_each(|file| {
s.push_str(&format!(
" {} {}",
base64::encode(&file.file_name),
file.file_size,
));
});
Message::from(s)
}
pub fn on_large_file_accepted(session_id: &usize) -> Message {
simple_event("file_accepted", session_id)
pub fn on_large_files_accepted(session_id: &usize) -> Message {
simple_event("files_accepted", session_id)
}
pub fn on_file_transfer_aborted(session_id: &usize) -> Message {
simple_event("aborted", session_id)
@ -76,7 +86,7 @@ mod ui_messages {
pub fn on_new_message(session_id: &usize, outgoing: bool, buffer: &[u8]) -> Option<Message> {
new_message(ON_NEW_MESSAGE, session_id, outgoing, &buffer[1..])
}
pub fn inc_file_transfer(session_id: &usize, chunk_size: u64) -> Message {
pub fn inc_files_transfer(session_id: &usize, chunk_size: u64) -> Message {
Message::from(format!("inc_file_transfer {} {}", session_id, chunk_size))
}
pub fn load_msg(session_id: &usize, outgoing: bool, buffer: &[u8]) -> Option<Message> {
@ -138,16 +148,16 @@ impl UiConnection {
let ui_message = match buffer[0] {
protocol::Headers::MESSAGE => ui_messages::on_new_message(session_id, false, buffer),
protocol::Headers::FILE => ui_messages::on_file_received(session_id, buffer),
protocol::Headers::ACCEPT_LARGE_FILE => Some(ui_messages::on_large_file_accepted(session_id)),
protocol::Headers::ABORT_FILE_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)),
protocol::Headers::ACCEPT_LARGE_FILES => Some(ui_messages::on_large_files_accepted(session_id)),
protocol::Headers::ABORT_FILES_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)),
_ => None
};
if ui_message.is_some() {
self.write_message(ui_message.unwrap())
}
}
pub fn on_ask_large_file(&mut self, session_id: &usize, file_size: u64, file_name: &str, download_location: &str) {
self.write_message(ui_messages::on_ask_large_file(session_id, file_size, file_name, download_location))
pub fn on_ask_large_files(&mut self, session_id: &usize, files: &Vec<LargeFileDownload>, download_location: &str) {
self.write_message(ui_messages::on_ask_large_files(session_id, files, download_location))
}
pub fn on_msg_sent(&mut self, session_id: usize, buffer: &[u8]) {
match buffer[0] {
@ -155,14 +165,14 @@ impl UiConnection {
Some(msg) => self.write_message(msg),
None => {}
}
protocol::Headers::ABORT_FILE_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)),
protocol::Headers::ABORT_FILES_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)),
_ => {}
}
}
pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr, file_transfer: Option<&LargeFileDownload>) {
pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, fingerprint: &str, ip: IpAddr, files_transfer: Option<&LargeFilesDownload>) {
self.write_message(ui_messages::on_new_session(session_id, name, outgoing, fingerprint, ip));
if let Some(file_transfer) = file_transfer {
self.write_message(ui_messages::new_file_transfer(session_id, file_transfer));
if let Some(files_transfer) = files_transfer {
self.write_message(ui_messages::new_files_transfer(session_id, files_transfer));
}
}
pub fn on_disconnected(&mut self, session_id: &usize) {
@ -172,8 +182,8 @@ impl UiConnection {
self.write_message(ui_messages::on_name_told(session_id, name));
}
pub fn inc_file_transfer(&mut self, session_id: &usize, chunk_size: u64) {
self.write_message(ui_messages::inc_file_transfer(session_id, chunk_size));
pub fn inc_files_transfer(&mut self, session_id: &usize, chunk_size: u64) {
self.write_message(ui_messages::inc_files_transfer(session_id, chunk_size));
}
pub fn set_as_contact(&mut self, session_id: usize, name: &str, verified: bool, fingerprint: &str) {
self.write_message(ui_messages::set_as_contact(session_id, name, verified, fingerprint));