`

java线程池Executor简单学习与解析

    博客分类:
  • java
阅读更多
线程池与对象池的学习中,个人感觉线程池是将线程转移到内部一直在运行在容器中的线程中来运行,减少的是线程run时间,而不是创建时间,将其引用至新线程,而不需要重新分配资源

1.线程池的类与接口关系构造

Executor-->Executoservice->AbstractExcetorService->ThreadpoolExecutor

Executor->ExecutorService->SchedulEXecutorService+ThreadpoolExceutor->ScheduleThreadpoolExecutor

Executors.new***ThreadPoolExecutor(...)根据不同参数来构造线程池:主要分为5种

newfixThreadPoolExecutor,newSingleThreadExecutor ,newCachedThreadPool以threadpoolexecutor构造完成

newSingleThreadScheduledExecutor,newThreadScheduledExecutor以ScheduleThreadpoolExecutor()构造完成

2.线程池的一些内部源码查看

A.ThreadPoolExecutor()线程池

内 部容器使用blockingQueue,使用Condition配合reentrantlock来监听对象的状态信息,从而实现Queue的消费-生产模 式的构建实现,也保证了线程安全,内部常量使用atomic(基于CAS算法实现原子性操作安全,AMINO框架提供更多类型)或者Static

B.ScheduleThreadPoolExecutor()

是在Threadpoolexecutor基础上实现,主要区别为可以周期内定时执行Runnable任务

内部容器使用DelayedWorkQueue对blockingQueue进行代理使用进行封装,里面使用RunnableScheduledFuture这个接口,能够执行线程,返回future,设置执行时间(可能不准确),使用future模式,Futuretask进行异步结果计算,Callable(Task)类似于thread任务执行单元,但是可以返回参数,实现future模式,所有执行线程的方法均调用schedule(..),

以下是来源于jdk API中一个带方法的类,它设置了 ScheduledExecutorService ,在 1 小时内每 10 秒钟蜂鸣一次:

import static java.util.concurrent.TimeUnit.*;
class BeeperControl {
    private final ScheduledExecutorService scheduler =
       Executors.newScheduledThreadPool(1);

    public void beepForAnHour() {
        final Runnable beeper = new Runnable() {
                public void run() { System.out.println("beep"); }
            };
        final ScheduledFuture<?> beeperHandle =
            scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);<!---设置->
        scheduler.schedule(new Runnable() {
                public void run() { beeperHandle.cancel(true); }
            }, 60 * 60, SECONDS);
    }


线程安全控制

public int drainTo(Collection<? super Runnable> c) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            final ReentrantLock lock = this.lock;//重入锁
            lock.lock();
            try {
                RunnableScheduledFuture first;
                int n = 0;
                while ((first = pollExpired()) != null) {
                    c.add(first);
                    ++n;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
在ScheduleThreadPoolExecutor.invokeall(Callable()任务列表) 返回future列表代表返回结果集
.submit(Callbale<T>/Runnable)提交一个返回值的任务用于执行,返回一个表示任务的未决结果的 Future。体现使用Callable和future模式的优势
一般执行使用void Execute(Runnable())
内部周期定时操作的实现:
DelayedWorkQueue延时调用比如在poll方法中long timeout, TimeUnit unit,传入时间参数,延时的实现,使用对线程对象的锁机制下关联的
Condition available.awaitNanos(nanos)进行控制.可能不准确,个人整体去看了一遍,但是一步一步实现困难,各种继承和调用关系图实在有点困难,以下是poll()带代码


        public RunnableScheduledFuture poll(long timeout, TimeUnit unit)
            throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture first = queue[0];
                    if (first == null) {
                        if (nanos <= 0)
                            return null;
                        else
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        if (nanos <= 0)
                            return null;
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }



线程池scheduleThreadPoolExecutors
分为几个模块 处理线程(真是去执行,复用的线程),工作队列(workdelayqueue) ,executor设置的一些策略 ,执行任务单元scheduledfututretask

1处理线程,这个倒是没有什么特别的地方,思想就是复用线程,但是说在用的时候,设置下对于有界且当前队列不能容纳时的策略handler(jdk具体提供实现ThreadPoolExecutor.AbortPolicy, ThreadPoolExecutor.CallerRunsPolicy, ThreadPoolExecutor.DiscardOldestPolicy, ThreadPoolExecutor.DiscardPolicy),之前倒是看到过对于当前工作队列不能容纳时,采纳双队列(添加预备队列,给工作队列使用,但是也有满的时候,看应用场景)
2.工作队列,workdelayqueue(里面包含子属性priorityqueue),对于虽然之前对于delayqueue有看过,但也是扫了眼。总的来说,使用delyaedqueue进行延时控制,取队列子元素在priorityqueue(存在优先级的原因)。比如dealyedqueue现在从priorityqueue取一个工作节点(如果没有,说明工作队列已经为空,肯定在一直在等待去获取节点),肯定要判断,拿出优先级最高的节点,然后使用getDelay(TimeUnit.NANOSECONDS)获取间隔时间,再按照dealyedqueue 的延时实现办法,reeentrantloack加conditiond.awaitNanos();执行完成之后(其中使用condition来进行控制,因为本身是假等待,不释放cpu的,可以相应中断等操作,这里是因为在等待的过程中,如果优先列队发生更改的话,可能要去执行其他的任务单元,那么这时候就需要停止当前等待执行,转而去执行优先级高的,则问题就是当前线程要去响应级别更高的线程,要去通知它,唤醒它,加上之前使用重入锁,正好也用上condition进行控制,一般情况下synchornized与object.notify/wait结合使用,lock与condition结合使用) ,相当于已经完成一次延时调度执行了,那么周期性执行怎么搞呢?在外面做一次判断是否为周期性,片段时间做一次计算,得出下次执行的距离时间,并添加到workqueue中,这一步其实也是在线程池执行的时候前一起执行的
3.executors的执行策略:其实就是线程池大小,类型,线程工程这些,使用的时候用的多
4.执行任务单元,schedulefuturework,可以说是具备future的特性,也可以去执行线程,callback,能周期执行,delay接口处理延时,comparable处理在有限队列中延时比较,从字面意思来看也是能知道的future的主要用于处理异步计算,有需要调用的时候获取处理结果(这个没有处理完,就去调用返回是null还是会阻塞,这个具体要看,像本身调用future是阻塞,有些框架或者中间件是返回null的),callback类似于线程,任务执行单元,但是具备有返回结果,一般结合future使用。


然后,就是对于executors每次操作都是submit或者execute,然后不管是有没有处理完成返回值,都要专门建一个future对象进行追踪,使用
CompletionService可以简化这一操作,提交和处理返回(你只需要等到有处理完future之后就行相当于等待执行不管是哪个future,当然如果你要进行细致或者每个future不一样要么你做一次判断,要么就不用这个方式)

public void run() {
        while (!end) {
            try {
                Future<String> result = service.poll(20, TimeUnit.SECONDS);
                if (result != null) {
                    String report = result.get();
                    System.out.printf("ReportReceiver: Report Received:%s\n", report);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.printf("ReportSender: End\n");
    }

对于timertask timer相当于一个管理者,里面有线程去工作(同步单线程),延时处理时使用计算出当前时间和相对时间,比较两个值,当不相等时就重复判断,然后执行,并且使用对象的notify与wait来进行通行进行队列下一个任务处理

可以参考这个博客 讲了整个运行过程非常好
http://ju.outofmemory.cn/entry/25473
讲了接口以及对应的运行情况和流程,很不错
http://shift-alt-ctrl.iteye.com/blog/1841088


代码均来自于OPEN-JDK中java.lang 和java.util.concurrent包

来源: <http://183.56.168.160:8080/blog/admin-index.do#article/article>
分享到:
评论

相关推荐

    线程池原理-ThreadPoolExecutor源码解析

    线程池原理-ThreadPoolExecutor源码解析 1.构造方法及参数 2.阻塞对列: BlockingQueue 3.线程工厂: DefaultThreadFactory 4.拒绝策略: RejectedExecutionHandler 5.执行线程 Executor

    Java并发编程原理与实战

    单例问题与线程安全性深入解析.mp4 理解自旋锁,死锁与重入锁.mp4 深入理解volatile原理与使用.mp4 JDK5提供的原子类的操作以及实现原理.mp4 Lock接口认识与使用.mp4 手动实现一个可重入锁.mp4 ...

    龙果 java并发编程原理实战

    第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个阶段并推荐学习并发的资料 [免费观看] 00:09:13分钟 | 第5节线程的状态以及各状态之间的转换详解...

    龙果java并发编程完整视频

    第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个阶段并推荐学习并发的资料 [免费观看] 00:09:13分钟 | 第5节线程的状态以及各状态之间的转换详解...

    Java 并发编程原理与实战视频

    第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个阶段并推荐学习并发的资料 [免费观看] 00:09:13分钟 | 第5节线程的状态以及各状态之间的转换详解...

    java并发编程

    第3节解析多线程与多进程的联系以及上下文切换所导致资源浪费问题 [免费观看] 00:13:03分钟 | 第4节学习并发的四个阶段并推荐学习并发的资料 [免费观看] 00:09:13分钟 | 第5节线程的状态以及各状态之间的转换详解...

    \java超强笔记(超级经典)

    泛型不能使用简单类型 GenList&lt;int&gt; nList = new GenList(); //编译错误 泛型类不能是异常类,也就是该泛型类不能继承自Throwable以及其子类 public class MyExpection&lt;T&gt; extends Exception{ } //...

    Java CP/IP Socket编程

    3.3 成帧与解析..........52 3.4 Java特定编码..........58 3.5 构建和解析协议消息..........59 3.5.1 基于文本的表示方法..........62 3.5.2 二进制表示方法..........65 3.5.3 发送和接收..........67 3.6 ...

    android开发艺术探索高清完整版PDF

    392 11.2.2 Async Task的工作原理 / 395 11.2.3 Handler Thread / 402 11.2.4 Intent Service / 403 11.3 Android中的线程池 / 406 11.3.1 Thread Pool Executor / 407 11.3.2 线程池的分类 / 410 第12章 ...

    Android开发艺术探索

    3.4.2 事件分发的源码解析 / 144 3.5 View的滑动冲突 / 154 3.5.1 常见的滑动冲突场景 / 155 3.5.2 滑动冲突的处理规则 / 156 3.5.3 滑动冲突的解决方式 / 157 第4章 View的工作原理 / 174 4.1 初识View...

    汪文君高并发编程实战视频资源全集

    │ 高并发编程第一阶段35讲、线程池原理与自定义线程池.mp4 │ 高并发编程第一阶段36讲、自定义个简单的线程池并且测试.mp4 │ 高并发编程第一阶段37讲、给线程池增加拒绝策略以及停止方法.mp4 │ 高并发编程第...

    汪文君高并发编程实战视频资源下载.txt

    │ 高并发编程第一阶段35讲、线程池原理与自定义线程池.mp4 │ 高并发编程第一阶段36讲、自定义个简单的线程池并且测试.mp4 │ 高并发编程第一阶段37讲、给线程池增加拒绝策略以及停止方法.mp4 │ 高并发编程第...

Global site tag (gtag.js) - Google Analytics