project_files/frontlib/ipc/ipcbase.c
changeset 10017 de822cd3df3a
parent 8071 7d6aaba59505
equal deleted inserted replaced
10015:4feced261c68 10017:de822cd3df3a
    33  * bitmap, 1 for the number of hogs which fit on the map).
    33  * bitmap, 1 for the number of hogs which fit on the map).
    34  *
    34  *
    35  * We don't need to worry about wasting a few kb though, and I like powers of two...
    35  * We don't need to worry about wasting a few kb though, and I like powers of two...
    36  */
    36  */
    37 struct _flib_ipcbase {
    37 struct _flib_ipcbase {
    38 	uint8_t readBuffer[8192];
    38     uint8_t readBuffer[8192];
    39 	int readBufferSize;
    39     int readBufferSize;
    40 
    40 
    41 	flib_acceptor *acceptor;
    41     flib_acceptor *acceptor;
    42 	uint16_t port;
    42     uint16_t port;
    43 
    43 
    44 	flib_tcpsocket *sock;
    44     flib_tcpsocket *sock;
    45 };
    45 };
    46 
    46 
    47 flib_ipcbase *flib_ipcbase_create() {
    47 flib_ipcbase *flib_ipcbase_create() {
    48 	flib_ipcbase *result = flib_calloc(1, sizeof(flib_ipcbase));
    48     flib_ipcbase *result = flib_calloc(1, sizeof(flib_ipcbase));
    49 	flib_acceptor *acceptor = flib_acceptor_create(0);
    49     flib_acceptor *acceptor = flib_acceptor_create(0);
    50 
    50 
    51 	if(!result || !acceptor) {
    51     if(!result || !acceptor) {
    52 		free(result);
    52         free(result);
    53 		flib_acceptor_close(acceptor);
    53         flib_acceptor_close(acceptor);
    54 		return NULL;
    54         return NULL;
    55 	}
    55     }
    56 
    56 
    57 	result->acceptor = acceptor;
    57     result->acceptor = acceptor;
    58 	result->sock = NULL;
    58     result->sock = NULL;
    59 	result->readBufferSize = 0;
    59     result->readBufferSize = 0;
    60 	result->port = flib_acceptor_listenport(acceptor);
    60     result->port = flib_acceptor_listenport(acceptor);
    61 
    61 
    62 	flib_log_i("Started listening for IPC connections on port %u", (unsigned)result->port);
    62     flib_log_i("Started listening for IPC connections on port %u", (unsigned)result->port);
    63 	return result;
    63     return result;
    64 }
    64 }
    65 
    65 
    66 uint16_t flib_ipcbase_port(flib_ipcbase *ipc) {
    66 uint16_t flib_ipcbase_port(flib_ipcbase *ipc) {
    67 	if(log_badargs_if(ipc==NULL)) {
    67     if(log_badargs_if(ipc==NULL)) {
    68 		return 0;
    68         return 0;
    69 	}
    69     }
    70 	return ipc->port;
    70     return ipc->port;
    71 }
    71 }
    72 
    72 
    73 void flib_ipcbase_destroy(flib_ipcbase *ipc) {
    73 void flib_ipcbase_destroy(flib_ipcbase *ipc) {
    74 	if(ipc) {
    74     if(ipc) {
    75 		flib_acceptor_close(ipc->acceptor);
    75         flib_acceptor_close(ipc->acceptor);
    76 		flib_socket_close(ipc->sock);
    76         flib_socket_close(ipc->sock);
    77 		if(ipc->sock) {
    77         if(ipc->sock) {
    78 			flib_log_d("IPC connection closed.");
    78             flib_log_d("IPC connection closed.");
    79 		}
    79         }
    80 		free(ipc);
    80         free(ipc);
    81 	}
    81     }
    82 }
    82 }
    83 
    83 
    84 IpcState flib_ipcbase_state(flib_ipcbase *ipc) {
    84 IpcState flib_ipcbase_state(flib_ipcbase *ipc) {
    85 	if(log_badargs_if(ipc==NULL)) {
    85     if(log_badargs_if(ipc==NULL)) {
    86 		return IPC_NOT_CONNECTED;
    86         return IPC_NOT_CONNECTED;
    87 	} else if(ipc->sock) {
    87     } else if(ipc->sock) {
    88 		return IPC_CONNECTED;
    88         return IPC_CONNECTED;
    89 	} else if(ipc->acceptor) {
    89     } else if(ipc->acceptor) {
    90 		return IPC_LISTENING;
    90         return IPC_LISTENING;
    91 	} else {
    91     } else {
    92 		return IPC_NOT_CONNECTED;
    92         return IPC_NOT_CONNECTED;
    93 	}
    93     }
    94 }
    94 }
    95 
    95 
    96 static void receiveToBuffer(flib_ipcbase *ipc) {
    96 static void receiveToBuffer(flib_ipcbase *ipc) {
    97 	if(ipc->sock) {
    97     if(ipc->sock) {
    98 		int size = flib_socket_nbrecv(ipc->sock, ipc->readBuffer+ipc->readBufferSize, sizeof(ipc->readBuffer)-ipc->readBufferSize);
    98         int size = flib_socket_nbrecv(ipc->sock, ipc->readBuffer+ipc->readBufferSize, sizeof(ipc->readBuffer)-ipc->readBufferSize);
    99 		if(size>=0) {
    99         if(size>=0) {
   100 			ipc->readBufferSize += size;
   100             ipc->readBufferSize += size;
   101 		} else {
   101         } else {
   102 			flib_log_d("IPC connection lost.");
   102             flib_log_d("IPC connection lost.");
   103 			flib_socket_close(ipc->sock);
   103             flib_socket_close(ipc->sock);
   104 			ipc->sock = NULL;
   104             ipc->sock = NULL;
   105 		}
   105         }
   106 	}
   106     }
   107 }
   107 }
   108 
   108 
   109 static bool isMessageReady(flib_ipcbase *ipc) {
   109 static bool isMessageReady(flib_ipcbase *ipc) {
   110 	return ipc->readBufferSize >= ipc->readBuffer[0]+1;
   110     return ipc->readBufferSize >= ipc->readBuffer[0]+1;
   111 }
   111 }
   112 
   112 
   113 static void logSentMsg(const uint8_t *data, size_t len) {
   113 static void logSentMsg(const uint8_t *data, size_t len) {
   114 	if(flib_log_isActive(FLIB_LOGLEVEL_DEBUG)) {
   114     if(flib_log_isActive(FLIB_LOGLEVEL_DEBUG)) {
   115 		size_t msgStart = 0;
   115         size_t msgStart = 0;
   116 		while(msgStart < len) {
   116         while(msgStart < len) {
   117 			uint8_t msglen = data[msgStart];
   117             uint8_t msglen = data[msgStart];
   118 			if(msgStart+msglen < len) {
   118             if(msgStart+msglen < len) {
   119 				flib_log_d("[IPC OUT][%03u]%*.*s",(unsigned)msglen, (unsigned)msglen, (unsigned)msglen, data+msgStart+1);
   119                 flib_log_d("[IPC OUT][%03u]%*.*s",(unsigned)msglen, (unsigned)msglen, (unsigned)msglen, data+msgStart+1);
   120 			} else {
   120             } else {
   121 				uint8_t msglen2 = len-msgStart-1;
   121                 uint8_t msglen2 = len-msgStart-1;
   122 				flib_log_d("[IPC OUT][%03u/%03u]%*.*s",(unsigned)msglen2, (unsigned)msglen, (unsigned)msglen2, (unsigned)msglen2, data+msgStart+1);
   122                 flib_log_d("[IPC OUT][%03u/%03u]%*.*s",(unsigned)msglen2, (unsigned)msglen, (unsigned)msglen2, (unsigned)msglen2, data+msgStart+1);
   123 			}
   123             }
   124 			msgStart += (uint8_t)data[msgStart]+1;
   124             msgStart += (uint8_t)data[msgStart]+1;
   125 		}
   125         }
   126 	}
   126     }
   127 }
   127 }
   128 
   128 
   129 static void logRecvMsg(const uint8_t *data) {
   129 static void logRecvMsg(const uint8_t *data) {
   130 	if(flib_log_isActive(FLIB_LOGLEVEL_DEBUG)) {
   130     if(flib_log_isActive(FLIB_LOGLEVEL_DEBUG)) {
   131 		uint8_t msglen = data[0];
   131         uint8_t msglen = data[0];
   132 		flib_log_d("[IPC IN][%03u]%*.*s",(unsigned)msglen, (unsigned)msglen, (unsigned)msglen, data+1);
   132         flib_log_d("[IPC IN][%03u]%*.*s",(unsigned)msglen, (unsigned)msglen, (unsigned)msglen, data+1);
   133 	}
   133     }
   134 }
   134 }
   135 
   135 
   136 static void popFromReadBuffer(flib_ipcbase *ipc, uint8_t *outbuf, size_t size) {
   136 static void popFromReadBuffer(flib_ipcbase *ipc, uint8_t *outbuf, size_t size) {
   137 	memcpy(outbuf, ipc->readBuffer, size);
   137     memcpy(outbuf, ipc->readBuffer, size);
   138 	memmove(ipc->readBuffer, ipc->readBuffer+size, ipc->readBufferSize-size);
   138     memmove(ipc->readBuffer, ipc->readBuffer+size, ipc->readBufferSize-size);
   139 	ipc->readBufferSize -= size;
   139     ipc->readBufferSize -= size;
   140 }
   140 }
   141 
   141 
   142 int flib_ipcbase_recv_message(flib_ipcbase *ipc, void *data) {
   142 int flib_ipcbase_recv_message(flib_ipcbase *ipc, void *data) {
   143 	if(log_badargs_if2(ipc==NULL, data==NULL)) {
   143     if(log_badargs_if2(ipc==NULL, data==NULL)) {
   144 		return -1;
   144         return -1;
   145 	}
   145     }
   146 
   146 
   147 	if(!isMessageReady(ipc)) {
   147     if(!isMessageReady(ipc)) {
   148 		receiveToBuffer(ipc);
   148         receiveToBuffer(ipc);
   149 	}
   149     }
   150 
   150 
   151 	if(isMessageReady(ipc)) {
   151     if(isMessageReady(ipc)) {
   152 		int msgsize = ipc->readBuffer[0]+1;
   152         int msgsize = ipc->readBuffer[0]+1;
   153 		popFromReadBuffer(ipc, data, msgsize);
   153         popFromReadBuffer(ipc, data, msgsize);
   154 		logRecvMsg(data);
   154         logRecvMsg(data);
   155 		return msgsize;
   155         return msgsize;
   156 	} else if(!ipc->sock && ipc->readBufferSize>0) {
   156     } else if(!ipc->sock && ipc->readBufferSize>0) {
   157 		flib_log_w("Last message from engine data stream is incomplete (received %u of %u bytes)", (unsigned)ipc->readBufferSize, (unsigned)(ipc->readBuffer[0])+1);
   157         flib_log_w("Last message from engine data stream is incomplete (received %u of %u bytes)", (unsigned)ipc->readBufferSize, (unsigned)(ipc->readBuffer[0])+1);
   158 		ipc->readBufferSize = 0;
   158         ipc->readBufferSize = 0;
   159 		return -1;
   159         return -1;
   160 	} else {
   160     } else {
   161 		return -1;
   161         return -1;
   162 	}
   162     }
   163 }
   163 }
   164 
   164 
   165 int flib_ipcbase_recv_map(flib_ipcbase *ipc, void *data) {
   165 int flib_ipcbase_recv_map(flib_ipcbase *ipc, void *data) {
   166 	if(log_badargs_if2(ipc==NULL, data==NULL)) {
   166     if(log_badargs_if2(ipc==NULL, data==NULL)) {
   167 		return -1;
   167         return -1;
   168 	}
   168     }
   169 
   169 
   170 	receiveToBuffer(ipc);
   170     receiveToBuffer(ipc);
   171 
   171 
   172 	if(ipc->readBufferSize >= IPCBASE_MAPMSG_BYTES) {
   172     if(ipc->readBufferSize >= IPCBASE_MAPMSG_BYTES) {
   173 		popFromReadBuffer(ipc, data, IPCBASE_MAPMSG_BYTES);
   173         popFromReadBuffer(ipc, data, IPCBASE_MAPMSG_BYTES);
   174 		return IPCBASE_MAPMSG_BYTES;
   174         return IPCBASE_MAPMSG_BYTES;
   175 	} else {
   175     } else {
   176 		return -1;
   176         return -1;
   177 	}
   177     }
   178 }
   178 }
   179 
   179 
   180 int flib_ipcbase_send_raw(flib_ipcbase *ipc, const void *data, size_t len) {
   180 int flib_ipcbase_send_raw(flib_ipcbase *ipc, const void *data, size_t len) {
   181 	if(log_badargs_if2(ipc==NULL, data==NULL && len>0)
   181     if(log_badargs_if2(ipc==NULL, data==NULL && len>0)
   182 			|| log_w_if(!ipc->sock, "flib_ipcbase_send_raw: Not connected.")) {
   182             || log_w_if(!ipc->sock, "flib_ipcbase_send_raw: Not connected.")) {
   183 		return -1;
   183         return -1;
   184 	}
   184     }
   185 	if(flib_socket_send(ipc->sock, data, len) == len) {
   185     if(flib_socket_send(ipc->sock, data, len) == len) {
   186 		logSentMsg(data, len);
   186         logSentMsg(data, len);
   187 		return 0;
   187         return 0;
   188 	} else {
   188     } else {
   189 		flib_log_w("Failed or incomplete IPC write: engine connection lost.");
   189         flib_log_w("Failed or incomplete IPC write: engine connection lost.");
   190 		flib_socket_close(ipc->sock);
   190         flib_socket_close(ipc->sock);
   191 		ipc->sock = NULL;
   191         ipc->sock = NULL;
   192 		return -1;
   192         return -1;
   193 	}
   193     }
   194 }
   194 }
   195 
   195 
   196 int flib_ipcbase_send_message(flib_ipcbase *ipc, void *data, size_t len) {
   196 int flib_ipcbase_send_message(flib_ipcbase *ipc, void *data, size_t len) {
   197 	if(log_badargs_if3(ipc==NULL, data==NULL && len>0, len>255)) {
   197     if(log_badargs_if3(ipc==NULL, data==NULL && len>0, len>255)) {
   198 		return -1;
   198         return -1;
   199 	}
   199     }
   200 
   200 
   201 	uint8_t sendbuf[256];
   201     uint8_t sendbuf[256];
   202 	sendbuf[0] = len;
   202     sendbuf[0] = len;
   203 	memcpy(sendbuf+1, data, len);
   203     memcpy(sendbuf+1, data, len);
   204 	return flib_ipcbase_send_raw(ipc, sendbuf, len+1);
   204     return flib_ipcbase_send_raw(ipc, sendbuf, len+1);
   205 }
   205 }
   206 
   206 
   207 void flib_ipcbase_accept(flib_ipcbase *ipc) {
   207 void flib_ipcbase_accept(flib_ipcbase *ipc) {
   208 	if(!log_badargs_if(ipc==NULL) && !ipc->sock && ipc->acceptor) {
   208     if(!log_badargs_if(ipc==NULL) && !ipc->sock && ipc->acceptor) {
   209 		ipc->sock = flib_socket_accept(ipc->acceptor, true);
   209         ipc->sock = flib_socket_accept(ipc->acceptor, true);
   210 		if(ipc->sock) {
   210         if(ipc->sock) {
   211 			flib_log_d("IPC connection accepted.");
   211             flib_log_d("IPC connection accepted.");
   212 			flib_acceptor_close(ipc->acceptor);
   212             flib_acceptor_close(ipc->acceptor);
   213 			ipc->acceptor = NULL;
   213             ipc->acceptor = NULL;
   214 		}
   214         }
   215 	}
   215     }
   216 }
   216 }