aboutsummaryrefslogtreecommitdiff
path: root/plugins/rtp/src/stream.vala
diff options
context:
space:
mode:
authorfiaxh <git@lightrise.org>2021-12-23 00:44:51 +0100
committerfiaxh <git@lightrise.org>2021-12-23 00:46:58 +0100
commitd02c5bc55d4325de308a592e8980139aa0634215 (patch)
tree9cb91f86056dd29f863bd8e54580cccc625af60e /plugins/rtp/src/stream.vala
parent1378224444b1862ac95783ac2bae7d25a0a8862d (diff)
parentf0c7dd0682fec8d72c644d8e54896de7bdc40ddb (diff)
downloaddino-d02c5bc55d4325de308a592e8980139aa0634215.tar.gz
dino-d02c5bc55d4325de308a592e8980139aa0634215.zip
Merge branch groupcalls
Diffstat (limited to 'plugins/rtp/src/stream.vala')
-rw-r--r--plugins/rtp/src/stream.vala457
1 files changed, 292 insertions, 165 deletions
diff --git a/plugins/rtp/src/stream.vala b/plugins/rtp/src/stream.vala
index bd8a279f..dc712b61 100644
--- a/plugins/rtp/src/stream.vala
+++ b/plugins/rtp/src/stream.vala
@@ -18,22 +18,19 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
private Gst.App.Sink send_rtcp;
private Gst.App.Src recv_rtp;
private Gst.App.Src recv_rtcp;
- private Gst.Element encode;
- private Gst.RTP.BasePayload encode_pay;
private Gst.Element decode;
private Gst.RTP.BaseDepayload decode_depay;
private Gst.Element input;
+ private Gst.Pad input_pad;
private Gst.Element output;
private Gst.Element session;
private Device _input_device;
public Device input_device { get { return _input_device; } set {
if (!paused) {
- if (this._input_device != null) {
- this._input_device.unlink();
- this._input_device = null;
- }
- set_input(value != null ? value.link_source() : null);
+ var input = this.input;
+ set_input(value != null ? value.link_source(payload_type, our_ssrc, next_seqnum_offset, next_timestamp_offset) : null);
+ if (this._input_device != null) this._input_device.unlink(input);
}
this._input_device = value;
}}
@@ -47,7 +44,16 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
public bool created { get; private set; default = false; }
public bool paused { get; private set; default = false; }
private bool push_recv_data = false;
- private string participant_ssrc = null;
+ private uint our_ssrc = Random.next_int();
+ private int next_seqnum_offset = -1;
+ private uint32 next_timestamp_offset_base = 0;
+ private int64 next_timestamp_offset_stamp = 0;
+ private uint32 next_timestamp_offset { get {
+ if (next_timestamp_offset_base == 0) return 0;
+ int64 monotonic_diff = get_monotonic_time() - next_timestamp_offset_stamp;
+ return next_timestamp_offset_base + (uint32)((double)monotonic_diff / 1000000.0 * payload_type.clockrate);
+ } }
+ private uint32 participant_ssrc = 0;
private Gst.Pad recv_rtcp_sink_pad;
private Gst.Pad recv_rtp_sink_pad;
@@ -92,16 +98,22 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
send_rtp.async = false;
send_rtp.caps = CodecUtil.get_caps(media, payload_type, false);
send_rtp.emit_signals = true;
- send_rtp.sync = false;
+ send_rtp.sync = true;
+ send_rtp.drop = true;
+ send_rtp.wait_on_eos = false;
send_rtp.new_sample.connect(on_new_sample);
+ send_rtp.connect("signal::eos", on_eos_static, this);
pipe.add(send_rtp);
send_rtcp = Gst.ElementFactory.make("appsink", @"rtcp_sink_$rtpid") as Gst.App.Sink;
send_rtcp.async = false;
send_rtcp.caps = new Gst.Caps.empty_simple("application/x-rtcp");
send_rtcp.emit_signals = true;
- send_rtcp.sync = false;
+ send_rtcp.sync = true;
+ send_rtcp.drop = true;
+ send_rtcp.wait_on_eos = false;
send_rtcp.new_sample.connect(on_new_sample);
+ send_rtcp.connect("signal::eos", on_eos_static, this);
pipe.add(send_rtcp);
recv_rtp = Gst.ElementFactory.make("appsrc", @"rtp_src_$rtpid") as Gst.App.Src;
@@ -125,18 +137,15 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
recv_rtcp.get_static_pad("src").link(recv_rtcp_sink_pad);
// Connect input
- encode = codec_util.get_encode_bin(media, payload_type, @"encode_$rtpid");
- encode_pay = (Gst.RTP.BasePayload)((Gst.Bin)encode).get_by_name(@"encode_$(rtpid)_rtp_pay");
- pipe.add(encode);
send_rtp_sink_pad = rtpbin.get_request_pad(@"send_rtp_sink_$rtpid");
- encode.get_static_pad("src").link(send_rtp_sink_pad);
if (input != null) {
- input.link(encode);
+ input_pad = input.get_request_pad(@"src_$rtpid");
+ input_pad.link(send_rtp_sink_pad);
}
// Connect output
decode = codec_util.get_decode_bin(media, payload_type, @"decode_$rtpid");
- decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)encode).get_by_name(@"decode_$(rtpid)_rtp_depay");
+ decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)decode).get_by_name(@"decode_$(rtpid)_rtp_depay");
pipe.add(decode);
if (output != null) {
decode.link(output);
@@ -151,7 +160,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
plugin.unpause();
GLib.Signal.emit_by_name(rtpbin, "get-session", rtpid, out session);
- if (session != null && payload_type.rtcp_fbs.any_match((it) => it.type_ == "goog-remb")) {
+ if (session != null && remb_enabled) {
Object internal_session;
session.@get("internal-session", out internal_session);
if (internal_session != null) {
@@ -159,15 +168,16 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
}
Timeout.add(1000, () => remb_adjust());
}
- if (media == "video") {
- codec_util.update_bitrate(media, payload_type, encode, 256);
+ if (input_device != null && media == "video") {
+ input_device.update_bitrate(payload_type, target_send_bitrate);
}
}
- private uint remb = 256;
private int last_packets_lost = -1;
- private uint64 last_packets_received;
- private uint64 last_octets_received;
+ private uint64 last_packets_received = 0;
+ private uint64 last_octets_received = 0;
+ private uint max_target_receive_bitrate = 0;
+ private int64 last_remb_time = 0;
private bool remb_adjust() {
unowned Gst.Structure? stats;
if (session == null) {
@@ -185,73 +195,95 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
warning("No source-stats for session %u", rtpid);
return Source.REMOVE;
}
+
+ if (input_device == null) return Source.CONTINUE;
+
foreach (Value value in source_stats.values) {
unowned Gst.Structure source_stat = (Gst.Structure) value.get_boxed();
- uint ssrc;
+ uint32 ssrc;
if (!source_stat.get_uint("ssrc", out ssrc)) continue;
- if (ssrc.to_string() == participant_ssrc) {
+ if (ssrc == participant_ssrc) {
int packets_lost;
uint64 packets_received, octets_received;
source_stat.get_int("packets-lost", out packets_lost);
source_stat.get_uint64("packets-received", out packets_received);
source_stat.get_uint64("octets-received", out octets_received);
int new_lost = packets_lost - last_packets_lost;
+ if (new_lost < 0) new_lost = 0;
uint64 new_received = packets_received - last_packets_received;
+ if (packets_received < last_packets_received) new_received = 0;
uint64 new_octets = octets_received - last_octets_received;
+ if (octets_received < last_octets_received) octets_received = 0;
if (new_received == 0) continue;
last_packets_lost = packets_lost;
last_packets_received = packets_received;
last_octets_received = octets_received;
double loss_rate = (double)new_lost / (double)(new_lost + new_received);
+ uint new_target_receive_bitrate;
if (new_lost <= 0 || loss_rate < 0.02) {
- remb = (uint)(1.08 * (double)remb);
+ new_target_receive_bitrate = (uint)(1.08 * (double)target_receive_bitrate);
} else if (loss_rate > 0.1) {
- remb = (uint)((1.0 - 0.5 * loss_rate) * (double)remb);
+ new_target_receive_bitrate = (uint)((1.0 - 0.5 * loss_rate) * (double)target_receive_bitrate);
+ } else {
+ new_target_receive_bitrate = target_receive_bitrate;
}
- remb = uint.max(remb, (uint)((new_octets * 8) / 1000));
- remb = uint.max(16, remb); // Never go below 16
- uint8[] data = new uint8[] {
- 143, 206, 0, 5,
- 0, 0, 0, 0,
- 0, 0, 0, 0,
- 'R', 'E', 'M', 'B',
- 1, 0, 0, 0,
- 0, 0, 0, 0
- };
- data[4] = (uint8)((encode_pay.ssrc >> 24) & 0xff);
- data[5] = (uint8)((encode_pay.ssrc >> 16) & 0xff);
- data[6] = (uint8)((encode_pay.ssrc >> 8) & 0xff);
- data[7] = (uint8)(encode_pay.ssrc & 0xff);
- uint8 br_exp = 0;
- uint32 br_mant = remb * 1000;
- uint8 bits = (uint8)Math.log2(br_mant);
- if (bits > 16) {
- br_exp = (uint8)bits - 16;
- br_mant = br_mant >> br_exp;
+ if (last_remb_time == 0) {
+ last_remb_time = get_monotonic_time();
+ } else {
+ int64 time_now = get_monotonic_time();
+ int64 time_diff = time_now - last_remb_time;
+ last_remb_time = time_now;
+ uint actual_bitrate = (uint)(((double)new_octets * 8.0) * (double)time_diff / 1000.0 / 1000000.0);
+ new_target_receive_bitrate = uint.max(new_target_receive_bitrate, (uint)(0.9 * (double)actual_bitrate));
+ max_target_receive_bitrate = uint.max((uint)(1.5 * (double)actual_bitrate), max_target_receive_bitrate);
+ new_target_receive_bitrate = uint.min(new_target_receive_bitrate, max_target_receive_bitrate);
+ }
+ new_target_receive_bitrate = uint.max(16, new_target_receive_bitrate); // Never go below 16
+ if (new_target_receive_bitrate != target_receive_bitrate) {
+ target_receive_bitrate = new_target_receive_bitrate;
+ uint8[] data = new uint8[] {
+ 143, 206, 0, 5,
+ 0, 0, 0, 0,
+ 0, 0, 0, 0,
+ 'R', 'E', 'M', 'B',
+ 1, 0, 0, 0,
+ 0, 0, 0, 0
+ };
+ data[4] = (uint8)((our_ssrc >> 24) & 0xff);
+ data[5] = (uint8)((our_ssrc >> 16) & 0xff);
+ data[6] = (uint8)((our_ssrc >> 8) & 0xff);
+ data[7] = (uint8)(our_ssrc & 0xff);
+ uint8 br_exp = 0;
+ uint32 br_mant = target_receive_bitrate * 1000;
+ uint8 bits = (uint8)Math.log2(br_mant);
+ if (bits > 16) {
+ br_exp = (uint8)bits - 16;
+ br_mant = br_mant >> br_exp;
+ }
+ data[17] = (uint8)((br_exp << 2) | ((br_mant >> 16) & 0x3));
+ data[18] = (uint8)((br_mant >> 8) & 0xff);
+ data[19] = (uint8)(br_mant & 0xff);
+ data[20] = (uint8)((ssrc >> 24) & 0xff);
+ data[21] = (uint8)((ssrc >> 16) & 0xff);
+ data[22] = (uint8)((ssrc >> 8) & 0xff);
+ data[23] = (uint8)(ssrc & 0xff);
+ encrypt_and_send_rtcp(data);
}
- data[17] = (uint8)((br_exp << 2) | ((br_mant >> 16) & 0x3));
- data[18] = (uint8)((br_mant >> 8) & 0xff);
- data[19] = (uint8)(br_mant & 0xff);
- data[20] = (uint8)((ssrc >> 24) & 0xff);
- data[21] = (uint8)((ssrc >> 16) & 0xff);
- data[22] = (uint8)((ssrc >> 8) & 0xff);
- data[23] = (uint8)(ssrc & 0xff);
- encrypt_and_send_rtcp(data);
}
}
return Source.CONTINUE;
}
private static void on_feedback_rtcp(Gst.Element session, uint type, uint fbtype, uint sender_ssrc, uint media_ssrc, Gst.Buffer? fci, Stream self) {
- if (type == 206 && fbtype == 15 && fci != null && sender_ssrc.to_string() == self.participant_ssrc) {
+ if (self.input_device != null && self.media == "video" && type == 206 && fbtype == 15 && fci != null && sender_ssrc == self.participant_ssrc) {
// https://tools.ietf.org/html/draft-alvestrand-rmcat-remb-03
uint8[] data;
fci.extract_dup(0, fci.get_size(), out data);
if (data[0] != 'R' || data[1] != 'E' || data[2] != 'M' || data[3] != 'B') return;
uint8 br_exp = data[5] >> 2;
uint32 br_mant = (((uint32)data[5] & 0x3) << 16) + ((uint32)data[6] << 8) + (uint32)data[7];
- uint bitrate = (br_mant << br_exp) / 1000;
- self.codec_util.update_bitrate(self.media, self.payload_type, self.encode, bitrate * 8);
+ self.target_send_bitrate = (br_mant << br_exp) / 1000;
+ self.input_device.update_bitrate(self.payload_type, self.target_send_bitrate);
}
}
@@ -267,32 +299,63 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
debug("Sink is null");
return Gst.FlowReturn.EOS;
}
+ if (sink != send_rtp && sink != send_rtcp) {
+ warning("unknown sample");
+ return Gst.FlowReturn.NOT_SUPPORTED;
+ }
Gst.Sample sample = sink.pull_sample();
Gst.Buffer buffer = sample.get_buffer();
+ if (sink == send_rtp) {
+ uint buffer_ssrc = 0, buffer_seq = 0;
+ Gst.RTP.Buffer rtp_buffer;
+ if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) {
+ buffer_ssrc = rtp_buffer.get_ssrc();
+ buffer_seq = rtp_buffer.get_seq();
+ next_seqnum_offset = rtp_buffer.get_seq() + 1;
+ next_timestamp_offset_base = rtp_buffer.get_timestamp();
+ next_timestamp_offset_stamp = get_monotonic_time();
+ rtp_buffer.unmap();
+ }
+ if (our_ssrc != buffer_ssrc) {
+ warning("Sending RTP %s buffer seq %u with SSRC %u when our ssrc is %u", media, buffer_seq, buffer_ssrc, our_ssrc);
+ } else {
+ debug("Sending RTP %s buffer seq %u with SSRC %u", media, buffer_seq, buffer_ssrc);
+ }
+ }
+
+ prepare_local_crypto();
+
uint8[] data;
buffer.extract_dup(0, buffer.get_size(), out data);
- prepare_local_crypto();
if (sink == send_rtp) {
- if (crypto_session.has_encrypt) {
- data = crypto_session.encrypt_rtp(data);
- }
- on_send_rtp_data(new Bytes.take((owned) data));
+ encrypt_and_send_rtp((owned) data);
} else if (sink == send_rtcp) {
encrypt_and_send_rtcp((owned) data);
- } else {
- warning("unknown sample");
}
return Gst.FlowReturn.OK;
}
+ private void encrypt_and_send_rtp(owned uint8[] data) {
+ Bytes bytes;
+ if (crypto_session.has_encrypt) {
+ bytes = new Bytes.take(crypto_session.encrypt_rtp(data));
+ } else {
+ bytes = new Bytes.take(data);
+ }
+ on_send_rtp_data(bytes);
+ }
+
private void encrypt_and_send_rtcp(owned uint8[] data) {
+ Bytes bytes;
if (crypto_session.has_encrypt) {
- data = crypto_session.encrypt_rtcp(data);
+ bytes = new Bytes.take(crypto_session.encrypt_rtcp(data));
+ } else {
+ bytes = new Bytes.take(data);
}
if (rtcp_mux) {
- on_send_rtp_data(new Bytes.take((owned) data));
+ on_send_rtp_data(bytes);
} else {
- on_send_rtcp_data(new Bytes.take((owned) data));
+ on_send_rtcp_data(bytes);
}
}
@@ -300,41 +363,59 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
return Gst.PadProbeReturn.DROP;
}
+ private static void on_eos_static(Gst.App.Sink sink, Stream self) {
+ debug("EOS on %s", sink.name);
+ if (sink == self.send_rtp) {
+ Idle.add(() => { self.on_send_rtp_eos(); return Source.REMOVE; });
+ } else if (sink == self.send_rtcp) {
+ Idle.add(() => { self.on_send_rtcp_eos(); return Source.REMOVE; });
+ }
+ }
+
+ private void on_send_rtp_eos() {
+ if (send_rtp_src_pad != null) {
+ send_rtp_src_pad.unlink(send_rtp.get_static_pad("sink"));
+ send_rtp_src_pad = null;
+ }
+ send_rtp.set_locked_state(true);
+ send_rtp.set_state(Gst.State.NULL);
+ pipe.remove(send_rtp);
+ send_rtp = null;
+ debug("Stopped sending RTP for %u", rtpid);
+ }
+
+ private void on_send_rtcp_eos() {
+ send_rtcp.set_locked_state(true);
+ send_rtcp.set_state(Gst.State.NULL);
+ pipe.remove(send_rtcp);
+ send_rtcp = null;
+ debug("Stopped sending RTCP for %u", rtpid);
+ }
+
public override void destroy() {
// Stop network communication
push_recv_data = false;
- recv_rtp.end_of_stream();
- recv_rtcp.end_of_stream();
- send_rtp.new_sample.disconnect(on_new_sample);
- send_rtcp.new_sample.disconnect(on_new_sample);
+ if (recv_rtp != null) recv_rtp.end_of_stream();
+ if (recv_rtcp != null) recv_rtcp.end_of_stream();
+ if (send_rtp != null) send_rtp.new_sample.disconnect(on_new_sample);
+ if (send_rtcp != null) send_rtcp.new_sample.disconnect(on_new_sample);
// Disconnect input device
if (input != null) {
- input.unlink(encode);
- input = null;
+ input_pad.unlink(send_rtp_sink_pad);
+ input.release_request_pad(input_pad);
+ input_pad = null;
}
if (this._input_device != null) {
- if (!paused) this._input_device.unlink();
+ if (!paused) this._input_device.unlink(input);
this._input_device = null;
+ this.input = null;
}
- // Disconnect encode
- encode.set_locked_state(true);
- encode.set_state(Gst.State.NULL);
- encode.get_static_pad("src").unlink(send_rtp_sink_pad);
- pipe.remove(encode);
- encode = null;
- encode_pay = null;
-
- // Disconnect RTP sending
- if (send_rtp_src_pad != null) {
- send_rtp_src_pad.add_probe(Gst.PadProbeType.BLOCK, drop_probe);
- send_rtp_src_pad.unlink(send_rtp.get_static_pad("sink"));
+ // Inject EOS
+ if (send_rtp_sink_pad != null) {
+ send_rtp_sink_pad.send_event(new Gst.Event.eos());
}
- send_rtp.set_locked_state(true);
- send_rtp.set_state(Gst.State.NULL);
- pipe.remove(send_rtp);
- send_rtp = null;
// Disconnect decode
if (recv_rtp_src_pad != null) {
@@ -342,57 +423,63 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
recv_rtp_src_pad.unlink(decode.get_static_pad("sink"));
}
- // Disconnect RTP receiving
- recv_rtp.set_locked_state(true);
- recv_rtp.set_state(Gst.State.NULL);
- recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad);
- pipe.remove(recv_rtp);
- recv_rtp = null;
-
// Disconnect output
if (output != null) {
+ decode.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, drop_probe);
decode.unlink(output);
}
- decode.set_locked_state(true);
- decode.set_state(Gst.State.NULL);
- pipe.remove(decode);
- decode = null;
- decode_depay = null;
- output = null;
// Disconnect output device
if (this._output_device != null) {
- this._output_device.unlink();
+ this._output_device.unlink(output);
this._output_device = null;
}
+ output = null;
- // Disconnect RTCP receiving
- recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad);
- recv_rtcp.set_locked_state(true);
- recv_rtcp.set_state(Gst.State.NULL);
- pipe.remove(recv_rtcp);
- recv_rtcp = null;
+ // Destroy decode
+ if (decode != null) {
+ decode.set_locked_state(true);
+ decode.set_state(Gst.State.NULL);
+ pipe.remove(decode);
+ decode = null;
+ decode_depay = null;
+ }
- // Disconnect RTCP sending
- send_rtcp_src_pad.unlink(send_rtcp.get_static_pad("sink"));
- send_rtcp.set_locked_state(true);
- send_rtcp.set_state(Gst.State.NULL);
- pipe.remove(send_rtcp);
- send_rtcp = null;
+ // Disconnect and remove RTP input
+ if (recv_rtp != null) {
+ recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad);
+ recv_rtp.set_locked_state(true);
+ recv_rtp.set_state(Gst.State.NULL);
+ pipe.remove(recv_rtp);
+ recv_rtp = null;
+ }
- // Release rtp pads
- rtpbin.release_request_pad(send_rtp_sink_pad);
- send_rtp_sink_pad = null;
- rtpbin.release_request_pad(recv_rtp_sink_pad);
- recv_rtp_sink_pad = null;
- rtpbin.release_request_pad(recv_rtcp_sink_pad);
- recv_rtcp_sink_pad = null;
- rtpbin.release_request_pad(send_rtcp_src_pad);
- send_rtcp_src_pad = null;
- send_rtp_src_pad = null;
- recv_rtp_src_pad = null;
+ // Disconnect and remove RTCP input
+ if (recv_rtcp != null) {
+ recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad);
+ recv_rtcp.set_locked_state(true);
+ recv_rtcp.set_state(Gst.State.NULL);
+ pipe.remove(recv_rtcp);
+ recv_rtcp = null;
+ }
- session = null;
+ // Release rtp pads
+ if (send_rtp_sink_pad != null) {
+ rtpbin.release_request_pad(send_rtp_sink_pad);
+ send_rtp_sink_pad = null;
+ }
+ if (recv_rtp_sink_pad != null) {
+ rtpbin.release_request_pad(recv_rtp_sink_pad);
+ recv_rtp_sink_pad = null;
+ }
+ if (send_rtcp_src_pad != null) {
+ rtpbin.release_request_pad(send_rtcp_src_pad);
+ send_rtcp_src_pad = null;
+ }
+ if (recv_rtcp_sink_pad != null) {
+ rtpbin.release_request_pad(recv_rtcp_sink_pad);
+ recv_rtcp_sink_pad = null;
+ }
}
private void prepare_remote_crypto() {
@@ -410,17 +497,38 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
on_recv_rtcp_data(bytes);
return;
}
- prepare_remote_crypto();
- uint8[] data = bytes.get_data();
- if (crypto_session.has_decrypt) {
- try {
- data = crypto_session.decrypt_rtp(data);
- } catch (Error e) {
- warning("%s (%d)", e.message, e.code);
+#if GST_1_16
+ {
+ Gst.Buffer buffer = new Gst.Buffer.wrapped_bytes(bytes);
+ Gst.RTP.Buffer rtp_buffer;
+ uint buffer_ssrc = 0, buffer_seq = 0;
+ if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) {
+ buffer_ssrc = rtp_buffer.get_ssrc();
+ buffer_seq = rtp_buffer.get_seq();
+ rtp_buffer.unmap();
}
+ debug("Received RTP %s buffer seq %u with SSRC %u", media, buffer_seq, buffer_ssrc);
}
+#endif
if (push_recv_data) {
- Gst.Buffer buffer = new Gst.Buffer.wrapped((owned) data);
+ prepare_remote_crypto();
+
+ Gst.Buffer buffer;
+ if (crypto_session.has_decrypt) {
+ try {
+ buffer = new Gst.Buffer.wrapped(crypto_session.decrypt_rtp(bytes.get_data()));
+ } catch (Error e) {
+ warning("%s (%d)", e.message, e.code);
+ return;
+ }
+ } else {
+#if GST_1_16
+ buffer = new Gst.Buffer.wrapped_bytes(bytes);
+#else
+ buffer = new Gst.Buffer.wrapped(bytes.get_data());
+#endif
+ }
+
Gst.RTP.Buffer rtp_buffer;
if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) {
if (rtp_buffer.get_extension()) {
@@ -448,11 +556,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
rtp_buffer.unmap();
}
- // FIXME: VAPI file in Vala < 0.49.1 has a bug that results in broken ownership of buffer in push_buffer()
- // We workaround by using the plain signal. The signal unfortunately will cause an unnecessary copy of
- // the underlying buffer, so and some point we should move over to the new version (once we require
- // Vala >= 0.50)
-#if FIXED_APPSRC_PUSH_BUFFER_IN_VAPI
+#if VALA_0_50
recv_rtp.push_buffer((owned) buffer);
#else
Gst.FlowReturn ret;
@@ -462,19 +566,26 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
}
public override void on_recv_rtcp_data(Bytes bytes) {
- prepare_remote_crypto();
- uint8[] data = bytes.get_data();
- if (crypto_session.has_decrypt) {
- try {
- data = crypto_session.decrypt_rtcp(data);
- } catch (Error e) {
- warning("%s (%d)", e.message, e.code);
- }
- }
if (push_recv_data) {
- Gst.Buffer buffer = new Gst.Buffer.wrapped((owned) data);
- // See above
-#if FIXED_APPSRC_PUSH_BUFFER_IN_VAPI
+ prepare_remote_crypto();
+
+ Gst.Buffer buffer;
+ if (crypto_session.has_decrypt) {
+ try {
+ buffer = new Gst.Buffer.wrapped(crypto_session.decrypt_rtcp(bytes.get_data()));
+ } catch (Error e) {
+ warning("%s (%d)", e.message, e.code);
+ return;
+ }
+ } else {
+#if GST_1_16
+ buffer = new Gst.Buffer.wrapped_bytes(bytes);
+#else
+ buffer = new Gst.Buffer.wrapped(bytes.get_data());
+#endif
+ }
+
+#if VALA_0_50
recv_rtcp.push_buffer((owned) buffer);
#else
Gst.FlowReturn ret;
@@ -502,10 +613,10 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
debug("RTCP is ready, resending rtcp: %s", rtp_sent.to_string());
}
- public void on_ssrc_pad_added(string ssrc, Gst.Pad pad) {
- debug("New ssrc %s with pad %s", ssrc, pad.name);
- if (participant_ssrc != null && participant_ssrc != ssrc) {
- warning("Got second ssrc on stream (old: %s, new: %s), ignoring", participant_ssrc, ssrc);
+ public void on_ssrc_pad_added(uint32 ssrc, Gst.Pad pad) {
+ debug("New ssrc %u with pad %s", ssrc, pad.name);
+ if (participant_ssrc != 0 && participant_ssrc != ssrc) {
+ warning("Got second ssrc on stream (old: %u, new: %u), ignoring", participant_ssrc, ssrc);
return;
}
participant_ssrc = ssrc;
@@ -534,7 +645,9 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
private void set_input_and_pause(Gst.Element? input, bool paused) {
if (created && this.input != null) {
- this.input.unlink(encode);
+ this.input_pad.unlink(send_rtp_sink_pad);
+ this.input.release_request_pad(this.input_pad);
+ this.input_pad = null;
this.input = null;
}
@@ -543,28 +656,42 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
if (created && sending && !paused && input != null) {
plugin.pause();
- input.link(encode);
+ input_pad = input.get_request_pad(@"src_$rtpid");
+ input_pad.link(send_rtp_sink_pad);
plugin.unpause();
}
}
public void pause() {
if (paused) return;
+ var input = this.input;
set_input_and_pause(null, true);
- if (input_device != null) input_device.unlink();
+ if (input != null && input_device != null) input_device.unlink(input);
}
public void unpause() {
if (!paused) return;
- set_input_and_pause(input_device != null ? input_device.link_source() : null, false);
+ set_input_and_pause(input_device != null ? input_device.link_source(payload_type, our_ssrc, next_seqnum_offset, next_timestamp_offset) : null, false);
+ input_device.update_bitrate(payload_type, target_send_bitrate);
+ }
+
+ public uint get_participant_ssrc(Xmpp.Jid participant) {
+ if (participant.equals(content.session.peer_full_jid)) {
+ return participant_ssrc;
+ }
+ return 0;
}
ulong block_probe_handler_id = 0;
- public virtual void add_output(Gst.Element element) {
+ public virtual void add_output(Gst.Element element, Xmpp.Jid? participant = null) {
if (output != null) {
critical("add_output() invoked more than once");
return;
}
+ if (participant != null) {
+ critical("add_output() invoked with participant when not supported");
+ return;
+ }
this.output = element;
if (created) {
plugin.pause();
@@ -586,7 +713,7 @@ public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
decode.unlink(element);
}
if (this._output_device != null) {
- this._output_device.unlink();
+ this._output_device.unlink(element);
this._output_device = null;
}
this.output = null;
@@ -657,7 +784,7 @@ public class Dino.Plugins.Rtp.VideoStream : Stream {
disconnect(video_orientation_changed_handler);
}
- public override void add_output(Gst.Element element) {
+ public override void add_output(Gst.Element element, Xmpp.Jid? participant) {
if (element == output_tee || element == rotate) {
base.add_output(element);
return;
@@ -678,4 +805,4 @@ public class Dino.Plugins.Rtp.VideoStream : Stream {
output_tee.unlink(element);
}
}
-} \ No newline at end of file
+}