From 57c72d2818dec6c713834cfbb8c4c566a1602907 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Fri, 19 Jan 2018 22:37:02 +0100 Subject: Pipeline for incoming messages in libdino --- xmpp-vala/src/module/message/module.vala | 1 - xmpp-vala/src/module/util.vala | 47 +++++++++++++--------- .../module/xep/0085_chat_state_notifications.vala | 9 +++-- .../module/xep/0184_message_delivery_receipts.vala | 9 +++-- .../src/module/xep/0203_delayed_delivery.vala | 3 +- xmpp-vala/src/module/xep/0280_message_carbons.vala | 16 ++++---- .../xep/0313_message_archive_management.vala | 5 ++- xmpp-vala/src/module/xep/0333_chat_markers.vala | 9 +++-- 8 files changed, 56 insertions(+), 43 deletions(-) (limited to 'xmpp-vala') diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala index cfb1d750..864b4f71 100644 --- a/xmpp-vala/src/module/message/module.vala +++ b/xmpp-vala/src/module/message/module.vala @@ -11,7 +11,6 @@ namespace Xmpp { public StanzaListenerHolder received_pipeline = new StanzaListenerHolder(); public StanzaListenerHolder send_pipeline = new StanzaListenerHolder(); - public signal void pre_received_message(XmppStream stream, MessageStanza message); public signal void received_message(XmppStream stream, MessageStanza message); public void send_message(XmppStream stream, MessageStanza message) { diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala index 1043dee1..02f391ae 100644 --- a/xmpp-vala/src/module/util.vala +++ b/xmpp-vala/src/module/util.vala @@ -12,34 +12,42 @@ public string random_uuid() { return "%08x-%04x-%04x-%04x-%04x%08x".printf(b1, b2, b3, b4, b5_1, b5_2); } -public abstract class StanzaListener : Object { +public abstract class StanzaListener : OrderedListener { + + public abstract async bool run(XmppStream stream, T stanza); +} + +public class StanzaListenerHolder : ListenerHolder { + + public async void run(XmppStream stream, T stanza) { + foreach (OrderedListener ol in listeners) { + StanzaListener l = ol as StanzaListener; + bool stop = yield l.run(stream, stanza); + if (stop) break; + } + } +} + +public abstract class OrderedListener : Object { public abstract string action_group { get; } public abstract string[] after_actions { get; } - - public abstract async void run(XmppStream stream, T stanza); } -public class StanzaListenerHolder : Object { - private ArrayList> listeners = new ArrayList>(); +public abstract class ListenerHolder : Object { + protected ArrayList listeners = new ArrayList(); - public new void connect(StanzaListener listener) { + public new void connect(OrderedListener listener) { listeners.add(listener); resort_list(); } - public new void disconnect(StanzaListener listener) { + public new void disconnect(OrderedListener listener) { listeners.remove(listener); resort_list(); } - public async void run(XmppStream stream, T stanza) { - foreach (StanzaListener l in listeners) { - yield l.run(stream, stanza); - } - } - - private bool set_contains_action(Gee.List> s, string[] actions) { - foreach (StanzaListener l in s) { + private bool set_contains_action(Gee.List s, string[] actions) { + foreach(OrderedListener l in s) { if (l.action_group in actions) { return true; } @@ -48,22 +56,23 @@ public class StanzaListenerHolder : Object { } private void resort_list() { - ArrayList> new_list = new ArrayList>(); - ArrayList> remaining = new ArrayList>(); + ArrayList new_list = new ArrayList(); + ArrayList remaining = new ArrayList(); remaining.add_all(listeners); while (remaining.size > 0) { bool changed = false; - Gee.Iterator> iter = remaining.iterator(); + Gee.Iterator iter = remaining.iterator(); while (iter.has_next()) { if (!iter.valid) { iter.next(); } - StanzaListener l = iter.get(); + OrderedListener l = iter.get(); if (!set_contains_action(remaining, l.after_actions)) { new_list.add(l); iter.remove(); changed = true; } + iter.next(); } if (!changed) error("Can't sort listeners"); } diff --git a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala index cc18c52f..9d23c716 100644 --- a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala +++ b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala @@ -9,7 +9,7 @@ public const string STATE_GONE = "gone"; public const string STATE_COMPOSING = "composing"; public const string STATE_PAUSED = "paused"; -private const string[] STATES = {STATE_ACTIVE, STATE_INACTIVE, STATE_GONE, STATE_COMPOSING, STATE_PAUSED}; +private const string[] STATES = { STATE_ACTIVE, STATE_INACTIVE, STATE_GONE, STATE_COMPOSING, STATE_PAUSED }; public class Module : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, "0085_chat_state_notifications"); @@ -63,10 +63,11 @@ public class SendPipelineListener : StanzaListener { public override string action_group { get { return "ADD_NODES"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { - if (message.body == null) return; - if (message.type_ != MessageStanza.TYPE_CHAT) return; + public override async bool run(XmppStream stream, MessageStanza message) { + if (message.body == null) return false; + if (message.type_ != MessageStanza.TYPE_CHAT) return false; message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns()); + return false; } } diff --git a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala index 5e3cb320..b51178c7 100644 --- a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala +++ b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala @@ -48,12 +48,13 @@ public class SendPipelineListener : StanzaListener { public override string action_group { get { return "ADD_NODES"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); - if (received_node != null) return; - if (message.body == null) return; - if (message.type_ == MessageStanza.TYPE_GROUPCHAT) return; + if (received_node != null) return false; + if (message.body == null) return false; + if (message.type_ == MessageStanza.TYPE_GROUPCHAT) return false; message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns()); + return false; } } diff --git a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala index 8581ed93..39666fa8 100644 --- a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala +++ b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala @@ -51,9 +51,10 @@ public class ReceivedPipelineListener : StanzaListener { public override string action_group { get { return "ADD_NODE"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { DateTime? datetime = Module.get_time_for_message(message); if (datetime != null) message.add_flag(new MessageFlag(datetime)); + return false; } } diff --git a/xmpp-vala/src/module/xep/0280_message_carbons.vala b/xmpp-vala/src/module/xep/0280_message_carbons.vala index 9e85a607..fca35606 100644 --- a/xmpp-vala/src/module/xep/0280_message_carbons.vala +++ b/xmpp-vala/src/module/xep/0280_message_carbons.vala @@ -44,7 +44,7 @@ public class ReceivedPipelineListener : StanzaListener { public override string action_group { get { return "EXTRACT_MESSAGE_2"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null; StanzaNode? carbons_node = received_node != null ? received_node : sent_node; @@ -55,18 +55,18 @@ public class ReceivedPipelineListener : StanzaListener { string? from_attribute = message_node.get_attribute("from", Xmpp.NS_URI); // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored. if (from_attribute != null && from_attribute == stream.get_flag(Bind.Flag.IDENTITY).my_jid.bare_jid.to_string()) { - if (received_node != null) { - message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED)); - } else if (sent_node != null) { - message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT)); - } - message.stanza = message_node; - message.rerun_parsing = true; + return true; + } + if (received_node != null) { + message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED)); + } else if (sent_node != null) { + message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT)); } message.stanza = message_node; message.rerun_parsing = true; } } + return false; } } diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala index 1c1b51e3..343a5fbd 100644 --- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala +++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala @@ -91,9 +91,9 @@ public class ReceivedPipelineListener : StanzaListener { public override string action_group { get { return "EXTRACT_MESSAGE_1"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { // if (message.from != stream.remote_name) return; - if (stream.get_flag(Flag.IDENTITY) == null) return; + if (stream.get_flag(Flag.IDENTITY) == null) return false; StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Xmpp.NS_URI + ":message"); if (message_node != null) { @@ -104,6 +104,7 @@ public class ReceivedPipelineListener : StanzaListener { message.stanza = message_node; message.rerun_parsing = true; } + return false; } } diff --git a/xmpp-vala/src/module/xep/0333_chat_markers.vala b/xmpp-vala/src/module/xep/0333_chat_markers.vala index 2cba957a..d5f46c43 100644 --- a/xmpp-vala/src/module/xep/0333_chat_markers.vala +++ b/xmpp-vala/src/module/xep/0333_chat_markers.vala @@ -61,12 +61,13 @@ public class SendPipelineListener : StanzaListener { public override string action_group { get { return "ADD_NODES"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI); - if (received_node != null) return; - if (message.body == null) return; - if (message.type_ != MessageStanza.TYPE_CHAT) return; + if (received_node != null) return false; + if (message.body == null) return false; + if (message.type_ != MessageStanza.TYPE_CHAT) return false; message.stanza.put_node(new StanzaNode.build("markable", NS_URI).add_self_xmlns()); + return false; } } -- cgit v1.2.3-54-g00ecf