Hystrix源码剖析

hystrix里面有很多有意思的地方,比如缓存、回退方法、滑动窗口...

本人目前理解不够透彻,暂打印回退方法栈来展示整个回退方法的流程,其他章节后续会补充进来。

代码基于注解方式,需引入包com.netflix.hystrix.hystrix-javanica

入口

注解依赖aspectj,com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect切面处理@HystrixCommand的方法注解,方法案例:

    /**
     * 故意抛出dubbo的超时异常
     * @param messageReq
     * @return
     */
    // HystrixCommandAspect
    // https://github.com/Netflix/Hystrix/wiki/Configuration
    @HystrixCommand(
            // HystrixCommandGroupKey
            groupKey = "MessageProxy",
            // HystrixCommandKey
            commandKey = "sendTimeout",
            commandProperties = {
                    //HystrixCommandProperties
                    @HystrixProperty(name = "execution.timeout.enabled", value = "false"),    //执行超时时间|default:1000
//                    @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "5000"),    //执行超时时间|default:1000|RPC本身支持超时选项则才优先RPC方式
                    @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold", value = "20"), //触发断路最低请求数|default:20
                    @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds", value = "5000"),    //断路器恢复时间|default:5000
                    @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage", value = "50"),   //|触发短路错误率,单位%|default:50
                    @HystrixProperty(name = "fallback.isolation.semaphore.maxConcurrentRequests", value = "100"),   // 设置调用线程产生的HystrixCommand.getFallback()方法的允许最大请求数目。|default: 10
            },
            threadPoolProperties = {
                    //HystrixThreadPoolProperties
                    @HystrixProperty(name = "coreSize", value = "15"),  //线程池核心数|default:10 超过队列fallback
                    @HystrixProperty(name = "maxQueueSize", value = "-1"),  //队列长度|default:-1(SynchronousQueue)
                    @HystrixProperty(name = "keepAliveTimeMinutes", value = "1"),   // 设置存活时间,单位分钟| 默认值:1|如果coreSize小于maximumSize,那么该属性控制一个线程从实用完成到被释放的时间。
//                    @HystrixProperty(name = "queueSizeRejectionThreshold", value = "15"),   //队满拒绝服务阈值|default:5|此值生效优先于队满, maxQueueSize设置为-1,该属性不可用。
                    // Metrics统计属性
                    //@HystrixProperty(name = "metrics.rollingStats.numBuckets", value = "12"),   // 设置滑动统计的桶数量。默认10。metrics.rollingStats.timeInMilliseconds必须能被这个值整除。
                    //@HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds", value = "1440")  //窗口维持时间|default:10000
            },
            // HystrixThreadPoolKey|threadPoolKey =
            fallbackMethod = "fallback")
    public BaseResp sendTimeout(MessageReq messageReq){
        return iMessageFacade.sendTimeout(messageReq, 5000L);
    }

上下文数据装载器

通过阅读HystrixCommandAspect执行逻辑,com.netflix.hystrix.contrib.javanica.command.MetaHolder装载了切面的上下文元数据

/**
 * Simple immutable holder to keep all necessary information about current method to build Hystrix command.
 */
// todo: replace fallback related flags with FallbackMethod class
@Immutable
public final class MetaHolder {

Fallback

在装载上下文元数据,创建CommandAction组装执行器

/**
 * Simple action to encapsulate some logic to process it in a Hystrix command.
 */
public interface CommandAction {

创建回退执行器com.netflix.hystrix.contrib.javanica.command::createFallbackAction

