博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ORACLE HANDBOOK系列之十二:高级队列(Advanced Queuing)
阅读量:6256 次
发布时间:2019-06-22

本文共 5063 字,大约阅读时间需要 16 分钟。

Oracle Streams Advanced Queuing (AQ) is a robust and feature-rich message queuing system integrated with Oracle Database. When an organization has different systems that must communicate with each other, a messaging environment can provide a standard, reliable way to transport critical information between these systems. AQ is implemented in database tables.

Oracle AQ是Oracle数据库中集成的一种消息队列机制,可以用于不同应用程序间的消息交互,例如PL/SQL可以通过相应的Package访问队列、C#应该程序可以通过ODP.NET访问队列、Java应用程序则可以通过OJMS访问队列。AQ内部是通过数据库表实现的(即消息实际上是存储在数据库表中)。

随笔展示了PL/SQL中使用AQ的基本用法,希望对各位有帮助。示例假设了一个场景:A是一个被频繁调用的存储过程,每次调用A之前需要调用过程B,B消耗大量的时间,假设A的执行并不依赖于B的执行结果,我们可以把调用B的上下文先存入AQ中,而后异步地进行处理,从而减小了B对应用程序性能的影响。

1.    创建AQ所需要的权限

GRANT EXECUTE ON DBMS_AQ TO user1; GRANT EXECUTE ON DBMS_AQADM TO user1; BEGIN   DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('ENQUEUE_ANY', 'user1', FALSE);   DBMS_AQADM.GRANT_SYSTEM_PRIVILEGE('DEQUEUE_ANY', 'user1', FALSE); END;

需要使用sys或system用户发起这些授权语句,欲创建及管理AQ,需要获得两个至关重要的包dbms_aq, dbms_aqadm的执行权限。后两个通过grant_system_privilege进行的授权是可选的,它们表示的是:

ENQUEUE_ANY means users granted this privilege are allowed to enqueue messages to any queues in the database. DEQUEUE_ANY means users granted this privilege are allowed to dequeue messages from any queues in the database.

2.    创建一个payload类型

CREATE OR REPLACE TYPE t_spl_queue_payload AS OBJECT (   ID             CHAR(36),   EXEC_DATE     TIMESTAMP(6),   PARAMETER1       NUMBER,   PARAMETER2       VARCHAR2(500),   FLAG           CHAR(1) );

通常我们会定义一个对象,用于存储将来需要放置在AQ队列中的信息。

3.    创建AQ相关表

BEGIN   DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table        => 'user1. Spl_queue_table',                                 multiple_consumers => TRUE,                                 queue_payload_type => 'user1.t_spl_queue_payload'); END;

执行的结果是生成了表Spl_queue_table,以及若干个aq$_ spl_queue_table_表。表Spl_queue_table中除了AQ队列自身需要的一些字段外,有一个类型为t_spl_queue_payload的USER_DATA字段,用于存储队列消息,这也印证了上面说的:AQ内部是通过数据库表实现的。

4.    创建及启动AQ

BEGIN   DBMS_AQADM.CREATE_QUEUE(queue_name  => 'user1.spl_aq',                           queue_table => 'user1.spl_queue_table'); END; -- BEGIN   DBMS_AQADM.START_QUEUE(queue_name => 'user1.spl_aq'); END;

如何停止及删除AQ:

BEGIN   DBMS_AQADM.STOP_QUEUE (queue_name => 'user1.spl_aq');   DBMS_AQADM.DROP_QUEUE (queue_name => 'user1.spl_aq');   DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => 'user1.spl_queue_table'); END;

5.    消息的入队

