This commit is contained in:
2023-05-18 19:58:22 +08:00
parent 68b3d8e95d
commit 996223d36c
8 changed files with 74 additions and 45 deletions

View File

@@ -4,6 +4,7 @@ project(IM2)
set(CMAKE_CXX_STANDARD 17)
add_subdirectory(MS)
add_subdirectory(MessageSystem)
add_subdirectory(MP)
add_subdirectory(MC/gui)
add_subdirectory(MC/api)

View File

@@ -9,16 +9,18 @@ import "mp.mp.proto";
/*************************************/
// 推送 [通知包]
message notice { // pull push
string message_id = 1; // 消息id[] 数组 字节序 转成 uint64
uint64 message_id = 1; // 消息id[] 数组 字节序 转成 uint64 用户级id
uint64 time = 2; // 时间 用于消息同步的生命周期
}
// 获取/推送 [消息包]
message data {
MP_SUB_TYPE msg_type = 1; // [] 数组 // 消息类型 子命令 分辨什么类型的消息 文本 视频 音频?
MP_SUB_TYPE session_type = 2; // [] 数组 // 会话 目标/来源 类型 分辨是群消息还是 单体消息
string message_id = 3; // [] 数组 // 消息id 字节序 转成 uint64
uint64 account = 4; // [] 数组 // 消息 目标/来源
string msg_data = 5; // [] 数组 // 消息数据字节序
uint64 message_id = 3; // [] 数组 // 消息id 字节序 转成 uint64
uint64 time = 4; // 时间
uint64 account = 5; // [] 数组 // 消息 目标/来源
string msg_data = 6; // [] 数组 // 消息数据字节序
}

View File

@@ -5,7 +5,6 @@ aux_source_directory(pool/thread DIR_THREAD_POOL)
aux_source_directory(pool/object DIR_OBJECT_POOL)
aux_source_directory(mmm DIR_MMM)
aux_source_directory(smtp DIR_EMAIL)
aux_source_directory(message DIR_MESSAGE)
include_directories(${CMAKE_SOURCE_DIR}/include/libevent)
include_directories(${CMAKE_SOURCE_DIR}/include/ini)
@@ -32,7 +31,6 @@ add_executable(MS
MS.cpp
MS.cpp
${DIR_EMAIL}
${DIR_MESSAGE}
${DIR_MMM}
${DIR_THREAD_POOL}
${DIR_MEM_POOL}

View File

@@ -1,39 +0,0 @@
//
// Created by dongl on 23-5-17.
//
#ifndef IM2_TIMELINE_H
#define IM2_TIMELINE_H
/**
* Timeline模型
* 一个抽象表述Timeline可以简单理解为是一个消息队列但这个消息队列有如下特性
*
* 每个消息拥有一个顺序IDSeqId在队列后面的消息的SeqId一定比前面的消息的SeqId大也就是保证SeqId一定是增长的但是不要求严格递增
* 新的消息永远在尾部添加保证新的消息的SeqId永远比已经存在队列中的消息都大
* 可根据SeqId随机定位到具体的某条消息进行读取也可以任意读取某个给定范围内的所有消息。
*/
/**
* 消息同步
*
* 有了这些特性后消息的同步可以拿Timeline来很简单的实现。图中的例子中消息发送方是A消息接收方是B同时B存在多个接收端分别是B1、B2和B3。
* A向B发送消息消息需要同步到B的多个端待同步的消息通过一个Timeline来进行交换。A向B发送的所有消息都会保存在这个Timeline中
* B的每个接收端都是独立的从这个Timeline中拉取消息。每个接收端同步完毕后都会在本地记录下最新同步到的消息的SeqId即最新的一个位点
* 作为下次消息同步的起始位点。服务端不会保存各个端的同步状态,各个端均可以在任意时间从任意点开始拉取消息。
*/
/**
* 消息漫游
* 也是基于Timeline
* 和消息同步唯一的区别是消息漫游要求服务端能够对Timeline内的所有数据进行 持久化。
*/
/// 每个会话 通过 TimeLine 维护
class TimeLine {
};
#endif //IM2_TIMELINE_H

View File

@@ -80,6 +80,10 @@ bufferevent* ev_pool::add_buffer_event(evutil_socket_t fd, bufferevent_data_cb r
bufferevent_setcb(bev, readcb, writecb, eventcb, bbca);
// 启用
bufferevent_enable(bev, EV_READ | EV_WRITE);
// 设置超时
timeval t = {30*60,0};
bufferevent_set_timeouts(bev, &t, nullptr);
m_bevs.insert(std::pair<event_base*, bufferevent*>(base, bev));
printf("event_base: %p, fd: %d\n", base, fd);

View File

@@ -0,0 +1,14 @@
project(MessageSystem)
aux_source_directory(message_push PUSH)
aux_source_directory(storage STORAGE)
aux_source_directory(synchronization SYN)
include_directories(${CMAKE_SOURCE_DIR}/MP)
add_library(MessageSystem
${PUSH}
${STORAGE}
${SYN}
TimeLine.cpp
)

49
MessageSystem/TimeLine.h Normal file
View File

@@ -0,0 +1,49 @@
//
// Created by dongl on 23-5-17.
//
#ifndef IM2_TIMELINE_H
#define IM2_TIMELINE_H
#include <string>
#include <queue>
#include "proto/mp.mp.pb.h"
struct SynMsg {
uint64_t message_id;
time_t msg_time;
};
struct StorageMsg {
mp::MP_SUB_TYPE msg_type;
mp::MP_SUB_TYPE session_type;
uint64_t message_id;
time_t msg_time;
uint64_t account;
std::string msg_data;
};
template<class T>
class TimeLine {
void push(T ele) {
queue.push(ele);
}
T* pull() {
auto ele = queue.front();
queue.back();
return ele;
}
private:
std::mutex mutex;
std::queue<T> queue;
};
#endif //IM2_TIMELINE_H