深入浅出 Server Sent Event

SSE 入门

Server-sent events 服务端发送事件。本质上 SSE 依然属于 HTTP 请求,并且 SSE 不同于 websocket,SSE 依然是单向通信的,因此实现简单。

SSE 相较于普通的 HTTP 请求有两个特殊的地方

  1. Response 的结果有格式要求
  2. Response 流式传输

下面是 SSE 在传输过程中的基础格式:

data: This is the first message.

data: This is the second message, it
data: has two lines.

data: This is the third message.

基础的实现

This event stream format’s MIME type is [text/event-stream](https://html.spec.whatwg.org/multipage/iana.html#text/event-stream).

—— SSE Spec#parsing-an-event-stream

从规范中得知,SSE 的媒体类型为 text/event-stream,所以响应头中应该需要设置对应的 Content-Type

除此之外,SSE 还是一个 stream,因此 Response 应该是 Steam。

Bun.serve({
  routes: {
    '/sse': () => {
      const stream = new ReadableStream();

      return new Response(stream, {
        headers: {
          'Content-type': 'text/event-stream'
        }
      });
    }
  }
});

按照最开始提到的 SSE 返回的基本格式,服务端的代码实现应该为:

const makeSSEData = (data: string) => {
  // 注意这里的两个换行符号
  return `data:${JSON.stringify(data)}\n\n`;
}

Bun.serve({
  routes: {
    '/sse': () => {
      const stream = new ReadableStream({
        type: 'bytes',
        pull: (controller) => {
          const textEncoder = new TextEncoder;
          controller.enqueue(textEncoder.encode(makeSSEData('hello')))
          controller.enqueue(textEncoder.encode(makeSSEData('world')))
        }
      });

      return new Response(stream, {
        headers: {
          "Content-type": "text/event-stream",
          // demo 测试,允许任意源访问
          "Access-Control-Allow-Origin": "*",
        }
      });
    },
  },
});

在客户端,我们可以使用浏览器提供的 API EventSource 去请求 SSE 接口。

button.addEventListener('click', () => {
  const eventSource = new EventSource('http://127.0.0.1:3000/sse');

  eventSource.onmessage = (event) => {
    console.log(evnet.type, event.data);
  };
});

结果如下:

message "hello"
message "world"

event、data、last event Id

我们打开 Chrome DevTool,并在 Network 面板中检查一下刚刚请求的 SSE 接口,我们发现这其中有好几个字段,分别是 IdTypeDataTime

SSE 的全称是 Server Sent Event,所以这里的 Type 实际上就是 Event Type,Data 就是 Event Data。

以下是服务端代码的部分实现,以及 SSE 接口的 Response 结果。

const makeSSEData = (data: string) => {
  // 注意这里的两个换行符号
  return `data:${JSON.stringify(data)}\n\n`;
}

// ...
controller.enqueue(textEncoder.encode(makeSSEData('hello')))
controller.enqueue(textEncoder.encode(makeSSEData('world')))
// ...

我们可以猜出出两件事

  1. Event Type 的默认值应该是 message
  2. Event Data 的格式是 data: "xxx"

我的猜测是建立在我已经了解 SSE 的处理规则,正常学习东西的话,还是需要通过更多的例子以及权威的规范去验证。

后文有对规范的解读

确实如此,我们再稍微修改一下服务端代码,实现一个自定义的 Event Type。

const makeSSECustom = (eventName: string, data: string) => {
  return `event: ${eventName}\ndata:${JSON.stringify(data)}\n\n`;
}

// 新增
controller.enqueue(textEncoder.encode(makeSSECustom('test', 'hello')))

controller.enqueue(textEncoder.encode(makeSSEData('hello')))
controller.enqueue(textEncoder.encode(makeSSEData('world')))

可以看到,现在多出来了一个 名为 test 的事件,而 SSE Response 中多了三行。

// 多出来的三行,包括空行
event: test
data:"hello"

data:"hello"

data:"world"

我们也可以再猜出来两件事

  1. 如果出现了空行,那么等于新建一个事件
  2. 如果在一块内容中,解析到了 event: test 那么使用 test 作为 event type

最后我们看一下 id,继续修改服务端代码,并在客户端重新请求。

const makeSSEID = (id: number) => {
  return `id: ${id}\n`;
}

controller.enqueue(textEncoder.encode(makeSSEID(1)))
controller.enqueue(textEncoder.encode(makeSSECustom('test', 'hello')))
controller.enqueue(textEncoder.encode(makeSSEData('hello')))

controller.enqueue(textEncoder.encode(makeSSEID(2)))
controller.enqueue(textEncoder.encode(makeSSEData('world')))

我们观察 SSE Response

// id 1, type test
id: 1
event: test
data:"hello"

// id 1, type: message
data:"hello"

// id 2, type message
id: 2
data:"world"

可以发现,id 和 event type 不一样,event type 会刷新,而 id 只要设置过就会一直保留。

小总结

  1. Event Type 的默认值应该是 message
  2. Event Data 的格式是 data: "xxx"
  3. 如果出现了空行,那么等于新建一个事件
  4. 如果在一块内容中,解析到了 event: test 那么使用 test 作为 event type
  5. id 和 event type 不一样,event type 会刷新,而 id 只要设置过就会一直保留

并不权威,为了你对 SSE 有一个感性的理解。

深入 SSE

eventSource 的问题

浏览器原生实现的 EventSource 只能实现 GET 请求,我们尝试将服务端代码改成 POST ,再去尝试请求一下接口。

Bun.serve({
  routes: {
    '/ssePost': {
      POST: () => {
        const readableStream = new ReadableStream({
          type: 'bytes',
          async start(controller) {
            controller.enqueue(textEncoder.encode(makeSSEData('hello')));
            controller.enqueue(textEncoder.encode(makeSSEData('world')));
            controller.enqueue(textEncoder.encode('\n'));
          }
        });

        return new Response(readableStream, {
          headers: {
            'Content-type': 'text/event-stream',
            'Access-Control-Allow-Origin': '*',
            'Cache-Control': 'no-cache',
            Connection: 'keep-alive'
          }
        });
      }
    }
  },
  port: 3001
});

没错,请求 404 了。

那有什么办法呢?就是使用 Fetch 去模拟浏览器提供的 EventSource,毕竟 Fetch 没有什么请求类型的限制。

你也许会问,那么我使用 GET 请求不就好了。

其实原因是,浏览器对于网络请求的 URL 长度是有限制的,这也意味着如果一个请求的内容很多,我们只能放弃 GET 请求。

参考:https://stackoverflow.com/questions/417142/what-is-the-maximum-length-of-a-url-in-different-browsers

fetch('http://127.0.0.1:3000/ssePost').then(async (res) => {
  const reader = res.body.getReader();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    const decoder = new TextDecoder();
    console.log(decoder.decode(value));
  }
})

