Java 8平行流中的自定义线程池

问题:

可以为Java 8 parallel stream指定自定义线程池吗?我找不到任何地方
想象一下,我有一个服务器应用程序,我想使用并行流。但是应用程序是大型和多线程的,所以我想划分它。我不想在另一个模块的应用程序块任务的一个模块中运行缓慢的任务。
如果我不能为不同的模块使用不同的线程池,这意味着我无法在大多数现实世界的情况下安全地使用并行流。
尝试以下示例。在单独的线程中执行一些CPU密集型任务。
这些任务利用并行流。第一个任务是坏的,所以每个步骤需要1秒钟(由线程睡眠模拟)。问题是其他线程被卡住,等待破坏的任务完成。这是一个有创意的例子,但是想象一下servlet应用程序和有人将长时间运行的任务提交给共享fork连接池。

public class ParallelTest {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newCachedThreadPool();

        es.execute(() -> runTask(1000)); //incorrect task
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));
        es.execute(() -> runTask(0));


        es.shutdown();
        es.awaitTermination(60, TimeUnit.SECONDS);
    }

    private static void runTask(int delay) {
        range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
                .ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
    }

    public static boolean isPrime(long n) {
        return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
    }
}

回答:

实际上有一个技巧如何在特定的fork-join池中执行并行操作。如果将其作为fork-join池中的任务执行,那么它将停留在那里,不使用通用的。

ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    IntStream.range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();

诀窍是基于ForkJoinTask.fork,它指定:“在当前任务正在运行的池中异步执行此任务(如果适用)或使用ForkJoinPool.commonPool()(如果不在ForkJoinPool()”中排列)

 
 
Code问答: http://codewenda.com/topics/python/
Stackoverflow: Custom thread pool in Java 8 parallel stream

*转载请注明本文链接以及stackoverflow的英文链接

发表评论

电子邮件地址不会被公开。 必填项已用*标注

6 + 2 =