0%

计数器异步打印重置之线程安全

本文探讨计数器异步打印重置场景中的线程安全。

示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterAsyncPrintAndResetExample {

private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10, new ThreadFactory() {

public Thread newThread(Runnable r) {
return new Thread(r);
}
});


private AtomicInteger counter = new AtomicInteger(0);

{
stat();
}

public synchronized void stat() {
threadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
AtomicInteger oldCounter = counter; //1

counter = new AtomicInteger(0); //2

System.out.println(oldCounter.get()); //3
}
}, 3, 3, TimeUnit.MINUTES);
}

/**
* 工作线程持续调用,模拟“来一个请求调用一次的场景”
*/
public void invoke(Object query) {
counter.incrementAndGet();
}
}

在上述代码中存在线程安全问题:

  • 情形1:在//3处打印时,由于“狭义可见性”问题,存在“一些工作线程执行时,counter变量指向旧计数器实例”的可能,此时打印的计数器计数值是偏少的
  • 情形2:存在“//2和//3重排序”可能,此时//3处打印时,工作线程仍然在使用旧计数器实例进行计数,此时打印的计数器计数值是偏少的

针对以上问题,经过分析可以发现:给counter变量加上volatile修饰符,可以解决“情形1”的线程安全问题,因为“根据volatile变量的读写语义,//2执行完之后,所有线程的counter变量指向新的AtomicInteger实例对象,如果此时假定//2和//3不存在重排序,那么就不存在线程安全问题”;而对于“情形2”的线程安全问题则于事无补。因此,为了彻底解决上述线程安全问题,只能通过加锁来解决,根据“正常计数多,打印重置计数少”的特点,显然适合使用读写锁。
改进后代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CounterAsyncPrintAndResetExample {

private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10, new ThreadFactory() {

public Thread newThread(Runnable r) {
return new Thread(r);
}
});


private ReadWriteLock rwLock = new ReentrantReadWriteLock();

private AtomicInteger counter = new AtomicInteger(0);

{
stat();
}

public synchronized void stat() {
threadPool.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
rwLock.writeLock().lock();
try {
AtomicInteger oldCounter = counter; //1

counter = new AtomicInteger(0); //2

System.out.println(oldCounter.get()); //3
} finally {
rwLock.writeLock().unlock();
}
}
}, 3, 3, TimeUnit.MINUTES);
}

/**
* 工作线程持续调用,模拟“来一个请求调用一次的场景”
*/
public void invoke(Object query) {
rwLock.readLock().lock();

try {
counter.incrementAndGet();
} finally {
rwLock.readLock().unlock();
}
}
}
您的支持将鼓励我继续分享!