本文共 9690 字,大约阅读时间需要 32 分钟。
RestBulkAction -> TransportBulkAction -> TransportShardBulkAction
TransportShardBulkAction < TransportReplicationAction < TransportAction
public class BulkRequest extends ActionRequestimplements CompositeIndicesRequest { //这个就是前面提到的requests final List requests = new ArrayList<>(); //这个复杂的方法就是通过http请求参数解析出//IndexRequest,DeleteRequest,UpdateRequest等然后放到requests里public BulkRequest add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType, @Nullable String defaultRouting, @Nullable String[] defaultFields, @Nullable Object payload, boolean allowExplicitIndex) throws Exception { XContent xContent = XContentFactory.xContent(data); int line = 0; int from = 0; int length = data.length(); byte marker = xContent.streamSeparator(); while (true) {
//这里的client其实是NodeClientclient.bulk(bulkRequest, new RestBuilderListener(channel) {TransportBulkAction
public class TransportBulkAction extends HandledTransportAction{
protected void doExecute(final BulkRequest bulkRequest, final ActionListener这里的bulkRequest 就是前面RestBulkAction组装好的。该方法第一步是判断是不是需要自动建索引,如果索引不存在,就自动创建了。listener) {
//这里的BulkItemRequest来源于 IndexRequest等Map[ShardId, List[BulkItemRequest]]
//这里的shardBulkAction 是TransportShardBulkActionshardBulkAction.execute(bulkShardRequest, new ActionListener() {TransportReplicationAction/TransportShardBulkActionTransportAction是一个通用的主类,具体逻辑还是其子类来实现。虽然前面提到shardBulkAction是TransportShardBulkAction,但其实流程逻辑还是TransportReplicationAction来完成的。入口在该类的doExecute方法:@Override protected void doExecute(Request request, ActionListener listener) { new PrimaryPhase(request, listener).run(); }
final ShardIterator shardIt = shards(observer.observedState(), internalRequest);final ShardRouting primary = resolvePrimary(shardIt);
routeRequestOrPerformLocally(primary, shardIt);
//我去掉了一些无关代码哈if (primary.currentNodeId().equals(observer.observedState().nodes().localNodeId())) { try { threadPool.executor(executor).execute(new AbstractRunnable() { @Override protected void doRun() throws Exception { performOnPrimary(primary, shardsIt); } }
protected TupleshardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
//伟大的版本号,实现了对并发修改的支持long[] preVersions = new long[request.items().length];VersionType[] preVersionTypes = new VersionType[request.items().length];//事物日志,为Shard Recovery以及//避免过多的Index Commit做出突出贡献,//同时也是是实现了GetById的实时性Translog.Location location = null;
//这里的request是BulkShardRequest//对应的items则是BulkItemRequest集合for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
IndexRequest,DeleteRequest,UpdateRequest,我们这里依然只讨论IndexRequest。如果发现BulkItemRequest是IndexRequest,进行如下操作:WriteResultresult = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
Engine.IndexingOperation operation = prepareIndexOperationOnPrimary(shardRequest, request, indexShard);
final boolean created = operation.execute(indexShard);
@Overridepublic boolean execute(IndexShard shard) { shard.create(this); return true;}
//我依然做了删减,体现一些核心代码public void create(Engine.Create create) { engine().create(create); }
private void innerCreate(Create create) throws IOException { if (engineConfig.isOptimizeAutoGenerateId() && create.autoGeneratedId() && !create.canHaveDuplicates()) { // We don't need to lock because this ID cannot be concurrently updated: innerCreateNoLock(create, Versions.NOT_FOUND, null); } else { synchronized (dirtyLock(create.uid())) { final long currentVersion; final VersionValue versionValue; versionValue = versionMap.getUnderLock(create.uid().bytes()); if (versionValue == null) { currentVersion = loadCurrentVersionFromIndex(create.uid()); } else { if (engineConfig.isEnableGcDeletes() && versionValue.delete() && (engineConfig.getThreadPool().estimatedTimeInMillis() - versionValue.time()) > engineConfig.getGcDeletesInMillis()) { currentVersion = Versions.NOT_FOUND; // deleted, and GC } else { currentVersion = versionValue.version(); } } innerCreateNoLock(create, currentVersion, versionValue); } } }
currentVersion = loadCurrentVersionFromIndex(create.uid());
indexWriter.addDocument(index.docs().get(0));//或者indexWriter.updateDocument(index.uid(), index.docs().get(0));
final boolean created = operation.execute(indexShard);
replicationPhase = new ReplicationPhase(shardsIt, primaryResponse.v2(), primaryResponse.v1(), observer, primary, internalRequest, listener, indexShardReference);finishAndMoveToReplication(replicationPhase);
threadPool.executor(executor).execute(new AbstractRunnable() { @Override protected void doRun() { try { shardOperationOnReplica(shard.shardId(), replicaRequest); onReplicaSuccess(); } catch (Throwable e) { onReplicaFailure(nodeId, e); failReplicaIfNeeded(shard.index(), shard.id(), e); } }
转载地址:http://adfwl.baihongyu.com/