网络代理客户端开发日记2:负载均衡网关

Network proxy client dev diary2: load-balancing gateway

nojsja 2022-06-16
字数:3.3k丨 阅读时间:14 分钟

➣ 目录

Contents

➣ 前言

Preface

前文 提到了一款开发中的跨平台网络代理客户端,支持连接 SSR/SS 协议的代理服务器。开发它的目的其实主要是为了解决我在 Ubuntu 操作系统中使用代理软件的痛点,因为很多软件过于老旧或是由于软件 bug 和网络原因经常出现各种无法揣摩的连接问题。

经过了几个小的迭代版本后,所有现存 bug 基本已经被修复了,客户端的易用性、美观度都得到了改善。最近发布了一个较大的迭代版本 v1.2.0,让它支持了全新的实验特性 - 服务节点负载均衡,实现了客户端通过前置的TCP 负载均衡器来连接运行在本地多个进程内的 SSR/SS 客户端服务(ssr-local),从而将流量负载压力分散到各个节点。

>> 项目 github 源码

lb

➣ 客户端功能预览

ScreenShots

main

settings

add_server

server_config

share

➣ 特性

Features

1. 所有已支持特性

  • SS / SSR Protocol (支持 SSSSR 协议)
  • PAC Mode (GFWList) (支持 PAC 模式,可以设置 GFWList 的地址)
  • Global Mode (全局模式)
  • Manual Mode (手动模式)
  • HTTP(S) Proxy (支持 HTTP(S) 代理)
  • ACL (访问控制列表)
  • Nodes Load-Balancing Mode (节点负载均衡)
  • Traffic Metrics (流量统计)
  • Clipboard / QR-code Import (剪贴板和二维码导入)
  • Subscription Import (服务器订阅)
  • Language Detecting And Switching (中英文切换)
  • Configuration Backup / Recovery (配置备份和恢复)
  • Dark / Light Mode (亮色/暗色模式)
  • Auto Start (开机自启)
  • Server Share (服务器配置分享)
  • Activity Logs (活动日志)

2. 新加入特性

  • Nodes Load-Balancing Mode (节点负载均衡)
  • Traffic Metrics (流量统计)

➣ 技术细节

Tech Details

这里需要描述一下负载均衡器的工作机制,指明它起到的关键作用,并对比之前未引入它时服务器节点的连接方式有何不同。

1. 客户端代理的基本工作原理

之前版本的客户端通过用户预先保存的服务器配置信息,使用 node.js 子进程来启动 ss-local 可执行文件建立和 shadowsocks 服务器的连接来代理用户本地电脑的流量。每个子进程占用一个 socket 端口,其它支持 socks5 代理的 proxy 工具比如:浏览器上的 SwitchOmega 插件会和这个端口的 tcp 服务建立连接,将 tcp 流量加密后通过代理服务器转发给我们需要访问的目标服务器,这就是 Shadowsocks 实现网络流量代理的基本原理。

shadow_working_principle.png

2. 多节点模式的负载均衡

ssr-single.png

以上描述的是客户端连接单个节点的工作模式,节点订阅组中的负载均衡模式需要同时启动多个子进程,每个子进程启动 ss-local 执行文件占用一个本地端口并连接到远端一个服务器节点。

每个子进程启动时选择的端口是会变化的,因为某些端口可能已经被系统占用,程序需要先选择未被使用的端口。并且 proxy 工具也不可能同时连接到我们本地启动的子进程上的多个 ss-local 服务上。因此我们需要一个占用固定端口的中间节点接收 proxy 工具发出的连接请求,然后按照某种分发规则将 tcp 流量转发到各个子进程的 ss-local 服务的端口上。

以上描述的这个中间节点就是用于 tcp 流量转发的透明网关,它同时支持可配置的节点分发策略,并辅以一些附加功能 buff 比如流量消耗统计和节点健康度检测,因此也可以称之为负载均衡网关。

ssr-cluster.png

由于 shadowsocks 协议传输层仅支持 tcp 协议,因此我们用 nodejs 的 net 模块创建一个 tcp 服务器运行在固定端口上接收 proxy 代理工具的请求然后通过负载均衡运算后选择某个节点,建立到这个节点的 tcp 双向传输流即可让透明代理正常转发 tcp 流量了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import net from 'net';
...

