diff --git a/grpc/execution/server.go b/grpc/execution/server.go index a984e4ffc..1bb17cb1e 100644 --- a/grpc/execution/server.go +++ b/grpc/execution/server.go @@ -100,7 +100,7 @@ func (s *ExecutionServiceServerV1) GetBlock(ctx context.Context, req *astriaPb.G res, err := s.getBlockFromIdentifier(req.GetIdentifier()) if err != nil { log.Error("failed finding block", err) - return nil, err + return nil, shared.WrapError(err, "failed finding block") } log.Debug("GetBlock completed", "request", req, "response", res) @@ -125,7 +125,7 @@ func (s *ExecutionServiceServerV1) BatchGetBlocks(ctx context.Context, req *astr block, err := s.getBlockFromIdentifier(id) if err != nil { log.Error("failed finding block with id", id, "error", err) - return nil, err + return nil, shared.WrapError(err, fmt.Sprintf("failed finding block with id %s", id.String())) } blocks = append(blocks, block) @@ -190,7 +190,7 @@ func (s *ExecutionServiceServerV1) ExecuteBlock(ctx context.Context, req *astria payload, err := s.Eth().Miner().BuildPayload(payloadAttributes) if err != nil { log.Error("failed to build payload", "err", err) - return nil, status.Errorf(codes.InvalidArgument, "Could not build block with provided txs: %v", err) + return nil, status.Errorf(codes.InvalidArgument, shared.WrapError(err, "Could not build block with provided txs").Error()) } // call blockchain.InsertChain to actually execute and write the blocks to @@ -198,12 +198,12 @@ func (s *ExecutionServiceServerV1) ExecuteBlock(ctx context.Context, req *astria block, err := engine.ExecutableDataToBlock(*payload.Resolve().ExecutionPayload, nil, nil) if err != nil { log.Error("failed to convert executable data to block", err) - return nil, status.Error(codes.Internal, "failed to execute block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to convert executable data to block").Error()) } err = s.Bc().InsertBlockWithoutSetHead(block) if err != nil { log.Error("failed to insert block to chain", "hash", block.Hash(), "prevHash", req.PrevBlockHash, "err", err) - return nil, status.Error(codes.Internal, "failed to insert block to chain") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to insert block to chain").Error()) } // remove txs from original mempool @@ -244,12 +244,12 @@ func (s *ExecutionServiceServerV1) GetCommitmentState(ctx context.Context, req * softBlock, err := ethHeaderToExecutionBlock(s.Bc().CurrentSafeBlock()) if err != nil { log.Error("error finding safe block", err) - return nil, status.Error(codes.Internal, "could not locate soft block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "could not locate soft block").Error()) } firmBlock, err := ethHeaderToExecutionBlock(s.Bc().CurrentFinalBlock()) if err != nil { log.Error("error finding final block", err) - return nil, status.Error(codes.Internal, "could not locate firm block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "could not locate firm block").Error()) } celestiaBlock := s.Bc().CurrentBaseCelestiaHeight() @@ -312,7 +312,7 @@ func (s *ExecutionServiceServerV1) UpdateCommitmentState(ctx context.Context, re if currentHead != softEthHash { if _, err := s.Bc().SetCanonical(softBlock); err != nil { log.Error("failed updating canonical chain to soft block", err) - return nil, status.Error(codes.Internal, "Could not update head to safe hash") + return nil, status.Error(codes.Internal, shared.WrapError(err, "Could not update head to safe hash").Error()) } } @@ -368,7 +368,7 @@ func (s *ExecutionServiceServerV1) getBlockFromIdentifier(identifier *astriaPb.B res, err := ethHeaderToExecutionBlock(header) if err != nil { // This should never happen since we validate header exists above. - return nil, status.Error(codes.Internal, "internal error") + return nil, status.Error(codes.Internal, shared.WrapError(err, "internal error").Error()) } return res, nil diff --git a/grpc/optimistic/server.go b/grpc/optimistic/server.go index 08da038d5..a56c0d8ac 100644 --- a/grpc/optimistic/server.go +++ b/grpc/optimistic/server.go @@ -6,7 +6,6 @@ import ( astriaPb "buf.build/gen/go/astria/execution-apis/protocolbuffers/go/astria/execution/v1" "context" "errors" - "fmt" "github.com/ethereum/go-ethereum/beacon/engine" "github.com/ethereum/go-ethereum/common" cmath "github.com/ethereum/go-ethereum/common/math" @@ -77,7 +76,7 @@ func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStre marshalledTxs := [][]byte{} marshalledTx, err := pendingTx.MarshalBinary() if err != nil { - return status.Errorf(codes.Internal, "error marshalling tx: %v", err) + return status.Errorf(codes.Internal, shared.WrapError(err, "error marshalling tx").Error()) } marshalledTxs = append(marshalledTxs, marshalledTx) @@ -89,13 +88,18 @@ func (o *OptimisticServiceV1Alpha1) GetBundleStream(_ *optimsticPb.GetBundleStre err = stream.Send(&optimsticPb.GetBundleStreamResponse{Bundle: &bundle}) if err != nil { log.Error("error sending bundle over stream", "err", err) - return status.Errorf(codes.Internal, "error sending bundle over stream: %v", err) + return status.Error(codes.Internal, shared.WrapError(err, "error sending bundle over stream").Error()) } } case err := <-pendingTxEvent.Err(): - log.Error("error waiting for pending transactions", "err", err) - return status.Errorf(codes.Internal, "error waiting for pending transactions: %v", err) + if err != nil { + log.Error("error waiting for pending transactions", "err", err) + return status.Error(codes.Internal, shared.WrapError(err, "error waiting for pending transactions").Error()) + } else { + // TODO - what is the right error code here? + return status.Error(codes.Internal, "tx pool subscription closed") + } case <-stream.Context().Done(): return stream.Context().Err() @@ -125,7 +129,7 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist // execute the optimistic block and wait for the mempool clearing event optimisticBlock, err := o.ExecuteOptimisticBlock(stream.Context(), baseBlock) if err != nil { - return status.Errorf(codes.Internal, "failed to execute optimistic block: %v", err) + return status.Errorf(codes.Internal, shared.WrapError(err, "failed to execute optimistic block").Error()) } optimisticBlockHash := common.BytesToHash(optimisticBlock.Hash) @@ -144,8 +148,13 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlockStream(stream optimist log.Error("timed out waiting for mempool to clear after optimistic block execution") return status.Error(codes.DeadlineExceeded, "timed out waiting for mempool to clear after optimistic block execution") case err := <-mempoolClearingEvent.Err(): - log.Error("error waiting for mempool clearing event", "err", err) - return status.Errorf(codes.Internal, "error waiting for mempool clearing event: %v", err) + if err != nil { + log.Error("error waiting for mempool clearing event", "err", err) + return status.Errorf(codes.Internal, shared.WrapError(err, "error waiting for mempool clearing event").Error()) + } else { + // TODO - what is the right error code here? + return status.Error(codes.Internal, "mempool clearance subscription closed") + } case <-stream.Context().Done(): return stream.Context().Err() } @@ -159,7 +168,7 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, if err := validateStaticExecuteOptimisticBlockRequest(req); err != nil { log.Error("ExecuteOptimisticBlock called with invalid BaseBlock", "err", err) - return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("BaseBlock is invalid: %s", err.Error())) + return nil, status.Error(codes.InvalidArgument, shared.WrapError(err, "invalid BaseBlock").Error()) } if !o.SyncMethodsCalled() { @@ -193,13 +202,13 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, payload, err := o.Eth().Miner().BuildPayload(payloadAttributes) if err != nil { log.Error("failed to build payload", "err", err) - return nil, status.Errorf(codes.InvalidArgument, "Could not build block with provided txs: %v", err) + return nil, status.Errorf(codes.InvalidArgument, shared.WrapError(err, "failed to build payload").Error()) } block, err := engine.ExecutableDataToBlock(*payload.Resolve().ExecutionPayload, nil, nil) if err != nil { log.Error("failed to convert executable data to block", err) - return nil, status.Error(codes.Internal, "failed to execute block") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to convert executable data to block").Error()) } // this will insert the optimistic block into the chain and persist it's state without @@ -207,7 +216,7 @@ func (o *OptimisticServiceV1Alpha1) ExecuteOptimisticBlock(ctx context.Context, err = o.Bc().InsertBlockWithoutSetHead(block) if err != nil { log.Error("failed to insert block to chain", "hash", block.Hash(), "prevHash", block.ParentHash(), "err", err) - return nil, status.Error(codes.Internal, "failed to insert block to chain") + return nil, status.Error(codes.Internal, shared.WrapError(err, "failed to insert block to chain").Error()) } // we store a pointer to the optimistic block in the chain so that we can use it diff --git a/grpc/optimistic/server_test.go b/grpc/optimistic/server_test.go index ff2359520..b0dd97720 100644 --- a/grpc/optimistic/server_test.go +++ b/grpc/optimistic/server_test.go @@ -334,7 +334,7 @@ func TestNewOptimisticServiceServerV1Alpha_StreamBundles(t *testing.T) { select { case err := <-errorCh: - require.ErrorContains(t, err, "error waiting for pending transactions") + require.ErrorContains(t, err, "tx pool subscription closed") } require.Len(t, mockServerSideStreaming.sentResponses, 5, "Number of responses should match the number of requests") diff --git a/grpc/shared/validation.go b/grpc/shared/validation.go index b6cd82839..e0bd7ea84 100644 --- a/grpc/shared/validation.go +++ b/grpc/shared/validation.go @@ -7,6 +7,7 @@ import ( "bytes" "crypto/ed25519" "crypto/sha256" + "errors" "fmt" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/contracts" @@ -14,10 +15,13 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/golang/protobuf/proto" - "github.com/pkg/errors" "math/big" ) +func WrapError(err error, msg string) error { + return fmt.Errorf("%s: %w", msg, err) +} + func protoU128ToBigInt(u128 *primitivev1.Uint128) *big.Int { lo := big.NewInt(0).SetUint64(u128.Lo) hi := big.NewInt(0).SetUint64(u128.Hi) @@ -124,20 +128,20 @@ func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHas publicKey := ed25519.PublicKey(allocation.GetPublicKey()) bech32Address, err := EncodeFromPublicKey(addressPrefix, publicKey) if err != nil { - return nil, errors.Wrapf(err, "failed to encode public key to bech32m address: %s", publicKey) + return nil, WrapError(err, fmt.Sprintf("failed to encode public key to bech32m address: %s", publicKey)) } if auctioneerBech32Address != bech32Address { - return nil, errors.Errorf("address in allocation does not match auctioneer address. expected: %s, got: %s", auctioneerBech32Address, bech32Address) + return nil, fmt.Errorf("address in allocation does not match auctioneer address. expected: %s, got: %s", auctioneerBech32Address, bech32Address) } message, err := proto.Marshal(allocation.GetPayload()) if err != nil { - return nil, errors.Wrap(err, "failed to marshal allocation") + return nil, WrapError(err, "failed to marshal allocation") } signature := allocation.GetSignature() if !ed25519.Verify(publicKey, message, signature) { - return nil, errors.New("failed to verify signature") + return nil, fmt.Errorf("failed to verify signature") } // unmarshall the transactions in the bundle @@ -145,7 +149,7 @@ func unmarshallAllocationTxs(allocation *bundlev1alpha1.Allocation, prevBlockHas ethtx := new(types.Transaction) err := ethtx.UnmarshalBinary(allocationTx) if err != nil { - return nil, errors.Wrap(err, "failed to unmarshall allocation transaction") + return nil, WrapError(err, "failed to unmarshall allocation transaction") } processedTxs = append(processedTxs, ethtx) }