"""MDP Solver base class."""

from abc import ABCMeta, abstractmethod
import numpy as np
import logging
from copy import deepcopy
from rlpy.Tools import className, deltaT, hhmmss, clock, l_norm, vec2id, checkNCreateDirectory
from collections import defaultdict
import os
import json

__copyright__ = "Copyright 2013, RLPy http://acl.mit.edu/RLPy"
__credits__ = ["Alborz Geramifard", "Robert H. Klein", "Christoph Dann",
               "William Dabney", "Jonathan P. How"]
__license__ = "BSD 3-Clause"
__author__ = "N. Kemal Ure"

class MDPSolver(object):
    """Base class for model based reinforcement learning agents and planners.

    Args:
        job_id (int): Job ID number used for running multiple jobs on a
            cluster. It also selects the random seed for this run.
        representation (Representation): Representation used for the value
            function.
        domain (Domain): Domain (MDP) to solve.
        planning_time (int): Maximum amount of time in seconds allowed for
            planning. Defaults to inf (unlimited).
        convergence_threshold (float): Threshold for determining if the
            value function has converged.
        ns_samples (int): How many samples of the successor states to take.
        project_path (str): Output path for saving the results of running
            the MDPSolver on a domain.
        log_interval (int): Reporting interval used by planners.
            NOTE(review): the original documentation describes this both as
            a minimum number of seconds and as a number of Bellman backups;
            confirm the unit against the concrete solvers.
        show (bool): Enable visualization?
    """

    # Python 2 style metaclass declaration (kept as in the original file).
    __metaclass__ = ABCMeta

    representation = None  # Link to the representation object
    domain = None  # Link to the domain object
    logger = None  # Logger used to report progress messages
    # Amount of time in seconds provided for the solver. After this it
    # returns its performance.
    planning_time = None
    exp_id = None  # The job id of this run of the algorithm
    # To make sure all runs with the same job id see the same random sequence
    mainSeed = 999999999
    maxRuns = 100  # Maximum number of runs of an algorithm for averaging
    # Threshold to determine the convergence of the planner
    convergence_threshold = None
    # Number of samples used to generate an estimated Bellman backup if the
    # domain does not provide explicit probabilities through its
    # expectedStep function.
    ns_samples = None
    # Reporting interval (not all planners may use this); see the class
    # docstring for the open question about its unit.
    log_interval = None
    show = None  # Show the learning if possible?

    def __init__(self, job_id, representation, domain,
                 planning_time=np.inf, convergence_threshold=.005,
                 ns_samples=100, project_path='.', log_interval=5000,
                 show=False):
        """Store the configuration, seed numpy's RNG and prepare the results.

        See the class docstring for the meaning of the arguments.
        """
        self.exp_id = job_id
        self.representation = representation
        self.domain = domain
        self.logger = logging.getLogger(
            "rlpy.MDPSolvers." + self.__class__.__name__)
        self.ns_samples = ns_samples
        self.planning_time = planning_time
        self.project_path = project_path
        self.log_interval = log_interval
        self.show = show
        self.convergence_threshold = convergence_threshold

        # Set the random seed for this job id: a table of maxRuns seeds is
        # derived from mainSeed, and row (exp_id - 1) seeds this run.
        np.random.seed(self.mainSeed)
        self.randomSeeds = np.random.randint(
            1, self.mainSeed, (self.maxRuns, 1))
        np.random.seed(self.randomSeeds[self.exp_id - 1, 0])

        # TODO setup logging to file in experiment

        # Dictionary of results; missing keys start out as empty lists.
        self.result = defaultdict(list)
        self.result["seed"] = self.exp_id
        self.output_filename = '{:0>3}-results.json'.format(self.exp_id)

    @abstractmethod
    def solve(self):
        """Solve the domain MDP.

        Abstract. This base implementation only logs the value of the start
        state and saves the collected statistics.
        """
        self.logger.info(
            'Value of S0 is = %0.5f' %
            self.representation.V(*self.domain.s0()))
        self.saveStats()

    def printAll(self):
        """Print this solver using ``printClass``."""
        # printClass is not among the names imported at the top of the file,
        # so the original call raised NameError. Imported locally here.
        # NOTE(review): assumes rlpy.Tools exports printClass - confirm.
        from rlpy.Tools import printClass
        printClass(self)

    def BellmanBackup(self, s, a, ns_samples, policy=None):
        """Apply a Bellman backup to the state-action pair (s, a).

        i.e. Q(s,a) = E[r + discount_factor * V(s')].
        If a policy is given then
        Q(s,a) = E[r + discount_factor * Q(s', pi(s'))].

        The value is obtained from the representation's
        ``Q_oneStepLookAhead`` and written directly into its ``weight_vec``.

        Args:
            s (ndarray): The current state.
            a (int): The action taken in state s.
            ns_samples (int): Number of next state samples to use.
            policy (Policy): Policy object to use for sampling actions.
        """
        Q = self.representation.Q_oneStepLookAhead(s, a, ns_samples, policy)
        s_index = vec2id(
            self.representation.binState(s),
            self.representation.bins_per_dim)
        # Weights are indexed by action first, then by aggregated state.
        weight_vec_index = int(
            self.representation.agg_states_num * a + s_index)
        self.representation.weight_vec[weight_vec_index] = Q

    def performanceRun(self):
        """Sample one episode from the domain using the best known actions.

        Actions come from ``representation.bestAction``; the episode stops
        when the domain reports a terminal state or ``domain.episodeCap``
        steps have been taken.

        Returns:
            tuple: (return, episode length, terminal flag,
            discounted return).
        """
        eps_length = 0
        eps_return = 0
        eps_discounted_return = 0
        s, eps_term, p_actions = self.domain.s0()

        while not eps_term and eps_length < self.domain.episodeCap:
            a = self.representation.bestAction(s, eps_term, p_actions)
            r, ns, eps_term, p_actions = self.domain.step(a)
            s = ns
            eps_discounted_return += \
                self.domain.discount_factor ** eps_length * r
            eps_return += r
            eps_length += 1
        return eps_return, eps_length, eps_term, eps_discounted_return

    def saveStats(self):
        """Write ``self.result`` as JSON into the project path."""
        fullpath_output = os.path.join(
            self.project_path, self.output_filename)
        # Single-argument call: prints the same text on Python 2 and 3.
        print(">>>  %s" % fullpath_output)
        checkNCreateDirectory(self.project_path + '/')
        with open(fullpath_output, "w") as f:
            json.dump(self.result, f, indent=4, sort_keys=True)

    def hasTime(self):
        """Return a boolean stating if there is time left for planning.

        NOTE(review): ``self.start_time`` is not set anywhere in this class;
        presumably concrete solvers set it before calling this - confirm.
        """
        return deltaT(self.start_time) < self.planning_time

    def IsTabularRepresentation(self):
        """Check whether the representation is Tabular.

        Policy Iteration and Value Iteration only work with a Tabular
        representation.
        """
        return className(self.representation) == 'Tabular'

    def collectSamples(self, samples):
        """Return matrices of S, A, NS, R, T collected with the current policy.

        Each row of each numpy 2d-array is one sample.

        - S: (#samples) x (# state space dimensions)
        - A: (#samples) x (1) int [we are storing actionIDs here, integers]
        - NS:(#samples) x (# state space dimensions)
        - R: (#samples) x (1) float
        - T: (#samples) x (1) terminal flag, stored as uint16

        A new episode is started whenever the domain reports a terminal
        state or the current episode has reached ``episodeCap`` steps.

        NOTE(review): ``self.policy`` is not set anywhere in this class;
        presumably concrete solvers provide it - confirm.

        See :py:meth:`~rlpy.Agents.Agent.Agent.Q_MC` and
        :py:meth:`~rlpy.Agents.Agent.Agent.MC_episode`
        """
        domain = self.representation.domain
        # NOTE(review): domain.s0() is unpacked into three values below, so
        # type(domain.s0()) is the type of that whole return value rather
        # than of the state; kept as in the original - confirm intent.
        S = np.empty(
            (samples, domain.state_space_dims),
            dtype=type(domain.s0()))
        A = np.empty((samples, 1), dtype='uint16')
        NS = S.copy()
        T = A.copy()
        R = np.empty((samples, 1))

        sample = 0
        eps_length = 0
        # So the first sample forces initialization of s and a
        terminal = True
        while sample < samples:
            if terminal or eps_length >= domain.episodeCap:
                s, terminal, possible_actions = domain.s0()
                a = self.policy.pi(s, terminal, possible_actions)
                # Restart the step counter with the episode; without this
                # every sample after the first capped episode would restart
                # from s0.
                eps_length = 0

            # Transition
            r, ns, terminal, possible_actions = domain.step(a)

            # Collect Samples
            S[sample] = s
            A[sample] = a
            NS[sample] = ns
            T[sample] = terminal
            R[sample] = r

            sample += 1
            eps_length += 1
            s = ns
            a = self.policy.pi(s, terminal, possible_actions)

        return S, A, NS, R, T