Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configurable progress and session info indicators #751

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

* Pins `ipython<8.0.0` because of breaking changes. Thanks @utkarshgupta137

* Added customization options for startup information and progress indicator

## 0.19.1

### Bug Fixes
Expand All @@ -33,7 +35,7 @@
### Features

* Added one internal magic to enable retry of session creation. Thanks @edwardps
* New `%%pretty` magic for pretty printing a dataframe as an HTML table. Thanks @hegary
* New `%%pretty` magic for pretty printing a dataframe as an HTML table. Thanks @hegary
* Update Endpoint widget to shield passwords when entering them in the ipywidget. Thanks @J0rg3M3nd3z @jodom961

## 0.18.0
Expand Down Expand Up @@ -129,4 +131,3 @@
* Updated code to work with Livy 0.5 and later, where Python 3 support is not a different kind of session. Thanks to Gianmario Spacagna for contributing some of the code, and G-Research for sponsoring Itamar Turner-Trauring's time.
* Fixed `AttributeError` on `None`, thanks to Eric Dill.
* `recovering` session status won't cause a blow up anymore. Thanks to G-Research for sponsoring Itamar Turner-Trauring's time.

38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,44 @@ If you want any registered livy sessions to be cleaned up on exit regardless of
}
```

## Notebook customizations

There are several ways in which sparkmagic gives feedback via the Jupyter notebook display system. This section
describes customizations available to modify the default behavior for such interactions, where applicable.

### Startup info table

When a session starts up, sparkmagic displays a table of session information including application name and the links to
sparkUI and logs. The display may be overridden by specifying a custom class:

```json
{
"startup_info_display_class": "module.path.classname"
}
```

The class should be a subclass of [StartupInfoDisplay](sparkmagic/sparkmagic/utils/startupinfo.py). It will be passed
the ipython_display, the LivySession object and the current session id. It should implement the `write_msg(msg)` method
to write a line of status output (by default this writes text to the current cell output), and the `display()`
method to show session information (by default, this displays an HTML table).

### Statement progress indicator

By default, sparkmagic uses the FloatProgress ipython widget to display the progress of a statement in the cell
output. If this is not desired, override the class used to construct the progress indicator:

```json
{
"progress_indicator_class": "module.path.classname"
}
```

The class should be a subclass of [ProgressIndicator](sparkmagic/sparkmagic/utils/progress.py) and will be passed the
session object and statement_id as arguments to the constructor. The `display()` method will be called after
initialization and should arrange to display the widget via the `ipython_display` attribute of the session object. The
`update(value_in_pct)` method will be called on progress and the `close()` method will be called when the statement
completes. By default this uses a horizontal FloatProgress widget.

### Conf overrides in code

In addition to the conf at `~/.sparkmagic/config.json`, sparkmagic conf can be overridden programmatically in a notebook.
Expand Down
17 changes: 10 additions & 7 deletions sparkmagic/example_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@
"logging_config": {
"version": 1,
"formatters": {
"magicsFormatter": {
"magicsFormatter": {
"format": "%(asctime)s\t%(levelname)s\t%(message)s",
"datefmt": ""
}
},
"handlers": {
"magicsHandler": {
"magicsHandler": {
"class": "hdijupyterutils.filehandler.MagicsFileHandler",
"formatter": "magicsFormatter",
"home_path": "~/.sparkmagic"
}
},
"loggers": {
"magicsLogger": {
"magicsLogger": {
"handlers": ["magicsHandler"],
"level": "DEBUG",
"propagate": 0
Expand All @@ -43,7 +43,7 @@
},
"authenticators": {
"Kerberos": "sparkmagic.auth.kerberos.Kerberos",
"None": "sparkmagic.auth.customauth.Authenticator",
"None": "sparkmagic.auth.customauth.Authenticator",
"Basic_Access": "sparkmagic.auth.basic.Basic"
},

Expand All @@ -63,15 +63,18 @@
"coerce_dataframe": true,
"max_results_sql": 2500,
"pyspark_dataframe_encoding": "utf-8",

"heartbeat_refresh_seconds": 30,
"livy_server_heartbeat_timeout_seconds": 0,
"heartbeat_retry_seconds": 10,

"server_extension_default_kernel_name": "pysparkkernel",
"custom_headers": {},

"retry_policy": "configurable",
"retry_seconds_to_sleep_list": [0.2, 0.5, 1, 3, 5],
"configurable_retry_policy_max_retries": 8
"configurable_retry_policy_max_retries": 8,

"progress_indicator_class": "sparkmagic.utils.progress.defaultProgressIndicator",
"startup_info_display_class": "sparkmagic.utils.startupinfo.defaultStartupInfoDisplay"
}
18 changes: 6 additions & 12 deletions sparkmagic/sparkmagic/livyclientlib/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import sparkmagic.utils.configuration as conf
from sparkmagic.utils.sparklogger import SparkLog
from sparkmagic.utils.sparkevents import SparkEvents
from sparkmagic.utils.utils import get_progress_indicator_class
from sparkmagic.utils.constants import (
MAGICS_LOGGER_NAME,
FINAL_STATEMENT_STATUS,
Expand All @@ -35,6 +36,7 @@ def __init__(self, code, spark_events=None):
if spark_events is None:
spark_events = SparkEvents()
self._spark_events = spark_events
self.progress_indicator_class = get_progress_indicator_class()

def __repr__(self):
return "Command({}, ...)".format(repr(self.code))
Expand Down Expand Up @@ -106,17 +108,9 @@ def execute(self, session):

def _get_statement_output(self, session, statement_id):
retries = 1
progress = FloatProgress(
value=0.0,
min=0,
max=1.0,
step=0.01,
description="Progress:",
bar_style="info",
orientation="horizontal",
layout=Layout(width="50%", height="25px"),
)
session.ipython_display.display(progress)

progress = self.progress_indicator_class(session, statement_id)
progress.display()

while True:
statement = session.http_client.get_statement(session.id, statement_id)
Expand All @@ -127,7 +121,7 @@ def _get_statement_output(self, session, statement_id):
)

if status not in FINAL_STATEMENT_STATUS:
progress.value = statement.get("progress", 0.0)
progress.update(statement.get("progress", 0.0))
session.sleep(retries)
retries += 1
else:
Expand Down
24 changes: 12 additions & 12 deletions sparkmagic/sparkmagic/livyclientlib/livysession.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sparkmagic.utils.constants as constants
from sparkmagic.utils.sparklogger import SparkLog
from sparkmagic.utils.sparkevents import SparkEvents
from sparkmagic.utils.utils import get_sessions_info_html
from sparkmagic.utils.utils import get_startup_info_display_class
from .configurableretrypolicy import ConfigurableRetryPolicy
from .command import Command
from .exceptions import (
Expand Down Expand Up @@ -137,6 +137,8 @@ def __init__(
self.id = session_id
self.session_info = ""

self.startup_info_display = get_startup_info_display_class()

self._heartbeat_thread = None
if session_id == -1:
self.status = constants.NOT_STARTED_SESSION_STATUS
Expand Down Expand Up @@ -164,7 +166,10 @@ def start(self):
self.id = r["id"]
self.status = str(r["state"])

self.ipython_display.writeln("Starting Spark application")
startup_info = self.startup_info_display(
self.ipython_display, [self], self.id
)
startup_info.write_msg("Starting Spark application")

# Start heartbeat thread to keep Livy interactive session alive.
self._start_heartbeat_thread()
Expand All @@ -179,28 +184,23 @@ def start(self):
)
)

html = get_sessions_info_html([self], self.id)
self.ipython_display.html(html)
startup_info.display()

command = Command("spark")
(success, out, mimetype) = command.execute(self)

if success:
self.ipython_display.writeln("SparkSession available as 'spark'.")
startup_info.write_msg("SparkSession available as 'spark'.")
self.sql_context_variable_name = "spark"
else:
command = Command("sqlContext")
(success, out, mimetype) = command.execute(self)
if success:
self.ipython_display.writeln("SparkContext available as 'sc'.")
startup_info.write_msg("SparkContext available as 'sc'.")
if "hive" in out.lower():
self.ipython_display.writeln(
"HiveContext available as 'sqlContext'."
)
startup_info.write_msg("HiveContext available as 'sqlContext'.")
else:
self.ipython_display.writeln(
"SqlContext available as 'sqlContext'."
)
startup_info.write_msg("SqlContext available as 'sqlContext'.")
self.sql_context_variable_name = "sqlContext"
else:
raise SqlContextNotFoundException(
Expand Down
18 changes: 17 additions & 1 deletion sparkmagic/sparkmagic/tests/test_configuration.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
from mock import MagicMock
from nose.tools import assert_equals, assert_not_equals, raises, with_setup
from nose.tools import assert_equals, assert_not_equals, assert_true, raises, with_setup
import json

import sparkmagic.utils.configuration as conf
from sparkmagic.livyclientlib.exceptions import BadUserConfigurationException
from sparkmagic.utils.constants import AUTH_BASIC, NO_AUTH
from sparkmagic.utils.utils import (
get_progress_indicator_class,
get_startup_info_display_class,
)
from sparkmagic.utils.progress import ProgressIndicator
from sparkmagic.utils.startupinfo import StartupInfoDisplay


def _setup():
Expand Down Expand Up @@ -126,3 +132,13 @@ def test_share_config_between_pyspark_and_pyspark3():
conf.base64_kernel_python3_credentials(),
conf.base64_kernel_python_credentials(),
)


@with_setup(_setup)
def test_default_progress_class_valid():
assert_true(issubclass(get_progress_indicator_class(), ProgressIndicator))


@with_setup(_setup)
def test_default_startup_class_valid():
assert_true(issubclass(get_startup_info_display_class(), StartupInfoDisplay))
10 changes: 10 additions & 0 deletions sparkmagic/sparkmagic/utils/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,16 @@ def kerberos_auth_configuration():
return {"mutual_authentication": REQUIRED}


@_with_override
def progress_indicator_class():
return "sparkmagic.utils.progress.HorizontalFloatProgressWidgetIndicator"


@_with_override
def startup_info_display_class():
return "sparkmagic.utils.startupinfo.HTMLTableStartupInfoDisplay"


def _credentials_override(f):
"""Provides special handling for credentials. It still calls _override().
If 'base64_password' in config is set, it will base64 decode it and returned in return value's 'password' field.
Expand Down
40 changes: 40 additions & 0 deletions sparkmagic/sparkmagic/utils/progress.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
from ipywidgets.widgets import FloatProgress, Layout


class ProgressIndicator:
def __init__(self, session, statement_id):
pass

def display(self):
pass

def update(self, value):
pass

def close(self):
pass


class HorizontalFloatProgressWidgetIndicator(ProgressIndicator):
def __init__(self, session, statement_id):
self.session = session
self.statement_id = statement_id
self.progress = FloatProgress(
value=0.0,
min=0,
max=1.0,
step=0.01,
description="Progress:",
bar_style="info",
orientation="horizontal",
layout=Layout(width="50%", height="25px"),
)

def display(self):
self.session.ipython_display.display(self.progress)

def update(self, value):
self.progress.value = value

def close(self):
self.progress.close()
24 changes: 24 additions & 0 deletions sparkmagic/sparkmagic/utils/startupinfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from sparkmagic.utils.utils import get_sessions_info_html


class StartupInfoDisplay:
def __init__(self, ipython_display, sessions_info, current_session_id):
self.ipython_display = ipython_display
self.sessions_info = sessions_info
self.current_session_id = current_session_id

def write_msg(self, msg):
pass

def display(self):
pass


class HTMLTableStartupInfoDisplay(StartupInfoDisplay):
def display(self):
self.ipython_display.html(
get_sessions_info_html(self.sessions_info, self.current_session_id)
)

def write_msg(self, msg):
self.ipython_display.writeln(msg)
19 changes: 16 additions & 3 deletions sparkmagic/sparkmagic/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ def get_sessions_info_html(info_sessions, current_session_id):
return html


def load_class_from_string(full_class):
module, class_name = full_class.rsplit(".", 1)
class_module = importlib.import_module(module)
class_class = getattr(class_module, class_name)
return class_class


def initialize_auth(args):
"""Creates an authenticatior class instance for the given auth type

Expand All @@ -130,12 +137,18 @@ def initialize_auth(args):
full_class = conf.authenticators().get(auth)
if full_class is None:
raise BadUserConfigurationException("Auth '{}' not supported".format(auth))
module, class_name = (full_class).rsplit(".", 1)
events_handler_module = importlib.import_module(module)
auth_class = getattr(events_handler_module, class_name)
auth_class = load_class_from_string(full_class)
return auth_class(args)


def get_progress_indicator_class():
return load_class_from_string(conf.progress_indicator_class())


def get_startup_info_display_class():
return load_class_from_string(conf.startup_info_display_class())


class Namespace:
"""Namespace to initialize authenticator class with"""

Expand Down