From 5038db063ec3981385dd68a3069e8edd0b52075d Mon Sep 17 00:00:00 2001 From: Marvin W Date: Wed, 15 Mar 2017 17:23:13 +0100 Subject: omemo plugin: improve session bootstrapping --- plugins/omemo/src/manager.vala | 164 +++++++++++++++++++++++++++++------------ 1 file changed, 118 insertions(+), 46 deletions(-) (limited to 'plugins/omemo/src/manager.vala') diff --git a/plugins/omemo/src/manager.vala b/plugins/omemo/src/manager.vala index 197ba021..a48f4748 100644 --- a/plugins/omemo/src/manager.vala +++ b/plugins/omemo/src/manager.vala @@ -11,8 +11,52 @@ public class Manager : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private Database db; - private ConcurrentList to_send_after_devicelist = new ConcurrentList(); - private ConcurrentList to_send_after_session = new ConcurrentList(); + private Map message_states = new HashMap(Entities.Message.hash_func, Entities.Message.equals_func); + + private class MessageState { + public Entities.Message msg { get; private set; } + public EncryptState last_try { get; private set; } + public int waiting_other_sessions { get; set; } + public int waiting_own_sessions { get; set; } + public bool waiting_own_devicelist { get; set; } + public bool waiting_other_devicelist { get; set; } + public bool force_next_attempt { get; set; } + public bool will_send_now { get; private set; } + public bool active_send_attempt { get; set; } + + public MessageState(Entities.Message msg, EncryptState last_try) { + this.msg = msg; + this.last_try = last_try; + update_from_encrypt_status(last_try); + } + + public void update_from_encrypt_status(EncryptState new_try) { + this.last_try = new_try; + this.waiting_other_sessions = new_try.other_unknown; + this.waiting_own_sessions = new_try.own_unknown; + this.waiting_own_devicelist = !new_try.own_list; + this.waiting_other_devicelist = !new_try.own_list; + this.active_send_attempt = false; + will_send_now = false; + if (new_try.other_failure > 0 || (new_try.other_lost == new_try.other_devices && new_try.other_devices > 0)) { + msg.marked = Entities.Message.Marked.WONTSEND; + } else if (new_try.other_unknown > 0 || new_try.own_devices == 0) { + msg.marked = Entities.Message.Marked.UNSENT; + } else if (!new_try.encrypted) { + msg.marked = Entities.Message.Marked.WONTSEND; + } else { + will_send_now = true; + } + } + + public bool should_retry_now() { + return !waiting_own_devicelist && !waiting_other_devicelist && waiting_other_sessions <= 0 && waiting_own_sessions <= 0 && !active_send_attempt; + } + + public string to_string() { + return @"MessageState (waiting=(others=$waiting_other_sessions, own=$waiting_own_sessions, other_list=$waiting_other_devicelist, own_list=$waiting_own_devicelist))"; + } + } private Manager(StreamInteractor stream_interactor, Database db) { this.stream_interactor = stream_interactor; @@ -33,31 +77,36 @@ public class Manager : StreamInteractionModule, Object { private void on_pre_message_send(Entities.Message message, Xmpp.Message.Stanza message_stanza, Conversation conversation) { if (message.encryption == Encryption.OMEMO) { StreamModule module = stream_interactor.get_stream(conversation.account).get_module(StreamModule.IDENTITY); - EncryptStatus status = module.encrypt(message_stanza, conversation.account.bare_jid.to_string()); - if (status.other_failure > 0 || (status.other_lost == status.other_devices && status.other_devices > 0)) { - message.marked = Entities.Message.Marked.WONTSEND; - } else if (status.other_unknown > 0 || status.own_devices == 0) { - message.marked = Entities.Message.Marked.UNSENT; - } else if (!status.encrypted) { - message.marked = Entities.Message.Marked.WONTSEND; + EncryptState enc_state = module.encrypt(message_stanza, conversation.account.bare_jid.to_string()); + MessageState state = null; + lock (message_states) { + if (message_states.has_key(message)) { + state = message_states.get(message); + state.update_from_encrypt_status(enc_state); + } else { + state = new MessageState(message, enc_state); + message_states[message] = state; + } + if (state.will_send_now) { + message_states.unset(message); + } } - if (status.other_unknown > 0) { - bool cont = true; - lock(to_send_after_session) { - foreach(Entities.Message msg in to_send_after_session) { - if (msg.counterpart.bare_jid.to_string() == message.counterpart.bare_jid.to_string()) cont = false; + if (!state.will_send_now) { + if (message.marked == Entities.Message.Marked.WONTSEND) { + if (Plugin.DEBUG) print(@"OMEMO: message $(message.stanza_id) was not sent: $state\n"); + } else { + if (Plugin.DEBUG) print(@"OMEMO: message $(message.stanza_id) will be delayed: $state\n"); + + if (state.waiting_own_sessions > 0) { + module.start_sessions_with(stream_interactor.get_stream(conversation.account), conversation.account.bare_jid.to_string()); + } + if (state.waiting_other_sessions > 0) { + module.start_sessions_with(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string()); + } + if (state.waiting_other_devicelist) { + module.request_user_devicelist(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string()); } - to_send_after_session.add(message); - } - if (cont) module.start_sessions_with(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string()); - } - if (status.own_unknown > 0) { - module.start_sessions_with(stream_interactor.get_stream(conversation.account), conversation.account.bare_jid.to_string()); - } - if (status.own_devices == 0) { - lock (to_send_after_session) { - to_send_after_devicelist.add(message); } } } @@ -65,8 +114,9 @@ public class Manager : StreamInteractionModule, Object { private void on_account_added(Account account) { stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).store_created.connect((store) => on_store_created(account, store)); - stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).device_list_loaded.connect(() => on_device_list_loaded(account)); - stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_started.connect((jid, device_id) => on_session_started(account, jid)); + stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).device_list_loaded.connect((jid) => on_device_list_loaded(account, jid)); + stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_started.connect((jid, device_id) => on_session_started(account, jid, false)); + stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_start_failed.connect((jid, device_id) => on_session_started(account, jid, true)); } private void on_stream_negotiated(Account account) { @@ -74,32 +124,52 @@ public class Manager : StreamInteractionModule, Object { stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).request_user_devicelist(stream, account.bare_jid.to_string()); } - private void on_session_started(Account account, string jid) { - lock(to_send_after_session) { - Iterator iter = to_send_after_session.iterator(); - while (iter.next()) { - Entities.Message msg = iter.get(); - if (msg.account.bare_jid.to_string() == account.bare_jid.to_string() && msg.counterpart.bare_jid.to_string() == jid) { - Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); - MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); - iter.remove(); + private void on_session_started(Account account, string jid, bool failed) { + if (Plugin.DEBUG) print(@"OMEMO: session start between $(account.bare_jid) and $jid $(failed ? "failed" : "successful")\n"); + HashSet send_now = new HashSet(); + lock (message_states) { + foreach (Entities.Message msg in message_states.keys) { + if (!msg.account.equals(account)) continue; + MessageState state = message_states[msg]; + if (account.bare_jid.to_string() == jid) { + state.waiting_own_sessions--; + } else if (msg.counterpart.bare_jid.to_string() == jid) { + state.waiting_other_sessions--; + } + if (state.should_retry_now()) { + send_now.add(msg); + state.active_send_attempt = true; } } } + foreach (Entities.Message msg in send_now) { + Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); + MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); + } } - private void on_device_list_loaded(Account account) { - lock(to_send_after_devicelist) { - Iterator iter = to_send_after_devicelist.iterator(); - while (iter.next()) { - Entities.Message msg = iter.get(); - if (msg.account.bare_jid.to_string() == account.bare_jid.to_string()) { - Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); - MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); - iter.remove(); + private void on_device_list_loaded(Account account, string jid) { + if (Plugin.DEBUG) print(@"OMEMO: received device list for $(account.bare_jid) from $jid\n"); + HashSet send_now = new HashSet(); + lock (message_states) { + foreach (Entities.Message msg in message_states.keys) { + if (!msg.account.equals(account)) continue; + MessageState state = message_states[msg]; + if (account.bare_jid.to_string() == jid) { + state.waiting_own_devicelist = false; + } else if (msg.counterpart.bare_jid.to_string() == jid) { + state.waiting_other_devicelist = false; + } + if (state.should_retry_now()) { + send_now.add(msg); + state.active_send_attempt = true; } } } + foreach (Entities.Message msg in send_now) { + Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); + MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); + } } private void on_store_created(Account account, Store store) { @@ -141,13 +211,15 @@ public class Manager : StreamInteractionModule, Object { store.pre_key_store = new BackedPreKeyStore(db, identity_id); store.session_store = new BackedSessionStore(db, identity_id); } else { - print(@"WARN: OMEMO store for $(account.bare_jid) is not persisted"); + print(@"OMEMO: store for $(account.bare_jid) is not persisted!"); } } public bool can_encrypt(Entities.Conversation conversation) { - return stream_interactor.get_stream(conversation.account).get_module(StreamModule.IDENTITY).is_known_address(conversation.counterpart.bare_jid.to_string()); + Core.XmppStream stream = stream_interactor.get_stream(conversation.account); + if (stream == null) return false; + return stream.get_module(StreamModule.IDENTITY).is_known_address(conversation.counterpart.bare_jid.to_string()); } internal string get_id() { -- cgit v1.2.3-54-g00ecf