From 4b6fe6bf7f86f665238d709c30a777dbc6c81acf Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 12 Jul 2019 01:53:28 +0200 Subject: Address pull requests comments, fix a few TODOs --- .../src/module/xep/0047_in_band_bytestreams.vala | 49 ++++++++++++++++++---- 1 file changed, 40 insertions(+), 9 deletions(-) (limited to 'xmpp-vala/src/module') diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index ea9d5f72..89247780 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -127,8 +127,14 @@ public class Connection : IOStream { XmppStream stream; + int read_callback_priority; + Cancellable? read_callback_cancellable = null; + ulong read_callback_cancellable_id; SourceFunc? read_callback = null; + int write_callback_priority; SourceFunc? write_callback = null; + ulong write_callback_cancellable_id; + Cancellable? write_callback_cancellable = null; // Need `Bytes` instead of `uint8[]` because the latter doesn't work in // parameter position of `LinkedList`. LinkedList received = new LinkedList(); @@ -144,35 +150,54 @@ public class Connection : IOStream { output = new Output(this); } - public void set_read_calllback(SourceFunc callback) throws IOError { + public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (read_callback != null) { throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream"); } + if (cancellable != null) { + read_callback_cancellable_id = cancellable.connect(trigger_read_callback); + } read_callback = callback; + read_callback_cancellable = cancellable; + read_callback_priority = io_priority; } - public void set_write_calllback(SourceFunc callback) throws IOError { + public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (write_callback != null) { throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream"); } + if (cancellable != null) { + write_callback_cancellable_id = cancellable.connect(trigger_write_callback); + } write_callback = callback; + write_callback_cancellable = cancellable; + write_callback_priority = io_priority; } public void trigger_read_callback() { if (read_callback != null) { - Idle.add((owned) read_callback); + Idle.add((owned) read_callback, read_callback_priority); read_callback = null; + if (read_callback_cancellable != null) { + read_callback_cancellable.disconnect(read_callback_cancellable_id); + } + read_callback_cancellable = null; } } public void trigger_write_callback() { if (write_callback != null) { - Idle.add((owned) write_callback); + Idle.add((owned) write_callback, write_callback_priority); write_callback = null; + if (write_callback_cancellable != null) { + write_callback_cancellable.disconnect(write_callback_cancellable_id); + } + write_callback_cancellable = null; } } public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { - // TODO(hrxi): cancellable? - // TODO(hrxi): io_priority? while (true) { + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } if (input_closed) { return 0; } @@ -190,14 +215,17 @@ public class Connection : IOStream { if (state == DISCONNECTED) { return 0; } - set_read_calllback(read_async.callback); + set_read_callback(read_async.callback, cancellable, io_priority); yield; } } public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { while (state == WAITING_FOR_CONNECT || state == CONNECTING) { - set_write_calllback(write_async.callback); + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } + set_write_callback(write_async.callback, cancellable, io_priority); yield; } throw_if_closed(); @@ -214,7 +242,7 @@ public class Connection : IOStream { .put_attribute("seq", seq.to_string()) .put_node(new StanzaNode.text(Base64.encode(buffer))); Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid }; - set_write_calllback(write_async.callback); + set_write_callback(write_async.callback, cancellable, io_priority); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { if (iq.is_error()) { set_error("sending failed"); @@ -228,6 +256,9 @@ public class Connection : IOStream { } }); yield; + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } throw_if_error(); return buffer.length; } -- cgit v1.2.3-70-g09d2