вторник, 23 ноября 2010 г.

Lock-free программирование - часть 2

В предыдущей статье мы ознакомились с атомарными операциями в C++, с техникой lock-free и wait-free программирование, а также узнали об основной проблеме lock-free программирования в C++. В среде, в которой есть сборщик мусора эта проблема нас бы не волновала, но по стандарту C++ пользователь обязан очищать память с помощью оператора delete и на данный момент очень мало реализаций компилятора C++ имеют механизм сборки мусора. Так или иначе мы не можем надеяться на компилятор и делать наш код непереносимым (за редкими исключениями) и нам придется самим обеспечить сборку мусора для наших структур данных.
Для начала рассмотрим еще раз проблему, связанную с удалением узла.
void push(const T& data)
{
    node* new_node = new node(data, head.load());
    while (!head.compare_exchange_weak(
        new_node->next, new_node));
}
std::shared_ptr<T> pop()
{
    node* old_head=head.load();
    while (old_head && !head.compare_exchange_weak(
        old_head, old_head->next));
    return old_head ? old_head->data : std::shared_ptr<T>();
}
Если удалить old_head в функции pop непосредственно перед возвращением значения, возможна ситуация, когда другой поток обращается к head->next в функции pop, что приведет нас в волшебный мир неопределенного поведения. Далее в статье мы рассмотрим алгоритмы, специальное разработанные для решения проблемы очистки памяти и начнем мы с самого простого.

Подсчет потоков

Единственным препятствием для удаления узла является то, что другой поток может все еще хранить ссылку на этот узел и потому мы можем считать количество вхождений в функцию pop и удалять узел только тогда, когда это единственный поток, выполняющий функцию. Естественно переменная для подсчета количества потоков тоже должна быть атомарной. Изменим функцию pop в соответствии с этим
std::shared_ptr<T> pop()
{
    ++threads_in_pop;
    node* old_head = head;
    while(old_head && !head.compare_exchange_weak(old_head, old_head->next));
    std::shared_ptr<T> res;
    if(old_head)
        res.swap(old_head->data);

    // пытаемся удалить old_head
    try_reclaim(old_head);

    return res;
}
Здесь при вхождении в функцию мы инкрементируем значение счетчика и в самом конце пытаемся удалить узел old_head вызовом функции try_reclaim. Теперь рассмотрим саму функцию.
Ее работа заключается в том, чтобы проверить наличие других потоков в функции pop и удалить узел в том случае, если текущий поток – единственный, иначе сохранить указатель на узел в каком нибудь списке для дальнейшего удаления. Для реализации этого списка никаких дополнительных средств нам не понадобится, так-как структура node сама по себе предназначена для реализации односвязного списка, мы просто переносим узел из одного списка в другой.
Добавим в класс переменные
std::atomic<unsigned> threads_in_pop;
std::atomic<node*> to_be_deleted;
Первая будет хранить количество потоков в функции pop, а вторая головной элемент списка узлов, которые необходимо удалить.

