Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 19.12.07 15:46
Оценка:
Привет!

Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.
После рассмотрения существующих реализаций, решил попробовать написать свою реализацию.
Несколько дней ушло на обдумывание и написание пробного варианта.

Возможно я изобретал колесо, тогда посьба сильно не пинать

Идея состоит в следующем.

Постановка задачи:
Есть два потока. Один пишет в очередь, другой — читает из нее.
Реализовать очередь без использования механизма блокировок.

Реализация:
В классе очереди реализовать 3 указателя на цепочки сообщений:
1. Цепочка для записи
2. Промежуточная цепочка
3. Цепочка для чтения
и флаг состояния указателя промежуточной цепочки.

Код:

template <class TYPE> class QueueNoMutex
{
public:
    QueueNoMutex();
    ~QueueNoMutex();

    // Запись в очередь
    void Enqueue(const TYPE& data);

    // Сброс в промежуточный указатель, если пишущий поток завершился раньше.
    bool Flush();

    // Прочитать из очереди следующий елемент
    bool Dequeue(TYPE& data);

private:
    struct Element
    {
        TYPE data;
        Element* next;
    };

    void DeleteChain(Element* el);

    volatile bool isTempQueueSet;

    Element *readerTop;
    Element *tempTop;
    Element *writerTop, *writerBottom;
};

template<class TYPE>
QueueNoMutex<TYPE>::QueueNoMutex()
{
    readerTop = tempTop = writerTop = writerBottom = NULL;
    isTempQueueSet = false;
}

