Handle S3 connection failure #555

Open
wants to merge 2 commits into
base: master
26 changes: 22 additions & 4 deletions tensorboardX/event_file_writer.py
@@ -22,6 +22,7 @@
 import socket
 import threading
 import time
+from botocore.exceptions import EndpointConnectionError

 import six

@@ -191,6 +192,7 @@ def run(self):
 # time to flush the writer, whichever is earlier. If we have an
 # data, write it. If not, an empty queue exception will be raised
 # and we can proceed to flush the writer.
+connection = True
 while True:
 now = time.time()
 queue_wait_duration = self._next_flush_time - now
@@ -213,11 +215,27 @@ def run(self):

 now = time.time()
 if now > self._next_flush_time:
-# Small optimization - if there are no pending data,
-# there's no need to flush, since each flush can be
-# expensive (e.g. uploading a new file to a server).
 if self._has_pending_data:
+# Small optimization - if there are no pending data,
+# there's no need to flush, since each flush can be
+# expensive (e.g. uploading a new file to a server).
-self._record_writer.flush()
+# flush() creates a new connection each time. If the connection
+# fluctuates, boto3.client('s3', endpoint_url) raises an error.
+# Left unhandled, that error hangs this thread, and once the
+# queue fills up training hangs as well. This try block avoids
+# getting stuck: it waits for the connection to be established
+# again. The `connection` flag makes sure each message is
+# printed only once.
+try:
+self._record_writer.flush()
+if not connection:
+print("Connection re-established")
+connection = True
+except EndpointConnectionError as e:
+if connection:
+print("Connection lost, waiting for connection")
+connection = False
+continue
 self._has_pending_data = False
 # Do it again in flush_secs.
 self._next_flush_time = now + self._flush_secs
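
For reference, the flush-and-retry logic in this hunk can be sketched in isolation as below. This is a minimal sketch, not the PR's code: `FlushGuard`, `try_flush`, and the `retryable` parameter are hypothetical names introduced for illustration. It assumes that every failed flush should be retried on the next loop iteration. The guarded import is also an assumption, there to keep the sketch runnable when botocore is not installed, and `logging` stands in for the `print` calls.

```python
import logging

logger = logging.getLogger(__name__)

try:
    # Assumption: botocore is optional, so guard the import.
    from botocore.exceptions import EndpointConnectionError
except ImportError:
    class EndpointConnectionError(Exception):
        """Stand-in used only when botocore is not installed."""


class FlushGuard:
    """Hypothetical helper: wraps flush() and tracks connection state."""

    def __init__(self, record_writer, retryable=(EndpointConnectionError,)):
        self._record_writer = record_writer
        self._retryable = retryable
        self.connected = True  # plays the role of the `connection` flag

    def try_flush(self):
        """Return True if flush() succeeded, False if it should be retried."""
        try:
            self._record_writer.flush()
        except self._retryable:
            # Report the outage once, not on every failed attempt.
            if self.connected:
                logger.warning("Connection lost, waiting for connection")
            self.connected = False
            return False
        # Report recovery once, on the first successful flush after an outage.
        if not self.connected:
            logger.warning("Connection re-established")
        self.connected = True
        return True
```

In the writer loop, the caller would clear `_has_pending_data` and advance `_next_flush_time` only when `try_flush()` returns `True`, which matches the effect of `continue` in the `except` branch.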