Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network;
0019 
0020 import java.util.ArrayList;
0021 import java.util.List;
0022 
0023 import io.netty.channel.Channel;
0024 import io.netty.channel.ChannelPromise;
0025 import io.netty.channel.DefaultChannelPromise;
0026 import io.netty.util.concurrent.Future;
0027 import io.netty.util.concurrent.GenericFutureListener;
0028 
0029 class ExtendedChannelPromise extends DefaultChannelPromise {
0030 
0031   private List<GenericFutureListener<Future<Void>>> listeners = new ArrayList<>();
0032   private boolean success;
0033 
0034   ExtendedChannelPromise(Channel channel) {
0035     super(channel);
0036     success = false;
0037   }
0038 
0039   @Override
0040   public ChannelPromise addListener(
0041       GenericFutureListener<? extends Future<? super Void>> listener) {
0042     @SuppressWarnings("unchecked")
0043     GenericFutureListener<Future<Void>> gfListener =
0044         (GenericFutureListener<Future<Void>>) listener;
0045     listeners.add(gfListener);
0046     return super.addListener(listener);
0047   }
0048 
0049   @Override
0050   public boolean isSuccess() {
0051     return success;
0052   }
0053 
0054   @Override
0055   public ChannelPromise await() throws InterruptedException {
0056     return this;
0057   }
0058 
0059   public void finish(boolean success) {
0060     this.success = success;
0061     listeners.forEach(listener -> {
0062       try {
0063         listener.operationComplete(this);
0064       } catch (Exception e) {
0065         // do nothing
0066       }
0067     });
0068   }
0069 }