Tuesday, October 27, 2015

Oracle AQ Buffered Queues

Oracle AQ Buffered Queues


Set Up the environment to use SQL*Plus and to connect as SYSDBA
Create a DEMO user and a User Defined Type in the 2 databases
Create a database link between the source and the destination databases
Create and start queues
Create subscribers on the source queue and schedule propagation to the destination queue
Create an enqueue procedure in the source database
Create a dequeue procedure in the destination database
Enqueue a message on one end and dequeue it on the other end
Clean Up the environment
You can start right away...
Set Up the environment to use SQL*Plus and to connect as SYSDBA

To begin, you'll set the SQL*Plus variables that follow:
sourcegdb defines the global_name AND the network alias of the source database.
We assume global_names=true, even if that's not strictly mandatory.
It helps to make things more readable. Make sure you have the aliases set up everywhere and that they match the database global names.
destgdb defines the global_name AND the network alias of the destination database.
source_user and dest_user define names of SYSDBA users on the source and on the destination databases.
source_pwd and dest_pwd define the passwords of the corresponding source_user and dest_user.
-- Connection settings are requested interactively. Pressing Enter keeps the
-- default shown in brackets. The values are reused later through the
-- substitution variables sourcegdb, destgdb, source_user, source_pwd,
-- dest_user and dest_pwd.

-- Global name and network alias of the source database.
accept sourcegdb default 'BLACK' -
prompt "Enter the Source Database Global Name                [BLACK]: "

-- Global name and network alias of the destination database.
accept destgdb   default 'WHITE' -
prompt "Enter the Destination Database Global Name           [WHITE]: "

-- SYSDBA account on the source database. The prompt now names the source
-- side, it used to say Destination by mistake.
accept source_user default 'sys'  -
prompt "Enter the Source SYSDBA user                           [sys]: "

-- Password of the source SYSDBA account. HIDE keeps it off the terminal.
accept source_pwd  default 'change_on_install'  -
prompt "Enter the Source SYSDBA password         [change_on_install]: " hide

-- SYSDBA account on the destination database.
accept dest_user default 'sys'  -
prompt "Enter the Destination SYSDBA user                      [sys]: "

-- Password of the destination SYSDBA account. HIDE keeps it off the terminal.
accept dest_pwd  default 'change_on_install'  -
prompt "Enter the Destination SYSDBA password    [change_on_install]: " hide
Create a User DEMO and a User Defined Type in the 2 databases

To go on with the example, you'll need to create the DEMO user in the 2 databases and create a type
you'll use to send and receive messages. The script below creates those users and types.
Note:
For this example, we assume there is no user named DEMO. We also assume the 2 tablespaces USERS and TEMP exist in the databases.
Neither database needs to run in ARCHIVELOG mode.

-- Source database: create the DEMO account, then grant it its roles and
-- execute rights on the two AQ packages.
CONNECT &&source_user/&&source_pwd@&&sourcegdb AS SYSDBA

CREATE USER demo IDENTIFIED BY demo
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp;

GRANT connect, resource, dba TO demo;
GRANT EXECUTE ON dbms_aq TO demo;
GRANT EXECUTE ON dbms_aqadm TO demo;

-- Destination database: same account and same grants.
CONNECT &&dest_user/&&dest_pwd@&&destgdb AS SYSDBA

CREATE USER demo IDENTIFIED BY demo
  DEFAULT TABLESPACE users
  TEMPORARY TABLESPACE temp;

GRANT connect, resource, dba TO demo;
GRANT EXECUTE ON dbms_aq TO demo;
GRANT EXECUTE ON dbms_aqadm TO demo;

-- The message payload type is created with the same definition in both
-- databases, first on the source and then on the destination.
CONNECT demo/demo@&&sourcegdb

CREATE TYPE mytype AS OBJECT (
  id     NUMBER,
  field1 VARCHAR2(4000),
  field2 VARCHAR2(4000)
);
/

CONNECT demo/demo@&&destgdb

CREATE TYPE mytype AS OBJECT (
  id     NUMBER,
  field1 VARCHAR2(4000),
  field2 VARCHAR2(4000)
);
/
Create a database link between the source and the destination databases

To start the propagation job, you'll need the source database to connect to the destination database. Create and test a database link for that purpose:
-- From the source database, create a link named after the destination
-- database that logs in as DEMO.
CONNECT demo/demo@&&sourcegdb