template<class TYPE>
QueueNoMutex<TYPE>::~QueueNoMutex()
{
    DeleteChain(readerTop);
    DeleteChain(tempTop);
    DeleteChain(writerTop);
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
void
QueueNoMutex<TYPE>::Enqueue(const TYPE& data)
{
    // создаем новый елемент цепочки и помещаем в него данные
    Element* el = new Element;
    el->data = data;
    el->next = NULL;

    // если цепочка сообщений для записи не пуста, добавляем елемент в ее конец
    if (writerTop)
    {
        writerBottom->next = el;
        writerBottom = el;
    }
    else
    {
        // иначе это будет первый елемент цепочки
        writerTop = writerBottom = el;
    }

    // проверяем свободна ли промежуточная цепочка
    if (!isTempQueueSet)
    {
        // перекидываем в нее цепочку записи
        tempTop = writerTop;
        // освобождаем указатель вершины цепочки записи
        writerTop = NULL;
        // и устанавливаем флаг готовности промежуточной цепочки
        isTempQueueSet = true;
    }
}

template<class TYPE>
bool
QueueNoMutex<TYPE>::Flush()
{
    // если цепочка записи пуста, возвращаем успешность сброса
    if (!writerTop) return true;

    // проверяем свободна ли промежуточная цепочка
    if (!isTempQueueSet)
    {
        // перекидываем в нее цепочку записи
        tempTop = writerTop;
        // освобождаем указатель вершины цепочки записи
        writerTop = NULL;
        // и устанавливаем флаг готовности промежуточной цепочки
        isTempQueueSet = true;

        // возвращаем успешность сброса
        return true;
    }

    // цепочка все еще не сброшена
    return false;
}

template<class TYPE>
bool
QueueNoMutex<TYPE>::Dequeue(TYPE& data)
{
    // если цепочка для чтения пуста
    if (!readerTop)
    {
        // проверяем, есть ли данные в промежуточной цепочке
        if (isTempQueueSet)
        {
            // перебрасываем промежуточную цепочку в цепочку для чтения
            readerTop = tempTop;
            // обнуляем промежуточную цепочку (если используется Flush(), то не нужно)
            tempTop = NULL;
            // сообщаем, что промежуточная цепочка пуста
            isTempQueueSet = false;
        }
        else
        {
            // нет данных для чтения
            return false;
        }
    }

    // вычитываем данные из верхнего елемента цепочки
    data = readerTop->data;

    // удаляем и смещаем вершину цепочки для чтения
    Element* cur = readerTop;
    readerTop = cur->next;
    delete cur;

    return true;
}

template<class TYPE>
void
QueueNoMutex<TYPE>::DeleteChain(Element* el)
{
    while (el)
    {
        Element* cur = el;
        el = cur->next;
        delete cur;
    }
}


Код потоков для целочисленного варианта даных:


typedef QueueNoMutex<int> MyQueue;

class WriterThread: public Thread
{
public:
    void DoWork()
    {
        int data = 100;

        // основной цикл
        while (data >= 0)
        {
            queue->Enqueue(data--);

            // эмуляция работы реального потока
            Sleep(100);
        }

        // подчищаем хвосты, если последняя запись не перебросила в промежуточную цепочку
        while (!queue->Flush())
        {
            Sleep(100);
        }
    }

    // ...

private:
    MyQueue* queue;
};

class ReaderThread: public Thread
{
public:
    void DoWork()
    {
        int curVal = 1, data = 1;
        bool found = false;

        // основной цикл чтения
        while (data > 0)
        {
            while (queue->Dequeue(data))
            {
            if (curVal != data)
        {
            printf("%d != %d\n", curVal, data);
        }
        curVal = data - 1;
                found = true;
            }

            if (found)
            {
        printf("Last read: %d\n", data);
                found = false;
            }

            // эмуляция работы реального потока
            Sleep(200);
        }
        printf("Done.\n");
    }

    // ...

private:
    MyQueue* queue;
};


Еще раз уточню, что эта очередь предназначена только для одного потока записи и одного чтения.

Проверял в тестовом режиме с эмуляцией различной загруженности потоков. Работало без проблем.
Также надо будет сравнить с очередью с блокировками по скорости.
В ближайшем будуще попробую применить в реальном приложении.

Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.
Re: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 19.12.07 15:53
Оценка:
Здравствуйте, HaronK, Вы писали:


HK>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


А вот если писатель накидал много записей и устал, читатель их всех разгребет? По тексту не ясно.
---
С уважением,
Сергей Мухин
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 19.12.07 16:32
Оценка:
Здравствуйте, Сергей Мухин, Вы писали:

СМ>Здравствуйте, HaronK, Вы писали:



HK>>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


СМ>А вот если писатель накидал много записей и устал, читатель их всех разгребет? По тексту не ясно.


Что значит "устал"? Закончил посылать сообщения?
Читатель полностью обрабатывает все сообщения из своей цепочки и если писатель использует Flush() метод, как было в примере кода для записывающего потока, то читатель получит все сообщения.
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 19.12.07 18:39
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Здравствуйте, Сергей Мухин, Вы писали:


СМ>>Здравствуйте, HaronK, Вы писали:



HK>>>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


СМ>>А вот если писатель накидал много записей и устал, читатель их всех разгребет? По тексту не ясно.


HK>Что значит "устал"? Закончил посылать сообщения?

да

HK>Читатель полностью обрабатывает все сообщения из своей цепочки и если писатель использует Flush() метод, как было в примере кода для записывающего потока, то читатель получит все сообщения.


а! я и не заметил, но тогда очень странная ситуация, писатель вроде бы как все отдал, тем не менее он еще должен заниматься flush. Не удобно.Очень
---
С уважением,
Сергей Мухин
Re: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 02:27
Оценка: 1 (1)
Здравствуйте, HaronK, Вы писали:

HK>Хотел бы услышать коментарии. Что хорошо, что плохо, нет ли проблем.


Вы полагаетесь на то, что порядок действий над памятью (все эти манипуляции с указателями) выглядит одинаково из обеих потоков.

Во-первых, надо понимать, что компилятор может просто взять, и сделать их в другом порядке. Просто потому, что ему так удобнее. С компиятором можно бороться словом volatile — есть негласное соглашение, что компилятор никогда не меняет между собой порядок действих над volatile переменными.

Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать. Да, InterlockedXXX() функции содержат внутри себя barrier, поэтому они на SMP-машинках безопасны. В отличии от обычных присваиваний. Но неизвестно, что дороже, несколько InterlockedXXX подряд, или одна честная CRITICAL_SECTION (ну или pthread_mutex, если говорить про юниксы-хрюндиксы).
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 03:49
Оценка: -1
Здравствуйте, Pzz, Вы писали:

Pzz>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.
Из Intell TBB книжки:

Some processor architectures, such as the Intel Itanium, IBM POWER and PowerPC,
and Alpha processors, have weak memory consistency, in which memory operations
on different addresses may be reordered by the hardware for the sake of efficiency.
[skip]
If you are programming only IA-32 and Intel 64/AMD64 processor platforms,
you can skip this section. These platforms have the strongest memory consistency
models.

Re[3]: Неблокируемая очередь сообщений для двух потоков
От: K13 http://akvis.com
Дата: 20.12.07 04:43
Оценка: 1 (1)
W>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.
W>Из Intell TBB книжки:

Some processor architectures, such as the Intel Itanium, IBM POWER and PowerPC,
and Alpha processors, have weak memory consistency, in which memory operations
on different addresses may be reordered by the hardware for the sake of efficiency.
[skip]
If you are programming only IA-32 and Intel 64/AMD64 processor platforms,
you can skip this section. These platforms have the strongest memory consistency
models.


Лучше всего обратиться в http://groups.google.com/group/comp.programming.threads?hl=en
Там все выскажут весьма обоснованно.
Причем насколько я помню, операции чтения процессор все-таки имеет право reorder даже на x86, но в каких случаях -- лучше к специалистам в этом вопросе.
Re[4]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 06:53
Оценка:
Здравствуйте, K13, Вы писали:

W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.


Самая большая проблема не в этом а:
1. решение исключительно для одного читателя-писателя
2. писатель должен ждать! читателя
---
С уважением,
Сергей Мухин
Re[5]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 20.12.07 07:38
Оценка:
Сергей Мухин wrote:
> W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64
> такой проблемы нет.
>
> Самая большая проблема не в этом а:
> 1. решение исключительно для одного читателя-писателя
> 2. писатель должен ждать! читателя
Со второй легко справиться в этом-же стиле — достаточно установить
признак ушедшего писателя, и проверить его из читателя. И с
переупорядочиванием операций, на первый взгляд, тоже можно —
устанавливая барьер перед изменением isTempQueueSet (например, используя
аналог InterlockedExchange для ее установки и сброса).
Posted via RSDN NNTP Server 2.1 beta
Re[6]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 07:45
Оценка:
Здравствуйте, hexis, Вы писали:

H>Сергей Мухин wrote:

>> W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64
>> такой проблемы нет.
>>
>> Самая большая проблема не в этом а:
>> 1. решение исключительно для одного читателя-писателя
>> 2. писатель должен ждать! читателя
H>Со второй легко справиться в этом-же стиле — достаточно установить
H>признак ушедшего писателя, и проверить его из читателя. И с

а если он не ушел, а просто задумался?

Не получается ли, что с такими ограничениями, надо еще делать какие-то странные действия, что бы получить спорный выигрыш?

H>переупорядочиванием операций, на первый взгляд, тоже можно -

H>устанавливая барьер перед изменением isTempQueueSet (например, используя
H>аналог InterlockedExchange для ее установки и сброса).

в общем очередной кривой велосипед.
---
С уважением,
Сергей Мухин
Re[7]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 20.12.07 08:39
Оценка:
Сергей Мухин wrote:
>
> H>Сергей Мухин wrote:
>>> W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64
>>> такой проблемы нет.
>>>
>>> Самая большая проблема не в этом а:
>>> 1. решение исключительно для одного читателя-писателя
>>> 2. писатель должен ждать! читателя
> H>Со второй легко справиться в этом-же стиле — достаточно установить
> H>признак ушедшего писателя, и проверить его из читателя. И с
>
> а если он не ушел, а просто задумался?

Ну, тут есть и другие проблемы — например, будет проблема, если в
среднем пишется быстрее, чем читается. Просто для универсальных проблем
существуют универсальные решения. Для частных — частные. Можно выбирать
под конкретную задачу.

>

> Не получается ли, что с такими ограничениями, надо еще делать какие-то
> странные действия, что бы получить спорный выигрыш?

В последнее время lock-free и wait-free структуры стали очень
популярными. И в принципе, встречаются задачи, где их использование
весьма желательно, или вообще — единственный возможный вариант. Например
— hard real time или взаимодействие с обработчиком прерываний. По
скорости — я не думаю, что реализация очереди на блокировках будет
существенно медленнее. Если вообще будет. Но это при на нормальной
реализации потоков. Скажем, в случае с linux, собранным с linuxthreads
(жуткий тормоз в сравнении с NPTL), будет явное преимущество по скорости.
Posted via RSDN NNTP Server 2.1 beta
Re[4]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 20.12.07 08:43
Оценка:
Здравствуйте, K13, Вы писали:

K13>Лучше всего обратиться в http://groups.google.com/group/comp.programming.threads?hl=en

K13>Там все выскажут весьма обоснованно.
K13>Причем насколько я помню, операции чтения процессор все-таки имеет право reorder даже на x86, но в каких случаях -- лучше к специалистам в этом вопросе.

Спасибо за ссылку. Закину туда вопрос тоже.
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 11:16
Оценка:
Здравствуйте, What, Вы писали:

W>Здравствуйте, Pzz, Вы писали:


Pzz>>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

W>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.

Я тоже так раньше думал, пока сам не нарвался. У меня, к счастию, не было Вашей уверенности, поэтому мне хватило часа, чтобы локализовать проблему, и полдня, чтобы сообразить, как переделать виноватое место. Подумайте на досуге, сколько времени это могло бы занять у Вас.
Re[8]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 11:24
Оценка:
Здравствуйте, hexis, Вы писали:

H>По

H>скорости — я не думаю, что реализация очереди на блокировках будет
H>существенно медленнее. Если вообще будет.


На однопроцессорной машине она может быть лучше более чем на порядок. На многопроцессорной машине выигрыш будет ещё существеннее + иметь последствия не только на голую скорость, но и на масштабиремость.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[9]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 11:31
Оценка:
Здравствуйте, remark, Вы писали:

H>>По

H>>скорости — я не думаю, что реализация очереди на блокировках будет
H>>существенно медленнее. Если вообще будет.


R>На однопроцессорной машине она может быть лучше более чем на порядок. На многопроцессорной машине выигрыш будет ещё существеннее + иметь последствия не только на голую скорость, но и на масштабиремость.



кто она? Кто выиграет то?
---
С уважением,
Сергей Мухин
Re[10]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 11:50
Оценка:
Здравствуйте, Сергей Мухин, Вы писали:

СМ>Здравствуйте, remark, Вы писали:


H>>>По

H>>>скорости — я не думаю, что реализация очереди на блокировках будет
H>>>существенно медленнее. Если вообще будет.


R>>На однопроцессорной машине она может быть лучше более чем на порядок. На многопроцессорной машине выигрыш будет ещё существеннее + иметь последствия не только на голую скорость, но и на масштабиремость.



СМ>кто она? Кто выиграет то?



Очередь с использованием только неблокирующих примитивов (InterlockedXXX) на многопроцессорной/многоядерной машине может дать выигрыш на порядки по сравнению с очередью на мьютексе. Это можно частично вылечить, если использовать спин-мьютекс с активным ожиданием и с backoff'ом в блокировку, который (мьютекс, а не backoff) вызывает только одну InterlockedXXX операцию при захвате мьютекса.


Очередь, типа предложенной, которая использует только голые сохранения и загрузки и дружественна расслабленной модели памяти современных аппаратных платформ, может дать выигрышь ещё на порядок по сравнению с очередью, использующей InterlockedXXX.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[11]: Неблокируемая очередь сообщений для двух потоков
От: Сергей Мухин Россия  
Дата: 20.12.07 12:05
Оценка:
Здравствуйте, remark, Вы писали:


R>Очередь с использованием только неблокирующих примитивов (InterlockedXXX) на многопроцессорной/многоядерной машине может дать выигрыш на порядки по сравнению с очередью на мьютексе. Это можно частично вылечить, если использовать спин-мьютекс с активным ожиданием и с backoff'ом в блокировку, который (мьютекс, а не backoff) вызывает только одну InterlockedXXX операцию при захвате мьютекса.



R>Очередь, типа предложенной, которая использует только голые сохранения и загрузки и дружественна расслабленной модели памяти современных аппаратных платформ, может дать выигрышь ещё на порядок по сравнению с очередью, использующей InterlockedXXX.



спасибо.

это, как я понимаю, если не учитывать ограниченность предложенной реализации.

т.е. мы сравниваем ограниченную по числу читателей-писателей, по алгоритму использования, при идеальной (для данной реализации) нагрузке с общем решением. Тогда будем иметь вышеприведённые выгоды по скорости. т.е. может использоваться автором в ограниченных задачах.

если перевести: велосипед на одном квадратном колесе ездит по одной тропинке быстро.
---
С уважением,
Сергей Мухин
Re: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 12:10
Оценка: 1 (1) +1
Здравствуйте, HaronK, Вы писали:

HK>Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.

HK>После рассмотрения существующих реализаций, решил попробовать написать свою реализацию.
HK>Несколько дней ушло на обдумывание и написание пробного варианта.


В целом идея здравая и в правильном направлении.

По дизайну. Что касается Flush() и isTempQueueSet это, как уже заметил Сергей Мухин, не очень "красивое" решение. Во-первых, это усложняет использование для производителя, но это не очень серьёзная проблема — опустим её. Во-вторых, даже если ты введёшь некий флаг, что производитель "ушёл", поток производителя может быть заблокирован на неопределённое время в произвольном месте, т.е. он может не успеть вызвать Flush() или установить флаг. Тогда в очереди зависнут на неопределенное время часть элементов, которые как бы в очереди, но тем не менее не доступны для потребителя. Ну и в третьих, без этого можно обойтись без дополнительной платы в рантайм — об этом ниже

Далее, если ставить задачу добиться высокой производительности, элементы контейнера очень желательно сделать интрузивными. Т.е. пользователь контейнера будет обязан добавить в свой элемент указатель next. Тогда можно убрать new/delete в самой очереди. Ну и вобщем-то есть значительная вероятность, что new/delete блокирующие. А строить неблокирующий примитив из блокирующих бессмысленно.

Не знаю стоит ли у тебя задача делать блокировку на Dequeue(), т.е. что бы она всегда возврашала элемент, а если его нет, то блокировалась внутри. Добавление такой блокировки поверх такой очереди — задача далеко не тривиальная. Если такой задачи нет, то всё хорошо. Если есть, то смотри ниже.

Что касается реализации, это всё сильно не правильно. Нужно добавить volatile для подавления переупорядочивания компилятора, надо добавить барьеры памяти, для обеспечения корректного порядка операций. Надо убрать промежуточную очередь и очередь потребителя — это всё надо слить в одну очередь. Надо убрать isTempQueueSet — от него только одни проблемы.

Пример хорошей и работающей очереди смотри в библиотеке appcore:
http://appcore.home.comcast.net/~appcore/

Конкретно вот сама очередь ac_queue_spsc:
http://appcore.home.comcast.net/~appcore/appcore/include/ac_queue_spsc_h.html
http://appcore.home.comcast.net/~appcore/appcore/src/ac_queue_spsc_c.html

Если тебе нужно блокирование при пустой очереди, то смотри компонент ac_eventcount_algo1:
http://appcore.home.comcast.net/~appcore/appcore/include/ac_eventcount_algo1_h.html
http://appcore.home.comcast.net/~appcore/appcore/src/ac_eventcount_algo1_c.html
если блокирование не нужно, то просто удали вызовы ac_eventcount_algo1 из очереди.

Библиотека написана в hardcore C стиле, но переписать её на C++ достаточно не сложно. Однако при этом советую хорошо разобраться в принципе работы. Там учтены все моменты касательно переупорядочивания компилятором, порядка видимости изменений и дружественности к разделяемым данным в кэше.

з.ы. Библиотеку разрабатывает Chris Thomasson, который скорее всего уже пишет тебе ответ в comp.programming.threads, и скорее всего сошлёт на эту же реализацию Готов спорить
з.з.ы. Там у него есть небольшая ошибка в том, как соединены компоненты ac_queue_spsc и ac_eventcount_algo1. Если будешь использовать или разбираться в ac_eventcount_algo1, то скажи — я покажу ошибку. Или смотри здесь:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/00cf4a952ff5af7d
там я указал на ошибку.


1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[4]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 14:00
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>Здравствуйте, What, Вы писали:


W>>Здравствуйте, Pzz, Вы писали:


Pzz>>>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

W>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.

Pzz>Я тоже так раньше думал, пока сам не нарвался. У меня, к счастию, не было Вашей уверенности, поэтому мне хватило часа, чтобы локализовать проблему, и полдня, чтобы сообразить, как переделать виноватое место. Подумайте на досуге, сколько времени это могло бы занять у Вас.

Я нисколько не пропагандирую использование volatile и не считаю это хорошей практикой. Действительно, существует проблема с необходимостью использования барьеров на не x86/Intel64/AMD64 платформах.
Однако наличие ошибки где-то у Вас в коде вряд ли является убедительным опровержением того, что x86 & co используют strict memory model (это, кстати, можно погуглить).
неплохой пост про volatile, etc
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 20.12.07 14:06
Оценка:
Здравствуйте, remark, Вы писали:

R>По дизайну. Что касается Flush() и isTempQueueSet это, как уже заметил Сергей Мухин, не очень "красивое" решение. Во-первых, это усложняет использование для производителя, но это не очень серьёзная проблема — опустим её. Во-вторых, даже если ты введёшь некий флаг, что производитель "ушёл", поток производителя может быть заблокирован на неопределённое время в произвольном месте, т.е. он может не успеть вызвать Flush() или установить флаг. Тогда в очереди зависнут на неопределенное время часть элементов, которые как бы в очереди, но тем не менее не доступны для потребителя. Ну и в третьих, без этого можно обойтись без дополнительной платы в рантайм — об этом ниже


Да, то что производитель будет заблокирован и "хвост" очереди недоступен это не есть хорошо. Но все же это уже больше вопрос архитектуры приложения. Если производитель блокируется, не выполнив положенных действий, то это скорее проблема производителя и программиста, а не очереди.
Возможно моя реализация и "некрасивая", но я пытался, чтоб алгоритм работал быстро, а не "красиво".

R>Далее, если ставить задачу добиться высокой производительности, элементы контейнера очень желательно сделать интрузивными. Т.е. пользователь контейнера будет обязан добавить в свой элемент указатель next. Тогда можно убрать new/delete в самой очереди. Ну и вобщем-то есть значительная вероятность, что new/delete блокирующие. А строить неблокирующий примитив из блокирующих бессмысленно.


Это хорошая идея. Я в начале тоже планировал возложить создание/удаление, на потоки, но тогда я реализовал эту процедуру некорректно и отказался от нее.
Реализую твою версию. Спасибо.

R>Не знаю стоит ли у тебя задача делать блокировку на Dequeue(), т.е. что бы она всегда возврашала элемент, а если его нет, то блокировалась внутри. Добавление такой блокировки поверх такой очереди — задача далеко не тривиальная. Если такой задачи нет, то всё хорошо. Если есть, то смотри ниже.


Нет, у меня читающий поток не требует наличия данных в очереди. Если их нет, то выполняются остальные задачи.

R>Что касается реализации, это всё сильно не правильно. Нужно добавить volatile для подавления переупорядочивания компилятора, надо добавить барьеры памяти, для обеспечения корректного порядка операций. Надо убрать промежуточную очередь и очередь потребителя — это всё надо слить в одну очередь. Надо убрать isTempQueueSet — от него только одни проблемы.


volatile для каких переменных? Вершин цепочек?
Барьеры добавлю.
Но вообще, если поубирать указатели и флаг, как ты сказал, то это уже другой алгоритм получается.

R>Пример хорошей и работающей очереди смотри в библиотеке appcore:

...

Гляну. Правда судя по главной странице, эта библиотека не обновлялась еще с февраля 2005.
Re[5]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 14:11
Оценка:
Здравствуйте, What, Вы писали:

W>Здравствуйте, Pzz, Вы писали:


Pzz>>Здравствуйте, What, Вы писали:


W>>>Здравствуйте, Pzz, Вы писали:


Pzz>>>>Но есть и более тонкая проблема. Процессор тоже может менять порядок доступа к памяти. Это заметно только если потоки бегут на разных процессорах в многопроцессорной системе (процессор с hyperthreading'ом ведет себя в этом отношении как 2 отдельных процессора). Проявляться это будет в том, что Ваша програмка будет изредка сбоить, причем эффект будет пропадать (или усиливаться, в зависимости от Вашего везения) под отладчиком, при вставлении отладочной печати и т.п. Лечится это с помощью memory barrier'ов, ищите подробности в интернете, мне на эту тему лень лекцию читать.

W>>>Это верно для всяких там итаниумов и прочих, а на x86 и Intel64/AMD64 такой проблемы нет.

Pzz>>Я тоже так раньше думал, пока сам не нарвался. У меня, к счастию, не было Вашей уверенности, поэтому мне хватило часа, чтобы локализовать проблему, и полдня, чтобы сообразить, как переделать виноватое место. Подумайте на досуге, сколько времени это могло бы занять у Вас.


Я нисколько не пропагандирую использование volatile и не считаю это хорошей практикой. Действительно, существует проблема с необходимостью использования барьеров на не x86/Intel64/AMD64 платформах.


Это полный бред, что на x86 не нужно использовать барьеры памяти при написании примитивов синхронизации. Дабы не ходить вокруг, да около:
Intel® 64 Architecture Memory Ordering White Paper


W>Однако наличие ошибки где-то у Вас в коде вряд ли является убедительным опровержением того, что x86 & co используют strict memory model (это, кстати, можно погуглить).

W>неплохой пост про volatile, etc


Хотя бы прочитай о чём там написано.
А написано там про язык C#. А ключевое слово volatile в C# как раз подразумевает генерацию барьера памяти. Именно потому, что барьеры памяти на x86 нужны.
Это всё в отличие от С++, где volatile директива исключительно для компилятора.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[5]: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 14:52
Оценка:
Здравствуйте, What, Вы писали:

W>Однако наличие ошибки где-то у Вас в коде вряд ли является убедительным опровержением того, что x86 & co используют strict memory model (это, кстати, можно погуглить).


Довольно оскорбительная точка зрения

Я вообще человек довольно дотошный, и загадочные явления изучаю до того момента, когда я уверен, что докопался до источника проблемы. "Попробовали это, и все заработало" — это не мой метод. В данном случая я свой код отладочными печатями поймал за тем, что, грубо говоря, последовательность действий типа
i = 1; j = 2;
выглядит совершенно другой с соседнего процессора. Т.е., в j уже 2, а в i вовсе не 1.

Вы можете и дальше продолжать верить в свои заблуждения. Беда только в том, что поймать такую ошибку может занять у Вас от недели до бесконечности. Никакие отладчики Вам в этом не помогут. Могу лишь пожелать Вам удачи в этом деле.

P.S. Забыл сказать, Интеловскую документацию по x86 по этому вопросу я тоже, разумеется, изучил. Позиция Интела совпадает с моей. Грубо говоря, она сводится к тому, что memory ordering не виден, пока Вы остаетесь на одном процессоре, но с другого процессора он может выглядеть по-другому, если не предпринимать специальных усилий.
Re[6]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 15:31
Оценка:
Здравствуйте, remark, Вы писали:

R>Это полный бред, что на x86 не нужно использовать барьеры памяти при написании примитивов синхронизации.

В x86 в некоторых случаях не нужно использовать барьеры памяти там, где они нужны для других архитектур.

R>Дабы не ходить вокруг, да около:

R>Intel® 64 Architecture Memory Ordering White Paper
Оттуда:

1. Loads are not reordered with other loads.
2. Stores are not reordered with other stores.
3. Stores are not reordered with older loads.
4. Loads may be reordered with older stores to different locations but not with older
stores to the same location.

Вовзращаясь к очереди автора топика, где там нужны memory barriers, исходя из спецификации выше? (Я согласен, что надо подобавлять volatile).

W>>неплохой пост про volatile, etc


R>Хотя бы прочитай о чём там написано.

Естественно, читал.
R>А написано там про язык C#. А ключевое слово volatile в C# как раз подразумевает генерацию барьера памяти. Именно потому, что барьеры памяти на x86 нужны.
R>Это всё в отличие от С++, где volatile директива исключительно для компилятора.
Там разбирается достаточно простой пример проблемы с переупорядочиванием инструкций, а по ссылкам есть Java и C++ memory model (то, что ты написал, что volatile не гарантирует барьер), а также интересные комментарии про барьеры в .net:

THE X86 MODEL.

The X86 has a relatively strong model. It states that all memory writes
can not pass one another. Bascially every write is a WriteMemoryBarrier.
What that means is that on the X86 the LazyInit example will never display
a problem. In particular on the X86 the
System.Threading.Thread.WriteMemoryBarrier() API is a no-op. On other
processors however it is important to have the barrier there.

CAVEATES for V1

Unfortunately, the MemoryBarrier APIs do not exist in the V1 product (they
were not a priority since V1 only supports the X86 and as we have seen,
code will work properly on the X86 without the MemoryBarrier calls.
These APIs have already been added what will be the next version of the
platform (to ship with Windows XP server). If you are concerned about
this issue, I would recommend you add the MemoryBarrier calls in the
correct places, and then comment them out until such time as you upgrade
to the next version of DOTNET.

Re[7]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 16:00
Оценка:
Здравствуйте, What, Вы писали:

W>Здравствуйте, remark, Вы писали:


R>>Это полный бред, что на x86 не нужно использовать барьеры памяти при написании примитивов синхронизации.


W>В x86 в некоторых случаях не нужно использовать барьеры памяти там, где они нужны для других архитектур.



С этим согласен. Но эта фраза не несёт практически никакой информации. Во-первых, потому что она справедлива практически для любой архитектуры. Во-вторых, она не несёт никакой информации, которую можно использовать на практике.
Тем не менее фраза "... на x86 и Intel64/AMD64 такой проблемы нет" не верна.


R>>Дабы не ходить вокруг, да около:

R>>Intel® 64 Architecture Memory Ordering White Paper
W>Оттуда:
W>

W>1. Loads are not reordered with other loads.
W>2. Stores are not reordered with other stores.
W>3. Stores are not reordered with older loads.
W>4. Loads may be reordered with older stores to different locations but not with older
W>stores to the same location.

W>Вовзращаясь к очереди автора топика, где там нужны memory barriers, исходя из спецификации выше? (Я согласен, что надо подобавлять volatile).

Это не совсем точно. Документ построен в виде "здесь некоторые правила. А здесь случаи, когда предыдущие правила не работают. А здесь оставшиеся правила". Смотри раздел 2.4 — он вносит некоторые усложнения.


W>>>неплохой пост про volatile, etc


R>>Хотя бы прочитай о чём там написано.

W>Естественно, читал.
R>>А написано там про язык C#. А ключевое слово volatile в C# как раз подразумевает генерацию барьера памяти. Именно потому, что барьеры памяти на x86 нужны.
R>>Это всё в отличие от С++, где volatile директива исключительно для компилятора.

W>Там разбирается достаточно простой пример проблемы с переупорядочиванием инструкций, а по ссылкам есть Java и C++ memory model (то, что ты написал, что volatile не гарантирует барьер), а также интересные комментарии про барьеры в .net:



Интересно, откуда они взяли модель памяти для С++? Её же ещё нет...



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[12]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 16:14
Оценка:
Здравствуйте, Сергей Мухин, Вы писали:

СМ>Здравствуйте, remark, Вы писали:


R>>Очередь с использованием только неблокирующих примитивов (InterlockedXXX) на многопроцессорной/многоядерной машине может дать выигрыш на порядки по сравнению с очередью на мьютексе. Это можно частично вылечить, если использовать спин-мьютекс с активным ожиданием и с backoff'ом в блокировку, который (мьютекс, а не backoff) вызывает только одну InterlockedXXX операцию при захвате мьютекса.


R>>Очередь, типа предложенной, которая использует только голые сохранения и загрузки и дружественна расслабленной модели памяти современных аппаратных платформ, может дать выигрышь ещё на порядок по сравнению с очередью, использующей InterlockedXXX.


СМ>спасибо.


СМ>это, как я понимаю, если не учитывать ограниченность предложенной реализации.


СМ>т.е. мы сравниваем ограниченную по числу читателей-писателей, по алгоритму использования, при идеальной (для данной реализации) нагрузке с общем решением. Тогда будем иметь вышеприведённые выгоды по скорости. т.е. может использоваться автором в ограниченных задачах.


СМ>если перевести: велосипед на одном квадратном колесе ездит по одной тропинке быстро.



Проблема в том, что если ты используешь подход основанный на мьютексах, то у тебя очередь_много_производителей_много_потребителей == очередь_много_производителей_один_потребитель == очередь_один_производитель_много_потребителей == очередь_один_производитель_один_потребитель.
Т.е. если даже тебя бы устроила очередь_один_производитель_один_потребитель ты обязан иметь те же издержки, что и для очередь_много_производителей_много_потребителей.
Если ты используешь подход основанный не на мьютексах, то ты можешь взять специализированную реализацию для конкретной цели.
Т.е. фактически это сравнение можно рассматривать как сравнение очередь_один_производитель_один_потребитель, основанная на блокировках, против очередь_один_производитель_один_потребитель, основанная на неблокирующем подходе.


Второй момент, что очередь_один_производитель_один_потребитель значительно интереснее, чем кажется на первый взгляд. Во-первых, она имеет на порядок меньшие издержки и дружественность к SMP/multicore, чем все остальные типы очередей. Во-вторых, на основе этой очереди можно построить общее решение. Например, можно сделать полносвязанную систему из потоков, связанную такими очередями, где каждый поток может послать сообщение каждому. Либо можно построить систему, в которой один или несколько потоков выступают в роли посредников-маршрутизаторов, рядовые потоки связаны очередями с этимим посредниками и через них посылают/принимают сообщения от всех остальных потоков.
Такая система получается врожденно распределенной и как следствие — масштабируемой. Т.е. конкуренция на никакой очереди не растёт всё больше и больше с ростом числа процессоров/ядер.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[8]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 16:19
Оценка:
Здравствуйте, remark, Вы писали:

W>>В x86 в некоторых случаях не нужно использовать барьеры памяти там, где они нужны для других архитектур.


R>С этим согласен. Но эта фраза не несёт практически никакой информации. Во-первых, потому что она справедлива практически для любой архитектуры. Во-вторых, она не несёт никакой информации, которую можно использовать на практике.

R>Тем не менее фраза "... на x86 и Intel64/AMD64 такой проблемы нет" не верна.
Ок, переформулируем. В коде, предложенном автором топика, я не вижу проблемы с барьерами памяти (для x86 & co) и, соответственно, необходиости их использования.

W>>Вовзращаясь к очереди автора топика, где там нужны memory barriers, исходя из спецификации выше? (Я согласен, что надо подобавлять volatile).


R>Это не совсем точно. Документ построен в виде "здесь некоторые правила. А здесь случаи, когда предыдущие правила не работают. А здесь оставшиеся правила". Смотри раздел 2.4 — он вносит некоторые усложнения.

Да, вносит. Но всё равно, где нужны memory barriers в коде автора топика?
Re[9]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 20.12.07 16:36
Оценка:
Здравствуйте, What, Вы писали:

R>>С этим согласен. Но эта фраза не несёт практически никакой информации. Во-первых, потому что она справедлива практически для любой архитектуры. Во-вторых, она не несёт никакой информации, которую можно использовать на практике.

R>>Тем не менее фраза "... на x86 и Intel64/AMD64 такой проблемы нет" не верна.

W>Ок, переформулируем. В коде, предложенном автором топика, я не вижу проблемы с барьерами памяти (для x86 & co) и, соответственно, необходиости их использования.



В коде автора конкретно на текущей модели памяти x86 барьеры памяти не нужны.
Если, конечно, он работает на типе памяти write-back, а не на write-combined

Но вот если надо будет к этой очереди прикрутить блокирующую dequeue(), которая блокируется пока не появяться новые элементы, то барьер памяти уже понадобится при каждом добавлении элемента.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[10]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 20.12.07 16:56
Оценка:
Здравствуйте, remark, Вы писали:

R>Но вот если надо будет к этой очереди прикрутить блокирующую dequeue(), которая блокируется пока не появяться новые элементы, то барьер памяти уже понадобится при каждом добавлении элемента.


Как я уже писал, мне блокировка не нужна, так что все должно работать.
Re[6]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 17:11
Оценка:
Здравствуйте, Pzz, Вы писали:

W>>Однако наличие ошибки где-то у Вас в коде вряд ли является убедительным опровержением того, что x86 & co используют strict memory model (это, кстати, можно погуглить).


Pzz>Довольно оскорбительная точка зрения

Ну Вы тоже за словом в карман не лезете.

Pzz>Я вообще человек довольно дотошный, и загадочные явления изучаю до того момента, когда я уверен, что докопался до источника проблемы. "Попробовали это, и все заработало" — это не мой метод.

Я ничуть не сомневаюсь в Вашей дотошности. Однако, я уверен в том, что бы обобщаете проблему.
Потому что:
Pzz>В данном случая я свой код отладочными печатями поймал за тем, что, грубо говоря, последовательность действий типа
i = 1; j = 2;
выглядит совершенно другой с соседнего процессора. Т.е., в j уже 2, а в i вовсе не 1.

Это ваше утверждение явно противоречит доке от Интел (спасибо remark'у):

2.1 Loads are not reordered with other loads and stores are not
reordered with other stores
Intel 64 memory ordering ensures that loads are seen in program order, and that stores are
seen in program order.
Processor 0                  Processor 1
mov [ _x], 1 // M1           mov r1,[_y] // M3
mov [ _y], 1 // M2           mov r2, [_x] // M4
Initially x == y == 0
r1 == 1 and r2 == 0 is not allowed


Pzz>Вы можете и дальше продолжать верить в свои заблуждения. Беда только в том, что поймать такую ошибку может занять у Вас от недели до бесконечности. Никакие отладчики Вам в этом не помогут. Могу лишь пожелать Вам удачи в этом деле.

Ну вот, Вы опять грубите. Думаю, пора сворачивать эту дискуссию.
Pzz>P.S. Забыл сказать, Интеловскую документацию по x86 по этому вопросу я тоже, разумеется, изучил. Позиция Интела совпадает с моей.
Всё таки она отличается от Вашей.
Pzz>Грубо говоря, она сводится к тому, что memory ordering не виден, пока Вы остаетесь на одном процессоре, но с другого процессора он может выглядеть по-другому, если не предпринимать специальных усилий.
Вот, я и говорю, что обобщаете
На самом деле, я тоже считаю, что не стоит полагаться на такие эзотерические вещи, как последовательность доступа к памяти в разных потоках, и что лучше использовать как можно более высокоуровневые средства параллельного выполнения задач, в крайнем случае использовать библиотечные атомарные операции и мьютексы.
Но, возвращаясь к Вашим комментариям автору топика, проблем с переупорядочиванием инструкция на x86 и Intel64/AMD64 в исходном коде автора я не вижу, хотя согласен, что в принципе, а абстрактном коде они могут возникнуть очень легко.
Re[7]: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 19:04
Оценка:
Здравствуйте, What, Вы писали:

W>Это ваше утверждение явно противоречит доке от Интел (спасибо remark'у):


Да, я вероятно несколько упростил. В той же доке от Intel в пункте 2.3 сказано:

[q]
Intel 64 memory ordering allows load instructions to be reordered with prior stores to a
different location. However, loads are not reordered with prior stores to the same location.
The first example in this section illustrates the case in which a load may be reordered with an
older store – i.e. if the store and load are to different non-overlapping locations.

Processor 0                    Processor 1
mov [ _x], 1  // M1            mov [ _y], 1 // M3
mov r1, [ _y] // M2            mov r2, [_x] // M4

Initially x == y == 0
r1 == 0 and r2 == 0 is allowed

[q]

Собственно, на что-то такое я и нарвался в реальном коде. Тоже попытался выпендриться, и сделать lock free в типичном случае

W>Но, возвращаясь к Вашим комментариям автору топика, проблем с переупорядочиванием инструкция на x86 и Intel64/AMD64 в исходном коде автора я не вижу, хотя согласен, что в принципе, а абстрактном коде они могут возникнуть очень легко.


Возможно вокруг именно добавления/удаления проблем нет — нет сил вычитывать, простите. Т.е., я готов допустить, что указатель на добавленный объект "читающий" поток получит правильный, и совместными усилиями очередь они не порушат. Однако из-за reordering'а load и store в different location есть вполне реальный шанс получить валидный указатель на объект, внутрь которого store еще не завершились.

Если бы очередь была защищена нормальным мутексом, это не было заметно, поскольку мутекс включает в себя memory barrier. InterlockedXXX на интеле тоже включает глобальный барьер, хотя микрософтовская документация этого не обещает (т.е., на других CPU это может быть и не так).
Re[8]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 19:32
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>Возможно вокруг именно добавления/удаления проблем нет — нет сил вычитывать, простите. Т.е., я готов допустить, что указатель на добавленный объект "читающий" поток получит правильный, и совместными усилиями очередь они не порушат. Однако из-за reordering'а load и store в different location есть вполне реальный шанс получить валидный указатель на объект, внутрь которого store еще не завершились.

Как раз пункт 2.1 (который я цитировал) также гарантирует, что объект будет валидным. Если сначала записать сам объект (первая переменная), а потом положить указатель на него в очередь (вторая переменная внутри очереди), то, в момент, когда второй поток прочитает новое значение второй переменной (достанет из очереди указатель) первая переменная (содержимое объекта) будет уже иметь валидное знаечение, потому что она была записана раньше, а операции записи в разные адреса из одного потока не переставляются в x86. Так что конкретно в том коде автора темы барьеры доступа к памяти не нужны.
Хотя Вы безусловно правы, что шаг влево или вправо в коде очереди может привести к очень трудно обнаруживаемой и отлаживаемой ошибке, поймать которую "может занять у меня от недели до бесконечности".

Pzz>Если бы очередь была защищена нормальным мутексом, это не было заметно, поскольку мутекс включает в себя memory barrier.

+1
Re: Неблокируемая очередь сообщений для двух потоков
От: vmoiseev Россия  
Дата: 20.12.07 19:35
Оценка:
Здравствуйте, HaronK, Вы писали:
HK> ...Недавно возникла задача написания очереди для передачи сообщений от одного потока другому.
Очередь на основе циклического массива Вам не подойдет?
Один указатель(index) для записи и один для чтения. max — максимальное значение индексов, если больше, index=0
Если индекс чтения=индексу записи, то очередь пуста.

С уважением, Владимир.
Re[9]: Неблокируемая очередь сообщений для двух потоков
От: Pzz Россия https://github.com/alexpevzner
Дата: 20.12.07 19:51
Оценка:
Здравствуйте, What, Вы писали:

W>Как раз пункт 2.1 (который я цитировал) также гарантирует, что объект будет валидным. Если сначала записать сам объект (первая переменная), а потом положить указатель на него в очередь (вторая переменная внутри очереди), то, в момент, когда второй поток прочитает новое значение второй переменной (достанет из очереди указатель) первая переменная (содержимое объекта) будет уже иметь валидное знаечение, потому что она была записана раньше, а операции записи в разные адреса из одного потока не переставляются в x86. Так что конкретно в том коде автора темы барьеры доступа к памяти не нужны.


При условии, что объект будет читаться в том же порядке, в котором он писался. Что является в нормальной жизни совершенно невыносимым требованием.

Я, кстати, читал не white paper, а другой интеловский дукомент, и поэтому у меня сложилось несколько другое (более пессемистическое) впечатление. Посмотрите пункт 7.2.2, там прям картинка нарисована про то, как writes с разных процессоров могут быть reordered. Следует учесть так же, что на SMP машинке поток вполне может перескочить на соседний процессор как раз между инструкциями, так что даже запись в одно и то же место может приехать out of order, если специально не позаботиться об обратном. Хотя, конечно, в процессе перескакивания на соседний процессор наверняка хоть один барьер да встретится, так что эта возможность скорее гипотетическая. Но я, знаете ли, в таких вопросах пессемист
Re[10]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 21:16
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>При условии, что объект будет читаться в том же порядке, в котором он писался. Что является в нормальной жизни совершенно невыносимым требованием.

??
Достаточно читать (в любом порядке) после того, как объект достали из кучи.

Pzz>Я, кстати, читал не white paper, а другой интеловский дукомент, и поэтому у меня сложилось несколько другое (более пессемистическое) впечатление. Посмотрите пункт 7.2.2, там прям картинка нарисована про то, как writes с разных процессоров могут быть reordered. Следует учесть так же, что на SMP машинке поток вполне может перескочить на соседний процессор как раз между инструкциями, так что даже запись в одно и то же место может приехать out of order, если специально не позаботиться об обратном. Хотя, конечно, в процессе перескакивания на соседний процессор наверняка хоть один барьер да встретится, так что эта возможность скорее гипотетическая. Но я, знаете ли, в таких вопросах пессемист

Ну это совсем уже пессемистично
Re[11]: Неблокируемая очередь сообщений для двух потоков
От: What Беларусь  
Дата: 20.12.07 21:20
Оценка:
Здравствуйте, What, Вы писали:

W>Достаточно читать (в любом порядке) после того, как объект достали из кучи.

В смысле из очереди
Re[9]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 21.12.07 07:58
Оценка:
remark wrote:
>
>
> H>По
> H>скорости — я не думаю, что реализация очереди на блокировках будет
> H>существенно медленнее. Если вообще будет.
>
>
> На однопроцессорной машине она может быть лучше более чем на порядок. На

Это при привой реализации потоков. При нормальной нет. Я вот не
поленился, попробовал простенькую реализацию очередей сделанную из тех
же соображений (правда получилось — один читатель, много писателей).
Результат — неблокирующая выигрывает процентов на 15-20 (в основном за
счет тормозов на InterlockedXXX операциях). И это вполне объяснимо — при
взаимодействии 1:1 в указанных условиях можно обойтись без фактических
блокировок. Вот код:

template <class TYPE>
class QueueMutex
{
public:
    QueueMutex(size_t heap_size);
    ~QueueMutex();

    void Enqueue(const TYPE& data);
    bool Dequeue(TYPE& data);

private:
    struct Element
    {
        TYPE data;
        Element* next;

        Element(const TYPE& _data) : data(_data), next(NULL) {}
    };

    Element * rqueue;
    volatile Element * whead, * wtail;
    Alloc<Element> heap;

    Mutex writer_cs;
};

template <class TYPE>
QueueMutex<TYPE>::QueueMutex(size_t heap_size)
: heap(heap_size)
{
    rqueue = NULL;
    whead = wtail = NULL;
}

template <class TYPE>
QueueMutex<TYPE>::~QueueMutex()
{
}

template <class TYPE>
void QueueMutex<TYPE>::Enqueue(const TYPE& data)
{
    Element * n = heap.alloc(data);

    writer_cs.lock();
    if (whead == NULL)
        whead = n;
    else
        wtail->next = n;
    writer_cs.unlock();
    // если писателей много это присвоение нужно внести в
    // критическую секцию
    wtail = n;
}

template <class TYPE>
bool QueueMutex<TYPE>::Dequeue(TYPE& data)
{
    Element * next = rqueue;
    if (next == NULL) {
        next = (Element *)whead;
        if (next == NULL)
            return false;
        writer_cs.lock();
        whead = NULL;
        writer_cs.unlock();
    }

    data = next->data;
    rqueue = next->next;
    heap.release(next);
    return true;
}


Из отличий — я эксперементировал с неблокирующим аллокатором (иначе
new/delete тоже занимаются сериализацией) и с разными мьютексами. По
порядку снижения производительности: spin lock, моя реализация
рекурсивного mutex с блокировками и win32 critical_section.

> многопроцессорной машине выигрыш будет ещё существеннее + иметь

> последствия не только на голую скорость, но и на масштабиремость.
>

С масштабируемостью, ИМХО, совсем не очевидно — реализация-то рассчитана
на взаимодействие 1:1. При таких условиях и блокирующие алгоритмы не
будут деградировать.
Posted via RSDN NNTP Server 2.1 beta
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 21.12.07 08:12
Оценка:
Здравствуйте, vmoiseev, Вы писали:

V>Очередь на основе циклического массива Вам не подойдет?

V>Один указатель(index) для записи и один для чтения. max — максимальное значение индексов, если больше, index=0
V>Если индекс чтения=индексу записи, то очередь пуста.

А для доступа к индексам блокировка не нужна?
Re: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 21.12.07 08:56
Оценка:
Переписал реализацию очереди с учетом предложений в топике.
Из изменений:
1. Убрал флаг. Теперь его роль исполняет указатель на промежуточную цепочку.
2. Убрал new/delete. Операции создания и удаления возлагаются на потоки.

Не знаю насколько замена флага на указатель безопасна, но в моих тестах этот код отрабатывал.
Барьеры пока не добавлял, поскольку нет единого мнения нужны ли они вообще.

Получилось меньше операций и по идее должно быть быстрее.
Не знаю правда как повлияет на быстродействие второй volatile.

template <class TYPE> class LockFreeQueue
{
public:
    /// constructor
    LockFreeQueue();
    /// destructor
    ~LockFreeQueue();

    // Writer method
    void Enqueue(TYPE* data);

    // Flush remained data. Writer method
    bool Flush();

    // Reader method
    bool Dequeue(volatile TYPE*& data);

private:
    volatile TYPE *readerTop;
    volatile TYPE *tempTop;
    TYPE *writerTop, *writerBottom;
};

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
LockFreeQueue<TYPE>::LockFreeQueue()
{
    readerTop = tempTop = writerTop = writerBottom = NULL;
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
LockFreeQueue<TYPE>::~LockFreeQueue()
{
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
void
LockFreeQueue<TYPE>::Enqueue(TYPE* data)
{
    //n_assert(data);

    data->next = NULL;

    if (writerTop)
    {
        writerBottom->next = data;
        writerBottom = data;
    }
    else
    {
        writerTop = writerBottom = data;
    }

    if (!tempTop)
    {
        tempTop = writerTop;
        writerTop = NULL;
    }
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
bool
LockFreeQueue<TYPE>::Flush()
{
    if (!writerTop) return true;

    if (!tempTop)
    {
        tempTop = writerTop;
        return true;
    }
    return false;
}

//------------------------------------------------------------------------------
/**
*/
template<class TYPE>
bool
LockFreeQueue<TYPE>::Dequeue(volatile TYPE*& data)
{
    if (!readerTop)
    {
        if (tempTop)
        {
            readerTop = tempTop;
            tempTop   = NULL;
        }
        else
        {
            return false;
        }
    }

    data = readerTop;
    readerTop = readerTop->next;

    return true;
}
Re[13]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 21.12.07 10:29
Оценка:
Здравствуйте, remark, Вы писали:

R>Второй момент, что очередь_один_производитель_один_потребитель значительно интереснее, чем кажется на первый взгляд. Во-первых, она имеет на порядок меньшие издержки и дружественность к SMP/multicore, чем все остальные типы очередей. Во-вторых, на основе этой очереди можно построить общее решение. Например, можно сделать полносвязанную систему из потоков, связанную такими очередями, где каждый поток может послать сообщение каждому. Либо можно построить систему, в которой один или несколько потоков выступают в роли посредников-маршрутизаторов, рядовые потоки связаны очередями с этимим посредниками и через них посылают/принимают сообщения от всех остальных потоков.

R>Такая система получается врожденно распределенной и как следствие — масштабируемой. Т.е. конкуренция на никакой очереди не растёт всё больше и больше с ростом числа процессоров/ядер.

Я тоже думал о реализации варианта многие ко многим,а вот идея маршрутизатора интересна. Сейчас думаю над ее реализацией.
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: vmoiseev Россия  
Дата: 21.12.07 14:26
Оценка:
Здравствуйте, HaronK, Вы писали:
HK>А для доступа к индексам блокировка не нужна?
Думаю, что нет. Каждый поток меняет только свой индекс.
С уважением, Владимир.
Re[10]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 21.12.07 15:56
Оценка:
Здравствуйте, Pzz, Вы писали:

Pzz>Здравствуйте, What, Вы писали:


W>>Как раз пункт 2.1 (который я цитировал) также гарантирует, что объект будет валидным. Если сначала записать сам объект (первая переменная), а потом положить указатель на него в очередь (вторая переменная внутри очереди), то, в момент, когда второй поток прочитает новое значение второй переменной (достанет из очереди указатель) первая переменная (содержимое объекта) будет уже иметь валидное знаечение, потому что она была записана раньше, а операции записи в разные адреса из одного потока не переставляются в x86. Так что конкретно в том коде автора темы барьеры доступа к памяти не нужны.


Pzz>При условии, что объект будет читаться в том же порядке, в котором он писался. Что является в нормальной жизни совершенно невыносимым требованием.



Нет. Достаточно что бы данные элемента читались после того, как будет считан из очереди сам указатель на эти данные.
Несмотря на всю абсурдность, были такие процессоры, которые могли это делать — читать данные по указателю до того как считан сам указатель


Pzz>Я, кстати, читал не white paper, а другой интеловский дукомент, и поэтому у меня сложилось несколько другое (более пессемистическое) впечатление. Посмотрите пункт 7.2.2, там прям картинка нарисована про то, как writes с разных процессоров могут быть reordered. Следует учесть так же, что на SMP машинке поток вполне может перескочить на соседний процессор как раз между инструкциями, так что даже запись в одно и то же место может приехать out of order, если специально не позаботиться об обратном. Хотя, конечно, в процессе перескакивания на соседний процессор наверняка хоть один барьер да встретится, так что эта возможность скорее гипотетическая. Но я, знаете ли, в таких вопросах пессемист



При миграции потока с одного процессора на другой *гарантированно* обязательно будет выполнен полный барьер памяти на том процессоре, с которого идёт миграция, и на том, на который идёт миграция. Плюс гарантируется, что любая запись с нового процессора будет видна не раньше любой записи со старого процессора.
Я думаю, что в POSIX это должно гарантироваться. В Win32 сложно сказать, где такую информацию можно искать, но уверен, что это в любом случае будет работать так и не сломается в будущем.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[10]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 21.12.07 16:05
Оценка:
Здравствуйте, hexis, Вы писали:

H>Это при привой реализации потоков. При нормальной нет. Я вот не

H>поленился, попробовал простенькую реализацию очередей сделанную из тех
H>же соображений (правда получилось — один читатель, много писателей).
H>Результат — неблокирующая выигрывает процентов на 15-20 (в основном за
H>счет тормозов на InterlockedXXX операциях). И это вполне объяснимо — при
H>взаимодействии 1:1 в указанных условиях можно обойтись без фактических
H>блокировок. Вот код:


Подожди-подожи. Какие InterlockedXXX, если мы говорим об очереди без InterlockedXXX?!
Если ты используешь InterlockedXXX, то это естественно по скорости будет одинакого, т.к. мьютекс == InterlockedXXX.



H>Из отличий — я эксперементировал с неблокирующим аллокатором (иначе

H>new/delete тоже занимаются сериализацией) и с разными мьютексами. По
H>порядку снижения производительности: spin lock, моя реализация
H>рекурсивного mutex с блокировками и win32 critical_section.


В реализации spin lock сколько операций InterlockedXXX на одну блокировку/разблокировку?


>> многопроцессорной машине выигрыш будет ещё существеннее + иметь

>> последствия не только на голую скорость, но и на масштабиремость.
>>

H>С масштабируемостью, ИМХО, совсем не очевидно — реализация-то рассчитана

H>на взаимодействие 1:1. При таких условиях и блокирующие алгоритмы не
H>будут деградировать.


Во-первых, на многопроцессорной машине, если у тебя мьютекс без спин-части, то существенно увеличивается вероятность блокирования.
Во-вторых, дело не только в увеличении конкуренции за один конкретный мьютекс, дело так же в увеличении давления на шину когерентности кэшей.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 21.12.07 16:10
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Переписал реализацию очереди с учетом предложений в топике.

HK>Из изменений:
HK>1. Убрал флаг. Теперь его роль исполняет указатель на промежуточную цепочку.
HK>2. Убрал new/delete. Операции создания и удаления возлагаются на потоки.

HK>Не знаю насколько замена флага на указатель безопасна, но в моих тестах этот код отрабатывал.


Это даже безопаснее, чем использование отдельного флага.

HK>Барьеры пока не добавлял, поскольку нет единого мнения нужны ли они вообще.


Конкретно в этой реализации на текущей модели памяти х86 барьеры памяти не нужны.


HK>Получилось меньше операций и по идее должно быть быстрее.


Мерить не пробовал?


HK>Не знаю правда как повлияет на быстродействие второй volatile.


Практически никак.

Только тут тебе volatile не поможет, т.к. пользовательские данные не volatile, следовательно обращения к ним могут перемещаться произвольно.
Тут тебе надо использовать _ReadWriteBarrier()



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[14]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 21.12.07 16:23
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Здравствуйте, remark, Вы писали:


R>>Второй момент, что очередь_один_производитель_один_потребитель значительно интереснее, чем кажется на первый взгляд. Во-первых, она имеет на порядок меньшие издержки и дружественность к SMP/multicore, чем все остальные типы очередей. Во-вторых, на основе этой очереди можно построить общее решение. Например, можно сделать полносвязанную систему из потоков, связанную такими очередями, где каждый поток может послать сообщение каждому. Либо можно построить систему, в которой один или несколько потоков выступают в роли посредников-маршрутизаторов, рядовые потоки связаны очередями с этимим посредниками и через них посылают/принимают сообщения от всех остальных потоков.

R>>Такая система получается врожденно распределенной и как следствие — масштабируемой. Т.е. конкуренция на никакой очереди не растёт всё больше и больше с ростом числа процессоров/ядер.

HK>Я тоже думал о реализации варианта многие ко многим,а вот идея маршрутизатора интересна. Сейчас думаю над ее реализацией.



Тут палка о двух концах.
В полносвязанной системе получается, что каждому потоку надо опрашивать N-1 очередей, и отправлять в N-1 очередей.
С маршрутизаторами (с одним) получается, что каждому потоку надо опрашивать и отправлять в 1 очередь. Но зато увеличивается латентность доставки, т.к. сообщение должно пройти через 2 очереди. Плюс у маршрутизатора получается достаточно большой объём дополнительной работы. Плюс один дополнительный процессор "трогает" память сообщения, т.е. возрастает нагрузка на шину когерентности кэшей.

Я лично, когда думал над этим вопросом, склонился к полносвязанной системе. По крайней мере, пока число процессоров меньше сотни. Когда число процессоров перевалит за сотню, то опрашивать более сотни очередей становится очень накладно. Но и маршрутизатор имхо тут будет не очень хорошо себя вести. Скорее всего лучше будет некое гибридное решение.
А вот, например, Chris Thomasson (который делает appcore и vZOOM) склоняется к системе с маршрутизаторами. Хотя видимо он думает о NUMA системах, где будет один маршрутизатор на NUMA-узел, который обслуживает только процессоры данного узла. В таком контексте идея с маршрутизаторами звучит более разумно.

А если не секрет, ты чем занимаешься? Каким-то конкретным проектом? Или так, для себя?



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 21.12.07 16:27
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Переписал реализацию очереди с учетом предложений в топике.

HK>Из изменений:
HK>1. Убрал флаг. Теперь его роль исполняет указатель на промежуточную цепочку.
HK>2. Убрал new/delete. Операции создания и удаления возлагаются на потоки.

HK>Не знаю насколько замена флага на указатель безопасна, но в моих тестах этот код отрабатывал.

HK>Барьеры пока не добавлял, поскольку нет единого мнения нужны ли они вообще.


А почему ты не хочешь совсем убрать промежуточную очередь? И возможность "зависания" элементов в очереди?




1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[2]: lock-free node allocator for fifo-queues
От: remark Россия http://www.1024cores.net/
Дата: 21.12.07 16:36
Оценка:
Здравствуйте, HaronK, Вы писали:


HK>2. Убрал new/delete. Операции создания и удаления возлагаются на потоки.



Возможно тебе будет интересен аллокатор, который я разработал специально для fifo-очередей:
lock-free node allocator for fifo-queues

Он работает по очень интересному принципу — фактически это кэш элементов, разделяемый между 2 потоками. При этом оба потока (производитель и потребитель) не используют никаких дорогих операций — мьютексов, InterlockedXXX, барьеров памяти и т.д.
Операция освобождения элемента сводится к одной записи в память (!)
Операция выделения элемента своидится к нескольким проверкам и нескольким записям в память.
Алокатор тоже можно сделать интрузивным, тогда стоимость добавления/изъятия из очереди практически стремится к нулю

И там же есть пример использования этого аллокатора с mpsc fifo очереди.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[11]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 21.12.07 22:29
Оценка:
remark wrote:
>
> Здравствуйте, hexis, Вы писали:
>
> H>Это при привой реализации потоков. При нормальной нет. Я вот не
> H>поленился, попробовал простенькую реализацию очередей сделанную из тех
> H>же соображений (правда получилось — один читатель, много писателей).
> H>Результат — неблокирующая выигрывает процентов на 15-20 (в основном за
> H>счет тормозов на InterlockedXXX операциях). И это вполне объяснимо — при
> H>взаимодействии 1:1 в указанных условиях можно обойтись без фактических
> H>блокировок. Вот код:
>
> Подожди-подожи. Какие InterlockedXXX, если мы говорим об очереди без
> InterlockedXXX?!

Они используются в моей реализации блокирующейся очереди. Все же должно
быть честно.

> Если ты используешь InterlockedXXX, то это естественно по скорости будет

> одинакого, т.к. мьютекс == InterlockedXXX.

Так эти мьютексы потенциально блокирующиеся (при неудаче — без spin
зависают на семафоре). Кстати, если заставить реализацию переключать
потоки так, чтобы постоянно появились ожидания (я сделал 1 раз из
десяти), добавив sleep сразу после lock, время работы становится хуже в
два раза. И большего различия мне добиться не удалось.
Использование стандартных win32 mutex/sem, приводит к тому, что
блокирующая очередь работает в пять раз медленнее. Но это совсем не порядки.
Все проверки на однопроцессорной машине.

>

> В реализации spin lock сколько операций InterlockedXXX на одну
> блокировку/разблокировку?

По необходимости. В лучшем случае — одна. В мьютексе, в лучшем случае —
тоже одна.

>

>>> многопроцессорной машине выигрыш будет ещё существеннее + иметь
>>> последствия не только на голую скорость, но и на масштабиремость.
>>>
>
> H>С масштабируемостью, ИМХО, совсем не очевидно — реализация-то рассчитана
> H>на взаимодействие 1:1. При таких условиях и блокирующие алгоритмы не
> H>будут деградировать.
>
>
> Во-первых, на многопроцессорной машине, если у тебя мьютекс без
> спин-части, то существенно увеличивается вероятность блокирования.

Это так, согласен. Точнее — больше шансов за то, что потоки будут
проходить по критической секции одновременно, за чем последует
переключение в режим ядра, что и будет основным тормозом — пока
переключился, пока опять квант выделят. В этом плане очень интересно
выглядят реализации потоков в userspace, так, что переключение в режим
ядра происходит буквально по необходимости — я когда-то
экспериментировал с такой реализаций под QNX 4 — прирост
производительности просто колоссальный по сравнению с потоками из ядра.

> Во-вторых, дело не только в увеличении конкуренции за один конкретный

> мьютекс, дело так же в увеличении давления на шину когерентности кэшей.
>

Есди я правильно тебя понимаю, то эта проблема в большей мере актуальна
для spin lock.
Posted via RSDN NNTP Server 2.1 beta
Re[12]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 22.12.07 11:21
Оценка:
Здравствуйте, hexis, Вы писали:

>> Подожди-подожи. Какие InterlockedXXX, если мы говорим об очереди без

>> InterlockedXXX?!

H>Они используются в моей реализации блокирующейся очереди. Все же должно

H>быть честно.


Опять не понятно. В примере на мьютексах, который ты привёл, — очередь не блокирующаяся. Зачем ты тогда делал на Interlocked блокирующуюся?


>> Если ты используешь InterlockedXXX, то это естественно по скорости будет

>> одинакого, т.к. мьютекс == InterlockedXXX.

H>Так эти мьютексы потенциально блокирующиеся (при неудаче — без spin

H>зависают на семафоре). Кстати, если заставить реализацию переключать
H>потоки так, чтобы постоянно появились ожидания (я сделал 1 раз из
H>десяти), добавив sleep сразу после lock, время работы становится хуже в
H>два раза. И большего различия мне добиться не удалось.
H>Использование стандартных win32 mutex/sem, приводит к тому, что
H>блокирующая очередь работает в пять раз медленнее. Но это совсем не порядки.
H>Все проверки на однопроцессорной машине.


Я и не говорил, что тут будет на порядки. Я говорил:
1. Между мьютексами и Interlocked на многопроцессорной машине будет на порядки
2. Между Interlocked и без Interlocked будет на порядки


>> В реализации spin lock сколько операций InterlockedXXX на одну

>> блокировку/разблокировку?

H>По необходимости. В лучшем случае — одна. В мьютексе, в лучшем случае -

H>тоже одна.


Ты придумал реализацию не спин мьютекса с одной Interlocked операцией?!
А ты можешь запостить. Насколько я заню, такого в мире ещё нет.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[12]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 22.12.07 11:32
Оценка:
Здравствуйте, hexis, Вы писали:

>>>> многопроцессорной машине выигрыш будет ещё существеннее + иметь

>>>> последствия не только на голую скорость, но и на масштабиремость.
>>>>
>>
>> H>С масштабируемостью, ИМХО, совсем не очевидно — реализация-то рассчитана
>> H>на взаимодействие 1:1. При таких условиях и блокирующие алгоритмы не
>> H>будут деградировать.
>>
>>
>> Во-первых, на многопроцессорной машине, если у тебя мьютекс без
>> спин-части, то существенно увеличивается вероятность блокирования.

H>Это так, согласен. Точнее — больше шансов за то, что потоки будут

H>проходить по критической секции одновременно, за чем последует
H>переключение в режим ядра, что и будет основным тормозом — пока
H>переключился, пока опять квант выделят. В этом плане очень интересно
H>выглядят реализации потоков в userspace, так, что переключение в режим
H>ядра происходит буквально по необходимости — я когда-то
H>экспериментировал с такой реализаций под QNX 4 — прирост
H>производительности просто колоссальный по сравнению с потоками из ядра.


Да, такие вещи есть. Но они так сказать очень частно применимы Т.е. надо делать обёртки для *всех* блокирующих вызовов. И использовать только их. Любой блокирующий вызов, если он всё таки будет, поломает систему. А если пользователю надо сделать блокирующий вызов из сторонней библиотеки...
Плюс надо оборачивать неблокирующие вызовы, что бы из них сделать видимость блокирующего...
Очень интересная реализация юзер-спейс кооперативного тридинга Capriccio:
http://capriccio.cs.berkeley.edu/
Фактически они позволяют применять модель "поток на соединение" и при этом получать производительность евент-дривен систем. Т.е. можно создавать сотни тысяч потоков, и все операции шедулинга выполняются за O(1).


>> Во-вторых, дело не только в увеличении конкуренции за один конкретный

>> мьютекс, дело так же в увеличении давления на шину когерентности кэшей.
>>

H>Есди я правильно тебя понимаю, то эта проблема в большей мере актуальна

H>для spin lock.


Нет, она актуальная для всего, что использует разделяемую память, когда один поток читает то, что записал другой. И особенно при использовании Interlocked операций.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[13]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 23.12.07 20:23
Оценка:
remark wrote:
>
> Опять не понятно. В примере на мьютексах, который ты привёл, — очередь
> не блокирующаяся. Зачем ты тогда делал на Interlocked блокирующуюся?

Не совсем так — можно сказать, она мало блокирующая, по сравнению с
типичной реализацией очередей — из-за своей специализации под конкретную
задачу. (NB: Что на самом деле — не так часто встречается. Возможно, это
и правильно: организовать взаимодействие потоков, без опыта — сложная
штука, и народ часто отскакивает на стандартные реализации, почерпнутые
из интернета)

Тут же в писателе требуется атомарность при выполнении операции "if
(whead != NULL) wtail->next = n;" Поэтому в читателе присвоение "whead =
NULL" и обложено блокировками, как это ни странно выглядит.

> Я и не говорил, что тут будет на порядки. Я говорил:

> 1. Между мьютексами и Interlocked на *многопроцессорной* машине будет на
> порядки
> 2. Между Interlocked и *без* Interlocked будет на порядки

Вообще, разговор начался с того, что я предположил, что различие между
wait-free и блокируюшей реализацией будет минимальным, на что ты ответил:

На однопроцессорной машине она может быть лучше более чем на порядок.
На многопроцессорной машине выигрыш будет ещё существеннее + иметь
последствия не только на голую скорость, но и на масштабиремость.


Многопроцессорной у меня под руками нет, а на однопроцессорной, мой опыт
говорит о другом — что различия в скорости сильно нивелируются
псевдопараллельностью исполнения — в спин блокировках, например,
практически нет смысла — лучше сразу переключить контекст на следующую
задачу. Правда тут есть одна особенность — планировщик винды отдает
предпочтение потокам, добровольно сократившим свой квант, в итоге этот
поток *быстрее* получает управление обратно. А вот на ожидание на
объектах синхронизации, это правило, похоже не распространяется, или это
время нивелируется тем, что обращение к объекту синхронизации для
ожидания, требует переключение в режим ядра. То есть присутствует одно
дополнительное переключение контекста. Хотя, зачем я ТЕБЕ об этом
рассказываю???

> Ты придумал реализацию *не* спин мьютекса с одной Interlocked операцией?!

> А ты можешь запостить. Насколько я заню, такого в мире ещё нет.

Думаю, ты просто неправильно меня понял, — я об оптимизации обычного
мьютекса, с отскоком для ожидания в операционку при неудаче. В типичных
ситуациях, когда конкуренция за ресурсы не столь велика, такой подход
позволяет существенно сократить количество обращений к ядру. Но, чтобы
недоразумений больше не возникало — вот текст:

class Mutex
{
public:

    Mutex();
    ~Mutex();

    bool try_lock();
    void lock();
    void unlock();

private:
    DWORD  owner;
    DWORD  count;
    HANDLE hwait;
    volatile LONG   avail;
};

Mutex::Mutex()
{
    owner = 0;
    count = 0;
    avail = 1;
    hwait = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
}

Mutex::~Mutex()
{
    CloseHandle(hwait);
}

bool Mutex::try_lock()
{
    DWORD cur = GetCurrentThreadId();

    if (owner != cur) {
        LONG p = CAS(&avail, 0, -1);
        if (p != 0)
            return false;
        owner = cur;
    }
    count++;
    return true;
}

void Mutex::lock()
{
    DWORD cur = GetCurrentThreadId();

    if (owner != cur) {
        LONG p = InterlockedDecrement(&avail);
        if (p < 0) {
            WaitForSingleObject(hwait, INFINITE);
        }
        owner = cur;
    }
    count++;
}

void Mutex::unlock()
{
    if (--count == 0) {
        owner = 0;
        if (InterlockedIncrement(&avail) < 1) {
            ReleaseSemaphore(hwait, 1, NULL);
        }
    }
}
Posted via RSDN NNTP Server 2.1 beta
Re[15]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 24.12.07 08:40
Оценка:
Здравствуйте, remark, Вы писали:

R>А если не секрет, ты чем занимаешься? Каким-то конкретным проектом? Или так, для себя?


Если по работе, то мы делаем систему статистического анализа для немцев.
А задача с очередью возникла из игрового проекта, что мы делаем с друзьями. Наверное можно было использовать и обычные очереди с блокировками, только хотелось сделать, что-то свое, красивое.
Re[16]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 24.12.07 08:53
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Здравствуйте, remark, Вы писали:


R>>А если не секрет, ты чем занимаешься? Каким-то конкретным проектом? Или так, для себя?


HK>Если по работе, то мы делаем систему статистического анализа для немцев.

HK>А задача с очередью возникла из игрового проекта, что мы делаем с друзьями. Наверное можно было использовать и обычные очереди с блокировками, только хотелось сделать, что-то свое, красивое.


Ок. Понятно.
Может тогда тебе будет интересно:
Multi-Threaded Challenges in the Game Space &mdash; a Conversation with Tom Leonard of Valve Fame
Tom Leonard раскрывает секреты, что в Valve's Source Engine они экстенсивно используют lock-free технологии для построения моделей мира, шедулера и т.д. Это, так сказать, их ставка и конкурентное приемущество в multicore будущем.
(и вообще сайт интересный — много статей и интервью)



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[14]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 24.12.07 09:00
Оценка:
Здравствуйте, hexis, Вы писали:

H>remark wrote:

>>
>> Опять не понятно. В примере на мьютексах, который ты привёл, — очередь
>> не блокирующаяся. Зачем ты тогда делал на Interlocked блокирующуюся?

H>Не совсем так — можно сказать, она мало блокирующая, по сравнению с

H>типичной реализацией очередей — из-за своей специализации под конкретную
H>задачу. (NB: Что на самом деле — не так часто встречается. Возможно, это
H>и правильно: организовать взаимодействие потоков, без опыта — сложная
H>штука, и народ часто отскакивает на стандартные реализации, почерпнутые
H>из интернета)

H>Тут же в писателе требуется атомарность при выполнении операции "if

H>(whead != NULL) wtail->next = n;" Поэтому в читателе присвоение "whead =
H>NULL" и обложено блокировками, как это ни странно выглядит.


Под блокирующей я имел в виду, что надо блокироваться, когда вызывается dequeue(), но очередь пуста.
Я запутался. Если блокироваться на dequeue() не надо, то зачем в тесте lock-free очереди, ты использовал Interlocked инструкции? Ведь вот же у тебя есть реализация очереди, в которой нет Interlocked.


>> Я и не говорил, что тут будет на порядки. Я говорил:

>> 1. Между мьютексами и Interlocked на *многопроцессорной* машине будет на
>> порядки
>> 2. Между Interlocked и *без* Interlocked будет на порядки

H>Вообще, разговор начался с того, что я предположил, что различие между

H>wait-free и блокируюшей реализацией будет минимальным, на что ты ответил:

H>

На однопроцессорной машине она может быть лучше более чем на порядок.
H>На многопроцессорной машине выигрыш будет ещё существеннее + иметь
H>последствия не только на голую скорость, но и на масштабиремость.



Ну так я имел в виду вот ту очередь, которую ты запостил в первом посте. А ты говоришь, что ты тестировал какую-то другую очередь, в которой всё таки были Interlocked инструкции.



H>Многопроцессорной у меня под руками нет, а на однопроцессорной, мой опыт

H>говорит о другом — что различия в скорости сильно нивелируются
H>псевдопараллельностью исполнения — в спин блокировках, например,
H>практически нет смысла — лучше сразу переключить контекст на следующую
H>задачу. Правда тут есть одна особенность — планировщик винды отдает
H>предпочтение потокам, добровольно сократившим свой квант, в итоге этот
H>поток *быстрее* получает управление обратно. А вот на ожидание на
H>объектах синхронизации, это правило, похоже не распространяется, или это
H>время нивелируется тем, что обращение к объекту синхронизации для
H>ожидания, требует переключение в режим ядра. То есть присутствует одно
H>дополнительное переключение контекста. Хотя, зачем я ТЕБЕ об этом
H>рассказываю???

>> Ты придумал реализацию *не* спин мьютекса с одной Interlocked операцией?!

>> А ты можешь запостить. Насколько я заню, такого в мире ещё нет.

H>Думаю, ты просто неправильно меня понял, — я об оптимизации обычного

H>мьютекса, с отскоком для ожидания в операционку при неудаче. В типичных
H>ситуациях, когда конкуренция за ресурсы не столь велика, такой подход
H>позволяет существенно сократить количество обращений к ядру. Но, чтобы
H>недоразумений больше не возникало — вот текст:



Тут на lock()/unlock() приходится 2 Interlocked операции (CAS в lock() и InterlockedIncrement в unlock()).
Понятно, это стандартный мьютекс.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 24.12.07 09:44
Оценка:
Здравствуйте, remark, Вы писали:

R>А почему ты не хочешь совсем убрать промежуточную очередь? И возможность "зависания" элементов в очереди?


Убирание "зависания" обойдется дополнительной проверкой. Но вообще то можно реализовать, как опцию через "define".

В пятницу вечером туго соображал, поэтому и не понял как убрать промежуточную очередь без применения блокировки )
Вот что сегодня получилось:

#define LOCK_FREE_QUEUE2_USE_FLUSH 1

template <class TYPE> class LockFreeQueue2
{
public:
    /// constructor
    LockFreeQueue2();
    /// destructor
    ~LockFreeQueue2();

    // Writer method
    void Enqueue(TYPE* data);

    // Reader method
    bool Dequeue(volatile TYPE*& data);

#if LOCK_FREE_QUEUE2_USE_FLUSH
    // Flush remained data. Writer method
    bool Flush();
#else
    // Inform that writer queue finished
    void SetWriterFinished();
#endif

private:
    volatile TYPE *readerTop;
    TYPE *writerTop, *writerBottom;

#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
    volatile bool isWriterFinished;
#endif
};

template<class TYPE>
LockFreeQueue2<TYPE>::LockFreeQueue2()
{
    readerTop = writerTop = writerBottom = NULL;

#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
    isWriterFinished = false;
#endif
}

template<class TYPE>
LockFreeQueue2<TYPE>::~LockFreeQueue2()
{
}

template<class TYPE>
void
LockFreeQueue2<TYPE>::Enqueue(TYPE* data)
{
    //n_assert(data);

    data->next = NULL;

    if (writerTop)
    {
        writerBottom->next = data;
        writerBottom = data;
    }
    else
    {
        writerTop = writerBottom = data;
    }

    if (!readerTop)
    {
        readerTop = writerTop;
        writerTop = NULL;
    }
}

template<class TYPE>
bool
LockFreeQueue2<TYPE>::Dequeue(volatile TYPE*& data)
{
    if (!readerTop)
    {
#if LOCK_FREE_QUEUE2_USE_FLUSH == 0
        if (isWriterFinished && writerTop)
        {
            readerTop = writerTop;
            writerTop = NULL;
        }
        else
#endif
        {
            return false;
        }
    }

    data = readerTop;
    readerTop = readerTop->next;

    return true;
}

#if LOCK_FREE_QUEUE2_USE_FLUSH
template<class TYPE>
bool
LockFreeQueue2<TYPE>::Flush()
{
    if (!writerTop) return true;

    if (!readerTop)
    {
        readerTop = writerTop;
        return true;
    }
    return false;
}
#else
template<class TYPE>
void
LockFreeQueue2<TYPE>::SetWriterFinished()
{
    isWriterFinished = true;
}
#endif
Re[15]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 24.12.07 18:25
Оценка:
remark wrote:
>
> H>remark wrote:
>>>
>>> Опять не понятно. В примере на мьютексах, который ты привёл, — очередь
>>> не блокирующаяся. Зачем ты тогда делал на Interlocked блокирующуюся?
>
> H>Не совсем так — можно сказать, она мало блокирующая, по сравнению с
> H>типичной реализацией очередей — из-за своей специализации под конкретную
> H>задачу. (NB: Что на самом деле — не так часто встречается. Возможно, это
> H>и правильно: организовать взаимодействие потоков, без опыта — сложная
> H>штука, и народ часто отскакивает на стандартные реализации, почерпнутые
> H>из интернета)
>
> H>Тут же в писателе требуется атомарность при выполнении операции "if
> H>(whead != NULL) wtail->next = n;" Поэтому в читателе присвоение "whead =
> H>NULL" и обложено блокировками, как это ни странно выглядит.
>
>
> Под блокирующей я имел в виду, что надо блокироваться, когда вызывается
> dequeue(), но очередь пуста.

Так я исходил из исходной постановки задачи: взаимодействие 1:1,
читающая сторона не блокируется при отсутствии сообщения. Получилось
чуть больше (1:*) — но, как получилось. А сравнивать две принципиально
разные задачи — нет смысла.

> Я запутался. Если блокироваться на dequeue() не надо, то зачем в тесте

> lock-free очереди, ты использовал Interlocked инструкции? Ведь вот же у
> тебя есть реализация очереди, в которой нет Interlocked.

Да, похоже, мы малость запутались. Сначала было мое мнение, что при
нормальной реализации потоков (то-есть примитивов синхронизации) не
будет принципиальных различий в скорости между реализацией этой задаче в
стиле lock-free и lock-based. Меня смутило твое возражение относительно
однопроцессных машин и я решил проверить — так ли это. Для этого я и
написал простую очередь, использующую mutex и, после того, поигрался с
различными вариантами реализации mutex — страндартный win32 mutex,
CRITICAL_SECTION, spin lock и тот, что запостил в прошлый раз — отсюда и
появились InterlockedXXX. Сама реализация очереди при этом не менялась.

С другой стороны, если ты говоришь о dequeue()-блокирующей очереди, то
там вступают в силу совсем другие соображения и будет совсем другая
реализация.

>

> Тут на lock()/unlock() приходится 2 Interlocked операции (CAS в lock() и
> InterlockedIncrement в unlock()).
> Понятно, это стандартный мьютекс.
>

Теперь понятно, что ты имел в виду. Что сделаешь — я редко занимаюсь
этим, и не всегда понимаю терминологию. Если конкуренция не очень
высока, можно предположить такой, достаточно очевидный вариант (fair,
время разблокирования пропорционально кол-ву ожидающих потоков):

class Mutex
{
public:
    Mutex();
    ~Mutex() {}

    void lock();
    bool try_lock();
    void unlock();

    static void attach_thread();
private:

    struct mqitem
    {
        volatile struct mqitem * next;
        HANDLE hwait;
    } e, *queue;

#ifdef RECURSIVE_MUTEX
    volatile DWORD owner;
    DWORD count;
#endif

    static _declspec(thread) mqitem * thread_item;
};

_declspec(thread) Mutex::mqitem * Mutex::thread_item = NULL;

void Mutex::attach_thread()
{
    if (thread_item == NULL) {
        thread_item = new mqitem;
        thread_item->hwait = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
    }
}

Mutex::Mutex()
{
    e.next = NULL;
    queue = &e;
#ifdef RECURSIVE_MUTEX
    owner = 0;
    count = 0;
#endif
}

void Mutex::lock()
{
    mqitem * p, * h;
    DWORD cur = GetCurrentThreadId();

#ifdef RECURSIVE_MUTEX
    if (owner != cur) {
#endif
        assert( thread_item != NULL );
        p = thread_item;

        do {
            h = (mqitem *)e.next;
            p->next = h;
        } while( _CAS_PTR((PVOID*)&e.next, (PVOID)h, (PVOID)p) != h );

        if (h != NULL)
            WaitForSingleObject(thread_item->hwait, INFINITE);

#ifdef RECURSIVE_MUTEX
#ifdef _DEBUG
        assert(CAS(&owner, (DWORD)NULL, cur) == NULL);
#else
        owner = cur;
#endif
    }
    count++;
#endif
}

bool Mutex::try_lock()
{
    mqitem * p;
    DWORD cur = GetCurrentThreadId();

#ifdef RECURSIVE_MUTEX
    if (owner != cur) {
#endif
        assert( thread_item != NULL );
        if (e.next != NULL)
            return false;

        p = thread_item;
        p->next = NULL;

        if ( _CAS_PTR((PVOID*)&e.next, (PVOID)NULL, (PVOID)p) != NULL )
            return false;

#ifdef RECURSIVE_MUTEX
#ifdef _DEBUG
        assert(CAS(&owner, (DWORD)NULL, cur) == NULL);
#else
        owner = cur;
#endif
    }
    count++;
#endif
    return true;
}

void Mutex::unlock()
{
    mqitem * o = thread_item;
    volatile mqitem * p = queue;

#ifdef RECURSIVE_MUTEX
    assert(owner == GetCurrentThreadId());
    if (--count == 0) {
#endif
        /* we'd better not to have high concurrency on this mutex */
        while(p->next != o)
            p = p->next;

#ifdef RECURSIVE_MUTEX
        owner = NULL;
#endif
        if (p != &e)
            ReleaseSemaphore(p->hwait, 1, NULL);
        p->next = NULL;
#ifdef RECURSIVE_MUTEX
    }
#endif
}
Posted via RSDN NNTP Server 2.1 beta
Re[16]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 25.12.07 15:38
Оценка:
Здравствуйте, hexis, Вы писали:


H>Да, похоже, мы малость запутались. Сначала было мое мнение, что при

H>нормальной реализации потоков (то-есть примитивов синхронизации) не
H>будет принципиальных различий в скорости между реализацией этой задаче в
H>стиле lock-free и lock-based. Меня смутило твое возражение относительно
H>однопроцессных машин и я решил проверить — так ли это. Для этого я и
H>написал простую очередь, использующую mutex и, после того, поигрался с
H>различными вариантами реализации mutex — страндартный win32 mutex,
H>CRITICAL_SECTION, spin lock и тот, что запостил в прошлый раз — отсюда и
H>появились InterlockedXXX. Сама реализация очереди при этом не менялась.


Хорошо. Т.е. ты сравнивал реализацию очереди, которую запостил в первом посте, с очередью, построенной на мьютексе (на различных реализациях). Я правильно понял?
И что, разница была всего 15%?
Сейчас займусь ясновидением: ты либо тестировал на Win2k, либо у тебя процессор AMD, либо и то и то?



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[16]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 25.12.07 16:11
Оценка:
hexis wrote:
>
> Если конкуренция не очень
> высока, можно предположить такой, достаточно очевидный вариант (fair,
> время разблокирования пропорционально кол-ву ожидающих потоков):

При unlock() можно связывать список обратно — в этом случае среднее
время на операцию разблокирования будет O(1), а не O(n) — за счет того,
что одна нитка будет готовить данные для других — то есть время
разблокирования будет различным. Семафоры для ожидания можно сделать
общими — один на мьютекс, но очередь все равно нужна.

Вот текст с обратным связыванием списка:
class Mutex
{
public:
    Mutex();
    ~Mutex() {}

    void lock();
    bool try_lock();
    void unlock();

    static void attach_thread();
private:

    struct mqitem
    {
        volatile struct mqitem * next;
        struct mqitem * prior;
        HANDLE hwait;
    } e, * tail;

#ifdef RECURSIVE_MUTEX
    volatile DWORD owner;
    DWORD count;
#endif

    static _declspec(thread) mqitem * thread_item;
};

_declspec(thread) Mutex::mqitem * Mutex::thread_item = NULL;

void Mutex::attach_thread()
{
    if (thread_item == NULL) {
        thread_item = new mqitem;
        thread_item->hwait = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
    }
}

Mutex::Mutex()
{
    e.next = NULL;
    tail = NULL;
#ifdef RECURSIVE_MUTEX
    owner = 0;
    count = 0;
#endif
}

void Mutex::lock()
{
    mqitem * p, * h;
    DWORD cur = GetCurrentThreadId();

#ifdef RECURSIVE_MUTEX
    if (owner != cur) {
#endif
        assert( thread_item != NULL );
        p = thread_item;
        p->prior = NULL;

        do {
            h = (mqitem *)e.next;
            p->next = h;
        } while( _CAS_PTR((PVOID*)&e.next, (PVOID)h, (PVOID)p) != h );

        if (h != NULL)
            WaitForSingleObject(thread_item->hwait, INFINITE);

#ifdef RECURSIVE_MUTEX
#ifdef _DEBUG
        assert(CAS(&owner, (DWORD)NULL, cur) == NULL);
#else
        owner = cur;
#endif
    }
    count++;
#endif
}

bool Mutex::try_lock()
{
    mqitem * p;
    DWORD cur = GetCurrentThreadId();

#ifdef RECURSIVE_MUTEX
    if (owner != cur) {
#endif
        assert( thread_item != NULL );
        if (e.next != NULL)
            return false;

        p = thread_item;
        p->next = NULL;

        if ( _CAS_PTR((PVOID*)&e.next, (PVOID)NULL, (PVOID)p) != NULL )
            return false;

#ifdef RECURSIVE_MUTEX
#ifdef _DEBUG
        assert(CAS(&owner, (DWORD)NULL, cur) == NULL);
#else
        owner = cur;
#endif
    }
    count++;
#endif
    return true;
}

void Mutex::unlock()
{
    mqitem * o = thread_item;
    mqitem * p = &e;

#ifdef RECURSIVE_MUTEX
    assert(owner == GetCurrentThreadId());
    if (--count == 0) {
#endif
        /* we know the tail sometimes */
        if (tail != &e) {
            p = tail;
            tail = p->prior;
        } else {
            /* scan the list to find the tail and update
             * the list inverse. Anyway, if the unlock() time
             * is sensetive, do not use in highly concurrent
             * conditions
             */
            while(p->next != o) {
                p->next->prior = p;
                p = (mqitem *)p->next;
            }
            tail = p->prior;
        }

#ifdef RECURSIVE_MUTEX
        owner = NULL;
#endif
        if (p != &e)
            ReleaseSemaphore(p->hwait, 1, NULL);

        p->next = NULL;
#ifdef RECURSIVE_MUTEX
    }
#endif
}
Posted via RSDN NNTP Server 2.1 beta
Re[16]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 25.12.07 16:31
Оценка:
Здравствуйте, hexis, Вы писали:


H>Теперь понятно, что ты имел в виду. Что сделаешь — я редко занимаюсь

H>этим, и не всегда понимаю терминологию.


Обычно спин-мьютекс это очередь, которая основана на периодическом опросе. Есть спин с активным ожиданием, это когда простой цикл, и есть спин с пассивным ожиданием, это когда sleep(0). Но тем не менее это всё спин, т.к. оба варианта периодически опрашивают состояние, что бы понять когда им действовать.
Не-спин мьютекс, основывается не на опросе, а на чётком взимодействии между двумя потоками. Ожидающий просто блокируется в ядро, а тот, кто сейчас в мьютексе, при выходе должен нотифицировать первого. При этом тот, кто сейчас в мьютексе, должен 100-процентно определить, есть ли кто-то ожидающий или нет. Вот тут и начинаются проблемы. Тут начинается такая же гонка как и при попытке захватить мьютекс двумя потоками почти одновременно — им надо чётко решить, кто же из них первый, а кто будет ждать. Приянть такое чёткое и 100-процентно консистентное решение и позволяет InterlockedXXX. При выходе начинается такая же гонка, только между потоком выходящим из мьютекса и потоком входящим в мьютекс. Они должны чётко решить — либо (1) выход произошёл раньше, тогда второй может входить без ожидания, и первому не надо нотифицировать, либо (2) вход произошёл раньше, тогда первый должен нотифицировать второго, а второй блокироваться в ядре. Если при разблокировке мьютекса нет InterlockedXXX, то чётко и консистентно решить этот вопрос 2 потока не могут. Есть вероятность, что один поток примет решение (1), а второй (2), или наоборот. Если бы это можно было разрешить без InterlockedXXX, то аналогичную ситуацию можно было бы разрешить и при входе в мьютекс. Т.е. можно было бы сделать мьютекс вообще без InterlockedXXX. Что естественно не соотв. действительности.



H> Если конкуренция не очень

H>высока, можно предположить такой, достаточно очевидный вариант (fair,
H>время разблокирования пропорционально кол-ву ожидающих потоков):


К сожалению, это *почти* работающий вариант...
Тут есть гонка между выходящим потоком и одновременно входящим потоком. Т.е. выходящий поток может решить, что ещё никакой поток не пытался войти, т.е. ему не надо нотифицировать семафор, и в то же время одновременно входящий поток может решить, что мьютекс ещё занят и заблокироваться на семафоре. Как результат он "повиснет" на неопределённое время...



з.ы. вот это можно cоптимизировать:
    /* we'd better not to have high concurrency on this mutex */
    while(p->next != o)
        p = p->next;


В lock-free алгоритмах есть такой трюк как "prev hint". Линк next является как бы основным, а линк prev вспомогательным. При этом prev не обязан быть 100% правильным, но он является правильным в большинстве случаев. Это позволяет в большинстве случаев и использовать prev, а если он всё таки не правильный, то иметь какой-то запасной вариант. Вот пример модификации. За 100% не ручаюсь, но идея такая:

void Mutex::lock()
{
    mqitem* p = thread_item;
    mqitem* h;
    do 
    {
        h = (mqitem *)e.next;
        p->next = h;
    } while( _CAS_PTR((PVOID*)&e.next, (PVOID)h, (PVOID)p) != h );

    if (p->next)
        p->next->prev = p;

    if (h != NULL)
        WaitForSingleObject(thread_item->hwait, INFINITE);
}

void Mutex::unlock()
{
    mqitem * o = thread_item;
    volatile mqitem * p = queue;

    if (o->prev->next == o)
    {
        // we're lucky
        p = o->prev;
    }
    else
    {
        // back-off
        while(p->next != o)
            p = p->next;
    }

    if (p != &e)
        ReleaseSemaphore(p->hwait, 1, NULL);
    p->next = NULL;
}





1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[17]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 25.12.07 16:43
Оценка:
remark wrote:
>
>
> Здравствуйте, hexis, Вы писали:
>
>
> H>Да, похоже, мы малость запутались. Сначала было мое мнение, что при
> H>нормальной реализации потоков (то-есть примитивов синхронизации) не
> H>будет принципиальных различий в скорости между реализацией этой задаче в
> H>стиле lock-free и lock-based. Меня смутило твое возражение относительно
> H>однопроцессных машин и я решил проверить — так ли это. Для этого я и
> H>написал простую очередь, использующую mutex и, после того, поигрался с
> H>различными вариантами реализации mutex — страндартный win32 mutex,
> H>CRITICAL_SECTION, spin lock и тот, что запостил в прошлый раз — отсюда и
> H>появились InterlockedXXX. Сама реализация очереди при этом не менялась.
>
>
> Хорошо. Т.е. ты сравнивал реализацию очереди, которую запостил в первом
> посте, с очередью, построенной на мьютексе (на различных реализациях). Я
> правильно понял?
> И что, разница была всего 15%?
> Сейчас займусь ясновидением: ты либо тестировал на Win2k, либо у тебя
> процессор AMD, либо и то и то?
>

XP. Проц AMD. Что-то у них с lock? Я правда использовал cmpxchg без него
— потому, что для однопроцессорной машины он смысла не имеет. А 15% —
это в основном алгоритмическая разница — при этом обращений к ядру
практически не происходило. Можно сказать, что при этом реализация
работала в режиме lock-free. Не думаю, что другая операционка или проц
смогли бы замедлить это время, по отношению к любой другой lock-free
реализации. Худшее время, когда блокировки происходят постоянно, я тоже
написал — разница в два раза — тут я могу поверить во влияние проца и
операционки.
Posted via RSDN NNTP Server 2.1 beta
Re[17]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 25.12.07 17:24
Оценка:
Здравствуйте, hexis, Вы писали:

H>hexis wrote:

>>
>> Если конкуренция не очень
>> высока, можно предположить такой, достаточно очевидный вариант (fair,
>> время разблокирования пропорционально кол-ву ожидающих потоков):

H>При unlock() можно связывать список обратно — в этом случае среднее

H>время на операцию разблокирования будет O(1), а не O(n) — за счет того,
H>что одна нитка будет готовить данные для других — то есть время
H>разблокирования будет различным. Семафоры для ожидания можно сделать
H>общими — один на мьютекс, но очередь все равно нужна.


Да, такой трюк можно применить. Можно совместить его с prev hint, который я запостил в соседнем посте.
Но основную проблему это не решает — решение не рабочее...



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[18]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 25.12.07 17:35
Оценка:
Здравствуйте, hexis, Вы писали:

H>XP. Проц AMD. Что-то у них с lock? Я правда использовал cmpxchg без него

H>- потому, что для однопроцессорной машины он смысла не имеет. А 15% -
H>это в основном алгоритмическая разница — при этом обращений к ядру
H>практически не происходило. Можно сказать, что при этом реализация
H>работала в режиме lock-free. Не думаю, что другая операционка или проц
H>смогли бы замедлить это время, по отношению к любой другой lock-free
H>реализации. Худшее время, когда блокировки происходят постоянно, я тоже
H>написал — разница в два раза — тут я могу поверить во влияние проца и
H>операционки.


Они на AMD, редиски, сделали, что в однопроцессорной конфигурации у них LOCK префикс игнорируется. По крайней мере на write-back памяти.
Поэтому получается, что тяжёлая interlocked операция получается лёгкой. А на Intel процессоре ты получишь следующую картину. Допустим операция вставки в очередь в твоей реализации, которую ты запостил в первом посте, будет занимать 10 тактов — там только несколько манипуляций с указателями. А очередь использующая Interlocked будет — 110 тактов = 10 вставить в очередь + 100 тактов LOCK CMPXCHG....



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[17]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 25.12.07 19:12
Оценка:
remark wrote:
>
> Здравствуйте, hexis, Вы писали:
>
> H>Теперь понятно, что ты имел в виду. Что сделаешь — я редко занимаюсь
> H>этим, и не всегда понимаю терминологию.
>
>
> Обычно спин-мьютекс это очередь, которая основана на периодическом
> опросе. Есть спин с активным ожиданием, это когда простой цикл, и есть
> спин с пассивным ожиданием, это когда sleep(0). Но тем не менее это всё
> спин, т.к. оба варианта периодически опрашивают состояние, что бы понять
> когда им действовать.

Теорию я в общем-то знаю. Проблемы у меня с терминологией — так, как
я не поддерживаю ее на современном уровне, могу не понять что и как
называется. Так, например, отлично понимаю, что такое спин и к чему
приводит (или зачем нужно) использование задержек, но не знал, что они
так называются.

>

> H> Если конкуренция не очень
> H>высока, можно предположить такой, достаточно очевидный вариант (fair,
> H>время разблокирования пропорционально кол-ву ожидающих потоков):
>
>
> К сожалению, это *почти* работающий вариант...
> Тут есть гонка между выходящим потоком и одновременно входящим потоком.
> Т.е. выходящий поток может решить, что ещё никакой поток не пытался
> войти, т.е. ему не надо нотифицировать семафор, и в то же время
> одновременно входящий поток может решить, что мьютекс ещё занят и
> заблокироваться на семафоре. Как результат он "повиснет" на
> неопределённое время...

Да, есть гонки, проглядел. В принципе, их можно устранить, сохранив
основную идею, путем ведения цикла ожидания изменения состояния (все еще
без Interlocked операций, за счет различных областей памяти для
коммуникации с разными потоками) или если допустить устранимые ложные
сигналы на семафоре. Пусть цикл ожидания не так красив, но, пожалуй,
лучше немного подождать, чем лишний раз упасть в ядро.

>

> з.ы. вот это можно cоптимизировать:
>
> /* we'd better not to have high concurrency on this mutex */
> while(p->next != o)
> p = p->next;
>
>
>
> В lock-free алгоритмах есть такой трюк как "prev hint". Линк next
> является как бы основным, а линк prev вспомогательным. При этом prev не
> обязан быть 100% правильным, но он является правильным в большинстве
> случаев. Это позволяет в большинстве случаев и использовать prev, а если
> он всё таки не правильный, то иметь какой-то запасной вариант. Вот
> пример модификации. За 100% не ручаюсь, но идея такая:

Понятно, спасибо. Можно и так сделать. Можно совместить оба подхода —
проверять хинт, и при сканировании дополнительно восстанавливать список.

Вот что получилось:
class Mutex
{
public:
    Mutex();
    ~Mutex() {}

    void lock();
    void unlock();

    static void attach_thread();
private:
    enum State { UNCERTAIN, WAITING, RUN };

    struct mqitem
    {
        Mutex * waiting;

        volatile struct mqitem * next;
        struct mqitem * prior;
        State state;
        HANDLE hwait;
    } e;

    static _declspec(thread) mqitem * thread_item;
};

_declspec(thread) Mutex::mqitem * Mutex::thread_item = NULL;

void Mutex::attach_thread()
{
    if (thread_item == NULL) {
        thread_item = new mqitem;
        thread_item->hwait = CreateSemaphore(NULL, 0, LONG_MAX, NULL);
    }
}

Mutex::Mutex()
{
    e.next = NULL;
    e.state = RUN;
    e.waiting = NULL;
}

void Mutex::lock()
{
    mqitem * p, * h;

    assert( thread_item != NULL );
    p = thread_item;
    p->prior = &e;

    do {
        h = (mqitem *)e.next;
        p->next = h;
        p->state = UNCERTAIN;
        p->waiting = this;
    } while( _CAS_PTR((PVOID*)&e.next, (PVOID)h, (PVOID)p) != h );

    /* hint the previous item. And you'd better not to release the other
thread's
     * descriptor ;-)
     */
    if (h)
        h->prior = h;

    while (p->next != NULL) {
        p->state = WAITING;
        WaitForSingleObject(p->hwait, INFINITE);
    }
    p->state = RUN;
}

void Mutex::unlock()
{
    mqitem * o = thread_item;
    mqitem * p;
    BOOL rescan;

    /* check the hint */
    p = o->prior;
    if (p->next != o) {
        /* Scan the whole list, updating the hints 'prior' as well. */
        p = &e;
        while(p->next != o) {
            p->next->prior = p;
            p = (mqitem *)p->next;
        }
    }

    /* we might have to rescan the queue if no waiters yet */
    rescan = (e.next == thread_item);

    /* release the mutex */
    p->next = NULL;

    /* Now to prevent race conditions we have to rescan the list to see
     * if any other thread has appeared since we released the lock in
     * the previous line.
     * Note that we have to deal with two threads only.
     */

    if (rescan) {
    /* we could have updated 'prior' hints here, should we? */
        for(p = &e; p->next != NULL; p = (mqitem *)p->next)
            ;
    }

    if (p->waiting == this) {
        /* spin until thread decided about it's state.
         * Removing this would just cause spurious wakeups.
         */
        while (p->state == UNCERTAIN)
            ;

        if (p->state != RUN)
            ReleaseSemaphore(p->hwait, 1, NULL);
    }
}
Posted via RSDN NNTP Server 2.1 beta
Re[19]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 25.12.07 19:29
Оценка:
remark wrote:
>
> Здравствуйте, hexis, Вы писали:
>
> H>XP. Проц AMD. Что-то у них с lock? Я правда использовал cmpxchg без него
> H>- потому, что для однопроцессорной машины он смысла не имеет. А 15% -
> H>это в основном алгоритмическая разница — при этом обращений к ядру
> H>практически не происходило. Можно сказать, что при этом реализация
> H>работала в режиме lock-free. Не думаю, что другая операционка или проц
> H>смогли бы замедлить это время, по отношению к любой другой lock-free
> H>реализации. Худшее время, когда блокировки происходят постоянно, я тоже
> H>написал — разница в два раза — тут я могу поверить во влияние проца и
> H>операционки.
>
>
> Они на AMD, редиски, сделали, что в однопроцессорной конфигурации у них
> LOCK префикс игнорируется. По крайней мере на write-back памяти.
> Поэтому получается, что тяжёлая interlocked операция получается лёгкой.
> А на Intel процессоре ты получишь следующую картину. Допустим операция
> вставки в очередь в твоей реализации, которую ты запостил в первом
> посте, будет занимать 10 тактов — там только несколько манипуляций с
> указателями. А очередь использующая Interlocked будет — 110 тактов = 10
> вставить в очередь + 100 тактов LOCK CMPXCHG....
>

Так я и думал. Чтобы покопаться с зависимостями от SMP (предположив, что
XP идет в SMP и UNP конфигурациях, как и NT4 и w2k), написал CAS в виде
ассемблерной вставки и заметил, что нет разницы между наличием и
отсутствием LOCK. Значит, это только у AMD... Будем знать.
Это, кстати, было одной из причин, почему я специально отмечал
однопроцессорную машину.
Posted via RSDN NNTP Server 2.1 beta
Re[18]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 25.12.07 20:29
Оценка: 2 (1)
hexis wrote:
>
> Вот что получилось:
>

Есть неоптимальность — первый поток оканчивает rescan в unlock() и
вытесняется; второй поток блокирует (без ожидания), разблокирует; третий
поток блокирует; второй запрашивает блокировку; первый просыпается и
будит второй поток. Который, правда, тут-же вернется в состояние
ожидания. Лечится заменой waiting с указателя на объект, на указатель на
поток (несбрасываемый дубликат поля next) и обработкой его в unlock.
Кстати, мне непонятно, насколько все это интересно — выкладывать
следующие версии или нет?
Posted via RSDN NNTP Server 2.1 beta
Re[18]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 25.12.07 20:30
Оценка:
hexis wrote:
>
> Вот что получилось:
>

Есть неоптимальность — первый поток оканчивает rescan в unlock() и
вытесняется; второй поток блокирует (без ожидания), разблокирует; третий
поток блокирует; второй запрашивает блокировку; первый просыпается и
будит второй поток. Который, правда, тут-же вернется в состояние
ожидания. Лечится заменой waiting с указателя на объект, на указатель на
поток (несбрасываемый дубликат поля next) и обработкой его в unlock.
Кстати, мне непонятно, насколько все это интересно — выкладывать
следующие версии или нет?
Posted via RSDN NNTP Server 2.1 beta
Re[18]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 26.12.07 12:34
Оценка:
Здравствуйте, hexis, Вы писали:

H>Да, есть гонки, проглядел. В принципе, их можно устранить, сохранив

H>основную идею, путем ведения цикла ожидания изменения состояния (все еще
H>без Interlocked операций, за счет различных областей памяти для
H>коммуникации с разными потоками) или если допустить устранимые ложные
H>сигналы на семафоре. Пусть цикл ожидания не так красив, но, пожалуй,
H>лучше немного подождать, чем лишний раз упасть в ядро.


Это уже лучше. *Теоретически* это рабочий алгоритм.
Самое интересное начнётся, когда мы переложим его в практическую плоскость. А именно вот тут нужен барьер памяти:



H> /* release the mutex */

p->>next = NULL;

#StoreLoad memory barrier

H> /* Now to prevent race conditions we have to rescan the list to see

H> * if any other thread has appeared since we released the lock in
H> * the previous line.
H> * Note that we have to deal with two threads only.
H> */

H> if (rescan) {

H> /* we could have updated 'prior' hints here, should we? */
H> for(p = &e; p->next != NULL; p = (mqitem *)p->next)
H> ;
H> }


#StoreLoad это самый дорогой тип барьера памяти. В частности на x86 для его реализации надо использовать mfence инструкцию. Сколько она занимает времени? Делаем ставки! Правильно 100 тактов, столько же сколько и инструкиция с префиксом LOCK, т.к. по большому счёту это одно и то же просто в другой форме.
Хотя в принципе есть мнения, что mfence более дружествен к SMP/multicore системам, чем LOCK, несмотря на то, что занимает столько же времени. Хотя у меня уверенности нет...

Обычно LOCK и MFENCE взаимозаменяемы. Вот например алгоритм мьютекса Петерсона:
http://en.wikipedia.org/wiki/Peterson's_algorithm
Он использует на *входе* в мьютекс MFENCE вместо LOCK.

Поэтому более правильно ставить задачу как "without atomic rmw operations or heavy memory barriers".



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[19]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 26.12.07 12:48
Оценка:
Здравствуйте, hexis, Вы писали:

H>hexis wrote:

>>
>> Вот что получилось:
>>

H>Есть неоптимальность — первый поток оканчивает rescan в unlock() и

H>вытесняется; второй поток блокирует (без ожидания), разблокирует; третий
H>поток блокирует; второй запрашивает блокировку; первый просыпается и
H>будит второй поток. Который, правда, тут-же вернется в состояние
H>ожидания. Лечится заменой waiting с указателя на объект, на указатель на
H>поток (несбрасываемый дубликат поля next) и обработкой его в unlock.


В принципе имхо небольшая вероятность spurious wakeups не страшна, если при этом общий случай мы делаем значительно быстрее. Под значительно быстрее понимается, например, меньшее кол-во атомарных RMW операций или тяжёлых барьеров памяти.


H>Кстати, мне непонятно, насколько все это интересно — выкладывать

H>следующие версии или нет?


Мне лично очень интересно. Выкладывай.
Я сам частьенько занимаюсь выдумыванием всяких кастомных примитивов синхронизации и выкладываю на comp.programming.threads:

asymmetric rw_mutex:
http://groups.google.com/group/comp.programming.threads/tree/browse_frm/thread/b7d3b8c08f9ca3c6

rw_mutex with costless read_lock/unlock:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/1348064e4838e871

MPSC FIFO Queue w/o atomic_rmw/membars:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/844d08c4eeb5c5d5

MPMC Unbounded FIFO Queue w/ 1 CAS/Operation:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/bf2261a78b4d3188

lock-free node allocator for fifo-queues:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/378a35b21ae2b42e

Lock-free proxy algorithm:
http://groups.google.com/group/comp.programming.threads/browse_frm/thread/6cfe6b1a6926680d

Ну и так далее. К сожалению на c.p.t обычно к таким вопросам имеют интерес 1,5 человека, которых я могу назвать по именам. На Intel Threading Forum и на RSDN, к сожалению, никто...



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[19]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 27.12.07 01:58
Оценка: 22 (1)
remark wrote:
>
> Поэтому более правильно ставить задачу как "without atomic rmw
> operations or heavy memory barriers".
>

Ну тогда я выскажу следующую мысль — в заданных условиях, идеальное
решение невозможно. Обоснование такое. Рассмотрим упрощенную модель
работы mutex в приложении к двум потокам:

lock()
{
    /* barrier */
    if (!free) {
        wait = true;
        Wait();
    }
    free = false;
}

unlock()
{
    free = true;
    /* barrier */
    if (wait) {
        wait = false;
        Signal();
    }
}

Видно, что здесь информация передается в две стороны — от блокирующего
потока к разблокирующему и обратно. Из чего следует, что, для того,
чтобы тот процедура отработала правильно, требуется наличие барьеров.
Собственно проблема возникает из-за необходимости синхронизировать
состояние двух примитивов — самой блокировки и объекта ожидания.

Соответственно, можно рассмотреть такие способы преодоления проблемы —
передача информации только в одну сторону и попытки избежать барьеров
путем переноса операций.

Передачу информации в одну сторону можно реализовать на практике одним
из следующих способов:

1) отсутствие барьера в lock(), приведет к тому, что в результате
несколько потоков могут одновременно получить блокировку. Очевидно, что
при этом требуется проверка конфликта в unlock и откат изменений
выполненных в конфликтной ситуации. То есть схема эквивалентна, так
называемой, оптимистической блокировке. Но не эквивалентна классической
трактовке понятия мьютекса. Так, что вряд ли стоит разрабатывать эту
тему в рамках вопроса о мьютексе, хотя сама по себе она достаточно
интересна.

2) отсутствие барьера у unlock() приведет к тому, что проверка в
unlock() предиката (wait) может дать ложные результаты. Что будет
приводить к подвисанию потоков в lock(). Возможные пути решения
проблемы, пришедшие мне в голову, можно просуммировать следующим образом:

2.1) Wait не должен быть блокирующимся. Требуется, как минимум вторая
проверка, после истечения времени барьера. Это возможно двумя путями —
полное отсутствие Wait и Signal (вариант spin lock-а) или
Wait(TIMEOUT)+Wait(INFINITE). Таймаут равен времени записи в память плюс
1. Достаточно практично было бы обрабатывать опрос с таймаутом при
помощи spin опроса, но без дополнительного барьера.

2.2) Wait должен знать об ожидании на предикате. Что приводит к мысли о
futex. К сожалению — трудно уговорить производителя ОС включить в API
новый предикат.

2.3) Wait должен получать гарантированное оповещение. Для этого при
unlock() должен всегда вызываться Signal(). Вариация на тему обычного
mutex из OS, что, в данном случае, не имеет особого смысла.

2.4) Наличие помощника/посредника. unlock() может сигналить, если он
точно знает, что другая нитка ждет. И запрашивать проверку состояния по
истечении времени барьера у посредника, если предикат wait оказался
ложным. В идеале этот посредник — планировщик OS, но на практике это
может быть выделенный поток.

Перенос кода из lock() в unlock() невозможен без изменения семантики.
Но можно перенести код из unlock() в lock():

lock()
{
    /* barrier */
    if (wait > 0 && free) {
        wait++;
        Signal();
        Wait();
        wait--;
    } else if (!free) {
        wait++;
        Wait();
        wait--;
    }
    free = false;
}

unlock()
{
    /* no-barrier */
    if (wait > 0)
        Signal();
    free = true;
}

Собственно, недостатки этого решения очевидны — тут нужна конкуренция
определённого вида. А именно — должно быть как минимум два конкурирующих
потока — нужно гарантировать запрос блокировки.

Примерно так. Я пока не вижу других способов решения вопроса. Из
приведенного, на мой взгляд есть два практичных решения — futex и
ожидание с таймаутом.

Соответственно, код будет выглядеть наподобие следующего (нетестированно):

struct mutex
{
    long requested;  /* количество запросов на блокировки */
    long granted;    /* количество выданных блокировок    */
    long sent;       /* количество посланных сигналов     */
    long got;        /* количество полученных сигналов    */
};

void
init(mutex * m)
{
    m->granted = 0;
    m->requested = 0;
    m->sent = 0;
    m->got  = 0;
}

void
lock(mutex * m)
{
    long mine;
    int  j;

    /* InterlockedIncrement включает в себя барьер */
    mine = InterlockedIncrement(&m->requested) - 1L;
    while (m->granted != mine) {
        for(j = 0; j < WAIT_COUNT; j++) {
            if (m->granted == mine)
                return;
        }
        Wait(m, INFINITE);
    m->got++;
    }
}

void
unlock(mutex * m)
{
    if (m->requested != ++m->granted && m->sent == m->got) {
    m->sent++;
        Signal(m);
    }
}


Вероятность spurious wakeup при такой технике значительно повышена (тем
ниже, чем ближе WAIT_COUNT к времени исполнения FENCE (задержку, кстати,
можно калибровать на старте). Соответственно, предпочтительно
использовать бинарный семафор или Event. Или же можно подсчитывать
соотношение посланных и полученных сигналов (на тех-же условиях).
Posted via RSDN NNTP Server 2.1 beta
Re: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 28.12.07 12:06
Оценка:
Возможно не совсем в тему вопрос, но вот искал аналог InterlockedIncrement для Линукса.
Наткнулся вот на такой пример (см. второй ответ).
Насколько это будет работоспособно? Платформа х86.
Re[2]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 28.12.07 12:36
Оценка:
Здравствуйте, HaronK, Вы писали:

HK>Возможно не совсем в тему вопрос, но вот искал аналог InterlockedIncrement для Линукса.

HK>Наткнулся вот на такой пример (см. второй ответ).
HK>Насколько это будет работоспособно? Платформа х86.


В принципе похоже. Это асм под gcc компилятор. Так же можешь поглядеть в библиотеках типа boost или ACE (Atomic_Op.cpp).

А вообще смотри здесь:
Built-in functions for atomic memory access
В gcc просто есть встроенные функции типа __sync_fetch_and_add() и __sync_val_compare_and_swap() — так что изобретать ничего не надо...



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: hexis  
Дата: 28.12.07 14:59
Оценка:
remark wrote:
>
> Здравствуйте, HaronK, Вы писали:
>
> HK>Возможно не совсем в тему вопрос, но вот искал аналог
> InterlockedIncrement для Линукса.
> HK>Наткнулся вот на такой пример
> <http://www.verycoder.com/programmers-archive/16/developer-tech-c++-160558.shtm&gt;
> (см. второй ответ).
> HK>Насколько это будет работоспособно? Платформа х86.
>
>
> В принципе похоже. Это асм под gcc компилятор. Так же можешь поглядеть в
> библиотеках типа boost или ACE (Atomic_Op.cpp).

Да, похоже на правду. Можно еще взять из asm/atomic.h

> А вообще смотри здесь:

> Built-in functions for atomic memory access
> <http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html#Atomic-Builtins&gt;
> В gcc просто есть встроенные функции типа __sync_fetch_and_add() и
> __sync_val_compare_and_swap() — так что изобретать ничего не надо...

Интересно, а как gcc учитывает SMP/UNP конфигурации ядра? И учитывает ли?
Posted via RSDN NNTP Server 2.1 beta
Re[3]: Неблокируемая очередь сообщений для двух потоков
От: HaronK Украина  
Дата: 28.12.07 18:15
Оценка:
Здравствуйте, remark, Вы писали:

R>В принципе похоже. Это асм под gcc компилятор. Так же можешь поглядеть в библиотеках типа boost или ACE (Atomic_Op.cpp).


R>А вообще смотри здесь:

R>Built-in functions for atomic memory access
R>В gcc просто есть встроенные функции типа __sync_fetch_and_add() и __sync_val_compare_and_swap() — так что изобретать ничего не надо...

Спасибо, гляну.
Re[20]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 29.12.07 11:00
Оценка:
Здравствуйте, hexis, Вы писали:

H>remark wrote:

>>
>> Поэтому более правильно ставить задачу как "without atomic rmw
>> operations or heavy memory barriers".
>>

H>Ну тогда я выскажу следующую мысль — в заданных условиях, идеальное

H>решение невозможно.Обоснование такое. Рассмотрим упрощенную модель
H>работы mutex в приложении к двум потокам:


Про это я и говорил, что в общем виде эта задача видимо не решается.
Существуют некоторые решения для *специальных* случаев. Вопрос в том, насколько такой *специальный* случай распространён. Т.е. насколько всё-таки решение применимо.

Если интересно, то можешь поглядеть следующие ссылки:
Relaxed-Locks
Это описание лока, который используется в SUN JVM. С описанием некоторых интересных деталей, типа directed-handoff vs competitive handoff, futile wakeup throttling.

Quickly Reacquirable Locks
Тут используется collocation technique. Правда фактически это замена StoreLoad барьеру памяти. Но на SPARC архитектуре StoreLoad дешевле, чем атомарные RMW операции, а эта collocation technique видимо ещё дешевле. Плохие новости состоят в том, что на x86 это работать не будет... т.е. всё равно нужен будет явный StoreLoad барьер...

Seqlock
Это уникальный лок. Он позволяет захватывать/освобождать блокировку на чтение вообще без каких-либо тяжёлых операций. Но правда подходит только для *чистого* чтения.

Asymmetric Dekker Synchronization
Это аналог моего asymmetric_rw_mutex. Идея та же — переносим всю тяжесть с блокировки на чтение на блокировку на запись.



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
Re[20]: Неблокируемая очередь сообщений для двух потоков
От: remark Россия http://www.1024cores.net/
Дата: 29.12.07 12:32
Оценка:
Здравствуйте, hexis, Вы писали:


H>Видно, что здесь информация передается в две стороны — от блокирующего

H>потока к разблокирующему и обратно. Из чего следует, что, для того,
H>чтобы тот процедура отработала правильно, требуется наличие барьеров.
H>Собственно проблема возникает из-за необходимости синхронизировать
H>состояние двух примитивов — самой блокировки и объекта ожидания.


Круто всё разложил по полочкам
Про передачу информации в две стороны — хорошая аналогия, точнее не аналогия, а способ анализировать алгоритм.
По поводу синхронизации 2 примитивов не согласен. Фактически тут надо синхронизировать 1 примитив — само слово блокировки, а объект ожидания является вспомогательным, т.к. он "не теряющий". Анализируя слово блокировки, разблокирующему потоку достаточно только понять, надо ему сигналить объект ожидания или нет. А когда именно он это сделает по отношению к ожидающему — не важно, хоть до того, как ожидающий заблокируется, хоть после.
Проблема в том, что без атомарной RMW в операции выхода не получается синхронизировать даже один примитив.



H>Соответственно, можно рассмотреть такие способы преодоления проблемы -

H>передача информации только в одну сторону и попытки избежать барьеров
H>путем переноса операций.

H>Передачу информации в одну сторону можно реализовать на практике одним

H>из следующих способов:

H>1) отсутствие барьера в lock(), приведет к тому, что в результате

H>несколько потоков могут одновременно получить блокировку. Очевидно, что
H>при этом требуется проверка конфликта в unlock и откат изменений
H>выполненных в конфликтной ситуации. То есть схема эквивалентна, так
H>называемой, оптимистической блокировке. Но не эквивалентна классической
H>трактовке понятия мьютекса. Так, что вряд ли стоит разрабатывать эту
H>тему в рамках вопроса о мьютексе, хотя сама по себе она достаточно
H>интересна.


Да, идея интересная. Это называется транзакционная память. Можешь гуглить по словам STM, HTM, HyTM.
Модель предоставляемая пользователю, конечно, принципиально отличается от мьютекса — надо делать ретрай, нельзя делать ничего, что было бы нельзя "откатить" и т.д.



H>2) отсутствие барьера у unlock() приведет к тому, что проверка в

