Oracle8i Application Developer's Guide - Advanced Queuing
Release 8.1.5

A68005-01

Library

Product

Contents

Index

Prev Next

6
Operational Interface: Basic Operations

In this chapter we describe the operational interface to Oracle Advanced Queuing in terms of use cases. That is, we discuss each operation (such as "Enqueue a Message") as a use case by that name. The table listing all the use cases is provided at the head of the chapter (see "Use Case Model: Operational Interface -- Basic Operations").

A summary figure, "Use Case Diagram: Operational Interface -- Basic Operations", locates all the use cases in single drawing. If you are using the HTML version of this document, you can use this figure to navigate to the use case in which you are interested by clicking on the relevant use case title.

The individual use cases are themselves laid out as follows:

Use Case Model: Operational Interface -- Basic Operations

Table 6-1 Use Case Model: Operational Interface
Use Case 

Enqueue a Message  

Enqueue a Message [Specify Options]  

Enqueue a Message [Specify Message Properties]  

Enqueue a Message [Add Payload]  

Listen to One (Many) Queue(s)  

Listen to One (Many) Single-Consumer Queue(s)  

Listen to One (Many) Multi-Consumer Queue(s)  

Dequeue a Message  

Dequeue a Message from a Single-Consumer Queue [Specify Options]  

Dequeue a Message from a Multi-Consumer Queue [Specify Options]  

Register for Notification  

Register for Notification [Specify Subscription Name -- Single-Consumer Queue]  

Register for Notification [Specify Subscription Name -- Multi-Consumer Queue]  

Figure 6-1 Use Case Model Diagram: Operational Interface


ENQUEUE a message LISTEN to queue(s) DEQUEUE a message REGISTER for notification

Enqueue a Message

Figure 6-2 Use Case Diagram: Enqueue a Message



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

Adds a message to the specified queue.

Syntax:

DBMS_AQ.ENQUEUE (
     
Queue_name          IN      VARCHAR2,
Enqueue_options     IN      enqueue_options_t,
Message_properties  IN      message_properties_t,
Payload             IN      "<type_name>",
Msgid               OUT     RAW);

Usage:

Table 6-2 DBMS_AQ.ENQUEUE
Parameter  Description 

queue_name

(IN VARCHAR2)  

Specifies the name of the queue to which this message should be enqueued. The queue cannot be an exception queue.  

enqueue_options

(IN enqueue_option_t)  

For the definition please refer to the section titled Enqueue a Message [Specify Options]  

message_properties

(IN message_properties_t)  

For the definition please refer to the section titled "Message Properties."  

payload

(IN "<type_name>")  

Not interpreted by Oracle AQ.

The payload must be specified according to the specification in the associated queue table. NULL is an acceptable parameter. For the definition of <type_name> please refer to section titled "Type name".  

msgid

(OUT RAW)  

The system generated identification of the message. This is a globally unique identifier that can be used to identify the message at dequeue time.  

Usage Notes

Enqueue a Message [Specify Options]

Figure 6-3 Use Case Diagram: Enqueue a Message [Specify Options]



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

To specify the options available for the enqueue operation.

Syntax:

TYPE Enqueue_options_t IS RECORD (
   Visibility            BINARY_INTEGER DEFAULT ON_COMMIT,
   Relative_msgid        RAW(16) DEFAULT NULL,
   Sequence_deviation    BINARY_INTEGER DEFAULT NULL);
     

Usage:

Table 6-3 Enqueue a Message [Specify Options]
Parameter  Description 

visibility  

Specifies the transactional behavior of the enqueue request.

ON_COMMIT: The enqueue is part of the current transaction. The operation is complete when the transaction commits. This is the default case.

IMMEDIATE: The enqueue is not part of the current transaction. The operation constitutes a transaction on its own. This is the only value allowed when enqueuing to a non-persistent queue.  

relative_msgid

 

Specifies the message identifier of the message which is referenced in the sequence deviation operation. This field is valid if and only if BEFORE is specified in sequence_deviation. This parameter will be ignored if sequence deviation is not specified.  

sequence_deviation

 

Specifies if the message being enqueued should be dequeued before other message(s) already in the queue.

BEFORE: The message is enqueued ahead of the message specified by relative_msgid.

TOP: The message is enqueued ahead of any other messages.

NULL: Default  

Usage Notes

Do not use the immediate option when you want to use LOB locators since LOB locators are valid only for the duration of the transaction. As the immediate option automatically commits the transaction, your locator will not be valid.

Enqueue a Message [Specify Message Properties]

Figure 6-4 Use Case Diagram: Enqueue a Message [Specify Message Properties]



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

The Message Properties describe the information that is used by AQ to manage individual messages. These are set at enqueue time and their values are returned at dequeue time.

Syntax:

TYPE Message_properties_t IS RECORD (
     
Priority               BINARY_INTEGER DEFAULT 1,
Delay                  BINARY_INTEGER DEFAULT NO_DELAY,
Expiration             BINARY_INTEGER DEFAULT NEVER,
Correlation            VARCHAR2(128) DEFAULT NULL,
Attempts               BINARY_INTEGER,
Recipient_list         aq$_recipient_list_t,
Exception_queue        VARCHAR2(51) DEFAULT NULL,
Enqueue_time           DATE,
State                  BINARY_INTEGER,
Sender_id              aq$_agent DEFAULT NULL, 
Original_msgid         RAW(16) DEFAULT NULL);
TYPE aq$_recipient_list_t IS TABLE OF aq$_agent
INDEX BY BINARY_INTEGER;
    

Usage:

Table 6-4 Message properties
Parameter  Description 

priority

(BINARY_INTEGER)  

Specifies/returns the priority of the message. A smaller number indicates higher priority. The priority can be any number, including negative numbers.  

delay

(BINARY_INTEGER)  

Specifies/returns the delay of the enqueued message. The delay represents the number of seconds after which a message is available for dequeuing. Dequeuing by msgid overrides the delay specification. A message enqueued with delay set will be in the WAITING state, when the delay expires the messages goes to the READY state. DELAY processing requires the queue monitor to be started. Note that delay is set by the producer who enqueues the message.

