16 декабря 2010 г.

java.util.concurrent (Санкт-Петербургская группа тестирования JVM)

источник: http://blogs.sun.com/vmrobot/entry/java_util_concurrent
автор: Семён Бойков


java.util.concurrent

Когда я впервые услышал о пакете java.util.concurrent (JSR 166), который предоставляет набор классов для организации межпоточного взаимодействия, я подумал, что это всего лишь усовершенствованный набор контейнерных классов, построенный на основе встроенных в язык Java механизмов межпоточного взаимодействия (synchronized методов и блоков), однако более близкое знакомство с предоставляемым этим пакетом API удивило: ряд возможностей не может быть реализован с использованием стандартных механизмов.



Например, класс ReentrantLock, который аналогичен по функциональности стандартному механизму блокировок, помимо метода захвата блокировки lock(), вызов которого аналогичен входу в synchronized блок, также предоставляет метод lockInterruptibly(), который может быть прерван с использованиемThread.interrupt. То есть имеется возможность прервать поток, пытающийся захватить ReentrantLock, что невозможно сделать при захвате обычной блокировки Java. Также ReentrantLock предоставляет такие возможности, отсутствующие у стандартных блокировок, как захват блокировки только при условии, что она не захвачена другим потоком (метод tryLock()), и попытка захвата с тайм-аутом (метод tryLock(long timeout, TimeUnit unit)).

Кроме этого, главной особенностью встроенных блокировок Java (блокировок, основанных на использовании ключевого слова synchronized) является то, что захват и освобождение происходят в рамках синтаксического блока, то есть блокировка автоматически освобождается при выходе из synchronized метода илиsynchronized блока. Такая реализация предотвращает потенциальные ошибки кодирования, но налагает ограничения на алгоритмы синхронизации, которые можно реализовать с их помощью, захват же и освобождение ReentrantLock происходит при помощи методов lock() и unlock(), то есть захват и освобождение не ограничены синтаксическим блоком.

Очевидно, что реализация этой функциональности невозможна без поддержки виртуальной машины и операционной системы. Большинство классов пакетаjava.util.concurrent, обеспечивающих межпоточную синхронизацию, созданы с использованием фреймворка, построенного на основе классаjava.util.concurrent.locks.AbstractQueuedSynchronizer. Этот фреймфорк подробно описан в статье "The java.util.concurrent Synchronizer Framework", здесь я приведу краткий обзор особенностей этого фреймворка.

Прежде чем приступить к описанию особенностей реализации пакета java.util.concurrent, необходимо немного отвлечься и рассказать о методе организации межпоточного взаимодействия, основанном на неблокирующих алгоритмах, именно благодаря неблокирующим алгоритмам классы пакета java.util.concurrent, используемые для синхронизации, предоставляют расширенную по сравнению со стандартными блокировками функциональность, а также обладают лучшей производительностью и масштабируемостью.

Традиционный подход для синхронизации потоков, работающих с общими данными, заключается в использовании так называемых критических секций - участков кода, которые не могут исполняться несколькими потоками одновременно. В случае Java этот подход реализован на основе synchronized методов и блоков: поток, вошедший в synchronized метод, может в рамках этого метода спокойно работать с данными, разделяемыми с другими потоками, так как ни один другой поток не сможет в это же время войти в этот метод, а говоря точнее, другие потоки, попытавшиеся сделать это, будут заблокированы до тех пор, пока первый поток не покинет synchronized метод.

У традиционного подхода, использующего критические секции и блокирование потоков, есть несколько недостатков, и, возможно, главным из них является производительность. Если блокировка не была захвачена другим потоком, то операция захвата этой блокировки в современных виртуальных машинах выполняется очень быстро. Однако если блокировка захвачена другим потоком, то в этом случае должен быть задействован сравнительно дорогой механизм блокирования и последующего разблокирования потока. Также очень часто критические секции выполняют очень небольшой объём действий (например, просто увеличивают значение счётчика), при этом время, затрачиваемое на блокирование/разблокирование потока, оказывается гораздо больше по сравнению со временем работы критической секции, и в таких случаях было бы хорошо иметь более легковесный способ синхронизации. Помимо этого, при использовании синхронизации, основанной на блокировании, часто возникают такие проблемы, как взаимоблокировки (deadlocks) и инверсия приоритетов (priority inversion) (инверсия приоритетов возникает в том случае, если высокоприоритетный поток приостанавливается из-за того, что он не может получить блокировку, захваченную низкоприоритетным потоком, в этом случае высокоприоритетный поток не может продолжить выполнение до тех пор, пока блокировка не освобождена, то есть на время ожидания его приоритет как бы понижается до приоритета потока, удерживающего блокировку).

