Storm提供的一个插拔式调度策略的扩展,在加入新的topology后,并不会完全替代系统自带的分配策略。它仅仅是扩展,不能替代。而且按照设计,它的出现是为了应付一些极端情况,比如,slot不够用。在这些情况下,storm默认的调度策略无法很好地实施,然后,才会执行自定义的调度策略。新加入的topology启动后,系统默认的调度策略还在起作用,所以,我们指定的spout/bolt的实体可能已经被分配了,也可能我们指定的supervisor机器的slot已经全部用光了,只有这些条件都满足的时候,我们才能进行一定程度的调度。
为了打破以上的限制,我采用的方法是:首先将所有已经分配的任务全部释放掉,然后按照逻辑进行任务的分配,剩下的任务使用默认的调度策略。但是,总会出现一台supervisor分配不到任务的情况。
最后的解决方法,是把topology中所有的任务,全部自定义分配。然后将无任务可分配的topology交给默认调度策略来分配(这一步不能省,否则,会按照系统默认的调度策略进行调度。这说明调度策略,除了进行任务的分配还进行了其他的配置)。这样的结果,才能将任务按照逻辑成功分配。当然,这样会有很多的问题,比如,在自定义调度策略分配完之前,不能接收任何tuple。调度策略的粒度,是线程级别。
自己写的代码如下:实现了将spout定向配置到某个指定的supervisor上,然后将所有的bolt线程平均分摊到所有的supervisor。集群环境:4个supervisor,两个bolt,并行度各为10。
如下代码仅供参考,不要随便移植;如要移植,还要解决一些线程分配的算法问题。
package storm; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import backtype.storm.scheduler.Cluster; import backtype.storm.scheduler.EvenScheduler; import backtype.storm.scheduler.ExecutorDetails; import backtype.storm.scheduler.IScheduler; import backtype.storm.scheduler.SchedulerAssignment; import backtype.storm.scheduler.SupervisorDetails; import backtype.storm.scheduler.Topologies; import backtype.storm.scheduler.TopologyDetails; import backtype.storm.scheduler.WorkerSlot; public class DemoScheduler implements IScheduler { public void prepare(Map conf) {} private int flag=0; private void Myschedule(Topologies topologies, Cluster cluster) { SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologies.getByName("special-topology").getId()); if (currentAssignment != null) { System.out.println("MY:current assignments: " + currentAssignment.getExecutorToSlot()); } else { System.out.println("My:current assignments: {}"); } SupervisorDetails specialSupervisor= GetSupervisorDetailsByName(cluster,"special-slave3"); if(specialSupervisor!=null) { List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor); System.out.println("availableSlotsNum:"+availableSlots.size()); System.out.println("availableSlotsNum List:"+availableSlots); TopologyDetails topology = topologies.getByName("special-topology"); Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology); List<ExecutorDetails> executors = componentToExecutors.get("1"); List<ExecutorDetails> executors2 = componentToExecutors.get("2"); Map<String, SupervisorDetails> AllSupervisors= cluster.getSupervisors(); Collection<SupervisorDetails>AllSuperVaule= AllSupervisors.values(); SupervisorDetails[] superArray=new SupervisorDetails[AllSuperVaule.size()]; AllSuperVaule.toArray(superArray); ArrayList<ExecutorDetails> AllExecutor=new 
ArrayList<ExecutorDetails>(); for(int i=0;i<executors.size();i++) { AllExecutor.add(executors.get(i)); AllExecutor.add(executors2.get(i)); } System.out.println("AllExecutor size:"+AllExecutor.size()+" "+superArray.length); for(int i=0;i<superArray.length;i++) { List<ExecutorDetails> temp=AllExecutor.subList(i*5, i*5+5); List<WorkerSlot> availableSlotsInner = cluster.getAvailableSlots(superArray[i]); cluster.assign(availableSlotsInner .get(0), topology.getId(), temp); System.out.println("Assiment:"+temp+"to"+i); } // cluster.assign(availableSlots.get(1), topology.getId(), executors); // cluster.assign(availableSlots.get(2), topology.getId(), executors2); } else { System.out.println("special-slave3 is not exits!!!"); } } private SupervisorDetails GetSupervisorDetailsByName(Cluster cluster,String SupervisorName) { // find out the our "special-supervisor" from the supervisor metadata Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values(); SupervisorDetails specialSupervisor = null; for (SupervisorDetails supervisor : supervisors) { Map meta = (Map) supervisor.getSchedulerMeta(); if (meta.get("name").equals(SupervisorName)) { specialSupervisor = supervisor; break; } } return specialSupervisor; } public void schedule(Topologies topologies, Cluster cluster) { System.out.println("DemoScheduler: begin scheduling"); // Gets the topology which we want to schedule TopologyDetails topology = topologies.getByName("special-topology"); // make sure the special topology is submitted, if (topology != null) { System.out.println("special-topology is not null!!!"); if(flag==0) { boolean needsScheduling = cluster.needsScheduling(topology); // cluster.n if (needsScheduling) { System.out.println("Our special topology DOES NOT NEED scheduling."); } else { System.out.println("Our special topology needs scheduling."); // find out all the needs-scheduling components of this topology Collection<SupervisorDetails> Tempsupervisors = cluster.getSupervisors().values();//d 
for (SupervisorDetails supervisor : Tempsupervisors) { List<WorkerSlot> availableSlots = cluster.getAvailableSlots(supervisor); // int Availablenum =availableSlots.size(); String suName=supervisor.getHost(); System.out.println("before:HostName:"+suName+" AvailableNum:"+Availablenum); if(!availableSlots.isEmpty()) { for (Integer port : cluster.getUsedPorts(supervisor)) { cluster.freeSlot(new WorkerSlot(supervisor.getId(), port)); } } List<WorkerSlot> availableSlots2 = cluster.getAvailableSlots(supervisor); int Availablenum2 =availableSlots2.size(); System.out.println("after:HostName:"+suName+" AvailableNum:"+Availablenum2); } Map<String, List<ExecutorDetails>> componentToExecutors = cluster.getNeedsSchedulingComponentToExecutors(topology); System.out.println("needs scheduling(component->executor): " + componentToExecutors); System.out.println("needs scheduling(executor->compoenents): " + cluster.getNeedsSchedulingExecutorToComponents(topology)); SchedulerAssignment currentAssignment = cluster.getAssignmentById(topologies.getByName("special-topology").getId()); if (currentAssignment != null) { System.out.println("current assignments: " + currentAssignment.getExecutorToSlot()); } else { System.out.println("current assignments: {}"); } if (!componentToExecutors.containsKey("spout")) { System.out.println("Our special-spout DOES NOT NEED scheduling."); } else { System.out.println("Our special-spout needs scheduling."); List<ExecutorDetails> executors = componentToExecutors.get("spout"); // find out the our "special-supervisor" from the supervisor metadata Collection<SupervisorDetails> supervisors = cluster.getSupervisors().values(); SupervisorDetails specialSupervisor = null; for (SupervisorDetails supervisor : supervisors) { Map meta = (Map) supervisor.getSchedulerMeta(); if (meta.get("name").equals("special-slave2")) { specialSupervisor = supervisor; break; } } // found the special supervisor if (specialSupervisor != null) { System.out.println("Found the 
special-supervisor"); List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor); // if there is no available slots on this supervisor, free some. // TODO for simplicity, we free all the used slots on the supervisor. if (availableSlots.isEmpty() && !executors.isEmpty()) { for (Integer port : cluster.getUsedPorts(specialSupervisor)) { cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port)); } } // re-get the aviableSlots availableSlots = cluster.getAvailableSlots(specialSupervisor); // since it is just a demo, to keep things simple, we assign all the // executors into one slot. cluster.assign(availableSlots.get(0), topology.getId(), executors); Myschedule(topologies, cluster); flag=1; System.out.println("We assigned executors:" + executors + " to slot: [" + availableSlots.get(0).getNodeId() + ", " + availableSlots.get(0).getPort() + "]"); } else { System.out.println("There is no supervisor named special-supervisor!!!"); } } } }//end flag==0 else { System.out.println(" only do once :"+flag); } }//end special=null // let system‘s even scheduler handle the rest scheduling work // you can also use your own other scheduler here, this is what // makes storm‘s scheduler composable. System.out.println("using the default system Schedule!!!"); new EvenScheduler().schedule(topologies, cluster); } }//end class