github
NNG 是 nanomsg 的继任版本,而 nanomsg 则是流行的 ZMQ 的 C 重写版。
整体上看,NNG 的 API 很简约,主要是 4 个,open/recv/send/close,open 根据协议不同使用的函数会不同。配置则是 setopt/getopt,与 UNIX API 类似。API 中没有上下文环境(context-less)依赖,只需要一个 nng_socket,
使用 CMAKE 构建项目
nng_recvmsg() does describe NNG_FLAG_NONBLOCK. nng_recv() just calls nng_recvmsg() under the hood.
NNG 支持的通信协议主要有以下几种: PAIR 一对一双向通信。 PIPELINE(PUSH/PULL) 单向通信,类似与生产者消费者模型的消息队列。 PUB/SUB 单向广播。 REQ/REP 请求-应答模式,类似于 RPC 模式。 BUS 网状连接通信,每个加入节点都可以发送/接受广播消息。 SURVEY 用于多节点表决或者服务发现。
NNG 支持的传输模式主要有以下三种常用,其他还有tcp附加tls 1.2加密的tls传输和基于WebSocket的ws传输: inproc 进程内线程间传输 ipc 主机内进程间传输 tcp 网络内主机间传输
通信协议里除了 PAIR 之外,基本都是一对多的通信模式
不同机器进程间: TCP - network transport via TCP WS - websockets over TCP 服务端:”tcp://*:5555” 客户端:”tcp://localhost:5555”
同台机器进程间: IPC - transport between processes on a single machine ipc:///tmp/reqrep.ipc
INPROC - transport within a process (between threads, modules etc.) 同进程的线程,模块间通信: “inproc://rot13”
编译使用 使用 cmake 创建2015工程后,编译出错:win_thread.obj : error LNK2019: unresolved external symbol _InterlockedDecrementRelease64 referenced in function _nni_atomic_dec64_nv
使用release v1.1.1编译通过,master分支不行
同机进程间通信:”ipc:///rep_service”
下面的问题没遇到,直接运行成功。 The libraries you would need to add for windows are:
ws2_32 mswsock advapi32
REP/REQ REQ #define NNG_STATIC_LIB #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #define URL_REPLY1 "ipc://server/rep_server" #ifdef _DEBUG #pragma comment(lib,"nng.lib" ) #else #pragma comment(lib,"nng.lib" ) #endif nng_socket m_sockRep; std::thread m_threadRep; m_threadRep = std::thread (threadReplyProc1,this ); UINT threadReplyProc1 (LPVOID lpParam) { _cprintf("threadReplyProc1 enter....\r\n" ); CnngServerDemoDlg* pwnd = (CnngServerDemoDlg*)lpParam; int rv; if ((rv = nng_rep0_open (&pwnd->m_sockRep)) != 0 ) { ASSERT (FALSE); } if ((rv = nng_listen (pwnd->m_sockRep, URL_REPLY1, NULL , 0 )) != 0 ) { ASSERT (FALSE); } if ((rv = nng_setopt_ms (pwnd->m_sockPair, NNG_OPT_RECVTIMEO, 500 )) != 0 ) { g_logger->error ("thread_python_communicate NNG_OPT_RECVTIMEO failed. ret={},url={}" , rv, URL_PAIR_FIREWORKS); } if ((rv = nng_setopt_ms (pwnd->m_sockPair, NNG_OPT_SENDTIMEO, 1000 )) != 0 ) { g_logger->error ("thread_python_communicate NNG_OPT_RECVTIMEO failed. ret={},url={}" , rv, URL_PAIR_FIREWORKS); } while (FLAG_EXIT_SERVER_REP != (pwnd->m_flag & FLAG_EXIT_SERVER_REP)) { char * buf = NULL ; size_t sz = 0 ; if ((rv = nng_recv (pwnd->m_sockRep, &buf, &sz, NNG_FLAG_ALLOC)) != 0 ) { if (rv == NNG_ECLOSED) { break ; } else { nng_close (pwnd->m_sockRep); ASSERT (FALSE); } } rv = nng_send (pwnd->m_sockRep, buf, sz, 0 ); if (rv != 0 ) { ASSERT (FALSE); } nng_free (buf, sz); } pwnd->m_flag &= (~FLAG_SERVER_REP_RUNNING); _cprintf("threadReplyProc1 exit....\r\n" ); return 0 ; } void CnngServerDemoDlg::OnBnClickedBtnRepServer () { if (FLAG_SERVER_REP_RUNNING != (m_flag& FLAG_SERVER_REP_RUNNING)) { m_flag &= (~FLAG_EXIT_SERVER_REP); m_threadRep = std::thread (threadReplyProc1, this ); m_flag |= FLAG_SERVER_REP_RUNNING; } } void CnngServerDemoDlg::OnBnClickedBtnStopRep () { if (FLAG_SERVER_REP_RUNNING == (m_flag & FLAG_SERVER_REP_RUNNING)) { nng_close (m_sockRep); } } void CnngServerDemoDlg::OnDestroy () { CDialogEx::OnDestroy (); if (FLAG_SERVER_REP_RUNNING == (m_flag & FLAG_SERVER_REP_RUNNING)) { nng_close (m_sockRep); } if (m_threadRep.joinable ()) { m_threadRep.join (); } nng_fini (); _cprintf("OnDestroy exit....\r\n" ); }
pair 正常使用 version 0
version 1 支持额外的拓扑结构 Version 1 of this protocol supports an optional polyamorous mode where a peer can maintain multiple partnerships. Using this mode requires some additional sophistication in the application.
server #define NNG_STATIC_LIB #include <nng/nng.h> #include <nng/protocol/reqrep0/rep.h> #include <nng/protocol/reqrep0/req.h> #include <nng/protocol/pair0/pair.h> #define URL_REPLY1 "ipc://server/rep_server" #define URL_PAIR_SIMULATE "ipc:///pair.ipc/simulate" #ifdef _DEBUG #ifdef _WIN64 #pragma comment(lib,"nngd_x64.lib" ) #else #pragma comment(lib,"nngd.lib" ) #endif #else #ifdef _WIN64 #pragma comment(lib,"nng_x64.lib" ) #else #pragma comment(lib,"nng.lib" ) #endif #endif nng_socket m_sockPair; std::thread m_threadWork; m_threadWork = std::thread (thread_driver_simulator, this ); int thread_driver_simulator (const void * p) { CMRLGaugerDlg* pwnd = (CMRLGaugerDlg*)p; int rv; while (true ) { if ((rv = nng_pair0_open (&pwnd->m_sockPair)) != 0 ) { ASSERT (FALSE); g_logger->error ("thread_driver_simulator open failed. ret={}" , rv); this_thread::sleep_for (chrono::milliseconds (1000 )); continue ; } if ((rv = nng_listen (pwnd->m_sockPair, URL_PAIR_SIMULATE, NULL , 0 )) != 0 ) { ASSERT (FALSE); g_logger->error ("thread_driver_simulator listen failed. ret={},url={}" , rv, URL_PAIR_SIMULATE); this_thread::sleep_for (chrono::milliseconds (1000 )); continue ; } if ((rv = nng_setopt_ms (pwnd->m_sockPair, NNG_OPT_RECVTIMEO, 500 )) != 0 ) { g_logger->error ("thread_python_communicate NNG_OPT_RECVTIMEO failed. ret={},url={}" , rv, URL_PAIR_FIREWORKS); } if ((rv = nng_setopt_ms (pwnd->m_sockPair, NNG_OPT_SENDTIMEO, 1000 )) != 0 ) { g_logger->error ("thread_python_communicate NNG_OPT_RECVTIMEO failed. ret={},url={}" , rv, URL_PAIR_FIREWORKS); } break ; } while (FLAG_THREAD_DRIVER_EXIT != (pwnd->m_flag&FLAG_THREAD_DRIVER_EXIT)) { char * buf = NULL ; size_t sz = 0 ; if ((rv = nng_recv (pwnd->m_sockPair, &buf, &sz, NNG_FLAG_ALLOC)) != 0 ) { g_logger->error ("nng_recv failed. ret={}" ,rv); if (rv == NNG_ECLOSED) { break ; } else { nng_close (pwnd->m_sockPair); ASSERT (FALSE); } } string str = &buf[0 ]; str.resize (sz); pwnd->show_sensor_data (str); #ifdef _DEBUG str += "\r\n" ; _cprintf(str.c_str ()); #endif nng_free (buf, sz); } nng_close (pwnd->m_sockPair); return 0 ; }