Files
SimpleRemoter/server/go/hub/hub.go
yuanyuanxiang d757c33bcb Fix(Go): stable device list ordering + RDP-reset handler
Fix UTF-8 login text decode + stale screen sub-conn retirement
2026-05-19 16:28:32 +02:00

843 lines
28 KiB
Go

// Package hub maintains the registry of currently online devices and acts as
// the bridge between the TCP server (which sees raw client connections) and
// the web server (which serves browser clients).
//
// The TCP side calls Register / Unregister / UpdateLive / BindScreenConn as
// the protocol layer notices new sub-connections.
// The web side calls ListDevices / SendToDevice / Subscribe.
// Neither side imports the other — both depend only on this package.
package hub
import (
"errors"
"sort"
"sync"
"time"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/connection"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/protocol"
)
// Sentinel errors returned by the hub's send helpers. Compare against them
// with errors.Is.
var (
	// ErrDeviceOffline is returned by SendToDevice when the target device is
	// not, or is no longer, registered.
	ErrDeviceOffline = errors.New("device offline")

	// ErrNoSender is returned by SendToDevice if SetSender has not been called.
	ErrNoSender = errors.New("hub sender not configured")
)
// SendFunc encodes and writes raw command bytes to a device's TCP context.
// In practice this is bound to server.Server.Send at startup.
//
// ctx is the connection to write to (the hub passes a device's main, screen
// or terminal context); data is the raw command payload. The error result is
// returned unchanged by SendToDevice, SendToScreen and SendToTerminal.
type SendFunc func(ctx *connection.Context, data []byte) error
// Device is the internal record for one logical end-device (keyed by MasterID).
// A single device may use multiple TCP sub-connections (screen, terminal …);
// the main login connection plus the currently bound screen and terminal
// sub-connections are tracked here.
//
// PCName from LOGIN_INFOR is interpreted as "ComputerName/Group" and
// ModuleVersion as "Version-Capability"; the split halves live in separate
// fields so the front-end can render them independently.
//
// The exported fields are copied into DeviceInfo by deviceToInfo; the
// unexported fields are session state that the Hub reads and writes while
// holding hub.mu.
type Device struct {
ID string // MasterID — stable identifier the client reports at login
Name string // PCName before '/' (real computer name)
Group string // PCName after '/' (group label; may be empty)
Version string // ModuleVersion before '-' (semantic version)
Capability string // ModuleVersion after '-' (capability tags; may be empty)
OS string // OS version string
CPU string // from LOGIN_INFOR reserved field 2
FilePath string // from LOGIN_INFOR reserved field 4
InstallTime string // from LOGIN_INFOR reserved field 6 (or StartTime)
Location string // client-reported geo string (reserved field 10)
PeerIP string // network-level remote address as seen by the server
PublicIP string // client-reported public IP (reserved field 11)
Resolution string // client-formatted screen geometry "N:W*H" (reserved field 15)
// ConnectedAt is exported to clients as Unix seconds and is the primary
// sort key used by ListDevices.
ConnectedAt time.Time
// Live fields refreshed on every heartbeat (see UpdateLive). Protected by
// hub.mu.
RTT int // network latency in ms (Heartbeat.Ping)
ActiveWindow string // foreground window title (Heartbeat.ActiveWnd, decoded)
// conn is the main connection's context — used by SendToDevice to forward
// commands (COMMAND_SCREEN_SPY, etc.) to the device. Set by Register.
conn *connection.Context
// screenConn is the device-initiated sub-connection that streams
// TOKEN_BITMAPINFO / TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames. Bound
// after the device responds to COMMAND_SCREEN_SPY. Nil while no screen
// session is active.
screenConn *connection.Context
// Cached screen state, for late-joining browsers. Populated by
// PublishResolution / PublishScreenFrame and cleared whenever the screen
// sub-conn is closed, unbound or replaced. screenWidth==0 or
// lastKeyframe==nil indicates "no session" / "no IDR seen yet".
screenWidth int
screenHeight int
lastKeyframe []byte // fully-packed WS binary packet of the most recent IDR
// Cursor index dedup: cursors arrive on every frame (~30 Hz) but only
// rarely change. PublishCursor suppresses duplicates so the WS doesn't
// carry redundant JSON messages.
cursorSeen bool // false until the first cursor index has been published
lastCursorIndex byte // last index forwarded to subscribers
// Terminal session state — at most one web terminal per device (MVP
// constraint shared with the C++ server). All three fields are
// guarded by hub.mu.
//
// terminalPending: COMMAND_SHELL has been sent, waiting for the device's
// sub-conn to arrive and announce itself via TOKEN_TERMINAL_START /
// TOKEN_SHELL_START.
// terminalConn: the shell sub-conn ctx after binding. Nil before BIND
// and after teardown.
// terminalIsPTY: distinguishes Linux/macOS/ConPTY (true) from the legacy
// Windows cmd-pipe path. PTY mode supports resize; cmd-pipe ignores it.
terminalPending bool
terminalConn *connection.Context
terminalIsPTY bool
}
// ScreenCache is a read-only snapshot of a device's last-seen screen state,
// used by wsHub.handleConnect to bootstrap late joiners. The zero value means
// "device unknown or no screen session".
type ScreenCache struct {
Width int // cached screen width; 0 when no resolution has been published
Height int // cached screen height; 0 when no resolution has been published
// Keyframe aliases the hub's cached slice rather than copying it, so it
// must not be modified by the receiver.
Keyframe []byte // packed WS packet; nil if no keyframe cached yet
Active bool // true iff a screen sub-conn is currently bound
}
// ScreenState reports the cached screen geometry, last keyframe and
// session-active flag for deviceID. An unknown device yields the zero
// ScreenCache. The read happens under the hub's read lock, so it may be
// called from any goroutine; the Keyframe slice is shared with the hub and
// must be treated as read-only.
func (h *Hub) ScreenState(deviceID string) ScreenCache {
	var snap ScreenCache

	h.mu.RLock()
	if dev, found := h.devices[deviceID]; found {
		snap.Width = dev.screenWidth
		snap.Height = dev.screenHeight
		snap.Keyframe = dev.lastKeyframe
		snap.Active = dev.screenConn != nil
	}
	h.mu.RUnlock()

	return snap
}
// MainConn returns the main TCP context of the device registered under id so
// callers can send commands on it directly. The result is nil when no such
// device is registered.
func (h *Hub) MainConn(id string) *connection.Context {
	var main *connection.Context

	h.mu.RLock()
	if dev, found := h.devices[id]; found {
		main = dev.conn
	}
	h.mu.RUnlock()

	return main
}
// Capability looks up the capability hex string the device reported at login
// (the tail of LOGIN_INFOR.moduleVersion). Unknown devices yield "" — callers
// should treat that as "no caps" (legacy Windows GBK default).
func (h *Hub) Capability(id string) string {
	caps := ""

	h.mu.RLock()
	if dev, found := h.devices[id]; found {
		caps = dev.Capability
	}
	h.mu.RUnlock()

	return caps
}
// DeviceInfo is the JSON-safe projection of Device for the /api/devices
// endpoint and the WS device_list message. Field names match what the
// existing browser front-end expects. Values are produced by deviceToInfo
// and share no state with the hub's Device record.
type DeviceInfo struct {
ID string `json:"id"`
Name string `json:"name"`
Group string `json:"group,omitempty"`
Version string `json:"version"`
Capability string `json:"capability,omitempty"`
OS string `json:"os"`
CPU string `json:"cpu,omitempty"`
FilePath string `json:"file_path,omitempty"`
InstallTime string `json:"install_time,omitempty"`
Location string `json:"location,omitempty"`
IP string `json:"ip"` // client-reported public IP, copied from Device.PublicIP (matches C++ key)
PeerIP string `json:"peer_ip,omitempty"`
Screen string `json:"screen,omitempty"` // "N:W*H", copied from Device.Resolution — matches C++ DeviceInfo.screen key
RTT int `json:"rtt"`
ActiveWindow string `json:"activeWindow,omitempty"`
ConnectedAt int64 `json:"connected_at"` // Device.ConnectedAt as Unix seconds
Online bool `json:"online"` // always true: only registered devices are projected
}
// EventHandler receives notifications about device lifecycle, per-tick live
// updates, screen frames and resolution changes. Methods are invoked
// synchronously from the corresponding hub mutator, after the hub has
// released its device lock — implementations must be non-blocking (typically
// just write to a channel or queue).
type EventHandler interface {
// OnDeviceOnline fires from Register once the device has been added to
// the registry.
OnDeviceOnline(d DeviceInfo)
// OnDeviceOffline fires from Unregister, only when the ID was actually
// present in the registry.
OnDeviceOffline(id string)
// OnDeviceUpdate fires from UpdateLive with the heartbeat-derived RTT
// (ms) and foreground window title.
OnDeviceUpdate(id string, rtt int, activeWindow string)
// OnScreenFrame delivers a fully-formed WS binary packet for the given
// device. The packet matches the C++ layout:
// [DeviceID:4 LE][FrameType:1][DataLen:4 LE][H264:N]
// Implementations should treat the slice as read-only.
OnScreenFrame(deviceID string, packet []byte, isKeyframe bool)
// OnResolutionChange fires when a screen session starts (TOKEN_BITMAPINFO)
// or whenever the device reports a new screen geometry mid-stream.
OnResolutionChange(deviceID string, width, height int)
// OnCursorChange fires when the device's foreground cursor index changes.
// Duplicates (same index as the previous frame) are filtered out by the
// hub before reaching subscribers.
OnCursorChange(deviceID string, index byte)
// OnTerminalReady fires once the device's shell sub-conn is bound and
// the server has sent COMMAND_NEXT to start its output read loop.
// isPTY=true means PTY mode (Linux/macOS or ConPTY); false means the
// legacy Windows cmd-pipe path which doesn't support resize.
OnTerminalReady(deviceID string, isPTY bool)
// OnTerminalData ships one chunk of raw shell output (already wrapped
// in the WS-binary "TRM1" magic header) to terminal viewers.
OnTerminalData(deviceID string, packet []byte)
// OnTerminalClosed fires when the session ends — either because the
// device sent TOKEN_TERMINAL_CLOSE, the sub-conn dropped, or the
// server explicitly tore it down. The hub itself passes reason
// "disconnected" from UnbindTerminalConn and "closed" from
// CloseTerminalSession.
OnTerminalClosed(deviceID string, reason string)
}
// Hub is a thread-safe registry of online devices. Construct it with New so
// the maps are initialised.
type Hub struct {
// mu guards devices and the mutable per-Device session fields.
mu sync.RWMutex
devices map[string]*Device
// subMu guards subscribers. Fan-out iterates over a copy taken by
// snapshotSubscribers, so handlers run without subMu held.
subMu sync.RWMutex
subscribers []EventHandler
// sender is installed by SetSender and is read without any lock by the
// send paths.
sender SendFunc
// Reverse index: TCP context -> device ID for the device's screen
// sub-connection. Lets us clean up on raw-connection close without
// having to walk every device. Empty when no screen sessions exist.
screenIndex map[*connection.Context]string
screenIndexMu sync.RWMutex
// Parallel reverse index for terminal sub-conns. Same purpose: O(1)
// lookup from a raw ctx (e.g. on OnDisconnect) back to its device.
terminalIndex map[*connection.Context]string
terminalIndexMu sync.RWMutex
}
// New builds a Hub with no devices, no subscribers and no sender. All three
// internal maps are allocated up front so the hub is immediately usable.
func New() *Hub {
	h := new(Hub)
	h.devices = map[string]*Device{}
	h.screenIndex = map[*connection.Context]string{}
	h.terminalIndex = map[*connection.Context]string{}
	return h
}
// SetSender wires the function used to deliver outbound bytes on a device's
// TCP connections (main, screen and terminal). Typically called once in
// main() with server.Send.
//
// The assignment is not synchronised with the send paths that read the
// sender, so call it before the hub is shared between goroutines.
func (h *Hub) SetSender(fn SendFunc) {
h.sender = fn
}
// SendToDevice writes an already-formed command payload to the main
// connection of the device registered under id. data is the raw command
// bytes; framing / compression is the sender's job at the protocol layer.
//
// It returns ErrDeviceOffline when the device is unknown or has no main
// connection, ErrNoSender when SetSender was never called, and otherwise
// whatever the sender returns.
func (h *Hub) SendToDevice(id string, data []byte) error {
	h.mu.RLock()
	dev, found := h.devices[id]
	h.mu.RUnlock()

	switch {
	case !found || dev.conn == nil:
		return ErrDeviceOffline
	case h.sender == nil:
		return ErrNoSender
	}
	return h.sender(dev.conn, data)
}
// SendToScreen writes data to the screen sub-connection currently bound to
// the device. Input events (COMMAND_SCREEN_CONTROL) MUST take this route
// rather than the main conn — the C++ client only dispatches these commands
// from CScreenManager::OnReceive, which reads exclusively from the sub-conn
// (see client/ScreenManager.cpp:1065).
//
// ErrDeviceOffline is returned both for an unknown device and for a device
// without an active screen session, so callers can quietly drop input from
// browsers that haven't called connect yet. ErrNoSender is returned when no
// sender has been configured.
func (h *Hub) SendToScreen(id string, data []byte) error {
	var target *connection.Context

	h.mu.RLock()
	if dev, found := h.devices[id]; found {
		target = dev.screenConn
	}
	h.mu.RUnlock()

	// target stays nil for an unknown device as well as for one with no
	// screen session, so a single check covers both cases.
	if target == nil {
		return ErrDeviceOffline
	}
	if h.sender == nil {
		return ErrNoSender
	}
	return h.sender(target, data)
}
// BindScreenConn associates a freshly-arrived sub-connection (the one that
// just sent TOKEN_BITMAPINFO) with the device identified by deviceID.
// Returns false if deviceID is empty, ctx is nil, or the device is not
// registered — callers should drop the orphan connection in that case.
//
// If the device already has a different screen sub-conn bound (typically
// because the client's monitor-poll logic opened a new one without the
// previous viewer going through CloseScreen — e.g. multiple open/close cycles
// where each cycle picks a different display), the previous sub-conn is
// retired via retireScreenConn. Without this, both sub-conns keep streaming
// under the same device ID and the browser sees H.264 frames from two
// encoders interleaved, rendering as a picture that jumps between monitors.
//
// Binding a sub-conn other than the one already bound also resets the cursor
// dedup state, so the first cursor index of the new session is always
// forwarded by PublishCursor even when it equals the last index seen in a
// previous session.
func (h *Hub) BindScreenConn(deviceID string, ctx *connection.Context) bool {
	if deviceID == "" || ctx == nil {
		return false
	}

	h.mu.Lock()
	d, ok := h.devices[deviceID]
	if !ok {
		h.mu.Unlock()
		return false
	}
	old := d.screenConn
	d.screenConn = ctx

	// The cached resolution and last IDR belong to the old encoder's
	// stream. A viewer joining now must wait for the new sub-conn's
	// TOKEN_BITMAPINFO + first IDR; serving the old monitor's keyframe
	// to a decoder that's about to receive a different SPS/PPS would
	// produce the same mixed-stream corruption retireScreenConn exists
	// to prevent.
	replacing := old != nil && old != ctx
	if replacing {
		d.screenWidth = 0
		d.screenHeight = 0
		d.lastKeyframe = nil
	}

	// A new sub-conn (fresh session or replacement) starts a new stream.
	// Forget the previous cursor index so viewers of this session receive
	// the current cursor instead of having it suppressed as a duplicate.
	// Re-binding the same ctx leaves the dedup state untouched.
	if old != ctx {
		d.cursorSeen = false
		d.lastCursorIndex = 0
	}
	h.mu.Unlock()

	h.screenIndexMu.Lock()
	h.screenIndex[ctx] = deviceID
	h.screenIndexMu.Unlock()

	if replacing {
		h.retireScreenConn(old)
	}
	return true
}
// ScreenDeviceID maps a TCP context back to the ID of the device whose screen
// sub-connection it is. Contexts that are not tracked as a screen
// sub-connection yield "". Used by the TCP layer to route
// TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames.
func (h *Hub) ScreenDeviceID(ctx *connection.Context) string {
	h.screenIndexMu.RLock()
	owner := h.screenIndex[ctx] // missing key yields the zero value ""
	h.screenIndexMu.RUnlock()
	return owner
}
// UnbindScreenConn forgets the screen sub-connection mapping for ctx (called
// on TCP disconnect of a screen sub-context). Untracked contexts are ignored.
//
// The device's screen state is only reset when ctx is still the sub-conn
// bound to it; if a newer sub-conn has taken over, its state is left alone.
func (h *Hub) UnbindScreenConn(ctx *connection.Context) {
	h.screenIndexMu.Lock()
	owner, tracked := h.screenIndex[ctx]
	if tracked {
		delete(h.screenIndex, ctx)
	}
	h.screenIndexMu.Unlock()

	if !tracked {
		return
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	dev, found := h.devices[owner]
	if !found || dev.screenConn != ctx {
		return
	}
	dev.screenConn = nil
	// Drop the cache as well — the resolution and IDR are republished fresh
	// when this device's screen comes back up.
	dev.screenWidth, dev.screenHeight = 0, 0
	dev.lastKeyframe = nil
}
// retireScreenConn shuts down a screen sub-conn that is being replaced or
// closed. It is shared by CloseScreen (last viewer left) and BindScreenConn
// (device opened a new sub-conn that supersedes this one, e.g. when the
// client's monitor-poll logic switches to a different display).
//
// The order of the three steps is significant:
//
//  1. The screenIndex entry goes FIRST, so frames still in flight from the
//     device arrive at handleScreenFrame with deviceID="" and are silently
//     dropped. Otherwise they would be relayed under the same deviceID as
//     the new sub-conn — the "frames from two monitors interleaved in one
//     stream" symptom, where the browser decoder sees two independent x264
//     SPS/PPS sequences mixed together and renders a glitchy picture.
//
//  2. COMMAND_BYE is sent so the C++ client's IOCPClient exits via the clean
//     StopRunning() path. Without it the client treats FIN as a network blip
//     and fires m_ReconnectFunc, which opens a fresh sub-conn that never
//     sends BITMAPINFO again, keeps the encoder thread alive, and ultimately
//     wedges the device for ~10 s.
//
//  3. The TCP Close is deferred by 500 ms and runs asynchronously, mirroring
//     the C++ ScreenSpyDlg.cpp:842 (Sleep(500); CancelIO()) sequence so that
//     COMMAND_BYE reaches the device read loop before FIN does.
//
// A nil sc is a no-op, so callers may pass d.screenConn unconditionally.
func (h *Hub) retireScreenConn(sc *connection.Context) {
	const byeGrace = 500 * time.Millisecond

	if sc == nil {
		return
	}

	// Step 1: stop routing frames from this sub-conn.
	h.screenIndexMu.Lock()
	delete(h.screenIndex, sc)
	h.screenIndexMu.Unlock()

	// Step 2: best-effort goodbye; the connection is closed regardless, so a
	// send failure is deliberately ignored.
	if h.sender != nil {
		_ = h.sender(sc, []byte{protocol.CommandBye})
	}

	// Step 3: close once the grace period has elapsed, without blocking the
	// caller.
	time.AfterFunc(byeGrace, func() {
		sc.Close()
	})
}
// Subscribe registers an EventHandler and returns a func that removes it
// again. Multiple handlers are supported; each receives every event.
//
// The returned func is idempotent and safe to call from any goroutine: it
// removes at most one registration of eh, so calling it twice cannot remove a
// second registration of the same handler made by another Subscribe call.
//
// Handlers are matched by interface equality, so eh should have a comparable
// dynamic type (a pointer is the usual choice).
func (h *Hub) Subscribe(eh EventHandler) (unsubscribe func()) {
	h.subMu.Lock()
	h.subscribers = append(h.subscribers, eh)
	h.subMu.Unlock()

	var once sync.Once
	return func() {
		once.Do(func() {
			h.subMu.Lock()
			defer h.subMu.Unlock()

			for i, x := range h.subscribers {
				if x != eh {
					continue
				}
				last := len(h.subscribers) - 1
				copy(h.subscribers[i:], h.subscribers[i+1:])
				// Clear the vacated tail slot so the backing array does
				// not keep the removed handler reachable.
				h.subscribers[last] = nil
				h.subscribers = h.subscribers[:last]
				return
			}
		})
	}
}
// snapshotSubscribers returns a private copy of the current subscriber list,
// taken under the read lock, so callers can invoke handlers without holding
// subMu and without being affected by concurrent Subscribe / unsubscribe
// calls. The result is never nil.
func (h *Hub) snapshotSubscribers() []EventHandler {
	h.subMu.RLock()
	snapshot := make([]EventHandler, 0, len(h.subscribers))
	snapshot = append(snapshot, h.subscribers...)
	h.subMu.RUnlock()
	return snapshot
}
// Register marks a device as online and pins conn as the main TCP connection
// that SendToDevice will use for outbound commands. Registering an ID that is
// already present replaces the previous record (e.g. a client reconnecting
// with the same MasterID). Calls with a nil device, a nil conn or an empty ID
// are silently ignored.
//
// Subscribers receive OnDeviceOnline after the device has been stored and the
// hub lock has been released.
func (h *Hub) Register(d *Device, conn *connection.Context) {
	if d == nil || conn == nil || d.ID == "" {
		return
	}
	d.conn = conn

	// Insert and project under the same critical section so the snapshot
	// handed to subscribers matches what was stored.
	info := func() DeviceInfo {
		h.mu.Lock()
		defer h.mu.Unlock()
		h.devices[d.ID] = d
		return deviceToInfo(d)
	}()

	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnDeviceOnline(info)
	}
}
// Unregister drops the device stored under id. Empty or unknown IDs are a
// no-op. When a device was actually removed, subscribers receive
// OnDeviceOffline after the hub lock has been released.
func (h *Hub) Unregister(id string) {
	if id == "" {
		return
	}

	h.mu.Lock()
	_, present := h.devices[id]
	if present {
		delete(h.devices, id)
	}
	h.mu.Unlock()

	if present {
		handlers := h.snapshotSubscribers()
		for i := range handlers {
			handlers[i].OnDeviceOffline(id)
		}
	}
}
// ListDevices returns a freshly built snapshot of every registered device.
// The slice shares no state with the hub, so the caller may mutate it freely.
// It is never nil, even when no device is registered.
//
// Entries are ordered by ConnectedAt ascending, with ID as the tiebreaker, so
// the order stays stable across REST polls and WS pushes — Go's map iteration
// is intentionally randomized, which would otherwise reshuffle the UI list on
// every refresh.
func (h *Hub) ListDevices() []DeviceInfo {
	h.mu.RLock()
	list := make([]DeviceInfo, 0, len(h.devices))
	for _, dev := range h.devices {
		list = append(list, deviceToInfo(dev))
	}
	h.mu.RUnlock()

	// list is private to this call, so it can be sorted without the lock.
	sort.Slice(list, func(a, b int) bool {
		x, y := &list[a], &list[b]
		if x.ConnectedAt == y.ConnectedAt {
			return x.ID < y.ID
		}
		return x.ConnectedAt < y.ConnectedAt
	})
	return list
}
// deviceToInfo projects a Device onto its JSON-facing DeviceInfo form. It
// copies only the exported descriptive and live fields; the connection and
// session state is not exposed. d must be non-nil.
func deviceToInfo(d *Device) DeviceInfo {
	var info DeviceInfo

	// Identity and login-time metadata.
	info.ID = d.ID
	info.Name = d.Name
	info.Group = d.Group
	info.Version = d.Version
	info.Capability = d.Capability
	info.OS = d.OS
	info.CPU = d.CPU
	info.FilePath = d.FilePath
	info.InstallTime = d.InstallTime
	info.Location = d.Location

	// Network and display details; two of these are renamed on the way out.
	info.IP = d.PublicIP
	info.PeerIP = d.PeerIP
	info.Screen = d.Resolution

	// Live heartbeat fields.
	info.RTT = d.RTT
	info.ActiveWindow = d.ActiveWindow

	info.ConnectedAt = d.ConnectedAt.Unix()
	// A device that is in the map is by definition online.
	info.Online = true

	return info
}
// PublishCursor forwards a cursor index reported by the device to
// subscribers, but only when it differs from the last index forwarded (or is
// the first one seen). Repeats are suppressed so the WS isn't spammed with
// per-frame cursor JSON. Unknown devices are ignored.
func (h *Hub) PublishCursor(deviceID string, index byte) {
	h.mu.Lock()
	dev, found := h.devices[deviceID]
	changed := found && !(dev.cursorSeen && dev.lastCursorIndex == index)
	if changed {
		dev.cursorSeen = true
		dev.lastCursorIndex = index
	}
	h.mu.Unlock()

	if !changed {
		return
	}
	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnCursorChange(deviceID, index)
	}
}
// CloseScreen ends the device's active screen session, if there is one. It is
// used when the last viewer leaves so the device stops capturing. Unknown
// devices are ignored.
//
// The cached state (screenConn / screenWidth / screenHeight / lastKeyframe)
// is cleared SYNCHRONOUSLY here rather than being left to the eventual
// OnDisconnect → UnbindScreenConn path. Otherwise a viewer arriving in the
// short window between TCP close and the disconnect callback would see
// Active=true with stale dimensions/IDR and skip the COMMAND_SCREEN_SPY kick,
// leaving the page stuck on a "connected" status with no frames ever
// arriving.
//
// The sub-conn itself is torn down by retireScreenConn, the same routine
// BindScreenConn uses on its replacement path.
func (h *Hub) CloseScreen(deviceID string) {
	var retiring *connection.Context

	h.mu.Lock()
	dev, found := h.devices[deviceID]
	if found {
		retiring = dev.screenConn
		dev.screenConn, dev.lastKeyframe = nil, nil
		dev.screenWidth, dev.screenHeight = 0, 0
	}
	h.mu.Unlock()

	if !found {
		return
	}
	// retireScreenConn tolerates nil, i.e. a device with no session.
	h.retireScreenConn(retiring)
}
// PublishResolution broadcasts a new (or first-ever) screen geometry for a
// device. The browser uses width/height to initialize its WebCodecs decoder.
// For a registered device the dimensions are also cached on its record so
// late-joining viewers can be bootstrapped without waiting for the next
// BITMAPINFO. Subscribers are notified whether or not the device is
// registered.
func (h *Hub) PublishResolution(deviceID string, width, height int) {
	h.mu.Lock()
	dev, found := h.devices[deviceID]
	if found {
		dev.screenWidth, dev.screenHeight = width, height
	}
	h.mu.Unlock()

	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnResolutionChange(deviceID, width, height)
	}
}
// PublishScreenFrame delivers one screen frame packet to every subscriber.
// The caller must already have wrapped the H.264 NAL payload in the
// [DeviceID:4][FrameType:1][DataLen:4][...] header the browser expects.
// packet is handed to subscribers (and, for keyframes, to the cache) without
// copying — do not mutate it after publishing.
//
// Keyframes are additionally remembered on the registered device's record so
// a viewer joining a live session gets a decodable starting point right away
// instead of waiting up to ~15 s for the next IDR.
func (h *Hub) PublishScreenFrame(deviceID string, packet []byte, isKeyframe bool) {
	if isKeyframe {
		h.mu.Lock()
		dev, found := h.devices[deviceID]
		if found {
			dev.lastKeyframe = packet
		}
		h.mu.Unlock()
	}

	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnScreenFrame(deviceID, packet, isKeyframe)
	}
}
// UpdateLive stores a heartbeat-derived RTT and active-window title in the
// device's live fields and then notifies subscribers via OnDeviceUpdate. An
// empty or unregistered id is ignored (e.g. a heartbeat arriving for a
// connection that never sent TOKEN_LOGIN or has already disconnected).
func (h *Hub) UpdateLive(id string, rtt int, activeWindow string) {
	if id == "" {
		return
	}

	h.mu.Lock()
	dev, found := h.devices[id]
	if found {
		dev.RTT, dev.ActiveWindow = rtt, activeWindow
	}
	h.mu.Unlock()

	if !found {
		return
	}
	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnDeviceUpdate(id, rtt, activeWindow)
	}
}
// ----- Terminal session management (Phase 6) --------------------------------
// ErrTerminalBusy is returned by OpenTerminalSession when the device already
// has a pending or active terminal session — the MVP allows only one web
// terminal per device at a time.
var ErrTerminalBusy = errors.New("terminal already open by another viewer")
// OpenTerminalSession atomically marks a terminal session as pending for the
// device, then sends COMMAND_SHELL on the main TCP connection so the device
// will spawn a shell sub-conn.
//
// Returns:
//   - nil once the request has been handed to the sender;
//   - ErrDeviceOffline if deviceID is empty, unknown, or has no main conn;
//   - ErrTerminalBusy if a pending or bound session already exists
//     (single-viewer constraint, mirroring C++ CWebService::HandleTermOpen,
//     server/2015Remote/WebService.cpp:1838);
//   - ErrNoSender if SetSender has not been called;
//   - the sender's error if the write fails.
//
// The session is only marked pending once a sender is known to exist, so a
// misconfigured hub never exposes a transient pending state through
// IsTerminalPending. If the send itself fails, the pending flag is rolled
// back so retries are possible.
func (h *Hub) OpenTerminalSession(deviceID string) error {
	if deviceID == "" {
		return ErrDeviceOffline
	}

	h.mu.Lock()
	d, ok := h.devices[deviceID]
	if !ok || d.conn == nil {
		h.mu.Unlock()
		return ErrDeviceOffline
	}
	if d.terminalPending || d.terminalConn != nil {
		h.mu.Unlock()
		return ErrTerminalBusy
	}
	// Checked after the offline/busy tests so the error precedence callers
	// observe is unchanged, but before any state is touched so there is
	// nothing to roll back.
	send := h.sender
	if send == nil {
		h.mu.Unlock()
		return ErrNoSender
	}
	d.terminalPending = true
	mainConn := d.conn
	h.mu.Unlock()

	// The send happens outside the lock so a slow write cannot stall the
	// registry.
	if err := send(mainConn, []byte{protocol.CommandShell}); err != nil {
		// Roll back so a retry isn't permanently blocked.
		h.mu.Lock()
		d.terminalPending = false
		h.mu.Unlock()
		return err
	}
	return nil
}
// IsTerminalPending reports whether the device has a terminal request
// outstanding, i.e. whether the next shell sub-conn to arrive should be
// claimed by the web terminal. Unknown devices report false. The C++ side
// uses the equivalent check in MessageHandle to choose between WebService
// takeover and opening an MFC dialog
// (server/2015Remote/2015RemoteDlg.cpp:5753).
func (h *Hub) IsTerminalPending(deviceID string) bool {
	pending := false

	h.mu.RLock()
	if dev, found := h.devices[deviceID]; found {
		pending = dev.terminalPending
	}
	h.mu.RUnlock()

	return pending
}
// BindTerminalConn turns the device's pending terminal request into an active
// session by attaching the freshly-arrived shell sub-conn ctx. It returns
// false — and the caller should drop the orphan ctx — when deviceID is empty,
// ctx is nil, the device is unknown, or no session is pending.
//
// Subscribers receive OnTerminalReady AFTER the binding is in place, so they
// can flip the browser into "ready" state on the same TCP roundtrip that will
// deliver the first shell output.
func (h *Hub) BindTerminalConn(deviceID string, ctx *connection.Context, isPTY bool) bool {
	if deviceID == "" || ctx == nil {
		return false
	}

	h.mu.Lock()
	dev, found := h.devices[deviceID]
	claimed := found && dev.terminalPending
	if claimed {
		dev.terminalConn = ctx
		dev.terminalIsPTY = isPTY
		dev.terminalPending = false
	}
	h.mu.Unlock()

	if !claimed {
		return false
	}

	h.terminalIndexMu.Lock()
	h.terminalIndex[ctx] = deviceID
	h.terminalIndexMu.Unlock()

	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnTerminalReady(deviceID, isPTY)
	}
	return true
}
// TerminalDeviceID maps a TCP context back to the ID of the device whose
// terminal sub-conn it is, or "" when the context is not tracked as one. The
// TCP layer calls this for every inbound packet on a sub-conn — a non-empty
// result means the bytes are raw shell output and bypass the usual
// command-byte switch.
func (h *Hub) TerminalDeviceID(ctx *connection.Context) string {
	h.terminalIndexMu.RLock()
	owner := h.terminalIndex[ctx] // missing key yields the zero value ""
	h.terminalIndexMu.RUnlock()
	return owner
}
// UnbindTerminalConn forgets the terminal mapping for ctx (called from the
// TCP disconnect path for any sub-conn ctx). Contexts that were never tracked
// are ignored. When an index entry was actually removed, OnTerminalClosed
// fires once with reason "disconnected", so subscribers can update the
// browser even on unexpected device-side drops.
//
// The device's terminal fields are only reset when ctx is still the sub-conn
// bound to it.
func (h *Hub) UnbindTerminalConn(ctx *connection.Context) {
	h.terminalIndexMu.Lock()
	owner, tracked := h.terminalIndex[ctx]
	if tracked {
		delete(h.terminalIndex, ctx)
	}
	h.terminalIndexMu.Unlock()

	if !tracked {
		return
	}

	h.mu.Lock()
	dev, found := h.devices[owner]
	if found && dev.terminalConn == ctx {
		dev.terminalConn = nil
		dev.terminalPending = false
		dev.terminalIsPTY = false
	}
	h.mu.Unlock()

	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnTerminalClosed(owner, "disconnected")
	}
}
// SendToTerminal writes bytes (typically xterm.js keystrokes) to the device's
// shell sub-conn. It returns ErrDeviceOffline when the device is unknown or
// has no bound terminal session, and ErrNoSender when no sender has been
// configured; otherwise the sender's result is returned.
func (h *Hub) SendToTerminal(id string, data []byte) error {
	var shell *connection.Context

	h.mu.RLock()
	if dev, found := h.devices[id]; found {
		shell = dev.terminalConn
	}
	h.mu.RUnlock()

	// shell stays nil for an unknown device as well as for one without a
	// session, so a single check covers both cases.
	if shell == nil {
		return ErrDeviceOffline
	}
	if h.sender == nil {
		return ErrNoSender
	}
	return h.sender(shell, data)
}
// TerminalIsPTY reports whether the device has a bound terminal session
// running in PTY mode (the resize command only applies in PTY mode — legacy
// cmd-pipe ignores it). Unknown devices and devices without a bound session
// report false.
func (h *Hub) TerminalIsPTY(id string) bool {
	h.mu.RLock()
	defer h.mu.RUnlock()

	if dev, found := h.devices[id]; found && dev.terminalConn != nil {
		return dev.terminalIsPTY
	}
	return false
}
// CloseTerminalSession tears down the session from the server side
// (typically when the requesting browser sends term_close or disconnects).
// Unknown devices, and devices with neither a pending nor a bound session,
// are a no-op and produce no event.
//
// Sequence for a bound session, mirroring CloseScreen's graceful pattern:
//
//  1. The device's terminal fields are cleared under the hub lock.
//  2. The terminalIndex entry is dropped BEFORE subscribers are notified, so
//     shell output still in flight is no longer attributed to this device
//     after the close event, and a concurrent UnbindTerminalConn for the
//     same ctx finds nothing to remove and does not fire a second
//     OnTerminalClosed.
//  3. Subscribers receive OnTerminalClosed with reason "closed".
//  4. COMMAND_BYE is sent, and the TCP close follows 500 ms later on its own
//     goroutine so the client's IOCPClient reconnect logic doesn't fire.
//
// A session that was only pending (no sub-conn yet) stops after step 3.
func (h *Hub) CloseTerminalSession(deviceID string) {
	h.mu.Lock()
	d, ok := h.devices[deviceID]
	if !ok {
		h.mu.Unlock()
		return
	}
	tc := d.terminalConn
	// hadSession guards against firing spurious OnTerminalClosed events
	// when there was nothing to tear down — relevant when the main-conn
	// teardown path calls CloseTerminalSession unconditionally as part of
	// device-offline cleanup, or when both OnDisconnect and an explicit
	// browser term_close race for the same teardown.
	hadSession := tc != nil || d.terminalPending
	d.terminalConn = nil
	d.terminalPending = false
	d.terminalIsPTY = false
	h.mu.Unlock()

	if !hadSession {
		return
	}

	// Stop routing this sub-conn's output before anyone is told the session
	// is closed.
	if tc != nil {
		h.terminalIndexMu.Lock()
		delete(h.terminalIndex, tc)
		h.terminalIndexMu.Unlock()
	}

	for _, s := range h.snapshotSubscribers() {
		s.OnTerminalClosed(deviceID, "closed")
	}

	if tc == nil {
		return
	}

	// Mirror Hub.CloseScreen: send COMMAND_BYE then close after 500 ms so
	// the device exits its shell read loop instead of treating the FIN as
	// a network blip and triggering reconnect. The send is best-effort —
	// the connection is closed regardless of its outcome.
	if h.sender != nil {
		_ = h.sender(tc, []byte{protocol.CommandBye})
	}
	go func(c *connection.Context) {
		time.Sleep(500 * time.Millisecond)
		c.Close()
	}(tc)
}
// PublishTerminalData delivers one chunk of shell output to every subscriber.
// The caller has already wrapped it in the "TRM1" magic header so the browser
// can demultiplex it from screen frames over the shared WebSocket. The hub
// neither inspects nor caches the packet.
func (h *Hub) PublishTerminalData(deviceID string, packet []byte) {
	handlers := h.snapshotSubscribers()
	for i := range handlers {
		handlers[i].OnTerminalData(deviceID, packet)
	}
}
// Count reports how many devices are registered (i.e. online) right now.
func (h *Hub) Count() int {
	h.mu.RLock()
	online := len(h.devices)
	h.mu.RUnlock()
	return online
}