From c887240fdcdbc8f76c6bdd37b1174b8154907c4c Mon Sep 17 00:00:00 2001 From: fiaxh Date: Tue, 21 Jul 2020 15:48:42 +0200 Subject: Improve stream management queue --- xmpp-vala/src/core/stanza_writer.vala | 3 ++ xmpp-vala/src/core/xmpp_stream.vala | 6 +++- .../src/module/xep/0198_stream_management.vala | 34 +++++++++------------- 3 files changed, 22 insertions(+), 21 deletions(-) (limited to 'xmpp-vala') diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index 51f0061f..62fe022a 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -1,6 +1,8 @@ namespace Xmpp { public class StanzaWriter { + public signal void cancel(); + private OutputStream output; private Queue queue = new Queue(); @@ -43,6 +45,7 @@ public class StanzaWriter { try { yield output.write_all_async(data, 0, null, null); } catch (GLib.Error e) { + cancel(); throw new XmlError.IO(@"IOError in GLib: $(e.message)"); } finally { SourceFuncWrapper? sfw = queue.pop_head(); diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index 7e7588b1..ad4dae97 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -97,6 +97,8 @@ public class XmppStream { this.stream = stream; reader = new StanzaReader.for_stream(stream.input_stream); writer = new StanzaWriter.for_stream(stream.output_stream); + + writer.cancel.connect(reader.cancel); require_setup(); } @@ -123,7 +125,9 @@ public class XmppStream { [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")] public void write(StanzaNode node) { write_async.begin(node, (obj, res) => { - write_async.end(res); + try { + write_async.end(res); + } catch (Error e) { } }); } diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index bc695ddb..b6317425 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -26,25 +26,21 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { } public async void write_stanza(XmppStream stream, StanzaNode node) throws IOStreamError { - if (stream.has_flag(Flag.IDENTITY)) { - var promise = new Promise(); + var promise = new Promise(); - node_queue.add(new QueueItem(node, promise)); - check_queue(stream); + node_queue.add(new QueueItem(node, promise)); + check_queue(stream); - try { - yield promise.future.wait_async(); - } catch (FutureError e) { - throw new IOStreamError.WRITE("Future returned error %i".printf(e.code)); - } - } else { - yield write_node(stream, node); + try { + yield promise.future.wait_async(); + } catch (FutureError e) { + throw new IOStreamError.WRITE("Future returned error %i".printf(e.code)); } } - internal async void write_node(XmppStream stream, StanzaNode node) throws IOError { + internal async void write_node(XmppStream stream, StanzaNode node) { StanzaWriter? writer = stream.writer; - if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); + if (writer == null) return; try { stream.log.node("OUT", node, stream); if (node.name == "message" || node.name == "iq" || node.name == "presence") { @@ -54,12 +50,10 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { } else { yield ((!)writer).write_node(node); } - } catch (XmlError e) { - throw new IOStreamError.WRITE(e.message); - } + } catch (XmlError e) { } } - private void check_queue(XmppStream stream) throws IOError { + private void check_queue(XmppStream stream) { while (!node_queue.is_empty && in_flight_stanzas.size < 10) { QueueItem queue_item = node_queue.remove_at(0); StanzaNode node = queue_item.node; @@ -115,7 +109,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns() .put_attribute("h", h_inbound.to_string()) .put_attribute("previd", session_id); - write_node(stream, node); + write_node.begin(stream, node); stream.add_flag(new Flag()); } } @@ -124,7 +118,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { private void check_enable(XmppStream stream) { if (stream_has_sm_feature(stream) && session_id == null) { StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true"); - write_node(stream, node); + write_node.begin(stream, node); stream.add_flag(new Flag()); h_outbound = 0; } @@ -174,7 +168,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { private void send_ack(XmppStream stream) { StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string()); - write_node(stream, node); + write_node.begin(stream, node); } private void handle_ack(XmppStream stream, StanzaNode node) { -- cgit v1.2.3-70-g09d2