Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

%collect line magic for using inside loops #432

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
22 changes: 12 additions & 10 deletions sparkmagic/sparkmagic/magics/remotesparkmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,24 @@ def manage_spark(self, line, local_ns=None):
@argument("-e", "--coerce", type=str, default=None, help="Whether to automatically coerce the types (default, pass True if being explicit) "
"of the dataframe or not (pass False)")
@handle_expected_exceptions
def collect(self, line, local_ns=None):
args = parse_argstring_or_throw(self.spark, line)
coerce = get_coerce_value(args.coerce)
if (len(args.command) > 0):
def spark_collect(self, line, local_ns=None):
args = parse_argstring_or_throw(self.spark, line)
coerce = get_coerce_value(args.coerce)
if (len(args.command) > 0):
command = " ".join(args.command)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please follow the 4-space indentation convention?

else:
else:
command = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a new line after every code block, like after this if/else code block.

if args.context == CONTEXT_NAME_SPARK:
if args.context == CONTEXT_NAME_SPARK:
return self.execute_spark(command, args.output, args.samplemethod, args.maxrows, args.samplefraction, None, coerce)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a new line after every return call in this method

elif args.context == CONTEXT_NAME_SQL:

elif args.context == CONTEXT_NAME_SQL:
args.session = None
args.quiet = None
return self.execute_sqlquery(command, args.samplemethod, args.maxrows, args.samplefraction,
args.session, args.output, args.quiet, coerce)
else:
self.ipython_display.send_error("Context '{}' not found".format(args.context))
args.session, args.output, args.quiet, coerce)

else:
self.ipython_display.send_error("Context '{}' not found".format(args.context))


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@magic_arguments()
Expand Down
287 changes: 287 additions & 0 deletions sparkmagic/sparkmagic/tests/test_remotesparkmagics.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,3 +548,290 @@ def test_logs_exception():
assert result is None
ipython_display.send_error.assert_called_once_with(EXPECTED_ERROR_MSG
.format(get_logs_method.side_effect))


# New Tests for the spark_collect line magic
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to have this comment :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So nice to see tests! Thanks!


#@with_setup(_setup, _teardown)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this commented out?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At first it was a mistake.. but actually I don't think this test is necessary in this case.. The analogous test for the %spark magic is meant to check for the "subcommand" parameter, which does not exist in this case. If anyone writes a bad command in %spark_collect line, it would always throw an error. What do you think?

#def test_spark_collect_bad_command_writes_error():
# line = "bad_command"
# usage = "Please look at usage of %spark_collect by executing `%spark_collect?`."
#
# magic.spark_collect(line)
#
# ipython_display.send_error.assert_called_once_with("Subcommand '{}' not found. {}".format(line, usage))


@with_setup(_setup, _teardown)
def test_spark_collect_run_cell_command_parses():
run_cell_method = MagicMock()
result_value = ""
run_cell_method.return_value = (True, result_value)
spark_controller.run_command = run_cell_method

name = None
line = ""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is never used in tests. Please remove in all tests where it's not needed.

commandline = "command"

result = magic.spark_collect(commandline)

run_cell_method.assert_called_once_with(Command(commandline), name)
assert result is None
ipython_display.write.assert_called_once_with(result_value)


@with_setup(_setup, _teardown)
def test_spark_collect_run_cell_command_writes_to_err():
run_cell_method = MagicMock()
result_value = ""
run_cell_method.return_value = (False, result_value)
spark_controller.run_command = run_cell_method

name = None
line = ""
commandline = "command"

result = magic.spark_collect(commandline)

run_cell_method.assert_called_once_with(Command(commandline), name)
assert result is None
ipython_display.send_error.assert_called_once_with(result_value)


@with_setup(_setup, _teardown)
def test_spark_collect_run_cell_command_exception():
run_cell_method = MagicMock()
run_cell_method.side_effect = HttpClientException('meh')
spark_controller.run_command = run_cell_method

name = None
line = ""
commandline = "command"

result = magic.spark_collect(commandline)

run_cell_method.assert_called_once_with(Command(commandline), name)
assert result is None
ipython_display.send_error.assert_called_once_with(EXPECTED_ERROR_MSG
.format(run_cell_method.side_effect))


@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_command_parses():
magic.execute_spark = MagicMock()

name = None
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, commandline])

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra white space


result = magic.spark_collect(line)

magic.execute_spark.assert_called_once_with("command",
None, "sample", None, None, None, None)


@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_command_parses_with_coerce():
magic.execute_spark = MagicMock()

