aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--xmpp-vala/src/module/xep/0166_jingle.vala106
1 files changed, 93 insertions, 13 deletions
diff --git a/xmpp-vala/src/module/xep/0166_jingle.vala b/xmpp-vala/src/module/xep/0166_jingle.vala
index 4d9a472a..ac1dd10b 100644
--- a/xmpp-vala/src/module/xep/0166_jingle.vala
+++ b/xmpp-vala/src/module/xep/0166_jingle.vala
@@ -122,7 +122,7 @@ public class Module : XmppStreamModule, Iq.Handler {
throw new Error.GENERAL("Couldn't determine own JID");
}
TransportParameters transport_params = transport.create_transport_parameters();
- Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name);
+ Session session = new Session.initiate_sent(random_uuid(), type, transport_params, receiver_full_jid, content_name, stream);
StanzaNode content = new StanzaNode.build("content", NS_URI)
.put_attribute("creator", "initiator")
.put_attribute("name", content_name)
@@ -177,7 +177,7 @@ public class Module : XmppStreamModule, Iq.Handler {
ContentParameters content_params = content_type.parse_content_parameters(description);
TransportType type = content_type.content_type_transport_type();
- Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name);
+ Session session = new Session.initiate_received(sid, type, transport_params, iq.from, name, stream);
stream.get_flag(Flag.IDENTITY).add_session(session);
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
@@ -317,7 +317,9 @@ public class Session {
// Signals that the session has been accepted by the peer.
public signal void accepted(XmppStream stream);
- public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name) {
+ XmppStream hack;
+
+ public Session.initiate_sent(string sid, Type type, TransportParameters transport, Jid peer_full_jid, string content_name, XmppStream hack) {
this.state = State.INITIATE_SENT;
this.sid = sid;
this.type_ = type;
@@ -325,9 +327,10 @@ public class Session {
this.content_name = content_name;
this.transport = transport;
this.connection = new Connection(this);
+ this.hack = hack;
}
- public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name) {
+ public Session.initiate_received(string sid, Type type, TransportParameters? transport, Jid peer_full_jid, string content_name, XmppStream hack) {
this.state = State.INITIATE_RECEIVED;
this.sid = sid;
this.type_ = type;
@@ -335,6 +338,7 @@ public class Session {
this.content_name = content_name;
this.transport = transport;
this.connection = new Connection(this);
+ this.hack = hack;
}
public void handle_iq_set(XmppStream stream, string action, StanzaNode jingle, Iq.Stanza iq) throws IqError {
@@ -447,12 +451,22 @@ public class Session {
}
public void on_connection_error(IOError error) {
- // TODO(hrxi): conjure an XmppStream out of nowhere and terminate the session
+ // TODO(hrxi): where can we get an XmppStream from?
+ StanzaNode reason = new StanzaNode.build("reason", NS_URI)
+ .put_node(new StanzaNode.build("failed-transport", NS_URI))
+ .put_node(new StanzaNode.build("text", NS_URI)
+ .put_node(new StanzaNode.text(error.message))
+ );
+ terminate(hack, reason, "transport error: $(error.message)");
+ }
+ public void on_connection_close() {
+ StanzaNode reason = new StanzaNode.build("reason", NS_URI)
+ .put_node(new StanzaNode.build("success", NS_URI));
+ terminate(hack, reason, "success");
}
public void terminate(XmppStream stream, StanzaNode reason, string? local_reason) {
- if (state != State.INITIATE_SENT && state != State.INITIATE_RECEIVED && state != State.ACTIVE) {
- // TODO(hrxi): what to do?
+ if (state == State.ENDED) {
return;
}
if (state == State.ACTIVE) {
@@ -525,6 +539,9 @@ public class Connection : IOStream {
private IOStream? inner = null;
private string? error = null;
+ private bool read_closed = false;
+ private bool write_closed = false;
+
private class OnSetInnerCallback {
public SourceFunc callback;
public int io_priority;
@@ -578,12 +595,19 @@ public class Connection : IOStream {
strong.on_connection_error(error);
}
}
+ private void handle_connection_close() {
+ Session? strong = session;
+ if (strong != null) {
+ strong.on_connection_close();
+ }
+ }
public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
try {
return yield inner.input_stream.read_async(buffer, io_priority, cancellable);
} catch (IOError e) {
+ print("read_async error\n");
handle_connection_error(e);
throw e;
}
@@ -593,37 +617,93 @@ public class Connection : IOStream {
try {
return yield inner.output_stream.write_async(buffer, io_priority, cancellable);
} catch (IOError e) {
+ print("write_async error\n");
handle_connection_error(e);
throw e;
}
}
public bool close_read(Cancellable? cancellable = null) throws IOError {
check_for_errors();
+ if (read_closed) {
+ return true;
+ }
close_read_async.begin(GLib.Priority.DEFAULT, cancellable);
return true;
}
public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
+ if (read_closed) {
+ return true;
+ }
+ read_closed = true;
+ IOError error = null;
+ bool result = true;
try {
- return yield inner.input_stream.close_async(io_priority, cancellable);
+ result = yield inner.input_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
- handle_connection_error(e);
- throw e;
+ print("input_stream.close_async error\n");
+ if (error == null) {
+ error = e;
+ }
}
+ try {
+ result = (yield close_if_both_closed(io_priority, cancellable)) && result;
+ } catch (IOError e) {
+ print("close_if_both_closed error\n");
+ if (error == null) {
+ error = e;
+ }
+ }
+ if (error != null) {
+ handle_connection_error(error);
+ throw error;
+ }
+ return result;
}
public bool close_write(Cancellable? cancellable = null) throws IOError {
check_for_errors();
+ if (write_closed) {
+ return true;
+ }
close_write_async.begin(GLib.Priority.DEFAULT, cancellable);
return true;
}
public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
yield wait_and_check_for_errors(io_priority, cancellable);
+ if (write_closed) {
+ return true;
+ }
+ write_closed = true;
+ IOError error = null;
+ bool result = true;
try {
- return yield inner.output_stream.close_async(io_priority, cancellable);
+ result = yield inner.output_stream.close_async(io_priority, cancellable);
} catch (IOError e) {
- handle_connection_error(e);
- throw e;
+ print("output_stream.close_async error\n");
+ if (error == null) {
+ error = e;
+ }
}
+ try {
+ result = (yield close_if_both_closed(io_priority, cancellable)) && result;
+ } catch (IOError e) {
+ print("close_if_both_closed error\n");
+ if (error == null) {
+ error = e;
+ }
+ }
+ if (error != null) {
+ handle_connection_error(error);
+ throw error;
+ }
+ return result;
+ }
+ private async bool close_if_both_closed(int io_priority, Cancellable? cancellable = null) throws IOError {
+ if (read_closed && write_closed) {
+ handle_connection_close();
+ //return yield inner.close_async(io_priority, cancellable);
+ }
+ return true;
}
}