В предыдущей статье мы ознакомились с атомарными операциями в C++, с техникой lock-free и wait-free программирование, а также узнали об основной проблеме lock-free программирования в C++. В среде, в которой есть сборщик мусора эта проблема нас бы не волновала, но по стандарту C++ пользователь обязан очищать память с помощью оператора delete и на данный момент очень мало реализаций компилятора C++ имеют механизм сборки мусора. Так или иначе мы не можем надеяться на компилятор и делать наш код непереносимым (за редкими исключениями) и нам придется самим обеспечить сборку мусора для наших структур данных.
Для начала рассмотрим еще раз проблему, связанную с удалением узла.
Ее работа заключается в том, чтобы проверить наличие других потоков в функции pop и удалить узел в том случае, если текущий поток – единственный, иначе сохранить указатель на узел в каком нибудь списке для дальнейшего удаления. Для реализации этого списка никаких дополнительных средств нам не понадобится, так-как структура node сама по себе предназначена для реализации односвязного списка, мы просто переносим узел из одного списка в другой.
Добавим в класс переменные
Рассмотрим код.
Реализация функций chain_one_node и chain_node_list не представляет собой ничего особо сложного, первая добавляет 1 узел в начало списка, вторая - список узлов.
Для тех, кто ищет высокоэффективное решение проблемы очистки памяти, советую взглянуть на технику Hazard Pointers, разработанную Maged Michael – ом и в дальнейшем запатентованную корпорацией IBM. По сути hazard pointers – это механизм, с помощью которого поток сообщает остальным, что он использует объект и его нельзя удалять. Подробно изучить эту технику и посмотреть примеры реализации можно пройдя по ссылкам в конце статьи.
В статье использован пример из книги C++ Concurrency in Action
void push(const T& data)
{
node* new_node = new node(data, head.load());
while (!head.compare_exchange_weak(
new_node->next, new_node));
}
std::shared_ptr<T> pop()
{
node* old_head=head.load();
while (old_head && !head.compare_exchange_weak(
old_head, old_head->next));
return old_head ? old_head->data : std::shared_ptr<T>();
}
Если удалить old_head в функции pop непосредственно перед возвращением значения, возможна ситуация, когда другой поток обращается к head->next в функции pop, что приведет нас в волшебный мир неопределенного поведения. Далее в статье мы рассмотрим алгоритмы, специальное разработанные для решения проблемы очистки памяти и начнем мы с самого простого.Подсчет потоков
Единственным препятствием для удаления узла является то, что другой поток может все еще хранить ссылку на этот узел и потому мы можем считать количество вхождений в функцию pop и удалять узел только тогда, когда это единственный поток, выполняющий функцию. Естественно переменная для подсчета количества потоков тоже должна быть атомарной. Изменим функцию pop в соответствии с этимstd::shared_ptr<T> pop()
{
++threads_in_pop;
node* old_head = head;
while(old_head && !head.compare_exchange_weak(old_head, old_head->next));
std::shared_ptr<T> res;
if(old_head)
res.swap(old_head->data);
// пытаемся удалить old_head
try_reclaim(old_head);
return res;
}
Здесь при вхождении в функцию мы инкрементируем значение счетчика и в самом конце пытаемся удалить узел old_head вызовом функции try_reclaim. Теперь рассмотрим саму функцию.Ее работа заключается в том, чтобы проверить наличие других потоков в функции pop и удалить узел в том случае, если текущий поток – единственный, иначе сохранить указатель на узел в каком нибудь списке для дальнейшего удаления. Для реализации этого списка никаких дополнительных средств нам не понадобится, так-как структура node сама по себе предназначена для реализации односвязного списка, мы просто переносим узел из одного списка в другой.
Добавим в класс переменные
std::atomic<unsigned> threads_in_pop; std::atomic<node*> to_be_deleted;Первая будет хранить количество потоков в функции pop, а вторая головной элемент списка узлов, которые необходимо удалить.
Рассмотрим код.
void try_reclaim(node* old_head)
{
if(threads_in_pop == 1)
{
node* nodes_to_delete = to_be_deleted.exchange(NULL);
if(!--threads_in_pop)
{
delete_nodes(nodes_to_delete);
}
else if(nodes_to_delete)
{
chain_node_list(nodes_to_delete);
}
delete old_head;
}
else
{
chain_one_node(old_head);
--threads_in_pop;
}
}
Сперва мы проверяем наличие других потоков в функции. Если поток всего один (текущий) нам нужно как-то отделить текущие узлы, чтобы другие потоки не помешали работе с ними. Для этого мы просто сохраняем текущий указатель на головной элемент списка to_be_deleted и записываем в него значение NULL (это указывает на то, что список пуст). После этого мы должны уменьшить значение счетчика и еще раз проверить его значение. Еще одна проверка значения счетчика нужна для того, чтобы удостовериться, что никакой другой поток не начал выполняться в промежутке между первой проверкой и обменом указателя to_be_deleted на NULL. Убедившись, что никакой другой поток не начал выполнение функции pop мы можем спокойно перейти непосредственно к удалению узлов, в противном случае мы должны вернуть узлы на место (в список to_be_deleted), чтобы удалить их позже, при следующей удобной возможности. Это делает функция chain_node_list. Если же при входе в функцию количество потоков превышает единицу, мы просто добавляем узел в список вызовом функции chain_one_node и уменьшаем значение счетчика.Реализация функций chain_one_node и chain_node_list не представляет собой ничего особо сложного, первая добавляет 1 узел в начало списка, вторая - список узлов.
void chain_node_list(node* first, node* last)
{
last->next = to_be_deleted;
while (!to_be_deleted.compare_exchange_weak(last->next, first));
}
Функция принимает два узла, связанных в виде списка и добавляет их в начало списка удаляемых узлов to_be_deleted. Сперва следующий элемент последнего узла переданного списка устанавливается на текущий головной элемент to_be_deleted, после чего to_be_deleted атомарно заменяется на first (используется все та же техника CAS). Нам также необходима перегруженная функция chain_node_list, которая делает тоже самое, но на основе первого элемента списка, последний элемент она находит сама и в итоге вызывает первую.void chain_node_list(node* nodes)
{
node* last = nodes;
while(node* const next = last->next)
last = next;
chain_pending_nodes(nodes, last);
}
Третья функция для работы со списком удаляемых узлов нужна для добавления всего лишь одного узла. Работает она опять таки на основе первой, просто передавая ей тот же самый элемент в качестве и первого и последнего.void chain_one_node(node* n)
{
chain_pending_nodes(n, n);
}
Осталось только определить саму функцию удаления узлов и мы закончили.static void delete_nodes(node* nodes)
{
while(nodes)
{
node* next = nodes->next;
delete nodes;
nodes = next;
}
}
Здесь все довольно просто, так как в этой функции может работать один единственный поток и никаких средств синхронизации и защиты не требуется, мы просто проводим итерацию по списку и удаляем все узлы. Мы сделали функцию статической, так-как она не привязана к конкретному стеку. Вот и все.template<typename T>
class stack
{
private:
struct node
{
node* next;
std::shared_ptr<T> data;
node(const T& d, node* n = 0)
: next(n), data(std::make_shared<T>(d)){}
};
std::atomic<node*> head;
std::atomic<node*> to_be_deleted;
std::atomic<unsigned> threads_in_pop;
public:
void push(const T& data)
{
node* new_node = new node(data, head.load());
while (!head.compare_exchange_weak(new_node->next, new_node));
}
std::shared_ptr<T> pop()
{
++threads_in_pop;
node* old_head = head;
while(old_head && !head.compare_exchange_weak(old_head, old_head->next));
std::shared_ptr<T> res;
if(old_head)
res.swap(old_head->data);
// пытаемся удалить old_head
try_reclaim(old_head);
return res;
}
private:
void chain_node_list(node* first, node* last)
{
last->next = to_be_deleted;
while (!to_be_deleted.compare_exchange_weak(last->next, first));
}
void chain_node_list(node* nodes)
{
node* last=nodes;
while(node* const next=last->next)
last=next;
chain_pending_nodes(nodes,last);
}
void chain_one_node(node* n)
{
chain_pending_nodes(n, n);
}
static void delete_nodes(node* nodes)
{
while(nodes)
{
node* next=nodes->next;
delete nodes;
nodes=next;
}
}
void try_reclaim(node* old_head)
{
if(threads_in_pop == 1)
{
node* nodes_to_delete = to_be_deleted.exchange(NULL);
if(!--threads_in_pop)
{
delete_nodes(nodes_to_delete);
}
else if(nodes_to_delete)
{
chain_node_list(nodes_to_delete);
}
delete old_head;
}
else
{
chain_one_node(old_head);
--threads_in_pop;
}
}
};
К сожалению этот метод не лишен недостатков. Неблокирующие структуры данных зачастую используются в условиях очень высокой нагрузки, а в таких условиях маловероятна ситуация, когда функцию pop будет выполнять один единственный поток и теоретически список to_be_deleted может постоянно расти, тем самым расходуя память.Для тех, кто ищет высокоэффективное решение проблемы очистки памяти, советую взглянуть на технику Hazard Pointers, разработанную Maged Michael – ом и в дальнейшем запатентованную корпорацией IBM. По сути hazard pointers – это механизм, с помощью которого поток сообщает остальным, что он использует объект и его нельзя удалять. Подробно изучить эту технику и посмотреть примеры реализации можно пройдя по ссылкам в конце статьи.
В статье использован пример из книги C++ Concurrency in Action
0 comments:
Отправить комментарий