From dc57561dcffda62d01618c72e0bbf5c5a45c2114 Mon Sep 17 00:00:00 2001 From: fiaxh Date: Mon, 19 Aug 2024 12:17:23 +0200 Subject: Add cancellable to stream connect --- xmpp-vala/src/core/io_xmpp_stream.vala | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'xmpp-vala/src/core/io_xmpp_stream.vala') diff --git a/xmpp-vala/src/core/io_xmpp_stream.vala b/xmpp-vala/src/core/io_xmpp_stream.vala index 9c58a46b..1d9d061b 100644 --- a/xmpp-vala/src/core/io_xmpp_stream.vala +++ b/xmpp-vala/src/core/io_xmpp_stream.vala @@ -6,32 +6,36 @@ public interface Xmpp.WriteNodeFunc : Object { public abstract class Xmpp.IoXmppStream : XmppStream { private IOStream? stream; + internal Cancellable cancellable; internal StanzaReader? reader; internal StanzaWriter? writer; internal WriteNodeFunc? write_obj = null; - protected IoXmppStream(Jid remote_name) { + protected IoXmppStream(Jid remote_name, Cancellable? cancellable = null) { base(remote_name); + this.cancellable = cancellable ?? new Cancellable(); + } + + public void cancel() { + cancellable.cancel(); } public override async void disconnect() throws IOError { disconnected = true; + cancel(); if (writer == null || reader == null || stream == null) { throw new IOError.CLOSED("trying to disconnect, but no stream open"); } log.str("OUT", "", this); - yield writer.write(""); - reader.cancel(); + yield writer.write("", Priority.LOW, new Cancellable()); yield stream.close_async(); } public void reset_stream(IOStream stream) { this.stream = stream; - reader = new StanzaReader.for_stream(stream.input_stream); - writer = new StanzaWriter.for_stream(stream.output_stream); - - writer.cancel.connect(reader.cancel); + reader = new StanzaReader.for_stream(stream.input_stream, cancellable); + writer = new StanzaWriter.for_stream(stream.output_stream, cancellable); require_setup(); } @@ -48,18 +52,20 @@ public abstract class Xmpp.IoXmppStream : XmppStream { write_async.begin(node, io_priority, null, (obj, res) => { try { write_async.end(res); - } catch (Error e) { } + } catch (Error e) { + warning("Error while writing: %s", e.message); + } }); } public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError { if (write_obj != null) { - yield write_obj.write_stanza(this, node, io_priority, cancellable); + yield write_obj.write_stanza(this, node, io_priority, cancellable ?? this.cancellable); } else { StanzaWriter? writer = this.writer; if (writer == null) throw new IOError.NOT_CONNECTED("trying to write, but no stream open"); log.node("OUT", node, this); - yield ((!)writer).write_node(node, io_priority, cancellable); + yield ((!)writer).write_node(node, io_priority, cancellable ?? this.cancellable); } } @@ -75,7 +81,7 @@ public abstract class Xmpp.IoXmppStream : XmppStream { .put_attribute("stream", "http://etherx.jabber.org/streams", XMLNS_URI); outs.has_nodes = true; log.node("OUT ROOT", outs, this); - write(outs); + yield write_async(outs, Priority.HIGH, cancellable); received_root_node(this, yield read_root()); setup_needed = false; -- cgit v1.2.3-70-g09d2