 private CommandAction createFallbackAction(MetaHolder metaHolder) {

        FallbackMethod fallbackMethod = MethodProvider.getInstance().getFallbackMethod(metaHolder.getObj().getClass(),
                metaHolder.getMethod(), metaHolder.isExtendedFallback());
        fallbackMethod.validateReturnType(metaHolder.getMethod());
        CommandAction fallbackAction = null;
        if (fallbackMethod.isPresent()) {

            Method fMethod = fallbackMethod.getMethod();
            Object[] args = fallbackMethod.isDefault() ? new Object[0] : metaHolder.getArgs();
            if (fallbackMethod.isCommand()) {
                fMethod.setAccessible(true);
                HystrixCommand hystrixCommand = fMethod.getAnnotation(HystrixCommand.class);
                MetaHolder fmMetaHolder = MetaHolder.builder()
                        .obj(metaHolder.getObj())
                        .method(fMethod)
                        .ajcMethod(getAjcMethod(metaHolder.getObj(), fMethod))
                        .args(args)
                        .fallback(true)
                        .defaultFallback(fallbackMethod.isDefault())
                        .defaultCollapserKey(metaHolder.getDefaultCollapserKey())
                        .fallbackMethod(fMethod)
                        .extendedFallback(fallbackMethod.isExtended())
                        .fallbackExecutionType(fallbackMethod.getExecutionType())
                        .extendedParentFallback(metaHolder.isExtendedFallback())
                        .observable(ExecutionType.OBSERVABLE == fallbackMethod.getExecutionType())
                        .defaultCommandKey(fMethod.getName())
                        .defaultGroupKey(metaHolder.getDefaultGroupKey())
                        .defaultThreadPoolKey(metaHolder.getDefaultThreadPoolKey())
                        .defaultProperties(metaHolder.getDefaultProperties().orNull())
                        .hystrixCollapser(metaHolder.getHystrixCollapser())
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .hystrixCommand(hystrixCommand).build();
                fallbackAction = new LazyCommandExecutionAction(fmMetaHolder);
            } else {
                MetaHolder fmMetaHolder = MetaHolder.builder()
                        .obj(metaHolder.getObj())
                        .defaultFallback(fallbackMethod.isDefault())
                        .method(fMethod)
                        .fallbackExecutionType(ExecutionType.SYNCHRONOUS)
                        .extendedFallback(fallbackMethod.isExtended())
                        .extendedParentFallback(metaHolder.isExtendedFallback())
                        .ajcMethod(null) // if fallback method isn't annotated with command annotation then we don't need to get ajc method for this
                        .args(args).build();

                fallbackAction = new MethodExecutionAction(fmMetaHolder.getObj(), fMethod, fmMetaHolder.getArgs(), fmMetaHolder);
            }

        }
        return fallbackAction;
    }

最后组装成命令执行器集com.netflix.hystrix.contrib.javanica.command.CommandActions

/**
 * Wrapper for command actions combines different actions together.
 *
 * @author dmgcodevil
 */
public class CommandActions {
    private final CommandAction commandAction;
    private final CommandAction fallbackAction;

执行Command逻辑使用rxjava,串联事件链(缓存、熔断、),这里不做详述

关键逻辑定义了观察者异常,错误传播执行回退方法com.netflix.hystrix.AbstractCommand::executeCommandAndObserve.handleFallback

/**
 * This decorates "Hystrix" functionality around the run() Observable.
 *
 * @return R
 */
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
        ...
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                circuitBreaker.markNonSuccess();
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }
                    // 程序RuntimeException, by zzqfsy 2018-07-31
                    return handleFailureViaFallback(e);
                }
            }
        };
        ...
        return execution.doOnNext(markEmits)
        .doOnCompleted(markOnCompleted)
        .onErrorResumeNext(handleFallback)
        .doOnEach(setRequestContext);
}

通过回退来处理故障com.netflix.hystrix.AbstractCommand::handleFailureViaFallback

private Observable<R> handleFailureViaFallback(Exception underlying) {
    /**
     * All other error handling
     */
    logger.debug("Error executing HystrixCommand.run(). Proceeding to fallback logic ...", underlying);

    // report failure
    eventNotifier.markEvent(HystrixEventType.FAILURE, commandKey);

    // record the exception
    executionResult = executionResult.setException(underlying);
    return getFallbackOrThrowException(this, HystrixEventType.FAILURE, FailureType.COMMAND_EXCEPTION, "failed", underlying);
}

获取回退处理方法或直接抛出异常com.netflix.hystrix.AbstractCommand::getFallbackOrThrowException

/**
 * Execute <code>getFallback()</code> within protection of a semaphore that limits number of concurrent executions.
 * <p>
 * Fallback implementations shouldn't perform anything that can be blocking, but we protect against it anyways in case someone doesn't abide by the contract.
 * <p>
 * If something in the <code>getFallback()</code> implementation is latent (such as a network call) then the semaphore will cause us to start rejecting requests rather than allowing potentially
 * all threads to pile up and block.
 *
 * @return K
 * @throws UnsupportedOperationException
 *             if getFallback() not implemented
 * @throws HystrixRuntimeException
 *             if getFallback() fails (throws an Exception) or is rejected by the semaphore
 */
private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
            ...
            Observable<R> fallbackExecutionChain;

            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
               return handleFallbackRejectionByEmittingError();
            }
        } else {
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

获取回退方法观察者com.netflix.hystrix.HystrixCommand::getFallbackObservable

final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

最终执行的函数com.netflix.hystrix.GenericCommand::getFallback

/**
 * The fallback is performed whenever a command execution fails.
 * Also a fallback method will be invoked within separate command in the case if fallback method was annotated with
 * HystrixCommand annotation, otherwise current implementation throws RuntimeException and leaves the caller to deal with it
 * (see {@link super#getFallback()}).
 * The getFallback() is always processed synchronously.
 * Since getFallback() can throw only runtime exceptions thus any exceptions are thrown within getFallback() method
 * are wrapped in {@link FallbackInvocationException}.
 * A caller gets {@link com.netflix.hystrix.exception.HystrixRuntimeException}
 * and should call getCause to get original exception that was thrown in getFallback().
 *
 * @return result of invocation of fallback method or RuntimeException
 */
@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}

这个回退方法的栈非常长,其他细节的暂时对源码剖析告一段落...

代码面前,了无秘密。

Last updated