标签:
1. sysdba 用户下赋权限
# sysprivilege.sql
prompt ---- connect to sysdba set serveroutput on; prompt ---- create user aq and aq_user drop user aq cascade; drop user aq_user cascade; create user aq identified by aq; create user aq_user identified by aq_user; prompt ---- grant sys privilege grant connect,resource to aq,aq_user; grant aq_administrator_role,unlimited tablespace to aq; grant execute on dbms_aq to aq; grant execute on dbms_aqadm to aq; grant select any dictionary to aq; grant execute on dbms_aq to aq_user; grant execute on dbms_aqadm to aq_user; prompt ---- grant aq queue admin privilege begin dbms_aqadm.grant_system_privilege(‘ENQUEUE_ANY‘, ‘aq‘, FALSE); dbms_aqadm.grant_system_privilege(‘DEQUEUE_ANY‘, ‘aq‘, FALSE); end; / show errors;
2. advanced queue 管理员 aq用户下
# aqinstall.sql
prompt ---- connect to aq set serveroutput on; prompt ---- construct message type create or replace type aq_message force as object( request clob, id integer); / show errors; grant execute on aq_message to aq_user; prompt ---- drop queue and queue table begin DBMS_AQADM.STOP_QUEUE( queue_name => ‘demo_queue‘ ); DBMS_AQADM.DROP_QUEUE( queue_name => ‘demo_queue‘ ); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => ‘demo_queue_table‘ ); end; / show errors; prompt ---- create queue table and queue begin DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => ‘demo_queue_table‘, queue_payload_type => ‘aq_message‘, multiple_consumers => TRUE ); DBMS_AQADM.CREATE_QUEUE ( queue_name => ‘demo_queue‘, queue_table => ‘demo_queue_table‘ ); END; / show errors; prompt ---- grant aq_user queue privilege begin dbms_aqadm.GRANT_QUEUE_PRIVILEGE(‘ALL‘,‘demo_queue‘,‘aq_user‘, true); end; / show errors; prompt ---- add queue subscriber begin DBMS_AQADM.ADD_SUBSCRIBER ( queue_name => ‘demo_queue‘, subscriber => SYS.AQ$_AGENT(‘demo_sub‘,NULL,NULL) ); end; / show errors; prompt ---- subscriber register begin DBMS_AQ.REGISTER ( SYS.AQ$_REG_INFO_LIST( SYS.AQ$_REG_INFO( ‘DEMO_QUEUE:demo_sub‘, DBMS_AQ.NAMESPACE_AQ, ‘plsql://DEMO_QUEUE_CALLBACK_PROCEDURE‘, HEXTORAW(‘FF‘))),1); END; / show errors; prompt ---- add queue subscriber_1 begin DBMS_AQADM.ADD_SUBSCRIBER ( queue_name => ‘demo_queue‘, subscriber => SYS.AQ$_AGENT(‘demo_sub_1‘,NULL,NULL) ); end; / show errors; prompt ---- subscriber_1 register begin DBMS_AQ.REGISTER ( SYS.AQ$_REG_INFO_LIST( SYS.AQ$_REG_INFO( ‘DEMO_QUEUE:demo_sub_1‘, DBMS_AQ.NAMESPACE_AQ, ‘plsql://demo_queue_call_procedure‘, HEXTORAW(‘FF‘))),1); END; / show errors; prompt ---- start queue begin DBMS_AQADM.START_QUEUE ( queue_name => ‘demo_queue‘ ); end; / show errors; prompt ---- subscriber count SELECT count(*) FROM aq$demo_queue_table_s;
3. aq 用户下 服务端处理
# plsqlcallback.sql
set serveroutput on;
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000),id integer );
create or replace procedure sendmessage(message in clob,id in integer, subcriber in varchar2) is
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
rcpt_list dbms_aq.aq$_recipient_list_t;
l_message clob;
begin
l_message := message||to_clob(TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘)||‘}‘);
o_payload := aq.aq_message(l_message,id);
rcpt_list(0) := sys.aq$_agent(subcriber, null, null);
r_message_properties.recipient_list := rcpt_list;
-- r_message_properties.delay := 10;
DBMS_AQ.ENQUEUE(
queue_name => ‘aq.demo_queue‘,
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
COMMIT;
exception when others then
dbms_output.put_line(sqlerrm);
dbms_output.put_line(dbms_utility.format_error_backtrace);
end;
/
show errors;
create or replace function api_func(request in clob) return clob is
l_response clob;
begin
l_response := to_clob(‘send to client{‘);
return l_response;
end;
/
show errors;
CREATE or replace PROCEDURE demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
i_payload aq.aq_message;
i_message clob;
response clob;
l_id integer;
BEGIN
r_dequeue_options.msgid := descr.msg_id;
r_dequeue_options.consumer_name := descr.consumer_name;
DBMS_AQ.DEQUEUE(
queue_name => descr.queue_name,
dequeue_options => r_dequeue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘);
INSERT INTO demo_queue_message_table ( message,id) VALUES (to_char(i_message),o_payload.id);
response := api_func(i_message);
l_id := o_payload.id;
sendmessage(response,l_id,‘demo_sub_1‘);
COMMIT;
exception when others then
dbms_output.put_line(sqlerrm);
dbms_output.put_line(dbms_utility.format_error_backtrace);
END;
/
show errors;
4. aq 用户下客户端处理
# plsqlcall.sql
CREATE TABLE demo_queue_message_receive ( message VARCHAR2(4000),id integer); CREATE PROCEDURE demo_queue_call_procedure( context RAW, reginfo SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER ) AS r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload aq.aq_message; i_message clob; BEGIN r_dequeue_options.msgid := descr.msg_id; r_dequeue_options.consumer_name := descr.consumer_name; DBMS_AQ.DEQUEUE( queue_name => descr.queue_name, dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘); INSERT INTO demo_queue_message_receive ( message,id) VALUES (to_char(i_message),o_payload.id); COMMIT; exception when others then dbms_output.put_line(sqlerrm); dbms_output.put_line(dbms_utility.format_error_backtrace); END; / show errors;
5. message 入队(客户端)
# subcriberenqueue.sql
set serveroutput on;
declare
r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
message clob;
rcpt_list dbms_aq.aq$_recipient_list_t;
BEGIN
message := to_clob(‘send to server{‘||TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘||‘}‘));
o_payload := aq.aq_message(message,1);
rcpt_list(0) := sys.aq$_agent(‘demo_sub‘, null, null);
r_message_properties.recipient_list := rcpt_list;
--r_message_properties.delay := 10;
DBMS_AQ.ENQUEUE(
queue_name => ‘aq.demo_queue‘,
enqueue_options => r_enqueue_options,
message_properties => r_message_properties,
payload => o_payload,
msgid => v_message_handle
);
COMMIT;
exception when others then
dbms_output.put_line(sqlerrm);
dbms_output.put_line(dbms_utility.format_error_backtrace);
end;
/
show errors;
6. 结果检查
# checkresult.sql
set serveroutput on; prompt ---- message detail select ENQ_TIME, CONSUMER_NAME, USER_DATA from AQ$demo_queue_table; prompt ---- server receive messages SELECT message FROM demo_queue_message_table where id =1; prompt ---- client receive messages SELECT message FROM demo_queue_message_receive where id =1;
7. 安装脚本 && instruction
#!/bin/bash sysname=sys syspwd=111111 username=aq userpwd=aq port=1522 username1=aq_user userpwd1=aq_user sqlplus $sysname/$syspwd@localhost:$port as sysdba <<-EOF SET SERVEROUTPUT ON SIZE 3000 @sysprivilege.sql EOF sqlplus $username/$userpwd@localhost:$port/XE <<-EOF SET SERVEROUTPUT ON SIZE 3000 @aqinstall.sql EOF sqlplus $username/$userpwd@localhost:$port/XE <<-EOF SET SERVEROUTPUT ON SIZE 3000 @plsqlcallback.sql EOF sqlplus $username/$userpwd@localhost:$port/XE <<-EOF SET SERVEROUTPUT ON SIZE 3000 @plsqlcall.sql EOF
++++++++++++++++++++++ instruction ++++++++++++++++++++
1. execute installaq.sh to install aq environment;
2. connect to user ‘aq_user‘ using password ‘aq_user‘;
3. execute subcriberenqueue.sql to send message;
4. connect to user ‘aq‘ using password ‘aq‘;
5. execute checkresult.sql to print the messages receive by server and receive by client.
the result is as below:
MESSAGE
--------------------------------------------------------------------------------
Message [send to server{15-APR-2016 18:08:20.548}] dequeued at [15-APR-2016 18:0
8:20.580]
---- client receive messages
MESSAGE
--------------------------------------------------------------------------------
Message [send to client{15-APR-2016 18:08:20.581}] dequeued at [15-APR-2016 18:0
8:20.589]
PLSQL NOTE--------Advanced Queue demo
标签:
原文地址:http://www.cnblogs.com/ct-blog/p/5403695.html