Raspberry - сокеты: нужен совет (подсказка)

Тема в разделе "Raspberry Pi", создана пользователем Igor68, 9 июл 2016.

  1. Igor68

    Igor68 Гуру

    Собственно вопрос о взаимодействии между процессами:
    Предыстория:
    - реализован обмен данными между процессами через файлы, создаваемые ими в RAM диске, но проблема с количеством открываемых файлов. А именно один процесс требует данных от нескольких других, да и доступ одного процесса к файлу другого, когда он производит запись затруднителен.
    - позже был переход на схему с применением сервера связи. Когда все запущенные процессы (программы) производят соединение с одним сервером, который принимает и отдаёт данные, работая сразу со всеми процессами. Схема вполне рабочая (сейчас проходит обкатка и доработка - и реализуется проект для эксплуатации).
    Причины вопроса:
    по команде netstat выводится (фрагмент):
    unix 3 [ ] STREAM CONNECTED 21903
    unix 3 [ ] STREAM CONNECTED 21896 @/tmp/dbus-8Ytxl6pU7t
    unix 3 [ ] STREAM CONNECTED 20872
    unix 3 [ ] STREAM CONNECTED 15278 @/tmp/dbus-8Ytxl6pU7t
    unix 3 [ ] STREAM CONNECTED 17892
    unix 3 [ ] STREAM CONNECTED 15259 @/tmp/dbus-8Ytxl6pU7t
    unix 3 [ ] STREAM CONNECTED 21736
    unix 3 [ ] STREAM CONNECTED 17888 @/tmp/dbus-8Ytxl6pU7t
    unix 3 [ ] STREAM CONNECTED 21735
    unix 3 [ ] STREAM CONNECTED 21734 @/tmp/dbus-8Ytxl6pU7t
    unix 3 [ ] STREAM CONNECTED 17887
    unix 3 [ ] STREAM CONNECTED 15258 @/tmp/.ICE-unix/4788
    то есть происходит взаимодействие по UNIX сокетам - понятное дело, что это системные взаимодействия.

    Собственно по теме:
    Имеет ли кто опыт использования UNIX сокеты для обмена данными между процессами(программами). Сразу скажу что с потоками(нитями) работаю давно - многопотчные приложения и переключения между потоками по схеме sleep, usleep не устраивает, хоть и широко применяю pthread в каждом процессе. Хотелось бы узнать про это (взаимодействия через сокеты) на каком-нибудь примере.

    Заранее спасибо.
     
  2. NR55RU

    NR55RU Гик

    Я вам посоветую крайне хорошую книгу на тему взаимодействия процессов. Вообще пожалуй это "единственная" книга на эту тему от очень достойного автора. По крайней мере лучше не встречал пока.
    Стивенс У.
    UNIX. Взаимодействие процессов

    [​IMG]

    Купить её уже нельзя, но скачать в интернете проблем не составляет. :)


    P.s. Если у кого она есть в бумажном виде и он не против её продать. Готов купить. :)
     
    Igor68 нравится это.
  3. Igor68

    Igor68 Гуру

    Спасибо, что откликнулись - книга с подобным пунктом ("Программирование в среде Linux") есть при создании дочерних процессов. Но то что я качал содержит вот это:
    upload_2016-7-9_15-13-45.png
    В принципе то, что надо а документ (PDF) с возможностью навигации внутри, а не просто картинка. Спасибо! Да и судя по беглому просмотру в ней (книге) больше, чем я спаршивал.
    Но текущий проект придётся допилить, так как он "размазан" на несколько устройств, соединённых по локальной сети, среди которых Raspberry на котором 4 независимых процесса:
    - процесс - проект с применением OpenCV - слежение за отклонением по веб камере (только сбор объектов);
    - процесс - сервер доступа по I2C к MultiServo и ComMotion driver for 4 motors и др.;
    - процесс - процесс управления приводами через доступ по сети к серверу I2C;
    - процесс - (планируется предоставление объектов от камеры) доступ.
    такая схема удобна для отладки на ноуте с дальнейшим переносом либо на RPi на колёсах, либо на смонтированном втором RPi.
    Ещё раз СПАСИБО!
     
  4. Igor68

    Igor68 Гуру

    Да кстати сокеты - коли название (может кому надо):
    1) открытие послушивания:
    Код (C++):
    //старт сервера соединений
    int server_start(int ipp)
    {
        listener = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if(listener < 0)
                    return 1;
            addr.sin_family = AF_INET;
            addr.sin_port = htons(ipp);
            addr.sin_addr.s_addr = htonl(INADDR_ANY);
            memset(&addr.sin_zero, 0, 8);
            if(bind(listener, (struct sockaddr *)&addr, sizeof(addr)) < 0)
                    return 2;
            if(listen(listener, _max_sock) < 0)
                    return 3;
            //mutex create
            if(!(pthread_mutex_init(&ServerMutex,NULL) == 0))
                    return 4;
            //starting listen
            cserver.work = 1;
            pthread_create(&hListenProcess,NULL,(void*) server_listen,NULL);
            sleep(1);       //wait one secoond
            //
            return 0;
    }
    тут создаётся поток прослушивания. далее само прослушивание:
    Код (C++):
    int server_listen(void)
    {
        int err;
            fd_set fdss;
            struct timeval tvs;
            int count;
            int csock;
        pthread_mutex_lock(&ServerMutex);
            //инициализация данных
            memset(&cserver.midcl[0], 0, _max_sock);
            cserver.cntclient         = 0;
        cserver.setcntlive        = _time_life;
        //основной цикл прослушивания
            while(cserver.work)
            {
    mt1:            tvs.tv_sec = 0;
                    tvs.tv_usec = _LSOCKET_TIMEOUT_;
                    FD_ZERO(&fdss);
                    FD_SET(listener,&fdss);
                    //
                    err = select(listener + 1,&fdss,&fdss,NULL,&tvs);
                    //err = select(listener + 1,&fdss,NULL,&fdss,&tvs);
                    if(err <= 0)
                    {
                pthread_mutex_unlock(&ServerMutex);
                            usleep(_listen_sleep);
                pthread_mutex_lock(&ServerMutex);
                            goto mt1;
                    }
                    csock = accept(listener,NULL,NULL);
                    if((cserver.csock > 0) && (csock == cserver.csock))
                    {
                pthread_mutex_unlock(&ServerMutex);
                            usleep(_listen_sleep);
                pthread_mutex_lock(&ServerMutex);
                            goto mt1;
                    }
            //создание потока клиента
                    printf("-----CLIENT-----\r\n");
                    printf("-- sock. %i --\r\n", csock);
                    count = 0;
                    while(count < _max_sock)
                    {
                            if(cserver.midcl[count] == 0)
                            {
                                    cserver.csock             = csock;
                                    cserver.idclient         = count;
                    cserver.midcl[count]        = 0xFF;
                                    pthread_create(&cserver.pth[count],NULL,(void*) server_con, &cserver);
                                    cserver.cntclient++;
                    break;
                            }
                            count++;
                    }
                    //
                    pthread_mutex_unlock(&ServerMutex);
                    usleep(_listen_sleep);
            pthread_mutex_lock(&ServerMutex);
            }
        return 0;
    }
     
    обратите внимание, что в моём случае создаётся поток для обслуживания одного соединения, но с учётом мютекса доступа к общим данным:
    Код (C++):
    int server_con(CSERV * cs)
    {
        int    countlive     = cs->setcntlive;
        int    setcntlive    = cs->setcntlive;
        int    csock        = cs->csock;
        int    idclient    = cs->idclient;
        U8    rxbuf[_size_buf];
        U8    txbuf[_size_buf];
        int    txsz         = 0;
        int    res;
        //
        printf("client connect\n");
        //
        pthread_mutex_lock(&ServerMutex);
        //
        while(1)
        {
            #if(_deb > 0)
                printf("client_cnt: %i\n", countlive);
            #endif
            //чтение сокета
            txsz = 0;
            pthread_mutex_unlock(&ServerMutex);
            res = ReadSocket(csock, &rxbuf[0]);
            pthread_mutex_lock(&ServerMutex);
            switch(res)
            {
                case 0:    //Ok
                    break;
                case -1: //таймаут
                    #if(_deb == 1)
                        printf("TCP recv.:%i timeout..\r\n",cs->midcl[idclient]);
                    #endif
                    break;
                case -2: //какая-то ошибка сокета
                    #if(_deb == 1)
                        printf("TCP resv.:%i err?..\r\n",cs->midcl[idclient]);
                    #endif
                    goto mexit;
                case -3: //ошибка размера пакета
                    //передача сообщения о несоответствии
                    #if(_deb == 1)
                        printf("TCP client:%i frame no correct..\r\n",cs->midcl[idclient]);
                    #endif
                    goto mexit; //отмена обработки запроса
                default: //нормально - данные приняты
                    countlive = setcntlive;
                    //обработка команд и данных сокета
                    txsz = mwork(&rxbuf[0], &txbuf[0]);
                    break;
            }
            //передача данных
            if(txsz > 0)
            {
                pthread_mutex_unlock(&ServerMutex);
                res = WriteSocket(csock, &txbuf[0], txsz);
                switch(res)
                {
                    case -1:        //Ok
                        break;
                    case -2:
                        #if(_ssdeb == 1)
                            printf("TCP send.:%i err?..\r\n",cs->midcl[idclient]);
                        #endif
                        goto mexit;
                    default:
                        countlive = setcntlive;
                        break;
                }
                //pthread_mutex_lock(&ServerMutex);
            }
            //ожидание и новый цикл
            pthread_mutex_unlock(&ServerMutex);
            usleep(_conn_sleep);
            countlive--;
            if(!(countlive > 0)) //никому это соединение не нужно
                break;
            pthread_mutex_lock(&ServerMutex);
            if(!(cs->work)) //кто-то сказал НЕ РАБОТАТЬ!
                break;
        }
    mexit:    close(csock);
        cs->midcl[idclient] = 0;
        cs->cntclient--;
        pthread_mutex_unlock(&ServerMutex);
        return 0;
    }
     
    ну и по ходу первые два байта - размер. Вы можете сделать по своему. Функции чтения и записи - пардон приёма и передачи:
    Код (C++):
    inline int ReadSocket(int nSocket, BYTE *pBuffer)
    {
            //int     res                     = 1;
            int         iRC                     = 0;
            int         iReceiveStatus          = 0;
            int         iStillToReceive         = 2; //начальный размер получения минимальных данных
            int         iCount = 0;
            struct timeval     ReceiveTimeout;
            fd_set         fds;
        U8        flag            = 0;
        //fcntl(nSocket, F_SETFL, O_NONBLOCK);
            //fds = calloc(1, sizeof(fd_set));// fds;
            //int size = 0;
            while(iStillToReceive > 0)
            {
                    //установка величины таймаута
                    FD_ZERO(&fds);
                    FD_SET(nSocket, &fds);
                    ReceiveTimeout.tv_sec  = 0;
                    ReceiveTimeout.tv_usec = _SOCKET_TIMEOUT_;
                    iRC = select(nSocket + 1, &fds, NULL, NULL, &ReceiveTimeout);
                    //выход по таймауту
                    if(!iRC)
                return -1;
            //произошла какая то ошибка
                    if(iRC < 0)
                            return -2;
            //ошибка указания размера данных
                    if(iStillToReceive <= 0)
                            return -3;
                    //прием нескольких байт
            #if(_tcp_recv_deb == 1)
                printf("----read sock (recv) ----\r\n");
                #endif
            iReceiveStatus = recv(nSocket, (char*)(pBuffer), iStillToReceive, 0);
                    if(iReceiveStatus >  0)
                    {
                pBuffer += iReceiveStatus;
                            iStillToReceive -= iReceiveStatus;
                            iCount += iReceiveStatus;
                if((iCount >= 2) && (flag == 0))
                {
                    iStillToReceive = (*(U16*)(pBuffer - iCount)) - iCount;
                    #if(_tcp_recv_deb == 1)
                        printf("---recv size: set %i, count %i, still %i\r\n", (*(U16*)(pBuffer - iCount)), iCount, iStillToReceive);
                    #endif
                    flag = 0xFF;
                }
                    }
            else if(iReceiveStatus == 0)
                return -1;
            else
                return -2;
            #if(_tcp_recv_deb == 1)
                printf("==iStillToReceive:%i iCount:%i==\r\n", iStillToReceive, iCount);
            #endif
            }
            //
            return iCount;
    }
    смотрите далее
     
    Последнее редактирование: 9 июл 2016
  5. Igor68

    Igor68 Гуру

    смотрите предыдущее
    Код (Text):

    inline int WriteSocket(int nSocket, BYTE *pBuffer, U16 iMessageLength)
    {
         int         iCount        = 0;
            int         iSC         = 0;
            int         iSendStatus     = 0;
            struct timeval     SendTimeout;
            fd_set         fds;
        //fcntl(nSocket, F_SETFL, O_NONBLOCK);
            while(iMessageLength > 0)
            {
            //установка величины таймаута
                SendTimeout.tv_sec  = 0;
                SendTimeout.tv_usec = _SOCKET_TIMEOUT_;
                FD_ZERO(&fds);
                FD_SET(nSocket, &fds);
                    iSC = select(nSocket + 1, NULL, &fds, NULL, &SendTimeout);
                    if(!iSC)
                            return -1; //таймаут
                    if(iSC < 0)
                            return -2; //ошибка
            //отправить несколько байт
            #if(_tcp_send_deb == 1)
                printf("----write sock (send) ----\r\n");
                    #endif
            iSendStatus = send(nSocket, (char*)(pBuffer), iMessageLength, 0);
                    if(iSendStatus > 0)
                    {
                            //обновить буфер и счетчик
                            iMessageLength -= iSendStatus;
                            pBuffer += iSendStatus;
                            iCount += iSendStatus;
                    }
            else if(iSendStatus == 0)
                return -1;
            else
                return -2;
            }
        return iCount;
    }
     
    таким образом у меня сервер.
    клиент (опять из моих нужд) :
    Код (C++):
    int    cclient_connect(void)
    {
        //int    res;
        cclient.socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
            if(cclient.socket < 0)
                    return -1;
            ccaddr.sin_family = AF_INET;
            ccaddr.sin_port = htons(cclient.ipport);
            ccaddr.sin_addr.s_addr = inet_addr((char*)(&cclient.str_ip_url[0]));
            memset(&ccaddr.sin_zero, 0, 8);
        //
        if(connect(cclient.socket, (struct sockaddr *)&ccaddr, sizeof(ccaddr)) < 0)
            return -2;
        ClientWork = 1;
        pthread_create(&hClientProcess,NULL,(void*) client_process, NULL);
        return 0;
    }
    тут тоже делаю поток для работы с сервером, но в моём случае мютекс не требуется:
    Код (C++):
    int    client_process(void)
    {
        U8    txbuf[_szbuf];
        U8    rxbuf[_szbuf];
        int    txsz = 0;
        int    res;
        while(ClientWork)
        {
            //организация запроса
            txsz = mworks(&txbuf[0]);
            //передача данных
            if(txsz > 0)
            {
                res = cWriteSocket(cclient.socket, &txbuf[0], txsz);
                switch(res)
                {
                    case 0:        //
                        break;
                    default:
                        break;
                }
            }
            usleep(_conn_sleep);
            res = cReadSocket(cclient.socket, &rxbuf[0]);
            //приём данных
            switch(res)
            {
                case 0:    //Ok
                    //countlive = setcntlive;
                    //mworkr(&rxbuf[0]);
                    break;
                case -1: //таймаут
                    #if(_deb == 1)
                        printf("TCP client recv. timeout..\r\n");
                    #endif
                    break;
                case -2: //какая-то ошибка сокета
                    #if(_deb == 1)
                        printf("TCP client resv. err?..\r\n");
                    #endif
                    goto mexit;
                case -3: //ошибка размера пакета
                    //передача сообщения о несоответствии
                    #if(_deb == 1)
                        printf("TCP client frame no correct..\r\n");
                    #endif
                    goto mexit; //отмена обработки запроса
                default: //нормально - данные приняты
                    //обработка ответа
                    mworkr(&rxbuf[0]);
                    break;
            }
            //
            usleep(_conn_sleep);
        }
    mexit:    close(cclient.socket);
        return 0;
    }
    функции чтения и записи (в сокет из сокета) идентичны - как и для сервера. Ну а обработку как в сервере mwork(&rxbuf[0]); и в клиенте (для приёма и передачи) mwork(&rxbuf[0]) mworks(&txbuf[0]) - согласно нуждам;
    Всё вышесказанное - пример(хоть и вырезка из рабочего)!!!
     
    Последнее редактирование: 9 июл 2016