Skip to content

Commit

Permalink
Merge pull request #46 from Seagate/streaming-changes-blobfuse2
Browse files Browse the repository at this point in the history
Update to blobfuse2.1.2
  • Loading branch information
jfantinhardesty authored Nov 17, 2023
2 parents eae4418 + 19d2ae5 commit c5f20f1
Show file tree
Hide file tree
Showing 12 changed files with 530 additions and 161 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

## 0.2.1 (WIP) ##

This version is based on [blobfuse2 commit 9c10ae08 dated 2023-11-7](https://github.com/Azure/azure-storage-fuse/commit/9c10ae0880e4b042d89ae4ea571aaf8f49b3d519) (upstream).
This version is based on [blobfuse2 2.1.2](https://github.com/Azure/azure-storage-fuse/releases/tag/blobfuse2-2.1.2) (upstream).
**Changes**
Changed sync-to-flush to true by default.

Expand Down
5 changes: 5 additions & 0 deletions common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ const (
BlockFlagUnknown uint16 = iota
DirtyBlock
TruncatedBlock
RemovedBlocks
)

type Block struct {
Expand Down Expand Up @@ -176,6 +177,10 @@ func (block *Block) Truncated() bool {
return block.Flags.IsSet(TruncatedBlock)
}

func (block *Block) Removed() bool {
return block.Flags.IsSet(RemovedBlocks)
}

// Flags for block offset list
const (
BolFlagUnknown uint16 = iota
Expand Down
123 changes: 80 additions & 43 deletions component/azstorage/azauthmsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"os"
"os/exec"
"strings"
Expand All @@ -52,6 +53,11 @@ type azAuthMSI struct {
azAuthBase
}

func getNextExpiryTimer(token *adal.Token) time.Duration {
delay := time.Duration(5+rand.Intn(120)) * time.Second
return time.Until(token.Expires()) - delay
}

// fetchToken : Generates a token based on the config
func (azmsi *azAuthMSI) fetchToken(endpoint string) (*common.OAuthTokenInfo, error) {
// Resource string is fixed and has no relation with any of the user inputs
Expand Down Expand Up @@ -190,35 +196,51 @@ func (azmsi *azAuthBlobMSI) getCredential() interface{} {
log.Info("azAuthBlobMSI::getCredential : MSI Token over CLI retrieved %s (%d)", token.AccessToken, token.Expires())
// We are running in cli mode so token can not be refreshed, on expiry just get the new token
tc = azblob.NewTokenCredential(token.AccessToken, func(tc azblob.TokenCredential) time.Duration {
newToken, err := azmsi.fetchTokenFromCLI()
if err != nil {
log.Err("azAuthBlobMSI::getCredential : Failed to refresh token [%s]", err.Error())
return 0
}
for failCount := 0; failCount < 5; failCount++ {
newToken, err := azmsi.fetchTokenFromCLI()
if err != nil {
log.Err("azAuthBlobMSI::getCredential : Failed to refresh token attempt %d [%s]", failCount, err.Error())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBlobMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())
// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBlobMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return time.Until(newToken.Expires()) - 5*time.Minute
// Get the next token slightly before the current one expires
return getNextExpiryTimer(&newToken.Token)

}
log.Err("azAuthBlobMSI::getCredential : Failed to refresh token bailing out.")
return 0
})
} else {
log.Info("azAuthBlobMSI::getCredential : MSI Token retrieved %s (%d)", token.AccessToken, token.Expires())
// Using token create the credential object, here also register a call back which refreshes the token
tc = azblob.NewTokenCredential(token.AccessToken, func(tc azblob.TokenCredential) time.Duration {
newToken, err := token.Refresh(context.Background())
if err != nil {
log.Err("azAuthBlobMSI::getCredential : Failed to refresh token [%s]", err.Error())
return 0
// token, err := azmsi.fetchToken(msi_endpoint)
// if err != nil {
// log.Err("azAuthBlobMSI::getCredential : Failed to fetch token [%s]", err.Error())
// return 0
// }
for failCount := 0; failCount < 5; failCount++ {
newToken, err := token.Refresh(context.Background())
if err != nil {
log.Err("azAuthBlobMSI::getCredential : Failed to refresh token attempt %d [%s]", failCount, err.Error())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBlobMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return getNextExpiryTimer(newToken)
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBlobMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return time.Until(newToken.Expires()) - 5*time.Minute
log.Err("azAuthBlobMSI::getCredential : Failed to refresh token bailing out.")
return 0
})
}

Expand Down Expand Up @@ -269,35 +291,50 @@ func (azmsi *azAuthBfsMSI) getCredential() interface{} {
log.Info("azAuthBfsMSI::getCredential : MSI Token over CLI retrieved %s (%d)", token.AccessToken, token.Expires())
// We are running in cli mode so token can not be refreshed, on expiry just get the new token
tc = azbfs.NewTokenCredential(token.AccessToken, func(tc azbfs.TokenCredential) time.Duration {
newToken, err := azmsi.fetchTokenFromCLI()
if err != nil {
log.Err("azAuthBfsMSI::getCredential : Failed to refresh token [%s]", err.Error())
return 0
for failCount := 0; failCount < 5; failCount++ {
newToken, err := azmsi.fetchTokenFromCLI()
if err != nil {
log.Err("azAuthBfsMSI::getCredential : Failed to refresh token attempt %d [%s]", failCount, err.Error())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBfsMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return getNextExpiryTimer(&newToken.Token)
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBfsMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return time.Until(newToken.Expires()) - 5*time.Minute
log.Err("azAuthBfsMSI::getCredential : Failed to refresh token bailing out.")
return 0
})
} else {
log.Info("azAuthBfsMSI::getCredential : MSI Token retrieved %s (%d)", token.AccessToken, token.Expires())
// Using token create the credential object, here also register a call back which refreshes the token
tc = azbfs.NewTokenCredential(token.AccessToken, func(tc azbfs.TokenCredential) time.Duration {
newToken, err := token.Refresh(context.Background())
if err != nil {
log.Err("azAuthBfsMSI::getCredential : Failed to refresh token [%s]", err.Error())
return 0
// token, err := azmsi.fetchToken(msi_endpoint)
// if err != nil {
// log.Err("azAuthBfsMSI::getCredential : Failed to fetch token [%s]", err.Error())
// return 0
// }
for failCount := 0; failCount < 5; failCount++ {
newToken, err := token.Refresh(context.Background())
if err != nil {
log.Err("azAuthBfsMSI::getCredential : Failed to refresh token attempt %d [%s]", failCount, err.Error())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBfsMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return getNextExpiryTimer(newToken)
}

// set the new token value
tc.SetToken(newToken.AccessToken)
log.Debug("azAuthBfsMSI::getCredential : MSI Token retrieved %s (%d)", newToken.AccessToken, newToken.Expires())

// Get the next token slightly before the current one expires
return time.Until(newToken.Expires()) - 5*time.Minute
log.Err("azAuthBfsMSI::getCredential : Failed to refresh token bailing out.")
return 0
})
}

Expand Down
71 changes: 51 additions & 20 deletions component/azstorage/azauthspn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package azstorage

import (
"fmt"
"math/rand"
"os"
"time"

Expand All @@ -46,6 +47,11 @@ type azAuthSPN struct {
azAuthBase
}

func getNextExpiryTimerSPN(spt *adal.ServicePrincipalToken) time.Duration {
delay := time.Duration(5+rand.Intn(120)) * time.Second
return time.Until(spt.Token().Expires()) - delay
}

func (azspn *azAuthSPN) getAADEndpoint() string {
if azspn.config.ActiveDirectoryEndpoint != "" {
return azspn.config.ActiveDirectoryEndpoint
Expand Down Expand Up @@ -112,18 +118,31 @@ func (azspn *azAuthBlobSPN) getCredential() interface{} {

// Using token create the credential object, here also register a call back which refreshes the token
tc := azblob.NewTokenCredential(spt.Token().AccessToken, func(tc azblob.TokenCredential) time.Duration {
err := spt.Refresh()
if err != nil {
log.Err("azAuthBlobSPN::getCredential : Failed to refresh SPN token [%s]", err.Error())
return 0
}
// spt, err = azspn.fetchToken()
// if err != nil {
// log.Err("azAuthBlobSPN::getCredential : Failed to fetch SPN token [%s]", err.Error())
// return 0
// }
for failCount := 0; failCount < 5; failCount++ {
err = spt.Refresh()
if err != nil {
log.Err("azAuthBfsSPN::getCredential : Failed to refresh token attempt %d [%s]", failCount, err.Error())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// set the new token value
tc.SetToken(spt.Token().AccessToken)
log.Debug("azAuthBlobSPN::getCredential : SPN Token retrieved %s (%d)", spt.Token().AccessToken, spt.Token().Expires())

// set the new token value
tc.SetToken(spt.Token().AccessToken)
log.Debug("azAuthBlobSPN::getCredential : SPN Token retrieved %s (%d)", spt.Token().AccessToken, spt.Token().Expires())
// Get the next token slightly before the current one expires
return getNextExpiryTimerSPN(spt)

// Get the next token slightly before the current one expires
return time.Until(spt.Token().Expires()) - 5*time.Minute
// Test code to expire token every 30 seconds
// return time.Until(time.Now()) + 30*time.Second
}
log.Err("azAuthBfsSPN::getCredential : Failed to refresh token bailing out.")
return 0
})

return tc
Expand All @@ -144,18 +163,30 @@ func (azspn *azAuthBfsSPN) getCredential() interface{} {

// Using token create the credential object, here also register a call back which refreshes the token
tc := azbfs.NewTokenCredential(spt.Token().AccessToken, func(tc azbfs.TokenCredential) time.Duration {
err := spt.Refresh()
if err != nil {
log.Err("azAuthBfsSPN::getCredential : Failed to refresh SPN token [%s]", err.Error())
return 0
}
// spt, err = azspn.fetchToken()
// if err != nil {
// log.Err("azAuthBfsSPN::getCredential : Failed to fetch SPN token [%s]", err.Error())
// return 0
// }
for failCount := 0; failCount < 5; failCount++ {
err = spt.Refresh()
if err != nil {
log.Err("azAuthBfsSPN::getCredential : Failed to refresh token attempt %d [%s]", failCount, err.Error())
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
continue
}

// set the new token value
tc.SetToken(spt.Token().AccessToken)
log.Debug("azAuthBfsSPN::getCredential : SPN Token retrieved %s (%d)", spt.Token().AccessToken, spt.Token().Expires())
// set the new token value
tc.SetToken(spt.Token().AccessToken)
log.Debug("azAuthBfsSPN::getCredential : SPN Token retrieved %s (%d)", spt.Token().AccessToken, spt.Token().Expires())

// Get the next token slightly before the current one expires
return time.Until(spt.Token().Expires()) - 5*time.Minute
// Get the next token slightly before the current one expires
return getNextExpiryTimerSPN(spt)
// Test code to expire token every 30 seconds
// return time.Until(time.Now()) + 30*time.Second
}
log.Err("azAuthBfsSPN::getCredential : Failed to refresh token bailing out.")
return 0
})

return tc
Expand Down
Loading

0 comments on commit c5f20f1

Please sign in to comment.