在这篇博客文章中,我很高兴分享我在设计可扩展的聊天应用程序方面的见解和经验,该应用程序能够使用强大的框架来处理高用户负载。
有一个工作存储库here。
让我们开始吧! ð
什么可以使我的设计可扩展性?
为了在使用Nestjs构建的聊天应用程序中实现可伸缩性,我们考虑了几个关键因素:
1.保持正确的功能:
我们的应用程序确保即使在多个实例上进行水平缩放,所需的功能仍然完好无损。这意味着,当我们扩展基础架构以处理增加的用户负载时,应用程序继续按预期执行。
2.有效的消息传递
为了优化资源使用情况并提高性能,我们的应用程序仅向相关实例提供消息或事件。这种方法最大程度地减少了不必要的调用,减少了开销并确保应用程序不同组件之间的有效沟通。
3.松动的耦合和微服务
在Nestjs的Dependency Inversion Principle之后,我们应用程序的架构设计在服务之间的耦合松散。这种松散的耦合使应用程序以最小的努力将应用程序轻松分解为微服务。每个微服务都可以独立运行,随着系统的增长,可促进可伸缩性和适应性。
4.清洁体系结构实现
通过实施Clean Architecture的原理,我们的应用程序与框架无关。这意味着它可以在不同的框架之间无缝过渡,例如从mongodb迁移到后Ql,而无需对核心实体和业务逻辑进行修改。干净的体系结构方法有助于解除应用程序的内部层,使其更加可维护和灵活。
实施可扩展应用程序
注意: 在我的应用程序中,我实施了两个版本的
PubSub
服务:一种利用GraphQL Subscriptions,另一个使用Socket.IO。PubSub
服务将在本博客稍后详细描述。通过在.env
文件中设置PUBSUB_SERVICE_PROVIDER
,您可以在这些选项之间无缝切换。
1.保持正确的功能
默认情况下,GraphQL订阅和socket.io中的事件发布系统是内存的。这意味着其他实例处理的客户未收到一个实例发布的事件。当扩展到多个服务器实例时,有必要使用另一种实现替换默认的内存系统,以确保将事件正确路由到所有客户端。有多种选择,例如Redis Pubsub,Google Pubsub,MongoDB Pubsub,PostgreSQL Pubsub等。在此示例中,我们将使用Redis Pubsub。
下图提供了其工作方式的视觉表示:
执行
- GraphQl订阅
@Module({
providers: [
{
provide: PUB_SUB_TOKEN,
useFactory: (envConfigService: EnvConfigService) => {
const options: RedisOptions = {
retryStrategy: (times) => {
// reconnect after
return Math.min(times * 50, 2000);
},
};
const redisConnectionString =
envConfigService.getRedisConnectionString();
return new RedisPubSub({
publisher: new Redis(redisConnectionString, options),
subscriber: new Redis(redisConnectionString, options),
});
},
inject: [EnvConfigService],
},
],
exports: [PUB_SUB_TOKEN],
})
export class RedisPubSubModule {}
- socket.io
export class RedisIoAdapter extends IoAdapter {
private adapterConstructor: ReturnType<typeof createAdapter>;
constructor(
app: INestApplicationContext,
private readonly envConfigService: EnvConfigService,
) {
super(app);
}
async connectToRedis(): Promise<void> {
const options: RedisOptions = {
retryStrategy: (times) => {
// reconnect after
return Math.min(times * 50, 2000);
},
};
const redisConnectionString =
this.envConfigService.getRedisConnectionString();
const pubClient = new Redis(redisConnectionString, options);
const subClient = new Redis(redisConnectionString, options);
this.adapterConstructor = createAdapter(pubClient, subClient);
}
createIOServer(port: number, options?: ServerOptions): any {
const server = super.createIOServer(port, options);
server.adapter(this.adapterConstructor);
return server;
}
}
2.有效的消息传递
考虑一种方法,该方法涉及基于共同主题的事件进行分组,例如“通知”,“ ChannelMessage”,“ DirectMessage”等。用户有能力订阅这些主题,当事件发生时,它将发布到相应的主题。但是,该方法将整个应用程序的流量分配给所有实例,即使仅有一部分是相关的。例如,如果有五个实例,则每个实例将获得100%的流量。通常,只有20%的流量与其运营有关(100/5 = 20%)。结果,80%的流量被发送到不需要的实例。
为了优化资源使用情况并提高性能,我的方法采取了不同的路线。我没有使用通用主题,而是按照“事件:{userId}”的格式为每个用户创建一个单独的主题。事件发生时,它会简单地发布到相应的用户主题,例如用户123的“事件:123”。这确保只有负责处理该特定用户的实例才能被调用。通过实施此优化,可以有效利用资源,从而减少不必要的调用并提高整体性能。
执行
- GraphQl订阅
subscribeToEventTopic(subscriber: ISubscriber) {
const { id: subscriberId, userId } = subscriber;
...
const eventTopic = this.getEventTopic(userId);
return this.pubSub.asyncIterator(eventTopic);
}
...
private getEventTopic(userId: string) {
return `event:${userId}`;
}
- socket.io
async handleConnection(client: Socket) {
const authToken = client.handshake.auth.token;
if (!_.isString(authToken)) {
client.disconnect();
}
const isValidToken = await this.isValidToken(authToken);
if (!isValidToken) {
client.disconnect();
}
const user = this.parseToken(authToken);
const userRoomName = this.getUserRoomName(user.id);
client.join(userRoomName); // Here
const subscriber: ISubscriber = {
id: crypto.randomUUID(),
userId: user.id,
username: user.username,
tokenExpireAt: user.exp,
};
client.data.subscriber = subscriber;
}
...
private getUserRoomName(userId: string) {
return `event:${userId}`;
}
3.松动的耦合和微服务
在我的应用程序中,我有两个主要服务。第一个服务处理消息和其他相关任务的CRUD操作。第二项服务负责将用户(称为PubSub
服务)的用户发布。为了确保脱钩的体系结构,我通过抽象服务来采用依赖性反转原则。对于PubSub
服务,我创建了IPubSubService
接口,该接口具有两个实现。一种实现利用GraphQL订阅,而另一个使用socket.io。这些实现之间的切换就像将PUBSUB_SERVICE_PROVIDER
的值设置为“ GraphQl_subScription”或“ socket.io”一样简单。此无缝开关例证了服务的脱钩。
假设我们决定将应用程序分解为微服务。在这种情况下,创建实现IPubSubService
接口的新的微服务成为一个简单的任务。通过在主要主题中导入相应的模块,我们可以通过最小的努力无缝整合新的微服务,而无需对基础逻辑进行任何修改。随着我们的应用程序演变为微服务体系结构,这种灵活的设计使易于扩展和适应性能力。
让我们使用以下图来查看所有操作:
执行
1。创建IPubSubService
接口和PUBSUB_SERVICE_TOKEN
。
const PUBSUB_SERVICE_TOKEN = Symbol('PUBSUB_SERVICE_TOKEN');
interface IPubSubService {
publishChannelMessageEvent(
params: IPublishChannelMessageParams,
): Promise<void>;
publishDirectMessageEvent(params: IPublishDirectMessageParams): Promise<void>;
}
2。在主要服务中,我们注入PUBSUB_SERVICE_TOKEN
。
@Injectable()
export class MessageUseCase {
constructor(
@Inject(DATA_SERVICE_TOKEN)
private readonly dataService: IDataService,
@Inject(PUBSUB_SERVICE_TOKEN)
private readonly pubSubService: IPubSubService,
) {}
async createOne(params: {
senderId: string;
createMessageInput: CreateChannelMessageInput;
}): Promise<Message> {
...
}
}
3。将PubSubServiceModule
导入主要服务。
@Module({
imports: [DataServiceModule, PubSubServiceModule],
providers: [MessageUseCase],
exports: [MessageUseCase],
})
export class MessageModule {}
4。在PubSubServiceModule
中,检查PUBSUB_SERVICE_PROVIDER
的值以导入相应的提供商。
const pubsubModuleProvider =
process.env.PUBSUB_SERVICE_PROVIDER === PUBSUB_SERVICE_PROVIDER['SOCKET.IO']
? SocketIOModule
: GraphQLSubscriptionModule;
@Module({
imports: [pubsubModuleProvider],
exports: [pubsubModuleProvider],
})
export class PubSubServiceModule {}
5。在每个提供商服务中实现ipubsubservice接口和导出PUBSUB_SERVICE_TOKEN
。
- GraphQl订阅
@Module({
imports: [DataServiceModule, RedisPubSubModule],
providers: [
SubscriptionResolver,
GraphQLSubscriptionService,
{
useExisting: GraphQLSubscriptionService,
provide: PUBSUB_SERVICE_TOKEN,
},
],
exports: [PUBSUB_SERVICE_TOKEN],
})
export class GraphQLSubscriptionModule {}
- socket.io
@Module({
imports: [DataServiceModule],
providers: [
SocketIOGateway,
SocketIOService,
{
useExisting: SocketIOService,
provide: PUBSUB_SERVICE_TOKEN,
},
],
exports: [PUBSUB_SERVICE_TOKEN],
})
export class SocketIOModule {}
4.清洁体系结构实现
在他的书“ Clean Architecture ”的书中,鲍勃叔叔强调了将业务逻辑放在我们建筑的核心的重要性。这使业务逻辑可以独立于外部变化,从而确保其稳定性和灵活性。
以下图清楚地表示此概念:
执行
在我的应用程序中,我组织了core
文件夹中的核心实体和use-cases
文件夹中的用例(业务逻辑)。所有其他层,例如控制器,解析器和框架,都取决于这两个核心层,而核心层仍然独立于任何其他层。
让我们看一下Message
实体:
export interface IMessage extends IBaseEntity {
content?: string;
senderId?: string;
sender?: IUser;
channelId?: string;
channel?: IChannel;
}
现在,让我们检查实现该实体的框架。在此示例中,我正在使用Mongoose
架构:
@Schema({ ...baseSchemaOptions, collection: MESSAGE_COLLECTION })
@ObjectType()
class Message extends BaseSchema implements IMessage {
@Prop()
@Field()
content?: string;
@Prop({ type: mongoose.Schema.Types.ObjectId })
@Field(() => String)
senderId?: string;
@Prop({})
sender?: User;
@Prop({ type: mongoose.Schema.Types.ObjectId })
@Field(() => String)
channelId?: string;
@Prop({})
channel?: Channel;
}
您可以观察到,核心实体只是一个保持独立且没有任何依赖性的接口。实施此核心实体是该框架(例如猫鼬)的责任。因此,假设我们决定用Mongoose从MongoDB切换到Typeorm后GostgreSQL。我们需要做的就是让新框架(TypeOm)实现核心实体。这突出了我们实体的框架独立性,允许在不同技术之间进行无缝过渡,而不会损害我们应用程序的功能或结构。
接下来,让我们看看Message
用例。
@Injectable()
export class MessageUseCase {
constructor(
@Inject(DATA_SERVICE_TOKEN)
private readonly dataService: IDataService,
@Inject(PUBSUB_SERVICE_TOKEN)
private readonly pubSubService: IPubSubService,
) {}
async createOne(params: {
senderId: string;
createMessageInput: CreateChannelMessageInput;
}): Promise<Message> {
...
}
}
在大多数情况下,我们的业务逻辑需要与数据库进行互动以读取和编写数据。但是,将数据库模型直接注入用例违反了干净体系结构的原理,因为它引入了对特定框架的依赖性。这意味着,当我们从其他框架切换到不同的数据库模型时,我们也需要修改用例。为了克服这一点,我们可以再次利用依赖性反转原理(DIP)。通过将IDataService
注入我们的用例中,用例现在取决于IDataService
接口。实施此接口的责任在于数据访问框架。
看一下IDataService
接口:
export const DATA_SERVICE_TOKEN = Symbol('DATA_SERVICE_TOKEN');
export interface IDataService {
users: IRepositories.IUserRepository;
messages: IRepositories.IMessageRepository;
channels: IRepositories.IChannelRepository;
workspaces: IRepositories.IWorkspaceRepository;
members: IRepositories.IMemberRepository;
channelMembers: IRepositories.IChannelMemberRepository;
}
这是IDataService
的实现的示例:
@Injectable()
export class MongooseDataService implements IDataService {
constructor(
@Inject(IRepositories.USER_REPOSITORY_TOKEN)
public readonly users: IRepositories.IUserRepository,
@Inject(IRepositories.MESSAGE_REPOSITORY_TOKEN)
public readonly messages: IRepositories.IMessageRepository,
@Inject(IRepositories.MEMBER_REPOSITORY_TOKEN)
public readonly members: IRepositories.IMemberRepository,
@Inject(IRepositories.CHANNEL_REPOSITORY_TOKEN)
public readonly channels: IRepositories.IChannelRepository,
@Inject(IRepositories.WORKSPACE_REPOSITORY_TOKEN)
public readonly workspaces: IRepositories.IWorkspaceRepository,
@Inject(IRepositories.CHANNEL_MEMBER_REPOSITORY_TOKEN)
public readonly channelMembers: IRepositories.IChannelMemberRepository,
) {}
}
然后,该框架负责实现存储库接口:
@Injectable()
export class MessageRepository
extends BaseRepository<Message>
implements IMessageRepository
{
constructor(
@InjectModel(Message.name)
readonly messageModel: Model<Message>,
) {
super(messageModel);
}
...
}
使用IDataService
而不是直接将存储库注入用例的原因是避免在用例中拥有很多依赖项。通过注入IDataService
,我们可以访问所有存储库。
现在,让我们使用以下图来查看所有行动:
如图所示,我们的用例已实现了框架独立性。我们可以在不修改核心业务逻辑的情况下无缝过渡到任何数据库。遵守清洁架构的原理可确保灵活且可维护的应用结构。
结论
就是这样。我们只是介绍了使用Nestjs设计可扩展聊天应用程序的概述。在此博客中,我认为您已经在软件行业中拥有经验,因此我并没有深入研究依赖性反转原则或干净体系结构等概念。在一篇博客文章中探索所有这些概念将使信息过多而压倒性的信息。该博客的主要重点是强调有助于我们体系结构可扩展性的关键因素。如果您发现此博客很有帮助,并希望我对提到的概念提供更多深入的解释,请随时在评论部分让我知道。
快乐黑客! ð