Using Streams in Java 8, with an example of splitting a list
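I. Advantages of Stream
In Java 8, Stream combined with lambda expressions greatly improves programming efficiency, and the code is concise and easy to read (although newcomers may find it obscure at first). With parallel() you can get parallel execution without writing traditional multithreaded code.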
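A minimal sketch of what that means in practice (ParallelSumDemo and its methods are illustrative names, not from the original): the same reduction written with a sequential stream and with parallel(). The parallel version contains no thread code; by default its work is split across the common ForkJoinPool. Whether it is actually faster depends on the data size and the per-element cost.

```java
import java.util.stream.IntStream;

// Illustrative only: one reduction, run sequentially and in parallel.
class ParallelSumDemo {
    // Sum of squares of 1..n on a sequential stream
    static long sumOfSquaresSequential(int n) {
        return IntStream.rangeClosed(1, n).mapToLong(i -> (long) i * i).sum();
    }

    // Same computation; parallel() lets the runtime split the range across worker threads
    static long sumOfSquaresParallel(int n) {
        return IntStream.rangeClosed(1, n).parallel().mapToLong(i -> (long) i * i).sum();
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquaresSequential(1000)); // prints 333833500
        System.out.println(sumOfSquaresParallel(1000));   // prints 333833500
    }
}
```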
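II. The problem in the project
Because of an API limit, only 100 codes can be imported per call, so the list has to be split into chunks. Since the number of codes can be very large, importing the chunks one after another is slow.
1. My first idea was a traditional multithreaded program, but I am not very fluent with threads; the code might behave unpredictably, and it would be error-prone and hard to maintain.
2. Then I thought of parallel() in Stream: it improves performance and uses Java 8 features, so why not?
III. Without further ado, here is the code first, followed by an explanation (the Java 8 list-splitting code is in section IV).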
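1. This method generates as many codes as the number passed in. private String getGeneratorCode(int tenantId) is my own method that produces a unique code according to our encoding rules; you can ignore it. What matters here is Stream.iterate.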
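The method itself is not reproduced in this text, so here is a minimal sketch of the Stream.iterate pattern it describes, under stated assumptions: CodeGenerator, generateCodes and generatorCode are hypothetical names, and generatorCode is a placeholder for the author's getGeneratorCode(int tenantId) (it also takes an index so the output is predictable).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: produce `count` codes from an index sequence built with Stream.iterate.
class CodeGenerator {
    // HYPOTHETICAL stand-in for getGeneratorCode(int tenantId): tenant id plus running index
    static String generatorCode(int tenantId, int index) {
        return tenantId + "-" + index;
    }

    static List<String> generateCodes(int tenantId, int count) {
        return Stream.iterate(0, n -> n + 1)    // infinite sequence 0, 1, 2, ...
                .limit(count)                   // cut it off at the requested size
                .map(i -> generatorCode(tenantId, i))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(generateCodes(7, 3)); // prints [7-0, 7-1, 7-2]
    }
}
```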
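2.1 Streams can also be constructed with Stream.of(); a collection can be turned into a stream directly with list.stream(), and an array with Arrays.stream(array):
String[] array = new String[]{"1", "2", "3"};
Stream<String> stream = Stream.of(array); // or Arrays.stream(array)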
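A small self-contained check that the three construction methods yield the same elements (StreamSources is an illustrative name). Note that the array helper is Arrays.stream, with a lower-case "s".

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Three ways to obtain a Stream<String> over the same elements.
class StreamSources {
    static List<List<String>> collectAll() {
        String[] array = new String[]{"1", "2", "3"};
        List<String> list = Arrays.asList(array);

        Stream<String> fromOf = Stream.of(array);          // varargs factory, array passed as the varargs
        Stream<String> fromArrays = Arrays.stream(array);  // array helper
        Stream<String> fromList = list.stream();           // any Collection

        return Arrays.asList(
                fromOf.collect(Collectors.toList()),
                fromArrays.collect(Collectors.toList()),
                fromList.collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        System.out.println(collectAll()); // prints [[1, 2, 3], [1, 2, 3], [1, 2, 3]]
    }
}
```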
2.2 Numeric stream: IntStream
int[] array = new int[]{1,2,3};
IntStream.of(array); // or IntStream.range(0, 3), which yields 0, 1, 2 (the end bound is exclusive)
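A quick sketch of the IntStream sources side by side (IntStreamSources is an illustrative name). It also shows rangeClosed, which includes the end bound and so reproduces 1, 2, 3. IntStream works on primitive ints, so no Integer boxing is involved.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

// IntStream sources: an existing int[] or a numeric range.
class IntStreamSources {
    // Stream over an existing int[]
    static int[] fromArray(int[] array) {
        return IntStream.of(array).toArray();
    }

    // range(start, end): end is exclusive
    static int[] fromRange(int start, int end) {
        return IntStream.range(start, end).toArray();
    }

    // rangeClosed(start, end): end is inclusive
    static int[] fromRangeClosed(int start, int end) {
        return IntStream.rangeClosed(start, end).toArray();
    }

    public static void main(String[] args) {
        int[] array = new int[]{1, 2, 3};
        System.out.println(Arrays.toString(fromArray(array)));      // prints [1, 2, 3]
        System.out.println(Arrays.toString(fromRange(0, 3)));       // prints [0, 1, 2]
        System.out.println(Arrays.toString(fromRangeClosed(1, 3))); // prints [1, 2, 3]
        System.out.println(IntStream.of(array).sum());              // prints 6
    }
}
```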
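3. All of the construction methods above start from data whose size is already known. When the size is determined by an input parameter, you should generate the stream yourself, as the method in item 1 does.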
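Besides Stream.iterate, another way to build a stream whose length is only known at run time is Stream.generate followed by limit(n). This is a swapped-in alternative, not what the author used; SizedByParameter is an illustrative name and random UUID strings are placeholder codes.

```java
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Size decided by a parameter: infinite supplier-based stream, capped with limit(n).
class SizedByParameter {
    static List<String> randomCodes(int n) {
        return Stream.generate(() -> UUID.randomUUID().toString()) // supplier called once per element
                .limit(n)                                          // caller decides the length
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(randomCodes(3).size()); // prints 3
    }
}
```

Stream.iterate is the better fit when each element depends on its position; Stream.generate suits independent elements.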
IV. Splitting a list in Java 8 with the Stream API
For a clear contrast, here is the code from before Java 8:
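The author's pre-Java-8 code is not reproduced in this text. As an illustrative reconstruction (not the author's code), a typical loop-based split walks the list in steps of maxSend and copies each subList; LoopSplit and split are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// A typical pre-Java-8 split into chunks of at most maxSend elements.
class LoopSplit {
    static List<List<String>> split(List<String> list, int maxSend) {
        List<List<String>> chunks = new ArrayList<List<String>>();
        for (int start = 0; start < list.size(); start += maxSend) {
            int end = Math.min(start + maxSend, list.size()); // last chunk may be shorter
            // subList is a view of `list`; copy it so the chunk is independent
            chunks.add(new ArrayList<String>(list.subList(start, end)));
        }
        return chunks;
    }
}
```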
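1. list is my collection of codes. MAX_SEND is 100 (the list is split into chunks of 100), and limit is the number of chunks needed this time, calculated from the size of the code collection.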
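The text does not show how limit is computed. One common way, assumed here, is ceiling division in integer arithmetic, limit = (size + MAX_SEND - 1) / MAX_SEND, so 250 codes need 3 chunks and 200 codes need 2. ChunkCount is an illustrative name.

```java
// Number of chunks = ceil(size / MAX_SEND), using integer arithmetic only.
class ChunkCount {
    static final int MAX_SEND = 100;

    // Assumes size is far below Integer.MAX_VALUE (size + MAX_SEND - 1 must not overflow)
    static int chunkCount(int size) {
        return (size + MAX_SEND - 1) / MAX_SEND;
    }

    public static void main(String[] args) {
        System.out.println(chunkCount(250)); // prints 3
        System.out.println(chunkCount(200)); // prints 2
    }
}
```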
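2. As you can see, the only additions are the skip and limit methods. skip discards the first n elements of the stream, and limit returns the first n elements (or all of them if the stream has fewer). Then parallel processing is switched on. By looping over the chunk indexes we achieve the goal of splitting the list: each sendList holds 100 elements (the last one may hold fewer).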
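A self-contained sketch of the skip/limit approach described above, with stated assumptions: StreamSplitForEach and splitAndSend are illustrative names, and the send callback is a hypothetical stand-in for the real import call. Chunk index a selects the elements from a * MAX_SEND up to (but not including) a * MAX_SEND + MAX_SEND.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: each chunk index is turned into one sendList via skip + limit.
class StreamSplitForEach {
    static final int MAX_SEND = 100;

    // `send` is HYPOTHETICAL; it must be thread-safe because chunks run in parallel
    static void splitAndSend(List<String> list, Consumer<List<String>> send) {
        int limit = (list.size() + MAX_SEND - 1) / MAX_SEND; // number of chunks
        Stream.iterate(0, n -> n + 1)   // chunk indexes 0, 1, 2, ...
                .limit(limit)
                .parallel()             // process chunks concurrently
                .forEach(a -> {
                    List<String> sendList = list.stream()
                            .skip((long) a * MAX_SEND) // drop the earlier chunks
                            .limit(MAX_SEND)           // keep at most MAX_SEND elements
                            .collect(Collectors.toList());
                    send.accept(sendList);
                });
    }
}
```

Because forEach runs in parallel, chunks are sent in no particular order, and the list must not be modified while the split runs. Each chunk re-walks the list from the start to skip, which is fine for modest sizes; the subList loop avoids that cost.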
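3. My business logic only needs to go this far. If the chunks have to be collected after splitting for further processing, only a small change is needed, for example:
List<List<String>> splitList = Stream.iterate(0, n -> n + 1).limit(limit).parallel().map(a -> {
List<String> sendList = list.stream().skip(a * MAX_SEND).limit(MAX_SEND).parallel().collect(Collectors.toList());
return sendList;
}).collect(Collectors.toList());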
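The snippet above relies on the surrounding list, limit and MAX_SEND variables. A self-contained, generic version of the same idea, as a sketch (StreamSplitCollect is an illustrative name; the chunk size becomes a parameter):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Sketch: split into chunks of at most maxSend and collect them, in order.
class StreamSplitCollect {
    static <T> List<List<T>> split(List<T> list, int maxSend) {
        long limit = (list.size() + maxSend - 1) / maxSend; // number of chunks
        return Stream.iterate(0, n -> n + 1)
                .limit(limit)
                .parallel()
                .map(a -> list.stream()
                        .skip((long) a * maxSend)  // start of chunk a
                        .limit(maxSend)            // at most maxSend elements
                        .collect(Collectors.<T>toList()))
                .collect(Collectors.toList());     // keeps encounter order, even in parallel
    }
}
```

Unlike forEach, collect(Collectors.toList()) respects the encounter order of the ordered source, so chunk 0 comes first even though chunks are built in parallel.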
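V. Java 8 streams don't appear to expose element indexes, so I built an increasing sequence to serve as the index. That is how I split a list with Java 8: compared with the old for loop it is easier on the eye and more elegant, and in my case performance improved as well.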
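For comparison, another index-based technique (a different technique from the author's, named plainly here): stream the indexes with IntStream.range and group elements by index / maxSend. IndexGroupingSplit is an illustrative name. This version is sequential; its appeal is that each element is visited exactly once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Sketch: explicit indexes from IntStream.range, grouped into chunks by index / maxSend.
class IndexGroupingSplit {
    static <T> List<List<T>> split(List<T> list, int maxSend) {
        Map<Integer, List<T>> byChunk = IntStream.range(0, list.size())
                .boxed()
                .collect(Collectors.groupingBy(
                        i -> i / maxSend,                                     // chunk number of element i
                        TreeMap::new,                                         // chunks in ascending order
                        Collectors.mapping(list::get, Collectors.toList()))); // index -> element
        return new ArrayList<>(byChunk.values());
    }
}
```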
If you have a better way to implement this, feel free to leave a comment.
Supplementary: the split operation of Flink's DataStream
This section examines the split operation of Flink's DataStream.
Example
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
This example splits the dataStream into two dataStreams: one with the outputName even and the other with the outputName odd.
DataStream.split
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java
@Public
public class DataStream<T> {
//......
public SplitStream<T> split(OutputSelector<T> outputSelector) {
return new SplitStream<>(this, clean(outputSelector));
}
//......
}
DataStream's split operation takes an OutputSelector argument, then creates and returns a SplitStream.
OutputSelector
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java
@PublicEvolving
public interface OutputSelector<OUT> extends Serializable {
Iterable<String> select(OUT value);
}
OutputSelector defines the select method, which tags each element with outputNames.
SplitStream
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java
@PublicEvolving
public class SplitStream<OUT> extends DataStream<OUT> {
protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
}
public DataStream<OUT> select(String... outputNames) {
return selectOutput(outputNames);
}
private DataStream<OUT> selectOutput(String[] outputNames) {
for (String outName : outputNames) {
if (outName == null) {
throw new RuntimeException("Selected names must not be null");
}
}
SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
}
}
SplitStream extends DataStream and defines a select method that picks the split-off dataStream by outputNames; select creates a SelectTransformation.
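To make the tag-then-select semantics concrete without a Flink runtime, here is a plain-Java model. It is not Flink code: SplitSelectModel is a hypothetical class, a java.util.function.Function stands in for OutputSelector, and elements sit in a List instead of flowing through a DataStream. Only the null check mirrors SplitStream.selectOutput shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;

// MODEL of split/select, not Flink: tag each element with names, then select by name.
class SplitSelectModel<T> {
    private final List<T> elements;
    private final Function<T, Iterable<String>> outputSelector; // plays the role of OutputSelector.select

    SplitSelectModel(List<T> elements, Function<T, Iterable<String>> outputSelector) {
        this.elements = elements;
        this.outputSelector = outputSelector;
    }

    List<T> select(String... outputNames) {
        for (String name : outputNames) {
            if (name == null) {
                throw new RuntimeException("Selected names must not be null"); // same check as selectOutput
            }
        }
        Set<String> wanted = new HashSet<>(Arrays.asList(outputNames));
        List<T> result = new ArrayList<>();
        for (T element : elements) {
            for (String tag : outputSelector.apply(element)) {
                if (wanted.contains(tag)) {
                    result.add(element);
                    break; // keep an element at most once per select, even if several tags match
                }
            }
        }
        return result;
    }
}
```

With the even/odd selector from the example, select("even") keeps 2 and 4 out of 1..5, and select("even", "odd") keeps all five.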
StreamGraphGenerator
flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@Internal
public class StreamGraphGenerator {
//......
private Collection<Integer> transform(StreamTransformation<?> transform) {
if (alreadyTransformed.containsKey(transform)) {
return alreadyTransformed.get(transform);
}
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
// if the max parallelism hasn't been set, then first use the job wide max parallelism
// from the ExecutionConfig.
int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
// call at least once to trigger exceptions about MissingTypeInfo
transform.getOutputType();
Collection<Integer> transformedIds;
if (transform instanceof OneInputTransformation<?, ?>) {
transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
} else if (transform instanceof SourceTransformation<?>) {
transformedIds = transformSource((SourceTransformation<?>) transform);
} else if (transform instanceof SinkTransformation<?>) {
transformedIds = transformSink((SinkTransformation<?>) transform);
} else if (transform instanceof UnionTransformation<?>) {
transformedIds = transformUnion((UnionTransformation<?>) transform);
} else if (transform instanceof SplitTransformation<?>) {
transformedIds = transformSplit((SplitTransformation<?>) transform);
} else if (transform instanceof SelectTransformation<?>) {
transformedIds = transformSelect((SelectTransformation<?>) transform);
} else if (transform instanceof FeedbackTransformation<?>) {
transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
} else if (transform instanceof CoFeedbackTransformation<?>) {
transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
} else if (transform instanceof PartitionTransformation<?>) {
transformedIds = transformPartition((PartitionTransformation<?>) transform);
} else if (transform instanceof SideOutputTransformation<?>) {
transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
} else {
throw new IllegalStateException("Unknown transformation: " + transform);
}
// need this check because the iterate transformation adds itself before
// transforming the feedback edges
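if (!alreadyTransformed.containsKey(transform)) {
alreadyTransformed.put(transform, transformedIds);
}
if (transform.getBufferTimeout() >= 0) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}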
return transformedIds;
}
private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
StreamTransformation<T> input = select.getInput();
Collection<Integer> resultIds = transform(input);
// the recursive transform might have already transformed this
if (alreadyTransformed.containsKey(select)) {
return alreadyTransformed.get(select);
}
List<Integer> virtualResultIds = new ArrayList<>();
for (int inputId : resultIds) {
int virtualId = StreamTransformation.getNewNodeId();
streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
virtualResultIds.add(virtualId);
}
return virtualResultIds;
}
private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
StreamTransformation<T> input = split.getInput();
Collection<Integer> resultIds = transform(input);
// the recursive transform call might have transformed this already
if (alreadyTransformed.containsKey(split)) {
return alreadyTransformed.get(split);
}
for (int inputId : resultIds) {
streamGraph.addOutputSelector(inputId, split.getOutputSelector());
}
return resultIds;
}
//......
}
The transform method in StreamGraphGenerator handles SelectTransformation and SplitTransformation accordingly: transformSelect calls addVirtualSelectNode based on select.getSelectedNames(), while transformSplit calls addOutputSelector based on split.getOutputSelector().
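Summary
DataStream's split operation takes an OutputSelector argument, then creates and returns a SplitStream
OutputSelector defines the select method, which tags each element with outputNames
SplitStream extends DataStream and defines a select method that picks the split-off dataStream by outputNames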