Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UDF execution error when embeded connector is used - from confluent platform version 7.6.2 - Critical #10457

Open
zukowskilz opened this issue Oct 4, 2024 · 3 comments

Comments

@zukowskilz
Copy link

Describe the bug

When in KSQL connect worker is enabled by
ksql.connect.worker.config=config/connect.properties

and in connect.properties value is set as
plugin.discovery=service_load

then when executing any query with string param eg. as below
select LCASE('aaaa') from KSQL_PROCESSING_LOG limit 1;

Then throwing exception
[2024-10-04 11:34:14,221] ERROR Failed to execute query (io.confluent.ksql.api.server.ServerUtils:118) java.lang.RuntimeException: Unexpected error generating code for Select at io.confluent.ksql.execution.codegen.CodeGenRunner.buildCodeGenFromParseTree(CodeGenRunner.java:178) at io.confluent.ksql.execution.transform.select.SelectValueMapperFactory.buildSelect(SelectValueMapperFactory.java:81) at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195) at java.base/java.util.Collections$2.tryAdvance(Collections.java:4747) at java.base/java.util.Collections$2.forEachRemaining(Collections.java:4755) at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474) at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913) at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) at io.confluent.ksql.execution.transform.select.SelectValueMapperFactory.buildSelects(SelectValueMapperFactory.java:92) at io.confluent.ksql.execution.transform.select.SelectValueMapperFactory.create(SelectValueMapperFactory.java:76) at io.confluent.ksql.execution.transform.select.SelectValueMapperFactory.create(SelectValueMapperFactory.java:69) at io.confluent.ksql.execution.transform.select.Selection.of(Selection.java:45) at io.confluent.ksql.execution.streams.StepSchemaResolver.buildSelectSchema(StepSchemaResolver.java:385) at io.confluent.ksql.execution.streams.StepSchemaResolver.handleStreamSelect(StepSchemaResolver.java:254) at io.confluent.ksql.util.HandlerMaps$BuilderR2.lambda$castHandler2$2(HandlerMaps.java:840) at io.confluent.ksql.execution.streams.StepSchemaResolver.lambda$resolve$0(StepSchemaResolver.java:135) at java.base/java.util.Optional.map(Optional.java:265) at io.confluent.ksql.execution.streams.StepSchemaResolver.resolve(StepSchemaResolver.java:135) at io.confluent.ksql.structured.SchemaKStream.resolveSchema(SchemaKStream.java:477) at io.confluent.ksql.structured.SchemaKStream.select(SchemaKStream.java:160) at io.confluent.ksql.planner.plan.ProjectNode.buildStream(ProjectNode.java:45) at io.confluent.ksql.planner.plan.KsqlBareOutputNode.buildStream(KsqlBareOutputNode.java:52) at io.confluent.ksql.execution.ExecutionPlanBuilder.buildPhysicalPlan(ExecutionPlanBuilder.java:72) at io.confluent.ksql.engine.QueryEngine.buildPhysicalPlan(QueryEngine.java:104) at io.confluent.ksql.engine.EngineExecutor.planQuery(EngineExecutor.java:824) at io.confluent.ksql.engine.EngineExecutor.executeStreamPullQuery(EngineExecutor.java:486) at io.confluent.ksql.engine.KsqlEngine.createStreamPullQuery(KsqlEngine.java:425) at io.confluent.ksql.rest.server.query.QueryExecutor.handleStreamPullQuery(QueryExecutor.java:372) at io.confluent.ksql.rest.server.query.QueryExecutor.handleQuery(QueryExecutor.java:209) at io.confluent.ksql.rest.server.query.QueryExecutor.handleStatement(QueryExecutor.java:128) at io.confluent.ksql.api.impl.QueryEndpoint.createQueryPublisher(QueryEndpoint.java:107) at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$createQueryPublisher$0(KsqlServerEndpoints.java:147) at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$23(KsqlServerEndpoints.java:334) at io.vertx.core.impl.ContextBase.lambda$executeBlocking$1(ContextBase.java:180) at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:277) at io.vertx.core.impl.ContextBase.lambda$internalExecuteBlocking$2(ContextBase.java:199) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:829) Caused by: org.codehaus.commons.compiler.InternalCompilerException: Compiling "SC" in Line 1, Column 92: Line 1, Column 92: Compiling "eval0(java.util.Map arguments, java.lang.Object defaultValue, io.confluent.ksql.logging.processing.ProcessingLogger logger, io.confluent.ksql.GenericRow row)" at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:403) at org.codehaus.janino.UnitCompiler.access$000(UnitCompiler.java:237) at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:364) at org.codehaus.janino.UnitCompiler$2.visitCompilationUnit(UnitCompiler.java:362) at org.codehaus.janino.Java$CompilationUnit.accept(Java.java:371) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:362) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:273) at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:303) at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:870) at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:787) at org.codehaus.janino.ScriptEvaluator.cook(ScriptEvaluator.java:771) at org.codehaus.janino.ExpressionEvaluator.cook2(ExpressionEvaluator.java:497) at org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:452) at org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:443) at org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:409) at org.codehaus.janino.ExpressionEvaluator.cook(ExpressionEvaluator.java:394) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:82) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:77) at io.confluent.ksql.execution.codegen.CodeGenRunner.cook(CodeGenRunner.java:197) at io.confluent.ksql.execution.codegen.CodeGenRunner.buildCodeGenFromParseTree(CodeGenRunner.java:166) ... 41 more Caused by: org.codehaus.commons.compiler.InternalCompilerException: Line 1, Column 92: Compiling "eval0(java.util.Map arguments, java.lang.Object defaultValue, io.confluent.ksql.logging.processing.ProcessingLogger logger, io.confluent.ksql.GenericRow row)" at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3334) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1448) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1421) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:830) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:443) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:237) at org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:423) at org.codehaus.janino.UnitCompiler$3.visitPackageMemberClassDeclaration(UnitCompiler.java:419) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1688) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:419) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:393) ... 60 more Caused by: java.lang.RuntimeException: Line 1, Column 92 at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1664) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3665) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3330) ... 70 more Caused by: java.lang.RuntimeException: Line 1, Column 92 at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1605) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1662) ... 72 more Caused by: org.codehaus.commons.compiler.InternalCompilerException: Line 1, Column 9: Compiling "((String) ((io.confluent.ksql.function.udf.Kudf) arguments.get("LCASE_0")).evaluate("aaaa"))" at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5887) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2849) at org.codehaus.janino.UnitCompiler.access$2800(UnitCompiler.java:237) at org.codehaus.janino.UnitCompiler$6.visitReturnStatement(UnitCompiler.java:1591) at org.codehaus.janino.UnitCompiler$6.visitReturnStatement(UnitCompiler.java:1576) at org.codehaus.janino.Java$ReturnStatement.accept(Java.java:3888) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1576) ... 73 more Caused by: org.codehaus.commons.compiler.InternalCompilerException: "findIClass("LString;")" did not call "defineIClass()"!? at org.codehaus.janino.IClassLoader.loadIClass(IClassLoader.java:330) at org.codehaus.janino.UnitCompiler.findTypeByName(UnitCompiler.java:8926) at org.codehaus.janino.UnitCompiler.getRawReferenceType(UnitCompiler.java:7096) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:7005) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6886) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6857) at org.codehaus.janino.UnitCompiler.access$14800(UnitCompiler.java:237) at org.codehaus.janino.UnitCompiler$24.visitReferenceType(UnitCompiler.java:6755) at org.codehaus.janino.UnitCompiler$24.visitReferenceType(UnitCompiler.java:6752) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:4289) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6752) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5228) at org.codehaus.janino.UnitCompiler.access$8800(UnitCompiler.java:237) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4761) at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4742) at org.codehaus.janino.Java$Cast.accept(Java.java:5283) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4742) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5258) at org.codehaus.janino.UnitCompiler.access$8300(UnitCompiler.java:237) at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4753) at org.codehaus.janino.UnitCompiler$16$1.visitParenthesizedExpression(UnitCompiler.java:4746) at org.codehaus.janino.Java$ParenthesizedExpression.accept(Java.java:5309) at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4746) at org.codehaus.janino.UnitCompiler$16.visitLvalue(UnitCompiler.java:4742) at org.codehaus.janino.Java$Lvalue.accept(Java.java:4528) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4742) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5885) ... 79 more

