Large file transfers
This commit is contained in:
parent
264ca5299c
commit
33a107d347
@ -25,7 +25,8 @@ futures = "0.3"
|
||||
tungstenite = "0.13.0" #websocket
|
||||
serde = "1.0.124" #serialization
|
||||
html-escape = "0.2.7"
|
||||
dirs = "3.0"
|
||||
sanitize-filename = "0.3"
|
||||
platform-dirs = "0.3.0"
|
||||
uuid = {version = "0.8", features = ["v4"]}
|
||||
webbrowser = "0.5.5"
|
||||
libmdns = "0.6" #mDNS advertiser
|
||||
|
@ -4,3 +4,4 @@ pub const APPLICATION_FOLDER: &str = "AIRA";
|
||||
pub const DB_NAME: &str = "AIRA.db";
|
||||
pub const HTTP_COOKIE_NAME: &str = "aira_auth";
|
||||
pub const MSG_LOADING_COUNT: usize = 20;
|
||||
pub const FILE_CHUNK_SIZE: usize = 1023996;
|
@ -1,17 +1,17 @@
|
||||
use std::{net::IpAddr};
|
||||
use std::{net::IpAddr, io};
|
||||
use libmdns::{Responder, Service};
|
||||
use multicast_dns::discovery::{DiscoveryManager, DiscoveryListeners, ResolveListeners};
|
||||
use crate::{constants, print_error};
|
||||
|
||||
const SERVICE_TYPE: &str = "_aira._tcp";
|
||||
|
||||
pub fn advertise_me() -> Service {
|
||||
Responder::new().unwrap().register(
|
||||
pub fn advertise_me() -> io::Result<Service> {
|
||||
Ok(Responder::new()?.register(
|
||||
SERVICE_TYPE.to_string(),
|
||||
"AIRA Node".to_string(),
|
||||
constants::PORT,
|
||||
&[]
|
||||
)
|
||||
))
|
||||
}
|
||||
|
||||
pub fn discover_peers<F: Fn(&DiscoveryManager, IpAddr)>(on_service_discovered: F) {
|
||||
|
@ -1,4 +1,4 @@
|
||||
function generate_avatar(name){
|
||||
function generateAvatar(name){
|
||||
let span = document.createElement("span");
|
||||
if (typeof name == "undefined"){
|
||||
span.appendChild(document.createTextNode("?"));
|
||||
|
@ -84,7 +84,7 @@ input[type="file"] {
|
||||
font-size: 4em;
|
||||
margin: auto;
|
||||
}
|
||||
.popup>div>div, .popup>div>button {
|
||||
.popup section {
|
||||
font-weight: bold;
|
||||
display: block;
|
||||
margin-bottom: 1em;
|
||||
@ -104,16 +104,23 @@ input[type="file"] {
|
||||
.popup button:hover {
|
||||
background-color: var(--accent);
|
||||
}
|
||||
.popup span {
|
||||
font-weight: bold;
|
||||
}
|
||||
.popup>div>div p {
|
||||
font-weight: normal;
|
||||
font-size: 0.9em;
|
||||
}
|
||||
.popup h2::before {
|
||||
.popup h2.warning::before {
|
||||
content: url("/static/imgs/icons/warning/FF3C00");
|
||||
width: 9%;
|
||||
display: inline-block;
|
||||
vertical-align: middle;
|
||||
}
|
||||
.button_row {
|
||||
display: flex;
|
||||
gap: 15px;
|
||||
}
|
||||
.section_title {
|
||||
margin-left: 8px;
|
||||
font-weight: bold;
|
||||
@ -272,6 +279,55 @@ input[type="file"] {
|
||||
padding: 1em;
|
||||
font-size: 1.1em;
|
||||
}
|
||||
#file_transfer {
|
||||
border-top: 2px solid var(--accent);
|
||||
display: none;
|
||||
position: relative;
|
||||
}
|
||||
#file_transfer.active {
|
||||
display: block;
|
||||
}
|
||||
#file_transfer span {
|
||||
font-weight: bold;
|
||||
}
|
||||
#file_control {
|
||||
display: flex;
|
||||
align-items: center;
|
||||
}
|
||||
#file_cancel {
|
||||
padding: 0;
|
||||
}
|
||||
#file_cancel::after {
|
||||
background-color: unset;
|
||||
width: 1.2em;
|
||||
content: url("/static/imgs/icons/cancel/FF3C00");
|
||||
}
|
||||
#file_progress {
|
||||
display: none;
|
||||
gap: 10px;
|
||||
align-items: center;
|
||||
margin-bottom: 15px;
|
||||
}
|
||||
#file_transfer.active>#file_progress {
|
||||
display: flex;
|
||||
}
|
||||
#file_status {
|
||||
margin-top: 0;
|
||||
}
|
||||
#file_percent, #file_speed {
|
||||
font-weight: bold;
|
||||
margin: 0;
|
||||
border-left: 2px solid var(--accent);
|
||||
padding-left: 10px;
|
||||
}
|
||||
#file_progress_bar {
|
||||
flex-grow: 1;
|
||||
height: 1.5em;
|
||||
}
|
||||
#file_progress_bar div {
|
||||
height: 100%;
|
||||
background-color: var(--accent);
|
||||
}
|
||||
#message_box {
|
||||
border-top: 2px solid var(--accent);
|
||||
margin-bottom: 0;
|
||||
|
@ -37,6 +37,20 @@
|
||||
</div>
|
||||
<ul id="msg_log">
|
||||
</ul>
|
||||
<div id="file_transfer">
|
||||
<div id="file_control">
|
||||
<button id="file_cancel" title="Cancel"></button>
|
||||
<p id="file_info"></p>
|
||||
</div>
|
||||
<p id="file_status"></p>
|
||||
<div id="file_progress">
|
||||
<p id="file_percent"></p>
|
||||
<p id="file_speed"></p>
|
||||
<div id="file_progress_bar">
|
||||
<div></div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div id="message_box">
|
||||
<input type="text" id="message_input" placeholder="Send a message...">
|
||||
<label title="Send file" class="file_picker">
|
||||
@ -47,8 +61,8 @@
|
||||
</main>
|
||||
<script>
|
||||
//replaced by web server
|
||||
let is_identity_protected = IS_IDENTITY_PROTECTED;
|
||||
let websocket_port = WEBSOCKET_PORT;
|
||||
let isIdentityProtected = IS_IDENTITY_PROTECTED;
|
||||
let websocketPort = WEBSOCKET_PORT;
|
||||
</script>
|
||||
<script src="/static/libs/linkify.min.js"></script>
|
||||
<script src="/static/libs/linkify-element.min.js"></script>
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -157,7 +157,7 @@
|
||||
}
|
||||
} else {
|
||||
let identity = document.getElementById("identity");
|
||||
identity.appendChild(generate_avatar(identity_name));
|
||||
identity.appendChild(generateAvatar(identity_name));
|
||||
let h2 = document.createElement("h2");
|
||||
h2.textContent = identity_name;
|
||||
identity.appendChild(h2);
|
||||
|
@ -1,8 +1,8 @@
|
||||
use std::{convert::TryInto, path::Path};
|
||||
use std::convert::TryInto;
|
||||
use crypto::CryptoError;
|
||||
use ed25519_dalek::{Keypair, Signer, SIGNATURE_LENGTH, PUBLIC_KEY_LENGTH};
|
||||
use rusqlite::{Connection, params};
|
||||
use dirs;
|
||||
use platform_dirs::AppDirs;
|
||||
use utils::to_uuid_bytes;
|
||||
use uuid::Uuid;
|
||||
use zeroize::Zeroize;
|
||||
@ -37,7 +37,7 @@ fn byte_to_bool(b: u8) -> Result<bool, ()> {
|
||||
}
|
||||
|
||||
fn get_database_path() -> String {
|
||||
Path::new(&dirs::data_local_dir().unwrap()).join(constants::APPLICATION_FOLDER).join(constants::DB_NAME).to_str().unwrap().to_owned()
|
||||
AppDirs::new(Some(constants::APPLICATION_FOLDER), false).unwrap().data_dir.join(constants::DB_NAME).to_str().unwrap().to_owned()
|
||||
}
|
||||
|
||||
struct EncryptedIdentity {
|
||||
|
118
src/main.rs
118
src/main.rs
@ -7,8 +7,8 @@ mod ui_interface;
|
||||
mod constants;
|
||||
mod discovery;
|
||||
|
||||
use std::{env, fs, io, net::SocketAddr, path::Path, str::FromStr, sync::{Arc, RwLock}};
|
||||
use tokio::{net::TcpListener, runtime::Handle};
|
||||
use std::{env, fs, io, net::SocketAddr, str::{FromStr, from_utf8}, sync::{Arc, RwLock}};
|
||||
use tokio::{net::TcpListener, runtime::Handle, sync::mpsc};
|
||||
use actix_web::{App, HttpMessage, HttpRequest, HttpResponse, HttpServer, http::{header, CookieBuilder}, web, web::Data};
|
||||
use actix_multipart::Multipart;
|
||||
use tungstenite::Message;
|
||||
@ -16,6 +16,7 @@ use futures::{StreamExt, TryStreamExt};
|
||||
use rand_8::{RngCore, rngs::OsRng};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use uuid::Uuid;
|
||||
use platform_dirs::AppDirs;
|
||||
use zeroize::Zeroize;
|
||||
use utils::escape_double_quote;
|
||||
use identity::Identity;
|
||||
@ -96,8 +97,8 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
|
||||
session_manager.last_loaded_msg_offsets.write().unwrap().insert(contact.0, 0);
|
||||
load_msgs(session_manager.clone(), &mut ui_connection, &contact.0);
|
||||
});
|
||||
session_manager.list_sessions().into_iter().for_each(|session| {
|
||||
ui_connection.on_new_session(session.0, session.1);
|
||||
session_manager.sessions.read().unwrap().iter().for_each(|session| {
|
||||
ui_connection.on_new_session(session.0, &session.1.name, session.1.outgoing, session.1.file_transfer.as_ref());
|
||||
});
|
||||
let not_seen = session_manager.list_not_seen();
|
||||
if not_seen.len() > 0 {
|
||||
@ -142,6 +143,26 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
"large_file" => {
|
||||
let session_id: usize = args[1].parse().unwrap();
|
||||
let file_size: u64 = args[2].parse().unwrap();
|
||||
let file_name = &msg[args[0].len()+args[1].len()+args[2].len()+3..];
|
||||
if let Err(e) = session_manager.send_to(&session_id, protocol::ask_large_file(file_size, file_name)).await {
|
||||
print_error!(e);
|
||||
}
|
||||
}
|
||||
"download" => {
|
||||
let session_id: usize = args[1].parse().unwrap();
|
||||
if let Err(e) = session_manager.send_to(&session_id, vec![protocol::Headers::ACCEPT_LARGE_FILE]).await {
|
||||
print_error!(e);
|
||||
}
|
||||
}
|
||||
"abort" => {
|
||||
let session_id: usize = args[1].parse().unwrap();
|
||||
if let Err(e) = session_manager.send_to(&session_id, vec![protocol::Headers::ABORT_FILE_TRANSFER]).await {
|
||||
print_error!(e);
|
||||
}
|
||||
}
|
||||
"load_msgs" => {
|
||||
let session_id: usize = args[1].parse().unwrap();
|
||||
load_msgs(session_manager.clone(), &mut ui_connection, &session_id);
|
||||
@ -155,7 +176,7 @@ async fn websocket_worker(mut ui_connection: UiConnection, global_vars: Arc<RwLo
|
||||
}
|
||||
"uncontact" => {
|
||||
let session_id: usize = args[1].parse().unwrap();
|
||||
match session_manager.remove_contact(&session_id) {
|
||||
match session_manager.remove_contact(session_id) {
|
||||
Ok(_) => {},
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
@ -280,29 +301,78 @@ async fn handle_send_file(req: HttpRequest, mut payload: Multipart) -> HttpRespo
|
||||
if let Some(name) = content_disposition.get_name() {
|
||||
if name == "session_id" {
|
||||
if let Some(Ok(raw_id)) = field.next().await {
|
||||
session_id = Some(std::str::from_utf8(&raw_id).unwrap().parse().unwrap());
|
||||
session_id = Some(from_utf8(&raw_id).unwrap().parse().unwrap());
|
||||
}
|
||||
} else if session_id.is_some() {
|
||||
let filename = content_disposition.get_filename().unwrap();
|
||||
let mut buffer = Vec::new();
|
||||
while let Some(chunk) = field.next().await {
|
||||
buffer.extend(chunk.unwrap());
|
||||
}
|
||||
let session_id = session_id.unwrap();
|
||||
let global_vars = req.app_data::<Data<Arc<RwLock<GlobalVars>>>>().unwrap();
|
||||
let global_vars_read = global_vars.read().unwrap();
|
||||
match global_vars_read.session_manager.send_to(&session_id, protocol::file(filename, &buffer)).await {
|
||||
Ok(_) => {
|
||||
match global_vars_read.session_manager.store_file(&session_id, &buffer) {
|
||||
Ok(file_uuid) => {
|
||||
let msg = [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat();
|
||||
global_vars_read.session_manager.store_msg(&session_id, true, msg);
|
||||
return HttpResponse::Ok().body(file_uuid.to_string());
|
||||
if req.path() == "/send_file" {
|
||||
let mut buffer = Vec::new();
|
||||
while let Some(Ok(chunk)) = field.next().await {
|
||||
buffer.extend(chunk);
|
||||
}
|
||||
match global_vars_read.session_manager.send_to(&session_id, protocol::file(filename, &buffer)).await {
|
||||
Ok(_) => {
|
||||
match global_vars_read.session_manager.store_file(&session_id, &buffer) {
|
||||
Ok(file_uuid) => {
|
||||
let msg = [&[protocol::Headers::FILE][..], file_uuid.as_bytes(), filename.as_bytes()].concat();
|
||||
global_vars_read.session_manager.store_msg(&session_id, true, msg);
|
||||
return HttpResponse::Ok().body(file_uuid.to_string());
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
} else {
|
||||
let (ack_sender, mut ack_receiver) = mpsc::channel(1);
|
||||
let mut pending_buffer = Vec::new();
|
||||
let mut chunk_buffer = Vec::with_capacity(constants::FILE_CHUNK_SIZE);
|
||||
chunk_buffer.push(protocol::Headers::LARGE_FILE_CHUNK);
|
||||
ack_sender.send(true).await.unwrap();
|
||||
loop {
|
||||
chunk_buffer.extend(&pending_buffer);
|
||||
pending_buffer.clear();
|
||||
while let Some(Ok(chunk)) = field.next().await {
|
||||
if chunk_buffer.len()+chunk.len() <= constants::FILE_CHUNK_SIZE {
|
||||
chunk_buffer.extend(chunk);
|
||||
} else if chunk_buffer.len() == constants::FILE_CHUNK_SIZE {
|
||||
pending_buffer.extend(chunk);
|
||||
break;
|
||||
} else {
|
||||
let remaining = constants::FILE_CHUNK_SIZE - chunk_buffer.len();
|
||||
chunk_buffer.extend(&chunk[..remaining]);
|
||||
pending_buffer.extend(&chunk[remaining..]);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if let Err(e) = global_vars_read.session_manager.encrypt_file_chunk(&session_id, chunk_buffer.clone()).await {
|
||||
print_error!(e);
|
||||
return HttpResponse::InternalServerError().finish();
|
||||
}
|
||||
if !match ack_receiver.recv().await {
|
||||
Some(should_continue) => {
|
||||
//send previous encrypted chunk even if transfert is aborted to keep PSEC nonces syncrhonized
|
||||
if let Err(e) = global_vars_read.session_manager.send_encrypted_file_chunk(&session_id, ack_sender.clone()).await {
|
||||
print_error!(e);
|
||||
false
|
||||
} else {
|
||||
should_continue
|
||||
}
|
||||
}
|
||||
None => false
|
||||
} {
|
||||
return HttpResponse::InternalServerError().finish()
|
||||
}
|
||||
if chunk_buffer.len() < constants::FILE_CHUNK_SIZE {
|
||||
break;
|
||||
} else {
|
||||
chunk_buffer.truncate(1);
|
||||
}
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
return HttpResponse::Ok().finish();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -544,6 +614,7 @@ async fn start_http_server(global_vars: Arc<RwLock<GlobalVars>>) -> io::Result<(
|
||||
)
|
||||
.route("/login", web::post().to(handle_login))
|
||||
.route("/send_file", web::post().to(handle_send_file))
|
||||
.route("/send_large_file", web::post().to(handle_send_file))
|
||||
.route("/load_file", web::get().to(handle_load_file))
|
||||
.route("/static/.*", web::get().to(handle_static))
|
||||
.route("/logout", web::get().to(handle_logout))
|
||||
@ -578,12 +649,9 @@ struct GlobalVars {
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
match fs::create_dir(Path::new(&dirs::data_local_dir().unwrap()).join(constants::APPLICATION_FOLDER)) {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
if e.kind() != io::ErrorKind::AlreadyExists {
|
||||
print_error!(e);
|
||||
}
|
||||
if let Err(e) = fs::create_dir(AppDirs::new(Some(constants::APPLICATION_FOLDER), false).unwrap().data_dir) {
|
||||
if e.kind() != io::ErrorKind::AlreadyExists {
|
||||
print_error!(e);
|
||||
}
|
||||
}
|
||||
let global_vars = Arc::new(RwLock::new(GlobalVars {
|
||||
|
@ -1,14 +1,15 @@
|
||||
mod session;
|
||||
pub mod protocol;
|
||||
|
||||
use std::{collections::HashMap, net::{IpAddr, SocketAddr}, io, sync::{Mutex, RwLock, Arc}};
|
||||
use tokio::{net::{TcpListener, TcpStream}, sync::{mpsc, mpsc::Sender}};
|
||||
use std::{collections::HashMap, net::{IpAddr, SocketAddr}, io::{self, Write}, convert::TryInto, str::from_utf8, fs::OpenOptions, sync::{Mutex, RwLock, Arc}};
|
||||
use tokio::{net::{TcpListener, TcpStream}, sync::mpsc::{self, Sender, Receiver}};
|
||||
use libmdns::Service;
|
||||
use strum_macros::Display;
|
||||
use session::Session;
|
||||
use ed25519_dalek::PUBLIC_KEY_LENGTH;
|
||||
use uuid::Uuid;
|
||||
use crate::{constants, discovery, identity::{Contact, Identity}, print_error};
|
||||
use platform_dirs::UserDirs;
|
||||
use crate::{constants, discovery, identity::{Contact, Identity}, utils::get_unix_timestamp, print_error};
|
||||
use crate::ui_interface::UiConnection;
|
||||
|
||||
#[derive(Display, Debug, PartialEq, Eq)]
|
||||
@ -23,14 +24,45 @@ pub enum SessionError {
|
||||
|
||||
enum SessionCommand {
|
||||
Send {
|
||||
buff: Vec<u8>
|
||||
buff: Vec<u8>,
|
||||
},
|
||||
SendEncryptedFileChunk {
|
||||
sender: Sender<bool>,
|
||||
},
|
||||
EncryptFileChunk {
|
||||
plain_text: Vec<u8>,
|
||||
},
|
||||
Close,
|
||||
}
|
||||
#[derive(Clone, PartialEq, Eq)]
|
||||
pub enum FileState {
|
||||
ASKING,
|
||||
ACCEPTED,
|
||||
TRANSFERRING,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LargeFileDownload {
|
||||
pub file_name: String,
|
||||
pub download_location: String,
|
||||
pub file_size: u64,
|
||||
pub state: FileState,
|
||||
pub transferred: u64,
|
||||
pub last_chunk: u128,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct SessionData {
|
||||
pub name: String,
|
||||
pub outgoing: bool,
|
||||
peer_public_key: [u8; PUBLIC_KEY_LENGTH],
|
||||
sender: Sender<SessionCommand>,
|
||||
pub file_transfer: Option<LargeFileDownload>,
|
||||
}
|
||||
|
||||
pub struct SessionManager {
|
||||
session_counter: RwLock<usize>,
|
||||
sessions: RwLock<HashMap<usize, (bool ,[u8; PUBLIC_KEY_LENGTH], mpsc::Sender<SessionCommand>)>>,
|
||||
pub sessions: RwLock<HashMap<usize, SessionData>>,
|
||||
identity: RwLock<Option<Identity>>,
|
||||
ui_connection: Mutex<Option<UiConnection>>,
|
||||
loaded_contacts: RwLock<HashMap<usize, Contact>>,
|
||||
@ -75,22 +107,59 @@ impl SessionManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_session_sender(&self, session_id: &usize) -> Option<Sender<SessionCommand>> {
|
||||
let mut sessions = self.sessions.write().unwrap();
|
||||
match sessions.get_mut(session_id) {
|
||||
Some(session_data) => Some(session_data.sender.clone()),
|
||||
None => None
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn encrypt_file_chunk(&self, session_id: &usize, plain_text: Vec<u8>) -> Result<(), SessionError> {
|
||||
if let Some(sender) = self.get_session_sender(session_id) {
|
||||
match sender.send(SessionCommand::EncryptFileChunk {
|
||||
plain_text,
|
||||
}).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
Err(SessionError::BrokenPipe)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(SessionError::InvalidSessionId)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_encrypted_file_chunk(&self, session_id: &usize, ack_sender: Sender<bool>) -> Result<(), SessionError> {
|
||||
if let Some(sender) = self.get_session_sender(session_id) {
|
||||
match sender.send(SessionCommand::SendEncryptedFileChunk {
|
||||
sender: ack_sender,
|
||||
}).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
Err(SessionError::BrokenPipe)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(SessionError::InvalidSessionId)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_to(&self, session_id: &usize, message: Vec<u8>) -> Result<(), SessionError> {
|
||||
let sender = {
|
||||
let mut sessions = self.sessions.write().unwrap();
|
||||
match sessions.get_mut(session_id) {
|
||||
Some(session_data) => session_data.2.clone(),
|
||||
None => return Err(SessionError::InvalidSessionId)
|
||||
}
|
||||
};
|
||||
match sender.send(SessionCommand::Send {
|
||||
buff: message
|
||||
}).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
Err(SessionError::BrokenPipe)
|
||||
if let Some(sender) = self.get_session_sender(session_id) {
|
||||
match sender.send(SessionCommand::Send {
|
||||
buff: message
|
||||
}).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
Err(SessionError::BrokenPipe)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Err(SessionError::InvalidSessionId)
|
||||
}
|
||||
}
|
||||
|
||||
@ -100,6 +169,276 @@ impl SessionManager {
|
||||
self.not_seen.write().unwrap().retain(|x| x != session_id);
|
||||
}
|
||||
|
||||
async fn send_msg(&self, session_id: usize, session: &mut Session, buff: &[u8], aborted: &mut bool, file_ack_sender: Option<&Sender<bool>>) -> Result<(), SessionError> {
|
||||
session.encrypt_and_send(&buff).await?;
|
||||
if buff[0] == protocol::Headers::ACCEPT_LARGE_FILE {
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer.as_mut().unwrap().state = FileState::ACCEPTED;
|
||||
} else if buff[0] == protocol::Headers::ABORT_FILE_TRANSFER {
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None;
|
||||
*aborted = true;
|
||||
if let Some(sender) = file_ack_sender {
|
||||
if let Err(e) = sender.send(false).await {
|
||||
print_error!(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_msg_sent(session_id, &buff);
|
||||
});
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn session_worker(&self, session_id: usize, mut receiver: Receiver<SessionCommand>, mut session: Session) {
|
||||
//used when we receive large file
|
||||
let mut local_file_path = None;
|
||||
let mut local_file_handle = None;
|
||||
//used when we send large file
|
||||
let mut next_chunk: Option<Vec<u8>> = None;
|
||||
let mut file_ack_sender: Option<Sender<bool>> = None;
|
||||
let mut msg_queue = Vec::new();
|
||||
let mut aborted = false;
|
||||
loop {
|
||||
tokio::select! {
|
||||
buffer = session.receive_and_decrypt() => {
|
||||
match buffer {
|
||||
Ok(buffer) => {
|
||||
match buffer[0] {
|
||||
protocol::Headers::ASK_NAME => {
|
||||
let name = {
|
||||
self.identity.read().unwrap().as_ref().and_then(|identity| Some(identity.name.clone()))
|
||||
};
|
||||
if name.is_some() { //can be None if we log out just before locking the identity mutex
|
||||
if let Err(e) = session.encrypt_and_send(&protocol::tell_name(&name.unwrap())).await {
|
||||
print_error!(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
protocol::Headers::TELL_NAME => {
|
||||
match from_utf8(&buffer[1..]) {
|
||||
Ok(new_name) => {
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_name_told(&session_id, new_name);
|
||||
});
|
||||
let mut loaded_contacts = self.loaded_contacts.write().unwrap();
|
||||
if let Some(contact) = loaded_contacts.get_mut(&session_id) {
|
||||
contact.name = new_name.to_string();
|
||||
if let Err(e) = self.identity.read().unwrap().as_ref().unwrap().change_contact_name(&contact.uuid, new_name) {
|
||||
print_error!(e);
|
||||
}
|
||||
} else {
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().name = new_name.to_string();
|
||||
}
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
protocol::Headers::ASK_LARGE_FILE => {
|
||||
let file_size = u64::from_be_bytes(buffer[1..9].try_into().unwrap());
|
||||
match from_utf8(&buffer[9..]) {
|
||||
Ok(file_name) => {
|
||||
let file_name = sanitize_filename::sanitize(file_name);
|
||||
let download_dir = UserDirs::new().unwrap().download_dir;
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = Some(LargeFileDownload{
|
||||
file_name: file_name.clone(),
|
||||
download_location: download_dir.to_str().unwrap().to_string(),
|
||||
file_size,
|
||||
state: FileState::ASKING,
|
||||
transferred: 0,
|
||||
last_chunk: get_unix_timestamp(),
|
||||
});
|
||||
let mut test_file_path = download_dir.join(&file_name);
|
||||
let mut n = 1;
|
||||
while test_file_path.exists() {
|
||||
let splits: Vec<&str> = file_name.split('.').collect();
|
||||
test_file_path = download_dir.join(format!("{} ({}).{}", splits[..splits.len()-1].join("."), n, splits[splits.len()-1]));
|
||||
n += 1;
|
||||
}
|
||||
local_file_path = Some(test_file_path);
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_ask_large_file(&session_id, file_size, &file_name, download_dir.to_str().unwrap());
|
||||
})
|
||||
}
|
||||
Err(e) => print_error!(e),
|
||||
}
|
||||
}
|
||||
protocol::Headers::LARGE_FILE_CHUNK => {
|
||||
let file_transfer_opt = {
|
||||
self.sessions.read().unwrap().get(&session_id).unwrap().file_transfer.clone()
|
||||
};
|
||||
if let Some(file_transfer) = file_transfer_opt {
|
||||
if file_transfer.state == FileState::ACCEPTED || file_transfer.state == FileState::TRANSFERRING {
|
||||
if local_file_handle.is_none() {
|
||||
if let Some(file_path) = local_file_path.as_ref() {
|
||||
match OpenOptions::new().append(true).create(true).open(file_path) {
|
||||
Ok(file) => local_file_handle = Some(file),
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
let mut is_success = false;
|
||||
if let Some(file_handle) = local_file_handle.as_mut() {
|
||||
match file_handle.write_all(&buffer[1..]) {
|
||||
Ok(_) => {
|
||||
let chunk_size = (buffer.len()-1) as u64;
|
||||
{
|
||||
let mut sessions = self.sessions.write().unwrap();
|
||||
let file_transfer = sessions.get_mut(&session_id).unwrap().file_transfer.as_mut().unwrap();
|
||||
file_transfer.last_chunk = get_unix_timestamp();
|
||||
file_transfer.transferred += chunk_size;
|
||||
if file_transfer.transferred >= file_transfer.file_size { //we downloaded all the file
|
||||
sessions.get_mut(&session_id).unwrap().file_transfer = None;
|
||||
local_file_path = None;
|
||||
local_file_handle = None;
|
||||
} else if file_transfer.state != FileState::TRANSFERRING {
|
||||
file_transfer.state = FileState::TRANSFERRING;
|
||||
}
|
||||
}
|
||||
if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ACK_CHUNK]).await {
|
||||
print_error!(e);
|
||||
break;
|
||||
}
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.inc_file_transfer(&session_id, chunk_size);
|
||||
});
|
||||
is_success = true;
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
if !is_success {
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None;
|
||||
local_file_path = None;
|
||||
local_file_handle = None;
|
||||
if let Err(e) = session.encrypt_and_send(&[protocol::Headers::ABORT_FILE_TRANSFER]).await {
|
||||
print_error!(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
protocol::Headers::ACK_CHUNK => {
|
||||
if let Some(sender) = file_ack_sender.clone() {
|
||||
if let Some(next_chunk) = next_chunk.as_ref() {
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.inc_file_transfer(&session_id, next_chunk.len() as u64);
|
||||
});
|
||||
}
|
||||
if sender.send(true).await.is_err() {
|
||||
aborted = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
protocol::Headers::ABORT_FILE_TRANSFER => {
|
||||
if let Some(sender) = file_ack_sender.clone() {
|
||||
if let Err(e) = sender.send(false).await {
|
||||
print_error!(e);
|
||||
}
|
||||
aborted = true;
|
||||
}
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().file_transfer = None;
|
||||
local_file_path = None;
|
||||
local_file_handle = None;
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_received(&session_id, &buffer);
|
||||
});
|
||||
}
|
||||
_ => {
|
||||
let header = buffer[0];
|
||||
let buffer = match header {
|
||||
protocol::Headers::FILE => {
|
||||
let file_name_len = u16::from_be_bytes([buffer[1], buffer[2]]) as usize;
|
||||
let file_name = &buffer[3..3+file_name_len];
|
||||
match self.store_file(&session_id, &buffer[3+file_name_len..]) {
|
||||
Ok(file_uuid) => {
|
||||
Some([&[protocol::Headers::FILE][..], file_uuid.as_bytes(), file_name].concat())
|
||||
}
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
Some(buffer)
|
||||
}
|
||||
};
|
||||
if buffer.is_some() {
|
||||
let is_classical_message = header == protocol::Headers::MESSAGE || header == protocol::Headers::FILE;
|
||||
if is_classical_message {
|
||||
self.set_seen(session_id, false);
|
||||
}
|
||||
if header == protocol::Headers::ACCEPT_LARGE_FILE {
|
||||
aborted = false;
|
||||
}
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_received(&session_id, buffer.as_ref().unwrap());
|
||||
});
|
||||
if is_classical_message {
|
||||
self.store_msg(&session_id, false, buffer.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if e != SessionError::BrokenPipe && e != SessionError::ConnectionReset {
|
||||
print_error!(e);
|
||||
}
|
||||
self.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_disconnected(&session_id);
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
command = receiver.recv() => {
|
||||
match command.unwrap() {
|
||||
SessionCommand::Send { buff } => {
|
||||
//don't send msg if we already encrypted a file chunk (keep PSEC nonces synchronized)
|
||||
if next_chunk.is_none() || aborted {
|
||||
if let Err(e) = self.send_msg(session_id, &mut session, &buff, &mut aborted, file_ack_sender.as_ref()).await {
|
||||
print_error!(e);
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
msg_queue.push(buff);
|
||||
}
|
||||
}
|
||||
SessionCommand::EncryptFileChunk { plain_text } => next_chunk = Some(session.encrypt(&plain_text)),
|
||||
SessionCommand::SendEncryptedFileChunk { sender } => {
|
||||
if let Some(chunk) = next_chunk.as_ref() {
|
||||
match session.socket_write(chunk).await {
|
||||
Ok(_) => {
|
||||
file_ack_sender = Some(sender);
|
||||
//once the pre-encrypted chunk is sent, we can send the pending messages
|
||||
while msg_queue.len() > 0 {
|
||||
let msg = msg_queue.remove(0);
|
||||
if let Err(e) = self.send_msg(session_id, &mut session, &msg, &mut aborted, file_ack_sender.as_ref()).await {
|
||||
print_error!(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
SessionCommand::Close => break
|
||||
}
|
||||
}
|
||||
else => {
|
||||
println!("{} dead", session_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_new_session(session_manager: Arc<SessionManager>, mut session: Session, outgoing: bool) {
|
||||
tokio::spawn(async move {
|
||||
let mut peer_public_key = [0; PUBLIC_KEY_LENGTH];
|
||||
@ -137,17 +476,24 @@ impl SessionManager {
|
||||
let mut sessions = session_manager.sessions.write().unwrap();
|
||||
let mut is_new_session = true;
|
||||
for (_, registered_session) in sessions.iter() {
|
||||
if registered_session.1 == peer_public_key { //already connected to this identity
|
||||
if registered_session.peer_public_key == peer_public_key { //already connected to this identity
|
||||
is_new_session = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if is_new_session && session_manager.is_identity_loaded() { //check if we didn't logged out during the handshake
|
||||
let (sender, receiver) = mpsc::channel(32);
|
||||
let session_data = SessionData{
|
||||
name: session.get_ip(),
|
||||
outgoing,
|
||||
peer_public_key,
|
||||
sender: sender,
|
||||
file_transfer: None,
|
||||
};
|
||||
let mut session_id = None;
|
||||
for (i, contact) in session_manager.loaded_contacts.read().unwrap().iter() {
|
||||
if contact.public_key == peer_public_key { //session is a known contact. Assign the contact session_id to it
|
||||
sessions.insert(*i, (outgoing, peer_public_key, sender.clone()));
|
||||
sessions.insert(*i, session_data.clone());
|
||||
is_contact = true;
|
||||
session_id = Some(*i);
|
||||
break;
|
||||
@ -155,7 +501,7 @@ impl SessionManager {
|
||||
}
|
||||
if session_id.is_none() { //if not a contact, increment the session_counter
|
||||
let mut session_counter = session_manager.session_counter.write().unwrap();
|
||||
sessions.insert(*session_counter, (outgoing, peer_public_key, sender));
|
||||
sessions.insert(*session_counter, session_data);
|
||||
session_id = Some(*session_counter);
|
||||
*session_counter += 1;
|
||||
}
|
||||
@ -167,9 +513,9 @@ impl SessionManager {
|
||||
}
|
||||
};
|
||||
if let Some(session_data) = session_data {
|
||||
let (session_id, mut receiver) = session_data;
|
||||
let (session_id, receiver) = session_data;
|
||||
session_manager.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_new_session(session_id, outgoing);
|
||||
ui_connection.on_new_session(&session_id, &session.get_ip(), outgoing, None);
|
||||
});
|
||||
if !is_contact {
|
||||
match session.encrypt_and_send(&protocol::ask_name()).await {
|
||||
@ -181,101 +527,7 @@ impl SessionManager {
|
||||
}
|
||||
}
|
||||
}
|
||||
loop {
|
||||
tokio::select! {
|
||||
buffer = session.receive_and_decrypt() => {
|
||||
match buffer {
|
||||
Ok(buffer) => {
|
||||
if buffer[0] == protocol::Headers::ASK_NAME {
|
||||
let name = {
|
||||
session_manager.identity.read().unwrap().as_ref().and_then(|identity| Some(identity.name.clone()))
|
||||
};
|
||||
if name.is_some() { //can be None if we log out just before locking the identity mutex
|
||||
match session.encrypt_and_send(&protocol::tell_name(&name.unwrap())).await {
|
||||
Ok(_) => {}
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
session.close();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let buffer = if buffer[0] == protocol::Headers::FILE {
|
||||
let file_name_len = u16::from_be_bytes([buffer[1], buffer[2]]) as usize;
|
||||
let file_name = &buffer[3..3+file_name_len];
|
||||
match session_manager.store_file(&session_id, &buffer[3+file_name_len..]) {
|
||||
Ok(file_uuid) => {
|
||||
Some([&[protocol::Headers::FILE][..], file_uuid.as_bytes(), file_name].concat())
|
||||
}
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
None
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(buffer)
|
||||
};
|
||||
if buffer.is_some() {
|
||||
if buffer.as_ref().unwrap()[0] != protocol::Headers::TELL_NAME {
|
||||
session_manager.set_seen(session_id, false);
|
||||
}
|
||||
session_manager.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_received(&session_id, buffer.as_ref().unwrap());
|
||||
});
|
||||
if session_manager.is_contact(&session_id) {
|
||||
if buffer.as_ref().unwrap()[0] == protocol::Headers::TELL_NAME {
|
||||
match std::str::from_utf8(&buffer.as_ref().unwrap()[1..]) {
|
||||
Ok(new_name) => {
|
||||
let mut loaded_contacts = session_manager.loaded_contacts.write().unwrap();
|
||||
let contact = loaded_contacts.get_mut(&session_id).unwrap();
|
||||
contact.name = new_name.to_string();
|
||||
match session_manager.identity.read().unwrap().as_ref().unwrap().change_contact_name(&contact.uuid, new_name) {
|
||||
Ok(_) => {}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
session_manager.store_msg(&session_id, false, buffer.unwrap());
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if e != SessionError::BrokenPipe && e != SessionError::ConnectionReset {
|
||||
print_error!(e);
|
||||
}
|
||||
session_manager.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_disconnected(session_id);
|
||||
});
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
command = receiver.recv() => {
|
||||
match command.unwrap() {
|
||||
SessionCommand::Send { buff } => {
|
||||
match session.encrypt_and_send(&buff).await {
|
||||
Ok(_) => session_manager.with_ui_connection(|ui_connection| {
|
||||
ui_connection.on_msg_sent(session_id, &buff);
|
||||
}),
|
||||
Err(e) => print_error!(e)
|
||||
}
|
||||
}
|
||||
SessionCommand::Close => {
|
||||
session.close();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
else => {
|
||||
println!("{} dead", session_id);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
session_manager.session_worker(session_id, receiver, session).await;
|
||||
session_manager.remove_session(&session_id);
|
||||
}
|
||||
}
|
||||
@ -287,7 +539,12 @@ impl SessionManager {
|
||||
let server_v4 = TcpListener::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), constants::PORT)).await?;
|
||||
let (sender, mut receiver) = mpsc::channel(1);
|
||||
*session_manager.listener_stop_signal.lock().unwrap() = Some(sender);
|
||||
*session_manager.mdns_service.lock().unwrap() = Some(discovery::advertise_me());
|
||||
match discovery::advertise_me() {
|
||||
Ok(service) => *session_manager.mdns_service.lock().unwrap() = Some(service),
|
||||
Err(e) => {
|
||||
print_error!("{}: you won't be discoverable by other peers.", e);
|
||||
}
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
loop {
|
||||
let (stream, _addr) = (tokio::select! {
|
||||
@ -301,11 +558,6 @@ impl SessionManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn list_sessions(&self) -> Vec<(usize, bool)> {
|
||||
let sessions = self.sessions.read().unwrap();
|
||||
sessions.iter().map(|t| (*t.0, t.1.0)).collect()
|
||||
}
|
||||
|
||||
pub fn list_contacts(&self) -> Vec<(usize, String, bool)> {
|
||||
self.loaded_contacts.read().unwrap().iter().map(|c| (*c.0, c.1.name.clone(), c.1.verified)).collect()
|
||||
}
|
||||
@ -317,7 +569,7 @@ impl SessionManager {
|
||||
pub fn get_peer_public_key(&self, session_id: &usize) -> Option<[u8; PUBLIC_KEY_LENGTH]> {
|
||||
let sessions = self.sessions.read().unwrap();
|
||||
let session = sessions.get(session_id)?;
|
||||
Some(session.1)
|
||||
Some(session.peer_public_key)
|
||||
}
|
||||
|
||||
pub fn list_not_seen(&self) -> Vec<usize> {
|
||||
@ -353,12 +605,14 @@ impl SessionManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn remove_contact(&self, session_id: &usize) -> Result<usize, rusqlite::Error> {
|
||||
pub fn remove_contact(&self, session_id: usize) -> Result<usize, rusqlite::Error> {
|
||||
let mut loaded_contacts = self.loaded_contacts.write().unwrap();
|
||||
let result = Identity::remove_contact(&loaded_contacts.get(session_id).unwrap().uuid);
|
||||
let result = Identity::remove_contact(&loaded_contacts.get(&session_id).unwrap().uuid);
|
||||
if result.is_ok() {
|
||||
loaded_contacts.remove(session_id);
|
||||
self.last_loaded_msg_offsets.write().unwrap().remove(session_id);
|
||||
if let Some(contact) = loaded_contacts.remove(&session_id) {
|
||||
self.sessions.write().unwrap().get_mut(&session_id).unwrap().name = contact.name;
|
||||
}
|
||||
self.last_loaded_msg_offsets.write().unwrap().remove(&session_id);
|
||||
}
|
||||
result
|
||||
}
|
||||
@ -420,7 +674,7 @@ impl SessionManager {
|
||||
let result = self.identity.write().unwrap().as_mut().unwrap().change_name(new_name);
|
||||
if result.is_ok() {
|
||||
let senders: Vec<Sender<SessionCommand>> = {
|
||||
self.sessions.read().unwrap().iter().map(|i| i.1.2.clone()).collect()
|
||||
self.sessions.read().unwrap().iter().map(|i| i.1.sender.clone()).collect()
|
||||
};
|
||||
for sender in senders.into_iter() {
|
||||
sender.send(SessionCommand::Send {
|
||||
@ -440,8 +694,8 @@ impl SessionManager {
|
||||
*sender = None;
|
||||
}
|
||||
self.set_identity(None);
|
||||
for (_, _, sender) in self.sessions.read().unwrap().values() {
|
||||
sender.send(SessionCommand::Close).await;
|
||||
for session_data in self.sessions.read().unwrap().values() {
|
||||
session_data.sender.send(SessionCommand::Close).await;
|
||||
}
|
||||
*self.ui_connection.lock().unwrap() = None;
|
||||
*self.session_counter.write().unwrap() = 0;
|
||||
@ -453,7 +707,6 @@ impl SessionManager {
|
||||
self.identity.read().unwrap().is_some()
|
||||
}
|
||||
|
||||
#[allow(unused_must_use)]
|
||||
pub fn set_identity(&self, identity: Option<Identity>) {
|
||||
let mut identity_guard = self.identity.write().unwrap();
|
||||
if identity.is_none() { //logout
|
||||
|
@ -5,6 +5,11 @@ impl Headers {
|
||||
pub const ASK_NAME: u8 = 0x02;
|
||||
pub const TELL_NAME: u8 = 0x03;
|
||||
pub const FILE: u8 = 0x04;
|
||||
pub const ASK_LARGE_FILE: u8 = 0x05;
|
||||
pub const ACCEPT_LARGE_FILE: u8 = 0x06;
|
||||
pub const LARGE_FILE_CHUNK: u8 = 0x07;
|
||||
pub const ACK_CHUNK: u8 = 0x08;
|
||||
pub const ABORT_FILE_TRANSFER: u8 = 0x09;
|
||||
}
|
||||
|
||||
pub fn new_message(message: String) -> Vec<u8> {
|
||||
@ -22,3 +27,7 @@ pub fn tell_name(name: &str) -> Vec<u8> {
|
||||
pub fn file(file_name: &str, buffer: &[u8]) -> Vec<u8> {
|
||||
[&[Headers::FILE], &(file_name.len() as u16).to_be_bytes()[..], file_name.as_bytes(), buffer].concat()
|
||||
}
|
||||
|
||||
pub fn ask_large_file(file_size: u64, file_name: &str) -> Vec<u8> {
|
||||
[&[Headers::ASK_LARGE_FILE], &file_size.to_be_bytes()[..], file_name.as_bytes()].concat()
|
||||
}
|
@ -34,7 +34,6 @@ impl Session {
|
||||
const MAX_RECV_SIZE: usize = MESSAGE_LEN_LEN + Session::PADDED_MAX_SIZE + AES_TAG_LEN;
|
||||
|
||||
pub fn new(stream: TcpStream) -> Session {
|
||||
//stream.set_read_timeout(Some(Duration::from_millis(1000))).unwrap();
|
||||
Session {
|
||||
stream: stream,
|
||||
handshake_sent_buff: Vec::new(),
|
||||
@ -49,6 +48,10 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_ip(&self) -> String {
|
||||
self.stream.peer_addr().unwrap().ip().to_string()
|
||||
}
|
||||
|
||||
async fn socket_read(&mut self, buff: &mut [u8]) -> Result<usize, SessionError> {
|
||||
match self.stream.read(buff).await {
|
||||
Ok(read) => {
|
||||
@ -70,7 +73,7 @@ impl Session {
|
||||
}
|
||||
}
|
||||
|
||||
async fn socket_write(&mut self, buff: &[u8]) -> Result<(), SessionError> {
|
||||
pub async fn socket_write(&mut self, buff: &[u8]) -> Result<(), SessionError> {
|
||||
match self.stream.write_all(buff).await {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(match e.kind() {
|
||||
@ -211,7 +214,7 @@ impl Session {
|
||||
Vec::from(&input[MESSAGE_LEN_LEN..MESSAGE_LEN_LEN+msg_len])
|
||||
}
|
||||
|
||||
pub async fn encrypt_and_send(&mut self, message: &[u8]) -> Result<(), SessionError> {
|
||||
pub fn encrypt(&mut self, message: &[u8]) -> Vec<u8> {
|
||||
let padded_msg = Session::random_pad(message);
|
||||
let cipher_len = (padded_msg.len() as MessageLenType).to_be_bytes();
|
||||
let payload = Payload {
|
||||
@ -220,7 +223,12 @@ impl Session {
|
||||
};
|
||||
let nonce = iv_to_nonce(&self.local_iv.unwrap(), &mut self.local_counter);
|
||||
let cipher_text = self.local_cipher.as_ref().unwrap().encrypt(GenericArray::from_slice(&nonce), payload).unwrap();
|
||||
self.socket_write([&cipher_len, cipher_text.as_slice()].concat().as_slice()).await
|
||||
[&cipher_len, cipher_text.as_slice()].concat()
|
||||
}
|
||||
|
||||
pub async fn encrypt_and_send(&mut self, message: &[u8]) -> Result<(), SessionError> {
|
||||
let cipher_text = self.encrypt(message);
|
||||
self.socket_write(&cipher_text).await
|
||||
}
|
||||
|
||||
pub async fn receive_and_decrypt(&mut self) -> Result<Vec<u8>, SessionError> {
|
||||
@ -247,8 +255,4 @@ impl Session {
|
||||
Err(SessionError::BufferTooLarge)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn close(self) {
|
||||
drop(self.stream);
|
||||
}
|
||||
}
|
||||
|
@ -1,12 +1,12 @@
|
||||
use std::net::TcpStream;
|
||||
use tungstenite::{WebSocket, protocol::Role, Message};
|
||||
use crate::protocol;
|
||||
use crate::{protocol, session_manager::LargeFileDownload};
|
||||
|
||||
mod ui_messages {
|
||||
use std::{iter::FromIterator, str::from_utf8};
|
||||
use tungstenite::Message;
|
||||
use uuid::Uuid;
|
||||
use crate::{print_error, session_manager::protocol, utils::to_uuid_bytes};
|
||||
use crate::{print_error, session_manager::{protocol, LargeFileDownload, FileState}, utils::to_uuid_bytes};
|
||||
|
||||
const ON_NEW_MESSAGE: &str = "new_message";
|
||||
const LOAD_SENT_MESSAGE: &str = "load_sent_msg";
|
||||
@ -20,16 +20,19 @@ mod ui_messages {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn on_disconnected(session_id: usize) -> Message {
|
||||
Message::from(format!("disconnected {}", session_id))
|
||||
fn simple_event(verb: &str, session_id: &usize) -> Message {
|
||||
Message::from(format!("{} {}", verb, session_id))
|
||||
}
|
||||
pub fn on_new_session(session_id: usize, outgoing: bool) -> Message {
|
||||
Message::from(format!("new_session {} {}", session_id, outgoing))
|
||||
|
||||
pub fn on_disconnected(session_id: &usize) -> Message {
|
||||
simple_event("disconnected", session_id)
|
||||
}
|
||||
pub fn on_new_session(session_id: &usize, name: &str, outgoing: bool) -> Message {
|
||||
Message::from(format!("new_session {} {} {}", session_id, outgoing, name))
|
||||
}
|
||||
pub fn on_file_received(session_id: &usize, buffer: &[u8]) -> Option<Message> {
|
||||
let uuid = Uuid::from_bytes(to_uuid_bytes(&buffer[..16])?);
|
||||
match from_utf8(&buffer[16..]) {
|
||||
let uuid = Uuid::from_bytes(to_uuid_bytes(&buffer[1..17])?);
|
||||
match from_utf8(&buffer[17..]) {
|
||||
Ok(file_name) => Some(Message::from(format!("file {} {} {}", session_id, uuid.to_string(), file_name))),
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
@ -37,13 +40,43 @@ mod ui_messages {
|
||||
}
|
||||
}
|
||||
}
|
||||
pub fn new_file_transfer(session_id: &usize, file_transfer: &LargeFileDownload) -> Message {
|
||||
if file_transfer.state == FileState::ASKING {
|
||||
on_ask_large_file(session_id, file_transfer.file_size, &file_transfer.file_name, &file_transfer.download_location)
|
||||
} else {
|
||||
Message::from(format!(
|
||||
"file_transfer {} {} {} {} {} {}",
|
||||
session_id,
|
||||
base64::encode(&file_transfer.file_name),
|
||||
file_transfer.file_size,
|
||||
if file_transfer.state == FileState::ACCEPTED {
|
||||
"accepted"
|
||||
} else {
|
||||
"transferring"
|
||||
},
|
||||
file_transfer.transferred,
|
||||
file_transfer.last_chunk,
|
||||
))
|
||||
}
|
||||
}
|
||||
pub fn on_ask_large_file(session_id: &usize, file_size: u64, file_name: &str, download_location: &str) -> Message {
|
||||
Message::from(format!("ask_large_file {} {} {} {}", session_id, file_size, base64::encode(file_name), base64::encode(download_location)))
|
||||
}
|
||||
pub fn on_large_file_accepted(session_id: &usize) -> Message {
|
||||
simple_event("file_accepted", session_id)
|
||||
}
|
||||
pub fn on_file_transfer_aborted(session_id: &usize) -> Message {
|
||||
simple_event("aborted", session_id)
|
||||
}
|
||||
pub fn on_new_message(session_id: &usize, outgoing: bool, buffer: &[u8]) -> Option<Message> {
|
||||
new_message(ON_NEW_MESSAGE, session_id, outgoing, &buffer[1..])
|
||||
}
|
||||
pub fn inc_file_transfer(session_id: &usize, chunk_size: u64) -> Message {
|
||||
Message::from(format!("inc_file_transfer {} {}", session_id, chunk_size))
|
||||
}
|
||||
pub fn load_msg(session_id: &usize, outgoing: bool, buffer: &[u8]) -> Option<Message> {
|
||||
match buffer[0] {
|
||||
protocol::Headers::MESSAGE => new_message(LOAD_SENT_MESSAGE, session_id, outgoing, &buffer[1..]),
|
||||
protocol::Headers::TELL_NAME => on_name_told(session_id, &buffer[1..]),
|
||||
protocol::Headers::FILE => {
|
||||
let uuid = Uuid::from_bytes(to_uuid_bytes(&buffer[1..17])?);
|
||||
match from_utf8(&buffer[17..]) {
|
||||
@ -62,14 +95,8 @@ mod ui_messages {
|
||||
format!(" {}", session_id)
|
||||
})))
|
||||
}
|
||||
pub fn on_name_told(session_id: &usize, raw_name: &[u8]) -> Option<Message> {
|
||||
match from_utf8(raw_name) {
|
||||
Ok(name) => Some(Message::from(format!("name_told {} {}", session_id, name))),
|
||||
Err(e) => {
|
||||
print_error!(e);
|
||||
None
|
||||
}
|
||||
}
|
||||
pub fn on_name_told(session_id: &usize, name: &str) -> Message {
|
||||
Message::from(format!("name_told {} {}", session_id, name))
|
||||
}
|
||||
pub fn set_as_contact(session_id: usize, name: &str, verified: bool) -> Message {
|
||||
Message::from(format!("is_contact {} {} {}", session_id, verified, name))
|
||||
@ -107,29 +134,44 @@ impl UiConnection {
|
||||
pub fn on_received(&mut self, session_id: &usize, buffer: &[u8]) {
|
||||
let ui_message = match buffer[0] {
|
||||
protocol::Headers::MESSAGE => ui_messages::on_new_message(session_id, false, buffer),
|
||||
protocol::Headers::TELL_NAME => ui_messages::on_name_told(session_id, &buffer[1..]),
|
||||
protocol::Headers::FILE => ui_messages::on_file_received(session_id, &buffer[1..]),
|
||||
protocol::Headers::FILE => ui_messages::on_file_received(session_id, buffer),
|
||||
protocol::Headers::ACCEPT_LARGE_FILE => Some(ui_messages::on_large_file_accepted(session_id)),
|
||||
protocol::Headers::ABORT_FILE_TRANSFER => Some(ui_messages::on_file_transfer_aborted(session_id)),
|
||||
_ => None
|
||||
};
|
||||
if ui_message.is_some() {
|
||||
self.write_message(ui_message.unwrap())
|
||||
}
|
||||
}
|
||||
pub fn on_ask_large_file(&mut self, session_id: &usize, file_size: u64, file_name: &str, download_location: &str) {
|
||||
self.write_message(ui_messages::on_ask_large_file(session_id, file_size, file_name, download_location))
|
||||
}
|
||||
pub fn on_msg_sent(&mut self, session_id: usize, buffer: &[u8]) {
|
||||
if buffer[0] == protocol::Headers::MESSAGE {
|
||||
match ui_messages::on_new_message(&session_id, true, buffer) {
|
||||
match buffer[0] {
|
||||
protocol::Headers::MESSAGE => match ui_messages::on_new_message(&session_id, true, buffer) {
|
||||
Some(msg) => self.write_message(msg),
|
||||
None => {}
|
||||
}
|
||||
protocol::Headers::ABORT_FILE_TRANSFER => self.write_message(ui_messages::on_file_transfer_aborted(&session_id)),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
pub fn on_new_session(&mut self, session_id: usize, outgoing: bool) {
|
||||
self.write_message(ui_messages::on_new_session(session_id, outgoing));
|
||||
pub fn on_new_session(&mut self, session_id: &usize, name: &str, outgoing: bool, file_transfer: Option<&LargeFileDownload>) {
|
||||
self.write_message(ui_messages::on_new_session(session_id, name, outgoing));
|
||||
if let Some(file_transfer) = file_transfer {
|
||||
self.write_message(ui_messages::new_file_transfer(session_id, file_transfer));
|
||||
}
|
||||
}
|
||||
pub fn on_disconnected(&mut self, session_id: usize) {
|
||||
pub fn on_disconnected(&mut self, session_id: &usize) {
|
||||
self.write_message(ui_messages::on_disconnected(session_id));
|
||||
}
|
||||
pub fn on_name_told(&mut self, session_id: &usize, name: &str) {
|
||||
self.write_message(ui_messages::on_name_told(session_id, name));
|
||||
}
|
||||
|
||||
pub fn inc_file_transfer(&mut self, session_id: &usize, chunk_size: u64) {
|
||||
self.write_message(ui_messages::inc_file_transfer(session_id, chunk_size));
|
||||
}
|
||||
pub fn set_as_contact(&mut self, session_id: usize, name: &str, verified: bool) {
|
||||
self.write_message(ui_messages::set_as_contact(session_id, name, verified));
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
use std::convert::TryInto;
|
||||
use std::{convert::TryInto, time::{SystemTime, UNIX_EPOCH}};
|
||||
use uuid::Bytes;
|
||||
use crate::print_error;
|
||||
|
||||
@ -24,6 +24,10 @@ pub fn escape_double_quote(origin: String) -> String {
|
||||
origin.replace("\"", "\\\"")
|
||||
}
|
||||
|
||||
pub fn get_unix_timestamp() -> u128 {
|
||||
SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_millis()
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! print_error {
|
||||
($arg:tt) => ({
|
||||
|
Loading…
Reference in New Issue
Block a user