Feature(Go): Screen frame relay end-to-end with graceful client BYE (Phase 4)
This commit is contained in:
@@ -2,21 +2,32 @@
|
||||
// the bridge between the TCP server (which sees raw client connections) and
|
||||
// the web server (which serves browser clients).
|
||||
//
|
||||
// The TCP side calls Register / Unregister / UpdateLive / BindScreenConn as
// the protocol layer notices new sub-connections.
// The web side calls ListDevices / SendToDevice / Subscribe.
|
||||
// Neither side imports the other — both depend only on this package.
|
||||
//
|
||||
// Phase 4 scope: device list, SendToDevice, and screen frame / resolution /
// cursor pub-sub.
|
||||
package hub
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/yuanyuanxiang/SimpleRemoter/server/go/connection"
|
||||
"github.com/yuanyuanxiang/SimpleRemoter/server/go/protocol"
|
||||
)
|
||||
|
||||
// ErrDeviceOffline is returned by SendToDevice when the target device is not
|
||||
// (no longer) registered.
|
||||
var ErrDeviceOffline = errors.New("device offline")
|
||||
|
||||
// ErrNoSender is returned by SendToDevice if SetSender has not been called.
|
||||
var ErrNoSender = errors.New("hub sender not configured")
|
||||
|
||||
// SendFunc encodes-and-writes raw command bytes to a device's TCP context.
|
||||
// In practice this is bound to server.Server.Send at startup.
|
||||
type SendFunc func(ctx *connection.Context, data []byte) error
|
||||
|
||||
// Device is the internal record for one logical end-device (keyed by MasterID).
|
||||
// A single device may use multiple TCP sub-connections (screen, terminal …);
|
||||
// only the main login connection is stored here.
|
||||
@@ -43,9 +54,65 @@ type Device struct {
|
||||
RTT int // network latency in ms (Heartbeat.Ping)
|
||||
ActiveWindow string // foreground window title (Heartbeat.ActiveWnd, decoded)
|
||||
|
||||
// conn is the main connection's context — used by SendToDevice to forward
// commands (COMMAND_SCREEN_SPY, etc.) to the device.
|
||||
conn *connection.Context
|
||||
|
||||
// screenConn is the device-initiated sub-connection that streams
|
||||
// TOKEN_BITMAPINFO / TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames. Bound
|
||||
// after the device responds to COMMAND_SCREEN_SPY. Nil while no screen
|
||||
// session is active.
|
||||
screenConn *connection.Context
|
||||
|
||||
// Cached screen state, for late-joining browsers. Populated by the
|
||||
// PublishResolution / PublishScreenFrame call sites. screenWidth==0 or
|
||||
// lastKeyframe==nil indicates "no session" / "no IDR seen yet".
|
||||
screenWidth int
|
||||
screenHeight int
|
||||
lastKeyframe []byte // fully-packed WS binary packet of the most recent IDR
|
||||
|
||||
// Cursor index dedup: cursors arrive on every frame (~30 Hz) but only
|
||||
// rarely change. Suppress duplicates so the WS doesn't carry redundant
|
||||
// JSON messages.
|
||||
cursorSeen bool
|
||||
lastCursorIndex byte
|
||||
}
|
||||
|
||||
// ScreenCache is a point-in-time copy of the screen fields the hub keeps for
// one device. wsHub.handleConnect uses it to bootstrap viewers that join
// after the stream has already started. Treat it as read-only.
type ScreenCache struct {
	// Width and Height are the most recently published screen dimensions.
	Width, Height int
	// Keyframe is the packed WS packet of the latest keyframe; nil if no
	// keyframe has been cached yet.
	Keyframe []byte
	// Active is true iff a screen sub-connection is currently bound.
	Active bool
}
|
||||
|
||||
// ScreenState returns a snapshot of the device's current screen state, or
|
||||
// an empty struct if the device is unknown. Safe to call from any goroutine.
|
||||
func (h *Hub) ScreenState(deviceID string) ScreenCache {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
d, ok := h.devices[deviceID]
|
||||
if !ok {
|
||||
return ScreenCache{}
|
||||
}
|
||||
return ScreenCache{
|
||||
Width: d.screenWidth,
|
||||
Height: d.screenHeight,
|
||||
Keyframe: d.lastKeyframe,
|
||||
Active: d.screenConn != nil,
|
||||
}
|
||||
}
|
||||
|
||||
// MainConn exposes the device's main TCP context for callers that need to
|
||||
// send commands directly. Returns nil if the device is not registered.
|
||||
func (h *Hub) MainConn(id string) *connection.Context {
|
||||
h.mu.RLock()
|
||||
defer h.mu.RUnlock()
|
||||
if d, ok := h.devices[id]; ok {
|
||||
return d.conn
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeviceInfo is the JSON-safe projection of Device for the /api/devices
|
||||
@@ -70,27 +137,128 @@ type DeviceInfo struct {
|
||||
Online bool `json:"online"`
|
||||
}
|
||||
|
||||
// EventHandler receives notifications about device lifecycle and per-tick
|
||||
// live updates. Methods are invoked synchronously from Register / Unregister /
|
||||
// UpdateLive — implementations must be non-blocking (typically just write to
|
||||
// a channel or queue).
|
||||
// EventHandler receives notifications about device lifecycle, per-tick live
|
||||
// updates, screen frames and resolution changes. Methods are invoked
|
||||
// synchronously from the corresponding hub mutator — implementations must
|
||||
// be non-blocking (typically just write to a channel or queue).
|
||||
type EventHandler interface {
|
||||
OnDeviceOnline(d DeviceInfo)
|
||||
OnDeviceOffline(id string)
|
||||
OnDeviceUpdate(id string, rtt int, activeWindow string)
|
||||
// OnScreenFrame delivers a fully-formed WS binary packet for the given
|
||||
// device. The packet matches the C++ layout:
|
||||
// [DeviceID:4 LE][FrameType:1][DataLen:4 LE][H264:N]
|
||||
// Implementations should treat the slice as read-only.
|
||||
OnScreenFrame(deviceID string, packet []byte, isKeyframe bool)
|
||||
// OnResolutionChange fires when a screen session starts (TOKEN_BITMAPINFO)
|
||||
// or whenever the device reports a new screen geometry mid-stream.
|
||||
OnResolutionChange(deviceID string, width, height int)
|
||||
// OnCursorChange fires when the device's foreground cursor index changes.
|
||||
// Duplicates (same index as the previous frame) are filtered out by the
|
||||
// hub before reaching subscribers.
|
||||
OnCursorChange(deviceID string, index byte)
|
||||
}
|
||||
|
||||
// Hub is a thread-safe registry of online devices.
|
||||
type Hub struct {
|
||||
mu sync.RWMutex
|
||||
devices map[string]*Device
|
||||
subMu sync.RWMutex
|
||||
mu sync.RWMutex
|
||||
devices map[string]*Device
|
||||
subMu sync.RWMutex
|
||||
subscribers []EventHandler
|
||||
|
||||
sender SendFunc
|
||||
|
||||
// Reverse index: TCP context -> device ID for the device's screen
|
||||
// sub-connection. Lets us clean up on raw-connection close without
|
||||
// having to walk every device. Empty when no screen sessions exist.
|
||||
screenIndex map[*connection.Context]string
|
||||
screenIndexMu sync.RWMutex
|
||||
}
|
||||
|
||||
// New returns an empty Hub.
|
||||
func New() *Hub {
|
||||
return &Hub{devices: make(map[string]*Device)}
|
||||
return &Hub{
|
||||
devices: make(map[string]*Device),
|
||||
screenIndex: make(map[*connection.Context]string),
|
||||
}
|
||||
}
|
||||
|
||||
// SetSender wires the function used to deliver outbound bytes on a device's
|
||||
// main TCP connection. Typically called once in main() with server.Send.
|
||||
func (h *Hub) SetSender(fn SendFunc) {
|
||||
h.sender = fn
|
||||
}
|
||||
|
||||
// SendToDevice forwards an already-formed command payload to the device's
|
||||
// main connection. data should be the raw command bytes (the sender takes
|
||||
// care of framing / compression at the protocol layer).
|
||||
func (h *Hub) SendToDevice(id string, data []byte) error {
|
||||
h.mu.RLock()
|
||||
d, ok := h.devices[id]
|
||||
h.mu.RUnlock()
|
||||
if !ok || d.conn == nil {
|
||||
return ErrDeviceOffline
|
||||
}
|
||||
if h.sender == nil {
|
||||
return ErrNoSender
|
||||
}
|
||||
return h.sender(d.conn, data)
|
||||
}
|
||||
|
||||
// BindScreenConn associates a freshly-arrived sub-connection (the one that
|
||||
// just sent TOKEN_BITMAPINFO) with the device identified by clientID.
|
||||
// Returns false if the device is not registered — callers should drop the
|
||||
// orphan connection in that case.
|
||||
func (h *Hub) BindScreenConn(deviceID string, ctx *connection.Context) bool {
|
||||
if deviceID == "" || ctx == nil {
|
||||
return false
|
||||
}
|
||||
h.mu.Lock()
|
||||
d, ok := h.devices[deviceID]
|
||||
if !ok {
|
||||
h.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
d.screenConn = ctx
|
||||
h.mu.Unlock()
|
||||
|
||||
h.screenIndexMu.Lock()
|
||||
h.screenIndex[ctx] = deviceID
|
||||
h.screenIndexMu.Unlock()
|
||||
return true
|
||||
}
|
||||
|
||||
// ScreenDeviceID returns the device ID whose screen sub-connection this
|
||||
// context represents, or "" if the context is not a screen sub-connection.
|
||||
// Used by the TCP layer to route TOKEN_FIRSTSCREEN / TOKEN_NEXTSCREEN frames.
|
||||
func (h *Hub) ScreenDeviceID(ctx *connection.Context) string {
|
||||
h.screenIndexMu.RLock()
|
||||
defer h.screenIndexMu.RUnlock()
|
||||
return h.screenIndex[ctx]
|
||||
}
|
||||
|
||||
// UnbindScreenConn removes the screen sub-connection mapping (called on TCP
|
||||
// disconnect of a screen sub-context). No-op if the context isn't tracked.
|
||||
func (h *Hub) UnbindScreenConn(ctx *connection.Context) {
|
||||
h.screenIndexMu.Lock()
|
||||
deviceID, ok := h.screenIndex[ctx]
|
||||
if !ok {
|
||||
h.screenIndexMu.Unlock()
|
||||
return
|
||||
}
|
||||
delete(h.screenIndex, ctx)
|
||||
h.screenIndexMu.Unlock()
|
||||
|
||||
h.mu.Lock()
|
||||
if d, ok := h.devices[deviceID]; ok && d.screenConn == ctx {
|
||||
d.screenConn = nil
|
||||
// Clear the cache too — when this device's screen comes back up, the
|
||||
// resolution and IDR will be republished fresh.
|
||||
d.screenWidth = 0
|
||||
d.screenHeight = 0
|
||||
d.lastKeyframe = nil
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
|
||||
// Subscribe registers an EventHandler. The returned func removes it.
|
||||
@@ -119,14 +287,16 @@ func (h *Hub) snapshotSubscribers() []EventHandler {
|
||||
return out
|
||||
}
|
||||
|
||||
// Register records a device as online and pins the main TCP connection that
// will receive outbound commands via SendToDevice. Re-registering an existing
// ID overwrites the previous entry (e.g. a client reconnect with the same
// MasterID). A nil device, nil conn, or empty ID is silently ignored.
// Subscribers are notified after the device is added.
|
||||
func (h *Hub) Register(d *Device) {
|
||||
if d == nil || d.ID == "" {
|
||||
func (h *Hub) Register(d *Device, conn *connection.Context) {
|
||||
if d == nil || d.ID == "" || conn == nil {
|
||||
return
|
||||
}
|
||||
d.conn = conn
|
||||
h.mu.Lock()
|
||||
h.devices[d.ID] = d
|
||||
info := deviceToInfo(d)
|
||||
@@ -187,6 +357,135 @@ func deviceToInfo(d *Device) DeviceInfo {
|
||||
}
|
||||
}
|
||||
|
||||
// PublishCursor notifies subscribers when the device reports a new cursor
|
||||
// index. Repeated identical indices are suppressed so the WS isn't spammed
|
||||
// with per-frame cursor JSON. No-op for unknown devices.
|
||||
func (h *Hub) PublishCursor(deviceID string, index byte) {
|
||||
h.mu.Lock()
|
||||
d, ok := h.devices[deviceID]
|
||||
if !ok {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if d.cursorSeen && d.lastCursorIndex == index {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
d.cursorSeen = true
|
||||
d.lastCursorIndex = index
|
||||
h.mu.Unlock()
|
||||
for _, s := range h.snapshotSubscribers() {
|
||||
s.OnCursorChange(deviceID, index)
|
||||
}
|
||||
}
|
||||
|
||||
// CloseScreen tears down the active screen sub-connection for the device,
|
||||
// if any. Used when the last viewer leaves so the device stops capturing.
|
||||
//
|
||||
// Cache (screenConn / screenWidth / lastKeyframe) is cleared SYNCHRONOUSLY
|
||||
// here, not deferred to the eventual OnDisconnect → UnbindScreenConn path.
|
||||
// Otherwise a new viewer arriving in the brief window between TCP close and
|
||||
// the disconnect callback would see Active=true with stale dimensions/IDR
|
||||
// and skip the COMMAND_SCREEN_SPY kick, leaving the page stuck on a "connected"
|
||||
// status with no frames ever arriving.
|
||||
func (h *Hub) CloseScreen(deviceID string) {
|
||||
h.mu.Lock()
|
||||
d, ok := h.devices[deviceID]
|
||||
if !ok {
|
||||
h.mu.Unlock()
|
||||
return
|
||||
}
|
||||
sc := d.screenConn
|
||||
d.screenConn = nil
|
||||
d.screenWidth = 0
|
||||
d.screenHeight = 0
|
||||
d.lastKeyframe = nil
|
||||
h.mu.Unlock()
|
||||
if sc != nil {
|
||||
// Drop the screenIndex entry SYNCHRONOUSLY so any in-flight frames
|
||||
// still draining out of the device on this sub-conn (between our
|
||||
// FIN and the device's clean-up) are silently dropped instead of
|
||||
// being relayed to the freshly initialized browser decoder. Mixing
|
||||
// frames from the old x264 SPS/PPS sequence with the new session's
|
||||
// decoder produces the classic "every other quick reconnect goes
|
||||
// black" symptom — old NAL units come in via the old ctx after we
|
||||
// nulled d.screenConn but before OnDisconnect fires.
|
||||
h.screenIndexMu.Lock()
|
||||
delete(h.screenIndex, sc)
|
||||
h.screenIndexMu.Unlock()
|
||||
|
||||
// Tell the client to shut its screen pipeline down gracefully.
|
||||
// Without this, the client's IOCPClient sees recv()==0 as a network
|
||||
// blip and fires m_ReconnectFunc, which:
|
||||
// 1. Reconnects the sub-conn (~100 ms)
|
||||
// 2. Re-sends ConnAuthPacket (no BITMAPINFO!)
|
||||
// 3. Keeps the capture thread alive for ~10 s holding DXGI handles
|
||||
// 4. ConnAuth eventually times out, ScreenManager exits
|
||||
// Net effect: a second viewer arriving within ~10 s of leaving lands
|
||||
// in the dead window where the device is still capturing for the old
|
||||
// (now unrouted) sub-conn — page sits on "Waiting for video".
|
||||
//
|
||||
// COMMAND_BYE is what the C++ server sends via
|
||||
// CDialogBase::SayByeBye (server/2015Remote/IOCPServer.h:248) before
|
||||
// it tears down a sub-conn for the same reason. Client-side handler:
|
||||
// CScreenManager::OnReceive case COMMAND_BYE
|
||||
// (client/ScreenManager.cpp:812) sets m_bIsWorking=FALSE and calls
|
||||
// StopRunning() — the clean exit path that does NOT trigger reconnect.
|
||||
if h.sender != nil {
|
||||
_ = h.sender(sc, []byte{protocol.CommandBye})
|
||||
}
|
||||
// Mirror the C++ flow (ScreenSpyDlg.cpp:842 — Sleep(500); CancelIO()).
|
||||
// Give the device's read loop a moment to pull COMMAND_BYE off the
|
||||
// wire before our FIN arrives; otherwise on a fast LAN the BYE byte
|
||||
// can be coalesced with the FIN and the client's IOCPClient may
|
||||
// observe recv()==0 first and trigger reconnect anyway.
|
||||
// Run the close on a goroutine so the caller (web handler) isn't
|
||||
// blocked for 500 ms. screenIndex is already cleared above, so
|
||||
// in-flight frames during the grace window are silently dropped.
|
||||
go func(c *connection.Context) {
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
c.Close()
|
||||
}(sc)
|
||||
}
|
||||
}
|
||||
|
||||
// PublishResolution announces a new (or first-ever) screen geometry for a
|
||||
// device. The browser uses width/height to initialize its WebCodecs decoder.
|
||||
// The latest dimensions are also cached on the Device so future late-joining
|
||||
// viewers can be bootstrapped without waiting for the next BITMAPINFO.
|
||||
func (h *Hub) PublishResolution(deviceID string, width, height int) {
|
||||
h.mu.Lock()
|
||||
if d, ok := h.devices[deviceID]; ok {
|
||||
d.screenWidth = width
|
||||
d.screenHeight = height
|
||||
}
|
||||
h.mu.Unlock()
|
||||
for _, s := range h.snapshotSubscribers() {
|
||||
s.OnResolutionChange(deviceID, width, height)
|
||||
}
|
||||
}
|
||||
|
||||
// PublishScreenFrame fans out a screen frame packet to all subscribers.
|
||||
// Callers must have already wrapped the H.264 NAL payload in the
|
||||
// [DeviceID:4][FrameType:1][DataLen:4][...] header expected by the browser.
|
||||
// The packet slice is shared with subscribers — do not mutate after publish.
|
||||
//
|
||||
// Keyframe packets are also retained on the Device record so a new viewer
|
||||
// joining a live session can immediately receive a decodable starting point
|
||||
// instead of waiting up to ~15 s for the next IDR.
|
||||
func (h *Hub) PublishScreenFrame(deviceID string, packet []byte, isKeyframe bool) {
|
||||
if isKeyframe {
|
||||
h.mu.Lock()
|
||||
if d, ok := h.devices[deviceID]; ok {
|
||||
d.lastKeyframe = packet
|
||||
}
|
||||
h.mu.Unlock()
|
||||
}
|
||||
for _, s := range h.snapshotSubscribers() {
|
||||
s.OnScreenFrame(deviceID, packet, isKeyframe)
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateLive applies a heartbeat-derived RTT and active-window title to the
|
||||
// device's live fields, then notifies subscribers. No-op if the device is
|
||||
// not registered (e.g. heartbeat arriving for a connection that never sent
|
||||
|
||||
Reference in New Issue
Block a user