Tags: #js #javascript #stream
// Backend: https://sse-demo-dot-rd---product.uc.r.appspot.com
// Example: https://fiddle.fastlydemo.net/fiddle/ba54f417
const CITIES = ['Atlanta', 'Berlin', 'Dublin', 'Boston', 'Denver', 'Tokyo', 'Singapore'];
const REGEX_SSE_LINE = /^([^:]+?)\s*:\s*(.*?)\s*$/;
const parseSSE = (str) => {
return str
.split('\n')
.map(line => line.match(REGEX_SSE_LINE))
.filter(matchResult => matchResult !== null)
.reduce((out, [, k, v]) => ({
...out,
[k]: (k === 'data' && v) ? JSON.parse(v) : v
}))
;
};
const isValidEvent = (sseEvent) => {
if (!sseEvent.data) return true; // Allow unrecognised events through
const city = sseEvent.data.destination;
const shouldEmit = CITIES.includes(city);
console.log('City: '+city+', include: '+ shouldEmit);
return shouldEmit;
};
async function handler(event) {
const clientReq = event.request;
const backendResponse = await fetch(clientReq, { backend: "origin_0" });
const filteredStream = streamFilter(backendResponse.body, {
delimiter: '\n\n',
parser: parseSSE,
validator: isValidEvent,
});
return new Response(filteredStream, {
headers: {
...backendResponse.headers,
'cache-control': 'private, no-store'
}
});
}
addEventListener("fetch", event => event.respondWith(handler(event)));
// *********** HELPER FUNCTIONS *************
/**
* Streamfilter
* Takes a stream and returns a new stream.
*
* - inputStream: The stream to filter
* - options.delimiter: Separator between events in the stream
* - options.parser: Function to parse a single event
* - options.validator: Function to determine whether an event should be included in the output stream
*/
const streamFilter = (inputStream, options) => {
let buffer = '';
const decoder = new TextDecoder();
const encoder = new TextEncoder();
const inputReader = inputStream.getReader();
const outputStream = new ReadableStream({
start() {
buffer = '';
},
// When the output stream is read...
pull(controller) {
// Read from the input stream...
return inputReader.read().then(({value: chunk, done: readerDone}) => {
const chunkStr = decoder.decode(chunk);
let events = chunkStr ? (buffer + chunkStr).split(options.delimiter) : buffer;
// If we're not done the last event will be incomplete
if (!readerDone) {
buffer = events.pop();
}
// Re-emit the events from the input stream into
// the output stream if they meet our criteria
events
.filter(str => options.validator(options.parser(str)))
.forEach(str => controller.enqueue(encoder.encode(str+options.delimiter)))
;
// Flush the queue, and close the stream if we're done
controller.enqueue(encoder.encode(''));
if (readerDone) {
controller.close();
}
});
}
});
return outputStream;
}