Общие сведения

Описание механизмов обмена асинхронными сообщениями и примеры использования

Обзор
Примеры
Пример 1: Отправка одного сообщения
Пример 2: Отправка нескольких сообщений одновременно
Пример 3: Очистка
Пример 4: Настройка события, запускающего копирование
Пример 5: Использование времени как триггера
Пример 6: Клиентские и серверные процессы

Обзор

Асинхронный обмен сообщениями - это модель взаимодействия, основанная на методе промежуточного хранения. По сравнению с обычным общением на основе ответов в системах клиент-сервер, инфраструктура обмена сообщениями обеспечивает доставку, даже если участник временно отключен, занят или недоступен. В этом методе отправителю не нужен ответ от получателя. Это обеспечивает большую гибкость и масштабируемость, поскольку отправители и получатели разделены и больше не требуются для синхронного исполнения.

В некоторых реальных сценариях клиенту не нужно ждать в состоянии блокировки, пока сервер завершит обслуживание своих запросов; скорее, он может улучшить свою производительность, выполняя некоторые задачи во время ожидания. Несколько асинхронных сообщений также могут быть отправлены в течение короткого периода времени. Буферизация этих сообщений и их отправка группой могут снизить частоту поступления в ядро и, таким образом, повысить производительность системы.

Асинхронный обмен сообщениями в ЗОСРВ «Нейтрино» обеспечивает управляемую событиями системную модель, которая хорошо дополняет традиционную архитектуру передачи сообщений синхронного обмена сообщениями. Его главная особенность - высокая пропускная способность и неблокирующая передача сообщений. Теперь вы можете проектировать системы, в которых ЗОСРВ «Нейтрино» может своевременно реагировать на асинхронные события, генерируемые извне или внутри. Асинхронный обмен сообщениями также обеспечивает удобный и эффективный механизм для доставки асинхронных событий и связанных с ними данных между компонентами системы.


Note:
  • Асинхронный обмен сообщениями не работает с прозрачной распределенной обработкой (т.е. не работает в Qnet).
  • Потоки, получающие асинхронные сообщения, не наследуют приоритет отправителя.

На следующем рисунке показан пользовательский процесс (производитель), который собирает данные с некоторых устройств ввода (например, датчиков), буферизует их и отправляет другому пользовательскому процессу (потребителю) для обработки. Базовым методом в асинхронном обмене сообщениями является «пакетная доставка», которая увеличивает пропускную способность и снижает накладные расходы. Данные отправляются в виде сообщений с определенными границами.

async_msg.png
Рисунок 1. Передача асинхронных сообщений

Вы можете использовать асинхронный обмен сообщениями вместо схемы "отправки-получения-ответа", когда:

Вы можете использовать схему "отправку-получение-ответ" вместо асинхронного обмена сообщениями, когда:

Примеры

Вот несколько примеров кода, показывающих, как работает асинхронный обмен сообщениями.

Пример 1: Отправка одного сообщения

Это самый простой пример; он демонстрирует отправку и получение одного сообщения. Номер триггера, который определяет количество сообщений, которые отправитель хочет отправить за один раз, установлено в 1.

#include <stdio.h>
#include <process.h>
#include <unistd.h>
#include <string.h>
#include <sys/asyncmsg.h>
char *msg = "AsyncMsg Passing";
int callback( int err, void *cmsg, unsigned handle )
{
printf( "Callback: err = %d, msg = %p (%p), handle = %d\n", err, cmsg, msg, handle );
return (0);
}
/*
* Simple test: Create a channel, connect to it, put a message, get it, done.
*/
int main()
{
int chid, coid;
struct _asyncmsg_connection_attr aca;
struct _asyncmsg_get_header *agh, *agh1;
if ( (chid = asyncmsg_channel_create( 0, 0666, 2048, 5, NULL, NULL )) == -1 )
{
perror( "channel_create" );
return (-1);
}
memset( &aca, 0, sizeof( aca ) );
aca.buffer_size = 2048;
aca.max_num_buffer = 5;
aca.trigger_num_msg = 1;
if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 )
{
perror( "connect_attach" );
return (-1);
}
if ( (asyncmsg_put( coid, msg, strlen( msg ) + 1, 1234, callback )) == -1 )
{
perror( "put" );
return (-1);
}
if ( (agh = asyncmsg_get( chid )) == NULL )
{
perror( "get" );
return (-1);
}
printf( "Got message(s): \n\n" );
while ( agh1 = agh )
{
agh = agh1->next;
printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );
printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( msg ) + 1 );
printf( "srclen: %d\n", agh1->info.srcmsglen );
printf( "err: %d\n", agh1->err );
printf( "parts: %d\n", agh1->parts );
printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );
asyncmsg_free( agh1 );
}
/* give the callback a chance to run */
sleep( 1 );
if ( asyncmsg_connect_detach( coid ) == -1 )
{
perror( "connect_detach" );
return (-1);
}
if ( asyncmsg_channel_destroy( chid ) == -1 )
{
perror( "channel_detach" );
return (-1);
}
return (0);
}

