Synchronizers¶
Thread¶
Получение Thread объекта:
- static Thread
currentThread()Returns a reference to the currently executing thread object.
Thread(
ThreadGroupgroup, Runnabletarget, Stringname,
longstackSize, booleaninheritThreadLocals
)Allocates a new Thread object so that it has target as its run object, has the specified name as its name, belongs to the thread group referred to by group, has the specified stackSize, and inherits initial values for inheritable thread-local variables if inheritThreadLocals is true.
Все параметры опциональны.
ThreadGroup:
A thread group represents a set of threads. In addition, a thread group can also include other thread groups (thread groups form a tree).
- void
run()Представляет иной способ определения поведения потока.
Пример
class PrimeThread extends Thread { long minPrime; PrimeThread(long minPrime) { this.minPrime = minPrime; } public void run() { // compute primes larger than minPrime . . . } } PrimeThread p = new PrimeThread(143); p.start();Если поток был создан с использованием объекта Runnable, то вызывается метод
runиз Runnable; otherwise, this method does nothing and returns.
Контроль жизненного цикла потока:
- Thread.State
getState()Returns the state of this thread.
Thread.State
- NEW
A thread that has not yet started is in this state.
- RUNNABLE
A thread executing in the Java virtual machine is in this state.
- BLOCKED
A thread that is blocked waiting for a monitor lock is in this state.
- WAITING
A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
- TIMED_WAITING
A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
- TERMINATED
A thread that has exited is in this state.
- void
start()Causes this thread to begin execution; the Java Virtual Machine calls the
runmethod of this thread (or Runnable).- void
interrupt()Interrupts this thread.
- static void
sleep(long millis)
static voidsleep(long millis, int nanos)Causes the currently executing thread to sleep (temporarily cease execution) for the specified number of milliseconds [plus the specified number of nanoseconds, subject to the precision and accuracy of system timers and schedulers].
TimeUnit.sleep
Вызов метода
sleep(long timeout)на одном из инстансов TimeUnit выполняетThrea.sleepиспользуя указанную еденицу времени.- void
join()
voidjoin(long millis)
voidjoin(long millis, int nanos)Waits for this thread to die.
- static void
yield()Подсказка к планировщику, что текущий поток готов уступить текущее использование процессорного времени.
Конфигурация потоков:
- void
setName(String name)Changes the name of this thread to be equal to the argument name.
- void
setPriority(int newPriority)Changes the priority of this thread.
- void
setDaemon(boolean on)Marks this thread as either a daemon thread or a user thread.
Расширенный список
- static void
setDefaultUncaughtExceptionHandler(
Thread.UncaughtExceptionHandlereh
)Set the default handler invoked when a thread abruptly terminates due to an uncaught exception, and no other handler has been defined for that thread.
- void
setUncaughtExceptionHandler(
Thread.UncaughtExceptionHandlereh
)Set the handler invoked when this thread abruptly terminates due to an uncaught exception.
- void
setContextClassLoader(ClassLoader cl)Sets the context ClassLoader for this Thread.
Получение информации о потоке:
- static boolean
interrupted()Tests whether the current thread has been interrupted.
- boolean
isInterrupted()Tests whether this thread has been interrupted.
- boolean
isAlive()Tests if this thread is alive.
- boolean
isDaemon()Tests if this thread is a daemon thread.
- long
getId()Returns the identifier of this Thread.
- String
getName()Returns this thread’s name.
- int
getPriority()Returns this thread’s priority.
- StackTraceElement[]
getStackTrace()Returns an array of stack trace elements representing the stack dump of this thread.
- static void
dumpStack()Prints a stack trace of the current thread to the standard error stream.
Ссылки:
Thread scheduling and wait/notify
How does Java (JVM) allocate stack for each thread (stackoverflow)
A thread is a thread of execution in a program. The Java Virtual Machine allows an application to have multiple threads of execution running concurrently.
Наследуется от Runnable.
Thread scheduling
Either the JVM or the underlying platform’s operating system deciphers how to share the processor resource among threads—a task known as thread scheduling. That portion of the JVM or operating system that performs thread scheduling is a thread scheduler.
Every thread has a priority. Threads with higher priority are executed in preference to threads with lower priority.
Memory managment
Each Java Virtual Machine thread has a private Java Virtual Machine stack, created at the same time as the thread.
Java thread stack in HotSpot is software managed, it is not an OS native thread stack.
The minimum stack size in HotSpot for a thread seems to be fixed. This is what the aforementioned -Xss option is for.
JVM может использовать метод, называемый escape-анализом, с помощью которого она может сказать, что определенные объекты остаются ограниченными одним потоком в течение всего их времени жизни, и что время жизни ограничено временем жизни данного стекфрейма. Такие объекты можно безопасно размещать в стеке, а не в куче.
Примитивы синхронизации. wait() / notify()¶
Определения:
Класс Object в Java содержит три final метода для взаимодействия потоков. Это методы wait(), notify() и notifyAll()
wait()Метод
wait()бесконечно ждет другой поток, пока не будет вызван методnotify()илиnotifyAll()на объекте. Другие две вариации методаwait()ставят текущий поток в ожидание на определенное время. По истечении этого времени поток просыпается и продолжает работу.notify()Пробуждает только один поток, после чего этот поток начинает выполнение. Если объект ожидают несколько потоков, то метод
notify()разбудит только один из них. Выбор потока зависит от системной реализации управления потоками.notifyAll()Пробуждает все потоки, хотя в какой последовательности они будут пробуждаться зависит от реализации ОС.
Для всех вышеприведенных методов требуется получение блокировки (synchronized) на мониторе:
IllegalMonitorStateException - if the current thread is not the owner of this object’s monitor.
Пример:
void waiting() {
synchronized (monitor) {
while(waitCondition) {
try {
monitor.wait();
} catch(InterruptedException e) {
...
}
}
}
}
void notifier() {
synchronized (monitor) {
waitCondition = false;
monitor.notify();
}
}
java.util.concurrent.Synchronizers¶
Синхронизаторы – вспомогательные утилиты для синхронизации потоков, которые дают возможность разработчику регулировать и/или ограничивать работу потоков и предоставляют более высокий уровень абстракции, чем основные примитивы языка (мониторы).
Semaphore¶
Синхронизатор Semaphore реализует шаблон синхронизации Семафор. Чаще всего, семафоры необходимы, когда нужно ограничить доступ к некоторому общему ресурсу.
CountDownLatch¶
Предоставляет возможность любому количеству потоков в блоке кода ожидать до тех пор, пока не завершится определенное количество операций, выполняющихся в других потоках, перед тем как они будут «отпущены»
CyclicBarrier¶
Циклический барьер является точкой синхронизации, в которой указанное количество параллельных потоков встречается и блокируется. Как только все потоки прибыли, выполняется опционное действие (или не выполняется, если барьер был инициализирован без него), и, после того, как оно выполнено, барьер ломается и ожидающие потоки «освобождаются».
Exchanger<V>¶
Exchanger (обменник) может понадобиться, для того, чтобы обменяться данными между двумя потоками в определенной точки работы обоих потоков.
Phaser¶
Phaser (фазер), как и CyclicBarrier, является реализацией шаблона синхронизации Барьер, но, в отличии от CyclicBarrier, предоставляет больше гибкости. Этот класс позволяет синхронизировать потоки, представляющие отдельную фазу или стадию выполнения общего действия. Как и CyclicBarrier, Phaser является точкой синхронизации, в которой встречаются потоки-участники. Когда все стороны прибыли, Phaser переходит к следующей фазе и снова ожидает ее завершения.
Locks¶
Представляет собой альтернативные и более гибкие механизмы синхронизации потоков по сравнению с базовыми synchronized, wait, notify, notifyAll.
Ссылки:
Lock¶
Lock — Базовый интерфейс из lock framework, предоставляющий более гибкий подход по ограничению доступа к ресурсам/блокам нежели при использовании synchronized.
ReentrantLock¶
ReentrantLock — Лок на вхождение. Только один поток может зайти в защищенный блок. Класс поддерживает «честную» (fair) и «нечестную» (non-fair) разблокировку потоков.
ReadWriteLock¶
ReadWriteLock — Дополнительный интерфейс для создания read/write локов. Такие локи необычайно полезны, когда в системе много операций чтения и мало операций записи.
The read lock may be held simultaneously by multiple reader threads, so long as there are no writers. The write lock is exclusive.
Ссылки:
StampedLock¶
StampedLock поддерживает разбиение на readLock() и writeLock() и различные их модификации.
Однако, в отличие от ReadWriteLock, метод блокировки StampedLock возвращает “штамп” — значение типа long.
Этот штамп может использоваться в дальнейшем как для высвобождения ресурсов, так и для проверки состояния блокировки.
Warning
В StampedLock не реализована реентерантность.
Пример ReadWriteLock и StampedLock
ReadWriteLock |
StampedLock |
|---|---|
var executor = Executors.newFixedThreadPool(2);
var map = new HashMap<>();
var lock = new ReentrantReadWriteLock();
executor.submit(() -> {
lock.writeLock().lock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.writeLock().unlock();
}
});
Runnable readTask = () -> {
lock.readLock().lock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.readLock().unlock();
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
|
var executor = Executors.newFixedThreadPool(2);
var map = new HashMap<>();
var lock = new StampedLock();
executor.submit(() -> {
long stamp = lock.writeLock();
try {
sleep(1);
map.put("foo", "bar");
} finally {
lock.unlockWrite(stamp);
}
});
Runnable readTask = () -> {
long stamp = lock.readLock();
try {
System.out.println(map.get("foo"));
sleep(1);
} finally {
lock.unlockRead(stamp);
}
};
executor.submit(readTask);
executor.submit(readTask);
stop(executor);
|
Оптимистичная блокировка:
Оптимистичная блокировка для чтения, вызываемая с помощью метода tryOptimisticRead(), отличается тем, что она всегда будет возвращать “штамп” не блокируя текущий поток,
вне зависимости от того, занят ли ресурс, к которому она обратилась. В случае, если ресурс был заблокирован блокировкой для записи, возвращённый штамп будет равняться нулю.
В любой момент можно проверить, является ли блокировка валидной с помощью lock.validate(stamp)
Пример
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
executor.submit(() -> {
// В отличии от обычных блокировок для чтения, оптимистичная не запрещает
// другим потокам блокировать ресурс для записи
long stamp = lock.tryOptimisticRead();
try {
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
// После захвата ресурса блокировка является валидной и оптимистичный поток отправляется спать
sleep(1);
// В это время другой поток блокирует ресурсы для записи, не дожидаясь окончания работы чтения
// Начиная с этого момента, оптимистичная блокировка перестаёт быть валидной
// (даже после окончания записи)
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
sleep(2);
System.out.println("Optimistic Lock Valid: " + lock.validate(stamp));
} finally {
lock.unlock(stamp);
}
});
executor.submit(() -> {
long stamp = lock.writeLock();
try {
System.out.println("Write Lock acquired");
sleep(2);
} finally {
lock.unlock(stamp);
System.out.println("Write done");
}
});
stop(executor);
Вывод:
Optimistic Lock Valid: true
Write Lock acquired
Optimistic Lock Valid: false
Write done
Optimistic Lock Valid: false
Note
Таким образом, при использовании оптимистичных блокировок нужно постоянно следить за их валидностью (проверять её нужно уже после того, как выполнены все необходимые операции).
Преобразование:
Иногда может быть полезным преобразовать блокировку для чтения в блокировку для записи не высвобождая ресурсы.
В StampedLock это можно сделать с помощью метода tryConvertToWriteLock().
Пример
ExecutorService executor = Executors.newFixedThreadPool(2);
StampedLock lock = new StampedLock();
// В этом примере нужно прочитать значение переменной count и вывести его в консоль
// Однако, если значение равно нулю, нужно изменить его на 23
executor.submit(() -> {
long stamp = lock.readLock();
try {
if (count == 0) {
stamp = lock.tryConvertToWriteLock(stamp);
// В случае, если tryConvertToWriteLock() вызывается в тот момент,
// когда ресурс занят для записи другим потоком,
// текущий поток остановлен не будет, однако метод вернёт нулевое значение.
if (stamp == 0L) {
System.out.println("Could not convert to write lock");
// В таком случае можно вызвать writeLock() вручную.
stamp = lock.writeLock();
}
count = 23;
}
System.out.println(count);
} finally {
lock.unlock(stamp);
}
});
stop(executor);
Ссылки:
Появился в Java 8
Методы
- Неэксклюзивно захватывает лок на чтение:
long readLockInterruptibly(); long readLock(); long tryReadLock( long time, TimeUnit unit ); long tryReadLock();
- Проверка состояния лока:
boolean isWriteLocked(); boolean isReadLocked(); int getReadLockCount();
- Преобразования:
long tryConvertToWriteLock( long stamp ); long tryConvertToReadLock( long stamp ); long tryConvertToOptimisticRead( long stamp );
- Эксклюзивно захватывает лок на запись:
long writeLockInterruptibly(); long writeLock(); long tryWriteLock( long time, TimeUnit unit ); long tryWriteLock();
- Освобождение лока:
void unlock(long stamp); void unlockWrite(long stamp); boolean tryUnlockWrite(); void unlockRead(long stamp); boolean tryUnlockRead();
- Оптимистичная блокировка:
long tryOptimisticRead(); boolean validate(long stamp);
- Получение ReadWriteLock:
Lock asReadLock(); Lock asWriteLock(); ReadWriteLock asReadWriteLock();
Condition¶
Condition — Интерфейс, который описывает альтернативные методы стандарным wait/notify/notifyAll.
Condition чаще всего получается из локов через метод lock.newCondition(). Тем самым можно получить несколько комплектов wait/notify для одного объекта.
- void
await()
booleanawait(long time, TimeUnit unit)
longawaitNanos(long nanosTimeout)
voidawaitUninterruptibly()
booleanawaitUntil(Date deadline) Causes the current thread to wait until it is signalled or interrupted (if no
awaitUninterruptibly).Lock, связанный с этим Condition, снимается атомарно, и текущий поток становится неактивным и остается таким, пока не произойдет одно из четырех:
Какой-то другой поток вызывает метод
signal()для этого Condition, и текущий поток выберется для пробуждения.Какой-то другой поток вызывает метод
signalAll()для этого Condition.Какой-то другой поток прерывает (interrupt) текущий поток, при этом прерывание потока должно поддерживаться.
A “spurious wakeup” occurs. Поэтому нужно оборачивать в while блоки вызовы
awaitс проверкой условия выхода изawait.
Во всех случаях, прежде поток выйдет из ожидания, он должен вернуть Lock связанный с этим Condition. (When the thread returns it is guaranteed to hold this lock)
- void
signal()
voidsignalAll() Wakes up one/all waiting thread.
Реализация может (обычно) требовать, чтобы текущий поток удерживал Lock, связанный с этим Condition, когда вызывается этот метод.
Ссылки:
class BoundedBuffer<E> {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(E x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) // while защищает от spurious wakeup
// Если количество элементов больше максимального - ожидаем, пока элемент не будет изъят
notFull.await();
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// Сигнализируем ожидающий поток на получение элемента, когда он был пустой
notEmpty.signal();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
lock.lock();
try {
while (count == 0) // while защищает от spurious wakeup
// Если количество элементов 0, то ожидаем пока элемент не будет добавлен и вызван signal
notEmpty.await();
E x = (E) items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
// Сигнализируем, что элементов не максимальное количество, и теперь можно их добавить
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}