PROCEDURE enqueue(p_payload IN t_spl_queue_payload) IS --PRAGMA AUTONOMOUS_TRANSACTION;   enqueue_options    dbms_aq.enqueue_options_t;   message_properties dbms_aq.message_properties_t;   message_handle     RAW(16);   recipients         DBMS_AQ.aq$_recipient_list_t; BEGIN   recipients(1) := sys.aq$_agent('someguy', 'user1.SPL_AQ', NULL);   message_properties.recipient_list := recipients;   message_properties.priority := -5;   message_properties.delay := dbms_aq.no_delay;   message_properties.expiration := dbms_aq.never; --enqueue_options.visibility := dbms_aq.on_commit;   enqueue_options.visibility := dbms_aq.immediate;   enqueue_options.sequence_deviation := null;   dbms_aq.enqueue(queue_name         => 'user1.SPL_AQ',                   enqueue_options    => enqueue_options,                   message_properties => message_properties,                   payload            => p_payload,                   msgid              => message_handle); --COMMIT; END enqueue;

(1) recipient,其中“someguy”指定的是消息的接收者,出队时你需要指定一样的名字才能接收到消息。(2)visibility,可以是on_commit或者immediate,如果使用on_commit,需要手工调用commit语句之后消息才进入队列(这种情况下,最好使用自治事务);如果使用immediate,则dbms_aq.enqueue完成时消息就进入队列,不需commit,并且默认使用自治事务。

6.    消息的出队

PROCEDURE dequeue IS   l_payload      t_spl_queue_payload;   l_queue_record NUMBER;   dequeue_options    dbms_aq.dequeue_options_t;   message_properties dbms_aq.message_properties_t;   message_handle     RAW(16); BEGIN   dequeue_options.consumer_name := 'someguy';   dequeue_options.dequeue_mode  := dbms_aq.remove;   dequeue_options.navigation    := dbms_aq.next_message;   dequeue_options.visibility    := dbms_aq.immediate; --dequeue_options.wait          := dbms_aq.forever;   dequeue_options.wait          := dbms_aq.no_wait;   dequeue_options.msgid         := null; --   SELECT COUNT(*) INTO l_queue_record FROM AQ$SPL_QUEUE_TABLE WHERE msg_state = 'READY'; --   FOR i IN 1 .. l_queue_record LOOP     dbms_aq.dequeue(queue_name         => 'user1.SPL_AQ',                     dequeue_options    => dequeue_options,                     message_properties => message_properties,                     payload            => l_payload,                     msgid              => message_handle); --   /*……………………………………….   some time consuming calculation   ………………………………………….*/ END LOOP; END;

(1) consumer_name需要和前面在入队时指定的recipient一致。(2)wait的两个值forever和no_wait是指如果当前队列中无消息时,是否进行等待,默认等待。(3) navigation的两个值first_message和next_message,一般出于性能考虑我们使用后者,或者在第一次出队时使用前者而在随后的出队中使用后者:

The FIRST_MESSAGE navigation option performs a SELECT on the queue. The NEXT_ MESSAGE navigation option fetches from the results of the SELECT run in the FIRST_ MESSAGE navigation. Thus performance is optimized because subsequent dequeues need not run the entire SELECT again.

 

转载地址:http://wbnsa.baihongyu.com/

你可能感兴趣的文章
4、OC —— 构造方法
查看>>
我的友情链接
查看>>
java异常类总结
查看>>
SSH端口修改
查看>>
php归档函数(按时间)实现
查看>>
为什么colorbox要比fancybox强?
查看>>
Java 8新特性探究(8):精简的JRE详解
查看>>
EasyUI 关闭父窗口的modalDialog
查看>>
springCloud(17):统一管理微服务配置-简介
查看>>
WAS 管理节点和node同步报错
查看>>
微软MCITP系列课程(十四)搭建DNS服务器
查看>>
android rom移植知识普及
查看>>
精简SQL语句 提高MySQL服务器的扩展性
查看>>
SQL Server事务日志在修改数据时的角色
查看>>
全面替代nagios,打造高效监控报警系统-Centreon(第一部分:系统安装)
查看>>
jvisualvm 无法下载插件解决方案
查看>>
Android NDK开发_C++多文件
查看>>
SEO Drupal .htaccess 文件(适合multi-site)
查看>>
python向文件写入非打印字符
查看>>
让jquery easyui datagrid列支持绑定嵌套对象
查看>>