Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions app/static/detail_panel.js
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ export function buildPopupContent(a, now, airports) {
`<span class="stat-val pop-signal"></span></div>` +
`<div class="stat"><span class="stat-label">First seen</span>` +
`<span class="stat-val pop-first-seen"></span></div>` +
`<div class="stat pop-peers-stat" hidden>` +
`<span class="stat-label">Also seen by</span>` +
`<span class="stat-val pop-peers-val"></span></div>` +
`</div>`;
injectHelpIcons(root);
updatePopupContent(root, a, now, airports);
Expand Down Expand Up @@ -605,6 +608,18 @@ export function updatePopupContent(root, a, now, airports) {
const firstSeen = entry?.sessionFirstSeen || a.first_seen;
q('.pop-first-seen').textContent = relativeAge(firstSeen, now);

// P2P "also seen by N peers" — surfaced only when the relay reported a
// non-zero count for this aircraft. Locally-only aircraft (e.g. P2P off,
// or no peer is currently reporting this ICAO) leave the stat hidden.
const peersStat = q('.pop-peers-stat');
const seenN = a.seen_by_others;
if (typeof seenN === 'number' && seenN > 0) {
q('.pop-peers-val').textContent = `${seenN} peer${seenN === 1 ? '' : 's'}`;
peersStat.hidden = false;
} else {
peersStat.hidden = true;
}

renderCommBSection(root, q, a, now);
}

Expand Down
5 changes: 4 additions & 1 deletion dotnet/src/FlightJar.Api/Endpoints/P2PEndpoints.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ private static string Sanitise(RegistrySnapshot snap)
foreach (var ac in snap.Aircraft)
{
if (ac.Peer == true) continue;
aircraft.Add(ac with { DistanceKm = null });
// SeenByOthers is filled by the cloud relay per-recipient and
// doesn't make sense over a direct same-LAN /p2p/ws stream;
// strip it so consumers can't mistake it for our own count.
aircraft.Add(ac with { DistanceKm = null, SeenByOthers = null });
}
var sanitised = snap with
{
Expand Down
6 changes: 5 additions & 1 deletion dotnet/src/FlightJar.Api/Hosting/P2PRelayClientService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,11 @@ private string BuildSanitisedPayload()
foreach (var ac in snap.Aircraft)
{
if (ac.Peer == true) continue; // don't echo back what we received
aircraft.Add(ac with { DistanceKm = null });
// Strip receiver-specific + relay-computed fields:
// - DistanceKm describes our reception, not the aircraft itself.
// - SeenByOthers is filled by the relay per-recipient; if we
// echoed it back the relay would treat it as our reading.
aircraft.Add(ac with { DistanceKm = null, SeenByOthers = null });
}

// Explicitly null out receiver location — never share it with the relay.
Expand Down
5 changes: 5 additions & 0 deletions dotnet/src/FlightJar.Core/State/PeerMerge.cs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ public static SnapshotAircraft Combine(SnapshotAircraft local, SnapshotAircraft
// Peer flag stays null — we have direct contact, so this
// record renders as a local aircraft (no peer styling).
Peer = null,

// Relay-computed: how many other peers also report this ICAO.
// The relay calculated this per-recipient (excluding us), so
// we just carry the peer record's value through.
SeenByOthers = peer.SeenByOthers,
};
}
}
8 changes: 8 additions & 0 deletions dotnet/src/FlightJar.Core/State/RegistrySnapshot.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,14 @@ public sealed record SnapshotAircraft
/// for locally-observed aircraft.</summary>
public bool? Peer { get; init; }

/// <summary>How many OTHER P2P peers also see this aircraft (excluding
/// this receiver). Computed by the relay per-recipient and surfaced via
/// the aggregate broadcast — null for locally-only aircraft when the
/// relay isn't connected, or any aircraft outside the federation. The
/// detail panel renders an "also seen by N peers" stat when this is
/// > 0.</summary>
public int? SeenByOthers { get; init; }

// Enrichment fields populated by the snapshot pusher from the external
// clients. Names match the wire schema the frontend reads directly:
// `origin`, `destination`, `phase`, `operator`, `operator_iata`,
Expand Down
15 changes: 15 additions & 0 deletions dotnet/tests/FlightJar.Core.Tests/State/PeerMergeTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,21 @@ public void ExtendAirports_SkipsAircraftWithNullAirportInfo()
Assert.Empty(airports);
}

[Fact]
public void Combine_TakesSeenByOthersFromPeer()
{
// The relay computes seen_by_others per-recipient (it knows the
// contributor set) and stamps it on the peer record. Combine must
// surface that value on the merged record so the detail panel can
// render the count even when the aircraft is locally observed.
var local = new SnapshotAircraft { Icao = "abc123", Callsign = "BAW123" };
var peer = new SnapshotAircraft { Icao = "abc123", SeenByOthers = 3 };

var merged = PeerMerge.Combine(local, peer);

Assert.Equal(3, merged.SeenByOthers);
}

