aboutsummaryrefslogtreecommitdiff
path: root/libdino/src
diff options
context:
space:
mode:
authorfiaxh <git@lightrise.org>2020-11-16 15:55:33 +0100
committerfiaxh <git@lightrise.org>2020-11-20 15:21:18 +0100
commit07917f1d841f449157aa3aaa2507b0547dd274e7 (patch)
tree315ef3bc243491565d3d5097968dca38d67a7eab /libdino/src
parent881b9eec9dcd8fd8c81b0b9d7bfd2ae714d7722e (diff)
downloaddino-07917f1d841f449157aa3aaa2507b0547dd274e7.tar.gz
dino-07917f1d841f449157aa3aaa2507b0547dd274e7.zip
Refactor XmppStream, TLS and connection method logic
fixes #534
Diffstat (limited to 'libdino/src')
-rw-r--r--libdino/src/service/connection_manager.vala191
-rw-r--r--libdino/src/service/module_manager.vala2
-rw-r--r--libdino/src/service/muc_manager.vala2
-rw-r--r--libdino/src/service/registration.vala105
4 files changed, 189 insertions, 111 deletions
diff --git a/libdino/src/service/connection_manager.vala b/libdino/src/service/connection_manager.vala
index cd5f7384..40cd21d4 100644
--- a/libdino/src/service/connection_manager.vala
+++ b/libdino/src/service/connection_manager.vala
@@ -17,10 +17,12 @@ public class ConnectionManager : Object {
DISCONNECTED
}
- private HashSet<Account> connection_todo = new HashSet<Account>(Account.hash_func, Account.equals_func);
private HashMap<Account, Connection> connections = new HashMap<Account, Connection>(Account.hash_func, Account.equals_func);
private HashMap<Account, ConnectionError> connection_errors = new HashMap<Account, ConnectionError>(Account.hash_func, Account.equals_func);
+ private HashMap<Account, bool> connection_ongoing = new HashMap<Account, bool>(Account.hash_func, Account.equals_func);
+ private HashMap<Account, bool> connection_directly_retry = new HashMap<Account, bool>(Account.hash_func, Account.equals_func);
+
private NetworkMonitor? network_monitor;
private Login1Manager? login1;
private ModuleManager module_manager;
@@ -52,13 +54,45 @@ public class ConnectionManager : Object {
}
private class Connection {
- public XmppStream stream { get; set; }
+ public string uuid { get; set; }
+ public XmppStream? stream { get; set; }
public ConnectionState connection_state { get; set; default = ConnectionState.DISCONNECTED; }
- public DateTime established { get; set; }
- public DateTime last_activity { get; set; }
- public class Connection(XmppStream stream, DateTime established) {
- this.stream = stream;
- this.established = established;
+ public DateTime? established { get; set; }
+ public DateTime? last_activity { get; set; }
+
+ public Connection() {
+ reset();
+ }
+
+ public void reset() {
+ if (stream != null) {
+ stream.detach_modules();
+
+ stream.disconnect.begin();
+ }
+ stream = null;
+ established = last_activity = null;
+ uuid = Xmpp.random_uuid();
+ }
+
+ public void make_offline() {
+ Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza();
+ presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE;
+ if (stream != null) {
+ stream.get_module(Presence.Module.IDENTITY).send_presence(stream, presence);
+ }
+ }
+
+ public async void disconnect_account() {
+ make_offline();
+
+ if (stream != null) {
+ try {
+ yield stream.disconnect();
+ } catch (Error e) {
+ debug("Error disconnecting stream: %s", e.message);
+ }
+ }
}
}
@@ -74,7 +108,7 @@ public class ConnectionManager : Object {
login1.PrepareForSleep.connect(on_prepare_for_sleep);
}
Timeout.add_seconds(60, () => {
- foreach (Account account in connection_todo) {
+ foreach (Account account in connections.keys) {
if (connections[account].last_activity != null &&
connections[account].last_activity.compare(new DateTime.now_utc().add_minutes(-1)) < 0) {
check_reconnect(account);
@@ -106,13 +140,16 @@ public class ConnectionManager : Object {
}
public Collection<Account> get_managed_accounts() {
- return connection_todo;
+ return connections.keys;
}
public void connect_account(Account account) {
- if (!connection_todo.contains(account)) connection_todo.add(account);
if (!connections.has_key(account)) {
- connect_(account);
+ connections[account] = new Connection();
+ connection_ongoing[account] = false;
+ connection_directly_retry[account] = false;
+
+ connect_stream.begin(account);
} else {
check_reconnect(account);
}
@@ -125,74 +162,98 @@ public class ConnectionManager : Object {
}
private void make_offline(Account account) {
- Xmpp.Presence.Stanza presence = new Xmpp.Presence.Stanza();
- presence.type_ = Xmpp.Presence.Stanza.TYPE_UNAVAILABLE;
+ connections[account].make_offline();
change_connection_state(account, ConnectionState.DISCONNECTED);
- connections[account].stream.get_module(Presence.Module.IDENTITY).send_presence(connections[account].stream, presence);
}
public async void disconnect_account(Account account) {
if (connections.has_key(account)) {
make_offline(account);
- try {
- yield connections[account].stream.disconnect();
- } catch (Error e) {
- debug("Error disconnecting stream: %s", e.message);
- }
- connection_todo.remove(account);
- if (connections.has_key(account)) {
- connections.unset(account);
- }
+ connections[account].disconnect_account();
+ connections.unset(account);
}
}
- private void connect_(Account account, string? resource = null) {
- if (connections.has_key(account)) connections[account].stream.detach_modules();
+ private async void connect_stream(Account account, string? resource = null) {
+ if (!connections.has_key(account)) return;
+
+ debug("[%s] (Maybe) Establishing a new connection", account.bare_jid.to_string());
+
connection_errors.unset(account);
if (resource == null) resource = account.resourcepart;
- XmppStream stream = new XmppStream();
- foreach (XmppStreamModule module in module_manager.get_modules(account, resource)) {
- stream.add_module(module);
+ XmppStreamResult stream_result;
+
+ if (connection_ongoing[account]) {
+ debug("[%s] Connection attempt already in progress. Directly retry if it fails.", account.bare_jid.to_string());
+ connection_directly_retry[account] = true;
+ return;
+ } else if (connections[account].stream != null) {
+ debug("[%s] Cancelling connecting because there is already a stream", account.bare_jid.to_string());
+ return;
+ } else {
+ connection_ongoing[account] = true;
+ connection_directly_retry[account] = false;
+
+ change_connection_state(account, ConnectionState.CONNECTING);
+ stream_result = yield Xmpp.establish_stream(account.bare_jid, module_manager.get_modules(account, resource), log_options);
+ connections[account].stream = stream_result.stream;
+
+ connection_ongoing[account] = false;
}
- stream.log = new XmppLog(account.bare_jid.to_string(), log_options);
+
+ if (stream_result.stream == null) {
+ if (stream_result.tls_errors != null) {
+ set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER});
+ return;
+ }
+
+ debug("[%s] Could not connect", account.bare_jid.to_string());
+
+ change_connection_state(account, ConnectionState.DISCONNECTED);
+
+ check_reconnect(account, connection_directly_retry[account]);
+
+ return;
+ }
+
+ XmppStream stream = stream_result.stream;
+
debug("[%s] New connection with resource %s: %p", account.bare_jid.to_string(), resource, stream);
- Connection connection = new Connection(stream, new DateTime.now_utc());
- connections[account] = connection;
- change_connection_state(account, ConnectionState.CONNECTING);
+ connections[account].established = new DateTime.now_utc();
stream.attached_modules.connect((stream) => {
change_connection_state(account, ConnectionState.CONNECTED);
});
stream.get_module(Sasl.Module.IDENTITY).received_auth_failure.connect((stream, node) => {
set_connection_error(account, new ConnectionError(ConnectionError.Source.SASL, null));
});
- stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect(() => {
- set_connection_error(account, new ConnectionError(ConnectionError.Source.TLS, null) { reconnect_recomendation=ConnectionError.Reconnect.NEVER});
- });
+
+ string connection_uuid = connections[account].uuid;
stream.received_node.connect(() => {
- connection.last_activity = new DateTime.now_utc();
+ if (connections[account].uuid == connection_uuid) {
+ connections[account].last_activity = new DateTime.now_utc();
+ } else {
+ warning("Got node for outdated connection");
+ }
});
- connect_async.begin(account, stream);
stream_opened(account, stream);
- }
- private async void connect_async(Account account, XmppStream stream) {
try {
- yield stream.connect(account.domainpart);
+ yield stream.loop();
} catch (Error e) {
- debug("[%s %p] Error: %s", account.bare_jid.to_string(), stream, e.message);
+ debug("[%s %p] Connection error: %s", account.bare_jid.to_string(), stream, e.message);
+
change_connection_state(account, ConnectionState.DISCONNECTED);
- if (!connection_todo.contains(account)) {
- return;
- }
+ connections[account].reset();
+
StreamError.Flag? flag = stream.get_flag(StreamError.Flag.IDENTITY);
if (flag != null) {
warning(@"[%s %p] Stream Error: %s", account.bare_jid.to_string(), stream, flag.error_type);
set_connection_error(account, new ConnectionError(ConnectionError.Source.STREAM_ERROR, flag.error_type));
if (flag.resource_rejected) {
- connect_(account, account.resourcepart + "-" + random_uuid());
+ connect_stream.begin(account, account.resourcepart + "-" + random_uuid());
return;
}
}
@@ -202,27 +263,36 @@ public class ConnectionManager : Object {
return;
}
- debug("[%s] Check reconnect in 5 sec", account.bare_jid.to_string());
- Timeout.add_seconds(5, () => {
- check_reconnect(account);
- return false;
- });
+ check_reconnect(account);
}
}
private void check_reconnects() {
- foreach (Account account in connection_todo) {
+ foreach (Account account in connections.keys) {
check_reconnect(account);
}
}
- private void check_reconnect(Account account) {
+ private void check_reconnect(Account account, bool directly_reconnect = false) {
if (!connections.has_key(account)) return;
bool acked = false;
DateTime? last_activity_was = connections[account].last_activity;
+ if (connections[account].stream == null) {
+ Timeout.add_seconds(10, () => {
+ if (!connections.has_key(account)) return false;
+ if (connections[account].stream != null) return false;
+ if (connections[account].last_activity != last_activity_was) return false;
+
+ connect_stream.begin(account);
+ return false;
+ });
+ return;
+ }
+
XmppStream stream = connections[account].stream;
+
stream.get_module(Xep.Ping.Module.IDENTITY).send_ping.begin(stream, account.bare_jid.domain_jid, () => {
acked = true;
if (connections[account].stream != stream) return;
@@ -239,15 +309,8 @@ public class ConnectionManager : Object {
debug("[%s %p] Ping timeouted. Reconnecting", account.bare_jid.to_string(), stream);
change_connection_state(account, ConnectionState.DISCONNECTED);
- connections[account].stream.disconnect.begin((_, res) => {
- try {
- connections[account].stream.disconnect.end(res);
- } catch (Error e) {
- debug("Error disconnecting stream: %s", e.message);
- }
- });
-
- connect_(account);
+ connections[account].reset();
+ connect_stream.begin(account);
return false;
});
}
@@ -268,19 +331,19 @@ public class ConnectionManager : Object {
check_reconnects();
} else {
debug("NetworkMonitor: Network reported offline");
- foreach (Account account in connection_todo) {
+ foreach (Account account in connections.keys) {
change_connection_state(account, ConnectionState.DISCONNECTED);
}
}
}
private async void on_prepare_for_sleep(bool suspend) {
- foreach (Account account in connection_todo) {
+ foreach (Account account in connections.keys) {
change_connection_state(account, ConnectionState.DISCONNECTED);
}
if (suspend) {
debug("Login1: Device suspended");
- foreach (Account account in connection_todo) {
+ foreach (Account account in connections.keys) {
try {
make_offline(account);
yield connections[account].stream.disconnect();
diff --git a/libdino/src/service/module_manager.vala b/libdino/src/service/module_manager.vala
index 2fcef581..ebcff6ab 100644
--- a/libdino/src/service/module_manager.vala
+++ b/libdino/src/service/module_manager.vala
@@ -46,8 +46,6 @@ public class ModuleManager {
lock(module_map) {
module_map[account] = new ArrayList<XmppStreamModule>();
module_map[account].add(new Iq.Module());
- module_map[account].add(new Tls.Module());
- module_map[account].add(new Xep.SrvRecordsTls.Module());
module_map[account].add(new Sasl.Module(account.bare_jid.to_string(), account.password));
module_map[account].add(new Xep.StreamManagement.Module());
module_map[account].add(new Bind.Module(account.resourcepart));
diff --git a/libdino/src/service/muc_manager.vala b/libdino/src/service/muc_manager.vala
index 178cc8f9..b5d85236 100644
--- a/libdino/src/service/muc_manager.vala
+++ b/libdino/src/service/muc_manager.vala
@@ -97,6 +97,8 @@ public class MucManager : StreamInteractionModule, Object {
}
public void part(Account account, Jid jid) {
+ if (!mucs_todo.has_key(account) || !mucs_todo[account].contains(jid)) return;
+
mucs_todo[account].remove(jid);
XmppStream? stream = stream_interactor.get_stream(account);
diff --git a/libdino/src/service/registration.vala b/libdino/src/service/registration.vala
index f6c6e95d..b4377b98 100644
--- a/libdino/src/service/registration.vala
+++ b/libdino/src/service/registration.vala
@@ -23,33 +23,35 @@ public class Register : StreamInteractionModule, Object{
}
public async ConnectionManager.ConnectionError.Source? add_check_account(Account account) {
- XmppStream stream = new XmppStream();
- stream.log = new XmppLog(account.bare_jid.to_string(), Application.print_xmpp);
- stream.add_module(new Tls.Module());
- stream.add_module(new Iq.Module());
- stream.add_module(new Xep.SrvRecordsTls.Module());
- stream.add_module(new Sasl.Module(account.bare_jid.to_string(), account.password));
-
ConnectionManager.ConnectionError.Source? ret = null;
+ Gee.List<XmppStreamModule> list = new ArrayList<XmppStreamModule>();
+ list.add(new Iq.Module());
+ list.add(new Sasl.Module(account.bare_jid.to_string(), account.password));
+
+ XmppStreamResult stream_result = yield Xmpp.establish_stream(account.bare_jid.domain_jid, list, Application.print_xmpp);
+
+ if (stream_result.stream == null) {
+ if (stream_result.tls_errors != null) {
+ ret = ConnectionManager.ConnectionError.Source.TLS;
+ }
+ return ret;
+ }
+ XmppStream stream = stream_result.stream;
+
SourceFunc callback = add_check_account.callback;
stream.stream_negotiated.connect(() => {
if (callback == null) return;
Idle.add((owned)callback);
});
- stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect((peer_cert, errors) => {
- if (callback == null) return;
- ret = ConnectionManager.ConnectionError.Source.TLS;
- Idle.add((owned)callback);
- });
stream.get_module(Sasl.Module.IDENTITY).received_auth_failure.connect((stream, node) => {
if (callback == null) return;
ret = ConnectionManager.ConnectionError.Source.SASL;
Idle.add((owned)callback);
});
- stream.connect.begin(account.domainpart, (_, res) => {
+ stream.loop.begin((_, res) => {
try {
- stream.connect.end(res);
+ stream.loop.end(res);
} catch (Error e) {
debug("Error connecting to stream: %s", e.message);
}
@@ -62,7 +64,7 @@ public class Register : StreamInteractionModule, Object{
yield;
try {
- yield stream.disconnect();
+ yield stream_result.stream.disconnect();
} catch (Error e) {}
return ret;
}
@@ -73,13 +75,24 @@ public class Register : StreamInteractionModule, Object{
}
public static async ServerAvailabilityReturn check_server_availability(Jid jid) {
- XmppStream stream = new XmppStream();
- stream.log = new XmppLog(jid.to_string(), Application.print_xmpp);
- stream.add_module(new Tls.Module());
- stream.add_module(new Iq.Module());
- stream.add_module(new Xep.SrvRecordsTls.Module());
-
ServerAvailabilityReturn ret = new ServerAvailabilityReturn() { available=false };
+
+ Gee.List<XmppStreamModule> list = new ArrayList<XmppStreamModule>();
+ list.add(new Iq.Module());
+
+ XmppStreamResult stream_result = yield Xmpp.establish_stream(jid.domain_jid, list, Application.print_xmpp);
+
+ if (stream_result.stream == null) {
+ if (stream_result.io_error != null) {
+ debug("Error connecting to stream: %s", stream_result.io_error.message);
+ }
+ if (stream_result.tls_errors != null) {
+ ret.error_flags = stream_result.tls_errors;
+ }
+ return ret;
+ }
+ XmppStream stream = stream_result.stream;
+
SourceFunc callback = check_server_availability.callback;
stream.stream_negotiated.connect(() => {
if (callback != null) {
@@ -87,16 +100,10 @@ public class Register : StreamInteractionModule, Object{
Idle.add((owned)callback);
}
});
- stream.get_module(Tls.Module.IDENTITY).invalid_certificate.connect((peer_cert, errors) => {
- if (callback != null) {
- ret.error_flags = errors;
- Idle.add((owned)callback);
- }
- });
- stream.connect.begin(jid.domainpart, (_, res) => {
+ stream.loop.begin((_, res) => {
try {
- stream.connect.end(res);
+ stream.loop.end(res);
} catch (Error e) {
debug("Error connecting to stream: %s", e.message);
}
@@ -114,12 +121,16 @@ public class Register : StreamInteractionModule, Object{
}
public static async Xep.InBandRegistration.Form? get_registration_form(Jid jid) {
- XmppStream stream = new XmppStream();
- stream.log = new XmppLog(jid.to_string(), Application.print_xmpp);
- stream.add_module(new Tls.Module());
- stream.add_module(new Iq.Module());
- stream.add_module(new Xep.SrvRecordsTls.Module());
- stream.add_module(new Xep.InBandRegistration.Module());
+ Gee.List<XmppStreamModule> list = new ArrayList<XmppStreamModule>();
+ list.add(new Iq.Module());
+ list.add(new Xep.InBandRegistration.Module());
+
+ XmppStreamResult stream_result = yield Xmpp.establish_stream(jid.domain_jid, list, Application.print_xmpp);
+
+ if (stream_result.stream == null) {
+ return null;
+ }
+ XmppStream stream = stream_result.stream;
SourceFunc callback = get_registration_form.callback;
@@ -129,9 +140,9 @@ public class Register : StreamInteractionModule, Object{
}
});
- stream.connect.begin(jid.domainpart, (_, res) => {
+ stream.loop.begin((_, res) => {
try {
- stream.connect.end(res);
+ stream.loop.end(res);
} catch (Error e) {
debug("Error connecting to stream: %s", e.message);
}
@@ -154,12 +165,16 @@ public class Register : StreamInteractionModule, Object{
}
public static async string? submit_form(Jid jid, Xep.InBandRegistration.Form form) {
- XmppStream stream = new XmppStream();
- stream.log = new XmppLog(jid.to_string(), Application.print_xmpp);
- stream.add_module(new Tls.Module());
- stream.add_module(new Iq.Module());
- stream.add_module(new Xep.SrvRecordsTls.Module());
- stream.add_module(new Xep.InBandRegistration.Module());
+ Gee.List<XmppStreamModule> list = new ArrayList<XmppStreamModule>();
+ list.add(new Iq.Module());
+ list.add(new Xep.InBandRegistration.Module());
+
+ XmppStreamResult stream_result = yield Xmpp.establish_stream(jid.domain_jid, list, Application.print_xmpp);
+
+ if (stream_result.stream == null) {
+ return null;
+ }
+ XmppStream stream = stream_result.stream;
SourceFunc callback = submit_form.callback;
@@ -169,9 +184,9 @@ public class Register : StreamInteractionModule, Object{
}
});
- stream.connect.begin(jid.domainpart, (_, res) => {
+ stream.loop.begin((_, res) => {
try {
- stream.connect.end(res);
+ stream.loop.end(res);
} catch (Error e) {
debug("Error connecting to stream: %s", e.message);
}