From 57c72d2818dec6c713834cfbb8c4c566a1602907 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Fri, 19 Jan 2018 22:37:02 +0100 Subject: Pipeline for incoming messages in libdino --- plugins/http-files/src/file_provider.vala | 50 ++++++++++++++++++------ plugins/http-files/src/manager.vala | 3 +- plugins/http-files/src/upload_stream_module.vala | 17 ++++++++ plugins/omemo/src/manager.vala | 19 ++++++--- plugins/omemo/src/stream_module.vala | 11 +++--- plugins/openpgp/src/manager.vala | 23 +++++++---- plugins/openpgp/src/stream_module.vala | 3 +- 7 files changed, 95 insertions(+), 31 deletions(-) (limited to 'plugins') diff --git a/plugins/http-files/src/file_provider.vala b/plugins/http-files/src/file_provider.vala index 493aaa61..bfeca922 100644 --- a/plugins/http-files/src/file_provider.vala +++ b/plugins/http-files/src/file_provider.vala @@ -20,23 +20,39 @@ public class FileProvider : Dino.FileProvider, Object { this.url_regex = new Regex("""^(?i)\b((?:[a-z][\w-]+:(?:\/{1,3}|[a-z0-9%])|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}\/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'".,<>?«»“”‘’]))$"""); this.file_ext_regex = new Regex("""\.(png|jpg|jpeg|svg|gif|pgp)$"""); - stream_interactor.get_module(MessageProcessor.IDENTITY).message_received.connect(check_in_message); stream_interactor.get_module(MessageProcessor.IDENTITY).message_sent.connect(check_out_message); + stream_interactor.get_module(MessageProcessor.IDENTITY).received_pipeline.connect(new ReceivedMessageListener(this)); stream_interactor.get_module(Manager.IDENTITY).uploaded.connect((file_transfer, url) => { file_transfer.info = url; ignore_once.add(url); }); } - private void check_in_message(Message message, Conversation conversation) { - if (!url_regex.match(message.body)) return; - Jid relevant_jid = stream_interactor.get_module(MucManager.IDENTITY).get_real_jid(message.from, conversation.account) ?? conversation.counterpart; - bool in_roster = stream_interactor.get_module(RosterManager.IDENTITY).get_roster_item(conversation.account, relevant_jid) != null; - if (message.direction == Message.DIRECTION_RECEIVED && !in_roster) return; + private class ReceivedMessageListener : MessageListener { + + public string[] after_actions_const = new string[]{ "" }; + public override string action_group { get { return "DECRYPT"; } } + public override string[] after_actions { get { return after_actions_const; } } - string? oob_url = Xmpp.Xep.OutOfBandData.get_url_from_message(message.stanza); - if ((oob_url != null && oob_url == message.body) || file_ext_regex.match(message.body)) { - download_url(message, conversation); + private FileProvider outer; + private StreamInteractor stream_interactor; + + public ReceivedMessageListener(FileProvider outer) { + this.outer = outer; + this.stream_interactor = outer.stream_interactor; + } + + public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { + if (!outer.url_regex.match(message.body)) return false; + Jid relevant_jid = stream_interactor.get_module(MucManager.IDENTITY).get_real_jid(message.from, conversation.account) ?? conversation.counterpart; + bool in_roster = stream_interactor.get_module(RosterManager.IDENTITY).get_roster_item(conversation.account, relevant_jid) != null; + if (message.direction == Message.DIRECTION_RECEIVED && !in_roster) return false; + + string? oob_url = Xmpp.Xep.OutOfBandData.get_url_from_message(message.stanza); + if ((oob_url != null && oob_url == message.body) || outer.file_ext_regex.match(message.body)) { + yield outer.download_url(message, conversation); + } + return false; } } @@ -45,14 +61,15 @@ public class FileProvider : Dino.FileProvider, Object { if (message.body.length < 5) return; if (!url_regex.match(message.body)) return; if (!file_ext_regex.match(message.body)) return; - download_url(message, conversation); } - private void download_url(Message message, Conversation conversation) { + private async bool download_url(Message message, Conversation conversation) { + bool success = false; var session = new Soup.Session(); var head_message = new Soup.Message("HEAD", message.body); if (head_message != null) { + SourceFunc callback = download_url.callback; session.send_async.begin(head_message, null, (obj, res) => { string? content_type = null, content_length = null; print(message.body + ":\n"); @@ -69,6 +86,7 @@ public class FileProvider : Dino.FileProvider, Object { try { file_transfer.input_stream = request.send_async.end(res); } catch (Error e) { + Idle.add((owned)callback); return; } file_transfer.account = conversation.account; @@ -85,11 +103,19 @@ public class FileProvider : Dino.FileProvider, Object { file_transfer.provider = 0; file_transfer.info = message.body; file_incoming(file_transfer); + success = true; + Idle.add((owned)callback); }); - } catch (Error e) { } + } catch (Error e) { + Idle.add((owned)callback); + } + } else { + Idle.add((owned)callback); } }); + yield; } + return success; } } diff --git a/plugins/http-files/src/manager.vala b/plugins/http-files/src/manager.vala index db7a3f8f..3be3c6a7 100644 --- a/plugins/http-files/src/manager.vala +++ b/plugins/http-files/src/manager.vala @@ -37,7 +37,8 @@ public class Manager : StreamInteractionModule, FileSender, Object { uploaded(file_transfer, url_down); stream_interactor.get_module(MessageProcessor.IDENTITY).send_message(url_down, conversation); }, - () => { + (stream, error_str) => { + print(@"Failed getting upload url + $error_str\n"); file_transfer.state = FileTransfer.State.FAILED; } ); diff --git a/plugins/http-files/src/upload_stream_module.vala b/plugins/http-files/src/upload_stream_module.vala index f4a4a428..4835f268 100644 --- a/plugins/http-files/src/upload_stream_module.vala +++ b/plugins/http-files/src/upload_stream_module.vala @@ -11,6 +11,7 @@ public class UploadStreamModule : XmppStreamModule { public static Xmpp.ModuleIdentity IDENTITY = new Xmpp.ModuleIdentity(NS_URI, "0363_http_file_upload"); public signal void feature_available(XmppStream stream, long max_file_size); + public signal void received_url(XmppStream stream, MessageStanza message); public delegate void OnUploadOk(XmppStream stream, string url_down); public delegate void OnError(XmppStream stream, string error); @@ -159,6 +160,22 @@ public class UploadStreamModule : XmppStreamModule { } } +public class ReceivedPipelineListener : StanzaListener { + + private const string[] after_actions_const = {"EXTRACT_MESSAGE_2"}; + + public override string action_group { get { return "EXTRACT_MESSAGE_2"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async bool run(XmppStream stream, MessageStanza message) { + string? oob_url = OutOfBandData.get_url_from_message(message); + if (oob_url != null && oob_url == message.body) { + stream.get_module(UploadStreamModule.IDENTITY).received_url(stream, message); + } + return true; + } +} + public class Flag : XmppStreamFlag { public static FlagIdentity IDENTITY = new FlagIdentity(NS_URI, "service_discovery"); diff --git a/plugins/omemo/src/manager.vala b/plugins/omemo/src/manager.vala index 6c8ce4ef..f4c5ed0f 100644 --- a/plugins/omemo/src/manager.vala +++ b/plugins/omemo/src/manager.vala @@ -13,6 +13,7 @@ public class Manager : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private Database db; private Map message_states = new HashMap(Entities.Message.hash_func, Entities.Message.equals_func); + private ReceivedMessageListener received_message_listener = new ReceivedMessageListener(); private class MessageState { public Entities.Message msg { get; private set; } @@ -65,14 +66,22 @@ public class Manager : StreamInteractionModule, Object { stream_interactor.stream_negotiated.connect(on_stream_negotiated); stream_interactor.account_added.connect(on_account_added); - stream_interactor.get_module(MessageProcessor.IDENTITY).pre_message_received.connect(on_pre_message_received); + stream_interactor.get_module(MessageProcessor.IDENTITY).received_pipeline.connect(received_message_listener); stream_interactor.get_module(MessageProcessor.IDENTITY).pre_message_send.connect(on_pre_message_send); } - private void on_pre_message_received(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation) { - MessageFlag? flag = MessageFlag.get_flag(message_stanza); - if (flag != null && ((!)flag).decrypted) { - message.encryption = Encryption.OMEMO; + private class ReceivedMessageListener : MessageListener { + + public string[] after_actions_const = new string[]{ "" }; + public override string action_group { get { return "DECRYPT"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { + MessageFlag? flag = MessageFlag.get_flag(stanza); + if (flag != null && ((!)flag).decrypted) { + message.encryption = Encryption.OMEMO; + } + return false; } } diff --git a/plugins/omemo/src/stream_module.vala b/plugins/omemo/src/stream_module.vala index 0b5f4ea9..4494e834 100644 --- a/plugins/omemo/src/stream_module.vala +++ b/plugins/omemo/src/stream_module.vala @@ -397,17 +397,17 @@ public class ReceivedPipelineListener : StanzaListener { this.store = store; } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { StanzaNode? _encrypted = message.stanza.get_subnode("encrypted", NS_URI); - if (_encrypted == null || MessageFlag.get_flag(message) != null || message.from == null) return; + if (_encrypted == null || MessageFlag.get_flag(message) != null || message.from == null) return false; StanzaNode encrypted = (!)_encrypted; - if (!Plugin.ensure_context()) return; + if (!Plugin.ensure_context()) return false; MessageFlag flag = new MessageFlag(); message.add_flag(flag); StanzaNode? _header = encrypted.get_subnode("header"); - if (_header == null) return; + if (_header == null) return false; StanzaNode header = (!)_header; - if (header.get_attribute_int("sid") <= 0) return; + if (header.get_attribute_int("sid") <= 0) return false; foreach (StanzaNode key_node in header.get_subnodes("key")) { if (key_node.get_attribute_int("rid") == store.local_registration_id) { try { @@ -448,6 +448,7 @@ public class ReceivedPipelineListener : StanzaListener { } } } + return false; } private string arr_to_str(uint8[] arr) { diff --git a/plugins/openpgp/src/manager.vala b/plugins/openpgp/src/manager.vala index 79e832ff..4f8b87bb 100644 --- a/plugins/openpgp/src/manager.vala +++ b/plugins/openpgp/src/manager.vala @@ -15,6 +15,7 @@ public class Manager : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private Database db; private HashMap pgp_key_ids = new HashMap(Jid.hash_bare_func, Jid.equals_bare_func); + private ReceivedMessageListener received_message_listener = new ReceivedMessageListener(); public static void start(StreamInteractor stream_interactor, Database db) { Manager m = new Manager(stream_interactor, db); @@ -26,7 +27,7 @@ public class Manager : StreamInteractionModule, Object { this.db = db; stream_interactor.account_added.connect(on_account_added); - stream_interactor.get_module(MessageProcessor.IDENTITY).pre_message_received.connect(on_pre_message_received); + stream_interactor.get_module(MessageProcessor.IDENTITY).received_pipeline.connect(received_message_listener); stream_interactor.get_module(MessageProcessor.IDENTITY).pre_message_send.connect(check_encypt); } @@ -63,12 +64,6 @@ public class Manager : StreamInteractionModule, Object { return gpgkeys; } - private void on_pre_message_received(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation) { - if (MessageFlag.get_flag(message_stanza) != null && MessageFlag.get_flag(message_stanza).decrypted) { - message.encryption = Encryption.PGP; - } - } - private void check_encypt(Entities.Message message, Xmpp.MessageStanza message_stanza, Conversation conversation) { try { if (message.encryption == Encryption.PGP) { @@ -104,6 +99,20 @@ public class Manager : StreamInteractionModule, Object { pgp_key_ids[jid] = key_id; } } + + private class ReceivedMessageListener : MessageListener { + + public string[] after_actions_const = new string[]{ "" }; + public override string action_group { get { return "DECRYPT"; } } + public override string[] after_actions { get { return after_actions_const; } } + + public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) { + if (MessageFlag.get_flag(stanza) != null && MessageFlag.get_flag(stanza).decrypted) { + message.encryption = Encryption.PGP; + } + return false; + } + } } } diff --git a/plugins/openpgp/src/stream_module.vala b/plugins/openpgp/src/stream_module.vala index a8b821de..3bcc3326 100644 --- a/plugins/openpgp/src/stream_module.vala +++ b/plugins/openpgp/src/stream_module.vala @@ -140,7 +140,7 @@ public class ReceivedPipelineDecryptListener : StanzaListener { public override string action_group { get { return "ENCRYPT_BODY"; } } public override string[] after_actions { get { return after_actions_const; } } - public override async void run(XmppStream stream, MessageStanza message) { + public override async bool run(XmppStream stream, MessageStanza message) { string? encrypted = get_cyphertext(message); if (encrypted != null) { MessageFlag flag = new MessageFlag(); @@ -151,6 +151,7 @@ public class ReceivedPipelineDecryptListener : StanzaListener { message.body = decrypted; } } + return false; } private static async string? gpg_decrypt(string enc) { -- cgit v1.2.3-54-g00ecf