nng-study

github

NNG 是 nanomsg 的继任版本,而 nanomsg 则是流行的 ZMQ 的 C 重写版。

整体上看,NNG 的 API 很简约,主要有 4 个:open/recv/send/close,其中 open 会根据协议不同而使用不同的函数。配置则通过 setopt/getopt 完成,与 UNIX API 类似。API 不依赖上下文环境(context-less),只需要一个 nng_socket 即可。

使用 CMAKE 构建项目

The nng_recvmsg() documentation does describe NNG_FLAG_NONBLOCK; nng_recv() just calls nng_recvmsg() under the hood.

NNG 支持的通信协议主要有以下几种:

PAIR 一对一双向通信。
PIPELINE(PUSH/PULL) 单向通信,类似于生产者消费者模型的消息队列。
PUB/SUB 单向广播。
REQ/REP 请求-应答模式,类似于 RPC 模式。
BUS 网状连接通信,每个加入节点都可以发送/接收广播消息。
SURVEY 用于多节点表决或者服务发现。

NNG 常用的传输模式主要有以下三种,此外还有在 tcp 之上附加 TLS 1.2 加密的 tls 传输和基于 WebSocket 的 ws 传输:

inproc 进程内线程间传输
ipc 主机内进程间传输
tcp 网络内主机间传输

通信协议里除了 PAIR 之外,基本都是一对多的通信模式

不同机器进程间:
TCP - network transport via TCP
WS - websockets over TCP
服务端:"tcp://*:5555"
客户端:"tcp://localhost:5555"

同台机器进程间:
IPC - transport between processes on a single machine
ipc:///tmp/reqrep.ipc

INPROC - transport within a process (between threads, modules etc.)
同进程的线程,模块间通信:
"inproc://rot13"

编译使用

使用 cmake 生成 VS2015 工程后,编译出错:win_thread.obj : error LNK2019: unresolved external symbol _InterlockedDecrementRelease64 referenced in function _nni_atomic_dec64_nv

使用release v1.1.1编译通过,master分支不行

同机进程间通信:"ipc:///rep_service"

下面的问题没遇到,直接运行成功。
The libraries you would need to add for windows are:

ws2_32
mswsock
advapi32

REP/REQ

REP

#define NNG_STATIC_LIB
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#define URL_REPLY1 "ipc://server/rep_server"
#ifdef _DEBUG
#pragma comment(lib,"nng.lib")
#else
#pragma comment(lib,"nng.lib")
#endif

// REP socket handle; it is opened by threadReplyProc1 via nng_rep0_open().
nng_socket m_sockRep;
// Thread object that runs threadReplyProc1.
std::thread m_threadRep;

// Start the worker, passing this object as the thread argument.
m_threadRep = std::thread(threadReplyProc1,this);

/**
 * Worker thread for the REP side of the REQ/REP demo: an echo server.
 *
 * Opens a REP socket into pwnd->m_sockRep, listens on URL_REPLY1 and then
 * sends every received request back to the requester unchanged. The loop
 * ends when FLAG_EXIT_SERVER_REP is set in pwnd->m_flag, when the socket is
 * closed from another thread (nng_recv / nng_send report NNG_ECLOSED), or on
 * any other unexpected error.
 *
 * @param lpParam  Pointer to the owning CnngServerDemoDlg.
 * @return Always 0. FLAG_SERVER_REP_RUNNING is cleared in pwnd->m_flag
 *         before returning, on every path.
 */
