import { BufReader } from "https://deno.land/std@0.157.0/io/buffer.ts";
import { ByteSet, LengthType } from "https://deno.land/x/bytes@1.0.3/mod.ts";

/** TCP port the server listens on. */
const PORT = 5588;

// Wire-level aliases. They are all plain numbers/strings at runtime; the
// names only document which protocol field a value belongs to.
type Car = Plate;
type U16 = number;
type U32 = number;
type Deciseconds = U32;
type Location = U16;
type Plate = string;
type Road = U16;
type Mile = U16;
type SpeedLimit = U16;
type SpeedMPH100 = U16;
type Timestamp = U32;

/** What a connection declared about itself in an I_AM_CAMERA message. */
interface Camera {
  road: Road;
  location: Location;
  speedLimit: SpeedLimit;
}

interface Report {
  camera: Camera;
  car: Car;
  timestamp: Timestamp;
}

const tEnc = new TextEncoder();
const tDec = new TextDecoder();

console.log(`Starting TCP listener on on 0.0.0.0:${PORT}`);

/**
 * Runs one client session in the background. Any error escaping `serve` is
 * logged, and the connection is always torn down afterwards.
 */
function session(conn: Deno.Conn) {
  serve(conn)
    .catch((e) => console.error("Exception during session:", e))
    .finally(() => disconnect(conn));
}

/** First byte of every message on the wire. */
enum MessageType {
  ERROR = 0x10, // Server -> Client
  PLATE = 0x20, // Client -> Server
  TICKET = 0x21, // Server -> Client
  WANT_HEARTBEAT = 0x40, // Client -> Server
  HEARTBEAT = 0x41, // Server -> Client
  I_AM_CAMERA = 0x80, // Client -> Server
  I_AM_DISPATCHER = 0x81, // Client -> Server
}

console.log("Message Types:", MessageType);

type ErrorMessage = [
  string, // msg
];

type PlateMessage = [
  Plate,
  Timestamp,
];

type TicketMessage = [
  Plate, // plate
  Road, // road
  Mile, // mile1
  Timestamp, // timestamp1
  Mile, // mile2
  Timestamp, // timestamp2
  SpeedMPH100, // speed
];

type WantHeartbeatMessage = [
  Deciseconds,
];

type HeartbeatMessage = [];

type IAmCameraMessage = [
  Road,
  Mile,
  SpeedLimit,
];

type IAmDispatcherMessage = [
  Road[],
];

type Message =
  | ErrorMessage
  | PlateMessage
  | TicketMessage
  | WantHeartbeatMessage
  | HeartbeatMessage
  | IAmCameraMessage
  | IAmDispatcherMessage;

/**
 * Reads a big-endian unsigned 16-bit integer.
 * @throws Error when the stream ends before any of the 2 bytes is available.
 */
async function readU16(reader: BufReader): Promise<U16> {
  const buf = await reader.readFull(new Uint8Array(2));
  if (!buf) {
    throw new Error("unable to fill buffer of length 2 with U16 bytes");
  }
  return ByteSet.from(buf, "big").read.uint16();
}

/**
 * Reads a big-endian unsigned 32-bit integer.
 * @throws Error when the stream ends before any of the 4 bytes is available.
 */
async function readU32(reader: BufReader): Promise<U32> {
  const buf = await reader.readFull(new Uint8Array(4));
  if (!buf) {
    throw new Error("unable to fill buffer of length 4 with U32 bytes");
  }
  return ByteSet.from(buf, "big").read.uint32();
}

/**
 * Reads a string encoded as one length byte followed by that many bytes.
 * A length of 0 is a valid (empty) string, not a read failure.
 * @throws Error when the length byte or the payload cannot be read.
 */
async function readString(reader: BufReader): Promise<string> {
  const len = await reader.readByte();
  // Only `null` signals end of stream; `0` is a legitimate length.
  if (len === null) throw new Error("unable to read string length byte");
  if (len === 0) return "";
  const buf = await reader.readFull(new Uint8Array(len));
  if (!buf) {
    throw new Error(
      `unable to fill buffer of length ${len} with string bytes`,
    );
  }
  return tDec.decode(buf);
}

