пятница, 12 ноября 2010 г.

Lock-free программирование - часть 1

Блокировки являются мощным средством синхронизации действий в многопоточной среде, но как и все мощное, эта это средство таит в себе множество недостатков. Блокировка довольно медленная операция и требует много ресурсов, проблемные блокировки могут вызвать deadlock, поток с низким приоритетом может заблокировать на неопределенное время другой поток, с более высоким приоритетом...и так далее. Эта статья призвана дать базовые знания об альтернативном методе написания потокобезопасного кода.

Терминология

Lock-free – lock-free считается та процедура, для которой гарантируется прогресс как минимум одного потока, выполняющего эту процедуру. Другие потоки могут ждать, но один поток минимум должен прогрессировать.

Wait-free – операция называется wait-free в том случае, если она завершается за определенное количество шагов, не зависящих от состояние и действий других потоков.

Вступление

Может показаться, что техника lock-free является отличной заменой блокировочным операциям, но это не совсем так. Было бы неплохо иметь под рукой набор неблокирующих контейнеров (например stack или hash map) и использовать их в своем коде, но отказываться от блокировок совсем и заменять их неблокирущими операциями не стоит. Во первых не все можно написать используя неблокирующие операции. Например, double linked list на сегодняшний день не имеет неблокирующего алгоритма реализации. Во вторых это очень сложно даже для экспертов. Можно написать неблокирующий код, который работает, но очень сложно написать его правильно и оптимально. Даже авторитетные журналы иногда публикуют неблокирующий код, содержащий ошибки. Хватит лирики, перейдем к делу.

Атомарные операции

Атомарность означает неделимость операции. Это значит, что ни один поток не может увидеть промежуточное состояние операции, она либо выполняется, либо нет. Рассмотрим пример простой операции инкрементирования значения.
int x = 0;
++x;
Даже такой простой код нужно синхронизировать в многопоточной среде. Если посмотреть ассемблерный листинг второй строки, мы увидим, что она состоит из 3х операций.
013C5595  mov         eax,dword ptr [x]  
013C5598  add         eax,1  
013C559B  mov         dword ptr [x],eax  
  • Загрузка текущего значения из памяти в регистр eax
  • Инкрементирование значения регистра eax
  • Запись значения регистра eax обратно в память
Модификация встроенных C++ типов не является атомарной, т.е. если два потока одновременно попытаются модифицировать переменную x, мы вполне можем получить ситуацию, где значение x станет 1 после двух инкрементов. Ситуации подобные этой, в которых финальный результат зависит от очередности выполнения, называется data race. Можно исправить этот код и сделать его потокобезопасным добавив блокировку перед инкрементом и разблокировку после него, тем самым обеспечивая атомарность операции инкрементирования, но есть и другой способ.
Инкремент, декремент, чтение, запись и некоторые другие операции являются достаточно простыми и могут быть выполнены процессором атомарно. Что, если процессор может реализовать их в виде специальных инструкций на аппаратном уровне? Например операция инкремент/декремента, сохранения и загрузки значения могут быть реализованы атомарно на аппаратном уровне.
Мы рассмотрим фундаментальную операцию, которая необходима в lock-free программировании и которая предоставляется практически всеми современными процессорами.

CAS (Compare and Swap)

Эту операцию также называют compare exchange. Суть операции заключается в том, что она атомарно сравнивает значение одного объекта с другим и при удачном сравнении заменяет значение объекта.
Псевдокод
bool CAS(long* obj, long exp, long val)
{
    atomically do
    {
        if (*obj == exp)
        {
            *obj = val;
            return true;
        }
        return false;
    }
}
Есть также DCAS (Double compare-and-swap), которая выполняет ту же операцию для двух объектов, но на данный момент она реализована только на процессоре Motorola 68040 (причем довольно неэффективно) и рассматривать ее не имеет никакого смысла.
Поскольку текущий стандарт C++ не содержит упоминаний атомарных операциях нам придется выбирать: либо писать псевдокод, либо использовать одну из нестандартных библиотек. Немного поразмыслив я решил, что наиболее подходящей библиотекой для использования в примерах является C++ standard threading library, которая будет реализована в новом стандарте и уже частично содержится в gcc 4.5 (полностью эта библиотеке реализована компанией Just Software Solutions и называется just::thread). Для начала ознакомимся с атомарными типами нового стандарта C++.

C++0x atomics

Стоит заметить, что стандарт C++0x не гарантирует, что все атомарные операции будут lock-free, единственный тип, для которого дается гарантия – это std::atomic_flag.
std::atomic_flag является самым простым атомарным типом, который имеет две операции – set и clear. Ниже следует список остальных атомарных типов C++0x.
Атомарный типСоответствующий встроенный тип
std::atomic_boolbool
std::atomic_charchar
std::atomic_scharsigned char
std::atomic_ucharunsigned char
std::atomic_intint
std::atomic_uintunsigned int
std::atomic_shortshort
std::atomic_ushortunsigned short
std::atomic_longlong
std::atomic_ulongunsigned long
std::atomic_llonglong long
std::atomic_ullongunsigned long long
std::atomic_char16_tchar16_t
std::atomic_char32_tchar32_t
std::atomic_wchar_twchar_t
std::atomic_addressvoid*