Рассмотрим код.
void try_reclaim(node* old_head)
{
    if(threads_in_pop == 1)
    {
        node* nodes_to_delete = to_be_deleted.exchange(NULL);
        if(!--threads_in_pop)
        {
            delete_nodes(nodes_to_delete);
        }
        else if(nodes_to_delete)
        {
            chain_node_list(nodes_to_delete);
        }
        delete old_head;
    }
    else
    {
        chain_one_node(old_head);
        --threads_in_pop;
    }
}
Сперва мы проверяем наличие других потоков в функции. Если поток всего один (текущий) нам нужно как-то отделить текущие узлы, чтобы другие потоки не помешали работе с ними. Для этого мы просто сохраняем текущий указатель на головной элемент списка to_be_deleted и записываем в него значение NULL (это указывает на то, что список пуст). После этого мы должны уменьшить значение счетчика и еще раз проверить его значение. Еще одна проверка значения счетчика нужна для того, чтобы удостовериться, что никакой другой поток не начал выполняться в промежутке между первой проверкой и обменом указателя to_be_deleted на NULL. Убедившись, что никакой другой поток не начал выполнение функции pop мы можем спокойно перейти непосредственно к удалению узлов, в противном случае мы должны вернуть узлы на место (в список to_be_deleted), чтобы удалить их позже, при следующей удобной возможности. Это делает функция chain_node_list. Если же при входе в функцию количество потоков превышает единицу, мы просто добавляем узел в список вызовом функции chain_one_node и уменьшаем значение счетчика.
Реализация функций chain_one_node и chain_node_list не представляет собой ничего особо сложного, первая добавляет 1 узел в начало списка, вторая - список узлов.
void chain_node_list(node* first, node* last)
{
    last->next = to_be_deleted;
    while (!to_be_deleted.compare_exchange_weak(last->next, first));
}
Функция принимает два узла, связанных в виде списка и добавляет их в начало списка удаляемых узлов to_be_deleted. Сперва следующий элемент последнего узла переданного списка устанавливается на текущий головной элемент to_be_deleted, после чего to_be_deleted атомарно заменяется на first (используется все та же техника CAS). Нам также необходима перегруженная функция chain_node_list, которая делает тоже самое, но на основе первого элемента списка, последний элемент она находит сама и в итоге вызывает первую.
void chain_node_list(node* nodes)
{
    node* last = nodes;
    while(node* const next = last->next)
        last = next;
    chain_pending_nodes(nodes, last);
}
Третья функция для работы со списком удаляемых узлов нужна для добавления всего лишь одного узла. Работает она опять таки на основе первой, просто передавая ей тот же самый элемент в качестве и первого и последнего.
void chain_one_node(node* n)
{
    chain_pending_nodes(n, n);
}
Осталось только определить саму функцию удаления узлов и мы закончили.
static void delete_nodes(node* nodes) 
{ 
    while(nodes)
    {
        node* next = nodes->next;
        delete nodes;
        nodes = next;
    }
}
Здесь все довольно просто, так как в этой функции может работать один единственный поток и никаких средств синхронизации и защиты не требуется, мы просто проводим итерацию по списку и удаляем все узлы. Мы сделали функцию статической, так-как она не привязана к конкретному стеку. Вот и все.
template<typename T>
class stack
{
private:
    struct node
    {
        node* next;
        std::shared_ptr<T> data;
        node(const T& d, node* n = 0)
            : next(n), data(std::make_shared<T>(d)){}
    };
    std::atomic<node*> head;
    std::atomic<node*> to_be_deleted;
    std::atomic<unsigned> threads_in_pop;
public:
    void push(const T& data)
    {
        node* new_node = new node(data, head.load());
        while (!head.compare_exchange_weak(new_node->next, new_node));
    }
    std::shared_ptr<T> pop()
    {
        ++threads_in_pop;
        node* old_head = head;
        while(old_head && !head.compare_exchange_weak(old_head, old_head->next));
        std::shared_ptr<T> res;
        if(old_head)
            res.swap(old_head->data);

        // пытаемся удалить old_head
        try_reclaim(old_head);

        return res;
    }
private:
    void chain_node_list(node* first, node* last)
    {
        last->next = to_be_deleted;
        while (!to_be_deleted.compare_exchange_weak(last->next, first));
    }
    void chain_node_list(node* nodes)
    {
        node* last=nodes;
        while(node* const next=last->next)
            last=next;
        chain_pending_nodes(nodes,last);
    }
    void chain_one_node(node* n)
    {
        chain_pending_nodes(n, n);
    }
    static void delete_nodes(node* nodes)
    {
        while(nodes)
        {
            node* next=nodes->next;
            delete nodes;
            nodes=next;
        }
    }
    void try_reclaim(node* old_head)
    {
        if(threads_in_pop == 1)
        {
            node* nodes_to_delete = to_be_deleted.exchange(NULL);
            if(!--threads_in_pop)
            {
                delete_nodes(nodes_to_delete);
            }
            else if(nodes_to_delete)
            {
                chain_node_list(nodes_to_delete);
            }
            delete old_head;
        }
        else
        {
            chain_one_node(old_head);
            --threads_in_pop;
        }
    }
};
К сожалению этот метод не лишен недостатков. Неблокирующие структуры данных зачастую используются в условиях очень высокой нагрузки, а в таких условиях маловероятна ситуация, когда функцию pop будет выполнять один единственный поток и теоретически список to_be_deleted может постоянно расти, тем самым расходуя память.
Для тех, кто ищет высокоэффективное решение проблемы очистки памяти, советую взглянуть на технику Hazard Pointers, разработанную Maged Michael – ом и в дальнейшем запатентованную корпорацией IBM. По сути hazard pointers – это механизм, с помощью которого поток сообщает остальным, что он использует объект и его нельзя удалять. Подробно изучить эту технику и посмотреть примеры реализации можно пройдя по ссылкам в конце статьи.

В статье использован пример из книги C++ Concurrency in Action

Ссылки