add RenderManager: improve CPU control with global render queue and parallel trace pooling

This commit is contained in:
2026-06-21 17:49:57 +02:00
parent 220cda1deb
commit f83ccdc7ff
7 changed files with 312 additions and 62 deletions
@@ -0,0 +1,151 @@
package eu.mhsl.minecraft.pixelpics.render;
import org.bukkit.Bukkit;
import org.bukkit.plugin.Plugin;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
/**
* Server-friendly scheduling for photo renders. Bounds CPU use and protects the server tick by:
* <ul>
* <li><b>Per player:</b> at most one photo in the system at a time (running or queued).</li>
* <li><b>Globally:</b> at most {@code maxConcurrent} photos render at once, with up to
* {@code queueSize} more waiting; further requests are rejected.</li>
* <li><b>CPU:</b> the ray tracing runs on a dedicated {@link ForkJoinPool} of {@code threads}
* low-priority worker threads ({@link Thread#MIN_PRIORITY}), so it tends to use spare CPU and
* never fans out across every core like the default common pool would.</li>
* </ul>
*
* <p>Reserve a slot with {@link #tryReserve(UUID)}; if accepted, run the heavy work via
* {@link #dispatch} (which releases the slot on completion) — or call {@link #release(UUID)} if you
* bail out before dispatching.
*/
public final class RenderManager {
public enum Outcome { ACCEPTED, USER_BUSY, QUEUE_FULL }
private final Plugin plugin;
private final ForkJoinPool tracePool;
private final ThreadPoolExecutor dispatcher;
private final ScheduledExecutorService watchdog;
private final Set<UUID> activeUsers = ConcurrentHashMap.newKeySet();
private final AtomicInteger inFlight = new AtomicInteger();
private final int capacity;
private final long timeoutMillis;
public RenderManager(Plugin plugin, int threads, int maxConcurrent, int queueSize, int timeoutSeconds) {
this.plugin = plugin;
this.capacity = maxConcurrent + queueSize;
this.timeoutMillis = Math.max(1L, timeoutSeconds) * 1000L;
this.tracePool = new ForkJoinPool(threads, lowPriorityForkJoinFactory(), null, false);
this.dispatcher = new ThreadPoolExecutor(
maxConcurrent, maxConcurrent, 30, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(), // capacity is enforced by inFlight, not this queue
lowPriorityThreadFactory());
this.dispatcher.allowCoreThreadTimeOut(true);
this.watchdog = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "PixelPics-render-watchdog");
thread.setDaemon(true);
return thread;
});
}
/** The bounded, low-priority pool the ray tracer must run its parallel work on. */
public ForkJoinPool tracePool() {
return tracePool;
}
/** Atomically reserves a slot for {@code user}. On {@code ACCEPTED}, you must later release it. */
public Outcome tryReserve(UUID user) {
if (!activeUsers.add(user)) return Outcome.USER_BUSY;
if (inFlight.incrementAndGet() > capacity) {
inFlight.decrementAndGet();
activeUsers.remove(user);
return Outcome.QUEUE_FULL;
}
return Outcome.ACCEPTED;
}
/** Releases a reservation made by {@link #tryReserve}. Safe to call once per accepted reserve. */
public void release(UUID user) {
if (activeUsers.remove(user)) {
inFlight.decrementAndGet();
}
}
/**
* Runs {@code work} off the main thread (honoring the global concurrency limit), then delivers the
* result back on the main thread via {@code onSuccess}, or {@code onFailure} if it fails, returns
* null, or exceeds the configured timeout. The {@link AtomicBoolean} handed to {@code work} is set
* once the deadline passes — {@code work} should poll it and bail out cooperatively. Releases the
* caller's reservation when done. Requires a prior {@link #tryReserve} success.
*/
public <T> void dispatch(UUID user, Function<AtomicBoolean, T> work, Consumer<T> onSuccess, Runnable onFailure) {
dispatcher.execute(() -> {
AtomicBoolean cancelled = new AtomicBoolean(false);
ScheduledFuture<?> deadline = watchdog.schedule(() -> cancelled.set(true), timeoutMillis, TimeUnit.MILLISECONDS);
T result = null;
boolean ok = false;
try {
result = work.apply(cancelled);
ok = result != null && !cancelled.get();
} catch (Throwable t) {
plugin.getLogger().warning("Render job failed: " + t);
} finally {
deadline.cancel(false);
if (cancelled.get()) {
plugin.getLogger().warning("Render for " + user + " aborted after "
+ (timeoutMillis / 1000) + "s (timeout).");
}
release(user);
}
T finalResult = result;
boolean finalOk = ok;
Bukkit.getScheduler().runTask(plugin, () -> {
if (finalOk) onSuccess.accept(finalResult);
else onFailure.run();
});
});
}
public void shutdown() {
watchdog.shutdownNow();
dispatcher.shutdownNow();
tracePool.shutdownNow();
}
private static ForkJoinPool.ForkJoinWorkerThreadFactory lowPriorityForkJoinFactory() {
return pool -> {
ForkJoinWorkerThread thread = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
thread.setName("PixelPics-trace-" + thread.getPoolIndex());
thread.setPriority(Thread.MIN_PRIORITY);
thread.setDaemon(true);
return thread;
};
}
private static ThreadFactory lowPriorityThreadFactory() {
AtomicInteger counter = new AtomicInteger();
return runnable -> {
Thread thread = new Thread(runnable, "PixelPics-render-" + counter.incrementAndGet());
thread.setDaemon(true);
thread.setPriority(Thread.MIN_PRIORITY);
return thread;
};
}
}
@@ -30,6 +30,8 @@ import java.awt.image.BufferedImage;
import java.awt.image.DataBufferInt;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
import java.util.stream.IntStream;
@@ -54,16 +56,25 @@ public class DefaultScreenRenderer implements Renderer {
private final BlockEntityBaker blockEntityBaker;
private final DecorationBaker decorationBaker;
private final Logger logger;
/** Bounds parallel ray tracing to a fixed, low-priority pool; {@code null} = use the common pool. */
private final ForkJoinPool tracePool;
public DefaultScreenRenderer(BlockModelRegistry registry, BiomeTintProvider tintProvider,
TextureCache textures, CemBaker entityBaker,
BlockEntityBaker blockEntityBaker, Logger logger) {
this(registry, tintProvider, textures, entityBaker, blockEntityBaker, logger, null);
}
public DefaultScreenRenderer(BlockModelRegistry registry, BiomeTintProvider tintProvider,
TextureCache textures, CemBaker entityBaker,
BlockEntityBaker blockEntityBaker, Logger logger, ForkJoinPool tracePool) {
SkyRenderer skyRenderer = new SkyRenderer(textures);
this.raytracer = new SnapshotRaytracer(registry, tintProvider, skyRenderer, MAX_DISTANCE, REFLECTION_DEPTH);
this.entityBaker = entityBaker;
this.blockEntityBaker = blockEntityBaker;
this.decorationBaker = new DecorationBaker(textures);
this.logger = logger;
this.tracePool = tracePool;
}
/** Convenience: prepare and execute in one call (must run on the main thread). */
@@ -94,6 +105,15 @@ public class DefaultScreenRenderer implements Renderer {
/** Traces every (super)ray in parallel, then downsamples gamma-correctly. Safe off the main thread. */
public BufferedImage execute(RenderJob job) {
return execute(job, new AtomicBoolean(false));
}
/**
* As {@link #execute(RenderJob)}, but cooperatively abortable: once {@code cancelled} is set, the
* parallel loops short-circuit (remaining work becomes a no-op) so a stuck/overlong render drains
* out quickly. The returned image is then partial and should be discarded by the caller.
*/
public BufferedImage execute(RenderJob job, AtomicBoolean cancelled) {
int finalW = job.width();
int finalH = job.height();
int superW = finalW * SSAA;
@@ -105,12 +125,14 @@ public class DefaultScreenRenderer implements Renderer {
job.decorations(), decorationBaker);
int[] superBuf = new int[rayMap.size()];
IntStream.range(0, rayMap.size()).parallel().forEach(i ->
superBuf[i] = raytracer.trace(snapshot, origin, rayMap.get(i), sky, scene));
runParallel(() -> IntStream.range(0, rayMap.size()).parallel().forEach(i -> {
if (!cancelled.get()) superBuf[i] = raytracer.trace(snapshot, origin, rayMap.get(i), sky, scene);
}));
BufferedImage image = new BufferedImage(finalW, finalH, BufferedImage.TYPE_INT_RGB);
int[] imageData = ((DataBufferInt) image.getRaster().getDataBuffer()).getData();
IntStream.range(0, finalH).parallel().forEach(fy -> {
runParallel(() -> IntStream.range(0, finalH).parallel().forEach(fy -> {
if (cancelled.get()) return;
int[] block = new int[SSAA * SSAA];
for (int fx = 0; fx < finalW; fx++) {
int n = 0;
@@ -122,10 +144,23 @@ public class DefaultScreenRenderer implements Renderer {
}
imageData[fy * finalW + fx] = ColorUtil.averageLinear(block, 0, n);
}
});
}));
return image;
}
/**
* Runs a parallel-stream task. With a {@link #tracePool} the work is confined to that bounded,
* low-priority pool (parallel streams adopt the pool of the running fork-join worker); without one
* it runs inline on the common pool (used by the offline render tools).
*/
private void runParallel(Runnable task) {
if (tracePool == null) {
task.run();
} else {
tracePool.submit(task).join();
}
}
private List<Vector> buildRayMap(Location eyeLocation, int width, int height) {
Vector lineDirection = eyeLocation.getDirection();
@@ -129,14 +129,12 @@ public final class EntitySnapshotBuilder {
} else if (e instanceof org.bukkit.entity.AbstractHorse ah) {
// Skeleton/zombie horse: only saddle (no colour/markings/armor variants).
saddle = isSaddled(ah);
} else if (e instanceof org.bukkit.entity.AbstractNautilus && e instanceof LivingEntity nl) {
} else if (e instanceof org.bukkit.entity.AbstractNautilus nl) {
// Nautilus body armor + saddle are same-UV overlays (like horse armor).
org.bukkit.inventory.EntityEquipment eq = nl.getEquipment();
if (eq != null) {
bodyEquip = equipAsset(eq.getItem(org.bukkit.inventory.EquipmentSlot.BODY));
org.bukkit.inventory.ItemStack sd = eq.getItem(org.bukkit.inventory.EquipmentSlot.SADDLE);
saddle = sd != null && !sd.getType().isAir();
}
bodyEquip = equipAsset(eq.getItem(org.bukkit.inventory.EquipmentSlot.BODY));
org.bukkit.inventory.ItemStack sd = eq.getItem(org.bukkit.inventory.EquipmentSlot.SADDLE);
saddle = !sd.getType().isAir();
} else if (e instanceof org.bukkit.entity.Fox f) {
variant = keyOf(f.getFoxType());
} else if (e instanceof org.bukkit.entity.MushroomCow mc) {
@@ -149,7 +147,6 @@ public final class EntitySnapshotBuilder {
variant = s.getColor() == null ? null : keyOf(s.getColor());
} else if (e instanceof org.bukkit.entity.ZombieVillager zv) {
variant = keyOf(zv.getVillagerType());
profession = keyOf(zv.getVillagerProfession());
// ZombieVillager exposes no level via Bukkit -> no profession-level badge (matches vanilla).
} else if (e instanceof org.bukkit.entity.Villager vi) {
variant = keyOf(vi.getVillagerType());
@@ -265,10 +262,12 @@ public final class EntitySnapshotBuilder {
/** Registry/Keyed values yield their key path; plain enums yield their lower-case name. */
private static String keyOf(Object o) {
if (o == null) return null;
if (o instanceof org.bukkit.Keyed k) return k.getKey().getKey();
if (o instanceof Enum<?> en) return en.name().toLowerCase(java.util.Locale.ROOT);
return o.toString().toLowerCase(java.util.Locale.ROOT);
return switch(o) {
case null -> null;
case org.bukkit.Keyed k -> k.getKey().getKey();
case Enum<?> en -> en.name().toLowerCase(java.util.Locale.ROOT);
default -> o.toString().toLowerCase(java.util.Locale.ROOT);
};
}
/** Returns {skinUrl, model} from the player's profile texture property, or {null, null}. */