0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network;
0019
0020 import java.util.ArrayList;
0021 import java.util.List;
0022
0023 import io.netty.channel.Channel;
0024 import io.netty.channel.ChannelPromise;
0025 import io.netty.channel.DefaultChannelPromise;
0026 import io.netty.util.concurrent.Future;
0027 import io.netty.util.concurrent.GenericFutureListener;
0028
0029 class ExtendedChannelPromise extends DefaultChannelPromise {
0030
0031 private List<GenericFutureListener<Future<Void>>> listeners = new ArrayList<>();
0032 private boolean success;
0033
0034 ExtendedChannelPromise(Channel channel) {
0035 super(channel);
0036 success = false;
0037 }
0038
0039 @Override
0040 public ChannelPromise addListener(
0041 GenericFutureListener<? extends Future<? super Void>> listener) {
0042 @SuppressWarnings("unchecked")
0043 GenericFutureListener<Future<Void>> gfListener =
0044 (GenericFutureListener<Future<Void>>) listener;
0045 listeners.add(gfListener);
0046 return super.addListener(listener);
0047 }
0048
0049 @Override
0050 public boolean isSuccess() {
0051 return success;
0052 }
0053
0054 @Override
0055 public ChannelPromise await() throws InterruptedException {
0056 return this;
0057 }
0058
0059 public void finish(boolean success) {
0060 this.success = success;
0061 listeners.forEach(listener -> {
0062 try {
0063 listener.operationComplete(this);
0064 } catch (Exception e) {
0065
0066 }
0067 });
0068 }
0069 }