diff options
Diffstat (limited to 'xmpp-vala/src/module/xep/0166_jingle.vala')
-rw-r--r-- | xmpp-vala/src/module/xep/0166_jingle.vala | 185 |
1 files changed, 170 insertions, 15 deletions
diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala index ae872ac6..ee7df994 100644 --- a/xmpp-vala/src/module/xep/0166_jingle.vala +++ b/xmpp-vala/src/module/xep/0166_jingle.vala @@ -184,7 +184,7 @@ public class Module : XmppStreamModule, Iq.Handler { if (transport == null || transport.transport_type() != type) { StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("unsupported-transports", NS_URI)); - session.terminate(stream, reason); + session.terminate(stream, reason, "unsupported transports"); return; } @@ -310,7 +310,8 @@ public class Session { TransportParameters? transport = null; // ACTIVE - public IOStream? conn { get; private set; } + private Connection? connection; + public IOStream? conn { get { return connection; } } // Only interesting in INITIATE_SENT. // Signals that the session has been accepted by the peer. @@ -323,7 +324,7 @@ public class Session { this.peer_full_jid = peer_full_jid; this.content_name = content_name; this.transport = transport; - this.conn = null; + this.connection = new Connection(this); } public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) { @@ -333,7 +334,7 @@ public class Session { this.peer_full_jid = peer_full_jid; this.content_name = content_name; this.transport = transport; - this.conn = null; + this.connection = new Connection(this); } public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError { @@ -389,13 +390,17 @@ public class Session { throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method"); } transport.update_transport(transport_node); - conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR); + connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR)); transport = null; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); state = State.ACTIVE; accepted(stream); } void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError { + connection.on_terminated_by_jingle("remote terminated jingle session"); + state = ENDED; + stream.get_flag(Flag.IDENTITY).remove_session(sid); + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq)); // TODO(hrxi): also handle presence type=unavailable } @@ -417,7 +422,7 @@ public class Session { Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid }; stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq); - conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER); + connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER)); transport = null; state = State.ACTIVE; @@ -429,7 +434,7 @@ public class Session { } StanzaNode reason = new StanzaNode.build("reason", NS_URI) .put_node(new StanzaNode.build("decline", NS_URI)); - terminate(stream, reason); + terminate(stream, reason, "declined"); } public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) { @@ -438,23 +443,24 @@ public class Session { if (application_reason != null) { reason.put_node(application_reason); } - terminate(stream, reason); + terminate(stream, reason, "application error"); } - public void close_connection(XmppStream stream) { - if (state != State.ACTIVE) { - return; // TODO(hrxi): what to do? - } - conn.close(); + public void on_connection_error(IOError error) { + // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session } - public void terminate(XmppStream stream, StanzaNode reason) { + public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) { if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) { // TODO(hrxi): what to do? return; } if (state == State.ACTIVE) { - conn.close(); + if (local_reason != null) { + connection.on_terminated_by_jingle(@"local session-terminate: $(local_reason)"); + } else { + connection.on_terminated_by_jingle("local session-terminate"); + } } StanzaNode jingle = new StanzaNode.build("jingle", NS_URI) @@ -472,6 +478,155 @@ public class Session { } } +public class Connection : IOStream { + public class Input : InputStream { + private weak Connection connection; + public Input(Connection connection) { + this.connection = connection; + } + public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async reads on jingle connections"); + } + public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.read_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_read(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_read_async(io_priority, cancellable); + } + } + public class Output : OutputStream { + private weak Connection connection; + public Output(Connection connection) { + this.connection = connection; + } + public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError { + throw new IOError.NOT_SUPPORTED("can't do non-async writes on jingle connections"); + } + public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.write_async(buffer, io_priority, cancellable); + } + public override bool close(Cancellable? cancellable = null) throws IOError { + return connection.close_write(cancellable); + } + public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + return yield connection.close_write_async(io_priority, cancellable); + } + } + + private Input input; + private Output output; + public override InputStream input_stream { get { return input; } } + public override OutputStream output_stream { get { return output; } } + + private weak Session session; + private IOStream? inner = null; + private string? error = null; + + private class OnSetInnerCallback { + public SourceFunc callback; + public int io_priority; + } + + Gee.List<OnSetInnerCallback> callbacks = new ArrayList<OnSetInnerCallback>(); + + public Connection(Session session) { + this.input = new Input(this); + this.output = new Output(this); + this.session = session; + } + + public void set_inner(IOStream inner) { + assert(this.inner == null); + this.inner = inner; + foreach (OnSetInnerCallback c in callbacks) { + Idle.add((owned) c.callback, c.io_priority); + } + callbacks = null; + } + + public void on_terminated_by_jingle(string reason) { + if (error == null) { + close_async.begin(); + error = reason; + } + } + + private void check_for_errors() throws IOError { + if (error != null) { + throw new IOError.CLOSED(error); + } + } + private async void wait_and_check_for_errors(int io_priority, Cancellable? cancellable = null) throws IOError { + while (true) { + check_for_errors(); + if (inner != null) { + return; + } + SourceFunc callback = wait_and_check_for_errors.callback; + ulong id = cancellable.connect(() => callback()); + callbacks.add(new OnSetInnerCallback() { callback=callback, io_priority=io_priority}); + yield; + cancellable.disconnect(id); + } + } + private void handle_connection_error(IOError error) { + Session? strong = session; + if (strong != null) { + strong.on_connection_error(error); + } + } + + public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.input_stream.read_async(buffer, io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.output_stream.write_async(buffer, io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public bool close_read(Cancellable? cancellable = null) throws IOError { + check_for_errors(); + close_read_async.begin(GLib.Priority.DEFAULT, cancellable); + return true; + } + public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.input_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } + public bool close_write(Cancellable? cancellable = null) throws IOError { + check_for_errors(); + close_write_async.begin(GLib.Priority.DEFAULT, cancellable); + return true; + } + public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + yield wait_and_check_for_errors(io_priority, cancellable); + try { + return yield inner.output_stream.close_async(io_priority, cancellable); + } catch (IOError e) { + handle_connection_error(e); + throw e; + } + } +} + public class Flag : XmppStreamFlag { public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle"); |