Skip to the content.

简易事件/中间件实现

作为 API 的使用者/调用者, 疑问最多的应该就是当前使用的 API, 使用基础代码是怎么实现的. 其原理, 是快速解决诸多实际问题的钥匙.

NodeJs EventEmitter事件模拟(观察者模式)

/**
 * Minimal imitation of the Node.js EventEmitter (observer pattern).
 *
 * Listeners live in a Map keyed by event name; each value is a Set of
 * listener functions, so adding the same function twice to one event keeps
 * a single registration.
 */
class EventEmitter {
  constructor() {
    // eventName -> Set of listener functions
    this.events = new Map()
  }

  /**
   * Subscribe `listener` to `eventName`.
   *
   * @param {*} eventName - Key identifying the event.
   * @param {Function} listener - Invoked with the emitted arguments and `this` set to the emitter.
   * @returns {EventEmitter} This emitter, for chaining.
   */
  on(eventName, listener) {
    if (!this.events.has(eventName)) {
      this.events.set(eventName, new Set())
    }
    this.events.get(eventName).add(listener)
    return this
  }

  /**
   * Subscribe `listener` to `eventName` for a single emission.
   *
   * @param {*} eventName - Key identifying the event.
   * @param {Function} listener - Invoked at most once.
   * @returns {EventEmitter} This emitter, for chaining.
   */
  once(eventName, listener) {
    const emitter = this
    function fn(...args) {
      // Unsubscribe before invoking, so a re-entrant emit cannot fire it twice.
      emitter.removeListener(eventName, fn)
      listener.apply(emitter, args)
    }
    // Keep a link to the wrapped function so removeListener(eventName, listener)
    // can locate this wrapper.
    fn.listener = listener
    return this.on(eventName, fn)
  }

  /**
   * Remove `listener` from `eventName`.
   *
   * When `listener` is not a function (for example omitted), every listener
   * registered for `eventName` is removed instead.
   *
   * @param {*} eventName - Key identifying the event.
   * @param {Function} [listener] - Listener to remove; may be the function originally passed to `once`.
   * @returns {EventEmitter} This emitter, for chaining.
   */
  removeListener(eventName, listener) {
    if (this.events.has(eventName) && typeof listener === 'function') {
      const listeners = this.events.get(eventName)
      if (!listeners.delete(listener)) {
        // Not registered directly: look for a once() wrapper around it.
        for (const candidate of listeners) {
          if (candidate && candidate.listener === listener) {
            listeners.delete(candidate)
            break
          }
        }
      }
    } else {
      this.events.delete(eventName)
    }
    return this
  }

  /**
   * Alias of `removeListener`; `once` wrappers rely on an unsubscribe method
   * being available under this name.
   *
   * @param {*} eventName - Key identifying the event.
   * @param {Function} [listener] - Listener to remove.
   * @returns {EventEmitter} This emitter, for chaining.
   */
  off(eventName, listener) {
    return this.removeListener(eventName, listener)
  }

  /**
   * Remove all listeners of `eventName`, or of every event when `eventName` is undefined.
   *
   * @param {*} [eventName] - Key identifying the event.
   * @returns {EventEmitter} This emitter, for chaining.
   */
  removeAllListeners(eventName) {
    if (eventName === undefined) {
      this.events.clear()
    } else {
      this.events.delete(eventName)
    }
    return this
  }

  /**
   * Publish `eventName`, calling each registered listener with `args`.
   *
   * @param {*} eventName - Key identifying the event.
   * @param {...*} args - Arguments forwarded to every listener.
   * @returns {boolean} true when the event had at least one listener, false otherwise.
   */
  emit(eventName, ...args) {
    const listeners = this.events.get(eventName)
    // Decide the result before dispatching: once() listeners remove themselves while running.
    if (!listeners || listeners.size === 0) {
      return false
    }
    listeners.forEach(listener => listener.apply(this, args))
    return true
  }
}

ES5 eventbus版本

/**
 * Simple publish/subscribe event bus written as a constructor function.
 *
 * The subscriber registry is a closure variable, so it can only be reached
 * through `subscribe` and `post`.
 */
