import { Handlers } from '$fresh/server.ts'
import { Task, TaskModel } from '@homeman/models.ts'
import { kv } from '@homeman/db.ts'
import { ulid } from 'https://deno.land/x/ulid@v0.3.0/mod.ts'
import { z } from 'https://deno.land/x/zod@v3.21.4/mod.ts'

/**
 * Shape accepted by POST/PUT: a task without `doneAt`, with an optional `id`
 * and an optional boolean `done` flag that is translated into `doneAt`.
 */
const TaskPayload = TaskModel.omit({ doneAt: true }).partial({
  id: true,
}).extend({
  done: z.boolean().optional(),
})
type TaskPayload = z.infer<typeof TaskPayload>

/**
 * Validates the payload as a task, stores it under `['task', id]` and records
 * its id under `['last_task_updated']` so that watchers are notified.
 *
 * @param task Parsed request payload; a missing or empty `id` gets a new ULID.
 * @returns The result of writing the task entry.
 * @throws Whatever `TaskModel.parse` throws when the payload is not a valid task.
 */
async function createOrUpdate(task: TaskPayload) {
  const newTask: Task = TaskModel.parse({
    ...task,
    // Generate the id before validation so that a payload without one is not
    // rejected if TaskModel requires `id` (TaskModel is defined elsewhere).
    id: task.id || ulid(),
    // `doneAt` is reset to null whenever `done` is false or absent.
    doneAt: task.done ? new Date() : null,
  })
  // Kept as a safety net (and for type narrowing) in case the parsed task
  // still ends up without a usable id.
  if (!newTask.id) {
    newTask.id = ulid()
  }
  const result = await kv.set(['task', newTask.id], newTask)
  await kv.set(['last_task_updated'], newTask.id)
  return result
}

/**
 * Route handlers for tasks.
 *
 * - POST / PUT: create or update a task from a JSON body.
 * - DELETE: delete a task whose id comes from a JSON body or the `id` query
 *   parameter.
 * - GET: with an `Accept` header containing `text/event-stream`, streams task
 *   updates as server-sent events; otherwise returns one task (id from a JSON
 *   body or the `id` query parameter) or the list of all task entries.
 */
export const handler: Handlers = {
  async POST(req, _ctx) {
    const result = await createOrUpdate(
      TaskPayload.parse(await req.json()),
    )
    return new Response(JSON.stringify(result))
  },

  async PUT(req, _ctx) {
    const t = TaskPayload.parse(await req.json())
    const result = await createOrUpdate(t)
    return new Response(JSON.stringify(result))
  },

  async DELETE(req, _ctx) {
    // The id is taken from a JSON body when the content type says JSON,
    // otherwise from the `id` query parameter.
    let data: unknown
    if (req.headers.get('content-type')?.includes('json')) {
      data = await req.json()
    } else {
      data = { id: new URL(req.url).searchParams.get('id') }
    }
    console.log('delete task data:', data)
    const taskData = TaskModel.pick({ id: true }).parse(data)
    const result = await kv.delete(['task', taskData.id])
    // Touch the marker key so that event-stream clients learn of the deletion.
    await kv.set(['last_task_updated'], taskData.id)
    return new Response(JSON.stringify(result))
  },

  async GET(req, ctx) {
    const accept = req.headers.get('accept')
    if (accept?.includes('text/event-stream')) {
      console.log('Request for task event stream')
      // The first batch delivered by the watch is skipped so that only
      // subsequent updates are sent to the client.
      let skipFirst = true
      // Set once the client goes away; the controller must not be used after.
      let cancelled = false
      const encoder = new TextEncoder()
      const stream = kv.watch([['last_task_updated']]).getReader()
      const body = new ReadableStream<Uint8Array>({
        async start(controller) {
          console.log(
            `Streaming task updates to ${JSON.stringify(ctx.remoteAddr)}...`,
          )
          try {
            while (true) {
              const entries = await stream.read()
              for (const entry of entries.value ?? []) {
                if (skipFirst) {
                  skipFirst = false
                  continue
                }
                if (typeof entry.value !== 'string') {
                  continue
                }
                try {
                  const task = (await kv.get(['task', entry.value])).value
                  const chunk = `data: ${
                    JSON.stringify({
                      id: entry.value,
                      versionstamp: entry.versionstamp,
                      value: task,
                    })
                  }\n\n`
                  console.log('task event chunk:', chunk)
                  if (!cancelled) {
                    controller.enqueue(encoder.encode(chunk))
                  }
                } catch (e) {
                  // A failure for one update should not end the whole stream.
                  console.error(`Error refreshing task:`, e)
                }
              }
              if (entries.done) {
                break
              }
            }
            if (!cancelled) {
              controller.close()
            }
          } catch (e) {
            // Reading from the watch failed: retrying in a loop would spin
            // forever, so surface the error to the client and stop.
            console.error(`Error refreshing task:`, e)
            if (!cancelled) {
              controller.error(e)
            }
          }
        },
        async cancel() {
          cancelled = true
          await stream.cancel()
          console.log(
            `Closed task updates stream to ${JSON.stringify(ctx.remoteAddr)}`,
          )
        },
      })
      return new Response(body, {
        headers: {
          'content-type': 'text/event-stream',
        },
      })
    }

    // Task id: from a JSON body if one is present, else from `?id=...`.
    const data: unknown = await req.json().catch(() => undefined)
    const queryId = new URL(req.url).searchParams.get('id') || undefined
    const taskData = TaskModel.pick({ id: true }).safeParse(
      data ?? { id: queryId },
    )
    if (taskData.success) {
      return new Response(
        JSON.stringify((await kv.get(['task', taskData.data.id])).value),
      )
    } else {
      // Array.fromAsync returns a promise; it must be awaited before
      // serialising, otherwise the body is always `{}`.
      return new Response(
        JSON.stringify(await Array.fromAsync(kv.list({ prefix: ['task'] }))),
      )
    }
  },
}