OberonCore

Библиотека  Wiki  Форум  BlackBox  Компоненты  Проекты
Текущее время: Суббота, 27 Апрель, 2024 15:50

Часовой пояс: UTC + 3 часа




Начать новую тему Ответить на тему  [ Сообщений: 17 ] 
Автор Сообщение
 Заголовок сообщения: Последовательные коммуникации
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 01:58 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Привожу ниже модуль собственного изготовления. Называется CommAction. Знаю, что залез в чужую подсистему, но слишком уж он мне нравится именно там. Свою изобретать ради двух модулей не стал.

Код:
(*
Общая идея реализации заключается в том, что есть некий Action, который в фоновом режиме опрашивает поток (CommStreams.Stream) на предмет наличия новых данных, получает эти данные и складывает в буфер приема. У этого Action есть также буфер передачи, из которого данные записываются в поток. Интерпретация получаемых данных - дело специфическое, зависящее от конкретных задач, а вот периодическая работа с буферами никаких новшеств не требует. Поэтому Action - не расширяемый тип. Более того, через него невозможно получить прямого доступа ни к потоку, ни к буферам.
Для интерпретации принимаемых данных вводится тип Engine ("движок"). Движок прикрепляется к Action методом Action.SetEngine. Engine - тип расширяемый. Его метод Do опубликован только для реализации и изначально пуст. Этот метод периодически вызывается из Action. Для каждой конкретной задачи следует реализовать свой движок и присоединять его к Action. В общем случае движок реализует некую базовую часть протокола обмена и передает данные далее в протокол верхнего уровня. Движок имеет доступ только к буферам приема и передачи, но не к потоку непосредственно (если иное не предусмотрено в расширенном типе-наследнике). Чтобы подать сигнал завершения работы, движок устанавливает собственное свойство finished = TRUE.
Буферы реализованы в отдельном модуле как тип CommBuffers.Carrier. В текущей реализации они допускают последовательное чтение данных с начала, удаление данных с начала и добавление данных в конец (FIFO).
Таким образом, получение, хранение и обработка данных максимально отделены друг от друга. Объект, обрабатывающий данные, не имеет доступа к коммуникационному потоку, что позволяет четко ограничить доступ к низкоуровневым функциям. Изменяемая логика обработки вынесена в отдельный тип, минимизируя наследуемую часть. Тем самым соблюден принцип предпочтения композиции объектов перед наследованием.
*)

MODULE CommAction;
(* Copyright (c) Alexander Iljin, 2006. *)

IMPORT CommStreams, Services, CommBuffer;

TYPE
   (** Action - это объект, периодически читающий данные из потока в буфер приема и пытающийся записать данные из буфера отправки в поток. Сам по себе Action не предоставляет доступа к этим буферам, для чтения и записи необходимо расширять тип Engine. **)
   Action* = POINTER TO ABSTRACT RECORD (Services.Action)
      active-: BOOLEAN;   (* = TRUE, если действие периодически вызывается *)
      lastRecvTick-, lastSendTick-: LONGINT;   (* когда был последний прием и передача *)
      engine-: Engine   (* движок-интерпретатор получаемых данных *)
   END;

   (** Engine - это объект, который можно присоединить к Action. Action будет периодически вызывать метод Engine.Do для обработки принятых данных или таймаутов. Движок имеет доступ к буферам приема и передачи, но не к потоку непосредственно. Когда работа закончена, движок может установить finished = TRUE и Action закроет поток. **)
   Engine* = POINTER TO EXTENSIBLE RECORD
      action-: Action;   (* с этим Action связан данный движок *)
      recvBuf-, sendBuf-: CommBuffer.Carrier; (* буферы приема и передачи (= NIL тогда и только тогда, когда action = NIL) *)
      finished*: BOOLEAN;   (* Движок может установить finished = TRUE, сигнализируя, что он закончил работу и поток можно закрыть. *)
      streamClosed-: BOOLEAN   (* Action устанавливает streamClosed = TRUE перед вызовом Engine.Do, если поток уже закрыт (например, из-за потери связи). Action все равно будет продолжать работу, пока движок не установит finished = TRUE. *)
   END;

   StdAction = POINTER TO RECORD (Action)
      stream: CommStreams.Stream;   (* коммуникации через этот объект *)
      sendBuf: CommBuffer.Carrier;   (* буфер отправляемых данных *)
      recvBuf: CommBuffer.Carrier;   (* буфер принятых данных *)
      sendRider: CommBuffer.Rider   (* райдер по sendBuf *)
   END;


(* Engine *)

PROCEDURE (e: Engine) Do- (), NEW, EMPTY;
(** Данное событие периодически вызывается, если движок присоединен к Action. Здесь должна выполняться вся работа по интерпретации получаемых данных и отправке данных в ответ. Do вызывается даже если в буфере приема нет новых данных, поэтому можно проверять таймауты относительно Action.lastRecvTick и Action.lastSendTick. **)

PROCEDURE (e: Engine) Close- (), NEW, EMPTY;
(** Данное событие вызывается при нормальном прекращении работы Action. Не будет вызван, если Action просто удалят из списка активных объектов с помощью Services.RemoveAction. **)


PROCEDURE (e: Engine) SetLoggers* (send, recv: CommBuffer.Logger), NEW;
(** Устанавливаем "журналистов" для буферов приема и передачи. **)
BEGIN
   e.recvBuf.SetLogger(recv);
   e.sendBuf.SetLogger(send);
   ASSERT(e.sendBuf.logger = send, 60);
   ASSERT(e.recvBuf.logger = recv, 61)
END SetLoggers;



(* Action *)

PROCEDURE (a: Action) SetEngine* (e: Engine), NEW, ABSTRACT;
(** Задаем движок, который будет обрабатывать данные. *)

PROCEDURE (a: Action) SetStream* (stream: CommStreams.Stream), NEW, ABSTRACT;
(** Задаем поток, через который будем перекачивать данные. *)

PROCEDURE (a: Action) Close*, NEW, ABSTRACT;
(** Закрываем активное соединение и останавливаем движок. *)



(* StdAction *)

(* forward declaration *)
PROCEDURE ^ (a: StdAction) Close*;


PROCEDURE (a: StdAction) SetEngine* (e : Engine);
BEGIN
   (* отцепляем прежний движок *)
   IF a.engine # NIL THEN
      a.engine.action := NIL;
      a.engine.recvBuf := NIL;
      a.engine.sendBuf := NIL
   END;
   (* ставим новый *)
   a.engine := e;
   IF e # NIL THEN
      e.action := a;
      e.recvBuf := a.recvBuf;
      e.sendBuf := a.sendBuf
   END
END SetEngine;


PROCEDURE (a: StdAction) Init, NEW;
BEGIN
   NEW(a.recvBuf);
   NEW(a.sendBuf)
END Init;


PROCEDURE (a: StdAction) SetStream* (stream: CommStreams.Stream);
(** Задаем поток, с которым будем работать. *)
BEGIN
   IF stream = a.stream THEN RETURN END;

   IF a.stream # NIL THEN
      a.stream.Close
(*      ;Log.String("Action: stream closed by setting another stream");
      Log.Ln;*)
   END;
   a.stream := stream;
   IF a.stream # NIL THEN
      Services.DoLater(a, Services.now);
      a.active := TRUE
   END
END SetStream;


PROCEDURE (a: StdAction) Do-;
(* Будем отправлять данные в движок до тех пор, пока буфер приема не будет пуст, даже после того, как сервер закрыл соединение. Это дает возможность движку обрабатывать приходящие данные небольшими порциями, в духе кооперативной многозадачности, и удалять их из буфера по мере обработки, не заводя собственного буфера. *)
   CONST BufSize = 256;
   VAR
      i, num : INTEGER;
      buf : ARRAY BufSize OF BYTE;
      s : POINTER TO ARRAY OF SHORTCHAR;
      ch : SHORTCHAR;
BEGIN
   IF (a.lastSendTick = 0) & (a.lastRecvTick = 0) THEN
      a.lastSendTick := Services.Ticks();
      a.lastRecvTick := Services.Ticks()
   END;

   (* читаем все данные из потока в буфер приема *)
   IF a.stream # NIL THEN
      REPEAT
         a.stream.ReadBytes(buf, 0, BufSize, num);
         a.recvBuf.Append(buf, 0, num);
         IF num # 0 THEN
            a.lastRecvTick := Services.Ticks()
         END
