test init request包 序列化的字符串 会多出字节 浮点型server解析不出来。
This commit is contained in:
@@ -57,7 +57,6 @@ void MS::link(struct evconnlistener *e, int fd, struct sockaddr *addr, int sockl
|
||||
printf("listen_cb\n");
|
||||
pool->add_buffer_event(fd, read_cb, write_cb, event_cb,
|
||||
BEV_OPT_CLOSE_ON_FREE, (sockaddr_in*)addr);
|
||||
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
@@ -85,12 +84,15 @@ void MS::write_cb(struct bufferevent *bev, void *ctx) {
|
||||
}
|
||||
|
||||
void MS::event_cb(struct bufferevent *bev, short what, void *ctx) {
|
||||
printf("[event]: %p\n", ctx);
|
||||
BBCA* bbca = (BBCA*) ctx;
|
||||
auto addr = bbca->addr;
|
||||
auto base = bbca->base;
|
||||
|
||||
printf("[event]: %p\n", base);
|
||||
if (what == BEV_EVENT_EOF || BEV_EVENT_ERROR || BEV_EVENT_TIMEOUT) {
|
||||
printf("客户端退出\n");
|
||||
handler::remove_user(bev);
|
||||
}
|
||||
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,11 +19,11 @@ public:
|
||||
|
||||
std::shared_ptr<agreement_request> operator () (bufferevent* bev, sockaddr_in* addr) {
|
||||
// agreement_request
|
||||
auto type = m_mph->mp_type();
|
||||
auto agreementRequest = std::make_shared<agreement_request>(agreement_request());
|
||||
|
||||
// request
|
||||
auto request = std::make_shared<mp::request>(mp::request());
|
||||
|
||||
request->ParseFromString(m_data);
|
||||
agreementRequest->set(m_mph, request, bev, addr);
|
||||
|
||||
@@ -31,7 +31,7 @@ public:
|
||||
}
|
||||
private:
|
||||
std::shared_ptr<mp::mph> m_mph;
|
||||
char* m_data = nullptr;
|
||||
char* m_data;
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -2,12 +2,11 @@
|
||||
// Created by dongl on 23-4-28.
|
||||
//
|
||||
|
||||
#include <arpa/inet.h>
|
||||
#include "handler.h"
|
||||
#include "Response.h"
|
||||
|
||||
// handler 保有 session
|
||||
session* handler::g_session = new session();
|
||||
session* handler::session = new class session();
|
||||
|
||||
/// resp im
|
||||
void handler::resp(const std::shared_ptr<agreement_request>& request,
|
||||
@@ -15,8 +14,8 @@ void handler::resp(const std::shared_ptr<agreement_request>& request,
|
||||
/// 用户操作逻辑包
|
||||
// 定义100序号一下的为操作逻辑业务逻辑 请求与响应 +20 就能一一对应
|
||||
// 例如 login包类型为 request login==0 那么响应这个登陆信息就是 response login==0+20
|
||||
// 100 以上 为im msg 通讯的包类型序号
|
||||
if (request->m_mph->mp_type() < 100) {
|
||||
// 200 以上 为im msg 通讯的包类型序号
|
||||
if (request->m_mph->mp_type() < 200) {
|
||||
auto resp = new Response((mp::MP_TYPE) (request->m_mph->mp_type() + 20),
|
||||
response->m_sri.sri_code(), response->m_sri.sri_username(),
|
||||
response->m_sri.sri_msg(), response->m_sri.sri_token());
|
||||
@@ -31,10 +30,11 @@ void handler::resp(const std::shared_ptr<agreement_request>& request,
|
||||
}
|
||||
}
|
||||
/// end resp im
|
||||
|
||||
// 聊天消息包
|
||||
void handler::send(const std::shared_ptr<agreement_request> &request, const std::shared_ptr<agreement_response> &response) {
|
||||
// 查询在线的用户信息
|
||||
auto ret = g_session->find_user_fd(request->m_body.target());
|
||||
auto ret = session->find_user_fd(request->m_body.target());
|
||||
// 用户信息结构体
|
||||
userinfo *user;
|
||||
if (ret.has_value()) {
|
||||
@@ -50,6 +50,10 @@ handler::ccp2p(const std::shared_ptr<agreement_request> &request, const std::sha
|
||||
|
||||
}
|
||||
|
||||
void handler::remove_user(bufferevent* bev) {
|
||||
session->remove_user(bev);
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -20,8 +20,11 @@ public:
|
||||
static void send(const std::shared_ptr<agreement_request>& request, const std::shared_ptr<agreement_response>& response);
|
||||
static void ccp2p(const std::shared_ptr<agreement_request>& request, const std::shared_ptr<agreement_response>& response);
|
||||
|
||||
public:
|
||||
static void remove_user(bufferevent* bev);
|
||||
|
||||
protected:
|
||||
static session* g_session;
|
||||
static class session* session;
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
#include "Response.h"
|
||||
|
||||
|
||||
|
||||
management::management() {
|
||||
|
||||
}
|
||||
@@ -20,29 +19,96 @@ void management::send_packet(bufferevent *bev) {
|
||||
|
||||
}
|
||||
|
||||
void management::read_packet(bufferevent *bev, sockaddr_in* addr) {
|
||||
// read L 读包长度
|
||||
uint8_t packetLen;
|
||||
bufferevent_read(bev, &packetLen, 1);
|
||||
// read V 读包头
|
||||
char data_h[256] = {0};
|
||||
bufferevent_read(bev, data_h, packetLen);
|
||||
void management::read_packet(bufferevent *bev, sockaddr_in *addr) {
|
||||
// while (true) {
|
||||
// // read L 读包长度
|
||||
// uint8_t packetLen;
|
||||
// size_t len1 = bufferevent_read(bev, &packetLen, 1);
|
||||
// if (len1 == 0) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// // read V 读包头
|
||||
// char data_h[256] = {0};
|
||||
// size_t len2 = bufferevent_read(bev, data_h, packetLen);
|
||||
// if (len2 == 0) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// auto mph = std::make_shared<mp::mph>(mp::mph());
|
||||
// mph->ParseFromString(data_h);
|
||||
//
|
||||
// // read V 读包体 包头内含有包体长度
|
||||
// char data_b[256] = {0};
|
||||
// size_t len3 = bufferevent_read(bev, data_b, mph->mpb_size());
|
||||
// if (len3 == 0) {
|
||||
// break;
|
||||
// }
|
||||
//
|
||||
// // 请求
|
||||
// auto request = analysis(mph, data_b)(bev, addr);
|
||||
// // 响应
|
||||
// auto response= std::make_shared<agreement_response>(agreement_response());
|
||||
// // 执行逻辑 自定义
|
||||
// mapping::run(mph->mp_type(), request, response);
|
||||
// }
|
||||
|
||||
auto mph = std::make_shared<mp::mph>(mp::mph());
|
||||
mph->ParseFromString(data_h);
|
||||
char buff[2048] = {0};
|
||||
std::atomic<long> len_index = bufferevent_read(bev, buff, 2048);
|
||||
|
||||
// read V 读包体 包头内含有包体长度
|
||||
char data_b[256] = {0};
|
||||
bufferevent_read(bev, data_b, mph->mpb_size());
|
||||
// 处理粘包
|
||||
char packet_h[50] = {0};
|
||||
char packet_b[256] = {0};
|
||||
while (len_index > 0) {
|
||||
memset(packet_h, 0, sizeof(packet_h));
|
||||
memset(packet_b, 0, sizeof(packet_b));
|
||||
|
||||
// 请求
|
||||
auto request = analysis(mph, data_b)(bev, addr);
|
||||
// 响应
|
||||
auto response= std::make_shared<agreement_response>(agreement_response());
|
||||
// 执行逻辑 自定义
|
||||
mapping::run(mph->mp_type(), request, response);
|
||||
std::cout << len_index << std::endl;
|
||||
std::cout << buff << std::endl;
|
||||
|
||||
/// read L 读包长度
|
||||
uint8_t packetLen;
|
||||
// 取包长度
|
||||
memcpy(&packetLen, buff, 1);
|
||||
// 更新buffer
|
||||
memcpy(buff, buff + 1, strlen(buff) - 1);
|
||||
// 更新buffer长度
|
||||
len_index -= 1;
|
||||
|
||||
/// read V 读包头
|
||||
// 取包头
|
||||
memcpy(packet_h, buff, packetLen);
|
||||
// 更新buffer
|
||||
memcpy(buff, buff + packetLen, strlen(buff) - packetLen);
|
||||
// 更新buffer长度
|
||||
len_index -= packetLen;
|
||||
|
||||
// 解包
|
||||
auto mph = std::make_shared<mp::mph>(mp::mph());
|
||||
mph->ParseFromString(packet_h);
|
||||
|
||||
/// read V 读包体 包头内含有包体长度
|
||||
// 取包体
|
||||
memcpy(packet_b, buff, mph->mpb_size());
|
||||
// 更新buffer
|
||||
memcpy(buff, buff + mph->mpb_size(), strlen(buff) - mph->mpb_size());
|
||||
// 更新buffer长度
|
||||
len_index -= mph->mpb_size();
|
||||
|
||||
std::cout << buff << std::endl;
|
||||
|
||||
// 请求
|
||||
auto request = analysis(mph, packet_b)(bev, addr);
|
||||
// 响应
|
||||
auto response = std::make_shared<agreement_response>(agreement_response());
|
||||
// 执行逻辑 自定义
|
||||
mapping::run(mph->mp_type(), request, response);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -36,9 +36,9 @@ void mapping::run(const mp::MP_TYPE mpTYpe, std::shared_ptr<agreement_request>&
|
||||
printf("[packet type]:%s\n", myenumToString(mpTYpe));
|
||||
// 取出需要的执行对象
|
||||
auto fun = map.find(mpTYpe)->second;
|
||||
// 开始执行 请求
|
||||
// 开始执行 处理请求的数据
|
||||
fun->run(request, response);
|
||||
// 发送 响应
|
||||
// 发送 响应数据
|
||||
handler::resp(request, response);
|
||||
}
|
||||
|
||||
|
||||
@@ -30,11 +30,12 @@ void session::remove_user(bufferevent *bev) {
|
||||
}
|
||||
|
||||
user_fd.erase(target_ele);
|
||||
bufferevent_free(bev);
|
||||
}
|
||||
|
||||
void session::remove_user(const std::shared_ptr<agreement_request>& request) {
|
||||
bufferevent_free(request->m_bev);
|
||||
user_fd.erase(strtol(request->m_body.account().c_str(), nullptr, 0));
|
||||
bufferevent_free(request->m_bev);
|
||||
}
|
||||
|
||||
bool session::is_user(const std::string& account) {
|
||||
|
||||
@@ -51,13 +51,27 @@ void read(evutil_socket_t, short, void *) {
|
||||
fflush(stdout);
|
||||
}
|
||||
|
||||
void ev_pool::add_buffer_event(evutil_socket_t fd, bufferevent_data_cb readcb, bufferevent_data_cb writecb,
|
||||
bufferevent* ev_pool::add_buffer_event(evutil_socket_t fd, bufferevent_data_cb readcb, bufferevent_data_cb writecb,
|
||||
bufferevent_event_cb eventcb, short events, sockaddr_in* addr) {
|
||||
// 调度一个base集合;
|
||||
event_base* base = dispatching();
|
||||
// 创建socket链接监听
|
||||
bufferevent* bev = bufferevent_socket_new(base, fd, events);
|
||||
|
||||
/// 设置水位
|
||||
// 读取缓冲水位
|
||||
bufferevent_setwatermark(bev,
|
||||
EV_READ,
|
||||
1, // 低水位 0 就是无限制 最小取出量
|
||||
10240 // 高水位 0 就是无限制 当前最大容量 直接取出
|
||||
);
|
||||
// 输出缓冲区水位
|
||||
bufferevent_setwatermark(bev,
|
||||
EV_WRITE,
|
||||
39, // 低水位 输出缓冲区数据低于1024 才会继续向缓冲写入数据 写入回调被发送 设置 输出缓冲区的大小 就是?
|
||||
0 // 高水位无效
|
||||
);
|
||||
|
||||
BBCA* bbca = new BBCA();
|
||||
bbca->base = base;
|
||||
bbca->addr = addr;
|
||||
@@ -68,6 +82,8 @@ void ev_pool::add_buffer_event(evutil_socket_t fd, bufferevent_data_cb readcb, b
|
||||
bufferevent_enable(bev, EV_READ | EV_WRITE);
|
||||
m_bevs.insert(std::pair<event_base*, bufferevent*>(base, bev));
|
||||
printf("event_base: %p, fd: %d\n", base, fd);
|
||||
|
||||
return bev;
|
||||
}
|
||||
|
||||
void ev_pool::add_event(evutil_socket_t fd, short events, event_callback_fn callback, void *callback_arg) {
|
||||
|
||||
@@ -23,7 +23,7 @@ public:
|
||||
void add_event_base(const std::function<void()>& function);
|
||||
void add_event_bases(int num);
|
||||
bool polling(bool poll = false);
|
||||
void add_buffer_event(evutil_socket_t fd, bufferevent_data_cb readcb, bufferevent_data_cb writecb,
|
||||
bufferevent* add_buffer_event(evutil_socket_t fd, bufferevent_data_cb readcb, bufferevent_data_cb writecb,
|
||||
bufferevent_event_cb eventcb, short events, sockaddr_in* addr);
|
||||
void add_event(evutil_socket_t fd, short events, event_callback_fn callback, void *callback_arg);
|
||||
event_base* dispatching();
|
||||
|
||||
@@ -13,7 +13,7 @@ void PEVerifCodeController::run(std::shared_ptr<agreement_request> request, std:
|
||||
// 发送验证码至邮
|
||||
bool state = peVerifCodeService.send_email(request->m_body.account(), code);
|
||||
// 设置session 字段
|
||||
g_session->set_session(request->m_bev, "code", code);
|
||||
session->set_session(request->m_bev, "code", code);
|
||||
|
||||
if (state) {
|
||||
sri->set_sri_code(mp::MP_PE_CODE_SUCCESS);
|
||||
|
||||
@@ -15,8 +15,8 @@ public:
|
||||
|
||||
private:
|
||||
PEVerifCodeService peVerifCodeService = PEVerifCodeService();
|
||||
static std::map<bufferevent*, std::string> session; // 客户端 fd 与 code 绑定的 session
|
||||
static std::map<uint64_t, bufferevent*> time_wheel; // 时间轮定时容器 看验证码超时的
|
||||
// static std::map<bufferevent*, std::string> session; // 客户端 fd 与 code 绑定的 session
|
||||
// static std::map<uint64_t, bufferevent*> time_wheel; // 时间轮定时容器 看验证码超时的
|
||||
};
|
||||
|
||||
|
||||
|
||||
@@ -5,6 +5,9 @@
|
||||
#include "UserProveController.h"
|
||||
|
||||
void UserProveController::run(std::shared_ptr<agreement_request> request, std::shared_ptr<agreement_response> response) {
|
||||
printf("客户端版本:%f", request->m_cqi.cqi_version());
|
||||
printf("account:%s", request->m_body.account().c_str());
|
||||
|
||||
// 登陆
|
||||
if (request->m_mph->mp_type() == mp::MP_REQUEST_LOGIN) {
|
||||
auto sri = service.login(request->m_body.subcommand(),
|
||||
@@ -12,22 +15,22 @@ void UserProveController::run(std::shared_ptr<agreement_request> request, std::s
|
||||
response->set(sri, request->m_bev);
|
||||
|
||||
// 登陆的用户 直接在全局 注册 在map中 直接存入服务器内存
|
||||
handler::add_user(sri, request);
|
||||
session->add_user(sri, request);
|
||||
}
|
||||
// 注册
|
||||
else if (request->m_mph->mp_type() == mp::MP_REQUEST_REGISTER) {
|
||||
auto sri = service.register_(request->m_body.subcommand(),
|
||||
request->m_body.email(), request->m_body.password());
|
||||
request->m_body.account(), request->m_body.password());
|
||||
response->set(sri, request->m_bev);
|
||||
}
|
||||
// 退出登陆
|
||||
else if (request->m_mph->mp_type() == mp::MP_REQUEST_LOGOUT) {
|
||||
// 查看当前用户是否在线
|
||||
bool state = handler::is_user(request->m_body.account());
|
||||
bool state = session->is_user(request->m_body.account());
|
||||
|
||||
// current user list used redis 在优化
|
||||
if (state) {
|
||||
handler::remove_user(request);
|
||||
session->remove_user(request);
|
||||
auto sri = service.logout(request->m_body.account(), state);
|
||||
response->set(sri, request->m_bev);
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user