Пример 2: Отправка нескольких сообщений одновременно

Следующая программа демонстрирует, как можно отправить более одного сообщения от отправителя, но использовать asyncmsg_get() только один раз, чтобы собрать все эти сообщения вместе. Вы должны установить соответствующий номер триггера (в данном примере 3) в атрибутах, которые вы передаете в asyncmsg_connect_attach(), чтобы запустить копирование сообщений. Номер триггера определяет количество сообщений, которые отправитель заранее определил для совместной отправки. Это увеличивает пропускную способность.

#include <stdio.h>
#include <process.h>
#include <unistd.h>
#include <string.h>
#include <sys/asyncmsg.h>
int callback( int err, void *cmsg, unsigned handle )
{
printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );
return (0);
}
/*
* Multiple put, then see if get can get all of them.
*/
int main()
{
int chid, coid, i;
struct _asyncmsg_connection_attr aca;
struct _asyncmsg_get_header *agh, *agh1;
char msg[3][80];
if ( (chid = asyncmsg_channel_create( _NTO_CHF_SENDER_LEN, 0666, 2048, 5, NULL, NULL )) == -1 )
{
perror( "channel_create" );
return (-1);
}
memset( &aca, 0, sizeof( aca ) );
aca.buffer_size = 2048;
aca.max_num_buffer = 5;
aca.trigger_num_msg = 3;
if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 )
{
perror( "connect_attach" );
return (-1);
}
for ( i = 0; i < 3; i++ )
{
sprintf( msg[i], "Async Message Passing (msgid %d)\n", i );
if ( (asyncmsg_put( coid, msg[i], strlen( msg[i] ) + 1, 1234, callback )) == -1 )
{
perror( "put" );
return (-1);
}
}
if ( (agh = asyncmsg_get( chid )) == NULL )
{
perror ("get" );
return (-1);
}
printf( "Got message(s): \n\n" );
while ( agh1 = agh )
{
agh = agh1->next;
printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );
printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( *msg ) + 1 );
printf( "srclen: %d\n", agh1->info.srcmsglen );
printf( "err: %d\n", agh1->err );
printf( "parts: %d\n", agh1->parts );
printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );
asyncmsg_free( agh1 );
}
sleep( 1 );
if ( asyncmsg_connect_detach( coid ) == -1 )
{
perror( "connect_detach" );
return (-1);
}
if ( asyncmsg_channel_destroy( chid ) == -1 )
{
perror( "channel_detach" );
return (-1);
}
return (0);
}

Пример 3: Очистка

Следующая программа демонстрирует использование asyncmsg_flush(), которое можно использовать для очистки всех сообщений; вам не нужно устанавливать номер триггера. Фактически, получатель получает уведомление о событии от ядра, как только сообщения готовы в отправителе. Затем получатель сразу же принимает все сообщения вместе.

