aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/module/xep/0198_stream_management.vala
diff options
context:
space:
mode:
authorfiaxh <git@lightrise.org>2020-06-28 15:53:41 +0200
committerfiaxh <git@lightrise.org>2020-07-15 18:12:19 +0200
commit74f7fa897f9aec298eeadcfc7a7b971f06498858 (patch)
treeb39e0151172ac6a1f0b7eee88a02916c73744d5a /xmpp-vala/src/module/xep/0198_stream_management.vala
parent8e3462b1b703cb504ee397fd5a849090ee377706 (diff)
downloaddino-74f7fa897f9aec298eeadcfc7a7b971f06498858.tar.gz
dino-74f7fa897f9aec298eeadcfc7a7b971f06498858.zip
Add queue and resending to stream management
Diffstat (limited to 'xmpp-vala/src/module/xep/0198_stream_management.vala')
-rw-r--r--xmpp-vala/src/module/xep/0198_stream_management.vala114
1 files changed, 106 insertions, 8 deletions
diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala
index 8ac41eb7..c44e2c70 100644
--- a/xmpp-vala/src/module/xep/0198_stream_management.vala
+++ b/xmpp-vala/src/module/xep/0198_stream_management.vala
@@ -1,13 +1,71 @@
+using Gee;
+
namespace Xmpp.Xep.StreamManagement {
public const string NS_URI = "urn:xmpp:sm:3";
-public class Module : XmppStreamNegotiationModule {
+public class Module : XmppStreamNegotiationModule, WriteNodeFunc {
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "0198_stream_management");
- public int h_inbound { get; private set; default=0; }
+ public int h_inbound = 0;
+ public int h_outbound = 0;
+
public string? session_id { get; set; default=null; }
public Gee.List<XmppStreamFlag> flags = null;
+ private HashMap<int, QueueItem> in_flight_stanzas = new HashMap<int, QueueItem>();
+ private Gee.List<QueueItem> node_queue = new ArrayList<QueueItem>();
+
+ private class QueueItem {
+ public StanzaNode node;
+ public Promise<IOError?> promise;
+
+ public QueueItem(StanzaNode node, Promise<IOError?> promise) {
+ this.node = node;
+ this.promise = promise;
+ }
+ }
+
+ public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError {
+ if (stream.has_flag(Flag.IDENTITY)) {
+ var promise = new Promise<IOError?>();
+
+ node_queue.add(new QueueItem(node, promise));
+ check_queue(stream);
+
+ yield promise.future.wait_async();
+ } else {
+ yield write_node(stream, node);
+ }
+ }
+
+ internal async void write_node(XmppStream stream, StanzaNode node) throws IOError {
+ StanzaWriter? writer = stream.writer;
+ if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open");
+ try {
+ stream.log.node("OUT", node, stream);
+ if (node.name == "message" || node.name == "iq" || node.name == "presence") {
+ var r_node = new StanzaNode.build("r", NS_URI).add_self_xmlns();
+ stream.log.node("OUT", r_node, stream);
+ yield ((!)writer).write_nodes(node, r_node);
+ } else {
+ yield ((!)writer).write_node(node);
+ }
+ } catch (XmlError e) {
+ throw new IOStreamError.WRITE(e.message);
+ }
+ }
+
+ private void check_queue(XmppStream stream) throws IOError {
+ while (!node_queue.is_empty && in_flight_stanzas.size < 10) {
+ QueueItem queue_item = node_queue.remove_at(0);
+ StanzaNode node = queue_item.node;
+
+ if (node.name == "message" || node.name == "iq" || node.name == "presence") {
+ in_flight_stanzas[++h_outbound] = queue_item;
+ }
+ write_node.begin(stream, node);
+ }
+ }
public override void attach(XmppStream stream) {
stream.get_module(Bind.Module.IDENTITY).bound_to_resource.connect(check_enable);
@@ -19,7 +77,15 @@ public class Module : XmppStreamNegotiationModule {
stream.received_iq_stanza.connect(on_stanza_received);
}
- public override void detach(XmppStream stream) { }
+ public override void detach(XmppStream stream) {
+ stream.get_module(Bind.Module.IDENTITY).bound_to_resource.disconnect(check_enable);
+ stream.received_features_node.disconnect(check_resume);
+
+ stream.received_nonza.disconnect(on_received_nonza);
+ stream.received_message_stanza.disconnect(on_stanza_received);
+ stream.received_presence_stanza.disconnect(on_stanza_received);
+ stream.received_iq_stanza.disconnect(on_stanza_received);
+ }
public static void require(XmppStream stream) {
if (stream.get_module(IDENTITY) == null) stream.add_module(new PrivateXmlStorage.Module());
@@ -35,7 +101,7 @@ public class Module : XmppStreamNegotiationModule {
public override string get_id() { return IDENTITY.id; }
private void on_stanza_received(XmppStream stream, StanzaNode node) {
- lock (h_inbound) h_inbound++;
+ h_inbound++;
}
private void check_resume(XmppStream stream) {
@@ -45,7 +111,7 @@ public class Module : XmppStreamNegotiationModule {
StanzaNode node = new StanzaNode.build("resume", NS_URI).add_self_xmlns()
.put_attribute("h", h_inbound.to_string())
.put_attribute("previd", session_id);
- stream.write(node);
+ write_node(stream, node);
stream.add_flag(new Flag());
}
}
@@ -54,8 +120,9 @@ public class Module : XmppStreamNegotiationModule {
private void check_enable(XmppStream stream) {
if (stream_has_sm_feature(stream) && session_id == null) {
StanzaNode node = new StanzaNode.build("enable", NS_URI).add_self_xmlns().put_attribute("resume", "true");
- stream.write(node);
+ write_node(stream, node);
stream.add_flag(new Flag());
+ h_outbound = 0;
}
}
@@ -69,17 +136,32 @@ public class Module : XmppStreamNegotiationModule {
stream.get_flag(Flag.IDENTITY).finished = true;
if (node.name == "enabled") {
- lock(h_inbound) h_inbound = 0;
+ h_inbound = 0;
session_id = node.get_attribute("id", NS_URI);
flags = stream.flags;
+ stream.write_obj = this;
} else if (node.name == "resumed") {
foreach (XmppStreamFlag flag in flags) {
stream.add_flag(flag);
}
stream.negotiation_complete = true;
+
+ h_outbound = int.parse(node.get_attribute("h", NS_URI));
+ handle_incoming_h(stream, h_outbound);
+ foreach (var id in in_flight_stanzas.keys) {
+ node_queue.add(in_flight_stanzas[id]);
+ }
+ in_flight_stanzas.clear();
+ check_queue(stream);
+ stream.write_obj = this;
} else if (node.name == "failed") {
stream.received_features_node(stream);
session_id = null;
+ foreach (var id in in_flight_stanzas.keys) {
+ in_flight_stanzas[id].promise.set_exception(new IOError.FAILED("bla"));
+ }
+ in_flight_stanzas.clear();
+ check_queue(stream);
}
}
}
@@ -87,11 +169,27 @@ public class Module : XmppStreamNegotiationModule {
private void send_ack(XmppStream stream) {
StanzaNode node = new StanzaNode.build("a", NS_URI).add_self_xmlns().put_attribute("h", h_inbound.to_string());
- stream.write(node);
+ write_node(stream, node);
}
private void handle_ack(XmppStream stream, StanzaNode node) {
+ string? h_acked = node.get_attribute("h", NS_URI);
+ int parsed_int = int.parse(h_acked);
+ handle_incoming_h(stream, parsed_int);
+ check_queue(stream);
+ }
+ private void handle_incoming_h(XmppStream stream, int h) {
+ var remove_nrs = new ArrayList<int>();
+ foreach (int nr in in_flight_stanzas.keys) {
+ if (nr <= h) {
+ in_flight_stanzas[nr].promise.set_value(null);
+ remove_nrs.add(nr);
+ }
+ }
+ foreach (int nr in remove_nrs) {
+ in_flight_stanzas.unset(nr);
+ }
}
private bool stream_has_sm_feature(XmppStream stream) {