使⽤ActiveMQ实现简易聊天功能
⼀什么是消息队列
我们可以把消息队列⽐作是⼀个存放消息的容器,当我们需要使⽤消息的时候可以取出消息供⾃⼰使⽤。消息队列是分布式系统中重要的组件,使⽤消息队列主要是为了通过异步处理提⾼系统性能和削峰、降低系统耦合性。⽬前使⽤较多的消息队列有
ActiveMQ,RabbitMQ,Kafka,RocketMQ
⼆为什么要⽤消息队列
使⽤消息队列主要有两点好处:
1.通过异步处理提⾼系统性能(削峰、减少响应所需时间);
2.降低系统耦合性。如果在⾯试的时候你被⾯试官问到这个问题的话,⼀般情况是你在你的简历上涉及到
消息队列这⽅⾯的内容,这个时候推荐你结合你⾃⼰的项⽬来回答。
ActiveMQ 是基于 JMS 规范实现的。
JMS消息列队有两种消息模式,⼀种是点对点的消息模式,还有⼀种是订阅的模式。
四实现
解压缩-bin.zip到⼀个⽬录
启动ActiveMQ:运⾏C:\ \bin\activemq.bat
使⽤点对点⽅式实现聊天功能
编写消息发送类和接收类。发送类中需要连接ActiveMQ 服务器,创建队列,发送消息;接收类中需要A
ctiveMQ 服务器,读取发送者发送消息所⽤的队列。接收类实现为⼀个单独的线程,使⽤模式,每隔⼀段时间侦听是否有消息到来,若有消息到来,将消息添加到辅助类消息列表中。
使⽤2个队列,即对于每⼀个⽤户来说,发送消息为⼀个队列,接受消息为⼀个队列。
效果如下:
1import org.apache.activemq.ActiveMQConnectionFactory;
2
3import javax.jms.*;
4
5import static java.lang.Thread.sleep;
6
7public class MessageReceiver implements Runnable{
9private String user;
10private String password;
11private final String QUEUE;
12private Boolean stop;
13    Connection connection;
14
15public MessageReceiver(String queue, String url, String user, String password) {
16this.url = url;
17this.user = user;
18this.password = password;
19this.QUEUE = queue;
20        stop = false;
21    }
22
23public void run() {
24        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url); 25try {
26            connection = ateConnection();
27            Session session = ateSession(false, Session.AUTO_ACKNOWLEDGE);
28            Destination receiveQueue = ateQueue(QUEUE);
29            MessageConsumer consumer = ateConsumer(receiveQueue);
30            connection.start();
31while(!stop) {
32                consumer.setMessageListener(new MessageListener() {
33                    @Override
34public void onMessage(Message message) {
35try {
36//获取到接收的数据
37                            String text = ((TextMessage) message).getText();
38                            MessageText.setMsg(text);
39                        } catch (JMSException e) {
40                            e.printStackTrace();
41                        }
42                    }
43                });
44                sleep(500);
45            }
46        } catch (JMSException e) {
47            e.printStackTrace();
48        }catch (InterruptedException e) {
49//Thread.currentThread().interrupt();
50            e.printStackTrace();
51        }
52    }
53
54public void setStop(Boolean stop) {
55this.stop = stop;
56    }
57
58public void closeConnection(){
59try {
60            connection.close();
61        } catch (JMSException e) {
62            e.printStackTrace();
63        }
64    }
65
66public String getUrl() {
67return url;
68    }
69
70public void setUrl(String url) {
71this.url = url;
72    }
73
74public String getUser() {
75return user;
78public void setUser(String user) {
79this.user = user;
80    }
81
82public String getPassword() {
83return password;
84    }
85
86public void setPassword(String password) {
87this.password = password;
88    }
89 }
MessageReceiver
1import org.apache.activemq.ActiveMQConnectionFactory;
2
3import javax.jms.*;
DateFormat;
SimpleDateFormat;
6import java.util.Date;
7
8public class MessageSender {
9private String url;
10private String user;
11private String password;
12private final String QUEUE;
13private Connection connection;
14private Session session;
15private Destination sendQueue;
16private MessageProducer sender;
17private TextMessage outMessage;
18private DateFormat df;
19
20public MessageSender(String queue, String url, String user, String password) {
21this.url = url;
22this.user = user;
23this.password = password;
24this.QUEUE = queue;
25    }
26
27public void init() {
28        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
29        df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
30try {
31            connection = ateConnection();
32            connection.start();
33            session = ateSession(true, Session.AUTO_ACKNOWLEDGE);
34            sendQueue = ateQueue(QUEUE);
35            sender = ateProducer(sendQueue);
36            outMessage = ateTextMessage();
37        } catch (JMSException e) {
38            e.printStackTrace();
39        }
40    }
41
42public void sendMessage(String messageStr) {
43try {
44            outMessage = ateTextMessage();
45            String sendStr = df.format(new Date()) + "\n" + QUEUE + ": " + messageStr;
46            outMessage.setText(sendStr);
47            sender.send(outMessage);
48            sessionmit();
49            MessageText.setMsg(sendStr);
50        } catch (JMSException e) {
51            e.printStackTrace();
54
55public void closeConnection() {
56try {
57            sender.close();
58            connection.close();
59        } catch (JMSException e) {
60            e.printStackTrace();
61        }
62    }
63
64public String getUrl() {
65return url;
66    }
67
activemq启动报错68public void setUrl(String url) {
69this.url = url;
70    }
71
72public String getUser() {
73return user;
74    }
75
76public void setUser(String user) {
77this.user = user;
78    }
79
80public String getPassword() {
81return password;
82    }
83
84public void setPassword(String password) { 85this.password = password;
86    }
87 }
MessageSender