#include <stdio.h>
#include <errno.h>
#include <process.h>
#include <unistd.h>
#include <string.h>
#include <sys/asyncmsg.h>
int num_callback_run = 0;
int chid;
int callback( int err, void *cmsg, unsigned handle )
{
num_callback_run++;
printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );
return (0);
}
void * thread_for_get( void *arg )
{
int pchid = (int)arg;
struct _pulse pulse;
struct _asyncmsg_get_header *agh, *agh1;
/* waiting for the event */
if ( MsgReceivePulse( pchid, &pulse, sizeof( pulse ), NULL ) == -1 )
{
perror( "MsgReceivePulse" );
return (NULL);
}
if ( (agh = asyncmsg_get( chid )) == NULL )
{
perror( "get" );
return (NULL);
}
printf( "Got message(s): \n\n" );
while ( agh1 = agh )
{
agh = agh1->next;
printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );
printf( "msglen: %d\n", agh1->info.msglen );
printf( "srclen: %d\n", agh1->info.srcmsglen );
printf( "err: %d\n", agh1->err );
printf( "parts: %d\n", agh1->parts );
printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );
asyncmsg_free( agh1 );
}
return (0);
}
/*
* No trigger for put, but we force flush() and see if all messages can be pushed out.
*/
int main()
{
int coid, pchid, i;
struct sigevent gev;
struct _asyncmsg_connection_attr aca;
char msg[4][80];
/* prepare the event */
if ( (pchid = ChannelCreate( 0 )) == -1 )
{
perror( "ChannelCreate" );
return (-1);
}
if ( (errno = pthread_create( 0, 0, thread_for_get, (void *)pchid ) ) != EOK )
{
perror( "pthread_create" );
return (-1);
}
if ( (gev.sigev_coid = ConnectAttach( 0, 0, pchid, _NTO_SIDE_CHANNEL, 0 )) == -1 )
{
perror( "ConnectAttach" );
return (-1);
}
gev.sigev_notify = SIGEV_PULSE;
gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT;
/* async channel */
if ( (chid = asyncmsg_channel_create( _NTO_CHF_SENDER_LEN, 0666, 2048, 5, &gev, NULL )) == -1 )
{
perror( "channel_create" );
return (-1);
}
memset( &aca, 0, sizeof( aca ) );
aca.buffer_size = 2048;
aca.max_num_buffer = 5;
aca.trigger_num_msg = 0;
aca.call_back = callback;
if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 )
{
perror( "connect_attach" );
return (-1);
}
/* put up 4 messages */
for ( i = 0; i < 4; i++ )
{
sprintf( msg[i], "Async Message Passing (msgid %d)\n", i );
if ( (asyncmsg_put( coid, msg[i], strlen( msg[i] ) + 1, 1234, callback )) == -1 )
{
perror( "put" );
return (-1);
}
}
asyncmsg_flush( coid, 0 );
/* give the put callback a chance to run */
while ( num_callback_run < 2 )
delay( 500 );
if ( asyncmsg_connect_detach( coid ) == -1 )
{
perror( "connect_detach" );
return (-1);
}
if ( asyncmsg_channel_destroy( chid ) == -1 )
{
perror( "channel_detach" );
return (-1);
}
return (0);
}

Пример 4: Настройка события, запускающего копирование

Данная программа демонстрирует как использовать asyncmsg_connect_attach() для установки события, которое используется для запуска копирования получателем.

#include <stdio.h>
#include <process.h>
#include <unistd.h>
#include <string.h>
#include <sys/asyncmsg.h>
char *msg = "AsyncMsg Passing";
int callback( int err, void *cmsg, unsigned handle )
{
printf( "Callback: err = %d, msg = %p (%p), handle = %d\n", err, cmsg, msg, handle );
return (0);
}
/*
* Simple test. Create a channel, connect to it, put a message, get them, done
*/
int main()
{
int chid, coid, pchid;
struct sigevent gev;
struct _pulse pulse;
struct _asyncmsg_connection_attr aca;
struct _asyncmsg_get_header *agh, *agh1;
/* prepare the event */
if ( (pchid = ChannelCreate( 0 )) == -1 )
{
perror( "ChannelCreate" );
return (-1);
}
if ( (gev.sigev_coid = ConnectAttach( 0, 0, pchid, _NTO_SIDE_CHANNEL, 0 )) == -1 )
{
perror( "ConnectAttach" );
return (-1);
}
gev.sigev_notify = SIGEV_PULSE;
gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT;
/* async channel */
if ( (chid = asyncmsg_channel_create( 0, 0666, 2048, 5, &gev, NULL )) == -1 )
{
perror( "channel_create" );
return (-1);
}
memset( &aca, 0, sizeof( aca ) );
aca.buffer_size = 2048;
aca.max_num_buffer = 5;
aca.trigger_num_msg = 1;
aca.call_back = callback;
if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 )
{
perror( "connect_attach" );
return (-1);
}
if ( (asyncmsg_put( coid, msg, strlen( msg ) + 1, 0, 0 )) == -1 )
{
perror( "put" );
return (-1);
}
/* waiting for the event */
if ( MsgReceivePulse( pchid, &pulse, sizeof( pulse ), NULL ) == -1 )
{
perror( "MsgReceivePulse" );
return (-1);
}
if ( (agh = asyncmsg_get( chid )) == NULL )
{
perror( "get" );
return (-1);
}
printf( "Got message(s): \n\n" );
while ( agh1 = agh )
{
agh = agh1->next;
printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );
printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( msg ) + 1 );
printf( "srclen: %d\n", agh1->info.srcmsglen );
printf( "err: %d\n", agh1->err );
printf( "parts: %d\n", agh1->parts );
printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );
asyncmsg_free( agh1 );
}
/* give the callback a chance to run */
sleep( 1 );
if ( asyncmsg_connect_detach( coid ) == -1 )
{
perror( "connect_detach" );
return (-1);
}
if ( asyncmsg_channel_destroy( chid ) == -1 )
{
perror( "channel_detach" );
return (-1);
}
return (0);
}