/**
 * Reads an array encoded as one count byte followed by that many elements,
 * each decoded by `readCallback`. A count of 0 yields an empty array.
 * @throws Error when the count byte cannot be read; element errors propagate.
 */
async function readArray<T>(
  reader: BufReader,
  readCallback: (r: BufReader) => Promise<T>,
): Promise<T[]> {
  const len = await reader.readByte();
  // Only `null` signals end of stream; `0` is a legitimate element count.
  if (len === null) throw new Error("unable to read array length byte");
  const result: T[] = [];
  while (result.length < len) {
    result.push(await readCallback(reader));
  }
  return result;
}

/**
 * Runs the given field readers one after another and returns their results
 * as a tuple. The caller picks `T`; nothing checks that `seq` really
 * produces that shape, so keep `seq` and `T` in sync by hand.
 */
async function seqRead<T extends unknown[]>(
  reader: BufReader,
  seq: ((r: BufReader) => Promise<unknown>)[],
): Promise<T> {
  const result: unknown[] = [];
  for (const readField of seq) {
    result.push(await readField(reader));
  }
  return result as T; // TODO: cheating
}

/** Reads the payload of an ERROR message: a single string. */
async function readError(reader: BufReader): Promise<ErrorMessage> {
  return await seqRead<ErrorMessage>(reader, [readString]);
}

/** Reads the payload of a PLATE message: plate string, then U32 timestamp. */
async function readPlate(reader: BufReader): Promise<PlateMessage> {
  return await seqRead<PlateMessage>(reader, [readString, readU32]);
}

/** Reads the payload of a TICKET message (same field order as `TicketMessage`). */
async function readTicket(reader: BufReader): Promise<TicketMessage> {
  return await seqRead<TicketMessage>(reader, [
    readString,
    readU16,
    readU16,
    readU32,
    readU16,
    readU32,
    readU16,
  ]);
}

/** Reads the payload of a WANT_HEARTBEAT message: a U32 interval. */
async function readWantHeartbeat(
  reader: BufReader,
): Promise<WantHeartbeatMessage> {
  return await seqRead<WantHeartbeatMessage>(reader, [readU32]);
}

/** Reads the payload of an I_AM_CAMERA message: road, mile, speed limit. */
async function readIAmCamera(reader: BufReader): Promise<IAmCameraMessage> {
  return await seqRead<IAmCameraMessage>(reader, [readU16, readU16, readU16]);
}

/**
 * Reads one complete message: the type byte, then the payload for that type.
 * @throws Error on end of stream or when the type byte is not a `MessageType`.
 */
async function readMessage(
  reader: BufReader,
): Promise<{ type: MessageType; message: Message }> {
  const type = await reader.readByte();
  // `null` is end of stream. A literal 0 byte is simply an unknown type and
  // is reported as such by the default branch below.
  if (type === null) throw new Error("Failed to read message type byte");

  let message: Message;
  switch (type) {
    case MessageType.ERROR:
      message = await readError(reader);
      break;
    case MessageType.PLATE:
      message = await readPlate(reader);
      break;
    case MessageType.TICKET:
      message = await readTicket(reader);
      break;
    case MessageType.WANT_HEARTBEAT:
      message = await readWantHeartbeat(reader);
      break;
    case MessageType.HEARTBEAT:
      message = [];
      break;
    case MessageType.I_AM_CAMERA:
      message = await readIAmCamera(reader);
      break;
    case MessageType.I_AM_DISPATCHER:
      message = [await readArray(reader, readU16)];
      break;
    default:
      throw new Error(`failed to read message of type 0x${type.toString(16)}`);
  }
  return { type, message };
}

/** Day index of a timestamp, counting 86400-second days from timestamp 0. */
function dayOfTime(ts: Timestamp): number {
  return Math.floor(ts / 86400);
}