export class SocketTransfer extends EventEmitter {
public bytesTransfer = 0;
public speed = '';
private port: number;
private targets: Target[];
private timer: NodeJS.Timeout | null;
private server: net.Server;
private lb: LoadBalancer;
private strategy: ALGORITHM;
private heartbeat: number[];

constructor(options: SocketTransferOptions) {
...
this.server = this.init();
...
}

...

private init() {
return net.createServer((c) => {
const target = this.lb.pickOne(); // load-balancer
console.log('pick target -> ', target?.id);

if (!target || !target.id) {
this.onLoadBalancerError(new Error('no available target!'));
return c.end('socket transfer not ready!');
}

c.on('end', () => { // traffic metrics
this.bytesTransfer += (c.bytesRead + c.bytesWritten);
});
c.on('error', this.onLocalError);

const remote = net.createConnection({ port: +target.id }, () => {
c.pipe(remote);
remote.pipe(c);
});

remote.on('error', (error) => this.onRemoteError(error, +target.id));

});
}

...
}

3. 负载均衡器的设计细节

为了保证负载均衡器的可用性和稳定性,需要有一种心跳机制,定时检测子进程池内各个进程上的服务节点网络连通性。每次未通过健康检测的节点被踢出,然后新开子进程连接其余的某个服务器以替代这个节点的工作。

在创建负载均衡网关的时候传入的 heartbeat 参数会让其依次在:10s、15s、30s、60s、3mins、5mins 的间隔后对进程池内的节点健康度进行检测。首次检测周期完之后,会维持最后的 5mins 一次的检测频率。之所以这样设计是想让客户端在启动的前几分钟能尽快挑选出符合条件的健康节点,之后的低频检测作为一个辅助手段进行健康维持。

1
2
3
4
5
6
7
8
9
Manager.socketTransfer = new SocketTransfer({
port: settings.localPort,
strategy: ALGORITHM.POLLING,
targets: [{
id: (Manager.ssLocal as (SSClient | SSRClient)).settings.localPort,
confId: (Manager.ssLocal as (SSClient | SSRClient)).config.id
}],
heartbeat: [10e3, 15e3, 30e3, 60e3, 60e3 * 3, 60e3 * 5]
});

这里使用 http agent 的方式编写网络连通检查代码,通过 socks5 代理在 tcp 层模拟 HTTP 协议工作方式向 www.google.com 发送一条 HEAD 请求,通过判断返回数据中是否携带 code = 200 判断请求是否成功。为了避免偶发性的网络不稳定,上层控制逻辑会采用 double check 的方式,也就是筛选第一次连通检查失败的节点,再对这些节点进行第二次连通检查,如果还是失败可以得出这个节点的网络健康度较差,可以踢出。踢出后程序重新从剩下的服务器组中选择一个服务器配置并使用子进程连接后放入当前进程池即可。

以下是节点健康度检查的部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import { Socket } from 'net';

const socks = require('socks');

/**
* @name shadowChecker 检查从 ss-local 到 ss-server 的链路是否联通
* @param {string} host ss代理本地地址 ep. 127.0.0.1
* @param {string} port ss代理本地端口 ep. 1081
* @returns Promise<boolean>
*/
export default function shadowChecker(host: string, port: number): Promise<boolean> {
const agentConf = {
ipaddress: host,
port: port,
type: 5,
authentication: {
username: '',
password: ''
}
};

const options = {
command: 'connect',
proxy: agentConf,
target: {
host: 'www.google.com',
port: 80
},
};

return new Promise(resolve => {
socks.createConnection(options, (error: Error | null, pSocket: Socket) => {
if (error) {
return resolve(false);
}
pSocket.on('data', (data) => {
const header = data.toString();
if (header.includes('HTTP/1.1 200 OK')) {
resolve(true);
} else {
console.log(header.slice(0, 100));
resolve(false);
}
});
pSocket.on('end', () => {
setTimeout(() => {
resolve(false);
}, 1e3);
});
pSocket.on('error', (error) => {
resolve(false);
});
pSocket.resume();
pSocket.write('HEAD / HTTP/1.1\r\nHost: www.google.com\r\n\r\n');
});
});
}

之前做 Electron 多进程管理器的时候,简单写过一版负载均衡器,这里拿过来稍加改动下即可直接使用。由于尚处于初期开发阶段,为了保证功能稳定性,目前仅启用了轮询和随机两种策略。后面可以相继完善节点负载计算相关的算法,然后接入更多的负载均衡策略,比如:最小连接数、最小CPU占用等策略。代码部分就不直接贴了,可以参看这个链接:LoadBalancer

接入了负载均衡网关之后,流量统计方面就比较好做了。之前单节点模式,各种 proxy 工具是直接和子进程内的 ss-local 服务通信的,流量消耗根本无法知晓。启用中转透明代理节点后,只需要统计每一次中转请求的 tcp socket 连接退出后 socket 对象上的读取字节数和写入字节数就可以了,将其做累加,即可统计一段时间内的流量消耗情况。并且新的单节点模式被设计成也需要经过负载均衡网关,为了方便之后的一致性维护和单节点模式流量统计。代码部分可以简单参看上文。

lb

➣ 一些有趣的代码设计

1. 函数运行时的拦截器(Interceptor)

