diff --git a/plugins/src/signaller/imp.rs b/plugins/src/signaller/imp.rs index 26ef1e9..e715251 100644 --- a/plugins/src/signaller/imp.rs +++ b/plugins/src/signaller/imp.rs @@ -105,10 +105,11 @@ impl Signaller { } else { None }; - websocket_sender - .send(p::IncomingMessage::Register(p::RegisterMessage::Producer { + .send(p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], meta, + peer_id: None, })) .await?; @@ -122,9 +123,7 @@ impl Signaller { if let Ok(msg) = serde_json::from_str::(&msg) { match msg { - p::OutgoingMessage::Registered( - p::RegisteredMessage::Producer { peer_id, .. }, - ) => { + p::OutgoingMessage::Welcome { peer_id } => { gst::info!( CAT, obj: &element, @@ -132,24 +131,30 @@ impl Signaller { peer_id ); } - p::OutgoingMessage::Registered(_) => unreachable!(), - p::OutgoingMessage::StartSession { peer_id } => { - if let Err(err) = element.add_consumer(&peer_id) { + p::OutgoingMessage::StartSession { + session_id, + peer_id, + } => { + if let Err(err) = + element.start_session(&session_id, &peer_id) + { gst::warning!(CAT, obj: &element, "{}", err); } } - p::OutgoingMessage::EndSession { peer_id } => { - if let Err(err) = element.remove_consumer(&peer_id) { + p::OutgoingMessage::EndSession(session_info) => { + if let Err(err) = + element.end_session(&session_info.session_id) + { gst::warning!(CAT, obj: &element, "{}", err); } } p::OutgoingMessage::Peer(p::PeerMessage { - peer_id, + session_id, peer_message, }) => match peer_message { p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }) => { if let Err(err) = element.handle_sdp( - &peer_id, + &session_id, &gst_webrtc::WebRTCSessionDescription::new( gst_webrtc::WebRTCSDPType::Answer, gst_sdp::SDPMessage::parse_buffer( @@ -175,7 +180,7 @@ impl Signaller { sdp_m_line_index, } => { if let Err(err) = element.handle_ice( - &peer_id, + &session_id, Some(sdp_m_line_index), None, &candidate, @@ -254,13 +259,13 @@ impl Signaller { pub fn handle_sdp( &self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription, ) { let state = self.state.lock().unwrap(); let msg = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: sdp.sdp().as_text().unwrap(), }), @@ -281,7 +286,7 @@ impl Signaller { pub fn handle_ice( &self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, candidate: &str, sdp_m_line_index: Option, _sdp_mid: Option, @@ -289,7 +294,7 @@ impl Signaller { let state = self.state.lock().unwrap(); let msg = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), peer_message: p::PeerMessageInner::Ice { candidate: candidate.to_string(), sdp_m_line_index: sdp_m_line_index.unwrap(), @@ -331,17 +336,17 @@ impl Signaller { } } - pub fn consumer_removed(&self, element: &WebRTCSink, peer_id: &str) { - gst::debug!(CAT, obj: element, "Signalling consumer {} removed", peer_id); + pub fn end_session(&self, element: &WebRTCSink, session_id: &str) { + gst::debug!(CAT, obj: element, "Signalling session {} ended", session_id); let state = self.state.lock().unwrap(); - let peer_id = peer_id.to_string(); + let session_id = session_id.to_string(); let element = element.downgrade(); if let Some(mut sender) = state.websocket_sender.clone() { task::spawn(async move { if let Err(err) = sender .send(p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), })) .await { diff --git a/plugins/src/signaller/mod.rs b/plugins/src/signaller/mod.rs index 8caf2e6..2959a24 100644 --- a/plugins/src/signaller/mod.rs +++ b/plugins/src/signaller/mod.rs @@ -34,13 +34,13 @@ impl Signallable for Signaller { fn handle_ice( &mut self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, candidate: &str, sdp_mline_index: Option, sdp_mid: Option, ) -> Result<(), Box> { let signaller = imp::Signaller::from_instance(self); - signaller.handle_ice(element, peer_id, candidate, sdp_mline_index, sdp_mid); + signaller.handle_ice(element, session_id, candidate, sdp_mline_index, sdp_mid); Ok(()) } @@ -49,9 +49,9 @@ impl Signallable for Signaller { signaller.stop(element); } - fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str) { + fn session_ended(&mut self, element: &WebRTCSink, session_id: &str) { let signaller = imp::Signaller::from_instance(self); - signaller.consumer_removed(element, peer_id); + signaller.end_session(element, session_id); } } diff --git a/plugins/src/webrtcsink/imp.rs b/plugins/src/webrtcsink/imp.rs index 9c60be2..07840d7 100644 --- a/plugins/src/webrtcsink/imp.rs +++ b/plugins/src/webrtcsink/imp.rs @@ -125,12 +125,14 @@ pub struct VideoEncoder { filter: gst::Element, halved_framerate: gst::Fraction, video_info: gst_video::VideoInfo, - peer_id: String, + session_id: String, mitigation_mode: WebRTCSinkMitigationMode, pub transceiver: gst_webrtc::WebRTCRTPTransceiver, } -struct Consumer { +struct Session { + id: String, + pipeline: gst::Pipeline, webrtcbin: gst::Element, rtprtxsend: Option, @@ -165,7 +167,7 @@ struct NavigationEvent { struct State { signaller: Box, signaller_state: SignallerState, - consumers: HashMap, + sessions: HashMap, codecs: BTreeMap, /// Used to abort codec discovery codecs_abort_handle: Option, @@ -275,7 +277,7 @@ impl Default for State { Self { signaller: Box::new(signaller), signaller_state: SignallerState::Stopped, - consumers: HashMap::new(), + sessions: HashMap::new(), codecs: BTreeMap::new(), codecs_abort_handle: None, codecs_done_receiver: None, @@ -541,7 +543,7 @@ impl VideoEncoder { filter, halved_framerate, video_info, - peer_id: peer_id.to_string(), + session_id: peer_id.to_string(), mitigation_mode: WebRTCSinkMitigationMode::NONE, transceiver, } @@ -632,8 +634,8 @@ impl VideoEncoder { gst::log!( CAT, obj: element, - "consumer {}: setting bitrate {} and caps {} on encoder {:?}", - self.peer_id, + "session {}: setting bitrate {} and caps {} on encoder {:?}", + self.session_id, bitrate, caps, self.element @@ -657,39 +659,40 @@ impl VideoEncoder { } impl State { - fn finalize_consumer( + fn finalize_session( &mut self, element: &super::WebRTCSink, - consumer: &mut Consumer, + session: &mut Session, signal: bool, ) { - consumer.pipeline.debug_to_dot_file_with_ts( + gst::info!(CAT, "Ending session {}", session.id); + session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), - format!("removing-peer-{}-", consumer.peer_id,), + format!("removing-peer-{}-", session.peer_id,), ); - for ssrc in consumer.webrtc_pads.keys() { - consumer.links.remove(ssrc); + for ssrc in session.webrtc_pads.keys() { + session.links.remove(ssrc); } - consumer.pipeline.call_async(|pipeline| { + session.pipeline.call_async(|pipeline| { let _ = pipeline.set_state(gst::State::Null); }); if signal { - self.signaller.consumer_removed(element, &consumer.peer_id); + self.signaller.session_ended(element, &session.peer_id); } } - fn remove_consumer( + fn end_session( &mut self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, signal: bool, - ) -> Option { - if let Some(mut consumer) = self.consumers.remove(peer_id) { - self.finalize_consumer(element, &mut consumer, signal); - Some(consumer) + ) -> Option { + if let Some(mut session) = self.sessions.remove(session_id) { + self.finalize_session(element, &mut session, signal); + Some(session) } else { None } @@ -723,8 +726,9 @@ impl State { } } -impl Consumer { +impl Session { fn new( + id: String, pipeline: gst::Pipeline, webrtcbin: gst::Element, peer_id: String, @@ -732,6 +736,7 @@ impl Consumer { cc_info: CCInfo, ) -> Self { Self { + id, pipeline, webrtcbin, peer_id, @@ -1144,10 +1149,10 @@ impl WebRTCSink { let mut state = self.state.lock().unwrap(); - let consumer_ids: Vec<_> = state.consumers.keys().map(|k| k.to_owned()).collect(); + let session_ids: Vec<_> = state.sessions.keys().map(|k| k.to_owned()).collect(); - for id in consumer_ids { - state.remove_consumer(element, &id, true); + for id in session_ids { + state.end_session(element, &id, true); } state @@ -1197,39 +1202,39 @@ impl WebRTCSink { &self, element: &super::WebRTCSink, offer: gst_webrtc::WebRTCSessionDescription, - peer_id: &str, + session_id: &str, ) { let mut state = self.state.lock().unwrap(); - if let Some(consumer) = state.consumers.get(peer_id) { - consumer + if let Some(session) = state.sessions.get(session_id) { + session .webrtcbin .emit_by_name::<()>("set-local-description", &[&offer, &None::]); - if let Err(err) = state.signaller.handle_sdp(element, peer_id, &offer) { + if let Err(err) = state.signaller.handle_sdp(element, session_id, &offer) { gst::warning!( CAT, - "Failed to handle SDP for consumer {}: {}", - peer_id, + "Failed to handle SDP for session {}: {}", + session_id, err ); - state.remove_consumer(element, peer_id, true); + state.end_session(element, session_id, true); } } } - fn negotiate(&self, element: &super::WebRTCSink, peer_id: &str) { + fn negotiate(&self, element: &super::WebRTCSink, session_id: &str) { let state = self.state.lock().unwrap(); - gst::debug!(CAT, obj: element, "Negotiating for peer {}", peer_id); + gst::debug!(CAT, obj: element, "Negotiating for session {}", session_id); - if let Some(consumer) = state.consumers.get(peer_id) { + if let Some(session) = state.sessions.get(session_id) { let element = element.downgrade(); - gst::debug!(CAT, "Creating offer for peer {}", peer_id); - let peer_id = peer_id.to_string(); + gst::debug!(CAT, "Creating offer for session {}", session_id); + let session_id = session_id.to_string(); let promise = gst::Promise::with_change_func(move |reply| { - gst::debug!(CAT, "Created offer for peer {}", peer_id); + gst::debug!(CAT, "Created offer for session {}", session_id); if let Some(element) = element.upgrade() { let this = Self::from_instance(&element); @@ -1240,9 +1245,9 @@ impl WebRTCSink { CAT, obj: &element, "Promise returned without a reply for {}", - peer_id + session_id ); - let _ = this.remove_consumer(&element, &peer_id, true); + let _ = this.remove_session(&element, &session_id, true); return; } Err(err) => { @@ -1250,10 +1255,10 @@ impl WebRTCSink { CAT, obj: &element, "Promise returned with an error for {}: {:?}", - peer_id, + session_id, err ); - let _ = this.remove_consumer(&element, &peer_id, true); + let _ = this.remove_session(&element, &session_id, true); return; } }; @@ -1262,28 +1267,29 @@ impl WebRTCSink { .value("offer") .map(|offer| offer.get::().unwrap()) { - this.on_offer_created(&element, offer, &peer_id); + this.on_offer_created(&element, offer, &session_id); } else { gst::warning!( CAT, - "Reply without an offer for consumer {}: {:?}", - peer_id, + "Reply without an offer for session {}: {:?}", + session_id, reply ); - let _ = this.remove_consumer(&element, &peer_id, true); + let _ = this.remove_session(&element, &session_id, true); } } }); - consumer + session .webrtcbin .emit_by_name::<()>("create-offer", &[&None::, &promise]); } else { gst::debug!( CAT, obj: element, - "consumer for peer {} no longer exists", - peer_id + "consumer for session {} no longer exists (sessions: {:?}", + session_id, + state.sessions.keys().map(|id| id) ); } } @@ -1291,47 +1297,58 @@ impl WebRTCSink { fn on_ice_candidate( &self, element: &super::WebRTCSink, - peer_id: String, + session_id: String, sdp_m_line_index: u32, candidate: String, ) { let mut state = self.state.lock().unwrap(); - if let Err(err) = - state - .signaller - .handle_ice(element, &peer_id, &candidate, Some(sdp_m_line_index), None) - { + if let Err(err) = state.signaller.handle_ice( + element, + &session_id, + &candidate, + Some(sdp_m_line_index), + None, + ) { gst::warning!( CAT, - "Failed to handle ICE for consumer {}: {}", - peer_id, + "Failed to handle ICE in session {}: {}", + session_id, err ); - state.remove_consumer(element, &peer_id, true); + state.end_session(element, &session_id, true); } } - /// Called by the signaller to add a new consumer - pub fn add_consumer( + /// Called by the signaller to add a new session + pub fn start_session( &self, element: &super::WebRTCSink, + session_id: &str, peer_id: &str, ) -> Result<(), WebRTCSinkError> { let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); let peer_id = peer_id.to_string(); + let session_id = session_id.to_string(); - if state.consumers.contains_key(&peer_id) { - return Err(WebRTCSinkError::DuplicateConsumerId(peer_id)); + if state.sessions.contains_key(&session_id) { + return Err(WebRTCSinkError::DuplicateSessionId(session_id)); } - gst::info!(CAT, obj: element, "Adding consumer {}", peer_id); + gst::info!( + CAT, + obj: element, + "Adding session: {} for peer: {}", + peer_id, + session_id + ); - let pipeline = gst::Pipeline::new(Some(&format!("consumer-pipeline-{}", peer_id))); + let pipeline = gst::Pipeline::new(Some(&format!("session-pipeline-{}", session_id))); - let webrtcbin = make_element("webrtcbin", Some(&format!("webrtcbin-{}", peer_id))) - .map_err(|err| WebRTCSinkError::ConsumerPipelineError { + let webrtcbin = make_element("webrtcbin", Some(&format!("webrtcbin-{}", session_id))) + .map_err(|err| WebRTCSinkError::SessionPipelineError { + session_id: session_id.clone(), peer_id: peer_id.clone(), details: err.to_string(), })?; @@ -1351,7 +1368,7 @@ impl WebRTCSink { webrtcbin.connect_closure( "request-aux-sender", false, - glib::closure!(@watch element, @strong peer_id + glib::closure!(@watch element, @strong session_id => move |_webrtcbin: gst::Element, _transport: gst::Object| { let settings = element.imp().settings.lock().unwrap(); @@ -1378,9 +1395,9 @@ impl WebRTCSink { }; cc.connect_notify(Some("estimated-bitrate"), - glib::clone!(@weak element, @strong peer_id + glib::clone!(@weak element, @strong session_id => move |bwe, pspec| { - element.imp().set_bitrate(&element, &peer_id, + element.imp().set_bitrate(&element, &session_id, bwe.property::(pspec.name())); } )); @@ -1392,12 +1409,12 @@ impl WebRTCSink { webrtcbin.connect_closure( "deep-element-added", false, - glib::closure!(@watch element, @strong peer_id + glib::closure!(@watch element, @strong session_id => move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| { if e.factory().map_or(false, |f| f.name() == "rtprtxsend") { if e.has_property("stuffing-kbps", Some(i32::static_type())) { - element.imp().set_rtptrxsend(&element, &peer_id, e); + element.imp().set_rtptrxsend(&element, &session_id, e); } else { gst::warning!(CAT, "rtprtxsend doesn't have a `stuffing-kbps` \ property, stuffing disabled"); @@ -1412,7 +1429,7 @@ impl WebRTCSink { pipeline.add(&webrtcbin).unwrap(); let element_clone = element.downgrade(); - let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect("on-ice-candidate", false, move |values| { if let Some(element) = element_clone.upgrade() { let this = Self::from_instance(&element); @@ -1420,7 +1437,7 @@ impl WebRTCSink { let candidate = values[2].get::().expect("Invalid argument"); this.on_ice_candidate( &element, - peer_id_clone.to_string(), + session_id_clone.to_string(), sdp_m_line_index, candidate, ); @@ -1430,6 +1447,7 @@ impl WebRTCSink { let element_clone = element.downgrade(); let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect_notify(Some("connection-state"), move |webrtcbin, _pspec| { if let Some(element) = element_clone.upgrade() { let state = @@ -1441,16 +1459,18 @@ impl WebRTCSink { gst::warning!( CAT, obj: &element, - "Connection state for consumer {} failed", + "Connection state for in session {} (peer {}) failed", + session_id_clone, peer_id_clone ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } _ => { gst::log!( CAT, obj: &element, - "Connection state for consumer {} changed: {:?}", + "Connection state in session {} (peer {}) changed: {:?}", + session_id_clone, peer_id_clone, state ); @@ -1461,6 +1481,7 @@ impl WebRTCSink { let element_clone = element.downgrade(); let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect_notify(Some("ice-connection-state"), move |webrtcbin, _pspec| { if let Some(element) = element_clone.upgrade() { let state = webrtcbin @@ -1472,16 +1493,18 @@ impl WebRTCSink { gst::warning!( CAT, obj: &element, - "Ice connection state for consumer {} failed", - peer_id_clone + "Ice connection state in session {} (peer {}) failed", + session_id_clone, + peer_id_clone, ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } _ => { gst::log!( CAT, obj: &element, - "Ice connection state for consumer {} changed: {:?}", + "Ice connection state in session {} (peer {}) changed: {:?}", + session_id_clone, peer_id_clone, state ); @@ -1491,8 +1514,8 @@ impl WebRTCSink { if state == gst_webrtc::WebRTCICEConnectionState::Completed { let state = this.state.lock().unwrap(); - if let Some(consumer) = state.consumers.get(&peer_id_clone) { - for webrtc_pad in consumer.webrtc_pads.values() { + if let Some(session) = state.sessions.get(&session_id_clone) { + for webrtc_pad in session.webrtc_pads.values() { if let Some(srcpad) = webrtc_pad.pad.peer() { srcpad.send_event( gst_video::UpstreamForceKeyUnitEvent::builder() @@ -1508,6 +1531,7 @@ impl WebRTCSink { let element_clone = element.downgrade(); let peer_id_clone = peer_id.clone(); + let session_id_clone = session_id.clone(); webrtcbin.connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { let state = webrtcbin.property::("ice-gathering-state"); @@ -1516,14 +1540,16 @@ impl WebRTCSink { gst::log!( CAT, obj: &element, - "Ice gathering state for consumer {} changed: {:?}", + "Ice gathering state in session {} (peer {}) changed: {:?}", + session_id_clone, peer_id_clone, state ); } }); - let mut consumer = Consumer::new( + let mut session = Session::new( + session_id.clone(), pipeline.clone(), webrtcbin.clone(), peer_id.clone(), @@ -1549,23 +1575,23 @@ impl WebRTCSink { .child_by_name("rtpbin") .unwrap(); - if consumer.congestion_controller.is_some() { - let peer_id_str = peer_id.to_string(); - if consumer.stats_sigid.is_none() { - consumer.stats_sigid = Some(rtpbin.connect_closure("on-new-ssrc", true, + if session.congestion_controller.is_some() { + let session_id_str = session_id.to_string(); + if session.stats_sigid.is_none() { + session.stats_sigid = Some(rtpbin.connect_closure("on-new-ssrc", true, glib::closure!(@weak-allow-none element, @weak-allow-none webrtcbin => move |rtpbin: gst::Object, session_id: u32, _src: u32| { - let session = rtpbin.emit_by_name::("get-session", &[&session_id]); + let rtp_session = rtpbin.emit_by_name::("get-session", &[&session_id]); let element = element.expect("on-new-ssrc emited when webrtcsink has been disposed?"); let webrtcbin = webrtcbin.unwrap(); let mut state = element.imp().state.lock().unwrap(); - if let Some(mut consumer) = state.consumers.get_mut(&peer_id_str) { + if let Some(mut session) = state.sessions.get_mut(&session_id_str) { - consumer.stats_sigid = Some(session.connect_notify(Some("twcc-stats"), - glib::clone!(@strong peer_id_str, @weak webrtcbin, @weak element => @default-panic, move |sess, pspec| { + session.stats_sigid = Some(rtp_session.connect_notify(Some("twcc-stats"), + glib::clone!(@strong session_id_str, @weak webrtcbin, @weak element => @default-panic, move |sess, pspec| { // Run the Loss-based control algortithm on new peer TWCC feedbacks - element.imp().process_loss_stats(&element, &peer_id_str, &sess.property::(pspec.name())); + element.imp().process_loss_stats(&element, &session_id_str, &sess.property::(pspec.name())); }) )); } @@ -1577,7 +1603,7 @@ impl WebRTCSink { state .streams .iter() - .for_each(|(_, stream)| consumer.request_webrtcbin_pad(element, &settings, stream)); + .for_each(|(_, stream)| session.request_webrtcbin_pad(element, &settings, stream)); let clock = element.clock(); @@ -1588,7 +1614,7 @@ impl WebRTCSink { let mut bus_stream = pipeline.bus().unwrap().stream(); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); - let peer_id_clone = peer_id.to_owned(); + let session_id_clone = session_id.to_owned(); task::spawn(async move { while let Some(msg) = bus_stream.next().await { @@ -1598,12 +1624,12 @@ impl WebRTCSink { gst::MessageView::Error(err) => { gst::error!( CAT, - "Consumer {} error: {}, details: {:?}", - peer_id_clone, + "session {} error: {}, details: {:?}", + session_id_clone, err.error(), err.debug() ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } gst::MessageView::StateChanged(state_changed) => { if let Some(pipeline) = pipeline_clone.upgrade() { @@ -1611,8 +1637,8 @@ impl WebRTCSink { pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), format!( - "webrtcsink-peer-{}-{:?}-to-{:?}", - peer_id_clone, + "webrtcsink-session-{}-{:?}-to-{:?}", + session_id_clone, state_changed.old(), state_changed.current() ), @@ -1629,10 +1655,10 @@ impl WebRTCSink { gst::MessageView::Eos(..) => { gst::error!( CAT, - "Unexpected end of stream for consumer {}", - peer_id_clone + "Unexpected end of stream in session {}", + session_id_clone, ); - let _ = this.remove_consumer(&element, &peer_id_clone, true); + let _ = this.remove_session(&element, &session_id_clone, true); } _ => (), } @@ -1641,7 +1667,8 @@ impl WebRTCSink { }); pipeline.set_state(gst::State::Ready).map_err(|err| { - WebRTCSinkError::ConsumerPipelineError { + WebRTCSinkError::SessionPipelineError { + session_id: session_id.to_string(), peer_id: peer_id.to_string(), details: err.to_string(), } @@ -1651,7 +1678,7 @@ impl WebRTCSink { state.navigation_handler = Some(NavigationEventHandler::new(element, &webrtcbin)); } - state.consumers.insert(peer_id.to_string(), consumer); + state.sessions.insert(session_id.to_string(), session); drop(state); @@ -1669,10 +1696,11 @@ impl WebRTCSink { // // This is completely safe, as we know that by now all conditions are gathered: // webrtcbin is in the Ready state, and all its transceivers have codec_preferences. - self.negotiate(element, &peer_id); + self.negotiate(element, &session_id); pipeline.set_state(gst::State::Playing).map_err(|err| { - WebRTCSinkError::ConsumerPipelineError { + WebRTCSinkError::SessionPipelineError { + session_id: session_id.to_string(), peer_id: peer_id.to_string(), details: err.to_string(), } @@ -1682,21 +1710,21 @@ impl WebRTCSink { } /// Called by the signaller to remove a consumer - pub fn remove_consumer( + pub fn remove_session( &self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, signal: bool, ) -> Result<(), WebRTCSinkError> { let mut state = self.state.lock().unwrap(); - if !state.consumers.contains_key(peer_id) { - return Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())); + if !state.sessions.contains_key(session_id) { + return Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())); } - if let Some(consumer) = state.remove_consumer(element, peer_id, signal) { + if let Some(session) = state.end_session(element, session_id, signal) { drop(state); - element.emit_by_name::<()>("consumer-removed", &[&peer_id, &consumer.webrtcbin]); + element.emit_by_name::<()>("consumer-removed", &[&session.peer_id, &session.webrtcbin]); } Ok(()) @@ -1705,30 +1733,35 @@ impl WebRTCSink { fn process_loss_stats( &self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, stats: &gst::Structure, ) { let mut state = element.imp().state.lock().unwrap(); - if let Some(mut consumer) = state.consumers.get_mut(peer_id) { - if let Some(congestion_controller) = consumer.congestion_controller.as_mut() { - congestion_controller.loss_control(&element, stats, &mut consumer.encoders); + if let Some(mut session) = state.sessions.get_mut(session_id) { + if let Some(congestion_controller) = session.congestion_controller.as_mut() { + congestion_controller.loss_control(&element, stats, &mut session.encoders); } - consumer.stats = stats.to_owned(); + session.stats = stats.to_owned(); } } - fn process_stats(&self, element: &super::WebRTCSink, webrtcbin: gst::Element, peer_id: &str) { - let peer_id = peer_id.to_string(); + fn process_stats( + &self, + element: &super::WebRTCSink, + webrtcbin: gst::Element, + session_id: &str, + ) { + let session_id = session_id.to_string(); let promise = gst::Promise::with_change_func( - glib::clone!(@strong peer_id, @weak element => move |reply| { + glib::clone!(@strong session_id, @weak element => move |reply| { if let Ok(Some(stats)) = reply { let mut state = element.imp().state.lock().unwrap(); - if let Some(mut consumer) = state.consumers.get_mut(&peer_id) { - if let Some(congestion_controller) = consumer.congestion_controller.as_mut() { - congestion_controller.delay_control(&element, stats, &mut consumer.encoders,); + if let Some(mut session) = state.sessions.get_mut(&session_id) { + if let Some(congestion_controller) = session.congestion_controller.as_mut() { + congestion_controller.delay_control(&element, stats, &mut session.encoders,); } - consumer.stats = stats.to_owned(); + session.stats = stats.to_owned(); } } }), @@ -1740,34 +1773,34 @@ impl WebRTCSink { fn set_rtptrxsend(&self, element: &super::WebRTCSink, peer_id: &str, rtprtxsend: gst::Element) { let mut state = element.imp().state.lock().unwrap(); - if let Some(consumer) = state.consumers.get_mut(peer_id) { - consumer.rtprtxsend = Some(rtprtxsend); + if let Some(session) = state.sessions.get_mut(peer_id) { + session.rtprtxsend = Some(rtprtxsend); } } fn set_bitrate(&self, element: &super::WebRTCSink, peer_id: &str, bitrate: u32) { let mut state = element.imp().state.lock().unwrap(); - if let Some(consumer) = state.consumers.get_mut(peer_id) { + if let Some(session) = state.sessions.get_mut(peer_id) { let fec_ratio = { // Start adding some FEC when the bitrate > 2Mbps as we found experimentally // that it is not worth it below that threshold - if bitrate <= 2_000_000 || consumer.cc_info.max_bitrate <= 2_000_000 { + if bitrate <= 2_000_000 || session.cc_info.max_bitrate <= 2_000_000 { 0f64 } else { (bitrate as f64 - 2_000_000.) - / (consumer.cc_info.max_bitrate as f64 - 2_000_000.) + / (session.cc_info.max_bitrate as f64 - 2_000_000.) } }; let fec_percentage = fec_ratio * 50f64; let encoders_bitrate = ((bitrate as f64) / (1. + (fec_percentage / 100.))) as i32; - if let Some(ref rtpxsend) = consumer.rtprtxsend.as_ref() { + if let Some(ref rtpxsend) = session.rtprtxsend.as_ref() { rtpxsend.set_property("stuffing-kbps", (bitrate as f64 / 1000.) as i32); } - for encoder in consumer.encoders.iter_mut() { + for encoder in session.encoders.iter_mut() { encoder.set_bitrate(element, encoders_bitrate); encoder .transceiver @@ -1776,12 +1809,12 @@ impl WebRTCSink { } } - fn on_remote_description_set(&self, element: &super::WebRTCSink, peer_id: String) { + fn on_remote_description_set(&self, element: &super::WebRTCSink, session_id: String) { let mut state = self.state.lock().unwrap(); let mut remove = false; - if let Some(mut consumer) = state.consumers.remove(&peer_id) { - for webrtc_pad in consumer.webrtc_pads.clone().values() { + if let Some(mut session) = state.sessions.remove(&session_id) { + for webrtc_pad in session.webrtc_pads.clone().values() { let transceiver = webrtc_pad .pad .property::("transceiver"); @@ -1798,14 +1831,14 @@ impl WebRTCSink { .and_then(|stream| stream.producer.as_ref()) { if let Err(err) = - consumer.connect_input_stream(element, producer, webrtc_pad, &state.codecs) + session.connect_input_stream(element, producer, webrtc_pad, &state.codecs) { gst::error!( CAT, obj: element, - "Failed to connect input stream {} for consumer {}: {}", + "Failed to connect input stream {} for session {}: {}", webrtc_pad.stream_name, - peer_id, + session_id, err ); remove = true; @@ -1815,21 +1848,21 @@ impl WebRTCSink { gst::error!( CAT, obj: element, - "No producer to connect consumer {} to", - peer_id, + "No producer to connect session {} to", + session_id, ); remove = true; break; } } - consumer.pipeline.debug_to_dot_file_with_ts( + session.pipeline.debug_to_dot_file_with_ts( gst::DebugGraphDetails::all(), - format!("webrtcsink-peer-{}-remote-description-set", peer_id,), + format!("webrtcsink-peer-{}-remote-description-set", session_id,), ); let element_clone = element.downgrade(); - let webrtcbin = consumer.webrtcbin.downgrade(); + let webrtcbin = session.webrtcbin.downgrade(); task::spawn(async move { let mut interval = async_std::stream::interval(std::time::Duration::from_millis(100)); @@ -1839,7 +1872,9 @@ impl WebRTCSink { if let (Some(webrtcbin), Some(element)) = (webrtcbin.upgrade(), element_clone.upgrade()) { - element.imp().process_stats(&element, webrtcbin, &peer_id); + element + .imp() + .process_stats(&element, webrtcbin, &session_id); } else { break; } @@ -1847,9 +1882,9 @@ impl WebRTCSink { }); if remove { - state.finalize_consumer(element, &mut consumer, true); + state.finalize_session(element, &mut session, true); } else { - state.consumers.insert(consumer.peer_id.clone(), consumer); + state.sessions.insert(session.id.clone(), session); } } } @@ -1858,7 +1893,7 @@ impl WebRTCSink { pub fn handle_ice( &self, _element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, sdp_m_line_index: Option, _sdp_mid: Option, candidate: &str, @@ -1867,14 +1902,14 @@ impl WebRTCSink { let sdp_m_line_index = sdp_m_line_index.ok_or(WebRTCSinkError::MandatorySdpMlineIndex)?; - if let Some(consumer) = state.consumers.get(peer_id) { - gst::trace!(CAT, "adding ice candidate for peer {}", peer_id); - consumer + if let Some(session) = state.sessions.get(session_id) { + gst::trace!(CAT, "adding ice candidate for session {}", session_id); + session .webrtcbin .emit_by_name::<()>("add-ice-candidate", &[&sdp_m_line_index, &candidate]); Ok(()) } else { - Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())) + Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) } } @@ -1882,17 +1917,17 @@ impl WebRTCSink { pub fn handle_sdp( &self, element: &super::WebRTCSink, - peer_id: &str, + session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription, ) -> Result<(), WebRTCSinkError> { let mut state = self.state.lock().unwrap(); - if let Some(consumer) = state.consumers.get_mut(peer_id) { + if let Some(session) = state.sessions.get_mut(session_id) { let sdp = desc.sdp(); - consumer.sdp = Some(sdp.to_owned()); + session.sdp = Some(sdp.to_owned()); - for webrtc_pad in consumer.webrtc_pads.values_mut() { + for webrtc_pad in session.webrtc_pads.values_mut() { let media_idx = webrtc_pad.media_idx; /* TODO: support partial answer, webrtcbin doesn't seem * very well equipped to deal with this at the moment */ @@ -1904,15 +1939,15 @@ impl WebRTCSink { gst::warning!( CAT, - "consumer {} refused media {}: {:?}", - peer_id, + "consumer from session {} refused media {}: {:?}", + session_id, media_idx, media_str ); - state.remove_consumer(element, peer_id, true); + state.end_session(element, session_id, true); return Err(WebRTCSinkError::ConsumerRefusedMedia { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), media_idx, }); } @@ -1927,39 +1962,40 @@ impl WebRTCSink { } else { gst::warning!( CAT, - "consumer {} did not provide valid payload for media index {}", - peer_id, - media_idx + "consumer from session {} did not provide valid payload for media index {} for session {}", + session_id, + media_idx, + session_id, ); - state.remove_consumer(element, peer_id, true); + state.end_session(element, session_id, true); return Err(WebRTCSinkError::ConsumerNoValidPayload { - peer_id: peer_id.to_string(), + session_id: session_id.to_string(), media_idx, }); } } let element = element.downgrade(); - let peer_id = peer_id.to_string(); + let session_id = session_id.to_string(); let promise = gst::Promise::with_change_func(move |reply| { gst::debug!(CAT, "received reply {:?}", reply); if let Some(element) = element.upgrade() { let this = Self::from_instance(&element); - this.on_remote_description_set(&element, peer_id); + this.on_remote_description_set(&element, session_id); } }); - consumer + session .webrtcbin .emit_by_name::<()>("set-remote-description", &[desc, &promise]); Ok(()) } else { - Err(WebRTCSinkError::NoConsumerWithId(peer_id.to_string())) + Err(WebRTCSinkError::NoSessionWithId(session_id.to_string())) } } @@ -2135,7 +2171,7 @@ impl WebRTCSink { self.state .lock() .unwrap() - .consumers + .sessions .iter() .map(|(name, consumer)| (name.as_str(), consumer.gather_stats().to_send_value())), ) @@ -2493,11 +2529,11 @@ impl ObjectImpl for WebRTCSink { .param_types([String::static_type(), gst::Element::static_type()]) .build(), /* - * RsWebRTCSink::get_consumers: + * RsWebRTCSink::get_sessions: * - * List all consumers (by ID). + * List all sessions (by ID). */ - glib::subclass::Signal::builder("get-consumers") + glib::subclass::Signal::builder("get-sessions") .action() .class_handler(|_, args| { let element = args[0].get::().expect("signal arg"); @@ -2507,7 +2543,7 @@ impl ObjectImpl for WebRTCSink { this.state .lock() .unwrap() - .consumers + .sessions .keys() .cloned() .collect::>() diff --git a/plugins/src/webrtcsink/mod.rs b/plugins/src/webrtcsink/mod.rs index acb7a17..da124e1 100644 --- a/plugins/src/webrtcsink/mod.rs +++ b/plugins/src/webrtcsink/mod.rs @@ -15,18 +15,22 @@ unsafe impl Sync for WebRTCSink {} #[derive(thiserror::Error, Debug)] pub enum WebRTCSinkError { - #[error("no consumer with id")] - NoConsumerWithId(String), + #[error("no session with id")] + NoSessionWithId(String), #[error("consumer refused media")] - ConsumerRefusedMedia { peer_id: String, media_idx: u32 }, + ConsumerRefusedMedia { session_id: String, media_idx: u32 }, #[error("consumer did not provide valid payload for media")] - ConsumerNoValidPayload { peer_id: String, media_idx: u32 }, + ConsumerNoValidPayload { session_id: String, media_idx: u32 }, #[error("SDP mline index is currently mandatory")] MandatorySdpMlineIndex, - #[error("duplicate consumer id")] - DuplicateConsumerId(String), + #[error("duplicate session id")] + DuplicateSessionId(String), #[error("error setting up consumer pipeline")] - ConsumerPipelineError { peer_id: String, details: String }, + SessionPipelineError { + session_id: String, + peer_id: String, + details: String, + }, } pub trait Signallable: Sync + Send + 'static { @@ -35,7 +39,7 @@ pub trait Signallable: Sync + Send + 'static { fn handle_sdp( &mut self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription, ) -> Result<(), Box>; @@ -46,13 +50,13 @@ pub trait Signallable: Sync + Send + 'static { fn handle_ice( &mut self, element: &WebRTCSink, - peer_id: &str, + session_id: &str, candidate: &str, sdp_m_line_index: Option, sdp_mid: Option, ) -> Result<(), Box>; - fn consumer_removed(&mut self, element: &WebRTCSink, peer_id: &str); + fn session_ended(&mut self, element: &WebRTCSink, session_id: &str); fn stop(&mut self, element: &WebRTCSink); } @@ -86,12 +90,12 @@ impl WebRTCSink { pub fn handle_sdp( &self, - peer_id: &str, + session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription, ) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.handle_sdp(self, peer_id, sdp) + ws.handle_sdp(self, session_id, sdp) } /// sdp_mid is exposed for future proofing, see @@ -99,14 +103,14 @@ impl WebRTCSink { /// at the moment sdp_m_line_index must be Some pub fn handle_ice( &self, - peer_id: &str, + session_id: &str, sdp_m_line_index: Option, sdp_mid: Option, candidate: &str, ) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.handle_ice(self, peer_id, sdp_m_line_index, sdp_mid, candidate) + ws.handle_ice(self, session_id, sdp_m_line_index, sdp_mid, candidate) } pub fn handle_signalling_error(&self, error: Box) { @@ -115,16 +119,16 @@ impl WebRTCSink { ws.handle_signalling_error(self, anyhow::anyhow!(error)); } - pub fn add_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> { + pub fn start_session(&self, session_id: &str, peer_id: &str) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.add_consumer(self, peer_id) + ws.start_session(self, session_id, peer_id) } - pub fn remove_consumer(&self, peer_id: &str) -> Result<(), WebRTCSinkError> { + pub fn end_session(&self, session_id: &str) -> Result<(), WebRTCSinkError> { let ws = imp::WebRTCSink::from_instance(self); - ws.remove_consumer(self, peer_id, false) + ws.remove_session(self, session_id, false) } } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 86019ab..9af9e92 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,33 +1,6 @@ /// The default protocol used by the signalling server use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, PartialEq)] -#[serde(tag = "peerType")] -#[serde(rename_all = "camelCase")] -/// Confirms registration -pub enum RegisteredMessage { - /// Registered as a producer - #[serde(rename_all = "camelCase")] - Producer { - peer_id: String, - #[serde(default)] - meta: Option, - }, - /// Registered as a consumer - #[serde(rename_all = "camelCase")] - Consumer { - peer_id: String, - #[serde(default)] - meta: Option, - }, - /// Registered as a listener - #[serde(rename_all = "camelCase")] - Listener { - peer_id: String, - #[serde(default)] - meta: Option, - }, -} #[derive(Serialize, Deserialize, Debug, PartialEq)] #[serde(rename_all = "camelCase")] pub struct Peer { @@ -41,28 +14,19 @@ pub struct Peer { #[serde(rename_all = "camelCase")] /// Messages sent from the server to peers pub enum OutgoingMessage { - /// Confirms registration - Registered(RegisteredMessage), - /// Notifies listeners that a producer was registered - #[serde(rename_all = "camelCase")] - ProducerAdded { - peer_id: String, - #[serde(default)] - meta: Option, - }, - /// Notifies listeners that a producer was removed + /// Welcoming message, sets the Peer ID linked to a new connection + Welcome { peer_id: String }, + /// Notifies listeners that a peer status has changed + PeerStatusChanged(PeerStatus), + /// Instructs a peer to generate an offer and inform about the session ID #[serde(rename_all = "camelCase")] - ProducerRemoved { - peer_id: String, - #[serde(default)] - meta: Option, - }, - /// Instructs a peer to generate an offer + StartSession { peer_id: String, session_id: String }, + /// Let consumer know that the requested session is starting with the specified identifier #[serde(rename_all = "camelCase")] - StartSession { peer_id: String }, + SessionStarted { peer_id: String, session_id: String }, /// Signals that the session the peer was in was ended #[serde(rename_all = "camelCase")] - EndSession { peer_id: String }, + EndSession(EndSessionMessage), /// Messages directly forwarded from one peer to another Peer(PeerMessage), /// Provides the current list of consumer peers @@ -71,29 +35,36 @@ pub enum OutgoingMessage { Error { details: String }, } -#[derive(Serialize, Deserialize, Debug)] -#[serde(tag = "peerType")] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Clone)] #[serde(rename_all = "camelCase")] /// Register with a peer type -pub enum RegisterMessage { +pub enum PeerRole { /// Register as a producer #[serde(rename_all = "camelCase")] - Producer { - #[serde(default)] - meta: Option, - }, - /// Register as a consumer - #[serde(rename_all = "camelCase")] - Consumer { - #[serde(default)] - meta: Option, - }, + Producer, /// Register as a listener #[serde(rename_all = "camelCase")] - Listener { - #[serde(default)] - meta: Option, - }, + Listener, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq, Default, Clone)] +#[serde(rename_all = "camelCase")] +pub struct PeerStatus { + pub roles: Vec, + pub meta: Option, + #[serde(skip_serializing_if = "Option::is_none")] + #[serde(default)] + pub peer_id: Option, +} + +impl PeerStatus { + pub fn producing(&self) -> bool { + self.roles.iter().any(|t| matches!(t, PeerRole::Producer)) + } + + pub fn listening(&self) -> bool { + self.roles.iter().any(|t| matches!(t, PeerRole::Listener)) + } } #[derive(Serialize, Deserialize, Debug)] @@ -104,7 +75,7 @@ pub struct StartSessionMessage { pub peer_id: String, } -#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] #[serde(tag = "type")] #[serde(rename_all = "camelCase")] /// Conveys a SDP @@ -121,7 +92,7 @@ pub enum SdpMessage { }, } -#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Clone)] #[serde(rename_all = "camelCase")] /// Contents of the peer message pub enum PeerMessageInner { @@ -140,19 +111,17 @@ pub enum PeerMessageInner { #[serde(rename_all = "camelCase")] /// Messages directly forwarded from one peer to another pub struct PeerMessage { - /// The identifier of the peer, which must be in a session with the sender - pub peer_id: String, - /// The contents of the message + pub session_id: String, #[serde(flatten)] pub peer_message: PeerMessageInner, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] /// End a session pub struct EndSessionMessage { - /// The identifier of the peer to end the session with - pub peer_id: String, + /// The identifier of the session to end + pub session_id: String, } #[derive(Serialize, Deserialize, Debug)] @@ -160,8 +129,10 @@ pub struct EndSessionMessage { #[serde(rename_all = "camelCase")] /// Messages received by the server from peers pub enum IncomingMessage { - /// Register as a peer type - Register(RegisterMessage), + /// Internal message to let know about new peers + NewPeer, + /// Set current peer status + SetPeerStatus(PeerStatus), /// Start a session with a producer peer StartSession(StartSessionMessage), /// End an existing session diff --git a/signalling/src/handlers/mod.rs b/signalling/src/handlers/mod.rs index c33d9f7..de0bd8e 100644 --- a/signalling/src/handlers/mod.rs +++ b/signalling/src/handlers/mod.rs @@ -1,25 +1,45 @@ use anyhow::{anyhow, Error}; +use anyhow::{bail, Context}; use futures::prelude::*; use futures::ready; +use p::PeerStatus; use pin_project_lite::pin_project; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::collections::{HashMap, VecDeque}; use std::pin::Pin; -use std::task::{Context, Poll}; +use std::task::{Context as TaskContext, Poll}; +use tracing::log::error; use tracing::{info, instrument, warn}; use webrtcsink_protocol as p; type PeerId = String; +#[derive(Clone)] +struct Session { + id: String, + producer: PeerId, + consumer: PeerId, +} + +impl Session { + fn other_peer_id(&self, id: &str) -> Result<&str, Error> { + if self.producer == id { + Ok(&self.consumer) + } else if self.consumer == id { + Ok(&self.producer) + } else { + bail!("Peer {id} is not part of {}", self.id) + } + } +} + pin_project! { #[must_use = "streams do nothing unless polled"] pub struct Handler { #[pin] stream: Pin)> + Send>>, items: VecDeque<(String, p::OutgoingMessage)>, - producers: HashMap>, - consumers: HashMap>, - listeners: HashSet, - meta: HashMap>, + peers: HashMap, + sessions: HashMap, } } @@ -32,10 +52,8 @@ impl Handler { Self { stream, items: VecDeque::new(), - producers: HashMap::new(), - consumers: HashMap::new(), - listeners: HashSet::new(), - meta: HashMap::new(), + peers: Default::default(), + sessions: Default::default(), } } @@ -46,146 +64,118 @@ impl Handler { msg: p::IncomingMessage, ) -> Result<(), Error> { match msg { - p::IncomingMessage::Register(message) => match message { - p::RegisterMessage::Producer { meta } => self.register_producer(peer_id, meta), - p::RegisterMessage::Consumer { meta } => self.register_consumer(peer_id, meta), - p::RegisterMessage::Listener { meta } => self.register_listener(peer_id, meta), - }, + p::IncomingMessage::NewPeer => { + self.peers.insert(peer_id.to_string(), Default::default()); + self.items.push_back(( + peer_id.into(), + p::OutgoingMessage::Welcome { + peer_id: peer_id.to_string(), + }, + )); + + Ok(()) + } + p::IncomingMessage::SetPeerStatus(status) => self.set_peer_status(peer_id, &status), p::IncomingMessage::StartSession(message) => { self.start_session(&message.peer_id, peer_id) } - p::IncomingMessage::Peer(p::PeerMessage { - peer_id: other_peer_id, - peer_message, - }) => match peer_message { - p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - } => self.handle_ice(candidate, sdp_m_line_index, peer_id, &other_peer_id), - p::PeerMessageInner::Sdp(sdp_message) => match sdp_message { - p::SdpMessage::Offer { sdp } => { - self.handle_sdp_offer(sdp, peer_id, &other_peer_id) - } - p::SdpMessage::Answer { sdp } => { - self.handle_sdp_answer(sdp, peer_id, &other_peer_id) - } - }, - }, + p::IncomingMessage::Peer(peermsg) => self.handle_peer_message(peer_id, peermsg), p::IncomingMessage::List => self.list_producers(peer_id), - p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: other_peer_id, - }) => self.end_session(peer_id, &other_peer_id), + p::IncomingMessage::EndSession(msg) => self.end_session(peer_id, &msg.session_id), } } + fn handle_peer_message(&mut self, peer_id: &str, peermsg: p::PeerMessage) -> Result<(), Error> { + let session_id = &peermsg.session_id; + let session = self + .sessions + .get(session_id) + .context(format!("Session {} doesn't exist", session_id))? + .clone(); + + if matches!( + peermsg.peer_message, + p::PeerMessageInner::Sdp(p::SdpMessage::Offer { .. }) + ) && peer_id == session.consumer + { + bail!( + r#"cannot forward offer from "{peer_id}" to "{}" as "{peer_id}" is not the producer"#, + session.producer, + ); + } + + self.items.push_back(( + session.other_peer_id(peer_id)?.to_owned(), + p::OutgoingMessage::Peer(p::PeerMessage { + session_id: session_id.to_string(), + peer_message: peermsg.peer_message.clone(), + }), + )); + + Ok(()) + } + + fn stop_producer(&mut self, peer_id: &str) { + let sessions_to_end = self + .sessions + .iter() + .filter_map(|(session_id, session)| { + if session.producer == peer_id { + Some(session_id.clone()) + } else { + None + } + }) + .collect::>(); + + sessions_to_end.iter().for_each(|session_id| { + if let Err(e) = self.end_session(peer_id, session_id) { + error!("Could not end session {session_id}: {e:?}"); + } + }); + } + #[instrument(level = "debug", skip(self))] /// Remove a peer, this can cause sessions to be ended fn remove_peer(&mut self, peer_id: &str) { info!(peer_id = %peer_id, "removing peer"); + let peer_status = match self.peers.remove(peer_id) { + Some(peer_status) => peer_status, + _ => return, + }; - self.listeners.remove(peer_id); + self.stop_producer(peer_id); - if let Some(consumers) = self.producers.remove(peer_id) { - for consumer_id in &consumers { - info!(producer_id=%peer_id, consumer_id=%consumer_id, "ended session"); - self.consumers.insert(consumer_id.clone(), None); - self.items.push_back(( - consumer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); + for (id, p) in self.peers.iter() { + if !p.listening() { + continue; } - for listener in &self.listeners { - self.items.push_back(( - listener.to_string(), - p::OutgoingMessage::ProducerRemoved { - peer_id: peer_id.to_string(), - meta: match self.meta.get(peer_id) { - Some(meta) => meta.clone(), - None => Default::default(), - }, - }, - )); - } - } - - if let Some(Some(producer_id)) = self.consumers.remove(peer_id) { - info!(producer_id=%producer_id, consumer_id=%peer_id, "ended session"); - - self.producers - .get_mut(&producer_id) - .unwrap() - .remove(peer_id); - - self.items.push_back(( - producer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); + let message = p::OutgoingMessage::PeerStatusChanged(PeerStatus { + roles: Default::default(), + meta: peer_status.meta.clone(), + peer_id: Some(peer_id.to_string()), + }); + self.items.push_back((id.to_string(), message)); } - - let _ = self.meta.remove(peer_id); } #[instrument(level = "debug", skip(self))] /// End a session between two peers - fn end_session(&mut self, peer_id: &str, other_peer_id: &str) -> Result<(), Error> { - info!(peer_id=%peer_id, other_peer_id=%other_peer_id, "endsession request"); - if let Some(ref mut consumers) = self.producers.get_mut(peer_id) { - if consumers.remove(other_peer_id) { - info!(producer_id=%peer_id, consumer_id=%other_peer_id, "ended session"); - - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); - - self.consumers.insert(other_peer_id.to_string(), None); - Ok(()) - } else { - Err(anyhow!( - "Producer {} has no consumer {}", - peer_id, - other_peer_id - )) - } - } else if let Some(Some(producer_id)) = self.consumers.get(peer_id) { - if producer_id == other_peer_id { - info!(producer_id=%other_peer_id, consumer_id=%peer_id, "ended session"); - - self.consumers.insert(peer_id.to_string(), None); - self.producers - .get_mut(other_peer_id) - .unwrap() - .remove(peer_id); + fn end_session(&mut self, peer_id: &str, session_id: &str) -> Result<(), Error> { + let session = self + .sessions + .remove(session_id) + .with_context(|| format!("Session {session_id} doesn't exist"))?; - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::EndSession { - peer_id: peer_id.to_string(), - }, - )); + self.items.push_back(( + session.other_peer_id(peer_id)?.to_string(), + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.to_string(), + }), + )); - Ok(()) - } else { - Err(anyhow!( - "Consumer {} is not in a session with {}", - peer_id, - other_peer_id - )) - } - } else { - Err(anyhow!( - "No session between {} and {}", - peer_id, - other_peer_id - )) - } + Ok(()) } /// List producer peers @@ -195,15 +185,13 @@ impl Handler { peer_id.to_string(), p::OutgoingMessage::List { producers: self - .producers - .keys() - .cloned() - .map(|peer_id| p::Peer { - id: peer_id.clone(), - meta: self - .meta - .get(&peer_id) - .map_or_else(|| Default::default(), |m| m.clone()), + .peers + .iter() + .filter_map(|(peer_id, peer)| { + peer.producing().then_some(p::Peer { + id: peer_id.clone(), + meta: peer.meta.clone(), + }) }) .collect(), }, @@ -212,266 +200,94 @@ impl Handler { Ok(()) } - /// Handle ICE candidate sent by one peer to another peer + /// Register peer as a producer #[instrument(level = "debug", skip(self))] - fn handle_ice( - &mut self, - candidate: String, - sdp_m_line_index: u32, - peer_id: &str, - other_peer_id: &str, - ) -> Result<(), Error> { - if let Some(consumers) = self.producers.get(peer_id) { - if consumers.contains(other_peer_id) { - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), - peer_message: p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - }, - }), - )); - Ok(()) - } else { - Err(anyhow!( - "cannot forward ICE from {} to {} as they are not in a session", - peer_id, - other_peer_id - )) - } - } else if let Some(producer) = self.consumers.get(peer_id) { - if &Some(other_peer_id.to_string()) == producer { - self.items.push_back(( - other_peer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: peer_id.to_string(), - peer_message: p::PeerMessageInner::Ice { - candidate, - sdp_m_line_index, - }, - }), - )); + fn set_peer_status(&mut self, peer_id: &str, status: &p::PeerStatus) -> Result<(), Error> { + let old_status = self + .peers + .get(peer_id) + .context(anyhow!("Peer '{peer_id}' hasn't been welcomed"))?; - Ok(()) - } else { - Err(anyhow!( - "cannot forward ICE from {} to {} as they are not in a session", - peer_id, - other_peer_id - )) - } - } else { - Err(anyhow!( - "cannot forward ICE from {} to {} as they are not in a session", - peer_id, - other_peer_id, - )) - } - } + if status == old_status { + info!("Status for '{}' hasn't changed", peer_id); - /// Handle SDP offered by one peer to another peer - #[instrument(level = "debug", skip(self))] - fn handle_sdp_offer( - &mut self, - sdp: String, - producer_id: &str, - consumer_id: &str, - ) -> Result<(), Error> { - if let Some(consumers) = self.producers.get(producer_id) { - if consumers.contains(consumer_id) { - self.items.push_back(( - consumer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: producer_id.to_string(), - peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp }), - }), - )); - Ok(()) - } else { - Err(anyhow!( - "cannot forward offer from {} to {} as they are not in a session", - producer_id, - consumer_id - )) - } - } else { - Err(anyhow!( - "cannot forward offer from {} to {} as they are not in a session or {} is not the producer", - producer_id, - consumer_id, - producer_id, - )) + return Ok(()); } - } - /// Handle the SDP answer from one peer to another peer - #[instrument(level = "debug", skip(self))] - fn handle_sdp_answer( - &mut self, - sdp: String, - consumer_id: &str, - producer_id: &str, - ) -> Result<(), Error> { - if let Some(producer) = self.consumers.get(consumer_id) { - if &Some(producer_id.to_string()) == producer { - self.items.push_back(( - producer_id.to_string(), - p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: consumer_id.to_string(), - peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Answer { sdp }), - }), - )); - Ok(()) - } else { - Err(anyhow!( - "cannot forward answer from {} to {} as they are not in a session", - consumer_id, - producer_id - )) - } - } else { - Err(anyhow!( - "cannot forward answer from {} to {} as they are not in a session", - consumer_id, - producer_id - )) + if old_status.producing() && !status.producing() { + self.stop_producer(peer_id); } - } - /// Register peer as a producer - #[instrument(level = "debug", skip(self))] - fn register_producer( - &mut self, - peer_id: &str, - meta: Option, - ) -> Result<(), Error> { - if self.producers.contains_key(peer_id) { - Err(anyhow!("{} is already registered as a producer", peer_id)) - } else { - self.producers.insert(peer_id.to_string(), HashSet::new()); - - for listener in &self.listeners { - self.items.push_back(( - listener.to_string(), - p::OutgoingMessage::ProducerAdded { - peer_id: peer_id.to_string(), - meta: meta.clone(), - }, - )); + let mut status = status.clone(); + status.peer_id = Some(peer_id.to_string()); + self.peers.insert(peer_id.to_string(), status.clone()); + for (id, peer) in &self.peers { + if !peer.listening() { + continue; } self.items.push_back(( - peer_id.to_string(), - p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { - peer_id: peer_id.to_string(), - meta: meta.clone(), + id.to_string(), + p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { + peer_id: Some(peer_id.to_string()), + roles: status.roles.clone(), + meta: status.meta.clone(), }), )); - - self.meta.insert(peer_id.to_string(), meta); - - info!(peer_id = %peer_id, "registered as a producer"); - - Ok(()) - } - } - - /// Register peer as a consumer - #[instrument(level = "debug", skip(self))] - fn register_consumer( - &mut self, - peer_id: &str, - meta: Option, - ) -> Result<(), Error> { - if self.consumers.contains_key(peer_id) { - Err(anyhow!("{} is already registered as a consumer", peer_id)) - } else { - self.consumers.insert(peer_id.to_string(), None); - - self.items.push_back(( - peer_id.to_string(), - p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer { - peer_id: peer_id.to_string(), - meta: meta.clone(), - }), - )); - - self.meta.insert(peer_id.to_string(), meta); - - info!(peer_id = %peer_id, "registered as a consumer"); - - Ok(()) } - } - /// Register peer as a listener - #[instrument(level = "debug", skip(self))] - fn register_listener( - &mut self, - peer_id: &str, - meta: Option, - ) -> Result<(), Error> { - if !self.listeners.insert(peer_id.to_string()) { - Err(anyhow!("{} is already registered as a listener", peer_id)) - } else { - self.items.push_back(( - peer_id.to_string(), - p::OutgoingMessage::Registered(p::RegisteredMessage::Listener { - peer_id: peer_id.to_string(), - meta: meta.clone(), - }), - )); + info!(peer_id = %peer_id, "registered as a producer"); - self.meta.insert(peer_id.to_string(), meta); - - info!(peer_id = %peer_id, "registered as a listener"); - - Ok(()) - } + Ok(()) } /// Start a session between two peers #[instrument(level = "debug", skip(self))] fn start_session(&mut self, producer_id: &str, consumer_id: &str) -> Result<(), Error> { - if !self.consumers.contains_key(consumer_id) { - return Err(anyhow!( - "Peer with id {} is not registered as a consumer", - consumer_id - )); - } - - if let Some(producer_id) = self.consumers.get(consumer_id).unwrap() { - return Err(anyhow!( - "Consumer with id {} is already in a session with producer {}", - consumer_id, - producer_id, - )); - } - - if !self.producers.contains_key(producer_id) { - return Err(anyhow!( - "Peer with id {} is not registered as a producer", - producer_id - )); - } - - self.consumers - .insert(consumer_id.to_string(), Some(producer_id.to_string())); - self.producers - .get_mut(producer_id) - .unwrap() - .insert(consumer_id.to_string()); - + self.peers.get(producer_id).map_or_else( + || Err(anyhow!("Peer '{producer_id}' hasn't been welcomed")), + |peer| { + if !peer.producing() { + Err(anyhow!( + "Peer with id {} is not registered as a producer", + producer_id + )) + } else { + Ok(peer) + } + }, + )?; + + self.peers.get(consumer_id).map_or_else( + || Err(anyhow!("Peer '{consumer_id}' hasn't been welcomed")), + Ok, + )?; + + let session_id = uuid::Uuid::new_v4().to_string(); + self.sessions.insert( + session_id.clone(), + Session { + id: session_id.clone(), + consumer: consumer_id.to_string(), + producer: producer_id.to_string(), + }, + ); + self.items.push_back(( + consumer_id.to_string(), + p::OutgoingMessage::SessionStarted { + peer_id: producer_id.to_string(), + session_id: session_id.clone(), + }, + )); self.items.push_back(( producer_id.to_string(), p::OutgoingMessage::StartSession { peer_id: consumer_id.to_string(), + session_id: session_id.clone(), }, )); - info!(producer_id = %producer_id, consumer_id = %consumer_id, "started a session"); + info!(id = %session_id, producer_id = %producer_id, consumer_id = %consumer_id, "started a session"); Ok(()) } @@ -480,7 +296,7 @@ impl Handler { impl Stream for Handler { type Item = (String, p::OutgoingMessage); - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll> { loop { let this = self.as_mut().project(); @@ -517,29 +333,44 @@ mod tests { use futures::channel::mpsc; use serde_json::json; + async fn new_peer( + tx: &mut mpsc::UnboundedSender<(String, Option)>, + handler: &mut Handler, + peer_id: &str, + ) { + tx.send((peer_id.to_string(), Some(p::IncomingMessage::NewPeer))) + .await + .unwrap(); + + let res = handler.next().await.unwrap(); + assert_eq!( + res, + ( + peer_id.to_string(), + p::OutgoingMessage::Welcome { + peer_id: peer_id.to_string() + } + ) + ); + } + #[async_std::test] async fn test_register_producer() { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), - }); - - tx.send(("producer".to_string(), Some(message))) - .await - .unwrap(); - - let (peer_id, sent_message) = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "producer").await; - assert_eq!(peer_id, "producer"); - assert_eq!( - sent_message, - p::OutgoingMessage::Registered(p::RegisteredMessage::Producer { - peer_id: "producer".to_string(), - meta: Default::default(), - }) - ); + tx.send(( + "producer".to_string(), + Some(p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, + })), + )) + .await + .unwrap(); } #[async_std::test] @@ -547,18 +378,19 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Some(json!( {"display-name": "foobar".to_string() })), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + meta: Some(json!({"display-name":"foobar".to_string()})), + roles: vec![p::PeerRole::Producer], + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::List; - tx.send(("listener".to_string(), Some(message))) .await .unwrap(); @@ -580,58 +412,11 @@ mod tests { } #[async_std::test] - async fn test_register_consumer() { - let (mut tx, rx) = mpsc::unbounded(); - let mut handler = Handler::new(Box::pin(rx)); - - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - - let (peer_id, sent_message) = handler.next().await.unwrap(); - - assert_eq!(peer_id, "consumer"); - assert_eq!( - sent_message, - p::OutgoingMessage::Registered(p::RegisteredMessage::Consumer { - peer_id: "consumer".to_string(), - meta: Default::default() - }) - ); - } - - #[async_std::test] - async fn test_register_producer_twice() { + async fn test_welcome() { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), - }); - tx.send(("producer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); - - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), - }); - tx.send(("producer".to_string(), Some(message))) - .await - .unwrap(); - let (peer_id, sent_message) = handler.next().await.unwrap(); - - assert_eq!(peer_id, "producer"); - assert_eq!( - sent_message, - p::OutgoingMessage::Error { - details: "producer is already registered as a producer".into() - } - ); + new_peer(&mut tx, &mut handler, "consumer").await; } #[async_std::test] @@ -639,18 +424,26 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Listener { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + new_peer(&mut tx, &mut handler, "listener").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Listener], + meta: None, + peer_id: None, }); tx.send(("listener".to_string(), Some(message))) .await .unwrap(); + let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], meta: Some(json!({ "display-name": "foobar".to_string(), })), + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await @@ -660,12 +453,14 @@ mod tests { assert_eq!(peer_id, "listener"); assert_eq!( sent_message, - p::OutgoingMessage::ProducerAdded { - peer_id: "producer".to_string(), + p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + peer_id: Some("producer".to_string()), meta: Some(json!({ - "display-name": Some("foobar".to_string()), - })) - } + "display-name": Some("foobar".to_string()), + } + )) + }) ); } @@ -674,21 +469,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -698,12 +490,25 @@ mod tests { .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing {:?}", sent_message), + }; + let (peer_id, sent_message) = handler.next().await.unwrap(); assert_eq!(peer_id, "producer"); assert_eq!( sent_message, p::OutgoingMessage::StartSession { - peer_id: "consumer".to_string() + peer_id: "consumer".to_string(), + session_id: session_id.to_string(), } ); } @@ -713,21 +518,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -735,10 +537,36 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; - let message = p::IncomingMessage::Register(p::RegisterMessage::Listener { - meta: Default::default(), + assert_eq!( + handler.next().await.unwrap(), + ( + "producer".into(), + p::OutgoingMessage::StartSession { + peer_id: "consumer".into(), + session_id: session_id.clone() + } + ) + ); + + new_peer(&mut tx, &mut handler, "listener").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Listener], + meta: None, + peer_id: None, }); tx.send(("listener".to_string(), Some(message))) .await @@ -751,9 +579,7 @@ mod tests { assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "producer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) ); let (peer_id, sent_message) = handler.next().await.unwrap(); @@ -761,10 +587,11 @@ mod tests { assert_eq!(peer_id, "listener"); assert_eq!( sent_message, - p::OutgoingMessage::ProducerRemoved { - peer_id: "producer".to_string(), + p::OutgoingMessage::PeerStatusChanged(PeerStatus { + roles: vec![], + peer_id: Some("producer".to_string()), meta: Default::default() - } + }) ); } @@ -773,21 +600,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -795,11 +619,25 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), }); + tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); @@ -808,9 +646,9 @@ mod tests { assert_eq!(peer_id, "producer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "consumer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id + }) ); } @@ -819,21 +657,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -841,10 +676,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), }); tx.send(("producer".to_string(), Some(message))) .await @@ -854,9 +702,7 @@ mod tests { assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, - p::OutgoingMessage::EndSession { - peer_id: "producer".to_string() - } + p::OutgoingMessage::EndSession(p::EndSessionMessage { session_id }) ); } @@ -865,21 +711,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -887,29 +730,52 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); + // The consumer ends the session let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), }); - tx.send(("producer".to_string(), Some(message))) + tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone() + }) + ); let message = p::IncomingMessage::EndSession(p::EndSessionMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), }); - tx.send(("producer".to_string(), Some(message))) + tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); - assert_eq!(peer_id, "producer"); + assert_eq!(peer_id, "consumer"); assert_eq!( sent_message, p::OutgoingMessage::Error { - details: "Producer producer has no consumer consumer".into() + details: format!("Session {session_id} doesn't exist") } ); } @@ -919,21 +785,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -941,10 +804,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: "offer".to_string(), }), @@ -958,7 +834,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: "offer".to_string() }) @@ -971,21 +847,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -993,10 +866,23 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42, @@ -1011,7 +897,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42 @@ -1020,7 +906,7 @@ mod tests { ); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42, @@ -1035,7 +921,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Peer(p::PeerMessage { - peer_id: "consumer".to_string(), + session_id: session_id.clone(), peer_message: p::PeerMessageInner::Ice { candidate: "candidate".to_string(), sdp_m_line_index: 42 @@ -1049,21 +935,18 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -1071,55 +954,234 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::Peer(p::PeerMessage { - peer_id: "producer".to_string(), + session_id, peer_message: p::PeerMessageInner::Sdp(p::SdpMessage::Offer { sdp: "offer".to_string(), }), }); + tx.send(("consumer".to_string(), Some(message))) + .await + .unwrap(); + let response = handler.next().await.unwrap(); + + assert_eq!(response, + ( + "consumer".into(), + p::OutgoingMessage::Error { + details: r#"cannot forward offer from "consumer" to "producer" as "consumer" is not the producer"#.into() + } + ) + ); + } + + #[async_std::test] + async fn test_start_session_no_producer() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + new_peer(&mut tx, &mut handler, "consumer").await; + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), + }); tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); assert_eq!(peer_id, "consumer"); - assert_eq!(sent_message, + assert_eq!( + sent_message, p::OutgoingMessage::Error { - details: "cannot forward offer from consumer to producer as they are not in a session or consumer is not the producer".into() + details: "Peer 'producer' hasn't been welcomed".into() } ); } #[async_std::test] - async fn test_start_session_no_producer() { + async fn test_stop_producing() { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + new_peer(&mut tx, &mut handler, "consumer").await; + + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { + peer_id: "producer".to_string(), }); tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); + assert_eq!( + sent_message, + p::OutgoingMessage::StartSession { + peer_id: "consumer".to_string(), + session_id: session_id.clone(), + } + ); + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![], + meta: None, + peer_id: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "consumer"); + assert_eq!( + sent_message, + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone(), + }) + ); + } + + #[async_std::test] + async fn test_unregistering_with_listenners() { + let (mut tx, rx) = mpsc::unbounded(); + let mut handler = Handler::new(Box::pin(rx)); + + new_peer(&mut tx, &mut handler, "listener").await; + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Listener], + meta: None, + peer_id: None, + }); + tx.send(("listener".to_string(), Some(message))) + .await + .unwrap(); let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "producer").await; + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "listener"); + assert_eq!( + sent_message, + p::OutgoingMessage::PeerStatusChanged(PeerStatus { + roles: vec![p::PeerRole::Producer], + peer_id: Some("producer".to_string()), + meta: Default::default() + }) + ); + + new_peer(&mut tx, &mut handler, "consumer").await; + let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), }); tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); - let (peer_id, sent_message) = handler.next().await.unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); assert_eq!(peer_id, "consumer"); + let session_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing {:?}", sent_message), + }; + + let (peer_id, sent_message) = handler.next().await.unwrap(); + + assert_eq!(peer_id, "producer"); assert_eq!( sent_message, - p::OutgoingMessage::Error { - details: "Peer with id producer is not registered as a producer".into() + p::OutgoingMessage::StartSession { + peer_id: "consumer".to_string(), + session_id: session_id.clone(), } ); + + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![], + meta: None, + peer_id: None, + }); + tx.send(("producer".to_string(), Some(message))) + .await + .unwrap(); + + assert_eq!( + handler.next().await.unwrap(), + ( + "consumer".into(), + p::OutgoingMessage::EndSession(p::EndSessionMessage { + session_id: session_id.clone(), + }) + ) + ); + + assert_eq!( + handler.next().await.unwrap(), + ( + "listener".into(), + p::OutgoingMessage::PeerStatusChanged(PeerStatus { + roles: vec![], + peer_id: Some("producer".to_string()), + meta: Default::default() + }) + ) + ); } #[async_std::test] @@ -1127,13 +1189,15 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: None, + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -1147,7 +1211,7 @@ mod tests { assert_eq!( sent_message, p::OutgoingMessage::Error { - details: "Peer with id consumer is not registered as a consumer".into() + details: "Peer 'consumer' hasn't been welcomed".into() } ); } @@ -1157,21 +1221,17 @@ mod tests { let (mut tx, rx) = mpsc::unbounded(); let mut handler = Handler::new(Box::pin(rx)); - let message = p::IncomingMessage::Register(p::RegisterMessage::Producer { - meta: Default::default(), + new_peer(&mut tx, &mut handler, "producer").await; + let message = p::IncomingMessage::SetPeerStatus(p::PeerStatus { + roles: vec![p::PeerRole::Producer], + meta: Some(json!( {"display-name": "foobar".to_string() })), + peer_id: None, }); tx.send(("producer".to_string(), Some(message))) .await .unwrap(); - let _ = handler.next().await.unwrap(); - let message = p::IncomingMessage::Register(p::RegisterMessage::Consumer { - meta: Default::default(), - }); - tx.send(("consumer".to_string(), Some(message))) - .await - .unwrap(); - let _ = handler.next().await.unwrap(); + new_peer(&mut tx, &mut handler, "consumer").await; let message = p::IncomingMessage::StartSession(p::StartSessionMessage { peer_id: "producer".to_string(), @@ -1179,6 +1239,19 @@ mod tests { tx.send(("consumer".to_string(), Some(message))) .await .unwrap(); + let (peer_id, sent_message) = handler.next().await.unwrap(); + assert_eq!(peer_id, "consumer"); + let session0_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() + } + _ => panic!("SessionStarted message missing"), + }; + let _ = handler.next().await.unwrap(); let message = p::IncomingMessage::StartSession(p::StartSessionMessage { @@ -1189,14 +1262,18 @@ mod tests { .await .unwrap(); let (peer_id, sent_message) = handler.next().await.unwrap(); - assert_eq!(peer_id, "consumer"); - assert_eq!( - sent_message, - p::OutgoingMessage::Error { - details: "Consumer with id consumer is already in a session with producer producer" - .into() + let session1_id = match sent_message { + p::OutgoingMessage::SessionStarted { + ref peer_id, + ref session_id, + } => { + assert_eq!(peer_id, "producer"); + session_id.to_string() } - ); + _ => panic!("SessionStarted message missing"), + }; + + assert_ne!(session0_id, session1_id); } } diff --git a/signalling/src/server/mod.rs b/signalling/src/server/mod.rs index 5cf8f0a..8ece1ef 100644 --- a/signalling/src/server/mod.rs +++ b/signalling/src/server/mod.rs @@ -156,7 +156,24 @@ impl Server { let this_id_clone = this_id.clone(); let state_clone = self.state.clone(); let receive_task_handle = task::spawn(async move { + if let Some(tx) = tx.as_mut() { + if let Err(err) = tx + .send(( + this_id_clone.clone(), + Some( + serde_json::json!({ + "type": "newPeer", + }) + .to_string(), + ), + )) + .await + { + warn!(this = %this_id_clone, "Error handling message: {:?}", err); + } + } while let Some(msg) = ws_stream.next().await { + info!("Received message {msg:?}"); match msg { Ok(WsMessage::Text(msg)) => { if let Some(tx) = tx.as_mut() { diff --git a/www/webrtc.js b/www/webrtc.js index d003ccf..eb449d9 100644 --- a/www/webrtc.js +++ b/www/webrtc.js @@ -47,6 +47,7 @@ function Uint8ToString(u8a){ } function Session(our_id, peer_id, closed_callback) { + this.id = null; this.peer_connection = null; this.ws_conn = null; this.peer_id = peer_id; @@ -113,7 +114,7 @@ function Session(our_id, peer_id, closed_callback) { this.setStatus("Sending SDP answer"); var sdp = { 'type': 'peer', - 'peerId': this.peer_id, + 'sessionId': this.id, 'sdp': this.peer_connection.localDescription.toJSON() }; this.ws_conn.send(JSON.stringify(sdp)); @@ -164,6 +165,9 @@ function Session(our_id, peer_id, closed_callback) { if (msg.type == "registered") { this.setStatus("Registered with server"); this.connectPeer(); + } else if (msg.type == "sessionStarted") { + this.setStatus("Registered with server"); + this.id = msg.sessionId; } else if (msg.type == "error") { this.handleIncomingError(msg.details); } else if (msg.type == "endSession") { @@ -216,11 +220,8 @@ function Session(our_id, peer_id, closed_callback) { this.ws_conn = new WebSocket(ws_url); /* When connected, immediately register with the server */ this.ws_conn.addEventListener('open', (event) => { - this.ws_conn.send(JSON.stringify({ - "type": "register", - "peerType": "consumer" - })); - this.setStatus("Registering with server"); + this.setStatus("Connecting to the peer"); + this.connectPeer(); }); this.ws_conn.addEventListener('error', this.onServerError.bind(this)); this.ws_conn.addEventListener('message', this.onServerMessage.bind(this)); @@ -315,7 +316,7 @@ function Session(our_id, peer_id, closed_callback) { } this.ws_conn.send(JSON.stringify({ "type": "peer", - "peerId": this.peer_id, + "sessionId": this.id, "ice": event.candidate.toJSON() })); }; @@ -384,14 +385,15 @@ function onServerMessage(event) { msg = JSON.parse(event.data); } catch (e) { if (e instanceof SyntaxError) { - this.handleIncomingError("Error parsing incoming JSON: " + event.data); + console.error("Error parsing incoming JSON: " + event.data); } else { - this.handleIncomingError("Unknown error parsing response: " + event.data); + console.error("Unknown error parsing response: " + event.data); } return; } - if (msg.type == "registered") { + if (msg.type == "welcome") { + console.info(`Got welcomed with ID ${msg.peer_id}`); ws_conn.send(JSON.stringify({ "type": "list" })); @@ -400,13 +402,18 @@ function onServerMessage(event) { for (i = 0; i < msg.producers.length; i++) { addPeer(msg.producers[i].id, msg.producers[i].meta); } - } else if (msg.type == "producerAdded") { - addPeer(msg.peerId, msg.meta); - } else if (msg.type == "producerRemoved") { + } else if (msg.type == "peerStatusChanged") { var li = document.getElementById("peer-" + msg.peerId); - li.parentNode.removeChild(li); + if (msg.roles.includes("producer")) { + if (li == null) { + console.error('Adding peer'); + addPeer(msg.peerId, msg.meta); + } + } else if (li != null) { + li.parentNode.removeChild(li); + } } else { - this.handleIncomingError("Unsupported message: ", msg); + console.error("Unsupported message: ", msg); } }; @@ -445,8 +452,8 @@ function connect() { ws_conn = new WebSocket(ws_url); ws_conn.addEventListener('open', (event) => { ws_conn.send(JSON.stringify({ - "type": "register", - "peerType": "listener" + "type": "setPeerStatus", + "roles": ["listener"] })); }); ws_conn.addEventListener('error', onServerError);