领秀网是领先的新闻资讯平台,汇集美食文化、商旅生涯、国际资讯、投资理财、热点新闻、教育科研、等多方面权威信息
2021-10-24 11:22:00
启动脚本为connect-mirror-maker.sh,可以通过KAFKA_LOG4J_OPTS指定log4j配置文件,可以通过KAFKA_HEAP_OPTS指定堆内存(默认为-Xms256M –Xmx2G),具体类路径为:
org.apache.kafka.connect.mirror.MirrorMaker
main方法首先通过argparse4j(第三方实现的解析命令行的工具)解析命令行参数,返回一个Namespace包装类。
ArgumentParser parser = ArgumentParsers.newArgumentParser("connect-mirror-maker"); parser.description("MirrorMaker 2.0 driver"); parser.addArgument("config").type(Arguments.fileType().verifyCanRead()) .metavar("mm2.properties").required(true) .help("MM2 configuration file."); parser.addArgument("--clusters").nargs("+").metavar("CLUSTER").required(false) .help("Target cluster to use for this node."); Namespace ns; try { ns = parser.parseArgs(args); } catch (ArgumentParserException e) { parser.handleError(e); Exit.exit(-1); return; }
题外话:这个Namespace类设计的很方便。该类实际包装一个Map(存储解析出来的命令行参数KV),通过一个get()方法返回泛型类结果,然后getString()、getByte()、getShort()、getInt()、getLong()、getFloat()、getDouble()、getBoolean()、getList()等方法调用get()方法获取指定数据类型的返回结果。这样的设计很方便获取各种类型的命令行参数值,完全可以复用在别的日常开发中。Namespace具体类结构如图所示:
继续正题,解析connect-mirror-maker.sh命令行参数可以获取到配置文件和clusters参数。然后就可以实例化MirrorMaker类。
MirrorMaker mirrorMaker = new MirrorMaker(config, clusters, Time.SYSTEM);
该类的构造方法中用一个Set
x-y.emit.heartbeats.enabled=false时,x-y才不会被存放到herderPairs内。这么做的原因是:
对于开启emit.hearts的replicationflow(如A->B),是需要两个herders的。一个是A->B属于真实的replicaiton flow同步topic数据,即MirrorSourceConnector;另一个是B->A属于用于监控replicaiton flow健康状态的提交heartbeats到A的MirrorHeartbeatConnector。
log.info("Targeting clusters {}", this.clusters); this.herderPairs = config.clusterPairs().stream() .filter(x -> this.clusters.contains(x.target())) .collect(Collectors.toSet());
除了上述规则,如果命令行参数显式设置了clusters参数,herderPairs内存储的SourceAndTarget pairs还需要满足target cluster属于clusters参数的子集。这么做的原因是考虑MirrorMaker优先运行在target cluster原则:该原则主要是考虑异常情况下,避免无效读写请求。
然后,遍历herderPairs变量,每个变量(SourceAndTarget)均执行addHerder()方法。AddHerder()方法主要功能:实例化一个Kafka Connect框架Wroker类,该Worker类会运行多个线程执行多个tasks(即读Kafka或者写Kafka)。
Worker worker = new Worker(workerId, time, plugins, distributedConfig, offsetBackingStore, CLIENT_CONFIG_OVERRIDE_POLICY);
再基于Worker对象,实例化Herder接口类,该接口有两个实现类:StandaloneHerder、DistributedHerder。MirrorMaker类使用的是DistributedHerder。
Herder herder = new DistributedHerder(distributedConfig, time, worker, kafkaClusterId, statusBackingStore, configBackingStore, advertisedUrl, CLIENT_CONFIG_OVERRIDE_POLICY, sharedAdmin);
然后,将每一个SourceAndTarget和对应的herder,存储到Map
private final Mapherders = new HashMap<>(); herders.put(sourceAndTarget, herder);
关于Herder接口和两个实现类:
Herder接口用于跟踪和管理workers和connectors。 StandaloneHerder属于单进程,使用基于内存的MemoryStatusBackingStore和MemoryConfigBackingStore,用于standalone模式Kafka Connect进程。 DistributedHerder使用基于Kafka的KafkaStatusBackingStore和KafkaConfigBackingStore,在多个进程之间协调workers,底层基于Kafka group membership实现group managed;每个加入group的DistributedHerder实例,上报它的configuration state。Group协调器给每个实例分配一些connectors和tasks执行,分配策略采用简单的round-ronbin方式。但这也不是绝对的,为了避免start/stop花销,herder也会采用sticky分配策略。DistributedHerder实例只运行分配给它的connectors和tasks。
补充下几个概念:
Connectors:the high level abstraction that coordinates data streaming by managing tasks Tasks:the implementation of how data is copied to or from Kafka Workers:the running processes that execute connectors and tasks
至此,MirrorMaker构造方法结束,MirrorMaker实例化完成。
我们知道,上述构造方法中得到Map
for (Herder herder : herders.values()) { try { herder.start(); } finally { startLatch.countDown(); } }
DistributedHerder类实例化时,实例化了一个线程数为1的herderExecutor线程池。
this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque(1), ThreadUtils.createThreadFactory( this.getClass().getSimpleName() + "-" + clientId + "-%d", false));
DistributedHerder类实现了Runnable接口,其start方法将自身提交到上述线程池运行。
@Override public void start() { this.herderExecutor.submit(this); }
DistributedHerder类run方法通过调用startServices()方法以启动worker、statusBackingStore、configBackingStore服务。
protected void startServices() { this.worker.start(); this.statusBackingStore.start(); this.configBackingStore.start(); }
最后,MirrorMaker类start()方法会遍历Set
herderPairs.forEach(x -> configureConnectors(x));
putConnectorConfig将具体的SourceAndTarget所对应的replication flow任务提交到上述已经完成启动的DistributedHerder各种服务中。
至此,MIrrorMakerV2.0启动完成!!