转自:http://blog.csdn.net/daliaojie/article/details/18617305
在做storm项目的时候,发现之前的那种判断是否执行自定义分配策略的条件太粗陋。以下是较严格的解决方案:
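处理步骤应该为:(1)根据组件名获取其对应的executor集合;(2)根据executor集合获取对应的WorkerSlot集合;(3)根据WorkerSlot获取对应的主机名集合;(4)据此判断是否需要执行自定义分配策略;(5)释放当前已分配的slot(见下文freeAssignment)。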
有不懂的地方可以给我留言。
/**
 * Collects the executors of a topology that belong to the named component.
 *
 * @param topology      topology whose executor-to-component mapping is scanned
 * @param ExecutorNames component name compared against each executor's component
 * @return the executors whose mapped component name equals {@code ExecutorNames};
 *         an empty list when none match
 */
private ArrayList<ExecutorDetails> getExecutorByName(
        TopologyDetails topology, String ExecutorNames) {
    ArrayList<ExecutorDetails> matched = new ArrayList<ExecutorDetails>();
    for (Map.Entry<ExecutorDetails, String> entry
            : topology.getExecutorToComponent().entrySet()) {
        if (entry.getValue().equals(ExecutorNames)) {
            matched.add(entry.getKey());
        }
    }
    return matched;
}
//从线程集获取对应的WorkerSlot集合 private ArrayList<WorkerSlot> getslotByEx( SchedulerAssignment currentAssignment, ArrayList<ExecutorDetails> Edlist) { ArrayList<WorkerSlot> re = new ArrayList<WorkerSlot>(); Map<ExecutorDetails, WorkerSlot> tempEw = currentAssignment .getExecutorToSlot(); WorkerSlot tempWorkSlot = null; for (ExecutorDetails temp : Edlist) { tempWorkSlot = tempEw.get(temp); re.add(tempWorkSlot); } return re; }
//根据WorkerSlot获取对应的主机名集合 private ArrayList<String> getHostName(ArrayList<WorkerSlot> wSlotAlist, Cluster cluster) { ArrayList<String> AlHostName = new ArrayList<String>(); for (WorkerSlot wSlot : wSlotAlist) { String tmp = wSlot.getNodeId(); String HostName = cluster.getHost(tmp); if (AlHostName.contains(HostName) == false) { AlHostName.add(HostName); } } return AlHostName; }
//需要执行自定义策略,则返回1 private int needdocustomSchedule(String topologyName, String ComName, String HostName, Cluster cluster, Topologies topologies) { int flag = -1; Map<String, List<ExecutorDetails>> componentToExecutors = cluster .getNeedsSchedulingComponentToExecutors(topologies .getByName(topologyName)); if (componentToExecutors == null) { System.out.println("currentAssignment is null!!!"); return flag; } if (componentToExecutors.containsKey(ComName))// component don‘t allocate { flag = 1; } else { TopologyDetails topologyss = topologies.getByName(topologyName); ArrayList<ExecutorDetails> tempEname = getExecutorByName( topologyss, ComName); SchedulerAssignment currentAssignment = cluster .getAssignmentById(topologies.getByName(topologyName) .getId()); if (currentAssignment != null) { ArrayList<WorkerSlot> alistWs = getslotByEx(currentAssignment, tempEname); ArrayList<String> AlistHostName = getHostName(alistWs, cluster); for (String tempStr : AlistHostName) { if (tempStr.equals(HostName)) { flag = 0; } else { flag = 1; } } } } return flag; }
//释放当前已分配的任务 private void freeAssignment(SchedulerAssignment currentAssignment,Cluster cluster) { Set<WorkerSlot> tempws=currentAssignment.getSlots(); for(WorkerSlot temp:tempws) { cluster.freeSlot(new WorkerSlot(temp.getNodeId(),temp.getPort())); } }