From 3f531d6b91edab6c79fa232143db828bad13853c Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sat, 11 Nov 2017 21:29:13 +0100 Subject: Read+(write) stream async --- .../xep/0313_message_archive_management.vala | 44 +++++++++++++--------- 1 file changed, 26 insertions(+), 18 deletions(-) (limited to 'xmpp-vala/src/module/xep/0313_message_archive_management.vala') diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 522f6dca..ac68e190 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -5,6 +5,10 @@ namespace Xmpp.Xep.MessageArchiveManagement { public const string NS_URI = "urn:xmpp:mam:2"; public const string NS_URI_1 = "urn:xmpp:mam:1"; +private static string NS_VER(XmppStream stream) { + return stream.get_flag(Flag.IDENTITY).ns_ver; +} + public class Module : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0313_message_archive_management"); @@ -38,7 +42,7 @@ public class Module : XmppStreamModule { } public override void attach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); stream.stream_negotiated.connect(query_availability); } @@ -47,21 +51,6 @@ public class Module : XmppStreamModule { public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { -// if (message.from != stream.remote_name) return; - if (stream.get_flag(Flag.IDENTITY) == null) return; - - StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message"); - if (message_node != null) { - StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); - DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); - message.add_flag(new MessageFlag(datetime)); - - message.stanza = message_node; - message.rerun_parsing = true; - } - } - private static void page_through_results(XmppStream stream, Iq.Stanza iq) { string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last"); if (last == null) { @@ -89,9 +78,28 @@ public class Module : XmppStreamModule { if (stream.get_flag(Flag.IDENTITY) != null) feature_available(stream); }); } +} + +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "EXTRACT_MESSAGE_1"; } } + public override string[] after_actions { get { return after_actions_const; } } - private static string NS_VER(XmppStream stream) { - return stream.get_flag(Flag.IDENTITY).ns_ver; + public override async void run(Core.XmppStream stream, Message.Stanza message) { + // if (message.from != stream.remote_name) return; + if (stream.get_flag(Flag.IDENTITY) == null) return; + + StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message"); + if (message_node != null) { + StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); + DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); + message.add_flag(new MessageFlag(datetime)); + + message.stanza = message_node; + message.rerun_parsing = true; + } } } -- cgit v1.2.3-54-g00ecf