# JS 异步请求并发处理
采用并行,使用请求池,请求池内维持最大并发数
- 串行:一个请求执行完才执行下一个请求
- 并行:不超过并发数的情况下,两个请求一起执行
采用 promise.then 语法
支持 实时向外部更新 已请求成功的 返回数据
/**
* 并发请求控制函数
* @param {Array} _requestList - 请求数组,每个元素可以是一个Promise构造函数或返回Promise的函数
* @param {Number} _concurrency - 最大并发请求数
* @param {Function} _realUpdateCallack - 实时数据更新当前已请求成功的数据回调
* @returns {Promise<Array>}所有请求的结果数组
*/
function handleConcurrentRequest(
_requestList,
_concurrency = 6,
_realUpdateCallack
) {
return new Promise((resolve, reject) => {
if (!_requestList.length) {
reject({ message: "请求函数数组为空!" });
}
// 运行池
const pool = new Set();
// 等待队列
const waitQueue = [];
// 结果
const AllResultLog = new Array(_requestList.length).fill(null);
/**
* @description: 限制并发数量的请求
* @param {*} reqFn:请求方法
* @param {*} max:最大并发数
*/
const request = (reqFn, max) => {
return new Promise((resolve, reject) => {
// 判断运行吃是否已满
const isFull = pool.size >= max;
// 包装的新请求
const newReqFn = () => {
reqFn()
.then((res) => {
resolve(res);
})
.catch((err) => {
reject(err);
})
.finally(() => {
// 请求完成后,将该请求从运行池中删除
pool.delete(newReqFn);
// 从等待队列中取出一个新请求放入等待运行池执行
const next = waitQueue.shift();
if (next) {
pool.add(next);
next();
}
});
};
if (isFull) {
// 如果运行池已满,则将新的请求放到等待队列中
waitQueue.push(newReqFn);
} else {
// 如果运行池未满,则向运行池中添加一个新请求并执行该请求
pool.add(newReqFn);
newReqFn();
}
});
};
_requestList.forEach(async (fn, idx) => {
request(fn, _concurrency)
.then((res) => {
if (res && res.code === 0) {
AllResultLog[idx] = res;
} else {
throw res;
}
})
.catch((err) => {
AllResultLog[idx] = err;
})
.finally(() => {
// 返回数据模板
let tempData = { status: "pending", result: AllResultLog };
// 是否所有的请求皆已完成
if (
waitQueue.length == 0 &&
AllResultLog.length == _requestList.length &&
AllResultLog.every((item) => !!item)
) {
tempData.status = "finished";
resolve(tempData);
}
// 实时更新
if (_realUpdateCallack && _realUpdateCallack instanceof Function) {
_realUpdateCallack(tempData);
}
});
});
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# 测试
let reqs = [];
for (let idx = 0; idx < 10; idx++) {
function testAPI() {
return () =>
new Promise((resolve, reject) => {
// let time = 500 * idx;
let time = Math.ceil(Math.random() * 1000);
setTimeout(() => {
if (idx === 9) {
reject({
code: 1,
message: `第${idx}次请求`,
});
} else {
resolve({
code: 0,
message: `第${idx}次请求`,
});
}
}, time);
});
}
reqs.push(testAPI(idx + 1));
}
let getRealResData = (_res) => {
// // dev-log >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
console.log(`[Dev_Log][${"getRealResData"}_]_>>>`, _res);
};
handleConcurrentRequest(reqs, 3, getRealResData)
.then((results) => {
console.log("请求结果:", results);
})
.catch((error) => {
console.error("请求失败:", error);
});
/**
测试结果
PS C:\MyTest\project\sam9029_code_life\test> node concurrencycopy2.js
请求结果: {
result: [
{ code: 0, message: '第0次请求' },
{ code: 0, message: '第1次请求' },
{ code: 0, message: '第2次请求' },
{ code: 0, message: '第3次请求' },
{ code: 0, message: '第4次请求' },
{ code: 0, message: '第5次请求' },
{ code: 0, message: '第6次请求' },
{ code: 0, message: '第7次请求' },
{ code: 0, message: '第8次请求' },
{ code: 1, message: '第9次请求' }
]
}
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56