UINT threadReplyProc1(LPVOID lpParam)
{
    _cprintf("threadReplyProc1 enter....\r\n");

    CnngServerDemoDlg* pwnd = (CnngServerDemoDlg*)lpParam;
    int rv;
    if ((rv = nng_rep0_open(&pwnd->m_sockRep)) != 0) {
        // Nothing to serve without a socket; ASSERT is a no-op in release
        // builds, so bail out explicitly instead of using an invalid handle.
        ASSERT(FALSE);
        pwnd->m_flag &= (~FLAG_SERVER_REP_RUNNING);
        _cprintf("threadReplyProc1 exit....\r\n");
        return 0;
    }
    if ((rv = nng_listen(pwnd->m_sockRep, URL_REPLY1, NULL, 0)) != 0) {
        ASSERT(FALSE);
        nng_close(pwnd->m_sockRep);
        pwnd->m_flag &= (~FLAG_SERVER_REP_RUNNING);
        _cprintf("threadReplyProc1 exit....\r\n");
        return 0;
    }
    // The timeouts belong on the REP socket this thread serves. The receive
    // timeout lets the loop below re-check the exit flag periodically.
    if ((rv = nng_setopt_ms(pwnd->m_sockRep, NNG_OPT_RECVTIMEO, 500)) != 0) {
        g_logger->error("threadReplyProc1 NNG_OPT_RECVTIMEO failed. ret={},url={}", rv, URL_REPLY1);
    }
    if ((rv = nng_setopt_ms(pwnd->m_sockRep, NNG_OPT_SENDTIMEO, 1000)) != 0) {
        g_logger->error("threadReplyProc1 NNG_OPT_SENDTIMEO failed. ret={},url={}", rv, URL_REPLY1);
    }

    while (FLAG_EXIT_SERVER_REP != (pwnd->m_flag & FLAG_EXIT_SERVER_REP))
    {
        char* buf = NULL;
        size_t sz = 0;
        rv = nng_recv(pwnd->m_sockRep, &buf, &sz, NNG_FLAG_ALLOC);
        if (rv == NNG_ETIMEDOUT)
        {
            // No request within the timeout: not an error, just poll again.
            continue;
        }
        if (rv != 0)
        {
            // NNG_ECLOSED is the normal shutdown signal (socket closed by the
            // UI thread). Anything else is unexpected. Either way no buffer
            // was received, so the send below must not be reached.
            if (rv != NNG_ECLOSED)
            {
                ASSERT(FALSE);
            }
            break;
        }

        // With flags == 0 nng_send copies the data and does NOT take
        // ownership of buf, so the buffer is released here in all cases.
        rv = nng_send(pwnd->m_sockRep, buf, sz, 0);
        nng_free(buf, sz);
        if (rv == NNG_ECLOSED)
        {
            break;
        }
        if (rv != 0)
        {
            ASSERT(FALSE);
            //fatal("nng_send", rv);
        }
    }

    // Closing an already-closed socket only yields NNG_ECLOSED, which is
    // deliberately ignored; this covers the exit-flag and error paths.
    nng_close(pwnd->m_sockRep);

    pwnd->m_flag &= (~FLAG_SERVER_REP_RUNNING);

    _cprintf("threadReplyProc1 exit....\r\n");
    return 0;
}

/**
 * Button handler: starts the REP worker thread unless it is already flagged
 * as running.
 */
void CnngServerDemoDlg::OnBnClickedBtnRepServer()
{
    if (FLAG_SERVER_REP_RUNNING == (m_flag & FLAG_SERVER_REP_RUNNING))
    {
        return;
    }

    // A worker from an earlier start/stop cycle has finished but is still
    // joinable. Move-assigning a new std::thread over a joinable one calls
    // std::terminate(), so reap it first. The worker clears the RUNNING flag
    // right before returning, so this join does not block for long.
    if (m_threadRep.joinable())
    {
        m_threadRep.join();
    }

    m_flag &= (~FLAG_EXIT_SERVER_REP);
    // Mark as running BEFORE launching: the worker clears this flag when it
    // exits, and setting it afterwards could overwrite that and leave the
    // dialog believing a dead worker is still running.
    m_flag |= FLAG_SERVER_REP_RUNNING;
    m_threadRep = std::thread(threadReplyProc1, this);
}

/**
 * Button handler: stops the REP worker by closing its socket.
 *
 * Does nothing unless FLAG_SERVER_REP_RUNNING is set in m_flag. The worker
 * loop leaves when nng_recv reports NNG_ECLOSED for the closed socket.
 */
void CnngServerDemoDlg::OnBnClickedBtnStopRep()
{
    const bool repRunning = (m_flag & FLAG_SERVER_REP_RUNNING) == FLAG_SERVER_REP_RUNNING;
    if (!repRunning)
    {
        return;
    }
    nng_close(m_sockRep);
}

/**
 * Dialog teardown: runs the base-class handler, closes the REP socket if the
 * worker is flagged as running, waits for the worker thread to finish, and
 * finally calls nng_fini().
 */