Библиотека также содержит шаблонный тип std::atomic. Это не весь список, но для наших нужд хватит. Атомарные типы предоставляют обычный для типа интерфейс (операции типа ++, --, +=, -=, *=, /=...и так далее), а также следующие функции

load()
Прочитать текущее значение
store()
Установить новое значение
exchange()
Установить новое значение и вернуть предыдущее
compare_exchange_weak()
Аналог вышеописанной функции CAS (с одной разницей)
compare_exchange_strong()
Аналог вышеописанной функции CAS
fetch_add()
Аналог оператора ++
fetch_or()
Аналог оператора --
is_lock_free()
Возвращает true, если операции на данном типе неблокирующие

Разница между compare_exchange_weak и compare_exchange_strong заключается в том, что compare_exchange_weak на некоторых платформах может ложно не сработать (т.е. вернуть false и не выполнить обмен даже в случае равных значений). compare_exchange_strong вызывает compare_exchange_weak в цикле. Когда compare_exchange_strong вызывается в цикле имеет смысл использовать compare_exchange_weak, чтобы избежать вложенных циклов. Очень важным моментом явлается то, что все операции на атомарных типах в C++0x по умолчанию являются последовательно консистентными (sequentially consistent), т.е. все операции с атомарными типами выполняются и наблюдаются другими процессорами в той очередности, в которой они были написаны. Последовательная консистентность - самая простая для понимания и использования модель памяти, но она же и самая медленная на некоторых архитектурах. Широко используемые процессорные архитектуры (x86-64) предоставляют последовательную консистентность без особых потерь, но на других процессорах это может значительно повлиять на производительность. Это стоит учитывать, при оптимизации для конкретного процессора. Ну что ж, теперь, когда мы ознакомились с атомарными типами и операциями мы можем приступать к делу.

Lock-free стек

Для начала спроектируем интерфейс стека. Посмотрим на его интерфейс в библиотеке STL. Отбросим конструктор, деструктор и другие вспомогательные функции и посмотрим на самое главное.
template <typename T>
class stack
{
public:
    void push(const T& v);
    void pop();
    const T& top() const;
    T& top();
};
К сожалению, в этом виде невозможно сделать стек потокобезопасным, даже с помощью блокировок. Представьте, что два потока пытаются одновременно работать со стеком, первый пытается добавить значение вызовом push(), а второй вызывает top(), а потом pop(). Второй поток читает значение top(), управление передается первому, который вставляет новое значение и управление снова возвращается второму потоку, который благополучно удаляет из стека новое значение. Потокобезопасность, как и безопасность исключений нельзя оставлять на потом при проектировке, это может повлиять на интерфейс проектируемого класса. Для решения этой проблемы нам понадобиться сделать атомарными операции top() и pop(), т.е. включить их в одну функцию.
template <typename T>
class stack
{
public:
    void push(const T& v);
    T pop();
};
В данном случае мы теряем строгую безопасность исключений, ведь если копирующий конструктор сгенерирует исключение при возвращении объекта, мы потеряем его. Это можно решить передачей ссылки
template <typename T>
class stack
{
public:
    void push(const T& v);
    void pop(T& v);
};
Следующим шагом станет выбор структуры данных для стека. Контейнеры STL не потокобезопасны и для реализации стека, через какой либо STL контейнер нам придется при модификации копировать весь контейнер, работать с копией, а потом атомарно заменить его указатель. Это довольно медленно, потому для повышения производительности нам придется самим реализовать структуру данных для стека. Односвязный список нас полностью устраивает, он довольно прост и решает задачу.
Объявляем простую структуру для односвязного списка
struct node
{
    node* next;
    T data;
    node(const T& d, node* n = 0)
        :next(n), data(d) {}
};
А в классе храним атомарный указатель на тип node, это указатель на головной элемент стека.
Далее реализуем функцию push. Для добавления нового элемента в список нам потребуется три действия.
  • Создание нового узла (node)
  • Установка в созданном узле указателя на следующий узел (головной)
  • Замена головного узла на новый
void push(const T& data)
{
    node* new_node = new node(data, head.load());
    while(!head.compare_exchange_weak(
        new_node->next,
        new_node));
}
Самая интересная часть здесь – это последняя строка функции. compare_exchange_weak сравнивает head с new_node->next и при удачном сравнении заменяет значение head на new_node. При неудачном сравнении compare_exchange_weak сохраняет текущее значение head в new_node->next, избавляя нас от надобности делать это самим. Зачем здесь нужен compare_exchange? Почему бы просто не заменить значение? Потому что если другой поток в это время совершит вставку в стек мы можем потерять это значение. В цикле мы проверяем, что head, который мы собираемся заменить действительно тот самый, который мы загрузили в node->next (не забывайте, между 3-eй и 4-ой строкой другой поток может выполнить любые действия, здесь нет блокировок). Вот и все, функция push готова. Перейдем к pop.
Для реализации функции pop потребуется два действия.
  • Заменить указатель текущего головного узла на следующий
  • Возвращение значения элемента
