Skip to content

Commit

Permalink
Fix schema parse error when using reserved keys as table names (#881)
Browse files Browse the repository at this point in the history
* bracketed name for reserved words as table name

* add test

* add tests

* fix csx test

* enable test only for csharp

* fix TableNotPresentTest

* revert GetUserTableIdAsync change

* refactor GetUserTableIdAsync

* comment out Java test

* refactor code to use SqlObject
  • Loading branch information
MaddyDev authored Jul 17, 2023
1 parent a310eec commit 1381df1
Show file tree
Hide file tree
Showing 20 changed files with 335 additions and 16 deletions.
6 changes: 3 additions & 3 deletions src/TriggerBinding/SqlTriggerListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ public SqlTriggerListener(string connectionString, string tableName, string user
}
this._hasConfiguredMaxChangesPerWorker = configuredMaxChangesPerWorker != null;

this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable.FullName, this._connectionString, this._maxChangesPerWorker, this._logger);
this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable.FullName, this._connectionString, this._maxChangesPerWorker, this._logger);
this._scaleMonitor = new SqlTriggerScaleMonitor(this._userFunctionId, this._userTable, this._connectionString, this._maxChangesPerWorker, this._logger);
this._targetScaler = new SqlTriggerTargetScaler(this._userFunctionId, this._userTable, this._connectionString, this._maxChangesPerWorker, this._logger);
}

public void Cancel()
Expand Down Expand Up @@ -119,7 +119,7 @@ public async Task StartAsync(CancellationToken cancellationToken)

await VerifyDatabaseSupported(connection, this._logger, cancellationToken);

int userTableId = await GetUserTableIdAsync(connection, this._userTable.FullName, this._logger, cancellationToken);
int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, cancellationToken);
IReadOnlyList<(string name, string type)> primaryKeyColumns = await GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, cancellationToken);
IReadOnlyList<string> userTableColumns = await this.GetUserTableColumnsAsync(connection, userTableId, cancellationToken);

