1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
|
using Gee;
/**
 * Pluggable strategy for writing a stanza on behalf of an {@link XmppStream}.
 *
 * {@link IoXmppStream.write_async} delegates to an implementation of this
 * interface when one is installed, instead of using its own writer.
 */
public interface Xmpp.WriteNodeFunc : Object {
    /**
     * Asynchronously writes //node// for //stream//.
     *
     * @param stream the stream the write is performed for (the visible caller passes itself)
     * @param node the stanza node to write
     * @param io_priority I/O priority of the operation, {@link GLib.Priority.DEFAULT} if omitted
     * @param cancellable optional cancellable for the operation, may be null
     * @throws IOError if the write fails
     */
    public abstract async void write_stanza(
        XmppStream stream,
        StanzaNode node,
        int io_priority = Priority.DEFAULT,
        Cancellable? cancellable = null
    ) throws IOError;
}
/**
 * An {@link XmppStream} that reads and writes stanzas over a GIO {@link GLib.IOStream}.
 *
 * The underlying stream is installed with {@link reset_stream}, which builds a
 * new reader and writer on top of it. Until that happens, reading, writing and
 * disconnecting fail with an {@link GLib.IOError}.
 */
public abstract class Xmpp.IoXmppStream : XmppStream {
    private IOStream? stream;
    // Shared cancellable handed to the reader/writer and used as the default for writes.
    internal Cancellable cancellable;
    internal StanzaReader? reader;
    internal StanzaWriter? writer;
    // When set, write_async() hands nodes to this object instead of the writer.
    internal WriteNodeFunc? write_obj = null;

    /**
     * Creates a stream towards //remote_name//.
     *
     * @param remote_name forwarded to the base constructor
     * @param cancellable cancellable to use for I/O; a new one is created when null
     */
    protected IoXmppStream(Jid remote_name, Cancellable? cancellable = null) {
        base(remote_name);
        this.cancellable = cancellable ?? new Cancellable();
    }

    /**
     * Triggers the stream's shared cancellable.
     */
    public void cancel() {
        cancellable.cancel();
    }

    /**
     * Marks the stream as disconnected, cancels pending operations, sends the
     * closing stream tag and closes the underlying I/O stream.
     *
     * The I/O stream is closed even when sending the closing tag fails, so a
     * broken connection does not leave the stream open. In that case the write
     * error is the one reported to the caller.
     *
     * @throws IOError {@link GLib.IOError.CLOSED} if no stream is open, or the
     *                 error raised while writing the closing tag or closing
     *                 the stream
     */
    public override async void disconnect() throws IOError {
        disconnected = true;
        cancel();
        if (writer == null || reader == null || stream == null) {
            throw new IOError.CLOSED("trying to disconnect, but no stream open");
        }
        log.str("OUT", "</stream:stream>", this);

        // A fresh Cancellable is passed here, presumably so the cancel() issued
        // above does not abort this final write.
        IOError? write_error = null;
        try {
            yield writer.write("</stream:stream>", Priority.LOW, new Cancellable());
        } catch (IOError e) {
            // Remember the failure, but still close the stream below.
            write_error = e;
        }

        try {
            yield stream.close_async();
        } catch (IOError e) {
            // Prefer reporting the earlier write failure, if there was one.
            if (write_error == null) {
                throw e;
            }
        }

        if (write_error != null) {
            throw write_error;
        }
    }

    /**
     * Installs //stream// as the underlying I/O stream.
     *
     * Creates a new reader and writer bound to the stream's input and output
     * sides (both using the shared cancellable) and calls require_setup().
     *
     * @param stream the I/O stream to read from and write to
     */
    public void reset_stream(IOStream stream) {
        this.stream = stream;
        reader = new StanzaReader.for_stream(stream.input_stream, cancellable);
        writer = new StanzaWriter.for_stream(stream.output_stream, cancellable);
        require_setup();
    }