function EventBus() {
  // One { eventType, callbacks } record per event type that has subscribers.
  const eventCallbacksPairs = [];

  /**
   * Register `callback` for `eventType`.
   *
   * @param {*} eventType - Identifier of the event (compared with ===).
   * @param {Function} callback - Invoked with the arguments given to `post`.
   */
  this.subscribe = function (eventType, callback) {
    const eventCallbacksPair = findEventCallbacksPair(eventType);
    if (eventCallbacksPair) {
      eventCallbacksPair.callbacks.push(callback);
    } else {
      eventCallbacksPairs.push(new EventCallbacksPair(eventType, callback));
    }
  };

  /**
   * Publish `eventType` to its subscribers in subscription order.
   * Logs an error and returns when nobody subscribed to `eventType`.
   *
   * @param {*} eventType - Identifier of the event.
   * @param {...*} args - Arguments forwarded to every callback.
   */
  this.post = function (eventType, ...args) {
    const eventCallbacksPair = findEventCallbacksPair(eventType);
    if (!eventCallbacksPair) {
      console.error(`no subscribers for event ${eventType}`);
      return;
    }
    eventCallbacksPair.callbacks.forEach((callback) => callback(...args));
  };

  // Look up the registry record for `eventType`; undefined when there is none.
  function findEventCallbacksPair(eventType) {
    return eventCallbacksPairs.find(
      (eventObject) => eventObject.eventType === eventType
    );
  }

  // Registry record: an event type together with its list of callbacks.
  function EventCallbacksPair(eventType, callback) {
    this.eventType = eventType;
    this.callbacks = [callback];
  }
}

观察者模式: 把人的行为与应用程序的行为区分开;

Vue 自定义事件、DOM 事件、Node.js 事件的实现思路, 都类似于继承了 EventEmitter 对象原型.

Express 中间件模拟(责任链模式)

函数形式

const http = require('http');

/**
 * Imitation of the Express middleware mechanism (chain of responsibility).
 *
 * The returned `app` is a `(req, res)` handler. Each call walks the
 * registered middleware in order; a middleware continues the chain by
 * calling the `next` function it receives as its third argument.
 *
 * @return {Function} app - Request handler with a chainable `use(task)` method.
 */
function express() {
  const middleware = [];

  const app = function (req, res) {
    // Per-request cursor. The registered list is never mutated, so every
    // request runs the whole chain rather than only the first one.
    let index = 0;

    function next() {
      const task = middleware[index];
      index += 1;
      if (typeof task === 'function') {
        task(req, res, next); // the middleware decides whether to call next()
      }
    }

    next();
  };

  /**
   * Append a middleware to the chain.
   *
   * @param {Function} task - Called as `task(req, res, next)`.
   * @return {Function} app, for chaining.
   */
  app.use = function (task) {
    middleware.push(task);
    return app;
  };

  return app;
}

// Test case below

const app = express();
const server = http.createServer(app);
server.listen('3000', () => {
    console.log('listening 3000....');
});

// Each demo middleware logs before and after handing control to the next
// one, which makes the nesting order visible in the console.
const middlewareA = (req, res, next) => {
    console.log('middlewareA before next()');
    next();
    console.log('middlewareA after next()');
};

const middlewareB = (req, res, next) => {
    console.log('middlewareB before next()');
    next();
    console.log('middlewareB after next()');
};

const middlewareC = (req, res, next) => {
    console.log('middlewareC before next()');
    next();
    console.log('middlewareC after next()');
};

// Register in A, B, C order.
for (const task of [middlewareA, middlewareB, middlewareC]) {
    app.use(task);
}
// Visit localhost:3000 to see the middleware's actual output

class 形式

/**
 * Class-based middleware runner.
 *
 * Middleware are called as `task(arg, next)`; whatever a middleware passes
 * to `next` becomes the `arg` of the following middleware.
 */
class App {
  constructor() {
    // Registered middleware, in execution order.
    this.middleware = []
  }

