sys login
CREATE USER aq_test3 IDENTIFIED BY aq_test3 DEFAULT TABLESPACE MYPDB1_UD;
grant connect to aq_test3;
grant resource to aq_test3;
GRANT create type TO aq_test3;
GRANT aq_administrator_role TO aq_test3;
grant execute on dbms_aqadm to aq_test3;
grant execute on dbms_aq to aq_test3;
ALTER USER aq_test3 QUOTA UNLIMITED ON MYPDB1_UD;
grant dba to aq_test3; --可选项
grant execute on DBMS_AQ_BQVIEW to aq_test3;
grant create procedure to aq_test3;
grant create table to aq_test3;
grant create view to aq_test3;
grant unlimited tablespace to aq_test3;
2.創建測試用的表
create table CLIENT
(
client_id VARCHAR2(20),
name VARCHAR2(20),
sex VARCHAR2(20),
age NUMBER
);
create table CLIENT_LOG
(
client_id VARCHAR2(20),
method VARCHAR2(100),
deal_date DATE
);
- 创建payload_type
create or replace TYPE client_queue_payload_type AS OBJECT
(
client_id number(9),
method varchar2(20)
);
- 创建并启动队列
begin
--创建队列表
dbms_aqadm.create_queue_table(queue_table => ‘client_queue_table‘,
queue_payload_type => ‘client_queue_payload_type‘,
multiple_consumers => false);
--创建队列
DBMS_AQADM.CREATE_QUEUE(queue_name => ‘client_queue‘,
queue_table => ‘client_queue_table‘);
--启动队列
DBMS_AQADM.START_QUEUE(queue_name => ‘client_queue‘);
end;
- 创建触发器进行入列
create or replace trigger trigger_client
after insert or update or delete on client
for each row
declare
r_enqueue_options dbms_aq.enqueue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle RAW(16);
o_payload client_queue_payload_type;
v_client_id number(9);
v_method varchar2(20);
begin
if inserting then
v_client_id := :new.client_id;
v_method := ‘I‘;
elsif deleting then
v_client_id := :old.client_id;
v_method := ‘D‘;
elsif updating then
v_client_id := :new.client_id;
v_method := ‘U‘;
end if;
o_payload := client_queue_payload_type(v_client_id, v_method);
dbms_aq.enqueue(queue_name => ‘client_queue‘,
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
end trigger_client;
- 创建异步通知callback存储过程
create or replace procedure client_queue_callback_pro(context RAW,
reginfo sys.aq$_reg_info,
descr sys.aq$_descriptor,
payload RAW,
payloadl NUMBER) is
r_dequeue_options dbms_aq.dequeue_options_t;
r_message_properties dbms_aq.message_properties_t;
v_message_handle RAW(16);
o_payload client_queue_payload_type;
begin
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
dbms_aq.dequeue(queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle);
insert into client_log
(client_id, method, deal_date)
values
(o_payload.client_id, o_payload.method, sysdate);
end client_queue_callback_pro;
- 注册异步通知
declare
reginfo sys.aq$_reg_info;
reg_list sys.aq$_reg_info_list;
begin
reginfo := sys.aq$_reg_info(‘client_queue‘,
DBMS_AQ.NAMESPACE_AQ,
‘plsql://client_queue_callback_pro?PR=0‘,
HEXTORAW(‘FF‘));
reg_list := sys.aq$_reg_info_list(reginfo);
dbms_aq.register(reg_list => reg_list, reg_count => 1);
-- dbms_aq.unregister(reg_list => reg_list, reg_count => 1);
end;
- 测试
insert into client(client_id,name,sex,age) values(‘1‘,‘xag‘,‘M‘,42);
insert into client(client_id,name,sex,age) values(‘2‘,‘yyc‘,‘F‘,37);
select * from client_log;
-------------------------------------------------------------------
CLIENT_ID METHOD DEAL_DATE
1 1 I 2020-11-03 13:07:05.115
2 2 I 2020-11-03 14:32:34.1134