[关闭]
@Tyhj 2017-02-24T20:50:16.000000Z 字数 8015 阅读 2007

Socket之Mina服务器

Java


最近想搞一下后台,所以socket是肯定要看的,之前也看过一点点Mina,感觉还可以,所以现在来做一下,当然是做聊天服务器啦。

到现在发现做软件先想好,把文档写一下还是不错的,这样也挺轻松的。

原文地址:https://www.zybuluo.com/Tyhj/note/596142

项目地址:https://github.com/tyhjh/Mina

Mina服务器端关键代码

导入jar包:

  • mina-core-2.0.16.jar
  • slf4j-api-1.7.21.jar
  • slf4j-nop-1.7.21.jar
  • org.json.jar

开启服务:

  1. NioSocketAcceptor acceptor=new NioSocketAcceptor();
  2. acceptor.setHandler(new MySeverHandler());
  3. acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new MyTextLineFactory()));
  4. ////设置服务器空闲状态̬
  5. acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30);
  6. acceptor.bind(new InetSocketAddress(9897));
  7. acceptor.getManagedSessions();

监听消息类

可以自己设定逻辑,重点在于,客户端第一次建立连接的时候,要发送一个设置自己的身份,标识的信息,然后在服务器端接收到,就给每个IoSession设置身份,然后就好办了。

客户端建立连接的时候发送一个身份给服务器

  1. @Override
  2. public void sessionCreated(IoSession session) throws Exception {
  3. // TODO Auto-generated method stub
  4. System.out.println("已建立连接" + session.getRemoteAddress());
  5. session.write(getJson.getMsg(id, null, null, null));
  6. }

服务器接收到消息,判断该IoSession有没有设置身份,没有就设置,否则就进行消息传送

  1. JSONObject jsonObject=new JSONObject((String)message);
  2. String from=(String) session.getAttribute("from","xx");
  3. if(from.equals("xx")){
  4. from=jsonObject.getString("from");
  5. if(from!=null){
  6. session.setAttribute("from",from);
  7. }
  8. }else{
  9. //System.out.println("messageReceived:"+(String)message+session.getAttribute("from","xx"));
  10. //传达消息
  11. new Msg().send2One(session, jsonObject);
  12. }

接收到消息后解析一下,看一下是发给谁的,然后找到所有的IoSession,看看是发给哪一个,然后用这个IoSession发送出去
记得消息最后面要加换行符"\n"

  1. Collection<IoSession> sessions = session.getService().getManagedSessions().values();
  2. for (IoSession sess : sessions) {
  3. if (sess.getAttribute("from", "xx").equals(jsonObject.getString("to")))
  4. sess.write(jsonObject.toString()+"\n");
  5. }

服务器端监听消息类,还有几个什么格式转化的还是什么类,也是必须的,就懒得贴了

  1. public class MySeverHandler extends IoHandlerAdapter{
  2. @Override
  3. public void exceptionCaught(IoSession session, Throwable cause)
  4. throws Exception {
  5. System.out.println("exceptionCaught"+cause.getMessage());
  6. }
  7. //收到消息
  8. @Override
  9. public void messageReceived(IoSession session, Object message)
  10. throws Exception {
  11. JSONObject jsonObject=new JSONObject((String)message);
  12. String from=(String) session.getAttribute("from","xx");
  13. if(from.equals("xx")){
  14. from=jsonObject.getString("from");
  15. if(from!=null){
  16. session.setAttribute("from",from);
  17. }
  18. }else{
  19. //System.out.println("messageReceived:"+(String)message+session.getAttribute("from","xx"));
  20. //传达消息
  21. new Msg().send2One(session, jsonObject);
  22. }
  23. }
  24. //发消息时候
  25. @Override
  26. public void messageSent(IoSession session, Object message) throws Exception {
  27. System.out.println("messageSent");
  28. }
  29. @Override
  30. public void sessionClosed(IoSession session) throws Exception {
  31. session.close();
  32. System.out.println("sessionClosed");
  33. }
  34. @Override
  35. public void sessionCreated(IoSession session) throws Exception {
  36. System.out.println("sessionCreated");
  37. }
  38. //客户端空闲时候
  39. @Override
  40. public void sessionIdle(IoSession session, IdleStatus status)
  41. throws Exception {
  42. //System.out.println("sessionIdle");
  43. }
  44. @Override
  45. public void sessionOpened(IoSession session) throws Exception {
  46. System.out.println("sessionOpened");
  47. }
  48. }

客户端关键代码

