diff --git a/gradle.properties b/gradle.properties index d17616c..425c1f1 100644 --- a/gradle.properties +++ b/gradle.properties @@ -34,7 +34,7 @@ mod_name=PlayerSync # The license of the mod. Review your options at https://choosealicense.com/. All Rights Reserved is the default. mod_license=GPL-3.0 license # The mod version. See https://semver.org/ -mod_version=2.1.3 +mod_version=2.1.4 # The group ID for the mod. It is only important when publishing as an artifact to a Maven repository. # This should match the base package used for the mod sources. # See https://maven.apache.org/guides/mini/guide-naming-conventions.html diff --git a/src/main/java/vip/fubuki/playersync/CommandInit.java b/src/main/java/vip/fubuki/playersync/CommandInit.java index 358513a..06fc216 100644 --- a/src/main/java/vip/fubuki/playersync/CommandInit.java +++ b/src/main/java/vip/fubuki/playersync/CommandInit.java @@ -6,7 +6,6 @@ import net.minecraft.commands.Commands; import net.minecraftforge.event.RegisterCommandsEvent; import net.minecraftforge.eventbus.api.SubscribeEvent; import net.minecraftforge.fml.common.Mod; -import vip.fubuki.playersync.sync.chat.ChatSyncClient; @Mod.EventBusSubscriber() public class CommandInit { @@ -18,7 +17,6 @@ public class CommandInit { .requires(cs->cs.hasPermission(2)) .then(Commands.literal("reconnect") .executes(context -> { - new ChatSyncClient().run(); // context.getSource().sendSuccess(()->MutableComponent.create(new TranslatableContents("playersync.command.reconnect")),true); return 0; } diff --git a/src/main/java/vip/fubuki/playersync/PlayerSync.java b/src/main/java/vip/fubuki/playersync/PlayerSync.java index dedd0f8..7d8c257 100644 --- a/src/main/java/vip/fubuki/playersync/PlayerSync.java +++ b/src/main/java/vip/fubuki/playersync/PlayerSync.java @@ -5,6 +5,7 @@ import com.mysql.cj.jdbc.Driver; import net.minecraft.SharedConstants; import net.minecraftforge.common.MinecraftForge; import net.minecraftforge.event.server.ServerStartingEvent; +import net.minecraftforge.event.server.ServerStoppingEvent; import net.minecraftforge.eventbus.api.IEventBus; import net.minecraftforge.eventbus.api.SubscribeEvent; import net.minecraftforge.fml.ModList; @@ -193,6 +194,11 @@ public class PlayerSync { LOGGER.info("PlayerSync is ready!"); } + @SubscribeEvent + public void onServerStopping(ServerStoppingEvent event){ + ChatSync.shutdown(); + } + private static void addColumnIfNotExists(String tableName, String columnName, String dataTypeDefaultNullness, boolean makePrimaryKey) throws SQLException { diff --git a/src/main/java/vip/fubuki/playersync/sync/ChatSync.java b/src/main/java/vip/fubuki/playersync/sync/ChatSync.java index 0414637..ed77fe7 100644 --- a/src/main/java/vip/fubuki/playersync/sync/ChatSync.java +++ b/src/main/java/vip/fubuki/playersync/sync/ChatSync.java @@ -11,28 +11,45 @@ import java.io.IOException; public class ChatSync { public static final Logger LOGGER = LogUtils.getLogger(); + private static ChatSyncServer chatSyncServer; + private static ChatSyncClient chatSyncClient; public static void register(){ if(JdbcConfig.IS_CHAT_SERVER.get()) { LOGGER.info("Trying to setup chat server at port " + JdbcConfig.CHAT_SERVER_PORT.get()); new Thread(()->{ - ChatSyncServer chatSyncServer = new ChatSyncServer(); + chatSyncServer = new ChatSyncServer(); try { chatSyncServer.run(); } catch (IOException e) { LOGGER.error("Unable to start chat server", e); } - }).start(); + }, "ChatSync-Server").start(); } new Thread(()->{ + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + LOGGER.info("Trying to connect to chat server " + JdbcConfig.CHAT_SERVER_IP.get() + ":" + JdbcConfig.CHAT_SERVER_PORT.get()); - ChatSyncClient chatSyncClient = new ChatSyncClient(); + chatSyncClient = new ChatSyncClient(); chatSyncClient.run(); - }).start(); + }, "ChatSync-Client").start(); MinecraftForge.EVENT_BUS.register(ChatSyncClient.class); } + + public static void shutdown() { + if (chatSyncServer != null) { + chatSyncServer.shutdown(); + } + if (chatSyncClient != null) { + chatSyncClient.shutdown(); + } + } } diff --git a/src/main/java/vip/fubuki/playersync/sync/VanillaSync.java b/src/main/java/vip/fubuki/playersync/sync/VanillaSync.java index 058103e..d55b669 100644 --- a/src/main/java/vip/fubuki/playersync/sync/VanillaSync.java +++ b/src/main/java/vip/fubuki/playersync/sync/VanillaSync.java @@ -145,6 +145,11 @@ public class VanillaSync { JDBCsetUp.QueryResult qr1 = JDBCsetUp.executeQuery("SELECT online, last_server FROM player_data WHERE uuid='" + player_uuid + "'"); ResultSet rs1 = qr1.resultSet(); ServerPlayer serverPlayer = (ServerPlayer) event.getEntity(); + + // Mod support + ModsSupport modsSupport = new ModsSupport(); + modsSupport.onPlayerJoin(serverPlayer); + if (!rs1.next()){ store(event.getEntity(), true); return; @@ -229,9 +234,6 @@ public class VanillaSync { } } - // Mod support - ModsSupport modsSupport = new ModsSupport(); - modsSupport.onPlayerJoin(serverPlayer); serverPlayer.addTag("player_synced"); rs2.close(); diff --git a/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncClient.java b/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncClient.java index 33379b3..8915bf9 100644 --- a/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncClient.java +++ b/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncClient.java @@ -6,13 +6,12 @@ import net.minecraftforge.event.entity.player.PlayerEvent; import net.minecraftforge.eventbus.api.SubscribeEvent; import vip.fubuki.playersync.PlayerSync; import vip.fubuki.playersync.config.JdbcConfig; -import vip.fubuki.playersync.sync.ChatSync; -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.PrintWriter; +import java.io.*; +import java.net.ConnectException; +import java.net.InetSocketAddress; import java.net.Socket; +import java.net.SocketTimeoutException; import java.util.Objects; public class ChatSyncClient { @@ -20,31 +19,135 @@ public class ChatSyncClient { static Socket clientSocket; static PrintWriter out; + private static volatile boolean running = true; + private static final int RECONNECT_DELAY = 5000; + private static final int MAX_RECONNECT_ATTEMPTS = 10; + + private static volatile long lastHeartbeat = System.currentTimeMillis(); + private static final long HEARTBEAT_INTERVAL = 15000; + public void run() { - try { - clientSocket = new Socket(JdbcConfig.CHAT_SERVER_IP.get(), JdbcConfig.CHAT_SERVER_PORT.get()); - out = new PrintWriter(clientSocket.getOutputStream(),true); + int reconnectAttempts = 0; - BufferedReader in = new BufferedReader( - new InputStreamReader(clientSocket.getInputStream())); + while (running && reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { + try { + PlayerSync.LOGGER.info("Connecting to chat server {}:{}", + JdbcConfig.CHAT_SERVER_IP.get(), + JdbcConfig.CHAT_SERVER_PORT.get()); - String serverMessage; - while ((serverMessage = in.readLine()) != null) { - PlayerSync.LOGGER.info("Received message from chat server: " + serverMessage); - Component textComponents = Component.nullToEmpty(serverMessage); - if(playerList != null){ - playerList.broadcastSystemMessage(textComponents,false); + clientSocket = new Socket(); + clientSocket.setReuseAddress(true); + clientSocket.setKeepAlive(true); + clientSocket.setTcpNoDelay(true); + + clientSocket.connect( + new InetSocketAddress( + JdbcConfig.CHAT_SERVER_IP.get(), + JdbcConfig.CHAT_SERVER_PORT.get() + ), + 15000 + ); + + clientSocket.setSoTimeout(30000); + + out = new PrintWriter(new BufferedWriter( + new OutputStreamWriter(clientSocket.getOutputStream())), true); + + PlayerSync.LOGGER.info("Successfully connected to chat server"); + reconnectAttempts = 0; + lastHeartbeat = System.currentTimeMillis(); + + startHeartbeatMonitor(); + + BufferedReader in = new BufferedReader( + new InputStreamReader(clientSocket.getInputStream())); + + String serverMessage; + while (running && (serverMessage = in.readLine()) != null) { + lastHeartbeat = System.currentTimeMillis(); + + if ("".equals(serverMessage)) { + continue; + } + + PlayerSync.LOGGER.info("Received message from chat server: " + serverMessage); + Component textComponents = Component.nullToEmpty(serverMessage); + if(playerList != null){ + playerList.getServer().execute(() -> + playerList.broadcastSystemMessage(textComponents, false)); + } + } + + } catch (SocketTimeoutException e) { + PlayerSync.LOGGER.warn("Chat server read timeout, reconnecting..."); + } catch (ConnectException e) { + PlayerSync.LOGGER.warn("Cannot connect to chat server: {}", e.getMessage()); + } catch (IOException e) { + PlayerSync.LOGGER.error("Chat client connection error: {}", e.getMessage()); + } finally { + closeConnection(); + } + + if (running && reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { + reconnectAttempts++; + PlayerSync.LOGGER.warn("Attempting to reconnect to chat server ({}/{})", + reconnectAttempts, MAX_RECONNECT_ATTEMPTS); + + try { + long delay = Math.min(RECONNECT_DELAY * (long)Math.pow(2, reconnectAttempts-1), 60000); + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; } } - } catch (IOException e) { - e.printStackTrace(); - reconnectClient(); } } - private void reconnectClient() { - ChatSync.LOGGER.warn("TODO: implement reconnectClient()"); - //TODO + private void startHeartbeatMonitor() { + Thread heartbeatThread = new Thread(() -> { + while (running && clientSocket != null && !clientSocket.isClosed()) { + try { + Thread.sleep(10000); // 每10秒检查一次 + + long now = System.currentTimeMillis(); + if (now - lastHeartbeat > HEARTBEAT_INTERVAL) { + PlayerSync.LOGGER.warn("No heartbeat for {}ms, sending test message", + now - lastHeartbeat); + + // 发送测试消息检查连接 + if (out != null) { + out.println(""); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + }, "ChatSync-Heartbeat"); + heartbeatThread.setDaemon(true); + heartbeatThread.start(); + } + + private void closeConnection() { + try { + if (out != null) { + out.close(); + out = null; + } + if (clientSocket != null && !clientSocket.isClosed()) { + clientSocket.close(); + clientSocket = null; + } + } catch (IOException e) { + PlayerSync.LOGGER.error("Error closing connection: {}", e.getMessage()); + } + } + + public void shutdown() { + running = false; + closeConnection(); } @SubscribeEvent diff --git a/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncServer.java b/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncServer.java index a9ecf65..6697d6b 100644 --- a/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncServer.java +++ b/src/main/java/vip/fubuki/playersync/sync/chat/ChatSyncServer.java @@ -1,63 +1,171 @@ package vip.fubuki.playersync.sync.chat; +import vip.fubuki.playersync.PlayerSync; import vip.fubuki.playersync.config.JdbcConfig; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; +import java.io.*; import java.net.ServerSocket; import java.net.Socket; +import java.net.SocketTimeoutException; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; public class ChatSyncServer { static ServerSocket serverSocket; static final Set SocketList = ConcurrentHashMap.newKeySet(); static final ExecutorService executorService = Executors.newCachedThreadPool(); + private volatile boolean running = true; public void run() throws IOException { - serverSocket = new ServerSocket(JdbcConfig.CHAT_SERVER_PORT.get()); - while (!Thread.currentThread().isInterrupted()) { - Socket newSocket = serverSocket.accept(); - SocketList.add(newSocket); - executorService.submit(() -> handleClient(newSocket)); + try { + serverSocket = new ServerSocket(JdbcConfig.CHAT_SERVER_PORT.get()); + serverSocket.setReuseAddress(true); + PlayerSync.LOGGER.info("Chat server started successfully on port {}", JdbcConfig.CHAT_SERVER_PORT.get()); + + startHeartbeatBroadcast(); + + while (running && !Thread.currentThread().isInterrupted()) { + try { + Socket newSocket = serverSocket.accept(); + newSocket.setSoTimeout(30000); + SocketList.add(newSocket); + executorService.submit(() -> handleClient(newSocket)); + PlayerSync.LOGGER.info("New client connected, total clients: {}", SocketList.size()); + } catch (IOException e) { + if (running) { + PlayerSync.LOGGER.error("Error accepting client connection: {}", e.getMessage()); + } + } + } + } finally { + shutdown(); } - serverSocket.close(); } private void handleClient(Socket socket) { - try (InputStream inputStream = socket.getInputStream()) { - byte[] buffer = new byte[1024]; - int bytesRead; - while ((bytesRead = inputStream.read(buffer)) != -1) { - String message = new String(buffer, 0, bytesRead); + String clientInfo = socket.getInetAddress() + ":" + socket.getPort(); + + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(socket.getInputStream()))) { + + String message; + while (running && (message = reader.readLine()) != null) { + PlayerSync.LOGGER.info("Received message from {}: {}", clientInfo, message); broadcastMessage(socket, message); } + + } catch (SocketTimeoutException e) { + PlayerSync.LOGGER.warn("Client {} timeout", clientInfo); } catch (IOException e) { - e.printStackTrace(); + PlayerSync.LOGGER.error("Error handling client {}: {}", clientInfo, e.getMessage()); } finally { SocketList.remove(socket); try { - socket.close(); + if (!socket.isClosed()) { + socket.close(); + } } catch (IOException e) { - e.printStackTrace(); + PlayerSync.LOGGER.error("Error closing client socket: {}", e.getMessage()); } + PlayerSync.LOGGER.info("Client disconnected, remaining clients: {}", SocketList.size()); } } private void broadcastMessage(Socket sender, String message) { - for (Socket socket : SocketList) { - if (!socket.equals(sender)) { + Iterator iterator = SocketList.iterator(); + while (iterator.hasNext()) { + Socket socket = iterator.next(); + if (!socket.equals(sender) && !socket.isClosed()) { try { - OutputStream outputStream = socket.getOutputStream(); - outputStream.write(message.getBytes()); - outputStream.flush(); + PrintWriter writer = new PrintWriter(socket.getOutputStream(), true); + writer.println(message); } catch (IOException e) { - e.printStackTrace(); + PlayerSync.LOGGER.error("Error broadcasting to client, removing: {}", e.getMessage()); + iterator.remove(); + try { + socket.close(); + } catch (IOException ex) { + // Ignore + } } } } } + + private void startHeartbeatBroadcast() { + Thread heartbeatThread = new Thread(() -> { + while (running) { + try { + Thread.sleep(20000); + broadcastHeartbeat(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + }, "ChatSync-Server-Heartbeat"); + heartbeatThread.setDaemon(true); + heartbeatThread.start(); + } + + private void broadcastHeartbeat() { + Iterator iterator = SocketList.iterator(); + while (iterator.hasNext()) { + Socket socket = iterator.next(); + if (!socket.isClosed()) { + try { + PrintWriter writer = new PrintWriter( + new BufferedWriter( + new OutputStreamWriter(socket.getOutputStream())), true); + writer.println(""); + } catch (IOException e) { + PlayerSync.LOGGER.warn("Failed to send heartbeat to client, removing: {}", e.getMessage()); + iterator.remove(); + try { + socket.close(); + } catch (IOException ex) { + // Ignore + } + } + } else { + iterator.remove(); + } + } + } + + public void shutdown() { + running = false; + try { + if (serverSocket != null && !serverSocket.isClosed()) { + serverSocket.close(); + } + } catch (IOException e) { + PlayerSync.LOGGER.error("Error closing server socket: {}", e.getMessage()); + } + + for (Socket socket : SocketList) { + try { + if (!socket.isClosed()) { + socket.close(); + } + } catch (IOException e) { + // Ignore + } + } + SocketList.clear(); + + executorService.shutdown(); + try { + if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + Thread.currentThread().interrupt(); + } + } }