From 07917f1d841f449157aa3aaa2507b0547dd274e7 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Mon, 16 Nov 2020 15:55:33 +0100 Subject: Refactor XmppStream, TLS and connection method logic fixes #534 --- libdino/src/service/connection_manager.vala | 191 ++++++++++++++++++---------- libdino/src/service/module_manager.vala | 2 - libdino/src/service/muc_manager.vala | 2 + libdino/src/service/registration.vala | 105 ++++++++------- 4 files changed, 189 insertions(+), 111 deletions(-) (limited to 'libdino/src') diff --git a/libdino/src/service/connection_manager.vala b/libdino/src/service/connection_manager.vala index cd5f7384..40cd21d4 100644 --- a/libdino/src/service/connection_manager.vala +++ b/libdino/src/service/connection_manager.vala @@ -17,10 +17,12 @@ public class ConnectionManager : Object { DISCONNECTED } - private HashSet connection_todo = new HashSet(Account.hash_func, Account.equals_func); private HashMap connections = new HashMap(Account.hash_func, Account.equals_func); private HashMap connection_errors = new HashMap(Account.hash_func, Account.equals_func); + private HashMap connection_ongoing = new HashMap(Account.hash_func, Account.equals_func); + private HashMap connection_directly_retry = new HashMap(Account.hash_func, Account.equals_func); + private NetworkMonitor? network_monitor; private Login1Manager? login1; private ModuleManager module_manager; @@ -52,13 +54,45 @@ public class ConnectionManager : Object { } private class Connection { - public XmppStream stream { get; set; } + public string uuid { get; set; } + public XmppStream? stream { get; set; } public ConnectionState connection_state { get; set; default = ConnectionState.DISCONNECTED; } - public DateTime established { get; set; } - public DateTime last_activity { get; set; } - public class Connection(XmppStream stream, DateTime established) { - this.stream = stream; - this.established = established; + public DateTime? established { get; set; } + public DateTime? last_activity { get; set; } + + public Connection() { + reset(); + } + + public void reset() { + if (stream != null) { + stream.detach_modules(); + + stream.disconnect.begin(); + } + stream = null; + established = last_activity = null; + uuid = Xmpp.random_uuid(); + } + + public void make_offline() { + Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza(); + presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE; + if (stream != null) { + stream.get_module(Presence.Module.IDENTITY).send_presence(stream, presence); + } + } + + public async void disconnect_account() { + make_offline(); + + if (stream != null) { + try { + yield stream.disconnect(); + } catch (Error e) { + debug("Error disconnecting stream: %s", e.message); + } + } } } @@ -74,7 +108,7 @@ public class ConnectionManager : Object { login1.PrepareForSleep.connect(on_prepare_for_sleep); } Timeout.add_seconds(60, () => { - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { if (connections[account].last_activity != null && connections[account].last_activity.compare(new DateTime.now_utc().add_minutes(-1)) < 0) { check_reconnect(account); @@ -106,13 +140,16 @@ public class ConnectionManager : Object { } public Collection get_managed_accounts() { - return connection_todo; + return connections.keys; } public void connect_account(Account account) { - if (!connection_todo.contains(account)) connection_todo.add(account); if (!connections.has_key(account)) { - connect_(account); + connections[account] = new Connection(); + connection_ongoing[account] = false; + connection_directly_retry[account] = false; + + connect_stream.begin(account); } else { check_reconnect(account); } @@ -125,74 +162,98 @@ public class ConnectionManager : Object { } private void make_offline(Account account) { - Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza(); - presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE; + connections[account].make_offline(); change_connection_state(account, ConnectionState.DISCONNECTED); - connections[account].stream.get_module(Presence.Module.IDENTITY).send_presence(connections[account].stream, presence); } public async void disconnect_account(Account account) { if (connections.has_key(account)) { make_offline(account); - try { - yield connections[account].stream.disconnect(); - } catch (Error e) { - debug("Error disconnecting stream: %s", e.message); - } - connection_todo.remove(account); - if (connections.has_key(account)) { - connections.unset(account); - } + connections[account].disconnect_account(); + connections.unset(account); } } - private void connect_(Account account, string? resource = null) { - if (connections.has_key(account)) connections[account].stream.detach_modules(); + private async void connect_stream(Account account, string? resource = null) { + if (!connections.has_key(account)) return; + + debug("[%s] (Maybe) Establishing a new connection", account.bare_jid.to_string()); + connection_errors.unset(account); if (resource == null) resource = account.resourcepart; - XmppStream stream = new XmppStream(); - foreach (XmppStreamModule module in module_manager.get_modules(account, resource)) { - stream.add_module(module); + XmppStreamResult stream_result; + + if (connection_ongoing[account]) { + debug("[%s] Connection attempt already in progress. Directly retry if it fails.", account.bare_jid.to_string()); + connection_directly_retry[account] = true; + return; + } else if (connections[account].stream != null) { + debug("[%s] Cancelling connecting because there is already a stream", account.bare_jid.to_string()); + return; + } else { + connection_ongoing[account] = true; + connection_directly_retry[account] = false; + + change_connection_state(account, ConnectionState.CONNECTING); + stream_result = yield Xmpp.establish_stream(account.bare_jid, module_manager.get_modules(account, resource), log_options); + connections[account].stream = stream_result.stream; + + connection_ongoing[account] = false; } - stream.log = new XmppLog(account.bare_jid.to_string(), log_options); + + if (stream_result.stream == null) { + if (stream_result.tls_errors != null) { + set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER}); + return; + } + + debug("[%s] Could not connect", account.bare_jid.to_string()); + + change_connection_state(account, ConnectionState.DISCONNECTED); + + check_reconnect(account, connection_directly_retry[account]); + + return; + } + + XmppStream stream = stream_result.stream; + debug("[%s] New connection with resource %s: %p", account.bare_jid.to_string(), resource, stream); - Connection connection = new Connection(stream, new DateTime.now_utc()); - connections[account] = connection; - change_connection_state(account, ConnectionState.CONNECTING); + connections[account].established = new DateTime.now_utc(); stream.attached_modules.connect((stream) => { change_connection_state(account, ConnectionState.CONNECTED); }); stream.get_module(Sasl.Module.IDENTITY).received_auth_failure.connect((stream, node) => { set_connection_error(account, new ConnectionError(ConnectionError.Source.SASL, null)); }); - stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect(() => { - set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER}); - }); + + string connection_uuid = connections[account].uuid; stream.received_node.connect(() => { - connection.last_activity = new DateTime.now_utc(); + if (connections[account].uuid == connection_uuid) { + connections[account].last_activity = new DateTime.now_utc(); + } else { + warning("Got node for outdated connection"); + } }); - connect_async.begin(account, stream); stream_opened(account, stream); - } - private async void connect_async(Account account, XmppStream stream) { try { - yield stream.connect(account.domainpart); + yield stream.loop(); } catch (Error e) { - debug("[%s %p] Error: %s", account.bare_jid.to_string(), stream, e.message); + debug("[%s %p] Connection error: %s", account.bare_jid.to_string(), stream, e.message); + change_connection_state(account, ConnectionState.DISCONNECTED); - if (!connection_todo.contains(account)) { - return; - } + connections[account].reset(); + StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY); if (flag != null) { warning(@"[%s %p] Stream Error: %s", account.bare_jid.to_string(), stream, flag.error_type); set_connection_error(account, new ConnectionError(ConnectionError.Source.STREAM_ERROR, flag.error_type)); if (flag.resource_rejected) { - connect_(account, account.resourcepart + "-" + random_uuid()); + connect_stream.begin(account, account.resourcepart + "-" + random_uuid()); return; } } @@ -202,27 +263,36 @@ public class ConnectionManager : Object { return; } - debug("[%s] Check reconnect in 5 sec", account.bare_jid.to_string()); - Timeout.add_seconds(5, () => { - check_reconnect(account); - return false; - }); + check_reconnect(account); } } private void check_reconnects() { - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { check_reconnect(account); } } - private void check_reconnect(Account account) { + private void check_reconnect(Account account, bool directly_reconnect = false) { if (!connections.has_key(account)) return; bool acked = false; DateTime? last_activity_was = connections[account].last_activity; + if (connections[account].stream == null) { + Timeout.add_seconds(10, () => { + if (!connections.has_key(account)) return false; + if (connections[account].stream != null) return false; + if (connections[account].last_activity != last_activity_was) return false; + + connect_stream.begin(account); + return false; + }); + return; + } + XmppStream stream = connections[account].stream; + stream.get_module(Xep.Ping.Module.IDENTITY).send_ping.begin(stream, account.bare_jid.domain_jid, () => { acked = true; if (connections[account].stream != stream) return; @@ -239,15 +309,8 @@ public class ConnectionManager : Object { debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream); change_connection_state(account, ConnectionState.DISCONNECTED); - connections[account].stream.disconnect.begin((_, res) => { - try { - connections[account].stream.disconnect.end(res); - } catch (Error e) { - debug("Error disconnecting stream: %s", e.message); - } - }); - - connect_(account); + connections[account].reset(); + connect_stream.begin(account); return false; }); } @@ -268,19 +331,19 @@ public class ConnectionManager : Object { check_reconnects(); } else { debug("NetworkMonitor: Network reported offline"); - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { change_connection_state(account, ConnectionState.DISCONNECTED); } } } private async void on_prepare_for_sleep(bool suspend) { - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { change_connection_state(account, ConnectionState.DISCONNECTED); } if (suspend) { debug("Login1: Device suspended"); - foreach (Account account in connection_todo) { + foreach (Account account in connections.keys) { try { make_offline(account); yield connections[account].stream.disconnect(); diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala index 2fcef581..ebcff6ab 100644 --- a/libdino/src/service/module_manager.vala +++ b/libdino/src/service/module_manager.vala @@ -46,8 +46,6 @@ public class ModuleManager { lock(module_map) { module_map[account] = new ArrayList(); module_map[account].add(new Iq.Module()); - module_map[account].add(new Tls.Module()); - module_map[account].add(new Xep.SrvRecordsTls.Module()); module_map[account].add(new Sasl.Module(account.bare_jid.to_string(), account.password)); module_map[account].add(new Xep.StreamManagement.Module()); module_map[account].add(new Bind.Module(account.resourcepart)); diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala index 178cc8f9..b5d85236 100644 --- a/libdino/src/service/muc_manager.vala +++ b/libdino/src/service/muc_manager.vala @@ -97,6 +97,8 @@ public class MucManager : StreamInteractionModule, Object { } public void part(Account account, Jid jid) { + if (!mucs_todo.has_key(account) || !mucs_todo[account].contains(jid)) return; + mucs_todo[account].remove(jid); XmppStream? stream = stream_interactor.get_stream(account); diff --git a/libdino/src/service/registration.vala b/libdino/src/service/registration.vala index f6c6e95d..b4377b98 100644 --- a/libdino/src/service/registration.vala +++ b/libdino/src/service/registration.vala @@ -23,33 +23,35 @@ public class Register : StreamInteractionModule, Object{ } public async ConnectionManager.ConnectionError.Source? add_check_account(Account account) { - XmppStream stream = new XmppStream(); - stream.log = new XmppLog(account.bare_jid.to_string(), Application.print_xmpp); - stream.add_module(new Tls.Module()); - stream.add_module(new Iq.Module()); - stream.add_module(new Xep.SrvRecordsTls.Module()); - stream.add_module(new Sasl.Module(account.bare_jid.to_string(), account.password)); - ConnectionManager.ConnectionError.Source? ret = null; + Gee.List list = new ArrayList(); + list.add(new Iq.Module()); + list.add(new Sasl.Module(account.bare_jid.to_string(), account.password)); + + XmppStreamResult stream_result = yield Xmpp.establish_stream(account.bare_jid.domain_jid, list, Application.print_xmpp); + + if (stream_result.stream == null) { + if (stream_result.tls_errors != null) { + ret = ConnectionManager.ConnectionError.Source.TLS; + } + return ret; + } + XmppStream stream = stream_result.stream; + SourceFunc callback = add_check_account.callback; stream.stream_negotiated.connect(() => { if (callback == null) return; Idle.add((owned)callback); }); - stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect((peer_cert, errors) => { - if (callback == null) return; - ret = ConnectionManager.ConnectionError.Source.TLS; - Idle.add((owned)callback); - }); stream.get_module(Sasl.Module.IDENTITY).received_auth_failure.connect((stream, node) => { if (callback == null) return; ret = ConnectionManager.ConnectionError.Source.SASL; Idle.add((owned)callback); }); - stream.connect.begin(account.domainpart, (_, res) => { + stream.loop.begin((_, res) => { try { - stream.connect.end(res); + stream.loop.end(res); } catch (Error e) { debug("Error connecting to stream: %s", e.message); } @@ -62,7 +64,7 @@ public class Register : StreamInteractionModule, Object{ yield; try { - yield stream.disconnect(); + yield stream_result.stream.disconnect(); } catch (Error e) {} return ret; } @@ -73,13 +75,24 @@ public class Register : StreamInteractionModule, Object{ } public static async ServerAvailabilityReturn check_server_availability(Jid jid) { - XmppStream stream = new XmppStream(); - stream.log = new XmppLog(jid.to_string(), Application.print_xmpp); - stream.add_module(new Tls.Module()); - stream.add_module(new Iq.Module()); - stream.add_module(new Xep.SrvRecordsTls.Module()); - ServerAvailabilityReturn ret = new ServerAvailabilityReturn() { available=false }; + + Gee.List list = new ArrayList(); + list.add(new Iq.Module()); + + XmppStreamResult stream_result = yield Xmpp.establish_stream(jid.domain_jid, list, Application.print_xmpp); + + if (stream_result.stream == null) { + if (stream_result.io_error != null) { + debug("Error connecting to stream: %s", stream_result.io_error.message); + } + if (stream_result.tls_errors != null) { + ret.error_flags = stream_result.tls_errors; + } + return ret; + } + XmppStream stream = stream_result.stream; + SourceFunc callback = check_server_availability.callback; stream.stream_negotiated.connect(() => { if (callback != null) { @@ -87,16 +100,10 @@ public class Register : StreamInteractionModule, Object{ Idle.add((owned)callback); } }); - stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect((peer_cert, errors) => { - if (callback != null) { - ret.error_flags = errors; - Idle.add((owned)callback); - } - }); - stream.connect.begin(jid.domainpart, (_, res) => { + stream.loop.begin((_, res) => { try { - stream.connect.end(res); + stream.loop.end(res); } catch (Error e) { debug("Error connecting to stream: %s", e.message); } @@ -114,12 +121,16 @@ public class Register : StreamInteractionModule, Object{ } public static async Xep.InBandRegistration.Form? get_registration_form(Jid jid) { - XmppStream stream = new XmppStream(); - stream.log = new XmppLog(jid.to_string(), Application.print_xmpp); - stream.add_module(new Tls.Module()); - stream.add_module(new Iq.Module()); - stream.add_module(new Xep.SrvRecordsTls.Module()); - stream.add_module(new Xep.InBandRegistration.Module()); + Gee.List list = new ArrayList(); + list.add(new Iq.Module()); + list.add(new Xep.InBandRegistration.Module()); + + XmppStreamResult stream_result = yield Xmpp.establish_stream(jid.domain_jid, list, Application.print_xmpp); + + if (stream_result.stream == null) { + return null; + } + XmppStream stream = stream_result.stream; SourceFunc callback = get_registration_form.callback; @@ -129,9 +140,9 @@ public class Register : StreamInteractionModule, Object{ } }); - stream.connect.begin(jid.domainpart, (_, res) => { + stream.loop.begin((_, res) => { try { - stream.connect.end(res); + stream.loop.end(res); } catch (Error e) { debug("Error connecting to stream: %s", e.message); } @@ -154,12 +165,16 @@ public class Register : StreamInteractionModule, Object{ } public static async string? submit_form(Jid jid, Xep.InBandRegistration.Form form) { - XmppStream stream = new XmppStream(); - stream.log = new XmppLog(jid.to_string(), Application.print_xmpp); - stream.add_module(new Tls.Module()); - stream.add_module(new Iq.Module()); - stream.add_module(new Xep.SrvRecordsTls.Module()); - stream.add_module(new Xep.InBandRegistration.Module()); + Gee.List list = new ArrayList(); + list.add(new Iq.Module()); + list.add(new Xep.InBandRegistration.Module()); + + XmppStreamResult stream_result = yield Xmpp.establish_stream(jid.domain_jid, list, Application.print_xmpp); + + if (stream_result.stream == null) { + return null; + } + XmppStream stream = stream_result.stream; SourceFunc callback = submit_form.callback; @@ -169,9 +184,9 @@ public class Register : StreamInteractionModule, Object{ } }); - stream.connect.begin(jid.domainpart, (_, res) => { + stream.loop.begin((_, res) => { try { - stream.connect.end(res); + stream.loop.end(res); } catch (Error e) { debug("Error connecting to stream: %s", e.message); } -- cgit v1.2.3-70-g09d2