diff options
-rw-r--r-- | libdino/src/entity/message.vala | 1 | ||||
-rw-r--r-- | libdino/src/service/message_processor.vala | 28 | ||||
-rw-r--r-- | libdino/src/service/stream_interactor.vala | 5 | ||||
-rw-r--r-- | main/src/ui/conversation_content_view/message_widget.vala | 23 | ||||
-rw-r--r-- | xmpp-vala/src/core/xmpp_stream.vala | 2 | ||||
-rw-r--r-- | xmpp-vala/src/module/xep/0198_stream_management.vala | 14 |
6 files changed, 59 insertions, 14 deletions
diff --git a/libdino/src/entity/message.vala b/libdino/src/entity/message.vala index 6670ec5d..272905a9 100644 --- a/libdino/src/entity/message.vala +++ b/libdino/src/entity/message.vala @@ -15,6 +15,7 @@ public class Message : Object { ACKNOWLEDGED, UNSENT, WONTSEND, + SENDING, SENT } diff --git a/libdino/src/service/message_processor.vala b/libdino/src/service/message_processor.vala index c0fde767..e65d45ac 100644 --- a/libdino/src/service/message_processor.vala +++ b/libdino/src/service/message_processor.vala @@ -45,9 +45,7 @@ public class MessageProcessor : StreamInteractionModule, Object { stream_interactor.account_added.connect(on_account_added); - stream_interactor.connection_manager.connection_state_changed.connect((account, state) => { - if (state == ConnectionManager.ConnectionState.CONNECTED) send_unsent_chat_messages(account); - }); + stream_interactor.stream_negotiated.connect(send_unsent_chat_messages); stream_interactor.connection_manager.stream_opened.connect((account, stream) => { debug("MAM: [%s] Reset catchup_id", account.bare_jid.to_string()); @@ -69,6 +67,14 @@ public class MessageProcessor : StreamInteractionModule, Object { return message; } + private void convert_sending_to_unsent_msgs(Account account) { + db.message.update() + .with(db.message.account_id, "=", account.id) + .with(db.message.marked, "=", Message.Marked.SENDING) + .set(db.message.marked, Message.Marked.UNSENT) + .perform(); + } + private void send_unsent_chat_messages(Account account) { var select = db.message.select() .with(db.message.account_id, "=", account.id) @@ -91,7 +97,8 @@ public class MessageProcessor : StreamInteractionModule, Object { Message message = new Message.from_row(db, row); Conversation? msg_conv = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation(message.counterpart, account, Util.get_conversation_type_for_message(message)); if (msg_conv != null) { - send_xmpp_message(message, msg_conv, true); + Message cached_msg = stream_interactor.get_module(MessageStorage.IDENTITY).get_message_by_id(message.id, msg_conv); + send_xmpp_message(cached_msg ?? message, msg_conv, true); } } catch (InvalidJidError e) { warning("Ignoring message with invalid Jid: %s", e.message); @@ -132,6 +139,8 @@ public class MessageProcessor : StreamInteractionModule, Object { hitted_range[query_id] = -2; } }); + + convert_sending_to_unsent_msgs(account); } private async void do_mam_catchup(Account account) { @@ -601,7 +610,7 @@ public class MessageProcessor : StreamInteractionModule, Object { public void send_xmpp_message(Entities.Message message, Conversation conversation, bool delayed = false) { XmppStream stream = stream_interactor.get_stream(conversation.account); - message.marked = Entities.Message.Marked.NONE; + message.marked = Entities.Message.Marked.SENDING; if (stream == null) { message.marked = Entities.Message.Marked.UNSENT; @@ -638,7 +647,7 @@ public class MessageProcessor : StreamInteractionModule, Object { stream.get_module(MessageModule.IDENTITY).send_message.begin(stream, new_message, (_, res) => { try { stream.get_module(MessageModule.IDENTITY).send_message.end(res); - if (message.marked == Message.Marked.NONE/* && (yield stream.get_module(Xep.ServiceDiscovery.Module.IDENTITY).has_entity_feature(stream, conversation.account.bare_jid, Xep.UniqueStableStanzaIDs.NS_URI))*/) { + if (message.marked == Message.Marked.SENDING) { message.marked = Message.Marked.SENT; } @@ -649,6 +658,13 @@ public class MessageProcessor : StreamInteractionModule, Object { } } catch (IOStreamError e) { message.marked = Entities.Message.Marked.UNSENT; + + if (stream != stream_interactor.get_stream(conversation.account)) { + Timeout.add_seconds(3, () => { + send_xmpp_message(message, conversation, true); + return false; + }); + } } }); } diff --git a/libdino/src/service/stream_interactor.vala b/libdino/src/service/stream_interactor.vala index 1ace195d..db6d58d8 100644 --- a/libdino/src/service/stream_interactor.vala +++ b/libdino/src/service/stream_interactor.vala @@ -67,7 +67,10 @@ public class StreamInteractor : Object { private void on_stream_opened(Account account, XmppStream stream) { stream.stream_negotiated.connect( (stream) => { - stream_negotiated(account, stream); + var flag = stream.get_flag(Xep.StreamManagement.Flag.IDENTITY); + if (flag == null || flag.resumed == false) { + stream_negotiated(account, stream); + } }); } } diff --git a/main/src/ui/conversation_content_view/message_widget.vala b/main/src/ui/conversation_content_view/message_widget.vala index 13b48a1e..83d826e9 100644 --- a/main/src/ui/conversation_content_view/message_widget.vala +++ b/main/src/ui/conversation_content_view/message_widget.vala @@ -80,6 +80,7 @@ public class MessageItemWidget : SizeRequestBin { StreamInteractor stream_interactor; public ContentItem content_item; + public Message.Marked marked { get; set; } Label label = new Label("") { use_markup=true, xalign=0, selectable=true, wrap=true, wrap_mode=Pango.WrapMode.WORD_CHAR, vexpand=true, visible=true }; MessageItemEditMode? edit_mode = null; @@ -179,12 +180,30 @@ public class MessageItemWidget : SizeRequestBin { markup_text = @"<span size=\'$size_str\'>" + markup_text + "</span>"; } + string gray_color = Util.is_dark_theme(label) ? "#808080" : "#909090"; + if (message.edit_to != null) { - string color = Util.is_dark_theme(label) ? "#808080" : "#909090"; - markup_text += " <span size='small' color='%s'>(%s)</span>".printf(color, _("edited")); + markup_text += " <span size='small' color='%s'>(%s)</span>".printf(gray_color, _("edited")); theme_dependent = true; } + if (message.direction == Message.DIRECTION_SENT && (message.marked == Message.Marked.SENDING || message.marked == Message.Marked.UNSENT)) { + if (message.local_time.compare(new DateTime.now_utc().add_seconds(-10)) < 0) { + markup_text += " <span size='small' color='%s'>%s</span>".printf(gray_color, "pending…"); + + message.bind_property("marked", this, "marked"); + this.notify["marked"].connect(() => { + update_label(); + }); + } else { + int time_diff = (- (int) message.local_time.difference(new DateTime.now_utc()) / 1000); + Timeout.add(10000 - time_diff, () => { + update_label(); + return false; + }); + } + } + if (theme_dependent && realize_id == -1) { realize_id = label.realize.connect(update_label); style_updated_id = label.style_updated.connect(update_label); diff --git a/xmpp-vala/src/core/xmpp_stream.vala b/xmpp-vala/src/core/xmpp_stream.vala index f4cdf09a..7e7588b1 100644 --- a/xmpp-vala/src/core/xmpp_stream.vala +++ b/xmpp-vala/src/core/xmpp_stream.vala @@ -410,7 +410,7 @@ public class StartTlsConnectionProvider : ConnectionProvider { } public interface WriteNodeFunc : Object { - public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOError; + public abstract async void write_stanza(XmppStream stream, StanzaNode node) throws IOStreamError; } } diff --git a/xmpp-vala/src/module/xep/0198_stream_management.vala b/xmpp-vala/src/module/xep/0198_stream_management.vala index c44e2c70..bc695ddb 100644 --- a/xmpp-vala/src/module/xep/0198_stream_management.vala +++ b/xmpp-vala/src/module/xep/0198_stream_management.vala @@ -25,14 +25,18 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { } } - public async void write_stanza(XmppStream stream, StanzaNode node) throws IOError { + public async void write_stanza(XmppStream stream, StanzaNode node) throws IOStreamError { if (stream.has_flag(Flag.IDENTITY)) { var promise = new Promise<IOError?>(); node_queue.add(new QueueItem(node, promise)); check_queue(stream); - yield promise.future.wait_async(); + try { + yield promise.future.wait_async(); + } catch (FutureError e) { + throw new IOStreamError.WRITE("Future returned error %i".printf(e.code)); + } } else { yield write_node(stream, node); } @@ -141,10 +145,11 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { flags = stream.flags; stream.write_obj = this; } else if (node.name == "resumed") { + stream.get_flag(Flag.IDENTITY).resumed = true; + foreach (XmppStreamFlag flag in flags) { stream.add_flag(flag); } - stream.negotiation_complete = true; h_outbound = int.parse(node.get_attribute("h", NS_URI)); handle_incoming_h(stream, h_outbound); @@ -158,7 +163,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { stream.received_features_node(stream); session_id = null; foreach (var id in in_flight_stanzas.keys) { - in_flight_stanzas[id].promise.set_exception(new IOError.FAILED("bla")); + in_flight_stanzas[id].promise.set_exception(new IOStreamError.WRITE("Stanza not acked and session not resumed")); } in_flight_stanzas.clear(); check_queue(stream); @@ -200,6 +205,7 @@ public class Module : XmppStreamNegotiationModule, WriteNodeFunc { public class Flag : XmppStreamFlag { public static FlagIdentity<Flag> IDENTITY = new FlagIdentity<Flag>(NS_URI, "stream_management"); public bool finished = false; + public bool resumed = false; public override string get_ns() { return NS_URI; } public override string get_id() { return IDENTITY.id; } |