  /**
   * Append a middleware to the chain.
   *
   * @param {Function} task - Called as `task(arg, next)`.
   * @returns {App} This instance, for chaining.
   */
  use(task) {
    this.middleware.push(task)
    return this
  }

  /**
   * Run the chain from the first middleware.
   *
   * @param {*} arg - Value handed to the first middleware.
   */
  run(arg) {
    // The cursor is local to this run, so the chain can be run repeatedly
    // without one run consuming the positions of another.
    let index = 0
    const next = (nextArg) => {
      const task = this.middleware[index]
      index += 1
      if (typeof task === 'function') {
        task(nextArg, next)
      }
    }
    next(arg)
  }
}

容器存储中间件函数, 每个函数通过最后一个参数 next 递归调用下一个中间件.

模拟 redux 中间件

原版实现

// Combinator helper
/**
 * Compose functions from right to left: compose(f, g, h) behaves like
 * (...args) => f(g(h(...args))).
 *
 * @param {...Function} funcs - Functions to compose. Only the rightmost one
 *   receives all call arguments; each of the others receives the previous result.
 * @returns {Function} The identity function when no functions are given, the
 *   function itself when one is given, otherwise the composed function.
 */
function compose(...funcs) {
  switch (funcs.length) {
    case 0:
      return arg => arg
    case 1:
      return funcs[0]
    default: {
      const innermost = funcs[funcs.length - 1]
      const outer = funcs.slice(0, -1)
      // Seed with the rightmost call, then feed the result leftwards.
      return (...args) =>
        outer.reduceRight((value, fn) => fn(value), innermost(...args))
    }
  }
}

/**
 * Build a store enhancer that routes `dispatch` through `middlewares`.
 *
 * Each middleware is called with `{ getState, dispatch }` and must return a
 * function that takes the next dispatch and returns a wrapped dispatch.
 *
 * @param {...Function} middlewares - Middleware, outermost first.
 * @returns {Function} Enhancer: takes `createStore` and returns a store
 *   creator whose store has every original property plus the wrapped `dispatch`.
 */
function applyMiddleware(...middlewares) {
  return createStore => (...args) => {
    const store = createStore(...args)

    // Until the chain is assembled, dispatching is an error.
    let dispatch = () => {
      throw new Error(
        `Dispatching while constructing your middleware is not allowed. ` +
          `Other middleware would not be applied to this dispatch.`
      )
    }

    // `dispatch` is read through a closure on every call, so middleware
    // reach the final wrapped dispatch once it has been assigned below.
    const middlewareAPI = {
      getState: store.getState,
      dispatch: (...dispatchArgs) => dispatch(...dispatchArgs)
    }

    const chain = []
    for (const middleware of middlewares) {
      chain.push(middleware(middlewareAPI))
    }

    const wrapDispatch = compose(...chain)
    dispatch = wrapDispatch(store.dispatch)

    return {
      ...store,
      dispatch
    }
  }
}

简易版模拟redux实现

// Warning: Naïve implementation!
// That's *not* Redux API.
/**
 * Wrap `store.dispatch` with `middlewares`, so that the first middleware in
 * the array is the outermost layer.
 *
 * @param {Object} store - Object exposing a `dispatch` function.
 * @param {Function[]} middlewares - Each is called as `middleware(store)(next)`
 *   and must return the wrapped dispatch. The array is not modified.
 * @returns {Object} Shallow copy of `store` whose `dispatch` is the wrapped one.
 */
function applyMiddleware(store, middlewares) {
  // Wrap from the last middleware inwards, working on a copy of the array.
  const wrap = (next, middleware) => middleware(store)(next)
  const dispatch = middlewares.slice().reduceRight(wrap, store.dispatch)
  return Object.assign({}, store, { dispatch })
}

// Test case below

// Demo middleware A: logs on the way in, delegates to `next`, logs on the way out.
function middlewareA(store) {
  return function (next) {
    return function (action) {
      console.log('dispatching 路过A', action)
      const result = next(action)
      console.log('走过A...next state', store)
      return result
    }
  }
}

