Storm:pluggable scheduler 自己写的自定义分配任务

Storm提供的一个插拔式调度策略的扩展,在加入新的topology后,并不会完全替代系统自带的分配策略。它仅仅是扩展,不能替代。而且按照设计,它的出现是为了应付一些极端情况,比如,slot不够用。在这些情况下,storm默认的调度策略,无法很好地实施。然后,才会执行自定义的调度策略。新加入的topology启动后,系统默认的调度策略还在起作用,所以,我们指定的spout/bolt的实体可能已经被分配了,也可能我们指定的supervisor机器的slot已经全部用光了,只有这些都可以满足的时候,我们才能进行一定程度的调度。

为了打破以上的限制,我写的方法是,首先将所有的已经分配的任务全部释放掉。然后,按照逻辑进行任务的分配。剩下的任务使用默认的调度策略,但是,总是出现一台supervisor上面分配不到任务。

最后的解决方法,是把topology中所有的任务,全部自定义分配。然后将无任务可分配的topology交给默认调度策略来分配(这一步不能省,否则,会按照系统默认的调度策略进行调度。这说明调度策略,除了进行任务的分配还进行了其他的配置)。这样的结果,才能将任务按照逻辑成功分配。当然,这样会有很多的问题,比如,在自定义调度策略分配完之前,不能接收任何tuple。调度策略的粒度,是线程级别。

自己写的代码如下:实现了,将spout定向配置到其他某个supervisor上,然后,将所有的bolt线程平均分摊到所有的supervisor。集群环境:4个supervisor,两个bolt,并行度各为10.

如下代码仅供参考,不要随便移植走,如要移植走还要解决一切线程分配的算法问题。

package storm;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.EvenScheduler;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.SchedulerAssignment;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;


/**
 * Demo pluggable scheduler that hand-places one topology and then delegates
 * to {@link EvenScheduler}.
 *
 * <p>For the topology named {@code "special-topology"} it performs, at most once
 * per scheduler instance, the following steps:
 * <ol>
 *   <li>waits until {@code cluster.needsScheduling(topology)} is {@code false}
 *       (rounds before that are left entirely to the default scheduler);</li>
 *   <li>frees used slots on the supervisors;</li>
 *   <li>assigns all executors of component {@code "spout"} to one slot on the
 *       supervisor whose scheduler meta {@code "name"} is {@code "special-slave2"};</li>
 *   <li>spreads the executors of components {@code "1"} and {@code "2"} evenly
 *       over all supervisors, one slot per supervisor.</li>
 * </ol>
 * Every call to {@link #schedule} ends by invoking {@link EvenScheduler}.
 */
public class DemoScheduler implements IScheduler {
    /** Name of the topology that receives the custom placement. */
    private static final String SPECIAL_TOPOLOGY = "special-topology";
    /** Component id whose executors are pinned to {@link #SPOUT_SUPERVISOR}. */
    private static final String SPOUT_COMPONENT = "spout";
    /** Scheduler-meta name of the supervisor that hosts the spout executors. */
    private static final String SPOUT_SUPERVISOR = "special-slave2";
    /** Scheduler-meta name of the supervisor that must exist before bolts are spread. */
    private static final String REQUIRED_SUPERVISOR = "special-slave3";
    /** Component ids of the two bolts whose executors are spread over all supervisors. */
    private static final String FIRST_BOLT = "1";
    private static final String SECOND_BOLT = "2";

    /** Stays 0 until the one-time custom placement has been carried out, then 1. */
    private int flag = 0;

    public void prepare(Map conf) {}

    /**
     * Spreads the still-unassigned executors of the two bolt components evenly
     * over all supervisors, using the first available slot of each supervisor.
     *
     * <p>Does nothing (besides logging) when the special topology is absent or
     * when no supervisor named {@code "special-slave3"} exists. Executors whose
     * target supervisor has no free slot are left unassigned so that the default
     * scheduler invoked afterwards can place them.
     *
     * @param topologies all topologies known to the scheduler
     * @param cluster    cluster state to read from and assign into
     */
    private void mySchedule(Topologies topologies, Cluster cluster) {
        TopologyDetails topology = topologies.getByName(SPECIAL_TOPOLOGY);
        if (topology == null) {
            // Nothing to place; dereferencing the topology below would fail.
            return;
        }

        SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId());
        if (currentAssignment != null) {
            System.out.println("MY:current assignments: " + currentAssignment.getExecutorToSlot());
        } else {
            System.out.println("My:current assignments: {}");
        }

        SupervisorDetails specialSupervisor = getSupervisorDetailsByName(cluster, REQUIRED_SUPERVISOR);
        if (specialSupervisor == null) {
            System.out.println("special-slave3 is not exits!!!");
            return;
        }