(*         FOR i := 0 TO num-1 DO
            Log.Char(CHR(buf[i]));
         END;*)
      UNTIL num = 0;
   END;

   (* Дадим движку обработать принятые данные. Он может что-то добавить в буфер передачи. *)
   IF (a.engine # NIL) & ~a.engine.finished THEN
      a.engine.streamClosed := (a.stream = NIL) OR ~a.stream.IsConnected();
      a.engine.Do;
   END;

   (* Отправляем данные из буфера передачи в поток. *)
   IF (a.stream # NIL) & a.stream.IsConnected() THEN
      a.sendRider := a.sendBuf.NewRider(a.sendRider);
      IF ~a.sendRider.Eof() THEN
         a.sendRider.ReadBytes(buf, 0, BufSize, num);
         a.stream.WriteBytes(buf, 0, num, num);
         a.sendBuf.Delete(num);
         IF num # 0 THEN
            a.lastSendTick := Services.Ticks()
         END
      END
   END;

   (* Работа продолжается в следующих случаях:
   1. связь установлена, и буфер отправки не пуст, и движок не закончил работу;
   2. если движок не закончил работу и буфер приема не пуст.
   Из пункта 1 следует, что чтобы отправить что-либо "на прощание", следует отсоединить движок от Action и не устанавливать finished. Установка finished = TRUE при наличии движка однозначно трактуется как требование закрыть соединение, даже если буфер отправки не пуст.
    *)
   IF (
      (a.stream # NIL) & a.stream.IsConnected() &
      ( ((a.engine # NIL) & ~a.engine.finished) OR ~a.sendBuf.IsEmpty() )
   )
   OR ((a.engine # NIL) & ~a.recvBuf.IsEmpty() & ~a.engine.finished)
   THEN (* переходим в режим ожидания *)
      Services.DoLater(a, Services.now)
   ELSE (* закрываем соединение *)
      a.Close
   END
END Do;


PROCEDURE (a: StdAction) Close*;
BEGIN
   IF a.stream # NIL THEN
      a.stream.Close;
      a.stream := NIL;
(*      Log.String("stream closed normally");
      Log.Ln;*)
   END;
   Services.RemoveAction(a);
   a.active := FALSE;
   IF a.engine # NIL THEN
      a.engine.Close
   END
END Close;


PROCEDURE (a: StdAction) FINALIZE-;
BEGIN
   a.Close
END FINALIZE;



PROCEDURE NewAction* (stream: CommStreams.Stream; engine: Engine): Action;
   VAR a: StdAction;
BEGIN
   NEW(a);
   a.Init;
   a.SetStream(stream);
   a.SetEngine(engine);
   RETURN a
END NewAction;

END CommAction.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 02:08 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Поскольку приведенному выше CommActions.Action совершенно все равно, с какими коммуникационными потоками работать, то на основе этого модуля был мной реализован простенький FTP-клиент, программа чата по TCP/IP и связь с микропроцессорной сетью по COM-порту в рамках подсистемы Mate. Во всех трех случаях я наследовал только CommAction.Engine, реализуя метод Do, после чего составлял нужную мне объектную композицию. Приведу ниже пример из подсистемы Mate.

Код:
MODULE MateEngine;
(** Copyright (c) Alexander Iljin, 2006. **)

   IMPORT
      Log,
      CommAction, CommBuffer,
      MateBytes;

   CONST
      crcSize = 1; (* Число байт в контрольной сумме. *)
      headSize = 4; (* Число байт в заголовке пакета = 4: node, device, param, type. Этот заголовок, по сути, есть предположение о протоколе более высокого уровня. Это предположение помогает нам сэкономить память в процессе работы и накладывает ограничение на реализацию протокола верхнего уровня (если тот попытается создать слишком большой пакет, то наткнется на ошибку времени выполнения). *)
      maxPacketLength* = headSize + 127 + crcSize; (** Максимальная длина раскодированного пакета, в байтах: 4 байта заголовок, до 127 байт данных и контрольная сумма. *)
      maxNumPackets = 10; (* Максимальное число пакетов, которое может быть вычерпано из буфера и обработано за один вызов HexEngine.Do. Ограничение помогает наладить кооперативную многозадачность. *)
      startPacketChar = ':';
      endPacketChar = 0DX;
      minHexPacketLength = 1 + 2 * (1 + crcSize) + 1; (* Минимальный размер hex-пакета в байтах (символах) согласно протоколу. Такой пакет состоит из одного байта данных и контрольной суммы. Данная константа вводится для отбраковывания пакетов на этапе приема, поэтому никаких предположений о минимальной длине пакета для протокола верхнего уровня мы здесь не делаем. *)

   TYPE
      Packet* = ARRAY maxPacketLength OF BYTE;
      
      (*
      Задача данного движка - получение и отправка пакетов в hex-формате. Формат таков:
      ":"<hh>...<hh>0DX
      Начинается пакет с символа двоеточия, заканчивается символом 0DX (либо любым другим символом с кодом из диапазона от 0X до 0DX включительно). Между этими символами заключен пакет, состоящий из символов '0'..'9' и 'A'..'F', то есть, шестнадцатиричные цифры. Число цифр должно быть четным, в противном случае пакет считается испорченным и игнорируется. Игнорируются также все пакеты, внутри которых встретились какие-либо символы помимо шестнадцатиричных цифр.
      После успешного приема пакета проверяется контрольная сумма. Суть проверки заключается в том, что составляющие пакет байты суммируются как беззнаковые целые числа без контроля переполнения. Старшая часть результата отбрасывается, анализируется только младший байт. Если он равен нулю, то пакет принят успешно, в противном случае пакет игнорируется.
       *)

      HexEngine* = POINTER TO RECORD (CommAction.Engine)
         rider: CommBuffer.Rider; (* собственный райдер для чтения буфера приема *)
         rxPacket: Packet; (* Копия принятого пакета. Ссылка на этот пакет передается в Receive. *)
         rxPacketSize: INTEGER; (* Длина принятого пакета. *)
         crcErrors-, badPackets-, goodPackets-: INTEGER; (* статистика работы *)
         hook-: Hook
      END;

      (** Хук - это способ прикрепиться к движку, т.е. отправлять и получать пакеты. Хук следует унаследовать в клиентском коде, реализовав при этом процедуру Receive. Прикрепляется к движку хук с помощью вызова HexEngine.SetHook(Hook). **)

      Hook* = POINTER TO ABSTRACT RECORD
         engine: HexEngine
      END;



   (* Hook *)

   (** Эту фунцию вызывает движок, передавая в нее новые полученные данные. Наследник должен реализовать здесь обработку принятого пакета. **)
   PROCEDURE (h: Hook) Receive* (VAR p: Packet; size: INTEGER), NEW, ABSTRACT;

   (** Эта функция позволяет отправлять данные через хук. **)
   PROCEDURE (h: Hook) Send* (IN p: Packet; size: INTEGER), NEW;
      VAR
         coded: ARRAY 1 + 2 * (maxPacketLength + crcSize) + 1 OF BYTE; (* примечание: crcSize здесь намеренно учтен дважды. Первый раз в составе maxPacketLength, и второй раз явно. *)
         crc, i, pos: INTEGER;

      PROCEDURE AppendX (x: INTEGER);
      (* Добавляем младший байт переменной x в конец массива coded. Конец массива обозначается индексом i. *)
      BEGIN
         IF x < 0 THEN INC(x, 256) END;
         INC(crc, x);
         coded[i] := MateBytes.Byte2Hex(x DIV 16);
         INC(i);
         coded[i] := MateBytes.Byte2Hex(x MOD 16);
         INC(i)
      END AppendX;

   BEGIN
      ASSERT(h.engine # NIL, 20);
      ASSERT(size > 0, 21);
      ASSERT(size <= maxPacketLength + crcSize, 22);
      i := 0;
      coded[i] := SHORT(ORD(startPacketChar));
      INC(i);
      crc := 0;
      pos := 0;
      WHILE pos < size DO
         AppendX(p[pos]);
         INC(pos)
      END;
      AppendX(-SHORT(SHORT(crc)));
      ASSERT(SHORT(SHORT(crc)) = 0, 60);
      coded[i] := SHORT(ORD(endPacketChar));
      INC(i);
      h.engine.sendBuf.Append(coded, 0, i)
   END Send;


   (* HexEngine *)

   PROCEDURE (e: HexEngine) PacketReceived (), NEW;
   BEGIN
      ASSERT(e.hook # NIL, 100);
      INC(e.goodPackets);
      e.hook.Receive(e.rxPacket, e.rxPacketSize)
   END PacketReceived;


   PROCEDURE (e: HexEngine) ReadRxPacket (VAR r: CommBuffer.Rider; size: INTEGER): BOOLEAN, NEW;
   (* Прочитаем из указанного райдера пакет длиной size, раскодируем, проверим контрольную сумму и положим в e.rxPacket. Если контрольная сумма верна, возвращаем TRUE. *)
      VAR
         crc: INTEGER; (* контрольная сумма *)
         x: INTEGER; (* очередной байт пакета *)

   BEGIN
      ASSERT(size >= minHexPacketLength, 20);
      ASSERT(size <= 1+2*(maxPacketLength+1)+1, 21);
      ASSERT(~ODD(size), 22);
      ASSERT(r.Next() = startPacketChar, 23); (* пропустим первый символ *)
      DEC(size);
      crc := 0;
      e.rxPacketSize := 0;
      WHILE size > 2 + 1 DO
         x := ASH(MateBytes.Hex2Byte(r.Next()), 4) + MateBytes.Hex2Byte(r.Next());
         DEC(size, 2);
         INC(crc, x);
         e.rxPacket[e.rxPacketSize] := SHORT(SHORT(x));
         INC(e.rxPacketSize)
      END;
      x := ASH(MateBytes.Hex2Byte(r.Next()), 4) + MateBytes.Hex2Byte(r.Next());
      DEC(size, 2);
      INC(crc, x);
      ASSERT(size = 1, 60); (* один завершающий байт должен остаться *)
      ASSERT(~r.Eof(), 61);
      ASSERT(r.Next() <= endPacketChar, 62);
      IF SHORT(SHORT(crc)) = 0 THEN
         RETURN TRUE
      ELSE
         INC(e.crcErrors);
         RETURN FALSE
      END
   END ReadRxPacket;


   PROCEDURE (e: HexEngine) Do- ();
      VAR
         i, numPackets: INTEGER; (* сколько пакетов обработано *)
         endOfPacket, invalidCharacter: BOOLEAN;
         ch: SHORTCHAR;
   BEGIN
      (* Принцип обработки поступающих данных таков. Уже есть буфер, в котором данные хранятся. Это значит, что нам не нужно делать локальную копию той части пакета, которая была получена. Мы можем последовательно читать буфер. Данные в буфере содержатся в том порядке, в котором они были получены. Мы можем удалять данные из начала буфера. Это значит, что при вхождении в данную процедуру в начале буфера находятся данные, которые нужно обработать. Мы удаляем данные из буфера только тогда, когда обнаружим, что в буфере находится пакет данных целиком. Это значит, что при каждом вхождении в данный обработчик мы проверяем, что у нас в начале буфера. Если там признак начала пакета, значит это либо новые данные, либо часть пакета, оставшаяся с прошлой обработки, когда пакет был принят не до конца. Если в начале буфера нет признака начала пакета, значит это какой-то мусор, не укладывающийся в протокол. Такой мусор нужно игнорировать. Само появление мусора не является фатальной ошибкой. Например, мы могли подключить программу в тот момент, когда кто-то передавал данные из середины пакета. *)
      numPackets := 0;
      e.rider := e.recvBuf.NewRider(e.rider);
      WHILE ~e.rider.Eof() & (numPackets < maxNumPackets) DO
         (* удалим мусор до символа начала пакета *)
         i := 0;
         WHILE ~e.rider.Eof() & (e.rider.Next() # startPacketChar) DO
            INC(i)
         END;
         e.recvBuf.Delete(i); (* Удаляем всё до символа начала пакета. Сам символ не трогаем: он нам понадобится на случай повторной обработки этого же пакета, если в текущем сеансе обработки пакет получен не полностью. *)

         (* Осталось ли что-то еще в буфере? Если да, значит предыдущий цикл завершил работу, прочитав признак начала пакета, и все данные до начала пакета удалены. Теперь нужно прочитать остаток пакета в поисках признака конца, заодно подсчитаем число байт в пакете. *)
         IF ~e.rider.Eof() THEN
            (* проверим, получен ли пакет целиком, и сколько в нем символов *)
            i := 1; (* один символ есть - символ начала пакета, посмотрим, что там дальше *)
            invalidCharacter := FALSE;
            REPEAT
               ch := e.rider.Next();
               INC(i);
               endOfPacket := (ch >= 0X) & (ch <= 0DX);
               IF ~endOfPacket THEN
                  invalidCharacter := ~( ((ch >= '0') & (ch <= '9')) OR ((ch >= 'A') & (ch <= 'F')) )
               END
            UNTIL endOfPacket OR invalidCharacter OR e.rider.Eof();
            IF invalidCharacter THEN
               (* пакет выброшен, потому что в нем содержались непристойные символы *)
               INC(e.badPackets);
               e.recvBuf.Delete(i)
            ELSIF i > (1 + 2*(maxPacketLength) + 1) THEN
               (* пакет выброшен, так как превышает максимально допустимую длину *)
               INC(e.badPackets);
               e.recvBuf.Delete(i)
            ELSIF endOfPacket THEN
               IF i < minHexPacketLength THEN
                  (* пакет выброшен, так как слишком короток, чтобы содержать данные *)
                  INC(e.badPackets)
               ELSIF ODD(i) THEN
                  (* пакет выброшен, так как число байт в нем должно быть четным *)
                  INC(e.badPackets)
               ELSE
                  (* пакет получен целиком, скопируем его и удалим из буфера *)
                  INC(numPackets);
                  e.rider := e.recvBuf.NewRider(e.rider);
                  IF e.ReadRxPacket(e.rider, i) THEN
                     e.PacketReceived()
                  END
               END;
               e.recvBuf.Delete(i)
            END
         END
      END;
      (* Прекращаем работу в том случае, если поток уже закрыт и в текущем сеансе обработки буфера приема мы не достигли максимального числа пакетов. Это значит, что если в буфере и остаются какие-то остатки данных, но они не тянут еще на один цельный пакет. *)
      IF e.streamClosed & (numPackets < maxNumPackets) THEN
         e.finished := TRUE
      END
   END Do;


   (* Miscellaneous *)
   
   PROCEDURE (e: HexEngine) SetHook* (h: Hook), NEW;
   BEGIN
      e.hook := h;
      IF e.hook # NIL THEN
         e.hook.engine := e
      END
   END SetHook;

END MateEngine.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 02:22 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Вот так вот примерно и работаем. Обратите внимание, что тип MateEngine.HexEngine не расширяемый, расширяемым является только MateEngine.Hook, так что у клиентов нет шансов ничего испортить в работе протокола. Модули небольшие, трудных для понимания вещей не содержат кроме, пожалуй, сложного условия при выборе, продолжать или заканчивать работу в конце CommAction.Action.Do. В остальном, если отвлечься от тонкостей реализации hex-протокола, система достаточно простая, гибкая и безопасная. Напоследок - обобщенный пример кода, открывающего соединение через COM-порт:
Код:
VAR
   action: CommAction.Action;
   engine: MateEngine.HexEngine;
   stream: CommStreams.Stream;
   res: INTEGER;
BEGIN
   CommStreams.NewStream('CommV24', '', 'COM2,19200', stream, res);
   CASE res OF
   |   CommStreams.done:
      NEW(engine);
      MateModels.dir.HookToEngine(engine);
      action := CommAction.NewAction(stream, engine);
      Dialog.ShowStatus('#Mate:port open')
..................
END;


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 17:23 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
Я давно хотел написать нечто подобное под BlackBox, только вот руки не доходили ;(

В Python-е есть такой framework - называется twisted (аналог POE в Perl).

Если писать расширения для CommAction.Engine, то неплохо было бы обратить на него внимание.

respect!


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 17:36 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Чтобы иметь возможность работать с COM-портом как с потоком нужно немного расширить стандартный модуль CommV24, добавив в него процедуру NewStream в соответствии со спецификацией из CommStreams. Вот моя реализация:
Код:
MODULE CommV24;
(**
   project   = "BlackBox"
   organization   = "www.oberon.ch"
   contributors   = "Oberon microsystems", Alexander Iljin
   version   = "System/Rsrc/About"
   copyright   = "System/Rsrc/About"
   license   = "Docu/BB-License"
   changes   = "
   - 20061007, ai, ParseParams published. Fixed ParseParams handling last parameter.
   - 20061007, ai, Deleted ASSERT(101) from procedure Available.
   - 20060919, ai, Added support for virtual devices ("\\.\" prefix).
   - 20060916, ai, Implemented non-blocking access via this module using CommStreams.Stream.
   - 20060916, ai, Added inDTRon, inRTSon support.
   - 20060916, ai, Some parameter restrictions enforced with ASSERTs. Added IsConnected.
   - 20060916, ai, All active connections are closed upon module unload.
   - 20060915, ai, SYSTEM module highlighted in bold.
   - 20060915, ai, Output to StdLog deleted.
   "
   issues   = ""

**)

   IMPORT SYSTEM, WinApi, CommStreams, Strings;

   CONST
      bits4* = 0; bits5* = 1; bits6* = 2; bits7* = 3; stop15* = 4; stop2* = 5; even* = 6; odd* = 7; inXonXoff* = 8; outXonXoff* = 9; inRTS* = 10; inDTR* = 11; outCTS* = 12; outDSR* = 13; inDTRon* = 14; inRTSon* = 15;

   TYPE
      Connection* = POINTER TO LIMITED RECORD
         hnd: WinApi.HANDLE;   (* # 0: open *)
         opts: SET
      END;

      Stream = POINTER TO LIMITED RECORD (CommStreams.Stream)
         conn: Connection;
         remoteAdr: CommStreams.Adr;
         maxWrite: INTEGER
      END;

      ConnectionChain = POINTER TO RECORD
         conn: Connection;
         next: ConnectionChain
      END;

   VAR root: ConnectionChain;

   PROCEDURE IsConnected (c: Connection): BOOLEAN;
   BEGIN
      ASSERT(c # NIL, 20);
      RETURN c.hnd # 0
   END IsConnected;

   PROCEDURE Remove (c: Connection);
      VAR cc, prev: ConnectionChain;
   BEGIN
      ASSERT(~IsConnected(c), 21); (* only closed connections should be removed from the chain *)
      ASSERT(root # NIL, 100); (* if a connection was created, it should be in the list *)
      IF c = root.conn THEN
         root := root.next
      ELSE
         ASSERT(root.next # NIL, 100); (* if a connection was created, it should be in the list *)
         prev := root;
         cc := root.next;
         WHILE (cc # NIL) & (cc.conn # c) DO
            prev := cc;
            cc := cc.next
         END;
         ASSERT((cc # NIL) & (cc.conn = c), 100); (* if a connection was created, it should be in the list *)
         prev.next := cc.next
      END
   END Remove;

   PROCEDURE Close* (c: Connection);
      VAR res: INTEGER;
   BEGIN
      ASSERT(c # NIL, 20);
      IF IsConnected(c) THEN
         res := WinApi.CloseHandle(c.hnd);
         ASSERT(res # 0, 100);
         c.hnd := 0;
         Remove(c)
      END
   END Close;

   PROCEDURE Open* (device: ARRAY OF CHAR; baud: INTEGER; opts: SET; OUT conn: Connection);
      VAR c: Connection; h: WinApi.HANDLE; res: INTEGER; dcb: WinApi.DCB; to: WinApi.COMMTIMEOUTS;
         s: ARRAY WinApi.MAX_PATH OF CHAR;
         cc: ConnectionChain;
   BEGIN
      conn := NIL; (* Alexander Iljin: this was not ensured *)
      ASSERT(opts * {even, odd} # {even, odd}, 20);
      IF bits4 IN opts THEN ASSERT(opts * {bits5, bits6, bits7} = {}, 21) END;
      IF bits5 IN opts THEN ASSERT(opts * {bits4, bits6, bits7} = {}, 21) END;
      IF bits6 IN opts THEN ASSERT(opts * {bits4, bits5, bits7} = {}, 21) END;
      IF bits7 IN opts THEN ASSERT(opts * {bits4, bits5, bits6} = {}, 21) END;
      ASSERT(opts * {inDTR, inDTRon} # {inDTR, inDTRon}, 22);
      ASSERT(opts * {inRTS, inRTSon} # {inRTS, inRTSon}, 23);
      IF device[0] # "\" THEN s := "\\.\" + device
      ELSE s := device$
      END;
      h := WinApi.CreateFileW(
         s,  WinApi.GENERIC_READ +  WinApi.GENERIC_WRITE,
         {}, NIL,  WinApi.OPEN_EXISTING, {}, 0
      );
      IF h # -1 THEN
         dcb.DCBlength := SIZE(WinApi.DCB);
         res := WinApi.GetCommState(h, dcb);
         IF res # 0 THEN
            dcb.BaudRate := baud;
            dcb.fBits0 := {0};   (* binary *)
            IF opts * {even, odd} # {} THEN INCL(dcb.fBits0, 1) END;   (* check parity *)
            IF outCTS IN opts THEN INCL(dcb.fBits0, 2) END;   (* CTS out flow control *)
            IF outDSR IN opts THEN INCL(dcb.fBits0, 3) END;   (* DSR out flow control *)
            IF inDTR IN opts THEN INCL(dcb.fBits0, 5)            (* DTR flow control handshake *)
            ELSIF inDTRon IN opts THEN INCL(dcb.fBits0, 4)      (* DTR enable*)
            END;
            IF outXonXoff IN opts THEN INCL(dcb.fBits0, 8) END;   (* Xon/Xoff out flow control *)
            IF inXonXoff IN opts THEN INCL(dcb.fBits0, 9) END;   (* Xob/Xoff in flow control *)
            IF inRTS IN opts THEN INCL(dcb.fBits0, 13)            (* RTS flow control handshake *)
            ELSIF inRTSon IN opts THEN INCL(dcb.fBits0, 12)   (* RTS enable*)
            END;
            IF bits4 IN opts THEN dcb.ByteSize := 4X
            ELSIF bits5 IN opts THEN dcb.ByteSize := 5X
            ELSIF bits6 IN opts THEN dcb.ByteSize := 6X
            ELSIF bits7 IN opts THEN dcb.ByteSize := 7X
            ELSE dcb.ByteSize := 8X
            END;
            IF stop15 IN opts THEN dcb.StopBits := 1X
            ELSIF stop2 IN opts THEN dcb.StopBits := 2X
            ELSE dcb.StopBits := 0X
            END;
            IF even IN opts THEN dcb.Parity := 2X
            ELSIF odd IN opts THEN dcb.Parity := 1X
            ELSE dcb.Parity := 0X
            END;
            res := WinApi.SetCommState(h, dcb);
            IF res # 0 THEN
               to.ReadIntervalTimeout := 0;
               to.ReadTotalTimeoutMultiplier := 0;
               to.ReadTotalTimeoutConstant := 0;
               to.WriteTotalTimeoutMultiplier := 0;
               to.WriteTotalTimeoutConstant := 0;
               res := WinApi.SetCommTimeouts(h, to);
               IF res # 0 THEN
                  NEW(c);
                  c.hnd := h;
                  c.opts := opts;
                  NEW(cc);
                  cc.conn := c;
                  cc.next := root;
                  root := cc;
                  conn := c
               END
            END
         END
      END
   END Open;

   PROCEDURE SendByte* (c: Connection; x: BYTE);
      VAR res, written: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      written := 0;
      res := WinApi.WriteFile(c.hnd, SYSTEM.ADR(x), 1, written, NIL);
      ASSERT(res # 0, 100);
      ASSERT(written = 1, 101)
   END SendByte;

   PROCEDURE SendBytes* (c: Connection; IN x: ARRAY OF BYTE; beg, len: INTEGER);
      VAR res, written: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      ASSERT(LEN(x) >= beg + len, 22);
      ASSERT(len > 0, 23);
      written := 0;
      res := WinApi.WriteFile(c.hnd, SYSTEM.ADR(x) + beg, len, written, NIL);
      ASSERT(res # 0, 100);
      ASSERT(written = len, 101)
   END SendBytes;

   PROCEDURE Available* (c: Connection): INTEGER;
      VAR res: INTEGER; errors: SET; status: WinApi.COMSTAT;
   BEGIN
      ASSERT(IsConnected(c), 21);
      errors := {};
      status.cbInQue := 0;
      res := WinApi.ClearCommError(c.hnd, errors, status);
      ASSERT(res # 0, 100);
      RETURN status.cbInQue
   END Available;

   PROCEDURE ReceiveByte* (c: Connection; OUT x: BYTE);
      VAR res, read: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      read := 0;
      res := WinApi.ReadFile(c.hnd, SYSTEM.ADR(x), 1, read, NIL);
      ASSERT(res # 0, 100);
      ASSERT(read = 1, 101)
   END ReceiveByte;

   PROCEDURE ReceiveBytes* (c: Connection; OUT x: ARRAY OF BYTE; beg, len: INTEGER);
      VAR res, read: INTEGER;
   BEGIN
      IF len = 0 THEN RETURN END;
      ASSERT(IsConnected(c), 21);
      ASSERT(LEN(x) >= beg + len, 22);
      ASSERT(len > 0, 23);
      read := 0;
      res := WinApi.ReadFile(c.hnd, SYSTEM.ADR(x) + beg, len, read, NIL);
      ASSERT(res # 0, 100);
      ASSERT(read = len, 101)
   END ReceiveBytes;

   PROCEDURE SetBuffers* (c: Connection; inpBufSize, outBufSize: INTEGER);
      VAR res: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      res := WinApi.SetupComm(c.hnd, inpBufSize, outBufSize);
      ASSERT(res # 0, 100)
   END SetBuffers;

   PROCEDURE SetDTR* (c: Connection; on: BOOLEAN);
      VAR res: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      ASSERT(~(inDTR IN c.opts), 22);
      IF on THEN res := WinApi.EscapeCommFunction(c.hnd, WinApi.SETDTR)
      ELSE res := WinApi.EscapeCommFunction(c.hnd, WinApi.CLRDTR)
      END;
      ASSERT(res # 0, 100)
   END SetDTR;

   PROCEDURE SetRTS* (c: Connection; on: BOOLEAN);
      VAR res: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      ASSERT(~(inRTS IN c.opts), 22);
      IF on THEN res := WinApi.EscapeCommFunction(c.hnd, WinApi.SETRTS)
      ELSE res := WinApi.EscapeCommFunction(c.hnd, WinApi.CLRRTS)
      END;
      ASSERT(res # 0, 100)
   END SetRTS;

   PROCEDURE SetBreak* (c: Connection; on: BOOLEAN);
      VAR res: INTEGER;
   BEGIN
      ASSERT(IsConnected(c), 21);
      IF on THEN res := WinApi.EscapeCommFunction(c.hnd, WinApi.SETBREAK)
      ELSE res := WinApi.EscapeCommFunction(c.hnd, WinApi.CLRBREAK)
      END;
      ASSERT(res # 0, 100)
   END SetBreak;

   PROCEDURE CTSState* (c: Connection): BOOLEAN;
      VAR res: INTEGER; s: SET;
   BEGIN
      ASSERT(IsConnected(c), 21);
      s := {};
      res := WinApi.GetCommModemStatus(c.hnd, s);
      ASSERT(res # 0, 100);
      RETURN s * WinApi.MS_CTS_ON # {}
   END CTSState;

   PROCEDURE DSRState* (c: Connection): BOOLEAN;
      VAR res: INTEGER; s: SET;
   BEGIN
      ASSERT(IsConnected(c), 21);
      s := {};
      res := WinApi.GetCommModemStatus(c.hnd, s);
      ASSERT(res # 0, 100);
      RETURN s * WinApi.MS_DSR_ON # {}
   END DSRState;

   PROCEDURE CDState* (c: Connection): BOOLEAN;
      VAR res: INTEGER; s: SET;
   BEGIN
      ASSERT(IsConnected(c), 21);
      s := {};
      res := WinApi.GetCommModemStatus(c.hnd, s);
      ASSERT(res # 0, 100);
      RETURN s * WinApi.MS_RLSD_ON # {}
   END CDState;


   (* Stream*)

   PROCEDURE (s: Stream) RemoteAdr* (): CommStreams.Adr;
      VAR str: CommStreams.Adr;
   BEGIN
      NEW(str, LEN(s.remoteAdr));
      str^ := s.remoteAdr$;
      RETURN str
   END RemoteAdr;

   PROCEDURE (s: Stream) IsConnected* (): BOOLEAN;
   BEGIN
      RETURN s.conn # NIL
   END IsConnected;

   PROCEDURE (s: Stream) WriteBytes* (IN x: ARRAY OF BYTE; beg, len: INTEGER; OUT written: INTEGER);
   BEGIN
      IF s.IsConnected() THEN
         IF len > s.maxWrite THEN written := s.maxWrite
         ELSE written := len
         END;
         SendBytes(s.conn, x, beg, written)
      ELSE
         written := 0
      END
   END WriteBytes;

   PROCEDURE (s: Stream) ReadBytes* (VAR x: ARRAY OF BYTE; beg, len: INTEGER; OUT read: INTEGER);
   BEGIN
      IF s.IsConnected() THEN
         read := Available(s.conn);
         IF read > len THEN read := len END;
         ReceiveBytes(s.conn, x, beg, read)
      ELSE
         read := 0
      END
   END ReadBytes;

   PROCEDURE (s: Stream) Close*;
   BEGIN
      IF s.IsConnected() THEN
         Close(s.conn);
         s.conn := NIL
      END
   END Close;

   PROCEDURE ParseParams* (IN params: ARRAY OF CHAR; OUT device: ARRAY OF CHAR; OUT baud: INTEGER; OUT opts: SET): INTEGER;
      CONST
         maxParamLen = 15;
         paramDelimiter = ',';
      TYPE
         Param = POINTER TO ARRAY maxParamLen OF CHAR;
      VAR
         par: Param;
         i, res, badParamIndex, paramsLen: INTEGER;
         read, tooLong: BOOLEAN;

      PROCEDURE NextPar(p: Param; OUT read, tooLong: BOOLEAN);
         VAR pInd: INTEGER;
      BEGIN
         pInd := 0;
         WHILE (i < paramsLen) & (params[i] # paramDelimiter) & (pInd < maxParamLen - 1) DO
            p[pInd] := params[i];
            INC(pInd);
            INC(i)
         END;
         IF (i < paramsLen) & (params[i] # paramDelimiter) THEN
            tooLong := TRUE;
            (* a parameter is too long to fit in par, let's just skip it *)
            REPEAT
               INC(i)
            UNTIL ~((i < paramsLen) & (params[i] # paramDelimiter));
            pInd := 0
         ELSE
            tooLong := FALSE
         END;
         p[pInd] := 0X;
         IF i >= paramsLen THEN
            read := pInd > 0
         ELSIF params[i] = paramDelimiter THEN
            INC(i); (* skip the delimiter on next reading *)
            read := TRUE
         ELSE
            read := FALSE
         END;
         IF read THEN
            INC(badParamIndex)
         END
      END NextPar;

   BEGIN
      i := 0;
      badParamIndex := 0;
      device := 'COM1';
      baud := 9600;
      opts := {};
      paramsLen := LEN(params$);
      NEW(par);
      NextPar(par, read, tooLong);
      IF read THEN
         IF tooLong THEN RETURN badParamIndex END;
         IF par^ # '' THEN device := par$ END
      END;
      NextPar(par, read, tooLong);
      WHILE read DO
         IF tooLong THEN RETURN badParamIndex END;
         IF par$ = ''   THEN (* ignore empty parameters *)
         ELSIF par$ = '1'   THEN
         ELSIF par$ = '1.5'   THEN INCL(opts, stop15)
         ELSIF par$ = '2'   THEN INCL(opts, stop2)
         ELSIF par$ = '4'   THEN INCL(opts, bits4)
         ELSIF par$ = '5'   THEN INCL(opts, bits5)
         ELSIF par$ = '6'   THEN INCL(opts, bits6)
         ELSIF par$ = '7'   THEN INCL(opts, bits7)
         ELSIF par$ = '8'   THEN
         ELSIF par$ = 'even'   THEN INCL(opts, even)
         ELSIF par$ = 'odd'   THEN INCL(opts, odd)
         ELSIF par$ = 'DTR'   THEN INCL(opts, inDTR)
         ELSIF par$ = 'DTRon'   THEN INCL(opts, inDTRon)
         ELSIF par$ = 'DTRoff'   THEN
         ELSIF par$ = 'RTS'   THEN INCL(opts, inRTS)
         ELSIF par$ = 'RTSon'   THEN INCL(opts, inRTSon)
         ELSIF par$ = 'RTSoff'   THEN
         ELSIF par$ = 'CTS'   THEN INCL(opts, outCTS)
         ELSIF par$ = 'DSR'   THEN INCL(opts, outDSR)
         ELSIF par$ = 'inXonXoff'   THEN INCL(opts, inXonXoff)
         ELSIF par$ = 'outXonXoff'   THEN INCL(opts, outXonXoff)
         ELSE (* otherwise it should be a baud rate *)
            Strings.StringToInt(par, baud, res);
            IF (res # 0) OR (baud <= 8) THEN
               RETURN badParamIndex (* unknown parameter or invalid baud rate *)
            END
         END;
         NextPar(par, read, tooLong)
      END;
      RETURN 0
   END ParseParams;

   PROCEDURE NewStream* (localAdr, remoteAdr: ARRAY OF CHAR; OUT s: CommStreams.Stream; OUT res: INTEGER);
      VAR
         str: Stream;
         device: ARRAY 256 OF CHAR;
         baud: INTEGER;
         opts: SET;
   BEGIN
      NEW(str);
      NEW(str.remoteAdr, LEN(remoteAdr$) + 1);
      str.remoteAdr^ := remoteAdr$;
      IF ParseParams(str.remoteAdr, device, baud, opts) = 0 THEN
         Open(device, baud, opts, str.conn);
         IF str.conn # NIL THEN
            (* Here we calculate the maximum bytes written in a single call to WriteBytes. This is the granularity of blocking, since WriteBytes actually blocks. It just won't try to send more than s.maxWrite bytes at once. Although several calls to WriteBytes will block for a longer period of time.
            The value here is the baud DIV 1000. In the default configuration (9600 baud, 8 data bits, 1 stop bit, no parity check) this equals 9 bytes which it takes less than 10 msec to send. If you want to block for longer, you may call WriteBytes twice in a succession, which would give you a sum of 20 msec, and so on. If baud = 19200, you would send 19 bytes for the same 10 msec. *)
            str.maxWrite := baud DIV 1000;
            IF str.maxWrite < 1 THEN str.maxWrite := 1 END;
            s := str;
            res := CommStreams.done
         ELSE
            s := NIL;
            res := CommStreams.remoteAdrInUse
         END
      ELSE
         res := CommStreams.invalidRemoteAdr
      END
   END NewStream;


   PROCEDURE Init;
   BEGIN
      root := NIL
   END Init;
   
   PROCEDURE CloseAll;
   BEGIN
      WHILE root # NIL DO
         Close(root.conn)
      END
   END CloseAll;

BEGIN
   Init
CLOSE
   CloseAll
END CommV24.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 17:53 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
Если будет BlackBox for Linux/FreeBSD и если оптимизировать CommTCP для работы с большим количеством сокетов, то получится возможность писать под BlackBox-ом сетевые приложения (для некоторых задач ещё потребуется многопоточность, но думаю что это пока не так важно).

Преимущества перед twisted:
* намного более эффективное использование ресурсов (процессор, память)

Преимущества перед Apache: те же самые, что и у twisted:
* надёжность
* возможно более эффективное использование ресурсов при работе программ со сложной логикой (Apache изначально разрабатывался для выдачи статического содержимого)

На ближайшее будущее с помощью CommAction можно реализовать протокол, предназначенный для для обмена данными по сети с программами, написанными на других языках (XML-RPC).


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 18:31 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Для разработки web-приложений есть работающий набор подсистем, бесплатно доступный на http://www.o3-software.de/. Правда, без исходников.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Пятница, 24 Ноябрь, 2006 20:22 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
А можно ещё CommBuffer ?


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Суббота, 25 Ноябрь, 2006 02:14 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Конечно, мне не жалко. Только если вы (или кто-то еще читающий эту тему) захотите использовать эти модули, не копируйте текст отсюда. Скажите, я их выложу в виде .odc, чтобы сохранить форматирование.

Длинный комментарий в начале написан не по данному случаю, а просто так, в порядке размышлений.

Код:
(**
   Буфер (Carrier) это динамический массив из байт. Специфика его в том, что он позволяет работать с собой как с линейным буфером, хотя на самом деле представляет из себя односвязанный список. В каждом элементе этого списка содержится очередная порция данных. Такая организация позволяет быстро добавлять и удалять сегменты, что очень полезно при работе с потоками информации неравномерной длины (например, обмен данными в сети). Преимущества по сравнению с традиционным монолитным буфером таковы: (1) размер буфера может динамически меняться, нет нужны угадывать "максимальный" размер для статичного буфера; (2) буфер может состоять из "кусочков", каждый из которых сформирован в момент получения информации. Для монолитного буфера пополнение означает выделение блока памяти бОльшего, чем прежний, размера, копирование прежней информации на новое место, добавление новой информации в конец нового блока, освобождение прежнего блока. Для кусочного буфера пополнение - это только выделение блока, необходимого для очередной порции данных, копирование только новых данных в выделенную область и установление связи с последним блоком буфера. При этом (а) не требуется единый блок памяти все бОльшего размера и (б) не образуются все бОльшие "дыры", которые не подойдут для очередного прироста буфера.
   Пример. Пусть мы из сети получаем три пакета по одному байту длиной. Мы хотим их все сложить в буфер для дальнейшей обработки. Как организовать буфер?
   Если мы создаем монолитный статичный буфер, то мы просто указываем в качестве размера буфера 3 байта, выделяем память заранее и последовательно ее заполняем по мере получения данных. Проблемы начнутся, если мы вместо 3 байт получим 4 или более. Традиционное решение - увеличить размер буфера - требует перекомпиляции программы. Кроме того, вряд ли возможно предусмотреть такой буфер, который бы сгодился на все случаи жизни, а если бы это и было возможно, то он скорее всего был бы слишком велик для повседневного использования, то есть неоправданно расходовал память.
   Если мы создаем монолитный динамический буфер, то ситуация развивалась бы следующим образом:
   Мы получаем первый байт. В начале памяти выделяется блок размером один байт. Данные копируются в память.
   Мы получаем второй байт. Буфер заполнен, его необходимо увеличить. В памяти ищется блок размером в два байта, он выделяется под буфер. Из старого буфера копируется прежняя информация, после чего на оставшееся место дописывается только что полученный второй байт. Прежний буфер освобождается, оставляя "дырку" размером в один байт.
   Мы получаем третий байт. Буфер заполнен, его необходимо увеличить. На месте первоначального буфера имеется область размером только в один байт, следовательно надо искать новую свободную область. В памяти ищется блок размером в три байта, он выделяется под буфер. Из старого буфера копируется прежняя информация, после чего на оставшееся место дописывается только что полученный третий байт. Прежний буфер освобождается, оставляя новую "дырку" размером в два байта.
   Таким образом, мы используем три байта под буфер, и еще три байта памяти мы уже не сможем использовать под буфер в дальнейшем. Если нам понадобится получить четвертый байт, мы выведем из употребления уже 6 байт. Конечно, эта память будет доступна для других операций, не требующих блоков большей длины, но все же довольно быстро растет вероятность того, что для очередного увеличения буфера монолитного блока памяти уже не будет.
   Наконец, если мы создаем динамический связанный буфер, мы каждый раз будем выделять только по одному байту - ровно столько, сколько необходимо для размещения очередной порции данных. Фрагментация памяти при этом минимальна. Недостатком данного способа является необходимость хранения дополнительных данных, описывающих каждый из блоков. Как минимум это указатель на следующий блок в связанном списке.

   Замечания по реализации:
   Следует доработать функцию Carrier.Delete, чтобы в ней не производилось копирование буфера. Это копирование только замедляет работу и ведет к использованию лишней памяти. После удаления копирования нужно будет доработать райдеры, чтобы они при создании или сбросе указывали не на начало первого буфера (значение -1), а на байт, предшествующий первому в Carrier.
*)

MODULE CommBuffer;
(** Copyright (c) Alexander Iljin, 2006. **)

TYPE
   (** Rider предназначен для чтения данных из Carrier. *)
   Rider* = POINTER TO LIMITED RECORD
      buff: Carrier;   (* буфер, который мы просматриваем *)
      pos-: INTEGER;   (* положение относительно начала буфера *)
      curr: CarrierData;   (* этот блок мы сейчас просматриваем *)
      currPos: INTEGER;   (* положение в блоке curr *)
      closed-: BOOLEAN;   (* если TRUE, райдер больше не работает *)
      next: Rider   (* следующий райдер в списке *)
   END;

   (** Logger - "журналист", который можно прикрепить к Carrier с помощью Carrier.SetLogger. Этот объект получает копию всех данных, добавленных в буфер с помощью Carrier.Append. Может использоваться для низкоуровневого слежения за обменом данными при отладке. *)
   Logger* = POINTER TO ABSTRACT RECORD
   END;

   CarrierData = POINTER TO RECORD
      next: CarrierData;
      data: POINTER TO ARRAY OF BYTE
   END;

   Carrier* = POINTER TO RECORD
      root: CarrierData;   (* первый блок данных буфера *)
      riders: Rider;   (* райдеры, читающие данный буфер *)
      logger-: Logger;   (* прикрепленный журналист *)
      total-: LONGINT   (* суммарное количество записанных в буфер байт *)
   END;


(* Logger *)

PROCEDURE (log: Logger) Appended- (IN buff: ARRAY OF BYTE), NEW, EMPTY;
(** Данная процедура вызывается для каждого блока данных, добавленного в Carrier, с которым связан данный Logger. *)


(* CarrierData *)

PROCEDURE NewCarrierData (len: INTEGER): CarrierData;
VAR bd: CarrierData;
BEGIN
(*   StdLog.String("LEN = "); StdLog.Int(len); StdLog.Ln; *)
   IF len <= 0 THEN RETURN NIL
   ELSE
      NEW(bd);
      NEW(bd.data, len);
      RETURN bd
   END
END NewCarrierData;


(* Rider *)

PROCEDURE (r: Rider) Init, NEW;
BEGIN
   r.pos := -1;
   r.closed := FALSE
END Init;


PROCEDURE (r: Rider) Eof* (): BOOLEAN, NEW;
BEGIN
   IF r.closed OR (r.buff.root = NIL) THEN
      RETURN TRUE
   ELSE
      IF r.pos = -1 THEN
         RETURN FALSE
      ELSE
         IF r.currPos < LEN(r.curr.data)-1 THEN
            RETURN FALSE
         ELSE
            RETURN r.curr.next = NIL
         END
      END
   END
END Eof;


PROCEDURE (r: Rider) Next* (): SHORTCHAR, NEW;
(* Если Eof() = FALSE, возвращаем очередной символ, иначе возвращаем 0X. *)
BEGIN
   IF r.Eof() THEN
      RETURN 0X
   END;
   
   IF r.pos = -1 THEN
      r.curr := r.buff.root;
      IF r.curr = NIL THEN
         RETURN 0X
      ELSE
         r.currPos := 0
      END
   ELSE
      IF r.currPos < LEN(r.curr.data)-1 THEN
         INC(r.currPos)
      ELSE
         r.curr := r.curr.next;
         r.currPos := 0
      END
   END;
   INC(r.pos);
   RETURN SHORT(CHR(r.curr.data[r.currPos]))
END Next;


PROCEDURE (r: Rider) ReadBytes* (VAR x: ARRAY OF BYTE; beg, len: INTEGER; OUT read: INTEGER), NEW;
(* Читаем len байт и помещаем их в массив x, начиная с позиции beg. В read возвращаем число прочитанных байт (read <= len). *)
BEGIN
   ASSERT(beg >= 0, 20);
   ASSERT(len >= 0, 21);
   ASSERT(beg+len <= LEN(x), 22);
   
   read := 0;
   WHILE (len > 0) & (~r.Eof()) DO
      x[beg] := SHORT(ORD(r.Next()));
      DEC(len);
      INC(beg);
      INC(read)
   END
END ReadBytes;


PROCEDURE (r: Rider) Deleted (n: INTEGER), NEW;
(* Из буфера r.buff только что было удалено n первых байт (n >= 0). *)
VAR i : INTEGER;
BEGIN
   IF (r.buff.root = NIL) OR (n > r.pos) THEN
      r.pos := -1
   ELSE
      DEC(r.pos, n)
   END;
   IF r.pos > -1 THEN
      r.curr := r.buff.root;
      i := LEN(r.curr.data);
      WHILE i <= r.pos DO
         r.curr := r.curr.next;
         INC(i, LEN(r.curr.data))
      END;
      r.currPos := r.pos-(i-LEN(r.curr.data))
   END
END Deleted;


PROCEDURE (r: Rider) Close*, NEW;
(** Удаляем данный райдер из списка обновляемых. После этого использовать его невозможно, зато он будет удален сборщиком мусора. *)
VAR rd : Rider;
BEGIN
   IF (r.buff # NIL) & (r.buff.riders # NIL) & ~r.closed THEN
      r.closed := TRUE;
      IF r.buff.riders = r THEN
         r.buff.riders := r.next
      ELSE
         rd := r.buff.riders;
         WHILE rd.next # NIL DO
            IF rd.next = r THEN
               rd.next := r.next;
               RETURN
            ELSE
               rd := rd.next
            END
         END
      END
   END
END Close;


(* Carrier *)

PROCEDURE (c: Carrier) NewRider* (old: Rider): Rider, NEW;
VAR r: Rider;
BEGIN
   IF (old # NIL) & (old.buff = c) & ~old.closed THEN
      r := old
   ELSE
      NEW(r);
      r.buff := c;
      r.next := c.riders;
      c.riders := r
   END;
   r.Init;
   RETURN r
END NewRider;


PROCEDURE (c: Carrier) SetLogger* (log: Logger), NEW;
BEGIN
   c.logger := log
END SetLogger;


PROCEDURE (c: Carrier) Append* (IN buff: ARRAY OF BYTE; beg, len: INTEGER), NEW;
VAR
   i : INTEGER;
   bd, bdPtr : CarrierData;
BEGIN
   ASSERT(beg >= 0, 20);
   ASSERT(len >= 0, 21);
   ASSERT(LEN(buff) >= beg+len, 22);
   
   bd := NewCarrierData(len);
   i := 0;
   WHILE len > 0 DO
      bd.data[i] := buff[beg];
      INC(i);
      INC(beg);
      DEC(len)
   END;
   INC(c.total, i);

   IF c.root = NIL THEN c.root := bd
   ELSE
      bdPtr := c.root;
      WHILE bdPtr.next # NIL DO
         bdPtr := bdPtr.next
      END;
      bdPtr.next := bd
   END;
   IF (c.logger # NIL) & (bd # NIL) THEN
      c.logger.Appended(bd.data^)
   END
END Append;


PROCEDURE (c: Carrier) Delete* (n: INTEGER), NEW;
(** Удаляем из буфера первые n байт. **)
VAR
   bd, newbd : CarrierData;
   i, cnt, deleted : INTEGER;
   r: Rider;
BEGIN
   IF n = 0 THEN RETURN END;

   ASSERT(n > 0, 20);

   bd := c.root;
   deleted := n;
   WHILE (n > 0) & (bd # NIL) DO
      cnt := LEN(bd.data);
      IF n >= cnt THEN
         bd := bd.next;
         DEC(n, cnt)
      ELSE
         newbd := NewCarrierData(cnt-n);
         i := 0;
         WHILE n < cnt DO
            newbd.data[i] := bd.data[n];
            INC(i);
            INC(n)
         END;
         n := 0;
         newbd.next := bd.next;
         bd := newbd
      END
   END;
   c.root := bd;

   (* обновим текущую позицию райдеров *)
   r := c.riders;
   WHILE r # NIL DO
      r.Deleted(deleted);
      r := r.next
   END
END Delete;


PROCEDURE (c: Carrier) IsEmpty* (): BOOLEAN, NEW;
BEGIN
   RETURN c.root = NIL
END IsEmpty;

END CommBuffer.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Суббота, 25 Ноябрь, 2006 02:24 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Alexander Shiryaev писал(а):
На ближайшее будущее с помощью CommAction можно реализовать протокол, предназначенный для для обмена данными по сети с программами, написанными на других языках (XML-RPC).

На ближайшее будущее? У вас какие-то планы на этот счет?


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Суббота, 25 Ноябрь, 2006 02:42 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Alexander Shiryaev писал(а):
Я давно хотел написать нечто подобное под BlackBox, только вот руки не доходили ;(

В Python-е есть такой framework - называется twisted (аналог POE в Perl).

Если писать расширения для CommAction.Engine, то неплохо было бы обратить на него внимание.

Я посмотрел описание по ссылке. Довольно интересный вариант кооперативности, вполне реализуемый в ББ. Там объект Deferred создается для регистрации обработчиков отложенного события, что похоже на CommAction.Engine, но не совсем то же самое. CommAction.Engine.Do вызывается периодически для обработки поступающих данных, а там событие одноразовое, оно просто отложено на неопределенный срок. Если взять их пример, то наследник MailEngine(Engine) обрабатывал бы протокол обмена с почтовым сервером (проверка пользователя и пароля), а потом вызывал зарегистрированных в объекте Deferred обработчиков (callbacks или errbacks в зависимости от успеха операции).

Так что да, можно использовать и для этих целей.

Alexander Shiryaev писал(а):
respect!

Спасибо! : )


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Воскресенье, 26 Ноябрь, 2006 15:36 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
Александр Ильин писал(а):
Alexander Shiryaev писал(а):
На ближайшее будущее с помощью CommAction можно реализовать протокол, предназначенный для для обмена данными по сети с программами, написанными на других языках (XML-RPC).

На ближайшее будущее? У вас какие-то планы на этот счет?


Хотелось бы использовать BlackBox вместо Python-а в тех случаях, когда важна производительность.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Понедельник, 27 Ноябрь, 2006 04:14 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
Может быть лучше сделать как-нибудь так?

Код:
MODULE AixBasicProtocols;

   TYPE
      Chunk = POINTER TO RECORD
         next: Chunk;
         data: POINTER TO ARRAY OF BYTE;
         offset: INTEGER
      END;
      TemporaryBuffer* = POINTER TO RECORD
         root: Chunk;
         len: INTEGER
      END;
      
      Protocol* = POINTER TO ABSTRACT RECORD
      END;
      LineOnlyReceiver* = POINTER TO ABSTRACT RECORD (Protocol)
         buffer: TemporaryBuffer;
         delimiter: POINTER TO ARRAY OF BYTE
      END;

   PROCEDURE (b: TemporaryBuffer) Append* (IN data: ARRAY OF BYTE; beg, len: INTEGER), NEW;
      VAR chunk: Chunk;
         i: INTEGER;
   BEGIN
      ASSERT(len > 0, 20);
      ASSERT(beg >= 0, 21);
      ASSERT(beg + len <= LEN(data), 22);
      NEW(chunk);
      NEW(chunk.data, len);
      i := 0;
      WHILE i < len DO
         chunk.data[i] := data[beg];
         INC(beg);
         INC(i)
      END;
      chunk.next := b.root;
      b.root := chunk;
      chunk.offset := b.len;
      INC(b.len, len)
   END Append;

   PROCEDURE (b: TemporaryBuffer) GetBytes* (): POINTER TO ARRAY OF BYTE, NEW;
      VAR data: POINTER TO ARRAY OF BYTE;
         i, j, len: INTEGER;
         chunk: Chunk;
   BEGIN
      IF b.len > 0 THEN
         NEW(data, b.len);
         chunk := b.root;
         WHILE chunk # NIL DO
            i := chunk.offset;
            j := 0;
            len := LEN(chunk.data);
            WHILE j < len DO
               data[i] := chunk.data[j];
               INC(j);
               INC(i)
            END;
            chunk := chunk.next
         END
      END;
      RETURN data
   END GetBytes;

   PROCEDURE (b: TemporaryBuffer) Clear*, NEW;
   BEGIN
      b.len := 0;
      b.root := NIL
   END Clear;

   PROCEDURE NewTemporaryBuffer* (): TemporaryBuffer;
      VAR b: TemporaryBuffer;
   BEGIN
      NEW(b);
      b.len := 0;
      RETURN(b)
   END NewTemporaryBuffer;

   PROCEDURE (p: Protocol) DataReceived- (IN data: ARRAY OF BYTE; beg, len: INTEGER), NEW, ABSTRACT;

   PROCEDURE (p: LineOnlyReceiver) LineReceived- (IN data: ARRAY OF BYTE), NEW, ABSTRACT;

   PROCEDURE (p: LineOnlyReceiver) DataReceived- (IN data: ARRAY OF BYTE; beg, len: INTEGER);
      VAR i, dlen, j, l, k: INTEGER;
   BEGIN
      ASSERT(len > 0, 20);
      ASSERT(beg >= 0, 21);
      ASSERT(beg + len <= LEN(data), 22);
      dlen := LEN(p.delimiter);
      i := beg; j := i;
      LOOP
         WHILE (i < len) & (data[i] # p.delimiter[0]) DO
            INC(i)
         END;
         IF i = len THEN EXIT END;
         l := i - j;
         INC(i); k := 1;
         WHILE (i < len) & (k < dlen) & (data[i] = p.delimiter[k]) DO
            INC(i);
            INC(k)
         END;
         IF k = dlen THEN
            p.buffer.Append(data, j, l);
            p.LineReceived(p.buffer.GetBytes());
            p.buffer.Clear();
            j := i
         END
      END;
      IF i > j THEN
         p.buffer.Append(data, j, i - j)
      END
   END DataReceived;

   PROCEDURE (p: LineOnlyReceiver) InitializeLineOnlyReceiver* (IN delimiter: ARRAY OF BYTE), NEW;
      VAR i, len: INTEGER;
   BEGIN
      len := LEN(delimiter);
      p.buffer := NewTemporaryBuffer();
      NEW(p.delimiter, len);
      i := 0;
      WHILE i < len DO
         p.delimiter[i] := delimiter[i];
         INC(i)
      END
   END InitializeLineOnlyReceiver;

END AixBasicProtocols.


Т.е. не обязательно при получении данных из потока помещать их в буфер, а лучше передать полученные данные сразу в Protocol (метод DataReceived) - а он сам решит, что с ними делать. Даже если ему и нужно будет поместить данные в буфер, то он будет использовать свою, более эффективную реализацию.

В этом примере LineOnlyReceiver - абстрактный Protocol, который принимает данные (DataReceived), разделяет на строки с помощью заданного разделителя и вызывает для каждой строки метод LineReceived.

Здесь Protocol - это и есть CommAction.Engine, только у него вместо Do - DataReceived.


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Понедельник, 27 Ноябрь, 2006 18:39 
Аватара пользователя

Зарегистрирован: Вторник, 19 Сентябрь, 2006 21:54
Сообщения: 2449
Откуда: Россия, Томск
Вы отделили логику анализа поступающей информации от логики ее буферизации, это хорошо. Буферизация и в самом деле может быть реализована более эффективно, так что может пригодиться возможность ее изменить. К тому же, тривиальные протоколы, работающие на уровне байтов, можно было бы реализовать без использования дополнительной памяти и работы по ее выделению.

Насчет того, что "Protocol.ReceiveData" звучит гораздо понятнее, чем "Engine.Do", то я с вами на 100% согласен. Но здесь есть и концептуальное различие. Дело в том, что я предусмотрел возможность сначала получить данные, а потом их обработать. Это с прицелом на те задачи, в которых обработка данных может занять больше времени, чем требуется на их получение. Здесь мы рискуем получить переполнение системного буфера, если не будем успевать считывать из него данные, пытаясь их сразу обработать. Поэтому Engine.Do вызывается постоянно, а не только когда получены новые данные (DataReceived). Это дает возможность вычерпывать данные из буфера в очень ленивом режиме, не боясь помешать другим задачам.

По-другому это, впрочем, можно реализовать, переложив ответственность за буферизацию на протокол. Тогда протокол нижнего уровня будет складывать данные в некий буфер, а совсем другой Services.Action будет из этого буфера брать данные и лениво обрабатывать. Тогда эту схему с использованием дополнительного Services.Action можно реализовывать только в тех случаях, когда она действительно нужна.

Еще Engine.Do периодически вызывается для того, чтобы определять таймаут приема. Например, нет смысла бесконечно ждать ответа, если устройство не подключено к последовательному порту. Но и это можно реализовать в виде отдельного Action.

В целом мне ваш вариант нравится больше, чем мой. По приведенному вами коду есть только пара замечаний по процедуре LineOnlyReceiver.ReceiveData (на всякий случай, если вы будете ее где-то применять):
1. вы используете параметр len в значении "индекс байта, следующего за последним". Чтобы она этому определению соответствовала, нужно перед LOOP сделать INC(len, beg), иначе ваш код будет корректно работать только для beg = 0;
2. ваш код без сбоев будет работать только при LEN(delimiter) = 1 или 2. Если delimiter = NIL (по умолчанию), будет trap при вызове LEN(delimiter). Чтобы процедура работала корректно для более длинных разделителей (3 символа и более), необходимо в конструкцию "IF k = dlen THEN" добавить следующее:
Код:
ELSE
  DEC(i, k-1)
END

Представьте, что delimiter = "aab" и вы приняли строку "aaab". Без поправки вы не заметите здесь разделителя, хотя он есть. Дело в том, что при несовпадении символов во втором WHILE i будет равно 2, а чтобы найти разделитель нужно вернуться к i = 1.

Мне определенно нравится то, как вы структурировали программу. Переделаю у себя тоже. Заодно снимется зависимость CommAction от CommBuffer.

PS: ИМХО, строчные буквы i, j и l в некоторых шрифтах довольно трудно различить между собой, а l еще бывает похожа на 1. Я обычно использую i, c, n (i - index, c - count, n - number).


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Вторник, 28 Ноябрь, 2006 10:18 

Зарегистрирован: Понедельник, 28 Ноябрь, 2005 10:28
Сообщения: 1429
Цитата:
Здесь Protocol - это и есть CommAction.Engine, только у него вместо Do - DataReceived.

А у меня он назывался Agent, а его метод Notify. ;)
Код:
TYPE  Agent* = POINTER TO ABSTRACT RECORD END;

PROCEDURE (p: Agent) Notify* (VAR m:Message),NEW,ABSTRACT;


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Среда, 29 Ноябрь, 2006 18:13 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
Александр Ильин писал(а):
По приведенному вами коду есть только пара замечаний по процедуре LineOnlyReceiver.ReceiveData (на всякий случай, если вы будете ее где-то применять):
1. вы используете параметр len в значении "индекс байта, следующего за последним". Чтобы она этому определению соответствовала, нужно перед LOOP сделать INC(len, beg), иначе ваш код будет корректно работать только для beg = 0;
2. ваш код без сбоев будет работать только при LEN(delimiter) = 1 или 2. Если delimiter = NIL (по умолчанию), будет trap при вызове LEN(delimiter). Чтобы процедура работала корректно для более длинных разделителей (3 символа и более), необходимо в конструкцию "IF k = dlen THEN" добавить следующее:
Код:
ELSE
  DEC(i, k-1)
END

Представьте, что delimiter = "aab" и вы приняли строку "aaab". Без поправки вы не заметите здесь разделителя, хотя он есть. Дело в том, что при несовпадении символов во втором WHILE i будет равно 2, а чтобы найти разделитель нужно вернуться к i = 1.


Так... спать надо больше :evil:


Вернуться к началу
 Профиль  
 
 Заголовок сообщения:
СообщениеДобавлено: Среда, 29 Ноябрь, 2006 21:48 

Зарегистрирован: Суббота, 26 Ноябрь, 2005 02:12
Сообщения: 473
Откуда: KZ
Trurl писал(а):
Цитата:
Здесь Protocol - это и есть CommAction.Engine, только у него вместо Do - DataReceived.

А у меня он назывался Agent, а его метод Notify. ;)
Код:
TYPE  Agent* = POINTER TO ABSTRACT RECORD END;

PROCEDURE (p: Agent) Notify* (VAR m:Message),NEW,ABSTRACT;


Привет! А у тебя какие-нибудь протоколы уже написаны? HTTP, FTP, XML-RPC? Может написать всё 1 раз нормально?


Вернуться к началу
 Профиль  
 
Показать сообщения за:  Поле сортировки  
Начать новую тему Ответить на тему  [ Сообщений: 17 ] 

Часовой пояс: UTC + 3 часа


Кто сейчас на конференции

Сейчас этот форум просматривают: нет зарегистрированных пользователей и гости: 19


Вы не можете начинать темы
Вы не можете отвечать на сообщения
Вы не можете редактировать свои сообщения
Вы не можете удалять свои сообщения
Вы не можете добавлять вложения

Найти:
Вся информация, размещаемая участниками на конференции (тексты сообщений, вложения и пр.) © 2005-2024, участники конференции «OberonCore», если специально не оговорено иное.
Администрация не несет ответственности за мнения, стиль и достоверность высказываний участников, равно как и за безопасность материалов, предоставляемых участниками во вложениях.
Без разрешения участников и ссылки на конференцию «OberonCore» любое воспроизведение и/или копирование высказываний полностью и/или по частям запрещено.
Powered by phpBB® Forum Software © phpBB Group
Русская поддержка phpBB