From 74f7fa897f9aec298eeadcfc7a7b971f06498858 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sun, 28 Jun 2020 15:53:41 +0200 Subject: Add queue and resending to stream management --- libdino/src/entity/message.vala | 3 +- libdino/src/service/message_processor.vala | 3 + xmpp-vala/src/core/stanza_writer.vala | 16 +++ xmpp-vala/src/core/xmpp_stream.vala | 28 +++-- .../src/module/xep/0198_stream_management.vala | 114 +++++++++++++++++++-- 5 files changed, 146 insertions(+), 18 deletions(-) diff --git a/libdino/src/entity/message.vala b/libdino/src/entity/message.vala index 89ad241a..6670ec5d 100644 --- a/libdino/src/entity/message.vala +++ b/libdino/src/entity/message.vala @@ -14,7 +14,8 @@ public class Message : Object { READ, ACKNOWLEDGED, UNSENT, - WONTSEND + WONTSEND, + SENT } public enum Type { diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index 74192e16..6c415deb 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -638,6 +638,9 @@ public class MessageProcessor : StreamInteractionModule, Object { stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => { try { stream.get_module(MessageModule.IDENTITY).send_message.end(res); + if (message.marked == Message.Marked.NONE/* && (yield stream.get_module(Xep.ServiceDiscovery.Module.IDENTITY).has_entity_feature(stream, conversation.account.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI))*/) { + message.marked = Message.Marked.SENT; + } // The server might not have given us the resource we asked for. In that case, store the actual resource the message was sent with. Relevant for deduplication. Jid? current_own_jid = stream.get_flag(Bind.Flag.IDENTITY).my_jid; diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index ab51a948..51f0061f 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -14,6 +14,22 @@ public class StanzaWriter { yield write_data(node.to_xml().data); } + public async void write_nodes(StanzaNode node1, StanzaNode node2) throws XmlError { + var data1 = node1.to_xml().data; + var data2 = node2.to_xml().data; + + uint8[] concat = new uint8[data1.length + data2.length]; + int i = 0; + foreach (var datum in data1) { + concat[i++] = datum; + } + foreach (var datum in data2) { + concat[i++] = datum; + } + + yield write_data(concat); + } + public async void write(string s) throws XmlError { yield write_data(s.data); } diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index 39754ba1..f4cdf09a 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -18,12 +18,14 @@ public class XmppStream { public StanzaNode? features { get; private set; default = new StanzaNode.build("features", NS_URI); } private IOStream? stream; - private StanzaReader? reader; - private StanzaWriter? writer; + internal StanzaReader? reader; + internal StanzaWriter? writer; public Gee.List flags { get; private set; default=new ArrayList(); } public Gee.List modules { get; private set; default=new ArrayList(); } private Gee.List connection_providers = new ArrayList(); + + internal WriteNodeFunc? write_obj = null; public bool negotiation_complete { get; set; default=false; } private bool setup_needed = false; private bool non_negotiation_modules_attached = false; @@ -126,13 +128,17 @@ public class XmppStream { } public async void write_async(StanzaNode node) throws IOStreamError { - StanzaWriter? writer = this.writer; - if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); - try { - log.node("OUT", node, this); - yield ((!)writer).write_node(node); - } catch (XmlError e) { - throw new IOStreamError.WRITE(e.message); + if (write_obj != null) { + yield write_obj.write_stanza(this, node); + } else { + StanzaWriter? writer = this.writer; + if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); + try { + log.node("OUT", node, this); + yield ((!)writer).write_node(node); + } catch (XmlError e) { + throw new IOStreamError.WRITE(e.message); + } } } @@ -403,4 +409,8 @@ public class StartTlsConnectionProvider : ConnectionProvider { public override string get_id() { return "start_tls"; } } +public interface WriteNodeFunc : Object { + public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOError; +} + } diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index 8ac41eb7..c44e2c70 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -1,13 +1,71 @@ +using Gee; + namespace Xmpp.Xep.StreamManagement { public const string NS_URI = "urn:xmpp:sm:3"; -public class Module : XmppStreamNegotiationModule { +public class Module : XmppStreamNegotiationModule, WriteNodeFunc { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0198_stream_management"); - public int h_inbound { get; private set; default=0; } + public int h_inbound = 0; + public int h_outbound = 0; + public string? session_id { get; set; default=null; } public Gee.List flags = null; + private HashMap in_flight_stanzas = new HashMap(); + private Gee.List node_queue = new ArrayList(); + + private class QueueItem { + public StanzaNode node; + public Promise promise; + + public QueueItem(StanzaNode node, Promise promise) { + this.node = node; + this.promise = promise; + } + } + + public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError { + if (stream.has_flag(Flag.IDENTITY)) { + var promise = new Promise(); + + node_queue.add(new QueueItem(node, promise)); + check_queue(stream); + + yield promise.future.wait_async(); + } else { + yield write_node(stream, node); + } + } + + internal async void write_node(XmppStream stream, StanzaNode node) throws IOError { + StanzaWriter? writer = stream.writer; + if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); + try { + stream.log.node("OUT", node, stream); + if (node.name == "message" || node.name == "iq" || node.name == "presence") { + var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns(); + stream.log.node("OUT", r_node, stream); + yield ((!)writer).write_nodes(node, r_node); + } else { + yield ((!)writer).write_node(node); + } + } catch (XmlError e) { + throw new IOStreamError.WRITE(e.message); + } + } + + private void check_queue(XmppStream stream) throws IOError { + while (!node_queue.is_empty && in_flight_stanzas.size < 10) { + QueueItem queue_item = node_queue.remove_at(0); + StanzaNode node = queue_item.node; + + if (node.name == "message" || node.name == "iq" || node.name == "presence") { + in_flight_stanzas[++h_outbound] = queue_item; + } + write_node.begin(stream, node); + } + } public override void attach(XmppStream stream) { stream.get_module(Bind.Module.IDENTITY).bound_to_resource.connect(check_enable); @@ -19,7 +77,15 @@ public class Module : XmppStreamNegotiationModule { stream.received_iq_stanza.connect(on_stanza_received); } - public override void detach(XmppStream stream) { } + public override void detach(XmppStream stream) { + stream.get_module(Bind.Module.IDENTITY).bound_to_resource.disconnect(check_enable); + stream.received_features_node.disconnect(check_resume); + + stream.received_nonza.disconnect(on_received_nonza); + stream.received_message_stanza.disconnect(on_stanza_received); + stream.received_presence_stanza.disconnect(on_stanza_received); + stream.received_iq_stanza.disconnect(on_stanza_received); + } public static void require(XmppStream stream) { if (stream.get_module(IDENTITY) == null) stream.add_module(new PrivateXmlStorage.Module()); @@ -35,7 +101,7 @@ public class Module : XmppStreamNegotiationModule { public override string get_id() { return IDENTITY.id; } private void on_stanza_received(XmppStream stream, StanzaNode node) { - lock (h_inbound) h_inbound++; + h_inbound++; } private void check_resume(XmppStream stream) { @@ -45,7 +111,7 @@ public class Module : XmppStreamNegotiationModule { StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns() .put_attribute("h", h_inbound.to_string()) .put_attribute("previd", session_id); - stream.write(node); + write_node(stream, node); stream.add_flag(new Flag()); } } @@ -54,8 +120,9 @@ public class Module : XmppStreamNegotiationModule { private void check_enable(XmppStream stream) { if (stream_has_sm_feature(stream) && session_id == null) { StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true"); - stream.write(node); + write_node(stream, node); stream.add_flag(new Flag()); + h_outbound = 0; } } @@ -69,17 +136,32 @@ public class Module : XmppStreamNegotiationModule { stream.get_flag(Flag.IDENTITY).finished = true; if (node.name == "enabled") { - lock(h_inbound) h_inbound = 0; + h_inbound = 0; session_id = node.get_attribute("id", NS_URI); flags = stream.flags; + stream.write_obj = this; } else if (node.name == "resumed") { foreach (XmppStreamFlag flag in flags) { stream.add_flag(flag); } stream.negotiation_complete = true; + + h_outbound = int.parse(node.get_attribute("h", NS_URI)); + handle_incoming_h(stream, h_outbound); + foreach (var id in in_flight_stanzas.keys) { + node_queue.add(in_flight_stanzas[id]); + } + in_flight_stanzas.clear(); + check_queue(stream); + stream.write_obj = this; } else if (node.name == "failed") { stream.received_features_node(stream); session_id = null; + foreach (var id in in_flight_stanzas.keys) { + in_flight_stanzas[id].promise.set_exception(new IOError.FAILED("bla")); + } + in_flight_stanzas.clear(); + check_queue(stream); } } } @@ -87,11 +169,27 @@ public class Module : XmppStreamNegotiationModule { private void send_ack(XmppStream stream) { StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string()); - stream.write(node); + write_node(stream, node); } private void handle_ack(XmppStream stream, StanzaNode node) { + string? h_acked = node.get_attribute("h", NS_URI); + int parsed_int = int.parse(h_acked); + handle_incoming_h(stream, parsed_int); + check_queue(stream); + } + private void handle_incoming_h(XmppStream stream, int h) { + var remove_nrs = new ArrayList(); + foreach (int nr in in_flight_stanzas.keys) { + if (nr <= h) { + in_flight_stanzas[nr].promise.set_value(null); + remove_nrs.add(nr); + } + } + foreach (int nr in remove_nrs) { + in_flight_stanzas.unset(nr); + } } private bool stream_has_sm_feature(XmppStream stream) { -- cgit v1.2.3-54-g00ecf