From 3f531d6b91edab6c79fa232143db828bad13853c Mon Sep 17 00:00:00 2001 From: fiaxh Date: Sat, 11 Nov 2017 21:29:13 +0100 Subject: Read+(write) stream async --- xmpp-vala/src/module/message/module.vala | 17 ++++--- xmpp-vala/src/module/util.vala | 59 ++++++++++++++++++++++ .../module/xep/0085_chat_state_notifications.vala | 23 ++++++--- .../module/xep/0184_message_delivery_receipts.vala | 26 ++++++---- .../src/module/xep/0203_delayed_delivery.vala | 18 +++++-- xmpp-vala/src/module/xep/0280_message_carbons.vala | 47 +++++++++-------- .../xep/0313_message_archive_management.vala | 44 +++++++++------- xmpp-vala/src/module/xep/0333_chat_markers.vala | 13 +++-- 8 files changed, 177 insertions(+), 70 deletions(-) (limited to 'xmpp-vala/src/module') diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala index f7038ef8..2ca06dc4 100644 --- a/xmpp-vala/src/module/message/module.vala +++ b/xmpp-vala/src/module/message/module.vala @@ -8,24 +8,27 @@ namespace Xmpp.Message { public class Module : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "message_module"); - public signal void pre_send_message(XmppStream stream, Message.Stanza message); + public StanzaListenerHolder received_pipeline = new StanzaListenerHolder(); + public StanzaListenerHolder send_pipeline = new StanzaListenerHolder(); + public signal void pre_received_message(XmppStream stream, Message.Stanza message); public signal void received_message(XmppStream stream, Message.Stanza message); public void send_message(XmppStream stream, Message.Stanza message) { - pre_send_message(stream, message); + send_pipeline.run.begin(stream, message); stream.write(message.stanza); } - public void received_message_stanza(XmppStream stream, StanzaNode node) { + public async void received_message_stanza_async(XmppStream stream, StanzaNode node) { Message.Stanza message = new Message.Stanza.from_stanza(node, stream.get_flag(Bind.Flag.IDENTITY).my_jid); - do { - message.rerun_parsing = false; - pre_received_message(stream, message); - } while(message.rerun_parsing); + yield received_pipeline.run(stream, message); received_message(stream, message); } + private void received_message_stanza(XmppStream stream, StanzaNode node) { + received_message_stanza_async.begin(stream, node); + } + public override void attach(XmppStream stream) { stream.received_message_stanza.connect(received_message_stanza); } diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala index 365170b0..e6626049 100644 --- a/xmpp-vala/src/module/util.vala +++ b/xmpp-vala/src/module/util.vala @@ -1,3 +1,5 @@ +using Gee; + namespace Xmpp { public string get_bare_jid(string jid) { return jid.split("/")[0]; @@ -20,4 +22,61 @@ namespace Xmpp { uint32 b5_2 = Random.next_int(); return "%08x-%04x-%04x-%04x-%04x%08x".printf(b1, b2, b3, b4, b5_1, b5_2); } + +public abstract class StanzaListener : Object { + public abstract string action_group { get; } + public abstract string[] after_actions { get; } + public abstract async void run(Core.XmppStream stream, T stanza); +} + +public class StanzaListenerHolder : Object { + private Gee.List> listeners = new ArrayList>(); + + public new void connect(StanzaListener listener) { + listeners.add(listener); + resort_list(); + } + + public async void run(Core.XmppStream stream, T stanza) { + foreach (StanzaListener l in listeners) { + yield l.run(stream, stanza); + } + } + + private Gee.List> set_minus(Gee.List> main_set, Gee.List> minus) { + Gee.List> res = new ArrayList>(); + foreach (StanzaListener l in main_set) { + if (!minus.contains(l)) { + res.add(l); + } + } + return res; + } + + private bool set_contains_action(Gee.List> s, string[] actions) { + foreach(StanzaListener l in s) { + if (l.action_group in actions) { + return true; + } + } + return false; + } + + private void resort_list() { + Gee.List> new_list = new ArrayList>(); + while (listeners.size > new_list.size) { + bool changed = false; + foreach (StanzaListener l in listeners) { + Gee.List> remaining = set_minus(listeners, new_list); + if (!set_contains_action(remaining, l.after_actions)) { + new_list.add(l); + changed = true; + } + } + if (!changed) warning("Can't sort listeners"); + } + listeners = new_list; + } +} + } diff --git a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala index 5de504a2..3ca97282 100644 --- a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala +++ b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala @@ -31,24 +31,17 @@ public class Module : XmppStreamModule { public override void attach(XmppStream stream) { stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); - stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message); + stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener()); stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message); } public override void detach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message); stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message); } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private void on_pre_send_message(XmppStream stream, Message.Stanza message) { - if (message.body == null) return; - if (message.type_ != Message.Stanza.TYPE_CHAT) return; - message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns()); - } - private void on_received_message(XmppStream stream, Message.Stanza message) { if (!message.is_error()) { Gee.List nodes = message.stanza.get_all_subnodes(); @@ -62,4 +55,18 @@ public class Module : XmppStreamModule { } } +public class SendPipelineListener : StanzaListener { + + private const string[] after_actions_const = {"MODIFY_BODY"}; + + public override string action_group { get { return "ADD_NODES"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + if (message.body == null) return; + if (message.type_ != Message.Stanza.TYPE_CHAT) return; + message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns()); + } +} + } diff --git a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala index 71a864c3..c9cb2d40 100644 --- a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala +++ b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala @@ -22,12 +22,11 @@ namespace Xmpp.Xep.MessageDeliveryReceipts { public override void attach(XmppStream stream) { stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); stream.get_module(Message.Module.IDENTITY).received_message.connect(received_message); - stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(pre_send_message); + stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener()); } public override void detach(XmppStream stream) { stream.get_module(Message.Module.IDENTITY).received_message.disconnect(received_message); - stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(pre_send_message); } public override string get_ns() { return NS_URI; } @@ -39,13 +38,22 @@ namespace Xmpp.Xep.MessageDeliveryReceipts { receipt_received(stream, message.from, received_node.get_attribute("id", NS_URI)); } } + } - private void pre_send_message(XmppStream stream, Message.Stanza message) { - StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); - if (received_node != null) return; - if (message.body == null) return; - if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return; - message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns()); - } +public class SendPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "ADD_NODES"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); + if (received_node != null) return; + if (message.body == null) return; + if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return; + message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns()); } } + +} diff --git a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala index 8ca300c9..89c761f2 100644 --- a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala +++ b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala @@ -27,19 +27,27 @@ namespace Xmpp.Xep.DelayedDelivery { } public override void attach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); } public override void detach(XmppStream stream) { } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } + } - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { - DateTime? datetime = get_time_for_message(message); - if (datetime != null) message.add_flag(new MessageFlag(datetime)); - } +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "ADD_NODE"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + DateTime? datetime = Module.get_time_for_message(message); + if (datetime != null) message.add_flag(new MessageFlag(datetime)); } +} public class MessageFlag : Message.MessageFlag { public const string ID = "delayed_delivery"; diff --git a/xmpp-vala/src/module/xep/0280_message_carbons.vala b/xmpp-vala/src/module/xep/0280_message_carbons.vala index b2d21646..930c5234 100644 --- a/xmpp-vala/src/module/xep/0280_message_carbons.vala +++ b/xmpp-vala/src/module/xep/0280_message_carbons.vala @@ -18,43 +18,50 @@ namespace Xmpp.Xep.MessageCarbons { public override void attach(XmppStream stream) { stream.stream_negotiated.connect(enable); - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); } public override void detach(XmppStream stream) { stream.stream_negotiated.disconnect(enable); - stream.get_module(Message.Module.IDENTITY).pre_received_message.disconnect(pre_received_message); } public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } + } + +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {"EXTRACT_MESSAGE_1"}; - private void pre_received_message(XmppStream stream, Message.Stanza message) { - StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); - StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null; - StanzaNode? carbons_node = received_node != null ? received_node : sent_node; - if (carbons_node != null) { - StanzaNode? forwarded_node = carbons_node.get_subnode("forwarded", "urn:xmpp:forward:0"); - if (forwarded_node != null) { - StanzaNode? message_node = forwarded_node.get_subnode("message", Message.NS_URI); - string? from_attribute = message_node.get_attribute("from", Message.NS_URI); - // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored. - if (from_attribute != null && from_attribute == get_bare_jid(stream.get_flag(Bind.Flag.IDENTITY).my_jid)) { - if (received_node != null) { - message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED)); - } else if (sent_node != null) { - message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT)); - } - message.stanza = message_node; - message.rerun_parsing = true; + public override string action_group { get { return "EXTRACT_MESSAGE_2"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async void run(Core.XmppStream stream, Message.Stanza message) { + StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); + StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null; + StanzaNode? carbons_node = received_node != null ? received_node : sent_node; + if (carbons_node != null) { + StanzaNode? forwarded_node = carbons_node.get_subnode("forwarded", "urn:xmpp:forward:0"); + if (forwarded_node != null) { + StanzaNode? message_node = forwarded_node.get_subnode("message", Message.NS_URI); + string? from_attribute = message_node.get_attribute("from", Message.NS_URI); + // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored. + if (from_attribute != null && from_attribute == get_bare_jid(stream.get_flag(Bind.Flag.IDENTITY).my_jid)) { + if (received_node != null) { + message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED)); + } else if (sent_node != null) { + message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT)); } message.stanza = message_node; message.rerun_parsing = true; } + message.stanza = message_node; + message.rerun_parsing = true; } } } +} public class MessageFlag : Message.MessageFlag { public const string ID = "message_carbons"; diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 522f6dca..ac68e190 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -5,6 +5,10 @@ namespace Xmpp.Xep.MessageArchiveManagement { public const string NS_URI = "urn:xmpp:mam:2"; public const string NS_URI_1 = "urn:xmpp:mam:1"; +private static string NS_VER(XmppStream stream) { + return stream.get_flag(Flag.IDENTITY).ns_ver; +} + public class Module : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0313_message_archive_management"); @@ -38,7 +42,7 @@ public class Module : XmppStreamModule { } public override void attach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message); + stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener()); stream.stream_negotiated.connect(query_availability); } @@ -47,21 +51,6 @@ public class Module : XmppStreamModule { public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } - private void on_pre_received_message(XmppStream stream, Message.Stanza message) { -// if (message.from != stream.remote_name) return; - if (stream.get_flag(Flag.IDENTITY) == null) return; - - StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message"); - if (message_node != null) { - StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); - DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); - message.add_flag(new MessageFlag(datetime)); - - message.stanza = message_node; - message.rerun_parsing = true; - } - } - private static void page_through_results(XmppStream stream, Iq.Stanza iq) { string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last"); if (last == null) { @@ -89,9 +78,28 @@ public class Module : XmppStreamModule { if (stream.get_flag(Flag.IDENTITY) != null) feature_available(stream); }); } +} + +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "EXTRACT_MESSAGE_1"; } } + public override string[] after_actions { get { return after_actions_const; } } - private static string NS_VER(XmppStream stream) { - return stream.get_flag(Flag.IDENTITY).ns_ver; + public override async void run(Core.XmppStream stream, Message.Stanza message) { + // if (message.from != stream.remote_name) return; + if (stream.get_flag(Flag.IDENTITY) == null) return; + + StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message"); + if (message_node != null) { + StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay"); + DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node); + message.add_flag(new MessageFlag(datetime)); + + message.stanza = message_node; + message.rerun_parsing = true; + } } } diff --git a/xmpp-vala/src/module/xep/0333_chat_markers.vala b/xmpp-vala/src/module/xep/0333_chat_markers.vala index a0e42510..9c3251dc 100644 --- a/xmpp-vala/src/module/xep/0333_chat_markers.vala +++ b/xmpp-vala/src/module/xep/0333_chat_markers.vala @@ -31,12 +31,11 @@ public class Module : XmppStreamModule { public override void attach(XmppStream stream) { stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI); - stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message); + stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener()); stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message); } public override void detach(XmppStream stream) { - stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message); stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message); } @@ -52,8 +51,16 @@ public class Module : XmppStreamModule { } } } +} + +public class SendPipelineListener : StanzaListener { + + private const string[] after_actions_const = {}; + + public override string action_group { get { return "ADD_NODES"; } } + public override string[] after_actions { get { return after_actions_const; } } - private void on_pre_send_message(XmppStream stream, Message.Stanza message) { + public override async void run(Core.XmppStream stream, Message.Stanza message) { StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); if (received_node != null) return; if (message.body == null) return; -- cgit v1.2.3-70-g09d2