WIP: more parallelization

This commit is contained in:
embeddedt 2023-01-07 11:11:43 -05:00
parent 975676f3cd
commit 9456eac7df
No known key found for this signature in database
GPG Key ID: A69433EC199B5613
11 changed files with 206 additions and 34 deletions

View File

@ -0,0 +1,75 @@
package org.embeddedt.modernfix.blockstate;
import com.google.common.base.Stopwatch;
import net.minecraft.block.Block;
import net.minecraft.block.BlockState;
import net.minecraft.util.Util;
import org.embeddedt.modernfix.ModernFix;
import org.embeddedt.modernfix.duck.IBlockState;
import org.embeddedt.modernfix.util.AsyncStopwatch;
import org.embeddedt.modernfix.util.BakeReason;
import org.embeddedt.modernfix.util.OrderedParallelModDispatcher;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class BlockStateCacheHandler {
public static void handleStateCache(BlockState state) {
if(BakeReason.currentBakeReason == BakeReason.FREEZE
|| BakeReason.currentBakeReason == BakeReason.REMOTE_SNAPSHOT_INJECT
|| (BakeReason.currentBakeReason == BakeReason.LOCAL_SNAPSHOT_INJECT && ModernFix.runningFirstInjection)) {
((IBlockState)state).clearCache();
} else {
state.initCache();
}
}
private static void handleStateCacheParallel(BlockState state, boolean force) {
if(force)
state.initCache();
else
handleStateCache(state);
}
public static void rebuildParallel(boolean force) {
Map<String, ArrayList<BlockState>> statesByModId = StreamSupport.stream(Block.BLOCK_STATE_REGISTRY.spliterator(), false)
.collect(Collectors.groupingBy(state -> state.getBlock().getRegistryName().getNamespace(), Collectors.toCollection(ArrayList::new)));
Stopwatch realtimeStopwatch = Stopwatch.createStarted();
AsyncStopwatch cpuStopwatch = new AsyncStopwatch();
/* For safety, do built-in blocks first */
cpuStopwatch.startMeasuringAsync();
ArrayList<BlockState> initialStates = statesByModId.remove("minecraft");
for(BlockState state : initialStates) {
handleStateCacheParallel(state, force);
}
cpuStopwatch.stopMeasuringAsync();
OrderedParallelModDispatcher.dispatchBlocking(Util.backgroundExecutor(), modId -> {
ArrayList<BlockState> states = statesByModId.get(modId);
if(states == null)
return;
cpuStopwatch.startMeasuringAsync();
states.removeIf(state -> {
try {
handleStateCacheParallel(state, force);
return true;
} catch(RuntimeException e) {
ModernFix.LOGGER.error("Error computing state cache for " + state + ": ", e);
return false;
}
});
cpuStopwatch.stopMeasuringAsync();
});
cpuStopwatch.startMeasuringAsync();
for(ArrayList<BlockState> remainingStates : statesByModId.values()) {
for(BlockState state : remainingStates) {
handleStateCacheParallel(state, force);
}
}
cpuStopwatch.stopMeasuringAsync();
realtimeStopwatch.stop();
ModernFix.LOGGER.info("CPU time spent rebuilding blockstate cache: " + cpuStopwatch.getCpuTime()/1000f + " seconds");
ModernFix.LOGGER.info("Real time spent rebuilding blockstate cache: " + realtimeStopwatch.elapsed(TimeUnit.MILLISECONDS)/1000f + " seconds");
}
}

View File