同样的那几个包导进来,然后就开始连接,连接需要,ip和端口,端口是刚才服务器那里自己设置的,我的是9897,然后连接完了得到了session,就可以用来发送消息了。同样有个消息监听类,就不贴了,

  1. //Create TCP/IP connection
  2. connector=new NioSocketConnector();
  3. //创建接受数据的过滤器
  4. DefaultIoFilterChainBuilder chain = connector.getFilterChain();
  5. //设定这个过滤器将一行一行(/r/n)的读取数据
  6. chain.addLast("myChin", new ProtocolCodecFilter(new TextLineCodecFactory()));
  7. MinaClientHandler minaClientHandler=new MinaClientHandler();
  8. minaClientHandler.setId(id);
  9. //客户端的消息处理器:一个SamplMinaServerHander对象
  10. connector.setHandler(minaClientHandler);
  11. //set connect timeout
  12. connector.setConnectTimeout(30);
  13. //连接到服务器:
  14. ConnectFuture cf = connector.connect(new InetSocketAddress(ip,port));
  15. //Wait for the connection attempt to be finished.
  16. cf.awaitUninterruptibly();
  17. session=cf.getSession();
  18. session.write("msg");

这样子搞,自己设计一下,就可以发送消息给指定的人,网上根本找不到,全是我连蒙带猜搞出来的,所以我不知道这方法是不是很好,能用就好。

总结

其实关键在于变量session,对于服务器:每个用户连接进来都是要新建一个session,通过它可以设置身份标示,可以获取所有的session,可以发送消息给这个session所代表的用户;对于客户端:可以用于发送请求和接收返回值。

一些问题

发送消息可以发送了,那离线消息肯定也要,就需要判断用户是否在线,一般的退出我们可以控制,可以告诉服务器我的账号离线了,但是突然断网这种怎么办呢。
其实在服务器的监听类中有个方法,messageSent();就是发送成功的时候有反馈,不成功的话就是对方离线了。

离线消息

messageSent()监听到消息发送成功的时候,我们获取消息的id,然后将数据库中的相应的m_isRead字段改为1即为已接收。

登陆反馈

我们需要知道登录注册是否成功,就是客户端接收到这些请求,在执行完成后发送消息给客户端,客户端接收到后把返回值保存,我用一个循环去读取这个值,当不为null,或者时间超出2000毫秒的时候返回。

断线重连

断线就是服务器和客户端的连接断了,可能是手机突然断网了,可能是服务器出问题了,或者其他,但是只要连接断了,客户端监听类里面的sessionClosed方法都会监听到,所以只要在里面写好重连方法就好了。

  1. @Override
  2. public void sessionClosed(final IoSession session) throws Exception {
  3. System.out.println("客户端与服务端断开连接.....");
  4. new Thread(new Runnable() {
  5. @Override
  6. public void run() {
  7. while (!connect.reconnect().isConnected()) {
  8. try {
  9. Thread.sleep(2000);
  10. System.out.println("正在尝试重新连接.....");
  11. } catch (InterruptedException e) {
  12. e.printStackTrace();
  13. }
  14. }
  15. }
  16. }).start();
  17. }

重连的时候需要重新连接到服务器并且重新设置自己的身份,而连接服务器时候一些变量已经初始化了,如果重复初始化会出现一些问题,会报错。之前初始化的方法,大概可以改成这个样子:

  1. if(session==null) {
  2. connector = new NioSocketConnector();
  3. //创建接受数据的过滤器
  4. DefaultIoFilterChainBuilder chain = connector.getFilterChain();
  5. //设定这个过滤器将一行一行(/r/n)的读取数据
  6. chain.addLast("myChin", new ProtocolCodecFilter(new TextLineCodecFactory()));
  7. minaClientHandler = new MinaClientHandler(this);
  8. //客户端的消息处理器:一个SamplMinaServerHander对象
  9. connector.setHandler(minaClientHandler);
  10. //set connect timeout
  11. connector.setConnectTimeout(5);
  12. }
  13. //连接到服务器:
  14. ConnectFuture cf = connector.connect(new InetSocketAddress(ip,port));
  15. //Wait for the connection attempt to be finished.
  16. cf.awaitUninterruptibly();
  17. try {
  18. session = cf.getSession();
  19. }catch (Exception e){
  20. System.out.println("服务器无响应");
  21. }
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }

设置身份其实只要告诉服务器,我现在是重连操作,并且把我的身份标识放到session里面,然后把之前那个断线之前的session删除掉就好了。

  1. getInstance(ip,port);
  2. JSONObject jsonObject=new JSONObject();
  3. try {
  4. jsonObject.put("action","reconnect");
  5. jsonObject.put("id",u_id);
  6. session.write(jsonObject.toString());
  7. } catch (JSONException e) {
  8. e.printStackTrace();
  9. }
  10. return session;

当然这样写,就是如果断线了会一直重新尝试连接直到连接上服务器为止,想改进的话可以通过监听网络变化来操作。

在使用mina框架时候,传输的数据太多的话会出错,因为数据长度有限制。
解决方法:

  1. //之前客户端的代码改一下
  2. TextLineCodecFactory textLineCodecFactory=new TextLineCodecFactory();
  3. //在这里设置限制长度
  4. textLineCodecFactory.setDecoderMaxLineLength(102400);
  5. textLineCodecFactory.setEncoderMaxLineLength(102400);
  6. //设定这个过滤器将一行一行(/r/n)的读取数据
  7. chain.addLast("myChin", new ProtocolCodecFilter(textLineCodecFactory));

数据库设计

User/用户信息

字段 数据类型 备注
u_id string 用户id
u_name string 昵称
u_pwd string 登录密码
u_email string 邮箱
head_image string 头像
signature string 签名
u_tags string 标签
sex string 性别
  1. create table user(
  2. u_id nvarchar(50) primary key,
  3. u_name nvarchar(50),
  4. u_pwd nvarchar(100),
  5. u_email nvarchar(100),
  6. u_tags text,
  7. head_image nvarchar(100),
  8. signature nvarchar(100),
  9. sex nvarchar(50)
  10. )default charset=utf8;

Msg/消息记录

字段 数据类型 备注
m_id string id
m_belong string 归属
m_msg string/json 信息内容
m_to string 接收者
m_date DATETIME 发送时间
m_isRead int 是否接收
  1. create table msg(
  2. m_id nvarchar(100) primary key,
  3. m_belong nvarchar(100),
  4. m_msg text,
  5. m_to nvarchar(100),
  6. m_date datetime,
  7. m_isReceive integer
  8. )default charset=utf8;

Group_user/群

字段 数据类型 备注
g_id string 群id
u_id string 用户id

消息传送协议

基本聊天信息

  1. jsonObject.put("action",action);
  2. jsonObject.put("id",now+from+to);
  3. jsonObject.put("from",from);
  4. jsonObject.put("to",to);
  5. jsonObject.put("msg",msg);
  6. jsonObject.put("date",now.toString());
  7. msg
  8. jsonObject.put("msg",msg);
  9. jsonObject.put("type",type);
  10. jsonObject.put("length",length);
  11. {"msg":
  12. {"msg":"嗨","length":0,"type":0},
  13. "date":"2016-12-13 10:39:24.426",
  14. "action":"msgSent",
  15. "from":"Tyhj5",
  16. "id":"2016-12-13 10:39:24.426Tyhj5Tyhj",
  17. "to":"Tyhj"
  18. }

登陆信息

  1. jsonObject.put("action","signIn");
  2. jsonObject.put("email",email);
  3. jsonObject.put("pwd",pwd);
  4. {"action":"signIn","email":"Tyhj5","pwd":"4444"}

注册信息

  1. jsonObject.put("action","signUp")
  2. jsonObject.put("name",name)
  3. jsonObject.put("email",email)
  4. jsonObject.put("pwd",pwd);
  5. {"action":"signUp","name":"tyhj5","email":"Tyhj5","pwd":"4444"}

登陆注册,返回值

  1. jsonObject.put("code", code);
  2. jsonObject.put("action", action);
  3. jsonObject.put("msg", GetCode.getCode(code));
  4. {"msg":"邮箱已被注册","code":204,"action":"signUp"}

断线重连

  1. jsonObject.put("action","reconnect");
  2. jsonObject.put("id",u_id);
  3. {"action":"reconnect","id":"Tyhj"}

获取好友

  1. object2.put("u_id", rs.getString(1));
  2. object2.put("u_name", rs.getString(2));
  3. object2.put("head_image", rs.getString(6));
  4. object2.put("signature", rs.getString(7));
  5. {"msg":{"friends":[{"u_name":"i_tyhj@163.com","head_image":"null","u_id":"i_tyhj@163.com2016-12-19 17:28:25.641","signature":"null"}]},"code":200,"action":"getFriends"}

获取未读消息

  1. //请求
  2. jsonObject.put("action","getNewMsg");
  3. jsonObject.put("to",User.userInfo.getId());
  4. //返回值
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注