H>unlock() предиката (wait) может дать ложные результаты. Что будет
H>приводить к подвисанию потоков в lock(). Возможные пути решения
H>проблемы, пришедшие мне в голову, можно просуммировать следующим образом:

H>2.1) Wait не должен быть блокирующимся. Требуется, как минимум вторая

H>проверка, после истечения времени барьера. Это возможно двумя путями -
H>полное отсутствие Wait и Signal (вариант spin lock-а) или
H>Wait(TIMEOUT)+Wait(INFINITE). Таймаут равен времени записи в память плюс
H>1. Достаточно практично было бы обрабатывать опрос с таймаутом при
H>помощи spin опроса, но без дополнительного барьера.


Я думал над этим. Ничего хорошего/практического не придумывается.
Во-первых, не понятно как определять этот TIMEOUT. Очевидно, что он сильно архитектурно зависим. Возможно он зависит от текущей загрузки системы...
А брать "заведомо достаточный" интервал, типа 10 мкс. получается бессмысленно. Ждать столько в активном спине дорого. Блокироваться лишний раз — так это мы так только сделаем операцию дороже...



H>2.2) Wait должен знать об ожидании на предикате. Что приводит к мысли о

H>futex. К сожалению — трудно уговорить производителя ОС включить в API
H>новый предикат.


