aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/module/xep/0198_stream_management.vala
diff options
context:
space:
mode:
Diffstat (limited to 'xmpp-vala/src/module/xep/0198_stream_management.vala')
-rw-r--r--xmpp-vala/src/module/xep/0198_stream_management.vala59
1 files changed, 42 insertions, 17 deletions
diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala
index 340c4e6f..68eee8ae 100644
--- a/xmpp-vala/src/module/xep/0198_stream_management.vala
+++ b/xmpp-vala/src/module/xep/0198_stream_management.vala
@@ -13,32 +13,51 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
public string? session_id { get; set; default=null; }
public Gee.List<XmppStreamFlag> flags = null;
private HashMap<int, QueueItem> in_flight_stanzas = new HashMap<int, QueueItem>();
- private Gee.List<QueueItem> node_queue = new ArrayList<QueueItem>();
+ private Gee.Queue<QueueItem> node_queue = new PriorityQueue<QueueItem>((a, b) => {
+ return a.io_priority - b.io_priority;
+ });
private class QueueItem {
public StanzaNode node;
- public Promise<IOError?> promise;
+ public int io_priority;
+ public Cancellable? cancellable;
+ public Promise<void*> promise;
- public QueueItem(StanzaNode node, Promise<IOError?> promise) {
+ public QueueItem(StanzaNode node, int io_priority, Cancellable? cancellable) {
this.node = node;
- this.promise = promise;
+ this.io_priority = io_priority;
+ this.cancellable = cancellable;
+ this.promise = new Promise<void*>();
}
}
- public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError {
- var promise = new Promise<IOError?>();
-
- node_queue.add(new QueueItem(node, promise));
- check_queue(stream);
-
+ public async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ var future = enqueue_stanza(stream, node, io_priority, cancellable);
try {
- yield promise.future.wait_async();
+ yield future.wait_async();
} catch (FutureError e) {
- throw new IOError.FAILED("Future returned error %i".printf(e.code));
+ if (e is FutureError.ABANDON_PROMISE) {
+ throw new IOError.FAILED("Future abandoned: %s".printf(e.message));
+ } else if (e is FutureError.EXCEPTION) {
+ if (future.exception is IOError) {
+ throw (IOError) future.exception;
+ } else {
+ throw new IOError.FAILED("Unknown error %s".printf(future.exception.message));
+ }
+ } else {
+ throw new IOError.FAILED("Unknown future error: %s".printf(e.message));
+ }
}
}
- internal async void write_node(XmppStream stream, StanzaNode node) {
+ private Future<void*> enqueue_stanza(XmppStream stream, StanzaNode node, int io_priority, Cancellable? cancellable) {
+ var queue_item = new QueueItem(node, io_priority, cancellable);
+ node_queue.offer(queue_item);
+ check_queue(stream);
+ return queue_item.promise.future;
+ }
+
+ internal async void write_node(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) {
StanzaWriter? writer = ((IoXmppStream)stream).writer;
if (writer == null) return;
try {
@@ -46,22 +65,28 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
if (node.name == "message" || node.name == "iq" || node.name == "presence") {
var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns();
stream.log.node("OUT", r_node, stream);
- yield ((!)writer).write_nodes(node, r_node);
+ yield ((!)writer).write_nodes(node, r_node, io_priority, cancellable);
} else {
- yield ((!)writer).write_node(node);
+ yield ((!)writer).write_node(node, io_priority, cancellable);
}
} catch (IOError e) { }
}
private void check_queue(XmppStream stream) {
while (!node_queue.is_empty && in_flight_stanzas.size < 10) {
- QueueItem queue_item = node_queue.remove_at(0);
+ QueueItem queue_item = node_queue.poll();
+ try {
+ queue_item.cancellable.set_error_if_cancelled();
+ } catch (IOError e) {
+ queue_item.promise.set_exception(e);
+ continue;
+ }
StanzaNode node = queue_item.node;
if (node.name == "message" || node.name == "iq" || node.name == "presence") {
in_flight_stanzas[++h_outbound] = queue_item;
}
- write_node.begin(stream, node);
+ write_node.begin(stream, node, queue_item.io_priority, queue_item.cancellable);
}
}