罗定市建设局网站,青岛城阳软件网站开发,wordpress缩略图顺序,外网下载一、简介在JDK并发包中有这么一个类ExecutorCompletionService#xff0c;提交任务后#xff0c;可以按任务返回结果的先后顺序来获取各任务执行后的结果。该类实现了接口CompletionService#xff1a;public interface CompletionServiceV {FutureV submit…一、简介在JDK并发包中有这么一个类ExecutorCompletionService提交任务后可以按任务返回结果的先后顺序来获取各任务执行后的结果。该类实现了接口CompletionServicepublic interface CompletionServiceV { FutureV submit(CallableV task); FutureV submit(Runnable task, V result); FutureV take() throws InterruptedException; FutureV poll(); FutureV poll(long timeout, TimeUnit unit) throws InterruptedException; }该接口定义了一系列方法提交实现了Callable或Runnable接口的任务并获取这些任务的结果。CompletionService接口定义了一组任务管理接口submit() - 提交任务take() - 获取任务结果poll() - 获取任务结果ExecutorCompletionService类是CompletionService接口的实现ExecutorCompletionService内部管理者一个已完成任务的阻塞队列ExecutorCompletionService引用了一个Executor用来执行任务submit()方法最终会委托给内部的executor去执行任务take/poll方法的工作都委托给内部的已完成任务阻塞队列如果阻塞队列中有已完成的任务take方法就返回任务的结果否则阻塞等待任务完成poll与take方法不同poll有两个版本:无参的poll方法 --- 如果完成队列中有数据就返回, 否则返回null有参数的poll方法 --- 如果完成队列中有数据就直接返回否则等待指定的时间到时间后如果还是没有数据就返回nullExecutorCompletionService主要用与管理异步任务 (有结果的任务任务完成后要处理结果)关于CompletionService和ExecutorCompletionService的类图如下ExecutorCompletionService实现了CompletionService内部通过Executor以及BlockingQueue来实现接口提出的规范。其中Executor由调用者传递进来而Blocking可以使用默认的LinkedBlockingQueue也可以由调用者传递。另外该类还会将提交的任务封装成QueueingFuture这样就可以实现FutureTask.done()方法以便于在任务执行完毕后将结果放入阻塞队列中。QueueingFuture为内部类private static class QueueingFutureV extends FutureTaskVoid { QueueingFuture(RunnableFutureV task, BlockingQueueFutureV completionQueue) { super(task, null); this.task task; this.completionQueue completionQueue; } private final FutureV task; private final BlockingQueueFutureV completionQueue; protected void done() { completionQueue.add(task); } }其中done()方法就是在任务执行完毕后将任务放入队列中。在提交任务时将任务封装成QueueingFuturepublic FutureV submit(CallableV task) { if (task null) throw new NullPointerException(); RunnableFutureV f newTaskFor(task); executor.execute(new QueueingFutureV(f, completionQueue)); return f; } public FutureV submit(Runnable task, V result) { if (task null) throw new NullPointerException(); RunnableFutureV f newTaskFor(task, result); executor.execute(new QueueingFutureV(f, completionQueue)); return f; }在调用take()、poll()方法时会从阻塞队列中获取Future对象以取得任务执行的结果。二、原理当我们向Executor提交一组任务并且希望任务在完成后获得结果此时可以考虑使用ExecutorCompletionService。ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起使用它可以提交我们的Callable任务。这个任务委托给Executor执行可以使用ExecutorCompletionService对象的take和poll方法获取结果。ExecutorCompletionService的设计目的在于提供一个可获取线程池执行结果的功能这个类采用了装饰器模式需要用户提供一个自定义的线程池在ExecutorCompletionService内部持有该线程池进行线程执行在原有的线程池功能基础上装饰额外的功能。下面是ExecutorCompletionService的原理图1、在使用ExecutorCompletionService时需要提供一个自定义的线程池Executor构造ExecutorCompletionService。同时也可以指定一个自定义的队列作为线程执行结果的容器当线程执行完成时通过重写FutureTask#done()将结果压入队列中。2、当用户把所有的任务都提交了以后可通过ExecutorCompletionService#poll方法来弹出已完成的结果这样做的好处是可以节省获取完成结果的时间。下面是使用队列和不使用队列的流程对比从图中我们可以看出在使用队列的场景下我们可以优先获取到完成的线程当我们要汇总所有的执行结果时这无疑会缩减我们的汇总时间。而不使用队列时我们需要对FutureTask进行遍历因为我们不知道哪个线程先执行完了只能挨个去获取结果这样已经完成的线程会因为前面未完成的线程的耗时而无法提前进行汇总。如果算上汇总结果的耗时时间在使用队列的场景下我们可以在其他任务线程执行的过程中汇总已完成的结果节省汇总时间。不使用队列的场景下只用等到当前的线程执行完成才能汇总。代码演示public class ExecutorCompletionServiceTest { public static void main(String[] args) throws Exception { ExecutorService fixedThreadPool Executors.newFixedThreadPool(5); ExecutorCompletionService ecs new ExecutorCompletionService(fixedThreadPool); Future future ecs.submit(new Callable() { Override public Integer call() throws Exception { return ThreadLocalRandom.current().nextInt(100); } }); System.out.println(future future.get()); //用于取出最新的线程执行结果注意这里是阻塞的 //Future future1 ecs.take(); //System.out.println(future1 future1.get()); //用于取出最新的线程执行结果是非阻塞的如果没有结果就返回null Future future2 ecs.poll(); if (future2.isDone()) { System.out.println(future2.get()); } } } //多个线程先执行完的进阻塞队列然后可以按执行顺序获取结果 public class ExecutorCompletionServiceDemo { public static void main(String[] args) { //这里只是为了方便真正项目中不要这样创建线程池 ExecutorService executorService Executors.newFixedThreadPool(5); ExecutorCompletionServiceString completionService new ExecutorCompletionService(executorService); completionService.submit(() - { System.out.println(执行任务1开始); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(执行任务1结束); return 任务1执行成功; }); completionService.submit(() - { System.out.println(执行任务2开始); try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(执行任务2结束); return 任务2执行成功; }); completionService.submit(() - { System.out.println(执行任务3开始); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(执行任务3结束); return 任务3执行成功; }); for (int i 0; i 3; i) { try { String result completionService.take().get(); System.out.println(result); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } executorService.shutdown(); } }源码public class ExecutorCompletionServiceV implements CompletionServiceV { //执行任务的线程池 private final Executor executor; //用于调用AbstractExecutorService的newTaskFor方法来实例化一个实现了RunnableFuture接口的对象 //如果executor继承了AbstractExecutorService 则直接调用executor的newTaskFor方法 //否则直接创建一个FutureTask对象 private final AbstractExecutorService aes; //任务完成后放入该阻塞队列中 private final BlockingQueueFutureV completionQueue; /** * 用于放入执行完成的任务 */ private static class QueueingFutureV extends FutureTaskVoid { QueueingFuture(RunnableFutureV task, BlockingQueueFutureV completionQueue) { super(task, null); this.task task; this.completionQueue completionQueue; } private final FutureV task; private final BlockingQueueFutureV completionQueue; //重写了FutureTask的done方法任务完成后将任务放入阻塞队列中 protected void done() { completionQueue.add(task); } } //将传入的Callable包装为RunnableFuture private RunnableFutureV newTaskFor(CallableV task) { if (aes null) return new FutureTaskV(task); else return aes.newTaskFor(task); } //将传入的Callable包装为RunnableFuture private RunnableFutureV newTaskFor(Runnable task, V result) { if (aes null) return new FutureTaskV(task, result); else return aes.newTaskFor(task, result); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and a * {link LinkedBlockingQueue} as a completion queue. * * param executor the executor to use * throws NullPointerException if executor is {code null} */ public ExecutorCompletionService(Executor executor) { if (executor null) throw new NullPointerException(); this.executor executor; this.aes (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; //completionQueue默认为LinkedBlockingQueue this.completionQueue new LinkedBlockingQueueFutureV(); } /** * Creates an ExecutorCompletionService using the supplied * executor for base task execution and the supplied queue as its * completion queue. * * param executor the executor to use * param completionQueue the queue to use as the completion queue * normally one dedicated for use by this service. This * queue is treated as unbounded -- failed attempted * {code Queue.add} operations for completed tasks cause * them not to be retrievable. * throws NullPointerException if executor or completionQueue are {code null} */ public ExecutorCompletionService(Executor executor, BlockingQueueFutureV completionQueue) { if (executor null || completionQueue null) throw new NullPointerException(); this.executor executor; this.aes (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue completionQueue; } /** * 提交任务任务被包装为QueueingFuture对象主要重写FutureTask的done方法 * 使得任务执行完毕后被执行任务的线程放入到阻塞队列中 * throws RejectedExecutionException {inheritDoc} * throws NullPointerException {inheritDoc} */ public FutureV submit(CallableV task) { if (task null) throw new NullPointerException(); RunnableFutureV f newTaskFor(task); executor.execute(new QueueingFutureV(f, completionQueue)); return f; } /** * throws RejectedExecutionException {inheritDoc} * throws NullPointerException {inheritDoc} */ public FutureV submit(Runnable task, V result) { if (task null) throw new NullPointerException(); RunnableFutureV f newTaskFor(task, result); executor.execute(new QueueingFutureV(f, completionQueue)); return f; } //从阻塞队列中获取任务 public FutureV take() throws InterruptedException { return completionQueue.take(); } //如果完成队列中有数据就返回, 否则返回null public FutureV poll() { return completionQueue.poll(); } //如果完成队列中有数据就直接返回, 否则等待指定的时间, 到时间后如果还是没有数据就返回null public FutureV poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); } }AI大模型学习福利作为一名热心肠的互联网老兵我决定把宝贵的AI知识分享给大家。 至于能学习到多少就看你的学习毅力和能力了 。我已将重要的AI大模型资料包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频免费分享出来。一、全套AGI大模型学习路线AI大模型时代的学习之旅从基础到前沿掌握人工智能的核心技能因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获取二、640套AI大模型报告合集这套包含640份报告的合集涵盖了AI大模型的理论研究、技术实现、行业应用等多个方面。无论您是科研人员、工程师还是对AI大模型感兴趣的爱好者这套报告合集都将为您提供宝贵的信息和启示。因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获三、AI大模型经典PDF籍随着人工智能技术的飞速发展AI大模型已经成为了当今科技领域的一大热点。这些大型预训练模型如GPT-3、BERT、XLNet等以其强大的语言理解和生成能力正在改变我们对人工智能的认识。 那以下这些PDF籍就是非常不错的学习资源。因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获四、AI大模型商业化落地方案因篇幅有限仅展示部分资料需要点击文章最下方名片即可前往获作为普通人入局大模型时代需要持续学习和实践不断提高自己的技能和认知水平同时也需要有责任感和伦理意识为人工智能的健康发展贡献力量