Описание механизмов обмена асинхронными сообщениями и примеры использования
Асинхронный обмен сообщениями - это модель взаимодействия, основанная на методе промежуточного хранения. По сравнению с обычным общением на основе ответов в системах клиент-сервер, инфраструктура обмена сообщениями обеспечивает доставку, даже если участник временно отключен, занят или недоступен. В этом методе отправителю не нужен ответ от получателя. Это обеспечивает большую гибкость и масштабируемость, поскольку отправители и получатели разделены и больше не требуются для синхронного исполнения.
В некоторых реальных сценариях клиенту не нужно ждать в состоянии блокировки, пока сервер завершит обслуживание своих запросов; скорее, он может улучшить свою производительность, выполняя некоторые задачи во время ожидания. Несколько асинхронных сообщений также могут быть отправлены в течение короткого периода времени. Буферизация этих сообщений и их отправка группой могут снизить частоту поступления в ядро и, таким образом, повысить производительность системы.
Асинхронный обмен сообщениями в ЗОСРВ «Нейтрино» обеспечивает управляемую событиями системную модель, которая хорошо дополняет традиционную архитектуру передачи сообщений синхронного обмена сообщениями. Его главная особенность - высокая пропускная способность и неблокирующая передача сообщений. Теперь вы можете проектировать системы, в которых ЗОСРВ «Нейтрино» может своевременно реагировать на асинхронные события, генерируемые извне или внутри. Асинхронный обмен сообщениями также обеспечивает удобный и эффективный механизм для доставки асинхронных событий и связанных с ними данных между компонентами системы.
![]() |
|
На следующем рисунке показан пользовательский процесс (производитель), который собирает данные с некоторых устройств ввода (например, датчиков), буферизует их и отправляет другому пользовательскому процессу (потребителю) для обработки. Базовым методом в асинхронном обмене сообщениями является «пакетная доставка», которая увеличивает пропускную способность и снижает накладные расходы. Данные отправляются в виде сообщений с определенными границами.
Вы можете использовать асинхронный обмен сообщениями вместо схемы "отправки-получения-ответа", когда:
Вы можете использовать схему "отправку-получение-ответ" вместо асинхронного обмена сообщениями, когда:
Вот несколько примеров кода, показывающих, как работает асинхронный обмен сообщениями.
Это самый простой пример; он демонстрирует отправку и получение одного сообщения. Номер триггера, который определяет количество сообщений, которые отправитель хочет отправить за один раз, установлено в 1
.
#include <stdio.h>#include <process.h>#include <unistd.h>#include <string.h>#include <sys/asyncmsg.h>char *msg = "AsyncMsg Passing";int callback( int err, void *cmsg, unsigned handle ){printf( "Callback: err = %d, msg = %p (%p), handle = %d\n", err, cmsg, msg, handle );return (0);}/** Simple test: Create a channel, connect to it, put a message, get it, done.*/int main(){int chid, coid;struct _asyncmsg_connection_attr aca;struct _asyncmsg_get_header *agh, *agh1;if ( (chid = asyncmsg_channel_create( 0, 0666, 2048, 5, NULL, NULL )) == -1 ){perror( "channel_create" );return (-1);}memset( &aca, 0, sizeof( aca ) );aca.buffer_size = 2048;aca.max_num_buffer = 5;aca.trigger_num_msg = 1;if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 ){perror( "connect_attach" );return (-1);}if ( (asyncmsg_put( coid, msg, strlen( msg ) + 1, 1234, callback )) == -1 ){perror( "put" );return (-1);}if ( (agh = asyncmsg_get( chid )) == NULL ){perror( "get" );return (-1);}printf( "Got message(s): \n\n" );while ( agh1 = agh ){agh = agh1->next;printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( msg ) + 1 );printf( "srclen: %d\n", agh1->info.srcmsglen );printf( "err: %d\n", agh1->err );printf( "parts: %d\n", agh1->parts );printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );asyncmsg_free( agh1 );}/* give the callback a chance to run */sleep( 1 );if ( asyncmsg_connect_detach( coid ) == -1 ){perror( "connect_detach" );return (-1);}if ( asyncmsg_channel_destroy( chid ) == -1 ){perror( "channel_detach" );return (-1);}return (0);}
Следующая программа демонстрирует, как можно отправить более одного сообщения от отправителя, но использовать asyncmsg_get() только один раз, чтобы собрать все эти сообщения вместе. Вы должны установить соответствующий номер триггера (в данном примере 3) в атрибутах, которые вы передаете в asyncmsg_connect_attach(), чтобы запустить копирование сообщений. Номер триггера определяет количество сообщений, которые отправитель заранее определил для совместной отправки. Это увеличивает пропускную способность.
#include <stdio.h>#include <process.h>#include <unistd.h>#include <string.h>#include <sys/asyncmsg.h>int callback( int err, void *cmsg, unsigned handle ){printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );return (0);}/** Multiple put, then see if get can get all of them.*/int main(){int chid, coid, i;struct _asyncmsg_connection_attr aca;struct _asyncmsg_get_header *agh, *agh1;char msg[3][80];if ( (chid = asyncmsg_channel_create( _NTO_CHF_SENDER_LEN, 0666, 2048, 5, NULL, NULL )) == -1 ){perror( "channel_create" );return (-1);}memset( &aca, 0, sizeof( aca ) );aca.buffer_size = 2048;aca.max_num_buffer = 5;aca.trigger_num_msg = 3;if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 ){perror( "connect_attach" );return (-1);}for ( i = 0; i < 3; i++ ){sprintf( msg[i], "Async Message Passing (msgid %d)\n", i );if ( (asyncmsg_put( coid, msg[i], strlen( msg[i] ) + 1, 1234, callback )) == -1 ){perror( "put" );return (-1);}}if ( (agh = asyncmsg_get( chid )) == NULL ){perror ("get" );return (-1);}printf( "Got message(s): \n\n" );while ( agh1 = agh ){agh = agh1->next;printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( *msg ) + 1 );printf( "srclen: %d\n", agh1->info.srcmsglen );printf( "err: %d\n", agh1->err );printf( "parts: %d\n", agh1->parts );printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );asyncmsg_free( agh1 );}sleep( 1 );if ( asyncmsg_connect_detach( coid ) == -1 ){perror( "connect_detach" );return (-1);}if ( asyncmsg_channel_destroy( chid ) == -1 ){perror( "channel_detach" );return (-1);}return (0);}
Следующая программа демонстрирует использование asyncmsg_flush(), которое можно использовать для очистки всех сообщений; вам не нужно устанавливать номер триггера. Фактически, получатель получает уведомление о событии от ядра, как только сообщения готовы в отправителе. Затем получатель сразу же принимает все сообщения вместе.
#include <stdio.h>#include <errno.h>#include <process.h>#include <unistd.h>#include <string.h>#include <sys/asyncmsg.h>int num_callback_run = 0;int chid;int callback( int err, void *cmsg, unsigned handle ){num_callback_run++;printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );return (0);}void * thread_for_get( void *arg ){int pchid = (int)arg;struct _pulse pulse;struct _asyncmsg_get_header *agh, *agh1;/* waiting for the event */if ( MsgReceivePulse( pchid, &pulse, sizeof( pulse ), NULL ) == -1 ){perror( "MsgReceivePulse" );return (NULL);}if ( (agh = asyncmsg_get( chid )) == NULL ){perror( "get" );return (NULL);}printf( "Got message(s): \n\n" );while ( agh1 = agh ){agh = agh1->next;printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );printf( "msglen: %d\n", agh1->info.msglen );printf( "srclen: %d\n", agh1->info.srcmsglen );printf( "err: %d\n", agh1->err );printf( "parts: %d\n", agh1->parts );printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );asyncmsg_free( agh1 );}return (0);}/** No trigger for put, but we force flush() and see if all messages can be pushed out.*/int main(){int coid, pchid, i;struct sigevent gev;struct _asyncmsg_connection_attr aca;char msg[4][80];/* prepare the event */if ( (pchid = ChannelCreate( 0 )) == -1 ){perror( "ChannelCreate" );return (-1);}if ( (errno = pthread_create( 0, 0, thread_for_get, (void *)pchid ) ) != EOK ){perror( "pthread_create" );return (-1);}if ( (gev.sigev_coid = ConnectAttach( 0, 0, pchid, _NTO_SIDE_CHANNEL, 0 )) == -1 ){perror( "ConnectAttach" );return (-1);}gev.sigev_notify = SIGEV_PULSE;gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT;/* async channel */if ( (chid = asyncmsg_channel_create( _NTO_CHF_SENDER_LEN, 0666, 2048, 5, &gev, NULL )) == -1 ){perror( "channel_create" );return (-1);}memset( &aca, 0, sizeof( aca ) );aca.buffer_size = 2048;aca.max_num_buffer = 5;aca.trigger_num_msg = 0;aca.call_back = callback;if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 ){perror( "connect_attach" );return (-1);}/* put up 4 messages */for ( i = 0; i < 4; i++ ){sprintf( msg[i], "Async Message Passing (msgid %d)\n", i );if ( (asyncmsg_put( coid, msg[i], strlen( msg[i] ) + 1, 1234, callback )) == -1 ){perror( "put" );return (-1);}}asyncmsg_flush( coid, 0 );/* give the put callback a chance to run */while ( num_callback_run < 2 )delay( 500 );if ( asyncmsg_connect_detach( coid ) == -1 ){perror( "connect_detach" );return (-1);}if ( asyncmsg_channel_destroy( chid ) == -1 ){perror( "channel_detach" );return (-1);}return (0);}
Данная программа демонстрирует как использовать asyncmsg_connect_attach() для установки события, которое используется для запуска копирования получателем.
#include <stdio.h>#include <process.h>#include <unistd.h>#include <string.h>#include <sys/asyncmsg.h>char *msg = "AsyncMsg Passing";int callback( int err, void *cmsg, unsigned handle ){printf( "Callback: err = %d, msg = %p (%p), handle = %d\n", err, cmsg, msg, handle );return (0);}/** Simple test. Create a channel, connect to it, put a message, get them, done*/int main(){int chid, coid, pchid;struct sigevent gev;struct _pulse pulse;struct _asyncmsg_connection_attr aca;struct _asyncmsg_get_header *agh, *agh1;/* prepare the event */if ( (pchid = ChannelCreate( 0 )) == -1 ){perror( "ChannelCreate" );return (-1);}if ( (gev.sigev_coid = ConnectAttach( 0, 0, pchid, _NTO_SIDE_CHANNEL, 0 )) == -1 ){perror( "ConnectAttach" );return (-1);}gev.sigev_notify = SIGEV_PULSE;gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT;/* async channel */if ( (chid = asyncmsg_channel_create( 0, 0666, 2048, 5, &gev, NULL )) == -1 ){perror( "channel_create" );return (-1);}memset( &aca, 0, sizeof( aca ) );aca.buffer_size = 2048;aca.max_num_buffer = 5;aca.trigger_num_msg = 1;aca.call_back = callback;if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 ){perror( "connect_attach" );return (-1);}if ( (asyncmsg_put( coid, msg, strlen( msg ) + 1, 0, 0 )) == -1 ){perror( "put" );return (-1);}/* waiting for the event */if ( MsgReceivePulse( pchid, &pulse, sizeof( pulse ), NULL ) == -1 ){perror( "MsgReceivePulse" );return (-1);}if ( (agh = asyncmsg_get( chid )) == NULL ){perror( "get" );return (-1);}printf( "Got message(s): \n\n" );while ( agh1 = agh ){agh = agh1->next;printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( msg ) + 1 );printf( "srclen: %d\n", agh1->info.srcmsglen );printf( "err: %d\n", agh1->err );printf( "parts: %d\n", agh1->parts );printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );asyncmsg_free( agh1 );}/* give the callback a chance to run */sleep( 1 );if ( asyncmsg_connect_detach( coid ) == -1 ){perror( "connect_detach" );return (-1);}if ( asyncmsg_channel_destroy( chid ) == -1 ){perror( "channel_detach" );return (-1);}return (0);}
В данном примере используется задержка запуска копирования получателем. Здесь таймер выставлен на определенный период времени (2 секунды). По истечении таймера получатель получает уведомление и начинает копировать сообщения.
#include <stdio.h>#include <process.h>#include <unistd.h>#include <string.h>#include <sys/asyncmsg.h>int callback( int err, void *cmsg, unsigned handle ){printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );return (0);}/** Set the get trigger to 2 seconds, put up 4 messages, see we could get a* notification for get after 2 seconds.*/int main(){int chid, coid, pchid, i;struct sigevent gev;struct _pulse pulse;struct _asyncmsg_connection_attr aca;struct _asyncmsg_get_header *agh, *agh1;char msg[4][80];/* prepare the event */if ( (pchid = ChannelCreate( 0 )) == -1 ){perror( "ChannelCreate" );return (-1);}if ( (gev.sigev_coid = ConnectAttach( 0, 0, pchid, _NTO_SIDE_CHANNEL, 0 )) == -1 ){perror( "ConnectAttach" );return (-1);}gev.sigev_notify = SIGEV_PULSE;gev.sigev_priority = SIGEV_PULSE_PRIO_INHERIT;/* async channel */if ( (chid = asyncmsg_channel_create( _NTO_CHF_SENDER_LEN, 0666, 2048, 5, &gev, NULL )) == -1 ){perror( "channel_create" );return (-1);}memset( &aca, 0, sizeof( aca ) );aca.buffer_size = 2048;aca.max_num_buffer = 5;aca.trigger_num_msg = 0;aca.trigger_time.nsec = 2000000000L;aca.trigger_time.interval_nsec = 0;aca.call_back = callback;if ( (coid = asyncmsg_connect_attach( 0, 0, chid, 0, 0, &aca )) == -1 ){perror( "connect_attach" );return (-1);}/* put up 4 messages */for ( i = 0; i < 4; i++ ){sprintf( msg[i], "Async Message Passing (msgid %d)\n", i );if ( (asyncmsg_put( coid, msg[i], strlen( msg[i] ) + 1, 1234, callback )) == -1 ){perror( "put" );return (-1);}}/* waiting for the event */if ( MsgReceivePulse( pchid, &pulse, sizeof( pulse ), NULL ) == -1 ){perror( "MsgReceivePulse" );return (-1);}if ( (agh = asyncmsg_get( chid )) == NULL ){perror( "get" );return (-1);}printf( "Got message(s): \n\n" );while ( agh1 = agh ){agh = agh1->next;printf( "from process: %d (%d)\n", agh1->info.pid, getpid() );printf( "msglen: %d (%d)\n", agh1->info.msglen, strlen( *msg ) + 1 );printf( "srclen: %d\n", agh1->info.srcmsglen );printf( "err: %d\n", agh1->err );printf( "parts: %d\n", agh1->parts );printf( "msg: %s\n\n", (char *)agh1->iov->iov_base );asyncmsg_free( agh1 );}/* give the callback a chance to run */sleep( 1 );if ( asyncmsg_connect_detach( coid ) == -1 ){perror( "connect_detach" );return (-1);}if ( asyncmsg_channel_destroy( chid ) == -1 ){perror( "channel_detach" );return (-1);}return (0);}
Это двухпроцессный пример, демонстрирующий простую асинхронную доставку сообщений между клиентским и серверным процессом.
Оба файла asynch_client.c
и asynch_server.c
разделяют один и тот же заголовочный файл, asynch_server.h
, чтобы найти друг друга, используя зарегистрированное имя и определенный тип сообщения для запроса:
#define RECV_NAME "ASYNCH_RECEIVER"#define GET_ACHID 2 // message type for async chid query: no data; reply is 32-bit chid
Последующий файл (asynch_server.c
) - это код сервера, демонстрирующий асинхронный обмен сообщениями. Он регистрирует имя с помощью name_attach(), чтобы клиент(ы) мог(ли) его найти, и отвечает на один запрос от клиента, чтобы получить его идентификатор асинхронного канала.
Этот сервер использует MsgReceive() в качестве точки блокировки и получает импульс от ядра всякий раз, когда есть сообщения, доступные для приема по асинхронному каналу, что позволяет серверу легко обрабатывать как синхронные, так и асинхронные сообщения.
#include <stdlib.h>#include <stdio.h>#include <errno.h>#include <sys/siginfo.h>#include <sys/neutrino.h>#include <sys/dispatch.h>#include <sys/asyncmsg.h>#include <unistd.h>#include <sys/trace.h>#include "asynch_server.h"#define PROGNAME "asynch_server: "/** structure for receive buffer for MsgReceive() currently only query message or pulse.*/union recv_msgs{struct _pulse pulse;uint16_t type;} recv_buf;/** pulse code for asynch message notification from kernel this is a user pulse code,* chosen here -- it need not be this value or any particular value*/#define PULSE_ASYNCH_EVENT (_PULSE_CODE_MINAVAIL + 5)int main( int argc, char *argv[] ){name_attach_t *att;int rcvid;struct _msg_info msg_info;struct sigevent sigev;int self_coid;int achid;struct _asyncmsg_get_header *agh, *agh1;/* register my name so client can find me */att = name_attach( NULL, RECV_NAME, 0 );if ( NULL == att ){perror( PROGNAME "name_attach()" );exit( EXIT_FAILURE );}/** create a connection to the synchronous channel created by the name_attach() call. Will* use this to specify where the pulses flagging async data available should be delivered.*/self_coid = ConnectAttach( 0, 0, att->chid, _NTO_SIDE_CHANNEL, 0 );if ( -1 == self_coid ){perror( PROGNAME "ConnectAttach" );exit( EXIT_FAILURE );}/** and fill in the event structure to describe a priority 10 pulse to be delivered to my* synchronous channel*/SIGEV_PULSE_INIT( &sigev, self_coid, 10, PULSE_ASYNCH_EVENT, 0 );/** create an asynchronous channel with automatic buffering* - it will not block an asyncmsg_get() call if there are no messages available* - it will receive up to 10 messages of up to 1024 bytes each at once* I will get a pulse notification when the queue of available messages goes from empty to non-empty*/achid = asyncmsg_channel_create( _NTO_CHF_ASYNC_NONBLOCK, 0666, 1024, 10, &sigev, NULL );if ( -1 == achid ){perror( "asyncmsg_channel_create" );exit( EXIT_FAILURE );}while ( 1 ){rcvid = MsgReceive( att->chid, &recv_buf, sizeof( recv_buf ), &msg_info );if ( -1 == rcvid ){perror( PROGNAME "MsgReceive failed" );continue;}if ( 0 == rcvid ){/* we received a pulse */printf( "got a pulse\n" );switch ( recv_buf.pulse.code ){/* system disconnect pulse */case _PULSE_CODE_DISCONNECT:ConnectDetach( recv_buf.pulse.scoid );printf( PROGNAME "disconnect from a client %X\n", recv_buf.pulse.scoid );break;/* our pulse - we've got one or more messages */case PULSE_ASYNCH_EVENT:/* get one or more messages from our channel */agh = asyncmsg_get( achid );if ( NULL == agh ){perror( "went to get a message, but nothing was there" );} else {/** the async receive header is, actually, a linked list of headers* if multiple messages have been received at once, so we need to* walk the list, looking at each header and message in turn*/while ( agh ){printf( "the message came from %d in %d parts\n", agh->info.pid, agh->parts );printf( "the data is '%s'\n", (char *)(agh->iov->iov_base) );agh1 = agh;agh = agh1->next;/* free this message */asyncmsg_free( agh1 );}}break;default:printf( PROGNAME "unexpected pulse code: %d\n", recv_buf.pulse.code );break;}continue;}/* not an error, not a pulse, therefor a message */if ( recv_buf.type == _IO_CONNECT ){/* _IO_CONNECT because someone did a name_open() to us, must EOK it.*/MsgReply( rcvid, EOK, NULL, 0 );continue;}if ( recv_buf.type > _IO_BASE && recv_buf.type <= _IO_MAX ){/* unexpected system message,probably qconn, error it */MsgError( rcvid, ENOSYS );continue;}switch( recv_buf.type ){/* here our client asked for our asynchronous channel id reply with it */case GET_ACHID:printf( "got request for my achid\n" );MsgReply( rcvid, 0, &achid, sizeof( achid ) );break;default:/* some other expect message */printf( PROGNAME "expect message type: %d\n", recv_buf.type );MsgError( rcvid, ENOSYS );break;}}return (0);}
Последующий файл, asynch_client.c, сначала нахоидт сервер, используя name_locate() затем отправляет запрос для получения идентификатора асинхронного канала сервера. Затем он создает асинхронное соединение и каждую секунду отправляет сообщение с обратным вызовом, печатающее, что сообщение было отправлено.
#include <stdlib.h>#include <stdio.h>#include <errno.h>#include <sys/siginfo.h>#include <sys/neutrino.h>#include <sys/dispatch.h>#include <sys/asyncmsg.h>#include <unistd.h>#include <sys/trace.h>#include "asynch_server.h"#define PROGNAME "asynch_client: "/** callback is run after each message has been delivered to the server. It is registered at* connect time and called by a thread created during the connect processing.*/int callback( int err, void *cmsg, unsigned handle ){static int num_callback_run = 0;++num_callback_run;printf( "Callback: err = %d, msg = %p, handle = %d\n", err, cmsg, handle );printf( "run the %dth time by %d tid\n", num_callback_run, pthread_self() );return (0);}int main( int argc, char *argv[] ){int server_coid, a_coid;short msg;int a_chid;struct _asyncmsg_connection_attr aca;struct _server_info sinfo;int count = 1;/* look for server */server_coid = name_open( RECV_NAME, 0 );while ( server_coid == -1 ){sleep( 1 );server_coid = name_open( RECV_NAME, 0 );}/* need server's pid, use this to get server info including pid */ConnectServerInfo( 0, server_coid, &sinfo );/* send message to server request the asynchronous channel id */msg = GET_ACHID;if ( MsgSend( server_coid, &msg, sizeof( msg ), &a_chid, sizeof( a_chid ) ) ){perror( PROGNAME "MsgSend" );exit( EXIT_FAILURE );}/* setup array of send buffers */memset( &aca, 0, sizeof( aca ) );aca.buffer_size = 2048;aca.max_num_buffer = 5; /* don't locally accumulate more than 5 buffers */aca.trigger_num_msg = 1; /* deliver each message as it is sent */aca.call_back = callback; /* call this function after messages are sent *//* create my asynchronous connection to the server */a_coid = asyncmsg_connect_attach( 0, sinfo.pid, a_chid, 0, 0, &aca );if ( -1 == a_coid ){perror( "async connect" );printf( "server pid is %d\n", sinfo.pid );exit( EXIT_FAILURE );}while ( 1 ){int ret, len;char buf[80];len = sprintf( buf, "message #%d", count ) + 1;printf( "sending %dth time\n", count );/** deliver message to server, the count value will be passed into my callback as the "handle"*/ret = asyncmsg_put( a_coid, buf, len, count, NULL );if ( -1 == ret ){perror( "put" );exit( EXIT_FAILURE );}count++;sleep( 1 );}return (0);}
Предыдущий раздел: Описание API библиотеки asyncmsg