pRPC-Day1
关于项目所需技术栈,笔者会另外编写入门笔记。
此项目基于guide哥的自定义RPC框架,仓库-https://gitee.com/p1utowu/pluto-rpc-frame
项目主要技术栈
Netty | 网络传输组件(基于NIO并优于原生NIO) |
---|---|
Kyro | 序列化工具(替代JDK原生序列化) |
Zookeeper | 注册中心(负责服务地址的注册与查找) |
SPI | 服务提供发现机制(解耦) |
Spring | 提供注解服务用于注册消费 |
RPC框架设计
此处的RPC框架指的是:可以让客户端直接调用服务端方法就像调用本地方法一样简单的框架,例如Dubbo、gRPC等
如果与HTTP协议产生联系,解析和封装HTTP请求与响应,那这些框架不能称为“RPC框架”,例如Feign和Spring Cloud Square
最简单的RPC框架示意图如下
服务提供端Server向注册中心注册服务,服务消费端Client通过注册中心获取服务相关信息,然后再通过网络请求服务端Server
作为RPC框架的佼佼者Dubbo的架构如下
RPC框架不仅需要提供最基础的服务发现功能,还需要提供负载均衡、容错重连等功能,如此才是一个完整的RPC框架
下列为设计RPC框架的大体思路:
- 注册中心:注册中心使用Zookeeper。注册中心负责服务地址的注册与查找,相当于目录服务。服务端启动时将服务名称与相对应的地址即
IP+port
注册到注册中心,服务消费端(即客户端)根据服务名称查找对应的服务地址后,通过网络请求服务端。 - 网络传输:网络传输组件使用基于NIO的Netty框架。在调用远程方法时,调用Netty的对应接口以实现网络传输。
- 序列化:涉及到网络传输一定会涉及到序列化,而JDK自带的序列化存在效率低下以及安全性不足的缺点,我们需要用例如Hession2、Kyro、ProtoStuff。综合优缺点,我们在此框架中采用Kyro。
- 动态代理:RPC 的主要目的就是要在调用远程方法像调用本地方法一样简单,使用动态代理可以屏蔽远程方法调用的细节比如网络传输。
- 负载均衡:如果某个服务器负载过大会导致该服务器宕机等严重后果,所以需要将访问量大的服务同时部署在多个服务器上,当客户端发起请求时,需要负载均衡分配任务,使各服务器得到充分且合理的利用。
值得一提的点
SPI机制的应用
SPI简介
SPI全称Service Provider Interface,是Java提供的一套用来被第三方实现或者扩展的API,它可以用来启用框架扩展和替换组件。
整体机制图如下:
Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。
系统设计的各个抽象,往往有很多不同的实现方案,在面向的对象的设计里,一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制。
Java SPI就是提供这样的一个机制:为某个接口寻找服务实现的机制。有点类似IOC的思想,就是将装配的控制权移到程序之外,在模块化设计中这个机制尤其重要。所以SPI的核心思想就是解耦。
应用场景
概括地说,适用于:调用者根据实际使用需要,启用、扩展、或者替换框架的实现策略
比较常见的例子:
- 数据库驱动加载接口实现类的加载
JDBC加载不同类型数据库的驱动 - 日志门面接口实现类加载
SLF4J加载不同提供商的日志实现类 - Spring
Spring中大量使用了SPI,比如:对servlet3.0规范对ServletContainerInitializer的实现、自动类型转换Type Conversion SPI(Converter SPI、Formatter SPI)等 - Dubbo
Dubbo中也大量使用SPI的方式实现框架的扩展, 不过它对Java提供的原生SPI做了封装,允许用户扩展实现Filter接口
使用介绍
要使用Java SPI,需要遵循如下约定:
- 当服务提供者提供了接口的一种具体实现后,在jar包的META-INF/services目录下创建一个以“接口全限定名”为命名的文件,内容为实现类的全限定名;
- 接口实现类所在的jar包放在主程序的classpath中;
- 主程序通过java.util.ServiceLoder动态装载实现模块,它通过扫描META-INF/services目录下的配置文件找到实现类的全限定名,把类加载到JVM;
- SPI的实现类必须携带一个不带参数的构造方法;
在项目中的具体实现(自定义实现)
项目创建的“接口全限定名”为命名的文件
参照Dubbo源码编写的ExtensionLoader类
public final class ExtensionLoader<T> {
private static final String SERVICE_DIRECTORY = "META-INF/extensions/";
private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
private static final Map<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();
private final Class<?> type;
private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
private ExtensionLoader(Class<?> type) {
this.type = type;
}
/**
* 得到扩展加载程序
*
* @param type 类型
* @return {@link ExtensionLoader<S>}
*/
public static <S> ExtensionLoader<S> getExtensionLoader(Class<S> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type should not be null.");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type must be an interface.");
}
if (type.getAnnotation(SPI.class) == null) {
throw new IllegalArgumentException("Extension type must be annotated by @SPI");
}
// firstly get from cache, if not hit, create one
ExtensionLoader<S> extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type);
if (extensionLoader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<S>(type));
extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type);
}
return extensionLoader;
}
/**
* 得到扩展
*
* @param name 名字
* @return {@link T}
*/
public T getExtension(String name) {
if (name == null || name.isEmpty()) {
throw new IllegalArgumentException("Extension name should not be null or empty.");
}
// firstly get from cache, if not hit, create one
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
// create a singleton if no instance exists
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
/**
* 创建扩展
*
* @param name 名字
* @return {@link T}
*/
private T createExtension(String name) {
// load all extension classes of type T from file and get specific one by name
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw new RuntimeException("No such extension of name " + name);
}
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
try {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
} catch (Exception e) {
log.error(e.getMessage());
}
}
return instance;
}
/**
* 得到扩展类
*
* @return {@link Map<String, Class<?>>}
*/
private Map<String, Class<?>> getExtensionClasses() {
// get the loaded extension class from the cache
Map<String, Class<?>> classes = cachedClasses.get();
// double check
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = new HashMap<>();
// load all extensions from our extensions directory
loadDirectory(classes);
cachedClasses.set(classes);
}
}
}
return classes;
}
/**
* 加载目录
*
* @param extensionClasses 扩展类
*/
private void loadDirectory(Map<String, Class<?>> extensionClasses) {
String fileName = ExtensionLoader.SERVICE_DIRECTORY + type.getName();
try {
Enumeration<URL> urls;
ClassLoader classLoader = ExtensionLoader.class.getClassLoader();
urls = classLoader.getResources(fileName);
if (urls != null) {
while (urls.hasMoreElements()) {
URL resourceUrl = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceUrl);
}
}
} catch (IOException e) {
log.error(e.getMessage());
}
}
/**
* 加载资源
*
* @param extensionClasses 扩展类
* @param classLoader
* @param resourceUrl 资源url
*/
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, URL resourceUrl) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), UTF_8))) {
String line;
// read every line
while ((line = reader.readLine()) != null) {
// get index of comment
final int ci = line.indexOf('#');
if (ci >= 0) {
// string after # is comment so we ignore it
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
final int ei = line.indexOf('=');
String name = line.substring(0, ei).trim();
String clazzName = line.substring(ei + 1).trim();
// our SPI use key-value pair so both of them must not be empty
if (name.length() > 0 && clazzName.length() > 0) {
Class<?> clazz = classLoader.loadClass(clazzName);
extensionClasses.put(name, clazz);
}
} catch (ClassNotFoundException e) {
log.error(e.getMessage());
}
}
}
} catch (IOException e) {
log.error(e.getMessage());
}
}
}
一个不含参数的构造方法Holder
public class Holder<T> {
private volatile T value;
public T get() {
return value;
}
public void set(T value) {
this.value = value;
}
}
以及自定义注解@SPI
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SPI {
}
通过SPI机制在框架中实现了高可扩展和低耦合配置需求,通过改变配置文件即可实现扩展的变更。
Spring注解实现服务注册与服务消费
接口实现
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Import(CustomScannerRegistrar.class)
@Documented
public @interface RpcScan {
/**
* 包地址
*
* @return {@link String[]}
*/
String[] basePackage();
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Inherited
public @interface RpcService {
/**
* 服务版本,默认为空字符
*/
String version() default "";
/**
* 组,默认为空字符
*/
String group() default "";
}
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
@Inherited
public @interface RpcReference {
/**
* 服务版本,默认为空字符
*/
String version() default "";
/**
* 组,默认为空字符
*/
String group() default "";
}
自定义扫描注册
public class CustomScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {
private static final String SPRING_BEAN_BASE_PACKAGE = "cn.plutowu.spring";
private static final String BASE_PACKAGE_ATTRIBUTE_NAME = "basePackage";
private ResourceLoader resourceLoader;
@Override
public void setResourceLoader(ResourceLoader resourceLoader) {
this.resourceLoader = resourceLoader;
}
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
//get the attributes and values of RpcScan annotation
AnnotationAttributes rpcScanAnnotationAttributes = AnnotationAttributes.fromMap(annotationMetadata.getAnnotationAttributes(RpcScan.class.getName()));
String[] rpcScanBasePackages = new String[0];
if (rpcScanAnnotationAttributes != null) {
// get the value of the basePackage property
rpcScanBasePackages = rpcScanAnnotationAttributes.getStringArray(BASE_PACKAGE_ATTRIBUTE_NAME);
}
if (rpcScanBasePackages.length == 0) {
rpcScanBasePackages = new String[]{((StandardAnnotationMetadata) annotationMetadata).getIntrospectedClass().getPackage().getName()};
}
// Scan the RpcService annotation
CustomScanner rpcServiceScanner = new CustomScanner(beanDefinitionRegistry, RpcService.class);
// Scan the Component annotation
CustomScanner springBeanScanner = new CustomScanner(beanDefinitionRegistry, Component.class);
if (resourceLoader != null) {
rpcServiceScanner.setResourceLoader(resourceLoader);
springBeanScanner.setResourceLoader(resourceLoader);
}
int springBeanAmount = springBeanScanner.scan(SPRING_BEAN_BASE_PACKAGE);
log.info("springBeanScanner扫描的数量 [{}]", springBeanAmount);
int rpcServiceCount = rpcServiceScanner.scan(rpcScanBasePackages);
log.info("rpcServiceScanner扫描的数量 [{}]", rpcServiceCount);
}
}
重写Spring的BeanPostProcessor
public class SpringBeanPostProcessor implements BeanPostProcessor {
private final ServiceProvider serviceProvider;
private final RpcRequestTransport rpcClient;
public SpringBeanPostProcessor() {
this.serviceProvider = SingletonFactory.getInstance(ServiceProviderImpl.class);
this.rpcClient = ExtensionLoader.getExtensionLoader(RpcRequestTransport.class).getExtension("netty");
}
@SneakyThrows
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean.getClass().isAnnotationPresent(RpcService.class)) {
log.info("[{}] is annotated with [{}]", bean.getClass().getName(), RpcService.class.getCanonicalName());
// get RpcService annotation
RpcService rpcService = bean.getClass().getAnnotation(RpcService.class);
// build RpcServiceProperties
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group(rpcService.group()).version(rpcService.version()).build();
serviceProvider.publishService(bean, rpcServiceProperties);
}
return bean;
}
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> targetClass = bean.getClass();
Field[] declaredFields = targetClass.getDeclaredFields();
for (Field declaredField : declaredFields) {
RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
if (rpcReference != null) {
RpcServiceProperties rpcServiceProperties = RpcServiceProperties.builder()
.group(rpcReference.group()).version(rpcReference.version()).build();
RpcClientProxy rpcClientProxy = new RpcClientProxy(rpcClient, rpcServiceProperties);
Object clientProxy = rpcClientProxy.getProxy(declaredField.getType());
declaredField.setAccessible(true);
try {
declaredField.set(bean, clientProxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
}
return bean;
}
}
通过借助Spring的BeanPostProcessor进行bean注入的自定义函数增强,以及加上自定义扫描注册,最后在客户端与服务端加上对应的自定义注解即可完成自定义扫描包。
本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!