void CnngServerDemoDlg::OnDestroy()
{
    CDialogEx::OnDestroy();

    // Close the socket first so the worker is not left waiting in nng_recv
    // when it gets joined below.
    const bool repRunning = (m_flag & FLAG_SERVER_REP_RUNNING) == FLAG_SERVER_REP_RUNNING;
    if (repRunning)
    {
        nng_close(m_sockRep);
    }

    // Join only a thread that was actually started.
    const bool mustJoin = m_threadRep.joinable();
    if (mustJoin)
    {
        m_threadRep.join();
    }

    nng_fini();
    _cprintf("OnDestroy exit....\r\n");
}

pair

正常使用 version 0

version 1 支持额外的拓扑结构
Version 1 of this protocol supports an optional polyamorous mode where a peer can maintain multiple partnerships. Using this mode requires some additional sophistication in the application.

server

#define NNG_STATIC_LIB
#include <nng/nng.h>
#include <nng/protocol/reqrep0/rep.h>
#include <nng/protocol/reqrep0/req.h>
#include <nng/protocol/pair0/pair.h>
#define URL_REPLY1 "ipc://server/rep_server"
#define URL_PAIR_SIMULATE "ipc:///pair.ipc/simulate"

#ifdef _DEBUG
#ifdef _WIN64
#pragma comment(lib,"nngd_x64.lib")
#else
#pragma comment(lib,"nngd.lib")
#endif
#else
#ifdef _WIN64
#pragma comment(lib,"nng_x64.lib")
#else
#pragma comment(lib,"nng.lib")
#endif
#endif

// Inter-process communication section -- drivers communicate
// PAIR socket handle; it is opened by thread_driver_simulator via nng_pair0_open().
nng_socket m_sockPair;
// Thread object that runs thread_driver_simulator.
std::thread m_threadWork;

// Start the worker, passing this object as the thread argument.
m_threadWork = std::thread(thread_driver_simulator, this);

int thread_driver_simulator(const void* p)
{
CMRLGaugerDlg* pwnd = (CMRLGaugerDlg*)p;

int rv;
while (true)
{
if ((rv = nng_pair0_open(&pwnd->m_sockPair)) != 0) {
ASSERT(FALSE);
g_logger->error("thread_driver_simulator open failed. ret={}", rv);
this_thread::sleep_for(chrono::milliseconds(1000));
continue;
}
if ((rv = nng_listen(pwnd->m_sockPair, URL_PAIR_SIMULATE, NULL, 0)) != 0) {
ASSERT(FALSE);
g_logger->error("thread_driver_simulator listen failed. ret={},url={}", rv, URL_PAIR_SIMULATE);
this_thread::sleep_for(chrono::milliseconds(1000));
continue;
}
if ((rv = nng_setopt_ms(pwnd->m_sockPair, NNG_OPT_RECVTIMEO, 500)) != 0) {
g_logger->error("thread_python_communicate NNG_OPT_RECVTIMEO failed. ret={},url={}", rv, URL_PAIR_FIREWORKS);
}
if ((rv = nng_setopt_ms(pwnd->m_sockPair, NNG_OPT_SENDTIMEO, 1000)) != 0) {
g_logger->error("thread_python_communicate NNG_OPT_RECVTIMEO failed. ret={},url={}", rv, URL_PAIR_FIREWORKS);
}
break;
}

while (FLAG_THREAD_DRIVER_EXIT != (pwnd->m_flag&FLAG_THREAD_DRIVER_EXIT))
{
char* buf = NULL;
size_t sz = 0;
if ((rv = nng_recv(pwnd->m_sockPair, &buf, &sz, NNG_FLAG_ALLOC)) != 0)
{
g_logger->error("nng_recv failed. ret={}",rv);
if (rv == NNG_ECLOSED)
{
//ASSERT(FALSE);
break;
}
else
{
nng_close(pwnd->m_sockPair);
ASSERT(FALSE);
}
}
string str = &buf[0];
str.resize(sz);
// 78.5046,-95.3009,-46.9571,52.6307,-64.0416,-9.2666,6.4361,-59.7699
pwnd->show_sensor_data(str);
#ifdef _DEBUG
str += "\r\n";
_cprintf(str.c_str());
#endif
nng_free(buf, sz);
}
nng_close(pwnd->m_sockPair);
return 0;
}