Опять же, если это будет в ядре, то это бессмысленно. Переход в ядро обычно дороже одной атомарной RMW.
В разработке эффективных примитивов это основная проблема — используемые средства должны быть *максимально* лёгкими — нельзя вызывать ядро, нельзя делать атомарные RMW, нельзя использовать барьеры памяти, нельзя лишний раз писать в разделяемую память — короче ничего нельзя



H>2.3) Wait должен получать гарантированное оповещение. Для этого при

H>unlock() должен всегда вызываться Signal(). Вариация на тему обычного
H>mutex из OS, что, в данном случае, не имеет особого смысла.

Да.

H>2.4) Наличие помощника/посредника. unlock() может сигналить, если он

H>точно знает, что другая нитка ждет. И запрашивать проверку состояния по
H>истечении времени барьера у посредника, если предикат wait оказался
H>ложным. В идеале этот посредник — планировщик OS, но на практике это
H>может быть выделенный поток.


Опять же это что-то типа фьютекса получается.
И встаёт вопрос — в какова будет цена периодически проверять *все* критические секции в программе... И какой выбрать период...


H>Перенос кода из lock() в unlock() невозможен без изменения семантики.

H>Но можно перенести код из unlock() в lock():

H>
H>lock()
H>{
H>    /* barrier */
H>    if (wait > 0 && free) {
H>        wait++;
H>        Signal();
H>        Wait();
H>        wait--;
H>    } else if (!free) {
H>        wait++;
H>        Wait();
H>        wait--;
H>    }
H>    free = false;
H>}

