0%

计数器异步打印重置之线程安全

本文探讨计数器异步打印重置场景中的线程安全。

示例代码如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CounterAsyncPrintAndResetExample {

    private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10, new ThreadFactory() {

        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    });


    private AtomicInteger counter = new AtomicInteger(0);

    {
        stat();
    }

    public synchronized void stat() {
        threadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                AtomicInteger oldCounter = counter;    //1

                counter = new AtomicInteger(0);        //2

                System.out.println(oldCounter.get());  //3
            }
        }, 3, 3, TimeUnit.MINUTES);
    }

    /**
     * 工作线程持续调用,模拟“来一个请求调用一次的场景”
     */
    public void invoke(Object query) {
        counter.incrementAndGet();
    }
}

在上述代码中存在线程安全问题:

  • 情形1:在//3处打印时,由于“狭义可见性”问题,存在“一些工作线程执行时,counter变量指向旧计数器实例”的可能,此时打印的计数器计数值是偏少的
  • 情形2:存在“//2和//3重排序”可能,此时//3处打印时,工作线程仍然在使用旧计数器实例进行计数,此时打印的计数器计数值是偏少的

针对以上问题,经过分析可以发现:给counter变量加上volatile修饰符,可以解决“情形1”的线程安全问题,因为“根据volatile变量的读写语义,//2执行完之后,所有线程的counter变量指向新的AtomicInteger实例对象,如果此时假定//2和//3不存在重排序,那么就不存在线程安全问题”;而对于“情形2”的线程安全问题则于事无补。因此,为了彻底解决上述线程安全问题,只能通过加锁来解决,根据“正常计数多,打印重置计数少”的特点,显然适合使用读写锁。
改进后代码如下:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class CounterAsyncPrintAndResetExample {

    private static ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(10, new ThreadFactory() {

        public Thread newThread(Runnable r) {
            return new Thread(r);
        }
    });


    private ReadWriteLock rwLock = new ReentrantReadWriteLock();

    private AtomicInteger counter = new AtomicInteger(0);

    {
        stat();
    }

    public synchronized void stat() {
        threadPool.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                rwLock.writeLock().lock();
                try {
                    AtomicInteger oldCounter = counter;    //1

                    counter = new AtomicInteger(0);        //2

                    System.out.println(oldCounter.get());  //3
                } finally {
                    rwLock.writeLock().unlock();
                }
            }
        }, 3, 3, TimeUnit.MINUTES);
    }

    /**
     * 工作线程持续调用,模拟“来一个请求调用一次的场景”
     */
    public void invoke(Object query) {
        rwLock.readLock().lock();

        try {
            counter.incrementAndGet();
        } finally {
            rwLock.readLock().unlock();
        }
    }
}
您的支持将鼓励我继续分享!