当前位置: 首页 > news >正文

Android实现Socket通信

问题:

我在Android端对接后端Socket服务实现消息模块的时候,使用WebSocketClient 库,手动构造了订阅和发送消息的 STOMP 帧,,Socket连接成功建立,但是当用户发送消息的时候,对方无法接受到。

然后我Web端使用了成熟的 STOMP 库(Stomp.over 和 SockJS),所有接口测试通过。

排查经历:

  1. 起初我以为是在安卓端手动构造了订阅和发送消息的 STOMP 帧的格式问题,后来经过抓包排查,没有问题
  2. 然后我以为是,由于Android虚拟机和我手机调试网络的原因,我在本地创建了两个虚拟机,运行项目发现还是消息发送成功,推送失败。
  3. 因为问题定位到推送消息的步骤失败,后端采用的是Spring WebSocket的方法messagingTemplate.convertAndSendToUser(),我开始研究这个方法的底层逻辑,并在网上查找资料,发现有的人因为Header构建的问题导致推送消息失效,我又开始了更改调试,没有解决。
  4. 因为我Web端是正常运行的,Android是失败的,所以我使用Wireshark开始抓包对比,到底是哪个步骤出现了问题。

失败案例抓包数据:

在这里插入图片描述

成功案例抓包数据:

在这里插入图片描述

Socket服务器:

Socket建立连接序列图

在这里插入图片描述

问题原因:

web端的代码使用了成熟的 STOMP 库(Stomp.over 和 SockJS),它在建立连接时会自动发送 CONNECT 帧,完成 STOMP 握手;而你 Android 端的代码则是直接用 WebSocketClient 建立连接,并手动构造了订阅和发送消息的 STOMP 帧,但没有看到发送 CONNECT 帧的过程。

  1. 缺少 CONNECT 握手
    STOMP 协议要求客户端在发送其他 STOMP 帧(例如 SUBSCRIBE 或 SEND)之前,先发送 CONNECT 帧并等待服务器返回 CONNECTED 帧。如果没有发送 CONNECT 帧,服务器可能会忽略你后续发送的订阅或消息帧,从而只看到 ACK 包(底层 TCP 的 ACK 应答),而没有实际的消息推送。

解决方案:

  1. 发送 CONNECT 帧
    在 WebSocket 连接成功(onOpen 回调)后,尽快构造并发送一个标准的 CONNECT 帧。例如,在 Android 端的 onOpen 方法中添加如下代码:
java复制编辑
@Override
public void onOpen(ServerHandshake handshakedata) {Log.d(TAG, "WebSocket连接成功");// 发送 CONNECT 帧,指定支持的 STOMP 版本和心跳StringBuilder connectFrame = new StringBuilder();connectFrame.append("CONNECT\n");connectFrame.append("accept-version:1.2\n");connectFrame.append("heart-beat:10000,10000\n");connectFrame.append("satoken:").append(token).append("\n");connectFrame.append("\n"); // 空行分隔头部和正文connectFrame.append("\0"); // STOMP帧结束符client.send(connectFrame.toString());connectionState.postValue(ConnectionState.CONNECTED);reconnectAttempts = 0;startHeartbeat();// 在收到服务器的 CONNECTED 帧之后再订阅个人消息通道// 你可以在 onMessage 中判断收到 CONNECTED 后再调用 subscribeToPersonalMessages()// 这里为了测试可以先延时一会儿再订阅new Handler(Looper.getMainLooper()).postDelayed(() -> subscribeToPersonalMessages(), 500);showToast("WebSocket连接成功");
}

这样确保服务器能够正确识别你的 STOMP 客户端,处理后续的订阅和消息发送。

完整Android端代码

