Energym Google Cloud API

In this project, an API based on a RESTful API for gcloud has been designed and developed so that you can use Google Cloud infrastructure directly by writing experiment definitions on your personal computer.

Sinergym cloud API diagram

Let’s see a detailed explanation of the diagram above.

Executing API

Our objective is to define a set of experiments and execute each one in its own remote Google Cloud container. For this purpose, cloud_manager.py has been created in the repository root. This file must be run on your local computer:

import argparse
from time import sleep
from pprint import pprint
import energym.utils.gcloud as gcloud

parser = argparse.ArgumentParser(
    description='Process for run experiments in Google Cloud')
parser.add_argument(
    '--project_id',
    '-id',
    type=str,
    dest='project',
    help='Your Google Cloud project ID.')
parser.add_argument(
    '--zone',
    '-zo',
    type=str,
    default='europe-west1-b',
    dest='zone',
    help='service Engine zone to deploy to.')
parser.add_argument(
    '--template_name',
    '-tem',
    type=str,
    default='energym-template',
    dest='template_name',
    help='Name of template previously created in gcloud account to generate VM copies.')
parser.add_argument(
    '--group_name',
    '-group',
    type=str,
    default='energym-group',
    dest='group_name',
    help='Name of instance group(MIG) will be created during experimentation.')
parser.add_argument(
    '--experiment_commands',
    '-cmds',
    default=['python3 ./algorithm/DQN.py -env Eplus-demo-v1 -ep 1 -'],
    nargs='+',
    dest='commands',
    help='list of commands for DRL_battery.py you want to execute remotely.')

args = parser.parse_args()

print('Init Google cloud service API...')
service = gcloud.init_gcloud_service()

# Create instance group
n_experiments = len(args.commands)
print('Creating instance group(MIG) for experiments ({} instances)...'.format(
    n_experiments))
response = gcloud.create_instance_group(
    service=service,
    project=args.project,
    zone=args.zone,
    size=n_experiments,
    template_name=args.template_name,
    group_name=args.group_name)
pprint(response)

# Wait for the machines to be fully created.
print(
    '{0} status is {1}.'.format(
        response['operationType'],
        response['status']))
if response['status'] != 'DONE':
    response = gcloud.wait_for_operation(
        service,
        args.project,
        args.zone,
        operation=response['id'],
        operation_type=response['operationType'])
pprint(response)
print('MIG created.')

# List VM names
print('Looking for instance names... (waiting until they are visible too)')
# Sometimes, although the instance group insert status is DONE, the instances
# are not visible to the API yet. Hence, we have to wait for them in a loop...
instances = []
while len(instances) < n_experiments:
    instances = gcloud.list_instances(
        service=service,
        project=args.project,
        zone=args.zone,
        base_instances_names=args.group_name)
    sleep(3)
print(instances)
# The number of machines should be the same as the number of commands

# Execute a command in the container inside every VM
print('Sending commands to every container VM... (waiting until the container inside each VM is ready too)')
for i, instance in enumerate(instances):
    container_id = None
    # Obtain the id of the container inside the VM
    while not container_id:
        container_id = gcloud.get_container_id(instance_name=instance)
        sleep(5)
    # Execute command in container
    gcloud.execute_remote_command_instance(
        container_id=container_id,
        instance_name=instance,
        experiment_command=args.commands[i])
    print(
        'command {} has been sent to instance {}(container: {}).'.format(
            args.commands[i],
            instance,
            container_id))

print('All VM\'s are working correctly, see Google Cloud Platform Console.')
# Close VM when finished with google cloud alerts?

# python cloud_manager.py --project_id energym-314709
# --experiment_commands
# 'python3 DRL_battery.py --environment Eplus-5Zone-hot-discrete-v1 --episodes 3 --algorithm DQN --logger --log_interval 1 --seed 54'
# 'python3 DRL_battery.py --environment Eplus-5Zone-hot-continuous-stochastic-v1 --episodes 3 --algorithm PPO --logger --log_interval 1 --tensorboard ./tensorboard_log --normalization --seed 54'
# --template_name energym-template --group_name sinergym-vm

This script uses the following parameters:

  • --project_id or -id: Your Google Cloud project ID. It must be specified.

  • --zone or -zo: Zone for your project (default is europe-west1-b).

  • --template_name or -tem: Template used to generate VM clones, previously defined in your project (see 4. Create your VM or MIG).

  • --group_name or -group: Name of the instance group to create. Every instance inside the MIG is named with this value concatenated with a random string.

  • --experiment_commands or -cmds: List of experiment definitions in Python command format (for information about the format, see Receiving experiments in remote containers).

Here is an example of bash code that executes the script:

$ python cloud_manager.py \
    --project_id ${PROJECT_ID} \
    --zone europe-west1-b \
    --template_name energym-template \
    --group_name sinergym \
    --experiment_commands \
    'python3 DRL_battery.py --environment Eplus-5Zone-hot-discrete-v1 --episodes 3 --algorithm DQN --logger --log_interval 1 --seed 54' \
    'python3 DRL_battery.py --environment Eplus-5Zone-hot-continuous-stochastic-v1 --episodes 3 --algorithm PPO --logger --log_interval 1 --tensorboard ./tensorboard_log  --normalization --seed 54'

This example creates only 2 machines inside an instance group in your Google Cloud Platform project, because you have defined two experiments. If you define more experiments, the API will create more machines.
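The relation between the command list and the group size can be checked locally, without touching Google Cloud. The following minimal sketch rebuilds only the --experiment_commands argument of cloud_manager.py; the helper name build_parser and the shortened command strings are illustrative assumptions, not part of the project.

```python
import argparse


def build_parser():
    """Rebuild only the --experiment_commands argument of cloud_manager.py.

    build_parser is a hypothetical helper used for this illustration.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--experiment_commands',
        '-cmds',
        nargs='+',  # one or more quoted command strings
        dest='commands')
    return parser


# Each quoted string on the command line becomes one element of the list.
# The commands are shortened versions of the ones in the bash example.
args = build_parser().parse_args([
    '--experiment_commands',
    'python3 DRL_battery.py --environment Eplus-5Zone-hot-discrete-v1 --algorithm DQN',
    'python3 DRL_battery.py --environment Eplus-5Zone-hot-continuous-stochastic-v1 --algorithm PPO',
])

# cloud_manager.py passes this length as the size of the instance group.
n_experiments = len(args.commands)
print(n_experiments)
```

Because every experiment is a single quoted string, the flags inside it (such as --environment) are not interpreted by cloud_manager.py; they are forwarded untouched to the remote container.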

Note

Because this process runs in real time, some operations (container startup, instance listing and others) can take a while. In that case, the API waits for one process to finish before executing the next (when necessary).
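The waiting loops in cloud_manager.py retry until the remote resource is ready and have no upper bound. As a hedged sketch of the same idea with a timeout added, the polling could be factored into one helper. The names wait_until and fake_list_instances are assumptions made for this example and do not exist in energym.utils.gcloud.

```python
import time


def wait_until(check, interval=3.0, timeout=300.0):
    """Call check() until it returns a truthy value or the timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        result = check()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(
                'Condition not met within {} seconds.'.format(timeout))
        time.sleep(interval)


# Stand-in for a remote call that only succeeds on the third attempt
# (hypothetical; a real call would query Google Cloud instead).
attempts = []


def fake_list_instances():
    attempts.append(1)
    return ['vm-a', 'vm-b'] if len(attempts) >= 3 else []


instances = wait_until(fake_list_instances, interval=0.01, timeout=5.0)
print(instances)
```

A timeout is a design choice rather than something the original script does: it turns a VM or container that never becomes ready into a visible error instead of an endless wait.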

Note

This script uses the gcloud API in the background. The methods developed and used for this purpose can be seen in energym/energym/utils/gcloud.py or in the API reference.

Receiving experiments in remote containers

This script, called DRL_battery.py, is placed in every remote container and is used to interpret the experiment commands shown above:

import gym
import energym
import argparse
import uuid
import mlflow

import numpy as np

from energym.utils.callbacks import LoggerCallback, LoggerEvalCallback
from energym.utils.wrappers import MultiObsWrapper, NormalizeObservation, LoggerWrapper
from energym.utils.rewards import *


from stable_baselines3.common.noise import NormalActionNoise, OrnsteinUhlenbeckActionNoise
from stable_baselines3 import A2C, DDPG, DQN, PPO, SAC
from stable_baselines3.common.callbacks import EvalCallback, BaseCallback, CallbackList
from stable_baselines3.common.vec_env import DummyVecEnv

#--------------------------------BATTERY ARGUMENTS DEFINITION---------------------------------#
parser = argparse.ArgumentParser()
# commons arguments for battery
parser.add_argument(
    '--environment',
    '-env',
    required=True,
    type=str,
    dest='environment',
    help='Environment name of simulation (see energym/__init__.py).')
parser.add_argument(
    '--episodes',
    '-ep',
    type=int,
    default=1,
    dest='episodes',
    help='Number of episodes for training.')
parser.add_argument(
    '--algorithm',
    '-alg',
    type=str,
    default='PPO',
    dest='algorithm',
    help='Algorithm used to train (possible values: PPO, A2C, DQN, DDPG, SAC).')
parser.add_argument(
    '--reward',
    '-rw',
    type=str,
    default='linear',
    dest='reward',
    help='Reward function used by model, by default is linear (possible values: linear, exponential).')
parser.add_argument(
    '--normalization',
    '-norm',
    action='store_true',
    dest='normalization',
    help='Apply normalization to observations if this flag is specified.')
parser.add_argument(
    '--multiobs',
    '-mobs',
    action='store_true',
    dest='multiobs',
    help='Apply Multi observations if this flag is specified.')
parser.add_argument(
    '--logger',
    '-log',
    action='store_true',
    dest='logger',
    help='Apply Energym CSVLogger class if this flag is specified.')
parser.add_argument(
    '--tensorboard',
    '-tens',
    type=str,
    default=None,
    dest='tensorboard',
    help='Tensorboard path for logging (if not specified, tensorboard log will not be stored).')
parser.add_argument(
    '--evaluation',
    '-eval',
    action='store_true',
    dest='evaluation',
    help='Evaluation is processed during training with this flag (save best model online).')
parser.add_argument(
    '--eval_freq',
    '-evalf',
    type=int,
    default=2,
    dest='eval_freq',
    help='Episodes executed before applying evaluation (if evaluation flag is not specified, this value is useless).')
parser.add_argument(
    '--eval_length',
    '-evall',
    type=int,
    default=2,
    dest='eval_length',
    help='Episodes executed during evaluation (if evaluation flag is not specified, this value is useless).')
parser.add_argument(
    '--log_interval',
    '-inter',
    type=int,
    default=1,
    dest='log_interval',
    help='model training log_interval parameter. See documentation since this value is different in every algorithm.')
parser.add_argument(
    '--seed',
    '-sd',
    type=int,
    default=None,
    dest='seed',
    help='Seed used to algorithm training.')

parser.add_argument('--learning_rate', '-lr', type=float, default=.0007)
parser.add_argument('--gamma', '-g', type=float, default=.99)
parser.add_argument('--n_steps', '-n', type=int, default=5)
parser.add_argument('--gae_lambda', '-gl', type=float, default=1.0)
parser.add_argument('--ent_coef', '-ec', type=float, default=0)
parser.add_argument('--vf_coef', '-v', type=float, default=.5)
parser.add_argument('--max_grad_norm', '-m', type=float, default=.5)
parser.add_argument('--rms_prop_eps', '-rms', type=float, default=1e-05)
parser.add_argument('--buffer_size', '-bfs', type=int, default=1000000)
parser.add_argument('--learning_starts', '-ls', type=int, default=100)
parser.add_argument('--tau', '-tu', type=float, default=0.005)
# for DDPG noise only
parser.add_argument('--sigma', '-sig', type=float, default=0.1)

args = parser.parse_args()
#---------------------------------------------------------------------------------------------#

# Environment construction (with reward specified)
if args.reward == 'linear':
    env = gym.make(args.environment, reward=LinearReward())
elif args.reward == 'exponential':
    env = gym.make(args.environment, reward=ExpReward())
else:
    raise RuntimeError('Reward function specified is not registered.')

# env wrappers (optionals)
if args.normalization:
    env = NormalizeObservation(env)
if args.logger:
    env = LoggerWrapper(env)
if args.multiobs:
    env = MultiObsWrapper(env)


######################## TRAINING ########################

# Defining model(algorithm)
model = None
#--------------------------DQN---------------------------#
if args.algorithm == 'DQN':
    model = DQN('MlpPolicy', env, verbose=1,
                learning_rate=args.learning_rate,
                buffer_size=args.buffer_size,
                learning_starts=50000,
                batch_size=32,
                tau=args.tau,
                gamma=args.gamma,
                train_freq=4,
                gradient_steps=1,
                target_update_interval=10000,
                exploration_fraction=.1,
                exploration_initial_eps=1.0,
                exploration_final_eps=.05,
                max_grad_norm=args.max_grad_norm,
                seed=args.seed,
                tensorboard_log=args.tensorboard)
#--------------------------------------------------------#

#--------------------------DDPG--------------------------#
# The noise objects for DDPG
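elif args.algorithm == 'DDPG':
    # Action noise is optional: with --sigma 0 no noise object is created.
    action_noise = None
    if args.sigma:
        n_actions = env.action_space.shape[-1]
        # Use the --sigma argument instead of a hard-coded value.
        action_noise = NormalActionNoise(mean=np.zeros(
            n_actions), sigma=args.sigma * np.ones(n_actions))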

    model = DDPG("MlpPolicy",
                 env,
                 action_noise=action_noise,
                 verbose=1,
                 seed=args.seed,
                 tensorboard_log=args.tensorboard)
#--------------------------------------------------------#

#--------------------------A2C---------------------------#
elif args.algorithm == 'A2C':
    model = A2C('MlpPolicy', env, verbose=1,
                learning_rate=args.learning_rate,
                n_steps=args.n_steps,
                gamma=args.gamma,
                gae_lambda=args.gae_lambda,
                ent_coef=args.ent_coef,
                vf_coef=args.vf_coef,
                max_grad_norm=args.max_grad_norm,
                rms_prop_eps=args.rms_prop_eps,
                seed=args.seed,
                tensorboard_log=args.tensorboard)
#--------------------------------------------------------#

#--------------------------PPO---------------------------#
elif args.algorithm == 'PPO':
    model = PPO('MlpPolicy', env, verbose=1,
                learning_rate=args.learning_rate,
                n_steps=args.n_steps,
                batch_size=64,
                n_epochs=10,
                gamma=args.gamma,
                gae_lambda=args.gae_lambda,
                clip_range=.2,
                ent_coef=0,
                vf_coef=.5,
                max_grad_norm=args.max_grad_norm,
                seed=args.seed,
                tensorboard_log=args.tensorboard)
#--------------------------------------------------------#

#--------------------------SAC---------------------------#
elif args.algorithm == 'SAC':
    model = SAC(policy='MlpPolicy',
                env=env,
                seed=args.seed,
                tensorboard_log=args.tensorboard)
#--------------------------------------------------------#

#-------------------------ERROR?-------------------------#
else:
    raise RuntimeError('Algorithm specified is not registered.')
#--------------------------------------------------------#

# Calculating n_timesteps_episode for training
n_timesteps_episode = env.simulator._eplus_one_epi_len / \
    env.simulator._eplus_run_stepsize
timesteps = args.episodes * n_timesteps_episode

# For callbacks processing
env = DummyVecEnv([lambda: env])

# Using Callbacks for training
callbacks = []

if args.evaluation:
    eval_callback = LoggerEvalCallback(
        env,
        best_model_save_path='./best_models/' + args.environment + '/',
        log_path='./best_models/' + args.environment + '/',
        eval_freq=n_timesteps_episode * args.eval_freq,
        deterministic=True,
        render=False,
        n_eval_episodes=args.eval_length)
    callbacks.append(eval_callback)

if args.tensorboard:
    log_callback = LoggerCallback(energym_logger=bool(args.logger))
    callbacks.append(log_callback)

callback = CallbackList(callbacks)

# Training
model.learn(
    total_timesteps=timesteps,
    callback=callback,
    log_interval=args.log_interval)
# model.save(name)

The list of parameters is fairly long. Let’s go through it:

  • --environment or -env: Name of the environment you want to use (see Environments).

  • --episodes or -ep: Number of episodes used to train the agent in the simulation (episode length can differ depending on the environment).

  • --algorithm or -alg: Algorithm used for training (currently available: PPO, A2C, DQN, DDPG and SAC).

  • --reward or -rw: Reward class used as the reward function. Currently, the possible values are “linear” and “exponential” (see Rewards).

  • --normalization or -norm: Apply the normalization wrapper to observations during training. If it is not specified, the wrapper is not applied (see Wrappers).

  • --multiobs or -mobs: Apply the Multi-Observation wrapper to observations during training. If it is not specified, the wrapper is not applied (see Wrappers).

  • --logger or -log: Apply the Sinergym logger wrapper during training. If it is not specified, the wrapper is not applied (see Wrappers and Logger).

  • --tensorboard or -tens: Path where the tensorboard training logs are stored. If it is not specified, this log is deactivated (see DRL Logger).

  • --evaluation or -eval: If it is specified, the evaluation callback is activated; otherwise, model evaluation is deactivated during training (see Deep Reinforcement Learning Integration).

  • --eval_freq or -evalf: Only used if the --evaluation flag is set. Number of training episodes between evaluations.

  • --eval_length or -evall: Only used if the --evaluation flag is set. Number of episodes run in each evaluation.

  • --log_interval or -inter: This parameter is passed to the learn() method of each algorithm. It is important to specify a suitable value, since its meaning differs between algorithms.

  • --seed or -sd: Seed for training, so that the random components of the process can be reproduced.

  • algorithm hyperparameters: Run python DRL_battery.py --help for more information.
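DRL_battery.py converts --episodes and --eval_freq from episodes into timesteps by dividing the simulator's episode length by its step size. The sketch below works through that arithmetic. The one-year episode and the 900-second step are assumed values chosen for illustration, and episodes_to_timesteps is a hypothetical helper. It uses integer division, whereas the script uses "/".

```python
def episodes_to_timesteps(episodes, episode_length_s, step_size_s):
    """Convert a number of episodes into a number of simulation timesteps."""
    steps_per_episode = episode_length_s // step_size_s
    return episodes * steps_per_episode


# Assumed values for illustration only: a one-year episode (365 days)
# simulated with a 15-minute (900 s) step.
ONE_YEAR_S = 365 * 24 * 3600
STEP_S = 900

# --episodes 3 -> total_timesteps passed to model.learn()
total_timesteps = episodes_to_timesteps(3, ONE_YEAR_S, STEP_S)
# --eval_freq 2 -> timesteps between two evaluations
eval_every = episodes_to_timesteps(2, ONE_YEAR_S, STEP_S)
print(total_timesteps, eval_every)
```

With these assumed values one episode is 35040 timesteps, so three training episodes correspond to 105120 timesteps and an evaluation every two episodes happens every 70080 timesteps.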

Google Cloud Alerts

Google Cloud Platform includes functionality to trigger events and generate alerts as a consequence. A trigger has been created in our gcloud project that notifies us when an experiment has finished. This alert can be received in several ways (Slack, SMS, email, etc.). If you want to do the same, please check the Google Cloud Alerts documentation.