标签:
1. sysdba 用户下赋权限
# sysprivilege.sql
prompt ---- connect to sysdba set serveroutput on; prompt ---- create user aq and aq_user drop user aq cascade; drop user aq_user cascade; create user aq identified by aq; create user aq_user identified by aq_user; prompt ---- grant sys privilege grant connect,resource to aq,aq_user; grant aq_administrator_role,unlimited tablespace to aq; grant execute on dbms_aq to aq; grant execute on dbms_aqadm to aq; grant select any dictionary to aq; grant execute on dbms_aq to aq_user; grant execute on dbms_aqadm to aq_user; prompt ---- grant aq queue admin privilege begin dbms_aqadm.grant_system_privilege(‘ENQUEUE_ANY‘, ‘aq‘, FALSE); dbms_aqadm.grant_system_privilege(‘DEQUEUE_ANY‘, ‘aq‘, FALSE); end; / show errors;
2. advanced queue 管理员 aq用户下
# aqinstall.sql
prompt ---- connect to aq set serveroutput on; prompt ---- construct message type create or replace type aq_message force as object( request clob, id integer); / show errors; grant execute on aq_message to aq_user; prompt ---- drop queue and queue table begin DBMS_AQADM.STOP_QUEUE( queue_name => ‘demo_queue‘ ); DBMS_AQADM.DROP_QUEUE( queue_name => ‘demo_queue‘ ); DBMS_AQADM.DROP_QUEUE_TABLE( queue_table => ‘demo_queue_table‘ ); end; / show errors; prompt ---- create queue table and queue begin DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => ‘demo_queue_table‘, queue_payload_type => ‘aq_message‘, multiple_consumers => TRUE ); DBMS_AQADM.CREATE_QUEUE ( queue_name => ‘demo_queue‘, queue_table => ‘demo_queue_table‘ ); END; / show errors; prompt ---- grant aq_user queue privilege begin dbms_aqadm.GRANT_QUEUE_PRIVILEGE(‘ALL‘,‘demo_queue‘,‘aq_user‘, true); end; / show errors; prompt ---- add queue subscriber begin DBMS_AQADM.ADD_SUBSCRIBER ( queue_name => ‘demo_queue‘, subscriber => SYS.AQ$_AGENT(‘demo_sub‘,NULL,NULL) ); end; / show errors; prompt ---- subscriber register begin DBMS_AQ.REGISTER ( SYS.AQ$_REG_INFO_LIST( SYS.AQ$_REG_INFO( ‘DEMO_QUEUE:demo_sub‘, DBMS_AQ.NAMESPACE_AQ, ‘plsql://DEMO_QUEUE_CALLBACK_PROCEDURE‘, HEXTORAW(‘FF‘))),1); END; / show errors; prompt ---- add queue subscriber_1 begin DBMS_AQADM.ADD_SUBSCRIBER ( queue_name => ‘demo_queue‘, subscriber => SYS.AQ$_AGENT(‘demo_sub_1‘,NULL,NULL) ); end; / show errors; prompt ---- subscriber_1 register begin DBMS_AQ.REGISTER ( SYS.AQ$_REG_INFO_LIST( SYS.AQ$_REG_INFO( ‘DEMO_QUEUE:demo_sub_1‘, DBMS_AQ.NAMESPACE_AQ, ‘plsql://demo_queue_call_procedure‘, HEXTORAW(‘FF‘))),1); END; / show errors; prompt ---- start queue begin DBMS_AQADM.START_QUEUE ( queue_name => ‘demo_queue‘ ); end; / show errors; prompt ---- subscriber count SELECT count(*) FROM aq$demo_queue_table_s;
3. aq 用户下 服务端处理
# plsqlcallback.sql
set serveroutput on; CREATE TABLE demo_queue_message_table ( message VARCHAR2(4000),id integer ); create or replace procedure sendmessage(message in clob,id in integer, subcriber in varchar2) is r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload aq.aq_message; rcpt_list dbms_aq.aq$_recipient_list_t; l_message clob; begin l_message := message||to_clob(TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘)||‘}‘); o_payload := aq.aq_message(l_message,id); rcpt_list(0) := sys.aq$_agent(subcriber, null, null); r_message_properties.recipient_list := rcpt_list; -- r_message_properties.delay := 10; DBMS_AQ.ENQUEUE( queue_name => ‘aq.demo_queue‘, enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); COMMIT; exception when others then dbms_output.put_line(sqlerrm); dbms_output.put_line(dbms_utility.format_error_backtrace); end; / show errors; create or replace function api_func(request in clob) return clob is l_response clob; begin l_response := to_clob(‘send to client{‘); return l_response; end; / show errors; CREATE or replace PROCEDURE demo_queue_callback_procedure( context RAW, reginfo SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER ) AS r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload aq.aq_message; i_payload aq.aq_message; i_message clob; response clob; l_id integer; BEGIN r_dequeue_options.msgid := descr.msg_id; r_dequeue_options.consumer_name := descr.consumer_name; DBMS_AQ.DEQUEUE( queue_name => descr.queue_name, dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘); INSERT INTO demo_queue_message_table ( message,id) VALUES (to_char(i_message),o_payload.id); response := api_func(i_message); l_id := o_payload.id; sendmessage(response,l_id,‘demo_sub_1‘); COMMIT; exception when others then dbms_output.put_line(sqlerrm); dbms_output.put_line(dbms_utility.format_error_backtrace); END; / show errors;
4. aq 用户下客户端处理
# plsqlcall.sql
CREATE TABLE demo_queue_message_receive ( message VARCHAR2(4000),id integer); CREATE PROCEDURE demo_queue_call_procedure( context RAW, reginfo SYS.AQ$_REG_INFO, descr SYS.AQ$_DESCRIPTOR, payload RAW, payloadl NUMBER ) AS r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload aq.aq_message; i_message clob; BEGIN r_dequeue_options.msgid := descr.msg_id; r_dequeue_options.consumer_name := descr.consumer_name; DBMS_AQ.DEQUEUE( queue_name => descr.queue_name, dequeue_options => r_dequeue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘); INSERT INTO demo_queue_message_receive ( message,id) VALUES (to_char(i_message),o_payload.id); COMMIT; exception when others then dbms_output.put_line(sqlerrm); dbms_output.put_line(dbms_utility.format_error_backtrace); END; / show errors;
5. message 入队(客户端)
# subcriberenqueue.sql
set serveroutput on; declare r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload aq.aq_message; message clob; rcpt_list dbms_aq.aq$_recipient_list_t; BEGIN message := to_clob(‘send to server{‘||TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘||‘}‘)); o_payload := aq.aq_message(message,1); rcpt_list(0) := sys.aq$_agent(‘demo_sub‘, null, null); r_message_properties.recipient_list := rcpt_list; --r_message_properties.delay := 10; DBMS_AQ.ENQUEUE( queue_name => ‘aq.demo_queue‘, enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle ); COMMIT; exception when others then dbms_output.put_line(sqlerrm); dbms_output.put_line(dbms_utility.format_error_backtrace); end; / show errors;
6. 结果检查
# checkresult.sql
set serveroutput on; prompt ---- message detail select ENQ_TIME, CONSUMER_NAME, USER_DATA from AQ$demo_queue_table; prompt ---- server receive messages SELECT message FROM demo_queue_message_table where id =1; prompt ---- client receive messages SELECT message FROM demo_queue_message_receive where id =1;
7. 安装脚本 && instruction
#!/bin/bash sysname=sys syspwd=111111 username=aq userpwd=aq port=1522 username1=aq_user userpwd1=aq_user sqlplus $sysname/$syspwd@localhost:$port as sysdba <<-EOF SET SERVEROUTPUT ON SIZE 3000 @sysprivilege.sql EOF sqlplus $username/$userpwd@localhost:$port/XE <<-EOF SET SERVEROUTPUT ON SIZE 3000 @aqinstall.sql EOF sqlplus $username/$userpwd@localhost:$port/XE <<-EOF SET SERVEROUTPUT ON SIZE 3000 @plsqlcallback.sql EOF sqlplus $username/$userpwd@localhost:$port/XE <<-EOF SET SERVEROUTPUT ON SIZE 3000 @plsqlcall.sql EOF
++++++++++++++++++++++ instruction ++++++++++++++++++++
1. execute installaq.sh to install aq environment;
2. connect to user ‘aq_user‘ using password ‘aq_user‘;
3. execute subcriberenqueue.sql to send message;
4. connect to user ‘aq‘ using password ‘aq‘;
5. execute checkresult.sql to print the messages receive by server and receive by client.
the result is as below:
MESSAGE
--------------------------------------------------------------------------------
Message [send to server{15-APR-2016 18:08:20.548}] dequeued at [15-APR-2016 18:0
8:20.580]
---- client receive messages
MESSAGE
--------------------------------------------------------------------------------
Message [send to client{15-APR-2016 18:08:20.581}] dequeued at [15-APR-2016 18:0
8:20.589]
PLSQL NOTE--------Advanced Queue demo
标签:
原文地址:http://www.cnblogs.com/ct-blog/p/5403695.html