aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/module
diff options
context:
space:
mode:
authorfiaxh <git@mx.ax.lt>2017-11-11 21:29:13 +0100
committerfiaxh <git@mx.ax.lt>2017-11-16 17:43:00 +0100
commit3f531d6b91edab6c79fa232143db828bad13853c (patch)
tree1083046c94e0b4a43cf16ac4a388fcea8ef91e84 /xmpp-vala/src/module
parent1d0745177e7a116455811dfd26e07b848cb89b75 (diff)
downloaddino-3f531d6b91edab6c79fa232143db828bad13853c.tar.gz
dino-3f531d6b91edab6c79fa232143db828bad13853c.zip
Read+(write) stream async
Diffstat (limited to 'xmpp-vala/src/module')
-rw-r--r--xmpp-vala/src/module/message/module.vala17
-rw-r--r--xmpp-vala/src/module/util.vala59
-rw-r--r--xmpp-vala/src/module/xep/0085_chat_state_notifications.vala23
-rw-r--r--xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala26
-rw-r--r--xmpp-vala/src/module/xep/0203_delayed_delivery.vala18
-rw-r--r--xmpp-vala/src/module/xep/0280_message_carbons.vala47
-rw-r--r--xmpp-vala/src/module/xep/0313_message_archive_management.vala44
-rw-r--r--xmpp-vala/src/module/xep/0333_chat_markers.vala13
8 files changed, 177 insertions, 70 deletions
diff --git a/xmpp-vala/src/module/message/module.vala b/xmpp-vala/src/module/message/module.vala
index f7038ef8..2ca06dc4 100644
--- a/xmpp-vala/src/module/message/module.vala
+++ b/xmpp-vala/src/module/message/module.vala
@@ -8,24 +8,27 @@ namespace Xmpp.Message {
public class Module : XmppStreamModule {
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "message_module");
- public signal void pre_send_message(XmppStream stream, Message.Stanza message);
+ public StanzaListenerHolder<Message.Stanza> received_pipeline = new StanzaListenerHolder<Message.Stanza>();
+ public StanzaListenerHolder<Message.Stanza> send_pipeline = new StanzaListenerHolder<Message.Stanza>();
+
public signal void pre_received_message(XmppStream stream, Message.Stanza message);
public signal void received_message(XmppStream stream, Message.Stanza message);
public void send_message(XmppStream stream, Message.Stanza message) {
- pre_send_message(stream, message);
+ send_pipeline.run.begin(stream, message);
stream.write(message.stanza);
}
- public void received_message_stanza(XmppStream stream, StanzaNode node) {
+ public async void received_message_stanza_async(XmppStream stream, StanzaNode node) {
Message.Stanza message = new Message.Stanza.from_stanza(node, stream.get_flag(Bind.Flag.IDENTITY).my_jid);
- do {
- message.rerun_parsing = false;
- pre_received_message(stream, message);
- } while(message.rerun_parsing);
+ yield received_pipeline.run(stream, message);
received_message(stream, message);
}
+ private void received_message_stanza(XmppStream stream, StanzaNode node) {
+ received_message_stanza_async.begin(stream, node);
+ }
+
public override void attach(XmppStream stream) {
stream.received_message_stanza.connect(received_message_stanza);
}
diff --git a/xmpp-vala/src/module/util.vala b/xmpp-vala/src/module/util.vala
index 365170b0..e6626049 100644
--- a/xmpp-vala/src/module/util.vala
+++ b/xmpp-vala/src/module/util.vala
@@ -1,3 +1,5 @@
+using Gee;
+
namespace Xmpp {
public string get_bare_jid(string jid) {
return jid.split("/")[0];
@@ -20,4 +22,61 @@ namespace Xmpp {
uint32 b5_2 = Random.next_int();
return "%08x-%04x-%04x-%04x-%04x%08x".printf(b1, b2, b3, b4, b5_1, b5_2);
}
+
+public abstract class StanzaListener<T> : Object {
+ public abstract string action_group { get; }
+ public abstract string[] after_actions { get; }
+ public abstract async void run(Core.XmppStream stream, T stanza);
+}
+
+public class StanzaListenerHolder<T> : Object {
+ private Gee.List<StanzaListener<T>> listeners = new ArrayList<StanzaListener<T>>();
+
+ public new void connect(StanzaListener<T> listener) {
+ listeners.add(listener);
+ resort_list();
+ }
+
+ public async void run(Core.XmppStream stream, T stanza) {
+ foreach (StanzaListener<T> l in listeners) {
+ yield l.run(stream, stanza);
+ }
+ }
+
+ private Gee.List<StanzaListener<T>> set_minus(Gee.List<StanzaListener<T>> main_set, Gee.List<StanzaListener<T>> minus) {
+ Gee.List<StanzaListener<T>> res = new ArrayList<StanzaListener<T>>();
+ foreach (StanzaListener<T> l in main_set) {
+ if (!minus.contains(l)) {
+ res.add(l);
+ }
+ }
+ return res;
+ }
+
+ private bool set_contains_action(Gee.List<StanzaListener<T>> s, string[] actions) {
+ foreach(StanzaListener<T> l in s) {
+ if (l.action_group in actions) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void resort_list() {
+ Gee.List<StanzaListener<T>> new_list = new ArrayList<StanzaListener<T>>();
+ while (listeners.size > new_list.size) {
+ bool changed = false;
+ foreach (StanzaListener<T> l in listeners) {
+ Gee.List<StanzaListener<T>> remaining = set_minus(listeners, new_list);
+ if (!set_contains_action(remaining, l.after_actions)) {
+ new_list.add(l);
+ changed = true;
+ }
+ }
+ if (!changed) warning("Can't sort listeners");
+ }
+ listeners = new_list;
+ }
+}
+
}
diff --git a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala
index 5de504a2..3ca97282 100644
--- a/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala
+++ b/xmpp-vala/src/module/xep/0085_chat_state_notifications.vala
@@ -31,24 +31,17 @@ public class Module : XmppStreamModule {
public override void attach(XmppStream stream) {
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
- stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message);
+ stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener());
stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message);
}
public override void detach(XmppStream stream) {
- stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message);
stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
- private void on_pre_send_message(XmppStream stream, Message.Stanza message) {
- if (message.body == null) return;
- if (message.type_ != Message.Stanza.TYPE_CHAT) return;
- message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns());
- }
-
private void on_received_message(XmppStream stream, Message.Stanza message) {
if (!message.is_error()) {
Gee.List<StanzaNode> nodes = message.stanza.get_all_subnodes();
@@ -62,4 +55,18 @@ public class Module : XmppStreamModule {
}
}
+public class SendPipelineListener : StanzaListener<Message.Stanza> {
+
+ private const string[] after_actions_const = {"MODIFY_BODY"};
+
+ public override string action_group { get { return "ADD_NODES"; } }
+ public override string[] after_actions { get { return after_actions_const; } }
+
+ public override async void run(Core.XmppStream stream, Message.Stanza message) {
+ if (message.body == null) return;
+ if (message.type_ != Message.Stanza.TYPE_CHAT) return;
+ message.stanza.put_node(new StanzaNode.build(STATE_ACTIVE, NS_URI).add_self_xmlns());
+ }
+}
+
}
diff --git a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala
index 71a864c3..c9cb2d40 100644
--- a/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala
+++ b/xmpp-vala/src/module/xep/0184_message_delivery_receipts.vala
@@ -22,12 +22,11 @@ namespace Xmpp.Xep.MessageDeliveryReceipts {
public override void attach(XmppStream stream) {
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
stream.get_module(Message.Module.IDENTITY).received_message.connect(received_message);
- stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(pre_send_message);
+ stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener());
}
public override void detach(XmppStream stream) {
stream.get_module(Message.Module.IDENTITY).received_message.disconnect(received_message);
- stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(pre_send_message);
}
public override string get_ns() { return NS_URI; }
@@ -39,13 +38,22 @@ namespace Xmpp.Xep.MessageDeliveryReceipts {
receipt_received(stream, message.from, received_node.get_attribute("id", NS_URI));
}
}
+ }
- private void pre_send_message(XmppStream stream, Message.Stanza message) {
- StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
- if (received_node != null) return;
- if (message.body == null) return;
- if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return;
- message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns());
- }
+public class SendPipelineListener : StanzaListener<Message.Stanza> {
+
+ private const string[] after_actions_const = {};
+
+ public override string action_group { get { return "ADD_NODES"; } }
+ public override string[] after_actions { get { return after_actions_const; } }
+
+ public override async void run(Core.XmppStream stream, Message.Stanza message) {
+ StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
+ if (received_node != null) return;
+ if (message.body == null) return;
+ if (message.type_ == Message.Stanza.TYPE_GROUPCHAT) return;
+ message.stanza.put_node(new StanzaNode.build("request", NS_URI).add_self_xmlns());
}
}
+
+}
diff --git a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala
index 8ca300c9..89c761f2 100644
--- a/xmpp-vala/src/module/xep/0203_delayed_delivery.vala
+++ b/xmpp-vala/src/module/xep/0203_delayed_delivery.vala
@@ -27,19 +27,27 @@ namespace Xmpp.Xep.DelayedDelivery {
}
public override void attach(XmppStream stream) {
- stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message);
+ stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener());
}
public override void detach(XmppStream stream) { }
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
+ }
- private void on_pre_received_message(XmppStream stream, Message.Stanza message) {
- DateTime? datetime = get_time_for_message(message);
- if (datetime != null) message.add_flag(new MessageFlag(datetime));
- }
+public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
+
+ private const string[] after_actions_const = {};
+
+ public override string action_group { get { return "ADD_NODE"; } }
+ public override string[] after_actions { get { return after_actions_const; } }
+
+ public override async void run(Core.XmppStream stream, Message.Stanza message) {
+ DateTime? datetime = Module.get_time_for_message(message);
+ if (datetime != null) message.add_flag(new MessageFlag(datetime));
}
+}
public class MessageFlag : Message.MessageFlag {
public const string ID = "delayed_delivery";
diff --git a/xmpp-vala/src/module/xep/0280_message_carbons.vala b/xmpp-vala/src/module/xep/0280_message_carbons.vala
index b2d21646..930c5234 100644
--- a/xmpp-vala/src/module/xep/0280_message_carbons.vala
+++ b/xmpp-vala/src/module/xep/0280_message_carbons.vala
@@ -18,43 +18,50 @@ namespace Xmpp.Xep.MessageCarbons {
public override void attach(XmppStream stream) {
stream.stream_negotiated.connect(enable);
- stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(pre_received_message);
+ stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener());
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
}
public override void detach(XmppStream stream) {
stream.stream_negotiated.disconnect(enable);
- stream.get_module(Message.Module.IDENTITY).pre_received_message.disconnect(pre_received_message);
}
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
+ }
+
+public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
+
+ private const string[] after_actions_const = {"EXTRACT_MESSAGE_1"};
- private void pre_received_message(XmppStream stream, Message.Stanza message) {
- StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
- StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null;
- StanzaNode? carbons_node = received_node != null ? received_node : sent_node;
- if (carbons_node != null) {
- StanzaNode? forwarded_node = carbons_node.get_subnode("forwarded", "urn:xmpp:forward:0");
- if (forwarded_node != null) {
- StanzaNode? message_node = forwarded_node.get_subnode("message", Message.NS_URI);
- string? from_attribute = message_node.get_attribute("from", Message.NS_URI);
- // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored.
- if (from_attribute != null && from_attribute == get_bare_jid(stream.get_flag(Bind.Flag.IDENTITY).my_jid)) {
- if (received_node != null) {
- message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED));
- } else if (sent_node != null) {
- message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT));
- }
- message.stanza = message_node;
- message.rerun_parsing = true;
+ public override string action_group { get { return "EXTRACT_MESSAGE_2"; } }
+ public override string[] after_actions { get { return after_actions_const; } }
+
+ public override async void run(Core.XmppStream stream, Message.Stanza message) {
+ StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
+ StanzaNode? sent_node = received_node == null ? message.stanza.get_subnode("sent", NS_URI) : null;
+ StanzaNode? carbons_node = received_node != null ? received_node : sent_node;
+ if (carbons_node != null) {
+ StanzaNode? forwarded_node = carbons_node.get_subnode("forwarded", "urn:xmpp:forward:0");
+ if (forwarded_node != null) {
+ StanzaNode? message_node = forwarded_node.get_subnode("message", Message.NS_URI);
+ string? from_attribute = message_node.get_attribute("from", Message.NS_URI);
+ // Any forwarded copies received by a Carbons-enabled client MUST be from that user's bare JID; any copies that do not meet this requirement MUST be ignored.
+ if (from_attribute != null && from_attribute == get_bare_jid(stream.get_flag(Bind.Flag.IDENTITY).my_jid)) {
+ if (received_node != null) {
+ message.add_flag(new MessageFlag(MessageFlag.TYPE_RECEIVED));
+ } else if (sent_node != null) {
+ message.add_flag(new MessageFlag(MessageFlag.TYPE_SENT));
}
message.stanza = message_node;
message.rerun_parsing = true;
}
+ message.stanza = message_node;
+ message.rerun_parsing = true;
}
}
}
+}
public class MessageFlag : Message.MessageFlag {
public const string ID = "message_carbons";
diff --git a/xmpp-vala/src/module/xep/0313_message_archive_management.vala b/xmpp-vala/src/module/xep/0313_message_archive_management.vala
index 522f6dca..ac68e190 100644
--- a/xmpp-vala/src/module/xep/0313_message_archive_management.vala
+++ b/xmpp-vala/src/module/xep/0313_message_archive_management.vala
@@ -5,6 +5,10 @@ namespace Xmpp.Xep.MessageArchiveManagement {
public const string NS_URI = "urn:xmpp:mam:2";
public const string NS_URI_1 = "urn:xmpp:mam:1";
+private static string NS_VER(XmppStream stream) {
+ return stream.get_flag(Flag.IDENTITY).ns_ver;
+}
+
public class Module : XmppStreamModule {
public static ModuleIdentity<Module> IDENTITY = new ModuleIdentity<Module>(NS_URI, "0313_message_archive_management");
@@ -38,7 +42,7 @@ public class Module : XmppStreamModule {
}
public override void attach(XmppStream stream) {
- stream.get_module(Message.Module.IDENTITY).pre_received_message.connect(on_pre_received_message);
+ stream.get_module(Message.Module.IDENTITY).received_pipeline.connect(new ReceivedPipelineListener());
stream.stream_negotiated.connect(query_availability);
}
@@ -47,21 +51,6 @@ public class Module : XmppStreamModule {
public override string get_ns() { return NS_URI; }
public override string get_id() { return IDENTITY.id; }
- private void on_pre_received_message(XmppStream stream, Message.Stanza message) {
-// if (message.from != stream.remote_name) return;
- if (stream.get_flag(Flag.IDENTITY) == null) return;
-
- StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message");
- if (message_node != null) {
- StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay");
- DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node);
- message.add_flag(new MessageFlag(datetime));
-
- message.stanza = message_node;
- message.rerun_parsing = true;
- }
- }
-
private static void page_through_results(XmppStream stream, Iq.Stanza iq) {
string? last = iq.stanza.get_deep_string_content(NS_VER(stream) + ":fin", "http://jabber.org/protocol/rsm" + ":set", "last");
if (last == null) {
@@ -89,9 +78,28 @@ public class Module : XmppStreamModule {
if (stream.get_flag(Flag.IDENTITY) != null) feature_available(stream);
});
}
+}
+
+public class ReceivedPipelineListener : StanzaListener<Message.Stanza> {
+
+ private const string[] after_actions_const = {};
+
+ public override string action_group { get { return "EXTRACT_MESSAGE_1"; } }
+ public override string[] after_actions { get { return after_actions_const; } }
- private static string NS_VER(XmppStream stream) {
- return stream.get_flag(Flag.IDENTITY).ns_ver;
+ public override async void run(Core.XmppStream stream, Message.Stanza message) {
+ // if (message.from != stream.remote_name) return;
+ if (stream.get_flag(Flag.IDENTITY) == null) return;
+
+ StanzaNode? message_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", Message.NS_URI + ":message");
+ if (message_node != null) {
+ StanzaNode? forward_node = message.stanza.get_deep_subnode(NS_VER(stream) + ":result", "urn:xmpp:forward:0:forwarded", DelayedDelivery.NS_URI + ":delay");
+ DateTime? datetime = DelayedDelivery.Module.get_time_for_node(forward_node);
+ message.add_flag(new MessageFlag(datetime));
+
+ message.stanza = message_node;
+ message.rerun_parsing = true;
+ }
}
}
diff --git a/xmpp-vala/src/module/xep/0333_chat_markers.vala b/xmpp-vala/src/module/xep/0333_chat_markers.vala
index a0e42510..9c3251dc 100644
--- a/xmpp-vala/src/module/xep/0333_chat_markers.vala
+++ b/xmpp-vala/src/module/xep/0333_chat_markers.vala
@@ -31,12 +31,11 @@ public class Module : XmppStreamModule {
public override void attach(XmppStream stream) {
stream.get_module(ServiceDiscovery.Module.IDENTITY).add_feature(stream, NS_URI);
- stream.get_module(Message.Module.IDENTITY).pre_send_message.connect(on_pre_send_message);
+ stream.get_module(Message.Module.IDENTITY).send_pipeline.connect(new SendPipelineListener());
stream.get_module(Message.Module.IDENTITY).received_message.connect(on_received_message);
}
public override void detach(XmppStream stream) {
- stream.get_module(Message.Module.IDENTITY).pre_send_message.disconnect(on_pre_send_message);
stream.get_module(Message.Module.IDENTITY).received_message.disconnect(on_received_message);
}
@@ -52,8 +51,16 @@ public class Module : XmppStreamModule {
}
}
}
+}
+
+public class SendPipelineListener : StanzaListener<Message.Stanza> {
+
+ private const string[] after_actions_const = {};
+
+ public override string action_group { get { return "ADD_NODES"; } }
+ public override string[] after_actions { get { return after_actions_const; } }
- private void on_pre_send_message(XmppStream stream, Message.Stanza message) {
+ public override async void run(Core.XmppStream stream, Message.Stanza message) {
StanzaNode? received_node = message.stanza.get_subnode("received", NS_URI);
if (received_node != null) return;
if (message.body == null) return;