aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
diff options
context:
space:
mode:
Diffstat (limited to 'xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala')
-rw-r--r--xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala59
1 files changed, 23 insertions, 36 deletions
diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
index 2650a194..9aa2d98c 100644
--- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
+++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
@@ -14,7 +14,9 @@ public class Module : XmppStreamModule, Iq.Handler {
stream.add_flag(new Flag());
stream.get_module(Iq.Module.IDENTITY).register_for_namespace(NS_URI, this);
}
- public override void detach(XmppStream stream) { }
+ public override void detach(XmppStream stream) {
+ stream.get_module(Iq.Module.IDENTITY).unregister_from_namespace(NS_URI, this);
+ }
public void on_iq_set(XmppStream stream, Iq.Stanza iq) {
// the iq module ensures that there's only one child node
@@ -23,28 +25,28 @@ public class Module : XmppStreamModule, Iq.Handler {
node = (node != null) ? node : iq.stanza.get_subnode("data", NS_URI);
node = (node != null) ? node : iq.stanza.get_subnode("close", NS_URI);
if (node == null) {
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("unknown IBB action")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("unknown IBB action")) { to=iq.from });
return;
}
string? sid = node.get_attribute("sid");
if (sid == null) {
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing sid")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing sid")) { to=iq.from });
return;
}
Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid);
if (node.name == "open") {
if (conn == null) {
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable("unexpected IBB connection")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_acceptable("unexpected IBB connection")) { to=iq.from });
return;
}
if (conn.state != Connection.State.WAITING_FOR_CONNECT) {
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("IBB open for already open connection")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("IBB open for already open connection")) { to=iq.from });
return;
}
conn.handle_open(stream, node, iq);
} else {
if (conn == null || conn.state != Connection.State.CONNECTED) {
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found()));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found()) { to=iq.from });
return;
}
if (node.name == "close") {
@@ -60,9 +62,8 @@ public class Module : XmppStreamModule, Iq.Handler {
}
public class Connection : IOStream {
- // TODO(hrxi): Fix reference cycle
public class Input : InputStream {
- private Connection connection;
+ private weak Connection connection;
public Input(Connection connection) {
this.connection = connection;
}
@@ -73,14 +74,14 @@ public class Connection : IOStream {
return yield connection.read_async(buffer, io_priority, cancellable);
}
public override bool close(Cancellable? cancellable = null) throws IOError {
- return connection.close_read(cancellable);
+ throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams");
}
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_read_async(io_priority, cancellable);
}
}
public class Output : OutputStream {
- private Connection connection;
+ private weak Connection connection;
public Output(Connection connection) {
this.connection = connection;
}
@@ -91,7 +92,7 @@ public class Connection : IOStream {
return yield connection.write_async(buffer, io_priority, cancellable);
}
public override bool close(Cancellable? cancellable = null) throws IOError {
- return connection.close_write(cancellable);
+ throw new IOError.NOT_SUPPORTED("can't do non-async closes on in-band bytestreams");
}
public override async bool close_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
return yield connection.close_write_async(io_priority, cancellable);
@@ -150,25 +151,25 @@ public class Connection : IOStream {
output = new Output(this);
}
- public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
+ public void set_read_callback(owned SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (read_callback != null) {
throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream");
}
if (cancellable != null) {
read_callback_cancellable_id = cancellable.connect(trigger_read_callback);
}
- read_callback = callback;
+ read_callback = (owned)callback;
read_callback_cancellable = cancellable;
read_callback_priority = io_priority;
}
- public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
+ public void set_write_callback(owned SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (write_callback != null) {
throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream");
}
if (cancellable != null) {
write_callback_cancellable_id = cancellable.connect(trigger_write_callback);
}
- write_callback = callback;
+ write_callback = (owned)callback;
write_callback_cancellable = cancellable;
write_callback_priority = io_priority;
}
@@ -263,13 +264,6 @@ public class Connection : IOStream {
return buffer.length;
}
- public bool close_read(Cancellable? cancellable = null) {
- input_closed = true;
- if (!output_closed) {
- return true;
- }
- return close_impl(cancellable);
- }
public async bool close_read_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
input_closed = true;
if (!output_closed) {
@@ -277,13 +271,6 @@ public class Connection : IOStream {
}
return yield close_async_impl(io_priority, cancellable);
}
- public bool close_write(Cancellable? cancellable = null) {
- output_closed = true;
- if (!input_closed) {
- return true;
- }
- return close_impl(cancellable);
- }
public async bool close_write_async(int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
output_closed = true;
if (!input_closed) {
@@ -292,7 +279,7 @@ public class Connection : IOStream {
return yield close_async_impl(io_priority, cancellable);
}
delegate void OnClose(bool success);
- private bool close_impl(Cancellable? cancellable = null, OnClose? on_close = null) {
+ private bool close_impl(Cancellable? cancellable, OnClose on_close) {
if (state == State.DISCONNECTING || state == State.DISCONNECTED || state == State.ERROR) {
on_close(true);
return true;
@@ -386,17 +373,17 @@ public class Connection : IOStream {
string? stanza = open.get_attribute("stanza");
if (block_size < 0 || (stanza != null && stanza != "iq" && stanza != "message")) {
set_error("invalid open");
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing block_size or invalid stanza")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing block_size or invalid stanza")) { to=iq.from });
return;
}
if (stanza != null && stanza != "iq") {
set_error("invalid open");
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.feature_not_implemented("cannot use message stanzas for IBB")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.feature_not_implemented("cannot use message stanzas for IBB")) { to=iq.from });
return;
}
if (block_size > this.block_size) {
set_error("invalid open");
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_RESOURCE_CONSTRAINT, "opening a connection with a greater than negotiated/acceptable block size", null)));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_RESOURCE_CONSTRAINT, "opening a connection with a greater than negotiated/acceptable block size", null)) { to=iq.from });
return;
}
this.block_size = block_size;
@@ -408,7 +395,7 @@ public class Connection : IOStream {
assert(state == State.CONNECTED);
if (input_closed) {
set_error("unexpected data");
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_allowed("unexpected data")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.not_allowed("unexpected data")) { to=iq.from });
return;
}
int seq = data.get_attribute_int("seq");
@@ -417,12 +404,12 @@ public class Connection : IOStream {
uint8[] content = Base64.decode(data.get_string_content());
if (content.length > block_size) {
set_error("data longer than negotiated block size");
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("data longer than negotiated block size")));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("data longer than negotiated block size")) { to=iq.from });
return;
}
if (seq < 0 || seq != remote_seq) {
set_error("out of order data packets");
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, "out of order data packets", null)));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.build(ErrorStanza.TYPE_CANCEL, ErrorStanza.CONDITION_UNEXPECTED_REQUEST, "out of order data packets", null)) { to=iq.from });
return;
}
remote_seq = (remote_seq + 1) % SEQ_MODULUS;