Expand Down
2 changes: 1 addition & 1 deletion src/TriggerBinding/SqlTriggerMetricsProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ private async Task<long> GetUnprocessedChangeCountAsync()
{
await connection.OpenAsync();

int userTableId = await GetUserTableIdAsync(connection, this._userTable.FullName, this._logger, CancellationToken.None);
int userTableId = await GetUserTableIdAsync(connection, this._userTable, this._logger, CancellationToken.None);
IReadOnlyList<(string name, string type)> primaryKeyColumns = await GetPrimaryKeyColumnsAsync(connection, userTableId, this._logger, this._userTable.FullName, CancellationToken.None);

// Use a transaction to automatically release the app lock when we're done executing the query
Expand Down
6 changes: 3 additions & 3 deletions src/TriggerBinding/SqlTriggerScaleMonitor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ internal sealed class SqlTriggerScaleMonitor : IScaleMonitor<SqlTriggerMetrics>
private readonly IDictionary<TelemetryPropertyName, string> _telemetryProps = new Dictionary<TelemetryPropertyName, string>();
private readonly int _maxChangesPerWorker;

public SqlTriggerScaleMonitor(string userFunctionId, string userTableName, string connectionString, int maxChangesPerWorker, ILogger logger)
public SqlTriggerScaleMonitor(string userFunctionId, SqlObject userTable, string connectionString, int maxChangesPerWorker, ILogger logger)
{
_ = !string.IsNullOrEmpty(userFunctionId) ? true : throw new ArgumentNullException(userFunctionId);
_ = !string.IsNullOrEmpty(userTableName) ? true : throw new ArgumentNullException(userTableName);
this._userTable = new SqlObject(userTableName);
_ = userTable != null ? true : throw new ArgumentNullException(nameof(userTable));
this._userTable = userTable;
// Do not convert the scale-monitor ID to lower-case string since SQL table names can be case-sensitive
// depending on the collation of the current database.
this.Descriptor = new ScaleMonitorDescriptor($"{userFunctionId}-SqlTrigger-{this._userTable.FullName}", userFunctionId);
Expand Down
4 changes: 2 additions & 2 deletions src/TriggerBinding/SqlTriggerTargetScaler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ internal sealed class SqlTriggerTargetScaler : ITargetScaler
private readonly SqlTriggerMetricsProvider _metricsProvider;
private readonly int _maxChangesPerWorker;

public SqlTriggerTargetScaler(string userFunctionId, string userTableName, string connectionString, int maxChangesPerWorker, ILogger logger)
public SqlTriggerTargetScaler(string userFunctionId, SqlObject userTable, string connectionString, int maxChangesPerWorker, ILogger logger)
{
this._metricsProvider = new SqlTriggerMetricsProvider(connectionString, logger, new SqlObject(userTableName), userFunctionId);
this._metricsProvider = new SqlTriggerMetricsProvider(connectionString, logger, userTable, userFunctionId);
this.TargetScalerDescriptor = new TargetScalerDescriptor(userFunctionId);
this._maxChangesPerWorker = maxChangesPerWorker;
}
Expand Down
9 changes: 4 additions & 5 deletions src/TriggerBinding/SqlTriggerUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,27 @@ FROM sys.indexes AS i
/// Returns the object ID of the user table.
/// </summary>
/// <param name="connection">SQL connection used to connect to user database</param>
/// <param name="userTableName">Name of the user table</param>
/// <param name="userTable">SqlObject user table</param>
/// <param name="logger">Facilitates logging of messages</param>
/// <param name="cancellationToken">Cancellation token to pass to the command</param>
/// <exception cref="InvalidOperationException">Thrown in case of error when querying the object ID for the user table</exception>
public static async Task<int> GetUserTableIdAsync(SqlConnection connection, string userTableName, ILogger logger, CancellationToken cancellationToken)
internal static async Task<int> GetUserTableIdAsync(SqlConnection connection, SqlObject userTable, ILogger logger, CancellationToken cancellationToken)
{
var userTable = new SqlObject(userTableName);
string getObjectIdQuery = $"SELECT OBJECT_ID(N{userTable.QuotedFullName}, 'U');";

using (var getObjectIdCommand = new SqlCommand(getObjectIdQuery, connection))
using (SqlDataReader reader = await getObjectIdCommand.ExecuteReaderAsyncWithLogging(logger, cancellationToken))
{
if (!await reader.ReadAsync(cancellationToken))
{
throw new InvalidOperationException($"Received empty response when querying the object ID for table: '{userTableName}'.");
throw new InvalidOperationException($"Received empty response when querying the object ID for table: '{userTable.FullName}'.");
}

object userTableId = reader.GetValue(0);

if (userTableId is DBNull)
{
throw new InvalidOperationException($"Could not find table: '{userTableName}'.");
throw new InvalidOperationException($"Could not find table: '{userTable.FullName}'.");
}
logger.LogDebug($"GetUserTableId TableId={userTableId}");
return (int)userTableId;
Expand Down
44 changes: 44 additions & 0 deletions test-outofproc/ReservedTableNameTrigger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Extensions.Sql;

namespace DotnetIsolatedTests
{
public static class ReservedTableNameTrigger
{
/// <summary>
/// Used in verification of the trigger function execution on table with reserved keys as name.
/// </summary>
[Function(nameof(ReservedTableNameTrigger))]
public static void Run(
[SqlTrigger("[dbo].[User]", "SqlConnectionString")]
IReadOnlyList<SqlChange<User>> changes,
FunctionContext context)
{
ILogger logger = context.GetLogger("ReservedTableNameTrigger");
logger.LogInformation("SQL Changes: " + Utils.JsonSerializeObject(changes));
}
}

public class User
{
public string UserName { get; set; }
public int UserId { get; set; }
public string FullName { get; set; }

public override bool Equals(object obj)
{
if (obj is User)
{
var that = obj as User;
return this.UserId == that.UserId && this.UserName == that.UserName && this.FullName == that.FullName;
}

return false;
}
}
}
7 changes: 7 additions & 0 deletions test/Database/Tables/User.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
DROP TABLE IF EXISTS [dbo].[User];

CREATE TABLE [dbo].[User] (
[UserId] [int] NOT NULL PRIMARY KEY,
[UserName] [nvarchar](50) NOT NULL,
[FullName] [nvarchar](max) NULL
)
65 changes: 65 additions & 0 deletions test/Integration/SqlTriggerBindingIntegrationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,71 @@ JOIN sys.columns c
Assert.True(1 == (int)this.ExecuteScalar("SELECT 1 FROM sys.columns WHERE Name = N'LastAccessTime' AND Object_ID = Object_ID(N'[az_func].[GlobalState]')"), $"{GlobalStateTableName} should have {LastAccessTimeColumnName} column after restarting the listener.");
}

/// <summary>
/// Tests that trigger function executes on table whose name is a reserved word (User).
/// </summary>
[Theory]
[SqlInlineData()]
[UnsupportedLanguages(SupportedLanguages.Java)] // test timing out for Java
public async void ReservedTableNameTest(SupportedLanguages lang)
{
this.SetChangeTrackingForTable("User");
this.StartFunctionHost(nameof(ReservedTableNameTrigger), lang, true);
User expectedResponse = Utils.JsonDeserializeObject<User>(/*lang=json,strict*/ "{\"UserId\":999,\"UserName\":\"test\",\"FullName\":\"Testy Test\"}");
int index = 0;
string messagePrefix = "SQL Changes: ";

var taskCompletion = new TaskCompletionSource<bool>();

void MonitorOutputData(object sender, DataReceivedEventArgs e)
{
if (e.Data != null && (index = e.Data.IndexOf(messagePrefix, StringComparison.Ordinal)) >= 0)
{
string json = e.Data[(index + messagePrefix.Length)..];
// Sometimes we'll get messages that have extra logging content on the same line - so to prevent that from breaking
// the deserialization we look for the end of the changes array and only use that.
// (This is fine since we control what content is in the array so know that none of the items have a ] in them)
json = json[..(json.IndexOf(']') + 1)];
IReadOnlyList<SqlChange<User>> changes;
try
{
changes = Utils.JsonDeserializeObject<IReadOnlyList<SqlChange<User>>>(json);
}
catch (Exception ex)
{
throw new InvalidOperationException($"Exception deserializing JSON content. Error={ex.Message} Json=\"{json}\"", ex);
}
Assert.Equal(SqlChangeOperation.Insert, changes[0].Operation); // Expected change operation
User user = changes[0].Item;
Assert.NotNull(user); // user deserialized correctly
Assert.Equal(expectedResponse, user); // user has the expected values
taskCompletion.SetResult(true);
}
};

// Set up listener for the changes coming in
foreach (Process functionHost in this.FunctionHostList)
{
functionHost.OutputDataReceived += MonitorOutputData;
}

// Now that we've set up our listener trigger the actions to monitor
this.ExecuteNonQuery("INSERT INTO [dbo].[User] VALUES (" +
"999, " + // UserId,
"'test', " + // UserName
"'Testy Test')"); // FullName

// Now wait until either we timeout or we've gotten all the expected changes, whichever comes first
this.LogOutput($"[{DateTime.UtcNow:u}] Waiting for Insert changes (10000ms)");
await taskCompletion.Task.TimeoutAfter(TimeSpan.FromMilliseconds(10000), $"Timed out waiting for Insert changes.");

// Unhook handler since we're done monitoring these changes so we aren't checking other changes done later
foreach (Process functionHost in this.FunctionHostList)
{
functionHost.OutputDataReceived -= MonitorOutputData;
}
}

/// <summary>
/// Ensures that all column types are serialized correctly.
/// </summary>
Expand Down
42 changes: 42 additions & 0 deletions test/Integration/test-csharp/ReservedTableNameTrigger.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.WebJobs.Extensions.Sql.Tests.Integration
{
public static class ReservedTableNameTrigger
{
/// <summary>
/// Used in verification of the trigger function execution on table with reserved keys as name.
/// </summary>
[FunctionName(nameof(ReservedTableNameTrigger))]
public static void Run(
[SqlTrigger("[dbo].[User]", "SqlConnectionString")]
IReadOnlyList<SqlChange<User>> changes,
ILogger logger)
{
// The output is used to inspect the trigger binding parameter in test methods.
logger.LogInformation("SQL Changes: " + Utils.JsonSerializeObject(changes));
}
}

public class User
{
public string UserName { get; set; }
public int UserId { get; set; }
public string FullName { get; set; }

public override bool Equals(object obj)
{
if (obj is User)
{
var that = obj as User;
return this.UserId == that.UserId && this.UserName == that.UserName && this.FullName == that.FullName;
}

return false;
}
}
}
12 changes: 12 additions & 0 deletions test/Integration/test-csx/ReservedTableNameTrigger/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"bindings": [
{
"name": "changes",
"type": "sqlTrigger",
"direction": "in",
"tableName": "[dbo].[User]",
"connectionStringSetting": "SqlConnectionString"
}
],
"disabled": false
}
32 changes: 32 additions & 0 deletions test/Integration/test-csx/ReservedTableNameTrigger/run.csx
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#r "Newtonsoft.Json"
#r "Microsoft.Azure.WebJobs.Extensions.Sql"

using System.Net;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Primitives;
using Newtonsoft.Json;
using Microsoft.Azure.WebJobs.Extensions.Sql;

public static void Run(IReadOnlyList<SqlChange<User>> changes, ILogger log)
{
// The output is used to inspect the trigger binding parameter in test methods.
log.LogInformation("SQL Changes: " + Microsoft.Azure.WebJobs.Extensions.Sql.Utils.JsonSerializeObject(changes));
}

public class User
{
public string UserName { get; set; }
public int UserId { get; set; }
public string FullName { get; set; }

public override bool Equals(object obj)
{
if (obj is User)
{
var that = obj as User;
return this.UserId == that.UserId && this.UserName == that.UserName && this.FullName == that.FullName;
}

return false;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.function.Common;

public class User {
private int UserId;
private String UserName;
private String FullName;

public User(int userId, String userName, String fullName) {
UserId = userId;
UserName = userName;
FullName = fullName;
}

public int getUserId() {
return UserId;
}

public String getUserName() {
return UserName;
}

public String getFullName() {
return FullName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See License.txt in the project root for
* license information.
*/

package com.function;

import com.function.Common.User;
import com.google.gson.Gson;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.sql.annotation.SQLTrigger;

import java.util.logging.Level;

public class ReservedTableNameTrigger {
@FunctionName("ReservedTableNameTrigger")
public void run(
@SQLTrigger(
name = "changes",
tableName = "[dbo].[User]",
connectionStringSetting = "SqlConnectionString")
User[] changes,
ExecutionContext context) throws Exception {

context.getLogger().log(Level.INFO, "SQL Changes: " + new Gson().toJson(changes));
}
}
12 changes: 12 additions & 0 deletions test/Integration/test-js/ReservedTableNameTrigger/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"bindings": [
{
"name": "changes",
"type": "sqlTrigger",
"direction": "in",
"tableName": "[dbo].[User]",
"connectionStringSetting": "SqlConnectionString"
}
],
"disabled": false
}
6 changes: 6 additions & 0 deletions test/Integration/test-js/ReservedTableNameTrigger/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

module.exports = async function (context, changes) {
context.log(`SQL Changes: ${JSON.stringify(changes)}`)
}
Loading

0 comments on commit 1381df1

Please sign in to comment.