一、載入與安置
徑直去官網(wǎng)(http://activemq.apache.org/)載入最新本子即可,因?yàn)檫@是免安置的,只須要解壓就行了。安置完之保守入bin目次,雙擊activemq.bat文獻(xiàn)(linux下在bin目次下實(shí)行activemqstart)
二、考察遏制臺(tái)
在欣賞器輸出:http://ip:8161/admin/,展示如次界面表白啟用勝利,默許的用戶名暗號(hào)都是admin
三、竄改端標(biāo)語
61616為對(duì)外效勞端標(biāo)語
8161為遏制器端標(biāo)語
當(dāng)端標(biāo)語辯論時(shí),不妨竄改這兩個(gè)端標(biāo)語。cdconf,竄改activemq.xml竄改內(nèi)里的61616端口。竄改jetty.xml,竄改內(nèi)里的8161端口。
queue部隊(duì)形式:
和rabbitmq大略部隊(duì)形式一律,假如有多個(gè)耗費(fèi)者耗費(fèi)同一個(gè)部隊(duì)中的動(dòng)靜的話,默許也是輪詢體制的耗費(fèi)
示例代碼:
publicclassProductor{publicstaticfinalStringBORKER_URL="tcp://127.0.0.1:61616";publicstaticfinalStringQUEUE_NAME="queue1";publicstaticvoidmain(String[]args)throwsJMSException{//創(chuàng)造工場ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(BORKER_URL);//創(chuàng)造tcp貫穿Connectionconnection=factory.createConnection();//創(chuàng)造貫穿connection.start();/***創(chuàng)造對(duì)話,1.能否打開工作,2.簽收形式*/Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//創(chuàng)造部隊(duì)(動(dòng)靜的手段地)Queuequeue=session.createQueue(QUEUE_NAME);//創(chuàng)造消費(fèi)者M(jìn)essageProducerproducer=session.createProducer(queue);//動(dòng)靜非長久化producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//動(dòng)靜長久化默許是長久化的//producer.setDeliveryMode(DeliveryMode.PERSISTENT);//創(chuàng)造動(dòng)靜TextMessagemessage=session.createTextMessage("您好嗎");//發(fā)送動(dòng)靜producer.send(message);producer.close();session.close();connection.close();System.out.println("發(fā)送勝利!");}}publicclassConsumer{publicstaticfinalStringBORKER_URL="tcp://127.0.0.1:61616";publicstaticfinalStringQUEUE_NAME="queue1";publicstaticvoidmain(String[]args)throwsJMSException{//創(chuàng)造工場ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(BORKER_URL);//創(chuàng)造tcp貫穿Connectionconnection=factory.createConnection();//創(chuàng)造貫穿connection.start();/***創(chuàng)造對(duì)話,1.能否打開工作,2.簽收形式*/Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//創(chuàng)造/證明部隊(duì)(動(dòng)靜的手段地)Queuequeue=session.createQueue(QUEUE_NAME);//創(chuàng)造耗費(fèi)者M(jìn)essageConsumerconsumer=session.createConsumer(queue);/*while(true){//receive會(huì)阻礙線程TextMessagemessage=(TextMessage)consumer.receive();System.out.println("接受到動(dòng)靜:"+message.getText());}*///監(jiān)聽的辦法耗費(fèi)consumer.setMessageListener(message->{TextMessagetextMessage=(TextMessage)message;try{System.out.println("1號(hào)接受到動(dòng)靜:"+textMessage.getText());}catch(JMSExceptione){e.printStackTrace();}});}}topic部隊(duì)形式:
稱為頒布訂閱形式,消費(fèi)者把動(dòng)靜發(fā)送給訂閱給某個(gè)topic中心的耗費(fèi)者,是散發(fā)的形式,這種形式默許須要先啟用耗費(fèi)者,否則就算消費(fèi)者頒布了某個(gè)topic中心的動(dòng)靜,耗費(fèi)者也耗費(fèi)不了;只有耗費(fèi)者提早訂閱,而且做了動(dòng)靜長久化的處置,如許后啟用耗費(fèi)者本領(lǐng)耗費(fèi)提早推送的動(dòng)靜。
代碼:
publicclassProductor{publicstaticfinalStringBORKER_URL="tcp://127.0.0.1:61616";publicstaticfinalStringTOPIC_NAME="topic1";publicstaticvoidmain(String[]args)throwsJMSException{//創(chuàng)造工場ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(BORKER_URL);//異步送達(dá)factory.setUseAsyncSend(true);//創(chuàng)造tcp貫穿Connectionconnection=factory.createConnection();/***創(chuàng)造對(duì)話,1.能否打開工作,2.簽收形式*/Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//創(chuàng)造/證明topic(動(dòng)靜的手段地)Topictopic=session.createTopic(TOPIC_NAME);//創(chuàng)造消費(fèi)者ActiveMQMessageProducerproducer=(ActiveMQMessageProducer)session.createProducer(topic);//長久化producer.setDeliveryMode(DeliveryMode.PERSISTENT);//創(chuàng)造貫穿connection.start();//創(chuàng)造動(dòng)靜TextMessagemessage=session.createTextMessage("您好嗎");//發(fā)送動(dòng)靜,異步發(fā)送回調(diào)因變量producer.send(message,newAsyncCallback(){@OverridepublicvoidonSuccess(){System.out.println("success");}@OverridepublicvoidonException(JMSExceptione){System.out.println("fail");}});producer.close();session.close();connection.close();System.out.println("發(fā)送勝利!");}}publicclassConsumer1{publicstaticfinalStringBORKER_URL="tcp://127.0.0.1:61616";publicstaticfinalStringTOPIC_NAME="topic1";publicstaticvoidmain(String[]args)throwsJMSException{//創(chuàng)造工場ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory(BORKER_URL);//創(chuàng)造tcp貫穿Connectionconnection=factory.createConnection();//擬訂clientIdconnection.setClientID("my");/***創(chuàng)造對(duì)話,1.能否打開工作,2.簽收形式*/Sessionsession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);//創(chuàng)造/證明topic(動(dòng)靜的手段地)Topictopic=session.createTopic(TOPIC_NAME);//訂閱中心TopicSubscribersubscriber=session.createDurableSubscriber(topic,"remark");//創(chuàng)造貫穿connection.start();while(true){//receive會(huì)阻礙線程//接受訂閱的動(dòng)靜TextMessagemessage=(TextMessage)subscriber.receive();System.out.println("接受到動(dòng)靜:"+message.getText());}/*//創(chuàng)造耗費(fèi)者M(jìn)essageConsumerconsumer=session.createConsumer(topic);//創(chuàng)造貫穿connection.start();*//*while(true){//receive會(huì)阻礙線程TextMessagemessage=(TextMessage)consumer.receive();System.out.println("接受到動(dòng)靜:"+message.getText());}*//*//監(jiān)聽的辦法耗費(fèi)consumer.setMessageListener(message->{TextMessagetextMessage=(TextMessage)message;try{System.out.println("1號(hào)接受到動(dòng)靜:"+textMessage.getText());}catch(JMSExceptione){e.printStackTrace();}});*/}}怎樣保護(hù)動(dòng)靜的真實(shí)性
回復(fù)這個(gè)題目重要從長久化,工作,簽收這幾個(gè)上面動(dòng)手
動(dòng)靜長久化的中心代碼:
//queue形式的動(dòng)靜長久化默許是長久化的producer.setDeliveryMode(DeliveryMode.PERSISTENT);/***topic形式的長久化*/Topictopic=session.createTopic(TOPIC_NAME);ActiveMQMessageProducerproducer=(ActiveMQMessageProducer)session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);connection.start();工作的中心代碼(偏消費(fèi)者):
//參數(shù)樹立成trueconnection.createSession(false,Session.AUTO_ACKNOWLEDGE);//工作提交session.commit();簽收的中心代碼(偏耗費(fèi)者):
//參數(shù)樹立成手動(dòng)提交connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);//動(dòng)靜簽收message.acknowledge();提防:假如既打開工作,又打開手動(dòng)簽收,以工作為準(zhǔn),只有工作被提交了也默許動(dòng)靜被簽收了
本能提高:
1.運(yùn)用nio的和議比tcp的本能高,
擺設(shè)辦法:在conf目次下activemq.xml照著底下擺設(shè)<broker>...<transportConnectors><transportConnectorname="nio"uri="nio://0.0.0.0:61616"/></<transportConnectors>...</broker>第二步是代碼考察辦法由tcp改為nio//創(chuàng)造工場ActiveMQConnectionFactoryfactory=newActiveMQConnectionFactory("nio://127.0.0.1:61616");2.jdbc+Journaling普及惟有jdbc長久化的本能,它在做長久化入數(shù)據(jù)庫之前,會(huì)先將數(shù)據(jù)生存到Journaling文獻(xiàn)中,之后才漸漸同步到數(shù)據(jù)庫中,即是中央加了一層緩沖層。
把數(shù)據(jù)庫mysql的啟動(dòng)包放到lib目次下擺設(shè)辦法:在conf目次下activemq.xml照著底下擺設(shè),個(gè)中有個(gè)createTablesOnStartup屬性,默許值是true,表白歷次啟用后去數(shù)據(jù)庫機(jī)動(dòng)建表<persistenceAdapter><kahaDBdirectory="${activemq.data}/kahadb"/></persistenceAdapter>//上頭是默許擺設(shè)找到改成底下的擺設(shè)<persistenceAdapter><journalPersistenceAdapterFactoryjournalLogFiles="5"dataDirectory="${basedir}/activemq-data"dataSource="#mysql-ds"/></persistenceAdapter>//底下的代碼寫在<beans>節(jié)點(diǎn)中<beanid="mysql-ds"class="org.apache.commons.dbcp.BasicDataSource"destroy-method="close"><propertyname="driverClassName"value="com.mysql.jdbc.Driver"/><propertyname="url"value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/><propertyname="username"value="activemq"/><propertyname="password"value="activemq"/><propertyname="poolPreparedStatements"value="true"/></bean>