The OpenNET Project / Index page

[ новости /+++ | форум | теги | ]

Процессы. Очереди сообщений. Взаимодействие между процессами. (ipc proccess thread block memory gcc fork lock)


<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>
Ключевые слова: ipc, proccess, thread, block, memory, gcc, fork, lock,  (найти похожие документы)
From: Kolobynin Alexey <alexey_ak0(at)mail.ru> Newsgroups: linuxfocus.org Date: Mon, 2 Aug 2004 14:31:37 +0000 (UTC) Subject: Процессы. Очереди сообщений. Взаимодействие между процессами. Оригинал: http://www.linuxfocus.org/Russian/November2002/article272.shtml Автор Leonardo Giordani <leo.giordani(at)libero.it> Об авторе: Студент факультета телекоммуникаций Миланского политехнического института, работает сетевым администратором, интересуется программированием (в основном на ассемблере и C/C++). C 1999 года работает исключительно с Linux/Unix. Перевод на Русский: Часть 1 - Kolobynin Alexey <alexey_ak0(at)mail.ru> Часть 2 - Dmitry Martsynkevitch <ursa(at)zaba.ru> Частьи 3,4,5 - Kirill Pukhlyakov <kirill(at)linuxfocus.org> Часть 1. Параллельное программирование - принципы и введение в процессы. Содержание: * Введение * Понятие процесса * Процессы в Linux и Unix * Многозадачность в libc * Рекомендуется к прочтению * Страница отзывов Резюме: Цель данного цикла статей -- познакомить читателя с понятием многозадачности и с ее реализацией в операционной системе Linux. Начиная с теоретических идей основ многозадачности, мы закончим написанием приложения, которое продемонстрирует взаимодействие между процессами по простому, но эффективному протоколу. Для понимания статьи вам нужны: * минимальные знания оболочки; * знания основ языка C (синтаксис, циклы, библиотеки). Все ссылки на страницы man помещены в скобки после имени команды. Все функции glibc описаны на страницах gnu info (info Libc, или введите info:/libc/Top в konqueror). Введение -------- Одним из поворотных пунктов в истории операционных систем было появление концепции мультипрограммирования -- техники чередования выполнения нескольких программ с целью более полного использования ресурсов системы. Представим себе обычную машину, на которой пользователь может одновременно поставить на выполнение текстовый процессор, аудиопроигрыватель, очередь печати, браузер и т.д. Это важное свойство современной операционной системы. Как мы узнаем позже, приведенный список программ -- лишь маленькая часть набора, который выполняется в каждый текущий момент на нашем компьютере, хоть и самая "бросающаяся в глаза" часть. Понятие процесса ---------------- Для реализации чередования выполнения программ необходимо произвести сильное усложнение операционной системы. Чтобы избежать конфликтов между выполняемыми программами, необходимо хранить в памяти вместе с программами и всю информацию нужную для их выполнения. Перед тем как разбираться, что происходит внутри нашего Linux, определимся с технической терминологией: пусть дана выполняющаяся ПРОГРАММА, в данный момент времени КОД -- это набор инструкций, из которых она состоит, ОБЛАСТЬ ПАМЯТИ -- это часть машинной памяти, занятой ее данными, СОСТОЯНИЕ ПРОЦЕССОРА -- значения параметров микропроцессора, такие как флаги или Счетчик Команд (адрес следующей инструкции для выполнения). Определим понятие ВЫПОЛНЯЮЩАЯСЯ ПРОГРАММА как совокупность КОДА, ОБЛАСТИ ПАМЯТИ и СОСТОЯНИЯ ПРОЦЕССОРА. Если в какое-то время работы машины сохранить эту информацию и заменить ее на тот же набор другой выполняющейся программы, то выполнение последней будет продолжено с точки, где она была остановлена ранее: проделывая эту процедуру поочередно для первой и второй программы, мы получим ранее описанное чередование выполнения программ. Термин ПРОЦЕСС (или ЗАДАЧА) используется для обозначения выполняющейся программы. Рассмотрим, что происходит с машиной, о которой мы говорили во введении: в каждый момент времени выполняется только одна задача (есть только один микропроцессор, и он не может делать два дела одновременно), и машина исполняет часть ее кода; после определенного промежутка времени, называемого КВАНТОМ, выполняющийся процесс останавливается, его информация сохраняется и заменяется на ту же информацию другого процесса, чей код будет выполняться в следующий квант времени и т.д. Это то, что мы называем многозадачностью. Как было отмечено во введении, многозадачность порождает набор проблем, решение многих из них нетривиально, например управление очередью остановленных процессов (ПЛАНИРОВКА). Тем не менее, они должны быть решены в архитектуре каждой операционной системы. Возможно, этим проблемам будет посвящена одна из последующих статей, может быть с представлением некоторых частей кода ядра Linux. Процессы в Linux и Unix ----------------------- Давайте узнаем кое-что о процессах, выполняющихся на нашей машине. Команда, которая даст нам нужную информацию, называется ps(1) - это сокращение от "process status" (англ. -- состояние процесса). Открыв обычную командную строку и введя команду ps, мы получим что-то вроде PID TTY TIME CMD 2241 ttyp4 00:00:00 bash 2346 ttyp4 00:00:00 ps Я говорил, что данный список не полный, однако давайте на минуту здесь остановимся: ps выдал нам список процессов, выполняющихся на текущем терминале. Мы видим в последнем столбце имя, при помощи которого был запущен процесс (например "mozilla" для браузера Mozilla или "gcc" для GNU Compiler Collection). "ps" появляется в этом списке, так как эта команда выполнялась, когда выводился список выполняющихся процессов. Другой выведенный процесс -- это Bourne Again Shell -- оболочка, работающая на моих терминалах. Пропустим (пока) информацию в столбцах TIME и TTY и рассмотрим PID -- Process IDentifier (англ. -- идентификатор процесса). pid -- уникальное положительное число, которое присваивается каждому выполняющемуся процессу. Если выполнение процесса завершилось, его pid может быть использован заново, однако гарантируется, что во время выполнения процесса, его pid остается неизменным. Из всего этого следует, что вывод, который каждый из вас будет получать при выполнении ps, может отличаться от приведенного выше. Чтобы удостоверится, что я говорю правду, откроем еще одну оболочку, не закрывая первую, и запустим ps: теперь команда выдаст нам тот же список процессов, однако с другими номерами pid, что свидетельствует о том, что это разные процессы, хотя программы одни и те же. Мы можем также получить список всех процессов, запущенных на нашей машине: страница man говорит, что ключ -e обозначает "выбрать все процессы". Введем "ps -e" в терминале, и ps выведет нам длинный список, отформатированный как выше. Чтобы нам было удобнее анализировать этот список, можем перенаправить вывод в файл ps.log: ps -e > ps.log Теперь мы можем посмотреть этот файл, открыв его в любимом редакторе (или просто командой less). Как говорилось в начале статьи, число выполняющихся процессов больше чем мы могли ожидать. Теперь мы видим, что список содержит не только процессы, запущенные нами (через командную строку или графическую оболочку), но и множество других, некоторые из которых имеют странные имена. Количество и состав процессов в списке зависит от конфигурации вашей системы, однако есть несколько процессов, присутствующих на всех машинах. Во-первых, вне зависимости от конфигурации процесс с pid равным 1 -- это всегда "init", прародитель всех процессов. Он имеет первый pid, так как это первый процесс, запускаемый операционной системой. Также мы можем легко заметить наличие множества процессов с именами, заканчивающимися на "d" -- это так называемые "демоны" -- одни из самых важных процессов в системе. Мы в подробностях изучим init и демонов в одной из следующих статей. Многозадачность в libc ---------------------- Теперь, когда мы имеем представление о процессе и его важной роли в нашей операционной системе, начнем писать многозадачный код. От простого одновременного выполнения процессов мы перейдем к новой проблеме: как организовать связь между параллельными процессами и их синхронизацию. Мы найдем два элегантных решения этих проблем: сообщения и семафоры, однако последние будут глубже рассмотрены в будущей статье, посвященной потокам. После сообщений, у нас будет время, чтобы начать писать нашу программу, основанную на этих идеях. Стандартная библиотека C (libc, реализованная в Linux в glibc), использует возможности многозадачности Unix System V. Unix System V (далее SysV) -- коммерческая реализация Unix, породившая одно из двух самых важных семейтв Unix, второе -- BSD Unix. В libc тип pid_t определен как целое, способное вместить в себе pid. Впредь мы будем использовать этот тип для работы с pid, однако это нужно только для ясности: использование целого типа дало бы тот же результат. Рассмотрим функцию, которая сообщает нам pid процесса, содержащего нашу программу pid_t getpid (void) (она определена вместе с pid_t в unistd.h и sys/types.h) и напишем программу, которая выведет в стандартный вывод свой pid. При помощи любого редактора напишите следующий код -------------------------------------------------------------- #include <unistd.h> #include <sys/types.h> #include <stdio.h> int main() { pid_t pid; pid = getpid(); printf("pid присвоенный процессу - %d\n", pid); return 0; } -------------------------------------------------------------- Сохраните программу в print_pid.c и скомпилируйте ее. gcc -Wall -o print_pid print_pid.c Команда создаст исполняемый файл print_pid. Я напоминаю, что текущая директория не содержится в path, поэтому необходимо запустить программу как "./print_pid". Запустив программу, мы не получим ничего сногсшибательного: она выведет нам положительное число, и, если продолжать запускать ее, вы увидите, что это число будет постоянно увеличиваться на единицу; хотя это может быть и не так, потому что в перерыве между запусками может быть создан другой процесс. Попробуйте, например, выполнить ps между двумя запусками print_pid... Теперь время научится создавать процессы, но сначала я скажу несколько слов о том, что в действительности происходит при этом. Когда программа (содержащаяся в процессе A) создает новый процесс (B), они оба идентичны, то есть у них одинаковый код, их память наполнена одинаковыми данными (однако области различны) и имеют одинаковое состояние процессора. С этого момента они могут выполнять различные участки кода, например, в зависимости от ввода пользователя или некоторых произвольных данных. Процесс A -- "родительский процесс", а B -- "дочерний". Теперь мы можем лучше понять название "прародитель всех процессов", которое мы дали init. Вот функция, которая создает новый процесс pid_t fork(void) При ее вызове происходит разветвление выполнения процесса, отчего и происходит название функции (to fork, англ. -- ветвиться). Число, которое она возвращает -- это pid, однако тут надо обратить кое на что внимание. Мы говорили, что текущий процесс дублируется в родительском и дочернем, которые будут выполняться, чередуясь с другими выполняющимися процессами, производя различные действия. Однако какой процесс будет выполняться сразу после создания копии: родительский или дочерний? Ну, ответ прост: один из двух. Решение, какой процесс должен выполняться, принимается частью операционной системы, которая называется планировщиком, и она не принимает во внимание, является процесс родительским или дочерним, работая по алгоритму, основанному на других параметрах. Как бы то ни было, но нам важно знать какой процесс выполняется, так как код у них одинаковый. Оба процесса содержат коды, как для родительские, так и для дочерние, однако оба они должны выполнить только свой набор кодов. Чтобы прояснить это, взглянем на алгоритм: - РАЗВЕТВИТЬ - ЕСЛИ ТЫ ДОЧЕРНИЙ ПРОЦЕСС ВЫПОЛНИТЬ (...) - ЕСЛИ ТЫ РОДИТЕЛЬСКИЙ ПРОЦЕСС ВЫПОЛНИТЬ (...) который представляет собой код нашей программы, написанный на некотором метаязыке. Откроем тайну: функция fork возвращает '0' в дочерний процесс и pid дочернего процесса в родительский. Поэтому достаточно сверить возвращенный pid с нулем, и мы будем знать, какой процесс выполняет этот код. На языке C мы получим int main() { pid_t pid; pid = fork(); if (pid == 0) { КОД ДОЧЕРНЕГО ПРОЦЕССА } КОД РОДИТЕЛЬСКОГО ПРОЦЕССА } Теперь напишем первый настоящий пример многозадачного кода: вы можете сохранить его в файле fork_demo.c и скомпилировать как ранее. Я поместил номера строк исключительно для ясности. Программа сделает разветвление и оба процесса: родительский и дочерний, выведут кое-что на экран. В результате то, что мы увидим, будет чередованием этих выводов (если все будет правильно). (01) #include <unistd.h> (02) #include <sys/types.h> (03) #include <stdio.h> (04) int main() (05) { (05) pid_t pid; (06) int i; (07) pid = fork(); (08) if (pid == 0){ (09) for (i = 0; i < 8; i++){ (10) printf("-ДОЧЕРНИЙ-\n"); (11) } (12) return(0); (13) } (14) for (i = 0; i < 8; i++){ (15) printf("+РОДИТЕЛЬСКИЙ+\n"); (16) } (17) return(0); (18) } Строки (01)--(03) содержат включения заголовочных файлов необходимых библиотек (стандартный ввод/вывод, многозадачность). Функция main (как обычно в GNU) возвращает целое, которое равно нулю, если все прошло без ошибок и код ошибки, если что-то случилось не то. Давайте пока будем считать, что все выполняется без ошибок (мы добавим обработку ошибок, когда уясним основные идеи). Далее, мы определяем переменную для pid (05) и целое для счетчика в циклах. Типы этих переменных, как замечалось ранее, одинаковы, однако тут они указаны различными для ясности. В строке (07) мы вызываем функцию fork, которая возвратит нуль в программу, выполняющуюся в дочернем процессе, и pid дочернего процесса в родительском; проверка производится в строке (08). Теперь код строк (09)--(13) будет исполнен в дочернем процессе, а оставшийся код (14)--(16) в родительском. Эти части кода просто выводят 8 раз в стандартный вывод слово "-ДОЧЕРНИЙ-" или "+РОДИТЕЛЬСКИЙ+" в зависимости от того, какой процесс выполняется, а затем завершают выполнение, возвращая 0. Последнее по-настоящему важно, так как без этого "return" дочерний процесс после завершения цикла будет выполнять далее код родительского (попробуйте, это не повредит вашей машине, просто произойдет то, чего мы не хотим). Подобные ошибки очень сложно будет обнаружить, так как выполнение многозадачных программ (особенно сложных) дает различные результаты при каждом выполнении, отлаживать их пользуясь результатами просто невозможно. Возможно, вы будете не удовлетворены выполнением программы: я не могу утверждать, что результатом будет смесь из двух строк, все зависит от скорости выполнения такого короткого цикла. Возможно, на выходе вы получите последовательность строк "+РОДИТЕЛЬСКИЙ+", а затем строк "-ДОЧЕРНИЙ-" или наоборот. Тогда попробуйте еще несколько раз выполнить программу, результат может поменяться. Вставляя задержку случайной длины перед каждым вызовом prinf, мы сможем нагляднее увидеть эффект многозадачности: мы сделаем это при помощи функций sleep и rand. sleep(rand()%4) это заставит программу "заснуть" на случайное число секунд: от 0 до 3 (% возвращает остаток от целочисленного деления). Теперь наш код выглядит так (09) for (i = 0; i < 8; i++){ (->) sleep (rand()%4); (10) printf("-ДОЧЕРНИЙ-\n"); (11) } то же сделаем и с кодом родительского процесса. Сохраним программу в fork_demo2.c, скомпилируем и выполним ее. Теперь она выполняется медленнее, зато мы заметим отличие в порядке вывода: [leo@mobile ipc2]$ ./fork_demo2 -ДОЧЕРНИЙ- +РОДИТЕЛЬСКИЙ+ +РОДИТЕЛЬСКИЙ+ -ДОЧЕРНИЙ- -ДОЧЕРНИЙ- +РОДИТЕЛЬСКИЙ+ +РОДИТЕЛЬСКИЙ+ -ДОЧЕРНИЙ- -ДОЧЕРНИЙ- +РОДИТЕЛЬСКИЙ+ +РОДИТЕЛЬСКИЙ+ -ДОЧЕРНИЙ- -ДОЧЕРНИЙ- -ДОЧЕРНИЙ- +РОДИТЕЛЬСКИЙ+ +РОДИТЕЛЬСКИЙ+ [leo@mobile ipc2]$ Теперь рассмотрим проблемы, которые встали перед нами сейчас: мы можем создать несколько дочерних процессов данного родительского, так чтобы они выполняли операции, отличные от операций родительского процесса, параллельно. Часто родительскому процессу необходимо обмениваться информацией с дочерними или хотя бы синхронизироваться с ними, чтобы выполнять операции в нужное время. Первый способ синхронизации процессов -- функция wait pid_t waitpid (pid_t PID, int *STATUS_PTR, int OPTIONS) где PID -- это PID ожидаемого процесса, STATUS_PTR -- указатель на целое, которое будет содержать статус дочернего процесса (NULL, если эта информация не нужна), а OPTIONS -- это набор опций, на которые мы сейчас не будем обращать внимание. Вот пример программы, где родительский процесс создает дочерний и ждет его завершения -------------------------------------------------------------- #include <unistd.h> #include <sys/types.h> #include <stdio.h> int main() { pid_t pid; int i; pid = fork(); if (pid == 0){ for (i = 0; i < 14; i++){ sleep (rand()%4); printf("-ДОЧЕРНИЙ-\n"); } return 0; } sleep (rand()%4); printf("+РОДИТЕЛЬСКИЙ+ Ожидаю завершения выполнения дочернего процесса...\n"); waitpid (pid, NULL, 0); printf("+РОДИТЕЛЬСКИЙ+ ...завершен\n"); return 0; } -------------------------------------------------------------- Функция sleep введена в код родительского процесса, чтобы сделать различными результаты выполнения программы. Сохраним код в fork_demo3.c, скомпилируем его и выполним. Мы только что написали наше первое многозадачное синхронизированное приложение! В следующей статье мы узнаем больше о синхронизации и взаимодействии между процессами. А сейчас напишите несколько программ, используя описанные функции, и пришлите их мне, чтобы я мог использовать их для демонстрации хороших решений и ошибок. Присылайте мне и .c файл с комментариями, и небольшой текстовый файл с описанием программы, свое имя и адрес электронной почты. Удачно поработать! Рекомендуется к прочтению * Silberschatz, Galvin, Gagne, Operating System Concepts - Sixth Edition, Wiley&Sons, 2001 * Tanenbaum, WoodHull, Operating Systems: Design and Implementation - Second Edition, Prentice Hall, 2000 * Stallings, Operating Systems - Fourth Edition, Prentice Hall, 2002 * Bovet, Cesati, Understanding the Linux Kernel, O'Reilly, 2000 На русском языке * Операционная система UNIX / А.М. Робачевский. - СПб.: БХВ-Петербург, 2001. * Сетевые операционные системы / В.Г. Олифер, Н.А. Олифер. - СПб.: Питер, 2001.
Часть 2. Параллельное программирование -- взаимодействие между процессами. Оригинал: http://www.linuxfocus.org/Russian/January2003/article281.shtml Содержание: * Введение * Ключи SysV * Семафоры * Рекомендуемая литература * Страница отзывов Введение -------- Ну вот и опять мы боремся с мультизадачностью в Linux. Как мы видели в предыдущей статье, чтобы создать новый процесс, нужно всего несколько строк в коде, так как операционная система берёт на себя инициализацию, управление и распределение рабочего времени процесса, созданного нами. Это свойство системы является фундаментальным, это ``контроль выполнения процессов'', контроль до такой степени, что процессы исполняются в своих собственных адресных пространствах. Потеря контроля над выполнением процесса приводит разработчика к проблеме синхронизации, которую можно выразить следующим вопросом: как сделать возможной совместную работу двух процессов? На самом деле проблема несколько более сложная, чем может показаться: это не только вопрос одновременной работы программ, но также и вопрос одновременного использования одних данных, как для чтения, так и для записи. Поговорим о некоторых классических проблемах одновременного использования данных; если два процесса одновременно читают один набор данных, то это, очевидно, не создаёт проблем, и выполнение процессов -- последовательное. Пусть теперь один процесс изменяет набор данных: результат работы второго процесса будет зависеть от того, прочёл процесс данные до или после их изменения. Например: у нас есть два процесса "А" и "В" и целое число "d". Процесс А увеличивает d на единицу, процесс В печатает значение d. Это можно записать на условном языке так: A { d->d+1 } & B { d->output } здесь "&" означает одновременное выполнение процессов. Сначала может быть выполнен процесс А, (-) d = 5 (A) d = 6 (B) output = 6 а может и процесс В: (-) d = 5 (B) output = 5 (A) d = 6 Сразу понятно, как важно уметь правильно обращаться с такими ситуациями: риск противоречивости данных высок и неприемлем. Если вы всё ещё недооцениваете эту проблему, представьте, что набор данных -- это ваш банковский счёт... В предыдущей статье мы уже говорили о первом способе синхронизации --- использовании функции waitpid(2), позволяющей процессу подождать завершения другого процесса, работающего на том же наборе данных, и только затем продолжить свою работу. Очевидно, это не самый лучший способ: процесс вынужден простаивать в ожидании завершения работы вторым процессом. Неприятность заключается в том, что второй процесс может работать довольно долго, а общими данными пользоваться весьма короткий промежуток времени. Таким образом, нам необходимо увеличить "гранулированность" нашего управления, т.е. управлять отдельными наборами данных. Решение данной проблемы -- примитивы из стандартной библиотеки, известной как SysV IPC (Взаимодействие процессов в System V). Ключи SysV ---------- Прежде чем перейти к самой теории одновременности, давайте познакомимся с типичной SysV структурой: IPC ключами. IPC ключ -- это число, однозначно идентифицирующее IPC структуру управления (описывается ниже). Также ключ можно использовать для образования универсальных идентификаторов, т.е. для организации не IPC структур. Ключ создаётся функцией ftok(3). key_t ftok(const char *pathname, int proj_id); Для генерирования ключа ftok берёт имя существующего файла (pathname) и идентификатор процесса (proj_id). Алгоритм построения ключа не исключает возможности появления дубликатов, поэтому следует иметь маленькую библиотеку, просматривающую уже созданные ключи и не допускающую повторений. Семафоры -------- Идею управления дорожным движением с помощью семафоров можно без особых изменений перенести на управление доступом к данным. Семафор -- особая структура, содержащая число большее или равное нулю и управляющая цепочкой процессов, ожидающих особого состояния на данном семафоре. Хотя они и кажутся очень простыми, семафоры -- это очень мощное средство, а потому, на самом деле, весьма сложное. Начнём, как всегда, не рассматривая обработку ошибок: мы включим её в код, когда будем писать более сложную программу. Семафоры могут использоваться для контролирования доступа к ресурсам: число в семафоре представляет собой количество процессов, которые могут получить доступ к данным. Каждый раз, когда процесс обращается к данным, значение в семафоре, должно быть уменьшено на единицу, и увеличено, когда работа с данными будет прекращена. Если ресурс эксклюзивный, то есть к данным должен иметь доступ только один процесс, то начальное значение в семафоре следует установить единицей. Семафоры можно использовать и для других целей, например для счётчика ресурсов. В этом случае число в семафоре -- количество свободных ресурсов (например количество свободных ячеек памяти). Рассмотрим практическое применение семафоров. Пусть у нас есть буфер, в который несколько процессов S1,...,Sn могут писать, и только один процесс L может из него читать. Также операции нельзя выполнять одновременно (в данный момент времени только один процесс должен оперировать с буфером). Очевидно, что процессы Si могут писать всегда, когда буфер не полон, а процесс L может читать, когда буфер не пуст. Таким образом, нам необходимо три семафора: один управляет доступом к буферу, а два других следят за числом элементов в нём. Учитывая, что доступ к буферу должен быть эксклюзивным, первый семафор будет бинарным (его значение будет нулём или единицей), в то время как второй и третий будут принимать значения, зависящие от размера буфера. Рассмотрим, как реализованы семафоры на C, в SysV. Создаёт семафор функция semget(2) int semget(key_t key, int nsems, int semflg); здесь key -- IPC ключ, nsems -- число семафоров, которое мы хотим создать, и semflg -- права доступа, закодированные в 12 бит: первые три бита отвечают за режим создания, остальные девять -- права на запись и чтение для пользователя, группы и остальных (заметьте сходство с файловой системой в Unix). За более полной информацией загляните в man страницы ipc(5). Как вы видите SysV создаёт сразу несколько семафоров, что уменьшает код. Давайте создадим наш первый семафор -------------------------------------------------------------- #include <stdio.h> #include <stdlib.h> #include <linux/types.h> #include <linux/ipc.h> #include <linux/sem.h> int main(void) { key_t key; int semid; key = ftok("/etc/fstab", getpid()); /* создать только один семафор: */ semid = semget(key, 1, 0666 | IPC_CREAT); return 0; } -------------------------------------------------------------- Далее нам надо выяснить как управлять семафорами, и как удалять их. Управление происходит с помощью функции semctl(2), int semctl(int semid, int semnum, int cmd, ...) которая выполняет действие cmd на наборе семафоров semid или (если требуется командой) на одном семафоре с номером semnum. Мы расскажем о свойствах это команды, когда станет необходимо, полный же список свойств доступен на man страницах. В зависимости от команды, может понадобится указать ещё один аргумент следующего типа: union semun { int val; /* значение для SETVAL */ struct semid_ds *buf; /* буферы для IPC_STAT, IPC_SET */ unsigned short *array; /* массивы для GETALL, SETALL */ /* часть, особенная для Linux: */ struct seminfo *__buf; /* буфер для IPC_INFO */ }; Чтобы изменить значение семафора, используют директиву SETVAL, новое значение должно быть указано в semun; давайте модифицируем приведённую выше программу, устанавливая в семафоре значение 1. [...] /* создать только один семафор */ semid = semget(key, 1, 0666 | IPC_CREAT); /* в семафоре 0 установить значение 1 */ arg.val = 1; semctl(semid, 0, SETVAL, arg); [...] Теперь необходимо удалить семафор, освобождая структуры, использовавшиеся для управления им; это выполняет директива IPC_RMID. Она удаляет семафор и посылает сообщение об этом всем процессам, ожидающим доступа к ресурсу. Последний раз изменим программу: [...] /* в семафоре 0 установить значение 1 */ arg.val = 1; semctl(semid, 0, SETVAL, arg); /* удалить семафор */ semctl(semid, 0, IPC_RMID); [...] Как вы уже поняли, создание и управление структурами контроля за параллельным выполнением программ достаточно просто, когда мы добавим обработку ошибок, всё станет несколько более сложно, но только в смысле сложности кода. Использовать семафор можно с помощью процедуры semop(2), int semop(int semid, struct sembuf *sops, unsigned nsops); здесь semid -- идентификатор набора семафоров, sops -- массив, содержащий операции, которые необходимо произвести, nsops -- число этих операций. Каждая операция представляется структурой sembuf. unsigned short sem_num; short sem_op; short sem_flg; т.е номером семафора в множестве (sem_num), операцией (sem_op) и флагом, устанавливающим режим ожидания; пусть пока он будет нулём. Операции, которые мы можем указать, являются целыми числами и подчиняются следующим правилам: 1. sem_op < 0 Если модуль значения в семафоре больше или равен модулю sem_op, то sem_op добавляется к значению в семафоре (т.е. значение в семафоре уменьшается). Если модуль sem_op больше, то процесс переходит в спящий режим, пока не будет достаточно ресурсов. 2. sem_op = 0 Процесс спит пока значение в семафоре не достигнет нуля. 3. sem_op > 0 Значение sem_op добавляется к значению в семафоре, используемый ресурс освобождается. Следующая программа представляет пример использования семафоров, реализуя предыдущий пример с буфером: мы создадим пять процессов W и один процесс R. Процессы W будут пытаться получить доступ к ресурсу (буферу), закрывая его через семафор, и, если буфер не полон, будут класть в него элемент и освобождать ресурс. Процесс R будет закрывать ресурс, брать из него элемент, если буфер не пуст, и разблокировать ресурс. Чтение и запись в буфер на самом деле ненастоящие: так происходит потому, что, как обсуждалось в предыдущей статье, каждый процесс выполняется в своём собственной области памяти и не может обращаться к памяти другого процесса. Это делает настоящее управление буфером шестью процессами невозможным, так как каждый процесс будет видеть свою копию буфера. всё встанет на свои места, когда мы будем говорить о разделяемой памяти, но давайте быть последовательными. Почему нам нужно три семафора? Первый (с номером 0) действует как замок к буферу, и его максимальное значение равно единице, остальные два отвечают за переполнение и наличие элементов в буфере. Одним семафором этого не добиться. Потребность в двух семафорах связана с особенностью работы функции semop. Если, например, процессы W уменьшают значение в семафоре, отвечающем за свободное место в буфере, до нуля, то процесс R может увеличивать это значение до бесконечности. Поэтому такой семафор не может указывать на отсутствие элементов в буфере. -------------------------------------------------------------- #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <linux/types.h> #include <linux/ipc.h> #include <linux/sem.h> int main(int argc, char *argv[]) { /* IPC */ pid_t pid; key_t key; int semid; union semun arg; struct sembuf lock_res = {0, -1, 0}; struct sembuf rel_res = {0, 1, 0}; struct sembuf push[2] = {1, -1, IPC_NOWAIT, 2, 1, IPC_NOWAIT}; struct sembuf pop[2] = {1, 1, IPC_NOWAIT, 2, -1, IPC_NOWAIT}; /* Остальное */ int i; if(argc < 2){ printf("Usage: bufdemo [dimensione]\n"); exit(0); } /* Семафоры */ key = ftok("/etc/fstab", getpid()); /* Создать набор из трёх семафоров */ semid = semget(key, 3, 0666 | IPC_CREAT); /* Установить в семафоре номер 0 (Контроллер ресурсов) значение "1" */ arg.val = 1; semctl(semid, 0, SETVAL, arg); /* Установить в семафоре номер 1 (Контроллер свободного места) значение длины буфера */ arg.val = atol(argv[1]); semctl(semid, 1, SETVAL, arg); /* Установить в семафоре номер 2 (Контроллер элементов в буфере) значение "0" */ arg.val = 0; semctl(semid, 2, SETVAL, arg); /* Fork */ for (i = 0; i < 5; i++){ pid = fork(); if (!pid){ for (i = 0; i < 20; i++){ sleep(rand()%6); /* Попытаться заблокировать ресурс (семафор номер 0) */ if (semop(semid, &lock_res, 1) == -1){ perror("semop:lock_res"); } /* Уменьшить свободное место (семафор номер 1) / Добавить элемент (семафор номер 2) */ if (semop(semid, &push, 2) != -1){ printf("---> Process:%d\n", getpid()); } else{ printf("---> Process:%d BUFFER FULL\n", getpid()); } /* Разблокировать ресурс */ semop(semid, &rel_res, 1); } exit(0); } } for (i = 0;i < 100; i++){ sleep(rand()%3); /* Попытаться заблокировать ресурс (семафор номер 0)*/ if (semop(semid, &lock_res, 1) == -1){ perror("semop:lock_res"); } /* Увеличить свободное место (семафор номер 1) / Взять элемент (семафор номер 2) */ if (semop(semid, &pop, 2) != -1){ printf("<--- Process:%d\n", getpid()); } else printf("<--- Process:%d BUFFER EMPTY\n", getpid()); /* Разблокировать ресурс */ semop(semid, &rel_res, 1); } /* Удалить семафоры */ semctl(semid, 0, IPC_RMID); return 0; } -------------------------------------------------------------- Прокомментируем наиболее интересные части кода: struct sembuf lock_res = {0, -1, 0}; struct sembuf rel_res = {0, 1, 0}; struct sembuf push[2] = {1, -1, IPC_NOWAIT, 2, 1, IPC_NOWAIT}; struct sembuf pop[2] = {1, 1, IPC_NOWAIT, 2, -1, IPC_NOWAIT}; Эти четыре строки -- действия, которые мы можем производить над семафорами: первые две -- содержат по одному действия каждая, вторые -- по две. Первое действие, lock_res, блокирует ресурс: оно уменьшает значение первого (номер 0) семафора на единицу (если значение в семафоре не нуль), а если ресурс уже занят, то процесс ждёт. Действие rel_res аналогично lock_res, только значение в первом семафоре увеличивается на единицу, т.е. убирается блокировка ресурса. Действия push и pop несколько отличаются от первых: это массивы из двух действий. Первое действие над семафором номер 1, второе -- над семафором номер 2; одно увеличивает значение в семафоре, другое уменьшает, но теперь процесс не будет ждать освобождения ресурса: IPC_NOWAIT заставляет его продолжить работу, если ресурс заблокирован. /* Установить в семафоре номер 0 (Контроллер ресурсов) значение "1" */ arg.val = 1; semctl(semid, 0, SETVAL, arg); /* Установить в семафоре номер 1 (Контроллер свободного места) значение длины буфера */ arg.val = atol(argv[1]); semctl(semid, 1, SETVAL, arg); /* Установить в семафоре номер 2 (Контроллер элементов в буфере) значение "0" */ arg.val = 0; semctl(semid, 2, SETVAL, arg); Здесь мы инициализируем значения в семафорах: в первом -- единицей, так как он контролирует доступ к ресурсу, во втором -- длиной буфера (заданной в командной строке), в третьем -- нулём (т.е. числом элементов в буфере). /* Попытаться заблокировать ресурс (семафор номер 0) */ if (semop(semid, &lock_res, 1) == -1){ perror("semop:lock_res"); } /* Уменьшить свободное место (семафор номер 1) / Добавить элемент (семафор номер 2) */ if (semop(semid, &push, 2) != -1){ printf("---> Process:%d\n", getpid()); } else{ printf("---> Process:%d BUFFER FULL\n", getpid()); } /* Освободить ресурс */ semop(semid, &rel_res, 1); Процесс W пытается заблокировать ресурс посредством действия lock_res; как только это ему удаётся, он добавляет элемент в буфер посредством действия push и выводит сообщение об этом на стандартный вывод. Если операция не может быть произведена, процесс выводит сообщение о заполнении буфера. В конце процесс освобождает ресурс. /* Попытаться заблокировать ресурс (семафор номер 0) */ if (semop(semid, &lock_res, 1) == -1){ perror("semop:lock_res"); } /* Увеличить свободное место (семафор номер 1) / Взять элемент (семафор номер 2) */ if (semop(semid, &pop, 2) != -1){ printf("<--- Process:%d\n", getpid()); } else printf("<--- Process:%d BUFFER EMPTY\n", getpid()); /* Отпустить ресурс */ semop(semid, &rel_res, 1); Процесс R ведёт себя практически так же как и W процесс: блокирует ресурс, производит действие pop, освобождает ресурс. В следующей статье мы поговорим об очередях сообщений: другой структуре для межпроцессового общения и синхронизации. Как всегда, если вы пишете что-нибудь простое, используя информацию из этой статьи, присылайте это мне, с вашим именем и e-mail адресом, буду рад прочитать. Удачи! Рекомендуемая литература * Silberschatz, Galvin, Gagne, Operating System Concepts - Sixth Edition, Wiley&Sons, 2001 * Tanenbaum, WoodHull, Operating Systems: Design and Implementation - Second Edition, Prentice Hall, 2000 * Stallings, Operating Systems - Fourth Edition, Prentice Hall, 2002 * Bovet, Cesati, Understanding the Linux Kernel, O'Reilly, 2000 * The Linux Programmer's Guide: http://www.tldp.org/LDP/lpg/index.html * Linux Kernel 2.4 Internals http://www.tldp.org/LDP/lki/lki-5.html
Часть 3. Параллельное программирование - очереди сообщений (1) Оригинал: http://www.linuxfocus.org/Russian/March2003/article287.shtml Содержание: * Вступление * Теория очереди сообщений * Создание протокола * Очереди сообщений в System V * Рекомендуемая литература * Страница отзывов Вступление ---------- В прошлых заметках мы начали изучать параллельное программирование, в частности узнали как осуществить межпроцессорное взаимодействие с помощью семафоров. Используя их мы можем управлять доступом к разделяемым ресурсам, что позволяет нам синхронизировать работу двух и более процессов. Задача синхронизации - выделение процессам времени на работу, но не абсолютного времени сиситемы ( когда назначается точное время начала работы ), а относительное, когда мы можем выделить время для какого-либо процесса в первую очередь, а для другого - во вторую и т.д. Использование семафоров для этого достаточно сложное и к тому же ограниченное решение - сложное потому, что каждый процесс должен управлять семафором для другого, с которым необходимо синхронизироваться, а ограниченное, потому, что это не позволяет нам осуществлять обмен параметрами между процессами. Представьте ситуацию создания нового процесса - событие создания его должно быть послано каждому выполняющемуся процессу - семафоры не позволяют сделать этого. Кроме того, параллельный контроль доступа к разделяемым ресурсам посредством семафоров может привести к длительной блокировке процесса, когда другой процесс освободит и вновь заблокирует ресурс до того как какой-н другой процесс возьмет контроль над ним. Как мы уже узнали, в мире параллельного программирования невозможно узнать заранее какой и когда процесс будет запущен. Эти короткие размышления позволяют нам понять, что семафоры - неподходящий инструмент для решения сложных проблем синхронизации. Элегантное решение - использование очередей сообщений - в этой заметке мы рассмотрим теорию взаимодействия процессов и напишем небольшую программу, используя средства SysV. Теория очереди сообщений ------------------------ Каждый процесс способен создать любое количество структур называемых очередями: каждая структура может содержать любое количество сообщений разных типов, которые имеют разную природу и содержат любую информацию; каждый процесс способен послать сообщение в очередь, зная ее идентификатор. Также процесс способен получить доступ к очереди и прочитать сообщения в хронологическом порядке ( начиная с самого первого, последнего, самого недавнего и последнего, поступившего в очередь ), но выборочно, т.е. сообщения только определенного типа, что обеспечивает контроль за возможностью прочитать эти сообщения. Использование очереди легко понять представив ее в виде почтовой системы между процессами: каждый процесс обладает адресом для взаимодействия с другими процессами. Процесс читает сообщения, предназначенные ему и дальнейшая его работа зависит от этих сообщений. Итак, синхронизация двух процессов достигается использованием сообщений: ресурсы по-прежнему будут обладать семафорами, чтобы процессы знали их статус, а разделение времени работы происходит при помощи сообщений. Теперь вы понимаете, что использование сообщений не такая большая сложность как это могло показаться сначала. Перед тем как мы узнаем как использовать механизм сообщений в языке 'C' необходимо рассмотреть еще один аспект синхронизации - коммуникационный протокол. Создание протокола ------------------ Протоколом называется набор правил, с помощью которого происходит взаимодействие объектов; в прошлой заметке мы рассмотрели создание одного из самых простых протоколов на основе семафоров, который заставляет процессы работать в соответствии с их статусом. Использование очередей сообщений позволяет нам создавать более сложные протоколы; важно понять, что все сетевые протоколы (TCP/IP, DNS, SMTP, ...) построены на архитектуре обмена сообщениями. Все достаточно просто - нет никакой разницы на одном ли компьютере происходит взаимодействие процессов или между разными. Как мы увидим в следующей заметке, рассматривая распределенную работу ( несколько компьютеров во взаимодействии ) - это все очень просто. Рассмотрим простой протокол, основанный на обмене сообщениями: два процесса ( А и В ) выполняются параллельно и работают с разными данными: после окончания работы каждого им необходимо обменяться данными. Простой протокол их взаимодействия выглядит следующим образом: ПРОЦЕСС B: * работает со своими данными * по окончании работы посылает сообщение процессу А * получив ответ от процесса A - начинает передачу ему данных ПРОЦЕСС A: * работает со своими данными * ожидает сообщение от процесса B * отвечает на сообщение * принимает данные и объединяет со своими Выбор процесса ответственного за объединение данных достаточно условен в нашем примере; в реальной жизни это зависит от природы взаимодействующих процессов ( клиент/сервер ), но это тема отдельной заметки. Рассмотренный протокол легко применим к n процессам. Любой процесс, кроме А, обрабатывает свои данные и затем посылает сообщение процессу А. После ответа процесса А ему пересылаются данные, нет необходимости менять структуру процессов, кроме А. Очереди сообщений в System V ---------------------------- Теперь рассмотрим реализацию этого механизмав ОС Linux. Как было сказано ранее нам доступен набор примитивов для работы со структурами, представляющими основу механизма сообщений и эта работа схожа с управлением семафорами: я надеюсь, что читатель знаком с основами создания процессов, использования системных вызывов и IPC ключами. Структура для описания сообщения называется msgbuf ;и объявлена в linux/msg.h /* message buffer for msgsnd and msgrcv calls */ struct msgbuf { long mtype; /* type of message */ char mtext[1]; /* message text */ }; Поле mtype описывает тип сообщения и всегда имеет положительное значение: соответствие между типами сообщений и их числовым представлением необходимо определить заранее - это часть протокола. Второе поле представляет само сообщение. Структура msgbuf может быть переопределена и содержать более сложные данные; например мы можем определить ее так: struct message { long mtype; /* message type */ long sender; /* sender id */ long receiver; /* receiver id */ struct info data; /* message content */ ... }; Прототип сообщения может иметь максимальный размер до 4056 байт. Естественно, мы можем перекомпилировать ядро и увеличить это значение, но, тогда наше приложение станет непереносимым ( кроме того, этот размер был точно вычислен и его большое увеличение не является хорошей идеей ). Для создания новой очереди процесс использует функцию msgget() int msgget(key_t key, int msgflg) в которую необходимо передать аргументы и IPC ключ, который можно установить в IPC_CREAT | 0660 ( создать очередь, если она еще не существует и предоставить доступ владельцу и группе 'users' ). Эта функция возвращает идентификатор очереди. Как и в предыдущих заметках, мы подразумеваем, что ошибки не возникли, для упрощения нашего кода и в будущей заметке поступим также, несмотря на то, что будем говорить о безопасном IPC коде. Чтобы послать сообщение в очередь, зная ее идентификатор, необходимо использовать функцию msgsnd() int msgsnd(int msqid, struct msgbuf *msgp, int msgsz, int msgflg) где msqid идентификатор очереди, msgp указатель на сообщение, которое мы хотим послать ( тип которого определен как struct msgbuf но который мы переопределили ), msgsz размер сообщения ( исключая длину mtype типа long, т.е. 4 байта ) и msgflg флаг правила ожидания. Размер сообщения достаточно просто получить, используя следующее выражение: length = sizeof(struct message) - sizeof(long); что касается правила ожидания в случае полной очереди: если msgflg установлен в IPC_NOWAIT посылающий сигнал процесс не будет ждать освобождения очереди и выйдет с кодом ошибки: подробнее мы поговорим об этом позднее, когда будем рассматривать вопрос обработки ошибок. Чтобы прочитать сообщения, находящиеся в очереди необходимо использовать системную функцию msgrcv() int msgrcv(int msqid, struct msgbuf *msgp, int msgsz, long mtype, int msgflg) где msgp указатель на буфер, в котором мы будем читать сообщения, а mtype определяет список интересующих нас сообщений. Удаление очереди осуществляется вызовом функции msgctl() с флагом IPC_RMID msgctl(qid, IPC_RMID, 0) Теперь посмотрим на примере все о чем говорили выше: мы создадим очередь сообщений, пошлем в нее сообщение и прочитаем его, таким образом проконтролировав корректную работу системы. -------------------------------------------------------------- #include <stdio.h> #include <stdlib.h> #include <linux/ipc.h> #include <linux/msg.h> /* Redefines the struct msgbuf */ typedef struct mymsgbuf { long mtype; int int_num; float float_num; char ch; } mess_t; int main() { int qid; key_t msgkey; mess_t sent; mess_t received; int length; /* Initializes the seed of the pseudo-random number generator */ srand (time (0)); /* Length of the message */ length = sizeof(mess_t) - sizeof(long); msgkey = ftok(".",'m'); /* Creates the queue*/ qid = msgget(msgkey, IPC_CREAT | 0660); printf("QID = %d\n", qid); /* Builds a message */ sent.mtype = 1; sent.int_num = rand(); sent.float_num = (float)(rand())/3; sent.ch = 'f'; /* Sends the message */ msgsnd(qid, &sent, length, 0); printf("MESSAGE SENT...\n"); /* Receives the message */ msgrcv(qid, &received, length, sent.mtype, 0); printf("MESSAGE RECEIVED...\n"); /* Controls that received and sent messages are equal */ printf("Integer number = %d (sent %d) -- ", received.int_num, sent.int_num); if(received.int_num == sent.int_num) printf(" OK\n"); else printf("ERROR\n"); printf("Float numero = %f (sent %f) -- ", received.float_num, sent.float_num); if(received.float_num == sent.float_num) printf(" OK\n"); else printf("ERROR\n"); printf("Char = %c (sent %c) -- ", received.ch, sent.ch); if(received.ch == sent.ch) printf(" OK\n"); else printf("ERROR\n"); /* Destroys the queue */ msgctl(qid, IPC_RMID, 0); } -------------------------------------------------------------- Теперь еще один пример - создадим два процесса и заставим их взаимодействовать через очередь сообщений; остановимся немного на концепции порождения процессов: значения всех переменных родительского процесса переходят к порожденному ( memory copy ). Это значит, что нам необходимо создать очередь до порождения, чтобы и процесс-родитель и процесс-потомок могли знать идентификатор очереди и обращаться к ней. Что происходит в этой небольшой программе: посредством очереди попрожденный процесс пересылает данные процессу потомку - порожденный процесс генерирует случайные числа и пересылает их процессу родителю и они оба выводят их в стандартный поток вывода. -------------------------------------------------------------- #include <stdio.h> #include <stdlib.h> #include <linux/ipc.h> #include <linux/msg.h> #include <sys/types.h> /* Redefines the message structure */ typedef struct mymsgbuf { long mtype; int num; } mess_t; int main() { int qid; key_t msgkey; pid_t pid; mess_t buf; int length; int cont; length = sizeof(mess_t) - sizeof(long); msgkey = ftok(".",'m'); qid = msgget(msgkey, IPC_CREAT | 0660); if(!(pid = fork())){ printf("SON - QID = %d\n", qid); srand (time (0)); for(cont = 0; cont < 10; cont++){ sleep (rand()%4); buf.mtype = 1; buf.num = rand()%100; msgsnd(qid, &buf, length, 0); printf("SON - MESSAGE NUMBER %d: %d\n", cont+1, buf.num); } return 0; } printf("FATHER - QID = %d\n", qid); for(cont = 0; cont < 10; cont++){ sleep (rand()%4); msgrcv(qid, &buf, length, 1, 0); printf("FATHER - MESSAGE NUMBER %d: %d\n", cont+1, buf.num); } msgctl(qid, IPC_RMID, 0); return 0; } -------------------------------------------------------------- Мы создали два процесса, взаимодействующих через систему обмена сообщениями. В данном случае нет необходимости в протоколе - взаимодействие очень простое: в следующей заметке мы поговорим снова об очередях сообщений и об управлении различными типами сообщений. Также мы рассмотрим протокол взаимодействия, чтобы начать строить наш большой IPC проект. Рекомендуемая литература * Silberschatz, Galvin, Gagne, Operating System Concepts - Sixth Edition, Wiley&Sons, 2001 * Tanenbaum, WoodHull, Operating Systems: Design and Implementation - Second Edition, Prentice Hall, 2000 * Stallings, Operating Systems - Fourth Edition, Prentice Hall, 2002 * Bovet, Cesati, Understanding the Linux Kernel, O'Reilly, 2000 * The Linux Programmer's Guide: http://www.tldp.org/LDP/lpg/index.html * Linux Kernel 2.4 Internals http://www.tldp.org/LDP/lki/lki-5.html * Web page of the #kernelnewbies IRC channel http://www.kernelnewbies.org/ * The linux-kernel mailing list FAQ http://www.tux.org/lkml/
Часть 4. Параллельное программирование - очереди сообщений (2) Оригинал: http://www.linuxfocus.org/Russian/May2003/article296.shtml Содержание: * Введение * Обработка ошибок * Реализация протокола - уровень 1 * Рекомендуемая литература * Страница отзывов Введение -------- В прошлой заметке из этой серии мы научились организовывать работу двух ( или более процессов ), используя очереди сообщений. В этой заметке мы начнем создавать простой протокол для обмена сообщениями. Как мы уже сказали, протокол - это набор правил, позволяющих общаться людям или машинам, даже если они разные. Например, английский язык является протоколом, позволяющим мне передавать знания моим индийским читателям ( им всегда очень интересно то, что я пишу ). Говоря о чем-нибудь более приближенном к ОС Linux, мы можем упомянуть о перекомпиляции ядра ( не пугайтесь, это не так трудно ), в частности о сетевом разделе, где вы можете указать ядру о необходимости понимания разных сетевых протоколов, таких как TCP/IP. Чтобы придумать протокол во-первых необходимо подумать о типе приложения, для которого он будет создан. Мы займемся созданием простого телефонного коммутатора. Основным процессом будет коммутатор, а потомками - пользователи: задача будет - позволить пользователям обмениваться сообщениями через этот коммутатор. В протоколе необходимо предусмотреть три события: появление пользователя ( т.е. пользователь присутствует и подключен ), обычные действия пользователя и его исчезновение ( пользователь не подключен ). Остановимся немного подробнее на этих событиях: При подключении пользователя к системе он создает свою собственную очередь ( не забывайте, что мы говорим о процессах ), ее идентификатор должен быть послан коммутатору, чтобы он знал как обратиться к этому пользователю. Здесь самое время создать некие структуры данных, если в них есть необходимость. От коммутатора необходимо получить идентификатор очереди, чтобы знать куда посылать сообщения другим пользователям. Пользователь может посылать и принимать сообщения. В случае посылки сообщения другому пользователю мы должны сначала узнать - подключен он или нет, но в любом случае посылающему пользователю необходимо вернуть ответ, чтобы он знал что произошло с его сообщением. Принимающей стороне ничего не надо делать - все сделает коммутатор. При отключении пользователя от системы ему необходимо оповестить об этом коммутатор. В этом фрагменте кода показывается как это сделать -------------------------------------------------------------- /* Birth */ create_queue init send_alive send_queue_id get_switch_queue_id /* Work */ while(!leaving){ receive_all if(<send condition>){ send_message } if(<leave condition>){ leaving = 1 } } /* Death */ send_dead -------------------------------------------------------------- Теперь рассмотрим поведение нашего коммутатора: при подключении пользователя он посылает сообщение с идентификатором его очереди сообщений; нам необходимо его запомнить, чтобы передавать предназначенные ему сообщения и в свою очередь передать ему идентификатор очереди куда бы он мог посылать сообщения для других пользователей. Далее, нам необходимо анализировать сообщения от пользователей и проверять подключены или нет те, кому они предназначены: в случае если пользователь подключен - передать ему сообщение, если нет - отменить сообщение, в обоих случаях необходимо дать ответ отправителю. При отключении пользователя мы удаляем идентификатор его очереди и он становится недоступным. Это реализуется следующим образом -------------------------------------------------------------- while(1){ /* New user */ if (<birth of a user>){ get_queue_id send switch_queue_id } /* User dies */ if (<death of a user>){ remove_user } /* Messages delivering */ check_message if (<user alive>){ send_message ack_sender_ok } else{ ack_sender_error } } -------------------------------------------------------------- Обработка ошибок ---------------- Обработка ошибок один из важнейших и трудных моментов в проекте. Более того, хорошая подсистема обработки ошибок занимает до 50% создаваемого кода. Я не буду рассказывать в данной заметке каким образом создаются хорошие механизмы обработки ошибок, потому, что этот вопрос достаточно объемен, но с настоящего момента я буду уделять этой теме внимание. Хорошим вступлением в эту тему будет изучение документации glibc ( www.gnu.org ), но если вы очень заинтересованы я напишу отдельную заметку. Реализация протокола - уровень 1 -------------------------------- Наш протокол имеет два уровня, первый ( нижний ) состоит из функций для управления очередями, посылкой и приемом сообщений, верхний уровень представлен функциями схожими с кодом, которым мы демонстрировали поведение коммутатора и пользователей. Первое что нам необходимо - определить структуру для сообщения, используя прототип уровня ядра - msgbuf typedef struct { int service; int sender; int receiver; int data; } messg_t; typedef struct { long mtype; /* Tipo del messaggio */ messg_t messaggio; } mymsgbuf_t; Это на самом деле общий вид, который мы дополним позже: поля получателя и отправителя содержат идентификатор пользователя, поле данных - общие данные, поле сервиса используется для запроса на обслуживание у коммутатора. Например представим, что у нас два сервиса - один для незамедлительной и один для отсроченной доставки - в каждом случае поле данных содержит количество секунд задержки. Это всего лишь пример, но нам важно понять насколько разнообразно применение поля сервиса. Теперь создаим несколько функций для управления нашими структурами данных, в частности для заполнения и чтения полей сообщения. Эти функции более-менее схожи, поэтому я покажу только две из них, а остальные вы найдете в заголовочных файлах. void set_sender(mymsgbuf_t * buf, int sender) { buf->message.sender = sender; } int get_sender(mymsgbuf_t * buf) { return(buf->message.sender); } Назначение их не в уменьшении кода ( каждая состоит всего из одной строки ), а в приближении протокола к чему-то более понятному для человека, а следовательно для понимания протокола в целом. Теперь напишем функции для создания IPC ключей, создания и удаления очередей сообщений, отправки и получения сообщений: создание IPC ключа -------------------------------------------------------------- key_t build_key(char c) { key_t key; key = ftok(".", c); return(key); } функция создания очереди int create_queue(key_t key) { int qid; if((qid = msgget(key, IPC_CREAT | 0660)) == -1){ perror("msgget"); exit(1); } return(qid); } -------------------------------------------------------------- как видите обработка ошибок в данном случае достаточно проста. Следующая функция - уничтожение очереди -------------------------------------------------------------- int remove_queue(int qid) { if(msgctl(qid, IPC_RMID, 0) == -1) { perror("msgctl"); exit(1); } return(0); } -------------------------------------------------------------- И наконец функции для отправки и получения сообщений - т.е. запись сообщения в определенную очередь и чтение из нее -------------------------------------------------------------- int send_message(int qid, mymsgbuf_t *qbuf) { int result, lenght; lenght = sizeof(mymsgbuf_t) - sizeof(long); if ((result = msgsnd(qid, qbuf, lenght, 0)) == -1){ perror("msgsnd"); exit(1); } return(result); } int receive_message(int qid, long type, mymsgbuf_t *qbuf) { int result, length; length = sizeof(mymsgbuf_t) - sizeof(long); if((result = msgrcv(qid, (struct msgbuf *)qbuf, length, type, IPC_NOWAIT)) == -1){ if(errno == ENOMSG){ return(0); } else{ perror("msgrcv"); exit(1); } } return(result); } -------------------------------------------------------------- Вот и все. Вы можете найти эти функции в файле layer1.h (http://www.linuxfocus.org/common/src/article296/): попробуйте написать сами программу используя их. В следующей заметке мы поговорим о втором уровне протокола и создадим его. Рекомендуемая литература * Silberschatz, Galvin, Gagne, Operating System Concepts - Sixth Edition, Wiley&Sons, 2001 * Tanenbaum, WoodHull, Operating Systems: Design and Implementation - Second Edition, Prentice Hall, 2000 * Stallings, Operating Systems - Fourth Edition, Prentice Hall, 2002 * Bovet, Cesati, Understanding the Linux Kernel, O'Reilly, 2000 * The Linux Programmer's Guide: http://www.tldp.org/LDP/lpg/index.html * Linux Kernel 2.4 Internals http://www.tldp.org/LDP/lki/lki-5.html * Web page of the #kernelnewbies IRC channel http://www.kernelnewbies.org/ * The linux-kernel mailing list FAQ http://www.tux.org/lkml/ Присылайте свои комментарии, вопросы на мой почтовый адрес leo.giordani(at)libero.it или пишите мне через "страницу отзывов". Вы можете писать мне на английском, немецком или итальянском языках.
Часть 5. Параллельное программирование - очереди сообщений (3) Оригинал: http://www.linuxfocus.org/Russian/November2003/article316.shtml Содержание: * Реализация протокола - Уровень 2 - Общие вопросы * Реализация пользовательского процесса * Реализация процесса коммутатора * Заключение * Заключение * Рекомендуемые приложения, сайты и литература * Загрузить * Страница отзывов Реализация протокола - Уровень 2 - Общие вопросы ------------------------------------------------ Приложение ipcdemo (http://www.linuxfocus.org/common/src/article316/) было разработано для простой реализации коммутатора, позволяющего пользователям обмениваться сообщениями. Чтобы немного разнообразить приложение я добавил так называемый "сервис" - сообщение, задача которого дать пользователям возможность сообщать коммутатору о своем статусе - готовы ли они принимать сообщения, каким образом доставить им их ( послав id очереди IPC ) или они собираются отключиться. Также были добавлены еще два сервиса: Termination и Timing: первый используется для сообщения пользователю, что коммутатор выключается, второй - для измерения времени отклика пользователя. Ниже мы рассмотрим эти моменты подробнее, в разделах о "пользователе" и "коммутаторе" соответственно. Во второй реализации протокола реализованы функции высокого уровня для отправки и получения сообщений, для взаимодействия с "сервисами", для инициализации: эти функции реализованы на основе функций первого уровня протокола, что делает их достаточно понятными. Обратите внимание на некоторые объявления типов сообщений и сервисов в файле layer2.h. Приложение ipcdemo демонстрационное, неоптимизированное - вы можете заметить много глобальных переменных, хочу обратить ваше внимание на то, что главной задачей является объяснение читателю вопросов IPC, но не оптимизации кода. Тем не менее, если вы обнаружите что-то действительно странное - пишите мне и мы обсудим. Реализация пользовательского процесса ------------------------------------- "Пользователь" это на самом деле порожденный процесс коммутатора ( или лучше сказать - родительского процесса моделирующего коммутатор ). Это значит, что "пользователь" обладает теми же переменными, что и коммутатор: ему известен идентификатор очереди коммутатора, потому что он был сохранен коммутатором в локальной переменной до порождения других процессов. При возникновении "пользовательского" процесса первое что ему необходимо сделать - это создать очередь и сообщить коммутатору как обратиться к ней: чтобы сделать это надо послать два сообщения - SERV_BIRTH и SERV_QID. /* инициализация очереди */ qid = init_queue(i); /* сообщение коммутатору о возникновении "пользователя" */ child_send_birth(i, sw); /* сообщение коммутатору способа обращения к "пользователю" */ child_send_qid(i, qid, sw); Затем начинается рутинная работа: послать сообщение, проверить наличие сообщений от других "пользователей", проверить наличие запроса коммутатора и обратно по циклу. Решение о посылке сообщения принимается на вероятностной основе: функция myrand() возвращает случайное число в дипазоне от 0 до значения переданного аргумента, в нашем случае 100, мы посылаем сообщение только если это число меньше указанной вероятности. Так как "пользователь" "спит" 1 секунду между последовательными выполнениями тела цикла, то, значит, за каждые 100 секунд он будет посылать сообщение примерно столько раз, чему равно значение вероятности отсылки; тут мы предполагаем, что выборка из 100 элементов достаточна, чтобы превратить вероятность в действительность, на самом деле 100 элементов достаточно мало для этого... Однако просто заметьте, что в программе не надо указывать слишком маленькие вероятности, иначе ваша симуляция будет длиться веками. -------------------------------------------------------------- if(myrand(100) < send_prob){ dest = 0; /* не посылать сообщения коммутатору, самому себе и */ /* уже получившему */ while((dest == 0) || (dest == i) || (dest == olddest)){ dest = myrand(childs + 1); } olddest = dest; printf("%d -- U %d -- Message to user %d\n", (int) time(NULL), i, dest); child_send_msg(i, dest, 0, sw); } -------------------------------------------------------------- Сообщения посылаемые пользователями коммутатору и затем пересылаемые коммутатором нам - мы отметим как TYPE_CONN (от CONNECTION). -------------------------------------------------------------- /* проверить наличие простых входящих сообщений */ if(child_get_msg(TYPE_CONN, &in)){ msg_sender = get_sender(&in); msg_data = get_data(&in); printf("%d -- U %d -- Message from user %d: %d\n", (int) time(NULL), i, msg_sender, msg_data); } -------------------------------------------------------------- Для запроса сервиса коммутатора будем использовать сообщения типа TYPE_SERV. В случае получения сообщения о прекращении работы - "пользователю" необходимо отправить подтверждающее сообщение, чтобы коммутатор отметил "пользователя" как недоступного и прекратил посылать ему сообщения; затем " пользователь" должен прочитать все сообщения, предназначенные ему ( чтобы выглядеть вежливым, мы можем пропустить этот момент ), удалить очередь и сказать "до свидания" коммутатору. В случае запроса сервиса проверки времени мы посылаем коммутатору сообщение с текущим временем, получив его коммутатор вычисляет разницу между временем получения и отправления сообщения, чтобы знать сколько времени сообщение провело в очередях и заносит это значение в лог. -------------------------------------------------------------- /* проверка наличия запроса коммутатора */ if(child_get_msg(TYPE_SERV, &in)){ msg_service = get_service(&in); switch(msg_service){ case SERV_TERM: /* извините, необходимо прекратить работу */ /* послать подтверждение коммутатору */ child_send_death(i, getpid(), sw); /* прочитать сообщения из очереди */ while(child_get_msg(TYPE_CONN, &in)){ msg_sender = get_sender(&in); msg_data = get_data(&in); printf("%d -- U %d -- Message from user %d: %d\n", (int) time(NULL), i, msg_sender, msg_data); } /* удалить очередь */ close_queue(qid); printf("%d -- U %d -- Termination\n", (int) time(NULL), i); exit(0); break; case SERV_TIME: /* необходимо провести замер времени пребывания сообщения в очередях */ child_send_time(i, sw); printf("%d -- U %d -- Timing\n", (int) time(NULL), i); break; } } -------------------------------------------------------------- Реализация процесса коммутатора ------------------------------- Работу родительского процесса мы можем разделить на две части - "до" и "после" создания порожденных процессов. В первой части необходимо инициализировать массив для хранения идентификаторов порожденных процесов и создать свою очередь. Создавать массив для хранения идентификаторов не совсем верное решение для реального приложения - гораздо правильнее использовать динамические структуры и выделение памяти ( memory allocation ), но объяснение этих тем находится за пределами данной заметки. Сначала идентификаторы очереди инициализируются идентификатором коммутатора, чтобы показать что "пользователь" еще не подключен, после отключения "пользователя" идентификатору опять присваивается оригинальное значение. Во второй части своей работы родительский процесс также как и "пользовательский" ходит по циклу до тех пор пока не отключатся все "пользователи". Коммутатор принимает сообщения от "пользователей" и перенаправляет их по назначению. -------------------------------------------------------------- /* проверить попытку подключения "пользователя" */ if(switch_get_msg(TYPE_CONN, &in)){ msg_receiver = get_receiver(&in); msg_sender = get_sender(&in); msg_data = get_data(&in); /* если адресат доступен */ if(queues[msg_receiver] != sw){ /* послать сообщение адресату */ switch_send_msg(msg_sender, msg_data, queues[msg_receiver]); printf("%d -- S -- Sender: %d -- Destination: %d\n", (int) time(NULL), msg_sender, msg_receiver); } else{ /* адресат недоступен */ printf("%d -- S -- Unreachable destination (Sender: %d - Destination: %d)\n", (int) time(NULL), msg_sender, msg_receiver); } -------------------------------------------------------------- Если "пользователь" посылает сообщение через коммутатор, ему может быть послан запрос одного из двух видов. Решение о посылке запроса и выбор типа запроса производится на вероятностной основе (принцип аналогичен ранее описаному). Первый тип запроса, который может быть послан - запрос на завершение работы "пользователя", второй - запрос на замер времени: мы фиксируем текущее время и помечаем пользователя, чтобы в последующем не пытаться заново замерять время у пользователя, который уже это делает. Если мы не приняли сообщение, возможно все пользователи уже завершили работу. В этом случае мы выжидаем, чтобы порожденные процессы завершили работу до конца (последний пользователь может проверять оставшиеся сообщение в очереди), уничтожаем нашу очередь и выходим. -------------------------------------------------------------- /* случайный запрос сервиса инициатора последнего сообщения */ if((myrand(100) < death_prob) && (queues[msg_sender] != sw)){ switch(myrand(2)) { case 0: /* пользователь должен отключиться */ printf("%d -- S -- User %d chosen for termination\n", (int) time(NULL), msg_sender); switch_send_term(i, queues[msg_sender]); break; case 1: /* проверка наличия замера пользователя */ if(!timing[msg_sender][0]){ timing[msg_sender][0] = 1; timing[msg_sender][1] = (int) time(NULL); printf("%d -- S -- User %d chosen for timing...\n", timing[msg_sender][1], msg_sender); switch_send_time(queues[msg_sender]); } break; } } } else{ if(deadproc == childs){ /* все порожденные процессы завершили работу, ожидание окончания последним своих задач */ waitpid(pid, &status, 0); /* удаление очереди коммутатора */ remove_queue(sw); /* завершение программы */ exit(0); } } -------------------------------------------------------------- Затем мы проверяем, не получили ли мы сервисное сообщение: мы можем получить сообщения о начале и завершении работы пользователя, id очереди и ответы на запрос замера времени. -------------------------------------------------------------- if(switch_get_msg(TYPE_SERV, &in)){ msg_service = get_service(&in); msg_sender = get_sender(&in); switch(msg_service) { case SERV_BIRTH: /* подключение нового пользователя */ printf("%d -- S -- Activation of user %d\n", (int) time(NULL), msg_sender); break; case SERV_DEATH: /* завершение работы пользователя */ printf("%d -- S -- User %d is terminating\n", (int) time(NULL), msg_sender); /* удаление очереди пользователя из списка */ queues[msg_sender] = sw; /* контроль количество пользователей, завершивших работу */ deadproc++; break; case SERV_QID: /* посылка пользователем идентификатора своей очереди */ msg_data = get_data(&in); printf("%d -- S -- Got queue id of user %d: %d\n", (int) time(NULL), msg_sender, msg_data); queues[msg_sender] = msg_data; break; case SERV_TIME: msg_data = get_data(&in); /* информация о времени */ timing[msg_sender][1] = msg_data - timing[msg_sender][1]; printf("%d -- S -- Timing of user %d: %d seconds\n", (int) time(NULL), msg_sender, timing[msg_sender][1]); /* The user is no more under time control */ timing[msg_sender][0] = 0; break; } } -------------------------------------------------------------- Заключение ---------- Это последняя заметка из серии о параллельном программировании: естественно мы не рассмотрели все аспекты этого подхода, но мы можем точно сказать, что вы теперь знаете что скрывается за словом IPC и для решения каких проблем это верный выбор. Советую вам немного усложнить наше приложение, как я уже говорил - отладка таких приложений непростое занятие, но с другой стороны вы можете лучше узнать возможности отладчиков ( не забывайте что gdb ваш верный друг в процессе программирования ), обратите внимание на ссылки ниже - там вы найдете интересные приложения. Небольшой совет касающийся IPC экспериментов. Представьте, что вы запустили несколько раз программу, которая работает не так как вы хотите, в этом случае простое нажатие клавиш Ctrl-C не уничтожит все порожденные процессы. Ранее я не упоминал об утилите "kill", но теперь вы знаете немного о процессах и я уверен, что вы разберетесь с man страницей. Но есть еще одна вещь, которую оставляют за собой процессы - IPC структуры. В приведенном выше примере уничтоженные процессы не освободят выделенную память; чтобы сделать это - мы можем использовать программы ipcs и ipcrm: ipcs показывает список выделенных IPC ресурсов (будьте внимательны - она покажет вам все ресурсы, не только вашего приложения), а ipcrm даст вам возможность удалить некоторые из них; если вы запустите ipcrm без аргументов - вы получите всю интересующую вас информацию: предлагаемые цифры для первых экспериментов - "5 70 70". Разархивируйте командой "tar xvzf ipcdemo-0.1.tar.gz". Чтобы собрать ipcdemo выполните команду "make" внутри каталога с проектом; "make clean" - убирает backup файлы, а "make cleanall" убирает также object файлы. Рекомендуемые приложения, сайты и литература Литературу советую посмотреть вам в предыдущих заметках, здесь хочу посоветовать вам несколько адресов в Интернет о программировании, отладке и т.д. Отладчики верные друзья разработчика, по крайней мере в момент разработки: научитесь сначала пользоваться gdb, а потом уже ddd потому, что графическое приложение это конечно хорошо, но необходимо знать и основы. * GDB The GNU Project Debugger: http://www.gnu.org/directory/gdb.html * DDD Data Display Debugger: http://www.gnu.org/software/ddd Наверняка вы когда-нибудь получали такое сообщение - "Segmentation fault" и размышляли где вы совершили ошибку в коде. Могу посоветовать вам кроме изучения файла с дампом, обратить внимание на valgrind и наблюдать за памятью. Также для чтения core dumped файла с помощью gdb вы можете использовать valgrind. * Valgrind - open-source отладчик памяти для x86-linux: http://developer.kde.org/~sewardj Вообщем-то создавать IPC приложения на языке 'C' занятие интересное, но непростое. Возможным решением может стать выбор языка Python: в нем полностью поддерживается fork и другое, связанное с этим. Обратите внимание на этот язык. * Python: http://www.python.org Загрузить * Здесь материалы для этой заметки (http://www.linuxfocus.org/common/src/article316/)