    /**
     * Reads the next node from the stream and logs it as incoming.
     *
     * @return the node that was read
     * @throws IOError {@link GLib.IOError.NOT_CONNECTED} if no reader is
     *                 available, or the error raised by the reader
     */
    public override async StanzaNode read() throws IOError {
        // Work on a local copy so the null check stays valid for the call below.
        StanzaReader? reader = this.reader;
        if (reader == null) {
            throw new IOError.NOT_CONNECTED("trying to read, but no stream open");
        }
        StanzaNode node = yield ((!)reader).read_node();
        log.node("IN", node, this);
        return node;
    }

    /**
     * Starts writing //node// without waiting for completion.
     *
     * Errors raised by the write are not propagated; they are reported
     * through warning().
     *
     * @param node the node to write
     * @param io_priority I/O priority of the write
     */
    [Version (deprecated = true, deprecated_since = "0.1", replacement = "write_async")]
    public override void write(StanzaNode node, int io_priority = Priority.DEFAULT) {
        write_async.begin(node, io_priority, null, (obj, res) => {
            try {
                write_async.end(res);
            } catch (Error e) {
                warning("Error while writing: %s", e.message);
            }
        });
    }

    /**
     * Writes //node// to the stream.
     *
     * If a {@link WriteNodeFunc} is installed in write_obj, the node is handed
     * to it. Otherwise the node is logged as outgoing and written with the
     * stream's own writer.
     *
     * @param node the node to write
     * @param io_priority I/O priority of the write
     * @param cancellable cancellable for this write; the stream's shared
     *                    cancellable is used when null
     * @throws IOError {@link GLib.IOError.NOT_CONNECTED} if no writer is
     *                 available, or the error raised by the write itself
     */
    public override async void write_async(StanzaNode node, int io_priority = Priority.DEFAULT, Cancellable? cancellable = null) throws IOError {
        if (write_obj != null) {
            yield write_obj.write_stanza(this, node, io_priority, cancellable ?? this.cancellable);
        } else {
            // Work on a local copy so the null check stays valid for the call below.
            StanzaWriter? writer = this.writer;
            if (writer == null) {
                throw new IOError.NOT_CONNECTED("trying to write, but no stream open");
            }
            log.node("OUT", node, this);
            yield ((!)writer).write_node(node, io_priority, cancellable ?? this.cancellable);
        }
    }

    /**
     * Returns the underlying I/O stream, or null if none has been installed.
     */
    internal IOStream? get_stream() {
        return stream;
    }

    /**
     * Opens the XMPP stream: sends the opening root node and processes the
     * root node received in return.
     *
     * The outgoing root node is addressed to remote_name and written with high
     * priority. Once the incoming root node has been passed to
     * received_root_node, setup_needed is cleared.
     *
     * @throws IOError if writing or reading the root node fails
     */
    public override async void setup() throws IOError {
        StanzaNode outs = new StanzaNode.build("stream", "http://etherx.jabber.org/streams")
                .put_attribute("to", remote_name.to_string())
                .put_attribute("version", "1.0")
                .put_attribute("xmlns", "jabber:client")
                .put_attribute("stream", "http://etherx.jabber.org/streams", XMLNS_URI);
        // NOTE(review): has_nodes presumably keeps the root element open
        // (no self-closing tag) - confirm against StanzaWriter.
        outs.has_nodes = true;
        log.node("OUT ROOT", outs, this);
        yield write_async(outs, Priority.HIGH, cancellable);
        received_root_node(this, yield read_root());
        setup_needed = false;
    }

    /**
     * Reads the root node from the stream and logs it as incoming.
     *
     * @return the root node that was read
     * @throws IOError {@link GLib.IOError.NOT_CONNECTED} if no reader is
     *                 available, or the error raised by the reader
     */
    private async StanzaNode read_root() throws IOError {
        StanzaReader? reader = this.reader;
        if (reader == null) {
            throw new IOError.NOT_CONNECTED("trying to read, but no stream open");
        }
        StanzaNode node = yield ((!)reader).read_root_node();
        log.node("IN ROOT", node, this);
        return node;
    }
}
|