Source code for routingtools.optimization

# -*- coding: utf-8 -*-
"""Optimization of vehicle routes.

This module contains classes and functions to solve vehicle routing optimization
problems. It uses the Google OR Tools library as a workhorse.

.. _Google OR Tools documentation:
  https://developers.google.com/optimization/routing

"""

# External imports.
import logging
import time
from ortools.constraint_solver import pywrapcp
from ortools.constraint_solver import routing_enums_pb2 as algorithms

# Internal imports.
from routingtools import osrm, _parsers, _utils
from routingtools.exceptions import CostConstraintTooLowError, NoSolutionFoundError

# Log settings.
logger = logging.getLogger(__name__)

class RouteOptimizer():
    """Interface to solve a vehicle routing optimization problem.

    The goal of the route optimizer is to find the best routes for a fleet of vehicles
    (i.e. buses) visiting a set of locations (i.e. bus stops).

    In general, optimization problems consist of an objective accompanied by constraints.
    The objective specifies the quantity to be optimized. The constraints describe
    restrictions on the set of possible solutions, based on specific requirements of the
    problem.

    In the vehicle routing case, the objective is to minimize the travel cost of the most
    expensive route. The travel cost is quantified as the time it takes to travel the
    route. As a constraint, a route can never be longer than the given maximum route
    duration.

    A second, optional constraint relates to capacities of the vehicles. At each bus stop
    there is a number of people to pick up (i.e. the demand), and each bus has a maximum
    number of people it can fit (i.e. the capacity). The constraint is that a vehicle can,
    along its route, never pick up more people than its maximum capacity.

    The optimization process first uses a specified method to find an initial solution.
    Then, a specified local search metaheuristic searches the solution space around that
    initial solution to see if there are better solutions to be found. Some metaheuristics
    can escape local minima, that is, solutions that are cheaper than all nearby solutions
    but are not the cheapest solution overall.

    The route optimizer uses the Google OR Tools library to perform the optimization
    process. See https://developers.google.com/optimization/routing. Internally, three
    main components need to be set before solving the problem:

    * The `Data Model`: stores the data needed to solve the problem, in a format that can
      be easily queried by the other components.
    * The `Routing Index Manager`: converts OR Tools internal indices to node indices that
      correspond to the position of stop nodes in the data model.
    * The `Routing Model`: provides an interface that can set the objective and
      constraints of the problem, and solve it accordingly.

    The route optimizer can be initialized with a set of parameters, either as keyword
    arguments or as a dictionary. Most of the parameters have default values. The
    parameters are:

    * `event`: List of event objects. Required parameter with no default. An event object
      is a dictionary with the following keys:

      * `name`: The name of the event as a string.
      * `coordinates`: The location of the event as a dictionary with a `lat` key
        referring to a float describing the latitude coordinate of the location, and a
        `lon` key referring to a float describing the longitude coordinate of the
        location. Coordinate values should be expressed in the WGS84 geographic
        coordinate reference system (i.e. EPSG:4326).

    * `stops`: List of stop objects. Required parameter with no default. A stop object is
      a dictionary with the following keys:

      * `name`: The name of the stop as a string.
      * `coordinates`: The location of the stop as a dictionary with a `lat` key
        referring to a float describing the latitude coordinate of the location, and a
        `lon` key referring to a float describing the longitude coordinate of the
        location. Coordinate values should be expressed in the WGS84 geographic
        coordinate reference system (i.e. EPSG:4326).
      * `load`: The number of people to be picked up at the stop as an integer.

    * `vehicles`: List of vehicle objects. Needed only when capacity constraints should be
      applied. If not given, it is then created automatically with
      `create_vehicle_data()`. A vehicle object is a dictionary with the following keys:

      * `name`: The name of the vehicle as a string.
      * `coordinates`: The start location of the vehicle as a dictionary with a `lat` key
        referring to a float describing the latitude coordinate of the location, and a
        `lon` key referring to a float describing the longitude coordinate of the
        location.
        Coordinate values should be expressed in the WGS84 geographic coordinate
        reference system (i.e. EPSG:4326). This key is not required when vehicle start
        locations should be arbitrary.
      * `capacity`: The maximum number of people that fit in the vehicle.

    * `vehicleCount`: Integer, describing the number of available vehicles. Required only
      when capacity constraints should *not* be applied *and* no vehicle data are given.
      If omitted in that case, it defaults to the number of stops. In other cases, it is
      automatically derived from the vehicle data.
    * `standardVehicleSizes`: List of integers, each describing the maximum capacity of a
      commonly available bus type. Required only when capacity constraints should be
      applied *and* no vehicle data are given. Default value set to
      `[8, 19, 37, 49, 57]`.
    * `fixedVehiclePenalty`: Integer, describing the penalty that should be given when
      using an extra vehicle, expressed in terms of route cost units (generally travel
      time in seconds). Alternatively, it can be set to `"max"`, which refers to the
      maximum cost value of the cost matrix (i.e. the maximum travel time between two
      nodes). Required only when vehicle usage should be penalized, ignored otherwise.
      Default value set to `"max"`.
    * `maxRouteDuration`: Integer, describing the maximum allowed travel time of a route,
      in seconds. Default value set to `7200` (i.e. 2h).
    * `globalSpanCostCoefficient`: Integer, serving as a weight in the objective function
      of the optimization. The objective function to minimize is A + xB, where A is the
      total cost of all routes, B is the cost of the most expensive route, and x is the
      global span cost coefficient. Hence, a value of x = 1 gives equal importance to A
      and B, while high values give more importance to B, and values close to zero give
      more importance to A. Default value set to `1`.
    * `firstSolutionStrategy`: String, describing the algorithm to use for finding a first
      solution to the problem. Default value set to `PATH_CHEAPEST_ARC`.
      See https://developers.google.com/optimization/routing/routing_options#first_sol_options
      for all available options.
    * `localSearchMetaheuristic`: String, describing the algorithm to use for performing
      local search. Default value set to `GUIDED_LOCAL_SEARCH`.
      See https://developers.google.com/optimization/routing/routing_options#local_search_options
      for all available options.
    * `searchTimeLimit`: Integer, describing the maximum amount of time a solution search
      may take, in seconds. If no solution is found within this time, an error is raised.
      Default value set to `10`.
    * `arbitraryStarts`: Boolean, describing if the start location of a vehicle should be
      arbitrary or not. Default value set to `True`. Note that if this is set to `False`,
      vehicle data should always be given, with each vehicle object containing a
      coordinate object which describes the start location of the vehicle.
    * `capacityConstraints`: Boolean, describing if capacity constraints should be applied
      or not. Default value set to `True`. Note that if this is set to `True`, each
      vehicle object needs a `capacity` key describing the maximum capacity of the
      vehicle. If no vehicle data are given, they are created automatically from
      `standardVehicleSizes`.
    * `multipleVisits`: Boolean, describing if visits of multiple vehicles to the same
      stop should be allowed. Default value set to `True`. Ignored when capacity
      constraints are not applied.
    * `vehicleUsagePenalties`: Boolean, describing if a cost penalty should be given
      whenever an extra vehicle is used. If set to `True`, the `fixedVehiclePenalty`
      parameter should be set. Default value set to `True`.

    Example:
        >>> event = [
        ...     {
        ...         "name": "Empire St. Martin",
        ...         "coordinates": {
        ...             "lon": 14.0603747,
        ...             "lat": 48.416865
        ...         }
        ...     }
        ... ]
        >>> stops = [
        ...     {
        ...         "name": "Grieskirchen Bahnhof (Johannesstraße 3)",
        ...         "coordinates": {
        ...             "lon": 13.834073000000002,
        ...             "lat": 48.232506
        ...         },
        ...         "load": 33
        ...     },
        ...     {
        ...         "name": "Ried im Innkreis (Busbahnhof)",
        ...         "coordinates": {
        ...             "lon": 13.49006,
        ...             "lat": 48.20989
        ...         },
        ...         "load": 14
        ...     }
        ... ]
        >>> optimizer = RouteOptimizer(event = event, stops = stops)
        >>> optimizer.get_solution()

    """

    def __init__(self, **params):
        # Set the default values of the parameters.
        self.params = {
            "event": None,
            "stops": None,
            "vehicles": None,
            "vehicleCount": None,
            "standardVehicleSizes": [8, 19, 37, 49, 57],
            "fixedVehiclePenalty": "max",
            "maxRouteDuration": 60 * 60 * 2,
            "globalSpanCostCoefficient": 1,
            "firstSolutionStrategy": "PATH_CHEAPEST_ARC",
            "localSearchMetaheuristic": "GUIDED_LOCAL_SEARCH",
            "searchTimeLimit": 10,
            "arbitraryStarts": True,
            "capacityConstraints": True,
            "multipleVisits": True,
            "vehicleUsagePenalties": True
        }
        # Update the default parameters with the given parameters.
        self.params.update(params)
        # Check if required parameters are present.
        for x in ["event", "stops"]:
            if self.params[x] is None:
                raise ValueError("Required parameter {!r} is missing".format(x))
        if self.params["capacityConstraints"]:
            # Create vehicle data if not given.
            if self.params["vehicles"] is None:
                self.params["vehicles"] = self.create_vehicle_data(
                    self.params["stops"],
                    self.params["standardVehicleSizes"]
                )
            # Split stop nodes into sub-nodes if multiple visits should be allowed.
            if self.params["multipleVisits"]:
                self.params["stops"] = self.split_stops(
                    self.params["stops"],
                    self.params["standardVehicleSizes"]
                )
        else:
            # Multiple visits should always be False when there are no capacity constraints.
            self.params["multipleVisits"] = False
        # Define the vehicle count.
        if self.params["vehicles"] is None:
            if self.params["vehicleCount"] is None:
                self.params["vehicleCount"] = len(self.params["stops"])
            else:
                self.params["vehicleCount"] = int(self.params["vehicleCount"])
        else:
            self.params["vehicleCount"] = len(self.params["vehicles"])
        logger.debug("Using the following parameters:")
        logger.debug(self.params)
        # Initialize other class attributes.
        self.data_model = None
        self.routing_model = None
        self.routing_index_manager = None
        self.solution = None
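
The class docstring describes the objective as A + xB. As a rough illustration of how the global span cost coefficient x shifts the balance between total cost and the most expensive route, the following standalone sketch evaluates that formula for two hand-made sets of route costs. The function name `objective_value` and the cost values are hypothetical and only follow the docstring's description; this is not how OR Tools computes the objective internally.

```python
def objective_value(route_costs, span_coefficient=1):
    """Return A + x * B for a list of per-route costs (illustrative only)."""
    total_cost = sum(route_costs)            # A: total cost of all routes.
    max_cost = max(route_costs, default=0)   # B: cost of the most expensive route.
    return total_cost + span_coefficient * max_cost


# Two hypothetical solutions, with route costs in seconds.
balanced = [1800, 1900, 2000]    # A = 5700, B = 2000
unbalanced = [500, 600, 4000]    # A = 5100, B = 4000

# With x = 0 only the total cost counts, so the unbalanced solution is cheaper.
print(objective_value(balanced, 0), objective_value(unbalanced, 0))  # 5700 5100
# With x = 1 the long route is penalized, so the balanced solution is cheaper.
print(objective_value(balanced, 1), objective_value(unbalanced, 1))  # 7700 9100
```

Under these assumptions, raising x makes the solver prefer solutions whose longest route is shorter, even if the total travel time goes up.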

    def get_solution(self):
        """Solves the specified vehicle routing problem.

        Returns:
            dict: A result object. See the API reference for details.

        """
        # Create the three required components.
        # 1) The data model: stores the data needed to solve the problem.
        # 2) The routing index manager: converts internal indices to node indices.
        # 3) The routing model: stores the objective and constraints of the problem.
        self.set_data_model()
        self.set_routing_index_manager()
        self.set_routing_model()
        # Retrieve the routing model.
        routing = self.routing_model
        # Initialize the default routing search parameters.
        # These parameters define how to search for the solution.
        search_parameters = pywrapcp.DefaultRoutingSearchParameters()
        # Define which heuristic method to use for finding a first solution.
        search_parameters.first_solution_strategy = (
            getattr(algorithms.FirstSolutionStrategy, self.params["firstSolutionStrategy"])
        )
        logger.debug(
            "Registered {!r} as first solution strategy".format(
                self.params["firstSolutionStrategy"]
            )
        )
        # Define which metaheuristic to use for optimizing the solution.
        # This allows the solver to escape a local minimum, that is, a solution
        # that is cheaper than all nearby solutions but is not the global minimum.
        # After moving away from the local minimum, the solver continues the search.
        search_parameters.local_search_metaheuristic = (
            getattr(algorithms.LocalSearchMetaheuristic, self.params["localSearchMetaheuristic"])
        )
        logger.debug(
            "Registered {!r} as local search metaheuristic".format(
                self.params["localSearchMetaheuristic"]
            )
        )
        # Set a time limit for the solution search.
        search_parameters.time_limit.seconds = self.params["searchTimeLimit"]
        logger.debug(
            "Set {!r} as time limit for the solution search".format(
                self.params["searchTimeLimit"]
            )
        )
        # Solve the problem.
        logger.info("Solution search started ...")
        start_time = time.time()
        self.solution = routing.SolveWithParameters(search_parameters)
        end_time = time.time()
        elapsed_time = str(round(end_time - start_time, 1))
        # Return the solution in readable format.
        # A status of value 1 means a solution was found.
        if routing.status() == 1:
            logger.info("Solution found in {} seconds".format(elapsed_time))
            # Format the solution.
            result = self._format_solution()
            logger.info("Solution contains {} routes".format(result["routeCount"]))
            return result
        else:
            raise NoSolutionFoundError(self._get_return_status(routing.status()))

    def parse_event_data(self):
        """Parses the event data into a format that can be easily queried by the model."""
        logger.info("Parsing the event data ...")
        data = {}
        data["names"] = [_parsers._parse_name(x) for x in self.params["event"]]
        data["coords"] = [_parsers._parse_coords(x) for x in self.params["event"]]
        logger.info("Successfully parsed the event data")
        logger.debug(data)
        return data

    def parse_stop_data(self):
        """Parses the stop data into a format that can be easily queried by the model."""
        logger.info("Parsing the stop data ...")
        data = {}
        # Names, coordinates and load values of the stops.
        # These are always required.
        data["names"] = [_parsers._parse_name(x) for x in self.params["stops"]]
        data["coords"] = [_parsers._parse_coords(x) for x in self.params["stops"]]
        data["loads"] = [_parsers._parse_load(x) for x in self.params["stops"]]
        # Parent node indices of the sub-nodes of the stops.
        # These are only required when multiple visits to the same stop are allowed.
        if self.params["multipleVisits"]:
            data["parents"] = [x["parent"] for x in self.params["stops"]]
        logger.info("Successfully parsed the stop data")
        logger.debug(data)
        return data

    def parse_vehicle_data(self):
        """Parses the vehicle data into a format that can be easily queried by the model."""
        logger.info("Parsing the vehicle data ...")
        data = {}
        # Number of available vehicles.
        count = self.params["vehicleCount"]
        if count < 1:
            raise ValueError("There should be at least 1 vehicle, not {}".format(count))
        data["count"] = count
        # Names of the vehicles.
        if self.params["vehicles"] is not None:
            data["names"] = [_parsers._parse_name(x) for x in self.params["vehicles"]]
        # Coordinates of vehicle start locations.
        # These are only required when start locations should not be arbitrary.
        if not self.params["arbitraryStarts"]:
            if self.params["vehicles"] is None:
                raise ValueError("Vehicle start locations are unknown")
            data["coords"] = [_parsers._parse_coords(x) for x in self.params["vehicles"]]
        # Maximum capacity values of the vehicles.
        # These are only required when capacity constraints should be applied.
        if self.params["capacityConstraints"]:
            if self.params["vehicles"] is None:
                raise ValueError("Vehicle capacities are unknown")
            data["capacities"] = [_parsers._parse_capacity(x) for x in self.params["vehicles"]]
        logger.info("Successfully parsed the vehicle data")
        logger.debug(data)
        return data

    def set_data_model(self):
        """Initializes the data model.

        The data model stores the data needed to solve the problem, in a format that can
        be easily queried by the other components.

        """
        logger.info("Creating the data model ...")
        data = {}
        # Parse event, stop and vehicle data.
        event_data = self.parse_event_data()
        stop_data = self.parse_stop_data()
        vehicle_data = self.parse_vehicle_data()
        # Construct the node data.
        # The event is always the first node, followed by the stops.
        # No people are picked up at the event, hence its load is 0.
        data["node_names"] = event_data["names"] + stop_data["names"]
        data["node_coords"] = event_data["coords"] + stop_data["coords"]
        data["node_loads"] = [0] + stop_data["loads"]
        data["node_count"] = len(data["node_names"])
        # If stop nodes are split to allow multiple visits:
        # Also add the index of the parent node for each sub-node.
        # This index should refer to the node index, and not the stop index!
        # The node index is the stop index + 1 since the event node comes before the stops.
        # The event node is never split, hence it is the parent of itself.
        # When nodes are not split, every node is the parent of itself.
        if self.params["multipleVisits"]:
            data["node_parents"] = [0] + [x + 1 for x in stop_data["parents"]]
        else:
            data["node_parents"] = list(range(data["node_count"]))
        # Define at which node the route of each vehicle - if used - should end.
        # Vehicles will always end their route at the event node (i.e. node 0).
        data["vehicle_ends"] = [0] * vehicle_data["count"]
        # If start nodes are not arbitrary:
        # Define at which node the route of each vehicle - if used - should start.
        # This is at the location at which the vehicle is parked.
        # Those locations should thus also be added as nodes.
        if not self.params["arbitraryStarts"]:
            # Define node indices of vehicle start nodes.
            n = data["node_count"]  # Number of stop nodes + event.
            data["vehicle_starts"] = list(range(n, n + vehicle_data["count"]))
            # Add vehicle locations to the nodes data.
            # No people are picked up at those locations, hence the loads are all 0.
            # These nodes are never split, hence they are all their own parent.
            data["node_names"].extend(vehicle_data["names"])
            data["node_coords"].extend(vehicle_data["coords"])
            data["node_loads"].extend([0] * vehicle_data["count"])
            data["node_count"] = data["node_count"] + vehicle_data["count"]
            data["node_parents"].extend(data["vehicle_starts"])
        # Calculate a cost matrix with all nodes.
        # This is an array whose i, j entry is the travel cost between locations i and j.
        data["matrix"] = osrm.get_cost_matrix("duration", data["node_coords"])
        # If start nodes are arbitrary:
        # A dummy location will be added as extra node.
        # This 'dummy node' will have a cost of 0 to all other nodes in the matrix.
        # The dummy node will serve as start node for all vehicles.
        # Since the cost to all other nodes is 0, it does not matter what the first real
        # node of a route will be.
        # Hence, the real start node is arbitrary.
        if self.params["arbitraryStarts"]:
            # Since there is a new node, an extra column should be added to the cost matrix.
            # All entries in this column should equal 0.
            for row in data["matrix"]:
                row.append(0)
            # An extra row should also be added to the cost matrix.
            # All entries in this row should equal 0.
            data["matrix"].append(([0] * data["node_count"]) + [0])
            # Finally, the node attribute lists in the data model should be updated.
            data["node_names"].append("dummy")
            data["node_coords"].append([None])
            data["node_loads"].append(0)
            data["node_count"] = data["node_count"] + 1
            dummy_node_index = data["node_count"] - 1
            data["node_parents"].append(dummy_node_index)
            # The dummy node will be the start node for all vehicles.
            data["vehicle_starts"] = [dummy_node_index] * vehicle_data["count"]
        # If vehicle usage should be penalized:
        # A fixed vehicle usage cost will be included in the data model.
        # Otherwise, the fixed vehicle cost will always be 0.
        if self.params["vehicleUsagePenalties"]:
            x = self.params["fixedVehiclePenalty"]
            data["vehicle_penalty"] = _utils._get_max_cost(data["matrix"]) if x == "max" else x
        else:
            data["vehicle_penalty"] = 0
        # Add the vehicle attributes as objects to the data model.
        # First add the prefix "vehicle" to all keys from the vehicle data.
        data.update({"vehicle_" + k: v for k, v in vehicle_data.items()})
        self.data_model = data
        logger.info(
            "Created a data model with {} nodes and {} vehicles".format(
                data["node_count"],
                data["vehicle_count"]
            )
        )
        logger.debug(data)
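
The dummy-node step in `set_data_model` can be tried out in isolation. The sketch below appends a zero-cost column and row to a small, made-up cost matrix. The helper name `add_dummy_start` and the matrix values are hypothetical, and unlike the method above, which extends the matrix in place, this version returns a copy so that the input stays untouched.

```python
def add_dummy_start(matrix):
    """Return a copy of a square cost matrix with a zero-cost dummy node appended."""
    n = len(matrix)
    # Extra column: travelling from any real node to the dummy node costs 0.
    extended = [list(row) + [0] for row in matrix]
    # Extra row: travelling from the dummy node to any node (itself included) costs 0.
    extended.append([0] * (n + 1))
    return extended


# Hypothetical travel times in seconds between the event (node 0) and two stops.
matrix = [
    [0, 10, 20],
    [10, 0, 5],
    [20, 5, 0],
]
extended = add_dummy_start(matrix)
dummy_node_index = len(extended) - 1

print(dummy_node_index)   # 3
print(extended[3])        # [0, 0, 0, 0]
```

Because every arc leaving the dummy node is free, the first real stop of a route adds nothing to its cost, which is what makes the start location arbitrary.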

    def set_routing_model(self):
        """Initializes the routing model.

        The routing model provides an interface that can set the objective and constraints
        of the problem, and solve it accordingly.

        """
        logger.info("Creating the routing model ...")
        # Retrieve the routing index manager.
        manager = self.routing_index_manager
        if manager is None:
            raise ValueError("The routing index manager is not set")
        # Retrieve the data model.
        data = self.data_model
        if data is None:
            raise ValueError("The data model is not set")
        # Initialize the routing model.
        routing = pywrapcp.RoutingModel(manager)

        # Create a cost callback function.
        # This function takes any pair of locations and returns the travel cost between them.
        # It finds this travel cost in the cost matrix stored in the data model.
        # The routing index manager converts the internal indices to node indices that can be
        # used to query the cost matrix.
        def cost_callback(from_index, to_index):
            """Returns the cost between two nodes."""
            # Convert from routing variable index to matrix node index.
            from_node = manager.IndexToNode(from_index)
            to_node = manager.IndexToNode(to_index)
            return data["matrix"][from_node][to_node]

        # The cost callback function has to be 'registered' in the routing model.
        # Registering returns an index, which is the solver's internal reference to the
        # cost callback function.
        cost_callback_index = routing.RegisterTransitCallback(cost_callback)
        # The cost callback function is then used as arc cost evaluator.
        # This tells the solver how to calculate the travel cost between any two nodes.
        # In other words, the cost of the edge joining them in the routing graph.
        # Here, the arc cost evaluator is the cost callback index.
        # Hence, the travel cost between nodes is taken directly from the cost matrix.
        routing.SetArcCostEvaluatorOfAllVehicles(cost_callback_index)
        # The cumulative cost along a route should be added as a dimension of the problem.
        # The solver uses dimensions to keep track of quantities that accumulate over routes.
        # If a routing problem involves such a quantity, either in the constraints or the
        # objective function, we need to define a dimension to specify it.
        # In this case the objective is to minimize the cost of the most expensive route.
        # Hence, the travel cost should be added as a dimension.
        routing.AddDimension(
            cost_callback_index,
            0,  # No slack.
            # TODO: Add a slack if needed.
            self.params["maxRouteDuration"] + data["vehicle_penalty"],  # Max route cost.
            True,  # Start cumulative to zero.
            "cost"  # Dimension name.
        )
        # Set the coefficient for the global span of the routes.
        # This serves as a weight in the objective function of the optimization.
        # The objective function to minimize is A + xB, where:
        # - A is the total cost of all routes.
        # - B is the cost of the most expensive route.
        # - x is the global span cost coefficient.
        # Hence, a value of x = 1 gives equal importance to A and B.
        # High values of x give more importance to B.
        # Values close to zero give more importance to A.
        cost_dim = routing.GetDimensionOrDie("cost")
        cost_dim.SetGlobalSpanCostCoefficient(self.params["globalSpanCostCoefficient"])
        # Check if the node furthest from the event can be reached within max cost threshold.
        # Don't include the dummy node in this check.
        m0 = self.data_model["matrix"][0]  # Cost values between the event (row 0) and all nodes.
        m0_max = max(m0[:-1] if self.params["arbitraryStarts"] else m0)
        if m0_max > self.params["maxRouteDuration"]:
            raise CostConstraintTooLowError("Not all stops can be served within threshold")
        else:
            logger.debug(
                "Furthest node from event ({} s) is within max cost threshold ({} s)".format(
                    m0_max,
                    self.params["maxRouteDuration"]
                )
            )
        logger.info("Cost constraints included in the routing model")
        # If capacity constraints should be applied:
        # There should also be a callback function for stop loads.
        # The loads should also be added as a dimension,
        # with the vehicle capacities as maximum load for each vehicle.
        # Then, the solver tracks the cumulative load a vehicle is carrying along its route,
        # to enforce the constraint that a vehicle's load can't exceed its capacity.
        if self.params["capacityConstraints"]:
            def load_callback(index):
                """Returns the load of a stop node."""
                node = manager.IndexToNode(index)
                return data["node_loads"][node]

            load_callback_index = routing.RegisterUnaryTransitCallback(load_callback)
            routing.AddDimensionWithVehicleCapacity(
                load_callback_index,
                0,  # No slack.
                data["vehicle_capacities"],  # Vehicle maximum capacity.
                True,  # Start cumulative to zero.
                "capacity"  # Dimension name.
            )
            logger.info("Capacity constraints included in the routing model")
        # If vehicle usage should be penalized:
        # Set a fixed cost for all vehicles.
        if self.params["vehicleUsagePenalties"]:
            routing.SetFixedCostOfAllVehicles(data["vehicle_penalty"])
            logger.info("Vehicle usage cost set to {}".format(data["vehicle_penalty"]))
        self.routing_model = routing
        logger.info("Created the routing model")
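
To make the idea of a capacity dimension more concrete, the following sketch tracks the cumulative load along a single, hand-written route and checks it against a vehicle capacity. It only illustrates the quantity that the dimension accumulates; the helper names `cumulative_loads` and `respects_capacity` and the example data are hypothetical, and the solver enforces the constraint internally rather than through code like this.

```python
from itertools import accumulate


def cumulative_loads(route, node_loads):
    """Return the load carried after visiting each node of a route."""
    return list(accumulate(node_loads[node] for node in route))


def respects_capacity(route, node_loads, capacity):
    """Check that the carried load never exceeds the vehicle capacity."""
    return all(load <= capacity for load in cumulative_loads(route, node_loads))


# Node 0 is the event (load 0), nodes 1 and 2 are stops with 33 and 14 people.
node_loads = [0, 33, 14]
route = [1, 2, 0]  # Visit both stops, then end at the event.

print(cumulative_loads(route, node_loads))        # [33, 47, 47]
print(respects_capacity(route, node_loads, 49))   # True
print(respects_capacity(route, node_loads, 37))   # False
```

With the default standard vehicle sizes, this hypothetical route would fit a vehicle of size 49 but not one of size 37.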

    def set_routing_index_manager(self):
        """Initializes the routing index manager.

        The routing index manager converts OR Tools internal indices to node indices that
        correspond to the position of stop nodes in the data model.

        """
        logger.info("Creating the routing index manager ...")
        self.routing_index_manager = pywrapcp.RoutingIndexManager(
            self.data_model["node_count"],
            self.data_model["vehicle_count"],
            self.data_model["vehicle_starts"],
            self.data_model["vehicle_ends"]
        )
        logger.info("Created the routing index manager")

    @staticmethod
    def create_vehicle_data(stop_nodes, vehicle_sizes):
        """Automatically creates the required vehicle data.

        Creates the required vehicle input data by assuming that the standard vehicle
        sizes are available without a limit. Without a limit means that each stop can
        possibly be served only by vehicles from a single standard size. The vehicles will
        not have a start location associated with them.

        Args:
            stop_nodes (list): The stops object of the input parameters.
            vehicle_sizes (list): A list of the most common vehicle capacities.

        """
        logger.info("Creating vehicle data ...")
        data = []
        # Loop over all stops.
        for stop in stop_nodes:
            # Retrieve the load of the stop.
            stop_load = stop["load"]
            # Loop over all given vehicle sizes.
            for vehicle_size in vehicle_sizes:
                # Calculate how many vehicles of this size are needed to serve the stop.
                # The // operator performs integer division, which rounds down.
                # When the stop load is 10 and the bus size is 4, we need three buses to
                # serve the stop, but 10 // 4 gives 10 / 4 = 2.5 -> round down -> 2.
                # We can solve this by negating the stop load before dividing, and then
                # negating the result again.
                # Then, -10 / 4 = -2.5 -> round down -> -3 -> negate -> 3.
                n_vehicles = -(-stop_load // vehicle_size)
                # Add as many vehicle data objects for this vehicle size to the data list as
                # needed to serve the stop.
                for _ in range(n_vehicles):
                    vehicle_data = {
                        "name": "Bus of size " + str(vehicle_size),
                        "capacity": vehicle_size
                    }
                    data.append(vehicle_data)
        logger.info("Successfully created vehicle data")
        logger.debug(data)
        return data
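
The negate-and-floor trick used in `create_vehicle_data` is a common way to get ceiling division on integers without converting to floats. The short sketch below checks it against `math.ceil` and counts how many vehicle objects one stop would produce with the default standard sizes. The function name `vehicles_needed` is hypothetical and only wraps the expression from the method above.

```python
import math


def vehicles_needed(stop_load, vehicle_size):
    """Ceiling division on integers: vehicles of one size needed to serve a stop."""
    return -(-stop_load // vehicle_size)


print(vehicles_needed(10, 4))   # 3
print(vehicles_needed(8, 4))    # 2

# The result matches math.ceil for positive integer loads and sizes.
assert all(
    vehicles_needed(load, size) == math.ceil(load / size)
    for load in range(1, 200)
    for size in (8, 19, 37, 49, 57)
)

# A stop with 33 people yields 5 + 2 + 1 + 1 + 1 vehicle objects.
vehicle_count = sum(vehicles_needed(33, size) for size in (8, 19, 37, 49, 57))
print(vehicle_count)            # 10
```

Since this count is repeated for every stop and every size, the generated fleet grows quickly with the number of stops.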

    @staticmethod
    def split_stops(stop_nodes, vehicle_sizes):
        """Distributes the load of stop nodes over multiple sub-nodes.

        This is needed to allow multiple visits to a single node. The rules for splitting
        are the following:

        1) Stop nodes with a total load smaller than or equal to the minimum standard
           vehicle size will not be split, i.e. they will only have a single sub-node with
           a load equal to the total load of the stop node.
        2) Stop nodes with a total load larger than the minimum standard vehicle size will
           be split into sub-nodes with loads that equal the minimum standard vehicle
           size.
        3) However, if the total load of a stop node is larger than 10 times the minimum
           standard vehicle size, the node will instead be split into sub-nodes with loads
           that equal the second smallest standard vehicle size. This prevents too many
           sub-nodes from being created, which would slow the routing algorithm down.
        4) Again, if the total load of a stop node is larger than 10 times the second
           smallest standard vehicle size, the node will be split into sub-nodes with
           loads that equal the third smallest standard vehicle size, et cetera.
        5) Stop nodes with a total load larger than the maximum standard vehicle size will
           always be split into sub-nodes with loads that equal the maximum standard
           vehicle size.

        After splitting the total load of a stop node into equally loaded sub-nodes, the
        remainder is handled as follows:

        1) The remainder will first be split into sub-nodes with loads that equal the size
           of the standard vehicle one level smaller than the one whose size was used for
           the initial split.
        2) Then, the remainder of that split operation will be split into sub-nodes with
           loads that equal the size of the standard vehicle two levels smaller than the
           one whose size was used for the initial split, et cetera.
        3) If there are no smaller vehicle sizes left, the remainder is added to the last
           created sub-node.

        Args:
            stop_nodes (list): The stops object of the input parameters.
            vehicle_sizes (list): A list of the most common vehicle capacities.

        """
        logger.info("Splitting stop nodes into smaller sub-nodes ...")
        node_id = 0  # Initial value for the ID of the stop node that is being evaluated.
        sub_nodes = []  # Empty list to store the sub-nodes data.
        # Retrieve the smallest and largest of the standard vehicle sizes.
        min_size = min(vehicle_sizes)
        max_size = max(vehicle_sizes)
        # Loop over all stop nodes.
        for node in stop_nodes:
            # Retrieve the total load of the stop node.
            load = node["load"]
            # Set initial values that will be updated in the while loop below.
            # This loop will define the load for the sub-nodes during the first split iteration.
            sub_load_defined = False  # Whether the load for the sub-nodes is defined.
            i = 0  # Index of vehicle size that is being considered as load for the sub-nodes.
            # Define the sub-load.
            while not sub_load_defined:
                if load <= min_size:
                    # Create only one sub-node with a load equal to the total load of the
                    # stop node.
                    sub_load = load
                    sub_load_defined = True
                else:
                    # Set the load of the sub-nodes to the vehicle size at index i.
                    # Accept as the defined sub-load when it is >= 1/10th of the total stop load.
                    # If it is the max vehicle size, always accept it as the defined sub-load.
                    sub_load = vehicle_sizes[i]
                    if sub_load >= load / 10 or sub_load == max_size:
                        sub_load_defined = True
                # If the sub-load is not defined yet, move on to the next vehicle size.
                if not sub_load_defined:
                    i += 1
            # Define the part of the stop node load that still has to be split.
            # Initially, this is equal to the total stop load.
            # In the loop below, this value will be updated with each iteration.
            # The loop keeps iterating until there is no load to split anymore.
            load_to_split = load
            while load_to_split > 0:
                # Define how many sub-nodes can be created with a load = the defined sub-load.
                n_sub_nodes = int(load_to_split / sub_load)
                # Create all those sub-nodes, and save them in the list with the sub-node data.
                for j in range(n_sub_nodes):
                    sub_node = {
                        "name": node["name"],
                        "coordinates": node["coordinates"],
                        "load": sub_load,
                        "parent": node_id
                    }
                    sub_nodes.append(sub_node)
                # Update the load_to_split value.
                load_to_split = load_to_split - n_sub_nodes * sub_load
                # If there is a remainder smaller than the minimum standard vehicle size:
                # Add it to the last created sub-node.
                if load_to_split != 0 and load_to_split < min_size:
                    sub_nodes[-1]["load"] += load_to_split
                    if n_sub_nodes - 1 != 0:
                        logger.debug(
                            "Created {} sub node(s) with load {} for stop node #{}".format(
                                n_sub_nodes - 1,
                                sub_load,
                                node_id + 1
                            )
                        )
                    logger.debug(
                        "Created 1 sub node(s) with load {} for stop node #{}".format(
                            sub_load + load_to_split,
                            node_id + 1
                        )
                    )
                    load_to_split = 0
                # Else:
                # Update the sub-load variable to the vehicle size of one level smaller than the
                # vehicle size used to do the previous split.
                else:
                    if n_sub_nodes > 0:
                        logger.debug(
                            "Created {} sub node(s) with load {} for stop node #{}".format(
                                n_sub_nodes,
                                sub_load,
                                node_id + 1
                            )
                        )
                    i -= 1
                    sub_load = vehicle_sizes[i]
            # Update to the next stop node.
            node_id += 1
        logger.info("Successfully split stop nodes into smaller sub-nodes")
        logger.debug(sub_nodes)
        return sub_nodes
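
The splitting rules in `split_stops` are easier to follow with concrete numbers. The sketch below is a condensed, standalone re-implementation that returns only the sub-node loads for a single stop. The function name `split_load` is hypothetical, the vehicle sizes are assumed to be sorted in ascending order, and the name, coordinates and parent fields that the real method stores are left out.

```python
def split_load(load, vehicle_sizes):
    """Return the sub-node loads for one stop (sizes assumed sorted ascending)."""
    min_size, max_size = vehicle_sizes[0], vehicle_sizes[-1]
    # Small stops are not split at all.
    if load <= min_size:
        return [load] if load > 0 else []
    # Pick the smallest size that is at least a tenth of the load, or the largest size.
    i = 0
    while vehicle_sizes[i] < load / 10 and vehicle_sizes[i] != max_size:
        i += 1
    sub_loads = []
    remaining = load
    while remaining > 0:
        size = vehicle_sizes[i]
        count = remaining // size
        sub_loads.extend([size] * count)
        remaining -= count * size
        if 0 < remaining < min_size:
            # A small remainder is added to the last created sub-node.
            sub_loads[-1] += remaining
            remaining = 0
        else:
            # Otherwise continue with the next smaller vehicle size.
            i -= 1
    return sub_loads


sizes = [8, 19, 37, 49, 57]
print(split_load(33, sizes))    # [8, 8, 8, 9]
print(split_load(14, sizes))    # [14]
print(split_load(100, sizes))   # [19, 19, 19, 19, 24]
print(split_load(110, sizes))   # [19, 19, 19, 19, 19, 15]
```

In every case the sub-node loads add up to the original stop load, so no passengers are lost or duplicated by the split.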
    def _format_solution(self):
        """Parses the solution into a clear, human-readable format."""
        logger.info("Formatting the solution ...")

        result = {}  # Empty dictionary to store all results.
        result["routeCount"] = 0  # Initial value for the number of routes in the solution.
        result["routes"] = []  # Empty list object to store the individual routes.
        route_index = 0  # Initial route index.

        # Retrieve the four main components of the route optimizer.
        data = self.data_model
        routing = self.routing_model
        manager = self.routing_index_manager
        solution = self.solution

        # Loop over the vehicles to store the optimal route for each of them.
        for vehicle_id in range(data["vehicle_count"]):
            # Retrieve the index of the node where the route for this vehicle starts.
            index = routing.Start(vehicle_id)

            # Initialize result attributes.
            node_indices = []
            node_names = []
            node_coords = []
            node_parents = []
            loads = []
            costs = []
            route_load = 0  # Initial value for the cumulative load of the route.
            route_cost = 0  # Initial value for the cumulative cost of the route.

            # Loop over all nodes that are visited by this vehicle.
            while not routing.IsEnd(index):
                # Retrieve the index of the current node.
                node_index = manager.IndexToNode(index)

                # Add the node attributes to the corresponding lists.
                node_indices.append(node_index)
                node_names.append(data["node_names"][node_index])
                node_coords.append(data["node_coords"][node_index])
                node_parents.append(data["node_parents"][node_index])

                # Add the load of the node to the total route load.
                route_load += data["node_loads"][node_index]
                loads.append(route_load)

                # Add the travel cost between this node and the next node to the total route cost.
                previous_index = index  # Current node index becomes "previous index".
                index = solution.Value(routing.NextVar(index))  # Next node index becomes "index".
                route_cost += routing.GetArcCostForVehicle(previous_index, index, vehicle_id)
                costs.append(route_cost)

            # Determine if the vehicle is used in the result.
            # If a vehicle is not used, it will not visit any node but the start node.
            # If a vehicle is used, it will visit the start node plus at least one stop.
            used = len(node_indices) > 1
            if not used:
                continue

            # Get the index of the final node, where the vehicle ends its route.
            final_node_index = manager.IndexToNode(index)

            # Add the attributes of the final node to the corresponding lists.
            node_indices.append(final_node_index)
            node_names.append(data["node_names"][final_node_index])
            node_coords.append(data["node_coords"][final_node_index])
            node_parents.append(data["node_parents"][final_node_index])

            # Add the final load of the route to the end of the loads list.
            loads.append(route_load)

            # Subtract the vehicle usage cost from all elements in the cost list.
            # This is done because the vehicle usage cost is not a 'true' travel cost.
            costs = [x - data["vehicle_penalty"] for x in costs]

            # If arbitrary starts is True:
            # Remove the first element from all lists that include the start node.
            # This is done because the dummy start location is not a 'true' node.
            # The costs list holds one entry per traversed arc, so it already lines up
            # with the remaining nodes and is left as is.
            if self.params["arbitraryStarts"]:
                node_indices = node_indices[1:]
                node_names = node_names[1:]
                node_coords = node_coords[1:]
                node_parents = node_parents[1:]
                loads = loads[1:]
            else:
                # Add a cost of zero for the start node of the route,
                # since getting to this node has no cost.
                costs.insert(0, 0)

            # Retrieve a route from OSRM.
            # Unused vehicles were already skipped above, so this vehicle is always used.
            route_geoms = osrm.get_route(node_coords)

            # If stop nodes were split to allow multiple visits:
            # Group them back together.
            if self.params["multipleVisits"]:
                # Get the index of the last element in each group of adjacent duplicates.
                group_indices = _utils._reduce_grouped_duplicates(node_parents)

                # Select only the elements with those indices from each of the node lists.
                # Regarding the indices:
                # The interest is in the parent node indices, not in the sub-node indices.
                node_indices = [node_parents[i] for i in group_indices]
                node_names = [node_names[i] for i in group_indices]
                node_coords = [node_coords[i] for i in group_indices]

                # Also select only the elements with those indices from the loads and cost lists.
                loads = [loads[i] for i in group_indices]
                costs = [costs[i] for i in group_indices]

            # If the vehicle is used:
            # Create a result object for the route containing the important information.
            if used:
                route_data = {
                    "routeIndex": route_index,
                    "routeName": "Route " + str(route_index),
                    "routeObjects": route_geoms,
                    "nodeIndices": node_indices,
                    "nodeNames": node_names,
                    "nodeCoords": node_coords,
                    "cumulativeLoads": loads,
                    "cumulativeCosts": costs
                }

                # Add vehicle attributes only when there are capacity constraints.
                if self.params["capacityConstraints"]:
                    route_data.update({
                        "vehicleIndex": vehicle_id,
                        "vehicleName": data["vehicle_names"][vehicle_id],
                        "vehicleCapacity": data["vehicle_capacities"][vehicle_id]
                    })

                result["routeCount"] += 1
                result["routes"].append(route_data)

                # Update the route index to the next route.
                route_index += 1

        # Return the result dictionary.
        return result

    @staticmethod
    def _get_return_status(value):
        """Maps the return status values of the routing model to readable status messages."""
        status_messages = {
            1: "Solution found",
            2: "Solution search failed: there is no possible solution",
            3: "Solution search failed: time limit reached before finding a solution",
            4: "Solution search failed: model, model parameters, or flags are not valid"
        }
        return status_messages[value]