From 9165c4db278b2d3da636d53e89c3b80cff66977f Mon Sep 17 00:00:00 2001 From: fiaxh Date: Wed, 22 Nov 2017 20:06:50 +0100 Subject: Async service lookup, connect and write --- xmpp-vala/src/core/stanza_writer.vala | 42 ++++++++++++++++++++++++++--------- xmpp-vala/src/core/xmpp_stream.vala | 28 +++++++++++++---------- 2 files changed, 48 insertions(+), 22 deletions(-) (limited to 'xmpp-vala/src/core') diff --git a/xmpp-vala/src/core/stanza_writer.vala b/xmpp-vala/src/core/stanza_writer.vala index e67920db..270d898d 100644 --- a/xmpp-vala/src/core/stanza_writer.vala +++ b/xmpp-vala/src/core/stanza_writer.vala @@ -2,26 +2,48 @@ namespace Xmpp.Core { public class StanzaWriter { private OutputStream output; + private Queue queue = new Queue(); + private bool running = false; + public StanzaWriter.for_stream(OutputStream output) { this.output = output; } - public void write_node(StanzaNode node) throws XmlError { - try { - lock(output) { - output.write_all(node.to_xml().data, null); - } - } catch (GLib.IOError e) { - throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)"); - } + public async void write_node(StanzaNode node) throws XmlError { + yield write_data(node.to_xml().data); } public async void write(string s) throws XmlError { + yield write_data(s.data); + } + + private async void write_data(uint8[] data) throws XmlError { + if (running) { + queue.push_tail(new SourceFuncWrapper(write_data.callback)); + yield; + } + running = true; try { - output.write_all(s.data, null); - } catch (GLib.IOError e) { + yield output.write_all_async(data, 0, null, null); + SourceFuncWrapper? sfw = queue.pop_head(); + if (sfw != null) { + sfw.sfun(); + } + } catch (GLib.Error e) { throw new XmlError.IO_ERROR(@"IOError in GLib: $(e.message)"); + } finally { + running = false; } } } + +public class SourceFuncWrapper : Object { + + public SourceFunc sfun; + + public SourceFuncWrapper(owned SourceFunc sfun) { + this.sfun = (owned)sfun; + } +} + } diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index fc4e7fd7..ea186a72 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -49,7 +49,7 @@ public class XmppStream { int min_priority = -1; ConnectionProvider? best_provider = null; foreach (ConnectionProvider connection_provider in connection_providers) { - int? priority = connection_provider.get_priority(remote_name); + int? priority = yield connection_provider.get_priority(remote_name); if (priority != null && (priority < min_priority || min_priority == -1)) { min_priority = priority; best_provider = connection_provider; @@ -57,9 +57,9 @@ public class XmppStream { } IOStream? stream = null; if (best_provider != null) { - stream = best_provider.connect(this); + stream = yield best_provider.connect(this); } else { - stream = (new SocketClient()).connect(new NetworkService("xmpp-client", "tcp", this.remote_name)); + stream = yield (new SocketClient()).connect_async(new NetworkService("xmpp-client", "tcp", this.remote_name)); } if (stream == null) throw new IOStreamError.CONNECT("client.connect() returned null"); reset_stream((!)stream); @@ -108,12 +108,16 @@ public class XmppStream { } } - public void write(StanzaNode node) throws IOStreamError { + public void write(StanzaNode node) { + write_async.begin(node); + } + + public async void write_async(StanzaNode node) throws IOStreamError { StanzaWriter? writer = this.writer; if (writer == null) throw new IOStreamError.WRITE("trying to write, but no stream open"); try { log.node("OUT", node); - ((!)writer).write_node(node); + yield ((!)writer).write_node(node); } catch (XmlError e) { throw new IOStreamError.WRITE(e.message); } @@ -342,19 +346,19 @@ public abstract class XmppStreamNegotiationModule : XmppStreamModule { } public abstract class ConnectionProvider { - public abstract int? get_priority(string remote_name); - public abstract IOStream? connect(XmppStream stream); + public async abstract int? get_priority(string remote_name); + public async abstract IOStream? connect(XmppStream stream); public abstract string get_id(); } public class StartTlsConnectionProvider : ConnectionProvider { private SrvTarget? srv_target; - public override int? get_priority(string remote_name) { + public async override int? get_priority(string remote_name) { GLib.List? xmpp_target = null; try { - Resolver resolver = Resolver.get_default(); - xmpp_target = resolver.lookup_service("xmpp-client", "tcp", remote_name, null); + GLibFixes.Resolver resolver = GLibFixes.Resolver.get_default(); + xmpp_target = yield resolver.lookup_service_async("xmpp-client", "tcp", remote_name, null); } catch (Error e) { return null; } @@ -363,10 +367,10 @@ public class StartTlsConnectionProvider : ConnectionProvider { return xmpp_target.nth(0).data.get_priority(); } - public override IOStream? connect(XmppStream stream) { + public async override IOStream? connect(XmppStream stream) { try { SocketClient client = new SocketClient(); - return client.connect_to_host(srv_target.get_hostname(), srv_target.get_port()); + return yield client.connect_to_host_async(srv_target.get_hostname(), srv_target.get_port()); } catch (Error e) { return null; } -- cgit v1.2.3-70-g09d2