Switching to libmdns

This commit is contained in:
Matéo Duparc 2021-04-26 16:29:26 +02:00
parent 597b9d7a17
commit 65b2b8e04d
Signed by: hardcoresushi
GPG Key ID: 007F84120107191E
6 changed files with 72 additions and 92 deletions

View File

@ -7,10 +7,10 @@ edition = "2018"
[dependencies]
rand-8 = {package = "rand", version = "0.8.3"}
rand-7 = {package = "rand", version = "0.7.3"}
tokio = {version = "1", features = ["full"]}
tokio = {version = "1", features = ["rt", "rt-multi-thread", "macros", "net", "io-util"]}
lazy_static = "1.4"
socket2 = "0.4.0"
mio = "0.7"
rusqlite = {version = "0.25.1", features = ["bundled"]}
ed25519-dalek = "1" #for singing
x25519-dalek = "1.1" #for shared secret
sha2 = "0.9.3"
@ -26,16 +26,12 @@ tungstenite = "0.13.0" #websocket
serde = "1.0.124" #serialization
html-escape = "0.2.7"
dirs = "3.0"
uuid = { version = "0.8", features = ["v4"] }
uuid = {version = "0.8", features = ["v4"]}
webbrowser = "0.5.5"
astro-dnssd = "0.2.0" #mDNS advertiser
libmdns = "0.6" #mDNS advertiser
multicast_dns = "0.5" #mDNS browser
base64 = "0.13.0"
time = "0.2.25"
aes-gcm-siv = "0.9.0"
scrypt = "0.6.3"
zeroize = "1.2.0"
[dependencies.rusqlite]
version = "0.25.0"
features = ["bundled"]
zeroize = "1.2.0"

View File

@ -1,29 +1,17 @@
use std::{thread, net::IpAddr};
use astro_dnssd::register::DNSServiceBuilder;
use std::{net::IpAddr};
use libmdns::{Responder, Service};
use multicast_dns::discovery::{DiscoveryManager, DiscoveryListeners, ResolveListeners};
use crate::{constants, print_error};
const SERVICE_TYPE: &str = "_aira._tcp";
pub fn advertise_me(){
thread::spawn(||{
let mut service = DNSServiceBuilder::new(SERVICE_TYPE)
.with_name("AIRA Node")
.with_port(constants::PORT)
.build()
.unwrap();
match service.register(|reply| match reply {
Ok(_) => {},
Err(e) => print_error!("Error registering: {:?}", e)
}) {
Ok(_) => {
loop {
service.process_result();
}
}
Err(e) => print_error!("Unable to register mDNS service. You won't be discoverable by others peers. {}", e)
};
});
pub fn advertise_me() -> Service {
Responder::new().unwrap().register(
SERVICE_TYPE.to_string(),
"AIRA Node".to_string(),
constants::PORT,
&[]
)
}
pub fn discover_peers<F: Fn(&DiscoveryManager, IpAddr)>(on_service_discovered: F) {

View File

@ -303,6 +303,9 @@
#msg_log li p {
margin-top: 0;
}
#msg_log a {
color: #238cf5;
}
#msg_log .file {
display: flex;
align-items: end;

File diff suppressed because one or more lines are too long

View File