NO_DELAY: the message is available for immediate dequeuing.

number: the number of seconds to delay the message.  

expiration

(BINARY_INTEGER)  

Specifies/returns the expiration of the message. It determines, in seconds, the duration the message is available for dequeuing. This parameter is an offset from the delay. Expiration processing requires the queue monitor to be running.

NEVER: message will not expire.

number: number of seconds message will remain in READY state. If the message is not dequeued before it expires, it will be moved to the exception queue in the EXPIRED state.  

correlation

(VARCHAR2(128))  

Returns the identification supplied by the producer for a message at enqueuing.  

attempts

(BINARY_INTEGER)  

Returns the number of attempts that have been made to dequeue this message. This parameter can not be set at enqueue time.  

recipient_list

(aq$_recipient_list_t)  

For type definition please refer to section titled "Agent".

This parameter is only valid for queues which allow multiple consumers. The default recipients are the queue subscribers. This parameter is not returned to a consumer at dequeue time.  

exception_queue

(VARCHAR2(51))  

Specifies/returns the name of the queue to which the message is moved if it cannot be processed successfully. Messages are moved in two cases: The number of unsuccessful dequeue attempts has exceeded max_retries or the message has expired. All messages in the exception queue are in the EXPIRED state.

The default is the exception queue associated with the queue table. If the exception queue specified does not exist at the time of the move the message will be moved to the default exception queue associated with the queue table and a warning will be logged in the alert file. If the default exception queue is used the parameter will return a NULL value at dequeue time.  

enqueue_time

(DATE)  

Returns the time the message was enqueued. This value is determined by the system and cannot be set by the user. This parameter can not be set at enqueue time.  

state

(BINARY_INTEGER)  

Returns the state of the message at the time of the dequeue. This parameter can not be set at enqueue time.

0: The message is ready to be processed.

1: The message delay has not yet been reached.

2: The message has been processed and is retained.

3: The message has been moved to the exception queue.  

sender_id

(aq$_agent)  

Specifies/returns the application-specified sender identification.

DEFAULT: NULL  

original_msgid

(RAW(16))  

This parameter is used by Oracle AQ for propagating messages.

DEFAULT: NULL  

Usage Notes

Enqueue a Message [Specify Message Properties [Specify Sender ID]]

Figure 6-5 Use Case Diagram: Enqueue a Message [Specify Message Properties [Specify Sender ID]]



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

To identify the sender (producer) of a message.

Syntax:

TYPE aq$_agent IS OBJECT (
     
Name            VARCHAR2(30), 
Address         VARCHAR2(1024),
Protocol        NUMBER);


For more information about Agent see:

 

Enqueue a Message [Add Payload]

Figure 6-6 Use Case Diagram: Enqueue a Message [Add Payload]



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Usage Notes

To store a payload of type RAW, AQ will create a queue table with LOB column as the payload repository. The maximum size of the payload is determined by which programmatic environment you use to access AQ. For PL/SQL, Java and precompilers the limit is 32K; for the OCI the limit is 4G.

Example: Enqueue of Object Type Messages


Note: You may need to set up the following data structures for certain examples to work:

CONNECT system/manager
CREATE USER aq IDENTIFIED BY aq;
GRANT Aq_administrator_role TO aq;
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (
   Queue_table            =>  'aq.objmsgs_qtab',
   Queue_payload_type     =>  'aq.message_typ');
EXECUTE DBMS_AQADM.CREATE_QUEUE ( 
   Queue_name            =>  'aq.msg_queue',
   Queue_table           =>  'aq.objmsgs_qtab');
EXECUTE DBMS_AQADM.START_QUEUE (
   Queue_name         => 'aq.msg_queue',
   Enqueue            => TRUE);
EXECUTE DBMS_AQADM.CREATE_QUEUE_TABLE (
   Queue_table            => 'aq.prioritymsgs_qtab',
   Sort_list              => 'PRIORITY,ENQ_TIME',
   Queue_payload_type     => 'aq.message_typ');
EXECUTE DBMS_AQADM.CREATE_QUEUE (
   Queue_name             => 'aq.priority_msg_queue',
   Queue_table            => 'aq.prioritymsgs_qtab');
EXECUTE DBMS_AQADM.START_QUEUE (
   Queue_name             => 'aq.priority_msg_queue',
   Enqueue                => TRUE);

 

Enqueue a Single Message and Specify the Queue Name and Payload.

/* Enqueue to msg_queue: */
DECLARE
   Enqueue_options     DBMS_AQ.enqueue_options_t;
   Message_properties  DBMS_AQ.message_properties_t;
   Message_handle      RAW(16);
   Message             aq.message_typ;

BEGIN
   Message := aq.message_typ('NORMAL MESSAGE',
      'enqueued to msg_queue first.');

   DBMS_AQ.ENQUEUE(queue_name => 'msg_queue',           
   Enqueue_options            => enqueue_options,       
   Message_properties         => message_properties,     
   Payload                    => message,               
   Msgid                      => message_handle);

   COMMIT;
END;

Enqueue a Single Message and Specify the Priority

/* The queue name priority_msg_queue is defined as an object type queue table. 
   The payload object type is message. The schema of the queue is aq.  */

 /* Enqueue a message with priority 30: */ 
DECLARE 
   Enqueue_options       dbms_aq.enqueue_options_t; 
   Message_properties    dbms_aq.message_properties_t; 
   Message_handle        RAW(16); 
   Message               aq.Message_typ; 
 
BEGIN 
   Message := Message_typ('PRIORITY MESSAGE', 'enqued at priority 30.'); 
 
   message_properties.priority := 30; 
 
   DBMS_AQ.ENQUEUE(queue_name => 'priority_msg_queue', 
   enqueue_options            => enqueue_options, 
   message_properties         => message_properties, 
   payload                    => message, 
   msgid                      => message_handle); 
 
   COMMIT; 