// Demo middleware B: same shape as A, with its own log labels.
function middlewareB(store) {
  return function (next) {
    return function (action) {
      console.log('dispatching 路过B', action)
      const result = next(action)
      console.log('走过B...next state', store)
      return result
    }
  }
}

// Stub store whose dispatch only logs.
const store = {
  dispatch() {
    console.log('dis....')
  }
}

const app = applyMiddleware(store, [middlewareA, middlewareB])
app.dispatch('this is action.......')

koa2中间件(洋葱模型)

koa2源码缩减版

const Emitter = require('events');

/**
 * Compose an array of middleware into one function (onion model).
 *
 * Every middleware is called as `fn(context, next)`; calling `next()` runs
 * the following middleware and returns a promise for its completion.
 *
 * @param {Function[]} middleware - Middleware stack.
 * @returns {Function} `(context, next) => Promise`; the optional `next` runs
 *   after the last middleware of the stack.
 * @throws {TypeError} When `middleware` is not an array of functions.
 */
function compose (middleware) {
  if (!Array.isArray(middleware)) {
    throw new TypeError('Middleware stack must be an array!')
  }
  for (let k = 0; k < middleware.length; k += 1) {
    if (typeof middleware[k] !== 'function') {
      throw new TypeError('Middleware must be composed of functions!')
    }
  }

  /**
   * @param {Object} context
   * @return {Promise}
   * @api public
   */

  return function (context, next) {
    // Index of the most recently dispatched middleware.
    let lastDispatched = -1

    function dispatch (i) {
      // A second next() from the same middleware would dispatch an index
      // that has already been seen.
      if (i <= lastDispatched) {
        return Promise.reject(new Error('next() called multiple times'))
      }
      lastDispatched = i

      // One step past the stack, fall through to the caller-supplied `next`.
      const handler = i === middleware.length ? next : middleware[i]
      if (!handler) {
        return Promise.resolve()
      }

      try {
        // Promise.resolve normalises synchronous return values; the catch
        // below turns a synchronous throw into a rejection.
        return Promise.resolve(handler(context, function next () {
          return dispatch(i + 1)
        }))
      } catch (err) {
        return Promise.reject(err)
      }
    }

    return dispatch(0)
  }
}
// Extends the events Emitter
/**
 * Trimmed-down koa-style application: a middleware container that runs the
 * composed stack for a context.
 */
class Application extends Emitter {
  constructor() {
    super();
    // Middleware container
    this.middleware = [];
  }

  /**
   * Register a middleware.
   *
   * @param {Function} fn - Middleware called as `fn(ctx, next)`.
   * @returns {Application} This instance, so calls can be chained.
   */
  use(fn) {
    this.middleware.push(fn);
    return this;
  }

  /**
   * Run the registered middleware for `ctx`.
   *
   * Failures from the stack are no longer dropped silently: they are emitted
   * as an 'error' event when a listener is registered, and logged with
   * console.error otherwise.
   *
   * @param {*} ctx - Context handed to every middleware.
   * @returns {Promise<undefined>} Settles once the stack has finished and any
   *   failure has been reported.
   */
  callback(ctx) {
    const fn = compose(this.middleware);

    return fn(ctx)
      .then(() => {
        // response handling would go here
      })
      .catch((err) => {
        // Emitting 'error' without a listener makes the emitter throw, so
        // fall back to logging in that case.
        if (this.listenerCount('error') > 0) {
          this.emit('error', err, ctx);
        } else {
          console.error(err);
        }
      });
  }
}

以下为测试case

// Outer onion layer: logs on the way in, awaits the inner layers, logs on the way out.
const middlewareA = async (ctx, next) => {
  console.log('>>...A层洋葱皮', ctx)
  await next()
  console.log('<<...A层洋葱皮', ctx)
}

// Inner onion layer, same shape as A.
const middlewareB = async (ctx, next) => {
  console.log('>>...B层洋葱皮', ctx)
  await next()
  console.log('<<...B层洋葱皮', ctx)
}

const app = new Application()
for (const layer of [middlewareA, middlewareB]) {
  app.use(layer)
}
app.callback()