Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions src/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use crate::error::errors::*;
/// This is used for uniquely identifying sources when counting sequence numbers.
use uuid::Uuid;

use std::borrow::Cow;
use std::cmp::{Ordering, max};
use std::collections::HashMap;
use std::fmt;
Expand Down Expand Up @@ -208,6 +207,9 @@ pub struct DiscoveredSacnSource {
/// The name of the source, no protocol guarantee this will be unique but if it isn't then universe discovery may not work correctly.
pub name: String,

/// The unique CID of the source. This should be unique across all devices on the network.
pub cid: Uuid,

/// The time at which the discovered source was last updated / a discovery packet was received by the source.
pub last_updated: Instant,

Expand Down Expand Up @@ -550,7 +552,7 @@ impl SacnReceiver {
SynchronizationPacket(s) => self.handle_sync_packet(pdu.cid, s)?,
UniverseDiscoveryPacket(u) => {
let discovered_src: Option<String> =
self.handle_universe_discovery_packet(u);
self.handle_universe_discovery_packet(pdu.cid, u);
if let Some(src) = discovered_src
&& self.announce_source_discovery
{
Expand Down Expand Up @@ -730,7 +732,7 @@ impl SacnReceiver {
}

if data_pkt.stream_terminated {
self.terminate_stream(cid, data_pkt.source_name, data_pkt.universe);
self.terminate_stream(cid, data_pkt.universe);
if self.announce_stream_termination {
return Err(SacnError::UniverseTerminated(cid, data_pkt.universe));
}
Expand Down Expand Up @@ -804,16 +806,14 @@ impl SacnReceiver {
///
/// src_cid: The CID of the source which is terminating a universe.
///
/// source_name: The human readable name of the sACN source to remove the universe from.
///
/// universe: The sACN universe to remove.
fn terminate_stream<'a>(&mut self, src_cid: Uuid, source_name: Cow<'a, str>, universe: u16) {
fn terminate_stream(&mut self, src_cid: Uuid, universe: u16) {
// Will only return an error if the source/universe wasn't found which is acceptable because as it
// comes to the same result.
let _ = self.sequences.remove_seq_numbers(src_cid, universe);

// As with sequence numbers the source might not be found which is acceptable.
if let Some(index) = find_discovered_src(&self.discovered_sources, &source_name.to_string())
if let Some(index) = find_discovered_src(&self.discovered_sources, &src_cid)
{
self.discovered_sources[index].terminate_universe(universe);
}
Expand Down Expand Up @@ -921,7 +921,7 @@ impl SacnReceiver {
/// Arguments:
/// src: The DiscoveredSacnSource to update the record of discovered sacn sources with.
fn update_discovered_srcs(&mut self, src: DiscoveredSacnSource) {
if let Some(index) = find_discovered_src(&self.discovered_sources, &src.name) {
if let Some(index) = find_discovered_src(&self.discovered_sources, &src.cid) {
self.discovered_sources.remove(index);
}
self.discovered_sources.push(src);
Expand All @@ -935,9 +935,13 @@ impl SacnReceiver {
/// Returns the source name if a source was fully discovered or None if the source was only partially discovered.
///
/// Arguments:
///
/// cid: the source CID.
///
/// discovery_pkt: The universe discovery part of the universe discovery packet to handle.
fn handle_universe_discovery_packet(
&mut self,
cid: Uuid,
discovery_pkt: UniverseDiscoveryPacketFramingLayer,
) -> Option<String> {
let data: UniverseDiscoveryPacketUniverseDiscoveryLayer = discovery_pkt.data;
Expand All @@ -955,7 +959,7 @@ impl SacnReceiver {
// See if some pages that belong to the source that this page belongs to have already been received.
match find_discovered_src(
&self.partially_discovered_sources,
&discovery_pkt.source_name.to_string(),
&cid
) {
Some(index) => {
// Some pages have already been received from this source.
Expand All @@ -974,6 +978,7 @@ impl SacnReceiver {
// This is the first page received from this source.
let discovered_src: DiscoveredSacnSource = DiscoveredSacnSource {
name: discovery_pkt.source_name.to_string(),
cid,
last_page,
pages: vec![uni_page],
last_updated: Instant::now(),
Expand Down Expand Up @@ -1034,10 +1039,12 @@ impl Drop for SacnReceiver {
/// returns the index of the src in the Vec or None if not found.
///
/// Arguments:
///
/// srcs: The Vec of DiscoveredSacnSources to search.
/// name: The human readable name of the source to find.
fn find_discovered_src(srcs: &[DiscoveredSacnSource], name: &String) -> Option<usize> {
(0..srcs.len()).find(|&i| srcs[i].name == *name)
///
/// cid: The CID (uuid) of the source to find.
fn find_discovered_src(srcs: &[DiscoveredSacnSource], cid: &Uuid) -> Option<usize> {
(0..srcs.len()).find(|&i| srcs[i].cid == *cid)
}

/// In general the lower level transport layer is handled by SacnNetworkReceiver (which itself wraps a Socket).
Expand Down Expand Up @@ -2162,6 +2169,10 @@ mod test {
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();

let name = "Test Src 1";
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let page: u8 = 0;
let last_page: u8 = 0;
let universes: Vec<u16> = vec![0, 1, 2, 3, 4, 5];
Expand All @@ -2181,14 +2192,15 @@ mod test {
universes: universes.clone().into(),
},
};
let res: Option<String> = dmx_rcv.handle_universe_discovery_packet(discovery_pkt);
let res: Option<String> = dmx_rcv.handle_universe_discovery_packet(src_cid, discovery_pkt);

assert!(res.is_some());
assert_eq!(res.unwrap(), name);

assert_eq!(dmx_rcv.discovered_sources.len(), 1);

assert_eq!(dmx_rcv.discovered_sources[0].name, name);
assert_eq!(dmx_rcv.discovered_sources[0].cid, src_cid);
assert_eq!(dmx_rcv.discovered_sources[0].last_page, last_page);
assert_eq!(dmx_rcv.discovered_sources[0].pages.len(), 1);
assert_eq!(dmx_rcv.discovered_sources[0].pages[0].page, page);
Expand All @@ -2202,6 +2214,10 @@ mod test {
let mut dmx_rcv = SacnReceiver::with_ip(addr, None).unwrap();

let name = "Test Src 1";
let src_cid: Uuid = Uuid::from_bytes([
0xef, 0x07, 0xc8, 0xdd, 0x00, 0x64, 0x44, 0x01, 0xa3, 0xa2, 0x45, 0x9e, 0xf8, 0xe6,
0x14, 0x3e,
]);
let last_page: u8 = 1;
let mut universes_page_1: Vec<u16> = Vec::new();
let mut universes_page_2: Vec<u16> = Vec::new();
Expand Down Expand Up @@ -2245,18 +2261,19 @@ mod test {
universes: universes_page_2.clone().into(),
},
};
let res: Option<String> = dmx_rcv.handle_universe_discovery_packet(discovery_pkt_1);
let res: Option<String> = dmx_rcv.handle_universe_discovery_packet(src_cid, discovery_pkt_1);

assert!(res.is_none()); // Should be none because first packet isn't complete as its only the first page.

let res2: Option<String> = dmx_rcv.handle_universe_discovery_packet(discovery_pkt_2);
let res2: Option<String> = dmx_rcv.handle_universe_discovery_packet(src_cid, discovery_pkt_2);

assert!(res2.is_some()); // Source should be discovered because the second and last page is now received.
assert_eq!(res2.unwrap(), name);

assert_eq!(dmx_rcv.discovered_sources.len(), 1);

assert_eq!(dmx_rcv.discovered_sources[0].name, name);
assert_eq!(dmx_rcv.discovered_sources[0].cid, src_cid);
assert_eq!(dmx_rcv.discovered_sources[0].last_page, last_page);
assert_eq!(dmx_rcv.discovered_sources[0].pages.len(), 2);
assert_eq!(dmx_rcv.discovered_sources[0].pages[0].page, 0);
Expand Down