From c3532bdf3141bcf0cbf9e4ae7a926dcda4f132ef Mon Sep 17 00:00:00 2001 From: fiaxh Date: Wed, 18 Dec 2019 18:53:14 +0100 Subject: Refactor MAM catchup. Fetch from latest to earliest message. --- xmpp-vala/src/module/message/module.vala | 4 ++ xmpp-vala/src/module/xep/0045_muc/module.vala | 9 ++- .../xep/0313_message_archive_management.vala | 83 ++++++++++++++++------ 3 files changed, 70 insertions(+), 26 deletions(-) (limited to 'xmpp-vala/src/module') diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala index ab3a7d80..5ddbbe1a 100644 --- a/xmpp-vala/src/module/message/module.vala +++ b/xmpp-vala/src/module/message/module.vala @@ -12,6 +12,7 @@ namespace Xmpp { public StanzaListenerHolder send_pipeline = new StanzaListenerHolder(); public signal void received_message(XmppStream stream, MessageStanza message); + public signal void received_message_unprocessed(XmppStream stream, MessageStanza message); public void send_message(XmppStream stream, MessageStanza message) { send_pipeline.run.begin(stream, message, (obj, res) => { @@ -21,6 +22,9 @@ namespace Xmpp { public async void received_message_stanza_async(XmppStream stream, StanzaNode node) { MessageStanza message = new MessageStanza.from_stanza(node, stream.get_flag(Bind.Flag.IDENTITY).my_jid); + + received_message_unprocessed(stream, message); + if (!message.is_error()) { bool abort = yield received_pipeline.run(stream, message); if (abort) return; diff --git a/xmpp-vala/src/module/xep/0045_muc/module.vala b/xmpp-vala/src/module/xep/0045_muc/module.vala index b30145ff..59d61f3d 100644 --- a/xmpp-vala/src/module/xep/0045_muc/module.vala +++ b/xmpp-vala/src/module/xep/0045_muc/module.vala @@ -487,16 +487,15 @@ public class ReceivedPipelineListener : StanzaListener { StanzaNode? invite_node = x_node.get_subnode("invite", NS_URI_USER); string? password = null; StanzaNode? password_node = x_node.get_subnode("password", NS_URI_USER); - if (password_node != null) - password = password_node.get_string_content(); + if (password_node != null) password = password_node.get_string_content(); if (invite_node != null) { string? from_jid = invite_node.get_attribute("from"); if (from_jid != null) { StanzaNode? reason_node = invite_node.get_subnode("reason", NS_URI_USER); string? reason = null; - if (reason_node != null) - reason = reason_node.get_string_content(); - outer.invite_received(stream, message.from, new Jid(from_jid), password, reason); + if (reason_node != null) reason = reason_node.get_string_content(); + bool is_mam_message = Xep.MessageArchiveManagement.MessageFlag.get_flag(message) != null; // TODO + if (!is_mam_message) outer.invite_received(stream, message.from, new Jid(from_jid), password, reason); return true; } } diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 4f8cadec..247e1b8e 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -14,10 +14,7 @@ public class Module : XmppStreamModule { private ReceivedPipelineListener received_pipeline_listener = new ReceivedPipelineListener(); - public delegate void OnFinished(XmppStream stream); - public void query_archive(XmppStream stream, string? jid, DateTime? start, DateTime? end, owned OnFinished? on_finished = null) { - if (stream.get_flag(Flag.IDENTITY) == null) return; - + private StanzaNode crate_base_query(XmppStream stream, string? jid, string? queryid, DateTime? start, DateTime? end) { DataForms.DataForm data_form = new DataForms.DataForm(); DataForms.DataForm.HiddenField form_type_field = new DataForms.DataForm.HiddenField() { var="FORM_TYPE" }; form_type_field.set_value_string(NS_VER(stream)); @@ -38,8 +35,41 @@ public class Module : XmppStreamModule { data_form.add_field(field); } StanzaNode query_node = new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node(data_form.get_submit_node()); + if (queryid != null) { + query_node.put_attribute("queryid", queryid); + } + return query_node; + } + + private StanzaNode create_set_rsm_node(string? before_id) { + var before_node = new StanzaNode.build("before", "http://jabber.org/protocol/rsm"); + if (before_id != null) { + before_node.put_node(new StanzaNode.text(before_id)); + } + var max_node = (new StanzaNode.build("max", "http://jabber.org/protocol/rsm")).put_node(new StanzaNode.text("20")); + return (new StanzaNode.build("set", "http://jabber.org/protocol/rsm")).add_self_xmlns() + .put_node(before_node) + .put_node(max_node); + } + + public async Iq.Stanza? query_archive(XmppStream stream, string? jid, string? query_id, DateTime? start_time, string? start_id, DateTime? end_time, string? end_id) { + if (stream.get_flag(Flag.IDENTITY) == null) return null; + + var query_node = crate_base_query(stream, jid, query_id, start_time, end_time); + + query_node.put_node(create_set_rsm_node(end_id)); Iq.Stanza iq = new Iq.Stanza.set(query_node); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { page_through_results(stream, iq, (owned)on_finished); }); + + debug(@"OUT INIT: %s", iq.stanza.to_string()); + + Iq.Stanza? result_iq = null; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => { + result_iq = iq; + Idle.add(query_archive.callback); + }); + yield; + + return result_iq; } public override void attach(XmppStream stream) { @@ -54,22 +84,30 @@ public class Module : XmppStreamModule { public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private static void page_through_results(XmppStream stream, Iq.Stanza iq, owned OnFinished? on_finished = null) { - string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last"); - if (last == null) { - stream.get_flag(Flag.IDENTITY).cought_up = true; - if (on_finished != null) on_finished(stream); - return; + public async Iq.Stanza? page_through_results(XmppStream stream, string? jid, string? query_id, DateTime? start_time, DateTime? end_time, Iq.Stanza iq) { + + string? complete = iq.stanza.get_deep_attribute("urn:xmpp:mam:2:fin", "complete"); + if (complete == "true") { + return null; + } + string? first = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "first"); + if (first == null) { + return null; } - Iq.Stanza paging_iq = new Iq.Stanza.set( - new StanzaNode.build("query", NS_VER(stream)).add_self_xmlns().put_node( - new StanzaNode.build("set", "http://jabber.org/protocol/rsm").add_self_xmlns().put_node( - new StanzaNode.build("after", "http://jabber.org/protocol/rsm").put_node(new StanzaNode.text(last)) - ) - ) - ); - stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, (stream, iq) => { page_through_results(stream, iq, (owned)on_finished); }); + var query_node = crate_base_query(stream, jid, query_id, start_time, end_time); + query_node.put_node(create_set_rsm_node(first)); + + Iq.Stanza paging_iq = new Iq.Stanza.set(query_node); + + Iq.Stanza? result_iq = null; + stream.get_module(Iq.Module.IDENTITY).send_iq(stream, paging_iq, (stream, iq) => { + result_iq = iq; + Idle.add(page_through_results.callback); + }); + yield; + + return result_iq; } private void query_availability(XmppStream stream) { @@ -107,7 +145,8 @@ public class ReceivedPipelineListener : StanzaListener { StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); string? mam_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":id"); - message.add_flag(new MessageFlag(datetime, mam_id)); + string? query_id = message.stanza.get_deep_attribute(NS_VER(stream) + ":result", NS_VER(stream) + ":queryid"); + message.add_flag(new MessageFlag(datetime, mam_id, query_id)); message.stanza = message_node; message.rerun_parsing = true; @@ -134,10 +173,12 @@ public class MessageFlag : Xmpp.MessageFlag { public DateTime? server_time { get; private set; } public string? mam_id { get; private set; } + public string? query_id { get; private set; } - public MessageFlag(DateTime? server_time, string? mam_id) { + public MessageFlag(DateTime? server_time, string? mam_id, string? query_id) { this.server_time = server_time; this.mam_id = mam_id; + this.query_id = query_id; } public static MessageFlag? get_flag(MessageStanza message) { return (MessageFlag) message.get_flag(NS_URI, ID); } -- cgit v1.2.3-70-g09d2