diff --git a/6.ts b/6.ts index c1bb5cb..bd1cc7a 100644 --- a/6.ts +++ b/6.ts @@ -1,19 +1,24 @@ import { BufReader } from "https://deno.land/std@0.157.0/io/buffer.ts"; +import { ByteSet, LengthType } from "https://deno.land/x/bytes@1.0.3/mod.ts"; const PORT = 5588; -type Road = number; -type Dispatcher = Road[]; -type Location = number; -type SpeedLimit = number; -type Timestamp = number; -type Plate = string; type Car = Plate; -type U8 = number; type U16 = number; type U32 = number; +type Deciseconds = U32; +type Location = U16; +type Plate = string; + +type Road = U16; +type Mile = U16; + +type SpeedLimit = U16; +type SpeedMPH100 = U16; +type Timestamp = U32; + interface Camera { road: Road; location: Location; @@ -26,53 +31,444 @@ interface Report { timestamp: Timestamp; } -// const tEnc = new TextEncoder(); +const tEnc = new TextEncoder(); +const tDec = new TextDecoder(); console.log(`Starting TCP listener on on 0.0.0.0:${PORT}`); -function chatSession(conn: Deno.Conn) { - const closeOnDisconnect = () => { - console.warn("Disconnected client"); - try { - conn.close(); - } catch (err) { - console.error("Error closing client connection:", err); - } - }; - - serve(conn).catch((e) => e).finally(closeOnDisconnect); +function session(conn: Deno.Conn) { + serve(conn).catch((e) => console.error("Exception during session:", e)) + .finally(() => disconnect(conn)); } enum MessageType { ERROR = 0x10, // Server -> Client PLATE = 0x20, // Client -> Server TICKET = 0x21, // Server -> Client + WANT_HEARTBEAT = 0x40, // Client -> Server + HEARTBEAT = 0x41, // Server -> Client + I_AM_CAMERA = 0x80, // Client -> Server + I_AM_DISPATCHER = 0x81, // Client -> Server } +console.log("Message Types:", MessageType); + type ErrorMessage = [ string, // msg ]; type PlateMessage = [ - string, // plate - U32, // timestamp + Plate, + Timestamp, ]; -interface TicketMessage { +type TicketMessage = [ + Plate, // plate + Road, // road + Mile, // mile1 + Timestamp, // timestamp1 + Mile, // mile2 + Timestamp, // timestamp2 + SpeedMPH100, // speed +]; + +type WantHeartbeatMessage = [ + Deciseconds, +]; + +type HeartbeatMessage = []; + +type IAmCameraMessage = [ + Road, + Mile, + SpeedLimit, +]; + +type IAmDispatcherMessage = [ + Road[], +]; + +type Message = + | ErrorMessage + | PlateMessage + | TicketMessage + | WantHeartbeatMessage + | HeartbeatMessage + | IAmCameraMessage + | IAmDispatcherMessage; + +async function readU16(reader: BufReader): Promise { + const buf = await reader.readFull(new Uint8Array(2)); + if (!buf) throw `unable to fill buffer of length 2 with U32 bytes`; + return ByteSet.from(buf, "big").read.uint16(); } -type Message = ErrorMessage | PlateMessage; +async function readU32(reader: BufReader): Promise { + const buf = await reader.readFull(new Uint8Array(4)); + if (!buf) throw `unable to fill buffer of length 4 with U32 bytes`; + return ByteSet.from(buf, "big").read.uint32(); +} + +async function readString(reader: BufReader): Promise { + const len = await reader.readByte(); + if (!len) throw "unable to read string length byte"; + const buf = await reader.readFull(new Uint8Array(len)); + if (!buf) throw `unable to fill buffer of length ${len} with string bytes`; + return tDec.decode(buf); +} + +async function readArray( + reader: BufReader, + readCallback: (r: BufReader) => Promise, +): Promise { + const len = await reader.readByte(); + if (!len) throw "unable to read array length byte"; + const result = []; + while (result.length < len) { + result.push(await readCallback(reader)); + } + if (result.length != len) { + throw new Error( + `unable to read array of length ${len} (got ${result.length})`, + ); + } + return result; +} + +async function seqRead( + reader: BufReader, + seq: ((r: BufReader) => Promise)[], +): Promise { + const result = []; + for (const f of seq) { + result.push(await f(reader)); + } + return result as T; // TODO: cheating +} + +async function readError(reader: BufReader): Promise { + return await seqRead(reader, [readString]); +} + +async function readPlate(reader: BufReader): Promise { + return await seqRead(reader, [readString, readU32]); +} + +async function readTicket(reader: BufReader): Promise { + return await seqRead(reader, [ + readString, + readU16, + readU16, + readU32, + readU16, + readU32, + readU16, + ]); +} + +async function readWantHeartbeat( + reader: BufReader, +): Promise { + return await seqRead(reader, [readU32]); +} + +async function readIAmCamera(reader: BufReader): Promise { + return await seqRead(reader, [readU16, readU16, readU16]); +} + +async function readMessage( + reader: BufReader, +): Promise<{ type: MessageType; message: Message }> { + const type = await reader.readByte(); + if (!type) throw new Error("Failed to read message type byte"); + let message: Message | null = null; + switch (type) { + case MessageType.ERROR: + message = await readError(reader); + break; + case MessageType.PLATE: + message = await readPlate(reader); + break; + case MessageType.TICKET: + message = await readTicket(reader); + break; + case MessageType.WANT_HEARTBEAT: + message = await readWantHeartbeat(reader); + break; + case MessageType.HEARTBEAT: + message = []; + break; + case MessageType.I_AM_CAMERA: + message = await readIAmCamera(reader); + break; + case MessageType.I_AM_DISPATCHER: + message = [await readArray(reader, readU16)]; + break; + default: + throw new Error(`failed to read message of type 0x${type.toString(16)}`); + } + return { type, message }; +} + +function dayOfTime(ts: Timestamp): number { + return Math.floor(ts / 86400); +} + +const intervals = new Map(); +const cameras = new Map(); +const dispatchers = new Map(); +const observations = new Map>>(); +const unsentTickets: TicketMessage[] = []; +const dispatchedTicketDays = new Map>(); + +function makeObservation( + plate: Plate, + road: Road, + location: Location, + when: Timestamp, + speedLimit: SpeedLimit, +) { + let forPlate = observations.get(plate); + if (!forPlate) { + const newForPlate = new Map>(); + observations.set(plate, newForPlate); + forPlate = newForPlate; + } + let forRoad = forPlate.get(road); + if (!forRoad) { + const newForRoad = new Map(); + forPlate.set(road, newForRoad); + forRoad = newForRoad; + } + + for (const [loc, ts] of forRoad.entries()) { + const distMiles = Math.abs(location - loc); + const deltaSecs = Math.abs(ts - when); + const speed = distMiles / (deltaSecs / 60 / 60); + console.log("Diff:", { + plate, + road, + loc, + ts, + location, + when, + distMiles, + deltaSecs, + speed, + speedLimit, + day: dayOfTime(when), + }); + if (speed > speedLimit + 0.5) { + let [m1, t1] = [loc, ts]; + let [m2, t2] = [location, when]; + if (when < ts) { + [m2, t2] = [loc, ts]; + [m1, t1] = [location, when]; + } + + // for now, we will send one ticket per observation - should work, given the constraints + dispatchTicket([ + plate, + road, + m1, + t1, + m2, + t2, + Math.floor(speed * 100), + ]); + break; + } + } + + console.log("Observation:", { plate, road, location, when }); + + forRoad.set(location, when); +} + +function handleMessage( + conn: Deno.Conn, + type: MessageType, + message: Message, +) { + console.debug("Message:", { + from: conn.remoteAddr, + type: MessageType[type.toString()], + message, + }); + switch (type) { + case MessageType.WANT_HEARTBEAT: { + if (intervals.has(conn)) { + return sendErrorAndDisconnect(conn, "heartbeat already specified"); + } + const [interval] = (message as WantHeartbeatMessage); + setIntervalFor(conn, interval); + break; + } + case MessageType.PLATE: { + const cam = cameras.get(conn); + if (!cam) return sendErrorAndDisconnect(conn, "not a camera"); + const [plate, when] = (message as PlateMessage); + makeObservation(plate, cam.road, cam.location, when, cam.speedLimit); + break; + } + case MessageType.I_AM_CAMERA: { + if (cameras.has(conn) || dispatchers.has(conn)) { + sendErrorAndDisconnect(conn, "already identified"); + } + const [road, location, speedLimit] = (message as IAmCameraMessage); + const camera: Camera = { + road, + location, + speedLimit, + }; + cameras.set(conn, camera); + break; + } + case MessageType.I_AM_DISPATCHER: { + if (cameras.has(conn) || dispatchers.has(conn)) { + sendErrorAndDisconnect(conn, "already identified"); + } + const [roads] = (message as IAmDispatcherMessage); + dispatchers.set(conn, roads); + for (let i = unsentTickets.length - 1; i >= 0; i--) { + const t = unsentTickets[i]; + const [_, road] = t; + if (roads.includes(road)) { + unsentTickets.splice(i, 1); + sendTicket(conn, t); + } + } + break; + } + default: + console.warn("Unable to handle unknown message:", { + from: conn.remoteAddr, + type: MessageType[type.toString()], + message, + }); + break; + } +} + +function markPlateDay(plate: Plate, day: number) { + let forTicketDay = dispatchedTicketDays.get(plate); + if (!forTicketDay) { + const newForTicketDay = new Set(); + dispatchedTicketDays.set(plate, newForTicketDay); + forTicketDay = newForTicketDay; + } + forTicketDay.add(day); +} + +function hasPlateDay(plate: Plate, day: number): boolean { + return !!(dispatchedTicketDays.get(plate)?.has(day)); +} + +function dispatchTicket(t: TicketMessage) { + let [plate, road, mile1, ts1, mile2, ts2, speed] = t; + let day1 = dayOfTime(ts1); + const day2 = dayOfTime(ts2); + + while (day1 < day2) { + const nts1 = Math.max(ts1, day1 * 86400); + const nts2 = Math.min(((day1 + 1) * 86400) - 1, ts2); + console.log("Multi-day ticket:", day1, day2, nts1, nts2); + day1++; + ts1 = nts2 + 1; + dispatchTicket([plate, road, mile1, nts1, mile2, nts2, speed]); + } + + if (hasPlateDay(plate, day1)) { + return; + } + markPlateDay(plate, day1); + + let sent = false; + for (const [conn, roads] of dispatchers.entries()) { + if (roads.includes(road)) { + sent = true; + sendTicket(conn, t); + break; + } + } + if (!sent) unsentTickets.push(t); +} + +async function sendTicket(conn: Deno.Conn, t: TicketMessage) { + const [plate, road, m1, t1, m2, t2, speed] = t; + console.log("Sending ticket:", t); + const plateBytes = tEnc.encode(plate); + + const p = new ByteSet( + 1 + 1 + plateBytes.length + 2 + 2 + 4 + 2 + 4 + 2, + "big", + ); + p.write.uint8(MessageType.TICKET); + // p.write.uint8(plateBytes.length); + p.write.uint8Array(plateBytes, LengthType.Uint8); + p.write.uint16(road); + p.write.uint16(m1); + p.write.uint32(t1); + p.write.uint16(m2); + p.write.uint32(t2); + p.write.uint16(speed); + await conn.write(p.buffer); +} async function serve(conn: Deno.Conn) { const reader = new BufReader(conn); - const type = await reader.peek(1); - if (!type) throw new Error("failed to peek start of packet"); - console.log({ type }, 0x64); - // TODO: loop... + try { + while (true) { + const { type: type, message } = await readMessage(reader); + handleMessage(conn, type, message); + } + } finally { + clearIntervalFor(conn); + } +} + +function disconnect(conn: Deno.Conn) { + console.log("Disconnecting:", conn.remoteAddr); + try { + conn.close(); + } catch (_) { + console.debug("Failed to close connection (probably already closed)"); + } + clearIntervalFor(conn); + cameras.delete(conn); + dispatchers.delete(conn); +} + +async function sendErrorAndDisconnect(conn: Deno.Conn, msg: string) { + console.warn("Sending error:", msg); + await conn.write(new Uint8Array([MessageType.ERROR])); + await conn.write(tEnc.encode(msg)); + disconnect(conn); +} + +function setIntervalFor(conn: Deno.Conn, intervalDeciseconds: number) { + if (intervalDeciseconds <= 0) return; + + intervals.set( + conn, + setInterval(() => { + try { + conn.write(new Uint8Array([MessageType.HEARTBEAT])); + } catch { + clearIntervalFor(conn); + } + }, intervalDeciseconds * 100 /* deciseconds to ms */), + ); +} + +function clearIntervalFor(conn: Deno.Conn) { + if (intervals.has(conn)) { + clearInterval(intervals.get(conn)); + intervals.delete(conn); + } } for await (const conn of Deno.listen({ port: PORT })) { console.log("Connection established:", conn.remoteAddr); - chatSession(conn); - console.log("Connection closed:", conn.remoteAddr); + session(conn); } diff --git a/deno.lock b/deno.lock new file mode 100644 index 0000000..ed3ec25 --- /dev/null +++ b/deno.lock @@ -0,0 +1,18 @@ +{ + "version": "2", + "remote": { + "https://deno.land/std@0.157.0/_util/assert.ts": "e94f2eb37cebd7f199952e242c77654e43333c1ac4c5c700e929ea3aa5489f74", + "https://deno.land/std@0.157.0/bytes/bytes_list.ts": "aba5e2369e77d426b10af1de0dcc4531acecec27f9b9056f4f7bfbf8ac147ab4", + "https://deno.land/std@0.157.0/bytes/equals.ts": "3c3558c3ae85526f84510aa2b48ab2ad7bdd899e2e0f5b7a8ffc85acb3a6043a", + "https://deno.land/std@0.157.0/bytes/mod.ts": "763f97d33051cc3f28af1a688dfe2830841192a9fea0cbaa55f927b49d49d0bf", + "https://deno.land/std@0.157.0/io/buffer.ts": "fae02290f52301c4e0188670e730cd902f9307fb732d79c4aa14ebdc82497289", + "https://deno.land/std@0.157.0/io/types.d.ts": "0cae3a62da7a37043661746c65c021058bae020b54e50c0e774916e5d4baee43", + "https://deno.land/x/bytes@1.0.3/mod.ts": "0df9e698738399cb6ab2ceaff1d64d45e1581d62c900e3cc6664d75556288fb8", + "https://deno.land/x/bytes@1.0.3/src/bitarray/BitArray.ts": "0b3ad2f03aee0bc5f1f047714e83fec43e9ccbab32b4113cb61925ac80fec62d", + "https://deno.land/x/bytes@1.0.3/src/bitarray/Read.ts": "f8e721c984a7297025de9406304cf96cf9778f999fa6ed064dfee5f3d4f9800d", + "https://deno.land/x/bytes@1.0.3/src/bitarray/Write.ts": "2ef36768c7d47b00e9100507452ce8ce9b14912bc0f1059926a022a719de7d9c", + "https://deno.land/x/bytes@1.0.3/src/byteset/ByteSet.ts": "9b7306cb12852cfaf6340378a87c9e5b33eba6de8c3d4c05ddc411bb3da2743c", + "https://deno.land/x/bytes@1.0.3/src/byteset/Read.ts": "761966c30456c6cc5b4a52c7e154139033b03e6b14c8cecea88f68ad3f0c2b66", + "https://deno.land/x/bytes@1.0.3/src/byteset/Write.ts": "9ec31b39bc1306960f70486e3e898cc38902f2ce3ba338912e1acdee6867b97e" + } +} diff --git a/readme.md b/readme.md index e42d1f6..23c8cba 100644 --- a/readme.md +++ b/readme.md @@ -10,13 +10,7 @@ the `std` modules available to do as much as possible in a very asynchronous [Install Deno][deno#install] -# Usage - -```bash -deno run -A --unstable N.ts -``` - -## Example +# Example Usage ```bash deno run -A --unstable 0.ts