Сообщений 0    Оценка 467 [+1/-0]         Оценить  
Система Orphus

Работа с потоками в C#

Часть 2

Автор: Joseph Albahari
Перевод: Алексей Кирюшкин
The RSDN Group

Источники: Threading in C#
базируется на книге
Joseph Albahari Ben Albahari "C# 3.0 in a Nutshell"

Материал предоставил: RSDN Magazine #2-2007
Опубликовано: 27.06.2007
Исправлено: 15.04.2009
Версия текста: 1.0
3. Работа с потоками
Апартаменты и Windows Forms
BackgroundWorker
ReaderWriterLock
Пулы потоков
Асинхронные делегаты
Таймеры
Локальные хранилища
4. Дополнительные материалы
Неблокирующая синхронизация
Wait и Pulse
Suspend и Resume
Аварийное завершение потоков

3. Работа с потоками

Апартаменты и Windows Forms

Потоковые апартаменты – это автоматический потокобезопасный режим, тесно связанный с COM, предыдущей технологией Microsoft. В то время как .NET в основном свободен от унаследованных потоковых моделей, временами они все еще необходимы, из-за потребности работать с устаревшими API. Потоковые апартаменты особенно важны в Windows Forms, так как Windows Forms по большей части используют или оборачивают Win32 API вместе с его апартаментами.

Апартамент – логический “контейнер” для потоков. Апартаменты бывают двух видов – “single” (однопоточные) и “multi” (многопоточные). Однопоточный апартамент может содержать только один поток, многопоточный – любое количество потоков. Однопоточная модель используется чаще и имеет большие возможности для взаимодействия.

Так же, как потоки, апартаменты могут содержать в себе объекты. Когда объект создается в каком-либо апартаменте, он остается там на всю жизнь, навсегда прикованный к апартаменту и его потоку(-ам). Это напоминает объект в контексте синхронизации .NET, за исключением того, что контекст синхронизации не владеет и не содержит потоков. Любой поток может вызвать любой объект в любом контексте синхронизации – с обязательным ожиданием эксклюзивной блокировки. Объекты же, входящие в какой-либо апартамент, могут быть вызваны только потоком этого апартамента.

Вообразите себе библиотеку, где каждая книга представляет собой объект. Выносить книги из библиотеки нельзя, они должны оставаться там всю жизнь. Добавим сюда человека, который будет представлять собой поток.

Библиотека-контекст синхронизации позволит войти любому человеку, но только одному одновременно. Если людей больше – перед библиотекой образуется очередь.

Библиотека-апартамент имеет свой штат – одного библиотекаря для однопоточной библиотеки или целую группу для многопоточной. Никто, кроме штатных библиотекарей, не может войти в библиотеку. Клиент, желающий провести исследование, должен посигналить библиотекарю, а затем попросить его выполнить задание! Вызов библиотекаря называется маршалингом – клиент выполняет маршалинг своего вызова штатному сотруднику. Маршалинг выполняется автоматически и реализуется через прокачку сообщений – в Windows Forms этот механизм постоянно следит за событиями клавиатуры и мыши от операционной системы. Если обработка не успевает за новыми сообщениями, создается очередь сообщений, обрабатываемая в порядке поступления.

Назначение типа апартамента

Потоку .NET автоматически назначается апартамент при переходе к использующему апартаменты Win32-коду или унаследованному COM-коду. По умолчанию он помещается в многопоточный апартамент, если только не запрашивает однопоточный следующим образом:

Thread t = new Thread(...);
t.SetApartmentState(ApartmentState.STA);

Можно также запросить, чтобы главный поток приложения вошел в однопоточный апартамент, используя атрибут STAThread для метода main:

