aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libdino/src/service/file_manager.vala2
-rw-r--r--libdino/src/service/jingle_file_manager.vala9
-rw-r--r--xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala49
3 files changed, 46 insertions, 14 deletions
diff --git a/libdino/src/service/file_manager.vala b/libdino/src/service/file_manager.vala
index 049239f7..7665936c 100644
--- a/libdino/src/service/file_manager.vala
+++ b/libdino/src/service/file_manager.vala
@@ -66,7 +66,7 @@ public class FileManager : StreamInteractionModule, Object {
foreach (FileSender file_sender in file_senders) {
if (file_sender.can_send(conversation, file_transfer)) {
file_sender.send_file(conversation, file_transfer);
- return;
+ break;
}
}
received_file(file_transfer, conversation);
diff --git a/libdino/src/service/jingle_file_manager.vala b/libdino/src/service/jingle_file_manager.vala
index bd470f0b..595afae0 100644
--- a/libdino/src/service/jingle_file_manager.vala
+++ b/libdino/src/service/jingle_file_manager.vala
@@ -45,9 +45,9 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend
file_transfer.local_time = new DateTime.now_utc();
file_transfer.direction = FileTransfer.DIRECTION_RECEIVED;
file_transfer.file_name = jingle_file_transfer.file_name;
- file_transfer.size = (int)jingle_file_transfer.size; // TODO(hrxi): remove cast
+ file_transfer.size = (int)jingle_file_transfer.size;
file_transfer.state = FileTransfer.State.NOT_STARTED;
- file_transfer.provider = 0; // TODO(hrxi): what is this?
+ file_transfer.provider = 1;
file_transfer.info = id;
file_transfers[id] = jingle_file_transfer;
@@ -56,7 +56,8 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend
}
async void get_meta_info(FileTransfer file_transfer) {
- // TODO(hrxi): what is this function?
+ // In Jingle, all the metadata is provided up-front, so there's no more
+ // metadata to get.
}
async void download(FileTransfer file_transfer, File file_) {
// TODO(hrxi) What should happen if `stream == null`?
@@ -112,7 +113,7 @@ public class JingleFileManager : StreamInteractionModule, FileProvider, FileSend
if (!stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).is_available(stream, full_jid)) {
continue;
}
- stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size);
+ stream.get_module(Xep.JingleFileTransfer.Module.IDENTITY).offer_file_stream.begin(stream, full_jid, file_transfer.input_stream, file_transfer.file_name, file_transfer.size);
return;
}
}
diff --git a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
index ea9d5f72..89247780 100644
--- a/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
+++ b/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
@@ -127,8 +127,14 @@ public class Connection : IOStream {
XmppStream stream;
+ int read_callback_priority;
+ Cancellable? read_callback_cancellable = null;
+ ulong read_callback_cancellable_id;
SourceFunc? read_callback = null;
+ int write_callback_priority;
SourceFunc? write_callback = null;
+ ulong write_callback_cancellable_id;
+ Cancellable? write_callback_cancellable = null;
// Need `Bytes` instead of `uint8[]` because the latter doesn't work in
// parameter position of `LinkedList`.
LinkedList<Bytes> received = new LinkedList<Bytes>();
@@ -144,35 +150,54 @@ public class Connection : IOStream {
output = new Output(this);
}
- public void set_read_calllback(SourceFunc callback) throws IOError {
+ public void set_read_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (read_callback != null) {
throw new IOError.PENDING("only one async read is permitted at a time on an in-band bytestream");
}
+ if (cancellable != null) {
+ read_callback_cancellable_id = cancellable.connect(trigger_read_callback);
+ }
read_callback = callback;
+ read_callback_cancellable = cancellable;
+ read_callback_priority = io_priority;
}
- public void set_write_calllback(SourceFunc callback) throws IOError {
+ public void set_write_callback(SourceFunc callback, Cancellable? cancellable, int io_priority) throws IOError {
if (write_callback != null) {
throw new IOError.PENDING("only one async write is permitted at a time on an in-band bytestream");
}
+ if (cancellable != null) {
+ write_callback_cancellable_id = cancellable.connect(trigger_write_callback);
+ }
write_callback = callback;
+ write_callback_cancellable = cancellable;
+ write_callback_priority = io_priority;
}
public void trigger_read_callback() {
if (read_callback != null) {
- Idle.add((owned) read_callback);
+ Idle.add((owned) read_callback, read_callback_priority);
read_callback = null;
+ if (read_callback_cancellable != null) {
+ read_callback_cancellable.disconnect(read_callback_cancellable_id);
+ }
+ read_callback_cancellable = null;
}
}
public void trigger_write_callback() {
if (write_callback != null) {
- Idle.add((owned) write_callback);
+ Idle.add((owned) write_callback, write_callback_priority);
write_callback = null;
+ if (write_callback_cancellable != null) {
+ write_callback_cancellable.disconnect(write_callback_cancellable_id);
+ }
+ write_callback_cancellable = null;
}
}
public async ssize_t read_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
- // TODO(hrxi): cancellable?
- // TODO(hrxi): io_priority?
while (true) {
+ if (cancellable != null) {
+ cancellable.set_error_if_cancelled();
+ }
if (input_closed) {
return 0;
}
@@ -190,14 +215,17 @@ public class Connection : IOStream {
if (state == DISCONNECTED) {
return 0;
}
- set_read_calllback(read_async.callback);
+ set_read_callback(read_async.callback, cancellable, io_priority);
yield;
}
}
public async ssize_t write_async(uint8[]? buffer, int io_priority = GLib.Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
while (state == WAITING_FOR_CONNECT || state == CONNECTING) {
- set_write_calllback(write_async.callback);
+ if (cancellable != null) {
+ cancellable.set_error_if_cancelled();
+ }
+ set_write_callback(write_async.callback, cancellable, io_priority);
yield;
}
throw_if_closed();
@@ -214,7 +242,7 @@ public class Connection : IOStream {
.put_attribute("seq", seq.to_string())
.put_node(new StanzaNode.text(Base64.encode(buffer)));
Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid };
- set_write_calllback(write_async.callback);
+ set_write_callback(write_async.callback, cancellable, io_priority);
stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
if (iq.is_error()) {
set_error("sending failed");
@@ -228,6 +256,9 @@ public class Connection : IOStream {
}
});
yield;
+ if (cancellable != null) {
+ cancellable.set_error_if_cancelled();
+ }
throw_if_error();
return buffer.length;
}