Oracle AQ队列 管理应用程序流程的关键工具(oracle aq 队列)
Oracle AQ队列: 管理应用程序流程的关键工具
Oracle Advanced Queueing(AQ)是Oracle数据库提供的一种基于消息传递机制的队列服务,它支持异步消息处理,可以帮助应用程序处理大量的事务性数据和并发请求,并提供数据传递的可靠性和可恢复性。本文将介绍Oracle AQ队列的基本概念和使用方法,并通过示例代码演示如何在应用程序中使用Oracle AQ队列。
一、Oracle AQ队列的基本概念
Oracle AQ队列是一个在Oracle数据库中创建的对象,它由3部分组成:队列表、队列、和消息。队列表是支持AQ队列的Oracle表,它包含队列的定义和属性以及队列中消息的存储。队列是在队列表上创建的一个或多个逻辑队列,每个队列有唯一的名称和属性。消息是放置在队列中的传递对象,它由消息体和属性组成。队列中的消息可以由生产者发送并由消费者接收和处理。
Oracle AQ队列提供了以下基本功能:
(1) 消息的持久性:可以将消息保存到数据库中以实现数据的可靠性和持久性。
(2) 异步消息传递:可以在不等待响应的情况下发送和接收消息。
(3) 事务处理:可以将消息的发送和接收与数据库事务进行绑定,以确保数据的一致性和可靠性。
(4) 消息优先级:可以为消息设置不同的优先级,以确保高优先级消息被及时处理。
二、Oracle AQ队列的使用方法
创建队列表
在Oracle数据库中创建队列表的方法如下:
CREATE TABLE queue_table_name
(queue_column_name queue_type, …)
LOB(queue_column_name) STORE AS
(queue_type LOB_type)
TABLESPACE tablespace_name;
其中,queue_table_name是队列表的名称,queue_column_name是队列表中的列名,queue_type是Oracle AQ预定义的数据类型(如sys.aq$_jms_text_message、sys.aq$_jms_map_message等),LOB_type是大对象的类型(如CLOB、BLOB等),tablespace_name是队列表所在的表空间名称。
创建队列
在Oracle数据库中创建队列的方法如下:
BEGIN
DBMS_AQADM.CREATE_QUEUE(queue_name => ‘queue_name’,
queue_table => ‘queue_table_name’,
queue_type => sys.aq$_jms_text_message);
END;
其中,queue_name是队列的名称,queue_table_name是队列表的名称,sys.aq$_jms_text_message是队列中消息的类型。
发送消息
在Oracle数据库中发送消息的方法如下:
DECLARE
msg sys.aq$_jms_text_message;
propid NUMBER;
qn sys.aq$_agent;
DEQUEUE_OPTIONS sys.aq$_dequeue_options;
BEGIN
broker_options.msgproperties :=
sys.aq$_jms_header_property(‘PRIORITY’, ‘1’);
msg := sys.aq$_jms_text_message.init(‘Hello AQ!’);
propid := broker_options.msgproperties.add_property(‘id’, ‘1’);
qn := sys.aq$_agent.init(‘queue_owner.queue_name’,
NULL,
‘queue_owner’);
DBMS_AQ.ENQUEUE(queue_name => ‘queue_owner.queue_name’,
enqueue_options => NULL,
message_properties => broker_options.msgproperties,
payload => msg,
msgid => propid,
msg_correlation => NULL,
transform => NULL,
queue_agent => qn);
END;
其中,queue_owner是队列的所有者,queue_name是队列的名称,broker_options.msgproperties是消息的属性,msg是消息的内容。
接收消息
在Oracle数据库中接收消息的方法如下:
DECLARE
msg sys.aq$_jms_text_message;
propid NUMBER;
qn sys.aq$_agent;
DEQUEUE_OPTIONS sys.aq$_dequeue_options;
BEGIN
qn := sys.aq$_agent.init(NULL,
‘queue_owner.queue_name’,
‘queue_owner’);
DEQUEUE_OPTIONS.CONSUMER_NAME := ‘consumer_name’;
DEQUEUE_OPTIONS.NAV_MODE := DBMS_AQ.FIRST_MESSAGE;
DBMS_AQ.DEQUEUE(queue_name => ‘queue_owner.queue_name’,
dequeue_options => DEQUEUE_OPTIONS,
message_properties => NULL,
payload => msg,
msgid => propid,
msg_correlation => NULL,
transform => NULL,
queue_agent => qn);
END;
其中,queue_owner是队列的所有者,queue_name是队列的名称,consumer_name是消费者的名称,msg是接收到的消息的内容。
三、示例代码
下面是一个简单的示例,在这个示例中,我们将创建一个队列和2个消费者,每个消费者都会向队列中发送5条消息,每条消息包含一个递增的整数值。第三个消费者会从队列中接收并打印所有的消息。
1. 创建队列
BEGIN
DBMS_AQADM.CREATE_QUEUE(queue_name => ‘test_queue’,
queue_table => ‘test_queue_table’,
queue_type => sys.aq$_jms_text_message);
DBMS_AQADM.START_QUEUE(queue_name => ‘test_queue’);
END;
2. 创建消费者
CREATE OR REPLACE PROCEDURE consumer1_proc AS
queue_handle SYS.AQ$_QUEUE;
message_handle SYS.AQ$_AGENT;
enq_options DBMS_AQ.ENQUEUE_OPTIONS_T;
propid NUMBER;
msg sys.aq$_jms_text_message;
BEGIN
message_handle := SYS.AQ$_AGENT(‘test_queue_owner’, ‘test_queue_name’, NULL);
queue_handle := SYS.AQ$_QUEUE(message_handle, ‘test_queue_owner’, ‘test_queue_name’);
FOR i IN 1..5 LOOP
msg := sys.aq$_jms_text_message.init(‘Message ‘ || TO_CHAR(i) || ‘ from consumer 1!’);
propid := enq_options.msgproperties.add_property(‘id’, i);
DBMS_AQ.ENQUEUE(
queue_name => queue_handle,
enqueue_options => enq_options,
message_properties => enq_options.msgproperties,
payload => msg,
msgid => propid);
END LOOP;
END;
CREATE OR REPLACE PROCEDURE consumer2_proc AS
queue_handle SYS.AQ$_QUEUE;
message_handle SYS.AQ$_AGENT;
enq_options DBMS_AQ.ENQUEUE_OPTIONS_T;
propid NUMBER;
msg sys.aq$_jms_text_message;
BEGIN
message_handle := SYS.AQ$_AGENT(‘test_queue_owner’, ‘test_queue_name’, NULL);
queue_handle := SYS.AQ$_QUEUE(message_handle, ‘test_queue_owner’, ‘test_queue_name’);
FOR i IN 1..5 LOOP
msg := sys.aq$_jms_text_message.init(‘Message ‘ || TO_CHAR(i) || ‘ from consumer 2!’);
propid := enq_options.msgproperties.add_property(‘id’, i);
DBMS_AQ.ENQUEUE(
queue_name => queue_handle,
enqueue_options => enq_options,
message_properties => enq_options.msgproperties,
payload => msg,
msgid => propid);
END LOOP;
END;
3. 接收消息
CREATE OR REPLACE PROCEDURE consumer3_proc AS
queue_handle SYS.AQ$_QUEUE;
message_handle SYS.AQ$_AGENT;
deq_options DBMS_AQ.DEQUEUE_OPTIONS_T;
buf sys.DBMS_SQL.VARCHAR2A;
n INTEGER;
msg sys.aq$_jms_text_message;
BEGIN
message_handle := SYS.AQ$_AGENT(NULL, ‘test_queue_owner’, ‘test_queue_name’);
queue_handle := SYS.AQ$_QUEUE(message_handle, ‘test_queue_owner’, ‘test_queue_name’);
deq_options.consumer_name := ‘consumer3’;
deq_options.navigation_mode := DBMS_AQ.FIRST_MESSAGE;
deq_options.visibility := DBMS_AQ.IMMEDIATE;
n := 0;
LOOP
DBMS_AQ.DEQUEUE(
queue_name => queue_handle,
dequeue_options => deq_options,
message_properties => NULL,
payload => msg,
msgid => buf(n),
msg_correlation => NULL);
n := n+1;
DBMS_OUTPUT.PUT_LINE(msg.text || ‘ received by consumer 3’);
DEQ_OPTIONS.MSGID := buf(n);
EXCEPTION
WHEN NO_DATA_FOUND THEN
IF n = 0 THEN
DBMS_OUTPUT.PUT_LINE(‘Queue is empty…’);
ELSE
DBMS_OUTPUT.PUT_LINE(‘All messages received!’);
END IF;
EXIT;
END;
END LOOP;
END;
执行consumer1_proc、consumer2_proc、consumer3_proc后,可以看到如下输出:
Message 1 from consumer 1! received by consumer 3
Message 1 from consumer 2! received by consumer 3
Message 2 from consumer 1! received by consumer 3
Message 2 from consumer 2! received by consumer 3
Message 3 from consumer 1! received by consumer 3
Message 3 from consumer 2! received by consumer 3
Message 4 from consumer 1! received by consumer 3
Message 4 from consumer 2! received by consumer 3
Message 5 from consumer 1! received by consumer 3
Message 5 from consumer