class Program 
{
  [STAThread]
  static void Main() 
  {
    ...

Апартаменты никак не влияют на исполнение чистого .NET-кода. Другими словами, два потока из STA могут одновременно вызвать один и тот же метод одного и того же объекта, и при этом не будет никакого маршалинга или блокировок. Это может случиться, только когда дело дойдет до неуправляемого кода.

Типы из пространства имен System.Windows.Forms интенсивно вызывают Win32-код, разработанный в расчете на работу в однопоточном апартаменте. По этой причине main-метод программы Windows Forms должен быть помечен как [STAThread], иначе при вызове Win32 UI-кода произойдет одно из двух:

Control.Invoke

В многопоточном приложении Windows Forms запрещено вызывать методы и свойства элементов управления из потоков, отличных от того, в котором они были созданы. Для всех межпоточных вызовов должен быть явно выполнен маршалинг в поток, в котором был создан элемент управления с использованием методов Control.Invoke или Control.BeginInvoke. Нельзя полагаться на автоматический маршалинг, так как он происходит слишком поздно, когда дело уже дошло до неуправляемого кода, в то время как предшествующий .NET-код уже отработал в “неправильном” потоке – и такой код не будет потокобезопасным.

Превосходное решение для управления рабочими потоками в приложениях Windows Forms состоит в использовании BackgroundWorker. Этот класс-обертка для рабочих потоков умеет уведомлять о ходе выполнения операции и ее завершении, и автоматически вызывает Control.Invoke там, где это нужно.

BackgroundWorker

BackgroundWorker – класс-обертка из пространства имен System.ComponentModel для управления рабочими потоками. Он обеспечивает следующие возможности:

Последние две возможности особенно полезны – они означают, что можно не добавлять try/catch в рабочий метод и обновлять элементы управления без использования Control.Invoke.

BackgroundWorker использует пул многократно используемых потоков, чтобы не создавать их под каждую новую задачу. Это означает, что нельзя вызывать Abort для потока BackgroundWorker.

Вот минимально необходимые действия для использования BackgroundWorker:

После этих действий начнется исполнение. Аргумент, переданный в RunWorkerAsync, будет перенаправлен в обработчик DoWork как свойство Argument параметра DoWorkEventArgs. Вот пример:

class Program 
{
  static BackgroundWorker bw = new BackgroundWorker();
  static void Main() 
  {
    bw.DoWork += bw_DoWork;
    bw.RunWorkerAsync("Message to worker");     
    Console.ReadLine();
  }
 
  static void bw_DoWork(object sender, DoWorkEventArgs e) 
  {
    // Это будет вызвано рабочим потоком
    Console.WriteLine(e.Argument); // напечатает "Message to worker"
    // Выполняем длительную операцию...
  }

BackgroundWorker также предоставляет событие RunWorkerCompleted, которое генерируется после завершения трудов обработчика события DoWork. Обработка RunWorkerCompleted необязательна, но обычно применяется, хотя бы для того, чтобы получить информацию о возможных исключениях в DoWork. Кроме того, код в обработчике RunWorkerCompleted может обновлять элементы управления Windows Forms без явного маршалинга, а код в DoWork – нет.

Чтобы добавить отображение выполнения операции:

Код в обработчике ProgressChanged может свободно обращаться к элементам управления UI так же, как и в RunWorkerCompleted. Обычно это нужно для обновления индикатора прогресса.

Чтобы иметь возможность отмены операции:

Вот пример, реализующий обе описанные возможности:

using System;
using System.Threading;
using System.ComponentModel;

class Program
{
  static BackgroundWorker bw;

  static void Main()
  {
    bw = new BackgroundWorker();
    bw.WorkerReportsProgress = true;
    bw.WorkerSupportsCancellation = true;
    bw.DoWork += bw_DoWork;
    bw.ProgressChanged += bw_ProgressChanged;
    bw.RunWorkerCompleted += bw_RunWorkerCompleted;

    bw.RunWorkerAsync(null);

    Console.WriteLine(
      "Нажмите Enter в течении следующих пяти секунд, чтобы прервать работу");
    Console.ReadLine();

    if (bw.IsBusy)
    {
      bw.CancelAsync();
      Console.ReadLine();
    }
  }

  static void bw_DoWork(object sender, DoWorkEventArgs e)
  {
    for (int i = 0; i <= 100; i += 20)
    {
      if (bw.CancellationPending)
      {
        e.Cancel = true;
        return;
      }

      bw.ReportProgress(i);
      Thread.Sleep(1000);
    }

    e.Result = 123;    // будет передано в RunWorkerComрleted
  }

  static void bw_RunWorkerCompleted(object sender,
    RunWorkerCompletedEventArgs e)
  {
    if (e.Cancelled)
      Console.WriteLine(
        "Работа BackgroundWorker была прервана пользователем!");
    else if (e.Error != null)
      Console.WriteLine("Worker exception: " + e.Error);
    else
      Console.WriteLine("Работа закончена успешно. Результат - "
        + e.Result + ". ");

    Console.WriteLine("Нажмите Enter для выхода из программы...");
  }

  static void bw_ProgressChanged(object sender,
    ProgressChangedEventArgs e)
  {
    Console.WriteLine("Обработано " + e.ProgressPercentage + "%");
  }
}

Консольный вывод (Enter нажат во время работы BackgroundWorker):

Нажмите Enter в течении следующих пяти секунд, чтобы прервать работу...
Обработано 0%
Обработано 20%
Обработано 40%

Работа BackgroundWorker была прервана пользователем!
Нажмите Enter для выхода из программы...

Консольный вывод (работа BackgroundWorker не прерывалась):

Нажмите Enter в течении следующих пяти секунд, чтобы прервать работу...
Обработано 0%
Обработано 20%
Обработано 40%
Обработано 60%
Обработано 80%
Обработано 100%
Работа закончена успешно. Результат - 123.
Нажмите Enter для выхода из программы...

Наследование от BackgroundWorker

BackgroundWorker не запечатан (sealed) и предоставляет виртуальный метод OnDoWork, давая возможность реализовать его по-своему. Когда разрабатывается потенциально долго выполняемый метод, можно использовать наследника BackgroundWorker, предназначенного для асинхронного выполнения работы. Использующий его код должен будет только обрабатывать события RunWorkerCompleted и ProgressChanged. Предположим, что у нас есть долго выполняющийся метод GetFinancialTotals:

public class Client 
{
  Dictionary <string,int> GetFinancialTotals(int foo, int bar) { ... }
  ...
}

Можно реорганизовать его следующим образом:

public class Client 
{
  public FinancialWorker GetFinancialTotalsBackground(int foo, int bar)
  {
    return new FinancialWorker(foo, bar);
  }
}
 
public class FinancialWorker : BackgroundWorker 
{
  // Можно добавить типизированные поля.
  public Dictionary <string,int> Result; 
  // Можно выставить их наружу как свойства с блокировками!
  public volatile int Foo;
  public volatile int Bar;

  public FinancialWorker() 
  {
    WorkerReportsProgress = true;
    WorkerSupportsCancellation = true;
  }
 
  public FinancialWorker(int foo, int bar) : this() 
  {
    Foo = foo;
    Bar = bar;
  }
 
  protected override void OnDoWork(DoWorkEventArgs e) 
  {
    ReportProgress(0, "Вкалываем... над отчетом...в поте лица...");
    Initialize financial report data
 
    while (!finished report ) 
    {
      if (CancellationPending) 
      {
        e.Cancel = true;
        return;
      }

      Perform another calculation step

      ReportProgress(percentCompleteCalc, "Продолжаем работу...");
    }      

    ReportProgress(100, "Готово!");
    e.Result = Result = completed report data;
  }
}

Вызвавший GetFinancialTotalsBackground получит FinancialWorker – практичную обертку, управляющую фоновой операцией. Она может сообщать о прогрессе операции, поддерживает отмену и совместима с Windows Forms без использования Control.Invoke. Кроме того, она обрабатывает возможные исключения и использует стандартный протокол (попробуйте-ка получить все это без использования BackgroundWorker!).

Возможность использования BackgroundWorker хоронит старую “асинхронную модель, основанную на событиях”.

ReaderWriterLock

Обычно экземпляры типов потокобезопасны при параллельных операциях чтения, но не при параллельных обновлениях, или параллельном чтении и обновлении. То же самое справедливо для ресурсов, например файлов. Если в основном выполняется чтение и только изредка – запись, защита экземпляров таких типов простой эксклюзивной блокировкой для всех режимов доступа может необоснованно ограничить параллелизм. В качестве примера можно привести сервер приложений, в котором часто используемые данные кэшируются в статических полях для быстрого доступа. Класс ReaderWriterLock разработан специально под такой сценарий работы.

ReaderWriterLock предоставляет отдельные методы для блокировки на чтение и на запись – AcquireReaderLock и AcquireWriterLock. Оба метода принимают аргумент-таймаут и генерируют исключение ApplicationException, если этот таймаут истекает (вместо возвращения false, как это делают остальные аналогичные методы, связанные с потоками). Таймаут может быть легко превышен, если ресурс пользуется популярностью.

Блокировка снимается при помощи методов ReleaseReaderLock или ReleaseWriterLock. Эти методы поддерживают вложенные блокировки. Предоставляется также метод ReleaseLock, снимающий все вложенные блокировки за один вызов. (Далее можно вызвать RestoreLock для восстановления состояния всех блокировок, предшествовавшего вызову ReleaseLock – в подражание поведению Monitor.Wait).

Можно начать с блокировки на чтение вызовом AcquireReaderLock, затем превратить ее в блокировку на запись, используя UpgradeToWriterLock. Этот метод возвращает cookie для последующего вызова DowngradeFromWriterLock. Такая система позволяет читателю запрашивать временный доступ для записи без необходимости повторного ожидания в очереди.

В следующем примере стартуют четыре потока – один непрерывно добавляет элементы в список, другой их удаляет, а два оставшихся постоянно сообщают о количестве элементов в списке. Первые два устанавливают блокировку на запись, два оставшихся – только на чтение. При каждой блокировке используется таймаут в 10 секунд (обработка исключений в данном примере опущена для краткости).

class Program 
{
  static ReaderWriterLock rw = new ReaderWriterLock();
  static List<int> items = new List<int>();
  static Random rand = new Random();
 
  static void Main(string[] args) 
  {
    new Thread(delegate() { while (true) AppendItem(); }).Start();
    new Thread(delegate() { while (true) RemoveItem(); }).Start();
    new Thread(delegate() { while (true) WriteTotal(); }).Start();
    new Thread(delegate() { while (true) WriteTotal(); }).Start();
  }
 
  static int GetRandNum(int max) { lock (rand) return rand.Next(max); }
 
  static void WriteTotal() 
  {
    rw.AcquireReaderLock(10000);
    int tot = 0;

    foreach (int i in items)
      tot += i;

    Console.WriteLine(tot);
    rw.ReleaseReaderLock();
  }
 
  static void AppendItem() 
  {
    rw.AcquireWriterLock(10000);
    items.Add(GetRandNum(1000));
    Thread.SpinWait(400);
    rw.ReleaseWriterLock();
  }
 
  static void RemoveItem() 
  {
    rw.AcquireWriterLock(10000);

    if (items.Count > 0)
      items.RemoveAt(GetRandNum(items.Count));

    rw.ReleaseWriterLock();
  }
}

Поскольку добавление элементов в список происходит быстрее, чем удаление, в этом примере в метод AppendItem добавлен вызов SpinWait для сохранения баланса.

Пулы потоков

Если в вашем приложении так много потоков, что большая часть их времени тратится на ожидание на WaitHandle, вы можете уменьшить накладные расходы, используя пул потоков. Экономия достигается за счет объединения WaitHandle нескольких потоков.

Для использования пула потоков нужно зарегистрировать WaitHandle и делегат, который должен быть исполнен, когда WaitHandle будет установлен. Это делается вызовом ThreadPool.RegisterWaitForSingleObject, как в следующем примере:

class Test 
{
  static ManualResetEvent starter = new ManualResetEvent(false);
 
  public static void Main() 
  {
    ThreadPool.RegisterWaitForSingleObject(starter, Go, "привет", -1, true);
    Thread.Sleep(5000);
    Console.WriteLine("Запускается рабочий поток...");
    starter.Set();
    Console.ReadLine();
  }
 
  public static void Go(object data, bool timedOut) 
  {
    Console.WriteLine("Запущено: " + data);
    // Выполнение задачи...
  }
}

Консольный вывод (после пятисекундной задержки):

Запускается рабочий поток...
Запущено: привет

В дополнение к WaitHandle и делегату RegisterWaitForSingleObject принимает объект-“черный ящик”, который будет передан в ваш делегат (как это было для ParameterizedThreadStart), таймаут в миллисекундах (-1, если таймаут не нужен) и флаг, указывающий, является запрос одноразовым или повторяющимся.

Все потоки пула являются фоновыми, они уничтожаются автоматически, когда завершается основной поток(-и) приложения. Однако ожидание выполнения важной работы в потоках пула перед завершением приложения на вызове Join бессмысленно, так как потоки пула после выполнения работы не завершаются, они используются повторно до завершения родительского процесса. Поэтому чтобы узнать, когда исполняемая работа будет закончена, необходимо посигналить, например, специальным WaitHandle.

Вызов Abort для потоков пула – это тоже плохая идея. Потоки пула должны повторно использоваться в течение всей жизни приложения.

Можно использовать пул потоков и без WaitHandle, вызывая метод QueueUserWorkItem и задавая делегат для немедленного вызова. При этом вы не сохраните потоки для повторного использования, но получите выгоду в другом: пул потоков поддерживает максимальное количество потоков (по умолчанию 25), автоматически выстраивая задачи в очередь, когда их количество превышает эту цифру. Это скорее походит на очередь поставщик/потребитель с 25-ю потребителями! В следующем примере 100 заданий ставятся в очередь к пулу потоков, и только 25 одновременно исполняются. Главный поток ждет, пока все задания не будут выполнены, используя Wait и Pulse:

class Test
{
  static object workerLocker = new object();
  static int runningWorkers = 100;

  public static void Main()
  {
    for (int i = 0; i < runningWorkers; i++)
      ThreadPool.QueueUserWorkItem(Go, i);

    Console.WriteLine("Ожидаем завершения работы потоков...");

    lock (workerLocker)
      while (runningWorkers > 0)
        Monitor.Wait(workerLocker);

    Console.WriteLine("Готово!");
    Console.ReadLine();
  }

  public static void Go(object instance)
  {
    Console.WriteLine("Запущен:  " + instance);
    Thread.Sleep(1000);
    Console.WriteLine("Завершен: " + instance);

    lock (workerLocker)
    {
      runningWorkers--;
      Monitor.Pulse(workerLocker);
    }
  }
}

Консольный вывод:

Ожидаем завершения работы потоков..
Запущен:  0
Завершен: 0
Запущен:  2
Запущен:  1
Запущен:  3
Завершен: 2
Запущен:  5
Завершен: 1
Запущен:  6
Запущен:  4
...
Завершен: 95
Завершен: 96
Завершен: 97
Завершен: 99
Завершен: 98
Готово!

Если необходимо передать в целевой метод более одного объекта, можно или создать объект с нужными свойствами, или использовать анонимный метод. Например, если метод Go требует два целочисленных параметра, можно запустить его следующим образом:

ThreadPool.QueueUserWorkItem(delegate(object notUsed) { Go(23,34); });

Другой путь в пул потоков – через асинхронные делегаты.

Асинхронные делегаты

В первой части было описано, как передать данные в поток, используя ParameterizedThreadStart. Иногда же нужно, наоборот, вернуть данные из потока, когда он завершит выполнение. Асинхронные делегаты предлагают для этого удобный механизм, позволяя передавать в обоих направлениях любое количество параметров с контролем их типа. Кроме того, необработанные исключения из асинхронных делегатов удобно перебрасываются в исходный поток и, таким образом, не требуют отдельной обработки. Асинхронные делегаты также предоставляют альтернативный путь к пулу потоков.

Цена, которую нужно заплатить – асинхронная модель. Чтобы понять, что это значит, сначала рассмотрим обычную, синхронную модель программирования. Скажем, нужно сравнить две web-страницы. Можно последовательно загрузить каждую страницу, а затем сравнить их примерно так:

static void ComparePages() 
{
  WebClient wc = new WebClient();
  string s1 = wc.DownloadString("http://www.rsdn.ru");
  string s2 = wc.DownloadString("http://rsdn.ru");
  Console.WriteLine(s1 == s2 ? "Одинаковые" : "Различные");
}

Конечно, было бы быстрее, если бы обе страницы загружались одновременно. Возможный взгляд на эту проблему – возложить вину на DownloadString, которая блокирует вызывающий метод на время загрузки страницы. Было бы хорошо вызвать DownloadString не блокирующим, асинхронным способом, другими словами:

  1. Говорим DownloadString начать выполнение.
  2. Исполняем другие задачи, пока она работает, например загружаем вторую страницу
  3. Запрашиваем у DownloadString результаты.

Класс WebClient предлагает встроенный метод DownloadStringAsync, предоставляющий возможности, аналогичные асинхронным. Однако пока мы его проигнорируем и сосредоточимся на механизме, который позволяет вызывать асинхронно любые методы.

Третий шаг – это то, что делает асинхронные делегаты полезными. Вызывающий поток стыкуется с рабочим, чтобы получить результаты и чтобы позволить повторную генерацию исключений. Без этого шага мы имеем нормальную многопоточность. Однако использование асинхронных делегатов без этого заключительного рандеву немногим более полезно, чем ThreadPool.QueueWorkerItem или BackgroundWorker.

Вот как можно использовать асинхронные делегаты для загрузки двух web-страниц, с одновременным выполнением других вычислений:

delegate string DownloadString(string uri);
 
static void ComparePages() 
{
  // создаем два экземпляра делегата DownloadString:
  DownloadString download1 = new WebClient().DownloadString;
  DownloadString download2 = new WebClient().DownloadString;
  
  // Стартуем загрузку:
  IAsyncResult cookie1 = download1.BeginInvoke(uri1, null, null);
  IAsyncResult cookie2 = download2.BeginInvoke(uri2, null, null);
  
  // Выполняем какие-то вычисления:
  double seed = 1.23;

  for (int i = 0; i < 1000000; i++)
    seed = Math.Sqrt(seed + 1000);
  
  // Получаем результат загрузки, ожидая в случае необходимости.
  // Если были исключения, они будут сгенерированы здесь:
  string s1 = download1.EndInvoke(cookie1);
  string s2 = download2.EndInvoke(cookie2);
  
  Console.WriteLine(s1 == s2 ? "Одинаковые" : "Различные");
}

Мы начинаем с объявления и создания делегатов для методов, которые хотим исполнить асинхронно. В этом примере нам нужны два делегата – каждый для отдельного объекта WebClient (WebClient не допускает параллельного доступа, если бы это было возможно, мы использовали бы один единственный делегат).

Далее вызываем BeginInvoke. Методы WebClient начинают исполняться, а управление немедленно возвращается в вызывающий код. В соответствии с сигнатурой делегата в BeginInvoke передается строка, а EndInvoke возвращает строку.

BeginInvoke нужны два дополнительных параметра – метод обратного вызова и объект с данными; обычно в них нет необходимости и можно передать в них null. BeginInvoke возвращает объект IASynchResult, используемый как cookie для вызова EndInvoke. У объекта IASynchResult есть также свойство IsCompleted, которое можно использовать для проверки завершения операции.

Далее мы вызываем для делегатов EndInvoke, так нам нужны их результаты. При необходимости EndInvoke будет ожидать завершения операции, а затем вернет значение, указанное в типе делегата (в нашем случае строку). Удобная особенность EndInvoke – если бы в DownloadString были ref- или out-параметры, они были бы добавлены в сигнатуру EndInvoke, позволяя возвратить таким образом несколько значений.

Если при исполнении асинхронного метода возникает необработанное исключение, оно будет повторно сгенерировано при вызове EndInvoke. Это позволяет аккуратно передавать исключения вызывающему коду.

Даже если вызываемый асинхронно метод не возвращает никакого значения, чисто технически все равно необходимо вызывать EndInvoke. Однако на практике возможны варианты. MSDN выдает противоречивые рекомендации на этот счет. Если вы решите не вызывать EndInvoke, вам необходимо будет разрешить проблему обработки исключений в вызываемом методе.

Асинхронные методы

Некоторые типы .NET Framework предлагают асинхронные версии своих методов, с именами, начинающимися с "Begin" и "End". Они называются асинхронными методами и имеют сигнатуры, подобные асинхронным делегатам, но предназначены они для решения другой проблемы – выполнять больше асинхронных манипуляций, чем у вас есть потоков. Например, Web-сервер или сервер на TCP-сокетах могут параллельно обрабатывать несколько сотен запросов на горстке потоков из пула, используя NetworkStream.BeginRead и NetworkStream.BeginWrite.

Старайтесь однако, избегать асинхронных методов, если вы не пишете специализированное приложение, которое должно обрабатывать много параллельных запросов, по следующим причинам:

Если вам просто нужно параллельное выполнение, лучше вызовите синхронную версию метода (например NetworkStream.Read) через асинхронный делегат. Другая возможность – использовать ThreadPool.QueueUserWorkItem или BackgroundWorker — либо просто создать новый поток.

Асинхронные события

Существует и другой сценарий, по которому типы могут предоставить асинхронные версии своих методов. Он называется “асинхронностью на основе событий”. Названия таких методов заканчиваются на “Async” и “Completed”. Класс WebClient применяет такой способ в методе DownloadStringAsync. Чтобы использовать его, добавьте обработчик для события “Completed” (в данном случае DownloadStringCompleted), и затем вызывайте метод "Async" (т.е. DownloadStringAsync). Когда метод завершит работу, будет вызван ваш обработчик события. К сожалению, реализация этих методов в WebClient испорчена – DownloadStringAsync блокирует вызывающий код на часть времени загрузки.

Сценарий на основе событий также поддерживает реализованные дружественным для Windows Forms-приложений способом события, информирующие о ходе продвижения асинхронной операции и ее отмене. Если вам нужны эти возможности, а тип не поддерживает асинхронность на основе событий (или реализует ее некорректно), не старайтесь самостоятельно реализовать именно этот паттерн, все это проще реализуется как обертка для BackgroundWorker.

Таймеры

Самый простой способ выполнять метод периодически – использовать таймер, например класс Timer из пространства имен System.Threading. Этот таймер использует пул потоков, допуская создание множества таймеров без накладных расходов в виде такого же количества потоков. Timer – довольно простой класс с конструктором и парой методов (просто наслаждение для минималистов и авторов книг!).

public sealed class Timer : MarshalByRefObject, IDisposable
{
  public Timer(TimerCallback tick, object state, 1st, subsequent);
  public bool Change(1st, subsequent);  // Для изменения периода
  public void Dispose();                // Для удаления
}
// 1st = время до первого срабатывания в миллисекундах или как TimeSpan
// subsequent = следующие интервалы в миллисекундах или как TimeSpan 
// (используйте Timeout.Infinite для одноразового срабатывания)

В следующем примере таймер вызывает метод Tick, который печатает "tick..." по истечении 5 секунд и далее каждую секунду, пока пользователь не нажмет Enter:

using System;
using System.Threading;
 
class Program 
{
  static void Main() 
  {
    Timer tmr = new Timer(Tick, "tick...", 5000, 1000);
    Console.ReadLine();
    tmr.Dispose();           // Остановка таймера
  }

  static void Tick(object data) 
  {
    // Этот код выполняется на потоке из пула
    Console.WriteLine(data); // Печать: "tick..."
  }
}

.NET Framework предоставляет также другой класс таймера с тем же самым именем в пространстве имен System.Timers. Это простая обертка System.Threading.Timer, с тем же самым основным механизмом, обеспечивающая дополнительные удобства при использовании пула потоков. Вот основные дополнительные возможности:

Вот пример:

using System;
using System.Timers;   // Пространство имен Timers вместо Threading
 
class SystemTimer 
{
  static void Main()
  {
    Timer tmr = new Timer();       // Конструктор без параметров
    tmr.Interval = 500;
    tmr.Elapsed += tmr_Elapsed;    // Событие вместо делегата
    tmr.Start();                   // Запустить таймер
    Console.ReadLine();

    tmr.Stop();                    // Остановить таймер
    Console.ReadLine();

    tmr.Start();                   // Продолжить
    Console.ReadLine();

    tmr.Dispose();                 // Остановить навсегда
  }
 
  static void tmr_Elapsed(object sender, EventArgs e)
  {
    Console.WriteLine("Tick");
  }
}

.NET Framework предоставляет еще и третий вид таймера – в пространстве имен System.Windows.Forms. Похожий на System.Timers.Timer по интерфейсу, он радикально отличается от него по функциональности. Таймер Windows Forms не использует пула потоков, вместо этого вызывая событие “Tick” всегда в том же самом потоке, в котором был создан таймер. При условии, что таймер создается в главном потоке, там же, где все формы и элементы управления приложения Windows Forms, обработчик события срабатывания таймера может взаимодействовать с формой и элементами управления без нарушения потоковой безопасности и необходимости вызовов Control.Invoke.

Таймер Windows Forms предназначен для заданий, которые могут привести к обновлению пользовательского интерфейса, и которые должны выполняться достаточно быстро. Быстрота выполнения важна, так как событие Tick вызывается в главном потоке, а значит, во время его выполнения интерфейс не будет отвечать на действия пользователя.

Локальные хранилища

Каждый поток получает область данных, изолированную от всех других потоков. Это необходимо для хранения специфических данных инфраструктуры выполнения, типа передачи сообщений, транзакций или токенов безопасности. Передавать такие данные через параметры методов было бы слишком неудобно, хранение же информации в статических полях означает доступность ее для всех потоков.

Метод Thread.GetData читает из изолированной области данных потока, Thread.SetData пишет в нее. Оба метода требуют в качестве параметра объект LocalDataStoreSlot (на самом деле это только обертка для строки с именем слота) для идентификации слота. Один и тот же объект LocalDataStoreSlot может быть использован из любого потока для получения им своих локальных данных. Вот пример:

class ... 
{
  // Этот объект LocalDataStoreSlot может быть использован из любого потока.
  LocalDataStoreSlot secSlot = Thread.GetNamedDataSlot("securityLevel");
 
  // Это свойство будет иметь своё значение для каждого потока
  int SecurityLevel 
  {
    get 
    {
      object data = Thread.GetData(secSlot);
      return data == null ? 0 : (int)data;    // null == uninitialized
    }

    set { Thread.SetData(secSlot, value); }
  }
  ...

Thread.FreeNamedDataSlot освобождает соответствующий слот данных всех потоков, но только тогда, когда все объекты LocalDataStoreSlot с одним и тем же именем выйдут из области видимости и будут уничтожены сборщиком мусора. Это гарантирует, что слот данных не будет отобран у потока, пока он сохраняет ссылку на соответствующий объект LocalDataStoreSlot.

4. Дополнительные материалы

Неблокирующая синхронизация

Ранее было сказано, что синхронизация необходима даже в простых случаях присвоения значения или увеличения значения поля. Хотя эксклюзивная блокировка в данном случае и может помочь, в результате борьбы за блокировку поток может быть заблокирован, что чревато соответствующими накладными расходами. Конструкции неблокирующей синхронизации .NET Framework позволяют выполнить простые операции без блокирования, приостановок и ожидания. При этом используются атомарные операции, а также чтение и запись с семантикой “volatile”. Иногда проще использовать такие конструкции, а не блокировки.

Атомарность и Interlocked

Инструкция является атомарной, если она выполняется как единая, неделимая команда. Строгая атомарность препятствует любой попытке вытеснения. В C# простое чтение или присвоение значения полю в 32 бита или менее является атомарным (для 32-битных CPU). Операции с большими полями не атомарны, так как являются комбинацией более чем одной операции чтения/записи:

class Atomicity 
{
  static int x;
  static int y;
  static long z;
  
  static void Test()
  {
    long myLocal;
    x = 3;             // Атомарная операция
    z = 3;             // Не атомарная (z – 64-битная переменная)
    myLocal = z;       // Не атомарная (z is 64 bits)
    y += x;            // Не атомарная (операции чтения и записи)
    x++;               // Не атомарная (операции чтения и записи)
  }
}

Чтение и запись 64-битных полей не атомарны на 32-битных CPU, так при этом используются два 32-битных участка памяти. Если поток A читает 64-битное значение, в то время как поток B обновляет его, поток A может получить битовую комбинацию из старого и нового значений.

Унарные операторы типа x++ сначала читают переменную, затем обрабатывают ее, а потом записывают новое значение. Рассмотрим следующий класс:

class ThreadUnsafe 
{
  static int x = 1000;

  static void Go()
  {
    for (int i = 0; i < 100; i++)
      x--;
  }
}

Вы могли бы подумать, что если бы десять потоков одновременно выполняли Go, в результате x было бы равно 0. Однако такой гарантии нет, потому что возможна ситуация, когда один поток вытеснит другой после получения им текущего значения x, уменьшит его и запишет его назад (в результате первый поток продолжит работу с устаревшим значением x).

Один из путей решения таких проблем – обернуть неатомарные операции в блокировку. Блокировка фактически моделирует атомарность. Однако класс Interlocked предлагает более простое и быстрое решение для простых атомарных операций:

class Program 
{
  static long sum;
 
  static void Main()                                            // sum
  {
    // Простой increment/decrement:
    Interlocked.Increment(ref sum);                             // 1
    Interlocked.Decrement(ref sum);                             // 0
 
    // Сложение/вычитание:
    Interlocked.Add(ref sum, 3);                                // 3
 
    // Чтение 64-битного поля:
    Console.WriteLine(Interlocked.Read(ref sum));               // 3
 
    // Запись 64-битного поля после чтения предыдущего значения:
    Console.WriteLine(Interlocked.Exchange(ref sum, 10));       // 10
 
    // Обновление поля только если оно соответствует
    // определенному значению(10):
    Interlocked.CompareExchange(ref sum, 123, 10);              // 123
  }
}

Использование Interlocked вообще более эффективно, чем lock, так как при этом в принципе отсутствует блокировка – и соответствующие накладные расходы на временную приостановку потока.

Interlocked аналогично действует и при использовании из разных процессов – в отличие от оператора lock, который эффективен только в рамках потоков текущего процесса. Это может быть использовано, например, при чтении и записи в разделяемую память (shared memory).

Барьеры в памяти и асинхронная изменчивость (volatility)

Рассмотрим следующий класс:

class Unsafe 
{
  static bool endIsNigh;
  static bool repented;

  static void Main()
  {
    // Запустить поток, ждущий изменения флага в цикле...
    new Thread(Wait).Start();
    Thread.Sleep(1000); // Дадим секунду на «прогрев»!
    repented = true;
    endIsNigh = true;
    Console.WriteLine("Понеслась...");
  }
  
  static void Wait()
  {
    while (!endIsNigh) // Крутимся в ожидании изменения значения endIsNigh
      ;

    Console.WriteLine("Готово, " + repented);
  }
}

Внимание, вопрос: насколько существенная задержка может разделять "Понеслась..." от "Готово" – другими словами, может ли цикл в методе Wait продолжать крутиться после того, как флаг endIsNigh был установлен в true? И еще, может ли метод Wait напечатать "Готово, false"?

Ответ на оба вопроса – теоретически да, может, на многопроцессорной машине, если планировщик потоков назначит эти два потока на разные CPU. Поля repented и endIsNigh могут кэшироваться в регистрах CPU для повышения производительности и записываться назад в память с некоторой задержкой. И порядок, в каком регистры записываются в память, необязательно совпадет с порядком обновления полей.

Это кэширование можно обойти, используя статические методы Thread.VolatileRead и Thread.VolatileWrite для чтения и записи полей. VolatileRead – это способ “читать последнее значение”; VolatileWrite означает “записать немедленно в память”. Того же эффекта можно достичь более изящно, объявлением полей с модификатором volatile:

class ThreadSafe 
{
  // используйте семантику чтения/записи volatile:
  volatile static bool endIsNigh;
  volatile static bool repented;
  ...

Если ключевое слово volatile используется как замена методов VolatileRead и VolatileWrite, можно просто думать, что оно означает “не использовать кэш потока для этого поля!”.

Тот же эффект может быть достигнут оборачиванием доступа к repented и endIsNigh в оператор lock. Это работает, так как побочный (но необходимый) эффект блокировки состоит в создании барьера в памяти – для гарантии, что асинхронная изменчивость полей, используемых внутри конструкции lock, не выходит за ее пределы. Другими словами, значения полей будут самими свежими при входе в lock (volatile-чтение) и будут записаны в память перед выходом из lock (volatile-запись).

Использование оператора lock было бы необходимо, если бы нужно было получить доступ к полям repented и endIsNigh атомарно, например, выполнить что-то типа такого:

lock (locker)
{
  if (endIsNigh)
    repented = true; 
}

lock также может оказаться предпочтительнее там, где поля много раз используются в цикле (при этом lock сделан на весь цикл). Хотя volatile-чтение/запись превосходит lock в производительности, маловероятно, что тысяча операций volatile-чтения/записи окажется выгоднее одной блокировки.

Асинхронная изменчивость присуща только примитивным интегральным типам (и unsafe-указателям) – другие типы не кэшируются в регистрах CPU и не могут быть объявлены с ключевым словом volatile. Семантика volatile-чтения и записи автоматически применяется к полям, когда доступ осуществляется через класс Interlocked.

Если ваша политика предполагает доступ к полям из разных потоков в операторе lock, volatile и Interlocked вам не нужны.

Wait и Pulse

Ранее мы рассматривали EventWaitHandle – простой сигнальный механизм блокировки потока до получения уведомления от другого потока.

Более мощная сигнальная конструкция предоставляется классом Monitor при помощи двух статических методов – Wait и Pulse. Принцип состоит в том, что вы пишете сигнальную логику сами, используя флаги и поля (вместе с оператором lock), а затем вводите команды Wait и Pulse для уменьшения нагрузки на CPU. Преимущество такого низкоуровневого подхода в том, что используя только Wait, Pulse и lock можно получить функциональность AutoResetEvent, ManualResetEvent и Semaphore, а также статических методов WaitHandle WaitAll и WaitAny. Кроме того, Wait и Pulse можно применить в ситуациях, где любой WaitHandle бросает вызов бережливости.

Проблема Wait и Pulse – скудная документация, особенно в отношении необходимости их применения. И что еще хуже, Wait и Pulse испытывают особенное отвращение к дилетантам: если вы вызываете их без полного понимания, они это узнают – и будут счастливы найти и замучать вас до смерти! К счастью, есть простой образец, которому можно следовать, и который обеспечивает надежное решение в каждом случае.

Определение Wait и Pulse

Назначение Wait и Pulse – обеспечить простой сигнальный механизм: Wait блокирует, пока не получено уведомление от другого потока, Pulse реализует это уведомление.

Чтобы сигнализация сработала, Wait должен выполняться перед Pulse. Если Pulse выполнится первым, его сигнал будет потерян, и вызванный после него Wait должен будет ожидать следующего сигнала или остаться навсегда заблокированным. Это поведение отличается от AutoResetEvent, у которого метод Set имеет эффект “защелки” и работает, даже если вызван до WaitOne.

Для вызова Wait или Pulse необходимо определить объект синхронизации. Если два потока используют один и тот же объект, они способны посигналить друг другу. Объект синхронизации должен быть заблокирован перед вызовом Wait или Pulse.

Например, если x объявлен следующим образом:

class Test 
{
  // Любой объект ссылочного типа может быть объектом синхронизации
  object x = new object();
}

то следующий код заблокирует поток на вызове Monitor.Wait:

lock (x)
  Monitor.Wait(x);

А этот код (если он выполнен позже в другом потоке) освободит блокированный поток:

lock (x)
  Monitor.Pulse(x);

Переключение блокировки

Чтобы выполнить эту работу, Monitor.Wait временно освобождает или отключает базовый lock на время ожидания, чтобы другой поток (который будет вызывать Pulse) тоже мог получить блокировку. Метод Wait можно представить в виде следующего псевдокода:

Monitor.Exit(x);             // освободить блокировку
Ожидать вызова pulse для x
Monitor.Enter(x);            // восстановить блокировку

Следовательно, Wait может заблокировать поток дважды: один раз при ожидании Pulse, и еще раз – при восстановлении эксклюзивной блокировки. Это также означает, что Pulse не полностью разблокирует ожидающий поток: только когда сигнализирующий поток покидает конструкцию lock, ожидающий поток действительно может идти дальше.

Переключение блокировки методом Wait эффективно независимо от уровня вложенности блокировок. Например, если Wait вызывается из двух вложенных выражений блокировки:

lock (x)
  lock (x)
    Monitor.Wait(x);

Wait логически разворачивается следующим образом:

Monitor.Exit(x); Monitor.Exit(x);    // Exit дважды для освобождения lock
Ожидать вызова pulse для x
Monitor.Enter(x); Monitor.Enter(x);  // Восстановление уровней вложенности

Согласно нормальной семантике блокировки, только первый вызов Monitor.Enter действительно может произвести блокировку.

Зачем нужен lock?

Почему Wait и Pulse разработаны так, что могут работать только в пределах lock? Основная причина – дать возможность вызвать Wait по условию, без нарушений потоковой безопасности. В качестве простого примера предположим, что нужно вызвать Wait, только если булево поле равно false. Следующий код будет потокобезопасным:

lock (x) 
{
  if (!available)
    Monitor.Wait(x);

  available = false;
}

Несколько потоков могут выполнять этот код одновременно, но ни один не может быть вытеснен между проверкой поля и вызовом Monitor.Wait. Эти две инструкции являются атомарными. Аналогично, генерация уведомления также будет потокобезопасной:

lock (x)
{
  if (!available)
  {
    available = true;
    Monitor.Pulse(x);
  }
  ...

Задание таймаута

Таймаут можно задать при вызове Wait как число миллисекунд или как TimeSpan. Wait возвращает false, если ожидание завершается по таймауту. Таймаут участвует только в фазе ожидания сигнала (Pulse), после ее окончания Wait должен будет вновь заблокировать x, и будет пытаться сделать это столько, сколько потребуется. Вот пример:

lock (x) 
{
  if (!Monitor.Wait(x, TimeSpan.FromSeconds(10)))
    Console.WriteLine("Не дождалися!");

  Console.WriteLine("А 'x'-то все еще заблокирован!");
}

Это поведение объясняется тем, что в правильно разработанном Wait/Pulse-приложении объект, на котором вызываются Wait и Pulse, блокируется на короткий промежуток времени, так что переключение блокировки должно быть почти мгновенной операцией.

Сигнализация и подтверждения

Важная особенность Monitor.Pulse – этот вызов выполняется асинхронно, без блокировок или других задержек. Если другой поток ждет на сигнальном объекте, он получит уведомление, если нет, вызов Pulse будет тихо проигнорирован.

Pulse обеспечивает только одностороннюю коммуникацию в направлении ожидающего потока. Никакого встроенного механизма подтверждений не существует. Pulse не возвращает значения, указывающего, был ли получен сигнальный импульс. Кроме того, когда сигнализирующий поток пошлет сигнал и освободит блокировку, нет никаких гарантий, что ждущий сигнала поток вернется к жизни немедленно. Возможна произвольная задержка, определяемая планировщиком потоков, в течении которой никакой поток не владеет блокировкой. Это еще более затрудняет определение момента, когда ожидающий поток действительно возобновляет исполнение, если только не предусмотрено специального подтверждения, например в виде флага.

Если требуется подтверждение, оно должно добавляться явным образом, например, в виде флага, связывающего потоки, вызывающие Pulse и Wait.

Полагаясь на своевременные действия потока, ожидающего сигнала, без специального механизма подтверждений вы проиграете!

Очередь ожидания и PulseAll

Вызвать Wait на одном и том же объекте могут сразу несколько потоков – в этом случае за объектом синхронизации образуется очередь ожидания, "waiting queue" (не путать c очередью ожидания на lock – "ready queue"). Каждый Pulse освобождает один поток из головы очереди ожидания, "waiting queue", после чего он переходит к "ready queue" для переустановки блокировки. Можно провести аналогию с автоматической парковкой для машин – сначала вы стоите в очереди к автомату для проверки билетов ("waiting queue"), а потом ждете снова перед шлагбаумом на входе ("ready queue").


Рисунок 2: Waiting Queue и Ready Queue

Однако часто упорядоченность потоков, присущая очередям, не нужна в Wait/Pulse-приложениях, и в таких случаях проще представить себе некий "пул" ожидающих потоков. Каждый Pulse освобождает один поток из пула.

Класс Monitor предоставляет также метод PulseAll, освобождающий всю очередь или пул потоков. Потоки, однако, все равно стартуют не все сразу, а в определенной последовательности, так как каждый Wait пытается переустановить одну и ту же блокировку. Так что PulseAll просто перемещает потоки из "waiting queue" в "ready queue", после чего они могут продолжить исполнение в дежурном порядке.

Использование Pulse и Wait

Итак, начнем. И для начала условимся о следующем:

Имея в виду эти два правила, рассмотрим простой пример: рабочий поток, который приостанавливается, пока не получит уведомление от главного потока:

class SimpleWaitPulse 
{
  bool go;
  object locker = new object();
 
  void Work() 
  {
    Console.Write("Ждем... ");

    lock (locker)
    {
      while (!go) 
      {
        // Освободим блокировку, чтобы другой поток мог изменить флаг go
        Monitor.Exit(locker); 
        // Снова заблокируем перед проверкой go в while
        Monitor.Enter(locker);
      }
    }

    Console.WriteLine("Оповещен!");
  }
 
  void Notify()// вызывается из другого потока
  {
    lock (locker) 
    {
      Console.Write("Оповещаем... ");
      go = true;
    }
  }
}

Вот метод Main, приводящий все это в движение:

static void Main() 
{
  SimpleWaitPulse test = new SimpleWaitPulse();
 
  // Запускаем метод Work в отдельном потоке
  new Thread(test.Work).Start(); // "Ждем..."
 
  // Подождем секунду и уведомим рабочий поток из главного:
  Thread.Sleep(1000);
  test.Notify(); // "Оповещаем... Оповестили!"
}

Метод Work, где мы крутимся в цикле, постоянно потребляет ресурсы CPU, пока флаг go установлен в true! В цикле нужно постоянно переключать блокировку при помощи Monitor.Enter и Monitor.Exit – чтобы другой поток мог получить блокировку и модифицировать флаг go. Доступ к полю go должен всегда осуществляться только изнутри lock, чтобы избежать проблем с асинхронной изменчивостью (volatility) (помните, что по правилам, о которых мы условились, другие конструкции синхронизации, в том числе и ключевое слово volatile нам недоступны!).

Теперь запустим пример, чтобы убедиться, что он действительно работает. Вот что он выводит:

Ждем... (пауза) Оповещаем... Оповестили!

Добавим Wait и Pulse. Сделаем это так:

Вот модифицированный класс, с опущенными для краткости вызовами Console:

class SimpleWaitPulse 
{
  bool go;
  object locker = new object();
 
  void Work() 
  {
    lock (locker)
      while (!go)
        Monitor.Wait(locker);
  }
 
  void Notify() 
  {
    lock (locker) 
    {
      go = true;
      Monitor.Pulse(locker);
    }
  }
}

Класс работает так же, как и раньше, только постоянная прокрутка цикла устранена. Wait неявно исполняет код, который был удален – Monitor.Enter после Monitor.Exit, но с одним дополнительным шагом в середине: пока блокировка отпущена, он ожидает вызова Pulse из другого потока. Именно это и делает метод Notifier после установки флага go в true. Работа сделана.

Обобщение модели использования Wait и Pulse

Давайте доработаем наш шаблон использования Wait и Pulse. В предыдущем примере внутри блокировки использовалось только одно булево поле – флаг go. В другом сценарии мог бы потребоваться дополнительный флаг, показывающий состояние ожидающего потока – готов или завершен. Экстраполировав эту ситуацию, и предполагая, что набор полей, вовлеченных в блокирование, может быть любым, представим программу в виде следующего псевдокода:

class X 
{
  Блокировочные поля:  один или более объектов, 
  участвующих в условии блокировки, например:
  bool go;   bool ready;   int semaphoreCount;   Queue <Task> consumerQ...
 
  object locker = new object();  // защищает все перечисленные выше поля!
 
  ... SomeMethod 
  {
    ... всякий раз когда нужно блокировать, 
        основываясь на наших блокировочных полях:
    lock (locker)
    {
      while (! Некий набор блокировочных полей ) 
      {
        // Дадим шанс другим потокам изменить блокировочные поля!
        Monitor.Exit(locker);
        Monitor.Enter(locker);
      }
    }
    ... всякий раз когда нужно изменить одно 
        или несколько блокировочных полей:
    lock (locker) 
    {
      изменяем поле(поля)
    }
  }
}

Теперь вставим в наш шаблон Wait и Pulse так же, как и в прошлый раз:

Вот модифицированный псевдокод:

Wait/Pulse шаблон #1: Основной вариант использования Wait/Pulse
class X 
{
  < Блокировочные поля ... >
  object locker = new object();

  ... SomeMethod
  {
    ...
    ... всякий раз когда нужно блокировать, 
        основываясь на наших блокировочных полях:
    lock (locker)
      while (!Некий набор блокировочных полей )
        Monitor.Wait(locker);

    ... всякий раз когда нужно изменить 
        одно или несколько блокировочных полей:
    lock (locker) 
    {
      изменяем поле(поля)
      Monitor.Pulse(locker);
    }     
  }
}

Такой подход дает надежную модель использования Wait и Pulse. Вот её главные особенности:

Важно также, что Pulse не принуждает ожидающий поток к возобновлению исполнения. Скорее Pulse только уведомляет его, что кое-что изменилось, рекомендуя перепроверить условие блокировки. Не сигнализирующий, а сам ожидающий поток определяет (в следующей итерации цикла), нужно ли ему продолжать работу или остаться в заблокированном состоянии. Преимущество такого подхода – возможность использования сложных условий блокировки без замысловатой логики синхронизации.

Другое полезное свойство этой модели – устойчивость логики к пропущенным Pulse. Пропуск импульсов сигнализации может произойти, когда Pulse вызывается раньше Wait, например, из-за гонок между ожидающим и сигнализирующим потоками. Поскольку в этой модели каждый сигнал означает “перепроверить условие блокировки” (а не “продолжить работу”), слишком ранний Pulse может быть безопасно проигнорирован, так как условие блокировки проверяется в while до вызова Wait.

Такой дизайн позволяет определить несколько блокировочных полей, составить из них сложное условие блокировки, но при этом использовать единственный объект синхронизации (в предыдущем примере – locker). Обычно это лучше, чем несколько объектов синхронизации, используемых в lock, Wait и Pulse, так как помогает избежать взаимоблокировок. Кроме того, с одним объектом синхронизации все блокировочные поля читаются и записываются как единое целое, тем самым исключая тонкие ошибки атомарности. Хорошей идеей, однако, будет не использовать объект синхронизации вне необходимой области видимости (объявив как private и собственно объект синхронизации, и все блокировочные поля).

Очередь поставщик/потребитель

Наше простое Wait/Pulse-приложение по сути – очередь поставщик/потребитель, реализованная нами ранее с использованием AutoResetEvent. Поставщик ставит в очередь задачи (обычно в главном потоке), в то время как один или более потребителей в рабочих потоках разбирают и выполняют их одну за другой.

В следующем примере для представления задачи будет использоваться строка. А очередь задач будет, соответственно, такой:

Queue<string> taskQ = new Queue<string>();

Поскольку очередь будет использоваться из нескольких потоков, необходимо обернуть в lock все операции чтения и записи в очередь. Так будет выглядеть постановка задачи в очередь:

lock (locker) 
{
  taskQ.Enqueue("my task");
  Monitor.PulseAll(locker);   // Мы изменили условие блокировки
}

Поскольку изменяется потенциальное условие блокировки, необходимо выдать сигнал. Мы вызываем PulseAll, а не Pulse, так как нужно учитывать, что потребителей может быть несколько, соответственно, ожидать будут несколько потоков.

Далее, требуется, чтобы рабочие потоки были блокированы, когда им нечего делать, другими словами, когда очередь пуста. Следовательно, наше условие блокировки – taskQ.Count==0. Вот выполняющее это выражение Wait:

lock (locker)
  while (taskQ.Count == 0)
    Monitor.Wait(locker);

Следующий шаг – рабочий поток удаляет задачу из очереди и исполняет её:

lock (locker)
  while (taskQ.Count == 0)
    Monitor.Wait(locker);
 
string task;

lock (locker)
  task = taskQ.Dequeue();

То, что получилось, однако, не является потокобезопасным: во время удаления задачи информация о состоянии очереди может быть уже устаревшей, так как она получена в предыдущем выражении lock. Посмотрите, что получится, если запустить одновременно два потока-потребителя с одним-единственным элементом в очереди. Возможно, ни один из них не вошел бы в ожидание в цикле – оба увидели бы в очереди один элемент. Далее они оба попытались бы удалить из очереди этот элемент, с генерацией исключения в потоке, который будет делать это вторым. Для исправления ситуации будем удерживать lock немного дольше – до окончания взаимодействия с очередью:

string task;

lock (locker) 
{
  while (taskQ.Count == 0)
    Monitor.Wait(locker);

  task = taskQ.Dequeue();
}

(Нет необходимости вызывать Pulse после удаления из очереди, так как никакой потребитель не может быть разблокирован имеющимися немногими элементами в очереди).

Как только задача удалена из очереди, нет никаких причин сохранять блокировку. Снятие блокировки в этом месте позволяет потребителю выполнять продолжительную задачу без ненужного блокирования других потоков.

Вот полный текст программы. Как и в версии с AutoResetEvent, постановка в очередь задачи со значением null будет сигналом потребителю на завершение (после завершения других недовыполненных задач). Поскольку потребителей может быть несколько, необходимо будет добавить по null-задаче на каждого, чтобы полностью завершить очередь:

Wait/Pulse шаблон #2: Очередь поставщик/потребитель
using System;
using System.Threading;
using System.Collections.Generic;

public class TaskQueue : IDisposable 
{
  object locker = new object();
  Thread[] workers;
  Queue<string> taskQ = new Queue<string>();

  public TaskQueue(int workerCount) 
  {
    workers = new Thread[workerCount];

    // Создать и запустить отдельный поток на каждого потребителя
    for (int i = 0; i < workerCount; i++)
      (workers [i] = new Thread(Consume)).Start();
  }

  public void Dispose() 
  {
    // Добавить по null-задаче на каждого завершаемого потребителя
    foreach (Thread worker in workers)
      EnqueueTask(null);

    foreach (Thread worker in workers)
      worker.Join();
  }

  public void EnqueueTask(string task) 
  {
    lock (locker) 
    {
      taskQ.Enqueue(task);
      Monitor.PulseAll(locker);
    }
  }

  void Consume()
  {
    while (true) 
    {
      string task;

      lock (locker)
      {
        while (taskQ.Count == 0)
          Monitor.Wait(locker);

        task = taskQ.Dequeue();
      }

      if (task == null)
        return;  // Сигнал на выход

      Console.Write(task);
      Thread.Sleep(1000);       // Имитация длительной работы
    }
  }
}

Вот метод Main, в котором создается очередь задач, задаются два потока-потребителя, для которых в очередь ставится 10 задач:

  static void Main()
  {
    using(TaskQueue q = new TaskQueue(2))
    {
      Console.WriteLine("Помещаем в очередь 10 задач");
      Console.WriteLine("Ожидаем завершения задач...");

      for (int i = 0; i < 10; i++)
        q.EnqueueTask(" Задача" + i);
    }

    // Выход из using приводит к вызову метода Dispose двух TaskQueue,
    // завершая потребителей после выполнения всех задач.
    Console.WriteLine("\r\nВсе задачи выполнены!");
  }

Консольный вывод:

Помещаем в очередь 10 задач
Ожидаем завершения задач...
 Задача0 Задача1 (пауза...) Задача2 Задача3 (пауза...) 
 Задача4 Задача5 (пауза...) Задача6 Задача7 (пауза...)
 Задача8 Задача9 (пауза...)
Все задачи выполнены!

В соответствии с нашим шаблоном проектирования, если удалить PulseAll и заменить Wait на переключение блокировок, мы получим тот же самый результат.

Про экономию сигналов

Посмотрим еще раз на добавление задачи в очередь:

lock (locker) 
{
  taskQ.Enqueue(task);
  Monitor.PulseAll(locker);
}

Вообще говоря, можно было сэкономить на выдаче сигналов, сигналя только тогда, когда действительно есть возможность освободить блокированного исполнителя:

lock (locker) 
{
  taskQ.Enqueue(task);

  if (taskQ.Count <= workers.Length)
    Monitor.PulseAll(locker);
}

Сэкономим мы, однако, немного, так как выдача сигнала занимает менее микросекунды и не влечет за собой накладных расходов для занятых потребителей, поскольку они этот сигнал все равно игнорируют. Правильной практикой для многопоточного кода будет удаление любой необязательной логики: плохо воспроизводимый дефект из-за глупой ошибки – слишком большая цена за экономию одной микросекунды! Вот пример внесения блуждающей ошибки типа “застрявший потребитель”, которая наверняка не будет выявлена при первоначальном тестировании (найди одно отличие):

lock (locker) 
{
  taskQ.Enqueue(task);

  if (taskQ.Count < workers.Length)
    Monitor.PulseAll(locker);
}

Сигнализация без всяких условий защитит вас от этого типа ошибок.

СОВЕТ

Сомневаешься – сигналь! В рассмотренном шаблоне проектирования сигнализация редко может повредить.

Pulse или PulseAll?

В нашем примере есть еще одна возможность для экономии сигналов. После добавления задачи в очередь можно вызвать Pulse, а не PulseAll, и ничего не сломается.

Вспомним отличие: при использовании Pulse может пробудиться максимум один поток (и перепроверить условие блокирования в while); в случае PulseAll пробудятся все ждущие потоки (и перепроверят условие блокирования). Если в очередь добавляется одна задача, для обработки нужен только один потребитель, так что нужно разбудить только одного вызовом Pulse. Это походит на класс спящих детей – если есть только одно мороженое, нет смысла будить их всех и ставить в очередь.

В нашем примере запускаются только два потока-потребителя, так что большой выгоды не извлечь. Если бы потоков-потребителей было десять, некоторый выигрыш от выбора Pulse вместо PulseAll появился бы. Если в очередь добавляются несколько задач, необходимо вызвать Pulse несколько раз. Это можно сделать в рамках одной конструкции lock, например, так:

lock (locker) 
{
  taskQ.Enqueue("task 1");
  taskQ.Enqueue("task 2");
  Monitor.Pulse(locker);    // "Сигналим двум 
  Monitor.Pulse(locker);    //  ожидающим потокам."
}

Цена одного невызванного Pulse – застрявший поток-потребитель. Эта ошибка будет блуждающей, так как вызов Pulse производит эффект, только когда потребитель находится в состоянии ожидания. Следовательно, можно расширить наш предыдущий лозунг “Сомневаешься – сигналь” до “Сомневаешься – сигналь всем!”

Исключение из этого правила возможно, только когда вычисление условия блокировки занимает слишком много времени.

Использование таймаутов для Wait

Иногда может быть нежелательно или невозможно вызывать Pulse всякий раз, когда изменяется условие блокировки. В качестве примера можно представить условие блокировки, вызывающее метод, периодически запрашивающий информацию из базы данных. Если время ожидания – не проблема, решение простое: определить таймаут при вызове Wait следующим образом:

lock (locker) 
{
  while ( blocking condition )
    Monitor.Wait(locker, timeout);
  ...

Условие блокировки будет перепроверяться как минимум регулярно с интервалом, заданным в таймауте, а также немедленно по получении сигнала. Чем проще условие блокировки, тем меньшее значение таймаута можно использовать без снижения эффективности.

Такой подход, кроме того, хорошо работает, если сигнал будет пропущен из-за ошибки в программе! Возможно, стоит добавлять таймаут ко всем командам Wait в программах, где синхронизация особенно сложна – в качестве страховки от особо загадочных ошибок сигнализации. Также это обеспечивает дополнительную устойчивость к ошибкам, если программа изменяется позже кем-то несведущим.

Гонки и подтверждения

Допустим, мы хотим посигналить рабочему потоку пять раз подряд:

class Race 
{
  static object locker = new object();
  static bool go;
 
  static void Main() 
  {
    new Thread(SaySomething).Start();
 
    for (int i = 0; i < 5; i++)
    {
      lock (locker)
      {
        go = true;
        Monitor.Pulse(locker);
      }
    }
  }
 
  static void SaySomething()
  {
    for (int i = 0; i < 5; i++)
    {
      lock (locker)
      {
        while (!go)
          Monitor.Wait(locker);

        go = false;
      }

      Console.WriteLine("Wassup?");
    }
  }
}

Ожидаемый вывод:

Wassup?
Wassup?
Wassup?
Wassup?
Wassup?

Реальный вывод:

Wassup?
(зависание)
ПРИМЕЧАНИЕ

При тестировании этого примера мы получили «ожидаемый результат», запуская программу, скомпилированную в debug-режиме (не из под отладчика), и «реальный результат» (то есть, зависание) в release-версии – прим.ред.

Эта программа с дефектом: цикл for в главном потоке может прокрутить все пять своих итераций в то время, когда рабочий поток не блокирован. Возможно, даже до того, как он вообще стартует! Наш пример с очередью Поставщик/Потребитель не страдал от этой проблемы, так как если главный поток забегал вперёд рабочего, каждый запрос просто ставился в очередь. Но в данном случае требуется блокировка главного потока на каждой итерации, если рабочий все еще занят предыдущей задачей.

В качестве простого решения можно в каждой итерации цикла for ожидать, пока флаг go не будет сброшен рабочим потоком. Рабочий поток после сброса флага должен вызвать Pulse:

class Acknowledged 
{
  static object locker = new object();
  static bool go;
 
  static void Main() 
  {
    new Thread(SaySomething).Start();
 
    for (int i = 0; i < 5; i++) 
    {
      lock (locker) 
      {
        go = true;
        Monitor.Pulse(locker);
      }

      lock (locker) 
      {
        while (go)
          Monitor.Wait(locker);
      }
    }
  }
 
  static void SaySomething() 
  {
    for (int i = 0; i < 5; i++) 
    {
      lock (locker) 
      {
        while (!go)
          Monitor.Wait(locker);

        go = false;
        Monitor.Pulse(locker); // Надо посигналить
      }

      Console.WriteLine("Wassup?");
    }
  }
}

Консольный вывод:

Wassup?
Wassup?
Wassup?
Wassup?
Wassup? (пять повторов)

Важная особенность такой программы заключается в том, что рабочий поток освобождает блокировку перед выполнением длительного задания (которое должно быть на месте вызова Console.WriteLine). Это гарантирует, что инициатор не будет заблокирован все время, пока рабочий поток исполняет задачу, о которой ему только что посигналили (и будет блокирован, только если рабочий поток занят предыдущей задачей).

В данном примере только один поток (главный) сигналит рабочему потоку о необходимости выполнить задачу. Если несколько потоков начнут сигналить рабочему – используя текущую логику из метода Main – у нас начнутся проблемы. Два сигналящих потока могли бы последовательно исполнить следующую строку кода:

lock (locker) 
{
  go = true;
  Monitor.Pulse(locker);
}

что привело бы к потере второго сигнала, если рабочий поток в это время не до конца отработал по первому сигналу. Можно учесть такую ситуацию, использовав пару флажков – “ready” и “go”. Флажок “ready” показывает, что рабочий поток готов принять новую задачу, “go”, как и раньше, – сигнал начинать. Решение аналогично предыдущему примеру, который делал то же самое, используя два AutoResetEvent, за исключением лучшей расширяемости. Вот переработанный шаблон, с нестатическими полями:

Wait/Pulse шаблон #3: Двусторонняя сигнализация
public class Acknowledged 
{
  object locker = new object();
  bool ready;
  bool go;  

  public void NotifyWhenReady() 
  {
    lock (locker)
    {
      // ожидать, если рабочий поток занят предыдущей задачей
      while (!ready)
        Monitor.Wait(locker);

      ready = false;
      go = true;
      Monitor.PulseAll(locker);
    }
  }

  public void AcknowledgedWait()
  { 
    // Отобразить готовность принять запрос
    lock (locker) 
    {
      ready = true;
      Monitor.Pulse(locker);
    }

    lock (locker)
    {
      while (!go)
        Monitor.Wait(locker);     // Ожидать установки "go"

      go = false;
      Monitor.PulseAll(locker);   // Подтвердить сигнал
    }
      
    Console.WriteLine("Wassup?"); // Выполнить задачу
  }
}

Для проверки запустим два параллельных потока, каждый из которых посигналит рабочему потоку пять раз. Тем временем главный поток будет ожидать десяти уведомлений:

public class Test
{
  static Acknowledged a = new Acknowledged();
 
  static void Main()
  {
    new Thread(Notify5).Start();     // Запустить два параллельных
    new Thread(Notify5).Start();     // "уведомляльщика"...
    Wait10();                         // ... и одного ожидающего
  }
 
  static void Notify5()
  {
    for (int i = 0; i < 5; i++)
      a.NotifyWhenReady();
  }
 
  static void Wait10()
  {
    for (int i = 0; i < 10; i++)
      a.AcknowledgedWait();
  }
}

Консольный вывод:

Wassup?
Wassup?
...
Wassup? (десять повторов)

В методе Notify флаг ready очищается перед выходом из lock. Это жизненно важно: таким образом предотвращается последовательная сигнализация двумя уведомляющими потоками без перепроверки флага. Для простоты установка флага go и вызов PulseAll выполняются в той же самой конструкции lock, однако можно поместить эти две инструкции в отдельные конструкции lock, и ничего не сломается.

Имитация Wait Handle

Возможно, вы заметили шаблонный код в предыдущем примере: оба цикла ожидания имеют следующую структуру:

lock (locker) 
{
  while (!flag) 
    Monitor.Wait(locker);

  flag = false;
 ...
}

где flag устанавливается в true в другом потоке. В действительности это имитация AutoResetEvent. А если опустить flag=false, то ManualResetEvent. Используя целочисленное поле, Pulse и Wait, можно имитировать Semaphore. Фактически единственный WaitHandle, который нельзя имитировать при помощи Pulse и Wait – это Mutex, так как эта функциональность предоставляется оператором lock.

Имитация статических методов, которые работают с несколькими WaitHandle, в большинстве случаев проста. Эквивалент вызова WaitAll с несколькими WaitHandle – не что иное, как условие блокировки, включающее все флаги, используемые вместо WaitHandle:

lock (locker) 
{
  while (!flag1 && !flag2 && !flag3...)
    Monitor.Wait(locker);

Это может быть особенно полезно, так как WaitAll в большинстве случаев не пригоден к использованию из-за проблем с унаследованным COM-кодом. Имитация WaitAny – просто вопрос замены оператора && оператором ||.

С имитацией SignalAndWait сложнее. При вызове он сигналит на одном хендле при ожидании на другом в атомарной операции. Ситуация аналогична транзакциям в распределенной базе данных – необходим двухфазный commit! Если, например, необходимо посигналить флагом flagA при ожидании на флаге flagB, придется разделить каждый флаг на два, получив в результате код типа такого:

lock (locker)
{
  flagAphase1 = true;
  Monitor.Pulse(locker);

  while (!flagBphase1)
    Monitor.Wait(locker);
 
  flagAphase2 = true;
  Monitor.Pulse(locker);

  while (!flagBphase2)
    Monitor.Wait(locker);
}

возможно, с дополнительной rollback-логикой для отката flagAphase1, если первый Wait сгенерирует исключение в результате прерывания по Interrupt или аварийного завершения потока по Abort. В этой ситуации использовать WaitHandle гораздо проще. В действительности, однако, атомарная сигнализация-и-ожидание – это редкое требование.

Ожидание стыковки

Wait и Pulse можно использовать так же, как для стыковки двух потоков используется WaitHandle.SignalAndWait. В следующем примере, можно сказать, имитируются два ManualResetEvent (другими словами, мы определяем два булевых флажка!), а затем выполняется взаимная сигнализация-и-ожидание установкой одного флага при ожидании другого. В данном случае нет необходимости в истинной атомарности сигнализации-и-ожидания, так что нет нужды и в двухфазном commit. Пока мы устанавливаем наш флаг и вызываем Wait в пределах одного lock-а, стыковка будет работать:

class Rendezvous 
{
  static object locker = new object();
  static bool signal1, signal2;
 
  static void Main() 
  {
    // Заставим каждый поток бездействовать в течение 
    // случайного промежутка времени
    Random r = new Random();
    new Thread(Mate).Start(r.Next(10000));
    Thread.Sleep(r.Next(10000));
 
    lock (locker)
    {
      signal1 = true;
      Monitor.Pulse(locker);

      while (!signal2)
        Monitor.Wait(locker);
    }

    Console.Write("Одновременно! ");
  }
 
  // Этот метод вызывается в потоке
  static void Mate(object delay) 
  {
    Thread.Sleep((int) delay);

    lock (locker) 
    {
      signal2 = true;
      Monitor.Pulse(locker);

      while (!signal1)
        Monitor.Wait(locker);
    }

    Console.Write("Одновременно! ");
  }
}

Консольный вывод:

Одновременно! Одновременно! (почти одновременно)

Wait и Pulse vs. Wait Handles

Поскольку Wait и Pulse являются наиболее гибкой конструкцией сигнализации, они могут использоваться практически в любой ситуации. У WaitHandle, однако, есть два преимущества:

В дополнение к этому с WaitHandle проще взаимодействовать – их можно передавать через параметры методов. В пулах потоков это свойство с пользой применяется.

В смысле производительности Wait и Pulse имеют небольшой перевес, если следовать вот такому образцу дизайна:

lock (locker)
  while (blocking condition)
    Monitor.Wait(locker);

и условие блокировки ложно с самого начала. Единственные накладные расходы – это выход из lock (десятки наносекунд) против нескольких микросекунд на вызов WaitHandle.WaitOne. Конечно, все это при условии, что борьбы за блокировку не происходит; даже самой короткой борьбы за блокировку было бы достаточно, чтобы выровнять результаты; а частая борьба за блокировку сделала бы WaitHandle быстрее!

С учетом потенциальных различий разных CPU, операционных систем, версий CLR и программной логики, несколько микросекунд вряд ли могут быть причиной для выбора между WaitHandle и Wait/Pulse.

Правильно будет использовать WaitHandle, когда одна из конструкций естественно соответствует требуемой работе, а если такой конструкции нет – использовать Wait и Pulse.

Suspend и Resume

Поток может быть явно приостановлен и продолжен с помощью методов Thread.Suspend и Thread.Resume. Это механизм никак не пересекается с блокировками, обсуждаемыми ранее. Обе системы независимы и работают параллельно.

Поток может приостановить себя или другой поток. Вызов Suspend переводит поток на короткое время в состояние SuspendRequested, а затем, при достижении безопасной точки для сбора мусора – в состояние Suspended. Из этого состояния поток может продолжить выполнение только с помощью другого потока, который вызовет для него метод Resume. Resume работает только для приостановленных, но не для заблокированных потоков.

В .NET 2.0 Suspend и Resume объявлены не рекомендованными к применению из-за опасности произвольной приостановки другого потока. Если поток, удерживающий блокировку на критическом ресурсе, будет приостановлен, может зависнуть целое приложение (или компьютер). Это намного опаснее вызова Abort – который привел бы к освобождению всех блокировок – по крайней мере, теоретически – на основании кода в блоках finally.

Можно, однако, безопасно вызывать Suspend для текущего потока, реализовав при этом простой механизм синхронизации для рабочего потока в цикле выполнение задачи/вызов Suspend для себя/ожидание вызова Resume (“побудки”) главным потоком, когда будет готова следующая задача. Сложность заключается в определении, действительно ли рабочий поток сейчас приостановлен. Посмотрите следующий код:

worker.NextTask = "MowTheLawn";

if ((worker.ThreadState & ThreadState.Suspended) > 0)
  worker.Resume();
else
  // Нельзя вызывать Resume, так как поток уже выполняется.
  // Посигналим рабочему потоку флагом:
  worker.AnotherTaskAwaits = true;

Это грубейшее нарушение потоковой безопасности – код может быть вытеснен в любой точке этих пяти строк, и пока он будет ожидать своего кванта времени, рабочий поток будет исполняться и может изменить свое состояние. Несмотря на то, что разрулить эту ситуацию можно, решение будет более сложным, чем его альтернатива – использование конструкций сигнализации, таких как AutoResetEvent или Monitor.Wait. Это делает Suspend и Resume совершенно бесполезными.

Нерекомендуемые методы Suspend и Resume имеют два режима – опасный и бесполезный.

Аварийное завершение потоков

Поток может быть аварийно завершен при помощи метода Abort:

class Abort 
{
  static void Main()
  {
    Thread t = new Thread(delegate() { while (true) ; }); // Бесконечный цикл
    t.Start();
    Thread.Sleep(1000);  // Пусть поработает секунду...
    t.Abort();           // после чего принудительно завершим его.
  }
}

Аварийно завершаемый поток немедленно переходит в состояние AbortRequested. Если завершение проходит как ожидалось, поток переходит в состояние Stopped. Поток, вызвавший Abort, может ожидать этого, вызвав Join:

class Abort 
{
  static void Main()
  {
    Thread t = new Thread(delegate() { while (true) ; });  // Бесконечный цикл
    Console.WriteLine(t.ThreadState);     // Unstarted
 
    t.Start();
    Thread.Sleep(1000);
    Console.WriteLine(t.ThreadState);     // Running
 
    t.Abort();
    Console.WriteLine(t.ThreadState);     // AbortRequested
 
    t.Join();
    Console.WriteLine(t.ThreadState);     // Stopped
  }
}

Вызов Abort вызывает генерацию исключения ThreadAbortException в завершаемом потоке, в большинстве случаев прямо там, где поток находится в это время. Завершаемый поток может обработать это исключение, но оно будет автоматически выброшено снова в конце блока catch (чтобы содействовать завершению потока, как это и предполагалось). Можно, однако, предотвратить автоматический повторный выброс исключения с помощью вызова Thread.ResetAbort в блоке catch. В этом случае поток возвращается в состояние Running (из которого его можно опять попытаться принудительно прекратить). В следующем примере рабочий поток воскресает из мертвых всякий раз после вызова Abort:

class Terminator 
{
  static void Main()
  {
    Thread t = new Thread(Work);
    t.Start();

    Thread.Sleep(1000);
    t.Abort();

    Thread.Sleep(1000);
    t.Abort();

    Thread.Sleep(1000);
    t.Abort();
  }
 
  static void Work()
  {
    while (true)
    {
      try
      {
        while (true)
          ;
      }
      catch(ThreadAbortException) { Thread.ResetAbort(); }

      Console.WriteLine("Я не умру!");
    }
  }
}

Необработанное ThreadAbortException, однако, не приводит к аварийному завершению приложения, в отличие от других типов исключений.

Abort будет воздействовать на поток в любом состоянии – рабочем, заблокированном, приостановленном или остановленном. Если, однако, принудительно завершается приостановленный поток, генерируется ThreadStateException – на сей раз в вызывающем потоке – и принудительное завершение не может быть произведено, пока поток не возобновит работу. Вот как можно принудительно завершить приостановленый поток:

try { suspendedThread.Abort(); }
catch(ThreadStateException) { suspendedThread.Resume(); }
// Сейчас suspendedThread будет принудительно прекращен.

Сложности с Thread.Abort

Предположив, что аварийно завершаемый поток не будет вызывать ResetAbort, можно было бы ожидать, что убийство произойдет довольно быстро. Но, как это обычно случается, с хорошим адвокатом можно провести в камере смертников довольно долгое время! Вот несколько факторов, которые могут задержать поток в состоянии AbortRequested:

Последний фактор особенно неприятен, так как .NET Framework часто вызывает неуправляемый код, иногда оставаясь там в течение долгого времени. Примером может быть работа с сетью или базами данных. Если сетевой ресурс или сервер баз данных умирают или не спешат с ответом, выполнение может оставаться в неуправляемом коде в течение минут, в зависимости от используемой реализации. В этих случаях было бы неприятно зависнуть на Join, ожидая завершения потока, особенно без использования таймаута.

Принудительное завершение работы чистого .NET-кода менее проблематично, если для гарантии надлежащей очистки после выброса ThreadAbortException используются блоки try/finally или конструкция using. Однако и в этом случае нужно быть готовым к неприятным сюрпризам. Посмотрите, например, на следующий код:

using(StreamWriter w = File.CreateText("myfile.txt"))
  w.Write("Abort-Safe?");

C#-оператор using – на самом деле просто сокращенная запись для следующего кода:

StreamWriter w;
w = File.CreateText("myfile.txt");
try     { w.Write("Abort-Safe"); }
finally { w.Dispose();            }  

Abort может случиться после создания StreamWriter, но перед началом блока try. Фактически, углубляясь в IL, можно увидеть, что это может произойти даже между созданием StreamWriter и присвоением значения w:

IL_0001:  ldstr      "myfile.txt"
IL_0006:  call       class [mscorlib]System.IO.StreamWriter
                     [mscorlib]System.IO.File::CreateText(string)
IL_000b:  stloc.0
.try
{
  ...

В этом случае до вызова Dispose в блоке finally дело не дойдет, хэндл открытого файла зависнет, пресекая любые попытки создать myfile.txt в течение оставшейся жизни домена приложения.

В действительности ситуация еще хуже, так как Abort может иметь место внутри реализации File.CreateText. Исходные коды этого метода официально не открыты, но к счастью, в .NET трудно что-либо действительно закрыть, можно снова обратиться к ILDASM, а лучше к Lutz Roeder's Reflector – и заглянув соответствующую сборку, увидеть, что там вызывается конструктор StreamWriter, который имеет следующую логику:

public StreamWriter(string path, bool append, ...)
{
  ...
  ...
  Stream stream1 = StreamWriter.CreateFile(path, append);
  this.Init(stream1, ...);
}

Нигде в этом конструкторе нет блоков try/catch, а это значит, что вызов Abort во время выполнения нетривиального метода Init подвесит только что созданный StreamWriter безо всякого способа закрыть принадлежащий ему файловый хэндл.

Поскольку дизассемблирование каждого используемого вызова CLR, очевидно, непрактично, встает вопрос, как же писать методы, которые можно без проблем аварийно завершать. Самый очевидный выход из ситуации – не завершать аварийно другие потоки вообще, а использовать специальное булево поле, через которое сигнализировать потоку о необходимости завершения. Рабочий поток должен периодически проверять это поле, элегантно завершаясь, если оно установлено в true. Как ни странно, самым элегантным завершением для рабочего потока будет вызов Abort для себя самого, подойдет также явный выброс исключения. Это гарантирует правильную очистку потоков в процессе выполнения блоков catch/finally – аналогично вызову Abort из другого потока, за исключением того, что исключение генерируется в выбранном нами месте:

class ProLife 
{
  public static void Main()
  {
    RulyWorker w = new RulyWorker();
    Thread t = new Thread(w.Work);
    t.Start();
    Thread.Sleep(500);
    w.Abort();
  }
 
  public class RulyWorker 
  {
    // Ключевое слово volatile гарантирует, что abort не будет
    // кешироваться потоком
    volatile bool abort;   
 
    public void Abort() { abort = true; }
 
    public void Work() 
    {
      while (true)
      {
        CheckAbort();
        // Делаем что-то полезное...
        try      { OtherMethod(); }
        finally  { /* требуемая очистка */ }
      }
    }
 
    void OtherMethod()
    {
      // Делаем что-то полезное...
      CheckAbort();
    }
 
    void CheckAbort() 
    {
      if (abort)
        Thread.CurrentThread.Abort();
    }
  }
}

Вызов Abort для текущего потока – один из вариантов, при которых Abort будет полностью безопасен. Другой вариант – точное знание, что прерываемый извне поток находится в определенной секции кода, обычно достигаемое использованием механизмов синхронизации типа WaitHandle или Monitor.Wait. Третий безопасный вариант вызова Abort – если вслед за ним последует завершение домена приложений или процесса.

Завершение домена приложений

Еще один способ безопасно прикончить рабочий поток – заставить поток работать в собственном домене приложений. После вызова Abort просто сносится весь домен, что приводит к освобождению всех зависших ресурсов.

Строго говоря, первый шаг – принудительное завершение потока – на самом деле не нужен, так как когда домен приложений выгружается, все потоки, исполняющие код в этом домене, автоматически принудительно завершаются. Однако если потоки при этом не завершаются своевременно (возможно, из-за кода в блоках finally, или по причинам, обсуждавшимся выше), домен приложений не выгружается, и выбрасывается исключение CannotUnloadAppDomainException. По этой причине лучше явно вызвать Abort для рабочих потоков, а затем вызвать Join с заданным таймаутом (который вы можете контролировать) перед выгрузкой домена приложений.

В следующем примере рабочий поток в бесконечном цикле создает и закрывает файл, используя небезопасный в случае аварийного завершения метод File.CreateText. Главный поток занимается тем, что запускает и завершает рабочие потоки. Обычно это заканчивается IOException в течении одной-двух итераций из-за реализации File.CreateText, оставляющей незакрытые файловые хэндлы при аварийном завершении потока:

using System;
using System.IO;
using System.Threading;
 
class Program 
{
  static void Main()
  {
    while (true)
    {
      Thread t = new Thread(Work);
      t.Start();
      Thread.Sleep(100);
      t.Abort();
      Console.WriteLine("Aborted");
    }
  }
 
  static void Work()
  {
    while (true)
      using(StreamWriter w = File.CreateText("myfile.txt"))
         ...
  }
}

Консольный вывод:

Aborted
Aborted
System.IO.IOException: The process cannot access the file '...myfile.txt' 
because it is being used by another process.

А вот модифицированная программа, в ней рабочий поток исполняется в собственном домене, который выгружается после принудительного завершения потока. Она бесконечно выполняется без ошибок, так как домен приложений, выгружаясь, освобождает файловый хэндл:

class Program 
{
  static void Main(string [] args)
  {
    while (true)
    {
      AppDomain ad = AppDomain.CreateDomain("worker");
      Thread t = new Thread(delegate() { ad.DoCallBack(Work); });
      t.Start();
      Thread.Sleep(100);
      t.Abort();

      if (!t.Join(2000)) 
      {
        // Поток не завершился – можно предпринять что-либо,
        // если есть что. К счастью, в данном случае можно ожидать,
        // что поток будет завершаться *всегда*.
      }

      AppDomain.Unload(ad); // Выгружаем загрязненный домен!
      Console.WriteLine("Aborted");
    }
  }
 
  static void Work()
  {
    while (true)
      using(StreamWriter w = File.CreateText("myfile.txt")) 
        ...
  }
}

Консольный вывод:

Aborted
Aborted
Aborted
Aborted
...

Создание и разрушение доменов приложений классифицируется в мире потоков как операция, отнимающая относительно много времени (несколько миллисекунд), так что, наверное, не стоит делать это в цикле. Кроме того, разделение, привносимое доменами приложений, добавляет другой элемент, который может быть и полезным, и вредным, в зависимости от назначения многопоточной программы. В контексте unit-тестирования, например, исполнение потоков в отдельных доменах может быть очень полезным.

Завершение процессов

Еще один вариант, при котором поток может завершиться – завершение родительского процесса. Пример – рабочий поток с установленным в true свойством IsBackground, и главный поток, завершающийся, когда рабочий еще исполняется. Фоновый поток не способен продлить жизнь приложения, так что процесс завершается, унося с собой фоновый поток.

Когда поток завершается из-за родительского процесса, он останавливается намертво, никакие блоки finally не выполняются.

Так же выглядит ситуация с завершением пользователем зависшего приложения через Диспетчер задач Windows, или с процессом, завершаемым программно с помощью Process.Kill.


Эта статья опубликована в журнале RSDN Magazine #2-2007. Информацию о журнале можно найти здесь
    Сообщений 0    Оценка 467 [+1/-0]         Оценить