I did some experiments and I think that this is related with janino library upgrade.
In KSQLdb 7.6.1 janino is 3.0.7 and all is working fine
but in KSQLdb from 7.6.2 until now is janino 3.1.10 and here problem was started.

To Reproduce
Steps to reproduce the behavior, include:

  1. The version of KSQL.
    7.6.2
  2. Sample source data.
    enable connect worker in kslq-server.properties
    ksql.connect.worker.config=config/connect.properties

set plugin discovery in connect.properties
plugin.discovery=service_load

create stream foo(a string) with(kafka_topic='foo',partitions=1, format='JSON');

  1. Any SQL statements you ran
    select LCASE('aaaa') from foo;

Expected behavior
A clear and concise description of what you expected to happen.
Statement should be executed

Actual behaviour
A clear and concise description of what actually happens, including:

  1. CLI output
    image

  2. Error messages
    The server encountered an internal error when processing the query. Please consult the server logs for more information.

  3. KSQL logs
    ksql.log

Additional context
Add any other context about the problem here.

@choiwonpyo
Copy link

Hello zukowskilz, I had the same issue. thank you for your comment, and i was happy that same persion exists.

some security issue, i have to update my ksqldb version.
and i got the same issue. (janino compiler error)

i got "janino error" when CSAS (Create Stream As Select), and use "SCALA FUNCTION" like REGEX_REPLACE.
(REGEX: https://docs.ksqldb.io/en/0.27.1-ksqldb/developer-guide/ksqldb-reference/scalar-functions/#regexp_replace)

it was very weired, because i used that "scala function" with older version.
and i found out, janino version has changed from some sub version of all 7.3.x, 7.2.x, 7.1x ...

https://docs.confluent.io/platform/7.3/release-notes/changelog.html
https://docs.confluent.io/platform/7.2/release-notes/changelog.html
https://docs.confluent.io/platform/7.1/release-notes/changelog.html

(can find janino version changed commit)


I just solve(?) this problem, with using 7.2.9 version ( https://www.confluent.io/previous-versions/ )

  • some reason, 7.2.10 download link has broken.
  • and, the latest version with janino 3.0.7

@Ocramius
Copy link

Ocramius commented Feb 4, 2025

I just experienced this with a pure SQL-ish statement:

CREATE STREAM example
WITH (
 KAFKA_TOPIC = 'output_topic',
 PARTITIONS = 1
) AS
SELECT
 `KEY`,
 `data`
FROM input_stream
WHERE (`customer` -> `id`) = 2
PARTITION BY `KEY`
EMIT CHANGES;

this produces roughly:

Unexpected error generating code for Predicate
Caused by: Compiling "SC" in Line 1, Column 575: Line 1, Column 575: Compiling
	"eval0(java.util.Map arguments, java.lang.Object defaultValue,
	io.confluent.ksql.logging.processing.ProcessingLogger logger,
	io.confluent.ksql.GenericRow row)"
Caused by: Line 1, Column 575: Compiling "eval0(java.util.Map arguments,
	java.lang.Object defaultValue,
	io.confluent.ksql.logging.processing.ProcessingLogger logger,
	io.confluent.ksql.GenericRow row)"
