Feature(Go): Screen frame relay end-to-end with graceful client BYE (Phase 4)

This commit is contained in:
yuanyuanxiang
2026-05-18 01:00:56 +02:00
parent b1f229706c
commit f013512c06
10 changed files with 999 additions and 74 deletions

View File

@@ -2,21 +2,32 @@
// the bridge between the TCP server (which sees raw client connections) and
// the web server (which serves browser clients).
//
// The TCP side calls RegisterDevice / UnregisterDevice as clients come and go.
// The web side calls ListDevices / GetDevice / (Phase 4) SendToDevice.
// The TCP side calls Register / Unregister / UpdateLive / BindScreenConn as
// the protocol layer notices new sub-connections.
// The web side calls ListDevices / SendToDevice / Subscribe.
// Neither side imports the other — both depend only on this package.
//
// Phase 3 scope: device list only. Frame/cursor pub-sub and SendToDevice are
// added in later phases as features need them.
package hub
import (
"errors"
"sync"
"time"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/connection"
"github.com/yuanyuanxiang/SimpleRemoter/server/go/protocol"
)
// ErrDeviceOffline is returned by SendToDevice when the target device is not
// (no longer) registered.
var ErrDeviceOffline = errors.New("device offline")
// ErrNoSender is returned by SendToDevice if SetSender has not been called.
var ErrNoSender = errors.New("hub sender not configured")
// SendFunc encodes-and-writes raw command bytes to a device's TCP context.
// In practice this is bound to server.Server.Send at startup.
type SendFunc func(ctx *connection.Context, data []byte) error
// Device is the internal record for one logical end-device (keyed by MasterID).
// A single device may use multiple TCP sub-connections (screen, terminal …);
// only the main login connection is stored here.
@@ -43,9 +54,65 @@ type Device struct {
RTT int // network latency in ms (Heartbeat.Ping)
ActiveWindow string // foreground window title (Heartbeat.ActiveWnd, decoded)
// conn is the main connection's context. Web side will use it in Phase 4
// to push COMMAND_SCREEN_SPY and similar commands via the hub.
// conn is the main connection's context — used by SendToDevice to forward
// commands (COMMAND_SCREEN_SPY, etc.) to the device.
conn *connection.Context
// screenConn is the device-initiated sub-connection that streams
// TOKEN_BITMAPINFO / TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames. Bound
// after the device responds to COMMAND_SCREEN_SPY. Nil while no screen
// session is active.
screenConn *connection.Context
// Cached screen state, for late-joining browsers. Populated by the
// PublishResolution / PublishScreenFrame call sites. screenWidth==0 or
// lastKeyframe==nil indicates "no session" / "no IDR seen yet".
screenWidth int
screenHeight int
lastKeyframe []byte // fully-packed WS binary packet of the most recent IDR
// Cursor index dedup: cursors arrive on every frame (~30 Hz) but only
// rarely change. Suppress duplicates so the WS doesn't carry redundant
// JSON messages.
cursorSeen bool
lastCursorIndex byte
}
// ScreenCache is a read-only snapshot of a device's last-seen screen state,
// used by wsHub.handleConnect to bootstrap late joiners.
type ScreenCache struct {
Width int
Height int
Keyframe []byte // packed WS packet; nil if no keyframe cached yet
Active bool // true iff a screen sub-conn is currently bound
}
// ScreenState returns a snapshot of the device's current screen state, or
// an empty struct if the device is unknown. Safe to call from any goroutine.
func (h *Hub) ScreenState(deviceID string) ScreenCache {
h.mu.RLock()
defer h.mu.RUnlock()
d, ok := h.devices[deviceID]
if !ok {
return ScreenCache{}
}
return ScreenCache{
Width: d.screenWidth,
Height: d.screenHeight,
Keyframe: d.lastKeyframe,
Active: d.screenConn != nil,
}
}
// MainConn exposes the device's main TCP context for callers that need to
// send commands directly. Returns nil if the device is not registered.
func (h *Hub) MainConn(id string) *connection.Context {
h.mu.RLock()
defer h.mu.RUnlock()
if d, ok := h.devices[id]; ok {
return d.conn
}
return nil
}
// DeviceInfo is the JSON-safe projection of Device for the /api/devices
@@ -70,27 +137,128 @@ type DeviceInfo struct {
Online bool `json:"online"`
}
// EventHandler receives notifications about device lifecycle and per-tick
// live updates. Methods are invoked synchronously from Register / Unregister /
// UpdateLive — implementations must be non-blocking (typically just write to
// a channel or queue).
// EventHandler receives notifications about device lifecycle, per-tick live
// updates, screen frames and resolution changes. Methods are invoked
// synchronously from the corresponding hub mutator — implementations must
// be non-blocking (typically just write to a channel or queue).
type EventHandler interface {
OnDeviceOnline(d DeviceInfo)
OnDeviceOffline(id string)
OnDeviceUpdate(id string, rtt int, activeWindow string)
// OnScreenFrame delivers a fully-formed WS binary packet for the given
// device. The packet matches the C++ layout:
// [DeviceID:4 LE][FrameType:1][DataLen:4 LE][H264:N]
// Implementations should treat the slice as read-only.
OnScreenFrame(deviceID string, packet []byte, isKeyframe bool)
// OnResolutionChange fires when a screen session starts (TOKEN_BITMAPINFO)
// or whenever the device reports a new screen geometry mid-stream.
OnResolutionChange(deviceID string, width, height int)
// OnCursorChange fires when the device's foreground cursor index changes.
// Duplicates (same index as the previous frame) are filtered out by the
// hub before reaching subscribers.
OnCursorChange(deviceID string, index byte)
}
// Hub is a thread-safe registry of online devices.
type Hub struct {
mu sync.RWMutex
devices map[string]*Device
subMu sync.RWMutex
mu sync.RWMutex
devices map[string]*Device
subMu sync.RWMutex
subscribers []EventHandler
sender SendFunc
// Reverse index: TCP context -> device ID for the device's screen
// sub-connection. Lets us clean up on raw-connection close without
// having to walk every device. Empty when no screen sessions exist.
screenIndex map[*connection.Context]string
screenIndexMu sync.RWMutex
}
// New returns an empty Hub.
func New() *Hub {
return &Hub{devices: make(map[string]*Device)}
return &Hub{
devices: make(map[string]*Device),
screenIndex: make(map[*connection.Context]string),
}
}
// SetSender wires the function used to deliver outbound bytes on a device's
// main TCP connection. Typically called once in main() with server.Send.
func (h *Hub) SetSender(fn SendFunc) {
h.sender = fn
}
// SendToDevice forwards an already-formed command payload to the device's
// main connection. data should be the raw command bytes (the sender takes
// care of framing / compression at the protocol layer).
func (h *Hub) SendToDevice(id string, data []byte) error {
h.mu.RLock()
d, ok := h.devices[id]
h.mu.RUnlock()
if !ok || d.conn == nil {
return ErrDeviceOffline
}
if h.sender == nil {
return ErrNoSender
}
return h.sender(d.conn, data)
}
// BindScreenConn associates a freshly-arrived sub-connection (the one that
// just sent TOKEN_BITMAPINFO) with the device identified by clientID.
// Returns false if the device is not registered — callers should drop the
// orphan connection in that case.
func (h *Hub) BindScreenConn(deviceID string, ctx *connection.Context) bool {
if deviceID == "" || ctx == nil {
return false
}
h.mu.Lock()
d, ok := h.devices[deviceID]
if !ok {
h.mu.Unlock()
return false
}
d.screenConn = ctx
h.mu.Unlock()
h.screenIndexMu.Lock()
h.screenIndex[ctx] = deviceID
h.screenIndexMu.Unlock()
return true
}
// ScreenDeviceID returns the device ID whose screen sub-connection this
// context represents, or "" if the context is not a screen sub-connection.
// Used by the TCP layer to route TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames.
func (h *Hub) ScreenDeviceID(ctx *connection.Context) string {
h.screenIndexMu.RLock()
defer h.screenIndexMu.RUnlock()
return h.screenIndex[ctx]
}
// UnbindScreenConn removes the screen sub-connection mapping (called on TCP
// disconnect of a screen sub-context). No-op if the context isn't tracked.
func (h *Hub) UnbindScreenConn(ctx *connection.Context) {
h.screenIndexMu.Lock()
deviceID, ok := h.screenIndex[ctx]
if !ok {
h.screenIndexMu.Unlock()
return
}
delete(h.screenIndex, ctx)
h.screenIndexMu.Unlock()
h.mu.Lock()
if d, ok := h.devices[deviceID]; ok && d.screenConn == ctx {
d.screenConn = nil
// Clear the cache too — when this device's screen comes back up, the
// resolution and IDR will be republished fresh.
d.screenWidth = 0
d.screenHeight = 0
d.lastKeyframe = nil
}
h.mu.Unlock()
}
// Subscribe registers an EventHandler. The returned func removes it.
@@ -119,14 +287,16 @@ func (h *Hub) snapshotSubscribers() []EventHandler {
return out
}
// Register records a device as online. Re-registering an existing ID overwrites
// the previous entry (e.g. a client reconnect with the same MasterID).
// A nil device or empty ID is silently ignored.
// Register records a device as online and pins the main TCP connection that
// will receive outbound commands via SendToDevice. Re-registering an existing
// ID overwrites the previous entry (e.g. a client reconnect with the same
// MasterID). A nil device, nil conn, or empty ID is silently ignored.
// Subscribers are notified after the device is added.
func (h *Hub) Register(d *Device) {
if d == nil || d.ID == "" {
func (h *Hub) Register(d *Device, conn *connection.Context) {
if d == nil || d.ID == "" || conn == nil {
return
}
d.conn = conn
h.mu.Lock()
h.devices[d.ID] = d
info := deviceToInfo(d)
@@ -187,6 +357,135 @@ func deviceToInfo(d *Device) DeviceInfo {
}
}
// PublishCursor notifies subscribers when the device reports a new cursor
// index. Repeated identical indices are suppressed so the WS isn't spammed
// with per-frame cursor JSON. No-op for unknown devices.
func (h *Hub) PublishCursor(deviceID string, index byte) {
h.mu.Lock()
d, ok := h.devices[deviceID]
if !ok {
h.mu.Unlock()
return
}
if d.cursorSeen && d.lastCursorIndex == index {
h.mu.Unlock()
return
}
d.cursorSeen = true
d.lastCursorIndex = index
h.mu.Unlock()
for _, s := range h.snapshotSubscribers() {
s.OnCursorChange(deviceID, index)
}
}
// CloseScreen tears down the active screen sub-connection for the device,
// if any. Used when the last viewer leaves so the device stops capturing.
//
// Cache (screenConn / screenWidth / lastKeyframe) is cleared SYNCHRONOUSLY
// here, not deferred to the eventual OnDisconnect → UnbindScreenConn path.
// Otherwise a new viewer arriving in the brief window between TCP close and
// the disconnect callback would see Active=true with stale dimensions/IDR
// and skip the COMMAND_SCREEN_SPY kick, leaving the page stuck on a "connected"
// status with no frames ever arriving.
func (h *Hub) CloseScreen(deviceID string) {
h.mu.Lock()
d, ok := h.devices[deviceID]
if !ok {
h.mu.Unlock()
return
}
sc := d.screenConn
d.screenConn = nil
d.screenWidth = 0
d.screenHeight = 0
d.lastKeyframe = nil
h.mu.Unlock()
if sc != nil {
// Drop the screenIndex entry SYNCHRONOUSLY so any in-flight frames
// still draining out of the device on this sub-conn (between our
// FIN and the device's clean-up) are silently dropped instead of
// being relayed to the freshly initialized browser decoder. Mixing
// frames from the old x264 SPS/PPS sequence with the new session's
// decoder produces the classic "every other quick reconnect goes
// black" symptom — old NAL units come in via the old ctx after we
// nulled d.screenConn but before OnDisconnect fires.
h.screenIndexMu.Lock()
delete(h.screenIndex, sc)
h.screenIndexMu.Unlock()
// Tell the client to shut its screen pipeline down gracefully.
// Without this, the client's IOCPClient sees recv()==0 as a network
// blip and fires m_ReconnectFunc, which:
// 1. Reconnects the sub-conn (~100 ms)
// 2. Re-sends ConnAuthPacket (no BITMAPINFO!)
// 3. Keeps the capture thread alive for ~10 s holding DXGI handles
// 4. ConnAuth eventually times out, ScreenManager exits
// Net effect: a second viewer arriving within ~10 s of leaving lands
// in the dead window where the device is still capturing for the old
// (now unrouted) sub-conn — page sits on "Waiting for video".
//
// COMMAND_BYE is what the C++ server sends via
// CDialogBase::SayByeBye (server/2015Remote/IOCPServer.h:248) before
// it tears down a sub-conn for the same reason. Client-side handler:
// CScreenManager::OnReceive case COMMAND_BYE
// (client/ScreenManager.cpp:812) sets m_bIsWorking=FALSE and calls
// StopRunning() — the clean exit path that does NOT trigger reconnect.
if h.sender != nil {
_ = h.sender(sc, []byte{protocol.CommandBye})
}
// Mirror the C++ flow (ScreenSpyDlg.cpp:842 — Sleep(500); CancelIO()).
// Give the device's read loop a moment to pull COMMAND_BYE off the
// wire before our FIN arrives; otherwise on a fast LAN the BYE byte
// can be coalesced with the FIN and the client's IOCPClient may
// observe recv()==0 first and trigger reconnect anyway.
// Run the close on a goroutine so the caller (web handler) isn't
// blocked for 500 ms. screenIndex is already cleared above, so
// in-flight frames during the grace window are silently dropped.
go func(c *connection.Context) {
time.Sleep(500 * time.Millisecond)
c.Close()
}(sc)
}
}
// PublishResolution announces a new (or first-ever) screen geometry for a
// device. The browser uses width/height to initialize its WebCodecs decoder.
// The latest dimensions are also cached on the Device so future late-joining
// viewers can be bootstrapped without waiting for the next BITMAPINFO.
func (h *Hub) PublishResolution(deviceID string, width, height int) {
h.mu.Lock()
if d, ok := h.devices[deviceID]; ok {
d.screenWidth = width
d.screenHeight = height
}
h.mu.Unlock()
for _, s := range h.snapshotSubscribers() {
s.OnResolutionChange(deviceID, width, height)
}
}
// PublishScreenFrame fans out a screen frame packet to all subscribers.
// Callers must have already wrapped the H.264 NAL payload in the
// [DeviceID:4][FrameType:1][DataLen:4][...] header expected by the browser.
// The packet slice is shared with subscribers — do not mutate after publish.
//
// Keyframe packets are also retained on the Device record so a new viewer
// joining a live session can immediately receive a decodable starting point
// instead of waiting up to ~15 s for the next IDR.
func (h *Hub) PublishScreenFrame(deviceID string, packet []byte, isKeyframe bool) {
if isKeyframe {
h.mu.Lock()
if d, ok := h.devices[deviceID]; ok {
d.lastKeyframe = packet
}
h.mu.Unlock()
}
for _, s := range h.snapshotSubscribers() {
s.OnScreenFrame(deviceID, packet, isKeyframe)
}
}
// UpdateLive applies a heartbeat-derived RTT and active-window title to the
// device's live fields, then notifies subscribers. No-op if the device is
// not registered (e.g. heartbeat arriving for a connection that never sent