К счастью, существует другой способ организации межпоточного взаимодействия. Синхронизацию можно реализовать и без блокирования потоков, на основе атомарных операций сравнения и изменения значения переменной. Практически все современные процессоры предоставляют команды для обеспечения межпоточной работы, так например, в процессорах SPARC реализована команда compare-and-swap (CAS) (сравнение и замена). Команда CAS принимает три аргумента: адрес ячейки памяти (V), ожидаемое старое значение (A) и новое значение (B). Процессор записывает значение B в ячейку V, но только в том случае, если ячейка памяти содержит ожидаемое значение A, в противном случае запись произведена не будет. В обоих случаях команда возвращает значение, находящееся в ячейке V на момент вызова команды. По результату, возвращаемому командой CAS, поток может судить, успешно или нет прошла модификация переменной. Если CAS закончилась неудачно (то есть запись не была произведена), значит, какой-то другой поток успел изменить значение этой переменной после того, как текущий поток считал её значение.

Основное отличие использования команды CAS для синхронизации от использования критических секций и блокирования потоков заключается в следующем: если несколько потоков одновременно пытаются изменить значение общей переменной, используя команду CAS, то операция завершается успешно только для одного потока, однако остальные потоки, проигравшие в этом соревновании за изменение, не приостанавливаются (как это произошло бы в результате неудачной операции захвата блокировки), вместо этого поток просто получает информацию о том, что операция изменения переменной прошла неудачно, и так как поток не блокирован, то он может самостоятельно решить, какое действие ему теперь предпринять: возможно, попытаться изменить переменную ещё раз, может быть, выполнить какие-либо другие действия или же вообще ничего не делать (в алгоритмах на основе CAS выбор последнего варианта не редок, так как неудача операции CAS может значить, что какой-то другой поток уже выполнил работу, которую вы собирались сделать).

Поскольку операция CAS поддерживается аппаратно, то её использование вносит гораздо меньше накладных расходов, чем задействование более сложных механизмов операционной системы или виртуальной машины для блокирования и разблокирования потоков. Помимо этого, при использовании CAS сужается область, в которой потенциально может возникнуть состязание между потоками за использование общего ресурса, как правило, эта область ограничена использованием всего лишь одной общей переменной (при использовании традиционного подхода эта область обычно целый synchronized метод). То есть в этом случае уменьшается вероятность состязания за общие данные, и поток имеет гораздо больше шансов обработать общие данные без возникновения конфликтов, и, следовательно, без лишних задержек. В случае же, если конфликт всё-таки возникает, то время на обработку неудачного вызова CAS гораздо меньше времени, требуемого на блокирование и разблокирование потока.

Вот как мог бы быть реализован простой счётчик на основе CAS, без использования ключевого слова synchronized:
// класс, предоставляющий доступ к команде CAS
class IntegerValueUsingCAS {
    private int value;

    // возвращает 'true', если изменение прошло успешно
    public boolean compareAndSet(int expectedValue, int newValue) {
    // попытаться изменить значение 'value', используя CAS
    }

    public int getValue() {
        return value;
    }
}

class CasCounter {
    private IntegerValueUsingCAS value;

    public int getValue() {
        return value.getValue();
    }

    public void increment() {
        int oldValue = value.getValue();
        while (!value.compareAndSet(oldValue, oldValue + 1))
            oldValue = value.getValue();
    }
}

При реализации метода CasCounter.increment используется традиционный для алгоритмов на основе CAS подход: получить старое значение общей переменной, вычислить новое значение и попытаться изменить переменную с помощью CAS. В случае неудачи команды CAS действие немедленно повторяется ещё раз.

