aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
diff options
context:
space:
mode:
Diffstat (limited to 'xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala')
-rw-r--r--xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala49
1 files changed, 40 insertions, 9 deletions
diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
index ea9d5f72..89247780 100644
--- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
+++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
@@ -127,8 +127,14 @@ public class Connection : IOStream {
XmppStream stream;
+ int read_callback_priority;
+ Cancellable? read_callback_cancellable = null;
+ ulong read_callback_cancellable_id;
SourceFunc? read_callback = null;
+ int write_callback_priority;
SourceFunc? write_callback = null;
+ ulong write_callback_cancellable_id;
+ Cancellable? write_callback_cancellable = null;
// Need `Bytes` instead of `uint8[]` because the latter doesn't work in
// parameter position of `LinkedList`.
LinkedList<Bytes> received = new LinkedList<Bytes>();
@@ -144,35 +150,54 @@ public class Connection : IOStream {
output = new Output(this);
}
- public void set_read_calllback(SourceFunc callback) throws IOError {
+ public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (read_callback != null) {
throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream");
}
+ if (cancellable != null) {
+ read_callback_cancellable_id = cancellable.connect(trigger_read_callback);
+ }
read_callback = callback;
+ read_callback_cancellable = cancellable;
+ read_callback_priority = io_priority;
}
- public void set_write_calllback(SourceFunc callback) throws IOError {
+ public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (write_callback != null) {
throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream");
}
+ if (cancellable != null) {
+ write_callback_cancellable_id = cancellable.connect(trigger_write_callback);
+ }
write_callback = callback;
+ write_callback_cancellable = cancellable;
+ write_callback_priority = io_priority;
}
public void trigger_read_callback() {
if (read_callback != null) {
- Idle.add((owned) read_callback);
+ Idle.add((owned) read_callback, read_callback_priority);
read_callback = null;
+ if (read_callback_cancellable != null) {
+ read_callback_cancellable.disconnect(read_callback_cancellable_id);
+ }
+ read_callback_cancellable = null;
}
}
public void trigger_write_callback() {
if (write_callback != null) {
- Idle.add((owned) write_callback);
+ Idle.add((owned) write_callback, write_callback_priority);
write_callback = null;
+ if (write_callback_cancellable != null) {
+ write_callback_cancellable.disconnect(write_callback_cancellable_id);
+ }
+ write_callback_cancellable = null;
}
}
public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
- // TODO(hrxi): cancellable?
- // TODO(hrxi): io_priority?
while (true) {
+ if (cancellable != null) {
+ cancellable.set_error_if_cancelled();
+ }
if (input_closed) {
return 0;
}
@@ -190,14 +215,17 @@ public class Connection : IOStream {
if (state == DISCONNECTED) {
return 0;
}
- set_read_calllback(read_async.callback);
+ set_read_callback(read_async.callback, cancellable, io_priority);
yield;
}
}
public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
while (state == WAITING_FOR_CONNECT || state == CONNECTING) {
- set_write_calllback(write_async.callback);
+ if (cancellable != null) {
+ cancellable.set_error_if_cancelled();
+ }
+ set_write_callback(write_async.callback, cancellable, io_priority);
yield;
}
throw_if_closed();
@@ -214,7 +242,7 @@ public class Connection : IOStream {
.put_attribute("seq", seq.to_string())
.put_node(new StanzaNode.text(Base64.encode(buffer)));
Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid };
- set_write_calllback(write_async.callback);
+ set_write_callback(write_async.callback, cancellable, io_priority);
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
if (iq.is_error()) {
set_error("sending failed");
@@ -228,6 +256,9 @@ public class Connection : IOStream {
}
});
yield;
+ if (cancellable != null) {
+ cancellable.set_error_if_cancelled();
+ }
throw_if_error();
return buffer.length;
}