Пример 5: Использование времени как триггера

В данном примере используется задержка запуска копирования получателем. Здесь таймер выставлен на определенный период времени (2 секунды). По истечении таймера получатель получает уведомление и начинает копировать сообщения.

#include <stdio.h>
#include <process.h>
#include <unistd.h>
#include <string.h>
#include <sys/asyncmsg.h>
int callback( int err, void *cmsg, unsigned handle )
{
printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );
return (0);
}
/*
* Set the get trigger to 2 seconds, put up 4 messages, see we could get a
* notification for get after 2 seconds.
*/
int main()
{
int chid, coid, pchid, i;
struct sigevent gev;
struct _pulse pulse;
struct _asyncmsg_connection_attr aca;
struct _asyncmsg_get_header *agh, *agh1;
char msg[4][80];
/* prepare the event */
if ( (pchid = ChannelCreate( 0 )) == -1 )
{
perror( "ChannelCreate" );
return (-1);
}
if ( (gev.sigev_coid = ConnectAttach( 0, 0, pchid, _NTO_SIDE_CHANNEL, 0 )) == -1 )
{
perror( "ConnectAttach" );
return (-1);
}
gev.sigev_notify = SIGEV_PULSE;
gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT;
/* async channel */
if ( (chid = asyncmsg_channel_create( _NTO_CHF_SENDER_LEN, 0666, 2048, 5, &gev, NULL )) == -1 )
{
perror( "channel_create" );
return (-1);
}
memset( &aca, 0, sizeof( aca ) );
aca.buffer_size = 2048;
aca.max_num_buffer = 5;
aca.trigger_num_msg = 0;
aca.trigger_time.nsec = 2000000000L;
aca.trigger_time.interval_nsec = 0;
aca.call_back = callback;
if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 )
{
perror( "connect_attach" );
return (-1);
}
/* put up 4 messages */
for ( i = 0; i < 4; i++ )
{
sprintf( msg[i], "Async Message Passing (msgid %d)\n", i );
if ( (asyncmsg_put( coid, msg[i], strlen( msg[i] ) + 1, 1234, callback )) == -1 )
{
perror( "put" );
return (-1);
}
}
/* waiting for the event */
if ( MsgReceivePulse( pchid, &pulse, sizeof( pulse ), NULL ) == -1 )
{
perror( "MsgReceivePulse" );
return (-1);
}
if ( (agh = asyncmsg_get( chid )) == NULL )
{
perror( "get" );
return (-1);
}
printf( "Got message(s): \n\n" );
while ( agh1 = agh )
{
agh = agh1->next;
printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );
printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( *msg ) + 1 );
printf( "srclen: %d\n", agh1->info.srcmsglen );
printf( "err: %d\n", agh1->err );
printf( "parts: %d\n", agh1->parts );
printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );
asyncmsg_free( agh1 );
}
/* give the callback a chance to run */
sleep( 1 );
if ( asyncmsg_connect_detach( coid ) == -1 )
{
perror( "connect_detach" );
return (-1);
}
if ( asyncmsg_channel_destroy( chid ) == -1 )
{
perror( "channel_detach" );
return (-1);
}
return (0);
}

Пример 6: Клиентские и серверные процессы

