文章來(lái)源于公眾號(hào):小姐姐味道
很多同學(xué)喜歡使用lambda
表達(dá)式,它允許你定義短小精悍的函數(shù),體現(xiàn)你高超的編碼水平。當(dāng)然,這個(gè)功能在某些以代碼行數(shù)來(lái)衡量工作量的公司來(lái)說(shuō),就比較吃虧一些。
比如下面的代碼片段,讓人閱讀的時(shí)候就像是讀詩(shī)一樣。但是一旦用不好,也是會(huì)要命的。
List<Integer> transactionsIds =
widgets.stream()
.filter(b -> b.getColor() == RED)
.sorted((x,y) -> x.getWeight() - y.getWeight())
.mapToInt(Widget::getWeight)
.sum();
這段代碼有一個(gè)關(guān)鍵的函數(shù),那就是stream
。通過(guò)它,可以將一個(gè)普通的 list ,轉(zhuǎn)化為流,然后就可以使用類(lèi)似于管道的方式對(duì) list 進(jìn)行操作??傊眠^(guò)的都說(shuō)好。
問(wèn)題來(lái)了
假如我們把stream
換成parallelStream
,會(huì)發(fā)生什么情況?
根據(jù)字面上的意思,流會(huì)從串行
變成并行
。
既然是并行,那用屁股想一想,就知道這里面肯定會(huì)有線程安全問(wèn)題。不過(guò)我們這里討論的并不是要你使用線程安全的集合,這個(gè)話(huà)題太低級(jí)?,F(xiàn)階段,知道在線程不安全的環(huán)境中使用線程安全的集合,已經(jīng)是一個(gè)基本的技能。
這次踩坑的地方,是并行流的性能問(wèn)題。
我們用代碼來(lái)說(shuō)話(huà)。
下面的代碼,開(kāi)啟了8個(gè)線程,這8個(gè)線程都在使用并行流進(jìn)行數(shù)據(jù)計(jì)算
。在執(zhí)行的邏輯中,我們讓每個(gè)任務(wù)都 sleep 1秒鐘,這樣就能夠模擬一些 I/O 請(qǐng)求的耗時(shí)等待。
使用stream
,程序會(huì)在30秒后返回,但我們期望程序能夠在1秒多返回,因?yàn)樗遣⑿辛?,得?duì)得起這個(gè)稱(chēng)號(hào)。
測(cè)試發(fā)現(xiàn),我們等了好久,任務(wù)才執(zhí)行完畢。
static void paralleTest() {
List<Integer> numbers = Arrays.asList(
0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
20, 21, 22, 23, 24, 25, 26, 27, 28, 29
);
final long begin = System.currentTimeMillis();
numbers.parallelStream().map(k -> {
try {
Thread.sleep(1000);
System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
return k;
}).collect(Collectors.toList());
}
public static void main(String[] args) {
// System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
new Thread(() -> paralleTest()).start();
}
坑
實(shí)際上,在不同的機(jī)器上執(zhí)行,這段代碼花費(fèi)的時(shí)間都不一樣。
既然是并行,那肯定得有個(gè)并行度。太低了,體現(xiàn)不到并行的能能力;太大了,又浪費(fèi)了上下文切換的時(shí)間。我是很沮喪的發(fā)現(xiàn),很多高級(jí)研發(fā),將線程池的各種參數(shù)背的滾瓜爛熟,各種調(diào)優(yōu),竟然敢睜一只眼閉一只眼的在 I/O 密集型業(yè)務(wù)中用上parallelStream
。
要了解這個(gè)并行度
,我們需要查看具體的構(gòu)造方法。在ForkJoinPool
類(lèi)中找到這樣的代碼。
try { // ignore exceptions in accessing/parsing properties
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null)
parallelism = Integer.parseInt(pp);
fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
"java.util.concurrent.ForkJoinPool.common.threadFactory");
handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
"java.util.concurrent.ForkJoinPool.common.exceptionHandler");
} catch (Exception ignore) {
}
if (fac == null) {
if (System.getSecurityManager() == null)
fac = defaultForkJoinWorkerThreadFactory;
else // use security-managed default
fac = new InnocuousForkJoinWorkerThreadFactory();
}
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
if (parallelism > MAX_CAP)
parallelism = MAX_CAP;
可以看到,并行度到底是多少,是由下面的參數(shù)來(lái)控制的。如果無(wú)法獲取這個(gè)參數(shù),則默認(rèn)使用 CPU個(gè)數(shù)-1
的并行度。
可以看到,這個(gè)函數(shù)是為了計(jì)算密集型業(yè)務(wù)去設(shè)計(jì)的。如果你喂給它一大堆任務(wù),它就會(huì)由并行執(zhí)行退變成類(lèi)似于串行的效果。
-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
即使你使用-Djava.util.concurrent.ForkJoinPool.common.parallelism=N
設(shè)置了一個(gè)初始值大小,它依然有問(wèn)題。
因?yàn)椋?code>parallelism這個(gè)變量是 final 的,一旦設(shè)定,不允許修改。也就是說(shuō),上面的參數(shù)只會(huì)生效一次。
張三可能使用下面的代碼,設(shè)置了并行度大小為20
。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
李四可能用同樣的方式,設(shè)置了這個(gè)值為30。那實(shí)際在項(xiàng)目中用的是哪個(gè)值,那就得問(wèn) JVM 是怎么加載的類(lèi)信息了。
這種方式并不太非??孔V。
一種解決方式
我們可以通過(guò)提供外置的forkjoinpool
,也就是改變提交方式,來(lái)實(shí)現(xiàn)不同類(lèi)型的任務(wù)分離。
代碼如下所示,通過(guò)顯式的代碼提交,即可實(shí)現(xiàn)任務(wù)分離。
ForkJoinPool pool = new ForkJoinPool(30);
final long begin = System.currentTimeMillis();
try {
pool.submit(() ->
numbers.parallelStream().map(k -> {
try {
Thread.sleep(1000);
System.out.println((System.currentTimeMillis() - begin) + "ms => " + k + " \t" + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
}
return k;
}).collect(Collectors.toList())).get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
這樣,不同的場(chǎng)景,就可以擁有不同的并行度。這種方式和CountDownLatch
有異曲同工之妙,我們需要手動(dòng)管理資源。
使用了這種方式,代碼量增加,已經(jīng)和優(yōu)雅
關(guān)系不大了,不僅不優(yōu)雅,而且丑的要命。白天鵝變成了丑小鴨,你還會(huì)愛(ài)它么?
以上就是W3Cschool編程獅
關(guān)于parallelStream的坑,不踩不知道,一踩嚇一跳的相關(guān)介紹了,希望對(duì)大家有所幫助。