END; 

Listen to One (Many) Queue(s)

Figure 6-7 Use Case Diagram: Listen to One(Many) Queue(s)



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

To monitor one or more queues on behalf of a list of agents.

Syntax:

DBMS_AQ.LISTEN (
   agent_list IN   aq$_agent_list_t,
   wait       IN   BINARY_INTEGER default DBMS_AQ.FOREVER,
   agent      OUT  aq$_agent);

TYPE aq$_agent_list_t IS TABLE of aq$_agent INDEX BY BINARY_INTEGER;

Usage
Table 6-5 DBMS_AQADM.LISTEN
Parameter  Description 

agent_list (aq$_agent_list)  

The list of agents for which to 'listen'.  

wait (integer default DBMS_AQ.FOREVER)  

The time-out for the listen call (seconds). By default, the call will block forever.  

agent (aq$_agent)  

The agent with a message available for consumption.  

Usage Notes

The call takes a list of agents as an argument. You specify the queue to be monitored in the address field of each agent listed. You also must specify the name of the agent when monitoring multiconsumer queues. For single-consumer queues, an agent name must not be specified. Only local queues are supported as addresses. Protocol is reserved for future use.

This is a blocking call that returns when there is a message ready for consumption for an agent in the list. If there are messages for more than one agent, only the first agent listed is returned. If there are no messages found when the wait time expires, an error is raised.

A successful return from the listen call is only an indication that there is a message for one of the listed agents in one the specified queues. The interested agent must still dequeue the relevant message.

Note that you cannot call listen on non-persistent queues.

Listen to One (Many) Single-Consumer Queue(s)

Figure 6-8 Use Case Diagram: Listen to One(Many) Single-Consumer Queue(s)



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Example: Listen to Queue(s) Using PL/SQL (DBMS_AQ Package)

/* The listen call allows you to monitor a list of queues for messages for 
   specific agents. You need to have dequeue privileges for all the queues 
   you wish to monitor. */

Listen to Single-Consumer Queue (Timeout of Zero).

DECLARE
   Agent_w_msg      aq$_agent;
   My_agent_list    dbms_aq.agent_list_t;

BEGIN
   /* NOTE:  MCQ1, MCQ2, MCQ3 are multi-consumer queues  in SCOTT's schema
   *        SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
   */

   Qlist(1):= aq$_agent(NULL, 'scott.SCQ1',  NULL);
   Qlist(2):= aq$_agent(NULL, 'SCQ2', NULL);
   Qlist(3):= aq$_agent(NULL, 'SCQ3', NULL);

   /* Listen with a time-out of zero: */
   DBMS_AQ.LISTEN(
      Agent_list   =>   My_agent_list, 
      Wait         =>   0, 
      Agent        =>   agent_w_msg);
   DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' ||  agent_w_msg.address);
   DBMS_OUTPUT.PUT_LINE('');
END;

Example: Listen to Single-Consumer Queue(s) Using C (OCI)

Listening for Single Consumer Queues with Zero Timeout

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    ub4 buflen;
    sb4 errcode;

    switch (status)
    {
   case OCI_SUCCESS:
       break;
   case OCI_SUCCESS_WITH_INFO:
       printf("Error - OCI_SUCCESS_WITH_INFO\n");
       break;
   case OCI_NEED_DATA:
       printf("Error - OCI_NEED_DATA\n");
       break;
   case OCI_NO_DATA:
       printf("Error - OCI_NO_DATA\n");
       break;
   case OCI_ERROR:
       OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
       errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
       printf("Error - %s\n", errbuf);
       break;
   case OCI_INVALID_HANDLE:
       printf("Error - OCI_INVALID_HANDLE\n");
       break;
   case OCI_STILL_EXECUTING:
       printf("Error - OCI_STILL_EXECUTE\n");
       break;
   case OCI_CONTINUE:
       printf("Error - OCI_CONTINUE\n");
       break;
   default:
   break;
    }
}

/* set agent into descriptor */
void SetAgent(agent, appname, queue,errhp)

OCIAQAgent  *agent;
text        *appname;
text        *queue;
OCIError    *errhp;
{

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     appname ? (dvoid *)appname : (dvoid *)"", 
     appname ? strlen((const char *)appname) : 0,
        OCI_ATTR_AGENT_NAME, errhp);

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     queue ? (dvoid *)queue : (dvoid *)"", 
     queue ? strlen((const char *)queue) : 0,
        OCI_ATTR_AGENT_ADDRESS, errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
text      *appname;
text      *queue;
ub4       appsz;
ub4       queuesz;

  if (!agent )
  {
    printf("agent was NULL \n");
    return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  OCISession *usrhp;
  OCIAQAgent *agent_list[3];
  OCIAQAgent *agent = (OCIAQAgent *)0;
  /* added next 2 121598 */
  int i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
     (dvoid * (*)()) 0,  (void (*)()) 0 );
 
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, 
             (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
      (size_t) 0, (dvoid **) 0);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
     (size_t) 0, (dvoid **) 0);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);
   
  OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);   
  
  OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     agent_list[i] = (OCIAQAgent *)0;
     OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  }

  /* 
   *   SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
   */
 
  SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp);
  SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp);
 
  checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0));

  printf("MESSAGE for :- \n");             
  GetAgent(agent, errhp);
  printf("\n");

}

Listening for Single Consumer Queues with Timeout of 120 Seconds

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    ub4 buflen;
    sb4 errcode;

    switch (status)
    {
   case OCI_SUCCESS:
       break;
   case OCI_SUCCESS_WITH_INFO:
       printf("Error - OCI_SUCCESS_WITH_INFO\n");
       break;
   case OCI_NEED_DATA:
       printf("Error - OCI_NEED_DATA\n");
       break;
   case OCI_NO_DATA:
       printf("Error - OCI_NO_DATA\n");
       break;
   case OCI_ERROR:
       OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
       errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
       printf("Error - %s\n", errbuf);
       break;
   case OCI_INVALID_HANDLE:
       printf("Error - OCI_INVALID_HANDLE\n");
       break;
   case OCI_STILL_EXECUTING:
       printf("Error - OCI_STILL_EXECUTE\n");
       break;
   case OCI_CONTINUE:
       printf("Error - OCI_CONTINUE\n");
       break;
   default:
   break;
    }
}

