The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

Реализация многопотокового "ассинхронного сервра TCP" и RPC для ОС Linux (select thread rpc gcc socket)


<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>
Ключевые слова: select, thread, rpc, gcc, socket,  (найти похожие документы)
From: Б.А. Державец <dba477 at list.ru> Newsgroups: email Date: Mon, 07 May 2004 14:31:37 +0000 (UTC) Subject: Реализация многопотокового "ассинхронного сервра TCP" и RPC для ОС Linux РЕАЛИЗАЦИЯ МНОГОПОТОКОВОГО "АСИНХРОННОГО СЕРВЕРА TCP" И RPC ДЛЯ ОС LINUX ------------------- В заметке приводится код многопотокового эхо-сервера , основанного на использовании неблокирующего ввода вывода и конечных автоматов. Каждый поток сервера использует вызов select( ) для того, чтобы определить по какому из соединений можно производить обмен в данный момент времени. Код для процедуры serv_request , выполняемой ведомыми потоками может быть взят из различных источников ( см. [1],[2],[3]). Параметр , передаваемый в процедуру serv_request , явлется дескриптором пассивного сокета , создаваемый ведущим потоком с помощью вызова процедуры getServerSocket( ). Необходимо отметить, что приведенное решение основано на свойстве Linux эффективно распараллеливать вызов accept( ). В противном случае возникает необходимость в блокировке мьютекса перед вызовом accept( ), что влечет за собой последовательное выполнение потоками сервера критической части кода, т.е. вызова accept( ). В среде же Red Hat Linux 9 (например) эта предосторожность не нужна и только снижает производительность. Детальное описание Posix Threads API на русском языке может быть найдено в [2]. Приводится также модифицированный код заглушки sample_svc.c (не за- висящий от шаблона sample.x) , позволяюший скомпилировать многопото- ковый сервер RPC в среде LINUX. /* * ServerNBTHR.c */ #include <sys/socket.h> #include <sys/types.h> #include <sys/time.h> #include <netinet/in.h> #include <errno.h> #include <string.h> #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <fcntl.h> #include <pthread.h> #define NUM_THREADS 512 pthread_mutex_t request_mutex = PTHREAD_MUTEX_INITIALIZER; void die(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); abort(); } void bark(const char *func, int err) { fprintf(stderr,"%s: %s\n",func, strerror(err)); } /* Описание поцедуры ведущего потока , которая возвращает дескрипторов пассивного сокета, привязанного к адресу сервера. */ int getServerSocket(unsigned short int port) { int listenSocket; struct sockaddr_in listenSockaddr; if((listenSocket=socket(PF_INET,SOCK_STREAM,0))<0) die("socket()",errno); memset(&listenSockaddr, 0, sizeof(listenSockaddr)); listenSockaddr.sin_family = PF_INET; listenSockaddr.sin_port = htons(port); listenSockaddr.sin_addr.s_addr = INADDR_ANY; if(bind(listenSocket,(struct sockaddr*)&listenSockaddr, sizeof(listenSockaddr)) < 0) die("bind()",errno); if(listen(listenSocket,5)<0) die("listen()",errno); return listenSocket; } /* Описание процедуры выполняемой всеми ведомыми потоками */ void * serv_request(void *data) { struct connection_cb { int dataSocket; char data[256]; int dataSent; int dataToSend; int isReading; struct connection_cb *next; }; struct connection_cb *connections = NULL; int listenSocket = (int)data; if(fcntl(listenSocket,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); while(1) { fd_set readFdSet; fd_set writeFdSet; struct connection_cb *currentConn, **currentConnPtr, *tempConn; int maxFdNum; FD_ZERO(&readFdSet); FD_ZERO(&writeFdSet); /* Добавление дескриптора к множеству readFdSet */ FD_SET(listenSocket,&readFdSet); maxFdNum = listenSocket; for(currentConn = connections;currentConn!=NULL;currentConn = currentConn->next) { if(currentConn->isReading) FD_SET(currentConn->dataSocket,&readFdSet); else FD_SET(currentConn->dataSocket,&writeFdSet); maxFdNum = currentConn->dataSocket > maxFdNum ?currentConn- >dataSocket : maxFdNum; } /* Получение множества дескрипторов сокетов для обработки */ if(select(maxFdNum+1,&readFdSet,&writeFdSet,NULL,NULL) < 0) { if(errno == EINTR) continue; die("select()",errno); } currentConnPtr=&connections; while(*currentConnPtr!=NULL) { /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству readFdSet */ if((*currentConnPtr)->isReading && FD_ISSET((*currentConnPtr)->dataSocket,&readFdSet)) { int result = recv((*currentConnPtr)->dataSocket, (*currentConnPtr)->data, sizeof((*currentConnPtr)->data),0); if(result < 0) { if(errno!=EINTR && errno!=EAGAIN && errno!=EWOULDBLOCK) { bark("recv()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else if(result==0) { close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } else { (*currentConnPtr)->dataToSend = result; (*currentConnPtr)->dataSent = 0; (*currentConnPtr)->isReading = 0; printf("Recieving as Slave Thread id = '%d' \n",pthread_self()); } } else /* Проверка принадлежности дескриптора (*currentConnPtr)->dataSocket к множеству writedFdSet */ if(FD_ISSET((*currentConnPtr)->dataSocket,&writeFdSet)) { int result = send((*currentConnPtr)->dataSocket, (*currentConnPtr)->data+(*currentConnPtr)->dataSent, (*currentConnPtr) ->dataToSend-(*currentConnPtr)->dataSent, 0); if(result < 0) { if(errno!=EINTR && errno!=EAGAIN) { bark("write()",errno); close((*currentConnPtr)->dataSocket); tempConn = *currentConnPtr; *currentConnPtr = (*currentConnPtr)->next; free(tempConn); continue; } } else { (*currentConnPtr)->dataSent +=result; if((*currentConnPtr)->dataSent >= (*currentConnPtr)->dataToSend) (*currentConnPtr)->isReading = 1; } } currentConnPtr = &((*currentConnPtr)->next); printf("Sending as Slave Thread id = '%d' \n",pthread_self()); } /* Проверка принадлежности дескриптора listenSocket к множеству readFdSet,т.е. необходимости обработать вызов connect( ) от нового клиента. */ if(FD_ISSET(listenSocket,&readFdSet)) { while(1) { /* Вызовы pthread_mutex_lock, pthread_mutex_unlock Не нужны в среде Linux */ pthread_mutex_lock(&request_mutex); int result = accept(listenSocket,(struct sockaddr*)NULL,NULL); pthread_mutex_unlock(&request_mutex); if(result < 0) { if(errno==EAGAIN || errno == EWOULDBLOCK) break; die("accept()",errno); } else { *currentConnPtr = malloc(sizeof(struct connection_cb)); if(*currentConnPtr==NULL) die("malloc()",0); if(fcntl(result,F_SETFL,O_NONBLOCK)<0) die("fcntl()",errno); (*currentConnPtr)->dataSocket = result; (*currentConnPtr)->isReading = 1; (*currentConnPtr)->next = 0; currentConnPtr = &((*currentConnPtr)->next); printf("Accepting as Master Thread id = '%d' \n",pthread_self()); } } } } } int main(int argc,char *argv[]) { int k; int descSock; char *service="1500"; switch(argc) { case 1: break; case 2: service = argv[1]; break; default: printf ("Usage: ./ServerBNTH [port]\n"); exit(1); } size_t stacksize; pthread_t p_thread[NUM_THREADS]; /* Установка размера стека для ведомых потоков */ pthread_attr_t attr; pthread_attr_init(&attr); stacksize = 500000; pthread_attr_setstacksize (&attr, stacksize); pthread_attr_getstacksize (&attr, &stacksize); /* Получение значения дескриптора пассивного сокета */ descSock = getServerSocket(atoi(service)); /* Запуск ведомых потоков */ for(k=0; k<NUM_THREADS; k++) { pthread_create(&p_thread[k],&attr,serv_request,(void*)descSock); printf("Thread %d started \n",k); } pthread_attr_destroy(&attr); for(k=0;k<NUM_THREADS;k++) { pthread_join(p_thread[k], NULL); printf("Completed join with thread %d\n",k); } }
Ниже приведен модифицированный код square_svc.c (для шаблона square.x), позволяющий скомпилировать многопотоковый сервер RPC в среде Red Hat Linux 9.0 Шаблон square.x: struct square_in { long arg1; }; struct square_out { long res1; }; program SQUARE_PROG { version SQUARE_VERS { square_out SQUAREPROC(square_in) = 1; } = 2 ; } = 0x31230000; Вызов rpcgen для генерации заглушек клиента,сервера и xdr файла : $ rpcgen -a -M square.x
Код процедур сервера: /* ServerSideProc.c */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> int request=0; bool_t squareproc_2_svc(square_in *inp,square_out *outp,struct svc_req *rqstp) { printf("Thread id = '%ld' started, arg = %d\n",pthread_self(),inp->arg1); /* Имитация работы процедуры , выполняемой потоками сервера */ sleep(5); outp->res1=inp->arg1*inp->arg1; printf("Thread id = '%ld' is done %d \n",pthread_self(),outp->res1); return(TRUE); } int square_prog_2_freeresult(SVCXPRT *transp,xdrproc_t xdr_result, caddr_t result) { xdr_free(xdr_result,result); return(1); }
Модифицированный файл square_svc.c: /* square_svc.c * Please do not edit this file. * It was generated using rpcgen. */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> #ifndef SIG_PF #define SIG_PF void(*)(int) #endif pthread_t p_thread; pthread_attr_t attr; /* Процедура выполняемая потоком */ void * serv_request(void *data) { struct thr_data { struct svc_req *rqstp; SVCXPRT *transp; } *ptr_data; { union { square_in squareproc_2_arg; } argument; union { square_out squareproc_2_res; } result; bool_t retval; xdrproc_t _xdr_argument, _xdr_result; bool_t (*local)(char *, void *, struct svc_req *); /* Распаковка данных , переданных в процедуру при запуске потока. */ ptr_data = (struct thr_data *)data; struct svc_req *rqstp = ptr_data->rqstp; register SVCXPRT *transp = ptr_data->transp; switch (rqstp->rq_proc) { case NULLPROC: (void) svc_sendreply (transp, (xdrproc_t) xdr_void, (char *)NULL); return; case SQUAREPROC: _xdr_argument = (xdrproc_t) xdr_square_in; _xdr_result = (xdrproc_t) xdr_square_out; local = (bool_t (*) (char *, void *, struct svc_req *))squareproc_2_svc; break; default: svcerr_noproc (transp); return; } memset ((char *)&argument, 0, sizeof (argument)); if (!svc_getargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { svcerr_decode (transp); return; } /* Стандартный вызов функции сервера. Данные для вызова уже приведены к стандарту. */ retval = (bool_t) (*local)((char *)&argument, (void *)&result, rqstp); if (retval > 0 && !svc_sendreply(transp, (xdrproc_t) _xdr_result, (char *)&result)) { svcerr_systemerr (transp); } if (!svc_freeargs (transp, (xdrproc_t) _xdr_argument, (caddr_t) &argument)) { fprintf (stderr, "%s", "unable to free arguments"); exit (1); } if (!square_prog_2_freeresult (transp, _xdr_result, (caddr_t) &result)) fprintf (stderr, "%s", "unable to free results"); return; } } /* Принципиально измененный код square_prog_2 , стартующей теперь новый поток для каждого инициированного клиентом вызова процедуры на удаленном сервере */ static void square_prog_2(struct svc_req *rqstp, register SVCXPRT *transp) { struct data_str { struct svc_req *rqstp; SVCXPRT *transp; } *data_ptr =(struct data_str*)malloc(sizeof(struct data_str); { /* Упаковка данных в структуру для передачи ссылки на нее, как параметра запускаемому потоку */ data_ptr->rqstp = rqstp; data_ptr->transp = transp; pthread_attr_setdetachstate(&attr,PTHREAD_CREATE_DETACHED); pthread_create(&p_thread,&attr,serv_request,(void *)data_ptr); } } int main (int argc, char **argv) { register SVCXPRT *transp; pmap_unset (SQUARE_PROG, SQUARE_VERS); transp = svcudp_create(RPC_ANYSOCK); if (transp == NULL) { fprintf (stderr, "%s", "cannot create udp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_UDP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, udp)."); exit(1); } transp = svctcp_create(RPC_ANYSOCK, 0, 0); if (transp == NULL) { fprintf (stderr, "%s", "cannot create tcp service."); exit(1); } if (!svc_register(transp, SQUARE_PROG, SQUARE_VERS, square_prog_2, IPPROTO_TCP)) { fprintf (stderr, "%s", "unable to register (SQUARE_PROG, SQUARE_VERS, tcp)."); exit(1); } svc_run (); fprintf (stderr, "%s", "svc_run returned"); exit (1); /* NOTREACHED */ } Компиляция ServerSQUARE: $ gcc -o ServerSQUARE ServerSideProc.c square_svc.c square_xdr.c -lprthread -lnsl
Код клиента: /* * ClientSideProc.c */ #include <memory.h> /* for memset */ #include "square.h" #include <stdio.h> #include <stdlib.h> #include <rpc/pmap_clnt.h> #include <string.h> #include <memory.h> #include <sys/socket.h> #include <netinet/in.h> int main (int argc,char **argv) { CLIENT *cl; square_in in; square_out out; if (argc != 3 ) { printf ("Usage : client <hostname> <integer_valus=e>\n"); exit(1); } cl = clnt_create(argv[1],SQUARE_PROG,SQUARE_VERS,"tcp"); if (cl == NULL) { clnt_perror (cl, "call failed"); exit (1); } in.arg1 = atol(argv[2]); if (squareproc_2(&in,&out,cl) != RPC_SUCCESS) { printf ("%s\n" , clnt_perror (cl,argv[1] )); exit(1); } printf("result: %ld\n",out.res1); exit(0); } Компиляция ClientSQUARE: $ gcc -o ClientSQUARE ClientSideProc.c square_clnt.c square_xdr.c -lprthread -lnsl
Далее приведем результаты тестирования (сp. [3] ,Глава "SUN RPC"): [root@dell4500 SQWMT]# cat square.bsh ./ClientSQUARE dell4500.redhat 10 & ./ClientSQUARE dell4500.redhat 11 & \ ./ClientSQUARE dell4500.redhat 12 & ./ClientSQUARE dell4500.redhat 21 & \ ./ClientSQUARE dell4500.redhat 13 & ./ClientSQUARE dell4500.redhat 14 & \ ./ClientSQUARE dell4500.redhat 15 & ./ClientSQUARE dell4500.redhat 16 & \ ./ClientSQUARE dell4500.redhat 17 & ./ClientSQUARE dell4500.redhat 18 & \ ./ClientSQUARE dell4500.redhat 19 & ./ClientSQUARE dell4500.redhat 20 & Вывод на машине клиента: [root@dell4500 SQWMT]# ./square.bsh [root@dell4500 SQWMT]# result: 196 result: 225 result: 256 result: 289 result: 121 result: 144 result: 441 result: 169 result: 100 result: 324 result: 361 result: 400 Вывод на машине сервера: [root@dell4500 SQWMT]# ./ServerSQUARE Thread id = '1082453184' started, arg = 14 Thread id = '1090841664' started, arg = 15 Thread id = '1099230144' started, arg = 16 Thread id = '1116941120' started, arg = 17 Thread id = '1125329600' started, arg = 11 Thread id = '1133718080' started, arg = 12 Thread id = '1142106560' started, arg = 21 Thread id = '1150495040' started, arg = 13 Thread id = '1158883520' started, arg = 10 Thread id = '1167272000' started, arg = 18 Thread id = '1175660480' started, arg = 19 Thread id = '1184048960' started, arg = 20 Thread id = '1082453184' is done 196 Thread id = '1090841664' is done 225 Thread id = '1099230144' is done 256 Thread id = '1116941120' is done 289 Thread id = '1125329600' is done 121 Thread id = '1133718080' is done 144 Thread id = '1142106560' is done 441 Thread id = '1150495040' is done 169 Thread id = '1158883520' is done 100 Thread id = '1167272000' is done 324 Thread id = '1175660480' is done 361 Thread id = '1184048960' is done 400 Литература 1. Дуглас Э. Крамер,Дэвид Л. Стивенс. Сети TCP/IP .Разработка приложений Типа клиент/сервер для LINUX/POSIX . Том 3 Издательский дом "Вильямс",2002 2. http://dlenev.nm.ru/ 3. Стивенс У. UNIX: Взаимодействие процессов.Том 1,2 Из-во "Питер",2002

