Собственно вопрос о взаимодействии между процессами: Предыстория: - реализован обмен данными между процессами через файлы, создаваемые ими в RAM диске, но проблема с количеством открываемых файлов. А именно один процесс требует данных от нескольких других, да и доступ одного процесса к файлу другого, когда он производит запись затруднителен. - позже был переход на схему с применением сервера связи. Когда все запущенные процессы (программы) производят соединение с одним сервером, который принимает и отдаёт данные, работая сразу со всеми процессами. Схема вполне рабочая (сейчас проходит обкатка и доработка - и реализуется проект для эксплуатации). Причины вопроса: по команде netstat выводится (фрагмент): unix 3 [ ] STREAM CONNECTED 21903 unix 3 [ ] STREAM CONNECTED 21896 @/tmp/dbus-8Ytxl6pU7t unix 3 [ ] STREAM CONNECTED 20872 unix 3 [ ] STREAM CONNECTED 15278 @/tmp/dbus-8Ytxl6pU7t unix 3 [ ] STREAM CONNECTED 17892 unix 3 [ ] STREAM CONNECTED 15259 @/tmp/dbus-8Ytxl6pU7t unix 3 [ ] STREAM CONNECTED 21736 unix 3 [ ] STREAM CONNECTED 17888 @/tmp/dbus-8Ytxl6pU7t unix 3 [ ] STREAM CONNECTED 21735 unix 3 [ ] STREAM CONNECTED 21734 @/tmp/dbus-8Ytxl6pU7t unix 3 [ ] STREAM CONNECTED 17887 unix 3 [ ] STREAM CONNECTED 15258 @/tmp/.ICE-unix/4788 то есть происходит взаимодействие по UNIX сокетам - понятное дело, что это системные взаимодействия. Собственно по теме: Имеет ли кто опыт использования UNIX сокеты для обмена данными между процессами(программами). Сразу скажу что с потоками(нитями) работаю давно - многопотчные приложения и переключения между потоками по схеме sleep, usleep не устраивает, хоть и широко применяю pthread в каждом процессе. Хотелось бы узнать про это (взаимодействия через сокеты) на каком-нибудь примере. Заранее спасибо.
Я вам посоветую крайне хорошую книгу на тему взаимодействия процессов. Вообще пожалуй это "единственная" книга на эту тему от очень достойного автора. По крайней мере лучше не встречал пока. Стивенс У. UNIX. Взаимодействие процессов Купить её уже нельзя, но скачать в интернете проблем не составляет. P.s. Если у кого она есть в бумажном виде и он не против её продать. Готов купить.
Спасибо, что откликнулись - книга с подобным пунктом ("Программирование в среде Linux") есть при создании дочерних процессов. Но то что я качал содержит вот это: В принципе то, что надо а документ (PDF) с возможностью навигации внутри, а не просто картинка. Спасибо! Да и судя по беглому просмотру в ней (книге) больше, чем я спаршивал. Но текущий проект придётся допилить, так как он "размазан" на несколько устройств, соединённых по локальной сети, среди которых Raspberry на котором 4 независимых процесса: - процесс - проект с применением OpenCV - слежение за отклонением по веб камере (только сбор объектов); - процесс - сервер доступа по I2C к MultiServo и ComMotion driver for 4 motors и др.; - процесс - процесс управления приводами через доступ по сети к серверу I2C; - процесс - (планируется предоставление объектов от камеры) доступ. такая схема удобна для отладки на ноуте с дальнейшим переносом либо на RPi на колёсах, либо на смонтированном втором RPi. Ещё раз СПАСИБО!
Да кстати сокеты - коли название (может кому надо): 1) открытие послушивания: Код (C++): //старт сервера соединений int server_start(int ipp) { listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if(listener < 0) return 1; addr.sin_family = AF_INET; addr.sin_port = htons(ipp); addr.sin_addr.s_addr = htonl(INADDR_ANY); memset(&addr.sin_zero, 0, 8); if(bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0) return 2; if(listen(listener, _max_sock) < 0) return 3; //mutex create if(!(pthread_mutex_init(&ServerMutex,NULL) == 0)) return 4; //starting listen cserver.work = 1; pthread_create(&hListenProcess,NULL,(void*) server_listen,NULL); sleep(1); //wait one secoond // return 0; } тут создаётся поток прослушивания. далее само прослушивание: Код (C++): int server_listen(void) { int err; fd_set fdss; struct timeval tvs; int count; int csock; pthread_mutex_lock(&ServerMutex); //инициализация данных memset(&cserver.midcl[0], 0, _max_sock); cserver.cntclient = 0; cserver.setcntlive = _time_life; //основной цикл прослушивания while(cserver.work) { mt1: tvs.tv_sec = 0; tvs.tv_usec = _LSOCKET_TIMEOUT_; FD_ZERO(&fdss); FD_SET(listener,&fdss); // err = select(listener + 1,&fdss,&fdss,NULL,&tvs); //err = select(listener + 1,&fdss,NULL,&fdss,&tvs); if(err <= 0) { pthread_mutex_unlock(&ServerMutex); usleep(_listen_sleep); pthread_mutex_lock(&ServerMutex); goto mt1; } csock = accept(listener,NULL,NULL); if((cserver.csock > 0) && (csock == cserver.csock)) { pthread_mutex_unlock(&ServerMutex); usleep(_listen_sleep); pthread_mutex_lock(&ServerMutex); goto mt1; } //создание потока клиента printf("-----CLIENT-----\r\n"); printf("-- sock. %i --\r\n", csock); count = 0; while(count < _max_sock) { if(cserver.midcl[count] == 0) { cserver.csock = csock; cserver.idclient = count; cserver.midcl[count] = 0xFF; pthread_create(&cserver.pth[count],NULL,(void*) server_con, &cserver); cserver.cntclient++; break; } count++; } // pthread_mutex_unlock(&ServerMutex); usleep(_listen_sleep); pthread_mutex_lock(&ServerMutex); } return 0; } обратите внимание, что в моём случае создаётся поток для обслуживания одного соединения, но с учётом мютекса доступа к общим данным: Код (C++): int server_con(CSERV * cs) { int countlive = cs->setcntlive; int setcntlive = cs->setcntlive; int csock = cs->csock; int idclient = cs->idclient; U8 rxbuf[_size_buf]; U8 txbuf[_size_buf]; int txsz = 0; int res; // printf("client connect\n"); // pthread_mutex_lock(&ServerMutex); // while(1) { #if(_deb > 0) printf("client_cnt: %i\n", countlive); #endif //чтение сокета txsz = 0; pthread_mutex_unlock(&ServerMutex); res = ReadSocket(csock, &rxbuf[0]); pthread_mutex_lock(&ServerMutex); switch(res) { case 0: //Ok break; case -1: //таймаут #if(_deb == 1) printf("TCP recv.:%i timeout..\r\n",cs->midcl[idclient]); #endif break; case -2: //какая-то ошибка сокета #if(_deb == 1) printf("TCP resv.:%i err?..\r\n",cs->midcl[idclient]); #endif goto mexit; case -3: //ошибка размера пакета //передача сообщения о несоответствии #if(_deb == 1) printf("TCP client:%i frame no correct..\r\n",cs->midcl[idclient]); #endif goto mexit; //отмена обработки запроса default: //нормально - данные приняты countlive = setcntlive; //обработка команд и данных сокета txsz = mwork(&rxbuf[0], &txbuf[0]); break; } //передача данных if(txsz > 0) { pthread_mutex_unlock(&ServerMutex); res = WriteSocket(csock, &txbuf[0], txsz); switch(res) { case -1: //Ok break; case -2: #if(_ssdeb == 1) printf("TCP send.:%i err?..\r\n",cs->midcl[idclient]); #endif goto mexit; default: countlive = setcntlive; break; } //pthread_mutex_lock(&ServerMutex); } //ожидание и новый цикл pthread_mutex_unlock(&ServerMutex); usleep(_conn_sleep); countlive--; if(!(countlive > 0)) //никому это соединение не нужно break; pthread_mutex_lock(&ServerMutex); if(!(cs->work)) //кто-то сказал НЕ РАБОТАТЬ! break; } mexit: close(csock); cs->midcl[idclient] = 0; cs->cntclient--; pthread_mutex_unlock(&ServerMutex); return 0; } ну и по ходу первые два байта - размер. Вы можете сделать по своему. Функции чтения и записи - пардон приёма и передачи: Код (C++): inline int ReadSocket(int nSocket, BYTE *pBuffer) { //int res = 1; int iRC = 0; int iReceiveStatus = 0; int iStillToReceive = 2; //начальный размер получения минимальных данных int iCount = 0; struct timeval ReceiveTimeout; fd_set fds; U8 flag = 0; //fcntl(nSocket, F_SETFL, O_NONBLOCK); //fds = calloc(1, sizeof(fd_set));// fds; //int size = 0; while(iStillToReceive > 0) { //установка величины таймаута FD_ZERO(&fds); FD_SET(nSocket, &fds); ReceiveTimeout.tv_sec = 0; ReceiveTimeout.tv_usec = _SOCKET_TIMEOUT_; iRC = select(nSocket + 1, &fds, NULL, NULL, &ReceiveTimeout); //выход по таймауту if(!iRC) return -1; //произошла какая то ошибка if(iRC < 0) return -2; //ошибка указания размера данных if(iStillToReceive <= 0) return -3; //прием нескольких байт #if(_tcp_recv_deb == 1) printf("----read sock (recv) ----\r\n"); #endif iReceiveStatus = recv(nSocket, (char*)(pBuffer), iStillToReceive, 0); if(iReceiveStatus > 0) { pBuffer += iReceiveStatus; iStillToReceive -= iReceiveStatus; iCount += iReceiveStatus; if((iCount >= 2) && (flag == 0)) { iStillToReceive = (*(U16*)(pBuffer - iCount)) - iCount; #if(_tcp_recv_deb == 1) printf("---recv size: set %i, count %i, still %i\r\n", (*(U16*)(pBuffer - iCount)), iCount, iStillToReceive); #endif flag = 0xFF; } } else if(iReceiveStatus == 0) return -1; else return -2; #if(_tcp_recv_deb == 1) printf("==iStillToReceive:%i iCount:%i==\r\n", iStillToReceive, iCount); #endif } // return iCount; } смотрите далее
смотрите предыдущее Код (Text): inline int WriteSocket(int nSocket, BYTE *pBuffer, U16 iMessageLength) { int iCount = 0; int iSC = 0; int iSendStatus = 0; struct timeval SendTimeout; fd_set fds; //fcntl(nSocket, F_SETFL, O_NONBLOCK); while(iMessageLength > 0) { //установка величины таймаута SendTimeout.tv_sec = 0; SendTimeout.tv_usec = _SOCKET_TIMEOUT_; FD_ZERO(&fds); FD_SET(nSocket, &fds); iSC = select(nSocket + 1, NULL, &fds, NULL, &SendTimeout); if(!iSC) return -1; //таймаут if(iSC < 0) return -2; //ошибка //отправить несколько байт #if(_tcp_send_deb == 1) printf("----write sock (send) ----\r\n"); #endif iSendStatus = send(nSocket, (char*)(pBuffer), iMessageLength, 0); if(iSendStatus > 0) { //обновить буфер и счетчик iMessageLength -= iSendStatus; pBuffer += iSendStatus; iCount += iSendStatus; } else if(iSendStatus == 0) return -1; else return -2; } return iCount; } таким образом у меня сервер. клиент (опять из моих нужд) : Код (C++): int cclient_connect(void) { //int res; cclient.socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); if(cclient.socket < 0) return -1; ccaddr.sin_family = AF_INET; ccaddr.sin_port = htons(cclient.ipport); ccaddr.sin_addr.s_addr = inet_addr((char*)(&cclient.str_ip_url[0])); memset(&ccaddr.sin_zero, 0, 8); // if(connect(cclient.socket, (struct sockaddr *)&ccaddr, sizeof(ccaddr)) < 0) return -2; ClientWork = 1; pthread_create(&hClientProcess,NULL,(void*) client_process, NULL); return 0; } тут тоже делаю поток для работы с сервером, но в моём случае мютекс не требуется: Код (C++): int client_process(void) { U8 txbuf[_szbuf]; U8 rxbuf[_szbuf]; int txsz = 0; int res; while(ClientWork) { //организация запроса txsz = mworks(&txbuf[0]); //передача данных if(txsz > 0) { res = cWriteSocket(cclient.socket, &txbuf[0], txsz); switch(res) { case 0: // break; default: break; } } usleep(_conn_sleep); res = cReadSocket(cclient.socket, &rxbuf[0]); //приём данных switch(res) { case 0: //Ok //countlive = setcntlive; //mworkr(&rxbuf[0]); break; case -1: //таймаут #if(_deb == 1) printf("TCP client recv. timeout..\r\n"); #endif break; case -2: //какая-то ошибка сокета #if(_deb == 1) printf("TCP client resv. err?..\r\n"); #endif goto mexit; case -3: //ошибка размера пакета //передача сообщения о несоответствии #if(_deb == 1) printf("TCP client frame no correct..\r\n"); #endif goto mexit; //отмена обработки запроса default: //нормально - данные приняты //обработка ответа mworkr(&rxbuf[0]); break; } // usleep(_conn_sleep); } mexit: close(cclient.socket); return 0; } функции чтения и записи (в сокет из сокета) идентичны - как и для сервера. Ну а обработку как в сервере mwork(&rxbuf[0]); и в клиенте (для приёма и передачи) mwork(&rxbuf[0]) mworks(&txbuf[0]) - согласно нуждам; Всё вышесказанное - пример(хоть и вырезка из рабочего)!!!