CREATE DATABASE LINK &&destgdb
  CONNECT TO demo IDENTIFIED BY demo
  USING '&&destgdb';

-- Smoke test: one row back means the link resolves and authenticates.
SELECT * FROM dual@&&destgdb;
Create and start queues

Then, create and start the queues:
-- Destination side: a multi-consumer queue table carrying MYTYPE payloads,
-- a queue on top of it, and the queue started.
connect demo/demo@&&destgdb

begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'myqueue_table',
    queue_payload_type => 'mytype',
    multiple_consumers => true
  );
end;
/

begin
  dbms_aqadm.create_queue(
    queue_name  => 'myqueue',
    queue_table => 'myqueue_table'
  );
end;
/

begin
  dbms_aqadm.start_queue(queue_name => 'myqueue');
end;
/

-- Source side: the same three steps with the same names.
connect demo/demo@&&sourcegdb

begin
  dbms_aqadm.create_queue_table(
    queue_table        => 'myqueue_table',
    queue_payload_type => 'mytype',
    multiple_consumers => true
  );
end;
/

begin
  dbms_aqadm.create_queue(
    queue_name  => 'myqueue',
    queue_table => 'myqueue_table'
  );
end;
/

begin
  dbms_aqadm.start_queue(queue_name => 'myqueue');
end;
/
Create subscribers on the source queue and schedule propagation to the destination queue

The next step consists of adding, to the source queue, the subscribers that match the destination queue.
In this example, we add 2 subscribers because we will eventually dequeue the messages from 2 separate programs
(or for 2 distinct purposes). Once done, check that the queues are compatible and schedule the queue-to-queue propagation.
dba_queue_schedules provides detailed information about what is scheduled:
connect demo/demo@&&sourcegdb

-- Register the RED subscriber. Its address is the destination queue, and
-- delivery is queue-to-queue in buffered mode.
begin
  dbms_aqadm.add_subscriber(
    queue_name     => 'myqueue',
    subscriber     => sys.aq$_agent('RED','demo.myqueue@&&destgdb',null),
    queue_to_queue => true,
    delivery_mode  => dbms_aqadm.buffered
  );
end;
/

-- Register the BLUE subscriber with the same settings.
begin
  dbms_aqadm.add_subscriber(
    queue_name     => 'myqueue',
    subscriber     => sys.aq$_agent('BLUE','demo.myqueue@&&destgdb',null),
    queue_to_queue => true,
    delivery_mode  => dbms_aqadm.buffered
  );
end;
/

set serveroutput on

-- Compare the payload types of the local and remote queues and print the
-- return code (the message below states that 1 means success).
declare
  l_types_match binary_integer;
begin
  dbms_aqadm.verify_queue_types(
    src_queue_name  => 'myqueue',
    dest_queue_name => 'demo.myqueue',
    destination     => '&&destgdb',
    rc              => l_types_match
  );
  dbms_output.put_line('If result is 1, it''s OKAY: '||l_types_match);
end;
/

-- Schedule propagation from the local queue to the remote queue. All other
-- scheduling parameters are left at their defaults.
begin
  dbms_aqadm.schedule_propagation(
    queue_name        => 'myqueue',
    destination       => '&&destgdb',
    destination_queue => 'demo.myqueue'
  );
end;
/

-- Show what has been scheduled.
set pagesize 1000
select
    schema,
    qname,
    destination,
    start_time,
    latency,
    schedule_disabled,
    session_id,
    total_number,
    failures,
    last_error_msg,
    message_delivery_mode
from dba_queue_schedules;
Create an enqueue procedure in the source database

Create an enqueue procedure demo_enqueue, that enqueues a message in the buffered part of the queue:
connect demo/demo@&&sourcegdb

create or replace procedure demo_enqueue(p_mytype mytype) is
  -- Enqueues one message on MYQUEUE as a buffered message with immediate
  -- visibility, then commits.
  --
  -- Parameters:
  --   p_mytype  payload to enqueue
  --
  -- NOTE(review): the trailing COMMIT also commits any work pending in the
  -- calling session. Confirm that callers expect this.
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  -- Receives the message id returned by the enqueue call. It is not used
  -- afterwards.
  message_handle      raw(16);