<< Предыдущая ИНДЕКС Поиск в статьях src Установить закладку Перейти на закладку Следующая >>

Обсуждение [ RSS ]
  • 1, Alex (??), 17:56, 09/06/2009 [ответить]  
  • +/
    "Теория очереди сообщений
    ------------------------

       Каждый процесс способен создать любое количество структур называемых
       очередями: каждая структура может содержать любое количество сообщений
       разных типов, которые имеют разную природу и содержат любую
       информацию;..."

    Вопрос: так ли это?

    Зачем тогда в линуксе нужны следующие значения?

    /proc/sys/kernel/msgmax
    Здесь определяется максимальный размер сообщения, которое может быть отправлено от одного процесса другому. Сообщения между процессами в памяти ядра не копируются на диск, так что если вы увеличите это значение, то вы увеличите количество памяти используемой операционной системой.

    Default setting: 8192

    /proc/sys/kernel/msgmnb
    Здесь указывается максимальное количество байт в одном сообщении.

    Default setting: 16384

    /proc/sys/kernel/msgmni
    Здесь указывается максимальное количество идентификаторов сообщений в очереди.

    Default setting: 16

     

     Добавить комментарий
    Имя:
    E-Mail:
    Заголовок:
    Текст:




    Партнёры:
    PostgresPro
    Inferno Solutions
    Hosting by Hoster.ru
    Хостинг:

    Закладки на сайте
    Проследить за страницей
    Created 1996-2025 by Maxim Chirkov
    Добавить, Поддержать, Вебмастеру