Это двухпроцессный пример, демонстрирующий простую асинхронную доставку сообщений между клиентским и серверным процессом.

Оба файла asynch_client.c и asynch_server.c разделяют один и тот же заголовочный файл, asynch_server.h, чтобы найти друг друга, используя зарегистрированное имя и определенный тип сообщения для запроса:

#define RECV_NAME "ASYNCH_RECEIVER"
#define GET_ACHID 2 // message type for async chid query: no data; reply is 32-bit chid

Последующий файл (asynch_server.c) - это код сервера, демонстрирующий асинхронный обмен сообщениями. Он регистрирует имя с помощью name_attach(), чтобы клиент(ы) мог(ли) его найти, и отвечает на один запрос от клиента, чтобы получить его идентификатор асинхронного канала.

Этот сервер использует MsgReceive() в качестве точки блокировки и получает импульс от ядра всякий раз, когда есть сообщения, доступные для приема по асинхронному каналу, что позволяет серверу легко обрабатывать как синхронные, так и асинхронные сообщения.

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sys/siginfo.h>
#include <sys/neutrino.h>
#include <sys/dispatch.h>
#include <sys/asyncmsg.h>
#include <unistd.h>
#include <sys/trace.h>
#include "asynch_server.h"
#define PROGNAME "asynch_server: "
/*
* structure for receive buffer for MsgReceive() currently only query message or pulse.
*/
union recv_msgs
{
struct _pulse pulse;
uint16_t type;
} recv_buf;
/*
* pulse code for asynch message notification from kernel this is a user pulse code,
* chosen here -- it need not be this value or any particular value
*/
#define PULSE_ASYNCH_EVENT (_PULSE_CODE_MINAVAIL + 5)
int main( int argc, char *argv[] )
{
name_attach_t *att;
int rcvid;
struct _msg_info msg_info;
struct sigevent sigev;
int self_coid;
int achid;
struct _asyncmsg_get_header *agh, *agh1;
/* register my name so client can find me */
att = name_attach( NULL, RECV_NAME, 0 );
if ( NULL == att )
{
perror( PROGNAME "name_attach()" );
exit( EXIT_FAILURE );
}
/*
* create a connection to the synchronous channel created by the name_attach() call. Will
* use this to specify where the pulses flagging async data available should be delivered.
*/
self_coid = ConnectAttach( 0, 0, att->chid, _NTO_SIDE_CHANNEL, 0 );
if ( -1 == self_coid )
{
perror( PROGNAME "ConnectAttach" );
exit( EXIT_FAILURE );
}
/*
* and fill in the event structure to describe a priority 10 pulse to be delivered to my
* synchronous channel
*/
SIGEV_PULSE_INIT( &sigev, self_coid, 10, PULSE_ASYNCH_EVENT, 0 );
/*
* create an asynchronous channel with automatic buffering
* - it will not block an asyncmsg_get() call if there are no messages available
* - it will receive up to 10 messages of up to 1024 bytes each at once
* I will get a pulse notification when the queue of available messages goes from empty to non-empty
*/
achid = asyncmsg_channel_create( _NTO_CHF_ASYNC_NONBLOCK, 0666, 1024, 10, &sigev, NULL );
if ( -1 == achid )
{
perror( "asyncmsg_channel_create" );
exit( EXIT_FAILURE );
}
while ( 1 )
{
rcvid = MsgReceive( att->chid, &recv_buf, sizeof( recv_buf ), &msg_info );
if ( -1 == rcvid )
{
perror( PROGNAME "MsgReceive failed" );
continue;
}
if ( 0 == rcvid )
{
/* we received a pulse */
printf( "got a pulse\n" );
switch ( recv_buf.pulse.code )
{
/* system disconnect pulse */
case _PULSE_CODE_DISCONNECT:
ConnectDetach( recv_buf.pulse.scoid );
printf( PROGNAME "disconnect from a client %X\n", recv_buf.pulse.scoid );
break;
/* our pulse - we've got one or more messages */
case PULSE_ASYNCH_EVENT:
/* get one or more messages from our channel */
agh = asyncmsg_get( achid );
if ( NULL == agh )
{
perror( "went to get a message, but nothing was there" );
} else {
/*
* the async receive header is, actually, a linked list of headers
* if multiple messages have been received at once, so we need to
* walk the list, looking at each header and message in turn
*/
while ( agh )
{
printf( "the message came from %d in %d parts\n", agh->info.pid, agh->parts );
printf( "the data is '%s'\n", (char *)(agh->iov->iov_base) );
agh1 = agh;
agh = agh1->next;
/* free this message */
asyncmsg_free( agh1 );
}
}
break;
default:
printf( PROGNAME "unexpected pulse code: %d\n", recv_buf.pulse.code );
break;
}
continue;
}
/* not an error, not a pulse, therefor a message */
if ( recv_buf.type == _IO_CONNECT )
{
/* _IO_CONNECT because someone did a name_open() to us, must EOK it.*/
MsgReply( rcvid, EOK, NULL, 0 );
continue;
}
if ( recv_buf.type > _IO_BASE && recv_buf.type <= _IO_MAX )
{
/* unexpected system message,probably qconn, error it */
MsgError( rcvid, ENOSYS );
continue;
}
switch( recv_buf.type )
{
/* here our client asked for our asynchronous channel id reply with it */
case GET_ACHID:
printf( "got request for my achid\n" );
MsgReply( rcvid, 0, &achid, sizeof( achid ) );
break;
default:
/* some other expect message */
printf( PROGNAME "expect message type: %d\n", recv_buf.type );
MsgError( rcvid, ENOSYS );
break;
}
}
return (0);
}

