博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
dubbo 集群容错源码
阅读量:4578 次
发布时间:2019-06-09

本文共 21573 字,大约阅读时间需要 71 分钟。

1 /*  2  * Licensed to the Apache Software Foundation (ASF) under one or more  3  * contributor license agreements.  See the NOTICE file distributed with  4  * this work for additional information regarding copyright ownership.  5  * The ASF licenses this file to You under the Apache License, Version 2.0  6  * (the "License"); you may not use this file except in compliance with  7  * the License.  You may obtain a copy of the License at  8  *  9  *     http://www.apache.org/licenses/LICENSE-2.0 10  * 11  * Unless required by applicable law or agreed to in writing, software 12  * distributed under the License is distributed on an "AS IS" BASIS, 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14  * See the License for the specific language governing permissions and 15  * limitations under the License. 16  */ 17 package com.alibaba.dubbo.rpc.cluster.support; 18  19 import com.alibaba.dubbo.common.Constants; 20 import com.alibaba.dubbo.common.Version; 21 import com.alibaba.dubbo.common.logger.Logger; 22 import com.alibaba.dubbo.common.logger.LoggerFactory; 23 import com.alibaba.dubbo.common.utils.NetUtils; 24 import com.alibaba.dubbo.rpc.Invocation; 25 import com.alibaba.dubbo.rpc.Invoker; 26 import com.alibaba.dubbo.rpc.Result; 27 import com.alibaba.dubbo.rpc.RpcContext; 28 import com.alibaba.dubbo.rpc.RpcException; 29 import com.alibaba.dubbo.rpc.cluster.Directory; 30 import com.alibaba.dubbo.rpc.cluster.LoadBalance; 31  32 import java.util.ArrayList; 33 import java.util.HashSet; 34 import java.util.List; 35 import java.util.Set; 36  37 /** 38  * When invoke fails, log the initial error and retry other invokers (retry n times, which means at most n different invokers will be invoked) 39  * Note that retry causes latency. 40  * 

41 * Failover 42 * 43 */ 44 public class FailoverClusterInvoker

