diff options
Diffstat (limited to 'plugins/rtp/src/stream.vala')
-rw-r--r-- | plugins/rtp/src/stream.vala | 457 |
1 files changed, 292 insertions, 165 deletions
diff --git a/plugins/rtp/src/stream.vala b/plugins/rtp/src/stream.vala index bd8a279f..dc712b61 100644 --- a/plugins/rtp/src/stream.vala +++ b/plugins/rtp/src/stream.vala @@ -18,22 +18,19 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { private Gst.App.Sink send_rtcp; private Gst.App.Src recv_rtp; private Gst.App.Src recv_rtcp; - private Gst.Element encode; - private Gst.RTP.BasePayload encode_pay; private Gst.Element decode; private Gst.RTP.BaseDepayload decode_depay; private Gst.Element input; + private Gst.Pad input_pad; private Gst.Element output; private Gst.Element session; private Device _input_device; public Device input_device { get { return _input_device; } set { if (!paused) { - if (this._input_device != null) { - this._input_device.unlink(); - this._input_device = null; - } - set_input(value != null ? value.link_source() : null); + var input = this.input; + set_input(value != null ? value.link_source(payload_type, our_ssrc, next_seqnum_offset, next_timestamp_offset) : null); + if (this._input_device != null) this._input_device.unlink(input); } this._input_device = value; }} @@ -47,7 +44,16 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { public bool created { get; private set; default = false; } public bool paused { get; private set; default = false; } private bool push_recv_data = false; - private string participant_ssrc = null; + private uint our_ssrc = Random.next_int(); + private int next_seqnum_offset = -1; + private uint32 next_timestamp_offset_base = 0; + private int64 next_timestamp_offset_stamp = 0; + private uint32 next_timestamp_offset { get { + if (next_timestamp_offset_base == 0) return 0; + int64 monotonic_diff = get_monotonic_time() - next_timestamp_offset_stamp; + return next_timestamp_offset_base + (uint32)((double)monotonic_diff / 1000000.0 * payload_type.clockrate); + } } + private uint32 participant_ssrc = 0; private Gst.Pad recv_rtcp_sink_pad; private Gst.Pad recv_rtp_sink_pad; @@ -92,16 +98,22 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { send_rtp.async = false; send_rtp.caps = CodecUtil.get_caps(media, payload_type, false); send_rtp.emit_signals = true; - send_rtp.sync = false; + send_rtp.sync = true; + send_rtp.drop = true; + send_rtp.wait_on_eos = false; send_rtp.new_sample.connect(on_new_sample); + send_rtp.connect("signal::eos", on_eos_static, this); pipe.add(send_rtp); send_rtcp = Gst.ElementFactory.make("appsink", @"rtcp_sink_$rtpid") as Gst.App.Sink; send_rtcp.async = false; send_rtcp.caps = new Gst.Caps.empty_simple("application/x-rtcp"); send_rtcp.emit_signals = true; - send_rtcp.sync = false; + send_rtcp.sync = true; + send_rtcp.drop = true; + send_rtcp.wait_on_eos = false; send_rtcp.new_sample.connect(on_new_sample); + send_rtcp.connect("signal::eos", on_eos_static, this); pipe.add(send_rtcp); recv_rtp = Gst.ElementFactory.make("appsrc", @"rtp_src_$rtpid") as Gst.App.Src; @@ -125,18 +137,15 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { recv_rtcp.get_static_pad("src").link(recv_rtcp_sink_pad); // Connect input - encode = codec_util.get_encode_bin(media, payload_type, @"encode_$rtpid"); - encode_pay = (Gst.RTP.BasePayload)((Gst.Bin)encode).get_by_name(@"encode_$(rtpid)_rtp_pay"); - pipe.add(encode); send_rtp_sink_pad = rtpbin.get_request_pad(@"send_rtp_sink_$rtpid"); - encode.get_static_pad("src").link(send_rtp_sink_pad); if (input != null) { - input.link(encode); + input_pad = input.get_request_pad(@"src_$rtpid"); + input_pad.link(send_rtp_sink_pad); } // Connect output decode = codec_util.get_decode_bin(media, payload_type, @"decode_$rtpid"); - decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)encode).get_by_name(@"decode_$(rtpid)_rtp_depay"); + decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)decode).get_by_name(@"decode_$(rtpid)_rtp_depay"); pipe.add(decode); if (output != null) { decode.link(output); @@ -151,7 +160,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { plugin.unpause(); GLib.Signal.emit_by_name(rtpbin, "get-session", rtpid, out session); - if (session != null && payload_type.rtcp_fbs.any_match((it) => it.type_ == "goog-remb")) { + if (session != null && remb_enabled) { Object internal_session; session.@get("internal-session", out internal_session); if (internal_session != null) { @@ -159,15 +168,16 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { } Timeout.add(1000, () => remb_adjust()); } - if (media == "video") { - codec_util.update_bitrate(media, payload_type, encode, 256); + if (input_device != null && media == "video") { + input_device.update_bitrate(payload_type, target_send_bitrate); } } - private uint remb = 256; private int last_packets_lost = -1; - private uint64 last_packets_received; - private uint64 last_octets_received; + private uint64 last_packets_received = 0; + private uint64 last_octets_received = 0; + private uint max_target_receive_bitrate = 0; + private int64 last_remb_time = 0; private bool remb_adjust() { unowned Gst.Structure? stats; if (session == null) { @@ -185,73 +195,95 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { warning("No source-stats for session %u", rtpid); return Source.REMOVE; } + + if (input_device == null) return Source.CONTINUE; + foreach (Value value in source_stats.values) { unowned Gst.Structure source_stat = (Gst.Structure) value.get_boxed(); - uint ssrc; + uint32 ssrc; if (!source_stat.get_uint("ssrc", out ssrc)) continue; - if (ssrc.to_string() == participant_ssrc) { + if (ssrc == participant_ssrc) { int packets_lost; uint64 packets_received, octets_received; source_stat.get_int("packets-lost", out packets_lost); source_stat.get_uint64("packets-received", out packets_received); source_stat.get_uint64("octets-received", out octets_received); int new_lost = packets_lost - last_packets_lost; + if (new_lost < 0) new_lost = 0; uint64 new_received = packets_received - last_packets_received; + if (packets_received < last_packets_received) new_received = 0; uint64 new_octets = octets_received - last_octets_received; + if (octets_received < last_octets_received) octets_received = 0; if (new_received == 0) continue; last_packets_lost = packets_lost; last_packets_received = packets_received; last_octets_received = octets_received; double loss_rate = (double)new_lost / (double)(new_lost + new_received); + uint new_target_receive_bitrate; if (new_lost <= 0 || loss_rate < 0.02) { - remb = (uint)(1.08 * (double)remb); + new_target_receive_bitrate = (uint)(1.08 * (double)target_receive_bitrate); } else if (loss_rate > 0.1) { - remb = (uint)((1.0 - 0.5 * loss_rate) * (double)remb); + new_target_receive_bitrate = (uint)((1.0 - 0.5 * loss_rate) * (double)target_receive_bitrate); + } else { + new_target_receive_bitrate = target_receive_bitrate; } - remb = uint.max(remb, (uint)((new_octets * 8) / 1000)); - remb = uint.max(16, remb); // Never go below 16 - uint8[] data = new uint8[] { - 143, 206, 0, 5, - 0, 0, 0, 0, - 0, 0, 0, 0, - 'R', 'E', 'M', 'B', - 1, 0, 0, 0, - 0, 0, 0, 0 - }; - data[4] = (uint8)((encode_pay.ssrc >> 24) & 0xff); - data[5] = (uint8)((encode_pay.ssrc >> 16) & 0xff); - data[6] = (uint8)((encode_pay.ssrc >> 8) & 0xff); - data[7] = (uint8)(encode_pay.ssrc & 0xff); - uint8 br_exp = 0; - uint32 br_mant = remb * 1000; - uint8 bits = (uint8)Math.log2(br_mant); - if (bits > 16) { - br_exp = (uint8)bits - 16; - br_mant = br_mant >> br_exp; + if (last_remb_time == 0) { + last_remb_time = get_monotonic_time(); + } else { + int64 time_now = get_monotonic_time(); + int64 time_diff = time_now - last_remb_time; + last_remb_time = time_now; + uint actual_bitrate = (uint)(((double)new_octets * 8.0) * (double)time_diff / 1000.0 / 1000000.0); + new_target_receive_bitrate = uint.max(new_target_receive_bitrate, (uint)(0.9 * (double)actual_bitrate)); + max_target_receive_bitrate = uint.max((uint)(1.5 * (double)actual_bitrate), max_target_receive_bitrate); + new_target_receive_bitrate = uint.min(new_target_receive_bitrate, max_target_receive_bitrate); + } + new_target_receive_bitrate = uint.max(16, new_target_receive_bitrate); // Never go below 16 + if (new_target_receive_bitrate != target_receive_bitrate) { + target_receive_bitrate = new_target_receive_bitrate; + uint8[] data = new uint8[] { + 143, 206, 0, 5, + 0, 0, 0, 0, + 0, 0, 0, 0, + 'R', 'E', 'M', 'B', + 1, 0, 0, 0, + 0, 0, 0, 0 + }; + data[4] = (uint8)((our_ssrc >> 24) & 0xff); + data[5] = (uint8)((our_ssrc >> 16) & 0xff); + data[6] = (uint8)((our_ssrc >> 8) & 0xff); + data[7] = (uint8)(our_ssrc & 0xff); + uint8 br_exp = 0; + uint32 br_mant = target_receive_bitrate * 1000; + uint8 bits = (uint8)Math.log2(br_mant); + if (bits > 16) { + br_exp = (uint8)bits - 16; + br_mant = br_mant >> br_exp; + } + data[17] = (uint8)((br_exp << 2) | ((br_mant >> 16) & 0x3)); + data[18] = (uint8)((br_mant >> 8) & 0xff); + data[19] = (uint8)(br_mant & 0xff); + data[20] = (uint8)((ssrc >> 24) & 0xff); + data[21] = (uint8)((ssrc >> 16) & 0xff); + data[22] = (uint8)((ssrc >> 8) & 0xff); + data[23] = (uint8)(ssrc & 0xff); + encrypt_and_send_rtcp(data); } - data[17] = (uint8)((br_exp << 2) | ((br_mant >> 16) & 0x3)); - data[18] = (uint8)((br_mant >> 8) & 0xff); - data[19] = (uint8)(br_mant & 0xff); - data[20] = (uint8)((ssrc >> 24) & 0xff); - data[21] = (uint8)((ssrc >> 16) & 0xff); - data[22] = (uint8)((ssrc >> 8) & 0xff); - data[23] = (uint8)(ssrc & 0xff); - encrypt_and_send_rtcp(data); } } return Source.CONTINUE; } private static void on_feedback_rtcp(Gst.Element session, uint type, uint fbtype, uint sender_ssrc, uint media_ssrc, Gst.Buffer? fci, Stream self) { - if (type == 206 && fbtype == 15 && fci != null && sender_ssrc.to_string() == self.participant_ssrc) { + if (self.input_device != null && self.media == "video" && type == 206 && fbtype == 15 && fci != null && sender_ssrc == self.participant_ssrc) { // https://tools.ietf.org/html/draft-alvestrand-rmcat-remb-03 uint8[] data; fci.extract_dup(0, fci.get_size(), out data); if (data[0] != 'R' || data[1] != 'E' || data[2] != 'M' || data[3] != 'B') return; uint8 br_exp = data[5] >> 2; uint32 br_mant = (((uint32)data[5] & 0x3) << 16) + ((uint32)data[6] << 8) + (uint32)data[7]; - uint bitrate = (br_mant << br_exp) / 1000; - self.codec_util.update_bitrate(self.media, self.payload_type, self.encode, bitrate * 8); + self.target_send_bitrate = (br_mant << br_exp) / 1000; + self.input_device.update_bitrate(self.payload_type, self.target_send_bitrate); } } @@ -267,32 +299,63 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { debug("Sink is null"); return Gst.FlowReturn.EOS; } + if (sink != send_rtp && sink != send_rtcp) { + warning("unknown sample"); + return Gst.FlowReturn.NOT_SUPPORTED; + } Gst.Sample sample = sink.pull_sample(); Gst.Buffer buffer = sample.get_buffer(); + if (sink == send_rtp) { + uint buffer_ssrc = 0, buffer_seq = 0; + Gst.RTP.Buffer rtp_buffer; + if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) { + buffer_ssrc = rtp_buffer.get_ssrc(); + buffer_seq = rtp_buffer.get_seq(); + next_seqnum_offset = rtp_buffer.get_seq() + 1; + next_timestamp_offset_base = rtp_buffer.get_timestamp(); + next_timestamp_offset_stamp = get_monotonic_time(); + rtp_buffer.unmap(); + } + if (our_ssrc != buffer_ssrc) { + warning("Sending RTP %s buffer seq %u with SSRC %u when our ssrc is %u", media, buffer_seq, buffer_ssrc, our_ssrc); + } else { + debug("Sending RTP %s buffer seq %u with SSRC %u", media, buffer_seq, buffer_ssrc); + } + } + + prepare_local_crypto(); + uint8[] data; buffer.extract_dup(0, buffer.get_size(), out data); - prepare_local_crypto(); if (sink == send_rtp) { - if (crypto_session.has_encrypt) { - data = crypto_session.encrypt_rtp(data); - } - on_send_rtp_data(new Bytes.take((owned) data)); + encrypt_and_send_rtp((owned) data); } else if (sink == send_rtcp) { encrypt_and_send_rtcp((owned) data); - } else { - warning("unknown sample"); } return Gst.FlowReturn.OK; } + private void encrypt_and_send_rtp(owned uint8[] data) { + Bytes bytes; + if (crypto_session.has_encrypt) { + bytes = new Bytes.take(crypto_session.encrypt_rtp(data)); + } else { + bytes = new Bytes.take(data); + } + on_send_rtp_data(bytes); + } + private void encrypt_and_send_rtcp(owned uint8[] data) { + Bytes bytes; if (crypto_session.has_encrypt) { - data = crypto_session.encrypt_rtcp(data); + bytes = new Bytes.take(crypto_session.encrypt_rtcp(data)); + } else { + bytes = new Bytes.take(data); } if (rtcp_mux) { - on_send_rtp_data(new Bytes.take((owned) data)); + on_send_rtp_data(bytes); } else { - on_send_rtcp_data(new Bytes.take((owned) data)); + on_send_rtcp_data(bytes); } } @@ -300,41 +363,59 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { return Gst.PadProbeReturn.DROP; } + private static void on_eos_static(Gst.App.Sink sink, Stream self) { + debug("EOS on %s", sink.name); + if (sink == self.send_rtp) { + Idle.add(() => { self.on_send_rtp_eos(); return Source.REMOVE; }); + } else if (sink == self.send_rtcp) { + Idle.add(() => { self.on_send_rtcp_eos(); return Source.REMOVE; }); + } + } + + private void on_send_rtp_eos() { + if (send_rtp_src_pad != null) { + send_rtp_src_pad.unlink(send_rtp.get_static_pad("sink")); + send_rtp_src_pad = null; + } + send_rtp.set_locked_state(true); + send_rtp.set_state(Gst.State.NULL); + pipe.remove(send_rtp); + send_rtp = null; + debug("Stopped sending RTP for %u", rtpid); + } + + private void on_send_rtcp_eos() { + send_rtcp.set_locked_state(true); + send_rtcp.set_state(Gst.State.NULL); + pipe.remove(send_rtcp); + send_rtcp = null; + debug("Stopped sending RTCP for %u", rtpid); + } + public override void destroy() { // Stop network communication push_recv_data = false; - recv_rtp.end_of_stream(); - recv_rtcp.end_of_stream(); - send_rtp.new_sample.disconnect(on_new_sample); - send_rtcp.new_sample.disconnect(on_new_sample); + if (recv_rtp != null) recv_rtp.end_of_stream(); + if (recv_rtcp != null) recv_rtcp.end_of_stream(); + if (send_rtp != null) send_rtp.new_sample.disconnect(on_new_sample); + if (send_rtcp != null) send_rtcp.new_sample.disconnect(on_new_sample); // Disconnect input device if (input != null) { - input.unlink(encode); - input = null; + input_pad.unlink(send_rtp_sink_pad); + input.release_request_pad(input_pad); + input_pad = null; } if (this._input_device != null) { - if (!paused) this._input_device.unlink(); + if (!paused) this._input_device.unlink(input); this._input_device = null; + this.input = null; } - // Disconnect encode - encode.set_locked_state(true); - encode.set_state(Gst.State.NULL); - encode.get_static_pad("src").unlink(send_rtp_sink_pad); - pipe.remove(encode); - encode = null; - encode_pay = null; - - // Disconnect RTP sending - if (send_rtp_src_pad != null) { - send_rtp_src_pad.add_probe(Gst.PadProbeType.BLOCK, drop_probe); - send_rtp_src_pad.unlink(send_rtp.get_static_pad("sink")); + // Inject EOS + if (send_rtp_sink_pad != null) { + send_rtp_sink_pad.send_event(new Gst.Event.eos()); } - send_rtp.set_locked_state(true); - send_rtp.set_state(Gst.State.NULL); - pipe.remove(send_rtp); - send_rtp = null; // Disconnect decode if (recv_rtp_src_pad != null) { @@ -342,57 +423,63 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { recv_rtp_src_pad.unlink(decode.get_static_pad("sink")); } - // Disconnect RTP receiving - recv_rtp.set_locked_state(true); - recv_rtp.set_state(Gst.State.NULL); - recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad); - pipe.remove(recv_rtp); - recv_rtp = null; - // Disconnect output if (output != null) { + decode.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, drop_probe); decode.unlink(output); } - decode.set_locked_state(true); - decode.set_state(Gst.State.NULL); - pipe.remove(decode); - decode = null; - decode_depay = null; - output = null; // Disconnect output device if (this._output_device != null) { - this._output_device.unlink(); + this._output_device.unlink(output); this._output_device = null; } + output = null; - // Disconnect RTCP receiving - recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad); - recv_rtcp.set_locked_state(true); - recv_rtcp.set_state(Gst.State.NULL); - pipe.remove(recv_rtcp); - recv_rtcp = null; + // Destroy decode + if (decode != null) { + decode.set_locked_state(true); + decode.set_state(Gst.State.NULL); + pipe.remove(decode); + decode = null; + decode_depay = null; + } - // Disconnect RTCP sending - send_rtcp_src_pad.unlink(send_rtcp.get_static_pad("sink")); - send_rtcp.set_locked_state(true); - send_rtcp.set_state(Gst.State.NULL); - pipe.remove(send_rtcp); - send_rtcp = null; + // Disconnect and remove RTP input + if (recv_rtp != null) { + recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad); + recv_rtp.set_locked_state(true); + recv_rtp.set_state(Gst.State.NULL); + pipe.remove(recv_rtp); + recv_rtp = null; + } - // Release rtp pads - rtpbin.release_request_pad(send_rtp_sink_pad); - send_rtp_sink_pad = null; - rtpbin.release_request_pad(recv_rtp_sink_pad); - recv_rtp_sink_pad = null; - rtpbin.release_request_pad(recv_rtcp_sink_pad); - recv_rtcp_sink_pad = null; - rtpbin.release_request_pad(send_rtcp_src_pad); - send_rtcp_src_pad = null; - send_rtp_src_pad = null; - recv_rtp_src_pad = null; + // Disconnect and remove RTCP input + if (recv_rtcp != null) { + recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad); + recv_rtcp.set_locked_state(true); + recv_rtcp.set_state(Gst.State.NULL); + pipe.remove(recv_rtcp); + recv_rtcp = null; + } - session = null; + // Release rtp pads + if (send_rtp_sink_pad != null) { + rtpbin.release_request_pad(send_rtp_sink_pad); + send_rtp_sink_pad = null; + } + if (recv_rtp_sink_pad != null) { + rtpbin.release_request_pad(recv_rtp_sink_pad); + recv_rtp_sink_pad = null; + } + if (send_rtcp_src_pad != null) { + rtpbin.release_request_pad(send_rtcp_src_pad); + send_rtcp_src_pad = null; + } + if (recv_rtcp_sink_pad != null) { + rtpbin.release_request_pad(recv_rtcp_sink_pad); + recv_rtcp_sink_pad = null; + } } private void prepare_remote_crypto() { @@ -410,17 +497,38 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { on_recv_rtcp_data(bytes); return; } - prepare_remote_crypto(); - uint8[] data = bytes.get_data(); - if (crypto_session.has_decrypt) { - try { - data = crypto_session.decrypt_rtp(data); - } catch (Error e) { - warning("%s (%d)", e.message, e.code); +#if GST_1_16 + { + Gst.Buffer buffer = new Gst.Buffer.wrapped_bytes(bytes); + Gst.RTP.Buffer rtp_buffer; + uint buffer_ssrc = 0, buffer_seq = 0; + if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) { + buffer_ssrc = rtp_buffer.get_ssrc(); + buffer_seq = rtp_buffer.get_seq(); + rtp_buffer.unmap(); } + debug("Received RTP %s buffer seq %u with SSRC %u", media, buffer_seq, buffer_ssrc); } +#endif if (push_recv_data) { - Gst.Buffer buffer = new Gst.Buffer.wrapped((owned) data); + prepare_remote_crypto(); + + Gst.Buffer buffer; + if (crypto_session.has_decrypt) { + try { + buffer = new Gst.Buffer.wrapped(crypto_session.decrypt_rtp(bytes.get_data())); + } catch (Error e) { + warning("%s (%d)", e.message, e.code); + return; + } + } else { +#if GST_1_16 + buffer = new Gst.Buffer.wrapped_bytes(bytes); +#else + buffer = new Gst.Buffer.wrapped(bytes.get_data()); +#endif + } + Gst.RTP.Buffer rtp_buffer; if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) { if (rtp_buffer.get_extension()) { @@ -448,11 +556,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { rtp_buffer.unmap(); } - // FIXME: VAPI file in Vala < 0.49.1 has a bug that results in broken ownership of buffer in push_buffer() - // We workaround by using the plain signal. The signal unfortunately will cause an unnecessary copy of - // the underlying buffer, so and some point we should move over to the new version (once we require - // Vala >= 0.50) -#if FIXED_APPSRC_PUSH_BUFFER_IN_VAPI +#if VALA_0_50 recv_rtp.push_buffer((owned) buffer); #else Gst.FlowReturn ret; @@ -462,19 +566,26 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { } public override void on_recv_rtcp_data(Bytes bytes) { - prepare_remote_crypto(); - uint8[] data = bytes.get_data(); - if (crypto_session.has_decrypt) { - try { - data = crypto_session.decrypt_rtcp(data); - } catch (Error e) { - warning("%s (%d)", e.message, e.code); - } - } if (push_recv_data) { - Gst.Buffer buffer = new Gst.Buffer.wrapped((owned) data); - // See above -#if FIXED_APPSRC_PUSH_BUFFER_IN_VAPI + prepare_remote_crypto(); + + Gst.Buffer buffer; + if (crypto_session.has_decrypt) { + try { + buffer = new Gst.Buffer.wrapped(crypto_session.decrypt_rtcp(bytes.get_data())); + } catch (Error e) { + warning("%s (%d)", e.message, e.code); + return; + } + } else { +#if GST_1_16 + buffer = new Gst.Buffer.wrapped_bytes(bytes); +#else + buffer = new Gst.Buffer.wrapped(bytes.get_data()); +#endif + } + +#if VALA_0_50 recv_rtcp.push_buffer((owned) buffer); #else Gst.FlowReturn ret; @@ -502,10 +613,10 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { debug("RTCP is ready, resending rtcp: %s", rtp_sent.to_string()); } - public void on_ssrc_pad_added(string ssrc, Gst.Pad pad) { - debug("New ssrc %s with pad %s", ssrc, pad.name); - if (participant_ssrc != null && participant_ssrc != ssrc) { - warning("Got second ssrc on stream (old: %s, new: %s), ignoring", participant_ssrc, ssrc); + public void on_ssrc_pad_added(uint32 ssrc, Gst.Pad pad) { + debug("New ssrc %u with pad %s", ssrc, pad.name); + if (participant_ssrc != 0 && participant_ssrc != ssrc) { + warning("Got second ssrc on stream (old: %u, new: %u), ignoring", participant_ssrc, ssrc); return; } participant_ssrc = ssrc; @@ -534,7 +645,9 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { private void set_input_and_pause(Gst.Element? input, bool paused) { if (created && this.input != null) { - this.input.unlink(encode); + this.input_pad.unlink(send_rtp_sink_pad); + this.input.release_request_pad(this.input_pad); + this.input_pad = null; this.input = null; } @@ -543,28 +656,42 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { if (created && sending && !paused && input != null) { plugin.pause(); - input.link(encode); + input_pad = input.get_request_pad(@"src_$rtpid"); + input_pad.link(send_rtp_sink_pad); plugin.unpause(); } } public void pause() { if (paused) return; + var input = this.input; set_input_and_pause(null, true); - if (input_device != null) input_device.unlink(); + if (input != null && input_device != null) input_device.unlink(input); } public void unpause() { if (!paused) return; - set_input_and_pause(input_device != null ? input_device.link_source() : null, false); + set_input_and_pause(input_device != null ? input_device.link_source(payload_type, our_ssrc, next_seqnum_offset, next_timestamp_offset) : null, false); + input_device.update_bitrate(payload_type, target_send_bitrate); + } + + public uint get_participant_ssrc(Xmpp.Jid participant) { + if (participant.equals(content.session.peer_full_jid)) { + return participant_ssrc; + } + return 0; } ulong block_probe_handler_id = 0; - public virtual void add_output(Gst.Element element) { + public virtual void add_output(Gst.Element element, Xmpp.Jid? participant = null) { if (output != null) { critical("add_output() invoked more than once"); return; } + if (participant != null) { + critical("add_output() invoked with participant when not supported"); + return; + } this.output = element; if (created) { plugin.pause(); @@ -586,7 +713,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream { decode.unlink(element); } if (this._output_device != null) { - this._output_device.unlink(); + this._output_device.unlink(element); this._output_device = null; } this.output = null; @@ -657,7 +784,7 @@ public class Dino.Plugins.Rtp.VideoStream : Stream { disconnect(video_orientation_changed_handler); } - public override void add_output(Gst.Element element) { + public override void add_output(Gst.Element element, Xmpp.Jid? participant) { if (element == output_tee || element == rotate) { base.add_output(element); return; @@ -678,4 +805,4 @@ public class Dino.Plugins.Rtp.VideoStream : Stream { output_tee.unlink(element); } } -}
\ No newline at end of file +} |