/* set agent into descriptor */
/* void SetAgent(agent, appname, queue) */
void SetAgent(agent, appname, queue,errhp)

OCIAQAgent  *agent;
text        *appname;
text        *queue;
OCIError    *errhp;
{

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     appname ? (dvoid *)appname : (dvoid *)"", 
     appname ? strlen((const char *)appname) : 0,
        OCI_ATTR_AGENT_NAME, errhp);

  OCIAttrSet(agent, OCI_DTYPE_AQAGENT, 
     queue ? (dvoid *)queue : (dvoid *)"", 
     queue ? strlen((const char *)queue) : 0,
        OCI_ATTR_AGENT_ADDRESS, errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
text      *appname;
text      *queue;
ub4       appsz;
ub4       queuesz;

  if (!agent )
  {
    printf("agent was NULL \n");
    return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

int main()
{
  OCIEnv *envhp;
  OCIServer *srvhp;
  OCIError *errhp;
  OCISvcCtx *svchp;
  OCISession *usrhp;
  OCIAQAgent *agent_list[3];
  OCIAQAgent *agent = (OCIAQAgent *)0;
  /* added next 2 121598 */
  int i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, (dvoid *)0,  (dvoid * (*)()) 0,
     (dvoid * (*)()) 0,  (void (*)()) 0 );
 
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, 
             (ub4) OCI_HTYPE_ENV, 0, (dvoid **) 0);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **) 0);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
      (size_t) 0, (dvoid **) 0);
  
  /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
     (size_t) 0, (dvoid **) 0);
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);
   
  OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);   
  
  OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     agent_list[i] = (OCIAQAgent *)0;
     OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  }

  /* 
   *   SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
   */
 
  SetAgent(agent_list[0], (text *)0, "SCOTT.SCQ1", errhp);
  SetAgent(agent_list[1], (text *)0, "SCOTT.SCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp);
 
  checkerr(errhp,OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0));

  printf("MESSAGE for :- \n");             
  GetAgent(agent, errhp);
  printf("\n");

}

Listen to One (Many) Multi-Consumer Queue(s)

Figure 6-9 Use Case Diagram: Listen to One(Many) Multi-Consumer Queue(s)



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Example: Listen to Queue(s) Using PL/SQL (DBMS_AQ Package)

/* The listen call allows you to monitor a list of queues for messages for 
   specific agents. You need to have dequeue privileges for all the queues 
   you wish to monitor. */

Listen to Multi-Consumer Queue (Timeout of Zero).

DECLARE
   Agent_w_msg      aq$_agent;
   My_agent_list    dbms_aq.agent_list_t;

BEGIN
   /* NOTE:  MCQ1, MCQ2, MCQ3 are multi-consumer queues  in SCOTT's schema
   *        SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
   */
    Qlist(1):= aq$_agent('agent1', 'MCQ1',  NULL);
    Qlist(2):= aq$_agent('agent2', 'scott.MCQ2', NULL);
    Qlist(3):= aq$_agent('agent3', 'scott.MCQ3', NULL);

   /* Listen with a time-out of zero: */
   DBMS_AQ.LISTEN(
      agent_list   =>    My_agent_list, 
      wait         =>    0, 
      agent        =>    agent_w_msg);
   DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' ||  agent_w_msg.address);
   DBMS_OUTPUT.PUT_LINE('');
END;
   /

Listen to Mixture of Multi-Consumer Queues (Timeout 100 Seconds).

DECLARE
   Agent_w_msg      aq$_agent;
   My_agent_list    dbms_aq.agent_list_t;

BEGIN
   /* NOTE:  MCQ1, MCQ2, MCQ3 are multi-consumer queues  in SCOTT's schema
   *        SCQ1, SCQ2, SCQ3 are single consumer queues in SCOTT's schema
   */
   Qlist(1):= aq$_agent('agent1', 'MCQ1',  NULL);
   Qlist(2):= aq$_agent(NULL, 'scott.SQ1', NULL);
   Qlist(3):= aq$_agent('agent3', 'scott.MCQ3', NULL);
   /* Listen with a time-out of 100 seconds */
   DBMS_AQ.LISTEN(
      Agent_list   =>   My_agent_list, 
      Wait         =>   100, 
      Agent        =>    agent_w_msg);
      DBMS_OUTPUT.PUT_LINE('Message in Queue :- ' ||  agent_w_msg.address 
                           || 'for agent' || agent_w_msg.name);
      DBMS_OUTPUT.PUT_LINE('');
   END;
   /

Example: Listen to Multi-Consumer Queue(s) Using C (OCI)

Listening to Multi-consumer Queues with a Zero Timeout, a Timeout of 120 Seconds, and a Timeout of 100 Seconds

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>

static void checkerr(errhp, status)
OCIError *errhp;
sword status;
{
    text errbuf[512];
    ub4 buflen;
    sb4 errcode;

    switch (status)
    {
   case OCI_SUCCESS:
       break;
   case OCI_SUCCESS_WITH_INFO:
       printf("Error - OCI_SUCCESS_WITH_INFO\n");
       break;
   case OCI_NEED_DATA:
       printf("Error - OCI_NEED_DATA\n");
       break;
   case OCI_NO_DATA:
       printf("Error - OCI_NO_DATA\n");
       break;
   case OCI_ERROR:
       OCIErrorGet ((dvoid *) errhp, (ub4) 1, (text *) NULL, &errcode,
       errbuf, (ub4) sizeof(errbuf), (ub4) OCI_HTYPE_ERROR);
       printf("Error - %s\n", errbuf);
       break;
   case OCI_INVALID_HANDLE:
       printf("Error - OCI_INVALID_HANDLE\n");
       break;
   case OCI_STILL_EXECUTING:
       printf("Error - OCI_STILL_EXECUTE\n");
       break;
   case OCI_CONTINUE:
       printf("Error - OCI_CONTINUE\n");
       break;
   default:
   break;
    }
}

