聊聊skywalking的spring-cloud-gateway-plugin

NettyRoutingFilterInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/NettyRoutingFilterInstrumentation.java

/**
 * Plugin definition that enhances {@code org.springframework.cloud.gateway.filter.NettyRoutingFilter}.
 *
 * <p>No constructors are intercepted. The only instance method intercepted is {@code filter} whose
 * first argument is a {@code ServerWebExchange}; it is delegated to {@code NettyRoutingFilterInterceptor}.
 */
public class NettyRoutingFilterInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String ENHANCE_CLASS = "org.springframework.cloud.gateway.filter.NettyRoutingFilter";

    /** Fully-qualified name of the interceptor bound to the {@code filter} method. */
    private static final String FILTER_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor";

    /** Name of the method to intercept. */
    private static final String FILTER_METHOD = "filter";

    /** Required type of the first argument of the intercepted method. */
    private static final String FIRST_ARGUMENT_TYPE = "org.springframework.web.server.ServerWebExchange";

    /** Witness classes: the gateway web handler and the reactor-netty client operations. */
    private static final String WITNESS_FILTERING_WEB_HANDLER =
        "org.springframework.cloud.gateway.handler.FilteringWebHandler";
    private static final String WITNESS_HTTP_CLIENT_OPERATIONS = "reactor.netty.http.client.HttpClientOperations";

    /**
     * @return an empty array; no constructor of the target class is intercepted
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    /**
     * @return a single intercept point matching {@code filter(ServerWebExchange, ...)}
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint filterPoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(FILTER_METHOD).and(takesArgumentWithType(0, FIRST_ARGUMENT_TYPE));
            }

            @Override
            public String getMethodsInterceptor() {
                return FILTER_INTERCEPTOR;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[]{filterPoint};
    }

    /**
     * @return a by-name matcher for {@code NettyRoutingFilter}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

    /**
     * @return a fresh array holding the two witness class names
     */
    @Override
    protected final String[] witnessClasses() {
        return new String[]{WITNESS_FILTERING_WEB_HANDLER, WITNESS_HTTP_CLIENT_OPERATIONS};
    }
}
  • NettyRoutingFilterInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.NettyRoutingFilterInterceptor拦截org.springframework.cloud.gateway.filter.NettyRoutingFilter带有org.springframework.web.server.ServerWebExchange参数的filter方法

NettyRoutingFilterInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/NettyRoutingFilterInterceptor.java

/**
 * Interceptor for {@code NettyRoutingFilter#filter}.
 *
 * <p>Before the method runs, the {@code SWTransmitter} stored on the (possibly decorated) exchange
 * is published into the runtime context under {@code Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER};
 * after the method returns, that entry is removed again if it is still present.
 */
public class NettyRoutingFilterInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Copies the transmitter held by the exchange (first argument) into the runtime context.
     * Does nothing when no enhanced exchange can be resolved from the argument.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        EnhancedInstance exchange = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (exchange == null) {
            return;
        }
        SWTransmitter transmitter = (SWTransmitter) exchange.getSkyWalkingDynamicField();
        ContextManager.getRuntimeContext().put(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER, transmitter);
    }

    /**
     * Removes the transmitter entry from the runtime context when one is still there.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        boolean stillPresent =
            ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER) != null;
        if (stillPresent) {
            ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        }
        return ret;
    }

    /** No-op: exceptions thrown by the intercepted method are not recorded here. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }

    /**
     * Resolves the enhanced exchange behind {@code o}.
     *
     * @param o the exchange argument; may be a decorator chain or a {@code DefaultServerWebExchange}
     * @return the underlying exchange cast to {@code EnhancedInstance}, or {@code null} when {@code o}
     *         is neither a decorator nor a {@code DefaultServerWebExchange}
     * @throws NullPointerException if a decorator in the chain has a {@code null} delegate
     * @throws RuntimeException     if the innermost delegate is of an unexpected type
     */
    public static EnhancedInstance getInstance(Object o) {
        if (o instanceof ServerWebExchangeDecorator) {
            return getEnhancedInstance((ServerWebExchangeDecorator) o);
        }
        if (o instanceof DefaultServerWebExchange) {
            return (EnhancedInstance) o;
        }
        return null;
    }

    /**
     * Walks the decorator chain down to the innermost delegate and returns it as an enhanced instance.
     */
    private static EnhancedInstance getEnhancedInstance(ServerWebExchangeDecorator serverWebExchangeDecorator) {
        Object innermost = serverWebExchangeDecorator.getDelegate();
        // Peel off nested decorators until something that is not a decorator is reached.
        while (innermost instanceof ServerWebExchangeDecorator) {
            innermost = ((ServerWebExchangeDecorator) innermost).getDelegate();
        }
        if (innermost instanceof DefaultServerWebExchange) {
            return (EnhancedInstance) innermost;
        }
        if (innermost == null) {
            throw new NullPointerException("The expected class DefaultServerWebExchange is null");
        }
        throw new RuntimeException("Unknown parameter types:" + innermost.getClass());
    }
}
  • NettyRoutingFilterInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取swTransmitter,然后以名为Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER的key放入到ContextManager.getRuntimeContext();其afterMethod方法执行ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER)

HttpClientOperationsInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/HttpClientOperationsInstrumentation.java

/**
 * Plugin definition that enhances {@code reactor.netty.http.client.HttpClientOperations}.
 *
 * <p>Three instance methods are intercepted, each by its own interceptor:
 * {@code headers(HttpHeaders)}, {@code send(Publisher)} and {@code status}.
 * No constructors are intercepted.
 */
public class HttpClientOperationsInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    private static final String ENHANCE_CLASS = "reactor.netty.http.client.HttpClientOperations";

    private static final String HEADERS_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor";
    private static final String SEND_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor";
    private static final String STATUS_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor";

    /**
     * @return an empty array; no constructor of the target class is intercepted
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    /**
     * @return the intercept points for {@code headers}, {@code send} and {@code status}, in that order
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[]{
            interceptPoint(
                named("headers").and(takesArgumentWithType(0, "io.netty.handler.codec.http.HttpHeaders")),
                HEADERS_INTERCEPTOR),
            interceptPoint(
                named("send").and(takesArgumentWithType(0, "org.reactivestreams.Publisher")),
                SEND_INTERCEPTOR),
            interceptPoint(named("status"), STATUS_INTERCEPTOR)
        };
    }

    /**
     * @return a by-name matcher for {@code HttpClientOperations}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

    /**
     * Builds an intercept point that pairs a method matcher with an interceptor class name and
     * never overrides the call arguments.
     */
    private static InstanceMethodsInterceptPoint interceptPoint(final ElementMatcher<MethodDescription> matcher,
                                                                final String interceptorClass) {
        return new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return matcher;
            }

            @Override
            public String getMethodsInterceptor() {
                return interceptorClass;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
    }
}
  • HttpClientOperationsInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine
  • 它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsHeadersInterceptor拦截reactor.netty.http.client.HttpClientOperations的带有io.netty.handler.codec.http.HttpHeaders参数的headers方法
  • 它还使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsSendInterceptor拦截reactor.netty.http.client.HttpClientOperations的带有org.reactivestreams.Publisher参数的send方法
  • 它还使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.HttpClientOperationsStatusInterceptor拦截reactor.netty.http.client.HttpClientOperations的status方法

HttpClientOperationsHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsHeadersInterceptor.java

/**
 * Interceptor for {@code HttpClientOperations#headers(HttpHeaders)}.
 *
 * <p>After the method returns, any object stored in the dynamic field of the headers argument is
 * moved onto the intercepted instance: it is set on {@code objInst} and cleared from the argument.
 */
public class HttpClientOperationsHeadersInterceptor implements InstanceMethodsAroundInterceptor {

    private static final ILog logger = LogManager.getLogger(HttpClientOperationsHeadersInterceptor.class);

    /** No-op: nothing is done before the intercepted method runs. */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
    }

    /**
     * Transfers the dynamic field value from the headers argument to {@code objInst}.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        Object headers = allArguments[0];
        // The method matcher accepts any HttpHeaders argument, but only enhanced header classes
        // implement EnhancedInstance. Skip null or non-enhanced arguments instead of failing the cast.
        if (!(headers instanceof EnhancedInstance)) {
            return ret;
        }
        EnhancedInstance enhancedHeaders = (EnhancedInstance) headers;
        Object transmitter = enhancedHeaders.getSkyWalkingDynamicField();
        if (transmitter != null) {
            objInst.setSkyWalkingDynamicField(transmitter);
            // Clear the source so the same value is not handed over a second time.
            enhancedHeaders.setSkyWalkingDynamicField(null);
        }
        return ret;
    }

    /** No-op: exceptions thrown by the intercepted method are not recorded here. */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }
}
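  • HttpClientOperationsHeadersInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法从headers参数(allArguments[0])的SkyWalkingDynamicField中取出transmitter,若不为null则执行objInst.setSkyWalkingDynamicField(transmitter),并将headers参数上的SkyWalkingDynamicField置为null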

HttpClientOperationsSendInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsSendInterceptor.java

/**
 * Interceptor for {@code HttpClientOperations#send(Publisher)}.
 *
 * <p>When a {@code SWTransmitter} is attached to the intercepted instance, an exit span is created
 * for the outgoing request, the trace context is injected into the request headers, and the span
 * is handed to the transmitter in async mode before being stopped on the current thread.
 */
public class HttpClientOperationsSendInterceptor implements InstanceMethodsAroundInterceptor {

    /**
     * Creates the exit span for the outgoing request and propagates the trace context via headers.
     * Does nothing when no transmitter is attached to {@code objInst}.
     */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        if (transmitter != null) {
            HttpClientRequest request = (HttpClientRequest) objInst;

            HttpHeaders header = request.requestHeaders();
            ChannelOperations channelOpt = (ChannelOperations) objInst;
            InetSocketAddress remote = (InetSocketAddress) (channelOpt.channel().remoteAddress());
            // Peer is "host:port" of the channel's remote address.
            String peer = remote.getHostName() + ":" + remote.getPort();

            AbstractSpan span = ContextManager.createExitSpan(transmitter.getOperationName(), peer);
            // Link this span to the context captured in the transmitter's snapshot.
            ContextManager.continued(transmitter.getSnapshot());
            ContextCarrier contextCarrier = new ContextCarrier();
            ContextManager.inject(contextCarrier);

            span.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
            Tags.URL.set(span, peer + request.uri());
            Tags.HTTP.METHOD.set(span, request.method().name());
            SpanLayer.asHttp(span);

            // Copy every carrier item into the outgoing request headers.
            CarrierItem next = contextCarrier.items();
            while (next.hasNext()) {
                next = next.next();
                header.set(next.getHeadKey(), next.getHeadValue());
            }
            // Keep the span reachable through the transmitter in async mode, then stop it here.
            transmitter.setSpanGateway(span.prepareForAsync());
            ContextManager.stopSpan(span);
        }
    }

    /**
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        return ret;
    }

    /**
     * Logs {@code t} on the active span, if there is one.
     *
     * <p>{@code beforeMethod} has already stopped the exit span it created (or created none when
     * no transmitter was attached), so an active span is not guaranteed at this point; the
     * {@code isActive()} guard avoids asking for a span that does not exist.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        if (ContextManager.isActive()) {
            ContextManager.activeSpan().errorOccurred().log(t);
        }
    }
}
  • HttpClientOperationsSendInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取request header、uri、method等信息设置到span中,最后执行ContextManager.stopSpan(span);其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

HttpClientOperationsStatusInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/HttpClientOperationsStatusInterceptor.java

/**
 * Interceptor for {@code HttpClientOperations#status}.
 *
 * <p>When a {@code SWTransmitter} is attached to the intercepted instance and a status was
 * returned, the span held by the transmitter is tagged with the status code for codes of 400 and
 * above, finished asynchronously, and the transmitter is detached from the instance.
 */
public class HttpClientOperationsStatusInterceptor implements InstanceMethodsAroundInterceptor {

    /** No-op: nothing is done before the intercepted method runs. */
    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
    }

    /**
     * Finishes the transmitter's span based on the returned status.
     *
     * @return {@code ret}, unchanged
     */
    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                              Object ret) throws Throwable {

        SWTransmitter transmitter = (SWTransmitter) objInst.getSkyWalkingDynamicField();
        // Without a returned status there is nothing to record. Leave the transmitter attached
        // so that a later call which does return a status can still finish the span.
        // NOTE(review): presumably status() yields null before a response has arrived - confirm.
        if (transmitter != null && ret instanceof HttpResponseStatus) {
            HttpResponseStatus response = (HttpResponseStatus) ret;
            if (response.code() >= 400) {
                Tags.STATUS_CODE.set(transmitter.getSpanGateway().errorOccurred(), String.valueOf(response.code()));
            }
            transmitter.getSpanGateway().asyncFinish();
            // Detach the transmitter so subsequent status() calls do not finish the span again.
            objInst.setSkyWalkingDynamicField(null);
        }
        return ret;
    }

    /**
     * Logs {@code t} on the active span, if there is one. The guard avoids requesting an active
     * span when no tracing context is active.
     */
    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
        if (ContextManager.isActive()) {
            ContextManager.activeSpan().errorOccurred().log(t);
        }
    }
}
  • HttpClientOperationsStatusInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法获取transmitter,在response.code()大于等于400时设置statusCode的tag,然后执行transmitter.getSpanGateway().asyncFinish();其handleMethodException执行ContextManager.activeSpan().errorOccurred().log(t)

FilteringWebHandlerInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/FilteringWebHandlerInstrumentation.java

/**
 * Plugin definition that enhances {@code org.springframework.cloud.gateway.handler.FilteringWebHandler}.
 *
 * <p>No constructors are intercepted. Every instance method named {@code handle} is delegated to
 * {@code FilteringWebHandlerInterceptor}.
 */
public class FilteringWebHandlerInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String ENHANCE_CLASS = "org.springframework.cloud.gateway.handler.FilteringWebHandler";

    /** Fully-qualified name of the interceptor bound to the {@code handle} method. */
    private static final String HANDLE_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor";

    /** Name of the method to intercept. */
    private static final String HANDLE_METHOD = "handle";

    /**
     * @return an empty array; no constructor of the target class is intercepted
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        return new ConstructorInterceptPoint[0];
    }

    /**
     * @return a single intercept point matching methods named {@code handle}
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        final InstanceMethodsInterceptPoint handlePoint = new InstanceMethodsInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getMethodsMatcher() {
                return named(HANDLE_METHOD);
            }

            @Override
            public String getMethodsInterceptor() {
                return HANDLE_INTERCEPTOR;
            }

            @Override
            public boolean isOverrideArgs() {
                return false;
            }
        };
        return new InstanceMethodsInterceptPoint[]{handlePoint};
    }

    /**
     * @return a by-name matcher for {@code FilteringWebHandler}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }

}
  • FilteringWebHandlerInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.FilteringWebHandlerInterceptor拦截org.springframework.cloud.gateway.handler.FilteringWebHandler的handle方法

FilteringWebHandlerInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/FilteringWebHandlerInterceptor.java

public class FilteringWebHandlerInterceptor implements InstanceMethodsAroundInterceptor {

    private static final String SPRING_CLOUD_GATEWAY_ROUTE_PREFIX = "GATEWAY/";

    @Override
    public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,
                             MethodInterceptResult result) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance == null) {
            return;
        }
        ContextSnapshot contextSnapshot = (ContextSnapshot) instance.getSkyWalkingDynamicField();
        if (contextSnapshot == null) {
            return;
        }

        ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
        String operationName = SPRING_CLOUD_GATEWAY_ROUTE_PREFIX;
        Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);
        operationName = operationName + route.getId();
        SWTransmitter transmitter = new SWTransmitter(contextSnapshot, operationName);
        instance.setSkyWalkingDynamicField(transmitter);
    }

    @Override
    public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
                              Class<?>[] argumentsTypes, Object ret) throws Throwable {
        EnhancedInstance instance = NettyRoutingFilterInterceptor.getInstance(allArguments[0]);
        if (instance == null) {
            return ret;
        }
        SWTransmitter swTransmitter = (SWTransmitter) instance.getSkyWalkingDynamicField();
        if (swTransmitter == null) {
            return ret;
        }
        Mono<Void> mono = (Mono) ret;
        return mono.doFinally(d -> {
            ServerWebExchange exchange = (ServerWebExchange) allArguments[0];
            HttpStatus statusCode = exchange.getResponse().getStatusCode();
            if (statusCode == HttpStatus.TOO_MANY_REQUESTS) {
                AbstractSpan localSpan = ContextManager.createLocalSpan(swTransmitter.getOperationName());
                Tags.STATUS_CODE.set(localSpan,statusCode.toString());
                SpanLayer.asHttp(localSpan);
                localSpan.setComponent(ComponentsDefine.SPRING_CLOUD_GATEWAY);
                ContextManager.continued(swTransmitter.getSnapshot());
                ContextManager.stopSpan(localSpan);
            }
        });
    }


    @Override
    public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                      Class<?>[] argumentsTypes, Throwable t) {
    }

}
  • FilteringWebHandlerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法从exchange中获取route信息,以"GATEWAY/"加上route的id形成operationName,创建SWTransmitter并执行instance.setSkyWalkingDynamicField(transmitter);其afterMethod方法获取SWTransmitter,然后注册mono的doFinally回调,在里头获取statusCode,当statusCode为HttpStatus.TOO_MANY_REQUESTS时创建local span并设置statusCode的tag,然后执行ContextManager.continued(swTransmitter.getSnapshot())及ContextManager.stopSpan(localSpan)

DefaultHttpHeadersInstrumentation

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/define/DefaultHttpHeadersInstrumentation.java

/**
 * Plugin definition that enhances {@code io.netty.handler.codec.http.DefaultHttpHeaders}.
 *
 * <p>Only a constructor is intercepted: the one whose first argument is a
 * {@code io.netty.handler.codec.DefaultHeaders}. It is delegated to
 * {@code DefaultHttpHeadersInterceptor}. No instance methods are intercepted.
 */
public class DefaultHttpHeadersInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

    /** Fully-qualified name of the class to enhance. */
    private static final String ENHANCE_CLASS = "io.netty.handler.codec.http.DefaultHttpHeaders";

    /** Fully-qualified name of the constructor interceptor. */
    private static final String CONSTRUCTOR_INTERCEPTOR =
        "org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor";

    /** Required type of the first constructor argument. */
    private static final String FIRST_ARGUMENT_TYPE = "io.netty.handler.codec.DefaultHeaders";

    /**
     * @return a single intercept point matching the constructor that takes a {@code DefaultHeaders}
     *         as its first argument
     */
    @Override
    public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
        final ConstructorInterceptPoint constructorPoint = new ConstructorInterceptPoint() {
            @Override
            public ElementMatcher<MethodDescription> getConstructorMatcher() {
                return takesArgumentWithType(0, FIRST_ARGUMENT_TYPE);
            }

            @Override
            public String getConstructorInterceptor() {
                return CONSTRUCTOR_INTERCEPTOR;
            }
        };
        return new ConstructorInterceptPoint[] {constructorPoint};
    }

    /**
     * @return an empty array; no instance method of the target class is intercepted
     */
    @Override
    public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
        return new InstanceMethodsInterceptPoint[0];
    }

    /**
     * @return a by-name matcher for {@code DefaultHttpHeaders}
     */
    @Override
    public ClassMatch enhanceClass() {
        return byName(ENHANCE_CLASS);
    }
}
  • DefaultHttpHeadersInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,它使用org.apache.skywalking.apm.plugin.spring.cloud.gateway.v21x.DefaultHttpHeadersInterceptor拦截io.netty.handler.codec.http.DefaultHttpHeaders的第一个参数为io.netty.handler.codec.DefaultHeaders的构造器

DefaultHttpHeadersInterceptor

skywalking-6.6.0/apm-sniffer/optional-plugins/optional-spring-plugins/optional-spring-cloud/gateway-2.1.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/spring/cloud/gateway/v21x/DefaultHttpHeadersInterceptor.java

/**
 * Constructor interceptor for {@code DefaultHttpHeaders}.
 *
 * <p>If the runtime context holds an entry under {@code Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER},
 * that value is attached to the newly constructed instance and the entry is removed from the
 * runtime context.
 */
public class DefaultHttpHeadersInterceptor implements InstanceConstructorInterceptor {

    /**
     * Moves the transmitter from the runtime context onto {@code objInst}, when one is present.
     *
     * @param objInst      the instance that has just been constructed
     * @param allArguments the constructor arguments (not used)
     */
    @Override
    public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
        Object pending = ContextManager.getRuntimeContext().get(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
        if (pending == null) {
            return;
        }
        objInst.setSkyWalkingDynamicField(pending);
        ContextManager.getRuntimeContext().remove(Constants.SPRING_CLOUD_GATEWAY_TRANSMITTER);
    }
}
  • DefaultHttpHeadersInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法主要是将ContextManager.getRuntimeContext()中的transmitter设置到objInst中,然后从ContextManager.getRuntimeContext()移除该transmitter

小结

skywalking的spring-cloud-gateway-plugin主要有四个instrument,分别是NettyRoutingFilterInstrumentation、HttpClientOperationsInstrumentation、FilteringWebHandlerInstrumentation、DefaultHttpHeadersInstrumentation

doc