更新時(shí)間:2021-07-28 17:08:12 來(lái)源:動(dòng)力節(jié)點(diǎn) 瀏覽1812次
消息事務(wù)
消息事務(wù),是保證消息傳遞原子性的一個(gè)重要特征,和JDBC的事務(wù)特征類(lèi)似。
一個(gè)事務(wù)性發(fā)送,其中一組消息要么能夠全部保證到達(dá)服務(wù)器,要么都不到達(dá)服務(wù)器。
生產(chǎn)者、消費(fèi)者與消息服務(wù)器直接都支持事務(wù)性;
ActiveMQ的事務(wù)主要偏向在生產(chǎn)者的應(yīng)用。
ActiveMQ消息事務(wù)流程圖:
沒(méi)有加入事務(wù)的時(shí)候,會(huì)有部分信息過(guò)去,結(jié)果如圖:
方式一:
/**
* 事務(wù)性發(fā)送--方案一
*/
@Test
public void sendMessageTx(){
//獲取連接工廠(chǎng)
ConnectionFactory connectionFactory = jmsMessagingTemplate.getConnectionFactory();
Session session = null;
try {
//創(chuàng)建連接
Connection connection = connectionFactory.createConnection();
/**
* 參數(shù)一:是否開(kāi)啟消息事務(wù)
*/
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//創(chuàng)建生產(chǎn)者
MessageProducer producer = session.createProducer(session.createQueue(name));
for(int i=1;i<=10;i++){
//模擬異常
if(i==4){
int a = 10/0;
}
TextMessage textMessage = session.createTextMessage("消息--" + i);
producer.send(textMessage);
}
//注意:一旦開(kāi)啟事務(wù)發(fā)送,那么就必須使用commit方法進(jìn)行事務(wù)提交,否則消息無(wú)法到達(dá)MQ服務(wù)器
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//消息事務(wù)回滾
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
結(jié)果,沒(méi)有發(fā)送出去
方式二:
/**
* ActiveMQ配置類(lèi)
*/
@Configuration
public class ActiveMQConfig {
/**
* 添加Jms事務(wù)管理器
*/
@Bean
public PlatformTransactionManager createTransactionManager(ConnectionFactory connectionFactory){
return new JmsTransactionManager(connectionFactory);
}
}
/**
* 消息發(fā)送的業(yè)務(wù)類(lèi)
*/
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${activemq.name}")
private String name;
@Transactional // 對(duì)消息發(fā)送加入事務(wù)管理(同時(shí)也對(duì)JDBC數(shù)據(jù)庫(kù)的事務(wù)生效)
public void sendMessage(){
for(int i=1;i<=10;i++) {
//模擬異常
if(i==4){
int a = 10/0;
}
jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
}
}
}
/**
* 用于監(jiān)聽(tīng)消息類(lèi)(既可以用于隊(duì)列的監(jiān)聽(tīng),也可以用于主題監(jiān)聽(tīng))
*/
@Component // 放入IOC容器
public class MsgListener {
/**
* 接收TextMessage的方法
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message,Session session){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
int i=10/0;
//提交事務(wù)
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//回滾事務(wù)
try {
session.rollback();//一旦事務(wù)回滾,MQ會(huì)重發(fā)消息,一共重發(fā)6次
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
}
}
注意如果在消費(fèi)者異常了,會(huì)收到消息,然后重發(fā)6次,要是期間還是異常,就會(huì)到私信隊(duì)列中
以上就是動(dòng)力節(jié)點(diǎn)小編介紹的"ActiveMQ的消息事務(wù)",希望對(duì)大家有幫助,想了解更多可查看ActiveMQ教程。動(dòng)力節(jié)點(diǎn)在線(xiàn)學(xué)習(xí)教程,針對(duì)沒(méi)有任何Java基礎(chǔ)的讀者學(xué)習(xí),讓你從入門(mén)到精通,主要介紹了一些Java基礎(chǔ)的核心知識(shí),讓同學(xué)們更好更方便的學(xué)習(xí)和了解Java編程,感興趣的同學(xué)可以關(guān)注一下。
相關(guān)閱讀
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
有基礎(chǔ) 直達(dá)就業(yè)
業(yè)余時(shí)間 高薪轉(zhuǎn)行
工作1~3年,加薪神器
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問(wèn)老師會(huì)電話(huà)與您溝通安排學(xué)習(xí)