aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/core
diff options
context:
space:
mode:
authorfiaxh <git@lightrise.org>2024-08-19 12:17:23 +0200
committerfiaxh <git@lightrise.org>2024-08-19 12:28:45 +0200
commitdc57561dcffda62d01618c72e0bbf5c5a45c2114 (patch)
treef2336f1d6bef1b1a7b37577d444c4141a51fbc39 /xmpp-vala/src/core
parent88376cd6f75d5057caa6582d0a82fc76bb7b388f (diff)
downloaddino-dc57561dcffda62d01618c72e0bbf5c5a45c2114.tar.gz
dino-dc57561dcffda62d01618c72e0bbf5c5a45c2114.zip
Add cancellable to stream connect
Diffstat (limited to 'xmpp-vala/src/core')
-rw-r--r--xmpp-vala/src/core/direct_tls_xmpp_stream.vala2
-rw-r--r--xmpp-vala/src/core/io_xmpp_stream.vala28
-rw-r--r--xmpp-vala/src/core/stanza_reader.vala11
-rw-r--r--xmpp-vala/src/core/stanza_writer.vala18
-rw-r--r--xmpp-vala/src/core/starttls_xmpp_stream.vala2
-rw-r--r--xmpp-vala/src/core/stream_connect.vala8
6 files changed, 41 insertions, 28 deletions
diff --git a/xmpp-vala/src/core/direct_tls_xmpp_stream.vala b/xmpp-vala/src/core/direct_tls_xmpp_stream.vala
index 26adc90f..f2e06e2c 100644
--- a/xmpp-vala/src/core/direct_tls_xmpp_stream.vala
+++ b/xmpp-vala/src/core/direct_tls_xmpp_stream.vala
@@ -17,7 +17,7 @@ public class Xmpp.DirectTlsXmppStream : TlsXmppStream {
SocketClient client = new SocketClient();
try {
debug("Connecting to %s:%i (tls)", host, port);
- IOStream? io_stream = yield client.connect_to_host_async(host, port);
+ IOStream? io_stream = yield client.connect_to_host_async(host, port, cancellable);
TlsConnection tls_connection = TlsClientConnection.new(io_stream, new NetworkAddress(remote_name.to_string(), port));
#if GLIB_2_60
tls_connection.set_advertised_protocols(ADVERTISED_PROTOCOLS);
diff --git a/xmpp-vala/src/core/io_xmpp_stream.vala b/xmpp-vala/src/core/io_xmpp_stream.vala
index 9c58a46b..1d9d061b 100644
--- a/xmpp-vala/src/core/io_xmpp_stream.vala
+++ b/xmpp-vala/src/core/io_xmpp_stream.vala
@@ -6,32 +6,36 @@ public interface Xmpp.WriteNodeFunc : Object {
public abstract class Xmpp.IoXmppStream : XmppStream {
private IOStream? stream;
+ internal Cancellable cancellable;
internal StanzaReader? reader;
internal StanzaWriter? writer;
internal WriteNodeFunc? write_obj = null;
- protected IoXmppStream(Jid remote_name) {
+ protected IoXmppStream(Jid remote_name, Cancellable? cancellable = null) {
base(remote_name);
+ this.cancellable = cancellable ?? new Cancellable();
+ }
+
+ public void cancel() {
+ cancellable.cancel();
}
public override async void disconnect() throws IOError {
disconnected = true;
+ cancel();
if (writer == null || reader == null || stream == null) {
throw new IOError.CLOSED("trying to disconnect, but no stream open");
}
log.str("OUT", "</stream:stream>", this);
- yield writer.write("</stream:stream>");
- reader.cancel();
+ yield writer.write("</stream:stream>", Priority.LOW, new Cancellable());
yield stream.close_async();
}
public void reset_stream(IOStream stream) {
this.stream = stream;
- reader = new StanzaReader.for_stream(stream.input_stream);
- writer = new StanzaWriter.for_stream(stream.output_stream);
-
- writer.cancel.connect(reader.cancel);
+ reader = new StanzaReader.for_stream(stream.input_stream, cancellable);
+ writer = new StanzaWriter.for_stream(stream.output_stream, cancellable);
require_setup();
}
@@ -48,18 +52,20 @@ public abstract class Xmpp.IoXmppStream : XmppStream {
write_async.begin(node, io_priority, null, (obj, res) => {
try {
write_async.end(res);
- } catch (Error e) { }
+ } catch (Error e) {
+ warning("Error while writing: %s", e.message);
+ }
});
}
public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
if (write_obj != null) {
- yield write_obj.write_stanza(this, node, io_priority, cancellable);
+ yield write_obj.write_stanza(this, node, io_priority, cancellable ?? this.cancellable);
} else {
StanzaWriter? writer = this.writer;
if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open");
log.node("OUT", node, this);
- yield ((!)writer).write_node(node, io_priority, cancellable);
+ yield ((!)writer).write_node(node, io_priority, cancellable ?? this.cancellable);
}
}
@@ -75,7 +81,7 @@ public abstract class Xmpp.IoXmppStream : XmppStream {
.put_attribute("stream", "http://etherx.jabber.org/streams", XMLNS_URI);
outs.has_nodes = true;
log.node("OUT ROOT", outs, this);
- write(outs);
+ yield write_async(outs, Priority.HIGH, cancellable);
received_root_node(this, yield read_root());
setup_needed = false;
diff --git a/xmpp-vala/src/core/stanza_reader.vala b/xmpp-vala/src/core/stanza_reader.vala
index 17f0b7b0..349476ec 100644
--- a/xmpp-vala/src/core/stanza_reader.vala
+++ b/xmpp-vala/src/core/stanza_reader.vala
@@ -13,7 +13,7 @@ public class StanzaReader {
private uint8[] buffer;
private int buffer_fill = 0;
private int buffer_pos = 0;
- private Cancellable cancellable = new Cancellable();
+ private Cancellable? cancellable;
private NamespaceState ns_state = new NamespaceState();
@@ -26,19 +26,16 @@ public class StanzaReader {
this.for_buffer(s.data);
}
- public StanzaReader.for_stream(InputStream input) {
+ public StanzaReader.for_stream(InputStream input, Cancellable? cancellable = null) {
this.input = input;
+ this.cancellable = cancellable;
buffer = new uint8[BUFFER_MAX];
}
- public void cancel() {
- cancellable.cancel();
- }
-
private async void update_buffer() throws IOError {
InputStream? input = this.input;
if (input == null) throw new IOError.CLOSED("No input stream specified and end of buffer reached.");
- if (cancellable.is_cancelled()) throw new IOError.CANCELLED("Input stream is canceled.");
+ if (cancellable != null && cancellable.is_cancelled()) throw new IOError.CANCELLED("Input stream is canceled.");
buffer_fill = (int) yield ((!)input).read_async(buffer, GLib.Priority.DEFAULT, cancellable);
if (buffer_fill == 0) throw new IOError.CLOSED("End of input stream reached.");
buffer_pos = 0;
diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala
index aecf8983..79af402e 100644
--- a/xmpp-vala/src/core/stanza_writer.vala
+++ b/xmpp-vala/src/core/stanza_writer.vala
@@ -1,19 +1,19 @@
namespace Xmpp {
public class StanzaWriter {
- public signal void cancel();
-
+ private Cancellable? connection_cancellable;
private OutputStream output;
private Queue<SourceFuncWrapper> queue = new Queue<SourceFuncWrapper>();
private bool running = false;
- public StanzaWriter.for_stream(OutputStream output) {
+ public StanzaWriter.for_stream(OutputStream output, Cancellable? cancellable = null) {
this.output = output;
+ this.connection_cancellable = cancellable;
}
public async void write_node(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
- yield write_data(node.to_xml().data, io_priority, cancellable);
+ yield write_data(node.to_xml().data, io_priority, cancellable ?? connection_cancellable);
}
public async void write_nodes(StanzaNode node1, StanzaNode node2, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
@@ -29,11 +29,11 @@ public class StanzaWriter {
concat[i++] = datum;
}
- yield write_data(concat, io_priority, cancellable);
+ yield write_data(concat, io_priority, cancellable ?? connection_cancellable);
}
public async void write(string s, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
- yield write_data(s.data, io_priority, cancellable);
+ yield write_data(s.data, io_priority, cancellable ?? connection_cancellable);
}
private async void write_data(owned uint8[] data, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
@@ -45,10 +45,12 @@ public class StanzaWriter {
try {
yield output.write_all_async(data, io_priority, cancellable, null);
} catch (IOError e) {
- cancel();
+ if (!(e is IOError.CANCELLED)) {
+ connection_cancellable.cancel();
+ }
throw e;
} catch (GLib.Error e) {
- cancel();
+ connection_cancellable.cancel();
throw new IOError.FAILED("Error in GLib: %s".printf(e.message));
} finally {
SourceFuncWrapper? sfw = queue.pop_head();
diff --git a/xmpp-vala/src/core/starttls_xmpp_stream.vala b/xmpp-vala/src/core/starttls_xmpp_stream.vala
index 0d4fbc7d..01c75207 100644
--- a/xmpp-vala/src/core/starttls_xmpp_stream.vala
+++ b/xmpp-vala/src/core/starttls_xmpp_stream.vala
@@ -17,7 +17,7 @@ public class Xmpp.StartTlsXmppStream : TlsXmppStream {
try {
SocketClient client = new SocketClient();
debug("Connecting to %s:%i (starttls)", host, port);
- IOStream stream = yield client.connect_to_host_async(host, port);
+ IOStream stream = yield client.connect_to_host_async(host, port, cancellable);
reset_stream(stream);
yield setup();
diff --git a/xmpp-vala/src/core/stream_connect.vala b/xmpp-vala/src/core/stream_connect.vala
index a4c5b82e..17d47f38 100644
--- a/xmpp-vala/src/core/stream_connect.vala
+++ b/xmpp-vala/src/core/stream_connect.vala
@@ -69,8 +69,16 @@ namespace Xmpp {
stream.add_module(module);
}
+ uint connection_timeout_id = Timeout.add_seconds(30, () => {
+ warning("Connection attempt timed out");
+ stream.disconnect();
+ return Source.REMOVE;
+ });
+
yield stream.connect();
+ Source.remove(connection_timeout_id);
+
return new XmppStreamResult() { stream=stream };
} catch (IOError e) {
warning("Could not establish XMPP session with %s:%i: %s", target.host, target.port, e.message);