Как правило, алгоритмы на основе CAS используют так называемый оптимистический подход - поток сначала пытается выполнить действия с общими данными, и если результат команды CAS показывает, что логика работы была нарушена из-за того, что какой-то другой поток одновременно модифицирует те же данные, то необходимо предпринять действия для восстановления логики работы. Приведённый пример со счётчиком довольно простой, и алгоритм работы метода incrementочевиден, но сложность алгоритмов на основе CAS резко возрастает, если потоку необходимо атомарно модифицировать не одну, а две переменные, так как в этом случае поток должен последовательно выполнить две команды CAS. В том случае, если первая команда заканчивается успешно, а вторая нет, то общие данные оказываются в противоречивом состоянии, также надо учитывать то, что поток может начать работу с общими данными в тот момент, когда другой поток находится в середине процесса их модификации. Основная сложность алгоритмов, использующих CAS, это обеспечение непротиворечивости общих данных в процессе их модификации сразу несколькими потоками, для этого необходимо обнаруживать описанные ситуации и корректно их обрабатывать, что может быть очень непросто (в качестве примера можно посмотреть на реализацию связного списка на основе CAS, приведённую в статье "Introduction to nonblocking algorithms"). Вообще же разработка алгоритмов это довольно специфическое занятие, которое лучше всего оставить специалистам, а люди, занимающиеся решением конкретных прикладных задач, могут пользоваться уже готовыми классами пакета java.util.concurrent.

Помимо сложности программирования алгоритмов на основе CAS, у этого механизма синхронизации есть и другие недостатки. Среди них, например, возможность возникновения так называемого livelock'а. При возникновении livelock'а поток, в отличии от ситуации deadlock'а, не заблокирован, но не может продолжить работу, так как постоянно пытается выполнить операцию, которая всё время заканчивается неудачей. Например, при очень большом числе потоков, использующих CasCounter из нашего примера, и при определённом стечении обстоятелств, один из потоков может 'застрять', выполняя метод increment, так как для него операция value.compareAndSet всё время будет возвращать false из-за того, что CasCounter постоянно изменяется другими, более удачливыми, потоками.

Кроме того, операция CAS может увеличивать трафик на межпроцессорной шине. Некоторые процессоры могут от этого страдать, а некоторые -- нет, за счет разделенного L2$.

Итак, узнав, что такое неблокирующие алгоритмы и в чём их преимущества и недостатки, вернёмся в фреймворку, лежащему в основе пакетаjava.util.concurrent.

Важной частью пакета java.util.concurrent являются классы, используемые для обеспечения синхронизации (имеются в виду такие классы как, например, эксклюзивная блокировка (Lock) или семафор (Semaphore)). Общий вид алгоритмов работы таких классов довольно прост, так, например, операция захвата потоком объекта синхронизации происходит следующим образом:
while (состояние объекта не позволяет захват) {
    поместить текущий поток в очередь потоков, ожидающих захват данного объекта
    заблокировать текущий поток
}
если текущий поток был помещён в очередь ожидающих потоков, удалить его из очереди

а это алгоритм операции освобождения:
обновить состояние объекта
if (состояние объекта позволяет другому потоку захватить объект) {
    разблокировать один из потоков, находящихся в очереди ожидания данного объекта
}

Поскольку общий вид этих алгоритмов для разных типов объектов синхронизации один и тот же, было принято решение создать абстрактный базовый класс (AbstractQueuedSynchronizer), инкапсулирующий эти общие алгоритмы (и скрывающий сложность их реализации от прикладных программистов), а его подклассы должны переопределить несколько относительно простых методов, отвечающих за логику работы конкретного объекта синхронизации. Хотя большинству разработчиков не придётся использовать AbstractQueuedSynchronizer напрямую, знание того, как устроены объекты синхронизации, может помочь при их использовании.

Если проанализировать приведённые выше алгоритмы, то можно обнаружить, что для реализации необходимой функциональности, классуAbstractQueuedSynchronizer требуются следующие возможности:
  • атомарная модификация состояния объекта синхронизации
  • возможность блокирования/разблокирования потоков
  • организация очередей потоков