一个业务流程被封装到一个函数中,一些与函数主逻辑关系不大的前置和后置操作可以抽离到拦截器里。比如在开启负载均衡业务流程中,需要先关闭单节点模式,并判断用于负载均衡器的主端口是否被占用,这部分代码可以抽离到前置拦截器中。

下面给出拦截器的一个简单实现,这里只给出异步串行拦截器的代码,项目中还实现了同步串行拦截器和异步并行拦截器,他们的不同点在于调用前置和后置拦截器的方式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
* Interceptor [拦截器函数]
* @author nojsja
*/
export class Interceptor {

env: { [key: string]: any };
before: ((...args: any[]) => any) | null;
after: ((...args: any[]) => any) | null;

constructor(env: any) {
this.env = env;
this.before = null;
this.after = null;
}

...

/**
*
* @param {Function} func [combine interceptors and main func, call them on by one asyncronously]
* @param {Array} interceptors [interceptors]
* @returns {Function} [combined function]
*/
static useAsyncSeries = (
func: (...args: any[]) => any,
interceptors: Interceptor[] = [],
failed?: (...args: any[]) => any
) => {

return async function (this: any, ...args: any[]): Promise<ReturnType<typeof func>> {
let response: any;

try {
await interceptors.reduce(async (memo, interceptor) => {
await memo;
if (interceptor.before) {
await interceptor.before.apply(this, args);
}
}, Promise.resolve());

response = await func.apply(this, args);

await interceptors.reduce(async (memo, interceptor) => {
await memo;
if (interceptor.after) {
await interceptor.after.apply(this, args);
}
}, Promise.resolve());
} catch (error) {
console.log(error);
failed && failed.apply(this, args);
return Promise.resolve({
code: 600,
result: error && (error as Error).toString()
});
}
return response;
}
}

...

};

使用方式如下,传入的参数依次是主逻辑函数、拦截器实例数组、失败回调函数,拦截器函数也有错误捕获的功能,程序抛出的错误会被捕获然后触发传入的失败回调函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

/**
* StartClusterInterceptor [cluster interceptor on start]
* @author nojsja
*/
export class StartClusterInterceptor extends Interceptor {

constructor(env?: any) {
super(env);
}

before = async (config: Config, settings: Settings) => {
await Manager.changeMode('cluster');
const results = await checkPortInUse([settings.localPort], '127.0.0.1');
if (results[0]?.isInUse) {
warning(`Port ${settings.localPort} is in use`);
throw new Error(`${i18n.__('port_already_in_use')} ${settings.localPort}`);
}
await Manager.enableProxy(settings);
}

after = () => {}
}

const startClusterInterceptor = new StartClusterInterceptor();

const worker = Interceptor.useAsyncSeries(
/* worker */
async () => { ... },
/* interceptors */
[startClusterInterceptor],
/* fallback */
(err) => {
warning(err);
Manager.disableProxy();
Manager.syncConnected(false);
}
);

/* some where and some time */
worker();

2. 随机选择一个数组内的几个元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
export default function randomPicker(total: any[], count: number) {
const result: any[] = [];
total = total.slice();

let num = total.length;
for (let i = 0; i < count; i++) {
const index = ~~(Math.random() * num) + i;
if(result.includes(total[index])) continue;
(total[index] !== undefined) && (result[i] = total[index]);
total[index] = total[i];
num--;
}

return result;
}

3. 异步地选取从某个起始端口开始的未使用的多个端口号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import checkPortInUse from "./port-checker";

/**
* @name pickPorts 获取指定数量的可用端口
* @param {Number} start 起始端口
* @param {Number} count 获取数量
* @param {Number[]} excludes 需要排除的端口
*/
export default function pickPorts(start: number, count: number, excludes: number[] = []): Promise<number[]> {
const ports: number[] = [];
const checkingPorts: number[] = [];

for (let i = start; checkingPorts.length < count; i++) {
if (excludes.includes(i)) continue;
checkingPorts.push(i);
}

return checkPortInUse(checkingPorts, '127.0.0.1')
.then(results => {
results.forEach((result, i) => {
if (!result.isInUse) {
ports.push(checkingPorts[i]);
}
});
if (ports.length < count) {
return pickPorts(
(ports[ports.length - 1] || start) + 1,
(count - ports.length),
excludes
).then(newPorts => {
return ports.concat(newPorts);
});
} else {
return ports;
}
});
}

➣ 计划中功能

Planning

目前的功能自测了几天感觉还可以,先保证基本的稳定性和可用性,有时间了再做更多的优化更新吧。

以下是暂时想到的一些可优化点:

  • 支持更多的负载均衡模式
  • 优化节点健康度检测算法
  • 提升 socket 连接性能
[ loading ]⇷⇷