[Fact]
public void Combine_FillsAltitudeAndVelocityFromPeer()
{
Expand Down
61 changes: 48 additions & 13 deletions relay-worker/src/relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ import { DurableObject } from 'cloudflare:workers';
interface AircraftEntry {
data: Record<string, unknown>;
receivedAt: number; // epoch seconds
// WebSockets that have reported this aircraft, keyed by their last
// contribution timestamp. Lets us tell each recipient how many OTHER
// peers also see the aircraft, so the UI can render "also seen by N".
contributors: Map<WebSocket, number>;
}

interface TokenRecord {
Expand Down Expand Up @@ -143,7 +147,7 @@ export class RelayDurableObject extends DurableObject {
}));
}

override webSocketMessage(_ws: WebSocket, message: string | ArrayBuffer): void {
override webSocketMessage(ws: WebSocket, message: string | ArrayBuffer): void {
if (typeof message !== 'string') return;

let parsed: Record<string, unknown>;
Expand All @@ -163,17 +167,31 @@ export class RelayDurableObject extends DurableObject {
if (typeof ac !== 'object' || ac === null) continue;
const icao = (ac as Record<string, unknown>)['icao'];
if (typeof icao !== 'string' || !icao) continue;
this.aggregate.set(icao.toLowerCase(), { data: ac as Record<string, unknown>, receivedAt: now });
const key = icao.toLowerCase();
const existing = this.aggregate.get(key);
if (existing) {
existing.data = ac as Record<string, unknown>;
existing.receivedAt = now;
existing.contributors.set(ws, now);
} else {
this.aggregate.set(key, {
data: ac as Record<string, unknown>,
receivedAt: now,
contributors: new Map([[ws, now]]),
});
}
}
}

override webSocketClose(ws: WebSocket): void {
this.connections.delete(ws);
this.dropContributor(ws);
console.log(JSON.stringify({ event: 'disconnect', connections: this.connections.size }));
}

override webSocketError(ws: WebSocket, error: unknown): void {
this.connections.delete(ws);
this.dropContributor(ws);
try { ws.close(); } catch { /* already closed */ }
console.log(JSON.stringify({
event: 'ws_error',
Expand All @@ -182,6 +200,15 @@ export class RelayDurableObject extends DurableObject {
}));
}

// Remove `ws` from every aircraft's contributor map. Called on clean
// disconnect so the seen_by_others count drops immediately rather than
// waiting up to STALE_S for the lazy eviction in broadcast() to catch up.
private dropContributor(ws: WebSocket): void {
for (const entry of this.aggregate.values()) {
entry.contributors.delete(ws);
}
}

override async alarm(): Promise<void> {
this.broadcast();
// Re-schedule unless there are no connections (DO will hibernate)
Expand All @@ -196,27 +223,34 @@ export class RelayDurableObject extends DurableObject {
const now = Date.now() / 1000;
const cutoff = now - STALE_S;

// Evict stale and collect fresh
const fresh: Record<string, unknown>[] = [];
// Evict stale aggregate entries; for each surviving entry also sweep
// out per-WS contributor timestamps that have aged past the same
// cutoff, so seen_by_others counts a contributor only as long as
// they're actively reporting the aircraft.
const fresh: AircraftEntry[] = [];
for (const [icao, entry] of this.aggregate) {
if (entry.receivedAt < cutoff) {
this.aggregate.delete(icao);
} else {
fresh.push(entry.data);
continue;
}
for (const [ws, ts] of entry.contributors) {
if (ts < cutoff) entry.contributors.delete(ws);
}
fresh.push(entry);
}

if (fresh.length === 0 && this.connections.size === 0) return;

// Count of "other" peers from any single recipient's perspective —
// every connected client is one of `connections`, so each sees
// `connections.size - 1` others. Same number for every recipient,
// so one shared payload is correct.
// Total connected count goes in the envelope; per-aircraft counts go
// in `seen_by_others` and are computed per-recipient (each WS subtracts
// itself from every aircraft it's contributing to).
const peers = Math.max(0, this.connections.size - 1);
const payload = JSON.stringify({ type: 'aggregate', peers, aircraft: fresh });
const dead: WebSocket[] = [];

for (const ws of this.connections) {
const aircraft = fresh.map(entry => ({
...entry.data,
seen_by_others: entry.contributors.size - (entry.contributors.has(ws) ? 1 : 0),
}));
const payload = JSON.stringify({ type: 'aggregate', peers, aircraft });
try {
ws.send(payload);
} catch {
Expand All @@ -226,6 +260,7 @@ export class RelayDurableObject extends DurableObject {

for (const ws of dead) {
this.connections.delete(ws);
this.dropContributor(ws);
try { ws.close(); } catch { /* already closed */ }
}
}
Expand Down
Loading