
S3TransferManager results in error when running in AWS but works remotely #644

Closed
rickyeng127 opened this issue Jul 5, 2023 · 4 comments
Labels
help-wanted We are asking the community to submit a PR to resolve this issue. p3 This is a minor priority issue response-requested Waiting on additional info and feedback. Will move to 'closing-soon' in 7 days.

Comments

rickyeng127 commented Jul 5, 2023

Describe the bug

We are attempting to use the S3TransferManager to stream data to an S3 bucket. The code works when run locally, connecting to AWS remotely; however, it fails when running from within AWS.

Expected Behavior

We expect to see the file created in the S3 bucket.

Current Behavior

The code generates this trace and error:

[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: detected more scheduled tasks with the next occurring at 0, using timeout of 0.
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: waiting for a maximum of 0 ms
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: wake up with 0 events to process.
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: running scheduled tasks.
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [task-scheduler] - id=0x7fdc0419e908: Running aws_exponential_backoff_retry_task task with <Running> status
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [exp-backoff-strategy] - id=0x7fdbe4509160: Vending retry_token 0x7fdc0419e8a0
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - id=0x7fdbe4508fd0: token acquired callback invoked with error Success. with token 0x7fdbfc00c8f0 and nested token 0x7fdc0419e8a0
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - id=0x7fdbe4508fd0: invoking on_retry_token_acquired callback
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Acquire connection
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: snapshot - state=1, idle_connection_count=0, pending_acquire_count=1, pending_settings_count=0, pending_connect_count=1, vended_connection_count=0, open_connection_count=0, ref_count=1
[INFO] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Requesting 1 new connections from http
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [http-connection] - https_proxy environment found
[ERROR] [2023-07-05T18:11:01Z] [00007fdbc6242700] [http-connection] - Could not parse found proxy URI.
[ERROR] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: http connection creation failed with error code 36(An input string was passed to a parser and the string was incorrectly formatted.)
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Failing excess connection acquisition with error code 36
[WARN] [2023-07-05T18:11:01Z] [00007fdbc6242700] [connection-manager] - id=0x7fdbe45a2950: Failed to complete connection acquisition with error_code 36(An input string was passed to a parser and the string was incorrectly formatted.)
[ERROR] [2023-07-05T18:11:01Z] [00007fdbc6242700] [S3Endpoint] - id=0x7fdbe4592e80: Could not acquire connection due to error code 36 (An input string was passed to a parser and the string was incorrectly formatted.)
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [S3Client] - id=0x7fdbe4401fa0 Client scheduling retry of request 0x7fdc0419e130 for meta request 0x7fdbe45779f0 with token 0x7fdbfc00c8f0 with error code 36 (An input string was passed to a parser and the string was incorrectly formatted.).
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - token_id=0x7fdbfc00c8f0: reducing retry capacity by 10 from 500 and scheduling retry.
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [exp-backoff-strategy] - id=0x7fdbe4509160: Attempting retry on token 0x7fdc0419e8a0 with error type 0
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [exp-backoff-strategy] - id=0x7fdbe4509160: Computed backoff value of 8310907ns on token 0x7fdc0419e8a0
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [event-loop] - id=0x7fdbe42c5a80: scheduling task 0x7fdc0419e908 in-thread for timestamp 977992596685
[DEBUG] [2023-07-05T18:11:01Z] [00007fdbc6242700] [task-scheduler] - id=0x7fdc0419e908: Scheduling aws_exponential_backoff_retry_task task for future execution at time 977992596685
[TRACE] [2023-07-05T18:11:01Z] [00007fdbc6242700] [standard-retry-strategy] - id=0x7fdbe4508fd0: on_retry_token_acquired callback completed

Reproduction Steps

public class AWSTransferManager {
  private static final Logger log = LogManager.getLogger(AWSTransferManager.class.getName());

  static final String BUCKET="MYBUCKET";
  static final String FILE ="flux.bin";
  static final String testString = "This is some test data that will get sent to S3 over and over and " +
    "over again and is about 128 chars long, no seriously... really";
  static final int numLinesToSend=1_000;
  static final byte[] testBytes = testString.getBytes(StandardCharsets.UTF_8);

  public static void main (String[] args) throws IOException, InterruptedException {
    doIt();
  }

  public static void doIt() throws InterruptedException {

    log.info("http_proxy: " + System.getProperty("http_proxy")); // returns null
    log.info("https_proxy: " + System.getProperty("https_proxy")); // returns null
    log.info("http.proxyHost: " + System.getProperty("http.proxyHost")); // returns null
    log.info("https.proxyHost: " + System.getProperty("https.proxyHost")); // returns null

    String logFile = System.getProperty("user.dir")+"/s3.log";
    Utils.initCrtLogging(logFile);
    //Use default Creds:
    AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
    //or Select an Env Profile
    //AwsCredentialsProvider credentialsProvider = ProfileCredentialsProvider.create(PROFILE);
    //Or use SSO
    //AwsCredentialsProvider credentialsProvider = Utils.getSsoCredentials(PROFILE);

    S3AsyncClient s3AsyncClient = Utils.getS3AsyncClient(credentialsProvider);
    S3TransferManager transferManager = Utils.getTransferManager(s3AsyncClient);

    //Create a feed we can 'stream' messages to:
    ConsumerFeed<byte[]> consumerFeed = new ConsumerFeed<>();
    Flux<ByteBuffer> flux = consumerFeed.flux
      .map(ByteBuffer::wrap)
      .doOnComplete(() -> log.info("Done"));

    UploadRequest.Builder req = UploadRequest.builder()
      .requestBody(AsyncRequestBody.fromPublisher(flux))
      .addTransferListener(new S3TransferListener(FILE))
      //Fix for Error we get without contentLength:
      //https://github.com/awslabs/aws-c-s3/pull/285/files#diff-5ee13dd0ee005b828bde6d60ff89582070983e3cb7e5d92389b629a9850c27f8
      .putObjectRequest(b -> b.bucket(BUCKET).key(FILE)); //.contentLength(maxSize));

    Upload upload = transferManager.upload(req.build());
    //Wait for the async subscription to be established before sending data.
    consumerFeed.subscription.await();

    //Send some stuff:
    Consumer<byte[]> dataFeed = consumerFeed.getDataFeed();
    for (int i=0; i < numLinesToSend; i++) {
      dataFeed.accept(testBytes);
    }
    long completionSentTime= System.currentTimeMillis();
    consumerFeed.complete();

    CompletedUpload completedUpload = upload.completionFuture().join();
    double responseTime = (System.currentTimeMillis() - completionSentTime)/1000D;
    log.info("CompleteUploadResponse: {}\n\tCloseDuration: {}",
      completedUpload.response(), String.format("%.3f seconds", responseTime));

    // Validation:
    Thread.sleep(2000L);
    log.info("Getting File");
    Path filePath = Paths.get(System.getProperty("user.dir")+"/"+FILE);

    try {
      if (Files.exists(filePath)) Files.delete(filePath);
      GetObjectResponse response = s3AsyncClient.getObject(o -> o.bucket(BUCKET).key(FILE), AsyncResponseTransformer.toFile(filePath)).join();
      log.info("Resulting File Size: {}", Files.size(filePath));
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}

public class ConsumerFeed<T> {
  Consumer<T> dataFeed;
  FluxSink<T> sink;
  final CountDownLatch subscription = new CountDownLatch(1);
  Flux<T> flux = Flux.create(sink -> {
    System.out.println("Create");
    this.setSink(sink);
    setDataFeed(sink::next);
    subscription.countDown();
  });
  void complete() {
    getSink().complete();
  }

  public Consumer<T> getDataFeed() {
    return dataFeed;
  }

  public void setDataFeed(Consumer<T> dataFeed) {
    this.dataFeed = dataFeed;
  }

  public FluxSink<T> getSink() {
    return sink;
  }

  public void setSink(FluxSink<T> sink) {
    this.sink = sink;
  }
}
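ConsumerFeed above bridges an imperative Consumer into a Reactor Flux. For illustration, the same push-style bridge can be sketched with only the JDK's java.util.concurrent.SubmissionPublisher (Java 9+); class and method names here are illustrative, and wiring it to AsyncRequestBody.fromPublisher would additionally require a Flow-to-Reactive-Streams adapter:

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class JdkConsumerFeed {
    // JDK-only analog of ConsumerFeed: callers push items imperatively
    // via send(), subscribers receive them as a Flow.Publisher stream.
    private final SubmissionPublisher<byte[]> publisher = new SubmissionPublisher<>();

    // Imperative entry point, like ConsumerFeed.getDataFeed().accept(...)
    void send(byte[] data) { publisher.submit(data); }

    // Signals end-of-stream, like ConsumerFeed.complete()
    void complete() { publisher.close(); }

    Flow.Publisher<byte[]> asPublisher() { return publisher; }

    public static void main(String[] args) throws InterruptedException {
        JdkConsumerFeed feed = new JdkConsumerFeed();
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        feed.asPublisher().subscribe(new Flow.Subscriber<byte[]>() {
            public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            public void onNext(byte[] item) { received.incrementAndGet(); }
            public void onError(Throwable t) { done.countDown(); }
            public void onComplete() { done.countDown(); }
        });

        byte[] line = "test data".getBytes(StandardCharsets.UTF_8);
        for (int i = 0; i < 10; i++) feed.send(line);
        feed.complete();
        done.await();
        System.out.println("received " + received.get() + " chunks");
        // prints: received 10 chunks
    }
}
```

Because submit() blocks on backpressure and close() completes a subscriber only after delivering all submitted items, the final count is deterministic, which removes the need for the CountDownLatch subscription handshake in ConsumerFeed.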

public class S3TransferListener implements TransferListener {

  private static final Logger log = LogManager.getLogger(S3TransferListener.class.getName());

  public S3TransferListener(String resource) {
    this.resource = resource;
  }

  final String resource;
  long startTime;
  private int step=0;
  @Override
  public void transferInitiated (Context.TransferInitiated context) {
    log.info("Transfer initiated: {}, {}", resource, context.progressSnapshot().ratioTransferred());
    startTime = System.currentTimeMillis();
    status(context.progressSnapshot().transferredBytes());
  }

  private void status(long l) {
    if (l > step * 1_000_000) {
      log.info("Bytes transferred {}", l);
      step++;
    }
  }

  @Override
  public void bytesTransferred (Context.BytesTransferred context) {
    status(context.progressSnapshot().transferredBytes());
  }

  @Override
  public void transferComplete (Context.TransferComplete context) {
    long seconds = (System.currentTimeMillis() - startTime) / 1000;
    double bytes = (double)context.progressSnapshot().transferredBytes();
    double megabytes = bytes / 1_048_576;
    double throughput = megabytes / seconds;
    log.info("Transfer complete for resource: {}\n\t Bytes: {}\n\t MBs: {}\n\tThroughput: {} MB/s",
      resource, String.format("%10f", bytes), String.format("%.3f", megabytes),
      String.format("%.2f", throughput));
  }

  @Override
  public void transferFailed (Context.TransferFailed context) {
    log.error("Transfer failed on resource "+resource, context.exception());
  }
}


public class Utils {

  private static final Logger log = LogManager.getLogger(Utils.class.getName());


  static Path getPath (String resource) {
    Path path = Paths.get(resource);
    if (!path.getParent().toFile().exists() && !path.getParent().toFile().mkdirs()) {
      throw new RuntimeException("Failed to create path location: " + path);
    }
    return path;
  }

  static void initCrtLogging(String path) {
    //The transfer manager uses native AWS libs (https://github.com/awslabs/aws-c-s3),
    // so enabling CRT logging is the only way to get useful diagnostics.
    /*Path logFilePath = getPath(path);
    if (Files.exists(logFilePath)) {
      try {
        Files.delete(logFilePath);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }*/
    Log.initLoggingToStdout(Log.LogLevel.Trace);
    log.info("CRT Logging initialized: "+path);
  }

  static S3TransferManager getTransferManager(S3AsyncClient s3AsyncClient) {
    return S3TransferManager.builder()
      .s3Client(s3AsyncClient)
      .build();
  }

  public static S3AsyncClient getS3AsyncClient(AwsCredentialsProvider credentialsProvider) {
    S3AsyncClient s3AsyncClient =
      S3AsyncClient.crtBuilder()
        //.credentialsProvider(credentialsProvider)
        .region(Region.US_EAST_1)
        .targetThroughputInGbps(20.0)
        .minimumPartSizeInBytes(1000000L)
        .build();
    return s3AsyncClient;
  }
}

Possible Solution

No response

Additional Information/Context

Using AWS SDK version 2.20.94

aws-crt-java version used

0.22.2

Java version used

1.8

Operating System and version

Windows 10 Enterprise

@rickyeng127 rickyeng127 added bug This issue is a bug. needs-triage This issue or PR still needs to be triaged. labels Jul 5, 2023
TingDaoK commented Jul 6, 2023

awslabs/aws-c-s3#317 (comment)

We check the environment variables, not Java system properties. Check System.getenv("https_proxy") instead of System.getProperty.
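The distinction matters because the reproduction code above logs System.getProperty(...), which is always null here, while the CRT reads the process environment. A minimal diagnostic sketch (the classify helper below is illustrative, not an SDK API):

```java
public class ProxyDiagnostics {
    // Classify a proxy value the way the CRT would encounter it:
    // null means the variable is unset; an empty string is still "found"
    // and will be handed to the URI parser (triggering error code 36).
    static String classify(String value) {
        if (value == null) return "unset";
        if (value.isEmpty()) return "set but empty";
        return "set: " + value;
    }

    public static void main(String[] args) {
        for (String name : new String[] {"http_proxy", "https_proxy"}) {
            // The CRT reads the environment variable...
            System.out.println(name + " (env): " + classify(System.getenv(name)));
            // ...while System.getProperty looks at JVM system properties,
            // which are a separate namespace entirely.
            System.out.println(name + " (property): " + classify(System.getProperty(name)));
        }
    }
}
```

Running this inside the failing environment should show the env-var column as "set but empty" while the property column stays "unset".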

@TingDaoK TingDaoK added help-wanted We are asking the community to submit a PR to resolve this issue. and removed bug This issue is a bug. labels Jul 6, 2023
@yasminetalby yasminetalby added response-requested Waiting on additional info and feedback. Will move to 'closing-soon' in 7 days. p3 This is a minor priority issue labels Jul 6, 2023
@rickyeng127 (Author)
Will the code attempt to parse the System.getenv("https_proxy") value if it is set to an empty string vs. not being set (null)? And if so, will this result in the error seen above?

TingDaoK commented Jul 6, 2023

We will still try to parse the value if it's empty; you need to unset the variable, not just clear it.
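A running JVM cannot unset its own environment variables from Java, so the fix is to unset https_proxy in the shell or service definition before the process starts. When launching a child JVM from Java, however, the variable can be stripped from the child's environment; a standard-library sketch (the command line shown is illustrative):

```java
import java.util.Map;

public class UnsetProxyLauncher {
    // Build a ProcessBuilder whose child environment has the proxy
    // variables removed, so the CRT in the child process never sees
    // the empty-string value that trips its URI parser.
    static ProcessBuilder withoutProxyVars(String... command) {
        ProcessBuilder pb = new ProcessBuilder(command);
        Map<String, String> env = pb.environment();
        env.remove("http_proxy");
        env.remove("https_proxy");
        env.remove("HTTP_PROXY");
        env.remove("HTTPS_PROXY");
        return pb;
    }

    public static void main(String[] args) {
        ProcessBuilder pb = withoutProxyVars("java", "-jar", "app.jar");
        System.out.println("https_proxy present in child env: "
            + pb.environment().containsKey("https_proxy"));
        // prints: https_proxy present in child env: false
    }
}
```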

@yasminetalby yasminetalby removed the needs-triage This issue or PR still needs to be triaged. label Jul 7, 2023
@rickyeng127 (Author)
Unsetting the proxy variable (it had been set to an empty string) resolved this issue. Thank you for your help.
