aboutsummaryrefslogtreecommitdiff
path: root/xmpp-vala/src/module/xep/0047_in_band_bytestreams.vala
blob: 0e1dd6bef36bfc27f169b7ff9da88045fa50abf5 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
using Gee;
using Xmpp;
using Xmpp.Xep;

namespace Xmpp.Xep.InBandBytestreams {

private const string NS_URI = "http://jabber.org/protocol/ibb";
private const int SEQ_MODULUS = 65536;

public class Module : XmppStreamModule {
    public static Xmpp.ModuleIdentity<Module> IDENTITY = new Xmpp.ModuleIdentity<Module>(NS_URI, "0047_in_band_bytestreams");

    public override void attach(XmppStream stream) {
        stream.add_flag(new Flag());
    }
    public override void detach(XmppStream stream) { }

    public void on_iq_set(XmppStream stream, Iq.Stanza iq) {
        StanzaNode? data = iq.stanza.get_subnode("data", NS_URI);
        string? sid = data != null ? data.get_attribute("sid") : null;
        if (data == null || sid == null) {
            stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.bad_request("missing data node or sid")));
            return;
        }
        Connection? conn = stream.get_flag(Flag.IDENTITY).get_connection(sid);
        if (conn == null) {
            stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.error(iq, new ErrorStanza.item_not_found()));
            return;
        }

        int seq = data.get_attribute_int("seq");
        // TODO(hrxi): return an error on malformed base64 (need to do this
        // according to the xep)
        uint8[] content = Base64.decode(data.get_string_content());
        if (seq < 0 || seq != conn.remote_seq) {
            // TODO(hrxi): send an error and close the connection
            return;
        }
        conn.remote_seq = (conn.remote_seq + 1) % SEQ_MODULUS;

        stream.get_module(Iq.Module.IDENTITY).send_iq(stream, new Iq.Stanza.result(iq));
        conn.on_data(stream, content);
    }

    public override string get_ns() { return NS_URI; }
    public override string get_id() { return IDENTITY.id; }
}

public class Connection {
    // TODO(hrxi): implement half-open states
    public enum State {
        UNCONNECTED,
        CONNECTING,
        CONNECTED,
        DISCONNECTING,
        DISCONNECTED,
        ERROR,
    }
    State state = UNCONNECTED;
    Jid receiver_full_jid;
    public string sid { get; private set; }
    int block_size;
    int local_seq = 0;
    int remote_ack = 0;
    internal int remote_seq = 0;

    public signal void on_error(XmppStream stream, string error);
    public signal void on_data(XmppStream stream, uint8[] data);
    public signal void on_ready(XmppStream stream);

    public Connection(Jid receiver_full_jid, string sid, int block_size) {
        this.receiver_full_jid = receiver_full_jid;
        this.sid = sid;
        this.block_size = block_size;
    }

    public void connect(XmppStream stream) {
        assert(state == UNCONNECTED);
        state = CONNECTING;

        StanzaNode open = new StanzaNode.build("open", NS_URI)
            .add_self_xmlns()
            .put_attribute("block-size", block_size.to_string())
            .put_attribute("sid", sid);

        Iq.Stanza iq = new Iq.Stanza.set(open) { to=receiver_full_jid };
        stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
            assert(state == CONNECTING);
            if (!iq.is_error()) {
                state = CONNECTED;
                stream.get_flag(Flag.IDENTITY).add_connection(this);
                on_ready(stream);
            } else {
                set_error(stream, "connection failed");
            }
        });
    }

    void set_error(XmppStream stream, string error) {
        // TODO(hrxi): Send disconnect?
        state = ERROR;
        on_error(stream, error);
    }

    public void send(XmppStream stream, uint8[] bytes) {
        assert(state == CONNECTED);
        // TODO(hrxi): rate-limiting/merging?
        int seq = local_seq;
        local_seq = (local_seq + 1) % SEQ_MODULUS;
        StanzaNode data = new StanzaNode.build("data", NS_URI)
            .add_self_xmlns()
            .put_attribute("sid", sid)
            .put_attribute("seq", seq.to_string())
            .put_node(new StanzaNode.text(Base64.encode(bytes)));
        Iq.Stanza iq = new Iq.Stanza.set(data) { to=receiver_full_jid };
        stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
            if (iq.is_error()) {
                set_error(stream, "sending failed");
                return;
            }
            if (remote_ack != seq) {
                set_error(stream, "out of order acks");
                return;
            }
            remote_ack = (remote_ack + 1) % SEQ_MODULUS;
            if (local_seq == remote_ack) {
                on_ready(stream);
            }
        });
    }

    public void close(XmppStream stream) {
        assert(state == CONNECTED);
        state = DISCONNECTING;
        // TODO(hrxi): should not do this, might still receive data
        stream.get_flag(Flag.IDENTITY).remove_connection(this);
        StanzaNode close = new StanzaNode.build("close", NS_URI)
            .add_self_xmlns()
            .put_attribute("sid", sid);
        Iq.Stanza iq = new Iq.Stanza.set(close) { to=receiver_full_jid };
        stream.get_module(Iq.Module.IDENTITY).send_iq(stream, iq, (stream, iq) => {
            assert(state == DISCONNECTING);
            if (iq.is_error()) {
                set_error(stream, "disconnecting failed");
                return;
            }
            state = DISCONNECTED;
        });
    }
}


public class Flag : XmppStreamFlag {
    public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "in_band_bytestreams");

    private HashMap<string, Connection> active = new HashMap<string, Connection>();

    public void add_connection(Connection conn) {
        active[conn.sid] = conn;
    }
    public Connection? get_connection(string sid) {
        return active.has_key(sid) ? active[sid] : null;
    }
    public void remove_connection(Connection conn) {
        active.unset(conn.sid);
    }

    public override string get_ns() { return NS_URI; }
    public override string get_id() { return IDENTITY.id; }
}

}