The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

форумы  помощь  поиск  регистрация  майллист  вход/выход  слежка  RSS
"Трубопровод между нитями в Linux"
Вариант для распечатки  
Пред. тема | След. тема 
Форумы Программирование под UNIX (Процессы, Треды, RPC)
Изначальное сообщение [ Отслеживать ]

"Трубопровод между нитями в Linux"  +/
Сообщение от pavlinux (ok) on 16-Мрт-10, 18:04 
Задача: UDP сервер, на двух тредах.
Для взаимодействия между тредами решили использовать pipe();

1-й тред: Слушает UDP сокет, по приходу данных пишет в трубу.
2-й тред: Считывает из другого конца трубы, и пишет буфер.

Ниче не работает.
С синтаксисом и типами всё в порядке.

Запор на второй итерации  while (1)  в udp_thread.
Запор решается с помощью poll(), тогда появляется
запор в read(), говорит не верный дискриптор.

Много артефактов, сокращал для понимания на форуме...


/*
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <netdb.h>
#include <string.h>
#include <fcntl.h>
#include <errno.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

#define UDP_BUFF_SZ (2 <<  8) /*  256b   */
#define PACKET_SIZE (2 << 5) /*   32Kb  */

/* Global pipe */
int pipefd[2] = {EOF, EOF};

/* for setsockopt */
const unsigned long value = PACKET_SIZE;

/* считать ровно 32 кб из pipefd[0] , в буфер */
void *read_thread(void *p __attribute__((unused)))
{
    int sz;
    unsigned int data = PACKET_SIZE;
        char *tmp = (char *)malloc(PACKET_SIZE);

    while (data) {
        if ((sz = read(pipefd[0], tmp, data)) == -1) {
            close(pipefd[0]);
                        free(tmp);
            pthread_exit((void *) EOF);
        }
        data -= sz;
        tmp += sz;
    }
free(tmp);
return(NULL);
}

void *udp_thread(void *a __attribute__((unused)))
{
    socklen_t saddrlen;
    socklen_t val_len = sizeof(value);
    int fd;
    struct sockaddr_in serv_addr;
    char udp_buff[UDP_BUFF_SZ];    

    fd = socket(AF_INET, (int) SOCK_DGRAM, IPPROTO_UDP);
    if (fd < 0) {        
        pthread_exit(NULL);
    }

    memset((void *)&serv_addr, 0, sizeof(struct sockaddr_in));

    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(10000);
    serv_addr.sin_addr.s_addr = INADDR_ANY;

    if (bind(fd, (struct sockaddr *)&serv_addr, sizeof(struct sockaddr_in)) < 0) {
        close(fd);
        pthread_exit(NULL);
    }

    while (1) {

        saddrlen = sizeof(serv_addr);

        if (recvfrom(fd, udp_buff, UDP_BUFF_SZ - 1, 0,
                                     (struct sockaddr *)&serv_addr,
                                     (socklen_t *)&saddrlen) < 0)
                      {
            continue;
        }

        fcntl(fd, F_SETFL, ~O_NONBLOCK);

        if (dup2(fd, pipefd[1]) < 0) {
            pthread_exit((void *) EOF);
        } else {            
            setsockopt(pipefd[1], SOL_SOCKET, SO_RCVBUF, &value, val_len);
        }
    }
   return(NULL);
}

int main(void)
{
    pthread_t pth[2];
    pthread_attr_t pattr;

    pipe2(pipefd, O_NONBLOCK);

    (void) pthread_attr_init(&pattr);
    (void) pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED);
    
    if ((pthread_create(&pth[0], &pattr, udp_thread, NULL)) != 0) {
        return(errno);
    }

    if ((pthread_create(&pth[1], &pattr, read_thread, NULL)) != 0) {        
        return(errno);
    }

        MainStateChecker(); /* независимая хрень */

    return(EXIT_SUCCESS);
}

Только не начинайте про malloc в deatached тредах, сам знаю.
Мямлики, висячие указатели и разыменования тоже - не искать - возможны.


P.S. На main() не обращайте внимания, чё надо переделаем...

Может вариант 1 fork + 1 тред

Высказать мнение | Ответить | Правка | Cообщить модератору

Оглавление

Сообщения по теме [Сортировка по времени | RSS]


