Skip to content

Commit

Permalink
added busPriority.py
Browse files Browse the repository at this point in the history
  • Loading branch information
aminissn committed Jul 17, 2024
1 parent 798122f commit 1fd6b51
Showing 1 changed file with 368 additions and 0 deletions.
368 changes: 368 additions & 0 deletions tools/tls/busPriority.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,368 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 10 12:10:09 2024
@author: Sasan Amini
This file contains functions to implement bus priority at signalized intersections by modifying an existing tlLogic.
The principle is to detect a bus using a E1detector, transfer from the current phase to a target phase where bus gets green until it corsses the intersection.
There are signin and signout detectors to serve this purpose.
"""
import sumolib
import xml.etree.ElementTree as ET
from sumolib.net import Condition,TLSProgram
from sumolib.sensors.inductive_loop import InductiveLoop as E1Deetcor
#%%
def phaseTranition(init_phase,target_phase):

"""
This function generates the transition from the current phase to an ideal phase required
by the approaching bus to cross the intersection.
There are two cases:
1- the current phase needs to be extended and terminated after bus leaves the intersection
2- current phase must be terminated immediately and an optimal ohase that provides green f
or the incoming bus starts and ends after the bus leaves.
Binary ariables transition determines which case must be used.
Input and output are strings e.g. GGgrrrr or GGyrrrrG
"""

trasition = True


s1 = list(init_phase) # Convert the state string to a list
s2 = list(init_phase)
s3 = list(init_phase)
sr = list(init_phase)

if init_phase == target_phase:
trasition = False

if trasition:
for i in range(len(init_phase)):
if init_phase[i] == 'G':
s1[i]='y'
elif init_phase[i] == 'g':
s1[i]='y'
elif init_phase[i] == 'r':
s1[i]='r'
elif init_phase[i] == 'y':
s1[i] = 'r'
else:
s1[i] = init_phase[i]

for i in range(len(target_phase)):
if target_phase[i] == 'G':
s3[i]='y'
elif target_phase[i] == 'g':
s3[i]='y'
elif target_phase[i] == 'r':
s3[i]='r'
elif target_phase[i] == 'y':
s3[i] = 'r'
else:
s3[i] = target_phase[i]
sr = ''.join(['r']*len(s1))
s2 = target_phase
return (''.join(s1),sr,''.join(s2),''.join(s3),sr)

else:
s1 = init_phase
for i in range(len(init_phase)):
if s1[i] == 'G':
s2[i]='y'
elif s1[i] == 'g':
s2[i]='y'
elif s1[i] == 'r':
s2[i]='r'
elif s1[i] == 'y':
s2[i] = 'r'
else:
s2[i] = s1[i]

sr = ''.join(['r']*len(s1))
return (''.join(s1),''.join(s2),sr,None,None)

#%%

def getSignInEdge(sumo_net, edge_id, signin_distance):

"""
This function finds the edge and the position where E1Deetcor should be placedfor bus signin.
Inputs are: an edgeID that leads to a tls, and a distance from tls stopline that a bus sigin detector should be placed upstream of the corresponding tls e.g. 120m.
Outputs: the edgeId and position on that edge to define E1Detector
*Hint: currently, from the stopline we look upstream and only consider connections with straight direction!
This can cause suboptimal detector defintions!
"""

edge = sumo_net.getEdge(edge_id)
length_sum = edge.getLength()
status = False
visited_edges = set()

while length_sum < signin_distance and not status:
up_node = edge.getFromNode()
if up_node.getType() == 'traffic_light':
status = True
break

down_node = edge.getToNode()
incoming_edges = [e for e in up_node.getIncoming() if e not in visited_edges]

if not incoming_edges:
print(f'The given distance of {signin_distance} is too long for edge: {edge_id}! Try a shorter distance.')
return edge.getID(),5

found_next_edge = False
for e in incoming_edges:
if e.getFromNode() != down_node:
conns = up_node.getConnections()
for conn in conns:
if conn.getFrom() == e and conn.getTo() == edge and conn.getDirection() == "s":
edge = e
found_next_edge = True
break
if found_next_edge:
break

if not found_next_edge:
print(f'The given distance of {signin_distance} is too long for edge: {edge_id}! Try a shorter distance.')
return edge.getID(),5

visited_edges.add(edge)
length_sum += edge.getLength()

if status:
return edge.getID(), 5
else:
return edge.getID(), round(length_sum - signin_distance, 1)
#%%

def addSigninE1(sumo_net,tls_id,signin_edge_id,edge_id,pos):

"""
This function generates the E1Detectors needed for signing at a user-defined distance from tls and the conditions for the tll file.
Inputs are the tls_id, the edge from which the bus enters the intersection, the actual edgeId and position on that edge where a detectors must be placed (outputs of getSignInEdge())
"""

dets = []
condistions = {signin_edge_id:[]}

for lane in sumo_net.getEdge(edge_id).getLanes():
if 'bus' in lane.getPermissions() or 'tram' in lane.getPermissions():
dets.append(E1Detector(id=f'signin_{tls_id}_{signin_edge_id}_{edge_id}_{lane.getIndex()}', lane=lane.getID(), pos=pos, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True"))
condistions[signin_edge_id].append(Condition(id=f'signin_{tls_id}_{edge_id}{pos}', value=f'6 > z:{dets[-1].id}'))

# Here we can define a near stopline detector, in case of queues the bus cannot cross the intersection, can be prioritized again!
## Todo: make sure these conditions do not conflict other conditions at the tls!

# for lane in sumo_net.getEdge(signin_edge_id).getLanes():
# if 'bus' in lane.getPermissions() or 'tram' in lane.getPermissions():
# dets.append(E1Detector(id=f'signin_{tls_id}_{signin_edge_id}_{lane.getIndex()}_near', lane=lane.getID(), pos=-10, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True"))
# condistions[signin_edge_id].append(Condition(id=f'signin_{tls_id}_{edge_id}_near', value=f'2 > z:{dets[-1].id}'))

return dets,condistions

#%%
def addSignoutE1(sumo_net,tls_id):
"""
This function generates the E1Detectors needed for signout from a tls and the conditions for the tll file.
These deetectors are placed on all lanes that allow bus or tram at a fixed distance of 5m downstream of the intersection.
"""
dets = []
condistions = []
seen_lane = []
for item in sumo_net.getTLS(tls_id).getLinks().values():
if item[0][1].getID() not in seen_lane and 'bus' in item[0][1].getPermissions() or 'tram' in item[0][1].getPermissions():
dets.append(E1Detector(id=f'sigout_{tls_id}_{item[0][1].getEdge().getID()}_{item[0][1].getID()}', lane=item[0][1].getID(), pos=5, vTypes="bus tram" , freq=180, file="e1.out.xml", friendlyPos="True"))
condistions.append(Condition(id=f'signout_{tls_id}', value=f'5 > z:{dets[-1].id}'))
seen_lane.append(item[0][1].getID())

return dets,condistions
#%%
def findBestState(strings, indices):
"""
This is an auxiallary function for getEdgeGState()
"""
best_state = None
max_g_count = -1

for s in strings:
if all(len(s) > idx and s[idx] == 'G' for idx in indices):
g_count = s.count('G')
if g_count > max_g_count:
max_g_count = g_count
best_state = s
return best_state

#%%
def getEdgeGState(sumo_net,edge_id,tls_prog_id,from_tls_logic:False):

"""
This function finds the phase that serves bus priority for the corresponding edge at the tls.
There are two options: 1- find the best phase from the exsiting program or 2- create a new phase that turns all lanes on that edge to green using findBestState().
option 2 is very helpful when we have turning lanes with a seperate green phase, and we dont know the coming bus is going to which direction.
"""

tls_id = sumo_net.getEdge(edge_id).getTLS().getID()
connections_dict = sumo_net.getTLS(tls_id).getLinks()
tls_g_index = {edge_id:[]}
for con in connections_dict.values():
if con[0][0].getEdge().getID()==edge_id:
tls_g_index[edge_id].append(con[0][-1])
if not from_tls_logic:
maxTlLinkIndex = max(sumo_net.getTLS(tls_id).getLinks().keys())
phase_state = list((maxTlLinkIndex+1)*'r')
for i in tls_g_index[edge_id]:
phase_state[i] = 'G'
ret = ''.join(phase_state)
else:
stages = []
for phase in sumo_net.getTLS(tls_id).getPrograms()[tls_prog_id].getPhases():
stages.append(phase.state)

ret = findBestState(stages,tls_g_index[edge_id])

return ret
#%%
def addBusPrio(sumo_net,tls_id,tls_prog_id,e1pos,tll_file_path:str,e1det_file_path:str,xml_output=True):

incomingEdges = sumo_net.getTLS(tls_id).getEdges()
signin_conds = {}
signin_e1s = []

signoute1s, signout_conds = addSignoutE1(sumo_net,tls_id)
signout_cond_val = []
for item in signout_conds:
signout_cond_id = item.id
signout_cond_val.append(item.value)
signout_cond_str = (' or '.join(signout_cond_val))


for edge in incomingEdges:
e1edge,pos = getSignInEdge(sumo_net,edge.getID(),e1pos)
signin_e1,signin_cond = addSigninE1(sumo_net,tls_id,edge.getID(),e1edge,pos)
signin_e1s += signin_e1
for key, value in signin_cond.items():
signin_conds[key] = value

if xml_output:
with open(e1det_file_path,'w') as file:
file.write('<additional> \n')
for item in signin_e1s:
file.write(item.toXML())
for item in signoute1s:
file.write(item.toXML())
file.write('</additional>')
else:
out_det = signin_e1s + signoute1s


for pid, tls_prog in sumo_net.getTLS(tls_id).getPrograms().items():
if pid == tls_prog_id:
ptype = tls_prog.getType()
poffset = tls_prog.getOffset()
else:
print('The given tls program ID is not available!')
return None

phases = {key:val for key,val in tls_prog.getPhasesWithIndex().items()}
stages = {key:val for key,val in tls_prog.getStages().items()}


outf = TLSProgram(id=f'{pid}_bus_priority', type=ptype, offset=poffset)
condition_string = "DEFAULT"

for phid, phase in tls_prog.getPhasesWithIndex().items():
if phid in stages.keys():
phases[phid+1].earlyTarget = condition_string

if phid == max(phases.keys()):
outf.addPhase(duration=phase.duration, state=phase.state, minDur = phase.minDur, maxDur = phase.maxDur, next = '0',name = phase.name, earlyTarget = phase.earlyTarget)
else:
outf.addPhase(duration=phase.duration, state=phase.state, minDur = phase.minDur, maxDur = phase.maxDur, next = '',name = phase.name, earlyTarget = phase.earlyTarget)

for stage_idx,stage_i in stages.items():
next = [stage_idx+1]

for edge in incomingEdges:
next.append(outf.numPhases())
n = max(next)
# stage_o = getEdgeGState(sumo_net,edge.getID(),tls_prog_id,True)
# if stage_o is None:
stage_o = getEdgeGState(sumo_net,edge.getID(),tls_prog_id,False)
priority_phases = phaseTranition(stage_i.state,stage_o)
cond_val = []
for item in signin_conds[edge.getID()]:
cond_val.append(item.value)
signin_cond_id = item.id
signin_cond_value = (' or '.join(cond_val))
if priority_phases[4] is None:
outf.addPhase(state = priority_phases[0], duration=10, minDur=5, maxDur=60,next=[n+1],earlyTarget=signin_cond_id)
outf.addPhase(state = priority_phases[1], duration=3, next = [], earlyTarget=signout_cond_id)
outf.addPhase(state = priority_phases[2], duration=1, next = [' '.join([str(stage_idx)]+[str(i) for i in list(stages.keys()) if i != stage_idx])])

else:
outf.addPhase(state = priority_phases[0], duration=3, next = [],earlyTarget=signin_cond_id)
outf.addPhase(state = priority_phases[1], duration=2,next = [])
outf.addPhase(state = priority_phases[2], duration=10, minDur=5, maxDur=60,next=[n+3])
outf.addPhase(state = priority_phases[3], duration=3, next = [], earlyTarget=signout_cond_id)
outf.addPhase(state = priority_phases[4], duration=2, next = [' '.join([str(stage_idx)]+[str(i) for i in list(stages.keys()) if i != stage_idx])])

outf.addCondition(signin_cond_id, signin_cond_value)
outf.getPhaseByIndex(stage_idx).addNext(next[::-1])
outf.addCondition(signout_cond_id, signout_cond_str)



if xml_output:
with open(tll_file_path,'w') as file:
file.write('<additional> \n')
file.write(outf.toXML(tls_id))
file.write('</additional>')
else:
return outf,out_det

#%%

def read_tllogic(file='./tls.tll.xml'):
# Load the XML file
tree = ET.parse(file)
root = tree.getroot()
tlids = []
# Iterate over each tlLogic element
for tl_logic in root.findall('tlLogic'):
idx = 0
pid = tl_logic.get('programID')
ptype = tl_logic.get('type')
tlid = tl_logic.get('id')
tlids.append(tlid)
for phase in tl_logic.findall('phase'):
# Extract phase attributes
idx += 1
duration = phase.get('duration')
state = phase.get('state')
min_dur = phase.get('minDur')
max_dur = phase.get('maxDur')
next_phase = phase.get('next')
early_target = phase.get('earlyTarget')

return tlids
#%%
# How to use the function on certain tls of a given tll file
tls_ids = read_tllogic('./tls_from_pdf_sasan.tll.xml')
net_path = './network.net.xml'
net = sumolib.net.readNet(net_path, withConnections=True,withInternal=True,withLatestPrograms=True)

with open('./bus_priority.tll.xml','w') as file_tll:
file_tll.write('<additional> \n')
with open('./bus_priority.add.xml','w') as file_det:
file_det.write('<additional> \n')
for tls in tls_ids:
tll,det = addBusPrio(net,tls,'1',100,'','',False)
file_tll.write(tll.toXML(tls))
for d in det:
file_det.write(d.toXML())
file_det.write('</additional>')
file_tll.write('</additional>')

0 comments on commit 1fd6b51

Please sign in to comment.