В предыдущей статье мы ознакомились с атомарными операциями в C++, с техникой lock-free и wait-free программирование, а также узнали об основной проблеме lock-free программирования в C++. В среде, в которой есть сборщик мусора эта проблема нас бы не волновала, но по стандарту C++ пользователь обязан очищать память с помощью оператора delete и на данный момент очень мало реализаций компилятора C++ имеют механизм сборки мусора. Так или иначе мы не можем надеяться на компилятор и делать наш код непереносимым (за редкими исключениями) и нам придется самим обеспечить сборку мусора для наших структур данных.
Для начала рассмотрим еще раз проблему, связанную с удалением узла.
Ее работа заключается в том, чтобы проверить наличие других потоков в функции pop и удалить узел в том случае, если текущий поток – единственный, иначе сохранить указатель на узел в каком нибудь списке для дальнейшего удаления. Для реализации этого списка никаких дополнительных средств нам не понадобится, так-как структура node сама по себе предназначена для реализации односвязного списка, мы просто переносим узел из одного списка в другой.
Добавим в класс переменные
Рассмотрим код.
Реализация функций chain_one_node и chain_node_list не представляет собой ничего особо сложного, первая добавляет 1 узел в начало списка, вторая - список узлов.
Для тех, кто ищет высокоэффективное решение проблемы очистки памяти, советую взглянуть на технику Hazard Pointers, разработанную Maged Michael – ом и в дальнейшем запатентованную корпорацией IBM. По сути hazard pointers – это механизм, с помощью которого поток сообщает остальным, что он использует объект и его нельзя удалять. Подробно изучить эту технику и посмотреть примеры реализации можно пройдя по ссылкам в конце статьи.
В статье использован пример из книги C++ Concurrency in Action
void push(const T& data) { node* new_node = new node(data, head.load()); while (!head.compare_exchange_weak( new_node->next, new_node)); } std::shared_ptr<T> pop() { node* old_head=head.load(); while (old_head && !head.compare_exchange_weak( old_head, old_head->next)); return old_head ? old_head->data : std::shared_ptr<T>(); }Если удалить old_head в функции pop непосредственно перед возвращением значения, возможна ситуация, когда другой поток обращается к head->next в функции pop, что приведет нас в волшебный мир неопределенного поведения. Далее в статье мы рассмотрим алгоритмы, специальное разработанные для решения проблемы очистки памяти и начнем мы с самого простого.
Подсчет потоков
Единственным препятствием для удаления узла является то, что другой поток может все еще хранить ссылку на этот узел и потому мы можем считать количество вхождений в функцию pop и удалять узел только тогда, когда это единственный поток, выполняющий функцию. Естественно переменная для подсчета количества потоков тоже должна быть атомарной. Изменим функцию pop в соответствии с этимstd::shared_ptr<T> pop() { ++threads_in_pop; node* old_head = head; while(old_head && !head.compare_exchange_weak(old_head, old_head->next)); std::shared_ptr<T> res; if(old_head) res.swap(old_head->data); // пытаемся удалить old_head try_reclaim(old_head); return res; }Здесь при вхождении в функцию мы инкрементируем значение счетчика и в самом конце пытаемся удалить узел old_head вызовом функции try_reclaim. Теперь рассмотрим саму функцию.
Ее работа заключается в том, чтобы проверить наличие других потоков в функции pop и удалить узел в том случае, если текущий поток – единственный, иначе сохранить указатель на узел в каком нибудь списке для дальнейшего удаления. Для реализации этого списка никаких дополнительных средств нам не понадобится, так-как структура node сама по себе предназначена для реализации односвязного списка, мы просто переносим узел из одного списка в другой.
Добавим в класс переменные
std::atomic<unsigned> threads_in_pop; std::atomic<node*> to_be_deleted;Первая будет хранить количество потоков в функции pop, а вторая головной элемент списка узлов, которые необходимо удалить.
Рассмотрим код.
void try_reclaim(node* old_head) { if(threads_in_pop == 1) { node* nodes_to_delete = to_be_deleted.exchange(NULL); if(!--threads_in_pop) { delete_nodes(nodes_to_delete); } else if(nodes_to_delete) { chain_node_list(nodes_to_delete); } delete old_head; } else { chain_one_node(old_head); --threads_in_pop; } }Сперва мы проверяем наличие других потоков в функции. Если поток всего один (текущий) нам нужно как-то отделить текущие узлы, чтобы другие потоки не помешали работе с ними. Для этого мы просто сохраняем текущий указатель на головной элемент списка to_be_deleted и записываем в него значение NULL (это указывает на то, что список пуст). После этого мы должны уменьшить значение счетчика и еще раз проверить его значение. Еще одна проверка значения счетчика нужна для того, чтобы удостовериться, что никакой другой поток не начал выполняться в промежутке между первой проверкой и обменом указателя to_be_deleted на NULL. Убедившись, что никакой другой поток не начал выполнение функции pop мы можем спокойно перейти непосредственно к удалению узлов, в противном случае мы должны вернуть узлы на место (в список to_be_deleted), чтобы удалить их позже, при следующей удобной возможности. Это делает функция chain_node_list. Если же при входе в функцию количество потоков превышает единицу, мы просто добавляем узел в список вызовом функции chain_one_node и уменьшаем значение счетчика.
Реализация функций chain_one_node и chain_node_list не представляет собой ничего особо сложного, первая добавляет 1 узел в начало списка, вторая - список узлов.
void chain_node_list(node* first, node* last) { last->next = to_be_deleted; while (!to_be_deleted.compare_exchange_weak(last->next, first)); }Функция принимает два узла, связанных в виде списка и добавляет их в начало списка удаляемых узлов to_be_deleted. Сперва следующий элемент последнего узла переданного списка устанавливается на текущий головной элемент to_be_deleted, после чего to_be_deleted атомарно заменяется на first (используется все та же техника CAS). Нам также необходима перегруженная функция chain_node_list, которая делает тоже самое, но на основе первого элемента списка, последний элемент она находит сама и в итоге вызывает первую.
void chain_node_list(node* nodes) { node* last = nodes; while(node* const next = last->next) last = next; chain_pending_nodes(nodes, last); }Третья функция для работы со списком удаляемых узлов нужна для добавления всего лишь одного узла. Работает она опять таки на основе первой, просто передавая ей тот же самый элемент в качестве и первого и последнего.
void chain_one_node(node* n) { chain_pending_nodes(n, n); }Осталось только определить саму функцию удаления узлов и мы закончили.
static void delete_nodes(node* nodes) { while(nodes) { node* next = nodes->next; delete nodes; nodes = next; } }Здесь все довольно просто, так как в этой функции может работать один единственный поток и никаких средств синхронизации и защиты не требуется, мы просто проводим итерацию по списку и удаляем все узлы. Мы сделали функцию статической, так-как она не привязана к конкретному стеку. Вот и все.
template<typename T> class stack { private: struct node { node* next; std::shared_ptr<T> data; node(const T& d, node* n = 0) : next(n), data(std::make_shared<T>(d)){} }; std::atomic<node*> head; std::atomic<node*> to_be_deleted; std::atomic<unsigned> threads_in_pop; public: void push(const T& data) { node* new_node = new node(data, head.load()); while (!head.compare_exchange_weak(new_node->next, new_node)); } std::shared_ptr<T> pop() { ++threads_in_pop; node* old_head = head; while(old_head && !head.compare_exchange_weak(old_head, old_head->next)); std::shared_ptr<T> res; if(old_head) res.swap(old_head->data); // пытаемся удалить old_head try_reclaim(old_head); return res; } private: void chain_node_list(node* first, node* last) { last->next = to_be_deleted; while (!to_be_deleted.compare_exchange_weak(last->next, first)); } void chain_node_list(node* nodes) { node* last=nodes; while(node* const next=last->next) last=next; chain_pending_nodes(nodes,last); } void chain_one_node(node* n) { chain_pending_nodes(n, n); } static void delete_nodes(node* nodes) { while(nodes) { node* next=nodes->next; delete nodes; nodes=next; } } void try_reclaim(node* old_head) { if(threads_in_pop == 1) { node* nodes_to_delete = to_be_deleted.exchange(NULL); if(!--threads_in_pop) { delete_nodes(nodes_to_delete); } else if(nodes_to_delete) { chain_node_list(nodes_to_delete); } delete old_head; } else { chain_one_node(old_head); --threads_in_pop; } } };К сожалению этот метод не лишен недостатков. Неблокирующие структуры данных зачастую используются в условиях очень высокой нагрузки, а в таких условиях маловероятна ситуация, когда функцию pop будет выполнять один единственный поток и теоретически список to_be_deleted может постоянно расти, тем самым расходуя память.
Для тех, кто ищет высокоэффективное решение проблемы очистки памяти, советую взглянуть на технику Hazard Pointers, разработанную Maged Michael – ом и в дальнейшем запатентованную корпорацией IBM. По сути hazard pointers – это механизм, с помощью которого поток сообщает остальным, что он использует объект и его нельзя удалять. Подробно изучить эту технику и посмотреть примеры реализации можно пройдя по ссылкам в конце статьи.
В статье использован пример из книги C++ Concurrency in Action
0 comments:
Отправить комментарий