大战熟女丰满人妻av-荡女精品导航-岛国aaaa级午夜福利片-岛国av动作片在线观看-岛国av无码免费无禁网站-岛国大片激情做爰视频

JMS&ActiveMQ教程
基于JMS的消息傳送
ActiveMQ與Spring集成
ActiveMQ與SpringBoot集成
ActiveMQ安全機制
ActiveMQ主從集群

ActiveMQ主從集群

什么是集群

集群就是將相同的程序、功能,部署在兩臺或多臺服務器上,這些服務器對外提供的功能是完全一樣的。

集群是通過不斷橫向擴展增加服務器的方式,以提高服務的能力。

為什么需要集群

● 集群可以解決單點故障問題

● 集群可以提高系統的可用性

● 集群可以提高系統的服務能力

ActiveMQ主從集群方式

1、shared filesystem Master-Slave方式主從集群

通過共享存儲目錄(kahaDB)來實現master和slave的主從信息同步;

所有ActiveMQ的broker都在不斷地獲取共享目錄的控制權,哪個broker搶到了控制權,它就成為master,它將鎖定該目錄,其他broker就只能成為slave。

當master主出現故障后,剩下的slave從將再進行爭奪共享目錄的控制權,誰搶到共享目錄的控制權,誰就成為主,其他沒有搶到控制權的稱為從。

由于他們是基于共享目錄,所以當主出現故障后,其上沒有被消費的消息在接下來產生的新的master主中可以繼續進行消費。

這種方式客戶端訪問的都是主,從只是起到了一個備份訪問的作用

(1) 架構圖

(2) 實現步驟

A、 安裝多個ActiveMQ

因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復制多份,就相當于安裝了多個ActiveMQ,我們這里復制3個ActiveMQ出來。

復制前,先將運行的ActiveMQ停止。

B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作

C、 配置每個activeMQ的conf /activemq.xml文件中的共享目錄

如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了

如果搭建在多臺服務器上,那么存放共享目錄的機器需要通過磁盤掛載的方式掛載到主從機器上。

● 修改三個ActiveMQ的共享目錄

persistenceAdapter>
    <!--<kahaDB directory="${activemq.data}/kahadb"/>-->
    <kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>

● 修改完持久化目錄后,需要在/opt目錄下創建該目錄

D、 配置每個activeMQ的conf /activemq.xml文件中的端口

為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1,可以將文件下載下來替換。

一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

● maximumConnections 最大連接數;

● wireFormat.maxFrameSize 表示一個完整消息的最大數據量,單位byte;

● 0.0.0.0表示任意ip

E、 修改conf/jetty.xml文件的jetty服務器端口(管理控制臺)

第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>

F、 啟動三臺ActiveMQ,可以測試驗證了

注意:啟動后會有一段時間延時,稍等一會;

瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務器;

web控制臺能訪問的是 master,不能訪問的是 slave。

G、 修改11-activemq-java中的程序收發消息代碼

連接時使用故障轉義協議failover

failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

 為了看到效果,發消息和接收消息我們都是用循環方式

//發消息 沒有返回值,是非阻塞的
while(true){
    messageProducer.send(message); 
}

查看發送以及接收idea控制臺輸出,停止ActiveMQ主,看效果

注意:如果是事務消息,被中斷那么程序發送程序出錯,不能實現,所以我們將消息改為非事務消息進行測試,如果是非事務消息就注釋掉session.commit。

2、shared database Master-Slave方式主從集群

該方式與shared filesystem方式類似,只是共享的存儲介質由文件系統改成了數據庫。

(1) 架構圖

(2) 實現步驟

A、 安裝多個ActiveMQ(已做)

因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復制多份,就相當于安裝了多個ActiveMQ,我們這里復制3個ActiveMQ出來復制前,先將運行的ActiveMQ停止。

B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)

C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器是jdbc數據庫方式

<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

D、 配置每個數據庫連接池

注意:連接池的配置需要配置在的外面

<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false"/>
    <property name="username" value="root"/>
    <property name="password" value="123456"/>
</bean>

E、 在每個ActiveMQ的lib目錄下加入mysql的驅動包和數據庫連接池Druid包,該包在我提供的資料05-ActiveMQ\resources\lib下

可以通過Xftp或者rz命令上傳。

F、 啟動MySQL數據庫,并創建activemq數據庫

G、 配置每個activeMQ的conf /activemq.xml文件中的端口(已做)

如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了;

為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1。

第一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

● maximumConnections 最大連接數;

● wireFormat.maxFrameSize 表示一個完整消息的最大數據量,單位byte;

● 0.0.0.0表示任意ip

H、 修改conf/jetty.xml文件的jetty服務器端口(管理控制臺) (已做)

第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
    <!-- the default port number for the web console -->
    <property name="host" value="0.0.0.0"/>
    <property name="port" value="8164"/>
