Skip to content

Commit

Permalink
Add tablesheet scrapper, store (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
Atralupus authored May 9, 2024
1 parent c5a6ac5 commit 02c9de6
Show file tree
Hide file tree
Showing 7 changed files with 167 additions and 46 deletions.
12 changes: 6 additions & 6 deletions Mimir.Worker/BlockPoller.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

namespace Mimir.Worker;

public class BlockPoller(IStateService stateService, HeadlessGQLClient headlessGqlClient, MongoDbWorker mongoDbWorker)
public class BlockPoller(IStateService stateService, HeadlessGQLClient headlessGqlClient, MongoDbStore mongoDbStore)
{
public async Task RunAsync(CancellationToken cancellationToken)
{
var stateGetter = new StateGetter(stateService);
while (!cancellationToken.IsCancellationRequested)
{
var syncedBlockIndex = await mongoDbWorker.GetLatestBlockIndex();
var syncedBlockIndex = await mongoDbStore.GetLatestBlockIndex();
var currentBlockIndex = await stateService.GetLatestIndex();
var processBlockIndex = syncedBlockIndex + 1;

Expand All @@ -31,7 +31,7 @@ public async Task RunAsync(CancellationToken cancellationToken)
{
Serilog.Log.Error("Failed to get arena txs. errors:\n" +
string.Join("\n", rawArenaTxsResp.Errors.Select(x => "- " + x.Message)));
await mongoDbWorker.UpdateLatestBlockIndex(syncedBlockIndex + 1);
await mongoDbStore.UpdateLatestBlockIndex(syncedBlockIndex + 1);
continue;
}

Expand All @@ -56,13 +56,13 @@ public async Task RunAsync(CancellationToken cancellationToken)
var myArenaData = await stateGetter.GetArenaData(roundData, myAvatarAddress);
var enemyArenaData = await stateGetter.GetArenaData(roundData, enemyAvatarAddress);

await mongoDbWorker.BulkUpsertAvatarDataAsync(new List<AvatarData> { myAvatarData, enemyAvatarData });
await mongoDbWorker.BulkUpsertArenaDataAsync(new List<ArenaData> { myArenaData, enemyArenaData });
await mongoDbStore.BulkUpsertAvatarDataAsync(new List<AvatarData> { myAvatarData, enemyAvatarData });
await mongoDbStore.BulkUpsertArenaDataAsync(new List<ArenaData> { myArenaData, enemyArenaData });
}
}
finally
{
await mongoDbWorker.UpdateLatestBlockIndex(syncedBlockIndex + 1);
await mongoDbStore.UpdateLatestBlockIndex(syncedBlockIndex + 1);
}
}
}
Expand Down
20 changes: 12 additions & 8 deletions Mimir.Worker/Initializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,40 @@ namespace Mimir.Worker;

public class Initializer : BackgroundService
{
private readonly MongoDbWorker _worker;
private readonly ArenaScrapper _scrapper;
private readonly MongoDbStore _store;
private readonly ArenaScrapper _arenaScrapper;
private readonly TableSheetScrapper _tableSheetScrapper;
private readonly ILogger<Initializer> _logger;
private readonly HeadlessGQLClient _headlessGqlClient;
private readonly IStateService _stateService;

public Initializer(
ILogger<Initializer> logger,
ILogger<ArenaScrapper> scrapperLogger,
ILogger<ArenaScrapper> arenaScrapperLogger,
ILogger<TableSheetScrapper> tableSheetScrapperLogger,
HeadlessGQLClient headlessGqlClient,
IStateService stateService,
MongoDbWorker worker)
MongoDbStore store)
{
_logger = logger;
_stateService = stateService;
_worker = worker;
_store = store;
_headlessGqlClient = headlessGqlClient;
_scrapper = new ArenaScrapper(scrapperLogger, _stateService, _worker);
_arenaScrapper = new ArenaScrapper(arenaScrapperLogger, _stateService, _store);
_tableSheetScrapper = new TableSheetScrapper(tableSheetScrapperLogger, _stateService, _store);
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
var started = DateTime.UtcNow;

await _scrapper.ExecuteAsync(stoppingToken);
await _tableSheetScrapper.ExecuteAsync(stoppingToken);
await _arenaScrapper.ExecuteAsync(stoppingToken);

var totalElapsedMinutes = DateTime.UtcNow.Subtract(started).Minutes;
_logger.LogInformation($"Finished Initializer background service. Elapsed {totalElapsedMinutes} minutes.");

var poller = new BlockPoller(_stateService, _headlessGqlClient, _worker);
var poller = new BlockPoller(_stateService, _headlessGqlClient, _store);
await poller.RunAsync(stoppingToken);
}
}
20 changes: 20 additions & 0 deletions Mimir.Worker/Models/State/TableSheetData.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Libplanet.Crypto;
using Nekoyume.TableData;

