From d76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e Mon Sep 17 00:00:00 2001 From: Marvin W Date: Tue, 31 Jan 2023 15:13:12 +0100 Subject: Add priority for and allow cancellation of outgoing stanzas --- .../src/module/xep/0198_stream_management.vala | 59 +++++++++++++++------- 1 file changed, 42 insertions(+), 17 deletions(-) (limited to 'xmpp-vala/src/module/xep/0198_stream_management.vala') diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index 340c4e6f..68eee8ae 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -13,32 +13,51 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { public string? session_id { get; set; default=null; } public Gee.List flags = null; private HashMap in_flight_stanzas = new HashMap(); - private Gee.List node_queue = new ArrayList(); + private Gee.Queue node_queue = new PriorityQueue((a, b) => { + return a.io_priority - b.io_priority; + }); private class QueueItem { public StanzaNode node; - public Promise promise; + public int io_priority; + public Cancellable? cancellable; + public Promise promise; - public QueueItem(StanzaNode node, Promise promise) { + public QueueItem(StanzaNode node, int io_priority, Cancellable? cancellable) { this.node = node; - this.promise = promise; + this.io_priority = io_priority; + this.cancellable = cancellable; + this.promise = new Promise(); } } - public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError { - var promise = new Promise(); - - node_queue.add(new QueueItem(node, promise)); - check_queue(stream); - + public async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { + var future = enqueue_stanza(stream, node, io_priority, cancellable); try { - yield promise.future.wait_async(); + yield future.wait_async(); } catch (FutureError e) { - throw new IOError.FAILED("Future returned error %i".printf(e.code)); + if (e is FutureError.ABANDON_PROMISE) { + throw new IOError.FAILED("Future abandoned: %s".printf(e.message)); + } else if (e is FutureError.EXCEPTION) { + if (future.exception is IOError) { + throw (IOError) future.exception; + } else { + throw new IOError.FAILED("Unknown error %s".printf(future.exception.message)); + } + } else { + throw new IOError.FAILED("Unknown future error: %s".printf(e.message)); + } } } - internal async void write_node(XmppStream stream, StanzaNode node) { + private Future enqueue_stanza(XmppStream stream, StanzaNode node, int io_priority, Cancellable? cancellable) { + var queue_item = new QueueItem(node, io_priority, cancellable); + node_queue.offer(queue_item); + check_queue(stream); + return queue_item.promise.future; + } + + internal async void write_node(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) { StanzaWriter? writer = ((IoXmppStream)stream).writer; if (writer == null) return; try { @@ -46,22 +65,28 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { if (node.name == "message" || node.name == "iq" || node.name == "presence") { var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns(); stream.log.node("OUT", r_node, stream); - yield ((!)writer).write_nodes(node, r_node); + yield ((!)writer).write_nodes(node, r_node, io_priority, cancellable); } else { - yield ((!)writer).write_node(node); + yield ((!)writer).write_node(node, io_priority, cancellable); } } catch (IOError e) { } } private void check_queue(XmppStream stream) { while (!node_queue.is_empty && in_flight_stanzas.size < 10) { - QueueItem queue_item = node_queue.remove_at(0); + QueueItem queue_item = node_queue.poll(); + try { + queue_item.cancellable.set_error_if_cancelled(); + } catch (IOError e) { + queue_item.promise.set_exception(e); + continue; + } StanzaNode node = queue_item.node; if (node.name == "message" || node.name == "iq" || node.name == "presence") { in_flight_stanzas[++h_outbound] = queue_item; } - write_node.begin(stream, node); + write_node.begin(stream, node, queue_item.io_priority, queue_item.cancellable); } } -- cgit v1.2.3-70-g09d2