fix embed queue
This commit is contained in:
parent
ded4f4db26
commit
8d4a7b1729
13 changed files with 188 additions and 32 deletions
|
|
@ -1,15 +1,15 @@
|
|||
using System.Collections.Concurrent;
|
||||
using Catalogger.Backend.Cache;
|
||||
using Catalogger.Backend.Database.Models;
|
||||
using Catalogger.Backend.Extensions;
|
||||
using Humanizer;
|
||||
using Remora.Discord.API;
|
||||
using Remora.Discord.API.Abstractions.Objects;
|
||||
using Remora.Discord.API.Abstractions.Rest;
|
||||
using Remora.Rest.Core;
|
||||
using Guild = Catalogger.Backend.Database.Models.Guild;
|
||||
|
||||
namespace Catalogger.Backend.Services;
|
||||
|
||||
// TODO: this entire class is a mess, clean it up
|
||||
public class WebhookExecutorService(
|
||||
Config config,
|
||||
ILogger logger,
|
||||
|
|
@ -20,6 +20,7 @@ public class WebhookExecutorService(
|
|||
private readonly ILogger _logger = logger.ForContext<WebhookExecutorService>();
|
||||
private readonly Snowflake _applicationId = DiscordSnowflake.New(config.Discord.ApplicationId);
|
||||
private readonly ConcurrentDictionary<ulong, ConcurrentQueue<IEmbed>> _cache = new();
|
||||
private readonly ConcurrentDictionary<ulong, object> _locks = new();
|
||||
private readonly ConcurrentDictionary<ulong, Timer> _timers = new();
|
||||
private IUser? _selfUser;
|
||||
|
||||
|
|
@ -33,29 +34,52 @@ public class WebhookExecutorService(
|
|||
await QueueLogAsync(logChannel.Value, embed);
|
||||
}
|
||||
|
||||
private List<IEmbed> TakeFromQueue(ulong channelId)
|
||||
{
|
||||
var queue = _cache.GetOrAdd(channelId, []);
|
||||
var channelLock = _locks.GetOrAdd(channelId, channelId);
|
||||
lock (channelLock)
|
||||
{
|
||||
var embeds = new List<IEmbed>();
|
||||
for (var i = 0; i < 5; i++)
|
||||
{
|
||||
if (!queue.TryDequeue(out var embed)) break;
|
||||
embeds.Add(embed);
|
||||
}
|
||||
|
||||
return embeds;
|
||||
}
|
||||
}
|
||||
|
||||
public async Task QueueLogAsync(ulong channelId, IEmbed embed)
|
||||
{
|
||||
_logger.Debug("Queueing embed for channel {ChannelId}", channelId);
|
||||
var webhook = await webhookCache.GetOrFetchWebhookAsync(channelId, id => FetchWebhookAsync(id));
|
||||
|
||||
var queue = _cache.GetOrAdd(channelId, []);
|
||||
if (queue.Count >= 5)
|
||||
await SendLogsAsync(channelId);
|
||||
queue.Enqueue(embed);
|
||||
_cache[channelId] = queue;
|
||||
|
||||
await SetTimer(channelId, queue);
|
||||
}
|
||||
|
||||
private async Task SetTimer(ulong channelId, ConcurrentQueue<IEmbed> queue)
|
||||
{
|
||||
if (_timers.TryGetValue(channelId, out var existingTimer)) await existingTimer.DisposeAsync();
|
||||
|
||||
_timers[channelId] = new Timer(_ =>
|
||||
{
|
||||
var __ = SendLogsAsync(channelId);
|
||||
_logger.Debug("Sending 5 queued embeds");
|
||||
|
||||
var __ = SendLogsAsync(channelId, TakeFromQueue(channelId));
|
||||
if (!queue.IsEmpty)
|
||||
{
|
||||
if (_timers.TryGetValue(channelId, out var timer)) timer.Dispose();
|
||||
var ___ = SetTimer(channelId, queue);
|
||||
}
|
||||
}, null, 3000, Timeout.Infinite);
|
||||
}
|
||||
|
||||
private async Task SendLogsAsync(ulong channelId)
|
||||
private async Task SendLogsAsync(ulong channelId, List<IEmbed> embeds)
|
||||
{
|
||||
var queue = _cache.GetValueOrDefault(channelId);
|
||||
if (queue == null) return;
|
||||
var embeds = queue.Take(5).ToList();
|
||||
_logger.Debug("Sending {Count} embeds to channel {ChannelId}", embeds.Count, channelId);
|
||||
if (embeds.Count == 0) return;
|
||||
|
||||
var webhook = await webhookCache.GetOrFetchWebhookAsync(channelId, id => FetchWebhookAsync(id));
|
||||
|
||||
|
|
@ -63,6 +87,18 @@ public class WebhookExecutorService(
|
|||
embeds: embeds, username: _selfUser!.Username, avatarUrl: _selfUser.AvatarUrl());
|
||||
}
|
||||
|
||||
public async Task SendLogWithAttachmentsAsync(ulong channelId, IEmbed embed, IEnumerable<FileData> files)
|
||||
{
|
||||
var attachments = files
|
||||
.Select<FileData, OneOf.OneOf<FileData, IPartialAttachment>>(f => f)
|
||||
.ToList();
|
||||
|
||||
var webhook = await webhookCache.GetOrFetchWebhookAsync(channelId, id => FetchWebhookAsync(id));
|
||||
await webhookApi.ExecuteWebhookAsync(DiscordSnowflake.New(webhook.Id), webhook.Token, shouldWait: false,
|
||||
embeds: new List<IEmbed>([embed]), attachments: attachments, username: _selfUser!.Username,
|
||||
avatarUrl: _selfUser.AvatarUrl());
|
||||
}
|
||||
|
||||
private async Task<IWebhook> FetchWebhookAsync(Snowflake channelId, CancellationToken ct = default)
|
||||
{
|
||||
var channelWebhooks =
|
||||
|
|
@ -78,14 +114,14 @@ public class WebhookExecutorService(
|
|||
ulong? userId = null)
|
||||
{
|
||||
if (channelId == null) return GetDefaultLogChannel(guild, logChannelType);
|
||||
if (!channelCache.GetChannel(channelId.Value, out var channel)) return null;
|
||||
if (!channelCache.TryGet(channelId.Value, out var channel)) return null;
|
||||
|
||||
Snowflake? categoryId;
|
||||
if (channel.Type is ChannelType.AnnouncementThread or ChannelType.PrivateThread or ChannelType.PublicThread)
|
||||
{
|
||||
// parent_id should always have a value for threads
|
||||
channelId = channel.ParentID.Value!.Value;
|
||||
if (!channelCache.GetChannel(channelId.Value, out var parentChannel))
|
||||
if (!channelCache.TryGet(channelId.Value, out var parentChannel))
|
||||
return GetDefaultLogChannel(guild, logChannelType);
|
||||
categoryId = parentChannel.ParentID.Value;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue