aboutsummaryrefslogtreecommitdiff
path: root/plugins/rtp/src/stream.vala
diff options
context:
space:
mode:
authorfiaxh <git@lightrise.org>2021-05-11 12:57:02 +0200
committerfiaxh <git@lightrise.org>2021-05-11 12:57:02 +0200
commitd71604913dd5b3372a823320db83c37c845fac5c (patch)
tree2ffbff97a02c81d48d8aef4a4b7ee870507236e9 /plugins/rtp/src/stream.vala
parente92ed27317ae398c867c946cf7206b1f0b32f3b4 (diff)
parent90f9ecf62b2ebfef14de2874e7942552409632bf (diff)
downloaddino-d71604913dd5b3372a823320db83c37c845fac5c.tar.gz
dino-d71604913dd5b3372a823320db83c37c845fac5c.zip
Merge remote-tracking branch 'origin/feature/calls'
Diffstat (limited to 'plugins/rtp/src/stream.vala')
-rw-r--r--plugins/rtp/src/stream.vala681
1 files changed, 681 insertions, 0 deletions
diff --git a/plugins/rtp/src/stream.vala b/plugins/rtp/src/stream.vala
new file mode 100644
index 00000000..bd8a279f
--- /dev/null
+++ b/plugins/rtp/src/stream.vala
@@ -0,0 +1,681 @@
+using Gee;
+using Xmpp;
+
+public class Dino.Plugins.Rtp.Stream : Xmpp.Xep.JingleRtp.Stream {
+ public uint8 rtpid { get; private set; }
+
+ public Plugin plugin { get; private set; }
+ public Gst.Pipeline pipe { get {
+ return plugin.pipe;
+ }}
+ public Gst.Element rtpbin { get {
+ return plugin.rtpbin;
+ }}
+ public CodecUtil codec_util { get {
+ return plugin.codec_util;
+ }}
+ private Gst.App.Sink send_rtp;
+ private Gst.App.Sink send_rtcp;
+ private Gst.App.Src recv_rtp;
+ private Gst.App.Src recv_rtcp;
+ private Gst.Element encode;
+ private Gst.RTP.BasePayload encode_pay;
+ private Gst.Element decode;
+ private Gst.RTP.BaseDepayload decode_depay;
+ private Gst.Element input;
+ private Gst.Element output;
+ private Gst.Element session;
+
+ private Device _input_device;
+ public Device input_device { get { return _input_device; } set {
+ if (!paused) {
+ if (this._input_device != null) {
+ this._input_device.unlink();
+ this._input_device = null;
+ }
+ set_input(value != null ? value.link_source() : null);
+ }
+ this._input_device = value;
+ }}
+ private Device _output_device;
+ public Device output_device { get { return _output_device; } set {
+ if (output != null) remove_output(output);
+ if (value != null) add_output(value.link_sink());
+ this._output_device = value;
+ }}
+
+ public bool created { get; private set; default = false; }
+ public bool paused { get; private set; default = false; }
+ private bool push_recv_data = false;
+ private string participant_ssrc = null;
+
+ private Gst.Pad recv_rtcp_sink_pad;
+ private Gst.Pad recv_rtp_sink_pad;
+ private Gst.Pad recv_rtp_src_pad;
+ private Gst.Pad send_rtcp_src_pad;
+ private Gst.Pad send_rtp_sink_pad;
+ private Gst.Pad send_rtp_src_pad;
+
+ private Crypto.Srtp.Session? crypto_session = new Crypto.Srtp.Session();
+
+ public Stream(Plugin plugin, Xmpp.Xep.Jingle.Content content) {
+ base(content);
+ this.plugin = plugin;
+ this.rtpid = plugin.next_free_id();
+
+ content.notify["senders"].connect_after(on_senders_changed);
+ }
+
+ public void on_senders_changed() {
+ if (sending && input == null) {
+ input_device = plugin.get_preferred_device(media, false);
+ }
+ if (receiving && output == null) {
+ output_device = plugin.get_preferred_device(media, true);
+ }
+ }
+
+ public override void create() {
+ plugin.pause();
+
+ // Create i/o if needed
+
+ if (input == null && input_device == null && sending) {
+ input_device = plugin.get_preferred_device(media, false);
+ }
+ if (output == null && output_device == null && receiving && media == "audio") {
+ output_device = plugin.get_preferred_device(media, true);
+ }
+
+ // Create app elements
+ send_rtp = Gst.ElementFactory.make("appsink", @"rtp_sink_$rtpid") as Gst.App.Sink;
+ send_rtp.async = false;
+ send_rtp.caps = CodecUtil.get_caps(media, payload_type, false);
+ send_rtp.emit_signals = true;
+ send_rtp.sync = false;
+ send_rtp.new_sample.connect(on_new_sample);
+ pipe.add(send_rtp);
+
+ send_rtcp = Gst.ElementFactory.make("appsink", @"rtcp_sink_$rtpid") as Gst.App.Sink;
+ send_rtcp.async = false;
+ send_rtcp.caps = new Gst.Caps.empty_simple("application/x-rtcp");
+ send_rtcp.emit_signals = true;
+ send_rtcp.sync = false;
+ send_rtcp.new_sample.connect(on_new_sample);
+ pipe.add(send_rtcp);
+
+ recv_rtp = Gst.ElementFactory.make("appsrc", @"rtp_src_$rtpid") as Gst.App.Src;
+ recv_rtp.caps = CodecUtil.get_caps(media, payload_type, true);
+ recv_rtp.do_timestamp = true;
+ recv_rtp.format = Gst.Format.TIME;
+ recv_rtp.is_live = true;
+ pipe.add(recv_rtp);
+
+ recv_rtcp = Gst.ElementFactory.make("appsrc", @"rtcp_src_$rtpid") as Gst.App.Src;
+ recv_rtcp.caps = new Gst.Caps.empty_simple("application/x-rtcp");
+ recv_rtcp.do_timestamp = true;
+ recv_rtcp.format = Gst.Format.TIME;
+ recv_rtcp.is_live = true;
+ pipe.add(recv_rtcp);
+
+ // Connect RTCP
+ send_rtcp_src_pad = rtpbin.get_request_pad(@"send_rtcp_src_$rtpid");
+ send_rtcp_src_pad.link(send_rtcp.get_static_pad("sink"));
+ recv_rtcp_sink_pad = rtpbin.get_request_pad(@"recv_rtcp_sink_$rtpid");
+ recv_rtcp.get_static_pad("src").link(recv_rtcp_sink_pad);
+
+ // Connect input
+ encode = codec_util.get_encode_bin(media, payload_type, @"encode_$rtpid");
+ encode_pay = (Gst.RTP.BasePayload)((Gst.Bin)encode).get_by_name(@"encode_$(rtpid)_rtp_pay");
+ pipe.add(encode);
+ send_rtp_sink_pad = rtpbin.get_request_pad(@"send_rtp_sink_$rtpid");
+ encode.get_static_pad("src").link(send_rtp_sink_pad);
+ if (input != null) {
+ input.link(encode);
+ }
+
+ // Connect output
+ decode = codec_util.get_decode_bin(media, payload_type, @"decode_$rtpid");
+ decode_depay = (Gst.RTP.BaseDepayload)((Gst.Bin)encode).get_by_name(@"decode_$(rtpid)_rtp_depay");
+ pipe.add(decode);
+ if (output != null) {
+ decode.link(output);
+ }
+
+ // Connect RTP
+ recv_rtp_sink_pad = rtpbin.get_request_pad(@"recv_rtp_sink_$rtpid");
+ recv_rtp.get_static_pad("src").link(recv_rtp_sink_pad);
+
+ created = true;
+ push_recv_data = true;
+ plugin.unpause();
+
+ GLib.Signal.emit_by_name(rtpbin, "get-session", rtpid, out session);
+ if (session != null && payload_type.rtcp_fbs.any_match((it) => it.type_ == "goog-remb")) {
+ Object internal_session;
+ session.@get("internal-session", out internal_session);
+ if (internal_session != null) {
+ internal_session.connect("signal::on-feedback-rtcp", on_feedback_rtcp, this);
+ }
+ Timeout.add(1000, () => remb_adjust());
+ }
+ if (media == "video") {
+ codec_util.update_bitrate(media, payload_type, encode, 256);
+ }
+ }
+
+ private uint remb = 256;
+ private int last_packets_lost = -1;
+ private uint64 last_packets_received;
+ private uint64 last_octets_received;
+ private bool remb_adjust() {
+ unowned Gst.Structure? stats;
+ if (session == null) {
+ debug("Session for %u finished, turning off remb adjustment", rtpid);
+ return Source.REMOVE;
+ }
+ session.get("stats", out stats);
+ if (stats == null) {
+ warning("No stats for session %u", rtpid);
+ return Source.REMOVE;
+ }
+ unowned ValueArray? source_stats;
+ stats.get("source-stats", typeof(ValueArray), out source_stats);
+ if (source_stats == null) {
+ warning("No source-stats for session %u", rtpid);
+ return Source.REMOVE;
+ }
+ foreach (Value value in source_stats.values) {
+ unowned Gst.Structure source_stat = (Gst.Structure) value.get_boxed();
+ uint ssrc;
+ if (!source_stat.get_uint("ssrc", out ssrc)) continue;
+ if (ssrc.to_string() == participant_ssrc) {
+ int packets_lost;
+ uint64 packets_received, octets_received;
+ source_stat.get_int("packets-lost", out packets_lost);
+ source_stat.get_uint64("packets-received", out packets_received);
+ source_stat.get_uint64("octets-received", out octets_received);
+ int new_lost = packets_lost - last_packets_lost;
+ uint64 new_received = packets_received - last_packets_received;
+ uint64 new_octets = octets_received - last_octets_received;
+ if (new_received == 0) continue;
+ last_packets_lost = packets_lost;
+ last_packets_received = packets_received;
+ last_octets_received = octets_received;
+ double loss_rate = (double)new_lost / (double)(new_lost + new_received);
+ if (new_lost <= 0 || loss_rate < 0.02) {
+ remb = (uint)(1.08 * (double)remb);
+ } else if (loss_rate > 0.1) {
+ remb = (uint)((1.0 - 0.5 * loss_rate) * (double)remb);
+ }
+ remb = uint.max(remb, (uint)((new_octets * 8) / 1000));
+ remb = uint.max(16, remb); // Never go below 16
+ uint8[] data = new uint8[] {
+ 143, 206, 0, 5,
+ 0, 0, 0, 0,
+ 0, 0, 0, 0,
+ 'R', 'E', 'M', 'B',
+ 1, 0, 0, 0,
+ 0, 0, 0, 0
+ };
+ data[4] = (uint8)((encode_pay.ssrc >> 24) & 0xff);
+ data[5] = (uint8)((encode_pay.ssrc >> 16) & 0xff);
+ data[6] = (uint8)((encode_pay.ssrc >> 8) & 0xff);
+ data[7] = (uint8)(encode_pay.ssrc & 0xff);
+ uint8 br_exp = 0;
+ uint32 br_mant = remb * 1000;
+ uint8 bits = (uint8)Math.log2(br_mant);
+ if (bits > 16) {
+ br_exp = (uint8)bits - 16;
+ br_mant = br_mant >> br_exp;
+ }
+ data[17] = (uint8)((br_exp << 2) | ((br_mant >> 16) & 0x3));
+ data[18] = (uint8)((br_mant >> 8) & 0xff);
+ data[19] = (uint8)(br_mant & 0xff);
+ data[20] = (uint8)((ssrc >> 24) & 0xff);
+ data[21] = (uint8)((ssrc >> 16) & 0xff);
+ data[22] = (uint8)((ssrc >> 8) & 0xff);
+ data[23] = (uint8)(ssrc & 0xff);
+ encrypt_and_send_rtcp(data);
+ }
+ }
+ return Source.CONTINUE;
+ }
+
+ private static void on_feedback_rtcp(Gst.Element session, uint type, uint fbtype, uint sender_ssrc, uint media_ssrc, Gst.Buffer? fci, Stream self) {
+ if (type == 206 && fbtype == 15 && fci != null && sender_ssrc.to_string() == self.participant_ssrc) {
+ // https://tools.ietf.org/html/draft-alvestrand-rmcat-remb-03
+ uint8[] data;
+ fci.extract_dup(0, fci.get_size(), out data);
+ if (data[0] != 'R' || data[1] != 'E' || data[2] != 'M' || data[3] != 'B') return;
+ uint8 br_exp = data[5] >> 2;
+ uint32 br_mant = (((uint32)data[5] & 0x3) << 16) + ((uint32)data[6] << 8) + (uint32)data[7];
+ uint bitrate = (br_mant << br_exp) / 1000;
+ self.codec_util.update_bitrate(self.media, self.payload_type, self.encode, bitrate * 8);
+ }
+ }
+
+ private void prepare_local_crypto() {
+ if (local_crypto != null && local_crypto.is_valid && !crypto_session.has_encrypt) {
+ crypto_session.set_encryption_key(local_crypto.crypto_suite, local_crypto.key, local_crypto.salt);
+ debug("Setting up encryption with key params %s", local_crypto.key_params);
+ }
+ }
+
+ private Gst.FlowReturn on_new_sample(Gst.App.Sink sink) {
+ if (sink == null) {
+ debug("Sink is null");
+ return Gst.FlowReturn.EOS;
+ }
+ Gst.Sample sample = sink.pull_sample();
+ Gst.Buffer buffer = sample.get_buffer();
+ uint8[] data;
+ buffer.extract_dup(0, buffer.get_size(), out data);
+ prepare_local_crypto();
+ if (sink == send_rtp) {
+ if (crypto_session.has_encrypt) {
+ data = crypto_session.encrypt_rtp(data);
+ }
+ on_send_rtp_data(new Bytes.take((owned) data));
+ } else if (sink == send_rtcp) {
+ encrypt_and_send_rtcp((owned) data);
+ } else {
+ warning("unknown sample");
+ }
+ return Gst.FlowReturn.OK;
+ }
+
+ private void encrypt_and_send_rtcp(owned uint8[] data) {
+ if (crypto_session.has_encrypt) {
+ data = crypto_session.encrypt_rtcp(data);
+ }
+ if (rtcp_mux) {
+ on_send_rtp_data(new Bytes.take((owned) data));
+ } else {
+ on_send_rtcp_data(new Bytes.take((owned) data));
+ }
+ }
+
+ private static Gst.PadProbeReturn drop_probe() {
+ return Gst.PadProbeReturn.DROP;
+ }
+
+ public override void destroy() {
+ // Stop network communication
+ push_recv_data = false;
+ recv_rtp.end_of_stream();
+ recv_rtcp.end_of_stream();
+ send_rtp.new_sample.disconnect(on_new_sample);
+ send_rtcp.new_sample.disconnect(on_new_sample);
+
+ // Disconnect input device
+ if (input != null) {
+ input.unlink(encode);
+ input = null;
+ }
+ if (this._input_device != null) {
+ if (!paused) this._input_device.unlink();
+ this._input_device = null;
+ }
+
+ // Disconnect encode
+ encode.set_locked_state(true);
+ encode.set_state(Gst.State.NULL);
+ encode.get_static_pad("src").unlink(send_rtp_sink_pad);
+ pipe.remove(encode);
+ encode = null;
+ encode_pay = null;
+
+ // Disconnect RTP sending
+ if (send_rtp_src_pad != null) {
+ send_rtp_src_pad.add_probe(Gst.PadProbeType.BLOCK, drop_probe);
+ send_rtp_src_pad.unlink(send_rtp.get_static_pad("sink"));
+ }
+ send_rtp.set_locked_state(true);
+ send_rtp.set_state(Gst.State.NULL);
+ pipe.remove(send_rtp);
+ send_rtp = null;
+
+ // Disconnect decode
+ if (recv_rtp_src_pad != null) {
+ recv_rtp_src_pad.add_probe(Gst.PadProbeType.BLOCK, drop_probe);
+ recv_rtp_src_pad.unlink(decode.get_static_pad("sink"));
+ }
+
+ // Disconnect RTP receiving
+ recv_rtp.set_locked_state(true);
+ recv_rtp.set_state(Gst.State.NULL);
+ recv_rtp.get_static_pad("src").unlink(recv_rtp_sink_pad);
+ pipe.remove(recv_rtp);
+ recv_rtp = null;
+
+ // Disconnect output
+ if (output != null) {
+ decode.unlink(output);
+ }
+ decode.set_locked_state(true);
+ decode.set_state(Gst.State.NULL);
+ pipe.remove(decode);
+ decode = null;
+ decode_depay = null;
+ output = null;
+
+ // Disconnect output device
+ if (this._output_device != null) {
+ this._output_device.unlink();
+ this._output_device = null;
+ }
+
+ // Disconnect RTCP receiving
+ recv_rtcp.get_static_pad("src").unlink(recv_rtcp_sink_pad);
+ recv_rtcp.set_locked_state(true);
+ recv_rtcp.set_state(Gst.State.NULL);
+ pipe.remove(recv_rtcp);
+ recv_rtcp = null;
+
+ // Disconnect RTCP sending
+ send_rtcp_src_pad.unlink(send_rtcp.get_static_pad("sink"));
+ send_rtcp.set_locked_state(true);
+ send_rtcp.set_state(Gst.State.NULL);
+ pipe.remove(send_rtcp);
+ send_rtcp = null;
+
+ // Release rtp pads
+ rtpbin.release_request_pad(send_rtp_sink_pad);
+ send_rtp_sink_pad = null;
+ rtpbin.release_request_pad(recv_rtp_sink_pad);
+ recv_rtp_sink_pad = null;
+ rtpbin.release_request_pad(recv_rtcp_sink_pad);
+ recv_rtcp_sink_pad = null;
+ rtpbin.release_request_pad(send_rtcp_src_pad);
+ send_rtcp_src_pad = null;
+ send_rtp_src_pad = null;
+ recv_rtp_src_pad = null;
+
+ session = null;
+ }
+
+ private void prepare_remote_crypto() {
+ if (remote_crypto != null && remote_crypto.is_valid && !crypto_session.has_decrypt) {
+ crypto_session.set_decryption_key(remote_crypto.crypto_suite, remote_crypto.key, remote_crypto.salt);
+ debug("Setting up decryption with key params %s", remote_crypto.key_params);
+ }
+ }
+
+ private uint16 previous_video_orientation_degree = uint16.MAX;
+ public signal void video_orientation_changed(uint16 degree);
+
+ public override void on_recv_rtp_data(Bytes bytes) {
+ if (rtcp_mux && bytes.length >= 2 && bytes.get(1) >= 192 && bytes.get(1) < 224) {
+ on_recv_rtcp_data(bytes);
+ return;
+ }
+ prepare_remote_crypto();
+ uint8[] data = bytes.get_data();
+ if (crypto_session.has_decrypt) {
+ try {
+ data = crypto_session.decrypt_rtp(data);
+ } catch (Error e) {
+ warning("%s (%d)", e.message, e.code);
+ }
+ }
+ if (push_recv_data) {
+ Gst.Buffer buffer = new Gst.Buffer.wrapped((owned) data);
+ Gst.RTP.Buffer rtp_buffer;
+ if (Gst.RTP.Buffer.map(buffer, Gst.MapFlags.READ, out rtp_buffer)) {
+ if (rtp_buffer.get_extension()) {
+ Xmpp.Xep.JingleRtp.HeaderExtension? ext = header_extensions.first_match((it) => it.uri == "urn:3gpp:video-orientation");
+ if (ext != null) {
+ unowned uint8[] extension_data;
+ if (rtp_buffer.get_extension_onebyte_header(ext.id, 0, out extension_data) && extension_data.length == 1) {
+ bool camera = (extension_data[0] & 0x8) > 0;
+ bool flip = (extension_data[0] & 0x4) > 0;
+ uint8 rotation = extension_data[0] & 0x3;
+ uint16 rotation_degree = uint16.MAX;
+ switch(rotation) {
+ case 0: rotation_degree = 0; break;
+ case 1: rotation_degree = 90; break;
+ case 2: rotation_degree = 180; break;
+ case 3: rotation_degree = 270; break;
+ }
+ if (rotation_degree != previous_video_orientation_degree) {
+ video_orientation_changed(rotation_degree);
+ previous_video_orientation_degree = rotation_degree;
+ }
+ }
+ }
+ }
+ rtp_buffer.unmap();
+ }
+
+ // FIXME: VAPI file in Vala < 0.49.1 has a bug that results in broken ownership of buffer in push_buffer()
+ // We workaround by using the plain signal. The signal unfortunately will cause an unnecessary copy of
+ // the underlying buffer, so and some point we should move over to the new version (once we require
+ // Vala >= 0.50)
+#if FIXED_APPSRC_PUSH_BUFFER_IN_VAPI
+ recv_rtp.push_buffer((owned) buffer);
+#else
+ Gst.FlowReturn ret;
+ GLib.Signal.emit_by_name(recv_rtp, "push-buffer", buffer, out ret);
+#endif
+ }
+ }
+
+ public override void on_recv_rtcp_data(Bytes bytes) {
+ prepare_remote_crypto();
+ uint8[] data = bytes.get_data();
+ if (crypto_session.has_decrypt) {
+ try {
+ data = crypto_session.decrypt_rtcp(data);
+ } catch (Error e) {
+ warning("%s (%d)", e.message, e.code);
+ }
+ }
+ if (push_recv_data) {
+ Gst.Buffer buffer = new Gst.Buffer.wrapped((owned) data);
+ // See above
+#if FIXED_APPSRC_PUSH_BUFFER_IN_VAPI
+ recv_rtcp.push_buffer((owned) buffer);
+#else
+ Gst.FlowReturn ret;
+ GLib.Signal.emit_by_name(recv_rtcp, "push-buffer", buffer, out ret);
+#endif
+ }
+ }
+
+ public override void on_rtp_ready() {
+ // If full frame has been sent before the connection was ready, the counterpart would only display our video after the next full frame.
+ // Send a full frame to let the counterpart display our video asap
+ rtpbin.send_event(new Gst.Event.custom(
+ Gst.EventType.CUSTOM_UPSTREAM,
+ new Gst.Structure("GstForceKeyUnit", "all-headers", typeof(bool), true, null))
+ );
+ }
+
+ public override void on_rtcp_ready() {
+ int rtp_session_id = (int) rtpid;
+ uint64 max_delay = int.MAX;
+ Object rtp_session;
+ bool rtp_sent;
+ GLib.Signal.emit_by_name(rtpbin, "get-internal-session", rtp_session_id, out rtp_session);
+ GLib.Signal.emit_by_name(rtp_session, "send-rtcp-full", max_delay, out rtp_sent);
+ debug("RTCP is ready, resending rtcp: %s", rtp_sent.to_string());
+ }
+
+ public void on_ssrc_pad_added(string ssrc, Gst.Pad pad) {
+ debug("New ssrc %s with pad %s", ssrc, pad.name);
+ if (participant_ssrc != null && participant_ssrc != ssrc) {
+ warning("Got second ssrc on stream (old: %s, new: %s), ignoring", participant_ssrc, ssrc);
+ return;
+ }
+ participant_ssrc = ssrc;
+ recv_rtp_src_pad = pad;
+ if (decode != null) {
+ plugin.pause();
+ debug("Link %s to %s decode for %s", recv_rtp_src_pad.name, media, name);
+ recv_rtp_src_pad.link(decode.get_static_pad("sink"));
+ plugin.unpause();
+ }
+ }
+
+ public void on_send_rtp_src_added(Gst.Pad pad) {
+ send_rtp_src_pad = pad;
+ if (send_rtp != null) {
+ plugin.pause();
+ debug("Link %s to %s send_rtp for %s", send_rtp_src_pad.name, media, name);
+ send_rtp_src_pad.link(send_rtp.get_static_pad("sink"));
+ plugin.unpause();
+ }
+ }
+
+ public void set_input(Gst.Element? input) {
+ set_input_and_pause(input, paused);
+ }
+
+ private void set_input_and_pause(Gst.Element? input, bool paused) {
+ if (created && this.input != null) {
+ this.input.unlink(encode);
+ this.input = null;
+ }
+
+ this.input = input;
+ this.paused = paused;
+
+ if (created && sending && !paused && input != null) {
+ plugin.pause();
+ input.link(encode);
+ plugin.unpause();
+ }
+ }
+
+ public void pause() {
+ if (paused) return;
+ set_input_and_pause(null, true);
+ if (input_device != null) input_device.unlink();
+ }
+
+ public void unpause() {
+ if (!paused) return;
+ set_input_and_pause(input_device != null ? input_device.link_source() : null, false);
+ }
+
+ ulong block_probe_handler_id = 0;
+ public virtual void add_output(Gst.Element element) {
+ if (output != null) {
+ critical("add_output() invoked more than once");
+ return;
+ }
+ this.output = element;
+ if (created) {
+ plugin.pause();
+ decode.link(element);
+ if (block_probe_handler_id != 0) {
+ decode.get_static_pad("src").remove_probe(block_probe_handler_id);
+ }
+ plugin.unpause();
+ }
+ }
+
+ public virtual void remove_output(Gst.Element element) {
+ if (output != element) {
+ critical("remove_output() invoked without prior add_output()");
+ return;
+ }
+ if (created) {
+ block_probe_handler_id = decode.get_static_pad("src").add_probe(Gst.PadProbeType.BLOCK, drop_probe);
+ decode.unlink(element);
+ }
+ if (this._output_device != null) {
+ this._output_device.unlink();
+ this._output_device = null;
+ }
+ this.output = null;
+ }
+}
+
+public class Dino.Plugins.Rtp.VideoStream : Stream {
+ private Gee.List<Gst.Element> outputs = new ArrayList<Gst.Element>();
+ private Gst.Element output_tee;
+ private Gst.Element rotate;
+ private ulong video_orientation_changed_handler;
+
+ public VideoStream(Plugin plugin, Xmpp.Xep.Jingle.Content content) {
+ base(plugin, content);
+ if (media != "video") critical("VideoStream created for non-video media");
+ }
+
+ public override void create() {
+ video_orientation_changed_handler = video_orientation_changed.connect(on_video_orientation_changed);
+ plugin.pause();
+ rotate = Gst.ElementFactory.make("videoflip", @"video_rotate_$rtpid");
+ pipe.add(rotate);
+ output_tee = Gst.ElementFactory.make("tee", @"video_tee_$rtpid");
+ output_tee.@set("allow-not-linked", true);
+ pipe.add(output_tee);
+ rotate.link(output_tee);
+ add_output(rotate);
+ base.create();
+ foreach (Gst.Element output in outputs) {
+ output_tee.link(output);
+ }
+ plugin.unpause();
+ }
+
+ private void on_video_orientation_changed(uint16 degree) {
+ if (rotate != null) {
+ switch (degree) {
+ case 0:
+ rotate.@set("method", 0);
+ break;
+ case 90:
+ rotate.@set("method", 1);
+ break;
+ case 180:
+ rotate.@set("method", 2);
+ break;
+ case 270:
+ rotate.@set("method", 3);
+ break;
+ }
+ }
+ }
+
+ public override void destroy() {
+ foreach (Gst.Element output in outputs) {
+ output_tee.unlink(output);
+ }
+ base.destroy();
+ rotate.set_locked_state(true);
+ rotate.set_state(Gst.State.NULL);
+ rotate.unlink(output_tee);
+ pipe.remove(rotate);
+ rotate = null;
+ output_tee.set_locked_state(true);
+ output_tee.set_state(Gst.State.NULL);
+ pipe.remove(output_tee);
+ output_tee = null;
+ disconnect(video_orientation_changed_handler);
+ }
+
+ public override void add_output(Gst.Element element) {
+ if (element == output_tee || element == rotate) {
+ base.add_output(element);
+ return;
+ }
+ outputs.add(element);
+ if (output_tee != null) {
+ output_tee.link(element);
+ }
+ }
+
+ public override void remove_output(Gst.Element element) {
+ if (element == output_tee || element == rotate) {
+ base.remove_output(element);
+ return;
+ }
+ outputs.remove(element);
+ if (output_tee != null) {
+ output_tee.unlink(element);
+ }
+ }
+} \ No newline at end of file