logo头像
Snippet 博客主题

可以暂停的线程池

本文于 851 天之前发表,文中内容可能已经过时。

问渠那得清如许,为有源头活水来

这段时间没有更新博客了,不是因为找到工作了而繁忙了,也不是在忙于找工作,而是趁着有空在家带娃了,没有输入哪来的输出。面试的时候最喜欢问线程池了,我遇到的题目有:

  • 线程池的工作原理
  • Executors 默认的队列为啥没有用arrayBlockingQueue
  • 如何实现将任务不塞入队列中,直接用线工作线程处理
  • 如何暂停线程池中的工作线程

如何实现暂停线程池的工作线程

其实这个在线程池源码里面有个例子,这里改造了一下,通过命令控制台输入命令控制线程池。原理是:
通过重载线程池提供的beforeExecute(),用lock+condition的wait()使得工作线程暂停。另外提供一个恢复方法,使用的是lock+condition的signalAll唤醒暂停的工作线程。

public class PauseThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;

    private ReentrantLock pauseLock = new ReentrantLock();
    private Condition unpaused = pauseLock.newCondition();

    public PauseThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    //线程池执行任务之前
    protected void beforeExecute(Thread t, Runnable r) {
        pauseLock.lock();
        try {
            while (isPaused) {
                unpaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            pauseLock.unlock();
        }


    }

    //暂停工作线程
    public void pause() {
        pauseLock.lock();
        try {
            isPaused = true;
        }finally {
            pauseLock.unlock();
        }


    }
    //恢复工作线程
    public void resume() {
        pauseLock.lock();
        try {
            isPaused = false;
            unpaused.signalAll();
        }finally {
            pauseLock.unlock();
        }
    }

    //线程池执行任务之前
    protected void afterExecute(Runnable r, Throwable t) {
        System.out.println("afterExecute  task:" + Thread.currentThread().getName());
    }

    //线程池终止时候调用
    protected void terminated() {
        System.out.println("terminated  task:" + Thread.currentThread().getName());
    }


    public static void main(String[] args) {
        PauseThreadPool pool = new PauseThreadPool(5, 10, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10));
        try {
            System.out.print("Enter a char :");
            while (true) {
                Scanner sc = new Scanner(System.in);
                int val = sc.nextInt();
                if (val == 13 || val == 10) {

                }
                if (val == 1) {//命令行输入1,暂停
                    pool.pause();
                } else if (val == 2) {//输入2 恢复
                    pool.resume();
                } else if (val == 0) {// 提交任务
                    pool.submit(new Callable<Object>() {
                        @Override
                        public Object call() throws Exception {
                            System.out.println("start  task:" + Thread.currentThread().getName());
                            return null;
                        }
                    });
                }else if (val == 3){//关闭线程池
                    pool.shutdown();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}