Поддержка этой функциональности была реализована следующим образом:
  • Атомарная модификация: поддержка атомарной модификации состояния реализована без использования встроенных блокировок Java. Вместо этого в рамках JSR 166 был создан пакет java.util.concurrent.atomic, предоставляющий доступ к командам CAS. Возможности команд CAS использовались в самой виртуальной машине и до этого, но с появлением java.util.concurrent.atomic эта функциональность представлена в виде набора обычных классов (таких как, например, AtomicInteger). Также надо отметить, что на чтение и запись атомарных переменных, представляемых классами пакетаjava.util.concurrent.atomic, налагаются те же требования видимости, что накладываются моделью памяти Java на чтение и запись volatileпеременных.
  • Блокирование/разблокирование потоков: до реализации java.util.concurrent платформа Java не предоставляла API для блокирования/разблокирования потоков, который был бы пригоден для создания объектов синхронизации. Единственные доступные методы -Thread.suspend/Thread.resume неприменимы, так как если поток, желающий разблокировать другой поток, вызывает для него метод resume раньше, чем блокирующий поток вызвал метод suspend, то вызов resume не будет иметь никакого эффекта (это значит, что методы Thread.suspend/Thread.resumeне могут быть безопасно использованы для реализации выше приведённых алгоритмов захвата/освобождения объекта синхронизации). Для решения этой проблемы в пакет java.util.concurrent.locks был добавлен класс LockSupport. Класс LockSupport ассоциирует с каждых потоком так называемое 'разрешение на выполнение', вызов LockSupport.park() блокирует текущий поток до тех пор, пока разрешение на выполнение не получено, а методLockSupport.unpark(Thread thread) устанавливает разрешение на выполнение. Таким образом, методы park/unpark лишены упомянутого недостатка методов Thread.suspend/Thread.resume. Кроме этого, вызов park можно прервать с помощью Thread.interrupt, а также LockSupport предоставляет несколько версий метода park с поддержкой тайм-аута. Именно за счёт возможностей класса LockSupport объекты синхронизации пакетаjava.util.concurrent предоставляют функциональность, отсутствующую у встроенных блокировок.
  • Организация очередей потоков: класс AbstractQueuedSynchronizer является важнейшей частью всего java.util.concurrent, так как он напрямую или косвенно используется практически всеми классами этого пакета, поэтому алгоритмы, используемые этим классом, должны быть максимально эффективны. В связи с этим было принято решение реализовать организацию очередей (а более конкретно: блокирование потоков и помещения заблокированных потоков в очередь ожидания, а также разблокирование потоков и удаление потоков из очереди ожидания после изменения статуса объекта синхронизации) без использования synchronized методов и блоков, а вместо этого используются алгоритмы на основе операций CAS. Это, пожалуй, самая сложная для понимания часть фреймворка, так как используемые алгоритмы весьма нетривиальны, но к счастью, вся сложная логика работы скрыта внутри класса AbstractQueuedSynchronizer, и для его практического использования разбираться в этих алгоритмах совсем не обязательно (те же, кто хочет детально изучить работу этого класса, могут найти всю необходимую информацию в статье "The java.util.concurrent Synchronizer Framework", а также в документации и исходном коде класса AbstractQueuedSynchronizer). Как упоминалось выше, использование CAS вместо блокирования позволяет добиться лучшей производительности и масштабируемости, и поскольку 'сердце' фреймворка, лежащего в основе java.util.concurrent, реализовано именно на основе CAS, это позволяет добиться отличных показателей производительности всем классам пакета.

Теперь посмотрим, как класс AbstractQueuedSynchronizer может использоваться на практике для создания объектов синхронизации. Для этого попробуем реализовать класс OneShotLatch (защёлка) на основе описанного фреймворка. Этот класс имеет два метода: await и signal, которые фактически являются методами захвата и освобождения объекта синхронизации. Изначально защёлка закрыта, и поток, вызывающий метод await, блокируется до тех пор, пока защёлка не открыта. Метод signal открывает защёлку и позволяет всем заблокированным потокам продолжить выполнение.

