
Implementing a Lightweight Inter-Thread and Inter-Process Message Queue on Windows

2016-09-28 00:00:00 广州睿丰德信息科技有限公司

Windows has no IPC kernel object comparable to a message queue, which leaves one fewer way to pass messages when doing IPC.

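Windows named objects can be used to build a simple inter-thread message queue. Sockets are deliberately not used here: once sockets are involved you have to manage ports, which is a nuisance, and it also becomes hard to keep the public interface consistent with msgq interfaces such as the one in VxWorks. So in the end the interface was simply defined to be VxWorks-like.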

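I happened to come across a post on Yun Feng's (云风) blog, http://blog.codingnow.com/, about sharing memory between processes on Windows, and decided to rework the queue into a named object that supports both inter-thread and inter-process message queues.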

The interface is currently defined as follows. A downloadable package is available as tinymq-binary-0.1.zip, with a test program in demo-0.1.zip.

/* msgQueue.h - declaration of VxWorks-like message queue */

/*
 * This file has no copyright assigned and is placed in the Public Domain.
 * This file is a part of the virtual operating system package.
 * No warranty is given; refer to the file DISCLAIMER within the package.
 *
 */

/*
modification history
--------------------
01a,11nov11,sgu  created
*/

/*
DESCRIPTION
This module implements the functions for a message queue. The message queue
manipulation functions in this file are similar to the Wind River VxWorks
kernel message queue interfaces.

The memory architecture for a message queue:
----------------   -----------------------------------------------------------
| local memory |-->|                     shared memory                       |
----------------   -----------------------------------------------------------
       ^                                     ^
       |                                     |
----------------   -----------------------------------------------------------
|    MSG_Q     |   | MSG_SM | MSG_NODE list |       message queue data       |
----------------   -----------------------------------------------------------
                                    ^                         ^
                                    |                         |
             ---------------------------------------------    |
             | MSG_NODE1 | MSG_NODE2 | ... | MSG_NODE(N) |    |
             ---------------------------------------------    |
                                                              |
                                              ---------------------------------
                                              | data1 | data2 | ... | data(N) |
                                              ---------------------------------

Each message queue in memory is divided into two parts, local memory and
shared memory, and the two parts are not contiguous. The local memory can be
accessed only within one process, by any thread of that process.
The shared memory can be accessed between threads in a process if the message
queue name is NULL, or between processes if the message queue name is not
NULL. There is one data structure, MSG_Q, in local memory, and three data
structures (MSG_SM, the MSG_NODE list and the message queue data) in shared
memory. MSG_Q holds the kernel object handles and the shared memory address;
MSG_SM holds the message queue attributes; the MSG_NODE list holds all the
nodes for the message queue, and each node holds the attributes of one
message; the message queue data area holds the data for all the messages.
All the structures are defined below.

If you run into any problem with this module, please feel free to contact
me via e-mail: gushengyuan2002@163.com
*/

#ifndef _MSG_QUEUE_H_
#define _MSG_QUEUE_H_

/* defines */

/* timeout value meaning "wait forever" */
#define WAIT_FOREVER    (-1)

/* version string length */
#define VERSION_LEN     8

/* create an inter-thread message queue */
#define msgQCreate(maxMsgs, maxMsgLength, options) \
        msgQCreateEx(maxMsgs, maxMsgLength, options, NULL)

/* typedefs */

typedef unsigned int UINT;
typedef void* MSG_Q_ID;    /* message queue identifier */

/* message queue options for task waiting for a message */
enum MSG_Q_OPTION{
    MSG_Q_FIFO     = 0x0000,
    MSG_Q_PRIORITY = 0x0001
};

/* priority options for sending a message */
enum MSG_Q_PRIORITY{
    MSG_PRI_NORMAL = 0x0000, /* put the message at the end of the queue */
    MSG_PRI_URGENT = 0x0001  /* put the message at the front of the queue */
};

/* message queue status */
typedef struct tagMSG_Q_STAT {
    char version[VERSION_LEN];  /* library version */
    int maxMsgs;                /* max messages that can be queued */
    UINT maxMsgLength;          /* max bytes in a message */
    int options;                /* message queue options */
    int msgNum;                 /* number of messages in the queue */
    int sendTimes;              /* number of messages sent */
    int recvTimes;              /* number of messages received */
}MSG_Q_STAT;

#ifdef __cplusplus
extern "C"
{
#endif

/* declarations */

/*******************************************************************************
 * msgQCreateEx - create a message queue, queue pended tasks in FIFO order
 *
 * Create a message queue; pended tasks are queued in FIFO order.
 * <name> is the message queue name. If <name> is NULL, an inter-thread
 * message queue is created; otherwise an inter-process message queue is
 * created.
 *
 * RETURNS: MSG_Q_ID when success or NULL otherwise.
 */
MSG_Q_ID msgQCreateEx
    (
    int maxMsgs,     /* max messages that can be queued */
    int maxMsgLength,/* max bytes in a message */
    int options,     /* message queue options, ignored on Windows platform */
    const char *name /* message name */
    );

/*******************************************************************************
 * msgQOpen - open a message queue
 *
 * open a message queue.
 *
 * RETURNS: MSG_Q_ID when success or NULL otherwise.
 */
MSG_Q_ID msgQOpen
    (
    const char * name   /* message name */
    );

/*******************************************************************************
 * msgQDelete - delete a message queue
 *
 * delete a message queue.
 *
 * RETURNS: 0 when success or -1 otherwise.
 */
int msgQDelete
    (
    MSG_Q_ID msgQId /* message queue to delete */
    );

/*******************************************************************************
 * msgQReceive - receive a message from a message queue
 *
 * receive a message from a message queue.
 *
 * RETURNS: 0 when success or -1 otherwise.
 */
int msgQReceive
    (
    MSG_Q_ID msgQId,  /* message queue from which to receive */
    char * buffer,    /* buffer to receive message */
    UINT maxNBytes,   /* length of buffer */
    int timeout       /* ticks to wait */
    );

/*******************************************************************************
 * msgQSend - send a message to a message queue
 *
 * send a message to a message queue.
 *
 * RETURNS: 0 when success or -1 otherwise.
 */
int msgQSend
    (
    MSG_Q_ID msgQId, /* message queue on which to send */
    char * buffer,   /* message to send */
    UINT nBytes,     /* length of message */
    int timeout,     /* ticks to wait */
    int priority     /* MSG_PRI_NORMAL or MSG_PRI_URGENT */
    );

/*******************************************************************************
 * msgQStat - get the status of message queue
 *
 * get the detail status of a message queue.
 *
 * RETURNS: 0 when success or -1 otherwise.
 */
int msgQStat
    (
    MSG_Q_ID msgQId,
    MSG_Q_STAT * msgQStatus
    );

/*******************************************************************************
 * msgQShow - show the status of message queue
 *
 * show the detail status of a message queue.
 *
 * RETURNS: 0 when success or -1 otherwise.
 */
int msgQShow
    (
    MSG_Q_ID msgQId
    );

#ifdef __cplusplus
}
#endif

#endif
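
The DESCRIPTION comment in the header says the shared region holds MSG_SM, then the MSG_NODE list, then the message data. The header does not show how those structures are defined, so the following is only a minimal sketch of how such a region could be sized and indexed. DEMO_MSG_SM, DEMO_MSG_NODE, their fields, and the demo* functions are all hypothetical names made up for illustration; they are not part of the library.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for MSG_SM: queue attributes at the region start. */
typedef struct {
    int maxMsgs;
    unsigned int maxMsgLength;
    int msgNum;
} DEMO_MSG_SM;

/* Hypothetical stand-in for MSG_NODE: per-message attributes. */
typedef struct {
    unsigned int length;  /* bytes used in this message's data slot */
    int next;             /* index of the next node, -1 at the end */
} DEMO_MSG_NODE;

/* Bytes needed for one region: header + node table + fixed-size data slots. */
size_t demoRegionSize(int maxMsgs, unsigned int maxMsgLength)
{
    assert(maxMsgs > 0);
    return sizeof(DEMO_MSG_SM)
         + (size_t)maxMsgs * sizeof(DEMO_MSG_NODE)
         + (size_t)maxMsgs * maxMsgLength;
}

/* Node i sits directly after the header. */
DEMO_MSG_NODE *demoNodeAt(void *base, int i)
{
    assert(base != NULL && i >= 0);
    return (DEMO_MSG_NODE *)((char *)base + sizeof(DEMO_MSG_SM)) + i;
}

/* Data slot i sits after the whole node table. */
char *demoDataAt(void *base, int maxMsgs, unsigned int maxMsgLength, int i)
{
    assert(base != NULL && i >= 0 && i < maxMsgs);
    return (char *)base + sizeof(DEMO_MSG_SM)
         + (size_t)maxMsgs * sizeof(DEMO_MSG_NODE)
         + (size_t)i * maxMsgLength;
}
```

Working with offsets from the base address rather than stored pointers matters for the inter-process case, because each process may map the same shared region at a different address.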

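
The header comments describe MSG_PRI_NORMAL as placing a message at the end of the queue and MSG_PRI_URGENT as placing it at the front. The sketch below illustrates that ordering with a small fixed-size circular buffer. It is not the library's implementation: DEMO_QUEUE and the demo* functions are hypothetical, nothing blocks or times out, and there is no locking, so it is only safe from a single thread.

```c
#include <assert.h>
#include <string.h>

#define DEMO_MAX_MSGS 4
#define DEMO_MAX_LEN  16

enum { DEMO_PRI_NORMAL = 0, DEMO_PRI_URGENT = 1 };

/* Hypothetical queue: fixed-size slots used as a circular buffer. */
typedef struct {
    char data[DEMO_MAX_MSGS][DEMO_MAX_LEN];
    unsigned int len[DEMO_MAX_MSGS];
    int head;   /* slot index of the front message */
    int count;  /* number of queued messages */
} DEMO_QUEUE;

void demoInit(DEMO_QUEUE *q)
{
    assert(q != NULL);
    q->head = 0;
    q->count = 0;
}

/* Returns 0 on success, -1 if the queue is full or the message is too long. */
int demoSend(DEMO_QUEUE *q, const char *buf, unsigned int nBytes, int priority)
{
    int slot;
    assert(q != NULL && buf != NULL);
    if (q->count == DEMO_MAX_MSGS || nBytes > DEMO_MAX_LEN)
        return -1;
    if (priority == DEMO_PRI_URGENT) {
        /* Urgent: step the head back one slot and store at the front. */
        q->head = (q->head + DEMO_MAX_MSGS - 1) % DEMO_MAX_MSGS;
        slot = q->head;
    } else {
        /* Normal: store in the first free slot after the last message. */
        slot = (q->head + q->count) % DEMO_MAX_MSGS;
    }
    memcpy(q->data[slot], buf, nBytes);
    q->len[slot] = nBytes;
    q->count++;
    return 0;
}

/* Returns 0 on success, -1 if the queue is empty. Copies at most maxNBytes. */
int demoReceive(DEMO_QUEUE *q, char *buffer, unsigned int maxNBytes)
{
    unsigned int n;
    assert(q != NULL && buffer != NULL);
    if (q->count == 0)
        return -1;
    n = q->len[q->head];
    if (n > maxNBytes)
        n = maxNBytes;  /* truncate to the caller's buffer */
    memcpy(buffer, q->data[q->head], n);
    q->head = (q->head + 1) % DEMO_MAX_MSGS;
    q->count--;
    return 0;
}
```

With this sketch, sending two normal messages followed by one urgent message makes the urgent one the first to be received, and the two normal messages then come out in the order they were sent.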