Skip to content
Open
5 changes: 4 additions & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ members = [

# libp2p modules
"libp2p_modules/qaul_info",
"libp2p_modules/qaul_messaging"
"libp2p_modules/qaul_messaging",

# simulation
"qaul-sim"
]
2 changes: 0 additions & 2 deletions rust/ble_module/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@ crossbeam-channel = "0.5"
filetime = "0.2"
futures = "0.3"
futures-concurrency = "7.7"
lazy_static = "1.4"
log = "0.4"
prost = "0.14"
rand = "0.10"
serde = "1.0"
serde_json = "1.0"
simplelog = "0.12"
state = "0.6"
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"

Expand Down
52 changes: 28 additions & 24 deletions rust/ble_module/src/ble/ble_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// This software is published under the AGPLv3 license.

use super::super::BleRpc;
use super::utils::find_device_by_mac;
use super::utils::BleDeviceState;
use crate::{
ble::ble_uuids::{main_service_uuid, msg_char, read_char},
ble::utils,
Expand All @@ -17,21 +17,15 @@ use bytes::Bytes;
use futures::FutureExt;
use futures::StreamExt;
use futures_concurrency::stream::Merge;
use lazy_static::lazy_static;
use std::{
cell::RefCell,
collections::{HashMap, HashSet, VecDeque},
error::Error,
sync::Mutex,
sync::{Arc, Mutex},
};
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;

lazy_static! {
static ref HASH_MAP: Mutex<HashMap<String, VecDeque<(String, Vec<u8>, Vec<u8>)>>> =
Mutex::new(HashMap::new());
}

pub enum QaulBleService {
Idle(IdleBleService),
Started(StartedBleService),
Expand All @@ -53,6 +47,8 @@ pub struct IdleBleService {
_session: Session,
device_block_list: Vec<Address>,
address_lookup: RefCell<HashMap<Vec<u8>, Address>>,
send_queue_map: Mutex<HashMap<String, VecDeque<(String, Vec<u8>, Vec<u8>)>>>,
device_state: Arc<BleDeviceState>,
}

enum BleMainLoopEvent {
Expand All @@ -61,10 +57,11 @@ enum BleMainLoopEvent {
}

impl IdleBleService {
/// Initialize a new BleService.
/// Initialize a new BleService.
///
/// Gets default Bluetooth adapter and initializes a Bluer session
pub async fn new() -> Result<QaulBleService, Box<dyn Error>> {
let device_state = BleDeviceState::new();
let session = bluer::Session::new().await?;
let agent = bluer::agent::Agent {
request_default: false,
Expand All @@ -79,6 +76,8 @@ impl IdleBleService {
_session: session,
device_block_list: vec![],
address_lookup: RefCell::new(HashMap::new()),
send_queue_map: Mutex::new(HashMap::new()),
device_state,
}))
}

Expand Down Expand Up @@ -147,6 +146,7 @@ impl IdleBleService {

let adp = self.adapter.clone();
let cmd_tx2 = cmd_tx.clone();
let device_state_for_read = self.device_state.clone();
let main_characterstic = Characteristic {
uuid: read_char(),
read: Some(CharacteristicRead {
Expand All @@ -160,13 +160,14 @@ impl IdleBleService {
// Below snippet checks for device presence in ignore list(discovered devices lsit)
// If device is present, it updates the last found time.
// Else, It triggers an event to try to discover and connect to device.
match utils::find_ignore_device_by_mac(req.device_address) {
match device_state_for_read.find_ignore_device_by_mac(req.device_address) {
Some(_) => {
// utils::update_last_found(req.device_address);
// device_state_for_read.update_last_found(req.device_address);
}
None => {
let adp2 = adp.clone();
let cmd_tx2 = cmd_tx2.clone();
let ds = device_state_for_read.clone();
tokio::task::spawn(async move {
match adp2.device(req.device_address) {
Ok(device) => {
Expand All @@ -192,7 +193,7 @@ impl IdleBleService {
last_found_time: utils::current_time_millis(),
is_connected: false,
};
utils::add_ignore_device(ble_device);
ds.add_ignore_device(ble_device);
log::info!("Device discovered event sent");
}
Err(err) => {
Expand Down Expand Up @@ -253,6 +254,7 @@ impl IdleBleService {
// Set up discovery filter and start streaming the discovered devices adn out of range checker.
let _ = adapter.set_discovery_filter(get_filter()).await;
let mut device_result_sender = internal_sender.clone();
let device_state_for_stream = self.device_state.clone();
let device_stream = match adapter.discover_devices().await {
Ok(addr_stream) => addr_stream.filter_map(|evt| {
let result = match evt {
Expand All @@ -270,7 +272,7 @@ impl IdleBleService {
}
}
AdapterEvent::DeviceRemoved(addr) => {
utils::find_device_by_mac(addr).map(|device| {
device_state_for_stream.find_device_by_mac(addr).map(|device| {
device_result_sender.send_device_unavailable(
device.qaul_id.clone(),
adapter.clone(),
Expand All @@ -294,7 +296,7 @@ impl IdleBleService {
.await;

// TODO: Setup out of range checker.
// utils::out_of_range_checker(adapter.clone(), internal_sender.clone());
// utils::out_of_range_checker(adapter.clone(), internal_sender.clone(), self.device_state.clone());

let cmd_rx_stream = tokio_stream::wrappers::ReceiverStream::new(cmd_rx);
let rpc_reciever_stream =
Expand Down Expand Up @@ -506,8 +508,8 @@ impl IdleBleService {
name: device.name().await.unwrap().unwrap_or_default(),
is_connected: false,
};
utils::add_ignore_device(ble_device.clone());
utils::add_device(ble_device);
self.device_state.add_ignore_device(ble_device.clone());
self.device_state.add_device(ble_device);
read_char_uuid_found = true;
log::info!(
"Read characteristic found for device {} with qaul ID {:?}",
Expand Down Expand Up @@ -545,6 +547,7 @@ impl IdleBleService {
let (ble_msg_sender, mut ble_msg_reciever) =
tokio::sync::mpsc::unbounded_channel::<(Address, Vec<u8>)>();

let device_state_msg = self.device_state.clone();
tokio::task::spawn_local(async move {
log::info!("Spawned message listener for device.");
loop {
Expand All @@ -562,7 +565,7 @@ impl IdleBleService {
log::info!("Received message: {:?} from {:?}", &hex_msg, &mac_address);
let stringified_addr = utils::mac_to_string(&mac_address);

match utils::find_msg_map_by_mac(stringified_addr.clone()) {
match device_state_msg.find_msg_map_by_mac(stringified_addr.clone()) {
Some(mut old_value) => {
if hex_msg.ends_with("2424")
|| (old_value.ends_with("24") && hex_msg == "24")
Expand All @@ -577,11 +580,11 @@ impl IdleBleService {
(hex_msg, mac_address),
internal_sender.clone(),
);
utils::remove_msg_map_by_mac(stringified_addr);
device_state_msg.remove_msg_map_by_mac(stringified_addr);
}
} else {
old_value += &hex_msg;
utils::add_msg_map(stringified_addr, old_value);
device_state_msg.add_msg_map(stringified_addr, old_value);
}
}
None => {
Expand All @@ -595,7 +598,7 @@ impl IdleBleService {
);
}
} else if hex_msg.starts_with("2424") {
utils::add_msg_map(stringified_addr, hex_msg.clone());
device_state_msg.add_msg_map(stringified_addr, hex_msg.clone());
} else {
// Error handling
}
Expand All @@ -605,15 +608,16 @@ impl IdleBleService {
}
});

let device_state_write = self.device_state.clone();
tokio::task::spawn_local(async move {
loop {
match msg_chara_ctrl.next().await {
Some(CharacteristicControlEvent::Write(write)) => {
let mut device_known: bool = true;
let mac_address = write.device_address();
match find_device_by_mac(mac_address) {
match device_state_write.find_device_by_mac(mac_address) {
Some(_) => {
// utils::update_last_found(mac_address);
// device_state_write.update_last_found(mac_address);
}
None => {
device_known = false;
Expand Down Expand Up @@ -672,7 +676,7 @@ impl IdleBleService {
return Err("Error getting device".into());
}
};
let mut hash_map = HASH_MAP.lock().unwrap();
let mut hash_map = self.send_queue_map.lock().unwrap();
match hash_map.get(&stringified_addr) {
Some(queue) => {
let mut queue = queue.clone();
Expand Down Expand Up @@ -741,7 +745,7 @@ impl IdleBleService {
if service.uuid().await? == main_service_uuid() {
for chara in service.characteristics().await? {
if chara.uuid().await? == msg_char() {
utils::update_last_found(*mac_address);
self.device_state.update_last_found(*mac_address);
read_char_found = true;
while send_queue.len() > 0 {
let data: String;
Expand Down
Loading
Loading