什么是集群
集群就是將相同的程序、功能,部署在兩臺或多臺服務器上,這些服務器對外提供的功能是完全一樣的。
集群是通過不斷橫向擴展增加服務器的方式,以提高服務的能力。
● 集群可以解決單點故障問題
● 集群可以提高系統的可用性
● 集群可以提高系統的服務能力
通過共享存儲目錄(kahaDB)來實現master和slave的主從信息同步;
所有ActiveMQ的broker都在不斷地獲取共享目錄的控制權,哪個broker搶到了控制權,它就成為master,它將鎖定該目錄,其他broker就只能成為slave。
當master主出現故障后,剩下的slave從將再進行爭奪共享目錄的控制權,誰搶到共享目錄的控制權,誰就成為主,其他沒有搶到控制權的稱為從。
由于他們是基于共享目錄,所以當主出現故障后,其上沒有被消費的消息在接下來產生的新的master主中可以繼續進行消費。
這種方式客戶端訪問的都是主,從只是起到了一個備份訪問的作用
(1) 架構圖
(2) 實現步驟
A、 安裝多個ActiveMQ
因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復制多份,就相當于安裝了多個ActiveMQ,我們這里復制3個ActiveMQ出來。
復制前,先將運行的ActiveMQ停止。
B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作
C、 配置每個activeMQ的conf /activemq.xml文件中的共享目錄
如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了
如果搭建在多臺服務器上,那么存放共享目錄的機器需要通過磁盤掛載的方式掛載到主從機器上。
● 修改三個ActiveMQ的共享目錄
persistenceAdapter>
<!--<kahaDB directory="${activemq.data}/kahadb"/>-->
<kahaDB directory="/opt/kahadb"/>
</persistenceAdapter>
● 修改完持久化目錄后,需要在/opt目錄下創建該目錄
D、 配置每個activeMQ的conf /activemq.xml文件中的端口
為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1,可以將文件下載下來替換。
一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
● maximumConnections 最大連接數;
● wireFormat.maxFrameSize 表示一個完整消息的最大數據量,單位byte;
● 0.0.0.0表示任意ip
E、 修改conf/jetty.xml文件的jetty服務器端口(管理控制臺)
第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
F、 啟動三臺ActiveMQ,可以測試驗證了
注意:啟動后會有一段時間延時,稍等一會;
瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務器;
web控制臺能訪問的是 master,不能訪問的是 slave。
G、 修改11-activemq-java中的程序收發消息代碼
連接時使用故障轉義協議failover
failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發消息和接收消息我們都是用循環方式
//發消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
}
查看發送以及接收idea控制臺輸出,停止ActiveMQ主,看效果
注意:如果是事務消息,被中斷那么程序發送程序出錯,不能實現,所以我們將消息改為非事務消息進行測試,如果是非事務消息就注釋掉session.commit。
該方式與shared filesystem方式類似,只是共享的存儲介質由文件系統改成了數據庫。
(1) 架構圖
(2) 實現步驟
A、 安裝多個ActiveMQ(已做)
因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復制多份,就相當于安裝了多個ActiveMQ,我們這里復制3個ActiveMQ出來復制前,先將運行的ActiveMQ停止。
B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)
C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器是jdbc數據庫方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
D、 配置每個數據庫連接池
注意:連接池的配置需要配置在的外面
<bean id="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost:3306/activemq?useUnicode=true&characterEncoding=utf8&useSSL=false"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
</bean>
E、 在每個ActiveMQ的lib目錄下加入mysql的驅動包和數據庫連接池Druid包,該包在我提供的資料05-ActiveMQ\resources\lib下
可以通過Xftp或者rz命令上傳。
F、 啟動MySQL數據庫,并創建activemq數據庫
G、 配置每個activeMQ的conf /activemq.xml文件中的端口(已做)
如果集群搭建在一臺機器上需要改端口,如果搭建在多臺上就不需要了;
為了避免端口號的沖突,前三個地址端口+1,后兩個端口地址-1。
第一個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1882?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第二個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5674?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61615?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1881?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61612?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
第三個ActiveMQ
<transportConnector name="openwire" uri="tcp://0.0.0.0:61619?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5675?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1880?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61611?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
● maximumConnections 最大連接數;
● wireFormat.maxFrameSize 表示一個完整消息的最大數據量,單位byte;
● 0.0.0.0表示任意ip
H、 修改conf/jetty.xml文件的jetty服務器端口(管理控制臺) (已做)
第一個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8162"/>
</bean>
第二個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8163"/>
</bean>
第三個ActiveMQ
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8164"/>
</bean>
I、啟動三臺ActiveMQ,可以測試驗證了
瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務器;
web控制臺能訪問的是 master,不能訪問的是 slave。
J、 修改11-activemq-java中的程序收發消息類(已做)
連接時使用故障轉義協議failover
failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發消息和接收消息我們都是用循環方式
//發消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
session.commit();
}
查看發送以及接收idea控制臺輸出,停止ActiveMQ主,看效果
注意:如果是事務消息,被中斷那么程序發送程序出錯,不能實現,所以我們將消息改為非事務消息進行測試,如果是非事務消息就注釋掉session.commit。
3、Replicated LevelDB Store方式主從集群(常用)
基于可復制的LevelDB存儲方式的集群;
這種集群方式是ActiveMQ5.9版本以后新增的特性,它使用ZooKeeper從一組broker中協調選擇一個broker作為master主,其他broker作為slave從的模式。所有slave從節點通過復制master主節點的消息來實現消息同步,當主出現故障后,沒有被消費的消息在從服務器上也同步了一份,所以不會有消息的丟失。
LevelDB 是 Google開發的一套用于持久化數據的高性能kv數據庫,ActiveMQ利用該數據庫進行數據的存儲。
只有master 接受客戶端連接,slave不接受客戶端連接,Master的所有存儲操作都將被復制到slaves。
在這個模式中,需要有半數以上的broker是正常的,集群才是可用的,超過半數broker故障,ZooKeeper的選舉算法將不能選擇master,從而導致集群不可用。
(1)架構圖
(2) 實現步驟
A、 安裝多個ActiveMQ(已做)
因為ActiveMQ的安裝和Tomcat一樣,解壓就可以使用,所以我們直接在/usr/local目錄下復制多份,就相當于安裝了多個ActiveMQ,我們這里復制3個ActiveMQ出來。
復制前,先將運行的ActiveMQ停止
B、 打開三個Xshell,分別連接不同的ActiveMQ方便操作(已做)
C、 配置每個activeMQ的conf /activemq.xml文件中的持久化適配器replicatedLevelDB方式
<persistenceAdapter>
<replicatedLevelDB
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="localhost:2181"/>
</persistenceAdapter>
參數說明
● replicas :集群中存在的節點的數目
● bind :當該節點成為master后,將使用該bind配置的ip和端口進行數據復制
● zkAddress :ZooKeeper的地址
D、 啟動ZooKeeper服務器
E、 啟動三臺ActiveMQ,可以測試驗證了
瀏覽器訪問http://192.168.235.128:8162/, http://192.168.235.128:8163/ ,http://192.168.235.128:8164/ 判斷主從服務器;
web控制臺能訪問的是 master,不能訪問的是 slave。
F、 修改11-activemq-java中的程序收發消息類(已做)
連接時使用故障轉義協議failover
ailover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)
修改BROKER_URL地址
public static final String BROKER_URL = "failover:(tcp://192.168.235.128:61617,tcp://192.168.235.128:61618,tcp://192.168.235.128:61619)";
為了看到效果,發消息和接收消息我們都是用循環方式
//發消息 沒有返回值,是非阻塞的
while(true){
messageProducer.send(message);
}
查看發送以及接收idea控制臺輸出,停止ActiveMQ主,看效果。
G、 把其中的一臺master關閉,留下兩臺運行,觀察效果
H、 繼續關閉下一臺master,留下一臺運行,觀察效果
I、 啟動其中一臺,讓兩個運行,再觀察效果
(3) 總結
這種方式,不適合集群太大,也就是activemq不能太多,因為多個activemq之間需要復制消息,這個比較耗資源,占用網絡,建議3、5臺。