From 1fd6b51c4b761998389677dbe354c13b5f7835bd Mon Sep 17 00:00:00 2001 From: Sasan Date: Wed, 17 Jul 2024 11:51:46 +0200 Subject: [PATCH] added busPriority.py --- tools/tls/busPriority.py | 368 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 368 insertions(+) create mode 100644 tools/tls/busPriority.py diff --git a/tools/tls/busPriority.py b/tools/tls/busPriority.py new file mode 100644 index 000000000000..070f28cb3827 --- /dev/null +++ b/tools/tls/busPriority.py @@ -0,0 +1,368 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Wed Jul 10 12:10:09 2024 + +@author: Sasan Amini + +This file contains functions to implement bus priority at signalized intersections by modifying an existing tlLogic. +The principle is to detect a bus using a E1detector, transfer from the current phase to a target phase where bus gets green until it corsses the intersection. +There are signin and signout detectors to serve this purpose. + +""" +import sumolib +import xml.etree.ElementTree as ET +from sumolib.net import Condition,TLSProgram +from sumolib.sensors.inductive_loop import InductiveLoop as E1Deetcor +#%% +def phaseTranition(init_phase,target_phase): + + """ + This function generates the transition from the current phase to an ideal phase required + by the approaching bus to cross the intersection. + There are two cases: + 1- the current phase needs to be extended and terminated after bus leaves the intersection + 2- current phase must be terminated immediately and an optimal ohase that provides green f + or the incoming bus starts and ends after the bus leaves. + Binary ariables transition determines which case must be used. + Input and output are strings e.g. GGgrrrr or GGyrrrrG + """ + + trasition = True + + + s1 = list(init_phase) # Convert the state string to a list + s2 = list(init_phase) + s3 = list(init_phase) + sr = list(init_phase) + + if init_phase == target_phase: + trasition = False + + if trasition: + for i in range(len(init_phase)): + if init_phase[i] == 'G': + s1[i]='y' + elif init_phase[i] == 'g': + s1[i]='y' + elif init_phase[i] == 'r': + s1[i]='r' + elif init_phase[i] == 'y': + s1[i] = 'r' + else: + s1[i] = init_phase[i] + + for i in range(len(target_phase)): + if target_phase[i] == 'G': + s3[i]='y' + elif target_phase[i] == 'g': + s3[i]='y' + elif target_phase[i] == 'r': + s3[i]='r' + elif target_phase[i] == 'y': + s3[i] = 'r' + else: + s3[i] = target_phase[i] + sr = ''.join(['r']*len(s1)) + s2 = target_phase + return (''.join(s1),sr,''.join(s2),''.join(s3),sr) + + else: + s1 = init_phase + for i in range(len(init_phase)): + if s1[i] == 'G': + s2[i]='y' + elif s1[i] == 'g': + s2[i]='y' + elif s1[i] == 'r': + s2[i]='r' + elif s1[i] == 'y': + s2[i] = 'r' + else: + s2[i] = s1[i] + + sr = ''.join(['r']*len(s1)) + return (''.join(s1),''.join(s2),sr,None,None) + +#%% + +def getSignInEdge(sumo_net, edge_id, signin_distance): + + """ + This function finds the edge and the position where E1Deetcor should be placedfor bus signin. + Inputs are: an edgeID that leads to a tls, and a distance from tls stopline that a bus sigin detector should be placed upstream of the corresponding tls e.g. 120m. + Outputs: the edgeId and position on that edge to define E1Detector + *Hint: currently, from the stopline we look upstream and only consider connections with straight direction! + This can cause suboptimal detector defintions! + """ + + edge = sumo_net.getEdge(edge_id) + length_sum = edge.getLength() + status = False + visited_edges = set() + + while length_sum < signin_distance and not status: + up_node = edge.getFromNode() + if up_node.getType() == 'traffic_light': + status = True + break + + down_node = edge.getToNode() + incoming_edges = [e for e in up_node.getIncoming() if e not in visited_edges] + + if not incoming_edges: + print(f'The given distance of {signin_distance} is too long for edge: {edge_id}! Try a shorter distance.') + return edge.getID(),5 + + found_next_edge = False + for e in incoming_edges: + if e.getFromNode() != down_node: + conns = up_node.getConnections() + for conn in conns: + if conn.getFrom() == e and conn.getTo() == edge and conn.getDirection() == "s": + edge = e + found_next_edge = True + break + if found_next_edge: + break + + if not found_next_edge: + print(f'The given distance of {signin_distance} is too long for edge: {edge_id}! Try a shorter distance.') + return edge.getID(),5 + + visited_edges.add(edge) + length_sum += edge.getLength() + + if status: + return edge.getID(), 5 + else: + return edge.getID(), round(length_sum - signin_distance, 1) + #%% + +def addSigninE1(sumo_net,tls_id,signin_edge_id,edge_id,pos): + + """ + This function generates the E1Detectors needed for signing at a user-defined distance from tls and the conditions for the tll file. + Inputs are the tls_id, the edge from which the bus enters the intersection, the actual edgeId and position on that edge where a detectors must be placed (outputs of getSignInEdge()) + """ + + dets = [] + condistions = {signin_edge_id:[]} + + for lane in sumo_net.getEdge(edge_id).getLanes(): + if 'bus' in lane.getPermissions() or 'tram' in lane.getPermissions(): + dets.append(E1Detector(id=f'signin_{tls_id}_{signin_edge_id}_{edge_id}_{lane.getIndex()}', lane=lane.getID(), pos=pos, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True")) + condistions[signin_edge_id].append(Condition(id=f'signin_{tls_id}_{edge_id}{pos}', value=f'6 > z:{dets[-1].id}')) + + # Here we can define a near stopline detector, in case of queues the bus cannot cross the intersection, can be prioritized again! + ## Todo: make sure these conditions do not conflict other conditions at the tls! + + # for lane in sumo_net.getEdge(signin_edge_id).getLanes(): + # if 'bus' in lane.getPermissions() or 'tram' in lane.getPermissions(): + # dets.append(E1Detector(id=f'signin_{tls_id}_{signin_edge_id}_{lane.getIndex()}_near', lane=lane.getID(), pos=-10, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True")) + # condistions[signin_edge_id].append(Condition(id=f'signin_{tls_id}_{edge_id}_near', value=f'2 > z:{dets[-1].id}')) + + return dets,condistions + +#%% +def addSignoutE1(sumo_net,tls_id): + """ + This function generates the E1Detectors needed for signout from a tls and the conditions for the tll file. + These deetectors are placed on all lanes that allow bus or tram at a fixed distance of 5m downstream of the intersection. + """ + dets = [] + condistions = [] + seen_lane = [] + for item in sumo_net.getTLS(tls_id).getLinks().values(): + if item[0][1].getID() not in seen_lane and 'bus' in item[0][1].getPermissions() or 'tram' in item[0][1].getPermissions(): + dets.append(E1Detector(id=f'sigout_{tls_id}_{item[0][1].getEdge().getID()}_{item[0][1].getID()}', lane=item[0][1].getID(), pos=5, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True")) + condistions.append(Condition(id=f'signout_{tls_id}', value=f'5 > z:{dets[-1].id}')) + seen_lane.append(item[0][1].getID()) + + return dets,condistions +#%% +def findBestState(strings, indices): + """ + This is an auxiallary function for getEdgeGState() + """ + best_state = None + max_g_count = -1 + + for s in strings: + if all(len(s) > idx and s[idx] == 'G' for idx in indices): + g_count = s.count('G') + if g_count > max_g_count: + max_g_count = g_count + best_state = s + return best_state + +#%% +def getEdgeGState(sumo_net,edge_id,tls_prog_id,from_tls_logic:False): + + """ + This function finds the phase that serves bus priority for the corresponding edge at the tls. + There are two options: 1- find the best phase from the exsiting program or 2- create a new phase that turns all lanes on that edge to green using findBestState(). + option 2 is very helpful when we have turning lanes with a seperate green phase, and we dont know the coming bus is going to which direction. + """ + + tls_id = sumo_net.getEdge(edge_id).getTLS().getID() + connections_dict = sumo_net.getTLS(tls_id).getLinks() + tls_g_index = {edge_id:[]} + for con in connections_dict.values(): + if con[0][0].getEdge().getID()==edge_id: + tls_g_index[edge_id].append(con[0][-1]) + if not from_tls_logic: + maxTlLinkIndex = max(sumo_net.getTLS(tls_id).getLinks().keys()) + phase_state = list((maxTlLinkIndex+1)*'r') + for i in tls_g_index[edge_id]: + phase_state[i] = 'G' + ret = ''.join(phase_state) + else: + stages = [] + for phase in sumo_net.getTLS(tls_id).getPrograms()[tls_prog_id].getPhases(): + stages.append(phase.state) + + ret = findBestState(stages,tls_g_index[edge_id]) + + return ret +#%% +def addBusPrio(sumo_net,tls_id,tls_prog_id,e1pos,tll_file_path:str,e1det_file_path:str,xml_output=True): + + incomingEdges = sumo_net.getTLS(tls_id).getEdges() + signin_conds = {} + signin_e1s = [] + + signoute1s, signout_conds = addSignoutE1(sumo_net,tls_id) + signout_cond_val = [] + for item in signout_conds: + signout_cond_id = item.id + signout_cond_val.append(item.value) + signout_cond_str = (' or '.join(signout_cond_val)) + + + for edge in incomingEdges: + e1edge,pos = getSignInEdge(sumo_net,edge.getID(),e1pos) + signin_e1,signin_cond = addSigninE1(sumo_net,tls_id,edge.getID(),e1edge,pos) + signin_e1s += signin_e1 + for key, value in signin_cond.items(): + signin_conds[key] = value + + if xml_output: + with open(e1det_file_path,'w') as file: + file.write(' \n') + for item in signin_e1s: + file.write(item.toXML()) + for item in signoute1s: + file.write(item.toXML()) + file.write('') + else: + out_det = signin_e1s + signoute1s + + + for pid, tls_prog in sumo_net.getTLS(tls_id).getPrograms().items(): + if pid == tls_prog_id: + ptype = tls_prog.getType() + poffset = tls_prog.getOffset() + else: + print('The given tls program ID is not available!') + return None + + phases = {key:val for key,val in tls_prog.getPhasesWithIndex().items()} + stages = {key:val for key,val in tls_prog.getStages().items()} + + + outf = TLSProgram(id=f'{pid}_bus_priority', type=ptype, offset=poffset) + condition_string = "DEFAULT" + + for phid, phase in tls_prog.getPhasesWithIndex().items(): + if phid in stages.keys(): + phases[phid+1].earlyTarget = condition_string + + if phid == max(phases.keys()): + outf.addPhase(duration=phase.duration, state=phase.state, minDur = phase.minDur, maxDur = phase.maxDur, next = '0',name = phase.name, earlyTarget = phase.earlyTarget) + else: + outf.addPhase(duration=phase.duration, state=phase.state, minDur = phase.minDur, maxDur = phase.maxDur, next = '',name = phase.name, earlyTarget = phase.earlyTarget) + + for stage_idx,stage_i in stages.items(): + next = [stage_idx+1] + + for edge in incomingEdges: + next.append(outf.numPhases()) + n = max(next) + # stage_o = getEdgeGState(sumo_net,edge.getID(),tls_prog_id,True) + # if stage_o is None: + stage_o = getEdgeGState(sumo_net,edge.getID(),tls_prog_id,False) + priority_phases = phaseTranition(stage_i.state,stage_o) + cond_val = [] + for item in signin_conds[edge.getID()]: + cond_val.append(item.value) + signin_cond_id = item.id + signin_cond_value = (' or '.join(cond_val)) + if priority_phases[4] is None: + outf.addPhase(state = priority_phases[0], duration=10, minDur=5, maxDur=60,next=[n+1],earlyTarget=signin_cond_id) + outf.addPhase(state = priority_phases[1], duration=3, next = [], earlyTarget=signout_cond_id) + outf.addPhase(state = priority_phases[2], duration=1, next = [' '.join([str(stage_idx)]+[str(i) for i in list(stages.keys()) if i != stage_idx])]) + + else: + outf.addPhase(state = priority_phases[0], duration=3, next = [],earlyTarget=signin_cond_id) + outf.addPhase(state = priority_phases[1], duration=2,next = []) + outf.addPhase(state = priority_phases[2], duration=10, minDur=5, maxDur=60,next=[n+3]) + outf.addPhase(state = priority_phases[3], duration=3, next = [], earlyTarget=signout_cond_id) + outf.addPhase(state = priority_phases[4], duration=2, next = [' '.join([str(stage_idx)]+[str(i) for i in list(stages.keys()) if i != stage_idx])]) + + outf.addCondition(signin_cond_id, signin_cond_value) + outf.getPhaseByIndex(stage_idx).addNext(next[::-1]) + outf.addCondition(signout_cond_id, signout_cond_str) + + + + if xml_output: + with open(tll_file_path,'w') as file: + file.write(' \n') + file.write(outf.toXML(tls_id)) + file.write('') + else: + return outf,out_det + +#%% + +def read_tllogic(file='./tls.tll.xml'): +# Load the XML file + tree = ET.parse(file) + root = tree.getroot() + tlids = [] + # Iterate over each tlLogic element + for tl_logic in root.findall('tlLogic'): + idx = 0 + pid = tl_logic.get('programID') + ptype = tl_logic.get('type') + tlid = tl_logic.get('id') + tlids.append(tlid) + for phase in tl_logic.findall('phase'): + # Extract phase attributes + idx += 1 + duration = phase.get('duration') + state = phase.get('state') + min_dur = phase.get('minDur') + max_dur = phase.get('maxDur') + next_phase = phase.get('next') + early_target = phase.get('earlyTarget') + + return tlids +#%% +# How to use the function on certain tls of a given tll file +tls_ids = read_tllogic('./tls_from_pdf_sasan.tll.xml') +net_path = './network.net.xml' +net = sumolib.net.readNet(net_path, withConnections=True,withInternal=True,withLatestPrograms=True) + +with open('./bus_priority.tll.xml','w') as file_tll: + file_tll.write(' \n') + with open('./bus_priority.add.xml','w') as file_det: + file_det.write(' \n') + for tls in tls_ids: + tll,det = addBusPrio(net,tls,'1',100,'','',False) + file_tll.write(tll.toXML(tls)) + for d in det: + file_det.write(d.toXML()) + file_det.write('') + file_tll.write('') \ No newline at end of file