name = None
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
coer = "--coerce"
coerce_value = "True"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, coer, coerce_value, commandline])

result = magic.spark_collect(line)

magic.execute_spark.assert_called_once_with("command",
None, "sample", None, None, None, True)


@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_command_parses_with_coerce_false():
magic.execute_spark = MagicMock()

name = None
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
coer = "--coerce"
coerce_value = "False"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, coer, coerce_value, commandline])

result = magic.spark_collect(line)

magic.execute_spark.assert_called_once_with("command",
None, "sample", None, None, None, False)


@with_setup(_setup, _teardown)
def test_spark_collect_run_sql_command_parses_with_coerce_false():
magic.execute_sqlquery = MagicMock()

name = None
context = "-c"
context_name = "sql"
meth = "-m"
method_name = "sample"
coer = "--coerce"
coerce_value = "False"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, coer, coerce_value, commandline])

result = magic.spark_collect(line)

magic.execute_sqlquery.assert_called_once_with("command",
"sample", None, None, None, None, None, False)


@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_with_store_command_parses():
magic.execute_spark = MagicMock()

name = None
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
output = "-o"
output_var = "var_name"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, output, output_var, commandline])


result = magic.spark_collect(line)
magic.execute_spark.assert_called_once_with("command",
"var_name", "sample", None, None, None, None)


@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_with_store_correct_calls():
run_cell_method = MagicMock()
run_cell_method.return_value = (True, "")
spark_controller.run_command = run_cell_method

#command = "-s"
name = "sessions_name"
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
output = "-o"
output_var = "var_name"
coer = "--coerce"
coerce_value = "True"
commandline = "command"
line = " ".join([context, context_name, meth, method_name,
output, output_var, coer, coerce_value, commandline])

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra white space


result = magic.spark_collect(line)
run_cell_method.assert_any_call(Command(commandline), None)
run_cell_method.assert_any_call(SparkStoreCommand(output_var, samplemethod=method_name, coerce=True), None)


@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_command_exception():
run_cell_method = MagicMock()
run_cell_method.side_effect = LivyUnexpectedStatusException('WOW')
spark_controller.run_command = run_cell_method

name = None
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
output = "-o"
output_var = "var_name"
commandline = "command"
line = " ".join([context, context_name, meth, method_name,
output, output_var, commandline])

result = magic.spark_collect(line)

run_cell_method.assert_any_call(Command(commandline), name)
ipython_display.send_error.assert_called_once_with(EXPECTED_ERROR_MSG
.format(run_cell_method.side_effect))

@with_setup(_setup, _teardown)
def test_spark_collect_run_spark_command_exception_while_storing():
run_cell_method = MagicMock()
exception = LivyUnexpectedStatusException('WOW')
run_cell_method.side_effect = [(True, ""), exception]
spark_controller.run_command = run_cell_method


name = None
context = "-c"
context_name = "spark"
meth = "-m"
method_name = "sample"
output = "-o"
output_var = "var_name"
commandline = "command"
line = " ".join([context, context_name, meth, method_name,
output, output_var, commandline])

result = magic.spark_collect(line)

run_cell_method.assert_any_call(Command(commandline), name)
run_cell_method.assert_any_call(SparkStoreCommand(output_var, samplemethod=method_name), name)
ipython_display.write.assert_called_once_with("")
ipython_display.send_error.assert_called_once_with(EXPECTED_ERROR_MSG
.format(exception))



@with_setup(_setup, _teardown)
def test_spark_collect_run_sql_command_parses():
run_cell_method = MagicMock()
run_cell_method.return_value = (True, "")
spark_controller.run_sqlquery = run_cell_method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra new line


name = None
context = "-c"
context_name = "sql"
meth = "-m"
method_name = "sample"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, commandline])

result = magic.spark_collect(line)

run_cell_method.assert_called_once_with(SQLQuery(commandline, samplemethod=method_name), name)
assert result is not None


@with_setup(_setup, _teardown)
def test_spark_collect_run_sql_command_exception():
run_cell_method = MagicMock()
run_cell_method.side_effect = LivyUnexpectedStatusException('WOW')
spark_controller.run_sqlquery = run_cell_method

name = None
context = "-c"
context_name = "sql"
meth = "-m"
method_name = "sample"
commandline = "command"
line = " ".join([context, context_name, meth, method_name, commandline])

result = magic.spark_collect(line)

run_cell_method.assert_called_once_with(SQLQuery(commandline, samplemethod=method_name), name)
ipython_display.send_error.assert_called_once_with(EXPECTED_ERROR_MSG
.format(run_cell_method.side_effect))