void SetAgent(OCIAQAgent *agent, 
         text       *appname, 
         text       *queue,
         OCIError   *errhp,
         OCIEnv     *envhp);

void GetAgent(OCIAQAgent *agent, 
         OCIError   *errhp);

/*----------------------------------------------------------------*/
/* OCI Listen examples for multi-consumers                        */
/*                                                                */
void SetAgent(agent, appname, queue, errhp)
OCIAQAgent    *agent;
text          *appname;
text          *queue;
OCIError      *errhp;
{
  OCIAttrSet(agent, 
        OCI_DTYPE_AQAGENT, 
        appname ? (dvoid *)appname : (dvoid *)"", 
        appname ? strlen((const char *)appname) : 0,
             OCI_ATTR_AGENT_NAME, 
        errhp);

  OCIAttrSet(agent, 
        OCI_DTYPE_AQAGENT, 
        queue ? (dvoid *)queue : (dvoid *)"", 
        queue ? strlen((const char *)queue) : 0,
             OCI_ATTR_AGENT_ADDRESS, 
        errhp);

  printf("Set agent name to %s\n", appname ? (char *)appname : "NULL");
  printf("Set agent address to %s\n", queue ? (char *)queue : "NULL");
}

/* get agent from descriptor */
void GetAgent(agent, errhp)
OCIAQAgent *agent;
OCIError   *errhp;
{
   text      *appname;
   text      *queue;
   ub4       appsz;
   ub4       queuesz;

   if (!agent )
  {
     printf("agent was NULL \n");
     return;
  }
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&appname, &appsz, OCI_ATTR_AGENT_NAME, errhp));
  checkerr(errhp, OCIAttrGet(agent, OCI_DTYPE_AQAGENT, 
     (dvoid *)&queue, &queuesz, OCI_ATTR_AGENT_ADDRESS, errhp));
  if (!appsz)
     printf("agent name: NULL\n");
  else printf("agent name: %.*s\n", appsz, (char *)appname);
  if (!queuesz)
     printf("agent address: NULL\n");
  else printf("agent address: %.*s\n", queuesz, (char *)queue);
}

/* main from AQ Listen to Multi-Consumer Queue(s) */

/*  int main() */
int main(char *argv, int argc)
{
    OCIEnv     *envhp;
    OCIServer  *srvhp;
    OCIError   *errhp;
    OCISvcCtx  *svchp;
    OCISession *usrhp;
    OCIAQAgent *agent_list[3];
    OCIAQAgent *agent;
    int         i;

 /* Standard OCI Initialization */

  OCIInitialize((ub4) OCI_OBJECT, 
      (dvoid *)0,  
      (dvoid * (*)()) 0,
      (dvoid * (*)()) 0,  
      (void (*)()) 0 );
  
  OCIHandleAlloc( (dvoid *) NULL, (dvoid **) &envhp, (ub4) OCI_HTYPE_ENV,
     0, (dvoid **) 0);
  
  OCIEnvInit( &envhp, (ub4) OCI_DEFAULT, 0, (dvoid **)0);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, (ub4) OCI_HTYPE_ERROR,
     0, (dvoid **) 0);

  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, (ub4) OCI_HTYPE_SERVER,
     0, (dvoid **) 0);
  
  OCIServerAttach( srvhp, errhp, (text *) 0, (sb4) 0, (ub4) OCI_DEFAULT);
  
  OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, (ub4) OCI_HTYPE_SVCCTX,
     0, (dvoid **) 0);
  
  /* set attribute server context in the service context */
  OCIAttrSet( (dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, (dvoid *)srvhp, (ub4) 0,
     (ub4) OCI_ATTR_SERVER, (OCIError *) errhp);
  
   /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
     (size_t) 0, (dvoid **) 0);
  
   /* allocate a user context handle */
  OCIHandleAlloc((dvoid *)envhp, (dvoid **)&usrhp, (ub4) OCI_HTYPE_SESSION,
     (size_t) 0, (dvoid **) 0);
  
  
  OCIAttrSet((dvoid *)usrhp, (ub4)OCI_HTYPE_SESSION,
     (dvoid *)"scott", (ub4)strlen("scott"), OCI_ATTR_USERNAME, errhp);
   

   OCIAttrSet((dvoid *) usrhp, (ub4) OCI_HTYPE_SESSION,
      (dvoid *) "tiger", (ub4) strlen("tiger"),
      (ub4) OCI_ATTR_PASSWORD, errhp);   
  
  OCISessionBegin (svchp, errhp, usrhp, OCI_CRED_RDBMS, OCI_DEFAULT);
  
  OCIAttrSet((dvoid *)svchp, (ub4)OCI_HTYPE_SVCCTX,
     (dvoid *)usrhp, (ub4)0, OCI_ATTR_SESSION, errhp);

  /* AQ LISTEN Initialization - allocate agent handles */
  for (i = 0; i < 3; i++)
  {
     OCIDescriptorAlloc(envhp, (dvoid **)&agent_list[i], 
         OCI_DTYPE_AQAGENT, 0, (dvoid **)0);
  }

  /* 
   *  MCQ1, MCQ2, MCQ3 are multi-consumer queues in SCOTT's schema 
   */
  /* Listening to Multi-consumer Queues with Zero Timeout */

  SetAgent(agent_list[0], "app1", "MCQ1", errhp);
  SetAgent(agent_list[1], "app2", "MCQ2", errhp);
  SetAgent(agent_list[2], "app3", "MCQ3", errhp);
 
  checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 0, &agent, 0));

  printf("MESSAGE for :- \n");             
  GetAgent(agent, errhp);
  printf("\n");


  /* Listening to Multi-consumer Queues with Timeout of 120 Seconds */

  SetAgent(agent_list[0], "app1", "SCOTT.MCQ1", errhp);
  SetAgent(agent_list[1], "app2", "SCOTT.MCQ2", errhp);
  SetAgent(agent_list[2], "app3", "SCOTT.MCQ3", errhp);
 
  checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 120, &agent, 0));

  printf("MESSAGE for :- \n");             
  GetAgent(agent, errhp);
  printf("\n");


  /* Listening to a Mixture of Single and Multi-consumer Queues 
   * with a Timeout of 100 Seconds
   */

  SetAgent(agent_list[0], "app1", "SCOTT.MCQ1", errhp);
  SetAgent(agent_list[1], "app2", "SCOTT.MCQ2", errhp);
  SetAgent(agent_list[2], (text *)0, "SCOTT.SCQ3", errhp);
 
  checkerr(errhp, OCIAQListen(svchp, errhp, agent_list, 3, 100, &agent, 0));

  printf("MESSAGE for :- \n");             
  GetAgent(agent, errhp);
  printf("\n");

}

