From 5038db063ec3981385dd68a3069e8edd0b52075d Mon Sep 17 00:00:00 2001 From: Marvin W Date: Wed, 15 Mar 2017 17:23:13 +0100 Subject: omemo plugin: improve session bootstrapping --- plugins/omemo/src/encrypt_state.vala | 24 +++++ plugins/omemo/src/encrypt_status.vala | 17 ---- plugins/omemo/src/manager.vala | 164 ++++++++++++++++++++++++---------- plugins/omemo/src/plugin.vala | 3 +- plugins/omemo/src/stream_module.vala | 73 +++++++-------- 5 files changed, 177 insertions(+), 104 deletions(-) create mode 100644 plugins/omemo/src/encrypt_state.vala delete mode 100644 plugins/omemo/src/encrypt_status.vala (limited to 'plugins/omemo/src') diff --git a/plugins/omemo/src/encrypt_state.vala b/plugins/omemo/src/encrypt_state.vala new file mode 100644 index 00000000..80ae40d7 --- /dev/null +++ b/plugins/omemo/src/encrypt_state.vala @@ -0,0 +1,24 @@ +namespace Dino.Plugins.Omemo { + +public class EncryptState { + public bool encrypted { get; internal set; } + public int other_devices { get; internal set; } + public int other_success { get; internal set; } + public int other_lost { get; internal set; } + public int other_unknown { get; internal set; } + public int other_failure { get; internal set; } + public bool other_list { get; internal set; } + + public int own_devices { get; internal set; } + public int own_success { get; internal set; } + public int own_lost { get; internal set; } + public int own_unknown { get; internal set; } + public int own_failure { get; internal set; } + public bool own_list { get; internal set; } + + public string to_string() { + return @"EncryptState (encrypted=$encrypted, other=(devices=$other_devices, success=$other_success, lost=$other_lost, unknown=$other_unknown, failure=$other_failure, list=$other_list), own=(devices=$own_devices, success=$own_success, lost=$own_lost, unknown=$own_unknown, failure=$own_failure, list=$own_list))"; + } +} + +} \ No newline at end of file diff --git a/plugins/omemo/src/encrypt_status.vala b/plugins/omemo/src/encrypt_status.vala deleted file mode 100644 index c6b45ac6..00000000 --- a/plugins/omemo/src/encrypt_status.vala +++ /dev/null @@ -1,17 +0,0 @@ -namespace Dino.Plugins.Omemo { - -public class EncryptStatus { - public bool encrypted { get; internal set; } - public int other_devices { get; internal set; } - public int other_success { get; internal set; } - public int other_lost { get; internal set; } - public int other_unknown { get; internal set; } - public int other_failure { get; internal set; } - public int own_devices { get; internal set; } - public int own_success { get; internal set; } - public int own_lost { get; internal set; } - public int own_unknown { get; internal set; } - public int own_failure { get; internal set; } -} - -} \ No newline at end of file diff --git a/plugins/omemo/src/manager.vala b/plugins/omemo/src/manager.vala index 197ba021..a48f4748 100644 --- a/plugins/omemo/src/manager.vala +++ b/plugins/omemo/src/manager.vala @@ -11,8 +11,52 @@ public class Manager : StreamInteractionModule, Object { private StreamInteractor stream_interactor; private Database db; - private ConcurrentList to_send_after_devicelist = new ConcurrentList(); - private ConcurrentList to_send_after_session = new ConcurrentList(); + private Map message_states = new HashMap(Entities.Message.hash_func, Entities.Message.equals_func); + + private class MessageState { + public Entities.Message msg { get; private set; } + public EncryptState last_try { get; private set; } + public int waiting_other_sessions { get; set; } + public int waiting_own_sessions { get; set; } + public bool waiting_own_devicelist { get; set; } + public bool waiting_other_devicelist { get; set; } + public bool force_next_attempt { get; set; } + public bool will_send_now { get; private set; } + public bool active_send_attempt { get; set; } + + public MessageState(Entities.Message msg, EncryptState last_try) { + this.msg = msg; + this.last_try = last_try; + update_from_encrypt_status(last_try); + } + + public void update_from_encrypt_status(EncryptState new_try) { + this.last_try = new_try; + this.waiting_other_sessions = new_try.other_unknown; + this.waiting_own_sessions = new_try.own_unknown; + this.waiting_own_devicelist = !new_try.own_list; + this.waiting_other_devicelist = !new_try.own_list; + this.active_send_attempt = false; + will_send_now = false; + if (new_try.other_failure > 0 || (new_try.other_lost == new_try.other_devices && new_try.other_devices > 0)) { + msg.marked = Entities.Message.Marked.WONTSEND; + } else if (new_try.other_unknown > 0 || new_try.own_devices == 0) { + msg.marked = Entities.Message.Marked.UNSENT; + } else if (!new_try.encrypted) { + msg.marked = Entities.Message.Marked.WONTSEND; + } else { + will_send_now = true; + } + } + + public bool should_retry_now() { + return !waiting_own_devicelist && !waiting_other_devicelist && waiting_other_sessions <= 0 && waiting_own_sessions <= 0 && !active_send_attempt; + } + + public string to_string() { + return @"MessageState (waiting=(others=$waiting_other_sessions, own=$waiting_own_sessions, other_list=$waiting_other_devicelist, own_list=$waiting_own_devicelist))"; + } + } private Manager(StreamInteractor stream_interactor, Database db) { this.stream_interactor = stream_interactor; @@ -33,31 +77,36 @@ public class Manager : StreamInteractionModule, Object { private void on_pre_message_send(Entities.Message message, Xmpp.Message.Stanza message_stanza, Conversation conversation) { if (message.encryption == Encryption.OMEMO) { StreamModule module = stream_interactor.get_stream(conversation.account).get_module(StreamModule.IDENTITY); - EncryptStatus status = module.encrypt(message_stanza, conversation.account.bare_jid.to_string()); - if (status.other_failure > 0 || (status.other_lost == status.other_devices && status.other_devices > 0)) { - message.marked = Entities.Message.Marked.WONTSEND; - } else if (status.other_unknown > 0 || status.own_devices == 0) { - message.marked = Entities.Message.Marked.UNSENT; - } else if (!status.encrypted) { - message.marked = Entities.Message.Marked.WONTSEND; + EncryptState enc_state = module.encrypt(message_stanza, conversation.account.bare_jid.to_string()); + MessageState state = null; + lock (message_states) { + if (message_states.has_key(message)) { + state = message_states.get(message); + state.update_from_encrypt_status(enc_state); + } else { + state = new MessageState(message, enc_state); + message_states[message] = state; + } + if (state.will_send_now) { + message_states.unset(message); + } } - if (status.other_unknown > 0) { - bool cont = true; - lock(to_send_after_session) { - foreach(Entities.Message msg in to_send_after_session) { - if (msg.counterpart.bare_jid.to_string() == message.counterpart.bare_jid.to_string()) cont = false; + if (!state.will_send_now) { + if (message.marked == Entities.Message.Marked.WONTSEND) { + if (Plugin.DEBUG) print(@"OMEMO: message $(message.stanza_id) was not sent: $state\n"); + } else { + if (Plugin.DEBUG) print(@"OMEMO: message $(message.stanza_id) will be delayed: $state\n"); + + if (state.waiting_own_sessions > 0) { + module.start_sessions_with(stream_interactor.get_stream(conversation.account), conversation.account.bare_jid.to_string()); + } + if (state.waiting_other_sessions > 0) { + module.start_sessions_with(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string()); + } + if (state.waiting_other_devicelist) { + module.request_user_devicelist(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string()); } - to_send_after_session.add(message); - } - if (cont) module.start_sessions_with(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string()); - } - if (status.own_unknown > 0) { - module.start_sessions_with(stream_interactor.get_stream(conversation.account), conversation.account.bare_jid.to_string()); - } - if (status.own_devices == 0) { - lock (to_send_after_session) { - to_send_after_devicelist.add(message); } } } @@ -65,8 +114,9 @@ public class Manager : StreamInteractionModule, Object { private void on_account_added(Account account) { stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).store_created.connect((store) => on_store_created(account, store)); - stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).device_list_loaded.connect(() => on_device_list_loaded(account)); - stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_started.connect((jid, device_id) => on_session_started(account, jid)); + stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).device_list_loaded.connect((jid) => on_device_list_loaded(account, jid)); + stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_started.connect((jid, device_id) => on_session_started(account, jid, false)); + stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_start_failed.connect((jid, device_id) => on_session_started(account, jid, true)); } private void on_stream_negotiated(Account account) { @@ -74,32 +124,52 @@ public class Manager : StreamInteractionModule, Object { stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).request_user_devicelist(stream, account.bare_jid.to_string()); } - private void on_session_started(Account account, string jid) { - lock(to_send_after_session) { - Iterator iter = to_send_after_session.iterator(); - while (iter.next()) { - Entities.Message msg = iter.get(); - if (msg.account.bare_jid.to_string() == account.bare_jid.to_string() && msg.counterpart.bare_jid.to_string() == jid) { - Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); - MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); - iter.remove(); + private void on_session_started(Account account, string jid, bool failed) { + if (Plugin.DEBUG) print(@"OMEMO: session start between $(account.bare_jid) and $jid $(failed ? "failed" : "successful")\n"); + HashSet send_now = new HashSet(); + lock (message_states) { + foreach (Entities.Message msg in message_states.keys) { + if (!msg.account.equals(account)) continue; + MessageState state = message_states[msg]; + if (account.bare_jid.to_string() == jid) { + state.waiting_own_sessions--; + } else if (msg.counterpart.bare_jid.to_string() == jid) { + state.waiting_other_sessions--; + } + if (state.should_retry_now()) { + send_now.add(msg); + state.active_send_attempt = true; } } } + foreach (Entities.Message msg in send_now) { + Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); + MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); + } } - private void on_device_list_loaded(Account account) { - lock(to_send_after_devicelist) { - Iterator iter = to_send_after_devicelist.iterator(); - while (iter.next()) { - Entities.Message msg = iter.get(); - if (msg.account.bare_jid.to_string() == account.bare_jid.to_string()) { - Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); - MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); - iter.remove(); + private void on_device_list_loaded(Account account, string jid) { + if (Plugin.DEBUG) print(@"OMEMO: received device list for $(account.bare_jid) from $jid\n"); + HashSet send_now = new HashSet(); + lock (message_states) { + foreach (Entities.Message msg in message_states.keys) { + if (!msg.account.equals(account)) continue; + MessageState state = message_states[msg]; + if (account.bare_jid.to_string() == jid) { + state.waiting_own_devicelist = false; + } else if (msg.counterpart.bare_jid.to_string() == jid) { + state.waiting_other_devicelist = false; + } + if (state.should_retry_now()) { + send_now.add(msg); + state.active_send_attempt = true; } } } + foreach (Entities.Message msg in send_now) { + Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account); + MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true); + } } private void on_store_created(Account account, Store store) { @@ -141,13 +211,15 @@ public class Manager : StreamInteractionModule, Object { store.pre_key_store = new BackedPreKeyStore(db, identity_id); store.session_store = new BackedSessionStore(db, identity_id); } else { - print(@"WARN: OMEMO store for $(account.bare_jid) is not persisted"); + print(@"OMEMO: store for $(account.bare_jid) is not persisted!"); } } public bool can_encrypt(Entities.Conversation conversation) { - return stream_interactor.get_stream(conversation.account).get_module(StreamModule.IDENTITY).is_known_address(conversation.counterpart.bare_jid.to_string()); + Core.XmppStream stream = stream_interactor.get_stream(conversation.account); + if (stream == null) return false; + return stream.get_module(StreamModule.IDENTITY).is_known_address(conversation.counterpart.bare_jid.to_string()); } internal string get_id() { diff --git a/plugins/omemo/src/plugin.vala b/plugins/omemo/src/plugin.vala index d3fee15d..f98b03d9 100644 --- a/plugins/omemo/src/plugin.vala +++ b/plugins/omemo/src/plugin.vala @@ -1,6 +1,7 @@ namespace Dino.Plugins.Omemo { public class Plugin : RootInterface, Object { + public const bool DEBUG = false; public static Signal.Context context; public Dino.Application app; @@ -15,7 +16,7 @@ public class Plugin : RootInterface, Object { } try { - context = new Signal.Context(false); + context = new Signal.Context(DEBUG); this.app = app; this.db = new Database(Path.build_filename(Application.get_storage_dir(), "omemo.db")); this.list_entry = new EncryptionListEntry(this); diff --git a/plugins/omemo/src/stream_module.vala b/plugins/omemo/src/stream_module.vala index 4283fd74..14b1a93e 100644 --- a/plugins/omemo/src/stream_module.vala +++ b/plugins/omemo/src/stream_module.vala @@ -18,24 +18,28 @@ public class StreamModule : XmppStreamModule { public static ModuleIdentity IDENTITY = new ModuleIdentity(NS_URI, ID); private Store store; - private bool device_list_loading = false; - private bool device_list_modified = false; + private ConcurrentSet active_bundle_requests = new ConcurrentSet(); + private ConcurrentSet active_devicelist_requests = new ConcurrentSet(); private Map> device_lists = new HashMap>(); private Map> ignored_devices = new HashMap>(); public signal void store_created(Store store); - public signal void device_list_loaded(); + public signal void device_list_loaded(string jid); public signal void session_started(string jid, int device_id); + public signal void session_start_failed(string jid, int device_id); - public EncryptStatus encrypt(Message.Stanza message, string self_bare_jid) { - EncryptStatus status = new EncryptStatus(); + public EncryptState encrypt(Message.Stanza message, string self_bare_jid) { + EncryptState status = new EncryptState(); if (Plugin.context == null) return status; try { string name = get_bare_jid(message.to); - if (device_lists.get(name) == null || device_lists.get(self_bare_jid) == null) return status; - status.other_devices = device_lists.get(name).size; + if (device_lists.get(self_bare_jid) == null) return status; + status.own_list = true; status.own_devices = device_lists.get(self_bare_jid).size; - if (status.other_devices == 0) return status; + if (device_lists.get(name) == null) return status; + status.other_list = true; + status.other_devices = device_lists.get(name).size; + if (status.own_devices == 0 || status.other_devices == 0) return status; uint8[] key = new uint8[16]; Plugin.context.randomize(key); @@ -93,7 +97,7 @@ public class StreamModule : XmppStreamModule { message.body = "[This message is OMEMO encrypted]"; status.encrypted = true; } catch (Error e) { - print(@"Signal error while encrypting message: $(e.message)\n"); + if (Plugin.DEBUG) print(@"OMEMO: Signal error while encrypting message: $(e.message)\n"); } return status; } @@ -163,7 +167,7 @@ public class StreamModule : XmppStreamModule { flag.decrypted = true; } } catch (Error e) { - print(@"Signal error while decrypting message: $(e.message)\n"); + if (Plugin.DEBUG) print(@"OMEMO: Signal error while decrypting message: $(e.message)\n"); } } } @@ -177,7 +181,10 @@ public class StreamModule : XmppStreamModule { } public void request_user_devicelist(XmppStream stream, string jid) { - stream.get_module(Pubsub.Module.IDENTITY).request(stream, jid, NODE_DEVICELIST, (stream, jid, id, node, obj) => (obj as StreamModule).on_devicelist(stream, jid, id ?? "", node), this); + if (active_devicelist_requests.add(jid)) { + if (Plugin.DEBUG) print(@"OMEMO: requesting device list for $jid\n"); + stream.get_module(Pubsub.Module.IDENTITY).request(stream, jid, NODE_DEVICELIST, (stream, jid, id, node, obj) => (obj as StreamModule).on_devicelist(stream, jid, id ?? "", node), this); + } } public void on_devicelist(XmppStream stream, string jid, string id, StanzaNode? node_) { @@ -187,28 +194,6 @@ public class StreamModule : XmppStreamModule { node = new StanzaNode.build("list", NS_URI).add_self_xmlns().put_node(new StanzaNode.build("device", NS_URI)); } - lock (device_list_loading) { - if (!device_list_loading) { - device_list_loading = true; - GLib.Timeout.add_seconds(3, () => { - bool cont = false; - lock (device_lists) { - if (device_list_modified) { - cont = true; - device_list_modified = false; - } - } - if (!cont) { - lock (device_list_loading) { - device_list_loading = false; - device_list_loaded(); - } - } - return cont; - }); - } - } - bool am_on_devicelist = false; foreach (StanzaNode device_node in node.get_subnodes("device")) { int device_id = device_node.get_attribute_int("id"); @@ -217,7 +202,7 @@ public class StreamModule : XmppStreamModule { } } if (!am_on_devicelist) { - print(@"Not on device list, adding id\n"); + if (Plugin.DEBUG) print(@"OMEMO: Not on device list, adding id\n"); node.put_node(new StanzaNode.build("device", NS_URI).put_attribute("id", store.local_registration_id.to_string())); stream.get_module(Pubsub.Module.IDENTITY).publish(stream, jid, NODE_DEVICELIST, NODE_DEVICELIST, id, node); } else { @@ -225,12 +210,13 @@ public class StreamModule : XmppStreamModule { } } lock(device_lists) { - device_list_modified = true; device_lists[jid] = new ArrayList(); foreach (StanzaNode device_node in node.get_subnodes("device")) { device_lists[jid].add(device_node.get_attribute_int("id")); } } + active_devicelist_requests.remove(jid); + device_list_loaded(jid); } public void start_sessions_with(XmppStream stream, string bare_jid) { @@ -256,8 +242,10 @@ public class StreamModule : XmppStreamModule { } public void start_session_with(XmppStream stream, string bare_jid, int device_id) { - print(@"Asking for bundle from $bare_jid/$device_id\n"); - stream.get_module(Pubsub.Module.IDENTITY).request(stream, bare_jid, @"$NODE_BUNDLES:$device_id", on_other_bundle_result, Tuple.create(store, device_id)); + if (active_bundle_requests.add(bare_jid + @":$device_id")) { + if (Plugin.DEBUG) print(@"OMEMO: Asking for bundle from $bare_jid:$device_id\n"); + stream.get_module(Pubsub.Module.IDENTITY).request(stream, bare_jid, @"$NODE_BUNDLES:$device_id", on_other_bundle_result, Tuple.create(store, device_id)); + } } public bool is_known_address(string name) { @@ -272,6 +260,7 @@ public class StreamModule : XmppStreamModule { } ignored_devices[jid].add(device_id); } + session_start_failed(jid, device_id); } public bool is_ignored_device(string jid, int32 device_id) { @@ -314,20 +303,23 @@ public class StreamModule : XmppStreamModule { } SessionBuilder builder = store.create_session_builder(address); builder.process_pre_key_bundle(create_pre_key_bundle(device_id, device_id, pre_key_id, pre_key, signed_pre_key_id, signed_pre_key, signed_pre_key_signature, identity_key)); + stream.get_module(IDENTITY).session_started(jid, device_id); } catch (Error e) { fail = true; } address.device_id = 0; // TODO: Hack to have address obj live longer - stream.get_module(IDENTITY).session_started(jid, device_id); } } if (fail) { stream.get_module(IDENTITY).ignore_device(jid, device_id); } + stream.get_module(IDENTITY).active_bundle_requests.remove(jid + @":$device_id"); } public void publish_bundles_if_needed(XmppStream stream, string jid) { - stream.get_module(Pubsub.Module.IDENTITY).request(stream, jid, @"$NODE_BUNDLES:$(store.local_registration_id)", on_self_bundle_result, store); + if (active_bundle_requests.add(jid + @":$(store.local_registration_id)")) { + stream.get_module(Pubsub.Module.IDENTITY).request(stream, jid, @"$NODE_BUNDLES:$(store.local_registration_id)", on_self_bundle_result, store); + } } private static void on_self_bundle_result(XmppStream stream, string jid, string? id, StanzaNode? node, Object? storage) { @@ -394,8 +386,9 @@ public class StreamModule : XmppStreamModule { publish_bundles(stream, signed_pre_key_record, identity_key_pair, pre_key_records, (int32) store.local_registration_id); } } catch (Error e) { - print(@"Unexpected error while publishing bundle: $(e.message)\n"); + if (Plugin.DEBUG) print(@"Unexpected error while publishing bundle: $(e.message)\n"); } + stream.get_module(IDENTITY).active_bundle_requests.remove(jid + @":$(store.local_registration_id)"); } public static void publish_bundles(XmppStream stream, SignedPreKeyRecord signed_pre_key_record, IdentityKeyPair identity_key_pair, Set pre_key_records, int32 device_id) throws Error { -- cgit v1.2.3-70-g09d2