begin
  enqueue_options.visibility    := dbms_aq.immediate;
  enqueue_options.delivery_mode := dbms_aq.buffered;

  dbms_aq.enqueue(
    queue_name         => 'MYQUEUE',
    enqueue_options    => enqueue_options,
    message_properties => message_properties,
    payload            => p_mytype,
    msgid              => message_handle);

  commit;
end;
/
Create a dequeue procedure in the destination database

demo_dequeue dequeues messages from the destination queue based on the consumer name:
connect demo/demo@&&destgdb

-- Look at the subscriber view of the destination queue table.
select * from aq$myqueue_table_S;

set serveroutput on

create or replace procedure demo_dequeue(p_consumer varchar2)
is
  -- Dequeues, without waiting, every buffered message available to the
  -- given consumer on MYQUEUE and prints each payload with DBMS_OUTPUT.
  -- The loop ends when the dequeue call raises ORA-25228, which is handled
  -- here as "nothing left to read".
  --
  -- Parameters:
  --   p_consumer  consumer (subscriber) name to dequeue for
  c_rule   constant varchar2(100) :=
    '---------------------------------------------------------';
  l_opts   dbms_aq.dequeue_options_t;
  l_props  dbms_aq.message_properties_t;
  l_msgid  raw(16);
  l_body   mytype;
  e_queue_drained exception;
  pragma exception_init(e_queue_drained, -25228);
begin
  l_opts.wait          := dbms_aq.no_wait;
  l_opts.consumer_name := p_consumer;
  l_opts.navigation    := dbms_aq.first_message;
  l_opts.visibility    := dbms_aq.immediate;
  l_opts.delivery_mode := dbms_aq.buffered;

  loop
    dbms_aq.dequeue(
      queue_name         => 'myqueue',
      dequeue_options    => l_opts,
      message_properties => l_props,
      payload            => l_body,
      msgid              => l_msgid);

    dbms_output.put_line(c_rule);
    dbms_output.put_line('Message for Consumer "'||p_consumer||'": ');
    dbms_output.put_line('ID    :'||to_char(l_body.id));
    dbms_output.put_line('FIELD1:'||l_body.field1);
    dbms_output.put_line('FIELD2:'||l_body.field2);
    dbms_output.put_line(c_rule);

    -- After the first message, continue from the current position.
    l_opts.navigation := dbms_aq.next_message;
  end loop;
exception
  when e_queue_drained then
    dbms_output.put_line('No more messages');
    commit;
end;
/
Enqueue a message on one end and dequeue it on the other end

You are ready to test your case. Enqueue a message and check you get the messages for the 2 subscribers:
-- Source side: publish one test message, then look at the queue table view.
connect demo/demo@&&sourcegdb
set serveroutput on

begin
  demo_enqueue(mytype(1, 'BLUE AND RED','Red And Blue'));
end;
/

select * from aq$myqueue_table;


-- Destination side: look at the queue table view, then read the message once
-- per subscriber.
connect demo/demo@&&destgdb

select * from aq$myqueue_table;

set serveroutput on
execute demo_dequeue(p_consumer => 'BLUE')
execute demo_dequeue(p_consumer => 'RED')
Note:
The aq$myqueue_table view helps to monitor the messages. Because the propagation has been scheduled without any time window, messages are always sent from the source to the destination; the latency (which defaults to 60 in that case) is the only thing that can slightly affect the time needed for a message to become available to the consumers.
Clean Up the environment

You are done! Before you leave, remove the AQ propagation schedule, the queues and the DEMO users:
-- Source database: stop propagating, force-drop the queue table, then drop
-- the DEMO account and everything it owns.
connect &&source_user/&&source_pwd@&&sourcegdb as sysdba

begin
  dbms_aqadm.unschedule_propagation(
    queue_name        => 'demo.myqueue',
    destination       => '&&destgdb',
    destination_queue => 'DEMO.MYQUEUE'
  );
end;
/

execute dbms_aqadm.drop_queue_table(queue_table => 'demo.myqueue_table', force => TRUE)

drop user demo cascade;

-- Show the propagation schedules that remain after the cleanup.
select * from dba_queue_schedules;

-- Destination database: force-drop the queue table and drop the DEMO account.
connect &&dest_user/&&dest_pwd@&&destgdb as sysdba

execute dbms_aqadm.drop_queue_table(queue_table => 'demo.myqueue_table', force => TRUE)

drop user demo cascade;

No comments:

Post a Comment