// Per-connection state.
const intervals = new Map<Deno.Conn, ReturnType<typeof setInterval>>();
const cameras = new Map<Deno.Conn, Camera>();
const dispatchers = new Map<Deno.Conn, Road[]>();

// plate -> road -> location -> timestamp of the latest sighting there.
const observations = new Map<Plate, Map<Road, Map<Location, Timestamp>>>();
// Tickets waiting for a dispatcher that covers their road.
const unsentTickets: TicketMessage[] = [];
// plate -> days on which that plate already has a ticket.
const dispatchedTicketDays = new Map<Plate, Set<number>>();

/**
 * Records a sighting of `plate` and compares it with every earlier sighting
 * of the same plate on the same road. The first pair whose average speed is
 * at least `speedLimit + 0.5` mph produces a ticket.
 *
 * Sightings are keyed by location, so a newer sighting at the same location
 * replaces the older one.
 */
function makeObservation(
  plate: Plate,
  road: Road,
  location: Location,
  when: Timestamp,
  speedLimit: SpeedLimit,
) {
  let forPlate = observations.get(plate);
  if (!forPlate) {
    forPlate = new Map<Road, Map<Location, Timestamp>>();
    observations.set(plate, forPlate);
  }
  let forRoad = forPlate.get(road);
  if (!forRoad) {
    forRoad = new Map<Location, Timestamp>();
    forPlate.set(road, forRoad);
  }

  for (const [loc, ts] of forRoad.entries()) {
    const distMiles = Math.abs(location - loc);
    const deltaSecs = Math.abs(ts - when);
    const speed = distMiles / (deltaSecs / 60 / 60);
    console.log("Diff:", {
      plate,
      road,
      loc,
      ts,
      location,
      when,
      distMiles,
      deltaSecs,
      speed,
      speedLimit,
      day: dayOfTime(when),
    });

    // Two sightings at the same instant give Infinity (or NaN when the
    // location is also the same); neither is a usable speed.
    if (!Number.isFinite(speed)) continue;

    // NOTE(review): `>=` assumes the protocol requires a ticket at exactly
    // 0.5 mph over the limit ("0.5 mph or more") — confirm against the spec.
    if (speed >= speedLimit + 0.5) {
      // Put the earlier sighting first in the ticket.
      const newIsEarlier = when < ts;
      const [m1, t1] = newIsEarlier ? [location, when] : [loc, ts];
      const [m2, t2] = newIsEarlier ? [loc, ts] : [location, when];
      // for now, we will send one ticket per observation - should work, given the constraints
      dispatchTicket([
        plate,
        road,
        m1,
        t1,
        m2,
        t2,
        // The speed field is a U16; cap it rather than hand the encoder an
        // out-of-range value.
        Math.min(0xffff, Math.floor(speed * 100)),
      ]);
      break;
    }
  }

  console.log("Observation:", { plate, road, location, when });
  forRoad.set(location, when);
}

/**
 * Applies one decoded client message to the server state.
 *
 * Protocol violations are answered with an ERROR message followed by a
 * disconnect; the offending message is then not processed any further.
 */