@ -8,7 +8,7 @@ mod constants;
mod discovery;
use std::{env, fs, io, net::{SocketAddr, TcpStream}, path::Path, str::FromStr, sync::{Arc, RwLock}};
use tokio::net::{TcpListener};
use tokio::{net::TcpListener, runtime::Handle};
use actix_web::{App, HttpMessage, HttpRequest, HttpResponse, HttpServer, http::{header, CookieBuilder}, web, web::Data};
use actix_multipart::Multipart;
use tungstenite::Message;
@ -87,17 +87,9 @@ async fn websocket_worker(websocket_strem: TcpStream, global_vars: Arc<RwLock<Gl
session_manager.last_loaded_msg_offsets.write().unwrap().insert(contact.0, 0);
load_msgs(session_manager.clone(), &mut ui_connection, &contact.0);
});
if global_vars.read().unwrap().is_backend_running { //ui reconnection
session_manager.list_sessions().into_iter().for_each(|session| {
ui_connection.on_new_session(session.0, session.1);
});
} else {
if SessionManager::start_listener(session_manager.clone()).await.is_err() {
print_error!("You won't be able to receive incomming connections from other peers.");
}
discovery::advertise_me();
global_vars.write().unwrap().is_backend_running = true;
}
session_manager.list_sessions().into_iter().for_each(|session| {
ui_connection.on_new_session(session.0, session.1);
});
let not_seen = session_manager.list_not_seen();
if not_seen.len() > 0 {
ui_connection.set_not_seen(not_seen);
@ -106,7 +98,7 @@ async fn websocket_worker(websocket_strem: TcpStream, global_vars: Arc<RwLock<Gl
ui_connection.load_msgs(&msgs.0, &msgs.1);
});
discover_peers(session_manager.clone());
let handle = tokio::runtime::Handle::current();
let handle = Handle::current();
std::thread::spawn(move || {
loop {
match ui_connection.websocket.read_message() {
@ -115,7 +107,6 @@ async fn websocket_worker(websocket_strem: TcpStream, global_vars: Arc<RwLock<Gl
ui_connection.write_message(Message::Pong(Vec::new())); //not sure if I'm doing this right
} else if msg.is_text() {
let msg = msg.into_text().unwrap();
let global_vars = global_vars.clone();
let session_manager = session_manager.clone();
let mut ui_connection = UiConnection::from_raw_socket(websocket_strem.try_clone().unwrap());
handle.spawn(async move {
@ -191,10 +182,9 @@ async fn websocket_worker(websocket_strem: TcpStream, global_vars: Arc<RwLock<Gl
};
}
"change_password" => {
let global_vars_read = global_vars.read().unwrap();
let (old_password, new_password) = if args.len() == 3 {
(Some(base64::decode(args[1]).unwrap()), Some(base64::decode(args[2]).unwrap()))
} else if global_vars_read.is_identity_protected { //sent old_password
} else if Identity::is_protected().unwrap() { //sent old_password
(Some(base64::decode(args[1]).unwrap()), None)
} else { //sent new password
(None, Some(base64::decode(args[1]).unwrap()))
@ -210,14 +200,7 @@ async fn websocket_worker(websocket_strem: TcpStream, global_vars: Arc<RwLock<Gl
false
};
match result {
Ok(success) => {
ui_connection.password_changed(success, is_identity_protected);
if success && is_identity_protected != global_vars_read.is_identity_protected {
drop(global_vars_read);
let mut global_vars_write = global_vars.write().unwrap();
global_vars_write.is_identity_protected = is_identity_protected;
}
}
Ok(success) => ui_connection.password_changed(success, is_identity_protected),
Err(e) => print_error!(e)
}
}
@ -325,12 +308,11 @@ async fn handle_logout(req: HttpRequest) -> HttpResponse {
Some(cookie) => {
let global_vars = req.app_data::<Data<Arc<RwLock<GlobalVars>>>>().unwrap();
let mut global_vars_write = global_vars.write().unwrap();
if global_vars_write.is_backend_running {
if global_vars_write.session_manager.is_identity_loaded() {
global_vars_write.http_session_manager.remove(cookie.value());
global_vars_write.session_manager.stop().await;
global_vars_write.is_backend_running = false;
}
if global_vars_write.is_identity_protected {
if Identity::is_protected().unwrap_or(true) {
HttpResponse::Found().header(header::LOCATION, "/").finish()
} else {
HttpResponse::Ok().body(include_str!("frontend/logout.html"))
@ -340,11 +322,17 @@ async fn handle_logout(req: HttpRequest) -> HttpResponse {
}
}
async fn login(identity: Identity, global_vars: &Arc<RwLock<GlobalVars>>) -> HttpResponse {
fn login(identity: Identity, global_vars: &Arc<RwLock<GlobalVars>>) -> HttpResponse {
let mut global_vars_write = global_vars.write().unwrap();
let cookie_value = global_vars_write.http_session_manager.register();
if !global_vars_write.session_manager.is_identity_loaded() {
global_vars_write.session_manager.set_identity(Some(identity)).await;
let session_manager = global_vars_write.session_manager.clone();
if !session_manager.is_identity_loaded() {
global_vars_write.session_manager.set_identity(Some(identity));
global_vars_write.tokio_handle.clone().spawn(async move {
if SessionManager::start_listener(session_manager.clone()).await.is_err() {
print_error!("You won't be able to receive incomming connections from other peers.");
}
});
}
let cookie = CookieBuilder::new(constants::HTTP_COOKIE_NAME, cookie_value)
.http_only(true)
@ -353,19 +341,19 @@ async fn login(identity: Identity, global_vars: &Arc<RwLock<GlobalVars>>) -> Htt
HttpResponse::Found().header(header::LOCATION, "/").set_header(header::SET_COOKIE, cookie.to_string()).finish()
}
async fn on_identity_loaded(identity: Identity, global_vars: &Arc<RwLock<GlobalVars>>) -> HttpResponse {
fn on_identity_loaded(identity: Identity, global_vars: &Arc<RwLock<GlobalVars>>) -> HttpResponse {
match Identity::clear_temporary_files() {
Ok(_) => {},
Err(e) => print_error!(e)
}
login(identity, global_vars).await
login(identity, global_vars)
}
async fn handle_login(req: HttpRequest, mut params: web::Form<LoginParams>) -> HttpResponse {
fn handle_login(req: HttpRequest, mut params: web::Form<LoginParams>) -> HttpResponse {
let response = match Identity::load_identity(Some(params.password.as_bytes())) {
Ok(identity) => {
let global_vars = req.app_data::<Data<Arc<RwLock<GlobalVars>>>>().unwrap();
on_identity_loaded(identity, global_vars).await
on_identity_loaded(identity, global_vars)
}
Err(e) => generate_login_response(Some(&e.to_string()))
};
@ -412,7 +400,7 @@ async fn handle_create(req: HttpRequest, mut params: web::Form<CreateParams>) ->
) {
Ok(identity) => {
let global_vars = req.app_data::<Data<Arc<RwLock<GlobalVars>>>>().unwrap();
login(identity, global_vars.get_ref()).await
login(identity, global_vars.get_ref())
}
Err(e) => {
print_error!(e);
@ -427,15 +415,12 @@ async fn handle_create(req: HttpRequest, mut params: web::Form<CreateParams>) ->
response
}
async fn index_not_logged_in(global_vars: &Arc<RwLock<GlobalVars>>) -> HttpResponse {
let global_vars_read = global_vars.read().unwrap();
let is_protected = global_vars_read.is_identity_protected;
drop(global_vars_read);
if is_protected {
fn index_not_logged_in(global_vars: &Arc<RwLock<GlobalVars>>) -> HttpResponse {
if Identity::is_protected().unwrap_or(true) {
generate_login_response(None)
} else {
match Identity::load_identity(None) {
Ok(identity) => on_identity_loaded(identity, global_vars).await,
Ok(identity) => on_identity_loaded(identity, global_vars),
Err(_) => generate_login_response(None) //assuming no identity
}
}
@ -450,14 +435,14 @@ async fn handle_index(req: HttpRequest) -> HttpResponse {
HttpResponse::Ok().body(
include_str!("frontend/index.html")
.replace("WEBSOCKET_PORT", &global_vars_read.websocket_port.to_string())
.replace("IS_IDENTITY_PROTECTED", &global_vars_read.is_identity_protected.to_string())
.replace("IS_IDENTITY_PROTECTED", &Identity::is_protected().unwrap().to_string())
)
} else {
drop(global_vars_read);
index_not_logged_in(global_vars).await
index_not_logged_in(global_vars)
}
}
None => index_not_logged_in(global_vars).await
None => index_not_logged_in(global_vars)
}
}
@ -609,10 +594,9 @@ impl HttpSessionsManager {
struct GlobalVars {
session_manager: Arc<SessionManager>,
is_backend_running: bool,
websocket_port: u16,
is_identity_protected: bool,
http_session_manager: HttpSessionsManager,
tokio_handle: Handle,
}
#[tokio::main]
@ -627,10 +611,9 @@ async fn main() {
}
let global_vars = Arc::new(RwLock::new(GlobalVars {
session_manager: Arc::new(SessionManager::new()),
is_backend_running: false,
websocket_port: 0,
is_identity_protected: Identity::is_protected().unwrap_or(false),
http_session_manager: HttpSessionsManager::new(),
tokio_handle: Handle::current(),
}));
let websocket_port = start_websocket_server(global_vars.clone()).await;
global_vars.write().unwrap().websocket_port = websocket_port;

View File

@ -3,11 +3,12 @@ pub mod protocol;
use std::{collections::HashMap, net::{IpAddr, SocketAddr}, io, sync::{Mutex, RwLock, Arc}};
use tokio::{net::{TcpListener, TcpStream}, sync::{mpsc, mpsc::Sender}};
use session::Session;
use libmdns::Service;
use strum_macros::Display;
use session::Session;
use ed25519_dalek::PUBLIC_KEY_LENGTH;
use uuid::Uuid;
use crate::{constants, identity::{Contact, Identity}, print_error};
use crate::{constants, discovery, identity::{Contact, Identity}, print_error};
use crate::ui_interface::UiConnection;
#[derive(Display, Debug, PartialEq, Eq)]
@ -36,6 +37,7 @@ pub struct SessionManager {
pub last_loaded_msg_offsets: RwLock<HashMap<usize, usize>>,
pub saved_msgs: Mutex<HashMap<usize, Vec<(bool, Vec<u8>)>>>,
not_seen: RwLock<Vec<usize>>,
mdns_service: Mutex<Option<Service>>,
listener_stop_signal: Mutex<Option<Sender<()>>>,
}
@ -178,16 +180,18 @@ impl SessionManager {
Ok(buffer) => {
if buffer[0] == protocol::Headers::ASK_NAME {
let name = {
session_manager.identity.read().unwrap().as_ref().unwrap().name.clone()
session_manager.identity.read().unwrap().as_ref().and_then(|identity| Some(identity.name.clone()))
};
match session.encrypt_and_send(&protocol::tell_name(&name)).await {
Ok(_) => {}
Err(e) => {
print_error!(e);
session.close();
break;
if name.is_some() { //can be None if we log out just before locking the identity mutex
match session.encrypt_and_send(&protocol::tell_name(&name.unwrap())).await {
Ok(_) => {}
Err(e) => {
print_error!(e);
session.close();
break;
}
}
}
}
} else {
let buffer = if buffer[0] == protocol::Headers::FILE {
let file_name_len = u16::from_be_bytes([buffer[1], buffer[2]]) as usize;
@ -277,6 +281,7 @@ impl SessionManager {
let server_v4 = TcpListener::bind(SocketAddr::new("0.0.0.0".parse().unwrap(), constants::PORT)).await?;
let (sender, mut receiver) = mpsc::channel(1);
*session_manager.listener_stop_signal.lock().unwrap() = Some(sender);
*session_manager.mdns_service.lock().unwrap() = Some(discovery::advertise_me());
tokio::spawn(async move {
loop {
let (stream, _addr) = (tokio::select! {
@ -422,8 +427,13 @@ impl SessionManager {
#[allow(unused_must_use)]
pub async fn stop(&self) {
self.listener_stop_signal.lock().unwrap().as_ref().unwrap().send(()).await;
self.set_identity(None).await;
*self.mdns_service.lock().unwrap() = None; //unregister mdns service
let mut sender = self.listener_stop_signal.lock().unwrap();
if sender.is_some() {
sender.as_ref().unwrap().send(()).await;
*sender = None;
}
self.set_identity(None);
for (_, _, sender) in self.sessions.read().unwrap().values() {
sender.send(SessionCommand::Close).await;
}
@ -438,7 +448,7 @@ impl SessionManager {
}
#[allow(unused_must_use)]
pub async fn set_identity(&self, identity: Option<Identity>) {
pub fn set_identity(&self, identity: Option<Identity>) {
let mut identity_guard = self.identity.write().unwrap();
if identity.is_none() { //logout
identity_guard.as_mut().unwrap().zeroize();
@ -477,6 +487,7 @@ impl SessionManager {
last_loaded_msg_offsets: RwLock::new(HashMap::new()),
saved_msgs: Mutex::new(HashMap::new()),
not_seen: RwLock::new(Vec::new()),
mdns_service: Mutex::new(None),
listener_stop_signal: Mutex::new(None),
}
}