码迷,mamicode.com
首页 > 数据库 > 详细

PLSQL NOTE--------Advanced Queue demo

时间:2016-04-18 11:52:41      阅读:233      评论:0      收藏:0      [点我收藏+]

标签:

1. sysdba 用户下赋权限

# sysprivilege.sql

prompt ---- connect to sysdba
set serveroutput on;

prompt ---- create user aq and aq_user
drop user aq cascade;
drop user aq_user cascade;
create user aq identified by aq;
create user aq_user identified by aq_user;

prompt ---- grant sys privilege
grant connect,resource to aq,aq_user;
grant aq_administrator_role,unlimited tablespace to aq;
grant execute on dbms_aq to aq;
grant execute on dbms_aqadm to aq;
grant select any dictionary to aq;
grant execute on dbms_aq to aq_user;
grant execute on dbms_aqadm to aq_user;

prompt ---- grant aq queue admin privilege
begin
	dbms_aqadm.grant_system_privilege(‘ENQUEUE_ANY‘, ‘aq‘, FALSE);
	dbms_aqadm.grant_system_privilege(‘DEQUEUE_ANY‘, ‘aq‘, FALSE);
end;
/
show errors;

 2. advanced queue 管理员    aq用户下

# aqinstall.sql

prompt ---- connect to aq
set serveroutput on;

prompt ---- construct message type
create or replace type aq_message force as object(
request clob,
id integer);
/
show errors;
grant execute on aq_message to aq_user; 

prompt ---- drop queue and queue table
begin
	DBMS_AQADM.STOP_QUEUE(
		queue_name => ‘demo_queue‘
	);
	DBMS_AQADM.DROP_QUEUE(
		queue_name => ‘demo_queue‘
	);
	DBMS_AQADM.DROP_QUEUE_TABLE(
		queue_table => ‘demo_queue_table‘
	);
end;
/
show errors;

prompt ---- create queue table and queue
begin
	DBMS_AQADM.CREATE_QUEUE_TABLE (
		queue_table => ‘demo_queue_table‘,
		queue_payload_type => ‘aq_message‘,
		multiple_consumers => TRUE
	);
	DBMS_AQADM.CREATE_QUEUE (
		queue_name => ‘demo_queue‘,
		queue_table => ‘demo_queue_table‘
	);
END;
/
show errors;

prompt ---- grant aq_user queue privilege
begin
	dbms_aqadm.GRANT_QUEUE_PRIVILEGE(‘ALL‘,‘demo_queue‘,‘aq_user‘, true);
end;
/
show errors;

prompt ---- add queue subscriber
begin
	DBMS_AQADM.ADD_SUBSCRIBER (
		queue_name => ‘demo_queue‘,
		subscriber => SYS.AQ$_AGENT(‘demo_sub‘,NULL,NULL)
	);
end;
/
show errors;

prompt ---- subscriber register
begin	
	DBMS_AQ.REGISTER (
		SYS.AQ$_REG_INFO_LIST(
		SYS.AQ$_REG_INFO(
		‘DEMO_QUEUE:demo_sub‘,
		DBMS_AQ.NAMESPACE_AQ,
		‘plsql://DEMO_QUEUE_CALLBACK_PROCEDURE‘,
		HEXTORAW(‘FF‘))),1);
END;
/
show errors;

prompt ---- add queue subscriber_1
begin
	DBMS_AQADM.ADD_SUBSCRIBER (
		queue_name => ‘demo_queue‘,
		subscriber => SYS.AQ$_AGENT(‘demo_sub_1‘,NULL,NULL)
	);
end;
/
show errors;

prompt ---- subscriber_1 register
begin	
	DBMS_AQ.REGISTER (
		SYS.AQ$_REG_INFO_LIST(
		SYS.AQ$_REG_INFO(
		‘DEMO_QUEUE:demo_sub_1‘,
		DBMS_AQ.NAMESPACE_AQ,
		‘plsql://demo_queue_call_procedure‘,
		HEXTORAW(‘FF‘))),1);
END;
/
show errors;

prompt ---- start queue
begin
	DBMS_AQADM.START_QUEUE (
		queue_name => ‘demo_queue‘
	);
end;
/
show errors;

prompt ---- subscriber count
SELECT count(*) FROM aq$demo_queue_table_s;

 3. aq 用户下  服务端处理

# plsqlcallback.sql

set serveroutput on;
CREATE TABLE demo_queue_message_table
( message VARCHAR2(4000),id integer );

create or replace procedure sendmessage(message in clob,id in integer, subcriber in varchar2) is
	r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
	r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
	v_message_handle RAW(16);
	o_payload aq.aq_message;
	rcpt_list dbms_aq.aq$_recipient_list_t;
	l_message clob;
begin
	l_message := message||to_clob(TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘)||‘}‘);
	o_payload := aq.aq_message(l_message,id);
  
	rcpt_list(0) := sys.aq$_agent(subcriber, null, null);
	r_message_properties.recipient_list := rcpt_list;
	-- r_message_properties.delay := 10;
	DBMS_AQ.ENQUEUE(
		queue_name => ‘aq.demo_queue‘,
		enqueue_options => r_enqueue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace); 
end;
/
show errors;

