From 4b6fe6bf7f86f665238d709c30a777dbc6c81acf Mon Sep 17 00:00:00 2001 From: hrxi Date: Fri, 12 Jul 2019 01:53:28 +0200 Subject: Address pull requests comments, fix a few TODOs --- libdino/src/service/file_manager.vala | 2 +- libdino/src/service/jingle_file_manager.vala | 9 ++-- .../src/module/xep/0047_in_band_bytestreams.vala | 49 ++++++++++++++++++---- 3 files changed, 46 insertions(+), 14 deletions(-) diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala index 049239f7..7665936c 100644 --- a/libdino/src/service/file_manager.vala +++ b/libdino/src/service/file_manager.vala @@ -66,7 +66,7 @@ public class FileManager : StreamInteractionModule, Object { foreach (FileSender file_sender in file_senders) { if (file_sender.can_send(conversation, file_transfer)) { file_sender.send_file(conversation, file_transfer); - return; + break; } } received_file(file_transfer, conversation); diff --git a/libdino/src/service/jingle_file_manager.vala b/libdino/src/service/jingle_file_manager.vala index bd470f0b..595afae0 100644 --- a/libdino/src/service/jingle_file_manager.vala +++ b/libdino/src/service/jingle_file_manager.vala @@ -45,9 +45,9 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend file_transfer.local_time = new DateTime.now_utc(); file_transfer.direction = FileTransfer.DIRECTION_RECEIVED; file_transfer.file_name = jingle_file_transfer.file_name; - file_transfer.size = (int)jingle_file_transfer.size; // TODO(hrxi): remove cast + file_transfer.size = (int)jingle_file_transfer.size; file_transfer.state = FileTransfer.State.NOT_STARTED; - file_transfer.provider = 0; // TODO(hrxi): what is this? + file_transfer.provider = 1; file_transfer.info = id; file_transfers[id] = jingle_file_transfer; @@ -56,7 +56,8 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend } async void get_meta_info(FileTransfer file_transfer) { - // TODO(hrxi): what is this function? + // In Jingle, all the metadata is provided up-front, so there's no more + // metadata to get. } async void download(FileTransfer file_transfer, File file_) { // TODO(hrxi) What should happen if `stream == null`? @@ -112,7 +113,7 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) { continue; } - stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size); + stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream.begin(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size); return; } } diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala index ea9d5f72..89247780 100644 --- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala +++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala @@ -127,8 +127,14 @@ public class Connection : IOStream { XmppStream stream; + int read_callback_priority; + Cancellable? read_callback_cancellable = null; + ulong read_callback_cancellable_id; SourceFunc? read_callback = null; + int write_callback_priority; SourceFunc? write_callback = null; + ulong write_callback_cancellable_id; + Cancellable? write_callback_cancellable = null; // Need `Bytes` instead of `uint8[]` because the latter doesn't work in // parameter position of `LinkedList`. LinkedList received = new LinkedList(); @@ -144,35 +150,54 @@ public class Connection : IOStream { output = new Output(this); } - public void set_read_calllback(SourceFunc callback) throws IOError { + public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (read_callback != null) { throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream"); } + if (cancellable != null) { + read_callback_cancellable_id = cancellable.connect(trigger_read_callback); + } read_callback = callback; + read_callback_cancellable = cancellable; + read_callback_priority = io_priority; } - public void set_write_calllback(SourceFunc callback) throws IOError { + public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError { if (write_callback != null) { throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream"); } + if (cancellable != null) { + write_callback_cancellable_id = cancellable.connect(trigger_write_callback); + } write_callback = callback; + write_callback_cancellable = cancellable; + write_callback_priority = io_priority; } public void trigger_read_callback() { if (read_callback != null) { - Idle.add((owned) read_callback); + Idle.add((owned) read_callback, read_callback_priority); read_callback = null; + if (read_callback_cancellable != null) { + read_callback_cancellable.disconnect(read_callback_cancellable_id); + } + read_callback_cancellable = null; } } public void trigger_write_callback() { if (write_callback != null) { - Idle.add((owned) write_callback); + Idle.add((owned) write_callback, write_callback_priority); write_callback = null; + if (write_callback_cancellable != null) { + write_callback_cancellable.disconnect(write_callback_cancellable_id); + } + write_callback_cancellable = null; } } public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { - // TODO(hrxi): cancellable? - // TODO(hrxi): io_priority? while (true) { + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } if (input_closed) { return 0; } @@ -190,14 +215,17 @@ public class Connection : IOStream { if (state == DISCONNECTED) { return 0; } - set_read_calllback(read_async.callback); + set_read_callback(read_async.callback, cancellable, io_priority); yield; } } public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { while (state == WAITING_FOR_CONNECT || state == CONNECTING) { - set_write_calllback(write_async.callback); + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } + set_write_callback(write_async.callback, cancellable, io_priority); yield; } throw_if_closed(); @@ -214,7 +242,7 @@ public class Connection : IOStream { .put_attribute("seq", seq.to_string()) .put_node(new StanzaNode.text(Base64.encode(buffer))); Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid }; - set_write_calllback(write_async.callback); + set_write_callback(write_async.callback, cancellable, io_priority); stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { if (iq.is_error()) { set_error("sending failed"); @@ -228,6 +256,9 @@ public class Connection : IOStream { } }); yield; + if (cancellable != null) { + cancellable.set_error_if_cancelled(); + } throw_if_error(); return buffer.length; } -- cgit v1.2.3-70-g09d2