Dequeue a Message

Figure 6-10 Use Case Diagram: Dequeue a Message


To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

Dequeues a message from the specified queue.

Syntax:

DBMS_AQ.DEQUEUE (
     
queue_name          IN      VARCHAR2,
dequeue_options     IN      dequeue_options_t,
message_properties  OUT     message_properties_t,
payload             OUT    "<type_name>",
msgid               OUT     raw);

Usage:

Table 6-6 DBMS_AQ.DEQUEUE
Parameter  Description 

queue_name

(IN VARCHAR2)  

Specifies the name of the queue.

 

dequeue_options

(IN dequeue_option_t)  

For the definition please refer to the section titled "DEQUEUE Options."

 

message_properties

(OUT message_properties_t)  

For the definition please refer to the section titled "Message Properties."

 

payload

(OUT "<type_name>")  

Not interpreted by Oracle AQ.

The payload must be specified according to the specification in the associated queue table. For the definition of <type_name> please refer to section titled "Type name".  

msgid

(OUT RAW)  

The system generated identification of the message.  

Usage Notes

Search criteria and dequeue order for messages:

Navigating through a queue:

The default NAVIGATION parameter during dequeue is NEXT_MESSAGE. This means that subsequent dequeues will retrieve the messages from the queue based on the snapshot obtained in the first dequeue. In particular, a message that is enqueued after the first dequeue command will be processed only after processing all the remaining messages in the queue. This is usually sufficient when all the messages have already been enqueued into the queue, or when the queue does not have a priority-based ordering. However, applications must use the FIRST_MESSAGE navigation option when the first message in the queue needs to be processed by every dequeue command. This usually becomes necessary when a higher priority message arrives in the queue while messages already-enqueued are being processed.


Note:

It may also be more efficient to use the FIRST_MESSAGE navigation option when there are messages being concurrently enqueued. If the FIRST_MESSAGE option is not specified, AQ will have to continually generate the snapshot as of the first dequeue command, leading to poor performance. If the FIRST_MESSAGE option is specified, AQ will use a new snapshot for every dequeue command.  


Dequeue by Message Grouping:

Dequeue a Message from a Single-Consumer Queue [Specify Options]

Figure 6-11 Use Case Diagram: Dequeue a Message from a Single-Consumer Queue



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

To specify the options available for the dequeue operation.

Syntax:

TYPE dequeue_options_t IS RECORD (
   consumer_name    VARCHAR2(30) default NULL,
   dequeue_mode     BINARY_INTEGER default REMOVE,
   navigation       BINARY_INTEGER default NEXT_MESSAGE,
   visibility       BINARY_INTEGER default ON_COMMIT,
   wait             BINARY_INTEGER default FOREVER,
   msgid            RAW(16) default NULL,
   correlation      VARCHAR2(128) default NULL);
     

Usage:

Table 6-7 DEQUEUE options for a Singe-Consumer Queue
Parameter  Description 

consumer_name  

Name of the consumer. Only those messages matching the consumer name are accessed. If a queue is not set up for multiple consumers, this field should be set to NULL.  

dequeue_mode  

Specifies the locking behavior associated with the dequeue.

BROWSE: Read the message without acquiring any lock on the message. This is equivalent to a select statement.

LOCKED: Read and obtain a write lock on the message. The lock lasts for the duration of the transaction. This is equivalent to a select for update statement.

REMOVE: Read the message and update or delete it. This is the default. The message can be retained in the queue table based on the retention properties.

REMOVE_NODATA: Mark the message as updated or deleted. The message can be retained in the queue table based on the retention properties.  

navigation  

Specifies the position of the message that will be retrieved. First, the position is determined. Second, the search criterion is applied. Finally, the message is retrieved.

NEXT_MESSAGE: Retrieve the next message which is available and matches the search criteria. If the previous message belongs to a message group, AQ will retrieve the next available message which matches the search criteria and belongs to the message group. This is the default.

NEXT_TRANSACTION: Skip the remainder of the current transaction group (if any) and retrieve the first message of the next transaction group. This option can only be used if message grouping is enabled for the current queue.

FIRST_MESSAGE: Retrieves the first message which is available and matches the search criteria. This will reset the position to the beginning of the queue.  

visibility  

Specifies whether the new message is dequeued as part of the current transaction.The visibility parameter is ignored when using the BROWSE mode.

ON_COMMIT: The dequeue will be part of the current transaction. This is the default case.

IMMEDIATE: The dequeued message is not part of the current transaction. It constitutes a transaction on its own.  

wait  

Specifies the wait time if there is currently no message available which matches the search criteria.

FOREVER: wait forever. This is the default.

NO_WAIT: do not wait

number: wait time in seconds  

msgid  

Specifies the message identifier of the message to be dequeued.  

correlation

 

Specifies the correlation identifier of the message to be dequeued. Special pattern matching characters, such as the percent sign (%) and the underscore (_) can be used. If more than one message satisfies the pattern, the order of dequeuing is undetermined.  