function handleMessage(
  conn: Deno.Conn,
  type: MessageType,
  message: Message,
): void {
  console.debug("Message:", {
    from: conn.remoteAddr,
    type: MessageType[type],
    message,
  });

  switch (type) {
    case MessageType.WANT_HEARTBEAT: {
      if (intervals.has(conn)) {
        void sendErrorAndDisconnect(conn, "heartbeat already specified");
        return;
      }
      const [interval] = message as WantHeartbeatMessage;
      setIntervalFor(conn, interval);
      break;
    }
    case MessageType.PLATE: {
      const cam = cameras.get(conn);
      if (!cam) {
        void sendErrorAndDisconnect(conn, "not a camera");
        return;
      }
      const [plate, when] = message as PlateMessage;
      makeObservation(plate, cam.road, cam.location, when, cam.speedLimit);
      break;
    }
    case MessageType.I_AM_CAMERA: {
      if (cameras.has(conn) || dispatchers.has(conn)) {
        // Stop here: an already-identified connection must not be
        // registered a second time.
        void sendErrorAndDisconnect(conn, "already identified");
        return;
      }
      const [road, location, speedLimit] = message as IAmCameraMessage;
      const camera: Camera = { road, location, speedLimit };
      cameras.set(conn, camera);
      break;
    }
    case MessageType.I_AM_DISPATCHER: {
      if (cameras.has(conn) || dispatchers.has(conn)) {
        // Stop here: an already-identified connection must not be
        // registered a second time.
        void sendErrorAndDisconnect(conn, "already identified");
        return;
      }
      const [roads] = message as IAmDispatcherMessage;
      dispatchers.set(conn, roads);
      // Hand over every queued ticket this dispatcher covers. Walk backwards
      // so `splice` does not shift entries that are still to be visited.
      for (let i = unsentTickets.length - 1; i >= 0; i--) {
        const ticket = unsentTickets[i];
        const [_, road] = ticket;
        if (roads.includes(road)) {
          unsentTickets.splice(i, 1);
          void sendTicket(conn, ticket);
        }
      }
      break;
    }
    default:
      console.warn("Unable to handle unknown message:", {
        from: conn.remoteAddr,
        type: MessageType[type],
        message,
      });
      break;
  }
}

/** Remembers that `plate` has been ticketed on `day`. */
function markPlateDay(plate: Plate, day: number) {
  let forTicketDay = dispatchedTicketDays.get(plate);
  if (!forTicketDay) {
    forTicketDay = new Set<number>();
    dispatchedTicketDays.set(plate, forTicketDay);
  }
  forTicketDay.add(day);
}

/** Whether `plate` has already been ticketed on `day`. */
function hasPlateDay(plate: Plate, day: number): boolean {
  return !!(dispatchedTicketDays.get(plate)?.has(day));
}

/**
 * Sends `t` to a dispatcher responsible for its road, or queues it until one
 * connects.
 *
 * A ticket covers every day from the day of its first timestamp to the day
 * of its second. It is dropped when the plate already has a ticket on any of
 * those days; otherwise all of them are marked and the ticket goes out once,
 * with its original timestamps.
 * NOTE(review): this "at most one ticket per plate per day, multi-day
 * tickets count for each day" rule is taken from the protocol description —
 * confirm.
 */
function dispatchTicket(t: TicketMessage) {
  const [plate, road, , ts1, , ts2] = t;
  const firstDay = dayOfTime(Math.min(ts1, ts2));
  const lastDay = dayOfTime(Math.max(ts1, ts2));

  for (let day = firstDay; day <= lastDay; day++) {
    if (hasPlateDay(plate, day)) return;
  }
  if (firstDay !== lastDay) {
    console.log("Multi-day ticket:", firstDay, lastDay, ts1, ts2);
  }
  for (let day = firstDay; day <= lastDay; day++) {
    markPlateDay(plate, day);
  }

  for (const [conn, roads] of dispatchers.entries()) {
    if (roads.includes(road)) {
      void sendTicket(conn, t);
      return;
    }
  }
  unsentTickets.push(t);
}

/** Writes all of `data`, repeating `conn.write` after a partial write. */
async function writeFully(conn: Deno.Conn, data: Uint8Array): Promise<void> {
  let written = 0;
  while (written < data.length) {
    written += await conn.write(data.subarray(written));
  }
}

/** Serialises a ticket: type byte, length-prefixed plate, then the numbers. */
function encodeTicket(t: TicketMessage): Uint8Array {
  const [plate, road, m1, t1, m2, t2, speed] = t;
  const plateBytes = tEnc.encode(plate);
  const p = new ByteSet(
    1 + 1 + plateBytes.length + 2 + 2 + 4 + 2 + 4 + 2,
    "big",
  );
  p.write.uint8(MessageType.TICKET);
  // Writes the length byte followed by the plate bytes.
  p.write.uint8Array(plateBytes, LengthType.Uint8);
  p.write.uint16(road);
  p.write.uint16(m1);
  p.write.uint32(t1);
  p.write.uint16(m2);
  p.write.uint32(t2);
  p.write.uint16(speed);
  return p.buffer;
}

