From dc57561dcffda62d01618c72e0bbf5c5a45c2114 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Mon, 19 Aug 2024 12:17:23 +0200 Subject: Add cancellable to stream connect --- xmpp-vala/src/core/direct_tls_xmpp_stream.vala | 2 +- xmpp-vala/src/core/io_xmpp_stream.vala | 28 ++++++++++++++++---------- xmpp-vala/src/core/stanza_reader.vala | 11 ++++------ xmpp-vala/src/core/stanza_writer.vala | 18 +++++++++-------- xmpp-vala/src/core/starttls_xmpp_stream.vala | 2 +- xmpp-vala/src/core/stream_connect.vala | 8 ++++++++ 6 files changed, 41 insertions(+), 28 deletions(-) (limited to 'xmpp-vala/src') diff --git a/xmpp-vala/src/core/direct_tls_xmpp_stream.vala b/xmpp-vala/src/core/direct_tls_xmpp_stream.vala index 26adc90f..f2e06e2c 100644 --- a/xmpp-vala/src/core/direct_tls_xmpp_stream.vala +++ b/xmpp-vala/src/core/direct_tls_xmpp_stream.vala @@ -17,7 +17,7 @@ public class Xmpp.DirectTlsXmppStream : TlsXmppStream { SocketClient client = new SocketClient(); try { debug("Connecting to %s:%i (tls)", host, port); - IOStream? io_stream = yield client.connect_to_host_async(host, port); + IOStream? io_stream = yield client.connect_to_host_async(host, port, cancellable); TlsConnection tls_connection = TlsClientConnection.new(io_stream, new NetworkAddress(remote_name.to_string(), port)); #if GLIB_2_60 tls_connection.set_advertised_protocols(ADVERTISED_PROTOCOLS); diff --git a/xmpp-vala/src/core/io_xmpp_stream.vala b/xmpp-vala/src/core/io_xmpp_stream.vala index 9c58a46b..1d9d061b 100644 --- a/xmpp-vala/src/core/io_xmpp_stream.vala +++ b/xmpp-vala/src/core/io_xmpp_stream.vala @@ -6,32 +6,36 @@ public interface Xmpp.WriteNodeFunc : Object { public abstract class Xmpp.IoXmppStream : XmppStream { private IOStream? stream; + internal Cancellable cancellable; internal StanzaReader? reader; internal StanzaWriter? writer; internal WriteNodeFunc? write_obj = null; - protected IoXmppStream(Jid remote_name) { + protected IoXmppStream(Jid remote_name, Cancellable? cancellable = null) { base(remote_name); + this.cancellable = cancellable ?? new Cancellable(); + } + + public void cancel() { + cancellable.cancel(); } public override async void disconnect() throws IOError { disconnected = true; + cancel(); if (writer == null || reader == null || stream == null) { throw new IOError.CLOSED("trying to disconnect, but no stream open"); } log.str("OUT", "", this); - yield writer.write(""); - reader.cancel(); + yield writer.write("", Priority.LOW, new Cancellable()); yield stream.close_async(); } public void reset_stream(IOStream stream) { this.stream = stream; - reader = new StanzaReader.for_stream(stream.input_stream); - writer = new StanzaWriter.for_stream(stream.output_stream); - - writer.cancel.connect(reader.cancel); + reader = new StanzaReader.for_stream(stream.input_stream, cancellable); + writer = new StanzaWriter.for_stream(stream.output_stream, cancellable); require_setup(); } @@ -48,18 +52,20 @@ public abstract class Xmpp.IoXmppStream : XmppStream { write_async.begin(node, io_priority, null, (obj, res) => { try { write_async.end(res); - } catch (Error e) { } + } catch (Error e) { + warning("Error while writing: %s", e.message); + } }); } public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { if (write_obj != null) { - yield write_obj.write_stanza(this, node, io_priority, cancellable); + yield write_obj.write_stanza(this, node, io_priority, cancellable ?? this.cancellable); } else { StanzaWriter? writer = this.writer; if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open"); log.node("OUT", node, this); - yield ((!)writer).write_node(node, io_priority, cancellable); + yield ((!)writer).write_node(node, io_priority, cancellable ?? this.cancellable); } } @@ -75,7 +81,7 @@ public abstract class Xmpp.IoXmppStream : XmppStream { .put_attribute("stream", "http://etherx.jabber.org/streams", XMLNS_URI); outs.has_nodes = true; log.node("OUT ROOT", outs, this); - write(outs); + yield write_async(outs, Priority.HIGH, cancellable); received_root_node(this, yield read_root()); setup_needed = false; diff --git a/xmpp-vala/src/core/stanza_reader.vala b/xmpp-vala/src/core/stanza_reader.vala index 17f0b7b0..349476ec 100644 --- a/xmpp-vala/src/core/stanza_reader.vala +++ b/xmpp-vala/src/core/stanza_reader.vala @@ -13,7 +13,7 @@ public class StanzaReader { private uint8[] buffer; private int buffer_fill = 0; private int buffer_pos = 0; - private Cancellable cancellable = new Cancellable(); + private Cancellable? cancellable; private NamespaceState ns_state = new NamespaceState(); @@ -26,19 +26,16 @@ public class StanzaReader { this.for_buffer(s.data); } - public StanzaReader.for_stream(InputStream input) { + public StanzaReader.for_stream(InputStream input, Cancellable? cancellable = null) { this.input = input; + this.cancellable = cancellable; buffer = new uint8[BUFFER_MAX]; } - public void cancel() { - cancellable.cancel(); - } - private async void update_buffer() throws IOError { InputStream? input = this.input; if (input == null) throw new IOError.CLOSED("No input stream specified and end of buffer reached."); - if (cancellable.is_cancelled()) throw new IOError.CANCELLED("Input stream is canceled."); + if (cancellable != null && cancellable.is_cancelled()) throw new IOError.CANCELLED("Input stream is canceled."); buffer_fill = (int) yield ((!)input).read_async(buffer, GLib.Priority.DEFAULT, cancellable); if (buffer_fill == 0) throw new IOError.CLOSED("End of input stream reached."); buffer_pos = 0; diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index aecf8983..79af402e 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -1,19 +1,19 @@ namespace Xmpp { public class StanzaWriter { - public signal void cancel(); - + private Cancellable? connection_cancellable; private OutputStream output; private Queue queue = new Queue(); private bool running = false; - public StanzaWriter.for_stream(OutputStream output) { + public StanzaWriter.for_stream(OutputStream output, Cancellable? cancellable = null) { this.output = output; + this.connection_cancellable = cancellable; } public async void write_node(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { - yield write_data(node.to_xml().data, io_priority, cancellable); + yield write_data(node.to_xml().data, io_priority, cancellable ?? connection_cancellable); } public async void write_nodes(StanzaNode node1, StanzaNode node2, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { @@ -29,11 +29,11 @@ public class StanzaWriter { concat[i++] = datum; } - yield write_data(concat, io_priority, cancellable); + yield write_data(concat, io_priority, cancellable ?? connection_cancellable); } public async void write(string s, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { - yield write_data(s.data, io_priority, cancellable); + yield write_data(s.data, io_priority, cancellable ?? connection_cancellable); } private async void write_data(owned uint8[] data, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { @@ -45,10 +45,12 @@ public class StanzaWriter { try { yield output.write_all_async(data, io_priority, cancellable, null); } catch (IOError e) { - cancel(); + if (!(e is IOError.CANCELLED)) { + connection_cancellable.cancel(); + } throw e; } catch (GLib.Error e) { - cancel(); + connection_cancellable.cancel(); throw new IOError.FAILED("Error in GLib: %s".printf(e.message)); } finally { SourceFuncWrapper? sfw = queue.pop_head(); diff --git a/xmpp-vala/src/core/starttls_xmpp_stream.vala b/xmpp-vala/src/core/starttls_xmpp_stream.vala index 0d4fbc7d..01c75207 100644 --- a/xmpp-vala/src/core/starttls_xmpp_stream.vala +++ b/xmpp-vala/src/core/starttls_xmpp_stream.vala @@ -17,7 +17,7 @@ public class Xmpp.StartTlsXmppStream : TlsXmppStream { try { SocketClient client = new SocketClient(); debug("Connecting to %s:%i (starttls)", host, port); - IOStream stream = yield client.connect_to_host_async(host, port); + IOStream stream = yield client.connect_to_host_async(host, port, cancellable); reset_stream(stream); yield setup(); diff --git a/xmpp-vala/src/core/stream_connect.vala b/xmpp-vala/src/core/stream_connect.vala index a4c5b82e..17d47f38 100644 --- a/xmpp-vala/src/core/stream_connect.vala +++ b/xmpp-vala/src/core/stream_connect.vala @@ -69,8 +69,16 @@ namespace Xmpp { stream.add_module(module); } + uint connection_timeout_id = Timeout.add_seconds(30, () => { + warning("Connection attempt timed out"); + stream.disconnect(); + return Source.REMOVE; + }); + yield stream.connect(); + Source.remove(connection_timeout_id); + return new XmppStreamResult() { stream=stream }; } catch (IOError e) { warning("Could not establish XMPP session with %s:%i: %s", target.host, target.port, e.message); -- cgit v1.2.3-70-g09d2