Usage Notes

Typically, you expect the consumer of messages to access messages using the dequeue interface. You can view processed messages or messages still to be processed by browsing by message id or by using SELECTs.

Example: Dequeue of Object Type Messages using PL/SQL (DBMS_AQ Package)

/* Dequeue from msg_queue: */
DECLARE
dequeue_options     dbms_aq.dequeue_options_t;
message_properties  dbms_aq.message_properties_t;
message_handle      RAW(16);
message             aq.message_typ;

BEGIN
   DBMS_AQ.DEQUEUE(
      queue_name          =>     'msg_queue',
      dequeue_options      =>    dequeue_options,
      message_properties  =>     message_properties,
      payload             =>     message,
      msgid               =>     message_handle);

   DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject ||
                                      ' ... ' || message.text );
   COMMIT;
END;

Dequeue a Message from a Multi-Consumer Queue [Specify Options]

Figure 6-12 Use Case Diagram: Dequeue a Message from a Multi-Consumer Queue



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

To specify the options available for the dequeue operation.

Syntax:

TYPE dequeue_options_t IS RECORD (
   consumer_name    VARCHAR2(30) default NULL,
   dequeue_mode     BINARY_INTEGER default REMOVE,
   navigation       BINARY_INTEGER default NEXT_MESSAGE,
   visibility       BINARY_INTEGER default ON_COMMIT,
   wait             BINARY_INTEGER default FOREVER,
   msgid            RAW(16) default NULL,
   correlation      VARCHAR2(128) default NULL);
     

Usage:

Table 6-8 DEQUEUE options for a Multi-Consumer Queue
Parameter  Description 

consumer_name  

Name of the consumer. Only those messages matching the consumer name are accessed. If a queue is not set up for multiple consumers, this field should be set to NULL.  

dequeue_mode  

Specifies the locking behavior associated with the dequeue.

BROWSE: Read the message without acquiring any lock on the message. This is equivalent to a select statement.

LOCKED: Read and obtain a write lock on the message. The lock lasts for the duration of the transaction. This is equivalent to a select for update statement.

REMOVE: Read the message and update or delete it. This is the default. The message can be retained in the queue table based on the retention properties.

REMOVE_NODATA: Mark the message as updated or deleted. The message can be retained in the queue table based on the retention properties.  

navigation  

Specifies the position of the message that will be retrieved. First, the position is determined. Second, the search criterion is applied. Finally, the message is retrieved.

NEXT_MESSAGE: Retrieve the next message which is available and matches the search criteria. If the previous message belongs to a message group, AQ will retrieve the next available message which matches the search criteria and belongs to the message group. This is the default.

NEXT_TRANSACTION: Skip the remainder of the current transaction group (if any) and retrieve the first message of the next transaction group. This option can only be used if message grouping is enabled for the current queue.

FIRST_MESSAGE: Retrieves the first message which is available and matches the search criteria. This will reset the position to the beginning of the queue.  

visibility  

Specifies whether the new message is dequeued as part of the current transaction.The visibility parameter is ignored when using the BROWSE mode.

ON_COMMIT: The dequeue will be part of the current transaction. This is the default case.

IMMEDIATE: The dequeued message is not part of the current transaction. It constitutes a transaction on its own.  

wait  

Specifies the wait time if there is currently no message available which matches the search criteria.

FOREVER: wait forever. This is the default.

NO_WAIT: do not wait

number: wait time in seconds  

msgid  

Specifies the message identifier of the message to be dequeued.  

correlation

 

Specifies the correlation identifier of the message to be dequeued. Special pattern matching characters, such as the percent sign (%) and the underscore (_) can be used. If more than one message satisfies the pattern, the order of dequeuing is undetermined.  

Register for Notification

Figure 6-13 Use Case Diagram: Register for Notification



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Purpose:

To register a callback for message notification.

Syntax:

ub4 OCISubscriptionRegister (
   OCISvcCtx          *svchp,
   OCISubscription    **subscrhpp,
   ub2                count,
   OCIError           *errhp,
   ub4                mode);

Usage:

Table 6-9 DEQUEUE options for a Multi-Consumer Queue
Parameter  Description 

svchp (IN)  

A V8 OCI service context. This service context should have a valid authenticated user handle.  

subscrhpp (IN)  

An array of subscription handles. Each element of this array should be a subscription handle with the OCI_ATTR_SUBSCR_NAME, OCI_ATTR_SUBSCR_NAMESPACE, OCI_ATTR_SUBSCR_CBACK, and OCI_ATTR_SUBSCR_CTX attributes set; otherwise, an error will be returned. For information, see Subscription Handle Attributes.

When a notification is received for the registration denoted by the subscrhpp[i], the user defined callback function (OCI_ATTR_SUBSCR_CBACK) set for subscrhpp[i] will get invoked with the context (OCI_ATTR_SUBSCR_CTX) set for subscrhpp[i].  

count (IN)  

The number of elements in the subscription handle array  

errhp (OUT)  

An error handle you can pass to OCIErrorGet() for diagnostic information in the event of an error.  

mode (IN)  

Call-specific mode. Valid values:

  • OCI_DEFAULT - executes the default call which specifies that the registration is treated as disconnected

  • OCI_NOTIFY_CONNECTED - notifications are received only if the client is connected (not supported in this release)

Whenever a new client process comes up, or an old one goes down and comes back up, it needs to register for all subscriptions of interest. If the client stays up and the server first goes down and then comes back up, the client will continue to receive notifications for registrations that are DISCONNECTED. However, the client will not receive notifications for CONNECTED registrations as they will be lost once the server goes down and comes back up.  

Usage Notes

Register for Notification [Specify Subscription Name -- Single-Consumer Queue]

Figure 6-14 Use Case Diagram: Specify Subscription Name - Single Consumer Queue



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Register for Notification [Specify Subscription Name -- Multi-Consumer Queue]

