aboutsummaryrefslogtreecommitdiff
path: root/plugins/omemo/src/manager.vala
diff options
context:
space:
mode:
authorMarvin W <git@larma.de>2017-03-15 17:23:13 +0100
committerMarvin W <git@larma.de>2017-03-15 17:23:13 +0100
commit5038db063ec3981385dd68a3069e8edd0b52075d (patch)
tree0c2309518a8e20dd47c00126fe6da70eb2041f93 /plugins/omemo/src/manager.vala
parent27afc2164f31f40e79606c72005277d730c1f005 (diff)
downloaddino-5038db063ec3981385dd68a3069e8edd0b52075d.tar.gz
dino-5038db063ec3981385dd68a3069e8edd0b52075d.zip
omemo plugin: improve session bootstrapping
Diffstat (limited to 'plugins/omemo/src/manager.vala')
-rw-r--r--plugins/omemo/src/manager.vala164
1 files changed, 118 insertions, 46 deletions
diff --git a/plugins/omemo/src/manager.vala b/plugins/omemo/src/manager.vala
index 197ba021..a48f4748 100644
--- a/plugins/omemo/src/manager.vala
+++ b/plugins/omemo/src/manager.vala
@@ -11,8 +11,52 @@ public class Manager : StreamInteractionModule, Object {
private StreamInteractor stream_interactor;
private Database db;
- private ConcurrentList<Entities.Message> to_send_after_devicelist = new ConcurrentList<Entities.Message>();
- private ConcurrentList<Entities.Message> to_send_after_session = new ConcurrentList<Entities.Message>();
+ private Map<Entities.Message, MessageState> message_states = new HashMap<Entities.Message, MessageState>(Entities.Message.hash_func, Entities.Message.equals_func);
+
+ private class MessageState {
+ public Entities.Message msg { get; private set; }
+ public EncryptState last_try { get; private set; }
+ public int waiting_other_sessions { get; set; }
+ public int waiting_own_sessions { get; set; }
+ public bool waiting_own_devicelist { get; set; }
+ public bool waiting_other_devicelist { get; set; }
+ public bool force_next_attempt { get; set; }
+ public bool will_send_now { get; private set; }
+ public bool active_send_attempt { get; set; }
+
+ public MessageState(Entities.Message msg, EncryptState last_try) {
+ this.msg = msg;
+ this.last_try = last_try;
+ update_from_encrypt_status(last_try);
+ }
+
+ public void update_from_encrypt_status(EncryptState new_try) {
+ this.last_try = new_try;
+ this.waiting_other_sessions = new_try.other_unknown;
+ this.waiting_own_sessions = new_try.own_unknown;
+ this.waiting_own_devicelist = !new_try.own_list;
+ this.waiting_other_devicelist = !new_try.own_list;
+ this.active_send_attempt = false;
+ will_send_now = false;
+ if (new_try.other_failure > 0 || (new_try.other_lost == new_try.other_devices && new_try.other_devices > 0)) {
+ msg.marked = Entities.Message.Marked.WONTSEND;
+ } else if (new_try.other_unknown > 0 || new_try.own_devices == 0) {
+ msg.marked = Entities.Message.Marked.UNSENT;
+ } else if (!new_try.encrypted) {
+ msg.marked = Entities.Message.Marked.WONTSEND;
+ } else {
+ will_send_now = true;
+ }
+ }
+
+ public bool should_retry_now() {
+ return !waiting_own_devicelist && !waiting_other_devicelist && waiting_other_sessions <= 0 && waiting_own_sessions <= 0 && !active_send_attempt;
+ }
+
+ public string to_string() {
+ return @"MessageState (waiting=(others=$waiting_other_sessions, own=$waiting_own_sessions, other_list=$waiting_other_devicelist, own_list=$waiting_own_devicelist))";
+ }
+ }
private Manager(StreamInteractor stream_interactor, Database db) {
this.stream_interactor = stream_interactor;
@@ -33,31 +77,36 @@ public class Manager : StreamInteractionModule, Object {
private void on_pre_message_send(Entities.Message message, Xmpp.Message.Stanza message_stanza, Conversation conversation) {
if (message.encryption == Encryption.OMEMO) {
StreamModule module = stream_interactor.get_stream(conversation.account).get_module(StreamModule.IDENTITY);
- EncryptStatus status = module.encrypt(message_stanza, conversation.account.bare_jid.to_string());
- if (status.other_failure > 0 || (status.other_lost == status.other_devices && status.other_devices > 0)) {
- message.marked = Entities.Message.Marked.WONTSEND;
- } else if (status.other_unknown > 0 || status.own_devices == 0) {
- message.marked = Entities.Message.Marked.UNSENT;
- } else if (!status.encrypted) {
- message.marked = Entities.Message.Marked.WONTSEND;
+ EncryptState enc_state = module.encrypt(message_stanza, conversation.account.bare_jid.to_string());
+ MessageState state = null;
+ lock (message_states) {
+ if (message_states.has_key(message)) {
+ state = message_states.get(message);
+ state.update_from_encrypt_status(enc_state);
+ } else {
+ state = new MessageState(message, enc_state);
+ message_states[message] = state;
+ }
+ if (state.will_send_now) {
+ message_states.unset(message);
+ }
}
- if (status.other_unknown > 0) {
- bool cont = true;
- lock(to_send_after_session) {
- foreach(Entities.Message msg in to_send_after_session) {
- if (msg.counterpart.bare_jid.to_string() == message.counterpart.bare_jid.to_string()) cont = false;
+ if (!state.will_send_now) {
+ if (message.marked == Entities.Message.Marked.WONTSEND) {
+ if (Plugin.DEBUG) print(@"OMEMO: message $(message.stanza_id) was not sent: $state\n");
+ } else {
+ if (Plugin.DEBUG) print(@"OMEMO: message $(message.stanza_id) will be delayed: $state\n");
+
+ if (state.waiting_own_sessions > 0) {
+ module.start_sessions_with(stream_interactor.get_stream(conversation.account), conversation.account.bare_jid.to_string());
+ }
+ if (state.waiting_other_sessions > 0) {
+ module.start_sessions_with(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string());
+ }
+ if (state.waiting_other_devicelist) {
+ module.request_user_devicelist(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string());
}
- to_send_after_session.add(message);
- }
- if (cont) module.start_sessions_with(stream_interactor.get_stream(conversation.account), message.counterpart.bare_jid.to_string());
- }
- if (status.own_unknown > 0) {
- module.start_sessions_with(stream_interactor.get_stream(conversation.account), conversation.account.bare_jid.to_string());
- }
- if (status.own_devices == 0) {
- lock (to_send_after_session) {
- to_send_after_devicelist.add(message);
}
}
}
@@ -65,8 +114,9 @@ public class Manager : StreamInteractionModule, Object {
private void on_account_added(Account account) {
stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).store_created.connect((store) => on_store_created(account, store));
- stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).device_list_loaded.connect(() => on_device_list_loaded(account));
- stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_started.connect((jid, device_id) => on_session_started(account, jid));
+ stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).device_list_loaded.connect((jid) => on_device_list_loaded(account, jid));
+ stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_started.connect((jid, device_id) => on_session_started(account, jid, false));
+ stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).session_start_failed.connect((jid, device_id) => on_session_started(account, jid, true));
}
private void on_stream_negotiated(Account account) {
@@ -74,32 +124,52 @@ public class Manager : StreamInteractionModule, Object {
stream_interactor.module_manager.get_module(account, StreamModule.IDENTITY).request_user_devicelist(stream, account.bare_jid.to_string());
}
- private void on_session_started(Account account, string jid) {
- lock(to_send_after_session) {
- Iterator<Entities.Message> iter = to_send_after_session.iterator();
- while (iter.next()) {
- Entities.Message msg = iter.get();
- if (msg.account.bare_jid.to_string() == account.bare_jid.to_string() && msg.counterpart.bare_jid.to_string() == jid) {
- Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account);
- MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true);
- iter.remove();
+ private void on_session_started(Account account, string jid, bool failed) {
+ if (Plugin.DEBUG) print(@"OMEMO: session start between $(account.bare_jid) and $jid $(failed ? "failed" : "successful")\n");
+ HashSet<Entities.Message> send_now = new HashSet<Entities.Message>();
+ lock (message_states) {
+ foreach (Entities.Message msg in message_states.keys) {
+ if (!msg.account.equals(account)) continue;
+ MessageState state = message_states[msg];
+ if (account.bare_jid.to_string() == jid) {
+ state.waiting_own_sessions--;
+ } else if (msg.counterpart.bare_jid.to_string() == jid) {
+ state.waiting_other_sessions--;
+ }
+ if (state.should_retry_now()) {
+ send_now.add(msg);
+ state.active_send_attempt = true;
}
}
}
+ foreach (Entities.Message msg in send_now) {
+ Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account);
+ MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true);
+ }
}
- private void on_device_list_loaded(Account account) {
- lock(to_send_after_devicelist) {
- Iterator<Entities.Message> iter = to_send_after_devicelist.iterator();
- while (iter.next()) {
- Entities.Message msg = iter.get();
- if (msg.account.bare_jid.to_string() == account.bare_jid.to_string()) {
- Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account);
- MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true);
- iter.remove();
+ private void on_device_list_loaded(Account account, string jid) {
+ if (Plugin.DEBUG) print(@"OMEMO: received device list for $(account.bare_jid) from $jid\n");
+ HashSet<Entities.Message> send_now = new HashSet<Entities.Message>();
+ lock (message_states) {
+ foreach (Entities.Message msg in message_states.keys) {
+ if (!msg.account.equals(account)) continue;
+ MessageState state = message_states[msg];
+ if (account.bare_jid.to_string() == jid) {
+ state.waiting_own_devicelist = false;
+ } else if (msg.counterpart.bare_jid.to_string() == jid) {
+ state.waiting_other_devicelist = false;
+ }
+ if (state.should_retry_now()) {
+ send_now.add(msg);
+ state.active_send_attempt = true;
}
}
}
+ foreach (Entities.Message msg in send_now) {
+ Entities.Conversation conv = ConversationManager.get_instance(stream_interactor).get_conversation(msg.counterpart, account);
+ MessageManager.get_instance(stream_interactor).send_xmpp_message(msg, conv, true);
+ }
}
private void on_store_created(Account account, Store store) {
@@ -141,13 +211,15 @@ public class Manager : StreamInteractionModule, Object {
store.pre_key_store = new BackedPreKeyStore(db, identity_id);
store.session_store = new BackedSessionStore(db, identity_id);
} else {
- print(@"WARN: OMEMO store for $(account.bare_jid) is not persisted");
+ print(@"OMEMO: store for $(account.bare_jid) is not persisted!");
}
}
public bool can_encrypt(Entities.Conversation conversation) {
- return stream_interactor.get_stream(conversation.account).get_module(StreamModule.IDENTITY).is_known_address(conversation.counterpart.bare_jid.to_string());
+ Core.XmppStream stream = stream_interactor.get_stream(conversation.account);
+ if (stream == null) return false;
+ return stream.get_module(StreamModule.IDENTITY).is_known_address(conversation.counterpart.bare_jid.to_string());
}
internal string get_id() {