/**
 * Sends a TICKET message to `conn`.
 *
 * Never rejects, because callers do not await it: a ticket that cannot be
 * written goes back onto `unsentTickets` for the next matching dispatcher,
 * and a ticket that cannot be encoded is logged and dropped.
 */
async function sendTicket(conn: Deno.Conn, t: TicketMessage) {
  console.log("Sending ticket:", t);

  let packet: Uint8Array;
  try {
    packet = encodeTicket(t);
  } catch (e) {
    console.error("Unable to encode ticket, dropping it:", e);
    return;
  }

  try {
    await writeFully(conn, packet);
  } catch (e) {
    console.warn("Failed to send ticket, queueing it again:", e);
    unsentTickets.push(t);
  }
}

/**
 * Reads and handles messages from `conn` until reading fails (which includes
 * the peer closing the connection). The heartbeat timer is always stopped on
 * the way out.
 */
async function serve(conn: Deno.Conn) {
  const reader = new BufReader(conn);
  try {
    while (true) {
      const { type, message } = await readMessage(reader);
      handleMessage(conn, type, message);
    }
  } finally {
    clearIntervalFor(conn);
  }
}

/**
 * Closes `conn` and removes every piece of per-connection state. Safe to
 * call more than once for the same connection.
 */
function disconnect(conn: Deno.Conn) {
  console.log("Disconnecting:", conn.remoteAddr);
  try {
    conn.close();
  } catch (_) {
    console.debug("Failed to close connection (probably already closed)");
  }
  clearIntervalFor(conn);
  cameras.delete(conn);
  dispatchers.delete(conn);
}

/**
 * Sends an ERROR message carrying `msg`, then disconnects the client.
 *
 * The text is encoded the way `readString` decodes strings: one length byte
 * followed by the bytes, so it is cut to 255 bytes. The disconnect happens
 * even when the write fails, and the returned promise never rejects.
 */
async function sendErrorAndDisconnect(conn: Deno.Conn, msg: string) {
  console.warn("Sending error:", msg);
  // NOTE: cutting at a byte boundary may split a multi-byte character.
  const body = tEnc.encode(msg).subarray(0, 0xff);
  const packet = new Uint8Array(2 + body.length);
  packet[0] = MessageType.ERROR;
  packet[1] = body.length;
  packet.set(body, 2);
  try {
    await writeFully(conn, packet);
  } catch (e) {
    console.debug("Failed to send error message:", e);
  } finally {
    disconnect(conn);
  }
}

/**
 * Starts sending HEARTBEAT messages to `conn` every `intervalDeciseconds`
 * tenths of a second. An interval of 0 or less starts nothing.
 */
function setIntervalFor(conn: Deno.Conn, intervalDeciseconds: number) {
  if (intervalDeciseconds <= 0) return;
  intervals.set(
    conn,
    setInterval(async () => {
      try {
        // Awaited so that a failed write is caught here instead of
        // surfacing as an unhandled rejection.
        await conn.write(new Uint8Array([MessageType.HEARTBEAT]));
      } catch {
        clearIntervalFor(conn);
      }
    }, intervalDeciseconds * 100 /* deciseconds to ms */),
  );
}

/** Stops the heartbeat timer of `conn`, if it has one. */
function clearIntervalFor(conn: Deno.Conn) {
  const timer = intervals.get(conn);
  if (timer !== undefined) {
    clearInterval(timer);
    intervals.delete(conn);
  }
}

for await (const conn of Deno.listen({ port: PORT })) {
  console.log("Connection established:", conn.remoteAddr);
  session(conn);
}