请求的结果如下:

看着不错,后端传递的数据都返回了,但是有一个很大的问题——数据没法直接使用,因为返回的结果是纯文本的,实际上我们可能更倾向于以下的用法。

eventSource.onmessage = (event) => {
  if (event.type === 'message') {
    handle('message', event.data);
  } else if (event.type === 'test') {
    handle('test', event.data);
  }
};

如果我们希望直接使用 Fetch,我们需要按照上面提到的规则,去处理 type,data,id 等情况。

这样很麻烦,因此可以使用一些开源库去处理,比如 fetch-event-source,关于 fetch-event-source 的用法这里不介绍了,接着我们讲一下这个库的实现原理。

下面我们将依照规范去讲解 EventSource 是如何实现。和规范中的顺序相反,我们首先讲一下 Event Stream 的解析规则。

Event Stream 解析规则

规范中,单条 SSE Stream 的产生式如下:

因此,我们得知,Response 的每一行都表示一个 event,event 分为 comment,field,换行。

分别给 comment 和 field 的格式举例子就是:

// Comment
: comment

// field 下面三行都是合法的 Response 格式
data
data:123
data: 123

// 也可以只是换行

解析 Event Stream 的时候有几个背景需要了解

  1. Event Stream 是逐行解析的
  2. 解析 Stream 时,必须为其关联一个 data buffer,一个 type buffer 和一个 last event ID buffer
  3. 这三个 buffer 初始化为空字符串。

接着,规范提到了 Stream 解析的步骤,首先进行分类

  1. 空行
  2. : 开头
  3. 包含 :
  4. 其它情况,字符串不为空,但是不包含 :

https://html.spec.whatwg.org/multipage/server-sent-events.html#event-stream-interpretation

首先我们看第三种情况,也就是包含 : 的情况

  1. 收集 : 之前的字符串,作为 field 字段
  2. 收集 : 后的字符串,作为 value,若 value 开头的字符是空格,那么去掉
  3. 如果 field 是 event,那么将 filed 对应的 value 存入 event type buffer
  4. 如果 field 是 data,那么将 filed 对应的 value 追加data buffer,并追加换行符
  5. 如果 field 是 id 并且 value 不包含 U+0000 NULL 字符,那么将 value 存入 last event ID buffer,否则忽略
  6. 如果 field 是 retry 并且 value 仅为数字,那么将 event Stream 的重连时间设置为该值,否则忽略
  7. 其余所有情况将被忽略

举个例子,如果 Event Stream 如下所示:

event: message
data: This is the first message.

此时,buffer 区域的内容应该是

event type buffer -> message
data buffer -> This is the first message.
last event id buffer ->

现在我们对 Buffer 已经有了概念了,那么接下来就是看何时消费 Buffer。

我们在上面提到过 Stream 的解析分成四个种类,第一种就是空行,而通过查阅规范,得知如果解析到空行,那么需要 Dispatch Event,以此去消费 Buffer 区域中的内容