extends AbstractClusterInvoker
{ 45 46 private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); 47 48 public FailoverClusterInvoker(Directory
directory) { 49 super(directory); 50 } 51 52 @Override 53 @SuppressWarnings({"unchecked", "rawtypes"}) 54 public Result doInvoke(Invocation invocation, final List
> invokers, LoadBalance loadbalance) throws RpcException { 55 List
> copyinvokers = invokers; 56 checkInvokers(copyinvokers, invocation); 57 int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; 58 if (len <= 0) { 59 len = 1; 60 } 61 // retry loop. 62 RpcException le = null; // last exception. 63 List
> invoked = new ArrayList
>(copyinvokers.size()); // invoked invokers. 64 Set
providers = new HashSet
(len); 65 for (int i = 0; i < len; i++) { 66 //Reselect before retry to avoid a change of candidate `invokers`. 67 //NOTE: if `invokers` changed, then `invoked` also lose accuracy. 68 if (i > 0) { 69 checkWhetherDestroyed(); 70 copyinvokers = list(invocation); 71 // check again 72 checkInvokers(copyinvokers, invocation); 73 } 74 Invoker
invoker = select(loadbalance, invocation, copyinvokers, invoked); 75 invoked.add(invoker); 76 RpcContext.getContext().setInvokers((List) invoked); 77 try { 78 Result result = invoker.invoke(invocation); 79 if (le != null && logger.isWarnEnabled()) { 80 logger.warn("Although retry the method " + invocation.getMethodName() 81 + " in the service " + getInterface().getName() 82 + " was successful by the provider " + invoker.getUrl().getAddress() 83 + ", but there have been failed providers " + providers 84 + " (" + providers.size() + "/" + copyinvokers.size() 85 + ") from the registry " + directory.getUrl().getAddress() 86 + " on the consumer " + NetUtils.getLocalHost() 87 + " using the dubbo version " + Version.getVersion() + ". Last error is: " 88 + le.getMessage(), le); 89 } 90 return result; 91 } catch (RpcException e) { 92 if (e.isBiz()) { // biz exception. 93 throw e; 94 } 95 le = e; 96 } catch (Throwable e) { 97 le = new RpcException(e.getMessage(), e); 98 } finally { 99 providers.add(invoker.getUrl().getAddress());100 }101 }102 throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method "103 + invocation.getMethodName() + " in the service " + getInterface().getName()104 + ". Tried " + len + " times of the providers " + providers105 + " (" + providers.size() + "/" + copyinvokers.size()106 + ") from the registry " + directory.getUrl().getAddress()107 + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "108 + Version.getVersion() + ". Last error is: "109 + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le);110 }111 112 }

 

 

 

 

 

 

1 /* 2  * Licensed to the Apache Software Foundation (ASF) under one or more 3  * contributor license agreements.  See the NOTICE file distributed with 4  * this work for additional information regarding copyright ownership. 5  * The ASF licenses this file to You under the Apache License, Version 2.0 6  * (the "License"); you may not use this file except in compliance with 7  * the License.  You may obtain a copy of the License at 8  * 9  *     http://www.apache.org/licenses/LICENSE-2.010  *11  * Unless required by applicable law or agreed to in writing, software12  * distributed under the License is distributed on an "AS IS" BASIS,13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14  * See the License for the specific language governing permissions and15  * limitations under the License.16  */17 package com.alibaba.dubbo.rpc.cluster.support;18 19 import com.alibaba.dubbo.common.Version;20 import com.alibaba.dubbo.common.utils.NetUtils;21 import com.alibaba.dubbo.rpc.Invocation;22 import com.alibaba.dubbo.rpc.Invoker;23 import com.alibaba.dubbo.rpc.Result;24 import com.alibaba.dubbo.rpc.RpcException;25 import com.alibaba.dubbo.rpc.cluster.Directory;26 import com.alibaba.dubbo.rpc.cluster.LoadBalance;27 28 import java.util.List;29 30 /**31  * Execute exactly once, which means this policy will throw an exception immediately in case of an invocation error.32  * Usually used for non-idempotent write operations33  *34  * Fail-fast35  *36  */37 public class FailfastClusterInvoker
extends AbstractClusterInvoker
{38 39 public FailfastClusterInvoker(Directory
directory) {40 super(directory);41 }42 43 @Override44 public Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException {45 checkInvokers(invokers, invocation);46 Invoker
invoker = select(loadbalance, invocation, invokers, null);47 try {48 return invoker.invoke(invocation);49 } catch (Throwable e) {50 if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception.51 throw (RpcException) e;52 }53 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e);54 }55 }56 }

 

 

 

 

 

1 /* 2  * Licensed to the Apache Software Foundation (ASF) under one or more 3  * contributor license agreements.  See the NOTICE file distributed with 4  * this work for additional information regarding copyright ownership. 5  * The ASF licenses this file to You under the Apache License, Version 2.0 6  * (the "License"); you may not use this file except in compliance with 7  * the License.  You may obtain a copy of the License at 8  * 9  *     http://www.apache.org/licenses/LICENSE-2.010  *11  * Unless required by applicable law or agreed to in writing, software12  * distributed under the License is distributed on an "AS IS" BASIS,13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14  * See the License for the specific language governing permissions and15  * limitations under the License.16  */17 package com.alibaba.dubbo.rpc.cluster.support;18 19 import com.alibaba.dubbo.common.logger.Logger;20 import com.alibaba.dubbo.common.logger.LoggerFactory;21 import com.alibaba.dubbo.rpc.Invocation;22 import com.alibaba.dubbo.rpc.Invoker;23 import com.alibaba.dubbo.rpc.Result;24 import com.alibaba.dubbo.rpc.RpcException;25 import com.alibaba.dubbo.rpc.RpcResult;26 import com.alibaba.dubbo.rpc.cluster.Directory;27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;28 29 import java.util.List;30 31 /**32  * When invoke fails, log the error message and ignore this error by returning an empty RpcResult.33  * Usually used to write audit logs and other operations34  *35  * Fail-safe36  *37  */38 public class FailsafeClusterInvoker
extends AbstractClusterInvoker
{39 private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class);40 41 public FailsafeClusterInvoker(Directory
directory) {42 super(directory);43 }44 45 @Override46 public Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException {47 try {48 checkInvokers(invokers, invocation);49 Invoker
invoker = select(loadbalance, invocation, invokers, null);50 return invoker.invoke(invocation);51 } catch (Throwable e) {52 logger.error("Failsafe ignore exception: " + e.getMessage(), e);53 return new RpcResult(); // ignore54 }55 }56 }

 

 

 

 

 

1 /*  2  * Licensed to the Apache Software Foundation (ASF) under one or more  3  * contributor license agreements.  See the NOTICE file distributed with  4  * this work for additional information regarding copyright ownership.  5  * The ASF licenses this file to You under the Apache License, Version 2.0  6  * (the "License"); you may not use this file except in compliance with  7  * the License.  You may obtain a copy of the License at  8  *  9  *     http://www.apache.org/licenses/LICENSE-2.0 10  * 11  * Unless required by applicable law or agreed to in writing, software 12  * distributed under the License is distributed on an "AS IS" BASIS, 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14  * See the License for the specific language governing permissions and 15  * limitations under the License. 16  */ 17 package com.alibaba.dubbo.rpc.cluster.support; 18  19 import com.alibaba.dubbo.common.logger.Logger; 20 import com.alibaba.dubbo.common.logger.LoggerFactory; 21 import com.alibaba.dubbo.common.utils.NamedThreadFactory; 22 import com.alibaba.dubbo.rpc.Invocation; 23 import com.alibaba.dubbo.rpc.Invoker; 24 import com.alibaba.dubbo.rpc.Result; 25 import com.alibaba.dubbo.rpc.RpcException; 26 import com.alibaba.dubbo.rpc.RpcResult; 27 import com.alibaba.dubbo.rpc.cluster.Directory; 28 import com.alibaba.dubbo.rpc.cluster.LoadBalance; 29  30 import java.util.HashMap; 31 import java.util.List; 32 import java.util.Map; 33 import java.util.concurrent.ConcurrentHashMap; 34 import java.util.concurrent.ConcurrentMap; 35 import java.util.concurrent.Executors; 36 import java.util.concurrent.ScheduledExecutorService; 37 import java.util.concurrent.ScheduledFuture; 38 import java.util.concurrent.TimeUnit; 39  40 /** 41  * When fails, record failure requests and schedule for retry on a regular interval. 42  * Especially useful for services of notification. 43  * 44  * Failback 45  * 46  */ 47 public class FailbackClusterInvoker
extends AbstractClusterInvoker
{ 48 49 private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); 50 51 private static final long RETRY_FAILED_PERIOD = 5 * 1000; 52 53 private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true)); 54 private final ConcurrentMap
> failed = new ConcurrentHashMap
>(); 55 private volatile ScheduledFuture
retryFuture; 56 57 public FailbackClusterInvoker(Directory
directory) { 58 super(directory); 59 } 60 61 private void addFailed(Invocation invocation, AbstractClusterInvoker
router) { 62 if (retryFuture == null) { 63 synchronized (this) { 64 if (retryFuture == null) { 65 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { 66 67 @Override 68 public void run() { 69 // collect retry statistics 70 try { 71 retryFailed(); 72 } catch (Throwable t) { // Defensive fault tolerance 73 logger.error("Unexpected error occur at collect statistic", t); 74 } 75 } 76 }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); 77 } 78 } 79 } 80 failed.put(invocation, router); 81 } 82 83 void retryFailed() { 84 if (failed.size() == 0) { 85 return; 86 } 87 for (Map.Entry
> entry : new HashMap
>( 88 failed).entrySet()) { 89 Invocation invocation = entry.getKey(); 90 Invoker
invoker = entry.getValue(); 91 try { 92 invoker.invoke(invocation); 93 failed.remove(invocation); 94 } catch (Throwable e) { 95 logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); 96 } 97 } 98 } 99 100 @Override101 protected Result doInvoke(Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException {102 try {103 checkInvokers(invokers, invocation);104 Invoker
invoker = select(loadbalance, invocation, invokers, null);105 return invoker.invoke(invocation);106 } catch (Throwable e) {107 logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "108 + e.getMessage() + ", ", e);109 addFailed(invocation, this);110 return new RpcResult(); // ignore111 }112 }113 114 }

 

 

 

 

 

 

 

 

 

 

 

 

 

1 /*  2  * Licensed to the Apache Software Foundation (ASF) under one or more  3  * contributor license agreements.  See the NOTICE file distributed with  4  * this work for additional information regarding copyright ownership.  5  * The ASF licenses this file to You under the Apache License, Version 2.0  6  * (the "License"); you may not use this file except in compliance with  7  * the License.  You may obtain a copy of the License at  8  *  9  *     http://www.apache.org/licenses/LICENSE-2.0 10  * 11  * Unless required by applicable law or agreed to in writing, software 12  * distributed under the License is distributed on an "AS IS" BASIS, 13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14  * See the License for the specific language governing permissions and 15  * limitations under the License. 16  */ 17 package com.alibaba.dubbo.rpc.cluster.support; 18  19 import com.alibaba.dubbo.common.Constants; 20 import com.alibaba.dubbo.common.utils.NamedThreadFactory; 21 import com.alibaba.dubbo.rpc.Invocation; 22 import com.alibaba.dubbo.rpc.Invoker; 23 import com.alibaba.dubbo.rpc.Result; 24 import com.alibaba.dubbo.rpc.RpcContext; 25 import com.alibaba.dubbo.rpc.RpcException; 26 import com.alibaba.dubbo.rpc.cluster.Directory; 27 import com.alibaba.dubbo.rpc.cluster.LoadBalance; 28  29 import java.util.ArrayList; 30 import java.util.List; 31 import java.util.concurrent.BlockingQueue; 32 import java.util.concurrent.ExecutorService; 33 import java.util.concurrent.Executors; 34 import java.util.concurrent.LinkedBlockingQueue; 35 import java.util.concurrent.TimeUnit; 36 import java.util.concurrent.atomic.AtomicInteger; 37  38 /** 39  * Invoke a specific number of invokers concurrently, usually used for demanding real-time operations, but need to waste more service resources. 40  * 41  * Fork 42  * 43  */ 44 public class ForkingClusterInvoker
extends AbstractClusterInvoker
{ 45 46 private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); 47 48 public ForkingClusterInvoker(Directory
directory) { 49 super(directory); 50 } 51 52 @Override 53 @SuppressWarnings({"unchecked", "rawtypes"}) 54 public Result doInvoke(final Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException { 55 checkInvokers(invokers, invocation); 56 final List
> selected; 57 final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); 58 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); 59 if (forks <= 0 || forks >= invokers.size()) { 60 selected = invokers; 61 } else { 62 selected = new ArrayList
>(); 63 for (int i = 0; i < forks; i++) { 64 // TODO. Add some comment here, refer chinese version for more details. 65 Invoker
invoker = select(loadbalance, invocation, invokers, selected); 66 if (!selected.contains(invoker)) { //Avoid add the same invoker several times. 67 selected.add(invoker); 68 } 69 } 70 } 71 RpcContext.getContext().setInvokers((List) selected); 72 final AtomicInteger count = new AtomicInteger(); 73 final BlockingQueue
ref = new LinkedBlockingQueue(); 74 for (final Invoker
invoker : selected) { 75 executor.execute(new Runnable() { 76 @Override 77 public void run() { 78 try { 79 Result result = invoker.invoke(invocation); 80 ref.offer(result); 81 } catch (Throwable e) { 82 int value = count.incrementAndGet(); 83 if (value >= selected.size()) { 84 ref.offer(e); 85 } 86 } 87 } 88 }); 89 } 90 try { 91 Object ret = ref.poll(timeout, TimeUnit.MILLISECONDS); 92 if (ret instanceof Throwable) { 93 Throwable e = (Throwable) ret; 94 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); 95 } 96 return (Result) ret; 97 } catch (InterruptedException e) { 98 throw new RpcException("Failed to forking invoke provider " + selected + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e); 99 }100 }101 }

 

 

 

 

 

1 /* 2  * Licensed to the Apache Software Foundation (ASF) under one or more 3  * contributor license agreements.  See the NOTICE file distributed with 4  * this work for additional information regarding copyright ownership. 5  * The ASF licenses this file to You under the Apache License, Version 2.0 6  * (the "License"); you may not use this file except in compliance with 7  * the License.  You may obtain a copy of the License at 8  * 9  *     http://www.apache.org/licenses/LICENSE-2.010  *11  * Unless required by applicable law or agreed to in writing, software12  * distributed under the License is distributed on an "AS IS" BASIS,13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.14  * See the License for the specific language governing permissions and15  * limitations under the License.16  */17 package com.alibaba.dubbo.rpc.cluster.support;18 19 import com.alibaba.dubbo.common.logger.Logger;20 import com.alibaba.dubbo.common.logger.LoggerFactory;21 import com.alibaba.dubbo.rpc.Invocation;22 import com.alibaba.dubbo.rpc.Invoker;23 import com.alibaba.dubbo.rpc.Result;24 import com.alibaba.dubbo.rpc.RpcContext;25 import com.alibaba.dubbo.rpc.RpcException;26 import com.alibaba.dubbo.rpc.cluster.Directory;27 import com.alibaba.dubbo.rpc.cluster.LoadBalance;28 29 import java.util.List;30 31 /**32  * BroadcastClusterInvoker33  *34  */35 public class BroadcastClusterInvoker
extends AbstractClusterInvoker
{36 37 private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class);38 39 public BroadcastClusterInvoker(Directory
directory) {40 super(directory);41 }42 43 @Override44 @SuppressWarnings({"unchecked", "rawtypes"})45 public Result doInvoke(final Invocation invocation, List
> invokers, LoadBalance loadbalance) throws RpcException {46 checkInvokers(invokers, invocation);47 RpcContext.getContext().setInvokers((List) invokers);48 RpcException exception = null;49 Result result = null;50 for (Invoker
invoker : invokers) {51 try {52 result = invoker.invoke(invocation);53 } catch (RpcException e) {54 exception = e;55 logger.warn(e.getMessage(), e);56 } catch (Throwable e) {57 exception = new RpcException(e.getMessage(), e);58 logger.warn(e.getMessage(), e);59 }60 }61 if (exception != null) {62 throw exception;63 }64 return result;65 }66 67 }

 

转载于:https://www.cnblogs.com/toUpdating/p/9058482.html

你可能感兴趣的文章
Android Studio 导入新工程项目
查看>>
Jupyter导出PDF从入门到绝望(已解决)
查看>>
vue的挖坑和爬坑之css背景图样式终极解决方法
查看>>
可变字典
查看>>
DS博客作业-05--树
查看>>
记录一些常用的JS属性和语句
查看>>
Map接口
查看>>
iOS启动图 LaunchImage LaunchScreen.xib
查看>>
[转]利用/*+Ordered*/提高查询性能
查看>>
JdbcTemplate 操作Oracle Blob
查看>>
循序渐进地进行代码重构
查看>>
centos7 yum安装配置redis 并设置密码
查看>>
鸟哥私房菜第六章 用户与用户组
查看>>
LRU Cache数据结构简介
查看>>
17.2.2.1 The Slave Relay Log Slave中继日志
查看>>
3.1.2 MVC模式和URL访问
查看>>
node.js
查看>>
gcc编译
查看>>
如何配置Java EE Eclipse+Tomcat 开发环境
查看>>
Android水平(横向)翻页列表,类似水平GridVIew
查看>>