H>unlock()
H>{
H>    /* no-barrier */
H>    if (wait > 0)
H>        Signal();
H>    free = true;
H>}
H>


H>Собственно, недостатки этого решения очевидны — тут нужна конкуренция

H>определённого вида. А именно — должно быть как минимум два конкурирующих
H>потока — нужно гарантировать запрос блокировки.


Я сейчас разрабатываю event-driven систему. Такая система интересна тем, что можно гарантировать, что все потоки периодически совершают некоторую нужную активность. И в принципе я применяю похожий трюк. С небольшой вероятностью какой-то поток может что-то "пропустить", но тогда, допустим, каждые 10 событий я проверяю, а не пропустил ли кто чего.
В event-driven системе это становится затруднительнее, т.к. пользовательский поток нельзя заставить периодически выполнять некоторую активность...



H>Примерно так. Я пока не вижу других способов решения вопроса. Из

H>приведенного, на мой взгляд есть два практичных решения — futex и
H>ожидание с таймаутом.

H>Соответственно, код будет выглядеть наподобие следующего (нетестированно):


H>
H>struct mutex
H>{
H>    long requested;  /* количество запросов на блокировки */
H>    long granted;    /* количество выданных блокировок    */
H>    long sent;       /* количество посланных сигналов     */
H>    long got;        /* количество полученных сигналов    */
H>};

