/*
 * Copyright (c) 2012, 2017, Oracle and/or its affiliates. All rights reserved.
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This code is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License version 2 only, as
 * published by the Free Software Foundation.  Oracle designates this
 * particular file as subject to the "Classpath" exception as provided
 * by Oracle in the LICENSE file that accompanied this code.
 *
 * This code is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
 * version 2 for more details (a copy is included in the LICENSE file that
 * accompanied this code).
 *
 * You should have received a copy of the GNU General Public License version
 * 2 along with this work; if not, write to the Free Software Foundation,
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
 *
 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
 * or visit www.oracle.com if you need additional information or have any
 * questions.
 */
|
||
|
package java.util.stream;
|
||
|
|
||
|
import java.util.Spliterator;
|
||
|
import java.util.concurrent.CountedCompleter;
|
||
|
import java.util.concurrent.ForkJoinPool;
|
||
|
import java.util.concurrent.ForkJoinWorkerThread;
|
||
|
|
||
|
/**
|
||
|
* Abstract base class for most fork-join tasks used to implement stream ops.
|
||
|
* Manages splitting logic, tracking of child tasks, and intermediate results.
|
||
|
* Each task is associated with a {@link Spliterator} that describes the portion
|
||
|
* of the input associated with the subtree rooted at this task.
|
||
|
* Tasks may be leaf nodes (which will traverse the elements of
|
||
|
* the {@code Spliterator}) or internal nodes (which split the
|
||
|
* {@code Spliterator} into multiple child tasks).
|
||
|
*
|
||
|
* @implNote
|
||
|
* <p>This class is based on {@link CountedCompleter}, a form of fork-join task
|
||
|
* where each task has a semaphore-like count of uncompleted children, and the
|
||
|
* task is implicitly completed and notified when its last child completes.
|
||
|
* Internal node tasks will likely override the {@code onCompletion} method from
|
||
|
* {@code CountedCompleter} to merge the results from child tasks into the
|
||
|
* current task's result.
|
||
|
*
|
||
|
* <p>Splitting and setting up the child task links is done by {@code compute()}
|
||
|
* for internal nodes. At {@code compute()} time for leaf nodes, it is
|
||
|
* guaranteed that the parent's child-related fields (including sibling links
|
||
|
* for the parent's children) will be set up for all children.
|
||
|
*
|
||
|
* <p>For example, a task that performs a reduce would override {@code doLeaf()}
|
||
|
* to perform a reduction on that leaf node's chunk using the
|
||
|
* {@code Spliterator}, and override {@code onCompletion()} to merge the results
|
||
|
* of the child tasks for internal nodes:
|
||
|
*
|
||
|
* <pre>{@code
|
||
|
* protected S doLeaf() {
|
||
|
* spliterator.forEach(...);
|
||
|
* return localReductionResult;
|
||
|
* }
|
||
|
*
|
||
|
* public void onCompletion(CountedCompleter caller) {
|
||
|
* if (!isLeaf()) {
|
||
|
* ReduceTask<P_IN, P_OUT, T, R> child = children;
|
||
|
* R result = child.getLocalResult();
|
||
|
* child = child.nextSibling;
|
||
|
* for (; child != null; child = child.nextSibling)
|
||
|
* result = combine(result, child.getLocalResult());
|
||
|
* setLocalResult(result);
|
||
|
* }
|
||
|
* }
|
||
|
* }</pre>
|
||
|
*
|
||
|
* <p>Serialization is not supported as there is no intention to serialize
|
||
|
* tasks managed by stream ops.
|
||
|
*
|
||
|
* @param <P_IN> Type of elements input to the pipeline
|
||
|
* @param <P_OUT> Type of elements output from the pipeline
|
||
|
* @param <R> Type of intermediate result, which may be different from operation
|
||
|
* result type
|
||
|
* @param <K> Type of parent, child and sibling tasks
|
||
|
* @since 1.8
|
||
|
*/
|
||
|
@SuppressWarnings("serial")
|
||
|
abstract class AbstractTask<P_IN, P_OUT, R,
|
||
|
K extends AbstractTask<P_IN, P_OUT, R, K>>
|
||
|
extends CountedCompleter<R> {
|
||
|
|
||
|
private static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
|
||
|
|
||
|
/** The pipeline helper, common to all tasks in a computation */
|
||
|
protected final PipelineHelper<P_OUT> helper;
|
||
|
|
||
|
/**
|
||
|
* The spliterator for the portion of the input associated with the subtree
|
||
|
* rooted at this task
|
||
|
*/
|
||
|
protected Spliterator<P_IN> spliterator;
|
||
|
|
||
|
/** Target leaf size, common to all tasks in a computation */
|
||
|
protected long targetSize; // may be lazily initialized
|
||
|
|
||
|
/**
|
||
|
* The left child.
|
||
|
* null if no children
|
||
|
* if non-null rightChild is non-null
|
||
|
*/
|
||
|
protected K leftChild;
|
||
|
|
||
|
/**
|
||
|
* The right child.
|
||
|
* null if no children
|
||
|
* if non-null leftChild is non-null
|
||
|
*/
|
||
|
protected K rightChild;
|
||
|
|
||
|
/** The result of this node, if completed */
|
||
|
private R localResult;
|
||
|
|
||
|
/**
|
||
|
* Constructor for root nodes.
|
||
|
*
|
||
|
* @param helper The {@code PipelineHelper} describing the stream pipeline
|
||
|
* up to this operation
|
||
|
* @param spliterator The {@code Spliterator} describing the source for this
|
||
|
* pipeline
|
||
|
*/
|
||
|
protected AbstractTask(PipelineHelper<P_OUT> helper,
|
||
|
Spliterator<P_IN> spliterator) {
|
||
|
super(null);
|
||
|
this.helper = helper;
|
||
|
this.spliterator = spliterator;
|
||
|
this.targetSize = 0L;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Constructor for non-root nodes.
|
||
|
*
|
||
|
* @param parent this node's parent task
|
||
|
* @param spliterator {@code Spliterator} describing the subtree rooted at
|
||
|
* this node, obtained by splitting the parent {@code Spliterator}
|
||
|
*/
|
||
|
protected AbstractTask(K parent,
|
||
|
Spliterator<P_IN> spliterator) {
|
||
|
super(parent);
|
||
|
this.spliterator = spliterator;
|
||
|
this.helper = parent.helper;
|
||
|
this.targetSize = parent.targetSize;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Default target of leaf tasks for parallel decomposition.
|
||
|
* To allow load balancing, we over-partition, currently to approximately
|
||
|
* four tasks per processor, which enables others to help out
|
||
|
* if leaf tasks are uneven or some processors are otherwise busy.
|
||
|
*/
|
||
|
public static int getLeafTarget() {
|
||
|
Thread t = Thread.currentThread();
|
||
|
if (t instanceof ForkJoinWorkerThread) {
|
||
|
return ((ForkJoinWorkerThread) t).getPool().getParallelism() << 2;
|
||
|
}
|
||
|
else {
|
||
|
return LEAF_TARGET;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Constructs a new node of type T whose parent is the receiver; must call
|
||
|
* the AbstractTask(T, Spliterator) constructor with the receiver and the
|
||
|
* provided Spliterator.
|
||
|
*
|
||
|
* @param spliterator {@code Spliterator} describing the subtree rooted at
|
||
|
* this node, obtained by splitting the parent {@code Spliterator}
|
||
|
* @return newly constructed child node
|
||
|
*/
|
||
|
protected abstract K makeChild(Spliterator<P_IN> spliterator);
|
||
|
|
||
|
/**
|
||
|
* Computes the result associated with a leaf node. Will be called by
|
||
|
* {@code compute()} and the result passed to @{code setLocalResult()}
|
||
|
*
|
||
|
* @return the computed result of a leaf node
|
||
|
*/
|
||
|
protected abstract R doLeaf();
|
||
|
|
||
|
/**
|
||
|
* Returns a suggested target leaf size based on the initial size estimate.
|
||
|
*
|
||
|
* @return suggested target leaf size
|
||
|
*/
|
||
|
public static long suggestTargetSize(long sizeEstimate) {
|
||
|
long est = sizeEstimate / getLeafTarget();
|
||
|
return est > 0L ? est : 1L;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Returns the targetSize, initializing it via the supplied
|
||
|
* size estimate if not already initialized.
|
||
|
*/
|
||
|
protected final long getTargetSize(long sizeEstimate) {
|
||
|
long s;
|
||
|
return ((s = targetSize) != 0 ? s :
|
||
|
(targetSize = suggestTargetSize(sizeEstimate)));
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Returns the local result, if any. Subclasses should use
|
||
|
* {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
|
||
|
* results. This returns the local result so that calls from within the
|
||
|
* fork-join framework will return the correct result.
|
||
|
*
|
||
|
* @return local result for this node previously stored with
|
||
|
* {@link #setLocalResult}
|
||
|
*/
|
||
|
@Override
|
||
|
public R getRawResult() {
|
||
|
return localResult;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Does nothing; instead, subclasses should use
|
||
|
* {@link #setLocalResult(Object)}} to manage results.
|
||
|
*
|
||
|
* @param result must be null, or an exception is thrown (this is a safety
|
||
|
* tripwire to detect when {@code setRawResult()} is being used
|
||
|
* instead of {@code setLocalResult()}
|
||
|
*/
|
||
|
@Override
|
||
|
protected void setRawResult(R result) {
|
||
|
if (result != null)
|
||
|
throw new IllegalStateException();
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Retrieves a result previously stored with {@link #setLocalResult}
|
||
|
*
|
||
|
* @return local result for this node previously stored with
|
||
|
* {@link #setLocalResult}
|
||
|
*/
|
||
|
protected R getLocalResult() {
|
||
|
return localResult;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Associates the result with the task, can be retrieved with
|
||
|
* {@link #getLocalResult}
|
||
|
*
|
||
|
* @param localResult local result for this node
|
||
|
*/
|
||
|
protected void setLocalResult(R localResult) {
|
||
|
this.localResult = localResult;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Indicates whether this task is a leaf node. (Only valid after
|
||
|
* {@link #compute} has been called on this node). If the node is not a
|
||
|
* leaf node, then children will be non-null and numChildren will be
|
||
|
* positive.
|
||
|
*
|
||
|
* @return {@code true} if this task is a leaf node
|
||
|
*/
|
||
|
protected boolean isLeaf() {
|
||
|
return leftChild == null;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Indicates whether this task is the root node
|
||
|
*
|
||
|
* @return {@code true} if this task is the root node.
|
||
|
*/
|
||
|
protected boolean isRoot() {
|
||
|
return getParent() == null;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Returns the parent of this task, or null if this task is the root
|
||
|
*
|
||
|
* @return the parent of this task, or null if this task is the root
|
||
|
*/
|
||
|
@SuppressWarnings("unchecked")
|
||
|
protected K getParent() {
|
||
|
return (K) getCompleter();
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Decides whether or not to split a task further or compute it
|
||
|
* directly. If computing directly, calls {@code doLeaf} and pass
|
||
|
* the result to {@code setRawResult}. Otherwise splits off
|
||
|
* subtasks, forking one and continuing as the other.
|
||
|
*
|
||
|
* <p> The method is structured to conserve resources across a
|
||
|
* range of uses. The loop continues with one of the child tasks
|
||
|
* when split, to avoid deep recursion. To cope with spliterators
|
||
|
* that may be systematically biased toward left-heavy or
|
||
|
* right-heavy splits, we alternate which child is forked versus
|
||
|
* continued in the loop.
|
||
|
*/
|
||
|
@Override
|
||
|
public void compute() {
|
||
|
Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
|
||
|
long sizeEstimate = rs.estimateSize();
|
||
|
long sizeThreshold = getTargetSize(sizeEstimate);
|
||
|
boolean forkRight = false;
|
||
|
@SuppressWarnings("unchecked") K task = (K) this;
|
||
|
while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
|
||
|
K leftChild, rightChild, taskToFork;
|
||
|
task.leftChild = leftChild = task.makeChild(ls);
|
||
|
task.rightChild = rightChild = task.makeChild(rs);
|
||
|
task.setPendingCount(1);
|
||
|
if (forkRight) {
|
||
|
forkRight = false;
|
||
|
rs = ls;
|
||
|
task = leftChild;
|
||
|
taskToFork = rightChild;
|
||
|
}
|
||
|
else {
|
||
|
forkRight = true;
|
||
|
task = rightChild;
|
||
|
taskToFork = leftChild;
|
||
|
}
|
||
|
taskToFork.fork();
|
||
|
sizeEstimate = rs.estimateSize();
|
||
|
}
|
||
|
task.setLocalResult(task.doLeaf());
|
||
|
task.tryComplete();
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* {@inheritDoc}
|
||
|
*
|
||
|
* @implNote
|
||
|
* Clears spliterator and children fields. Overriders MUST call
|
||
|
* {@code super.onCompletion} as the last thing they do if they want these
|
||
|
* cleared.
|
||
|
*/
|
||
|
@Override
|
||
|
public void onCompletion(CountedCompleter<?> caller) {
|
||
|
spliterator = null;
|
||
|
leftChild = rightChild = null;
|
||
|
}
|
||
|
|
||
|
/**
|
||
|
* Returns whether this node is a "leftmost" node -- whether the path from
|
||
|
* the root to this node involves only traversing leftmost child links. For
|
||
|
* a leaf node, this means it is the first leaf node in the encounter order.
|
||
|
*
|
||
|
* @return {@code true} if this node is a "leftmost" node
|
||
|
*/
|
||
|
protected boolean isLeftmostNode() {
|
||
|
@SuppressWarnings("unchecked")
|
||
|
K node = (K) this;
|
||
|
while (node != null) {
|
||
|
K parent = node.getParent();
|
||
|
if (parent != null && parent.leftChild != node)
|
||
|
return false;
|
||
|
node = parent;
|
||
|
}
|
||
|
return true;
|
||
|
}
|
||
|
}
|