@ -30,7 +30,7 @@ public class ModernFixEarlyConfig {
this.addMixinRule("perf.async_jei", true);
this.addMixinRule("perf.thread_priorities", true);
this.addMixinRule("perf.preload_block_classes", true);
this.addMixinRule("perf.parallel_deferred_suppliers", true);
this.addMixinRule("perf.parallel_potentially_unsafe", true);
/* Mod compat */
if(FMLLoader.getLoadingModList().getModFileById("smoothboot") != null) {

View File

@ -0,0 +1,30 @@
package org.embeddedt.modernfix.mixin.perf.parallel_potentially_unsafe.parallel_blockstate_cache_rebuild;
import net.minecraft.block.Block;
import net.minecraft.block.BlockState;
import net.minecraft.util.ObjectIntIdentityMap;
import net.minecraft.world.gen.DebugChunkGenerator;
import net.minecraftforge.registries.GameData;
import net.minecraftforge.registries.IForgeRegistryInternal;
import net.minecraftforge.registries.RegistryManager;
import org.embeddedt.modernfix.blockstate.BlockStateCacheHandler;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
@Mixin(targets = { "net/minecraftforge/registries/GameData$BlockCallbacks" })
public class BlockCallbacksMixin {
@Inject(method = "onBake", at = @At(value = "INVOKE", target = "Ljava/util/Iterator;hasNext()Z"), cancellable = true, remap = false)
private void computeCacheParallel(IForgeRegistryInternal<Block> owner, RegistryManager stage, CallbackInfo ci) {
ci.cancel();
ObjectIntIdentityMap<BlockState> blockstateMap = GameData.getBlockStateIDMap();
for (Block block : owner) {
for (BlockState state : block.getStateDefinition().getPossibleStates()) {
blockstateMap.add(state);
}
}
BlockStateCacheHandler.rebuildParallel(false);
DebugChunkGenerator.initValidStates();
}
}

View File

@ -0,0 +1,17 @@
package org.embeddedt.modernfix.mixin.perf.parallel_potentially_unsafe.parallel_blockstate_cache_rebuild;
import net.minecraft.block.Blocks;
import org.embeddedt.modernfix.blockstate.BlockStateCacheHandler;
import org.spongepowered.asm.mixin.Mixin;
import org.spongepowered.asm.mixin.injection.At;
import org.spongepowered.asm.mixin.injection.Inject;
import org.spongepowered.asm.mixin.injection.callback.CallbackInfo;
@Mixin(Blocks.class)
public class BlocksMixin {
@Inject(method = "rebuildCache", at = @At("HEAD"), cancellable = true)
private static void rebuildParallel(CallbackInfo ci) {
ci.cancel();
BlockStateCacheHandler.rebuildParallel(true);
}
}

View File

@ -1,4 +1,4 @@
package org.embeddedt.modernfix.mixin.perf.parallel_deferred_suppliers;
package org.embeddedt.modernfix.mixin.perf.parallel_potentially_unsafe.parallel_deferred_suppliers;
import net.minecraftforge.registries.DeferredRegister;
import net.minecraftforge.registries.IForgeRegistry;

View File

@ -1,4 +1,4 @@
package org.embeddedt.modernfix.mixin.perf.parallel_deferred_suppliers;
package org.embeddedt.modernfix.mixin.perf.parallel_potentially_unsafe.parallel_deferred_suppliers;
import net.minecraft.util.ResourceLocation;
import net.minecraftforge.fml.ModLoadingStage;

View File

@ -2,6 +2,7 @@ package org.embeddedt.modernfix.mixin.perf.reduce_blockstate_cache_rebuilds;
import net.minecraft.block.BlockState;
import org.embeddedt.modernfix.ModernFix;
import org.embeddedt.modernfix.blockstate.BlockStateCacheHandler;
import org.embeddedt.modernfix.duck.IBlockState;
import org.embeddedt.modernfix.util.BakeReason;
import org.spongepowered.asm.mixin.Mixin;
@ -12,12 +13,6 @@ import org.spongepowered.asm.mixin.injection.Redirect;
public class BlockCallbacksMixin {
@Redirect(method = "onBake", at = @At(value = "INVOKE", target = "Lnet/minecraft/block/BlockState;initCache()V"))
private void skipCacheIfAllowed(BlockState state) {
if(BakeReason.currentBakeReason == BakeReason.FREEZE
|| BakeReason.currentBakeReason == BakeReason.REMOTE_SNAPSHOT_INJECT
|| (BakeReason.currentBakeReason == BakeReason.LOCAL_SNAPSHOT_INJECT && ModernFix.runningFirstInjection)) {
((IBlockState)state).clearCache();
} else {
state.initCache();
}
BlockStateCacheHandler.handleStateCache(state);
}
}

View File

@ -10,6 +10,7 @@ import net.minecraftforge.fml.common.ObfuscationReflectionHelper;
import net.minecraftforge.fml.loading.moddiscovery.ModInfo;
import net.minecraftforge.forgespi.language.IModInfo;
import org.embeddedt.modernfix.ModernFix;
import org.embeddedt.modernfix.util.AsyncStopwatch;
import org.embeddedt.modernfix.util.CachedSupplier;
import org.embeddedt.modernfix.util.OrderedParallelModDispatcher;
@ -39,21 +40,24 @@ public class DeferredRegisterBaker {
if(registrySupplierMap == null)
return;
Stopwatch realtimeStopwatch = Stopwatch.createStarted();
AtomicLong cpuLong = new AtomicLong(0);
OrderedParallelModDispatcher.dispatchBlocking(modId -> {
AsyncStopwatch cpuStopwatch = new AsyncStopwatch();
OrderedParallelModDispatcher.dispatchBlocking(ModWorkManager.parallelExecutor(), modId -> {
List<CachedSupplier<?>> suppliersToCompute = registrySupplierMap.get(modId);
if (suppliersToCompute == null || suppliersToCompute.size() == 0) {
return;
}
Stopwatch stopwatch = Stopwatch.createStarted();
cpuStopwatch.startMeasuringAsync();
for (CachedSupplier<?> supplier : suppliersToCompute) {
supplier.compute();
try {
supplier.compute();
} catch(RuntimeException e) {
e.printStackTrace();
}
}
stopwatch.stop();
cpuLong.addAndGet(stopwatch.elapsed(TimeUnit.MILLISECONDS));
cpuStopwatch.stopMeasuringAsync();
});
realtimeStopwatch.stop();
ModernFix.LOGGER.info("CPU time spent constructing " + registry + " suppliers: " + cpuLong.get()/1000f + " seconds");
ModernFix.LOGGER.info("CPU time spent constructing " + registry + " suppliers: " + cpuStopwatch.getCpuTime()/1000f + " seconds");
ModernFix.LOGGER.info("Real time spent constructing " + registry + " suppliers: " + realtimeStopwatch.elapsed(TimeUnit.MILLISECONDS)/1000f + " seconds");
}
}

View File

@ -0,0 +1,27 @@
package org.embeddedt.modernfix.util;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class AsyncStopwatch {
private final AtomicLong cpuTimeMs = new AtomicLong(0);
private final ThreadLocal<Stopwatch> threadStopwatch = ThreadLocal.withInitial(Stopwatch::createUnstarted);
public void startMeasuringAsync() {
threadStopwatch.get().start();
}
public void stopMeasuringAsync() {
Stopwatch watch = threadStopwatch.get();
watch.stop();
long elapsed = watch.elapsed(TimeUnit.MILLISECONDS);
cpuTimeMs.addAndGet(elapsed);
watch.reset();
}
public long getCpuTime() {
return cpuTimeMs.get();
}
}

View File

@ -1,5 +1,6 @@
package org.embeddedt.modernfix.util;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import net.minecraftforge.fml.ModContainer;
import net.minecraftforge.fml.ModList;
@ -8,14 +9,21 @@ import net.minecraftforge.fml.ModWorkManager;
import net.minecraftforge.fml.common.ObfuscationReflectionHelper;
import net.minecraftforge.fml.loading.moddiscovery.ModInfo;
import net.minecraftforge.forgespi.language.IModInfo;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.Marker;
import org.apache.logging.log4j.MarkerManager;
import org.embeddedt.modernfix.ModernFix;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* Iterates over all mods in the game, parallelizing where possible while preserving dependency ordering.
@ -23,8 +31,9 @@ import java.util.function.Supplier;
* Can also be given a list of mods to skip.
*/
public class OrderedParallelModDispatcher {
public static void dispatchBlocking(Consumer<String> task, Collection<String> modIDsToFilter) {
HashSet<String> finishedMods = new HashSet<>(modIDsToFilter);
private static final Marker DISPATCHER = MarkerManager.getMarker("OrderedParallelModDispatcher");
public static void dispatchBlocking(Executor executor, Consumer<String> task, Collection<String> modIDsToFilter) {
Set<String> finishedMods = Collections.synchronizedSet(new HashSet<>(modIDsToFilter));
HashMap<String, CompletableFuture<?>> submittedFutures = new HashMap<>();
int numMods = ModList.get().getMods().size();
Semaphore jobWaitingSemaphore = new Semaphore(0);
@ -33,27 +42,40 @@ public class OrderedParallelModDispatcher {
remainingModList.removeIf(modInfo -> {
if(finishedMods.contains(modInfo.getModId()))
return true;
boolean allDependenciesLoaded = true;
for(IModInfo.ModVersion dep : modInfo.getDependencies()) {
if(dep.isMandatory() && !finishedMods.contains(dep.getModId())) {
allDependenciesLoaded = false;
break;
}
}
if(!allDependenciesLoaded)
List<String> missingDependencies = modInfo.getDependencies().stream()
.filter(IModInfo.ModVersion::isMandatory)
.map(IModInfo.ModVersion::getModId)
.filter(modId -> !finishedMods.contains(modId))
.collect(Collectors.toList());
if(missingDependencies.size() > 0) {
//ModernFix.LOGGER.debug(DISPATCHER, "Cannot process " + modInfo.getModId() + ", as it is waiting on mods: [" + String.join(", ", missingDependencies) + "]");
return false;
}
Optional<? extends ModContainer> modContainerOpt = ModList.get().getModContainerById(modInfo.getModId());
if(!modContainerOpt.isPresent())
throw new IllegalStateException("Can't find mod container");
ModContainer container = modContainerOpt.get();
//ModernFix.LOGGER.debug(DISPATCHER, "Submitting job for " + modInfo.getModId());
submittedFutures.put(modInfo.getModId(), CompletableFuture.runAsync(() -> {
Supplier<?> contextExtension = ObfuscationReflectionHelper.getPrivateValue(ModContainer.class, container, "contextExtension");
ModLoadingContext.get().setActiveContainer(container, contextExtension.get());
task.accept(modInfo.getModId());
try {
task.accept(modInfo.getModId());
} catch(RuntimeException e) {
e.printStackTrace();
}
/*
* We cannot rely on the main thread to correctly mark us as done, as it might start running
* before the future is marked as complete. So we add the mod to the finished set ourselves.
*/
finishedMods.add(modInfo.getModId());
jobWaitingSemaphore.release();
}, ModWorkManager.parallelExecutor()));
//ModLoadingContext.get().setActiveContainer(null, null);
}, executor));
return true;
});
Preconditions.checkState(submittedFutures.size() > 0, "The semaphore will block forever!");
//ModernFix.LOGGER.debug(DISPATCHER, "Waiting for one of [" + String.join(", ", submittedFutures.keySet()) + "] to finish...");
try {
jobWaitingSemaphore.acquire();
} catch(InterruptedException e) {
@ -61,7 +83,7 @@ public class OrderedParallelModDispatcher {
}
submittedFutures.entrySet().removeIf(entry -> {
if(entry.getValue().isDone()) {
finishedMods.add(entry.getKey());
//ModernFix.LOGGER.debug(DISPATCHER, "Job finished for " + entry.getKey());
return true;
}
return false;
@ -69,7 +91,7 @@ public class OrderedParallelModDispatcher {
}
}
public static void dispatchBlocking(Consumer<String> task) {
dispatchBlocking(task, Collections.emptyList());
public static void dispatchBlocking(Executor executor, Consumer<String> task) {
dispatchBlocking(executor, task, Collections.emptyList());
}
}

View File

@ -19,8 +19,10 @@
"perf.boost_worker_count.UtilMixin",
"perf.thread_priorities.UtilMixin",
"perf.preload_block_classes.GameDataMixin",
"perf.parallel_deferred_suppliers.DeferredRegisterMixin",
"perf.parallel_deferred_suppliers.GameDataMixin"
"perf.parallel_potentially_unsafe.parallel_deferred_suppliers.DeferredRegisterMixin",
"perf.parallel_potentially_unsafe.parallel_deferred_suppliers.GameDataMixin",
"perf.parallel_potentially_unsafe.parallel_blockstate_cache_rebuild.BlocksMixin",
"perf.parallel_potentially_unsafe.parallel_blockstate_cache_rebuild.BlockCallbacksMixin"
],
"client": [
"perf.skip_first_datapack_reload.MinecraftMixin",