Oracle Streams Advanced Queuing (AQ) is a robust and feature-rich message queuing system integrated with Oracle Database. When an organization has different systems that must communicate with each other, a messaging environment can provide a standard, reliable way to transport critical information between these systems. AQ is implemented in database tables.
Oracle AQ是Oracle数据库中集成的一种消息队列机制,可以用于不同应用程序间的消息交互,例如PL/SQL可以通过相应的Package访问队列、C#应该程序可以通过ODP.NET访问队列、Java应用程序则可以通过OJMS访问队列。AQ内部是通过数据库表实现的(即消息实际上是存储在数据库表中)。
随笔展示了PL/SQL中使用AQ的基本用法,希望对各位有帮助。示例假设了一个场景:A是一个被频繁调用的存储过程,每次调用A之前需要调用过程B,B消耗大量的时间,假设A的执行并不依赖于B的执行结果,我们可以把调用B的上下文先存入AQ中,而后异步地进行处理,从而减小了B对应用程序性能的影响。
1. 创建AQ所需要的权限
GRANT EXECUTE ON DBMS_AQ TO user1; GRANT EXECUTE ON DBMS_AQADM TO user1; BEGIN DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY', 'user1', FALSE); DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('DEQUEUE_ANY', 'user1', FALSE); END;
需要使用sys或system用户发起这些授权语句,欲创建及管理AQ,需要获得两个至关重要的包dbms_aq, dbms_aqadm的执行权限。后两个通过grant_system_privilege进行的授权是可选的,它们表示的是:
ENQUEUE_ANY means users granted this privilege are allowed to enqueue messages to any queues in the database. DEQUEUE_ANY means users granted this privilege are allowed to dequeue messages from any queues in the database.
2. 创建一个payload类型
CREATE OR REPLACE TYPE t_spl_queue_payload AS OBJECT ( ID CHAR(36), EXEC_DATE TIMESTAMP(6), PARAMETER1 NUMBER, PARAMETER2 VARCHAR2(500), FLAG CHAR(1) );
通常我们会定义一个对象,用于存储将来需要放置在AQ队列中的信息。
3. 创建AQ相关表
BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'user1. Spl_queue_table', multiple_consumers => TRUE, queue_payload_type => 'user1.t_spl_queue_payload'); END;
执行的结果是生成了表Spl_queue_table,以及若干个aq$_ spl_queue_table_表。表Spl_queue_table中除了AQ队列自身需要的一些字段外,有一个类型为t_spl_queue_payload的USER_DATA字段,用于存储队列消息,这也印证了上面说的:AQ内部是通过数据库表实现的。
4. 创建及启动AQ
BEGIN DBMS_AQADM.CREATE_QUEUE(queue_name => 'user1.spl_aq', queue_table => 'user1.spl_queue_table'); END; -- BEGIN DBMS_AQADM.START_QUEUE(queue_name => 'user1.spl_aq'); END;
如何停止及删除AQ:
BEGIN DBMS_AQADM.STOP_QUEUE (queue_name => 'user1.spl_aq'); DBMS_AQADM.DROP_QUEUE (queue_name => 'user1.spl_aq'); DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => 'user1.spl_queue_table'); END;
5. 消息的入队
PROCEDURE enqueue(p_payload IN t_spl_queue_payload) IS --PRAGMA AUTONOMOUS_TRANSACTION; enqueue_options dbms_aq.enqueue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); recipients DBMS_AQ.aq$_recipient_list_t; BEGIN recipients(1) := sys.aq$_agent('someguy', 'user1.SPL_AQ', NULL); message_properties.recipient_list := recipients; message_properties.priority := -5; message_properties.delay := dbms_aq.no_delay; message_properties.expiration := dbms_aq.never; --enqueue_options.visibility := dbms_aq.on_commit; enqueue_options.visibility := dbms_aq.immediate; enqueue_options.sequence_deviation := null; dbms_aq.enqueue(queue_name => 'user1.SPL_AQ', enqueue_options => enqueue_options, message_properties => message_properties, payload => p_payload, msgid => message_handle); --COMMIT; END enqueue;
(1) recipient,其中“someguy”指定的是消息的接收者,出队时你需要指定一样的名字才能接收到消息。(2)visibility,可以是on_commit或者immediate,如果使用on_commit,需要手工调用commit语句之后消息才进入队列(这种情况下,最好使用自治事务);如果使用immediate,则dbms_aq.enqueue完成时消息就进入队列,不需commit,并且默认使用自治事务。
6. 消息的出队
PROCEDURE dequeue IS l_payload t_spl_queue_payload; l_queue_record NUMBER; dequeue_options dbms_aq.dequeue_options_t; message_properties dbms_aq.message_properties_t; message_handle RAW(16); BEGIN dequeue_options.consumer_name := 'someguy'; dequeue_options.dequeue_mode := dbms_aq.remove; dequeue_options.navigation := dbms_aq.next_message; dequeue_options.visibility := dbms_aq.immediate; --dequeue_options.wait := dbms_aq.forever; dequeue_options.wait := dbms_aq.no_wait; dequeue_options.msgid := null; -- SELECT COUNT(*) INTO l_queue_record FROM AQ$SPL_QUEUE_TABLE WHERE msg_state = 'READY'; -- FOR i IN 1 .. l_queue_record LOOP dbms_aq.dequeue(queue_name => 'user1.SPL_AQ', dequeue_options => dequeue_options, message_properties => message_properties, payload => l_payload, msgid => message_handle); -- /*………………………………………. some time consuming calculation ………………………………………….*/ END LOOP; END;
(1) consumer_name需要和前面在入队时指定的recipient一致。(2)wait的两个值forever和no_wait是指如果当前队列中无消息时,是否进行等待,默认等待。(3) navigation的两个值first_message和next_message,一般出于性能考虑我们使用后者,或者在第一次出队时使用前者而在随后的出队中使用后者:
The FIRST_MESSAGE navigation option performs a SELECT on the queue. The NEXT_ MESSAGE navigation option fetches from the results of the SELECT run in the FIRST_ MESSAGE navigation. Thus performance is optimized because subsequent dequeues need not run the entire SELECT again.