1. "Трубопровод между нитями в Linux"  +/
Сообщение от asdasd on 17-Мрт-10, 15:12 
попробуй использовать socketpair+select
Высказать мнение | Ответить | Правка | ^ | Наверх | Cообщить модератору

2. "Трубопровод между нитями в Linux"  +/
Сообщение от svn (??) on 17-Мрт-10, 15:47 
dup2 в цикле зачем?
Высказать мнение | Ответить | Правка | ^ | Наверх | Cообщить модератору

3. "Трубопровод между нитями в Linux"  +/
Сообщение от Mr. Mistoffelees on 21-Мрт-10, 00:33 
Привет,

>Для взаимодействия между тредами решили использовать pipe();

Зачем? Треды на то и создани, что у них shared resources и т.д. Pipe - это классический System V IPC дотредовских времен (там же семафоры и т.п.)

Если у вас треды, пользуйтесь тем, что, у дла них нативного есть; если у вас fork()+exec(), тогда и pipe() подойдет.

WWell,


Высказать мнение | Ответить | Правка | ^ | Наверх | Cообщить модератору

4. "Трубопровод между нитями в Linux"  +/
Сообщение от pavlinux (ok) on 21-Мрт-10, 01:09 
>>Для взаимодействия между тредами решили использовать pipe();
>Зачем? Треды на то и созданы, что у них shared resources и

Я уж, подумывал,... сейчас на две недели на другую тему переключён,...

SHARED это которые:

void *shmat(int, const void *, int);
int   shmctl(int, int, struct shmid_ds *);
int   shmdt(const void *);
int   shmget(key_t, size_t, int);

... как я понял, они переползли в Linux из IRIX...
Я раньше никогда не писал софтину для массивного межпроцессного обмена.
На многонитьевых вполне обходились флагами, семафорами, глобальной переменной, и т.п.

> Pipe - это классический System V IPC дотредовских времен (там же семафоры и т.п.)
>Если у вас треды, пользуйтесь тем, что, у для них нативного есть;
>если у вас fork()+exec(), тогда и pipe() подойдет.

ну от UDP не отделаюсь, хотя в обозримом будущем может SCTP ... ладно, не об этом...

Алгоритма:

1. poll/select - ждут данных на порту.
2. получили, считали ....

Далее надо писать данные в shared memory или
на делать указатель на сокет shared объектом?

shm_open() одновременно можно делать?
shm_unlink() в одном треде удалит объект у всех?


Как должна выглядить схема, хотя бы из двух тредов, где:

Первый, читает из сокета, затем пишет какую-то общую область видимости.
Второй, читает из этой области и пишет, скажем в файл, на экран.

...
fd = shm_open("mymem", O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);

recvfrom(socket, udp_buff, UDP_BUFF_SZ - 1, 0, (struct sockaddr *)&serv_addr, (socklen_t *)&saddrlen);

dup2(socket, fd);  // Так можно?

  

Высказать мнение | Ответить | Правка | ^ | Наверх | Cообщить модератору

5. "Трубопровод между нитями в Linux"  +/
Сообщение от pavlinux (ok) on 21-Мрт-10, 01:25 
>[оверквотинг удален]
>>Для взаимодействия между тредами решили использовать pipe();
>
>Зачем? Треды на то и создани, что у них shared resources и
>т.д. Pipe - это классический System V IPC дотредовских времен (там
>же семафоры и т.п.)
>
>Если у вас треды, пользуйтесь тем, что, у дла них нативного есть;
>если у вас fork()+exec(), тогда и pipe() подойдет.
>
>WWell,

Во, нарыл!!!

http://mij.oltrelinux.com/devel/unixprg/#ipc__posix_shm

C вторника займусь...

Высказать мнение | Ответить | Правка | ^ | Наверх | Cообщить модератору

6. "Трубопровод между нитями в Linux"  +/
Сообщение от аноним on 23-Мрт-10, 05:21 
Нахрена тебе shm? Shm - для связи между процессами, а между потоками вся память общая автоматически.
Высказать мнение | Ответить | Правка | ^ | Наверх | Cообщить модератору

Архив | Удалить

Рекомендовать для помещения в FAQ | Индекс форумов | Темы | Пред. тема | След. тема




Партнёры:
PostgresPro
Inferno Solutions
Hosting by Hoster.ru
Хостинг:

Закладки на сайте
Проследить за страницей
Created 1996-2024 by Maxim Chirkov
Добавить, Поддержать, Вебмастеру