下面是 Dispatch Event 的时候的处理流程:

  1. 获取 last event id buffer 中的数据,提取出来,last event id buffer 不会被重置,直到有新的 id 字符串替代老的字符串
  2. 检查 data buffer 是否为空字符串,如果为空串,那么将 event type buffer 清空,然后退出
  3. 如果 data buffer 的最后一个字符是换行符(LF),那么从 data buffer 中移出该字符
  4. 构建一个 MessageEvent,new MessageEvent(value)
  5. 将 MessageEvent 的 type 设置为 “message”,将 data 属性设置为 data buffer 中的值,将 origin 属性设置为 Event Stream 的最终 URL(重定向后的 URL),以及将 lastEventId 属性设置为 last event id buffer 中的值。
  6. 如果 event type buffer 中有值,那么将 type 设置为 event type buffer 中的值。
  7. 清空 data bufferevent type buffer
  8. 监听 readyState 属性,当 readyState 属性被设置为非 CLOSED 时,在 EventSource 对象上 dispatch 新创建的任务。

简单分析 fetch-event-source 的实现

fetch-event-source 的实现其实很简单,主要分成了两个文件

  1. fetch.ts
  2. parse.ts

fetch

Fetch 负责调用 parser 的能力,对使用者暴露接口,并管理整个 SSE 请求的生命周期,以及异常处理等等。

下面是 fetch 的核心实现,可以看到 fetch-event-source 其实就是在使用 fetch 去请求接口,并处理 Response。

// ...
const response = await fetch(input, {
  ...rest,
  headers,
  signal: curRequestController.signal,
});

await onopen(response);

// 调用 parser
await getBytes(response.body!, getLines(getMessages(id => {
  if (id) {
      // store the id and send it back on the next retry:
      headers[LastEventId] = id;
  } else {
      // don't send the last-event-id header anymore:
      delete headers[LastEventId];
  }
}, retry => {
  retryInterval = retry;
}, onmessage)));
// .....

parser

首先,需要逐步读取 Stream 中的数据,并且把这些数据传入 onChunk 中。

export async function getBytes(stream: ReadableStream<Uint8Array>, onChunk: (arr: Uint8Array) => void) {
    const reader = stream.getReader();
    let result: ReadableStreamDefaultReadResult<Uint8Array>;
    while (!(result = await reader.read()).done) {
        onChunk(result.value);
    }
}

onChunk 是在 fetch.ts 中传进来的回调,实际上对应的使 parser.ts 中的 getLines 函数。

function getLines(onLine: (line: Uint8Array, fieldLength: number) => void)

然后,在 getLines 中将传输过来的数据,按照每一行进行分割。

// 把每一行的 buffer 切割,并且把 field 的长度传入,方便在处理单独行的时候进行切割
onLine(buffer.subarray(lineStart, lineEnd), fieldLength);

接着,在 onLine 中按行处理数据。

export function getMessages(
    onId: (id: string) => void,
    onRetry: (retry: number) => void,
    onMessage?: (msg: EventSourceMessage) => void
) {
    let message = newMessage();
    const decoder = new TextDecoder();

    return function onLine(line: Uint8Array, fieldLength: number) {
        if (line.length === 0) {
            // 如果是空行,那么 Event Message 将被 dispatch
            onMessage?.(message);
            // 构建新的 Message,覆盖原来的
            message = newMessage();
        } else if (fieldLength > 0) {
            // 解出 field 和 value
            const field = decoder.decode(line.subarray(0, fieldLength));
            // 规范规定 value 中第一个字符是空格的话,需要忽略
            const valueOffset = fieldLength + (line[fieldLength + 1] === ControlChars.Space ? 2 : 1);
            const value = decoder.decode(line.subarray(valueOffset));

            // 按照之前规范中说的,构建 Message
            switch (field) {
                case 'data':
                    message.data = message.data
                        ? message.data + '\n' + value
                        : value; // otherwise,
                    break;
                case 'event':
                    message.event = value;
                    break;
                case 'id':
                    onId(message.id = value);
                    break;
                case 'retry':
                    const retry = parseInt(value, 10);
                    if (!isNaN(retry)) {
                        onRetry(message.retry = retry);
                    }
                    break;
            }
        }
    }
}

上面就是 fetch-event-source 大概的实现了,总的来说不算复杂,需要注意的是动态的处理流式数据,并且按照需要按照规范解析二进制流。

fetch   ---> 使用 fetch 请求接口,获取 response 句柄
  |
  |
getBytes ---> 按照 chunk 流式的处理数据
  |
  |
getLines ---> 将 chunk 切分成 line,并调用 onLine 处理数据
  |
  |
onLine  ---> 处理每一行的 Event 数据,并通过注入进来的 onMessage 处理结果

总结

SSE 本质上还是基于 HTTP 协议的,我个人感觉和 JSON RPC 有点像,本质上只是对 Response 格式的一种特殊约定。

在工程实践中,直接使用浏览器提供的 EventSource 有许多问题,因此更加推荐使用基于 Fetch 封装的三方库。