Caused by: Line 1, Column 575
Caused by: Line 1, Column 215: Compiling "((((Object) (((Integer)
	(((org.apache.kafka.connect.data.Struct) arguments.get("var0")) == null ? null :
	((org.apache.kafka.connect.data.Struct) arguments.get("var0")).get("id"))))) ==
	null || ((Object) (2)) == null) ? false : ((((Integer)
	(((org.apache.kafka.connect.data.Struct) arguments.get("var0")) == null ? null :
	((org.apache.kafka.connect.data.Struct) arguments.get("var0")).get("id"))) <= 2)
	&& (((Integer) (((org.apache.kafka.connect.data.Struct) arguments.get("var0"))
	== null ? null : ((org.apache.kafka.connect.data.Struct)
	arguments.get("var0")).get("id"))) >= 2)))"
Caused by: Line 1, Column 11: Compiling "((Object) (((Integer)
	(((org.apache.kafka.connect.data.Struct) arguments.get("var0")) == null ? null :
	((org.apache.kafka.connect.data.Struct) arguments.get("var0")).get("id")))))"
Caused by: Line 1, Column 22: Compiling "(((Integer)
	(((org.apache.kafka.connect.data.Struct) arguments.get("var0")) == null ? null :
	((org.apache.kafka.connect.data.Struct) arguments.get("var0")).get("id"))))"
Caused by: "findIClass("LInteger;")" did not call "defineIClass()"!?

@choiwonpyo do you perhaps know which version of kSQL presents usable behavior here?

Also: I don't get these errors when testing locally with ksql-test-runner 🤔

@Ocramius
Copy link

Ocramius commented Feb 4, 2025

I just verified that 7.3.11 is BROKEN, and 7.2.9 is WORKING for me.

UDFs don't seem to be the root cause: will check diffs between these two versions next.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants