Oracle8 Server Application Developer's Guide
Release 8.0
A54642_01

11
Advanced Queuing

This chapter has three sections:

Introduction to Oracle Advanced Queuing

Introduction Overview

This introductory section:

Complex Systems

Three Scenarios

Consider the following scenarios.

_______________________________________________________________________
A brokerage firm, Makers & Breakers, advertises to the public that its 
new service will let clients stipulate time as well as (or instead of) 
price as a parameter, i.e., a request to buy or sell is not executed unless 
it takes place within a specific time period (e.g., within 15 minutes). 
The campaign is extremely successful and a mutual fund house, America's 
Standard Guarantee, takes advantage of the technology to offer its 
clients an opportunity to buy and sell units during the course of the day 
rather than at the close of trading. However, M&B are informed by the 
Securities and Exchange Commission that it has received two 
complaints:

 (a) That in executing the buy/sell orders M&B are giving an unfair 
advantage to large customers, such as the mutual fund house.

 (b) That in managing the time parameter M&B are taking advantage of their 
customers, e.g., in a falling market, by selling earlier in the time period 
than they inform their clients, and then pocketing the difference.

The problem facing Makers & Breakers is not only to answer these unfounded 
charges but to do so quickly and in such a way as to leave no doubt in 
the minds of the public regarding the fairness of their practices.      

A large state university (35,000 students) decides to automate its class 
enrollment process. Students will be able to register for classes using 
web templates from home or at terminals on both the main and satellite 
campuses for any of the more than a thousand classes offered by Big U. 
The administration announces that the following parameters will apply: 

 * Priority: registration is on a 'first come' basis except that 

   - seniors receive priority for upper level courses, followed by 
juniors, sophomores and frosh; 

   - frosh receive priority for entry level courses, followed by a senior 
who needs the course to graduate. 

 * Registration phases: the above priority criteria hold only for specific 
defined time phases, e.g., in the second phase, seniors and juniors are 
treated as being on an equal 'first come' basis with regard to upper-level 
classes, but continue to receive priority over sophomores and frosh. 

 * Full-time undergraduates must register for a minimum of three classes 
and may register for a maximum of four classes without special permission. 
Students may register for as many as ten classes (ranking their 
preferences 1-10) in case they are not admitted to their preferred 
classes, but only the first three choices are regarded as 'live'. In the 
event that a class becomes full, the student's next choice becomes 'live'. 
However, should a full class develop a vacancy, the student with the 
highest priority will be admitted, at that time being removed from the 
roll of a class of her/his lower preference. 

A power utility, Most Power, develops a sophisticated model in order to 
decide how to deploy its resources. The way the system works is that the 
utility gets ongoing reports from 

 * numerous weather centers regarding current conditions, and   

 * power stations regarding ongoing utilization. 

It then compares this information to historical data in order to predict 
demand for power in specific geographic areas for given time periods. A 
crucial part of this modeling has to do with noting the rapidity and 
degree of change in the incoming reports as weather changes and power is 
deployed. During a prolonged blizzard, matters are complicated by a 
failure of a power station which also forwards weather data. The question 
facing Most Power is whether to purchase power from a neighboring utility, 
organizing such an arrangement having a lead time of five days. 
_______________________________________________________________________

Queuing and the Intricacy of Message Passing

These scenarios all describe large-scale applications, but the problems they pose are familiar. Certainly, many people would be happy to endorse any institution that would allow them to buy/sell stocks sensitive to temporal fluctuations in the market, to register for classes without having to stand in line, to enjoy uninterrupted power supply during emergencies, and so on.

The primary problem is, of course, the complexity of information-handling. Each of these scenarios describes a situation in which messages come from and are distributed to multiple clients (nodes) in a distributed computing environment. Messages are not only passed back and forth between client and server but are also intricately interleaved between processes on the server that utilize the database. Indeed, large-scale applications can be viewed as consisting of multi-step processes in which each step is triggered by one or more messages and gives rise to one or more messages. Another way of saying this is that messages are events that trigger other message-events.

Business Process Management, or Workflow, based on this notion of the interrelation of messages and events, is increasingly recognized as a fundamental technology for emerging applications. Queuing is one of the key technologies for this class of application because it implements deferred execution of messages. This decoupling of 'requests for service' from 'supply of services' increases efficiency and provides the infrastructure for complex scheduling.


Securing Messages in a Vulnerable Environment

Handling the intricacy of message-passing is not the only problem. Unfortunately, networks, computing hardware, and software applications will all fail from time to time, as is the case in the power utility scenario. Nevertheless, the ACID properties (atomicity, consistency, isolation, durability) of the information must be preserved. Chaos would quickly follow if buy orders were 'lost', or students were registered into the same class twice, or power were not distributed to an area that required it. In other words, messaging must be persistent. Integrating transaction processing with queuing technology makes persistent messaging possible. The importance of queuing is demonstrated by TP-monitors, which typically include such a facility.


Message Persistence as Extension in Time and Space

The persistence of messages that is required goes beyond the ability to recover information in the event of system failure. Applications may have to deal with multiple unprocessed messages arriving simultaneously from external clients or from programs internal to the application. If the system falls short in its capacity to deal with these messages immediately, the application must be able to store the messages until they can be processed. By the same token, external clients or internal programs may not be ready to receive messages that have been processed.

Even more important, applications must be able to deal with shifting priorities: messages arriving later may be of higher priority than messages arriving earlier; messages arriving earlier may have to wait for later messages before actions are executed; the same message may have to be accessed by different processes; and so on.

One crucial dimension of handling the dynamic aspect of message persistence has to do with windows of opportunity that grow and shrink. In the case of the share brokerage application, the window for completing the sale begins to shrink as soon as the offer-to-sell message is received, and eventually closes (i.e., the offer to sell expires). In the case of the student registration application, different priorities apply during different temporal phases, and data must be re-evaluated with the transition from one phase to another. And in the case of the power utility, the entire decision-making process depends on whether conditions are stable (the persistence of a large window) or dynamic (the rapid appearance and disappearance of windows).


Control Data as Essential Information

What is true for all the scenarios is that the time at which messages are received or dispatched is a crucial part of the message. This means that the control component of the message (in this case, time markers) is as important as the payload data. Put another way: the message retains importance as a business asset after it has been executed.

Persistent messaging thus implies accurate documentation of messages for analysis of historical patterns and future trends. For instance:
* The ability to retrieve the sequence of messages is absolutely critical for the brokerage firm in the first scenario to refute the charges made to the SEC. They must be able to show that the offer to sell made by client_A was matched by the first available offer to buy by client_B.
* With regard to student registration, the withdrawal of a student from a class that is full requires (1) tracking down the next student in line based on priority, time period and specified preference, (2) moving him/her from a class in which he/she is registered into the available spot, and (3) dealing with the resultant repercussions, i.e., keeping track of the relationships between messages and navigating from one message to another based on queries.
* In the case of the power utility, messages about weather and power utilization need to be preserved over time so that patterns can be analyzed by querying message warehouses. The utility is specifically concerned with time lapses between events, e.g.,
  - between determining where power is needed and distributing the power
  - between sending the message to distribute power and the actual distribution.


Specific Requirements of a Messaging System

What are the key requirements of a persistent messaging system given the above issues?

Possible Solutions: Synchronous versus Disconnected/Deferred Communication

Generally, attempts to provide communication between programs can be classified into one of two types: Synchronous and Disconnected/Deferred Communication.


Synchronous Communication

This model of communication, also called on-line or connected, is based on the request/reply paradigm. In this model a program sends a request to another program and waits (blocks) until the reply arrives. This model of communication, in which the sender and receiver of the message are tightly coupled, is suitable for programs that need to get a reply before they can proceed with any task. Traditional client/server architectures are based on this model.

The major drawback of the synchronous model of communication is that all of the programs involved must be available and running for the application to work. In the event of network or machine failure, or even if the program that is needed is busy, the entire application grinds to a halt.


Disconnected/Deferred Messaging

In this model, programs in the role of producers place requests in a queue and then proceed with their work. Programs in the role of consumers retrieve requests from the queue and act on them. This model is well suited for applications that can continue with their work after placing a request in the queue because they are not blocked waiting for a reply. It is also suited to applications that can continue with their work until there is a message to retrieve.

For deferred execution to work correctly even in the presence of network, machine and application failures, the requests must be stored persistently, and processed exactly once. This can be achieved by combining persistent queuing with transaction protection. Oracle8 provides a queuing technology that does not depend on the use of TP-monitors or any other evolving Message-Oriented Middleware (MOM) infrastructure.


Oracle Advanced Queuing - Features

Oracle Advanced Queuing (Oracle AQ) provides message queuing as an integrated part of the Oracle server. Oracle AQ provides this functionality by integrating the queuing system with the database, thereby creating a message-enabled database. By providing an integrated solution, Oracle AQ frees application developers to devote their efforts to their specific business logic rather than having to construct a messaging infrastructure.


General Features

ENQUEUE Features

DEQUEUE Features

Oracle Advanced Queuing - Primary Components

Queuing Entities

Message

A message is the smallest unit of information inserted into and retrieved from a queue. A message consists of control information (metadata) and payload (data). The control information represents message properties used by AQ to manage messages. The payload data is the information stored in the queue and is transparent to Oracle AQ. A message can reside in only one queue. A message is created by the enqueue call and consumed by the dequeue call.

Queue

A queue is a repository for messages. There are two types of queues: user queues, also known as normal queues, and exception queues. The user queue is for normal message processing. Messages are transferred to an exception queue if they cannot be retrieved and processed for some reason. Queues can be created, altered, started, stopped, and dropped by using the Oracle AQ administrative interfaces.

Queue Table

Queues are stored in queue tables. Each queue table is a database table and contains one or more queues. Each queue table contains a default exception queue.

The following figure shows the relationship between messages, queues, and queue tables. The columns represent message queues, with rows representing individual messages.

Figure 11-1: Queuing Entities
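
As a rough illustration of the administrative operations listed under Queue above, the following SQL*Plus sketch stops and then drops a queue and its queue table. The names msg_queue and aq.msg are assumptions made for this illustration. A queue is stopped before it is dropped, and the queue table is dropped only after the queues it contains have been dropped.

```sql
/* Assumes the connected user owns a queue named msg_queue
   stored in the queue table aq.msg (illustrative names). */

/* Disable enqueuing and dequeuing on the queue: */
EXECUTE dbms_aqadm.stop_queue (queue_name => 'msg_queue');

/* Drop the stopped queue: */
EXECUTE dbms_aqadm.drop_queue (queue_name => 'msg_queue');

/* Drop the queue table once it no longer contains user queues: */
EXECUTE dbms_aqadm.drop_queue_table (queue_table => 'aq.msg');
```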

Agents

An agent is a queue user. There are two types of agents: producers who place messages in a queue (enqueuing) and consumers who retrieve messages (dequeuing). Any number of producers and consumers may be accessing the queue at a given time. An agent is identified by its name, address and protocol. The address and protocol fields are reserved for future use. Agents insert messages into a queue and retrieve messages from the queue by using the Oracle AQ operational interfaces.

Time manager

The time manager is a background process that monitors the messages in the queue. It provides the mechanism for message expiration, retry and delay.


Basic Queuing

Basic Queuing - One Producer, One Consumer

At its most basic, one producer may enqueue different messages into one queue. Each message will be dequeued and processed once by the consumer. A message will stay in the queue until a consumer dequeues it or the message expires. A producer may stipulate a delay before the message is available to be consumed, and a time after which the message expires. Likewise, a consumer may wait when trying to dequeue a message if no message is available. Note that an agent program, or application, can act as both a producer and a consumer.
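
The delay, expiration and wait settings mentioned above might be used as in the following sketch. It assumes an object type aq.message_type with two VARCHAR2 attributes and a started queue named msg_queue of that payload type (illustrative names), and it assumes that the time manager is running, since delay and expiration depend on it. The numeric values are arbitrary.

```sql
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;
BEGIN
    message := aq.message_type('DELAYED MESSAGE',
                               'Available one minute after enqueue.');

    /* Producer side: not available for 60 seconds, then
       expires if it is not consumed within 3600 seconds. */
    message_properties.delay      := 60;
    message_properties.expiration := 3600;

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    COMMIT;

    /* Consumer side: wait up to 120 seconds for a message
       instead of waiting indefinitely (the default). */
    dequeue_options.wait := 120;

    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    COMMIT;
END;
/
```

If no message becomes available within the wait period, the dequeue call raises an error rather than returning an empty payload, so a production consumer would normally add an exception handler.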


Basic Queuing - Many Producers, One Consumer

At a slightly higher level of complexity, many producers may enqueue messages into a queue, all of which are processed by one consumer.


Basic Queuing - Many Producers, Many Consumers of Discrete Messages

In this next stage, many producers may enqueue messages, each message being processed by a different consumer depending on type and correlation identifier. The figure below shows this scenario.

Figure 11-2: Basic Queuing
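
Selecting messages by correlation identifier, as described above, could look like the following sketch. It assumes an object type aq.message_type and a started queue named msg_queue (illustrative names); the identifier 'BILLING' is a hypothetical value agreed on by producer and consumer.

```sql
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;
BEGIN
    message := aq.message_type('INVOICE', 'Tagged for the billing consumer.');

    /* Producer: tag the message with a correlation identifier. */
    message_properties.correlation := 'BILLING';

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    COMMIT;

    /* Consumer: retrieve only messages carrying that identifier. */
    dequeue_options.correlation := 'BILLING';

    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    COMMIT;
END;
/
```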


Multiple-Consumer Dequeuing of the Same Message

A message can only be enqueued into one queue at a time. If a producer had to insert the same message into several queues in order to reach different consumers, this would require management of a very large number of queues. Oracle AQ provides two mechanisms to allow for multiple consumers to dequeue the same message: queue subscribers and message recipients. The queue must reside in a queue table that is created with the multiple consumer option to allow for subscriber and recipient lists. Each message remains in the queue until it is consumed by all its intended consumers.


Queue Subscribers

Using this approach, multiple consumer-subscribers are associated with a queue. This will cause all messages enqueued in the queue to be made available to be consumed by each of the queue subscribers. The subscribers to the queue can be changed dynamically without any change to the messages or message producers. Subscribers to the queue are added and removed by using the Oracle AQ administrative package. The diagram below shows multiple producers enqueuing messages into a queue, each of which is consumed by multiple consumer-subscribers.
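
As a sketch of the subscriber mechanism, the following blocks add two subscribers to a queue and then dequeue as one of them. They assume a started queue named msg_queue_multiple in a queue table created with the multiple consumer option, and an object type aq.message_type (illustrative names); the subscriber names GREEN and BLUE are hypothetical.

```sql
/* Add two subscribers to the multiple consumer queue: */
DECLARE
    subscriber  sys.aq$_agent;
BEGIN
    /* Only the agent name is set; address and protocol are left NULL. */
    subscriber := sys.aq$_agent('GREEN', NULL, NULL);
    dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                              subscriber => subscriber);

    subscriber := sys.aq$_agent('BLUE', NULL, NULL);
    dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                              subscriber => subscriber);
END;
/

/* Dequeue the next message on behalf of subscriber GREEN: */
DECLARE
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;
BEGIN
    dequeue_options.consumer_name := 'GREEN';

    dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    COMMIT;
END;
/
```

The message dequeued by GREEN stays in the queue for BLUE until BLUE has consumed it as well.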



Message Recipients

A message producer can submit a list of recipients at the time a message is enqueued. This allows for a unique set of recipients for each message in the queue. The recipient list associated with the message overrides the subscriber list associated with the queue, if there is one. The recipients need not be in the subscriber list.
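
A recipient list could be supplied at enqueue time roughly as follows. The sketch assumes a started multiple consumer queue named msg_queue_multiple and an object type aq.message_type (illustrative names); the recipient names RED and BLUE are hypothetical.

```sql
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    recipients          dbms_aq.aq$_recipient_list_t;
    message_handle      RAW(16);
    message             aq.message_type;
BEGIN
    message := aq.message_type('MESSAGE 2',
                               'Intended for RED and BLUE only.');

    /* Build the per-message recipient list; it takes the place
       of the queue's subscriber list for this message. */
    recipients(1) := sys.aq$_agent('RED', NULL, NULL);
    recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
    message_properties.recipient_list := recipients;

    dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
    COMMIT;
END;
/
```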


Oracle Advanced Queuing by Example

Example Overview

Oracle AQ by Example guides users by means of a step-by-step approach.

Assign Roles and Privileges

/* Create user and grant privileges: */
CONNECT sys/change_on_install;
CREATE USER aq IDENTIFIED BY AQ;
GRANT AQ_ADMINISTRATOR_ROLE TO aq;
GRANT CONNECT TO aq;
GRANT RESOURCE TO aq;
EXECUTE dbms_aqadm.grant_type_access('aq');
CONNECT aq/AQ;

SET ECHO ON;
SET SERVEROUTPUT ON;


Create Queue Tables and Queues

Create a queue table and queue of object type

/* Create a message type: */
CREATE TYPE aq.message_type AS OBJECT (
    subject     VARCHAR2(30),
    text        VARCHAR2(80));
/

/* Create an object type queue table and queue.
   The trailing hyphen continues a SQL*Plus command on the next line. */
EXECUTE dbms_aqadm.create_queue_table ( -
    queue_table        => 'aq.msg', -
    queue_payload_type => 'aq.message_type');

EXECUTE dbms_aqadm.create_queue ( -
    queue_name         => 'msg_queue', -
    queue_table        => 'aq.msg');

EXECUTE dbms_aqadm.start_queue ( -
    queue_name         => 'msg_queue');


Create a queue table and queue of raw type

/* Create a RAW type queue table and queue.
   The trailing hyphen continues a SQL*Plus command on the next line. */
EXECUTE dbms_aqadm.create_queue_table ( -
    queue_table        => 'aq.raw_msg', -
    queue_payload_type => 'RAW');

EXECUTE dbms_aqadm.create_queue ( -
    queue_name         => 'raw_msg_queue', -
    queue_table        => 'aq.raw_msg');

EXECUTE dbms_aqadm.start_queue ( -
    queue_name         => 'raw_msg_queue');


Create a prioritized message queue table and queue

/* Create a prioritized message queue table and queue.
   The trailing hyphen continues a SQL*Plus command on the next line. */
EXECUTE dbms_aqadm.create_queue_table ( -
    queue_table        => 'aq.priority_msg', -
    sort_list          => 'PRIORITY,ENQ_TIME', -
    queue_payload_type => 'aq.message_type');

EXECUTE dbms_aqadm.create_queue ( -
    queue_name         => 'priority_msg_queue', -
    queue_table        => 'aq.priority_msg');

EXECUTE dbms_aqadm.start_queue ( -
    queue_name         => 'priority_msg_queue');



Create a multiple consumer queue table and queue

/* Create a multiple consumer queue table and queue.
   The trailing hyphen continues a SQL*Plus command on the next line. */
EXECUTE dbms_aqadm.create_queue_table ( -
    queue_table        => 'aq.msg_multiple', -
    multiple_consumers => TRUE, -
    queue_payload_type => 'aq.message_type');

EXECUTE dbms_aqadm.create_queue ( -
    queue_name         => 'msg_queue_multiple', -
    queue_table        => 'aq.msg_multiple');

EXECUTE dbms_aqadm.start_queue ( -
    queue_name         => 'msg_queue_multiple');

                    

Enqueue and Dequeue of Object Type Messages

To enqueue a single message without any other parameters, specify the queue name and the payload.


/* Enqueue to msg_queue: */
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;

BEGIN
    message := message_type('NORMAL MESSAGE',
                            'enqueued to msg_queue first.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    COMMIT;
END;
/



/* Dequeue from msg_queue: */
DECLARE
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;

BEGIN
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    dbms_output.put_line ('Message: ' || message.subject ||
                          ' ... ' || message.text );
    COMMIT;
END;
/


Enqueue and Dequeue of Object Type Messages Using Pro*C/C++


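#include  <stdio.h>
#include  <stdlib.h>
#include  <string.h>
#include  <sqlca.h>
#include  <sql2oci.h>
/* The header file generated by processing
   object type 'aq.message_type': */
#include  "pceg.h"

/* Print the caller's message and the Oracle error text,
   roll back the transaction, disconnect, and exit. */
void sql_error(char *msg)
{
    EXEC SQL WHENEVER SQLERROR CONTINUE;
    printf("%s\n", msg);
    /* sqlerrmc is not null-terminated; sqlerrml holds its length. */
    printf("\n%.*s \n", (int)sqlca.sqlerrm.sqlerrml, sqlca.sqlerrm.sqlerrmc);
    EXEC SQL ROLLBACK WORK RELEASE;
    exit(1);
}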



int main()
{
    message_type     *message = (message_type*)0;  /* payload */
    char             user[60]="aq/AQ";  /* user logon password */
    char             subject[31];  /* components of the payload type, */
    char             txt[81];      /* with room for the null terminator */

    /* ENQUEUE and DEQUEUE to an OBJECT QUEUE */

    /* Connect to database: */
    EXEC SQL CONNECT :user;

    /* On an Oracle error, print the error text and exit: */
    EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

    /* Allocate memory for the host variable from the object cache: */
    EXEC SQL ALLOCATE :message;

    /* ENQUEUE */

    strcpy(subject, "NORMAL ENQUEUE");
    strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");

    /* Initialize the components of message: */
    EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;

    /* Embedded PLSQL call to the AQ enqueue procedure: */
    EXEC SQL EXECUTE
    DECLARE
        message_properties   dbms_aq.message_properties_t;
        enqueue_options      dbms_aq.enqueue_options_t;
        msgid                RAW(16);
    BEGIN
        /* Bind the host variable 'message' to the payload: */
        dbms_aq.enqueue(queue_name         => 'msg_queue',
                        message_properties => message_properties,
                        enqueue_options    => enqueue_options,
                        payload            => :message,
                        msgid              => msgid);
    END;
    END-EXEC;

    /* Commit work: */
    EXEC SQL COMMIT;

    printf("Enqueued Message \n");
    printf("Subject  :%s\n",subject);
    printf("Text     :%s\n",txt);

    /* DEQUEUE */

    /* Embedded PLSQL call to the AQ dequeue procedure: */
    EXEC SQL EXECUTE
    DECLARE
        message_properties  dbms_aq.message_properties_t;
        dequeue_options     dbms_aq.dequeue_options_t;
        msgid               RAW(16);
    BEGIN
        /* Return the payload into the host variable 'message': */
        dbms_aq.dequeue(queue_name         => 'msg_queue',
                        message_properties => message_properties,
                        dequeue_options    => dequeue_options,
                        payload            => :message,
                        msgid              => msgid);
    END;
    END-EXEC;

    /* Commit work: */
    EXEC SQL COMMIT;

    /* Extract the components of message: */
    EXEC SQL OBJECT GET SUBJECT,TEXT FROM :message INTO :subject,:txt;

    printf("Dequeued Message \n");
    printf("Subject  :%s\n",subject);
    printf("Text     :%s\n",txt);

    return 0;
}



Enqueue and Dequeue of RAW Type Messages

/* Enqueue a message containing a RAW: */
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             RAW(4096);

BEGIN
    message := hextoraw(rpad('FF',4095,'FF'));

    dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    COMMIT;
END;
/

/* Dequeue from raw_msg_queue: */
DECLARE
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             RAW(4096);

BEGIN
    dbms_aq.dequeue(queue_name         => 'raw_msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    COMMIT;
END;
/



Enqueue and Dequeue of RAW Type Messages using Pro*C/C++              



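#include  <stdio.h>
#include  <stdlib.h>
#include  <string.h>
#include  <sqlca.h>
#include  <sql2oci.h>

/* Print the caller's message and the Oracle error text,
   roll back the transaction, disconnect, and exit. */
void sql_error(char *msg)
{
    EXEC SQL WHENEVER SQLERROR CONTINUE;
    printf("%s\n", msg);
    /* sqlerrmc is not null-terminated; sqlerrml holds its length. */
    printf("\n%.*s \n", (int)sqlca.sqlerrm.sqlerrml, sqlca.sqlerrm.sqlerrmc);
    EXEC SQL ROLLBACK WORK RELEASE;
    exit(1);
}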



int main()
{
    OCIEnv         *oeh;   /* OCI Env handle */
    OCIError       *err;   /* OCI Err handle */
    OCIRaw         *message= (OCIRaw*)0;   /* payload */
    ub1            message_txt[100];   /* data for payload */
    char           user[60]="aq/AQ";   /* user logon password */
    int            status;   /* returns status of the OCI call */

    /* Enqueue and dequeue to a RAW queue */

    /* Connect to database: */
    EXEC SQL CONNECT :user;

    /* On an Oracle error, print the error text and exit: */
    EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");

    /* Get the OCI Env handle: */
    if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)
    {
        printf(" error in SQLEnvGet \n");
        exit(1);
    }

    /* Get the OCI Error handle: */
    if ((status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,
                                 (ub4)OCI_HTYPE_ERROR, (ub4)0,
                                 (dvoid **)0)) != OCI_SUCCESS)
    {
        printf(" error in OCIHandleAlloc %d \n", status);
        exit(1);
    }

    /* Enqueue */

    /* The bytes to be put into the raw payload. The buffer is zeroed
       first because all 100 bytes are copied into the payload: */
    memset(message_txt, 0, sizeof(message_txt));
    strcpy((char *)message_txt, "Enqueue to a Raw payload queue ");

    /* Assign bytes to the OCIRaw pointer:
       Memory needs to be allocated explicitly to OCIRaw*: */
    if ((status = OCIRawAssignBytes(oeh, err, message_txt, 100,
                                    &message)) != OCI_SUCCESS)
    {
        printf(" error in  OCIRawAssignBytes  %d \n", status);
        exit(1);
    }

    /* Embedded PLSQL call to the AQ enqueue procedure: */
    EXEC SQL EXECUTE
    DECLARE
        message_properties   dbms_aq.message_properties_t;
        enqueue_options      dbms_aq.enqueue_options_t;
        msgid                RAW(16);
    BEGIN
        /* Bind the host variable message to the raw payload: */
        dbms_aq.enqueue(queue_name         => 'raw_msg_queue',
                        message_properties => message_properties,
                        enqueue_options    => enqueue_options,
                        payload            => :message,
                        msgid              => msgid);
    END;
    END-EXEC;

    /* Commit work: */
    EXEC SQL COMMIT;

    /* Dequeue */

    /* Embedded PLSQL call to the AQ dequeue procedure: */
    EXEC SQL EXECUTE
    DECLARE
        message_properties  dbms_aq.message_properties_t;
        dequeue_options     dbms_aq.dequeue_options_t;
        msgid               RAW(16);
    BEGIN
        /* Return the raw payload into the host variable 'message': */
        dbms_aq.dequeue(queue_name         => 'raw_msg_queue',
                        message_properties => message_properties,
                        dequeue_options    => dequeue_options,
                        payload            => :message,
                        msgid              => msgid);
    END;
    END-EXEC;

    /* Commit work: */
    EXEC SQL COMMIT;

    return 0;
}


Enqueue and Dequeue of Messages by Priority

When two messages are enqueued with the same priority, the message that was enqueued earlier will be dequeued first. However, if two messages are of different priorities, the message with the lower value (higher priority) will be dequeued first.



/* Enqueue two messages with priority 30 and 5: */
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;

BEGIN
    message := message_type('PRIORITY MESSAGE',
                            'Enqueued at priority 30.');

    message_properties.priority := 30;

    dbms_aq.enqueue(queue_name         => 'priority_msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    message := message_type('PRIORITY MESSAGE',
                            'Enqueued at priority 5.');

    message_properties.priority := 5;

    dbms_aq.enqueue(queue_name         => 'priority_msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    /* Commit so that both messages are visible to other sessions: */
    COMMIT;
END;
/



/* Dequeue from priority queue: */
DECLARE
    dequeue_options     dbms_aq.dequeue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;

BEGIN
    dbms_aq.dequeue(queue_name         => 'priority_msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    dbms_output.put_line ('Message: ' || message.subject ||
                          ' ... ' || message.text );

    COMMIT;

    dbms_aq.dequeue(queue_name         => 'priority_msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    dbms_output.put_line ('Message: ' || message.subject ||
                          ' ... ' || message.text );
    COMMIT;
END;
/

On return, the second message with priority set to 5 will be retrieved 
before the message with priority set to 30 since priority takes precedence 
over enqueue time.


Dequeue of Messages after Preview by Criterion

An application can preview messages in browse mode or locked mode without deleting the message. The message of interest can then be removed from the queue.


/* Enqueue 6 messages to msg_queue
   - GREEN, GREEN, YELLOW, VIOLET, BLUE, RED */
DECLARE
    enqueue_options     dbms_aq.enqueue_options_t;
    message_properties  dbms_aq.message_properties_t;
    message_handle      RAW(16);
    message             aq.message_type;

BEGIN
    message := message_type('GREEN',
                            'GREEN enqueued to msg_queue first.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    message := message_type('GREEN',
                            'GREEN also enqueued to msg_queue second.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    message := message_type('YELLOW',
                            'YELLOW enqueued to msg_queue third.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    dbms_output.put_line ('Message handle: ' || message_handle);

    message := message_type('VIOLET',
                            'VIOLET enqueued to msg_queue fourth.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    message := message_type('BLUE',
                            'BLUE enqueued to msg_queue fifth.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    message := message_type('RED',
                            'RED enqueued to msg_queue sixth.');

    dbms_aq.enqueue(queue_name         => 'msg_queue',
                    enqueue_options    => enqueue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);

    COMMIT;
END;
/



/* Dequeue in BROWSE mode until RED is found,
   and remove RED from queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
dequeue_options.dequeue_mode := dbms_aq.BROWSE;

LOOP
   dbms_aq.dequeue(queue_name         => 'msg_queue',
                   dequeue_options    => dequeue_options,
                   message_properties => message_properties,
                   payload            => message,
                   msgid              => message_handle);

   dbms_output.put_line ('Message: ' || message.subject ||
                         ' ... ' || message.text );

   EXIT WHEN message.subject = 'RED';
END LOOP;

/* message_handle now identifies RED; remove that message by msgid: */
dequeue_options.dequeue_mode := dbms_aq.REMOVE;
dequeue_options.msgid        := message_handle;

dbms_aq.dequeue(queue_name         => 'msg_queue',
                dequeue_options    => dequeue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
                      ' ... ' || message.text );

COMMIT;
END;
/



/* Dequeue in LOCKED mode until BLUE is found,
   and remove BLUE from queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
dequeue_options.dequeue_mode := dbms_aq.LOCKED;

LOOP
   dbms_aq.dequeue(queue_name         => 'msg_queue',
                   dequeue_options    => dequeue_options,
                   message_properties => message_properties,
                   payload            => message,
                   msgid              => message_handle);

   dbms_output.put_line ('Message: ' || message.subject ||
                         ' ... ' || message.text );

   EXIT WHEN message.subject = 'BLUE';
END LOOP;

/* message_handle now identifies BLUE; remove that message by msgid: */
dequeue_options.dequeue_mode := dbms_aq.REMOVE;
dequeue_options.msgid        := message_handle;

dbms_aq.dequeue(queue_name         => 'msg_queue',
                dequeue_options    => dequeue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle);

dbms_output.put_line ('Message: ' || message.subject ||
                      ' ... ' || message.text );

COMMIT;
END;
/


Enqueue and Dequeue of Messages with Time Delay and Expiration

An enqueue can specify the time before which a message cannot be retrieved by a dequeue call. To do this, the producer (i.e., the agent enqueuing the message) can use the parameter "delay" when enqueuing the message. The producer can also specify the time when a message expires, at which time the message is moved to an exception queue.

Note that expiration is calculated from the earliest dequeue time. So, if an application wants a message to be dequeued no earlier than a week from now, but no later than 3 weeks from now, this requires setting the expiration time for 2 weeks. This scenario is described in the following code segment.

/* Enqueue message for delayed availability: */
DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;

BEGIN
message := message_type('DELAYED',
                        'This message is delayed one week.');

/* Not available for dequeue until one week (in seconds) has passed: */
message_properties.delay := 7*24*60*60;

/* Expires two weeks after the delay ends, i.e. three weeks from now: */
message_properties.expiration := 2*7*24*60*60;

dbms_aq.enqueue(queue_name         => 'msg_queue',
                enqueue_options    => enqueue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle);

COMMIT;
END;
/


Enqueue and Dequeue of Messages by Correlation and Message Id Using Pro*C/C++



#include  <stdio.h>

#include  <stdlib.h>

#include  <string.h>

#include  <sqlca.h>

#include  <sql2oci.h>

/* The header file generated by processing 

object type 'aq.message_type': */  

#include  "pceg.h" 



void sql_error(char *msg)

{

EXEC SQL WHENEVER SQLERROR CONTINUE;

printf("%s\n", msg);

/* sqlerrmc is not null-terminated; print exactly sqlerrml bytes: */

printf("\n%.*s\n", (int)sqlca.sqlerrm.sqlerrml, sqlca.sqlerrm.sqlerrmc);

EXEC SQL ROLLBACK WORK RELEASE;

exit(1);

}



int main()

{

OCIEnv            *oeh;  /* OCI Env Handle */  

OCIError          *err;  /* OCI Error Handle */

message_type      *message = (message_type*)0; /* queue  payload */

OCIRaw            *msgid = (OCIRaw*)0;  /* message id */

ub1               msgmem[16]="";  /* memory for msgid */

char              user[60]="aq/AQ";  /* user login password */

char              subject[30];  /* components of */

char              txt[80];  /* message_type */

char              correlation1[30];  /* message correlation  */

char              correlation2[30]; 

int               status;  /* code returned by the OCI calls */

                                                    

/* Dequeue by correlation and msgid */



/* Connect to the database: */

EXEC SQL CONNECT :user;                      

EXEC SQL WHENEVER SQLERROR DO sql_error("Oracle Error :");



/* Allocate space in the object cache for the host variable: */

EXEC SQL ALLOCATE :message;



/* Get the OCI Env handle: */

if (SQLEnvGet(SQL_SINGLE_RCTX, &oeh) != OCI_SUCCESS)

{

 printf(" error in SQLEnvGet \n");

 exit(1);

} 

/* Get the OCI Error handle: */

if (status = OCIHandleAlloc((dvoid *)oeh, (dvoid **)&err,

(ub4)OCI_HTYPE_ERROR, (ub4)0, (dvoid **)0))

{

printf(" error in OCIHandleAlloc %d \n", status);

exit(1);

}





/* Assign memory for msgid: 

Memory needs to be allocated explicitly to OCIRaw*: */

if (status=OCIRawAssignBytes(oeh, err, msgmem, 16, &msgid))

{

printf(" error in  OCIRawAssignBytes  %d \n", status);

exit(1);

}



/* First enqueue */



strcpy(correlation1, "1st message");

strcpy(subject, "NORMAL ENQUEUE1");

strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC"); 

 

/* Initialize the components of message: */

EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;



/* Embedded PLSQL call to the AQ enqueue procedure: */

EXEC SQL EXECUTE

DECLARE

message_properties   dbms_aq.message_properties_t;

enqueue_options      dbms_aq.enqueue_options_t;

BEGIN

/* Bind the host variable 'correlation1': to message correlation*/

message_properties.correlation := :correlation1;



/* Bind the host variable 'message' to payload and 

 return message id into host variable 'msgid': */

dbms_aq.enqueue(queue_name => 'msg_queue',

message_properties => message_properties,

enqueue_options => enqueue_options,

payload => :message,

msgid => :msgid);

END;

END-EXEC;

/* Commit work: */

EXEC SQL COMMIT;



printf("Enqueued Message \n");

printf("Subject  :%s\n",subject);

printf("Text     :%s\n",txt);

  

/* Second enqueue */



strcpy(correlation2, "2nd message");

strcpy(subject, "NORMAL ENQUEUE2");

strcpy(txt, "The Enqueue was done through PLSQL embedded in PROC");



/* Initialize the components of message: */

EXEC SQL OBJECT SET SUBJECT, TEXT OF :message TO :subject, :txt;



/* Embedded PLSQL call to the AQ enqueue procedure: */  

EXEC SQL EXECUTE

DECLARE

message_properties   dbms_aq.message_properties_t;

enqueue_options      dbms_aq.enqueue_options_t; 

msgid                RAW(16);

BEGIN

/* Bind the host variable 'correlation2' to message correlation: */

message_properties.correlation := :correlation2;



/* Bind the host variable 'message': to payload */  

dbms_aq.enqueue(queue_name => 'msg_queue',

message_properties => message_properties,

enqueue_options => enqueue_options,

payload => :message,

msgid => msgid);

END;

END-EXEC;

/* Commit work: */

EXEC SQL COMMIT;

printf("Enqueued Message \n");

printf("Subject  :%s\n",subject);

printf("Text     :%s\n",txt);



/* First dequeue - by  correlation */

 

EXEC SQL EXECUTE

DECLARE

message_properties  dbms_aq.message_properties_t;

dequeue_options     dbms_aq.dequeue_options_t; 

msgid               RAW(16);

BEGIN

/* Dequeue by  correlation in host variable 'correlation2': */  

dequeue_options.correlation := :correlation2;



/* Return the payload into host variable 'message': */

dbms_aq.dequeue(queue_name => 'msg_queue',

message_properties => message_properties,

dequeue_options => dequeue_options,

payload => :message,

msgid => msgid);

END;

END-EXEC;

/* Commit work : */

EXEC SQL COMMIT;



/* Extract the values of the components of message: */

EXEC SQL OBJECT GET SUBJECT, TEXT FROM :message INTO :subject,:txt;



printf("Dequeued Message \n");

printf("Subject  :%s\n",subject);

printf("Text     :%s\n",txt);



/* SECOND DEQUEUE - by MSGID  */   

                                

EXEC SQL EXECUTE

DECLARE

message_properties  dbms_aq.message_properties_t;

dequeue_options     dbms_aq.dequeue_options_t; 

msgid               RAW(16);

BEGIN

/* Dequeue by msgid in host variable 'msgid': */

dequeue_options.msgid := :msgid;



/* Return the payload into host variable 'message':  */

dbms_aq.dequeue(queue_name => 'msg_queue',

message_properties => message_properties,

dequeue_options => dequeue_options,

payload => :message,

msgid => msgid);

END;

END-EXEC;

/* Commit work: */

EXEC SQL COMMIT;

return 0;

}


Enqueue and Dequeue of Messages to/from a Multiconsumer Queue

/* Create subscriber list: */
DECLARE
subscriber sys.aq$_agent;

/* Add subscribers RED and GREEN to the subscriber list: */
BEGIN
subscriber := sys.aq$_agent('RED', NULL, NULL);
dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                          subscriber => subscriber);

subscriber := sys.aq$_agent('GREEN', NULL, NULL);
dbms_aqadm.add_subscriber(queue_name => 'msg_queue_multiple',
                          subscriber => subscriber);
END;
/

DECLARE
enqueue_options     dbms_aq.enqueue_options_t;
message_properties  dbms_aq.message_properties_t;
recipients          dbms_aq.aq$_recipient_list_t;
message_handle      RAW(16);
message             aq.message_type;

/* Enqueue MESSAGE 1 for subscribers to the queue
   i.e. for RED and GREEN: */
BEGIN
message := message_type('MESSAGE 1',
                        'This message is queued for queue subscribers.');

dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                enqueue_options    => enqueue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle);

/* Enqueue MESSAGE 2 for specified recipients i.e. for RED and BLUE: */
message := message_type('MESSAGE 2',
                        'This message is queued for two recipients.');
recipients(1) := sys.aq$_agent('RED', NULL, NULL);
recipients(2) := sys.aq$_agent('BLUE', NULL, NULL);
message_properties.recipient_list := recipients;

dbms_aq.enqueue(queue_name         => 'msg_queue_multiple',
                enqueue_options    => enqueue_options,
                message_properties => message_properties,
                payload            => message,
                msgid              => message_handle);

COMMIT;
END;
/

Note that RED is both a subscriber to the queue and a specified recipient of MESSAGE 2. By contrast, GREEN is only a subscriber, and so receives only those messages in the queue (in this case, MESSAGE 1) for which no recipients have been specified. BLUE, while not a subscriber to the queue, is nevertheless specified to receive MESSAGE 2.


/* Dequeue messages from msg_queue_multiple: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_type;
no_messages         exception;
pragma exception_init (no_messages, -25228);

BEGIN
   dequeue_options.wait := dbms_aq.NO_WAIT;

   /* Each consumer is handled in its own nested block so that the
      no_messages exception ends only that consumer's loop: */
   BEGIN
      /* Consumer BLUE will get MESSAGE 2: */
      dequeue_options.consumer_name := 'BLUE';

      LOOP
         dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                         dequeue_options    => dequeue_options,
                         message_properties => message_properties,
                         payload            => message,
                         msgid              => message_handle);

         dbms_output.put_line ('Message: ' || message.subject ||
                               ' ... ' || message.text );
      END LOOP;
   EXCEPTION
      WHEN no_messages THEN
         dbms_output.put_line ('No more messages for BLUE');
         COMMIT;
   END;

   BEGIN
      /* Consumer RED will get MESSAGE 1 and MESSAGE 2: */
      dequeue_options.consumer_name := 'RED';

      LOOP
         dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                         dequeue_options    => dequeue_options,
                         message_properties => message_properties,
                         payload            => message,
                         msgid              => message_handle);

         dbms_output.put_line ('Message: ' || message.subject ||
                               ' ... ' || message.text );
      END LOOP;
   EXCEPTION
      WHEN no_messages THEN
         dbms_output.put_line ('No more messages for RED');
         COMMIT;
   END;

   BEGIN
      /* Consumer GREEN will get MESSAGE 1: */
      dequeue_options.consumer_name := 'GREEN';

      LOOP
         dbms_aq.dequeue(queue_name         => 'msg_queue_multiple',
                         dequeue_options    => dequeue_options,
                         message_properties => message_properties,
                         payload            => message,
                         msgid              => message_handle);

         dbms_output.put_line ('Message: ' || message.subject ||
                               ' ... ' || message.text );
      END LOOP;
   EXCEPTION
      WHEN no_messages THEN
         dbms_output.put_line ('No more messages for GREEN');
         COMMIT;
   END;
END;
/


Drop AQ Objects

/* Cleans up all objects related to the object type: */
CONNECT aq/AQ;

EXECUTE dbms_aqadm.stop_queue (queue_name => 'msg_queue');
EXECUTE dbms_aqadm.drop_queue (queue_name => 'msg_queue');
EXECUTE dbms_aqadm.drop_queue_table (queue_table => 'aq.msg');

/* Cleans up all objects related to the RAW type: */
EXECUTE dbms_aqadm.stop_queue (queue_name => 'raw_msg_queue');
EXECUTE dbms_aqadm.drop_queue (queue_name => 'raw_msg_queue');
EXECUTE dbms_aqadm.drop_queue_table (queue_table => 'aq.raw_msg');

/* Cleans up all objects related to the priority queue: */
EXECUTE dbms_aqadm.stop_queue (queue_name => 'priority_msg_queue');
EXECUTE dbms_aqadm.drop_queue (queue_name => 'priority_msg_queue');
EXECUTE dbms_aqadm.drop_queue_table (queue_table => 'aq.priority_msg');

/* Cleans up all objects related to the multiple-consumer queue: */
EXECUTE dbms_aqadm.stop_queue (queue_name => 'msg_queue_multiple');
EXECUTE dbms_aqadm.drop_queue (queue_name => 'msg_queue_multiple');
EXECUTE dbms_aqadm.drop_queue_table (queue_table => 'aq.msg_multiple');

drop type aq.message_type;


Revoke Roles and Privileges

CONNECT sys/change_on_install;

drop user aq;

Oracle Advanced Queuing Reference

Reference Overview

This section contains a detailed description of the technical specifications:

- INIT.ORA Parameter
- Data Structures
- Agent
- Message Properties
- Queue Options
- Operational Interface
- Administrative Interface
- Administration Topics
- Data Objects

    

INIT.ORA Parameter

A parameter called aq_tm_processes should be specified in the init.ora parameter file if you want to perform time monitoring on queue messages. This will be used for messages which have delay and expiration properties specified. This parameter can be set to 0 or 1. Setting it to any other number will result in an error. If this parameter is set to 1, one time manager process will be created as a background process to monitor the messages. If the parameter is not specified or is set to 0, then the time manager process is not created. The administrative interfaces to start and stop the time manager are only valid if the time manager process is started as part of instance startup by specifying this parameter.

Parameter Name: aq_tm_processes

Parameter Type: integer

Parameter Class: Dynamic

Allowable Values: 0 or 1

Syntax: aq_tm_processes = <either 0 or 1>

Name of process: ora_aqtm_<oracle sid>

Example: aq_tm_processes = 1


Data Structures

The following data structures are used in the operational and administrative interfaces.

Object name

Purpose:

Naming of database objects. This naming convention applies to queues, queue tables and object types.

Syntax:

object_name := VARCHAR2
object_name := [<schema_name>.]<name>

Usage:

Names for objects are specified by an optional schema name and a name. If the schema name is not specified then the current schema is assumed. The schema name and the name can each be up to 24 characters long.


Type name

Purpose:

Defining queue types.

Syntax:

type_name := VARCHAR2
type_name := <object_type> | "RAW"

Usage:

Table 11-1: Type Name

Parameter   Description  

<object_type>  

For details on creating object types, please refer to the Server Concepts manual. The maximum number of attributes in the object type is limited to 900.  

"RAW"  

To store payload of type RAW, AQ will create a queue table with a LOB column as the payload repository. The size of the payload is limited to 32K bytes of data. Because LOB columns are used for storing RAW payload, the AQ administrator can choose the LOB tablespace and configure the LOB storage by constructing a LOB storage string in the storage_clause parameter during queue table creation time.  


Agent

Purpose:

To identify a producer or a consumer of a message.

Syntax:

TYPE sys.aq$_agent IS OBJECT (
name         VARCHAR2(30), 
address      VARCHAR2(30),
protocol     NUMBER)

Usage:
Table 11-2: Agent
Parameter   Description  

name  

Name of a producer or consumer of a message.  

address  

Reserved for future use.  

protocol  

Reserved for future use.  

Message Properties

Purpose:

The Message Properties describe the information that is used by AQ to manage individual messages. These are set at enqueue time and their values are returned at dequeue time.

Syntax:

TYPE message_properties_t IS RECORD (
priority        BINARY_INTEGER default 1,
delay           BINARY_INTEGER default NO_DELAY,
expiration      BINARY_INTEGER default NEVER,
correlation     VARCHAR2(128) default NULL,
attempts        BINARY_INTEGER,
recipient_list  aq$_recipient_list_t,
exception_queue VARCHAR2(51) default NULL,
enqueue_time    date,
state           BINARY_INTEGER)

TYPE aq$_recipient_list_t IS TABLE OF sys.aq$_agent

INDEX BY BINARY_INTEGER

Usage:

Table 11-3: Message properties
Parameter   Description  

priority  

Specifies the priority of the message. A smaller number indicates higher priority. The priority can be any number, including negative numbers.  

delay  

Specifies the delay of the enqueued message. The delay represents the number of seconds after which a message is available for dequeuing. Dequeuing by msgid overrides the delay specification. A message enqueued with delay set will be in the WAITING state; when the delay expires, the message goes to the READY state. DELAY processing requires the time manager to be started. Note that delay is set by the producer who enqueues the message.

NO_DELAY: the message is available for immediate dequeuing.

number: the number of seconds to delay the message.  

expiration  

Specifies the expiration of the message. It determines, in seconds, the duration the message is available for dequeuing. This parameter is an offset from the delay. Expiration processing requires the time manager to be running.

NEVER: message will not expire.

number: number of seconds message will remain in READY state. If the message is not dequeued before it expires, it will be moved to the exception queue in the EXPIRED state.  

correlation  

Specifies the identification supplied by the producer for a message at enqueuing.  

attempts  

Specifies the number of attempts that have been made to dequeue this message. This parameter can not be set at enqueue time.  

recipient_list  

For type definition please refer to section titled "Agent".

This parameter is only valid for queues which allow multiple consumers. The default recipients are the queue subscribers. This parameter is not returned to a consumer at dequeue time.  

exception_queue  

Specifies the name of the queue to which the message is moved if it cannot be processed successfully. Messages are moved in two cases: the number of unsuccessful dequeue attempts has exceeded max_retries, or the message has expired. All messages in the exception queue are in the EXPIRED state.

The default is the exception queue associated with the queue table. If the exception queue specified does not exist at the time of the move the message will be moved to the default exception queue associated with the queue table and a warning will be logged in the alert file. If the default exception queue is used the parameter will return a NULL value at dequeue time.  

enqueue_time  

Specifies the time the message was enqueued. This value is determined by the system and cannot be set by the user at enqueue time.  

state  

Specifies the state of the message at the time of the dequeue. This parameter can not be set at enqueue time.

WAITING: The message delay has not yet been reached

READY: The message is ready to be processed.

PROCESSED: The message has been processed and is retained

EXPIRED: The message has been moved to the exception queue  
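
The following block is a minimal sketch of how these properties might be set at enqueue time and read back at dequeue time. It assumes the queue msg_queue and the object type aq.message_type used in the examples earlier in this chapter; the subject, text, and correlation values are made up for illustration. Note that priority influences dequeue order only if the queue table was created with priority in its sort list, and expiration takes effect only when the time manager is running.

```sql
DECLARE
  enqueue_options  dbms_aq.enqueue_options_t;
  dequeue_options  dbms_aq.dequeue_options_t;
  enq_properties   dbms_aq.message_properties_t;
  deq_properties   dbms_aq.message_properties_t;
  message_handle   RAW(16);
  message          aq.message_type;
BEGIN
  /* Illustrative payload values (not from the original examples): */
  message := aq.message_type('URGENT', 'Illustrative message text.');

  /* Properties supplied by the producer: */
  enq_properties.priority    := -5;           /* smaller number = higher priority */
  enq_properties.correlation := 'ORDER-1001'; /* application-defined identifier   */
  enq_properties.expiration  := 60*60;        /* one hour in the READY state      */

  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => enq_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;

  /* Dequeue the same message by msgid and inspect the returned properties: */
  dequeue_options.msgid := message_handle;
  dequeue_options.wait  := dbms_aq.NO_WAIT;

  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => deq_properties,
                  payload            => message,
                  msgid              => message_handle);

  dbms_output.put_line('Priority:    ' || deq_properties.priority);
  dbms_output.put_line('Correlation: ' || deq_properties.correlation);
  /* attempts, enqueue_time and state are maintained by the system: */
  dbms_output.put_line('Attempts:    ' || deq_properties.attempts);
  dbms_output.put_line('Enqueued at: ' ||
      TO_CHAR(deq_properties.enqueue_time, 'DD-MON-YYYY HH24:MI:SS'));
  IF deq_properties.state = dbms_aq.READY THEN
    dbms_output.put_line('State:       READY');
  END IF;
  COMMIT;
END;
/
```

Separate records are used for the enqueue and dequeue calls here only to make clear which values the producer supplied and which the system returned.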

Queue Options

Enqueue options

Purpose:

To specify the options available for the enqueue operation.

Syntax:

TYPE enqueue_options_t IS RECORD (
visibility          BINARY_INTEGER default ON_COMMIT,
relative_msgid      RAW(16) default NULL,
sequence_deviation  BINARY_INTEGER default NULL)

Usage:
Table 11-4: ENQUEUE options
Parameter   Description  

visibility  

Specifies the transactional behavior of the enqueue request.

ON_COMMIT: The enqueue is part of the current transaction. The operation is complete when the transaction commits. This is the default case.

IMMEDIATE: The enqueue is not part of the current transaction. The operation constitutes a transaction on its own.  

relative_msgid

 

Specifies the message identifier of the message which is referenced in the sequence deviation operation. This field is valid if and only if BEFORE is specified in sequence_deviation. This parameter will be ignored if sequence deviation is not specified.  

sequence_deviation

 

Specifies if the message being enqueued should be dequeued before other message(s) already in the queue.

BEFORE: The message is enqueued ahead of the message specified by relative_msgid.

TOP: The message is enqueued ahead of any other messages.

NULL: Default  
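
As a sketch of the visibility option, the block below enqueues one message with IMMEDIATE visibility and then rolls back the surrounding transaction. Based on the description above, the enqueue is expected to survive the rollback because it forms a transaction of its own. The queue msg_queue and type aq.message_type are assumed from the earlier examples in this chapter; the payload text is illustrative.

```sql
DECLARE
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             aq.message_type;
BEGIN
  message := aq.message_type('AUDIT', 'Illustrative audit record.');

  /* Make the enqueue independent of the current transaction: */
  enqueue_options.visibility := dbms_aq.IMMEDIATE;

  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  /* Undoes any other work in this transaction, but (per the IMMEDIATE
     description above) not the enqueue itself: */
  ROLLBACK;
END;
/
```

With the default ON_COMMIT setting, the same ROLLBACK would discard the message along with the rest of the transaction.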


Dequeue options

Purpose:

To specify the options available for the dequeue operation.

Syntax:

TYPE dequeue_options_t IS RECORD (
consumer_name   VARCHAR2(30) default NULL,
dequeue_mode    BINARY_INTEGER default REMOVE,
navigation      BINARY_INTEGER default NEXT_MESSAGE,
visibility      BINARY_INTEGER default ON_COMMIT,
wait            BINARY_INTEGER default FOREVER,
msgid           RAW(16) default NULL,
correlation     VARCHAR2(128) default NULL)

Usage

Table 11-5: DEQUEUE options
Parameter   Description  

consumer_name  

Name of the consumer. Only those messages matching the consumer name are accessed. If a queue is not set up for multiple consumers, this field should be set to NULL.  

dequeue_mode  

Specifies the locking behavior associated with the dequeue.

BROWSE: Read the message without acquiring any lock on the message. This is equivalent to a select statement.

LOCKED: Read and obtain a write lock on the message. The lock lasts for the duration of the transaction. This is equivalent to a select for update statement.

REMOVE: Read the message and update or delete it. This is the default. The message can be retained in the queue table based on the retention properties.  

navigation  

Specifies the position of the message that will be retrieved. First, the position is determined. Second, the search criterion is applied. Finally, the message is retrieved.

NEXT_MESSAGE: Retrieve the next message which is available and matches the search criteria. If the previous message belongs to a message group, AQ will retrieve the next available message which matches the search criteria and belongs to the message group. This is the default.

NEXT_TRANSACTION: Skip the remainder of the current transaction group (if any) and retrieve the first message of the next transaction group. This option can only be used if message grouping is enabled for the current queue.

FIRST_MESSAGE: Retrieves the first message which is available and matches the search criteria. This will reset the position to the beginning of the queue.  

visibility  

Specifies whether the new message is dequeued as part of the current transaction. The visibility parameter is ignored when using the BROWSE mode.

ON_COMMIT: The dequeue will be part of the current transaction. This is the default case.

IMMEDIATE: The dequeued message is not part of the current transaction. It constitutes a transaction on its own.  

wait  

Specifies the wait time if there is currently no message available which matches the search criteria. This parameter is ignored if messages in the same group are being dequeued.

FOREVER: wait forever. This is the default.

NO_WAIT: do not wait

number: wait time in seconds  

msgid

 

Specifies the message identifier of the message to be dequeued.  

correlation

 

Specifies the correlation identifier of the message to be dequeued.  
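
The following sketch combines several of these options: it browses the first available message from the beginning of the queue and waits at most 10 seconds for one to arrive. It assumes the queue msg_queue and type aq.message_type from the earlier examples, and it reuses error number -25228, which the multiconsumer example in this chapter maps to a "no messages" exception; treating the same error as the timeout signal is an assumption of this sketch.

```sql
DECLARE
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             aq.message_type;
  no_messages         EXCEPTION;
  PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
  /* Read without locking or removing the message: */
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  /* Reset the position to the beginning of the queue: */
  dequeue_options.navigation   := dbms_aq.FIRST_MESSAGE;
  /* Wait up to 10 seconds instead of the default FOREVER: */
  dequeue_options.wait         := 10;

  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  dbms_output.put_line('Message: ' || message.subject ||
                       ' ... ' || message.text);
EXCEPTION
  WHEN no_messages THEN
    dbms_output.put_line('No message became available within 10 seconds.');
END;
/
```

Because the mode is BROWSE, no COMMIT is needed and the message stays in the queue for a later REMOVE.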



Operational Interface

The following interface calls are available to enqueue and dequeue messages from queues.


DBMS_AQ.ENQUEUE

Purpose:

Adds a message to the specified queue. In the simplest case, if the user wants to enqueue a message, without any other parameters, only the queue name and the payload have to be specified.

Syntax:

DBMS_AQ.ENQUEUE (
queue_name          IN VARCHAR2,
enqueue_options     IN enqueue_options_t,
message_properties  IN message_properties_t,
payload             IN "<type_name>",
msgid               OUT RAW)

Usage:

Table 11-6: DBMS_AQ.ENQUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue to which this message should be enqueued. The queue cannot be an exception queue.  

enqueue_options

(IN enqueue_options_t)  

For the definition please refer to the section titled "Enqueue options."  

message_properties

(IN message_properties_t)  

For the definition please refer to the section titled "Message Properties."  

payload

(IN "<type_name>")  

Not interpreted by Oracle AQ.

The payload must be specified according to the specification in the associated queue table. NULL is an acceptable parameter. For the definition of <type_name> please refer to section titled "Type name"  

msgid

(OUT RAW)  

The system generated identification of the message. This is a globally unique identifier that can be used to identify the message at dequeue time.  

Using sequence deviation:

The sequence_deviation parameter in enqueue_options can be used to change the order of processing between two messages. The identity of the other message, if any, is specified by the enqueue_options parameter relative_msgid. The relationship is identified by the sequence_deviation parameter.

Specifying sequence_deviation for a message introduces some restrictions for the delay and priority values that can be specified for this message. The delay of this message has to be less than or equal to the delay of the message before which this message is to be enqueued. The priority of this message has to be greater than or equal to the priority of the message before which this message is to be enqueued.
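
A minimal sketch of sequence deviation follows. It enqueues one message, then enqueues a second message that is to be dequeued ahead of the first by setting sequence_deviation to BEFORE and relative_msgid to the first message's handle. The queue msg_queue and type aq.message_type are assumed from the earlier examples; the payload values are illustrative. Both messages use the default delay and priority, so the restrictions described above are met. Committing the first enqueue before referring to it is a cautious choice made for this sketch, not a documented requirement.

```sql
DECLARE
  enqueue_options     dbms_aq.enqueue_options_t;
  message_properties  dbms_aq.message_properties_t;
  first_handle        RAW(16);
  second_handle       RAW(16);
  message             aq.message_type;
BEGIN
  /* First message, enqueued in the normal order: */
  message := aq.message_type('ROUTINE', 'Enqueued first.');
  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => first_handle);
  COMMIT;

  /* Second message, placed ahead of the first one: */
  message := aq.message_type('RUSH', 'Enqueued second, dequeued earlier.');
  enqueue_options.sequence_deviation := dbms_aq.BEFORE;
  enqueue_options.relative_msgid     := first_handle;

  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => second_handle);
  COMMIT;
END;
/
```

To place a message ahead of every message in the queue, set sequence_deviation to dbms_aq.TOP and leave relative_msgid unset.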


DBMS_AQ.DEQUEUE

Purpose:

Dequeues a message from the specified queue.

Syntax:

DBMS_AQ.DEQUEUE (
queue_name          IN VARCHAR2,
dequeue_options     IN dequeue_options_t,
message_properties  OUT message_properties_t,
payload             OUT "<type_name>",
msgid               OUT RAW)

Usage:

Table 11-7: DBMS_AQ.DEQUEUE

Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue.
 

dequeue_options

(IN dequeue_options_t)  

For the definition please refer to the section titled "Dequeue options."

 

message_properties

(OUT message_properties_t)  

For the definition please refer to the section titled "Message Properties."

 

payload

(OUT "<type_name>")  

Not interpreted by Oracle AQ.

The payload must be specified according to the specification in the associated queue table. For the definition of <type_name> please refer to section titled "Type name"  

msgid

(OUT RAW)  

The system generated identification of the message.  

Search criteria and dequeue order for messages:

The search criteria for messages to be dequeued are determined by the consumer_name, msgid and correlation parameters in the dequeue_options. Msgid uniquely identifies the message to be dequeued. Correlation identifiers are application-defined identifiers that are not interpreted by AQ.

Only messages in the READY state are dequeued unless a msgid is specified.

The dequeue order is determined by the values specified at the time the queue table is created unless overridden by the msgid and correlation id in dequeue_options.

The database consistent read mechanism is applicable for queue operations. For example, a BROWSE call may not see a message that is enqueued after the beginning of the browsing transaction.
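
The interaction between message state and search criteria can be sketched as follows. A message is enqueued with a one-day delay, so it starts in the WAITING state. According to the rules above, a search by correlation should not find it, while a dequeue that specifies its msgid should. The queue msg_queue and type aq.message_type are assumed from the earlier examples; the correlation value is illustrative, and the outcomes noted in the comments are what the text above leads one to expect rather than verified output.

```sql
DECLARE
  enqueue_options     dbms_aq.enqueue_options_t;
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             aq.message_type;
  no_messages         EXCEPTION;
  PRAGMA EXCEPTION_INIT (no_messages, -25228);
BEGIN
  message := aq.message_type('HELD', 'Illustrative delayed message.');
  message_properties.delay       := 24*60*60;  /* WAITING for one day */
  message_properties.correlation := 'HELD-1';

  dbms_aq.enqueue(queue_name         => 'msg_queue',
                  enqueue_options    => enqueue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);
  COMMIT;

  /* Search by correlation: only READY messages are candidates, so the
     delayed message is expected not to match. BROWSE keeps this attempt
     non-destructive. */
  dequeue_options.wait         := dbms_aq.NO_WAIT;
  dequeue_options.dequeue_mode := dbms_aq.BROWSE;
  dequeue_options.correlation  := 'HELD-1';
  BEGIN
    dbms_aq.dequeue(queue_name         => 'msg_queue',
                    dequeue_options    => dequeue_options,
                    message_properties => message_properties,
                    payload            => message,
                    msgid              => message_handle);
  EXCEPTION
    WHEN no_messages THEN
      dbms_output.put_line('No READY message matches the correlation.');
  END;

  /* Search by msgid: this overrides the delay specification. */
  dequeue_options.dequeue_mode := dbms_aq.REMOVE;
  dequeue_options.correlation  := NULL;
  dequeue_options.msgid        := message_handle;

  dbms_aq.dequeue(queue_name         => 'msg_queue',
                  dequeue_options    => dequeue_options,
                  message_properties => message_properties,
                  payload            => message,
                  msgid              => message_handle);

  dbms_output.put_line('Message: ' || message.subject ||
                       ' ... ' || message.text);
  COMMIT;
END;
/
```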

Enumerated Constants in the Operational Interface

When using enumerated constants such as BROWSE, LOCKED, and REMOVE, the PL/SQL constants need to be specified with the scope of the package that defines them. All types associated with the operational interfaces have to be prefixed with dbms_aq. For example:

dbms_aq.BROWSE

Table 11-8: Enumerated types in the operational interface
Parameter   Options  

visibility  

IMMEDIATE, ON_COMMIT  

dequeue_mode  

BROWSE, LOCKED, REMOVE  

navigation  

FIRST_MESSAGE, NEXT_MESSAGE, NEXT_TRANSACTION  

state  

WAITING, READY, PROCESSED, EXPIRED  

sequence_deviation  

BEFORE, TOP  

wait  

FOREVER, NO_WAIT  

delay  

NO_DELAY  

expiration  

NEVER  

Administrative Interface

Configuration information can be managed through procedures in the DBMS_AQADM package. Because incorrect usage of the administration interface can have substantial performance impact on the database system, the administration interface should be treated as privileged commands, and only the designated queue administrator or privileged users should be granted access to the administration package. Initially, only SYS has the execution privilege for the procedures in DBMS_AQADM and DBMS_AQ.


Privileges and access control

Access to AQ operations is granted to users through roles. These roles provide execution privileges on the AQ procedures. Currently, fine-grained access control at the database object level is not supported. This implies that a user with the AQ_USER_ROLE can enqueue to and dequeue from any queue in the system.

Administrator role

AQ_ADMINISTRATOR_ROLE grants execute privileges to procedures in the DBMS_AQADM and DBMS_AQ packages. These include all the administrative and operational interfaces. The user 'SYS' must grant the AQ_ADMINISTRATOR_ROLE to the AQ administrator.


User role

AQ_USER_ROLE grants execute privileges to procedures in the DBMS_AQ package. These include all the operational interfaces. The AQ administrator must grant the AQ_USER_ROLE to AQ users.


Access to AQ object types

The procedure grant_type_access must first be executed by the user 'SYS' to grant access for AQ object types to the AQ administrator. The AQ administrator can then execute this procedure to grant access for AQ object types to other AQ users. The procedure needs to be executed if the user wishes to perform any administrative operation involving a multiple consumer queue. These include CREATE_QUEUE_TABLE, CREATE_QUEUE, ADD_SUBSCRIBER and REMOVE_SUBSCRIBER.

Syntax:

PROCEDURE grant_type_access (user_name IN VARCHAR2);


Example

1. Scott is appointed as the AQ administrator.

CONNECT sys/change_on_install
GRANT AQ_ADMINISTRATOR_ROLE to scott with admin option;
execute dbms_aqadm.grant_type_access('scott');

2. Scott lets Jones use AQ.

CONNECT scott/tiger
GRANT AQ_USER_ROLE to jones;

3. Jones wishes to create queue tables that support multiple consumers, so Scott grants him type access.

CONNECT scott/tiger
execute dbms_aqadm.grant_type_access('jones');


DBMS_AQADM.CREATE_QUEUE_TABLE

Purpose:

Create a queue table for messages of a predefined type. The sort keys for dequeue ordering, if any, must be defined at table creation time. The following objects are created at the same time:

  1. The default exception queue associated with the queue table, called aq$_<queue_table_name>_e.
  2. A read-only view, called aq$<queue_table_name>, which AQ applications use to query queue data.
  3. An index for the time manager operations, called aq$_<queue_table_name>_t.
  4. An index for dequeue operations, called aq$_<queue_table_name>_i. In the case of multiple consumer queues this is an index-organized table (IOT).

Syntax:

DBMS_AQADM.CREATE_QUEUE_TABLE (
queue_table        IN      VARCHAR2,
queue_payload_type IN      VARCHAR2,
storage_clause     IN      VARCHAR2 default NULL,
sort_list          IN      VARCHAR2 default NULL,
multiple_consumers IN      BOOLEAN default FALSE,
message_grouping   IN      BINARY_INTEGER default NONE,
comment            IN      VARCHAR2 default NULL,
auto_commit        IN      BOOLEAN default TRUE)

Usage

Table 11-9: DBMS_AQADM.CREATE_QUEUE_TABLE

Parameter   Description  

queue_table

(IN VARCHAR2)  

Specifies the name of a queue table to be created.  

queue_payload_type

(IN VARCHAR2)  

Specifies the type of the user data stored. Please see section entitled "Type name" for valid values for this parameter.  

storage_clause

(IN VARCHAR2)  

Specifies the storage parameter. The storage parameter will be included in the `CREATE TABLE' statement when the queue table is created. It can be made up of any combination of the following parameters:

PCTFREE, PCTUSED, INITRANS, MAXTRANS, TABLESPACE, LOB and a table storage clause.

Please refer to the SQL reference guide for the usage of these parameters.  

sort_list

(IN VARCHAR2)  

Specifies the columns to be used as the sort key in ascending order.

Sort_list has the following format: `<sort_column_1>,<sort_column_2>'.

The allowed column names are priority and enq_time. If both columns are specified then <sort_column_1> defines the most significant order.

Once a queue table is created with a specific ordering mechanism, all queues in the queue table inherit the same defaults. The order of a queue table cannot be altered once the queue table has been created.

If no sort list is specified all the queues in this queue table will be sorted by the enqueue time in ascending order. This order is equivalent to FIFO order.

Even with the default ordering defined, a dequeuer is allowed to choose a message to dequeue by specifying its msgid or correlation. Msgid, correlation and sequence_deviation take precedence over the default dequeueing order if they are specified.  

multiple_consumers

(IN BOOLEAN)  

FALSE: Queues created in the table can only have one consumer per message. This is the default.

TRUE: Queues created in the table can have multiple consumers per message. The user must have been granted type access by executing the grant_type_access procedure.  

message_grouping

(IN BINARY_INTEGER)  

Specifies the message grouping behavior for queues created in the table.

NONE: Each message is treated individually.

TRANSACTIONAL: Messages enqueued as part of one transaction are considered part of the same group and must be dequeued as a group of related messages.  

comment

(IN VARCHAR2)  

Specifies the user-specified description of the queue table. This user comment will be added to the queue catalog.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the operation is carried out. The operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  
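
The following is a minimal sketch of two CREATE_QUEUE_TABLE calls, assuming they are run by a user (here scott) who holds the AQ administrator role and has been granted type access. The payload type message_typ and the queue table names msg_qtab and multi_qtab are hypothetical names chosen for illustration; only the parameters described in the table above are used.

```sql
-- Hypothetical payload type; any type accepted by queue_payload_type
-- could be used instead.
CREATE TYPE message_typ AS OBJECT (
  subject VARCHAR2(30),
  text    VARCHAR2(80));
/

BEGIN
  -- Single consumer queue table, sorted by priority and then by
  -- enqueue time within the same priority.
  dbms_aqadm.create_queue_table(
    queue_table        => 'scott.msg_qtab',
    queue_payload_type => 'scott.message_typ',
    sort_list          => 'priority,enq_time',
    comment            => 'Priority-ordered messages, one consumer each');

  -- Multiple consumer queue table whose messages are grouped by the
  -- enqueuing transaction. Requires prior grant_type_access.
  dbms_aqadm.create_queue_table(
    queue_table        => 'scott.multi_qtab',
    queue_payload_type => 'scott.message_typ',
    multiple_consumers => TRUE,
    message_grouping   => dbms_aqadm.TRANSACTIONAL,
    comment            => 'Transactionally grouped messages');
END;
/
```

Because the sort order cannot be altered later, it is worth settling the sort_list before any queues are created in the table.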


DBMS_AQADM.CREATE_QUEUE

Purpose:

Create a queue in the specified queue table. All queue names must be unique within a schema. Once a queue is created with CREATE_QUEUE, it can be enabled by calling START_QUEUE. By default, the queue is created with both enqueue and dequeue disabled.

Syntax:

DBMS_AQADM.CREATE_QUEUE (
queue_name          IN       VARCHAR2,
queue_table         IN       VARCHAR2,
queue_type          IN       BINARY_INTEGER default
                             NORMAL_QUEUE,
max_retries         IN       NUMBER default 0,
retry_delay         IN       NUMBER default 0,
retention_time      IN       NUMBER default 0,
dependency_tracking IN       BOOLEAN default FALSE,
comment             IN       VARCHAR2 default NULL,
auto_commit         IN       BOOLEAN default TRUE)

Usage:

Table 11-10: DBMS_AQADM.CREATE_QUEUE
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue that is to be created.  

queue_table

(IN VARCHAR2)  

Specifies the name of the queue table that will contain the queue.  

queue_type

(IN BINARY_INTEGER)  

Specifies whether the queue being created is an exception queue or a normal queue.

NORMAL_QUEUE: The queue is a normal queue. This is the default.

EXCEPTION_QUEUE: It is an exception queue. Only the dequeue operation is allowed on the exception queue.  

max_retries

(IN NUMBER)  

Limits the number of times a dequeue with the REMOVE mode can be attempted on a message. The count is incremented when the application issues a rollback after executing the dequeue. The message is moved to the exception queue when it reaches its max_retries. Default is 0, which means no retry is allowed.  

retry_delay

(IN NUMBER)  

Specifies the delay time, in seconds, before this message is scheduled for processing again after an application rollback. The default is 0, which means the message can be retried as soon as possible. This parameter has no effect if max_retries is set to 0. Retry_delay cannot be specified with multiple consumer queues.  

retention_time

(IN NUMBER)  

Specifies the number of seconds for which a message will be retained in the queue table after being dequeued from the queue.

INFINITE: Message will be retained forever.

number: Number of seconds for which to retain the messages. The default is 0, i.e. no retention.  

dependency_tracking

(IN BOOLEAN)  

Reserved for future use.

FALSE: This is the default.

TRUE: Not permitted in this release.  

comment

(IN VARCHAR2)  

User-specified description of the queue. This user comment will be added to the queue catalog.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the operation is carried out. The operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  
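
As an illustration, the sketch below creates a normal queue and an additional exception queue in an existing single consumer queue table, then enables the normal queue. The names scott.msg_qtab, scott.msg_queue and scott.msg_exq are hypothetical, and the retry and retention values are arbitrary examples rather than recommendations.

```sql
BEGIN
  -- Normal queue: up to 3 dequeue attempts, 60 seconds between
  -- attempts, and messages kept for one hour after being dequeued.
  dbms_aqadm.create_queue(
    queue_name     => 'scott.msg_queue',
    queue_table    => 'scott.msg_qtab',
    max_retries    => 3,
    retry_delay    => 60,
    retention_time => 3600,
    comment        => 'Work queue with limited retries');

  -- Exception queue in the same queue table. The enumerated constant
  -- is qualified with the package that defines it.
  dbms_aqadm.create_queue(
    queue_name  => 'scott.msg_exq',
    queue_table => 'scott.msg_qtab',
    queue_type  => dbms_aqadm.EXCEPTION_QUEUE);

  -- A new queue is disabled until START_QUEUE is called.
  dbms_aqadm.start_queue(queue_name => 'scott.msg_queue');
END;
/
```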


DBMS_AQADM.DROP_QUEUE_TABLE

Purpose:

Drop an existing queue table. Unless the force option is used, all the queues in a queue table have to be stopped and dropped before the queue table can be dropped.

Syntax:

DBMS_AQADM.DROP_QUEUE_TABLE (
queue_table       IN    VARCHAR2,
force             IN    BOOLEAN default FALSE,
auto_commit       IN    BOOLEAN default TRUE)

Usage:

Table 11-11: DBMS_AQADM.DROP_QUEUE_TABLE

Parameter   Description  

queue_table

(IN VARCHAR2)  

Specifies the name of a queue table to be dropped.  

force

(IN BOOLEAN)  

FALSE: The operation will not succeed if there are any queues in the table. This is the default.

TRUE: All queues in the table are stopped and dropped automatically.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the operation is carried out. The operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  
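
A short sketch of the force option follows, assuming a hypothetical queue table scott.msg_qtab that still contains queues. With the default force => FALSE the same call would not succeed until those queues had been stopped and dropped individually.

```sql
BEGIN
  -- Stops and drops every queue in the table, then drops the table.
  dbms_aqadm.drop_queue_table(
    queue_table => 'scott.msg_qtab',
    force       => TRUE);
END;
/
```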


DBMS_AQADM.DROP_QUEUE

Purpose:

Drops an existing queue. DROP_QUEUE is not allowed unless STOP_QUEUE has been called to disable the queue for both enqueuing and dequeuing. All the queue data is deleted as part of the drop operation.

Syntax:

DBMS_AQADM.DROP_QUEUE (
queue_name        IN            VARCHAR2,
auto_commit       IN            BOOLEAN default TRUE)

Usage:

Table 11-12: DBMS_AQADM.DROP_QUEUE

Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue that is to be dropped.  

auto_commit

(IN BOOLEAN)  

TRUE: Causes the current transaction, if any, to commit before the operation is carried out. The operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  
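
The required order of calls can be sketched as follows, using a hypothetical queue name scott.msg_queue. The queue is first disabled for both enqueuing and dequeuing, and only then dropped.

```sql
BEGIN
  -- Disable enqueue and dequeue (both default to TRUE), waiting for
  -- outstanding transactions to complete.
  dbms_aqadm.stop_queue(queue_name => 'scott.msg_queue');

  -- Drop the queue; all of its queue data is deleted.
  dbms_aqadm.drop_queue(queue_name => 'scott.msg_queue');
END;
/
```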



DBMS_AQADM.ALTER_QUEUE

Purpose:

Alter existing properties of a queue. Only max_retries, retry_delay, and retention_time can be altered.

Syntax:

DBMS_AQADM.ALTER_QUEUE (
queue_name        IN    VARCHAR2,
max_retries       IN    NUMBER default NULL,
retry_delay       IN    NUMBER default NULL,
retention_time    IN    NUMBER default NULL,
auto_commit       IN    BOOLEAN default TRUE)

Usage:

Table 11-13: DBMS_AQADM.ALTER_QUEUE



Parameter   Description  

queue_name

(IN varchar2)  

Specifies the name of the queue that is to be altered.  

max_retries

(IN number)  

Limits the number of times a dequeue with REMOVE mode can be attempted on a message. The count is incremented when the application issues a rollback after executing the dequeue. If a retry would take place after the message's expiration time has passed, no further retries are attempted. The default is NULL, which means that the value will not be altered.  

retry_delay

(IN number)  

Specifies the delay time in seconds before this message is scheduled for processing again after an application rollback. The default is NULL which means that the value will not be altered.  

retention_time

(IN number)  

Specifies the retention time in seconds for which a message will be retained in the queue table after being dequeued. The default is NULL which means that the value will not be altered.  

auto_commit

(IN boolean)  

TRUE: Causes the current transaction, if any, to commit before the operation is carried out. The operation becomes persistent when the call returns. This is the default.

FALSE: The operation is part of the current transaction and will become persistent only when the caller issues a commit.  
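
For example, the following sketch changes only the retention time of a hypothetical queue scott.msg_queue. Parameters that are left at their NULL default (here max_retries and retry_delay) keep their current values.

```sql
BEGIN
  -- Retain dequeued messages for one day (86400 seconds).
  dbms_aqadm.alter_queue(
    queue_name     => 'scott.msg_queue',
    retention_time => 86400);
END;
/
```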

DBMS_AQADM.START_QUEUE

Purpose:

Enables the specified queue for enqueuing and/or dequeuing. After creating a queue, the administrator must use START_QUEUE to enable it. The default is to enable it for both ENQUEUE and DEQUEUE. Only dequeue operations are allowed on an exception queue. This operation takes effect when the call completes and does not have any transactional characteristics.

Syntax:

DBMS_AQADM.START_QUEUE ( 
queue_name      IN       VARCHAR2,
enqueue         IN       BOOLEAN default TRUE,
dequeue         IN       BOOLEAN default TRUE)

Usage:

Table 11-14: DBMS_AQADM.START_QUEUE



Parameter   Description  

queue_name

(IN varchar2)  

Specifies the name of the queue to be enabled.  

enqueue

(IN boolean)  

Specifies whether ENQUEUE should be enabled on this queue.

TRUE: Enable ENQUEUE. This is the default.

FALSE: Do not alter the current setting.  

dequeue

(IN boolean)  

Specifies whether DEQUEUE should be enabled on this queue.

TRUE: Enable DEQUEUE. This is the default.

FALSE: Do not alter the current setting.  
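
Note that FALSE does not disable an operation; it leaves the current setting untouched. The sketch below, which uses a hypothetical queue name scott.msg_queue, therefore enables dequeuing only and leaves enqueuing in whatever state it was already in.

```sql
BEGIN
  dbms_aqadm.start_queue(
    queue_name => 'scott.msg_queue',
    enqueue    => FALSE,   -- do not alter the current enqueue setting
    dequeue    => TRUE);   -- enable dequeue
END;
/
```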

DBMS_AQADM.STOP_QUEUE

Purpose:

Disables enqueuing and/or dequeuing on the specified queue. By default, it disables both ENQUEUEs and DEQUEUEs. A queue cannot be stopped if there are outstanding transactions against the queue. This operation takes effect when the call completes and does not have any transactional characteristics.

Syntax:

DBMS_AQADM.STOP_QUEUE (
queue_name      IN       VARCHAR2,
enqueue         IN       BOOLEAN default TRUE,
dequeue         IN       BOOLEAN default TRUE,
wait            IN       BOOLEAN default TRUE)

Usage:

Table 11-15: DBMS_AQADM.STOP_QUEUE



Parameter   Description  

queue_name

(IN varchar2)  

Specifies the name of the queue to be disabled.  

enqueue

(IN boolean)  

Specifies whether ENQUEUE should be disabled on this queue.

TRUE: Disable ENQUEUE. This is the default.

FALSE: Do not alter the current setting.  

dequeue

(IN boolean)  

Specifies whether DEQUEUE should be disabled on this queue.

TRUE: Disable DEQUEUE. This is the default.

FALSE: Do not alter the current setting.  

wait

(IN boolean)  

The wait parameter allows you to specify whether to wait for the completion of outstanding transactions.

TRUE: Wait if there are any outstanding transactions. In this state no new transactions are allowed to enqueue to or dequeue from this queue.

FALSE: Return immediately either with a success or an error.  
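
One possible use of these parameters is to let consumers drain a queue while no new messages are accepted. The sketch below assumes a hypothetical queue scott.msg_queue; it disables enqueuing, leaves the dequeue setting as it is, and waits for outstanding transactions.

```sql
BEGIN
  dbms_aqadm.stop_queue(
    queue_name => 'scott.msg_queue',
    enqueue    => TRUE,    -- disable enqueue
    dequeue    => FALSE,   -- do not alter the current dequeue setting
    wait       => TRUE);   -- wait for outstanding transactions
END;
/
```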

DBMS_AQADM.START_TIME_MANAGER

Purpose:

To start the time manager process.

Syntax:

DBMS_AQADM.START_TIME_MANAGER;

Usage:

This command causes the time manager process to start executing its operations. The physical process has to be started at database startup time by setting the aq_tm_processes init.ora parameter to 1. This operation takes effect when the call completes and does not have any transactional characteristics.


DBMS_AQADM.STOP_TIME_MANAGER

Purpose:

To stop the time manager process.

Syntax:

DBMS_AQADM.STOP_TIME_MANAGER;

Usage:

The command causes the time manager to stop executing all its operations. The physical process is not terminated. This operation takes effect when the call completes and does not have any transactional characteristics.
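
As a sketch, the two calls might be paired around a maintenance window. This assumes that the physical time manager process was already started at database startup; the maintenance step itself is a placeholder.

```sql
BEGIN
  -- Suspend time manager operations; the physical process keeps running.
  dbms_aqadm.stop_time_manager;

  -- ... administrative work would be carried out here ...

  -- Resume time manager operations.
  dbms_aqadm.start_time_manager;
END;
/
```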


DBMS_AQADM.ADD_SUBSCRIBER

Purpose:

Add a default subscriber to a queue. A program can enqueue messages to a specific list of recipients or to the default list of subscribers. This operation will only succeed on queues that allow multiple consumers. This operation takes effect immediately and the containing transaction is committed. Enqueue requests that are executed after the completion of this call will reflect the new behavior. The user must have been granted type access by executing the grant_type_access procedure.

Syntax:

DBMS_AQADM.ADD_SUBSCRIBER(
    queue_name         IN         VARCHAR2,
    subscriber         IN         sys.aq$_agent)

Usage:

Table 11-16: DBMS_AQADM.ADD_SUBSCRIBER

Parameter   Description  

queue_name

(IN varchar2)  

Specifies the name of the queue.  

subscriber

(IN aq$_agent)  

See definition in section titled `Agent'  
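
A minimal sketch of adding a default subscriber follows. The queue name scott.multi_queue and the subscriber name RED are hypothetical, the queue is assumed to reside in a queue table created with multiple_consumers set to TRUE, and the three-argument aq$_agent constructor (name, address, protocol) is an assumption that should be checked against the `Agent' section.

```sql
DECLARE
  new_sub sys.aq$_agent;
BEGIN
  -- Assumed constructor: only the name is supplied here.
  new_sub := sys.aq$_agent('RED', NULL, NULL);

  dbms_aqadm.add_subscriber(
    queue_name => 'scott.multi_queue',
    subscriber => new_sub);
END;
/
```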

DBMS_AQADM.REMOVE_SUBSCRIBER

Purpose:

Remove a default subscriber from a queue. This operation takes effect immediately and the containing transaction is committed. All references to the subscriber in existing messages are removed as part of the operation. The user must have been granted type access by executing the grant_type_access procedure.

Syntax:

DBMS_AQADM.REMOVE_SUBSCRIBER(
    queue_name         IN         VARCHAR2,
    subscriber         IN         sys.aq$_agent)

Usage:
Table 11-17: DBMS_AQADM.REMOVE_SUBSCRIBER
Parameter   Description  

queue_name

(IN VARCHAR2)  

Specifies the name of the queue.  

subscriber

(IN aq$_agent)  

See definition in section titled `Agent'  
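
Removing a subscriber can be sketched in the same way. The queue name scott.multi_queue and the subscriber name RED are hypothetical, and the three-argument aq$_agent constructor (name, address, protocol) is an assumption that should be checked against the `Agent' section.

```sql
DECLARE
  old_sub sys.aq$_agent;
BEGIN
  old_sub := sys.aq$_agent('RED', NULL, NULL);

  -- References to this subscriber in existing messages are removed too.
  dbms_aqadm.remove_subscriber(
    queue_name => 'scott.multi_queue',
    subscriber => old_sub);
END;
/
```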


Enumerated Constants in the Administrative Interface

When using enumerated constants such as INFINITE, TRANSACTIONAL, and NORMAL_QUEUE, the symbol needs to be specified with the scope of the package defining it. All types associated with the administrative interfaces have to be prefixed with dbms_aqadm. For example:

dbms_aqadm.NORMAL_QUEUE


Table 11-18: Enumerated types in the administrative interface

Parameter          Options

retention_time     INFINITE
message_grouping   TRANSACTIONAL, NONE
queue_type         NORMAL_QUEUE, EXCEPTION_QUEUE


Administration Topics

Performance

Queues are stored in database tables. The performance characteristics of queue operations are very similar to those of the underlying database operations.


Table and index structures

To understand the performance characteristics of queues it is important to understand the tables and index layout for AQ objects.

Creating a queue table creates a database table with approximately 25 columns. These columns store the AQ metadata and the user-defined payload. The payload can be of an object type or RAW. The AQ metadata contains object types and scalar types. A view and two indexes are created on the queue table. The view allows users to query the message data. The indexes are used to accelerate access to message data. Please refer to the description of DBMS_AQADM.CREATE_QUEUE_TABLE for details of the objects created.


Throughput

The code path of an enqueue operation is comparable to an insert into a multi-column table with two indexes. The code path of a dequeue operation is comparable to a select and delete operation on a similar table. These operations are performed using PL/SQL functions.


Availability

Oracle Parallel Server (OPS) can be used to ensure highly available access to queue data. Queues are implemented using database tables, and the tail and the head of a queue can be extreme hot spots. Since OPS does not scale well in the presence of hot spots, it is recommended that normal access to a queue be limited to one instance only. In case of an instance failure, messages managed by the failed instance can be processed immediately by one of the surviving instances.


Scalability

Queue operation scalability is similar to the underlying database operation scalability. If a dequeue operation with the wait option is issued in a Multi-Threaded Server (MTS) environment, the shared server process will be dedicated to the dequeue operation for the duration of the call, including the wait time. The presence of many such processes could cause severe performance and availability problems and could result in deadlocking the shared server processes. For this reason it is recommended that dequeue requests with the wait option be issued only via dedicated server processes. This restriction is not enforced.


Reliability and Recoverability

The standard database reliability and recoverability characteristics apply to queue data.


Enterprise Manager Support

Enterprise Manager provides GUIs for some of the administrative functions listed in the administrative interfaces section.

These include:

  1. Queues as part of schema manager to view properties
  2. Start and stop queue

Importing and Exporting Queue Data

Queues are implemented on tables. The import/export of queues constitutes the import/export of the underlying queue tables and related dictionary tables. Import and export of queues can only be done at queue table granularity.

When a queue table is exported, both the table definition information and the queue data are exported. When a queue table is imported, export action procedures will maintain the queue dictionary. Because the queue table data is also exported, the user is responsible for maintaining application-level data integrity when queue table data are being transported.

Importing queue data into a queue table with existing data is not recommended. During a table mode import, if the queue table already exists at the import site, the old queue table definition and the old queue definitions will be dropped and recreated. Hence, queue table and queue definitions prior to the import will be lost.


Performing EXPORTS and IMPORTS of queue tables with multiple recipients

For every queue table that supports multiple recipients, there is an index-organized table (IOT) that contains important queue metadata. This metadata is essential to the operation of the queues, so the user must export and import this IOT as well as the queue table for the queues in this table to work after import. When the schema containing the queue table is exported, the IOT is also exported automatically. The behavior is similar at import time. Because the metadata table contains rowids of some rows in the queue table, import will issue a note about the rowids being obsolete when importing the metadata table. This message can be ignored, as the queuing system will automatically correct the obsolete rowids as a part of the import process. However, if another problem is encountered while doing the import (such as running out of rollback segment space), the problem should be corrected and the import should be rerun.


Database Objects

Queue table view

This is a view of the queue table in which message data is stored. This view is automatically created with each queue table and is called aq$<queue_table_name>. This view should be used for querying the queue data. The dequeue history data (time, user identification and transaction identification) is only valid for single consumer queues. For the dequeue history of messages in a multiple consumer queue, please refer to the section "Recipients and dequeue history of multiple consumer messages".

The administrator can use any SQL statement or SQL tool to analyze and review the content of a queue or queue table. SQL provides full access to the message metadata and/or payload. Use ENQ_TXN_ID and DEQ_TXN_ID to correlate transactions: if the ENQ_TXN_ID of message m2 is the same as the DEQ_TXN_ID of m1, then m2 was created in the transaction that consumed m1. (You may use CONNECT BY in your SQL statements to identify related messages.) The administrator may also remove retained messages that are not automatically removed by AQ. Do not update or modify messages, since this may destroy the consistency of the queue metadata. Before you use SQL to correct any error in AQ, please contact the Oracle service representative.


QUEUE - queue name

MSG_ID - unique identifier of the message

CORR_ID - user-provided correlation identifier

MSG_PRIORITY - message priority

MSG_STATE - state of this message: READY, DELAYED, PROCESSED, or EXPIRED

DELAY - number of seconds the message is delayed

EXPIRATION - number of seconds in which the message will expire after being READY

ENQ_TIME - enqueue time

ENQ_USER_ID - enqueue user id

ENQ_TXN_ID - enqueue transaction id

DEQ_TIME - dequeue time

DEQ_USER_ID - dequeue user id

DEQ_TXN_ID - dequeue transaction id

RETRY_COUNT - number of retries

EXCEPTION_QUEUE_OWNER - exception queue schema

EXCEPTION_QUEUE - exception queue name

USER_DATA - user data
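
The queries below are a sketch of how the view might be used, assuming a hypothetical queue table named msg_qtab (so that the view is aq$msg_qtab). The first query counts messages by queue and state. The second uses CONNECT BY to follow the chain of messages that were enqueued in the transaction that consumed an earlier message; the message identifier shown is only a sample value.

```sql
-- Number of messages in each state, per queue.
SELECT queue, msg_state, COUNT(*)
FROM   aq$msg_qtab
GROUP  BY queue, msg_state;

-- Messages derived from a given starting message: a child row is one
-- whose enqueue transaction is the dequeue transaction of its parent.
SELECT LEVEL, queue, msg_id, enq_txn_id, deq_txn_id
FROM   aq$msg_qtab
START  WITH msg_id = HEXTORAW('105E7A2EBFF11348E03400400B40F149')
CONNECT BY PRIOR deq_txn_id = enq_txn_id;
```

As noted above, the dequeue columns in this view are only meaningful for single consumer queues.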


DBA_QUEUE_TABLES

This view describes the names and types of all queue tables created in the database.

OWNER - queue table schema

QUEUE_TABLE - queue table name

TYPE - payload type: OBJECT, RAW, or VARIANT (Oracle internal use only)

OBJECT_TYPE - name of object type, if any

SORT_ORDER - user specified sort order

RECIPIENTS - SINGLE or MULTIPLE

MESSAGE_GROUPING - NONE or TRANSACTIONAL

USER_COMMENT - user comment for the queue table


USER_QUEUE_TABLES

This view is the same as DBA_QUEUE_TABLES with the exception that it only shows queue tables in the user's schema. It does not contain a column for OWNER.


DBA_QUEUES

Users can specify operational characteristics for individual queues. DBA_QUEUES is the view that contains the relevant information for every queue in a database.

OWNER - queue schema name

NAME - queue name

QUEUE_TABLE - queue table where this queue resides

QID - unique queue identifier

QUEUE_TYPE - queue type: NORMAL_QUEUE or EXCEPTION_QUEUE

MAX_RETRIES - number of dequeue attempts allowed

RETRY_DELAY - number of seconds before retry can be attempted

ENQUEUE_ENABLED - YES/NO

DEQUEUE_ENABLED - YES/NO

RETENTION - number of seconds message is retained after dequeue

USER_COMMENT - user comment for the queue
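
As an illustration, the following query sketches how DBA_QUEUES might be joined to DBA_QUEUE_TABLES to list every queue together with the properties of its queue table. It uses only the columns listed for these two views and assumes the caller has access to the DBA views.

```sql
SELECT q.owner, q.name, q.queue_type,
       q.enqueue_enabled, q.dequeue_enabled,
       q.max_retries, q.retention,
       t.recipients, t.sort_order
FROM   dba_queues q, dba_queue_tables t
WHERE  t.owner       = q.owner
AND    t.queue_table = q.queue_table
ORDER  BY q.owner, q.name;
```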

USER_QUEUES

This view is the same as DBA_QUEUES with the exception that it only shows queues in the user's schema. It does not contain a column for OWNER.


DBMS_AQADM.QUEUE_SUBSCRIBERS

Purpose:

To get a list of subscribers for a queue.

Syntax:

DBMS_AQADM.QUEUE_SUBSCRIBERS(
queue_name    IN     VARCHAR2)
RETURN aq$_subscriber_list_t

Usage:

The function returns a PL/SQL table of aq$_agent. This can be used to get the list of all subscribers for a queue.

Example:

DECLARE
  subs   dbms_aqadm.aq$_subscriber_list_t;  -- PL/SQL table of aq$_agent
  nsubs  BINARY_INTEGER;
BEGIN
  subs  := dbms_aqadm.queue_subscribers('Q1DEF');
  nsubs := subs.COUNT;

  -- Print the name of each subscriber of queue Q1DEF.
  FOR i IN 0..nsubs-1 LOOP
    dbms_output.put_line(subs(i).name);
  END LOOP;
END;
/


Recipients and dequeue history of multiple consumer messages

The queue table view provides the dequeue history for single consumer queue messages. To query the list of recipients or the dequeue history of a message in a multiple-consumer queue you need to execute a SQL query on the queue table for the message of interest.

For example, to view the dequeue history of the message with msgid '105E7A2EBFF11348E03400400B40F149' in queue table sys.queue_tab, the following query can be executed. The query will return one row per consumer of the message.

SELECT consumer, transaction_id, deq_time, deq_user
FROM   THE(SELECT CAST(history AS sys.aq$_dequeue_history_t)
           FROM   sys.queue_tab
           WHERE  msgid = '105E7A2EBFF11348E03400400B40F149');


Error Messages

The error messages for AQ are reported in two ranges:

24000 - 24099

25200 - 25299

Reference to Demos

The following demos may be found in the related directories:

$ORACLE_HOME/demo/aqdemo00.sql   Main driver of demo
$ORACLE_HOME/demo/aqdemo01.sql   Create queue tables and queues using
                                 AQ administration interface
$ORACLE_HOME/demo/aqdemo02.sql   Load the demo package
$ORACLE_HOME/demo/aqdemo03.sql   Submit the event handler as a job to Job
                                 Queue
$ORACLE_HOME/demo/aqdemo04.sql   Enqueue messages




Copyright © 1997 Oracle Corporation.
All Rights Reserved.