Figure 6-15 Use Case Diagram: Specify Subscription Name - Multi-Consumer Queue



To refer to the table of all basic operations having to do with the Operational Interface see:

 

Example: Register for Notifications For Single-Consumer and Multi-Consumer Queries Using C (OCI)

/* OCIRegister can be used by the client to register to receive notifications 
   when messages are enqueued into non-persistent and normal queues. */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <oci.h>


static OCIEnv     *envhp;
static OCIServer  *srvhp;
static OCIError   *errhp;
static OCISvcCtx  *svchp;


/* The callback that gets invoked on notification */
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode)
dvoid *ctx;
OCISubscription *subscrhp;      /* subscription handle */
dvoid           *pay;           /* payload  */
ub4              payl;          /* payload length */
dvoid           *desc;          /* the AQ notification descriptor */
ub4              mode;
{
 text                *subname;
 ub4                  size;
 ub4                 *number = (ub4 *)ctx;
 text                *queue;
 text                *consumer;
 OCIRaw              *msgid;
 OCIAQMsgProperties  *msgprop;

  (*number)++;

  /* Get the subscription name */
  OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION,
                             (dvoid *)&subname, &size,
                             OCI_ATTR_SUBSCR_NAME, errhp);
 printf("got notification number %d for %.*s  %d  \n", 
         *number, size, subname, payl);

 /* Get the queue name from the AQ notify descriptor */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size, 
             OCI_ATTR_QUEUE_NAME, errhp);
 
 /* Get the consumer name for which this notification was received */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &size, 
       OCI_ATTR_CONSUMER_NAME, errhp);

 /* Get the message id of the message for which we were notified */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgid, &size, 
       OCI_ATTR_NFY_MSGID, errhp);

 /* Get the message properties of the message for which we were notified */
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size, 
       OCI_ATTR_MSG_PROP, errhp);

}


int main(argc, argv)
int argc;
char *argv[];
{
  OCISession *authp = (OCISession *) 0;

  /* The subscription handles */
  OCISubscription *subscrhp[5];

  /* Registrations are for AQ namespace */
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ;   

  /* The context fot the callback */
  ub4 ctx[5] = {0,0,0,0,0};

  printf("Initializing OCI Process\n");

  /* The OCI Process Environment must be initialized  with OCI_EVENTS */
  /* OCI_OBJECT flag is set to enable us dequeue */
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0,
                       (dvoid * (*)(dvoid *, size_t)) 0,
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0,
                       (void (*)(dvoid *, dvoid *)) 0 );

  printf("Initialization successful\n");

  /* The standard OCI setup */
  printf("Initializing OCI Env\n");
  (void) OCIEnvInit((OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, 
                (dvoid **) 0 );

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, OCI_HTYPE_ERROR, 
                   (size_t) 0, (dvoid **) 0);

  /* Server contexts */
  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, OCI_HTYPE_SERVER,
                   (size_t) 0, (dvoid **) 0);

  (void) OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, OCI_HTYPE_SVCCTX,
                   (size_t) 0, (dvoid **) 0);


  printf("connecting to server\n");
  (void) OCIServerAttach( srvhp, errhp, (text *)"", strlen(""), 0);
  printf("connect successful\n");

  /* Set attribute server context in the service context */
  (void) OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp, 
          (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp);

  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp,
             (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0);
 
  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "scott", (ub4) strlen("scott"),
                 (ub4) OCI_ATTR_USERNAME, errhp);
 
  (void) OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION,
                 (dvoid *) "tiger", (ub4) strlen("tiger"),
                 (ub4) OCI_ATTR_PASSWORD, errhp);

  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS, 
           (ub4) OCI_DEFAULT));

  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX,
                   (dvoid *) authp, (ub4) 0,
                   (ub4) OCI_ATTR_SESSION, errhp);

 /* Setting the subscription handle for notification on
    a NORMAL single consumer queue */
  printf("allocating subscription handle\n");
  subscrhp[0] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);
 
  printf("setting subscription name\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                (dvoid *) "SCOTT.SCQ1", (ub4) strlen("SCOTT.SCQ1"),
                (ub4) OCI_ATTR_SUBSCR_NAME, errhp);
 
  printf("setting subscription callback\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 printf("setting subscription context \n");
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[0], (ub4)sizeof(ctx[0]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  printf("setting subscription namespace\n");
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);

 /* Setting the subscription handle for notification on a NORMAL multi-consumer 
    consumer queue */
  subscrhp[1] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.MCQ1:APP1", 
                 (ub4) strlen("SCOTT.MCQ1:APP1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[1], (ub4)sizeof(ctx[1]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);


 /* Setting the subscription handle for notification on a non-persistent   
   single-consumer queue */
  subscrhp[2] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.NP_SCQ1", 
                 (ub4) strlen("SCOTT.NP_SCQ1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[2], (ub4)sizeof(ctx[2]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);


 /* Setting the subscription handle for notification on
    a non-persistent multi consumer queue */
 /* Waiting on user specified recipient */
  subscrhp[3] = (OCISubscription *)0;
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[3], 
         (ub4) OCI_HTYPE_SUBSCRIPTION,
         (size_t) 0, (dvoid **) 0);

  (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) "SCOTT.NP_MCQ1", 
                 (ub4) strlen("SCOTT.NP_MCQ1"),
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) notifyCB, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp);

 (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *)&ctx[3], (ub4)sizeof(ctx[3]),
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp);

  (void) OCIAttrSet((dvoid *) subscrhp[3], (ub4) OCI_HTYPE_SUBSCRIPTION,
                 (dvoid *) &namespace, (ub4) 0,
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp);


  printf("Registering for all the subscriptiosn \n");
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 4, errhp, 
                 OCI_DEFAULT));

  printf("Waiting for notifcations \n");
  
  /* wait for minutes for notifications */
  sleep(300);

  printf("Exiting\n");
}




Prev

Next
Oracle
Copyright © 1999 Oracle Corporation.

All Rights Reserved.

Library

Product

Contents

Index