create or replace function api_func(request in clob) return clob is
	l_response clob;	
begin
	l_response := to_clob(‘send to client{‘);
	return l_response;
end;
/
show errors;

CREATE or replace PROCEDURE demo_queue_callback_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
i_payload aq.aq_message;
i_message clob;
response clob;
l_id integer;
BEGIN
 	r_dequeue_options.msgid := descr.msg_id;
	r_dequeue_options.consumer_name := descr.consumer_name;
 
	DBMS_AQ.DEQUEUE(
		queue_name => descr.queue_name,
		dequeue_options => r_dequeue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
	i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘);
	INSERT INTO demo_queue_message_table ( message,id) VALUES (to_char(i_message),o_payload.id);
	response := api_func(i_message);
	l_id := o_payload.id;
	sendmessage(response,l_id,‘demo_sub_1‘);
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace); 
END;
/
show errors;

 4. aq 用户下客户端处理

# plsqlcall.sql

CREATE TABLE demo_queue_message_receive
( message VARCHAR2(4000),id integer);

CREATE PROCEDURE demo_queue_call_procedure(
context RAW,
reginfo SYS.AQ$_REG_INFO,
descr SYS.AQ$_DESCRIPTOR,
payload RAW,
payloadl NUMBER
) AS
r_dequeue_options DBMS_AQ.DEQUEUE_OPTIONS_T;
r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
v_message_handle RAW(16);
o_payload aq.aq_message;
i_message clob;
BEGIN
 	r_dequeue_options.msgid := descr.msg_id;
	r_dequeue_options.consumer_name := descr.consumer_name;
 
	DBMS_AQ.DEQUEUE(
		queue_name => descr.queue_name,
		dequeue_options => r_dequeue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
	i_message := to_clob(‘Message [‘ || o_payload.request|| ‘] ‘ ||‘dequeued at [‘ || TO_CHAR( SYSTIMESTAMP,‘DD-MON-YYYY HH24:MI:SS.FF3‘ ) || ‘]‘);
	INSERT INTO demo_queue_message_receive ( message,id) VALUES (to_char(i_message),o_payload.id);
	
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace); 
END;
/
show errors;

5. message 入队(客户端)

# subcriberenqueue.sql

set serveroutput on;
declare
	r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T;
	r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
	v_message_handle RAW(16);
	o_payload aq.aq_message;
	message clob;
	rcpt_list dbms_aq.aq$_recipient_list_t;
BEGIN
	message := to_clob(‘send to server{‘||TO_CHAR(SYSTIMESTAMP, ‘DD-MON-YYYY HH24:MI:SS.FF3‘||‘}‘)); 
	o_payload := aq.aq_message(message,1);
  
	rcpt_list(0) := sys.aq$_agent(‘demo_sub‘, null, null);
	r_message_properties.recipient_list := rcpt_list;
	--r_message_properties.delay := 10;
	DBMS_AQ.ENQUEUE(
		queue_name => ‘aq.demo_queue‘,
		enqueue_options => r_enqueue_options,
		message_properties => r_message_properties,
		payload => o_payload,
		msgid => v_message_handle
	);
COMMIT;
exception when others then
	dbms_output.put_line(sqlerrm);
	dbms_output.put_line(dbms_utility.format_error_backtrace); 
end;
/
show errors;

 6. 结果检查

# checkresult.sql

set serveroutput on;
prompt ---- message detail 
select ENQ_TIME, CONSUMER_NAME, USER_DATA from AQ$demo_queue_table;

prompt ---- server receive messages
SELECT message FROM demo_queue_message_table where id =1;

prompt ---- client receive messages
SELECT message FROM demo_queue_message_receive where id =1;

7. 安装脚本 && instruction

#!/bin/bash

sysname=sys
syspwd=111111

username=aq
userpwd=aq
port=1522

username1=aq_user
userpwd1=aq_user

sqlplus $sysname/$syspwd@localhost:$port as sysdba <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@sysprivilege.sql
EOF

sqlplus $username/$userpwd@localhost:$port/XE <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@aqinstall.sql
EOF

sqlplus $username/$userpwd@localhost:$port/XE <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@plsqlcallback.sql
EOF

sqlplus $username/$userpwd@localhost:$port/XE <<-EOF
SET SERVEROUTPUT ON SIZE 3000
@plsqlcall.sql
EOF

++++++++++++++++++++++ instruction ++++++++++++++++++++

1. execute installaq.sh to install aq environment;
2. connect to user ‘aq_user‘ using password ‘aq_user‘;
3. execute subcriberenqueue.sql to send message;
4. connect to user ‘aq‘ using password ‘aq‘;
5. execute checkresult.sql to print the messages receive by server and receive by client.


the result is as below:

MESSAGE
--------------------------------------------------------------------------------
Message [send to server{15-APR-2016 18:08:20.548}] dequeued at [15-APR-2016 18:0
8:20.580]


---- client receive messages

MESSAGE
--------------------------------------------------------------------------------
Message [send to client{15-APR-2016 18:08:20.581}] dequeued at [15-APR-2016 18:0
8:20.589]

 

PLSQL NOTE--------Advanced Queue demo

标签:

原文地址:http://www.cnblogs.com/ct-blog/p/5403695.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!