[Ray.Tune]调度算法简介与常用算法代码

在 Tune 中,一些超参数优化算法被写成“调度算法”。这些 Trial Scheduler 可以提前终止不良试验、暂停试验、克隆试验以及更改正在运行的试验的超参数。
所有 Trial Scheduler 都接受 一个 metric,这是在您的可训练结果字典中返回的值,并根据 最大化或最小化mode。
tune.run( ... , scheduler=Scheduler(metric="accuracy", mode="max"))
[Ray.Tune]调度算法简介与常用算法代码

使用调度程序时,可能会遇到兼容性的问题,某些调度程序不能与搜索算法一起使用,并且某些调度程序的实现需要依赖检查点。
调度程序可以在在调整起见动态更改使用资源的需求,ResourceChangingScheduler,它可以与其他的调度程序兼容。

各调度算法简介

  • ASHAScheduler:最容易的调度程序,将积极地终止低性能的实验。
  • AsyncHyperBandScheduler:实现异步连续减半,基于Hyperband调度程序,解决了后者面临的落后问题。
  • HyperBandScheduler:建议在该标准的Hyperband调度程序上使用ASHA调度程序。Tune的停止标准将与HyperBand的提前停止机制一起应用。
  • MedianStoppingRule:中值停止规则。如果一个试验的性能在莱斯的时间点低于其他试验的中值,则停止该试验。
  • PopulationBasedTraining:基于人口的训练。每个试验变体都被视为群体的成员,定期检查表现最好的试验。低绩效试验克隆了最佳绩效者的检查点,并扰乱了配置,一起发现最好的变体。与其他超参数搜索算法不同,PBT在训练期间会改变超参数,这可以实现非常快速的超参数发现,并自动发现良好的退火时间表。
  • PopulationBasedTrainingReplay:基于人口训练回访。用于重播PBT的超参数计划。严格来说,它并不是一个调度程序。反倒是一个PBT的恢复工具。
  • PB2。建立在PBT之上,主要区别在于PB2使用高斯过程模型而不是使用随机扰动来选择新的超参数配置。**其主要动机是能够以很小的人口规模找到有前途的超参数。**PB2并行训练一组模型,周期性地,表现不佳的模型会克隆表现最佳的模型,并使用GP-bandit优化重新选择超参数。训练GP模型以预测下一个训练周期的改进。
  • HyperBandForBOHB:BOHB。是HyperBand的变体。与原始的Hyperband实现相同,但是并不实现流水线或者straggler mitigation。它建议与TuneBOHB搜索算法结合使用。

常用的调度算法例子

1 AsyncHyperBandScheduler

import ray
from ray import tune
from ray.tune.schedulers import AsyncHyperBandScheduler

def easy_objective(config):
    # Hyperparameters
    width, height = config["width"], config["height"]

    for step in range(config["steps"]):
        # Iterative training function - can be an arbitrary training procedure
        intermediate_score = evaluation_fn(step, width, height)
        # Feed the score back back to Tune.
        tune.report(iterations=step, mean_loss=intermediate_score)

if __name__ == "__main__":
    # AsyncHyperBand enables aggressive early stopping of bad trials.
    scheduler = AsyncHyperBandScheduler(grace_period=5, max_t=100)

    # 'training_iteration' is incremented every time `trainable.step` is called
    stopping_criteria = {"training_iteration": 1 if args.smoke_test else 9999}

    analysis = tune.run(
        easy_objective,
        name="asynchyperband_test",
        metric="mean_loss",
        mode="min",
        scheduler=scheduler,
        stop=stopping_criteria,
        num_samples=20,
        verbose=1,
        resources_per_trial={
            "cpu": 1,
            "gpu": 0
        },
        config={  # Hyperparameter space
            "steps": 100,
            "width": tune.uniform(10, 100),
            "height": tune.uniform(0, 100),
        })
    print("Best hyperparameters found were: ", analysis.best_config)

2 PB2

import os
import random
import argparse
import pandas as pd
from datetime import datetime

from ray.tune import run, sample_from
from ray.tune.schedulers import PopulationBasedTraining
from ray.tune.schedulers.pb2 import PB2

# Postprocess the perturbed config to ensure it's still valid used if PBT.
def explore(config):
    # Ensure we collect enough timesteps to do sgd.
    if config["train_batch_size"] < config["sgd_minibatch_size"] * 2:
        config["train_batch_size"] = config["sgd_minibatch_size"] * 2
    # Ensure we run at least one sgd iter.
    if config["lambda"] > 1:
        config["lambda"] = 1
    config["train_batch_size"] = int(config["train_batch_size"])
    return config


