[BUG]: Last yielded batch not propagated when pipeline stopped #1489

Open
2 tasks done
efajardo-nv opened this issue Feb 1, 2024 · 1 comment
Labels
bug Something isn't working external This issue was filed by someone outside of the Morpheus team

Comments

@efajardo-nv
Contributor

efajardo-nv commented Feb 1, 2024

Version

23.11

Which installation method(s) does this occur on?

Conda

Describe the bug.

We've built a custom source stage that implements the stop() method. When Ctrl+C is pressed, our stop() method sets the _stop_requested flag to True, which signals the generator to stop yielding batches. However, the last batch the generator yields is never received by the next stage.
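
The behavior we expect can be modeled in plain Python (no Morpheus, MRC, or cudf involved). This sketch assumes that a graceful stop should prevent the source from producing new batches while still delivering everything it has already yielded. The function name run_model and the _DONE sentinel are hypothetical and do not describe how Morpheus or MRC actually implement shutdown.

```python
import queue
import threading

_DONE = object()  # hypothetical end-of-stream sentinel, not a Morpheus/MRC API


def run_model(num_batches_before_stop: int) -> tuple[list[int], list[int]]:
    """Model a source -> sink pipeline that drains in-flight batches on stop."""
    q: queue.Queue = queue.Queue()
    stop_requested = threading.Event()
    yielded: list[int] = []
    received: list[int] = []

    def source() -> None:
        batch = 1
        while not stop_requested.is_set():
            yielded.append(batch)
            q.put(batch)
            if batch == num_batches_before_stop:
                # Simulate Ctrl+C arriving after this batch was already yielded.
                stop_requested.set()
            batch += 1
        # Signal end-of-stream only after the last batch has been enqueued.
        q.put(_DONE)

    def sink() -> None:
        # Keep draining until the sentinel, so nothing already yielded is dropped.
        while (item := q.get()) is not _DONE:
            received.append(item)

    threads = [threading.Thread(target=source), threading.Thread(target=sink)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return yielded, received


yielded, received = run_model(num_batches_before_stop=3)
print(yielded, received)  # [1, 2, 3] [1, 2, 3]
```

The ordering is what matters here: the end-of-stream sentinel is enqueued only after the final batch, and the consumer exits on that sentinel rather than on the stop request itself. In the log below, "====Pipeline Stopped====" is printed before "Batch 3: Yielded 2000" and no matching "Received" line follows, which is consistent with the downstream side being torn down before the source's last batch arrives.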

Minimum reproducible example

  1. Save the following code as pipeline.py in the Morpheus root directory.
  2. From the Morpheus root directory, run: export MORPHEUS_ROOT=$(pwd)
  3. Run the pipeline: python pipeline.py
  4. Press Ctrl+C after the second batch is yielded.
  5. Observe that passthru_stage never receives the last yielded batch (Batch 3 in the log below).

import logging
import os
import pathlib
import time
import typing

import mrc

import cudf

from morpheus.config import Config
from morpheus.messages import MessageMeta
from morpheus.pipeline import LinearPipeline
from morpheus.pipeline.stage_decorator import stage
from morpheus.pipeline.single_output_source import SingleOutputSource
from morpheus.pipeline.stage_schema import StageSchema
from morpheus.utils.logger import configure_logging

logger = logging.getLogger(f"morpheus.{__name__}")


@stage
def passthru_stage(message: MessageMeta) -> MessageMeta:
    logger.debug("Received %s", len(message.df))
    return message


class TestSourceStage(SingleOutputSource):

    def __init__(self, c: Config, filename: pathlib.Path):

        super().__init__(c)

        self._filename = filename
        self._stop_requested = False

    @property
    def name(self) -> str:
        return "from-file"

    def supports_cpp_node(self) -> bool:
        return False

    def compute_schema(self, schema: StageSchema):
        schema.output_schema.set_type(MessageMeta)

    def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:

        node = builder.make_source(self.unique_name, self._generate_frames())

        return node

    def stop(self):
        self._stop_requested = True
        logger.debug("Received Ctrl+C")

    def _generate_frames(self) -> typing.Iterable[MessageMeta]:

        df = cudf.read_csv(self._filename)

        i = 1
        while not self._stop_requested:

            time.sleep(5)  # stop() may run during this sleep; the batch below is still yielded

            x = MessageMeta(df)

            logger.debug("Batch %s: Yielded %s", i, len(x.df))
            i += 1

            yield x


def run_pipeline():
    configure_logging(log_level=logging.DEBUG)

    root_dir = os.environ['MORPHEUS_ROOT']
    input_file = os.path.join(root_dir, 'models/datasets/validation-data/sid-validation-data.csv')

    config = Config()

    pipeline = LinearPipeline(config)
    pipeline.set_source(TestSourceStage(config, filename=input_file))
    pipeline.add_stage(passthru_stage(config))

    pipeline.run()


if __name__ == "__main__":
    run_pipeline()
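
A batch is yielded after stop() because _generate_frames checks _stop_requested only at the top of its loop: if stop() runs while the generator is inside time.sleep(5), it still builds and yields one more MessageMeta. The pure-Python sketch below reproduces that ordering deterministically by injecting the wait function, and contrasts it with a hypothetical variant that blocks on threading.Event.wait(timeout=...), which returns True as soon as the event is set. All function names here are illustrative. The variant is only a source-side mitigation: it avoids producing a batch after stop(), but it does not change how the pipeline treats a batch that has already been yielded.

```python
import threading
from typing import Callable, Iterator


def generate_flag_checked_at_top(
    stop: threading.Event, wait: Callable[[], None]
) -> Iterator[int]:
    """Mirrors the repro: the stop flag is only checked before the sleep."""
    i = 1
    while not stop.is_set():
        wait()  # stands in for time.sleep(5); stop() may be called during it
        yield i  # still yielded even if stop was requested during wait()
        i += 1


def generate_interruptible(
    stop: threading.Event, interval: float = 5.0
) -> Iterator[int]:
    """Hypothetical variant: Event.wait returns True once stop is set."""
    i = 1
    while not stop.wait(timeout=interval):
        yield i
        i += 1


def make_fake_sleep(stop: threading.Event, interrupt_on_call: int) -> Callable[[], None]:
    """Build a fake sleep that 'presses Ctrl+C' during the given call."""
    calls = 0

    def fake_sleep() -> None:
        nonlocal calls
        calls += 1
        if calls == interrupt_on_call:
            stop.set()

    return fake_sleep


# Ctrl+C arrives during the third wait: the third batch is still yielded.
stop = threading.Event()
print(list(generate_flag_checked_at_top(stop, make_fake_sleep(stop, 3))))  # [1, 2, 3]

# Ctrl+C arrives 50 ms into the first wait: nothing is yielded, and the
# generator returns after ~50 ms instead of waiting the full 5 s.
stop2 = threading.Event()
threading.Timer(0.05, stop2.set).start()
print(list(generate_interruptible(stop2, interval=5.0)))  # []
```

Adapting this to TestSourceStage would mean creating a threading.Event in __init__, calling its set() method from stop(), and replacing both the loop condition and time.sleep(5) with the event's wait(); that adaptation is an untested assumption.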

Relevant log output

====Pipeline Started====
====Building Segment Complete!====
Batch 1: Yielded 2000
Received 2000
Batch 2: Yielded 2000
Received 2000
^CStopping pipeline. Please wait... Press Ctrl+C again to kill.
====Stopping Pipeline====
Received Ctrl+C
====Pipeline Stopped====
Batch 3: Yielded 2000
Received Ctrl+C
====Pipeline Complete====

Note: The second "Received Ctrl+C" line appears because of a known issue where the source stage's stop() method is called twice (#1477).
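
Because stop() can currently be called twice (#1477), one way to keep the duplicate call harmless is to make stop() idempotent. This is a minimal sketch, assuming a threading.Event replaces the _stop_requested bool; StoppableSource is a hypothetical stand-in for TestSourceStage without the Morpheus base class.

```python
import logging
import threading

logger = logging.getLogger(__name__)


class StoppableSource:
    """Hypothetical stand-in for TestSourceStage's stop handling."""

    def __init__(self) -> None:
        self._stop_requested = threading.Event()
        self._lock = threading.Lock()
        self.stop_calls_handled = 0

    def stop(self) -> None:
        # The lock keeps two near-simultaneous calls from both passing the check.
        with self._lock:
            if self._stop_requested.is_set():
                return  # already stopping; ignore duplicate calls
            self._stop_requested.set()
            self.stop_calls_handled += 1
        logger.debug("Received Ctrl+C")


src = StoppableSource()
src.stop()
src.stop()  # a duplicate call, as described in #1477, is a no-op
print(src.stop_calls_handled)  # 1
```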

Full env printout

No response

Other/Misc.

No response

Code of Conduct

  • I agree to follow Morpheus' Code of Conduct
  • I have searched the open bugs and have found no duplicates for this bug report
@efajardo-nv efajardo-nv added the bug Something isn't working label Feb 1, 2024
@efajardo-nv efajardo-nv added the external This issue was filed by someone outside of the Morpheus team label Feb 16, 2024
@efajardo-nv
Contributor Author

#1837

@mdemoret-nv mdemoret-nv self-assigned this Aug 12, 2024
Projects
Status: Todo
Development

No branches or pull requests

2 participants