aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala25
-rw-r--r--xmpp-vala/src/module/xep/0166_jingle.vala185
-rw-r--r--xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala7
3 files changed, 176 insertions, 41 deletions
diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
index 2650a194..9af9f30e 100644
--- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
+++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
@@ -60,9 +60,8 @@ public class Module : XmppStreamModule, Iq.Handler {
}
public class Connection : IOStream {
- // TODO(hrxi): Fix reference cycle
public class Input : InputStream {
- private Connection connection;
+ private weak Connection connection;
public Input(Connection connection) {
this.connection = connection;
}
@@ -73,14 +72,14 @@ public class Connection : IOStream {
return yield connection.read_async(buffer, io_priority, cancellable);
}
public override bool close(Cancellable? cancellable = null) throws IOError {
- return connection.close_read(cancellable);
+ throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams");
}
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_read_async(io_priority, cancellable);
}
}
public class Output : OutputStream {
- private Connection connection;
+ private weak Connection connection;
public Output(Connection connection) {
this.connection = connection;
}
@@ -91,7 +90,7 @@ public class Connection : IOStream {
return yield connection.write_async(buffer, io_priority, cancellable);
}
public override bool close(Cancellable? cancellable = null) throws IOError {
- return connection.close_write(cancellable);
+ throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams");
}
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_write_async(io_priority, cancellable);
@@ -263,13 +262,6 @@ public class Connection : IOStream {
return buffer.length;
}
- public bool close_read(Cancellable? cancellable = null) {
- input_closed = true;
- if (!output_closed) {
- return true;
- }
- return close_impl(cancellable);
- }
public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
input_closed = true;
if (!output_closed) {
@@ -277,13 +269,6 @@ public class Connection : IOStream {
}
return yield close_async_impl(io_priority, cancellable);
}
- public bool close_write(Cancellable? cancellable = null) {
- output_closed = true;
- if (!input_closed) {
- return true;
- }
- return close_impl(cancellable);
- }
public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
output_closed = true;
if (!input_closed) {
@@ -292,7 +277,7 @@ public class Connection : IOStream {
return yield close_async_impl(io_priority, cancellable);
}
delegate void OnClose(bool success);
- private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) {
+ private bool close_impl(Cancellable? cancellable, OnClose on_close) {
if (state == State.DISCONNECTING || state == State.DISCONNECTED || state == State.ERROR) {
on_close(true);
return true;
diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala
index ae872ac6..ee7df994 100644
--- a/xmpp-vala/src/module/xep/0166_jingle.vala
+++ b/xmpp-vala/src/module/xep/0166_jingle.vala
@@ -184,7 +184,7 @@ public class Module : XmppStreamModule, Iq.Handler {
if (transport == null || transport.transport_type() != type) {
StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("unsupported-transports", NS_URI));
- session.terminate(stream, reason);
+ session.terminate(stream, reason, "unsupported transports");
return;
}
@@ -310,7 +310,8 @@ public class Session {
TransportParameters? transport = null;
// ACTIVE
- public IOStream? conn { get; private set; }
+ private Connection? connection;
+ public IOStream? conn { get { return connection; } }
// Only interesting in INITIATE_SENT.
// Signals that the session has been accepted by the peer.
@@ -323,7 +324,7 @@ public class Session {
this.peer_full_jid = peer_full_jid;
this.content_name = content_name;
this.transport = transport;
- this.conn = null;
+ this.connection = new Connection(this);
}
public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) {
@@ -333,7 +334,7 @@ public class Session {
this.peer_full_jid = peer_full_jid;
this.content_name = content_name;
this.transport = transport;
- this.conn = null;
+ this.connection = new Connection(this);
}
public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError {
@@ -389,13 +390,17 @@ public class Session {
throw new IqError.BAD_REQUEST("session-accept with unnegotiated transport method");
}
transport.update_transport(transport_node);
- conn = transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR);
+ connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.INITIATOR));
transport = null;
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
state = State.ACTIVE;
accepted(stream);
}
void handle_session_terminate(XmppStream stream, StanzaNode jingle, Iq.Stanza iq) throws IqError {
+ connection.on_terminated_by_jingle("remote terminated jingle session");
+ state = ENDED;
+ stream.get_flag(Flag.IDENTITY).remove_session(sid);
+
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
// TODO(hrxi): also handle presence type=unavailable
}
@@ -417,7 +422,7 @@ public class Session {
Iq.Stanza iq = new Iq.Stanza.set(jingle) { to=peer_full_jid };
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq);
- conn = transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER);
+ connection.set_inner(transport.create_transport_connection(stream, peer_full_jid, Role.RESPONDER));
transport = null;
state = State.ACTIVE;
@@ -429,7 +434,7 @@ public class Session {
}
StanzaNode reason = new StanzaNode.build("reason", NS_URI)
.put_node(new StanzaNode.build("decline", NS_URI));
- terminate(stream, reason);
+ terminate(stream, reason, "declined");
}
public void set_application_error(XmppStream stream, StanzaNode? application_reason = null) {
@@ -438,23 +443,24 @@ public class Session {
if (application_reason != null) {
reason.put_node(application_reason);
}
- terminate(stream, reason);
+ terminate(stream, reason, "application error");
}
- public void close_connection(XmppStream stream) {
- if (state != State.ACTIVE) {
- return; // TODO(hrxi): what to do?
- }
- conn.close();
+ public void on_connection_error(IOError error) {
+ // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session
}
- public void terminate(XmppStream stream, StanzaNode reason) {
+ public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) {
if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) {
// TODO(hrxi): what to do?
return;
}
if (state == State.ACTIVE) {
- conn.close();
+ if (local_reason != null) {
+ connection.on_terminated_by_jingle(@"local session-terminate: $(local_reason)");
+ } else {
+ connection.on_terminated_by_jingle("local session-terminate");
+ }
}
StanzaNode jingle = new StanzaNode.build("jingle", NS_URI)
@@ -472,6 +478,155 @@ public class Session {
}
}
+public class Connection : IOStream {
+ public class Input : InputStream {
+ private weak Connection connection;
+ public Input(Connection connection) {
+ this.connection = connection;
+ }
+ public override ssize_t read(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
+ throw new IOError.NOT_SUPPORTED("can't do non-async reads on jingle connections");
+ }
+ public override async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ return yield connection.read_async(buffer, io_priority, cancellable);
+ }
+ public override bool close(Cancellable? cancellable = null) throws IOError {
+ return connection.close_read(cancellable);
+ }
+ public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ return yield connection.close_read_async(io_priority, cancellable);
+ }
+ }
+ public class Output : OutputStream {
+ private weak Connection connection;
+ public Output(Connection connection) {
+ this.connection = connection;
+ }
+ public override ssize_t write(uint8[] buffer, Cancellable? cancellable = null) throws IOError {
+ throw new IOError.NOT_SUPPORTED("can't do non-async writes on jingle connections");
+ }
+ public override async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ return yield connection.write_async(buffer, io_priority, cancellable);
+ }
+ public override bool close(Cancellable? cancellable = null) throws IOError {
+ return connection.close_write(cancellable);
+ }
+ public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ return yield connection.close_write_async(io_priority, cancellable);
+ }
+ }
+
+ private Input input;
+ private Output output;
+ public override InputStream input_stream { get { return input; } }
+ public override OutputStream output_stream { get { return output; } }
+
+ private weak Session session;
+ private IOStream? inner = null;
+ private string? error = null;
+
+ private class OnSetInnerCallback {
+ public SourceFunc callback;
+ public int io_priority;
+ }
+
+ Gee.List<OnSetInnerCallback> callbacks = new ArrayList<OnSetInnerCallback>();
+
+ public Connection(Session session) {
+ this.input = new Input(this);
+ this.output = new Output(this);
+ this.session = session;
+ }
+
+ public void set_inner(IOStream inner) {
+ assert(this.inner == null);
+ this.inner = inner;
+ foreach (OnSetInnerCallback c in callbacks) {
+ Idle.add((owned) c.callback, c.io_priority);
+ }
+ callbacks = null;
+ }
+
+ public void on_terminated_by_jingle(string reason) {
+ if (error == null) {
+ close_async.begin();
+ error = reason;
+ }
+ }
+
+ private void check_for_errors() throws IOError {
+ if (error != null) {
+ throw new IOError.CLOSED(error);
+ }
+ }
+ private async void wait_and_check_for_errors(int io_priority, Cancellable? cancellable = null) throws IOError {
+ while (true) {
+ check_for_errors();
+ if (inner != null) {
+ return;
+ }
+ SourceFunc callback = wait_and_check_for_errors.callback;
+ ulong id = cancellable.connect(() => callback());
+ callbacks.add(new OnSetInnerCallback() { callback=callback, io_priority=io_priority});
+ yield;
+ cancellable.disconnect(id);
+ }
+ }
+ private void handle_connection_error(IOError error) {
+ Session? strong = session;
+ if (strong != null) {
+ strong.on_connection_error(error);
+ }
+ }
+
+ public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ yield wait_and_check_for_errors(io_priority, cancellable);
+ try {
+ return yield inner.input_stream.read_async(buffer, io_priority, cancellable);
+ } catch (IOError e) {
+ handle_connection_error(e);
+ throw e;
+ }
+ }
+ public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ yield wait_and_check_for_errors(io_priority, cancellable);
+ try {
+ return yield inner.output_stream.write_async(buffer, io_priority, cancellable);
+ } catch (IOError e) {
+ handle_connection_error(e);
+ throw e;
+ }
+ }
+ public bool close_read(Cancellable? cancellable = null) throws IOError {
+ check_for_errors();
+ close_read_async.begin(GLib.Priority.DEFAULT, cancellable);
+ return true;
+ }
+ public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ yield wait_and_check_for_errors(io_priority, cancellable);
+ try {
+ return yield inner.input_stream.close_async(io_priority, cancellable);
+ } catch (IOError e) {
+ handle_connection_error(e);
+ throw e;
+ }
+ }
+ public bool close_write(Cancellable? cancellable = null) throws IOError {
+ check_for_errors();
+ close_write_async.begin(GLib.Priority.DEFAULT, cancellable);
+ return true;
+ }
+ public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ yield wait_and_check_for_errors(io_priority, cancellable);
+ try {
+ return yield inner.output_stream.close_async(io_priority, cancellable);
+ } catch (IOError e) {
+ handle_connection_error(e);
+ throw e;
+ }
+ }
+}
+
public class Flag : XmppStreamFlag {
public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "jingle");
diff --git a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala
index 2e636491..cce7b967 100644
--- a/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala
+++ b/xmpp-vala/src/module/xep/0234_jingle_file_transfer.vala
@@ -46,12 +46,7 @@ public class Module : Jingle.ContentType, XmppStreamModule {
Jingle.Session session = stream.get_module(Jingle.Module.IDENTITY)
.create_session(stream, Jingle.TransportType.STREAMING, receiver_full_jid, Jingle.Senders.INITIATOR, "a-file-offer", description); // TODO(hrxi): Why "a-file-offer"?
- SourceFunc callback = offer_file_stream.callback;
- session.accepted.connect((stream) => {
- session.conn.input_stream.close();
- Idle.add((owned) callback);
- });
- yield;
+ yield session.conn.input_stream.close_async();
// TODO(hrxi): catch errors
yield session.conn.output_stream.splice_async(input_stream, OutputStreamSpliceFlags.CLOSE_SOURCE|OutputStreamSpliceFlags.CLOSE_TARGET);