H>void
H>init(mutex * m)
H>{
    m->>granted = 0;
    m->>requested = 0;
    m->>sent = 0;
    m->>got  = 0;
H>}

H>void
H>lock(mutex * m)
H>{
H>    long mine;
H>    int  j;

H>    /* InterlockedIncrement включает в себя барьер */
H>    mine = InterlockedIncrement(&m->requested) - 1L;
H>    while (m->granted != mine) {
H>        for(j = 0; j < WAIT_COUNT; j++) {
H>            if (m->granted == mine)
H>                return;
H>        }
H>        Wait(m, INFINITE);
    m->>got++;
H>    }
H>}

H>void
H>unlock(mutex * m)
H>{
H>    if (m->requested != ++m->granted && m->sent == m->got) {
    m->>sent++;
H>        Signal(m);
H>    }
H>}
H>


H>Вероятность spurious wakeup при такой технике значительно повышена (тем

H>ниже, чем ближе WAIT_COUNT к времени исполнения FENCE (задержку, кстати,
H>можно калибровать на старте). Соответственно, предпочтительно
H>использовать бинарный семафор или Event. Или же можно подсчитывать
H>соотношение посланных и полученных сигналов (на тех-же условиях).


Хммм... Надо будет подумать над этим. Это что-то типа алгоритма билета, модифицированного для использования так же и блокировки — оригинальный алгоритм билета использует только спин-ожид ание.
Единственное, что сразу приходит — честные мьютексы, конечно, без сильной нужны применять накладно — т.к. они обычно очень плохо себя ведут в реальных условиях. Самое неприятное, что у них есть — handoff ownership проблема. Подробгее смотри здесь:
http://www.accu.org/index.php/journals/1324



1024cores &mdash; all about multithreading, multicore, concurrency, parallelism, lock-free algorithms
 
Подождите ...
Wait...
Пока на собственное сообщение не было ответов, его можно удалить.