From 77ff73a1ca5d2283dc5335c1048ccb3fce66e508 Mon Sep 17 00:00:00 2001 From: hrxi Date: Mon, 22 Jul 2019 21:40:30 +0200 Subject: Terminate the Jingle session after the file transfer is complete --- xmpp-vala/src/module/xep/0166_jingle.vala | 106 ++++++++++++++++++++++++++---- 1 file changed, 93 insertions(+), 13 deletions(-) (limited to 'xmpp-vala/src') diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index 4d9a472a..ac1dd10b 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -122,7 +122,7 @@ public class Module : XmppStreamModule, Iq.Handler { throw new Error.GENERAL("Couldn't determine own JID"); } TransportParameters transport_params = transport.create_transport_parameters(); - Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name); + Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name, stream); StanzaNode content = new StanzaNode.build("content", NS_URI) .put_attribute("creator", "initiator") .put_attribute("name", content_name) @@ -177,7 +177,7 @@ public class Module : XmppStreamModule, Iq.Handler { ContentParameters content_params = content_type.parse_content_parameters(description); TransportType type = content_type.content_type_transport_type(); - Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name); + Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name, stream); stream.get_flag(Flag.IDENTITY).add_session(session); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); @@ -317,7 +317,9 @@ public class Session { // Signals that the session has been accepted by the peer. public signal void accepted(XmppStream stream); - public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name) { + XmppStream hack; + + public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name, XmppStream hack) { this.state = State.INITIATE_SENT; this.sid = sid; this.type_ = type; @@ -325,9 +327,10 @@ public class Session { this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); + this.hack = hack; } - public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { + public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name, XmppStream hack) { this.state = State.INITIATE_RECEIVED; this.sid = sid; this.type_ = type; @@ -335,6 +338,7 @@ public class Session { this.content_name = content_name; this.transport = transport; this.connection = new Connection(this); + this.hack = hack; } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -447,12 +451,22 @@ public class Session { } public void on_connection_error(IOError error) { - // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session + // TODO(hrxi): where can we get an XmppStream from? + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("failed-transport", NS_URI)) + .put_node(new StanzaNode.build("text", NS_URI) + .put_node(new StanzaNode.text(error.message)) + ); + terminate(hack, reason, "transport error: $(error.message)"); + } + public void on_connection_close() { + StanzaNode reason = new StanzaNode.build("reason", NS_URI) + .put_node(new StanzaNode.build("success", NS_URI)); + terminate(hack, reason, "success"); } public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) { - if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) { - // TODO(hrxi): what to do? + if (state == State.ENDED) { return; } if (state == State.ACTIVE) { @@ -525,6 +539,9 @@ public class Connection : IOStream { private IOStream? inner = null; private string? error = null; + private bool read_closed = false; + private bool write_closed = false; + private class OnSetInnerCallback { public SourceFunc callback; public int io_priority; @@ -578,12 +595,19 @@ public class Connection : IOStream { strong.on_connection_error(error); } } + private void handle_connection_close() { + Session? strong = session; + if (strong != null) { + strong.on_connection_close(); + } + } public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { yield wait_and_check_for_errors(io_priority, cancellable); try { return yield inner.input_stream.read_async(buffer, io_priority, cancellable); } catch (IOError e) { + print("read_async error\n"); handle_connection_error(e); throw e; } @@ -593,37 +617,93 @@ public class Connection : IOStream { try { return yield inner.output_stream.write_async(buffer, io_priority, cancellable); } catch (IOError e) { + print("write_async error\n"); handle_connection_error(e); throw e; } } public bool close_read(Cancellable? cancellable = null) throws IOError { check_for_errors(); + if (read_closed) { + return true; + } close_read_async.begin(GLib.Priority.DEFAULT, cancellable); return true; } public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { yield wait_and_check_for_errors(io_priority, cancellable); + if (read_closed) { + return true; + } + read_closed = true; + IOError error = null; + bool result = true; try { - return yield inner.input_stream.close_async(io_priority, cancellable); + result = yield inner.input_stream.close_async(io_priority, cancellable); } catch (IOError e) { - handle_connection_error(e); - throw e; + print("input_stream.close_async error\n"); + if (error == null) { + error = e; + } } + try { + result = (yield close_if_both_closed(io_priority, cancellable)) && result; + } catch (IOError e) { + print("close_if_both_closed error\n"); + if (error == null) { + error = e; + } + } + if (error != null) { + handle_connection_error(error); + throw error; + } + return result; } public bool close_write(Cancellable? cancellable = null) throws IOError { check_for_errors(); + if (write_closed) { + return true; + } close_write_async.begin(GLib.Priority.DEFAULT, cancellable); return true; } public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { yield wait_and_check_for_errors(io_priority, cancellable); + if (write_closed) { + return true; + } + write_closed = true; + IOError error = null; + bool result = true; try { - return yield inner.output_stream.close_async(io_priority, cancellable); + result = yield inner.output_stream.close_async(io_priority, cancellable); } catch (IOError e) { - handle_connection_error(e); - throw e; + print("output_stream.close_async error\n"); + if (error == null) { + error = e; + } } + try { + result = (yield close_if_both_closed(io_priority, cancellable)) && result; + } catch (IOError e) { + print("close_if_both_closed error\n"); + if (error == null) { + error = e; + } + } + if (error != null) { + handle_connection_error(error); + throw error; + } + return result; + } + private async bool close_if_both_closed(int io_priority, Cancellable? cancellable = null) throws IOError { + if (read_closed && write_closed) { + handle_connection_close(); + //return yield inner.close_async(io_priority, cancellable); + } + return true; } } -- cgit v1.2.3-54-g00ecf