namespace Mimir.Worker.Models;

public class TableSheetData : BaseData
{
public Address Address { get; }
public string Name { get; }
public ISheet Sheet { get; }
public string Raw { get; }

public TableSheetData(Address address, string name, ISheet sheet, string raw)
{
Address = address;
Name = name;
Sheet = sheet;
Raw = raw;
}
}
4 changes: 2 additions & 2 deletions Mimir.Worker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
builder.Services.AddSingleton(serviceProvider =>
{
var config = serviceProvider.GetRequiredService<IOptions<Configuration>>().Value;
var logger = serviceProvider.GetRequiredService<ILogger<MongoDbWorker>>();
return new MongoDbWorker(logger, config.MongoDbConnectionString, config.DatabaseName);
var logger = serviceProvider.GetRequiredService<ILogger<MongoDbStore>>();
return new MongoDbStore(logger, config.MongoDbConnectionString, config.DatabaseName);
});
builder.Services.AddHostedService<Initializer>();

Expand Down
14 changes: 6 additions & 8 deletions Mimir.Worker/Scrapper/ArenaScrapper.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
using Mimir.Worker.Events;
using Mimir.Worker.Services;
using Mimir.Worker.Models;
using Nekoyume.TableData;
using Libplanet.Crypto;

namespace Mimir.Worker.Scrapper;