Обратимся ещё раз к приведённым выше алгоритмам захвата и освобождения объекта синхронизации. При выполнении операции захвата объекта синхронизации успех или неудача этой операции зависит от состояния объекта (например, в нашем случае у защёлки всего два состояния: защёлка закрыта или открыта), если объект был успешно захвачен, то состояние объекта может быть изменено, для того, чтобы запретить другим потокам захват этого же объекта. КлассAbstractQueuedSynchronizer хранит состояние объекта в переменной типа int, подклассы могут получить и модифицировать состояние, используя методыgetState, setState и compareAndSetState. В рамках рассматриваемого фреймворка для реализации этого алгоритма класс OneShotLatch должен переопределить метод AbstractQueuedSynchronizer.tryAcquireShared, этот метод должен проверить состояние защёлки и, в зависимости от результата проверки, вернуть значение > 0, если захват удался и значение <>
protected int tryAcquireShared(int ignored) {
    /*
    * Состояние 1 значит, что защёлка открыта, в этом случае операция
    * захвата успешна 
    */
    return (getState() == 1) ? 1 : -1;
}

Метод освобождения объекта синхронизации должен модифицировать состояние объекта и, возможно (в зависимости от состояния), разблокировать потоки, находящиеся в очереди ожидания. За логику освобождения объекта отвечает метод AbstractQueuedSynchronizer.tryReleaseShared, который должен вернуть значение true, если потоки, находящиеся в очереди ожидания, могут быть разблокированы. Для класса OneShotLatch метод tryReleaseSharedвыглядит так:
protected boolean tryReleaseShared(int ignored) {
    /*
    * Устанавливаем состояние 1 - защёлка открыта
    */
    setState(1);
    return true;
}

Классу OneShotLatch необходимо реализовать лишь два этих метода, всё остальное будет сделано классом AbstractQueuedSynchronizer.

Вот полный листинг класса OneShotLatch (обратите внимание, что OneShotLatch не наследует напрямую от AbstractQueuedSynchronizer, это сделано для того, чтобы скрыть public методы класса AbstractQueuedSynchronizer от клиентов OneShotLatch):
class OneShotLatch {

    private final Sync sync = new Sync();

    /*
     * Методы await() и signal() вызывают соответствующие методы класса
     * AbstractQueuedSynchronizer
     */

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
    }

    public void signal() {
        sync.releaseShared(0);
    }

    static private class Sync extends AbstractQueuedSynchronizer {
        protected int tryAcquireShared(int ignored) {
            /*
             * Состояние 1 значит, что защёлка открыта, в этом случае
             * операция захвата успешна
             */
            return (getState() == 1) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignored) {
            /*
             * Устанавливаем состояние 1 - защёлка открыта
             */
            setState(1);
            return true;
        }
    }
}

Как можно увидеть из примера, вся работа с потоками выполняется классом AbstractQueuedSynchronizer, а его подклассам необходимо лишь реализовать минимальный набор методов, отвечающих за логику работы конкретного объекта синхронизации, и именно так реализовано большинство объектов синхронизации пакета java.util.concurrent.

Чтобы убедиться, что всё сказанное о преимуществах синхронизации при помощи классов java.util.concurrent на самом деле правда, можно взглянуть на данные, приведённые в статье "More flexible, scalable locking in JDK 5.0", написанной в октябре 2004 года. Сравнивалась синхронизация с использованием класса ReentrantLock и на основе конструкции synchronized в JVM версии 5.0. На приведённых графиках очень хорошо видно, как при использованииsynchronized производительность резко падает при увеличении числа потоков, в случае же с ReentrantLock производительность падает более плавно (а в случае с двуядерной машиной увеличение числа потоков может даже привести к увеличению производительности). Но, благодаря усилиям Дэйва Дайсаситуация могла измениться. Но это уже тема отдельного поста.

Думаю, что на этом примере рассказ о внутреннем устройстве java.util.concurrent можно закончить (хотя надо отметить, что некоторые важные возможности пакета, например, фреймворк Executor, не были упомянуты). Надеюсь, что этот небольшой обзор убедил вас, что JSR 166 это не просто набор оптимизированных классов коллекций, а принципиально новый для Java подход к организации межпоточного взаимодействия.

Ссылки:

Практически вся информация данной статьи взята из двух источников:

Ещё несколько статей, имеющих отношение к рассматриваемому вопросу:

Семён Бойков

1 комментарий: