# JS 异步请求并发处理

  • 采用并行,使用请求池,请求池内维持最大并发数

    • 串行:一个请求执行完才执行下一个请求
    • 并行:不超过并发数的情况下,两个请求一起执行
  • 采用 promise.then 语法

  • 支持 实时向外部更新 已请求成功的 返回数据

/**
 * 并发请求控制函数
 * @param {Array} _requestList - 请求数组,每个元素可以是一个Promise构造函数或返回Promise的函数
 * @param {Number} _concurrency - 最大并发请求数
 * @param {Function} _realUpdateCallack - 实时数据更新当前已请求成功的数据回调
 * @returns {Promise<Array>}所有请求的结果数组
 */
function handleConcurrentRequest(
  _requestList,
  _concurrency = 6,
  _realUpdateCallack
) {
  return new Promise((resolve, reject) => {
    if (!_requestList.length) {
      reject({ message: "请求函数数组为空!" });
    }

    // 运行池
    const pool = new Set();
    // 等待队列
    const waitQueue = [];
    // 结果
    const AllResultLog = new Array(_requestList.length).fill(null);

    /**
     * @description: 限制并发数量的请求
     * @param {*} reqFn:请求方法
     * @param {*} max:最大并发数
     */
    const request = (reqFn, max) => {
      return new Promise((resolve, reject) => {
        // 判断运行吃是否已满
        const isFull = pool.size >= max;

        // 包装的新请求
        const newReqFn = () => {
          reqFn()
            .then((res) => {
              resolve(res);
            })
            .catch((err) => {
              reject(err);
            })
            .finally(() => {
              // 请求完成后,将该请求从运行池中删除
              pool.delete(newReqFn);
              // 从等待队列中取出一个新请求放入等待运行池执行
              const next = waitQueue.shift();
              if (next) {
                pool.add(next);
                next();
              }
            });
        };

        if (isFull) {
          // 如果运行池已满,则将新的请求放到等待队列中
          waitQueue.push(newReqFn);
        } else {
          // 如果运行池未满,则向运行池中添加一个新请求并执行该请求
          pool.add(newReqFn);
          newReqFn();
        }
      });
    };

    _requestList.forEach(async (fn, idx) => {
      request(fn, _concurrency)
        .then((res) => {
          if (res && res.code === 0) {
            AllResultLog[idx] = res;
          } else {
            throw res;
          }
        })
        .catch((err) => {
          AllResultLog[idx] = err;
        })
        .finally(() => {
          // 返回数据模板
          let tempData = { status: "pending", result: AllResultLog };

          // 是否所有的请求皆已完成
          if (
            waitQueue.length == 0 &&
            AllResultLog.length == _requestList.length &&
            AllResultLog.every((item) => !!item)
          ) {
            tempData.status = "finished";
            resolve(tempData);
          }

          // 实时更新
          if (_realUpdateCallack && _realUpdateCallack instanceof Function) {
            _realUpdateCallack(tempData);
          }
        });
    });
  });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

# 测试

let reqs = [];
for (let idx = 0; idx < 10; idx++) {
  function testAPI() {
    return () =>
      new Promise((resolve, reject) => {
        // let time = 500 * idx;
        let time = Math.ceil(Math.random() * 1000);
        setTimeout(() => {
          if (idx === 9) {
            reject({
              code: 1,
              message: `${idx}次请求`,
            });
          } else {
            resolve({
              code: 0,
              message: `${idx}次请求`,
            });
          }
        }, time);
      });
  }
  reqs.push(testAPI(idx + 1));
}

let getRealResData = (_res) => {
  // // dev-log >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
  console.log(`[Dev_Log][${"getRealResData"}_]_>>>`, _res);
};

handleConcurrentRequest(reqs, 3, getRealResData)
  .then((results) => {
    console.log("请求结果:", results);
  })
  .catch((error) => {
    console.error("请求失败:", error);
  });

/**
测试结果
PS C:\MyTest\project\sam9029_code_life\test> node concurrencycopy2.js
  请求结果: {
    result: [
      { code: 0, message: '第0次请求' },
      { code: 0, message: '第1次请求' },
      { code: 0, message: '第2次请求' },
      { code: 0, message: '第3次请求' },
      { code: 0, message: '第4次请求' },
      { code: 0, message: '第5次请求' },
      { code: 0, message: '第6次请求' },
      { code: 0, message: '第7次请求' },
      { code: 0, message: '第8次请求' },
      { code: 1, message: '第9次请求' }
    ]
  }
 */
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56