Strom:pluggable scheduler :如何检测component是否已经分配到了指定的主机上

转自:http://blog.csdn.net/daliaojie/article/details/18617305

在做storm项目的时候,发现之前的那种判断是否执行自定义分配策略的条件太粗陋。以下是较严格的解决方案:

处理步骤应该为:

不懂得可以给我留言

//从实体的名字获取实体的线程集	
private ArrayList<ExecutorDetails> getExecutorByName(
			TopologyDetails topology, String ExecutorNames) {

		ArrayList<ExecutorDetails> re = new ArrayList<ExecutorDetails>();

		TopologyDetails TempTP = topology;
		Map<ExecutorDetails, String> temMap = TempTP.getExecutorToComponent();

		Set<ExecutorDetails> keyS = temMap.keySet();
		String ComName = null;
		for (ExecutorDetails temp : keyS) {
			ComName = temMap.get(temp);

			if (ComName.equals(ExecutorNames)) {
				re.add(temp);
			}
		}

		return re;

	}

//从线程集获取对应的WorkerSlot集合
	private ArrayList<WorkerSlot> getslotByEx(
			SchedulerAssignment currentAssignment,
			ArrayList<ExecutorDetails> Edlist) {

		ArrayList<WorkerSlot> re = new ArrayList<WorkerSlot>();

		Map<ExecutorDetails, WorkerSlot> tempEw = currentAssignment
				.getExecutorToSlot();

		WorkerSlot tempWorkSlot = null;
		for (ExecutorDetails temp : Edlist) {

			tempWorkSlot = tempEw.get(temp);

			re.add(tempWorkSlot);
		}

		return re;

	}

//根据WorkerSlot获取对应的主机名集合
	private ArrayList<String> getHostName(ArrayList<WorkerSlot> wSlotAlist,
			Cluster cluster) {
		ArrayList<String> AlHostName = new ArrayList<String>();

		for (WorkerSlot wSlot : wSlotAlist) {
			String tmp = wSlot.getNodeId();

			String HostName = cluster.getHost(tmp);

			if (AlHostName.contains(HostName) == false) {
				AlHostName.add(HostName);
			}
		}
		return AlHostName;
	}

//需要执行自定义策略,则返回1
	private int needdocustomSchedule(String topologyName, String ComName,
			String HostName, Cluster cluster, Topologies topologies) {
		int flag = -1;

		Map<String, List<ExecutorDetails>> componentToExecutors = cluster
				.getNeedsSchedulingComponentToExecutors(topologies
						.getByName(topologyName));

		if (componentToExecutors == null) {
			System.out.println("currentAssignment is null!!!");
			return flag;
		}
		if (componentToExecutors.containsKey(ComName))// component don‘t allocate
		{
			flag = 1;
		} else {
			TopologyDetails topologyss = topologies.getByName(topologyName);
			ArrayList<ExecutorDetails> tempEname = getExecutorByName(
					topologyss, ComName);

			SchedulerAssignment currentAssignment = cluster
					.getAssignmentById(topologies.getByName(topologyName)
							.getId());

			if (currentAssignment != null) {
				ArrayList<WorkerSlot> alistWs = getslotByEx(currentAssignment,
						tempEname);
				ArrayList<String> AlistHostName = getHostName(alistWs, cluster);
				for (String tempStr : AlistHostName) {
					if (tempStr.equals(HostName)) {
						flag = 0;
					}
					else
					{
						flag = 1;
					}
				}
			}

		}

		return flag;
	}

//释放当前已分配的任务
	private void freeAssignment(SchedulerAssignment currentAssignment,Cluster cluster) {
		
		Set<WorkerSlot> tempws=currentAssignment.getSlots();
		for(WorkerSlot temp:tempws)
		{
			cluster.freeSlot(new WorkerSlot(temp.getNodeId(),temp.getPort()));
		}
	}


Strom:pluggable scheduler :如何检测component是否已经分配到了指定的主机上

上一篇:mysql中limit的用法详解[数据分页常用]


下一篇:Mac OS安装卸载MySQL教程