Последующий файл, asynch_client.c, сначала нахоидт сервер, используя name_locate() затем отправляет запрос для получения идентификатора асинхронного канала сервера. Затем он создает асинхронное соединение и каждую секунду отправляет сообщение с обратным вызовом, печатающее, что сообщение было отправлено.

#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <sys/siginfo.h>
#include <sys/neutrino.h>
#include <sys/dispatch.h>
#include <sys/asyncmsg.h>
#include <unistd.h>
#include <sys/trace.h>
#include "asynch_server.h"
#define PROGNAME "asynch_client: "
/*
* callback is run after each message has been delivered to the server. It is registered at
* connect time and called by a thread created during the connect processing.
*/
int callback( int err, void *cmsg, unsigned handle )
{
static int num_callback_run = 0;
++num_callback_run;
printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );
printf( "run the %dth time by %d tid\n", num_callback_run, pthread_self() );
return (0);
}
int main( int argc, char *argv[] )
{
int server_coid, a_coid;
short msg;
int a_chid;
struct _asyncmsg_connection_attr aca;
struct _server_info sinfo;
int count = 1;
/* look for server */
server_coid = name_open( RECV_NAME, 0 );
while ( server_coid == -1 )
{
sleep( 1 );
server_coid = name_open( RECV_NAME, 0 );
}
/* need server's pid, use this to get server info including pid */
ConnectServerInfo( 0, server_coid, &sinfo );
/* send message to server request the asynchronous channel id */
msg = GET_ACHID;
if ( MsgSend( server_coid, &msg, sizeof( msg ), &a_chid, sizeof( a_chid ) ) )
{
perror( PROGNAME "MsgSend" );
exit( EXIT_FAILURE );
}
/* setup array of send buffers */
memset( &aca, 0, sizeof( aca ) );
aca.buffer_size = 2048;
aca.max_num_buffer = 5; /* don't locally accumulate more than 5 buffers */
aca.trigger_num_msg = 1; /* deliver each message as it is sent */
aca.call_back = callback; /* call this function after messages are sent */
/* create my asynchronous connection to the server */
a_coid = asyncmsg_connect_attach( 0, sinfo.pid, a_chid, 0, 0, &aca );
if ( -1 == a_coid )
{
perror( "async connect" );
printf( "server pid is %d\n", sinfo.pid );
exit( EXIT_FAILURE );
}
while ( 1 )
{
int ret, len;
char buf[80];
len = sprintf( buf, "message #%d", count ) + 1;
printf( "sending %dth time\n", count );
/*
* deliver message to server, the count value will be passed into my callback as the "handle"
*/
ret = asyncmsg_put( a_coid, buf, len, count, NULL );
if ( -1 == ret )
{
perror( "put" );
exit( EXIT_FAILURE );
}
count++;
sleep( 1 );
}
return (0);
}




Предыдущий раздел: Описание API библиотеки asyncmsg