This commit is contained in:
Daniel Flanagan 2022-11-23 17:00:44 -06:00
parent 0ed40d35c2
commit f0efe75ece
Signed by: lytedev
GPG key ID: 5B2020A0F9921EF4
3 changed files with 444 additions and 36 deletions

454
6.ts
View file

@ -1,19 +1,24 @@
import { BufReader } from "https://deno.land/std@0.157.0/io/buffer.ts"; import { BufReader } from "https://deno.land/std@0.157.0/io/buffer.ts";
import { ByteSet, LengthType } from "https://deno.land/x/bytes@1.0.3/mod.ts";
const PORT = 5588; const PORT = 5588;
type Road = number;
type Dispatcher = Road[];
type Location = number;
type SpeedLimit = number;
type Timestamp = number;
type Plate = string;
type Car = Plate; type Car = Plate;
type U8 = number;
type U16 = number; type U16 = number;
type U32 = number; type U32 = number;
type Deciseconds = U32;
type Location = U16;
type Plate = string;
type Road = U16;
type Mile = U16;
type SpeedLimit = U16;
type SpeedMPH100 = U16;
type Timestamp = U32;
interface Camera { interface Camera {
road: Road; road: Road;
location: Location; location: Location;
@ -26,53 +31,444 @@ interface Report {
timestamp: Timestamp; timestamp: Timestamp;
} }
// const tEnc = new TextEncoder(); const tEnc = new TextEncoder();
const tDec = new TextDecoder();
console.log(`Starting TCP listener on on 0.0.0.0:${PORT}`); console.log(`Starting TCP listener on on 0.0.0.0:${PORT}`);
function chatSession(conn: Deno.Conn) { function session(conn: Deno.Conn) {
const closeOnDisconnect = () => { serve(conn).catch((e) => console.error("Exception during session:", e))
console.warn("Disconnected client"); .finally(() => disconnect(conn));
try {
conn.close();
} catch (err) {
console.error("Error closing client connection:", err);
}
};
serve(conn).catch((e) => e).finally(closeOnDisconnect);
} }
enum MessageType { enum MessageType {
ERROR = 0x10, // Server -> Client ERROR = 0x10, // Server -> Client
PLATE = 0x20, // Client -> Server PLATE = 0x20, // Client -> Server
TICKET = 0x21, // Server -> Client TICKET = 0x21, // Server -> Client
WANT_HEARTBEAT = 0x40, // Client -> Server
HEARTBEAT = 0x41, // Server -> Client
I_AM_CAMERA = 0x80, // Client -> Server
I_AM_DISPATCHER = 0x81, // Client -> Server
} }
console.log("Message Types:", MessageType);
type ErrorMessage = [ type ErrorMessage = [
string, // msg string, // msg
]; ];
type PlateMessage = [ type PlateMessage = [
string, // plate Plate,
U32, // timestamp Timestamp,
]; ];
interface TicketMessage { type TicketMessage = [
Plate, // plate
Road, // road
Mile, // mile1
Timestamp, // timestamp1
Mile, // mile2
Timestamp, // timestamp2
SpeedMPH100, // speed
];
type WantHeartbeatMessage = [
Deciseconds,
];
type HeartbeatMessage = [];
type IAmCameraMessage = [
Road,
Mile,
SpeedLimit,
];
type IAmDispatcherMessage = [
Road[],
];
type Message =
| ErrorMessage
| PlateMessage
| TicketMessage
| WantHeartbeatMessage
| HeartbeatMessage
| IAmCameraMessage
| IAmDispatcherMessage;
async function readU16(reader: BufReader): Promise<U16> {
const buf = await reader.readFull(new Uint8Array(2));
if (!buf) throw `unable to fill buffer of length 2 with U32 bytes`;
return ByteSet.from(buf, "big").read.uint16();
} }
type Message = ErrorMessage | PlateMessage; async function readU32(reader: BufReader): Promise<U32> {
const buf = await reader.readFull(new Uint8Array(4));
if (!buf) throw `unable to fill buffer of length 4 with U32 bytes`;
return ByteSet.from(buf, "big").read.uint32();
}
async function readString(reader: BufReader): Promise<string> {
const len = await reader.readByte();
if (!len) throw "unable to read string length byte";
const buf = await reader.readFull(new Uint8Array(len));
if (!buf) throw `unable to fill buffer of length ${len} with string bytes`;
return tDec.decode(buf);
}
async function readArray<T>(
reader: BufReader,
readCallback: (r: BufReader) => Promise<T>,
): Promise<T[]> {
const len = await reader.readByte();
if (!len) throw "unable to read array length byte";
const result = [];
while (result.length < len) {
result.push(await readCallback(reader));
}
if (result.length != len) {
throw new Error(
`unable to read array of length ${len} (got ${result.length})`,
);
}
return result;
}
async function seqRead<T extends Message>(
reader: BufReader,
seq: ((r: BufReader) => Promise<T[keyof T]>)[],
): Promise<T> {
const result = [];
for (const f of seq) {
result.push(await f(reader));
}
return result as T; // TODO: cheating
}
async function readError(reader: BufReader): Promise<ErrorMessage> {
return await seqRead(reader, [readString]);
}
async function readPlate(reader: BufReader): Promise<PlateMessage> {
return await seqRead(reader, [readString, readU32]);
}
async function readTicket(reader: BufReader): Promise<TicketMessage> {
return await seqRead(reader, [
readString,
readU16,
readU16,
readU32,
readU16,
readU32,
readU16,
]);
}
async function readWantHeartbeat(
reader: BufReader,
): Promise<WantHeartbeatMessage> {
return await seqRead(reader, [readU32]);
}
async function readIAmCamera(reader: BufReader): Promise<IAmCameraMessage> {
return await seqRead(reader, [readU16, readU16, readU16]);
}
async function readMessage(
reader: BufReader,
): Promise<{ type: MessageType; message: Message }> {
const type = await reader.readByte();
if (!type) throw new Error("Failed to read message type byte");
let message: Message | null = null;
switch (type) {
case MessageType.ERROR:
message = await readError(reader);
break;
case MessageType.PLATE:
message = await readPlate(reader);
break;
case MessageType.TICKET:
message = await readTicket(reader);
break;
case MessageType.WANT_HEARTBEAT:
message = await readWantHeartbeat(reader);
break;
case MessageType.HEARTBEAT:
message = [];
break;
case MessageType.I_AM_CAMERA:
message = await readIAmCamera(reader);
break;
case MessageType.I_AM_DISPATCHER:
message = [await readArray(reader, readU16)];
break;
default:
throw new Error(`failed to read message of type 0x${type.toString(16)}`);
}
return { type, message };
}
function dayOfTime(ts: Timestamp): number {
return Math.floor(ts / 86400);
}
const intervals = new Map<Deno.Conn, number>();
const cameras = new Map<Deno.Conn, Camera>();
const dispatchers = new Map<Deno.Conn, Road[]>();
const observations = new Map<Plate, Map<Road, Map<Location, Timestamp>>>();
const unsentTickets: TicketMessage[] = [];
const dispatchedTicketDays = new Map<Plate, Set<number>>();
function makeObservation(
plate: Plate,
road: Road,
location: Location,
when: Timestamp,
speedLimit: SpeedLimit,
) {
let forPlate = observations.get(plate);
if (!forPlate) {
const newForPlate = new Map<Road, Map<Location, Timestamp>>();
observations.set(plate, newForPlate);
forPlate = newForPlate;
}
let forRoad = forPlate.get(road);
if (!forRoad) {
const newForRoad = new Map<Location, Timestamp>();
forPlate.set(road, newForRoad);
forRoad = newForRoad;
}
for (const [loc, ts] of forRoad.entries()) {
const distMiles = Math.abs(location - loc);
const deltaSecs = Math.abs(ts - when);
const speed = distMiles / (deltaSecs / 60 / 60);
console.log("Diff:", {
plate,
road,
loc,
ts,
location,
when,
distMiles,
deltaSecs,
speed,
speedLimit,
day: dayOfTime(when),
});
if (speed > speedLimit + 0.5) {
let [m1, t1] = [loc, ts];
let [m2, t2] = [location, when];
if (when < ts) {
[m2, t2] = [loc, ts];
[m1, t1] = [location, when];
}
// for now, we will send one ticket per observation - should work, given the constraints
dispatchTicket([
plate,
road,
m1,
t1,
m2,
t2,
Math.floor(speed * 100),
]);
break;
}
}
console.log("Observation:", { plate, road, location, when });
forRoad.set(location, when);
}
function handleMessage(
conn: Deno.Conn,
type: MessageType,
message: Message,
) {
console.debug("Message:", {
from: conn.remoteAddr,
type: MessageType[type.toString()],
message,
});
switch (type) {
case MessageType.WANT_HEARTBEAT: {
if (intervals.has(conn)) {
return sendErrorAndDisconnect(conn, "heartbeat already specified");
}
const [interval] = (message as WantHeartbeatMessage);
setIntervalFor(conn, interval);
break;
}
case MessageType.PLATE: {
const cam = cameras.get(conn);
if (!cam) return sendErrorAndDisconnect(conn, "not a camera");
const [plate, when] = (message as PlateMessage);
makeObservation(plate, cam.road, cam.location, when, cam.speedLimit);
break;
}
case MessageType.I_AM_CAMERA: {
if (cameras.has(conn) || dispatchers.has(conn)) {
sendErrorAndDisconnect(conn, "already identified");
}
const [road, location, speedLimit] = (message as IAmCameraMessage);
const camera: Camera = {
road,
location,
speedLimit,
};
cameras.set(conn, camera);
break;
}
case MessageType.I_AM_DISPATCHER: {
if (cameras.has(conn) || dispatchers.has(conn)) {
sendErrorAndDisconnect(conn, "already identified");
}
const [roads] = (message as IAmDispatcherMessage);
dispatchers.set(conn, roads);
for (let i = unsentTickets.length - 1; i >= 0; i--) {
const t = unsentTickets[i];
const [_, road] = t;
if (roads.includes(road)) {
unsentTickets.splice(i, 1);
sendTicket(conn, t);
}
}
break;
}
default:
console.warn("Unable to handle unknown message:", {
from: conn.remoteAddr,
type: MessageType[type.toString()],
message,
});
break;
}
}
function markPlateDay(plate: Plate, day: number) {
let forTicketDay = dispatchedTicketDays.get(plate);
if (!forTicketDay) {
const newForTicketDay = new Set<number>();
dispatchedTicketDays.set(plate, newForTicketDay);
forTicketDay = newForTicketDay;
}
forTicketDay.add(day);
}
function hasPlateDay(plate: Plate, day: number): boolean {
return !!(dispatchedTicketDays.get(plate)?.has(day));
}
function dispatchTicket(t: TicketMessage) {
let [plate, road, mile1, ts1, mile2, ts2, speed] = t;
let day1 = dayOfTime(ts1);
const day2 = dayOfTime(ts2);
while (day1 < day2) {
const nts1 = Math.max(ts1, day1 * 86400);
const nts2 = Math.min(((day1 + 1) * 86400) - 1, ts2);
console.log("Multi-day ticket:", day1, day2, nts1, nts2);
day1++;
ts1 = nts2 + 1;
dispatchTicket([plate, road, mile1, nts1, mile2, nts2, speed]);
}
if (hasPlateDay(plate, day1)) {
return;
}
markPlateDay(plate, day1);
let sent = false;
for (const [conn, roads] of dispatchers.entries()) {
if (roads.includes(road)) {
sent = true;
sendTicket(conn, t);
break;
}
}
if (!sent) unsentTickets.push(t);
}
async function sendTicket(conn: Deno.Conn, t: TicketMessage) {
const [plate, road, m1, t1, m2, t2, speed] = t;
console.log("Sending ticket:", t);
const plateBytes = tEnc.encode(plate);
const p = new ByteSet(
1 + 1 + plateBytes.length + 2 + 2 + 4 + 2 + 4 + 2,
"big",
);
p.write.uint8(MessageType.TICKET);
// p.write.uint8(plateBytes.length);
p.write.uint8Array(plateBytes, LengthType.Uint8);
p.write.uint16(road);
p.write.uint16(m1);
p.write.uint32(t1);
p.write.uint16(m2);
p.write.uint32(t2);
p.write.uint16(speed);
await conn.write(p.buffer);
}
async function serve(conn: Deno.Conn) { async function serve(conn: Deno.Conn) {
const reader = new BufReader(conn); const reader = new BufReader(conn);
const type = await reader.peek(1); try {
if (!type) throw new Error("failed to peek start of packet"); while (true) {
console.log({ type }, 0x64); const { type: type, message } = await readMessage(reader);
// TODO: loop... handleMessage(conn, type, message);
}
} finally {
clearIntervalFor(conn);
}
}
function disconnect(conn: Deno.Conn) {
console.log("Disconnecting:", conn.remoteAddr);
try {
conn.close();
} catch (_) {
console.debug("Failed to close connection (probably already closed)");
}
clearIntervalFor(conn);
cameras.delete(conn);
dispatchers.delete(conn);
}
async function sendErrorAndDisconnect(conn: Deno.Conn, msg: string) {
console.warn("Sending error:", msg);
await conn.write(new Uint8Array([MessageType.ERROR]));
await conn.write(tEnc.encode(msg));
disconnect(conn);
}
function setIntervalFor(conn: Deno.Conn, intervalDeciseconds: number) {
if (intervalDeciseconds <= 0) return;
intervals.set(
conn,
setInterval(() => {
try {
conn.write(new Uint8Array([MessageType.HEARTBEAT]));
} catch {
clearIntervalFor(conn);
}
}, intervalDeciseconds * 100 /* deciseconds to ms */),
);
}
function clearIntervalFor(conn: Deno.Conn) {
if (intervals.has(conn)) {
clearInterval(intervals.get(conn));
intervals.delete(conn);
}
} }
for await (const conn of Deno.listen({ port: PORT })) { for await (const conn of Deno.listen({ port: PORT })) {
console.log("Connection established:", conn.remoteAddr); console.log("Connection established:", conn.remoteAddr);
chatSession(conn); session(conn);
console.log("Connection closed:", conn.remoteAddr);
} }

18
deno.lock Normal file
View file

@ -0,0 +1,18 @@
{
"version": "2",
"remote": {
"https://deno.land/std@0.157.0/_util/assert.ts": "e94f2eb37cebd7f199952e242c77654e43333c1ac4c5c700e929ea3aa5489f74",
"https://deno.land/std@0.157.0/bytes/bytes_list.ts": "aba5e2369e77d426b10af1de0dcc4531acecec27f9b9056f4f7bfbf8ac147ab4",
"https://deno.land/std@0.157.0/bytes/equals.ts": "3c3558c3ae85526f84510aa2b48ab2ad7bdd899e2e0f5b7a8ffc85acb3a6043a",
"https://deno.land/std@0.157.0/bytes/mod.ts": "763f97d33051cc3f28af1a688dfe2830841192a9fea0cbaa55f927b49d49d0bf",
"https://deno.land/std@0.157.0/io/buffer.ts": "fae02290f52301c4e0188670e730cd902f9307fb732d79c4aa14ebdc82497289",
"https://deno.land/std@0.157.0/io/types.d.ts": "0cae3a62da7a37043661746c65c021058bae020b54e50c0e774916e5d4baee43",
"https://deno.land/x/bytes@1.0.3/mod.ts": "0df9e698738399cb6ab2ceaff1d64d45e1581d62c900e3cc6664d75556288fb8",
"https://deno.land/x/bytes@1.0.3/src/bitarray/BitArray.ts": "0b3ad2f03aee0bc5f1f047714e83fec43e9ccbab32b4113cb61925ac80fec62d",
"https://deno.land/x/bytes@1.0.3/src/bitarray/Read.ts": "f8e721c984a7297025de9406304cf96cf9778f999fa6ed064dfee5f3d4f9800d",
"https://deno.land/x/bytes@1.0.3/src/bitarray/Write.ts": "2ef36768c7d47b00e9100507452ce8ce9b14912bc0f1059926a022a719de7d9c",
"https://deno.land/x/bytes@1.0.3/src/byteset/ByteSet.ts": "9b7306cb12852cfaf6340378a87c9e5b33eba6de8c3d4c05ddc411bb3da2743c",
"https://deno.land/x/bytes@1.0.3/src/byteset/Read.ts": "761966c30456c6cc5b4a52c7e154139033b03e6b14c8cecea88f68ad3f0c2b66",
"https://deno.land/x/bytes@1.0.3/src/byteset/Write.ts": "9ec31b39bc1306960f70486e3e898cc38902f2ce3ba338912e1acdee6867b97e"
}
}

View file

@ -10,13 +10,7 @@ the `std` modules available to do as much as possible in a very asynchronous
[Install Deno][deno#install] [Install Deno][deno#install]
# Usage # Example Usage
```bash
deno run -A --unstable N.ts
```
## Example
```bash ```bash
deno run -A --unstable 0.ts deno run -A --unstable 0.ts