        // Only reported for diagnostics; the placement below looks at every supervisor.
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
        System.out.println("availableSlotsNum:" + availableSlots.size());
        System.out.println("availableSlotsNum List:" + availableSlots);

        Map<String, List<ExecutorDetails>> componentToExecutors =
                cluster.getNeedsSchedulingComponentToExecutors(topology);
        List<ExecutorDetails> allExecutors = interleave(
                componentToExecutors.get(FIRST_BOLT), componentToExecutors.get(SECOND_BOLT));

        Collection<SupervisorDetails> supervisorValues = cluster.getSupervisors().values();
        SupervisorDetails[] superArray =
                supervisorValues.toArray(new SupervisorDetails[supervisorValues.size()]);

        System.out.println("AllExecutor size:" + allExecutors.size() + " " + superArray.length);
        if (allExecutors.isEmpty() || superArray.length == 0) {
            return;
        }

        // Ceiling division: every executor lands in some share, and with
        // 20 executors on 4 supervisors each share holds exactly 5.
        int shareSize = (allExecutors.size() + superArray.length - 1) / superArray.length;
        for (int i = 0; i < superArray.length; i++) {
            int from = i * shareSize;
            if (from >= allExecutors.size()) {
                break; // fewer executors than supervisors: the rest get nothing
            }
            int to = Math.min(from + shareSize, allExecutors.size());
            List<ExecutorDetails> share =
                    new ArrayList<ExecutorDetails>(allExecutors.subList(from, to));

            List<WorkerSlot> slots = cluster.getAvailableSlots(superArray[i]);
            if (slots.isEmpty()) {
                System.out.println("No available slot on supervisor " + i
                        + ", leaving " + share + " to the default scheduler");
                continue;
            }
            cluster.assign(slots.get(0), topology.getId(), share);
            System.out.println("Assiment:" + share + "to" + i);
        }
    }

    /**
     * Merges two executor lists by alternating their elements
     * (first[0], second[0], first[1], second[1], ...).
     *
     * @param first  executors of the first component, may be {@code null}
     * @param second executors of the second component, may be {@code null}
     * @return a new list with all executors of both inputs; a {@code null} input
     *         contributes nothing, and surplus elements of the longer list are
     *         appended in order
     */
    private static List<ExecutorDetails> interleave(List<ExecutorDetails> first,
                                                    List<ExecutorDetails> second) {
        int firstSize = first == null ? 0 : first.size();
        int secondSize = second == null ? 0 : second.size();
        List<ExecutorDetails> merged = new ArrayList<ExecutorDetails>(firstSize + secondSize);
        int longest = Math.max(firstSize, secondSize);
        for (int i = 0; i < longest; i++) {
            if (i < firstSize) {
                merged.add(first.get(i));
            }
            if (i < secondSize) {
                merged.add(second.get(i));
            }
        }
        return merged;
    }

    /**
     * Finds the supervisor whose scheduler meta contains a {@code "name"} entry
     * equal to the given name.
     *
     * @param cluster        cluster whose supervisors are searched
     * @param supervisorName name to look for
     * @return the first matching supervisor, or {@code null} when none matches
     *         (supervisors without a meta map or without a {@code "name"} entry
     *         never match)
     */
    private SupervisorDetails getSupervisorDetailsByName(Cluster cluster, String supervisorName) {
        for (SupervisorDetails supervisor : cluster.getSupervisors().values()) {
            Object meta = supervisor.getSchedulerMeta();
            // instanceof also rejects a null meta; comparing from the known
            // non-null side tolerates a missing "name" entry.
            if (meta instanceof Map && supervisorName.equals(((Map<?, ?>) meta).get("name"))) {
                return supervisor;
            }
        }
        return null;
    }

    /**
     * Runs the one-time custom placement for the special topology (if it is
     * submitted and not yet handled) and then always hands over to
     * {@link EvenScheduler} for whatever is still unassigned.
     *
     * @param topologies all topologies known to the scheduler
     * @param cluster    cluster state to read from and assign into
     */
    public void schedule(Topologies topologies, Cluster cluster) {
        System.out.println("DemoScheduler: begin scheduling");

        // Only act when the special topology has actually been submitted.
        TopologyDetails topology = topologies.getByName(SPECIAL_TOPOLOGY);
        if (topology != null) {
            System.out.println("special-topology is  not null!!!");
            if (flag == 0) {
                placeSpecialTopology(topologies, cluster, topology);
            } else {
                System.out.println(" only do once :" + flag);
            }
        }

        // Let the system's even scheduler handle the remaining scheduling work.
        System.out.println("using the default system Schedule!!!");
        new EvenScheduler().schedule(topologies, cluster);
    }

    /**
     * Performs the custom placement: frees slots, pins the spout executors to
     * the spout supervisor and spreads the bolt executors. Sets {@link #flag}
     * to 1 once the spout executors have been assigned and the bolt spreading
     * has run.
     */
    private void placeSpecialTopology(Topologies topologies, Cluster cluster,
                                      TopologyDetails topology) {
        // The custom placement deliberately runs only after the topology is
        // fully assigned (needsScheduling == false); earlier rounds are left
        // to the default scheduler invoked at the end of schedule().
        if (cluster.needsScheduling(topology)) {
            System.out.println("Our special topology is not fully assigned yet; "
                    + "leaving this round to the default scheduler.");
            return;
        }
        System.out.println("Our special topology is fully assigned; "
                + "freeing slots for custom re-assignment.");

        freeUsedSlots(cluster);

        Map<String, List<ExecutorDetails>> componentToExecutors =
                cluster.getNeedsSchedulingComponentToExecutors(topology);
        System.out.println("needs scheduling(component->executor): " + componentToExecutors);
        System.out.println("needs scheduling(executor->compoenents): "
                + cluster.getNeedsSchedulingExecutorToComponents(topology));

        SchedulerAssignment currentAssignment = cluster.getAssignmentById(topology.getId());
        if (currentAssignment != null) {
            System.out.println("current assignments: " + currentAssignment.getExecutorToSlot());
        } else {
            System.out.println("current assignments: {}");
        }

        if (!componentToExecutors.containsKey(SPOUT_COMPONENT)) {
            System.out.println("Our special-spout DOES NOT NEED scheduling.");
            return;
        }
        System.out.println("Our special-spout needs scheduling.");
        List<ExecutorDetails> executors = componentToExecutors.get(SPOUT_COMPONENT);

        SupervisorDetails specialSupervisor = getSupervisorDetailsByName(cluster, SPOUT_SUPERVISOR);
        if (specialSupervisor == null) {
            System.out.println("There is no supervisor named " + SPOUT_SUPERVISOR + "!!!");
            return;
        }
        System.out.println("Found the special-supervisor");

        // If the supervisor has no available slot, free all of its used slots.
        List<WorkerSlot> availableSlots = cluster.getAvailableSlots(specialSupervisor);
        if (availableSlots.isEmpty() && !executors.isEmpty()) {
            for (Integer port : cluster.getUsedPorts(specialSupervisor)) {
                cluster.freeSlot(new WorkerSlot(specialSupervisor.getId(), port));
            }
        }

        // Re-read the available slots after the possible freeing above.
        availableSlots = cluster.getAvailableSlots(specialSupervisor);
        if (availableSlots.isEmpty()) {
            System.out.println("Supervisor " + SPOUT_SUPERVISOR
                    + " has no slot to assign the spout executors to.");
            return;
        }

        // To keep the demo simple, all spout executors go into one slot.
        WorkerSlot spoutSlot = availableSlots.get(0);
        cluster.assign(spoutSlot, topology.getId(), executors);
        mySchedule(topologies, cluster);
        flag = 1;
        System.out.println("We assigned executors:" + executors + " to slot: ["
                + spoutSlot.getNodeId() + ", " + spoutSlot.getPort() + "]");
    }

    /**
     * Frees every used port on each supervisor that currently reports at least
     * one available slot, logging the available-slot count before and after.
     * No filtering by topology is applied to the ports being freed.
     */
    private void freeUsedSlots(Cluster cluster) {
        for (SupervisorDetails supervisor : cluster.getSupervisors().values()) {
            List<WorkerSlot> before = cluster.getAvailableSlots(supervisor);
            String hostName = supervisor.getHost();
            System.out.println("before:HostName:" + hostName + " AvailableNum:" + before.size());

            // NOTE(review): supervisors with zero available slots are skipped here,
            // so a fully occupied supervisor keeps its assignments - confirm intent.
            if (!before.isEmpty()) {
                for (Integer port : cluster.getUsedPorts(supervisor)) {
                    cluster.freeSlot(new WorkerSlot(supervisor.getId(), port));
                }
            }

            List<WorkerSlot> after = cluster.getAvailableSlots(supervisor);
            System.out.println("after:HostName:" + hostName + " AvailableNum:" + after.size());
        }
    }
}//end class


Storm:pluggable scheduler 自己写的自定义分配任务

上一篇:使用MediaPlayer和SurfaceView播放视频


下一篇:AChartEngine简介