标签:android blog http io os ar java for strong
一、服务器,好了,废话不多说,我们先来看看服务器部分,我这里用到线程池,至于为什么用线程池,不知道的童鞋可以去我的另一篇blog看看:http://blog.csdn.net/weidi1989/article/details/7930820。当一个用户连接上之后,我们马上将该用户的socket丢入已经建好的线程池中去处理,这样可以很快腾出时间来接受下一个用户的连接,而线程池中的这个线程又分支为两个线程,一个是读消息线程,一个是写消息线程,当然,因为我这个聊天是用来转发消息的,所以还以单例模式建了一个Map用来存放每个用户的写消息线程(如果用户多的话,这是相当消耗资源的),以便在转发消息的时候,通过Map的key就可以取出对应用户的写消息线程,从而达到转发消息的目的。具体下面再说
- public class Server {
- private ExecutorService executorService;
- private ServerSocket serverSocket = null;
- private Socket socket = null;
- private boolean isStarted = true;
-
- public Server() {
- try {
-
- executorService = Executors.newFixedThreadPool(Runtime.getRuntime()
- .availableProcessors() * 50);
- serverSocket = new ServerSocket(Constants.SERVER_PORT);
- } catch (IOException e) {
- e.printStackTrace();
- quit();
- }
- }
-
- public void start() {
- System.out.println(MyDate.getDateCN() + " 服务器已启动...");
- try {
- while (isStarted) {
- socket = serverSocket.accept();
- String ip = socket.getInetAddress().toString();
- System.out.println(MyDate.getDateCN() + " 用户:" + ip + " 已建立连接");
-
- if (socket.isConnected())
- executorService.execute(new SocketTask(socket));
- }
- if (socket != null)
- socket.close();
- if (serverSocket != null)
- serverSocket.close();
- } catch (IOException e) {
- e.printStackTrace();
-
- }
- }
-
- private final class SocketTask implements Runnable {
- private Socket socket = null;
- private InputThread in;
- private OutputThread out;
- private OutputThreadMap map;
-
- public SocketTask(Socket socket) {
- this.socket = socket;
- map = OutputThreadMap.getInstance();
- }
-
- @Override
- public void run() {
- out = new OutputThread(socket, map);
-
- in = new InputThread(socket, out, map);
- out.setStart(true);
- in.setStart(true);
- in.start();
- out.start();
- }
- }
-
-
- public void quit() {
- try {
- this.isStarted = false;
- serverSocket.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public static void main(String[] args) {
- new Server().start();
- }
- }
二、服务器写消息线程,接下来,我们来看看写消息线程,很简单的一段代码,有注释,我就不多说了:
- public class OutputThread extends Thread {
- private OutputThreadMap map;
- private ObjectOutputStream oos;
- private TranObject object;
- private boolean isStart = true;
- private Socket socket;
-
- public OutputThread(Socket socket, OutputThreadMap map) {
- try {
- this.socket = socket;
- this.map = map;
- oos = new ObjectOutputStream(socket.getOutputStream());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void setStart(boolean isStart) {
- this.isStart = isStart;
- }
-
-
- public void setMessage(TranObject object) {
- this.object = object;
- synchronized (this) {
- notify();
- }
- }
-
- @Override
- public void run() {
- try {
- while (isStart) {
-
- synchronized (this) {
- wait();
- }
- if (object != null) {
- oos.writeObject(object);
- oos.flush();
- }
- }
- if (oos != null)
- oos.close();
- if (socket != null)
- socket.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
三、服务器写消息线程缓存器,接下来让我们看一下那个写消息线程缓存器的庐山真面目:
- public class OutputThreadMap {
- private HashMap<Integer, OutputThread> map;
- private static OutputThreadMap instance;
-
-
- private OutputThreadMap() {
- map = new HashMap<Integer, OutputThread>();
- }
-
-
- public synchronized static OutputThreadMap getInstance() {
- if (instance == null) {
- instance = new OutputThreadMap();
- }
- return instance;
- }
-
-
- public synchronized void add(Integer id, OutputThread out) {
- map.put(id, out);
- }
-
-
- public synchronized void remove(Integer id) {
- map.remove(id);
- }
-
-
- public synchronized OutputThread getById(Integer id) {
- return map.get(id);
- }
-
-
- public synchronized List<OutputThread> getAll() {
- List<OutputThread> list = new ArrayList<OutputThread>();
- for (Map.Entry<Integer, OutputThread> entry : map.entrySet()) {
- list.add(entry.getValue());
- }
- return list;
- }
- }
四、服务器读消息线程,接下来是读消息线程,这里包括两个部分,一部分是读消息,另一部分是处理消息,我以分开的形式贴出代码,虽然我是写在一个类里面的:
- public class InputThread extends Thread {
- private Socket socket;
- private OutputThread out;
- private OutputThreadMap map;
- private ObjectInputStream ois;
- private boolean isStart = true;
-
- public InputThread(Socket socket, OutputThread out, OutputThreadMap map) {
- this.socket = socket;
- this.out = out;
- this.map = map;
- try {
- ois = new ObjectInputStream(socket.getInputStream());
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- public void setStart(boolean isStart) {
- this.isStart = isStart;
- }
-
- @Override
- public void run() {
- try {
- while (isStart) {
-
- readMessage();
- }
- if (ois != null)
- ois.close();
- if (socket != null)
- socket.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
五、服务器消息处理,下面是处理消息的方法,由于比较麻烦以及各种纠结,我就与读消息线程分开贴,显得稍微简洁一点:
- public void readMessage() throws IOException, ClassNotFoundException {
- Object readObject = ois.readObject();
- UserDao dao = UserDaoFactory.getInstance();
- if (readObject != null && readObject instanceof TranObject) {
- TranObject read_tranObject = (TranObject) readObject;
- switch (read_tranObject.getType()) {
- case REGISTER:
- User registerUser = (User) read_tranObject.getObject();
- int registerResult = dao.register(registerUser);
- System.out.println(MyDate.getDateCN() + " 新用户注册:"
- + registerResult);
-
- TranObject<User> register2TranObject = new TranObject<User>(
- TranObjectType.REGISTER);
- User register2user = new User();
- register2user.setId(registerResult);
- register2TranObject.setObject(register2user);
- out.setMessage(register2TranObject);
- break;
- case LOGIN:
- User loginUser = (User) read_tranObject.getObject();
- ArrayList<User> list = dao.login(loginUser);
- TranObject<ArrayList<User>> login2Object = new TranObject<ArrayList<User>>(
- TranObjectType.LOGIN);
- if (list != null) {
- TranObject<User> onObject = new TranObject<User>(
- TranObjectType.LOGIN);
- User login2User = new User();
- login2User.setId(loginUser.getId());
- onObject.setObject(login2User);
- for (OutputThread onOut : map.getAll()) {
- onOut.setMessage(onObject);
- }
- map.add(loginUser.getId(), out);
- login2Object.setObject(list);
- } else {
- login2Object.setObject(null);
- }
- out.setMessage(login2Object);
-
- System.out.println(MyDate.getDateCN() + " 用户:"
- + loginUser.getId() + " 上线了");
- break;
- case LOGOUT:
- User logoutUser = (User) read_tranObject.getObject();
- int offId = logoutUser.getId();
- System.out
- .println(MyDate.getDateCN() + " 用户:" + offId + " 下线了");
- dao.logout(offId);
- isStart = false;
- map.remove(offId);
- out.setMessage(null);
- out.setStart(false);
-
- TranObject<User> offObject = new TranObject<User>(
- TranObjectType.LOGOUT);
- User logout2User = new User();
- logout2User.setId(logoutUser.getId());
- offObject.setObject(logout2User);
- for (OutputThread offOut : map.getAll()) {
- offOut.setMessage(offObject);
- }
- break;
- case MESSAGE:
-
- int id2 = read_tranObject.getToUser();
- OutputThread toOut = map.getById(id2);
- if (toOut != null) {
- toOut.setMessage(read_tranObject);
- } else {
- TextMessage text = new TextMessage();
- text.setMessage("亲!对方不在线哦,您的消息将暂时保存在服务器");
- TranObject<TextMessage> offText = new TranObject<TextMessage>(
- TranObjectType.MESSAGE);
- offText.setObject(text);
- offText.setFromUser(0);
- out.setMessage(offText);
- }
- break;
- case REFRESH:
- List<User> refreshList = dao.refresh(read_tranObject
- .getFromUser());
- TranObject<List<User>> refreshO = new TranObject<List<User>>(
- TranObjectType.REFRESH);
- refreshO.setObject(refreshList);
- out.setMessage(refreshO);
- break;
- default:
- break;
- }
- }
- }
好了,服务器的核心代码就这么一些了,很简单吧?是的,因为我们还有很多事情没有去做,比如说心跳监测用户是否一直在线,如果不在线,就释放资源等,这些都是商业项目中必须要考虑到的问题,至于这个通过心跳监测用户是否在线,我说说我的一些想法吧:由客户端定时给服务器发送一个心跳包(最好是空包,节约流量),服务器也定时去监测那个心跳包,如果有3次未收到客户端的心跳包,就判断该用户已经掉线,释放资源,至于这次数和时间间隔,就随情况而定了。如果有什么更好的其他建议,欢迎给我留言,谢谢。
六、消息传输对象,下面,我们来看看,这个超级消息对象和定义好的消息类型:
- public class TranObject<T> implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private TranObjectType type;
-
- private int fromUser;
- private int toUser;
-
- private T object;
- private List<Integer> group;
-
- get...set...
- public enum TranObjectType {
- REGISTER,
- LOGIN,
- LOGOUT,
- FRIENDLOGIN,
- FRIENDLOGOUT,
- MESSAGE,
- UNCONNECTED,
- FILE,
- REFRESH,
- }
七、客户端,然后是客户端部分了,其实跟服务器差不多,只是没有建立线程池了,因为没有必要,是吧?然后实例化写线程和读线程没有先后顺序,这也勉强算一个区别吧~呵呵
- public class Client {
-
- private Socket client;
- private ClientThread clientThread;
- private String ip;
- private int port;
-
- public Client(String ip, int port) {
- this.ip = ip;
- this.port = port;
- }
-
- public boolean start() {
- try {
- client = new Socket();
-
-
- client.connect(new InetSocketAddress(ip, port), 3000);
- if (client.isConnected()) {
-
- clientThread = new ClientThread(client);
- clientThread.start();
- }
- } catch (IOException e) {
- e.printStackTrace();
- return false;
- }
- return true;
- }
-
-
- public ClientInputThread getClientInputThread() {
- return clientThread.getIn();
- }
-
-
- public ClientOutputThread getClientOutputThread() {
- return clientThread.getOut();
- }
-
-
- public void setIsStart(boolean isStart) {
- clientThread.getIn().setStart(isStart);
- clientThread.getOut().setStart(isStart);
- }
-
- public class ClientThread extends Thread {
-
- private ClientInputThread in;
- private ClientOutputThread out;
-
- public ClientThread(Socket socket) {
- in = new ClientInputThread(socket);
- out = new ClientOutputThread(socket);
- }
-
- public void run() {
- in.setStart(true);
- out.setStart(true);
- in.start();
- out.start();
- }
-
-
- public ClientInputThread getIn() {
- return in;
- }
-
-
- public ClientOutputThread getOut() {
- return out;
- }
- }
- }
八、客户端写消息线程,先看看客户端写消息线程吧:
- public class ClientOutputThread extends Thread {
- private Socket socket;
- private ObjectOutputStream oos;
- private boolean isStart = true;
- private TranObject msg;
-
- public ClientOutputThread(Socket socket) {
- this.socket = socket;
- try {
- oos = new ObjectOutputStream(socket.getOutputStream());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- public void setStart(boolean isStart) {
- this.isStart = isStart;
- }
-
-
- public void setMsg(TranObject msg) {
- this.msg = msg;
- synchronized (this) {
- notify();
- }
- }
-
- @Override
- public void run() {
- try {
- while (isStart) {
- if (msg != null) {
- oos.writeObject(msg);
- oos.flush();
- if (msg.getType() == TranObjectType.LOGOUT) {
- break;
- }
- synchronized (this) {
- wait();
- }
- }
- }
- oos.close();
- if (socket != null)
- socket.close();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- }
九、客户端读消息线程,然后是客户端读消息线程,这里又有一个要注意的地方,我们收到消息的时候,是不是要告诉用户?如何告诉呢?接口监听貌似是一个很好的办法,神马?不知道接口监听?你会用Android的setOnClickListener不?这就是android封装好的点击事件监听,不懂的话,可以好好看看,理解一下,其实也不难:
- public class ClientInputThread extends Thread {
- private Socket socket;
- private TranObject msg;
- private boolean isStart = true;
- private ObjectInputStream ois;
- private MessageListener messageListener;
-
- public ClientInputThread(Socket socket) {
- this.socket = socket;
- try {
- ois = new ObjectInputStream(socket.getInputStream());
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
-
- public void setMessageListener(MessageListener messageListener) {
- this.messageListener = messageListener;
- }
-
- public void setStart(boolean isStart) {
- this.isStart = isStart;
- }
-
- @Override
- public void run() {
- try {
- while (isStart) {
- msg = (TranObject) ois.readObject();
-
-
- messageListener.Message(msg);
- }
- ois.close();
- if (socket != null)
- socket.close();
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
-
- public interface MessageListener {
- public void Message(TranObject msg);
- }
- }
QQ聊天
标签:android blog http io os ar java for strong
原文地址:http://www.cnblogs.com/dianfengwork/p/4037452.html