1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package com.alibaba.dubbo.rpc.cluster.support; 18 19 import com.alibaba.dubbo.common.Constants; 20 import com.alibaba.dubbo.common.Version; 21 import com.alibaba.dubbo.common.logger.Logger; 22 import com.alibaba.dubbo.common.logger.LoggerFactory; 23 import com.alibaba.dubbo.common.utils.NetUtils; 24 import com.alibaba.dubbo.rpc.Invocation; 25 import com.alibaba.dubbo.rpc.Invoker; 26 import com.alibaba.dubbo.rpc.Result; 27 import com.alibaba.dubbo.rpc.RpcContext; 28 import com.alibaba.dubbo.rpc.RpcException; 29 import com.alibaba.dubbo.rpc.cluster.Directory; 30 import com.alibaba.dubbo.rpc.cluster.LoadBalance; 31 32 import java.util.ArrayList; 33 import java.util.HashSet; 34 import java.util.List; 35 import java.util.Set; 36 37 /** 38 * When invoke fails, log the initial error and retry other invokers (retry n times, which means at most n different invokers will be invoked) 39 * Note that retry causes latency. 40 *41 * Failover 42 * 43 */ 44 public class FailoverClusterInvoker
extends AbstractClusterInvoker { 45 46 private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); 47 48 public FailoverClusterInvoker(Directory directory) { 49 super(directory); 50 } 51 52 @Override 53 @SuppressWarnings({"unchecked", "rawtypes"}) 54 public Result doInvoke(Invocation invocation, final List > invokers, LoadBalance loadbalance) throws RpcException { 55 List > copyinvokers = invokers; 56 checkInvokers(copyinvokers, invocation); 57 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; 58 if (len <= 0) { 59 len = 1; 60 } 61 // retry loop. 62 RpcException le = null; // last exception. 63 List > invoked = new ArrayList >(copyinvokers.size()); // invoked invokers. 64 Set providers = new HashSet (len); 65 for (int i = 0; i < len; i++) { 66 //Reselect before retry to avoid a change of candidate `invokers`. 67 //NOTE: if `invokers` changed, then `invoked` also lose accuracy. 68 if (i > 0) { 69 checkWhetherDestroyed(); 70 copyinvokers = list(invocation); 71 // check again 72 checkInvokers(copyinvokers, invocation); 73 } 74 Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); 75 invoked.add(invoker); 76 RpcContext.getContext().setInvokers((List) invoked); 77 try { 78 Result result = invoker.invoke(invocation); 79 if (le != null && logger.isWarnEnabled()) { 80 logger.warn("Although retry the method " + invocation.getMethodName() 81 + " in the service " + getInterface().getName() 82 + " was successful by the provider " + invoker.getUrl().getAddress() 83 + ", but there have been failed providers " + providers 84 + " (" + providers.size() + "/" + copyinvokers.size() 85 + ") from the registry " + directory.getUrl().getAddress() 86 + " on the consumer " + NetUtils.getLocalHost() 87 + " using the dubbo version " + Version.getVersion() + ". Last error is: " 88 + le.getMessage(), le); 89 } 90 return result; 91 } catch (RpcException e) { 92 if (e.isBiz()) { // biz exception. 93 throw e; 94 } 95 le = e; 96 } catch (Throwable e) { 97 le = new RpcException(e.getMessage(), e); 98 } finally { 99 providers.add(invoker.getUrl().getAddress());100 }101 }102 throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "103 + invocation.getMethodName() + " in the service " + getInterface().getName()104 + ". Tried " + len + " times of the providers " + providers105 + " (" + providers.size() + "/" + copyinvokers.size()106 + ") from the registry " + directory.getUrl().getAddress()107 + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "108 + Version.getVersion() + ". Last error is: "109 + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);110 }111 112 }
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.010 *11 * Unless required by applicable law or agreed to in writing, software12 * distributed under the License is distributed on an "AS IS" BASIS,13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14 * See the License for the specific language governing permissions and15 * limitations under the License.16 */17 package com.alibaba.dubbo.rpc.cluster.support;18 19 import com.alibaba.dubbo.common.Version;20 import com.alibaba.dubbo.common.utils.NetUtils;21 import com.alibaba.dubbo.rpc.Invocation;22 import com.alibaba.dubbo.rpc.Invoker;23 import com.alibaba.dubbo.rpc.Result;24 import com.alibaba.dubbo.rpc.RpcException;25 import com.alibaba.dubbo.rpc.cluster.Directory;26 import com.alibaba.dubbo.rpc.cluster.LoadBalance;27 28 import java.util.List;29 30 /**31 * Execute exactly once, which means this policy will throw an exception immediately in case of an invocation error.32 * Usually used for non-idempotent write operations33 *34 * Fail-fast35 *36 */37 public class FailfastClusterInvokerextends AbstractClusterInvoker {38 39 public FailfastClusterInvoker(Directory directory) {40 super(directory);41 }42 43 @Override44 public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException {45 checkInvokers(invokers, invocation);46 Invoker invoker = select(loadbalance, invocation, invokers, null);47 try {48 return invoker.invoke(invocation);49 } catch (Throwable e) {50 if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.51 throw (RpcException) e;52 }53 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);54 }55 }56 }
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.010 *11 * Unless required by applicable law or agreed to in writing, software12 * distributed under the License is distributed on an "AS IS" BASIS,13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14 * See the License for the specific language governing permissions and15 * limitations under the License.16 */17 package com.alibaba.dubbo.rpc.cluster.support;18 19 import com.alibaba.dubbo.common.logger.Logger;20 import com.alibaba.dubbo.common.logger.LoggerFactory;21 import com.alibaba.dubbo.rpc.Invocation;22 import com.alibaba.dubbo.rpc.Invoker;23 import com.alibaba.dubbo.rpc.Result;24 import com.alibaba.dubbo.rpc.RpcException;25 import com.alibaba.dubbo.rpc.RpcResult;26 import com.alibaba.dubbo.rpc.cluster.Directory;27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;28 29 import java.util.List;30 31 /**32 * When invoke fails, log the error message and ignore this error by returning an empty RpcResult.33 * Usually used to write audit logs and other operations34 *35 * Fail-safe36 *37 */38 public class FailsafeClusterInvokerextends AbstractClusterInvoker {39 private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);40 41 public FailsafeClusterInvoker(Directory directory) {42 super(directory);43 }44 45 @Override46 public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException {47 try {48 checkInvokers(invokers, invocation);49 Invoker invoker = select(loadbalance, invocation, invokers, null);50 return invoker.invoke(invocation);51 } catch (Throwable e) {52 logger.error("Failsafe ignore exception: " + e.getMessage(), e);53 return new RpcResult(); // ignore54 }55 }56 }
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package com.alibaba.dubbo.rpc.cluster.support; 18 19 import com.alibaba.dubbo.common.logger.Logger; 20 import com.alibaba.dubbo.common.logger.LoggerFactory; 21 import com.alibaba.dubbo.common.utils.NamedThreadFactory; 22 import com.alibaba.dubbo.rpc.Invocation; 23 import com.alibaba.dubbo.rpc.Invoker; 24 import com.alibaba.dubbo.rpc.Result; 25 import com.alibaba.dubbo.rpc.RpcException; 26 import com.alibaba.dubbo.rpc.RpcResult; 27 import com.alibaba.dubbo.rpc.cluster.Directory; 28 import com.alibaba.dubbo.rpc.cluster.LoadBalance; 29 30 import java.util.HashMap; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.ConcurrentMap; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.ScheduledExecutorService; 37 import java.util.concurrent.ScheduledFuture; 38 import java.util.concurrent.TimeUnit; 39 40 /** 41 * When fails, record failure requests and schedule for retry on a regular interval. 42 * Especially useful for services of notification. 43 * 44 * Failback 45 * 46 */ 47 public class FailbackClusterInvokerextends AbstractClusterInvoker { 48 49 private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); 50 51 private static final long RETRY_FAILED_PERIOD = 5 * 1000; 52 53 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); 54 private final ConcurrentMap > failed = new ConcurrentHashMap >(); 55 private volatile ScheduledFuture retryFuture; 56 57 public FailbackClusterInvoker(Directory directory) { 58 super(directory); 59 } 60 61 private void addFailed(Invocation invocation, AbstractClusterInvoker router) { 62 if (retryFuture == null) { 63 synchronized (this) { 64 if (retryFuture == null) { 65 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { 66 67 @Override 68 public void run() { 69 // collect retry statistics 70 try { 71 retryFailed(); 72 } catch (Throwable t) { // Defensive fault tolerance 73 logger.error("Unexpected error occur at collect statistic", t); 74 } 75 } 76 }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); 77 } 78 } 79 } 80 failed.put(invocation, router); 81 } 82 83 void retryFailed() { 84 if (failed.size() == 0) { 85 return; 86 } 87 for (Map.Entry > entry : new HashMap >( 88 failed).entrySet()) { 89 Invocation invocation = entry.getKey(); 90 Invoker invoker = entry.getValue(); 91 try { 92 invoker.invoke(invocation); 93 failed.remove(invocation); 94 } catch (Throwable e) { 95 logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); 96 } 97 } 98 } 99 100 @Override101 protected Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException {102 try {103 checkInvokers(invokers, invocation);104 Invoker invoker = select(loadbalance, invocation, invokers, null);105 return invoker.invoke(invocation);106 } catch (Throwable e) {107 logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "108 + e.getMessage() + ", ", e);109 addFailed(invocation, this);110 return new RpcResult(); // ignore111 }112 }113 114 }
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 package com.alibaba.dubbo.rpc.cluster.support; 18 19 import com.alibaba.dubbo.common.Constants; 20 import com.alibaba.dubbo.common.utils.NamedThreadFactory; 21 import com.alibaba.dubbo.rpc.Invocation; 22 import com.alibaba.dubbo.rpc.Invoker; 23 import com.alibaba.dubbo.rpc.Result; 24 import com.alibaba.dubbo.rpc.RpcContext; 25 import com.alibaba.dubbo.rpc.RpcException; 26 import com.alibaba.dubbo.rpc.cluster.Directory; 27 import com.alibaba.dubbo.rpc.cluster.LoadBalance; 28 29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.concurrent.BlockingQueue; 32 import java.util.concurrent.ExecutorService; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.LinkedBlockingQueue; 35 import java.util.concurrent.TimeUnit; 36 import java.util.concurrent.atomic.AtomicInteger; 37 38 /** 39 * Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources. 40 * 41 * Fork 42 * 43 */ 44 public class ForkingClusterInvokerextends AbstractClusterInvoker { 45 46 private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); 47 48 public ForkingClusterInvoker(Directory directory) { 49 super(directory); 50 } 51 52 @Override 53 @SuppressWarnings({"unchecked", "rawtypes"}) 54 public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { 55 checkInvokers(invokers, invocation); 56 final List > selected; 57 final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); 58 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 59 if (forks <= 0 || forks >= invokers.size()) { 60 selected = invokers; 61 } else { 62 selected = new ArrayList >(); 63 for (int i = 0; i < forks; i++) { 64 // TODO. Add some comment here, refer chinese version for more details. 65 Invoker invoker = select(loadbalance, invocation, invokers, selected); 66 if (!selected.contains(invoker)) { //Avoid add the same invoker several times. 67 selected.add(invoker); 68 } 69 } 70 } 71 RpcContext.getContext().setInvokers((List) selected); 72 final AtomicInteger count = new AtomicInteger(); 73 final BlockingQueue
1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.010 *11 * Unless required by applicable law or agreed to in writing, software12 * distributed under the License is distributed on an "AS IS" BASIS,13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14 * See the License for the specific language governing permissions and15 * limitations under the License.16 */17 package com.alibaba.dubbo.rpc.cluster.support;18 19 import com.alibaba.dubbo.common.logger.Logger;20 import com.alibaba.dubbo.common.logger.LoggerFactory;21 import com.alibaba.dubbo.rpc.Invocation;22 import com.alibaba.dubbo.rpc.Invoker;23 import com.alibaba.dubbo.rpc.Result;24 import com.alibaba.dubbo.rpc.RpcContext;25 import com.alibaba.dubbo.rpc.RpcException;26 import com.alibaba.dubbo.rpc.cluster.Directory;27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;28 29 import java.util.List;30 31 /**32 * BroadcastClusterInvoker33 *34 */35 public class BroadcastClusterInvokerextends AbstractClusterInvoker {36 37 private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);38 39 public BroadcastClusterInvoker(Directory directory) {40 super(directory);41 }42 43 @Override44 @SuppressWarnings({"unchecked", "rawtypes"})45 public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException {46 checkInvokers(invokers, invocation);47 RpcContext.getContext().setInvokers((List) invokers);48 RpcException exception = null;49 Result result = null;50 for (Invoker invoker : invokers) {51 try {52 result = invoker.invoke(invocation);53 } catch (RpcException e) {54 exception = e;55 logger.warn(e.getMessage(), e);56 } catch (Throwable e) {57 exception = new RpcException(e.getMessage(), e);58 logger.warn(e.getMessage(), e);59 }60 }61 if (exception != null) {62 throw exception;63 }64 return result;65 }66 67 }