aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala
diff options
context:
space:
mode:
authorMarvin W <git@larma.de>2023-01-31 15:13:12 +0100
committerMarvin W <git@larma.de>2023-02-07 10:50:45 +0100
commitd76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e (patch)
tree8037cd613ccea827d8d1895b82a2c0dd65a75a14 /xmpp-vala
parent18321ed15ce782ff5d1f24de9f2fb459d714d125 (diff)
downloaddino-d76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e.tar.gz
dino-d76e12b215eb62e4eda5a0f92fbf5c1bd7c1848e.zip
Add priority for and allow cancellation of outgoing stanzas
Diffstat (limited to 'xmpp-vala')
-rw-r--r--xmpp-vala/src/core/io_xmpp_stream.vala12
-rw-r--r--xmpp-vala/src/core/stanza_writer.vala16
-rw-r--r--xmpp-vala/src/core/xmpp_stream.vala4
-rw-r--r--xmpp-vala/src/module/iq/module.vala11
-rw-r--r--xmpp-vala/src/module/xep/0198_stream_management.vala59
-rw-r--r--xmpp-vala/src/module/xep/0199_ping.vala4
-rw-r--r--xmpp-vala/src/module/xep/0313_2_message_archive_management.vala8
-rw-r--r--xmpp-vala/src/module/xep/0313_message_archive_management.vala4
8 files changed, 73 insertions, 45 deletions
diff --git a/xmpp-vala/src/core/io_xmpp_stream.vala b/xmpp-vala/src/core/io_xmpp_stream.vala
index 208e8053..9c58a46b 100644
--- a/xmpp-vala/src/core/io_xmpp_stream.vala
+++ b/xmpp-vala/src/core/io_xmpp_stream.vala
@@ -1,7 +1,7 @@
using Gee;
public interface Xmpp.WriteNodeFunc : Object {
- public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOError;
+ public abstract async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError;
}
public abstract class Xmpp.IoXmppStream : XmppStream {
@@ -44,22 +44,22 @@ public abstract class Xmpp.IoXmppStream : XmppStream {
}
[Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
- public override void write(StanzaNode node) {
- write_async.begin(node, (obj, res) => {
+ public override void write(StanzaNode node, int io_priority = Priority.DEFAULT) {
+ write_async.begin(node, io_priority, null, (obj, res) => {
try {
write_async.end(res);
} catch (Error e) { }
});
}
- public override async void write_async(StanzaNode node) throws IOError {
+ public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
if (write_obj != null) {
- yield write_obj.write_stanza(this, node);
+ yield write_obj.write_stanza(this, node, io_priority, cancellable);
} else {
StanzaWriter? writer = this.writer;
if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open");
log.node("OUT", node, this);
- yield ((!)writer).write_node(node);
+ yield ((!)writer).write_node(node, io_priority, cancellable);
}
}
diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala
index 5b926a93..aecf8983 100644
--- a/xmpp-vala/src/core/stanza_writer.vala
+++ b/xmpp-vala/src/core/stanza_writer.vala
@@ -12,11 +12,11 @@ public class StanzaWriter {
this.output = output;
}
- public async void write_node(StanzaNode node) throws IOError {
- yield write_data(node.to_xml().data);
+ public async void write_node(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ yield write_data(node.to_xml().data, io_priority, cancellable);
}
- public async void write_nodes(StanzaNode node1, StanzaNode node2) throws IOError {
+ public async void write_nodes(StanzaNode node1, StanzaNode node2, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
var data1 = node1.to_xml().data;
var data2 = node2.to_xml().data;
@@ -29,21 +29,21 @@ public class StanzaWriter {
concat[i++] = datum;
}
- yield write_data(concat);
+ yield write_data(concat, io_priority, cancellable);
}
- public async void write(string s) throws IOError {
- yield write_data(s.data);
+ public async void write(string s, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ yield write_data(s.data, io_priority, cancellable);
}
- private async void write_data(owned uint8[] data) throws IOError {
+ private async void write_data(owned uint8[] data, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
if (running) {
queue.push_tail(new SourceFuncWrapper(write_data.callback));
yield;
}
running = true;
try {
- yield output.write_all_async(data, 0, null, null);
+ yield output.write_all_async(data, io_priority, cancellable, null);
} catch (IOError e) {
cancel();
throw e;
diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala
index 6370554f..322fb016 100644
--- a/xmpp-vala/src/core/xmpp_stream.vala
+++ b/xmpp-vala/src/core/xmpp_stream.vala
@@ -37,9 +37,9 @@ public abstract class Xmpp.XmppStream {
public abstract async StanzaNode read() throws IOError;
[Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
- public abstract void write(StanzaNode node);
+ public abstract void write(StanzaNode node, int io_priority = Priority.DEFAULT);
- public abstract async void write_async(StanzaNode node) throws IOError;
+ public abstract async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError;
public abstract async void setup() throws IOError;
diff --git a/xmpp-vala/src/module/iq/module.vala b/xmpp-vala/src/module/iq/module.vala
index 17cd3f0d..172aa9b1 100644
--- a/xmpp-vala/src/module/iq/module.vala
+++ b/xmpp-vala/src/module/iq/module.vala
@@ -12,22 +12,25 @@ namespace Xmpp.Iq {
private HashMap<string, ResponseListener> responseListeners = new HashMap<string, ResponseListener>();
private HashMap<string, ArrayList<Handler>> namespaceRegistrants = new HashMap<string, ArrayList<Handler>>();
- public async Iq.Stanza send_iq_async(XmppStream stream, Iq.Stanza iq) {
+ public async Iq.Stanza send_iq_async(XmppStream stream, Iq.Stanza iq, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
assert(iq.type_ == Iq.Stanza.TYPE_GET || iq.type_ == Iq.Stanza.TYPE_SET);
+ preprocess_outgoing_iq_set_get(stream, iq);
Iq.Stanza? return_stanza = null;
- send_iq(stream, iq, (_, result_iq) => {
+ responseListeners[iq.id] = new ResponseListener((_, result_iq) => {
return_stanza = result_iq;
Idle.add(send_iq_async.callback);
});
+ stream.write_async(iq.stanza, io_priority, cancellable);
yield;
+ cancellable.set_error_if_cancelled();
return return_stanza;
}
public delegate void OnResult(XmppStream stream, Iq.Stanza iq);
- public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null) {
+ public void send_iq(XmppStream stream, Iq.Stanza iq, owned OnResult? listener = null, int io_priority = Priority.DEFAULT) {
preprocess_outgoing_iq_set_get(stream, iq);
- stream.write(iq.stanza);
+ stream.write(iq.stanza, io_priority);
if (listener != null) {
responseListeners[iq.id] = new ResponseListener((owned) listener);
}
diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala
index 340c4e6f..68eee8ae 100644
--- a/xmpp-vala/src/module/xep/0198_stream_management.vala
+++ b/xmpp-vala/src/module/xep/0198_stream_management.vala
@@ -13,32 +13,51 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
public string? session_id { get; set; default=null; }
public Gee.List<XmppStreamFlag> flags = null;
private HashMap<int, QueueItem> in_flight_stanzas = new HashMap<int, QueueItem>();
- private Gee.List<QueueItem> node_queue = new ArrayList<QueueItem>();
+ private Gee.Queue<QueueItem> node_queue = new PriorityQueue<QueueItem>((a, b) => {
+ return a.io_priority - b.io_priority;
+ });
private class QueueItem {
public StanzaNode node;
- public Promise<IOError?> promise;
+ public int io_priority;
+ public Cancellable? cancellable;
+ public Promise<void*> promise;
- public QueueItem(StanzaNode node, Promise<IOError?> promise) {
+ public QueueItem(StanzaNode node, int io_priority, Cancellable? cancellable) {
this.node = node;
- this.promise = promise;
+ this.io_priority = io_priority;
+ this.cancellable = cancellable;
+ this.promise = new Promise<void*>();
}
}
- public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError {
- var promise = new Promise<IOError?>();
-
- node_queue.add(new QueueItem(node, promise));
- check_queue(stream);
-
+ public async void write_stanza(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
+ var future = enqueue_stanza(stream, node, io_priority, cancellable);
try {
- yield promise.future.wait_async();
+ yield future.wait_async();
} catch (FutureError e) {
- throw new IOError.FAILED("Future returned error %i".printf(e.code));
+ if (e is FutureError.ABANDON_PROMISE) {
+ throw new IOError.FAILED("Future abandoned: %s".printf(e.message));
+ } else if (e is FutureError.EXCEPTION) {
+ if (future.exception is IOError) {
+ throw (IOError) future.exception;
+ } else {
+ throw new IOError.FAILED("Unknown error %s".printf(future.exception.message));
+ }
+ } else {
+ throw new IOError.FAILED("Unknown future error: %s".printf(e.message));
+ }
}
}
- internal async void write_node(XmppStream stream, StanzaNode node) {
+ private Future<void*> enqueue_stanza(XmppStream stream, StanzaNode node, int io_priority, Cancellable? cancellable) {
+ var queue_item = new QueueItem(node, io_priority, cancellable);
+ node_queue.offer(queue_item);
+ check_queue(stream);
+ return queue_item.promise.future;
+ }
+
+ internal async void write_node(XmppStream stream, StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) {
StanzaWriter? writer = ((IoXmppStream)stream).writer;
if (writer == null) return;
try {
@@ -46,22 +65,28 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
if (node.name == "message" || node.name == "iq" || node.name == "presence") {
var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns();
stream.log.node("OUT", r_node, stream);
- yield ((!)writer).write_nodes(node, r_node);
+ yield ((!)writer).write_nodes(node, r_node, io_priority, cancellable);
} else {
- yield ((!)writer).write_node(node);
+ yield ((!)writer).write_node(node, io_priority, cancellable);
}
} catch (IOError e) { }
}
private void check_queue(XmppStream stream) {
while (!node_queue.is_empty && in_flight_stanzas.size < 10) {
- QueueItem queue_item = node_queue.remove_at(0);
+ QueueItem queue_item = node_queue.poll();
+ try {
+ queue_item.cancellable.set_error_if_cancelled();
+ } catch (IOError e) {
+ queue_item.promise.set_exception(e);
+ continue;
+ }
StanzaNode node = queue_item.node;
if (node.name == "message" || node.name == "iq" || node.name == "presence") {
in_flight_stanzas[++h_outbound] = queue_item;
}
- write_node.begin(stream, node);
+ write_node.begin(stream, node, queue_item.io_priority, queue_item.cancellable);
}
}
diff --git a/xmpp-vala/src/module/xep/0199_ping.vala b/xmpp-vala/src/module/xep/0199_ping.vala
index 0b31011f..18fa538c 100644
--- a/xmpp-vala/src/module/xep/0199_ping.vala
+++ b/xmpp-vala/src/module/xep/0199_ping.vala
@@ -9,7 +9,7 @@ namespace Xmpp.Xep.Ping {
public async Iq.Stanza send_ping(XmppStream stream, Jid jid) {
StanzaNode ping_node = new StanzaNode.build("ping", NS_URI).add_self_xmlns();
Iq.Stanza iq = new Iq.Stanza.get(ping_node) { to=jid };
- return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq);
+ return yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq, Priority.HIGH);
}
public override void attach(XmppStream stream) {
@@ -23,7 +23,7 @@ namespace Xmpp.Xep.Ping {
}
public async void on_iq_get(XmppStream stream, Iq.Stanza iq) {
- stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
+ stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq), null, Priority.HIGH);
}
public override string get_ns() { return NS_URI; }
diff --git a/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala
index a710a459..3a6d9259 100644
--- a/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala
+++ b/xmpp-vala/src/module/xep/0313_2_message_archive_management.vala
@@ -61,20 +61,20 @@ namespace Xmpp.MessageArchiveManagement.V2 {
return MessageArchiveManagement.create_base_query(stream, MessageArchiveManagement.NS_URI_2, mam_params.query_id, fields);
}
- public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params) {
+ public async QueryResult query_archive(XmppStream stream, MamQueryParams mam_params, Cancellable? cancellable = null) {
var query_node = create_base_query(stream, mam_params);
if (!mam_params.use_ns2_extended) {
query_node.put_node(ResultSetManagement.create_set_rsm_node_before(mam_params.end_id));
}
- return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node);
+ return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node, cancellable);
}
- public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result) {
+ public async QueryResult page_through_results(XmppStream stream, MamQueryParams mam_params, QueryResult prev_result, Cancellable? cancellable = null) {
var query_node = create_base_query(stream, mam_params);
query_node.put_node(ResultSetManagement.create_set_rsm_node_before(prev_result.first));
- return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node);
+ return yield MessageArchiveManagement.query_archive(stream, MessageArchiveManagement.NS_URI_2, mam_params.mam_server, query_node, cancellable);
}
}
diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala
index 8c1b3fbc..1caa1bc3 100644
--- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala
+++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala
@@ -71,7 +71,7 @@ public class Module : XmppStreamModule {
return query_node;
}
- internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node) {
+ internal async QueryResult query_archive(XmppStream stream, string ns, Jid? mam_server, StanzaNode query_node, Cancellable? cancellable = null) {
var res = new QueryResult();
if (stream.get_flag(Flag.IDENTITY) == null) { res.error = true; return res; }
@@ -79,7 +79,7 @@ public class Module : XmppStreamModule {
// Build and send query
Iq.Stanza iq = new Iq.Stanza.set(query_node) { to=mam_server };
- Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq);
+ Iq.Stanza result_iq = yield stream.get_module(Iq.Module.IDENTITY).send_iq_async(stream, iq, Priority.LOW, cancellable);
// Parse the response IQ into a QueryResult.
StanzaNode? fin_node = result_iq.stanza.get_subnode("fin", ns);