void pop(T& result)
{
    node* old_head = head.load();
    while (!head.compare_exchange_weak(
        old_head,
        old_head->next));
    result = old_head->data;
}
Здесь мы используем ту же технику. Цикл while нужен для того, чтобы удостовериться, что мы меняем именно тот head, который мы загрузили на первой строке.
В этом функции есть три проблемы
  • Он не будет работать для пустого стека
  • Эта функция не гарантирует строгой безопасности исключений
  • Утечка памяти
1-ую проблему очень просто решить проверкой на null в цикле while, но для решения второй проблемы мы не можем сперва сохранить значение элемента, а потом удалять его, ведь тогда другой поток может вставить новое значение между двумя этими действиями, потому мы вынуждены сперва удалять элемент а потом использовать его значение. Попробуем немного изменить интерфейс стека, чтобы он возвращал умный указатель.
Изменяем структуру узла
struct node
{
    node* next;
    std::shared_ptr<T> data;
    node(const T& d, node* n = 0)
        :next(n), data(std::make_shared<T>(d)) {}
};
И функцию pop
std::shared_ptr<T> pop()
{
    node* old_head = head.load();
    while (old_head
         && !head.compare_exchange_weak(
            old_head,
            old_head->next));
    return old_head ?
        old_head->data :
        std::shared_ptr<T>();
}
И наконец, соединим это все в один шаблонный класс stack.
template <typename T>
class stack
{
private:
    struct node
    {
        node* next;
        std::shared_ptr<T> data;
        node(const T& d, node* n = 0)
            :next(n), data(std::make_shared<T>(d)) {}
    };
    std::atomic<node*> head;
public:
    void push(const T& data)
    {
        node* new_node = new node(data, head.load());
        while (!head.compare_exchange_weak(
            new_node->next,
            new_node));
    }
    std::shared_ptr<T> pop()
    {
        node* old_head=head.load();
        while (old_head
         && !head.compare_exchange_weak(
            old_head,
            old_head->next));
        return old_head ?
            old_head->data :
            std::shared_ptr<T>();
    }
};
Но как уже писалось выше, этот код имеет еще одну проблему, которую мы так и не решили – утечка памяти. Мы создаем объект узла с помощью оператора new, но нигде его не удаляем. К сожалению, тут начинаются настоящие проблемы, это не так-то просто. Любой другой поток может все еще ссылаться на узел, который мы хотим удалить. В языке C++ нет сборщика мусора, и придется нам реализовывать его самим, как бы страшно это не звучало.

В статье использован пример из книги C++ Concurrency in Action

Ссылки

Продолжение следует…

8 comments:

niXman комментирует...

Познавательно.
Спасибо.

Анонимный комментирует...

К сожалению, в этом виде сделать стек невозможно сделать потокобезопасным, даже с помощью блокировок.

Слово "сделать" написанно дважды

Tigran Avanesov комментирует...

Исправил.
Большое спасибо.

Анонимный комментирует...

bуду рад комментариям:
http://forum.vingrad.ru/forum/topic-316253/anchor-entry2255736/0.html

Анонимный комментирует...


bool CAS(long* obj, long exp, long val)
{
atomically do
{
if (*obj == exp)
{
*obj = val;
return true;
}
return false;
}
}

compare_exchange_weak сравнивает head с new_node->next и при удачном сравнении заменяет значение head на new_node. При неудачном сравнении compare_exchange_weak сохраняет текущее значение head в new_node->next

compare_exchange_strong()
Аналог вышеописанной функции CAS


Cbивает с толку. На самом деле compare_exchange_strong аналог с одной разницей. А compare_exchange_weak с двумя.
compare_exchange_weak сохраняет текущее значение head в new_node->next

Леопольд

Tigran Avanesov комментирует...

Это смотря с чем сравнивать.

compare_exchange_strong() -
Аналог вышеописанной функции CAS
compare_exchange_weak не является аналогом вышеописанной функции. compare_exchange_weak на некоторых платформах может ложно не сработать.
Из документации
Additionally, this may fail spuriously (and return false) even if the values were equal

Анонимный комментирует...

я имел ввиду что compare_exchange_strong раbотает так:

bool CAS(long* obj, long & exp, long val)
{
atomically do
{
if (*obj == exp)
{
*obj = val;
return true;
}
exp = *obj;
return false;
}
}

Tigran Avanesov комментирует...

Они оба так работают, разница в том, что один (strong) в цикле вызывает другой (weak).