if __name__ == "__main__":

    parser = argparse.ArgumentParser()
    parser.add_argument("--max", type=int, default=1000000)
    parser.add_argument("--algo", type=str, default="PPO")
    parser.add_argument("--num_workers", type=int, default=4)
    parser.add_argument("--num_samples", type=int, default=4)
    parser.add_argument("--t_ready", type=int, default=50000)
    parser.add_argument("--seed", type=int, default=0)
    parser.add_argument(
        "--horizon", type=int, default=1600)  # make this 1000 for other envs
    parser.add_argument("--perturb", type=float, default=0.25)  # if using PBT
    parser.add_argument("--env_name", type=str, default="BipedalWalker-v2")
    parser.add_argument(
        "--criteria", type=str,
        default="timesteps_total")  # "training_iteration", "time_total_s"
    parser.add_argument(
        "--net", type=str, default="32_32"
    )  # May be important to use a larger network for bigger tasks.
    parser.add_argument("--filename", type=str, default="")
    parser.add_argument("--method", type=str, default="pb2")  # ['pbt', 'pb2']
    parser.add_argument("--save_csv", type=bool, default=False)

    args = parser.parse_args()

    # bipedalwalker needs 1600
    if args.env_name in ["BipedalWalker-v2", "BipedalWalker-v3"]:
        horizon = 1600
    else:
        horizon = 1000

    pbt = PopulationBasedTraining(
        time_attr=args.criteria,
        metric="episode_reward_mean",
        mode="max",
        perturbation_interval=args.t_ready,
        resample_probability=args.perturb,
        quantile_fraction=args.perturb,  # copy bottom % with top %
        # Specifies the search space for these hyperparams
        hyperparam_mutations={
            "lambda": lambda: random.uniform(0.9, 1.0),
            "clip_param": lambda: random.uniform(0.1, 0.5),
            "lr": lambda: random.uniform(1e-3, 1e-5),
            "train_batch_size": lambda: random.randint(1000, 60000),
        },
        custom_explore_fn=explore)

    pb2 = PB2(
        time_attr=args.criteria,
        metric="episode_reward_mean",
        mode="max",
        perturbation_interval=args.t_ready,
        quantile_fraction=args.perturb,  # copy bottom % with top %
        # Specifies the hyperparam search space
        hyperparam_bounds={
            "lambda": [0.9, 1.0],
            "clip_param": [0.1, 0.5],
            "lr": [1e-3, 1e-5],
            "train_batch_size": [1000, 60000]
        })

    methods = {"pbt": pbt, "pb2": pb2}

    timelog = str(datetime.date(datetime.now())) + "_" + str(
        datetime.time(datetime.now()))

    args.dir = "{}_{}_{}_Size{}_{}_{}".format(args.algo,
                                              args.filename, args.method,
                                              str(args.num_samples),
                                              args.env_name, args.criteria)

    analysis = run(
        args.algo,
        name="{}_{}_{}_seed{}_{}".format(timelog, args.method, args.env_name,
                                         str(args.seed), args.filename),
        scheduler=methods[args.method],
        verbose=1,
        num_samples=args.num_samples,
        stop={args.criteria: args.max},
        config={
            "env": args.env_name,
            "log_level": "INFO",
            "seed": args.seed,
            "kl_coeff": 1.0,
            "num_gpus": 0,
            "horizon": horizon,
            "observation_filter": "MeanStdFilter",
            "model": {
                "fcnet_hiddens": [
                    int(args.net.split("_")[0]),
                    int(args.net.split("_")[1])
                ],
                "free_log_std": True
            },
            "num_sgd_iter": 10,
            "sgd_minibatch_size": 128,
            "lambda": sample_from(lambda spec: random.uniform(0.9, 1.0)),
            "clip_param": sample_from(lambda spec: random.uniform(0.1, 0.5)),
            "lr": sample_from(lambda spec: random.uniform(1e-3, 1e-5)),
            "train_batch_size": sample_from(
                lambda spec: random.randint(1000, 60000))
        })

    all_dfs = analysis.trial_dataframes
    names = list(all_dfs.keys())

    results = pd.DataFrame()
    for i in range(args.num_samples):
        df = all_dfs[names[i]]
        df = df[[
            "timesteps_total", "episodes_total", "episode_reward_mean",
            "info/learner/default_policy/cur_kl_coeff"
        ]]
        df["Agent"] = i
        results = pd.concat([results, df]).reset_index(drop=True)

    if args.save_csv:
        if not (os.path.exists("data/" + args.dir)):
            os.makedirs("data/" + args.dir)

        results.to_csv("data/{}/seed{}.csv".format(args.dir, str(args.seed)))
上一篇:春季-无法刷新目标“ jmsinqueue”的JMS连接-在5000毫秒内重试


下一篇:在wildfly 21中搭建cluster集群