From 07917f1d841f449157aa3aaa2507b0547dd274e7 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Mon, 16 Nov 2020 15:55:33 +0100 Subject: Refactor XmppStream, TLS and connection method logic fixes #534 --- libdino/src/service/connection_manager.vala | 191 ++++++++++++++++++---------- 1 file changed, 127 insertions(+), 64 deletions(-) (limited to 'libdino/src/service/connection_manager.vala') diff --git a/libdino/src/service/connection_manager.vala b/libdino/src/service/connection_manager.vala index cd5f7384..40cd21d4 100644 --- a/libdino/src/service/connection_manager.vala +++ b/libdino/src/service/connection_manager.vala @@ -17,10 +17,12 @@ public class ConnectionManager : Object { DISCONNECTED } - private HashSet connection_todo = new HashSet(Account.hash_func, Account.equals_func); private HashMap connections = new HashMap(Account.hash_func, Account.equals_func); private HashMap connection_errors = new HashMap(Account.hash_func, Account.equals_func); + private HashMap connection_ongoing = new HashMap(Account.hash_func, Account.equals_func); + private HashMap connection_directly_retry = new HashMap(Account.hash_func, Account.equals_func); + private NetworkMonitor? network_monitor; private Login1Manager? login1; private ModuleManager module_manager; @@ -52,13 +54,45 @@ public class ConnectionManager : Object { } private class Connection { - public XmppStream stream { get; set; } + public string uuid { get; set; } + public XmppStream? stream { get; set; } public ConnectionState connection_state { get; set; default = ConnectionState.DISCONNECTED; } - public DateTime established { get; set; } - public DateTime last_activity { get; set; } - public class Connection(XmppStream stream, DateTime established) { - this.stream = stream; - this.established = established; + public DateTime? established { get; set; } + public DateTime? last_activity { get; set; } + + public Connection() { + reset(); + } + + public void reset() { + if (stream != null) { + stream.detach_modules(); + + stream.disconnect.begin(); + } + stream = null; + established = last_activity = null; + uuid = Xmpp.random_uuid(); + } + + public void make_offline() { + Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza(); + presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE; + if (stream != null) { + stream.get_module(Presence.Module.IDENTITY).send_presence(stream, presence); + } + } + + public async void disconnect_account() { + make_offline(); + + if (stream != null) { + try { + yield stream.disconnect(); + } catch (Error e) { + debug("Error disconnecting stream: %s", e.message); + } + } } } @@ -74,7 +108,7 @@ public class ConnectionManager : Object { login1.PrepareForSleep.connect(on_prepare_for_sleep); } Timeout.add_seconds(60, () => { - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { if (connections[account].last_activity != null && connections[account].last_activity.compare(new DateTime.now_utc().add_minutes(-1)) < 0) { check_reconnect(account); @@ -106,13 +140,16 @@ public class ConnectionManager : Object { } public Collection get_managed_accounts() { - return connection_todo; + return connections.keys; } public void connect_account(Account account) { - if (!connection_todo.contains(account)) connection_todo.add(account); if (!connections.has_key(account)) { - connect_(account); + connections[account] = new Connection(); + connection_ongoing[account] = false; + connection_directly_retry[account] = false; + + connect_stream.begin(account); } else { check_reconnect(account); } @@ -125,74 +162,98 @@ public class ConnectionManager : Object { } private void make_offline(Account account) { - Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza(); - presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE; + connections[account].make_offline(); change_connection_state(account, ConnectionState.DISCONNECTED); - connections[account].stream.get_module(Presence.Module.IDENTITY).send_presence(connections[account].stream, presence); } public async void disconnect_account(Account account) { if (connections.has_key(account)) { make_offline(account); - try { - yield connections[account].stream.disconnect(); - } catch (Error e) { - debug("Error disconnecting stream: %s", e.message); - } - connection_todo.remove(account); - if (connections.has_key(account)) { - connections.unset(account); - } + connections[account].disconnect_account(); + connections.unset(account); } } - private void connect_(Account account, string? resource = null) { - if (connections.has_key(account)) connections[account].stream.detach_modules(); + private async void connect_stream(Account account, string? resource = null) { + if (!connections.has_key(account)) return; + + debug("[%s] (Maybe) Establishing a new connection", account.bare_jid.to_string()); + connection_errors.unset(account); if (resource == null) resource = account.resourcepart; - XmppStream stream = new XmppStream(); - foreach (XmppStreamModule module in module_manager.get_modules(account, resource)) { - stream.add_module(module); + XmppStreamResult stream_result; + + if (connection_ongoing[account]) { + debug("[%s] Connection attempt already in progress. Directly retry if it fails.", account.bare_jid.to_string()); + connection_directly_retry[account] = true; + return; + } else if (connections[account].stream != null) { + debug("[%s] Cancelling connecting because there is already a stream", account.bare_jid.to_string()); + return; + } else { + connection_ongoing[account] = true; + connection_directly_retry[account] = false; + + change_connection_state(account, ConnectionState.CONNECTING); + stream_result = yield Xmpp.establish_stream(account.bare_jid, module_manager.get_modules(account, resource), log_options); + connections[account].stream = stream_result.stream; + + connection_ongoing[account] = false; } - stream.log = new XmppLog(account.bare_jid.to_string(), log_options); + + if (stream_result.stream == null) { + if (stream_result.tls_errors != null) { + set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER}); + return; + } + + debug("[%s] Could not connect", account.bare_jid.to_string()); + + change_connection_state(account, ConnectionState.DISCONNECTED); + + check_reconnect(account, connection_directly_retry[account]); + + return; + } + + XmppStream stream = stream_result.stream; + debug("[%s] New connection with resource %s: %p", account.bare_jid.to_string(), resource, stream); - Connection connection = new Connection(stream, new DateTime.now_utc()); - connections[account] = connection; - change_connection_state(account, ConnectionState.CONNECTING); + connections[account].established = new DateTime.now_utc(); stream.attached_modules.connect((stream) => { change_connection_state(account, ConnectionState.CONNECTED); }); stream.get_module(Sasl.Module.IDENTITY).received_auth_failure.connect((stream, node) => { set_connection_error(account, new ConnectionError(ConnectionError.Source.SASL, null)); }); - stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect(() => { - set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER}); - }); + + string connection_uuid = connections[account].uuid; stream.received_node.connect(() => { - connection.last_activity = new DateTime.now_utc(); + if (connections[account].uuid == connection_uuid) { + connections[account].last_activity = new DateTime.now_utc(); + } else { + warning("Got node for outdated connection"); + } }); - connect_async.begin(account, stream); stream_opened(account, stream); - } - private async void connect_async(Account account, XmppStream stream) { try { - yield stream.connect(account.domainpart); + yield stream.loop(); } catch (Error e) { - debug("[%s %p] Error: %s", account.bare_jid.to_string(), stream, e.message); + debug("[%s %p] Connection error: %s", account.bare_jid.to_string(), stream, e.message); + change_connection_state(account, ConnectionState.DISCONNECTED); - if (!connection_todo.contains(account)) { - return; - } + connections[account].reset(); + StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY); if (flag != null) { warning(@"[%s %p] Stream Error: %s", account.bare_jid.to_string(), stream, flag.error_type); set_connection_error(account, new ConnectionError(ConnectionError.Source.STREAM_ERROR, flag.error_type)); if (flag.resource_rejected) { - connect_(account, account.resourcepart + "-" + random_uuid()); + connect_stream.begin(account, account.resourcepart + "-" + random_uuid()); return; } } @@ -202,27 +263,36 @@ public class ConnectionManager : Object { return; } - debug("[%s] Check reconnect in 5 sec", account.bare_jid.to_string()); - Timeout.add_seconds(5, () => { - check_reconnect(account); - return false; - }); + check_reconnect(account); } } private void check_reconnects() { - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { check_reconnect(account); } } - private void check_reconnect(Account account) { + private void check_reconnect(Account account, bool directly_reconnect = false) { if (!connections.has_key(account)) return; bool acked = false; DateTime? last_activity_was = connections[account].last_activity; + if (connections[account].stream == null) { + Timeout.add_seconds(10, () => { + if (!connections.has_key(account)) return false; + if (connections[account].stream != null) return false; + if (connections[account].last_activity != last_activity_was) return false; + + connect_stream.begin(account); + return false; + }); + return; + } + XmppStream stream = connections[account].stream; + stream.get_module(Xep.Ping.Module.IDENTITY).send_ping.begin(stream, account.bare_jid.domain_jid, () => { acked = true; if (connections[account].stream != stream) return; @@ -239,15 +309,8 @@ public class ConnectionManager : Object { debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream); change_connection_state(account, ConnectionState.DISCONNECTED); - connections[account].stream.disconnect.begin((_, res) => { - try { - connections[account].stream.disconnect.end(res); - } catch (Error e) { - debug("Error disconnecting stream: %s", e.message); - } - }); - - connect_(account); + connections[account].reset(); + connect_stream.begin(account); return false; }); } @@ -268,19 +331,19 @@ public class ConnectionManager : Object { check_reconnects(); } else { debug("NetworkMonitor: Network reported offline"); - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { change_connection_state(account, ConnectionState.DISCONNECTED); } } } private async void on_prepare_for_sleep(bool suspend) { - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { change_connection_state(account, ConnectionState.DISCONNECTED); } if (suspend) { debug("Login1: Device suspended"); - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { try { make_offline(account); yield connections[account].stream.disconnect(); -- cgit v1.2.3-54-g00ecf