package com.xjl.mobilehotel.websocket;import android.content.Context;
import android.content.Intent;
import android.os.Handler;
import android.os.Looper;
import android.util.Log;
import android.widget.Toast;import androidx.lifecycle.LiveData;
import androidx.lifecycle.MutableLiveData;import com.google.gson.Gson;
import com.xjl.mobilehotel.model.MessageChatBO;
import com.xjl.mobilehotel.model.MessageConversationBO;
import com.xjl.mobilehotel.model.MessageDTO;
import com.xjl.mobilehotel.model.config.MessageSessionTypeEnum;
import com.xjl.mobilehotel.model.config.MessageStatus;
import com.xjl.mobilehotel.model.config.MessageTypeEnum;
import com.xjl.mobilehotel.ui.auth.LoginActivity;
import com.xjl.mobilehotel.utils.SharedPreferencesManager;import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.json.JSONObject;import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;/*** WebSocket管理类*/
public class WebSocketManager {private static final String TAG = "WebSocketManager";// 服务器地址private static final String SERVER_URL = "ws://192.168.167.39:8008";private static final String WS_PATH = "/ws";// 心跳端点private static final String HEARTBEAT_ENDPOINT = "/socket/heartbeat";// 消息端点private static final String MESSAGE_ENDPOINT = "/socket/message/";// 个人消息订阅端点private static final String PERSONAL_MESSAGE_SUBSCRIPTION = "/user/%s/queue/message";// 最大重连次数private static final int MAX_RECONNECT_ATTEMPTS = 5;// 心跳间隔(毫秒)private static final long HEARTBEAT_INTERVAL = 10000;private static WebSocketManager instance;private WebSocketClient client;private Context context;private SharedPreferencesManager prefsManager;private final CompositeDisposable disposables = new CompositeDisposable();private Disposable heartbeatDisposable;private int reconnectAttempts = 0;private boolean intentionalClose = false;private Handler mainHandler;private String token;private Gson gson;private boolean isSubscribed = false;// 连接状态private final MutableLiveData<ConnectionState> connectionState = new MutableLiveData<>(ConnectionState.DISCONNECTED);// 接收到的消息private final MutableLiveData<MessageDTO> receivedMessage = new MutableLiveData<>();private WebSocketManager() {// 私有构造方法mainHandler = new Handler(Looper.getMainLooper());gson = new Gson();}/*** 获取单例实例*/public static synchronized WebSocketManager getInstance() {if (instance == null) {instance = new WebSocketManager();}return instance;}/*** 初始化*/public void init(Context context) {this.context = context.getApplicationContext();this.prefsManager = SharedPreferencesManager.getInstance(context);}/*** 获取连接状态*/public LiveData<ConnectionState> getConnectionState() {return connectionState;}/*** 获取接收到的消息*/public LiveData<MessageDTO> getReceivedMessage() {return receivedMessage;}/*** 连接WebSocket*/public void connect() {// 检查是否已登录token = prefsManager.getToken();if (token == null || token.isEmpty()) {Log.e(TAG, "用户未登录,无法连接WebSocket");showToast("用户未登录,无法连接WebSocket");// 跳转到登录界面Intent intent = new Intent(context, LoginActivity.class);intent.addFlags(Intent.FLAG_ACTIVITY_NEW_TASK);context.startActivity(intent);return;}// 如果已经连接,先断开disconnect();try {// 使用HTTP URL,SockJS会自动处理协议转换String url = SERVER_URL + WS_PATH + "?satoken=" + token;Log.d(TAG, "正在连接WebSocket: " + url);showToast("正在连接WebSocket服务器...");// 设置请求头Map<String, String> headers = new HashMap<>();headers.put("satoken", token);// 创建WebSocket客户端client = new WebSocketClient(new URI(url), new Draft_6455(), headers, 0) {@Overridepublic void onOpen(ServerHandshake handshakedata) {Log.d(TAG, "WebSocket连接成功");connectionState.postValue(ConnectionState.CONNECTED);reconnectAttempts = 0;startHeartbeat();// 连接成功后先发送 CONNECT 帧,完成 STOMP 握手sendConnectFrame();showToast("WebSocket连接成功, 正在发送CONNECT帧");}@Overridepublic void onMessage(String message) {Log.d(TAG, "收到WebSocket消息: " + message);// 打印消息的前100个字符String logMessage = message.length() > 100 ? message.substring(0, 100) + "..." : message;Log.d(TAG, "收到消息预览: " + logMessage);// 处理消息handleReceivedMessage(message);}@Overridepublic void onClose(int code, String reason, boolean remote) {Log.d(TAG, "WebSocket连接关闭: code=" + code + ", reason=" + reason + ", remote=" + remote);connectionState.postValue(ConnectionState.DISCONNECTED);stopHeartbeat();isSubscribed = false;if (remote && !intentionalClose) {showToast("WebSocket连接被服务器关闭: " + reason);}// 如果不是主动关闭,尝试重连if (!intentionalClose && reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {reconnect();}}@Overridepublic void onError(Exception ex) {Log.e(TAG, "WebSocket连接错误", ex);connectionState.postValue(ConnectionState.ERROR);showToast("WebSocket连接失败: " + ex.getMessage());// 尝试重连if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) {reconnect();}}};// 连接connectionState.postValue(ConnectionState.CONNECTING);client.connect();} catch (URISyntaxException e) {Log.e(TAG, "WebSocket URI语法错误", e);connectionState.postValue(ConnectionState.ERROR);showToast("WebSocket URI语法错误: " + e.getMessage());}}/*** 断开WebSocket连接*/public void disconnect() {intentionalClose = true;if (client != null && client.isOpen()) {client.close();showToast("WebSocket连接已断开");}stopHeartbeat();connectionState.postValue(ConnectionState.DISCONNECTED);isSubscribed = false;}/*** 重连*/private void reconnect() {reconnectAttempts++;Log.d(TAG, "尝试重连WebSocket,第" + reconnectAttempts + "次");showToast("正在尝试重新连接WebSocket,第" + reconnectAttempts + "次");// 使用指数退避策略long delay = (long) Math.pow(2, reconnectAttempts) * 1000;Disposable disposable = Observable.timer(delay, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> {intentionalClose = false;connect();});disposables.add(disposable);}/*** 开始心跳*/private void startHeartbeat() {stopHeartbeat();heartbeatDisposable = Observable.interval(HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(aLong -> {if (client != null && client.isOpen()) {// 构建STOMP格式的心跳消息try {StringBuilder stompFrame = new StringBuilder();stompFrame.append("SEND\n");stompFrame.append("destination:").append(HEARTBEAT_ENDPOINT).append("\n");stompFrame.append("content-type:application/json;charset=UTF-8\n");stompFrame.append("satoken:").append(token).append("\n");stompFrame.append("\n"); // 空行分隔头部和正文// 使用空 JSON 对象作为消息体JSONObject heartbeatMessage = new JSONObject();stompFrame.append(heartbeatMessage.toString());stompFrame.append("\0"); // STOMP帧结束符client.send(stompFrame.toString());Log.d(TAG, "发送心跳到: " + HEARTBEAT_ENDPOINT);} catch (Exception e) {Log.e(TAG, "构建心跳消息失败", e);}}});disposables.add(heartbeatDisposable);}/*** 停止心跳*/private void stopHeartbeat() {if (heartbeatDisposable != null && !heartbeatDisposable.isDisposed()) {heartbeatDisposable.dispose();}}/*** 发送 CONNECT 帧以完成 STOMP 握手*/private void sendConnectFrame() {if (client != null && client.isOpen()) {StringBuilder connectFrame = new StringBuilder();connectFrame.append("CONNECT\n");connectFrame.append("accept-version:1.2\n");connectFrame.append("heart-beat:10000,10000\n");connectFrame.append("satoken:").append(token).append("\n");connectFrame.append("\n"); // 空行分隔头部和正文connectFrame.append("\0"); // STOMP帧结束符client.send(connectFrame.toString());Log.d(TAG, "发送CONNECT帧: " + connectFrame.toString());}}/*** 订阅个人消息通道*/private void subscribeToPersonalMessages() {if (client != null && client.isOpen()) {try {// 获取当前用户IDString userId = prefsManager.getUserId();if (userId == null || userId.isEmpty()) {Log.e(TAG, "用户ID为空,无法订阅消息");showToast("用户ID为空,无法订阅消息");return;}// 构建订阅消息StringBuilder subscribeMessage = new StringBuilder();subscribeMessage.append("SUBSCRIBE\n");subscribeMessage.append("id:sub-0\n");subscribeMessage.append("destination:").append(String.format(PERSONAL_MESSAGE_SUBSCRIPTION, userId)).append("\n");subscribeMessage.append("\n"); // 空行分隔头部和正文subscribeMessage.append("\0"); // STOMP帧结束符client.send(subscribeMessage.toString());Log.d(TAG, "发送订阅消息到: " + String.format(PERSONAL_MESSAGE_SUBSCRIPTION, userId));isSubscribed = true;} catch (Exception e) {Log.e(TAG, "发送订阅消息失败", e);}}}/*** 处理接收到的消息(支持 CONNECTED、MESSAGE、ERROR 三种STOMP消息)*/private void handleReceivedMessage(String message) {try {Log.d(TAG, "开始处理接收到的消息: " + message);if (message.startsWith("CONNECTED")) {Log.d(TAG, "收到 CONNECTED 帧,STOMP连接成功");// 连接成功后订阅个人消息通道if (!isSubscribed) {subscribeToPersonalMessages();}} else if (message.startsWith("MESSAGE")) {Log.d(TAG, "收到STOMP MESSAGE消息");int headerEnd = message.indexOf("\n\n");if (headerEnd != -1) {String body = message.substring(headerEnd + 2).replace("\0", "");Log.d(TAG, "收到消息体: " + body);try {MessageDTO messageDTO = gson.fromJson(body, MessageDTO.class);if (messageDTO != null && messageDTO.getMessage() != null) {// 更新消息状态为已接收messageDTO.getMessage().setStatus(MessageStatus.UNREAD);Log.d(TAG, "解析消息成功: " + messageDTO);// 在主线程上发送消息更新mainHandler.post(() -> {receivedMessage.setValue(messageDTO);showToast("收到来自 " + messageDTO.getMessage().getSender() + " 的消息: " + messageDTO.getMessage().getContent());});} else {Log.e(TAG, "解析消息失败或消息内容为空");}} catch (Exception e) {Log.e(TAG, "解析JSON消息体失败", e);e.printStackTrace();}} else {Log.e(TAG, "无法找到消息体");}} else if (message.startsWith("ERROR")) {Log.e(TAG, "收到STOMP ERROR消息: " + message);int headerEnd = message.indexOf("\n\n");if (headerEnd != -1) {String errorBody = message.substring(headerEnd + 2).replace("\0", "");Log.e(TAG, "STOMP错误消息体: " + errorBody);showToast("WebSocket错误: " + errorBody);}} else {Log.d(TAG, "收到其他类型的消息: " + message);}} catch (Exception e) {Log.e(TAG, "处理接收消息失败", e);e.printStackTrace();}}/*** 发送聊天消息* @param content 消息内容* @param receiver 接收者ID* @param conversationId 会话ID* @return 发送的消息对象*/public MessageDTO sendChatMessage(String content, String receiver, String conversationId) {if (client == null || !client.isOpen()) {Log.e(TAG, "WebSocket未连接,无法发送消息");showToast("WebSocket未连接,无法发送消息");return null;}if (token == null || token.isEmpty()) {Log.e(TAG, "用户未登录,无法发送消息");showToast("用户未登录,无法发送消息");return null;}try {// 获取当前用户IDString userId = prefsManager.getUserId();if (userId == null || userId.isEmpty()) {Log.e(TAG, "用户ID为空,无法发送消息");showToast("用户ID为空,无法发送消息");return null;}// 获取当前时间String currentTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault()).format(new Date());// 创建消息对象 - 消息ID由后端生成MessageChatBO messageChatBO = MessageChatBO.builder().content(content).type(MessageTypeEnum.TEXT) // 只支持TEXT类型.sender(userId).receiver(receiver).sendTime(currentTime).status(MessageStatus.SENDING).build();// 创建会话参与者列表List<String> participants = new ArrayList<>();participants.add(userId);participants.add(receiver);// 创建会话对象 - 使用当前会话ID,不设置lastMessageIdMessageConversationBO conversationBO = MessageConversationBO.builder().id(conversationId).participantIds(participants).lastUpdateTime(currentTime).unreadCount(0).sessionType(MessageSessionTypeEnum.USER).build();// 创建消息DTOMessageDTO messageDTO = new MessageDTO(messageChatBO, conversationBO);// 构建STOMP格式的消息StringBuilder stompFrame = new StringBuilder();stompFrame.append("SEND\n");stompFrame.append("destination:").append(MESSAGE_ENDPOINT).append(receiver).append("\n");stompFrame.append("content-type:application/json;charset=UTF-8\n");stompFrame.append("satoken:").append(token).append("\n");stompFrame.append("\n"); // 空行分隔头部和正文// 将消息对象转换为JSONString messageJson = gson.toJson(messageDTO);stompFrame.append(messageJson);stompFrame.append("\0"); // STOMP帧结束符client.send(stompFrame.toString());Log.d(TAG, "发送消息到: " + MESSAGE_ENDPOINT + receiver + ", 内容: " + messageJson);return messageDTO;} catch (Exception e) {Log.e(TAG, "发送消息失败", e);showToast("发送消息失败: " + e.getMessage());return null;}}/*** 显示Toast提示*/private void showToast(final String message) {mainHandler.post(() -> Toast.makeText(context, message, Toast.LENGTH_SHORT).show());}/*** 释放资源*/public void release() {disconnect();disposables.clear();instance = null;}/*** 连接状态枚举*/public enum ConnectionState {DISCONNECTED,  // 未连接CONNECTING,    // 连接中CONNECTED,     // 已连接ERROR          // 错误}
}

http://www.mrgr.cn/news/94122.html

相关文章:

  • 利用selenium调用豆包进行自动化问答以及信息提取
  • tcc编译器教程6 进一步学习编译gmake源代码
  • go函数详解
  • 【Linux】线程池、单例模式、死锁
  • JVM内存结构笔记01-运行时数据区域
  • golang 高性能的 MySQL 数据导出
  • 下载以后各个软件或者服务器的启动与关闭
  • Docker安装RabbitMQ
  • Qt入门笔记
  • macOS 安装配置 iTerm2 记录
  • 蓝桥杯省赛真题C++B组2024-握手问题
  • MicroPython 智能硬件开发完整指南
  • 计算机三级网络技术备考(5)
  • 基于SpringBoot的“体育购物商城”的设计与实现(源码+数据库+文档+PPT)
  • 《苍穹外卖》SpringBoot后端开发项目核心知识点与常见问题整理(DAY1 to DAY3)
  • JVM内存结构笔记02-堆
  • 利用python生成excel中模板范围对应的shape文件
  • 【大模型统一集成项目】如何封装多个大模型 API 调用
  • [Ai 力扣题单] 数组基本操作篇 27/704
  • 考研数学复习之定积分定义求解数列极限(超详细教程)