From 74f7fa897f9aec298eeadcfc7a7b971f06498858 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sun, 28 Jun 2020 15:53:41 +0200 Subject: Add queue and resending to stream management --- .../src/module/xep/0198_stream_management.vala | 114 +++++++++++++++++++-- 1 file changed, 106 insertions(+), 8 deletions(-) (limited to 'xmpp-vala/src/module/xep') diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index 8ac41eb7..c44e2c70 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -1,13 +1,71 @@ +using Gee; + namespace Xmpp.Xep.StreamManagement { public const string NS_URI = "urn:xmpp:sm:3"; -public class Module : XmppStreamNegotiationModule { +public class Module : XmppStreamNegotiationModule, WriteNodeFunc { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0198_stream_management"); - public int h_inbound { get; private set; default=0; } + public int h_inbound = 0; + public int h_outbound = 0; + public string? session_id { get; set; default=null; } public Gee.List flags = null; + private HashMap in_flight_stanzas = new HashMap(); + private Gee.List node_queue = new ArrayList(); + + private class QueueItem { + public StanzaNode node; + public Promise promise; + + public QueueItem(StanzaNode node, Promise promise) { + this.node = node; + this.promise = promise; + } + } + + public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError { + if (stream.has_flag(Flag.IDENTITY)) { + var promise = new Promise(); + + node_queue.add(new QueueItem(node, promise)); + check_queue(stream); + + yield promise.future.wait_async(); + } else { + yield write_node(stream, node); + } + } + + internal async void write_node(XmppStream stream, StanzaNode node) throws IOError { + StanzaWriter? writer = stream.writer; + if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); + try { + stream.log.node("OUT", node, stream); + if (node.name == "message" || node.name == "iq" || node.name == "presence") { + var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns(); + stream.log.node("OUT", r_node, stream); + yield ((!)writer).write_nodes(node, r_node); + } else { + yield ((!)writer).write_node(node); + } + } catch (XmlError e) { + throw new IOStreamError.WRITE(e.message); + } + } + + private void check_queue(XmppStream stream) throws IOError { + while (!node_queue.is_empty && in_flight_stanzas.size < 10) { + QueueItem queue_item = node_queue.remove_at(0); + StanzaNode node = queue_item.node; + + if (node.name == "message" || node.name == "iq" || node.name == "presence") { + in_flight_stanzas[++h_outbound] = queue_item; + } + write_node.begin(stream, node); + } + } public override void attach(XmppStream stream) { stream.get_module(Bind.Module.IDENTITY).bound_to_resource.connect(check_enable); @@ -19,7 +77,15 @@ public class Module : XmppStreamNegotiationModule { stream.received_iq_stanza.connect(on_stanza_received); } - public override void detach(XmppStream stream) { } + public override void detach(XmppStream stream) { + stream.get_module(Bind.Module.IDENTITY).bound_to_resource.disconnect(check_enable); + stream.received_features_node.disconnect(check_resume); + + stream.received_nonza.disconnect(on_received_nonza); + stream.received_message_stanza.disconnect(on_stanza_received); + stream.received_presence_stanza.disconnect(on_stanza_received); + stream.received_iq_stanza.disconnect(on_stanza_received); + } public static void require(XmppStream stream) { if (stream.get_module(IDENTITY) == null) stream.add_module(new PrivateXmlStorage.Module()); @@ -35,7 +101,7 @@ public class Module : XmppStreamNegotiationModule { public override string get_id() { return IDENTITY.id; } private void on_stanza_received(XmppStream stream, StanzaNode node) { - lock (h_inbound) h_inbound++; + h_inbound++; } private void check_resume(XmppStream stream) { @@ -45,7 +111,7 @@ public class Module : XmppStreamNegotiationModule { StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns() .put_attribute("h", h_inbound.to_string()) .put_attribute("previd", session_id); - stream.write(node); + write_node(stream, node); stream.add_flag(new Flag()); } } @@ -54,8 +120,9 @@ public class Module : XmppStreamNegotiationModule { private void check_enable(XmppStream stream) { if (stream_has_sm_feature(stream) && session_id == null) { StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true"); - stream.write(node); + write_node(stream, node); stream.add_flag(new Flag()); + h_outbound = 0; } } @@ -69,17 +136,32 @@ public class Module : XmppStreamNegotiationModule { stream.get_flag(Flag.IDENTITY).finished = true; if (node.name == "enabled") { - lock(h_inbound) h_inbound = 0; + h_inbound = 0; session_id = node.get_attribute("id", NS_URI); flags = stream.flags; + stream.write_obj = this; } else if (node.name == "resumed") { foreach (XmppStreamFlag flag in flags) { stream.add_flag(flag); } stream.negotiation_complete = true; + + h_outbound = int.parse(node.get_attribute("h", NS_URI)); + handle_incoming_h(stream, h_outbound); + foreach (var id in in_flight_stanzas.keys) { + node_queue.add(in_flight_stanzas[id]); + } + in_flight_stanzas.clear(); + check_queue(stream); + stream.write_obj = this; } else if (node.name == "failed") { stream.received_features_node(stream); session_id = null; + foreach (var id in in_flight_stanzas.keys) { + in_flight_stanzas[id].promise.set_exception(new IOError.FAILED("bla")); + } + in_flight_stanzas.clear(); + check_queue(stream); } } } @@ -87,11 +169,27 @@ public class Module : XmppStreamNegotiationModule { private void send_ack(XmppStream stream) { StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string()); - stream.write(node); + write_node(stream, node); } private void handle_ack(XmppStream stream, StanzaNode node) { + string? h_acked = node.get_attribute("h", NS_URI); + int parsed_int = int.parse(h_acked); + handle_incoming_h(stream, parsed_int); + check_queue(stream); + } + private void handle_incoming_h(XmppStream stream, int h) { + var remove_nrs = new ArrayList(); + foreach (int nr in in_flight_stanzas.keys) { + if (nr <= h) { + in_flight_stanzas[nr].promise.set_value(null); + remove_nrs.add(nr); + } + } + foreach (int nr in remove_nrs) { + in_flight_stanzas.unset(nr); + } } private bool stream_has_sm_feature(XmppStream stream) { -- cgit v1.2.3-70-g09d2