public class ArenaScrapper(ILogger<ArenaScrapper> logger, IStateService service, MongoDbWorker worker)
public class ArenaScrapper(ILogger<ArenaScrapper> logger, IStateService service, MongoDbStore store)
{
private readonly ILogger<ArenaScrapper> _logger = logger;

private readonly IStateService _stateService = service;
private readonly MongoDbWorker _worker = worker;
private readonly MongoDbStore _store = store;

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
Expand All @@ -24,17 +22,17 @@ public async Task ExecuteAsync(CancellationToken cancellationToken)
const int maxBufferSize = 10;
async Task FlushBufferAsync()
{
await _worker.BulkUpsertArenaDataAsync(buffer.Select(x => x.Arena).ToList());
await _worker.BulkUpsertAvatarDataAsync(buffer.Select(x => x.Avatar).ToList());
await _store.BulkUpsertArenaDataAsync(buffer.Select(x => x.Arena).ToList());
await _store.BulkUpsertAvatarDataAsync(buffer.Select(x => x.Avatar).ToList());
foreach (var pair in buffer)
{
await _worker.LinkAvatarWithArenaAsync(pair.AvatarAddress);
await _store.LinkAvatarWithArenaAsync(pair.AvatarAddress);
}

buffer.Clear();
}

await _worker.UpdateLatestBlockIndex(latestBlockIndex);
await _store.UpdateLatestBlockIndex(latestBlockIndex);

foreach (var avatarAddress in arenaParticipants.AvatarAddresses)
{
Expand Down
64 changes: 64 additions & 0 deletions Mimir.Worker/Scrapper/TableSheetScrapper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using Bencodex;
using Bencodex.Types;
using Libplanet.Common;
using Libplanet.Crypto;
using Mimir.Worker.Events;
using Mimir.Worker.Models;
using Mimir.Worker.Services;
using Nekoyume;
using Nekoyume.Action;
using Nekoyume.TableData;

namespace Mimir.Worker.Scrapper;

public class TableSheetScrapper(
ILogger<TableSheetScrapper> logger,
IStateService service,
MongoDbStore store
)
{
private readonly ILogger<TableSheetScrapper> _logger = logger;

private readonly IStateService _stateService = service;
private readonly MongoDbStore _store = store;

public async Task ExecuteAsync(CancellationToken cancellationToken)
{
var latestBlockIndex = await service.GetLatestIndex();
var stateGetter = _stateService.At();

var sheetTypes = typeof(ISheet)
.Assembly.GetTypes()
.Where(type =>
type.Namespace is { } @namespace
&& @namespace.StartsWith($"{nameof(Nekoyume)}.{nameof(Nekoyume.TableData)}")
&& !type.IsAbstract
&& typeof(ISheet).IsAssignableFrom(type)
);

foreach (var sheetType in sheetTypes)
{
var sheetAddress = Addresses.TableSheet.Derive(sheetType.Name);
var sheetState = await _stateService.GetState(sheetAddress);
if (sheetState is not Text sheetValue)
{
throw new ArgumentException(nameof(sheetType));
}

var sheetInstance = Activator.CreateInstance(sheetType);
if (sheetInstance is not ISheet sheet)
{
throw new InvalidCastException($"Type {sheetType.Name} cannot be cast to ISheet.");
}

sheet.Set(sheetValue.Value);
var sheetData = new TableSheetData(
sheetAddress,
sheetType.Name,
sheet,
ByteUtil.Hex(new Codec().Encode(sheetState))
);
await _store.InsertTableSheets(sheetData);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,43 @@
using MongoDB.Driver;
using Libplanet.Crypto;
using Mimir.Worker.Models;
using Mimir.Worker.Scrapper;
using MongoDB.Bson;
using MongoDB.Driver;
using Mimir.Worker.Models;
using MongoDB.Driver;
using Nekoyume.TableData;

namespace Mimir.Worker.Services;

public class MongoDbWorker
public class MongoDbStore
{
private readonly ILogger<MongoDbWorker> _logger;
private readonly ILogger<MongoDbStore> _logger;

private readonly IMongoClient _client;

private readonly IMongoDatabase _database;

private readonly string _databaseName;

private IMongoCollection<BsonDocument> ArenaCollection => _database.GetCollection<BsonDocument>("arena");
private IMongoCollection<BsonDocument> ArenaCollection =>
_database.GetCollection<BsonDocument>("arena");

private IMongoCollection<BsonDocument> AvatarCollection =>
_database.GetCollection<BsonDocument>("avatars");

private IMongoCollection<BsonDocument> AvatarCollection => _database.GetCollection<BsonDocument>("avatars");
private IMongoCollection<BsonDocument> MetadataCollection =>
_database.GetCollection<BsonDocument>("metadata");

private IMongoCollection<BsonDocument> MetadataCollection => _database.GetCollection<BsonDocument>("metadata");
private IMongoCollection<BsonDocument> TableSheetsCollection =>
_database.GetCollection<BsonDocument>("tableSheets");

public MongoDbWorker(ILogger<MongoDbWorker> logger, string connectionString, string databaseName)
public MongoDbStore(ILogger<MongoDbStore> logger, string connectionString, string databaseName)
{
_client = new MongoClient(connectionString);
_database = _client.GetDatabase(databaseName);
_logger = logger;
_databaseName = databaseName;
}

public async Task LinkAvatarWithArenaAsync(Address address)
{
var avatarFilter = Builders<BsonDocument>.Filter.Eq("Avatar.address", address.ToHex());
Expand All @@ -39,8 +47,13 @@ public async Task LinkAvatarWithArenaAsync(Address address)
var objectId = avatar["_id"].AsObjectId;
var arenaFilter = Builders<BsonDocument>.Filter.Eq("AvatarAddress", address.ToHex());
var update = Builders<BsonDocument>.Update.Set("AvatarObjectId", objectId);
var updateModel = new UpdateOneModel<BsonDocument>(arenaFilter, update) { IsUpsert = false };
await ArenaCollection.BulkWriteAsync(new List<WriteModel<BsonDocument>> { updateModel });
var updateModel = new UpdateOneModel<BsonDocument>(arenaFilter, update)
{
IsUpsert = false
};
await ArenaCollection.BulkWriteAsync(
new List<WriteModel<BsonDocument>> { updateModel }
);
}
}

Expand All @@ -50,10 +63,7 @@ public async Task UpdateLatestBlockIndex(long blockIndex)
var filter = Builders<BsonDocument>.Filter.Eq("_id", "SyncContext");
var update = Builders<BsonDocument>.Update.Set("LatestBlockIndex", blockIndex);
var updateModel = new UpdateOneModel<BsonDocument>(filter, update);
await MetadataCollection.BulkWriteAsync(new[]
{
updateModel
});
await MetadataCollection.BulkWriteAsync(new[] { updateModel });
}

public async Task<long> GetLatestBlockIndex()
Expand All @@ -71,9 +81,15 @@ public async Task BulkUpsertArenaDataAsync(List<ArenaData> arenaDatas)
{
foreach (var arenaData in arenaDatas)
{
var filter = Builders<BsonDocument>.Filter.Eq("AvatarAddress", arenaData.AvatarAddress.ToHex());
var filter = Builders<BsonDocument>.Filter.Eq(
"AvatarAddress",
arenaData.AvatarAddress.ToHex()
);
var bsonDocument = BsonDocument.Parse(arenaData.ToJson());
var upsertOne = new ReplaceOneModel<BsonDocument>(filter, bsonDocument) { IsUpsert = true };
var upsertOne = new ReplaceOneModel<BsonDocument>(filter, bsonDocument)
{
IsUpsert = true
};
bulkOps.Add(upsertOne);
}
if (bulkOps.Count > 0)
Expand All @@ -84,7 +100,7 @@ public async Task BulkUpsertArenaDataAsync(List<ArenaData> arenaDatas)

_logger.LogInformation($"Stored {bulkOps.Count} arena data");
}
catch(Exception ex)
catch (Exception ex)
{
_logger.LogError($"An error occurred during BulkUpsertArenaDataAsync: {ex.Message}");
}
Expand All @@ -98,9 +114,15 @@ public async Task BulkUpsertAvatarDataAsync(List<AvatarData> avatarDatas)
{
foreach (var avatarData in avatarDatas)
{
var filter = Builders<BsonDocument>.Filter.Eq("Avatar.address", avatarData.Avatar.address.ToHex());
var filter = Builders<BsonDocument>.Filter.Eq(
"Avatar.address",
avatarData.Avatar.address.ToHex()
);
var bsonDocument = BsonDocument.Parse(avatarData.ToJson());
var upsertOne = new ReplaceOneModel<BsonDocument>(filter, bsonDocument) { IsUpsert = true };
var upsertOne = new ReplaceOneModel<BsonDocument>(filter, bsonDocument)
{
IsUpsert = true
};
bulkOps.Add(upsertOne);
}
if (bulkOps.Count > 0)
Expand All @@ -110,15 +132,28 @@ public async Task BulkUpsertAvatarDataAsync(List<AvatarData> avatarDatas)

_logger.LogInformation($"Stored {bulkOps.Count} avatar data");
}
catch(Exception ex)
catch (Exception ex)
{
_logger.LogError($"An error occurred during BulkUpsertAvatarDataAsync: {ex.Message}");
}
}

public async Task InsertTableSheets(TableSheetData sheetData)
{
var filter = Builders<BsonDocument>.Filter.Eq("Address", sheetData.Address.ToHex());
var bsonDocument = BsonDocument.Parse(sheetData.ToJson());
await TableSheetsCollection.ReplaceOneAsync(
filter,
bsonDocument,
new ReplaceOptions { IsUpsert = true }
);
}

public async Task<bool> IsInitialized()
{
var names = await (await _client.GetDatabase(_databaseName).ListCollectionNamesAsync()).ToListAsync();
var names = await (
await _client.GetDatabase(_databaseName).ListCollectionNamesAsync()
).ToListAsync();
return names is not { } ns || !(ns.Contains("arena") && ns.Contains("avatars"));
}
}

0 comments on commit 02c9de6

Please sign in to comment.