<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>

Обсуждение [ RSS ]
  • 1.1, Борис Державец (?), 13:17, 21/05/2004 [ответить]  
  • +/
    Хорошо известно, что  Oracle Advanced Replication или  Informix Enterprise Replication
    это либо немедленный ,либо  отложенный  RPC вызов. Следовательно, среда ОС Linux может порождать проблемы с репликациями  баз данных, которые не возникают на платформах , поддерживающих “SUN RPC” , либо “DCE RPC”,  так как обе реализации имеют многопотоковую поддержку
    протокола RPC.
     
     
  • 2.4, аноним (?), 10:47, 27/08/2010 [^] [^^] [^^^] [ответить]  
  • +/
    http://www.linuxjournal.com/article/2204
    Remote Procedure Calls
    From Issue #42
    October 1997
    Oct 01, 1997   By Ed Petron
    >Linux distributions provide an RPC version derived from the RPC facility developed by the Open Network Computing (ONC) group at Sun Microsystems.

    кто сказал, что в лине rpc не Sun RPC?

    также смотреть тут - $ ls /proc/sys/sunrpc/
    и тут - ls /usr/src/linux-source*/Documentation/sysctl/sunrpc.txt

     

  • 1.2, Tiger Trader (?), 04:24, 10/02/2009 [ответить]  
  • +/
    Код, очень старый, багов тьма, да и не для Линухов он изначально написан...

    Начнем, вот с чего:
    "errno!=EAGAIN && errno!=EWOULDBLOCK", да но EAGAIN==EWOULDBLOCK в линуксе, ммм...

    далее, почти каждая функция под SMP может выдать, останов из-за обслуживания сигнала, т.е. EINTR, иными словами, данный код будет падать в die, или не не падать, но работать не верно...


    ЗЫ. Вывод, для высоконагруженных сетевый приложений, Linux - далеко не лучший выбор...

     
     
  • 2.3, аноним (?), 10:36, 27/08/2010 [^] [^^] [^^^] [ответить]  
  • +/
    прикольно
    >Код, очень старый, багов тьма, да и не для Линухов
    >ЗЫ. Вывод, для высоконагруженных сетевый приложений, Linux - далеко не лучший выбор...

    логика что писец. маркетинг?

     

  • 1.5, backbone (ok), 12:05, 03/05/2011 [ответить]  
  • +1 +/
    Супер-статья! Нижайший поклон автору! =)
     
  • 1.6, funtom (?), 16:30, 14/06/2013 [ответить]  
  • +/
    Наилучший способ разработки ПО это использование уже готовых оттестированных модулей. К примеру Boost.Asio (http://boost.org) или Unicomm (http://libunicomm.org)
     
  • 1.7, rrrFer (?), 09:26, 11/01/2014 [ответить]  
  • +/
    Вторая ссылка в литературе не рабочая.
     

     Добавить комментарий
    Имя:
    E-Mail:
    Заголовок:
    Текст:




    Партнёры:
    PostgresPro
    Inferno Solutions
    Hosting by Hoster.ru
    Хостинг:

    Закладки на сайте
    Проследить за страницей
    Created 1996-2024 by Maxim Chirkov
    Добавить, Поддержать, Вебмастеру