Removed forge packet queue from pipeline

This commit is contained in:
Adrian Bergqvist 2023-04-21 22:24:26 +02:00
parent 8a3f260375
commit cfa7ecad21
No known key found for this signature in database
GPG Key ID: 3B3DA43224B79417
6 changed files with 28 additions and 53 deletions

View File

@ -5,7 +5,7 @@ plugins {
} }
group = "org.adde0109" group = "org.adde0109"
version = "1.3.3-beta" version = "1.3.4-beta"
repositories { repositories {
mavenCentral() mavenCentral()

View File

@ -40,7 +40,7 @@ import static com.velocitypowered.api.network.ProtocolVersion.MINECRAFT_1_19;
import static com.velocitypowered.api.network.ProtocolVersion.MINECRAFT_1_19_3; import static com.velocitypowered.api.network.ProtocolVersion.MINECRAFT_1_19_3;
import static com.velocitypowered.proxy.protocol.packet.brigadier.ArgumentIdentifier.mapSet; import static com.velocitypowered.proxy.protocol.packet.brigadier.ArgumentIdentifier.mapSet;
@Plugin(id = "ambassador", name = "Ambassador", version = "1.3.3-beta", authors = {"adde0109"}) @Plugin(id = "ambassador", name = "Ambassador", version = "1.3.4-beta", authors = {"adde0109"})
public class Ambassador { public class Ambassador {
public ProxyServer server; public ProxyServer server;

View File

@ -8,7 +8,6 @@ public class ForgeConstants {
public static final String RESET_LISTENER = "ambassador-reset-listener"; public static final String RESET_LISTENER = "ambassador-reset-listener";
public static final String SERVER_SUCCESS_LISTENER = "ambassador-server-success-listener"; public static final String SERVER_SUCCESS_LISTENER = "ambassador-server-success-listener";
public static final String PLUGIN_PACKET_QUEUE = "ambassador-plugin-generated-packet-queue"; public static final String PLUGIN_PACKET_QUEUE = "ambassador-plugin-generated-packet-queue";
public static final String FORGE_HANDSHAKE_HOLDER = "ambassador-forge-handshake-holder";
public static final String FORGE_HANDSHAKE_DECODER = "ambassador-forge-decoder"; public static final String FORGE_HANDSHAKE_DECODER = "ambassador-forge-decoder";
public static final String FORGE_HANDSHAKE_HANDLER = "ambassador-forge-handler"; public static final String FORGE_HANDSHAKE_HANDLER = "ambassador-forge-handler";
public static final String COMMAND_ERROR_CATCHER = "ambassador-command-catcher"; public static final String COMMAND_ERROR_CATCHER = "ambassador-command-catcher";

View File

@ -65,11 +65,11 @@ public enum VelocityForgeBackendConnectionPhase implements BackendConnectionPhas
//Reset client if not ready to receive new handshake //Reset client if not ready to receive new handshake
VelocityForgeClientConnectionPhase clientPhase = (VelocityForgeClientConnectionPhase) player.getPhase(); VelocityForgeClientConnectionPhase clientPhase = (VelocityForgeClientConnectionPhase) player.getPhase();
clientPhase.resetConnectionPhase(player); message.retain();
clientPhase.resetAndWrite(player, message);
//Forge server //Forge server
//To avoid unnecessary resets, we wait until we get the handshake even if we know that we should //To avoid unnecessary resets, we wait until we get the handshake even if we know that we should
//reset because that the previous server was Forge. //reset because that the previous server was Forge.
player.getConnection().write(message.retain());
ForgeLoginWrapperDecoder decoder = (ForgeLoginWrapperDecoder) player.getConnection() ForgeLoginWrapperDecoder decoder = (ForgeLoginWrapperDecoder) player.getConnection()
.getChannel().pipeline().get(ForgeConstants.FORGE_HANDSHAKE_DECODER); .getChannel().pipeline().get(ForgeConstants.FORGE_HANDSHAKE_DECODER);

View File

@ -9,6 +9,7 @@ import com.velocitypowered.proxy.connection.client.ClientConnectionPhase;
import com.velocitypowered.proxy.connection.client.ConnectedPlayer; import com.velocitypowered.proxy.connection.client.ConnectedPlayer;
import com.velocitypowered.proxy.network.Connections; import com.velocitypowered.proxy.network.Connections;
import com.velocitypowered.proxy.protocol.StateRegistry; import com.velocitypowered.proxy.protocol.StateRegistry;
import com.velocitypowered.proxy.protocol.packet.LoginPluginMessage;
import com.velocitypowered.proxy.protocol.packet.PluginMessage; import com.velocitypowered.proxy.protocol.packet.PluginMessage;
import io.netty.buffer.Unpooled; import io.netty.buffer.Unpooled;
import org.adde0109.ambassador.Ambassador; import org.adde0109.ambassador.Ambassador;
@ -16,7 +17,6 @@ import org.adde0109.ambassador.forge.packet.IForgeLoginWrapperPacket;
import org.adde0109.ambassador.forge.packet.ModListReplyPacket; import org.adde0109.ambassador.forge.packet.ModListReplyPacket;
import org.adde0109.ambassador.forge.pipeline.ForgeLoginWrapperDecoder; import org.adde0109.ambassador.forge.pipeline.ForgeLoginWrapperDecoder;
import org.adde0109.ambassador.velocity.client.FML2CRPMResetCompleteDecoder; import org.adde0109.ambassador.velocity.client.FML2CRPMResetCompleteDecoder;
import org.adde0109.ambassador.velocity.client.OutboundForgeHandshakeQueue;
import org.adde0109.ambassador.velocity.client.OutboundSuccessHolder; import org.adde0109.ambassador.velocity.client.OutboundSuccessHolder;
import org.adde0109.ambassador.velocity.client.PluginLoginPacketQueue; import org.adde0109.ambassador.velocity.client.PluginLoginPacketQueue;
@ -37,28 +37,36 @@ public enum VelocityForgeClientConnectionPhase implements ClientConnectionPhase
}, },
COMPLETE { COMPLETE {
@Override
public void resetAndWrite(ConnectedPlayer player, LoginPluginMessage message) {
resetConnectionPhase(player);
pending = message;
}
@Override @Override
public void resetConnectionPhase(ConnectedPlayer player) { public void resetConnectionPhase(ConnectedPlayer player) {
MinecraftConnection connection = player.getConnection(); MinecraftConnection connection = player.getConnection();
//There is no going back even if the handshake fails. No reason to still be connected.
//We unregister so no plugin sees this client while the client is being reset.
((VelocityServer) Ambassador.getInstance().server).unregisterConnection(player);
if (player.getConnectedServer() != null) { if (player.getConnectedServer() != null) {
player.getConnectedServer().disconnect(); player.getConnectedServer().disconnect();
player.setConnectedServer(null); player.setConnectedServer(null);
} }
//Don't handle anything from the server until the reset has completed.
player.getConnectionInFlight().getConnection().getChannel().config().setAutoRead(false); player.getConnectionInFlight().getConnection().getChannel().config().setAutoRead(false);
//Prepare to receive reset ACK and Forge Handshake. //Prepare to receive reset ACK
connection.getChannel().pipeline().addBefore(Connections.MINECRAFT_DECODER, ForgeConstants.RESET_LISTENER,new FML2CRPMResetCompleteDecoder()); connection.getChannel().pipeline().addBefore(Connections.MINECRAFT_DECODER, ForgeConstants.RESET_LISTENER,new FML2CRPMResetCompleteDecoder());
connection.getChannel().pipeline().addAfter(Connections.MINECRAFT_ENCODER, ForgeConstants.FORGE_HANDSHAKE_HOLDER,new OutboundForgeHandshakeQueue());
((ForgeLoginWrapperDecoder) connection.getChannel().pipeline().get(ForgeConstants.FORGE_HANDSHAKE_DECODER)).registerLoginWrapperID(98); ((ForgeLoginWrapperDecoder) connection.getChannel().pipeline().get(ForgeConstants.FORGE_HANDSHAKE_DECODER)).registerLoginWrapperID(98);
//No more PLAY packets past this point should be sent to the client in case the reset works. //No more PLAY packets past this point should be sent to the client in case the reset works.
connection.write(new PluginMessage("fml:handshake", Unpooled.wrappedBuffer(ForgeHandshakeUtils.generatePluginResetPacket()))); connection.write(new PluginMessage("fml:handshake", Unpooled.wrappedBuffer(ForgeHandshakeUtils.generatePluginResetPacket())));
//We unregister so no plugin sees this client while the client is being reset.
((VelocityServer) Ambassador.getInstance().server).unregisterConnection(player);
connection.getChannel().pipeline().addAfter(Connections.MINECRAFT_ENCODER,ForgeConstants.PLUGIN_PACKET_QUEUE, new PluginLoginPacketQueue()); connection.getChannel().pipeline().addAfter(Connections.MINECRAFT_ENCODER,ForgeConstants.PLUGIN_PACKET_QUEUE, new PluginLoginPacketQueue());
//Transition
player.setPhase(WAITING_RESET); player.setPhase(WAITING_RESET);
WAITING_RESET.onTransitionToNewPhase(player); WAITING_RESET.onTransitionToNewPhase(player);
} }
@ -88,8 +96,10 @@ public enum VelocityForgeClientConnectionPhase implements ClientConnectionPhase
player.getConnection().getChannel().pipeline().remove(ForgeConstants.RESET_LISTENER); player.getConnection().getChannel().pipeline().remove(ForgeConstants.RESET_LISTENER);
player.getConnection().setState(StateRegistry.LOGIN); player.getConnection().setState(StateRegistry.LOGIN);
player.setPhase(NOT_STARTED); player.setPhase(NOT_STARTED);
//Send all held messages //Send all held messages
player.getConnection().getChannel().pipeline().remove(ForgeConstants.FORGE_HANDSHAKE_HOLDER); if (pending != null)
player.getConnection().write(pending);
player.getConnectionInFlight().getConnection().getChannel().config().setAutoRead(true); player.getConnectionInFlight().getConnection().getChannel().config().setAutoRead(true);
if (!(server.getConnection().getType() instanceof ForgeFMLConnectionType)) { if (!(server.getConnection().getType() instanceof ForgeFMLConnectionType)) {
@ -98,6 +108,8 @@ public enum VelocityForgeClientConnectionPhase implements ClientConnectionPhase
((OutboundSuccessHolder) connection.getChannel().pipeline().get(ForgeConstants.SERVER_SUCCESS_LISTENER)) ((OutboundSuccessHolder) connection.getChannel().pipeline().get(ForgeConstants.SERVER_SUCCESS_LISTENER))
.sendPacket(); .sendPacket();
connection.setState(StateRegistry.PLAY); connection.setState(StateRegistry.PLAY);
//Plugins may now send packets to client
connection.getChannel().pipeline().remove(ForgeConstants.PLUGIN_PACKET_QUEUE); connection.getChannel().pipeline().remove(ForgeConstants.PLUGIN_PACKET_QUEUE);
((VelocityServer) Ambassador.getInstance().server).registerConnection(player); ((VelocityServer) Ambassador.getInstance().server).registerConnection(player);
} }
@ -108,11 +120,7 @@ public enum VelocityForgeClientConnectionPhase implements ClientConnectionPhase
} }
} }
}; };
public LoginPluginMessage pending;
public boolean vanillaMode = true;
public boolean handle(ConnectedPlayer player, IForgeLoginWrapperPacket msg, VelocityServerConnection server) { public boolean handle(ConnectedPlayer player, IForgeLoginWrapperPacket msg, VelocityServerConnection server) {
player.setPhase(nextPhase()); player.setPhase(nextPhase());
@ -122,16 +130,17 @@ public enum VelocityForgeClientConnectionPhase implements ClientConnectionPhase
} }
player.getConnectionInFlight().getConnection().write(msg.encode()); player.getConnectionInFlight().getConnection().write(msg.encode());
vanillaMode = false;
return true; return true;
} }
private RegisteredServer lastKnownWorking;
void onTransitionToNewPhase(ConnectedPlayer player) { void onTransitionToNewPhase(ConnectedPlayer player) {
} }
public void resetAndWrite(ConnectedPlayer player, LoginPluginMessage message) {
player.getConnection().write(message);
}
VelocityForgeClientConnectionPhase nextPhase() { VelocityForgeClientConnectionPhase nextPhase() {
return this; return this;
} }

View File

@ -1,33 +0,0 @@
package org.adde0109.ambassador.velocity.client;
import com.velocitypowered.proxy.protocol.packet.LoginPluginMessage;
import io.netty.channel.*;
public class OutboundForgeHandshakeQueue extends ChannelOutboundHandlerAdapter {
PendingWriteQueue writeQueue;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
writeQueue = new PendingWriteQueue(ctx);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if ((msg instanceof LoginPluginMessage packet)) {
writeQueue.add(msg, promise);
} else {
ctx.write(msg, promise);
}
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive()) {
writeQueue.removeAndWriteAll();
ctx.flush();
} else {
writeQueue.removeAndFailAll(new ChannelException());
}
}
}