Поставщик.
Рассмотрим первый из двух основных методов: Produce. Задача состоит в том, что бы позволить им работать на столько параллельно, на сколько возможно:
void Produce(const T& t) {
Node* tmp = new Node(new T(t));
while (producerLock.exchange(true)) { } // вход в критическую секцию
last->next = tmp; // добавить потребителям
last = tmp; // двигаем указатель
producerLock = false; // выход из критической секции
}
Во-первых, мы хотим выполнить как можно больше вне критической части кода, обновляющего очередь. В этом случае мы сможем выполнять выделение и коструирование узла и его значения одновременно, вне зависимости от числа поставщиков и потребителей.
Во-вторых, мы "применяем" изменения, получая исключительный доступ к хвосту очереди. В цикле while мы пытаемся поменять prodicerLock в true до тех пор, пока старое значение не было false; полученное старое значение true означает, что кто-то другой уже получил доступ в критическую секцию. Подобный цикл while можно прочесть как "до тех пор, пока я не буду единственным, кто поменял значение prodicerLock из false в true". Потом мы можем обновить last->next и, собственно, сам last; это две различные операции доступа (записи) в память, которые не могут быть выполнены одновременно на большенстве процессоров без какой-либо блокировки. В завершении, мы устанавливаем producerLock в false, освобождая критическую секцию.
Подобным же образом мы хотим поддерживать любое количество потоков, называемых Consume, и обеспечивать их выполнение параллельно на столько, на сколько это возможно. Во-первых, мы получаем исключительный доступ к голове очереди.
Во-вторых, мы "применяем" изменения, получая исключительный доступ к хвосту очереди. В цикле while мы пытаемся поменять prodicerLock в true до тех пор, пока старое значение не было false; полученное старое значение true означает, что кто-то другой уже получил доступ в критическую секцию. Подобный цикл while можно прочесть как "до тех пор, пока я не буду единственным, кто поменял значение prodicerLock из false в true". Потом мы можем обновить last->next и, собственно, сам last; это две различные операции доступа (записи) в память, которые не могут быть выполнены одновременно на большенстве процессоров без какой-либо блокировки. В завершении, мы устанавливаем producerLock в false, освобождая критическую секцию.
Потребитель
Подобным же образом мы хотим поддерживать любое количество потоков, называемых Consume, и обеспечивать их выполнение параллельно на столько, на сколько это возможно. Во-первых, мы получаем исключительный доступ к голове очереди.
bool Consume(T& result) {
while(consumerLock.exchange(true)) { } // вход в критическую секцию
Далее, мы получаем значение указателя next. Если он не нулевой, мы должны получить первое значение, но важно сократить пребывание в исключительной секции на столько, на сколько это возможно.
Теперь, закончив манипуляции со списком, остальные поставщики могу продолжить свое выполнение, пока мы занимаемся копированием и очисткой в сторонке.
Иначе, нулевое значение theNext, означает, что список пуст и мы можем немедленно выйти из критической секции и вернуть код завершения.
Node* theFirst = first;
Node* theNext = first->next;
if(theNext != nullptr) { // если очередь не пуста
T* val = theNext->value; // взять
theNext->value = nullptr; // узел
first = theNext; // подвинуть указатель
consumerLock = false; // выйти из исключительной секции
Теперь, закончив манипуляции со списком, остальные поставщики могу продолжить свое выполнение, пока мы занимаемся копированием и очисткой в сторонке.
result = *val; // скопируем его обратно
delete val; // удалим значение
delete theFirst; // и временный узел
return true; // сообщим об успешном завершении
}
Иначе, нулевое значение theNext, означает, что список пуст и мы можем немедленно выйти из критической секции и вернуть код завершения.
consumerLock = false; // выйдем из критической секции
return false; // сообщим, что очередь пуста
}
};
Комментариев нет:
Отправить комментарий