内容简介:最近在何时考虑Oracle AQ?如果出于某种原因你不想搞像
最近在 EmbedIT 工作中,我需要评估Oracle AQ是否是一个替代旧的异步任务管理系统的不错选择。所以,让我分享一下我的经验。首先,有关Oracle AQ的文档非常庞大,因此我将指出您想知道的最重要的内容。
何时考虑Oracle AQ?
如果出于某种原因你不想搞像 Hazelcast 或 Terracotta 这样的大数据框架,或者 通过JMS或AMPQ当前的消息传递解决方案对你来说还不够,那么一定要检查: Oracle Advance Queuing
Oracle带来了自己的消息传递,它通过数据库工作。什么时候适合你?
- 你每天都有数百万封邮件。
- 你收到了很多消息。
- 您希望以消息方式与其他数据库/应用程序通信。
- Oracle AQ对分布式事务来说非常棒。甲骨文明确指出:
Oracle AQ的工作方式是不会将消息视为dequeued 已出列(并因此被删除,假设您在默认的破坏模式下出列) ,直到保留到所有消费者都已将该消息取出为止。
如果处理某些事件涉及多个系统,并且主应用程序需要某种机制来知道其中一个处理失败,则Oracle AQ是一个完美的候选者。
Oracle AQ的技术观点
Oracle AQ允许将消息排入队列并从数据库管理的队列中出列,这里每个队列与一个队列表相关联。
每个队列都有一个有效载荷,可以是:
- RAW
- OBJECT:指定类型的消息。
- ANYDATA:具有任何对象类型的消息。
Oracle AQ中的队列可以是:
- 单个消费者队列(只有一个消费者能够在一瞬间出队)
-
多个消费者队列。可以通过以下方式实现:
1)多个收件人 - 在排队之前设置邮件的收件人。2)多个订户 - 队列具有默认的订户集。
如何使用Oracle AQ队列
- 通过JMS API,只需将WebLogic配置为 通过外部服务器 公开 Oracle AQ队列 。
- 通过JDBC API(我将在演示中显示)
- 通过C,.NET和许多其他语言。
重要说明:
Oracle AQ在订阅者之间没有任何类型的自动负载平衡,就像WebLogic JMS服务器中存在循环一样。但是你可以实现它。这并不难。
Oracle AQ演示
在演示的所有部分中,我将使用JDBC API和附加类AQTestObjectStruct和AQTestClient。
如何创建Oracle AQ表(多个消费者)
假设我们将在数据库模式ho_kloucek_in中排队“ Message_typ ” 类型的消息。所以在数据库中运行:
create or replace type ho_kloucek_in.Message_typ as object ( subject VARCHAR2(30), text VARCHAR2(80))
好的,我们有一个对象类型。现在我们可以为它创建带队列的队列表。
<b>public</b> <b>void</b> createQueue() throws SQLException, AQException, ClassNotFoundException
{
AQQueueTableProperty qtable_prop;
AQQueueProperty queue_prop;
AQQueueTable q_table;
AQQueue queue;
java.sql.Connection aqconn = getOracleDataSource().getConnection();
aqconn.setAutoCommit(false);
AQSession aqsession = <b>null</b>;
<font><i>// Register the Oracle AQ Driver</i></font><font>
Class.forName(</font><font>"oracle.AQ.AQOracleDriver"</font><font>);
<b>try</b> {
AQEnqueueOption enqueueOption = <b>new</b> AQEnqueueOption();
aqsession = AQDriverManager.createAQSession(aqconn);
qtable_prop = <b>new</b> AQQueueTableProperty(</font><font>"ho_kloucek_in.Message_typ"</font><font>);
qtable_prop.setMultiConsumer(<b>true</b>);
</font><font><i>/* Creating a queue table called aq_table1 in aqjava schema: */</i></font><font>
q_table = aqsession.createQueueTable(queueOwner, queueTable, qtable_prop);
System.out.println(</font><font>"Successfully created "</font><font>+queueTable+</font><font>" in "</font><font>+queueOwner+</font><font>" schema"</font><font>);
</font><font><i>/* Creating a new AQQueueProperty object */</i></font><font>
queue_prop = <b>new</b> AQQueueProperty();
</font><font><i>/* Creating a queue called aq_queue1 in aq_table1: */</i></font><font>
queue = aqsession.createQueue(q_table, queueName, queue_prop);
queue.start(<b>true</b>, <b>true</b>);
System.out.println(</font><font>"Successfully created "</font><font>+queueName+</font><font>" in "</font><font>+queueOwner+</font><font>""</font><font>);
} <b>catch</b> (Exception ex) {
ex.printStackTrace();
}
<b>finally</b> {
aqsession.close();
aqconn.close();
}
}
</font>
请注意我是如何调用qtable_prop.setMultiConsumer(true)的
通过设置收件人列表来定位多个消费者
现在,在队列表和队列设置之后,让我们用subsName变量设置名称,将消费者作为收件人:
<b>public</b> <b>void</b> dequeueMessage(<b>final</b> String subsName) throws AQException, SQLException, ClassNotFoundException
{
java.sql.Connection aqconn = getOracleDataSource().getConnection();
aqconn.setAutoCommit(false);
AQSession aq_sess = <b>null</b>;
Class.forName(<font>"oracle.AQ.AQOracleDriver"</font><font>);
<b>try</b>
{
aq_sess = AQDriverManager.createAQSession(aqconn);
AQQueue queue;
AQMessage message;
AQDequeueOption deq_option;
queue = aq_sess.getQueue(queueOwner, queueName);
AQDequeueOption opt = <b>new</b> AQDequeueOption();
opt.setConsumerName(subsName);
<b>while</b> (<b>true</b>) {
System.out.println(</font><font>"Waiting on subscription:"</font><font>+subsName);
message = queue.dequeue(opt, oracle.sql.STRUCT.<b>class</b>);
<b>if</b> (message == <b>null</b>) {
System.out.println(</font><font>"no messages"</font><font>);
} <b>else</b> {
System.out.println(</font><font>"Successful dequeue"</font><font>);
<b>if</b> (message.getObjectPayload().getPayloadData() instanceof STRUCT) {
STRUCT popedStruct = (STRUCT) message.getObjectPayload().getPayloadData();
System.out.println(</font><font>"subject: "</font><font> + popedStruct.getAttributes()[0]);
System.out.println(</font><font>"text: "</font><font> + popedStruct.getAttributes()[1]);
}
</font><font><i>//Commit</i></font><font>
aqconn.commit();
}
}
}
<b>finally</b> {
aq_sess.close();
aqconn.close();
}
}
</font>
在设置件收件人列表时,我们在排队前设置收件人:
<b>public</b> <b>void</b> enqueueMessage(String xmlMessage) throws SQLException, AQException, ClassNotFoundException {
java.sql.Connection aqconn = getOracleDataSource().getConnection();
aqconn.setAutoCommit(false);
AQSession aqsession = <b>null</b>;
<font><i>// Register the Oracle AQ Driver</i></font><font>
Class.forName(</font><font>"oracle.AQ.AQOracleDriver"</font><font>);
<b>try</b> {
AQEnqueueOption enqueueOption = <b>new</b> AQEnqueueOption();
aqsession = AQDriverManager.createAQSession(aqconn);
AQQueue queue = aqsession.getQueue(queueOwner, queueName);
AQMessage msg = queue.createMessage();
AQMessageProperty msgProps = <b>new</b> AQMessageProperty();
msgProps.setPriority(1);
Vector recipientList = <b>new</b> Vector();
AQAgent subs1 = <b>new</b> AQAgent(</font><font>"Sub2"</font><font>, <b>null</b>, 0);
recipientList.add(subs1);
msgProps.setRecipientList(recipientList);
msg.setMessageProperty(msgProps);
AQObjectPayload payload = msg.getObjectPayload();
Object [] test_attributes = <b>new</b> Object[2];
test_attributes [0] = </font><font>"AsyncTask"</font><font>;
test_attributes [1] = </font><font>"121212666"</font><font>;
StructDescriptor personDesc =
StructDescriptor.createDescriptor(</font><font>"HO_KLOUCEK_IN.MESSAGE_TYP"</font><font>, aqconn);
STRUCT <b>new</b>_async = <b>new</b> STRUCT(personDesc, aqconn, test_attributes);
payload.setPayloadData(<b>new</b>_async);
queue.enqueue(enqueueOption, msg);
aqconn.commit();
System.out.println(</font><font>"Message succesfully enqueued.."</font><font>);
}
<b>catch</b> (Exception ex) {
ex.printStackTrace();
}
<b>finally</b> {
aqsession.close();
aqconn.close();
}
}
</font>
此方法将消息发送到我的队列,并且仅为名为“Sub2”的消费者者使用!
我们来试试吧。
启动两个JVM,使用参数“Sub1”和“Sub2”启动前面提到的dequeueMessage方法... 使用包含dequeueMessage方法的类AQTestObjectStruct。以下内容应出现在两个JVM中:
JVM1:
Waiting on subscription: Sub1
JVM2:
Waiting on subscription: Sub2
如您所见,默认情况下AQQueue.dequeue方法是阻塞的。无论如何,您也可以指定阻塞一段时间,请参阅文档和 AQDequeueOption
现在启动类AQTestClient和先前修改过的方法enqueueMessage发送消息
Object [] test_attributes = <b>new</b> Object[2];
test_attributes [0] = <font>"AsyncTask"</font><font>;
test_attributes [1] = </font><font>"11111"</font><font>;
</font>
对于订户“Sub2”:
AQMessage msg = queue.createMessage();
AQMessageProperty msgProps = <b>new</b> AQMessageProperty();
msgProps.setPriority(1);
Vector recipientList = <b>new</b> Vector();
AQAgent subs2 = <b>new</b> AQAgent(<font>"Sub2"</font><font>, <b>null</b>, 0);
recipientList.add(subs2);
msgProps.setRecipientList(recipientList);
msg.setMessageProperty(msgProps);
</font>
consumer由AQAgent的名称设置。现在,在启动AQTestClient之后,应该在JVM2中出现:
Successful dequeue subject: AsyncTask text: 11111 Waiting on subscription: Sub2
订阅的多个消费者
使用AQMessage的收件人列表参数,您将在获取之前设置收件人。Oracle AQ文档明确指出:
如果在入队期间指定了收件人列表,则它将覆盖订阅列表。
那么让我们看看如何创建队列订阅 ...让我们更改AQTestObjectStruct中的dequeue方法并将其作为订阅者启动:
<b>public</b> <b>void</b> dequeueMessage(<b>final</b> String subsName) throws AQException, SQLException, ClassNotFoundException
{
java.sql.Connection aqconn = getOracleDataSource().getConnection();
aqconn.setAutoCommit(false);
AQSession aq_sess = <b>null</b>;
Class.forName(<font>"oracle.AQ.AQOracleDriver"</font><font>);
<b>try</b> {
aq_sess = AQDriverManager.createAQSession(aqconn);
AQQueue queue;
AQMessage message;
AQDequeueOption deq_option;
queue = aq_sess.getQueue(queueOwner, queueName);
</font><font><i>// add subscription</i></font><font>
AQAgent subs = <b>new</b> AQAgent(subsName, <b>null</b>, 0);
queue.removeSubscriber(subs);
queue.addSubscriber(subs,<b>null</b>);
AQDequeueOption opt = <b>new</b> AQDequeueOption();
opt.setConsumerName(subsName);
<b>while</b> (<b>true</b>) {
System.out.println(</font><font>"Waiting on subscription:"</font><font>+subsName);
message = queue.dequeue(opt, oracle.sql.STRUCT.<b>class</b>);
<b>if</b> (message == <b>null</b>) {
System.out.println(</font><font>"no messages"</font><font>);
} <b>else</b> {
System.out.println(</font><font>"Successful dequeue"</font><font>);
<b>if</b> (message.getObjectPayload().getPayloadData() instanceof STRUCT) {
STRUCT popedStruct = (STRUCT) message.getObjectPayload().getPayloadData();
System.out.println(</font><font>"subject: "</font><font> + popedStruct.getAttributes()[0]);
System.out.println(</font><font>"text: "</font><font> + popedStruct.getAttributes()[1]);
}
</font><font><i>//Commit</i></font><font>
aqconn.commit();
}
}
}
<b>finally</b> {
aq_sess.close();
aqconn.close();
}
}
</font>
(注意:如果要添加新订阅者,请注释掉“queue.removeSubscriber(subs)”行)
现在,您可以在排队之前省略消息中的收件人列表,因为订阅会设置一组消息目标。
让我再引用Oracle AQ DOC:
如果enqueue的消息生成者提供消费者的收件人列表,则无需为多消费者队列指定订阅。 在某些情况下,可能需要将针对特定消费者集的消息排队,而不是默认的订户列表。
这就是它!如果未在消息中指定收件人列表,系统将向所有订户发送消息。如果指定收件人列表,则系统会将邮件传递给指定的收件人。如果你不指定收件人列表,队列将没有任何订阅者(AQQueue.addSubscriber方法),那么你最终会得到错误:
oracle.AQ.AQOracleSQLException: ORA-24033: no recipients <b>for</b> message
ORA-06512: at <font>"SYS.DBMS_AQIN"</font><font>, line 345
ORA-06512: at line 1
at oracle.AQ.AQOracleQueue.enqueue(AQOracleQueue.java:1267)
at com.sachinhandiekar.oracle.aq.AQTestClient.enqueueMessage(AQTestClient.java:55)
at com.sachinhandiekar.oracle.aq.AQTestClient.main(AQTestClient.java:84)
</font>
测试多个订阅者
首先改变的方法入队中AQTestClient,就像我说的,我们可以努力忽略任何消费者的设置:
<b>public</b> <b>void</b> enqueueMessage(String xmlMessage) throws SQLException, AQException, ClassNotFoundException {
java.sql.Connection aqconn = getOracleDataSource().getConnection();
aqconn.setAutoCommit(false);
AQSession aqsession = <b>null</b>;
<font><i>// Register the Oracle AQ Driver</i></font><font>
Class.forName(</font><font>"oracle.AQ.AQOracleDriver"</font><font>);
<b>try</b> {
AQEnqueueOption enqueueOption = <b>new</b> AQEnqueueOption();
aqsession = AQDriverManager.createAQSession(aqconn);
AQQueue queue = aqsession.getQueue(queueOwner, queueName);
AQMessage msg = queue.createMessage();
AQMessageProperty msgProps = <b>new</b> AQMessageProperty();
msgProps.setPriority(1);
AQObjectPayload payload = msg.getObjectPayload();
Object [] test_attributes = <b>new</b> Object[2];
test_attributes [0] = </font><font>"AsyncTask"</font><font>;
test_attributes [1] = </font><font>"5555"</font><font>;
StructDescriptor personDesc =
StructDescriptor.createDescriptor(</font><font>"HO_KLOUCEK_IN.MESSAGE_TYP"</font><font>, aqconn);
STRUCT <b>new</b>_async = <b>new</b> STRUCT(personDesc, aqconn, test_attributes);
payload.setPayloadData(<b>new</b>_async);
queue.enqueue(enqueueOption, msg);
aqconn.commit();
System.out.println(</font><font>"Message succesfully enqueued.."</font><font>);
}
<b>catch</b> (Exception ex) {
ex.printStackTrace();
}
<b>finally</b> {
aqsession.close();
aqconn.close();
}
}
</font>
现在通过两个JVM中的AQTestObjectStruct类启动前面提到的dequeueMessage方法和订阅:
JVM1:Waiting on subscription: Sub1
JVM2:Waiting on subscription: Sub2
现在运行修改后的没有收件人的AQTestClient.enqueueMessage方法和两个JVM中的输出将是:
**JVM1**: Waiting on subscription: Sub1 Successful dequeue subject: AsyncTask text: 5555 Waiting on subscription: Sub1 **JVM2**: Waiting on subscription: Sub2 Successful dequeue subject: AsyncTask text: 5555 Waiting on subscription: Sub2
如您所见,消息已发送给所有订阅者,因为我们未在消息中指定收件人。
总结
我希望我能很好地解释所有细节。我真的很想念Oracle AQ的一些消息默认负载均衡。这也许是我不会将它用于我们的应用程序节点之间的异步任务分配的原因,因为我需要循环,WebLogic JMS免费提供给我。但是如果你想在某种分布式事务中与多个应用程序通信,那么肯定会使用Oracle AQ。
点击标题见原文, 源码
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 如何确定敏捷是否适合你的团队?
- Docker是否比虚拟机更适合我们?
- SOSP19' Ceph 的十年经验总结:文件系统是否适合做分布式文件系统的后端
- 适合移动端的省市区县选取器
- 如何找到适合自己的研发模式?
- Go语言适合干什么
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
大数据时代的IT架构设计
IT架构设计研究组 / 电子工业出版社 / 2014-4 / 49.00元
《大数据时代的IT架构设计》以大数据时代为背景,邀请著名企业中的一线架构师,结合工作中的实际案例展开与架构相关的讨论。《大数据时代的IT架构设计》作者来自互联网、教育、传统行业等领域,分享的案例极其实用,代表了该领域较先进的架构。无论你就职于哪一行业都可以从本书中找到相关的架构经验,对您在今后的架构设计工作中都能起到很好的帮助作用。 《大数据时代的IT架构设计》适合具备一定架构基础和架构经验......一起来看看 《大数据时代的IT架构设计》 这本书的介绍吧!