From c887240fdcdbc8f76c6bdd37b1174b8154907c4c Mon Sep 17 00:00:00 2001 From: fiaxh Date: Tue, 21 Jul 2020 15:48:42 +0200 Subject: Improve stream management queue --- .../src/module/xep/0198_stream_management.vala | 34 +++++++++------------- 1 file changed, 14 insertions(+), 20 deletions(-) (limited to 'xmpp-vala/src/module') diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index bc695ddb..b6317425 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -26,25 +26,21 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { } public async void write_stanza(XmppStream stream, StanzaNode node) throws IOStreamError { - if (stream.has_flag(Flag.IDENTITY)) { - var promise = new Promise(); + var promise = new Promise(); - node_queue.add(new QueueItem(node, promise)); - check_queue(stream); + node_queue.add(new QueueItem(node, promise)); + check_queue(stream); - try { - yield promise.future.wait_async(); - } catch (FutureError e) { - throw new IOStreamError.WRITE("Future returned error %i".printf(e.code)); - } - } else { - yield write_node(stream, node); + try { + yield promise.future.wait_async(); + } catch (FutureError e) { + throw new IOStreamError.WRITE("Future returned error %i".printf(e.code)); } } - internal async void write_node(XmppStream stream, StanzaNode node) throws IOError { + internal async void write_node(XmppStream stream, StanzaNode node) { StanzaWriter? writer = stream.writer; - if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); + if (writer == null) return; try { stream.log.node("OUT", node, stream); if (node.name == "message" || node.name == "iq" || node.name == "presence") { @@ -54,12 +50,10 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { } else { yield ((!)writer).write_node(node); } - } catch (XmlError e) { - throw new IOStreamError.WRITE(e.message); - } + } catch (XmlError e) { } } - private void check_queue(XmppStream stream) throws IOError { + private void check_queue(XmppStream stream) { while (!node_queue.is_empty && in_flight_stanzas.size < 10) { QueueItem queue_item = node_queue.remove_at(0); StanzaNode node = queue_item.node; @@ -115,7 +109,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns() .put_attribute("h", h_inbound.to_string()) .put_attribute("previd", session_id); - write_node(stream, node); + write_node.begin(stream, node); stream.add_flag(new Flag()); } } @@ -124,7 +118,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { private void check_enable(XmppStream stream) { if (stream_has_sm_feature(stream) && session_id == null) { StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true"); - write_node(stream, node); + write_node.begin(stream, node); stream.add_flag(new Flag()); h_outbound = 0; } @@ -174,7 +168,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { private void send_ack(XmppStream stream) { StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string()); - write_node(stream, node); + write_node.begin(stream, node); } private void handle_ack(XmppStream stream, StanzaNode node) { -- cgit v1.2.3-70-g09d2