Files
QNX/services/mqueue/io_write.c
2025-08-20 19:02:58 +08:00

168 lines
4.7 KiB
C

/*
* $QNXLicenseC:
* Copyright 2007, QNX Software Systems. All Rights Reserved.
*
* You must obtain a written license from and pay applicable license fees to QNX
* Software Systems before you may reproduce, modify or distribute this software,
* or any work that includes all or part of this software. Free development
* licenses are available for evaluation and non-commercial purposes. For more
* information visit http://licensing.qnx.com or email licensing@qnx.com.
*
* This file may contain contributions from others. Please review this entire
* file for other proprietary rights or license notices, as well as the QNX
* Development Suite License Guide at http://licensing.qnx.com/license-guide/
* for other information.
* $
*/
#include "externs.h"
int
io_write(resmgr_context_t *ctp, io_write_t *msg, struct ocb *ocb) {
MQDEV *dev = ocb->ocb.attr;
MQMSG *mp;
MQWAIT *wp;
void *data;
unsigned priority = 0;
unsigned client_prio;
int nonblock, status, nbytes, rcvid, preread;
// Will be NULL if called from io_closeocb with a closemsg.
if(msg != NULL) {
// Is queue open for write?
if((status = iofunc_write_verify(ctp, msg, &ocb->ocb, &nonblock)) != EOK) {
return status;
}
// If an xtype is specified make sure it is an mqueue.
if((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_NONE) {
if((msg->i.xtype & _IO_XTYPE_MASK) != _IO_XTYPE_MQUEUE) {
return ENOSYS;
}
priority = msg->i.xtype >> 16;
if(priority < 0 || priority >= MQ_PRIO_MAX) {
return EINVAL;
}
}
// Is the msg too big for the queue?
if(msg->i.nbytes > dev->mq_attr.mq_msgsize) {
return EMSGSIZE;
}
client_prio = ctp->info.priority;
// If there is not enough room for another msg we must block.
if(dev->mq_attr.mq_curmsgs >= dev->mq_attr.mq_maxmsg) {
if(nonblock & O_NONBLOCK) {
return EAGAIN;
}
if((wp = MemchunkMalloc(memchunk, sizeof(*wp))) == NULL) {
return ENOSPC;
}
wp->rcvid = ctp->rcvid;
wp->scoid = ctp->info.scoid;
wp->coid = ctp->info.coid;
wp->priority = client_prio;
wp->xtype = msg->i.xtype;
LINK_PRI_CLIENT(&dev->waiting_write, wp);
++dev->mq_attr.mq_sendwait;
return _RESMGR_NOREPLY;
}
data = (char *)msg + sizeof(msg->i), nbytes = msg->i.nbytes, preread = ctp->size - sizeof(msg->i);
} else {
data = ocb->closemsg->data, nbytes = preread = ocb->closemsg->nbytes;
}
if((dev->mq_attr.mq_flags & MQ_SEMAPHORE) == 0) {
// Try fast process-to-process flip (without queuing msg)
if((wp = dev->waiting_read) != NULL && (msg == NULL || ctp->size >= sizeof(msg->i) + nbytes)) {
if(nbytes) {
dev->attr.flags |= (IOFUNC_ATTR_CTIME | IOFUNC_ATTR_MTIME | IOFUNC_ATTR_ATIME | IOFUNC_ATTR_DIRTY_TIME);
}
resmgr_endian_context(ctp, _IO_READ, S_IFNAM, wp->xtype);
_IO_SET_READ_NBYTES(ctp, nbytes);
rcvid = ctp->rcvid, ctp->rcvid = wp->rcvid;
if((wp->xtype & _IO_XTYPE_MASK) == _IO_XTYPE_MQUEUE) {
uint32_t prio = priority;
SETIOV(&ctp->iov[0], &prio, sizeof(prio));
SETIOV(&ctp->iov[1], data, nbytes);
resmgr_msgreplyv(ctp, ctp->iov, 2);
} else {
SETIOV(&ctp->iov[0], data, nbytes);
resmgr_msgreplyv(ctp, ctp->iov, 1);
}
ctp->rcvid = rcvid;
dev->waiting_read = wp->next;
MemchunkFree(memchunk, wp);
--dev->mq_attr.mq_recvwait;
return EOK;
}
// Get a msg buffer.
if((mp = MemchunkMalloc(memchunk, MQ_DATAOFF + nbytes)) == NULL) {
return EAGAIN;
}
mp->next = NULL;
mp->priority = priority;
if(mp->nbytes = nbytes) {
dev->attr.flags |= (IOFUNC_ATTR_CTIME | IOFUNC_ATTR_MTIME | IOFUNC_ATTR_DIRTY_TIME);
}
// Save/Get the data into msg buffer
if(msg == NULL || preread >= nbytes) {
memcpy(mp->data, data, nbytes);
} else {
memcpy(&mp->data[0], data, preread);
if(MsgRead(ctp->rcvid, &mp->data[preread], nbytes - preread, ctp->size) != nbytes - preread) {
MemchunkFree(memchunk, mp);
return EIO;
}
}
// Queue the msg
LINK_PRI_MSG(dev->waiting_msg, mp);
}
// Reply with status
if(msg != NULL)
MsgError(ctp->rcvid, EOK);
++dev->mq_attr.mq_curmsgs;
// Keep stat info up-to-date. We overload st_size to be messages waiting.
dev->attr.nbytes = dev->mq_attr.mq_curmsgs;
// Since we added a msg we may need to wake someone waiting for a msg.
if(wp = dev->waiting_read) {
// Unlink and free wait entry
rcvid = wp->rcvid;
dev->waiting_read = wp->next;
MemchunkFree(memchunk, wp);
--dev->mq_attr.mq_recvwait;
// Process the message
resmgr_msg_again(ctp, rcvid);
} else {
// Check for notify conditions
if (IOFUNC_NOTIFY_INPUT_CHECK(dev->notify, dev->mq_attr.mq_curmsgs, dev->mq_attr.mq_curmsgs == 1)) {
iofunc_notify_trigger(dev->notify, dev->mq_attr.mq_curmsgs, IOFUNC_NOTIFY_INPUT);
}
}
return _RESMGR_NOREPLY;
}
__SRCVERSION("io_write.c $Rev: 153052 $");