</bean>

I、啟動三臺ActiveMQ,可以測試驗證了

瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務器;

web控制臺能訪問的是 master,不能訪問的是 slave。

J、 修改11-activemq-java中的程序收發消息類(已做)

連接時使用故障轉義協議failover

failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址 

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

為了看到效果,發消息和接收消息我們都是用循環方式

//發消息 沒有返回值,是非阻塞的
while(true){
    messageProducer.send(message);
    session.commit();
}

查看發送以及接收idea控制臺輸出,停止ActiveMQ主,看效果

注意:如果是事務消息,被中斷那么程序發送程序出錯,不能實現,所以我們將消息改為非事務消息進行測試,如果是非事務消息就注釋掉session.commit。

3、Replicated LevelDB Store方式主從集群(常用)

基于可復制的LevelDB存儲方式的集群;

這種集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper從一組broker中協調選擇一個broker作為master主,其他broker作為slave從的模式。所有slave從節點通過復制master主節點的消息來實現消息同步,當主出現故障后,沒有被消費的消息在從服務器上也同步了一份,所以不會有消息的丟失。

LevelDB 是 Google開發的一套用于持久化數據的高性能kv數據庫,ActiveMQ利用該數據庫進行數據的存儲。

只有master 接受客戶端連接,slave不接受客戶端連接,Master的所有存儲操作都將被復制到slaves。

在這個模式中,需要有半數以上的broker是正常的,集群才是可用的,超過半數broker故障,ZooKeeper的選舉算法將不能選擇master,從而導致集群不可用。

(1)架構圖

(2) 實現步驟

A、 安裝多個ActiveMQ(已做)

因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復制多份,就相當于安裝了多個ActiveMQ,我們這里復制3個ActiveMQ出來。

復制前,先將運行的ActiveMQ停止

B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)

C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器replicatedLevelDB方式

<persistenceAdapter>
<replicatedLevelDB
    replicas="3"
    bind="tcp://0.0.0.0:0"
    zkAddress="localhost:2181"/>
</persistenceAdapter>

參數說明

● replicas :集群中存在的節點的數目

● bind :當該節點成為master后,將使用該bind配置的ip和端口進行數據復制

● zkAddress :ZooKeeper的地址

D、 啟動ZooKeeper服務器

E、 啟動三臺ActiveMQ,可以測試驗證了

瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務器;

web控制臺能訪問的是 master,不能訪問的是 slave。

F、 修改11-activemq-java中的程序收發消息類(已做)

連接時使用故障轉義協議failover

ailover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)

修改BROKER_URL地址

public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";

為了看到效果,發消息和接收消息我們都是用循環方式 

//發消息 沒有返回值,是非阻塞的
while(true){
    messageProducer.send(message);
}

查看發送以及接收idea控制臺輸出,停止ActiveMQ主,看效果。

G、 把其中的一臺master關閉,留下兩臺運行,觀察效果

H、 繼續關閉下一臺master,留下一臺運行,觀察效果

I、 啟動其中一臺,讓兩個運行,再觀察效果

(3) 總結

這種方式,不適合集群太大,也就是activemq不能太多,因為多個activemq之間需要復制消息,這個比較耗資源,占用網絡,建議3、5臺。

全部教程
主站蜘蛛池模板: 亚洲wuma| 欧美 xx性 在线 | 999精品久久久中文字幕蜜桃 | 天天操天天射天天操 | 亚洲综合图片人成综合网 | 精品欧美高清一区二区免费 | 手机看片日韩日韩国产在线看 | 伦理自拍| 亚洲综合在线另类色区奇米 | 爱爱免费播放视频在线观看 | 特级理论片| 国产精品久久久久免费a∨ 国产精品久久久久免费视频 | 四虎精品成人免费视频 | 久久精品国产6699国产精 | 一级女人18毛片免费 | 一级二级毛片 | 日日射影院 | 精品久久久中文字幕一区 | 99久久99这里只有免费费精品 | 一区二区高清视频 | 亚洲日本va中文字幕 | 国产人成久久久精品 | 久久国产视频网站 | 国产一区在线视频观看 | 久久成人免费播放网站 | 欧美午夜在线观看理论片 | 国产午夜精品不卡观看 | 青青青国产在线手机免费观看 | 四虎4hu影库免费永久国产 | 国产亚洲精品久久麻豆 | 亚洲日本aⅴ片在线观看香蕉 | 日韩欧美国产成人 | 真人一级一级特黄高清毛片 | 午夜精品久久久久久99热 | 久久久久久午夜精品 | 91在线视频免费 | 欧美影院一区二区 | 欧美专区亚洲专区 | 国产成人一区二区 | 在线观看国产区 | 欧美久久久久欧美一区 |