Instrument Neutral Distributed Interface INDI  2.0.2
tcpsocket.cpp
Go to the documentation of this file.
1 /*
2  Copyright (C) 2022 by Pawel Soja <kernel32.pl@gmail.com>
3 
4  This library is free software; you can redistribute it and/or
5  modify it under the terms of the GNU Lesser General Public
6  License as published by the Free Software Foundation; either
7  version 2.1 of the License, or (at your option) any later version.
8 
9  This library is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  Lesser General Public License for more details.
13 
14  You should have received a copy of the GNU Lesser General Public
15  License along with this library; if not, write to the Free Software
16  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18 #include "tcpsocket.h"
19 #include "tcpsocket_p.h"
20 
21 #include <errno.h>
22 #include <chrono>
23 #include <algorithm>
24 
25 // SocketAddress
26 const char *SocketAddress::unixDomainPrefix = "localhost:";
27 
28 SocketAddress::SocketAddress(const std::string &hostName, unsigned short port)
29 {
30  if (isUnix(hostName))
31  *this = SocketAddress::afUnix(hostName.substr(strlen(unixDomainPrefix)));
32  else
33  *this = SocketAddress::afInet(hostName, port);
34 }
35 
36 SocketAddress SocketAddress::afInet(const std::string &hostName, unsigned short port)
37 {
38  struct hostent *hp = gethostbyname(hostName.c_str());
39  if (hp == nullptr)
40  return SocketAddress();
41 
42  if (hp->h_addr_list == nullptr)
43  return SocketAddress();
44 
45  if (hp->h_addr_list[0] == nullptr)
46  return SocketAddress();
47 
48  struct sockaddr_in *sa_in = new sockaddr_in;
49  (void)memset(sa_in, 0, sizeof(struct sockaddr_in));
50  sa_in->sin_family = AF_INET;
51  sa_in->sin_addr.s_addr = ((struct in_addr *)(hp->h_addr_list[0]))->s_addr;
52  sa_in->sin_port = htons(port);
53 
54  SocketAddress result;
55  result.mData.reset(reinterpret_cast<struct sockaddr*>(sa_in));
56  result.mSize = sizeof(struct sockaddr_in);
57  return result;
58 }
59 
60 bool SocketAddress::isUnix(const std::string &hostName)
61 {
62  return hostName.rfind(unixDomainPrefix, 0) == 0;
63 }
64 
65 // TcpSocketPrivate
67  : parent(parent)
68 { }
69 
70 TcpSocket::TcpSocket(std::unique_ptr<TcpSocketPrivate> &&d)
71  : d_ptr(std::move(d))
72 { }
73 
74 ssize_t TcpSocketPrivate::write(const void *data, size_t size)
75 {
76  ssize_t ret;
77  do
78  {
79  std::unique_lock<std::mutex> locker(socketStateMutex);
81  {
82  return 0;
83  }
84  ret = TcpSocketPrivate::sendSocket(data, size);
85  }
86  while (ret == -1 && (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK));
87 
88  if (ret < 0)
89  {
91  return 0;
92  }
93 
94  return ret;
95 }
96 
97 bool TcpSocketPrivate::connectSocket(const std::string &hostName, unsigned short port)
98 {
99  // create socket handle
100  if (!createSocket(SocketAddress::isUnix(hostName) ? AF_UNIX : AF_INET))
101  {
103  return false;
104  }
105 
106  // set non blocking mode
107  if (!setNonblockSocket())
108  {
110  return false;
111  }
112 
113  // get socket address
114  auto sockAddr = SocketAddress(hostName, port);
115 
116  if (!sockAddr.isValid())
117  {
119  return false;
120  }
121 
122  // connect to host
123  if (::connect(socketFd, &sockAddr, int(sockAddr.size())) < 0 && errno != EINPROGRESS)
124  {
126  return false;
127  }
128 
129  return true;
130 }
131 
133 {
134  // wait for connect
135  select.clear();
138 
139  if (select.isTimeout())
140  {
142  return false;
143  }
144 
145  if (select.isWakedUp())
146  {
147  return false;
148  }
149 
150  return TcpSocketPrivate::sendSocket("", 0) == 0; // final test, -1 if not connected
151 }
152 
154 {
155  select.clear();
157 #ifndef _WIN32
158  select.setTimeout(10 * 1000); // we can wake up
159 #else
160  select.setTimeout(100);
161 #endif
162 
163  select.select();
164 
165  // timeout, continue...
166  if (select.isTimeout())
167  {
168  return true;
169  }
170 
171  // manual wakeup, maybe isAboutToClose, exit
172  if (select.isWakedUp())
173  {
174  return true;
175  }
176 
177  // nothing to do
179  {
180  return true;
181  }
182 
183  // call virtual method
184  parent->readyRead();
185 
186  return true;
187 }
188 
189 void TcpSocketPrivate::joinThread(std::thread &thread)
190 {
191  std::unique_lock<std::mutex> locker(socketStateMutex);
192  isAboutToClose = true;
193  if (thread.joinable())
194  {
195  thread.join();
196  }
197  isAboutToClose = false;
198 }
199 
200 class Finally
201 {
202  typedef std::function<void()> F;
203  F f;
204  public:
205  Finally(const F &f) : f(f) { }
207  {
208  if (f) f();
209  }
210 };
211 
212 void TcpSocketPrivate::connectToHost(const std::string &hostName, unsigned short port)
213 {
215  {
217  return;
218  }
219 
221 
222  thread = std::thread([this, hostName, port] (std::thread && oldThread)
223  {
224  Finally finally([this]
225  {
226  closeSocket();
228  });
229 
230  joinThread(oldThread); // join prevus thread if exists
231 
232  // lookup and connect
233  if (!connectSocket(hostName, port))
234  {
235  // see error in connectSocket
236  return;
237  }
238  // wait for connected
241  {
242  setSocketError(TcpSocket::SocketError::HostNotFoundError);
243  return;
244  }
245 
247  parent->connected();
248  while (isAboutToClose == false && processSocket());
249  parent->disconnected();
250 
251  }, std::move(thread));
252 }
253 
255 {
256  std::unique_lock<std::mutex> locker(socketStateMutex);
257  if (socketState == TcpSocket::UnconnectedState || isAboutToClose.exchange(true) == true)
258  {
259  return;
260  }
261  select.wakeUp();
262 }
263 
265 {
266  if (socketFd == SocketInvalid)
267  return;
268 
269 #ifdef _WIN32
270  ::closesocket(socketFd);
271 #else
272  ::close(socketFd);
273 #endif
274  socketFd = SocketInvalid;
275 }
276 
277 void TcpSocketPrivate::setSocketError(TcpSocket::SocketError error, ErrorType errorType, const std::string &errorString)
278 {
279  if (errorType == ErrorTypeSystem && errorString == "")
280  {
281 #ifdef _WIN32
282  LPTSTR s = nullptr;
283  auto code = WSAGetLastError();
284  FormatMessage(
285  FORMAT_MESSAGE_ALLOCATE_BUFFER | FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
286  nullptr, code,
287  MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
288  (LPTSTR)&s, 0, nullptr
289  );
290 
291 #ifdef UNICODE
292  std::wstring ws(s);
293  this->errorString = std::string(ws.begin(), ws.end());
294 #else
295  this->errorString = s;
296 #endif
297  LocalFree(s);
298 
299  this->errorString += " (" + std::to_string(code) + ")";
300 
301 #else
302  this->errorString = strerror(errno);
303  this->errorString += " (" + std::to_string(errno) + ")";
304 #endif
305  }
306  else
307  {
308  this->errorString = errorString;
309  }
310  socketError = error;
311  isAboutToClose = true;
312  parent->errorOccurred(error);
313 }
314 
316 {
317  std::unique_lock<std::mutex> locker(socketStateMutex);
318  if (socketState.exchange(state) == state)
319  return;
320  socketStateChanged.notify_all();
321 }
322 
323 // TcpSocket
325  : d_ptr(new TcpSocketPrivate(this))
326 { }
327 
329 {
331  if (waitForDisconnected())
332  {
333  d_ptr->joinThread(d_ptr->thread);
334  }
335 }
336 
338 {
339  d_ptr->timeout = timeout;
340 }
341 
342 void TcpSocket::connectToHost(const std::string &hostName, uint16_t port)
343 {
344  d_ptr->connectToHost(hostName, port);
345 }
346 
348 {
349  d_ptr->aboutToClose();
350 }
351 
352 ssize_t TcpSocket::write(const char *data, size_t size)
353 {
354  return d_ptr->write(data, size);
355 }
356 
357 ssize_t TcpSocket::write(const std::string &data)
358 {
359  return write(data.data(), data.size());
360 }
361 
362 static std::string sSocketErrorToString(TcpSocket::SocketError error)
363 {
364  switch (error)
365  {
367  return "ConnectionRefusedError";
369  return "RemoteHostClosedError";
371  return "HostNotFoundError";
373  return "SocketAccessError";
375  return "SocketResourceError";
377  return "SocketTimeoutError";
379  return "DatagramTooLargeError";
381  return "NetworkError";
383  return "AddressInUseError";
385  return "SocketAddressNotAvailableError";
387  return "UnsupportedSocketOperationError";
389  return "UnfinishedSocketOperationError";
391  return "ProxyAuthenticationRequiredError";
393  return "SslHandshakeFailedError";
395  return "ProxyConnectionRefusedError";
397  return "ProxyConnectionClosedError";
399  return "ProxyConnectionTimeoutError";
401  return "ProxyNotFoundError";
403  return "ProxyProtocolError";
405  return "OperationError";
407  return "SslInternalError";
409  return "SslInvalidUserDataError";
411  return "TemporaryError";
413  return "UnknownSocketError";
414  }
415  return "UnknownSocketError";
416 }
417 
419 {
420  return d_ptr->socketError;
421 }
422 
423 std::string TcpSocket::errorString() const
424 {
425  return sSocketErrorToString(d_ptr->socketError) + ": " + d_ptr->errorString;
426 }
427 
428 void TcpSocket::onConnected(const std::function<void()> &callback)
429 {
430  d_ptr->onConnected = callback;
431 }
432 
433 void TcpSocket::onDisconnected(const std::function<void()> &callback)
434 {
435  d_ptr->onDisconnected = callback;
436 }
437 
438 void TcpSocket::onData(const std::function<void(const char *, size_t)> &callback)
439 {
440  d_ptr->onData = callback;
441 }
442 
443 void TcpSocket::onErrorOccurred(const std::function<void(SocketError)> &callback)
444 {
445  d_ptr->onErrorOccurred = callback;
446 }
447 
448 bool TcpSocket::waitForConnected(int timeout) const
449 {
450  if (d_ptr->thread.get_id() == std::this_thread::get_id())
451  {
452  d_ptr->setSocketError(TcpSocket::SocketError::OperationError);
453  return false;
454  }
455 
456  std::unique_lock<std::mutex> locker(d_ptr->socketStateMutex);
457  d_ptr->socketStateChanged.wait_for(locker, std::chrono::milliseconds(timeout), [this]
458  {
459  return d_ptr->socketState == TcpSocket::ConnectedState || d_ptr->socketState == TcpSocket::UnconnectedState;
460  });
461  return d_ptr->socketState == TcpSocket::ConnectedState;
462 }
463 
464 bool TcpSocket::waitForDisconnected(int timeout) const
465 {
466  if (d_ptr->thread.get_id() == std::this_thread::get_id())
467  {
468  d_ptr->setSocketError(TcpSocket::SocketError::OperationError);
469  return false;
470  }
471 
472  std::unique_lock<std::mutex> locker(d_ptr->socketStateMutex);
473  return d_ptr->socketStateChanged.wait_for(locker, std::chrono::milliseconds(timeout), [this]
474  {
475  return d_ptr->socketState == TcpSocket::UnconnectedState;
476  });
477 
478 }
479 
481 {
482  return reinterpret_cast<int *>(d_ptr->socketFd);
483 }
484 
486 {
487  emitConnected();
488 }
489 
491 {
493 }
494 
496 {
497  char data[65536];
498  ssize_t size = d_ptr->recvSocket(data, sizeof(data));
499 
500  if (size <= 0)
501  {
503  return;
504  }
505 
506  emitData(data, size);
507 }
508 
510 {
512 }
513 
515 {
516  if (d_ptr->onConnected)
517  d_ptr->onConnected();
518 }
519 
521 {
522  if (d_ptr->onDisconnected)
523  d_ptr->onDisconnected();
524 }
525 
526 void TcpSocket::emitData(const char *data, size_t size) const
527 {
528  if (d_ptr->onData)
529  d_ptr->onData(data, size);
530 }
531 
533 {
534  if (d_ptr->onErrorOccurred)
535  d_ptr->onErrorOccurred(error);
536 }
537 
539 {
540  d_ptr->setSocketError(socketError);
541 }
Finally(const F &f)
Definition: tcpsocket.cpp:205
bool isWakedUp() const
Definition: select.h:187
void setReadEvent(SocketFileDescriptor fd)
Definition: select.h:152
void setReadWriteExceptionEvent(SocketFileDescriptor fd)
Definition: select.h:177
void select()
Definition: select.h:131
void setTimeout(int timeout)
Definition: select.h:125
bool isTimeout() const
Definition: select.h:197
void wakeUp()
Definition: select.h:106
void clear()
Definition: select.h:113
bool isReadEvent(SocketFileDescriptor fd) const
Definition: select.h:205
std::unique_ptr< struct sockaddr > mData
Definition: tcpsocket_p.h:73
SocketAddress()=default
static bool isUnix(const std::string &hostName)
Definition: tcpsocket.cpp:60
static const char * unixDomainPrefix
Definition: tcpsocket_p.h:37
static SocketAddress afUnix(const std::string &hostName)
static SocketAddress afInet(const std::string &hostName, unsigned short port)
Definition: tcpsocket.cpp:36
size_t mSize
Definition: tcpsocket_p.h:74
bool createSocket(int domain)
SocketFileDescriptor socketFd
Definition: tcpsocket_p.h:115
std::thread thread
Definition: tcpsocket_p.h:119
std::string errorString
Definition: tcpsocket_p.h:127
TcpSocketPrivate(TcpSocket *parent)
Definition: tcpsocket.cpp:66
std::atomic< TcpSocket::SocketState > socketState
Definition: tcpsocket_p.h:125
ssize_t sendSocket(const void *src, size_t size)
std::condition_variable socketStateChanged
Definition: tcpsocket_p.h:123
bool waitForConnectedSockets()
Definition: tcpsocket.cpp:132
TcpSocket::SocketError socketError
Definition: tcpsocket_p.h:126
bool processSocket()
Definition: tcpsocket.cpp:153
TcpSocket * parent
Definition: tcpsocket_p.h:114
bool connectSocket(const std::string &hostName, unsigned short port)
Definition: tcpsocket.cpp:97
void connectToHost(const std::string &hostName, unsigned short port)
Definition: tcpsocket.cpp:212
void setSocketState(TcpSocket::SocketState state)
Definition: tcpsocket.cpp:315
ssize_t write(const void *data, size_t size)
Definition: tcpsocket.cpp:74
void setSocketError(TcpSocket::SocketError error, ErrorType errorType=ErrorTypeSystem, const std::string &errorString="")
Definition: tcpsocket.cpp:277
std::mutex socketStateMutex
Definition: tcpsocket_p.h:122
void joinThread(std::thread &thread)
Definition: tcpsocket.cpp:189
std::atomic< bool > isAboutToClose
Definition: tcpsocket_p.h:120
std::unique_ptr< TcpSocketPrivate > d_ptr
Definition: tcpsocket.h:119
void onData(const std::function< void(const char *, size_t)> &callback)
Definition: tcpsocket.cpp:438
bool waitForDisconnected(int timeout=2000) const
Definition: tcpsocket.cpp:464
virtual void disconnected()
Definition: tcpsocket.cpp:490
std::string errorString() const
Definition: tcpsocket.cpp:423
@ ConnectedState
Definition: tcpsocket.h:64
@ ConnectingState
Definition: tcpsocket.h:63
@ HostLookupState
Definition: tcpsocket.h:62
@ UnconnectedState
Definition: tcpsocket.h:61
void emitConnected() const
Definition: tcpsocket.cpp:514
void setConnectionTimeout(int timeout)
Definition: tcpsocket.cpp:337
void connectToHost(const std::string &hostName, uint16_t port)
Definition: tcpsocket.cpp:342
virtual ~TcpSocket()
Definition: tcpsocket.cpp:328
@ ProxyNotFoundError
Definition: tcpsocket.h:49
@ SslHandshakeFailedError
Definition: tcpsocket.h:45
@ ProxyConnectionRefusedError
Definition: tcpsocket.h:46
@ TemporaryError
Definition: tcpsocket.h:54
@ ProxyConnectionClosedError
Definition: tcpsocket.h:47
@ SslInternalError
Definition: tcpsocket.h:52
@ RemoteHostClosedError
Definition: tcpsocket.h:33
@ UnfinishedSocketOperationError
Definition: tcpsocket.h:43
@ OperationError
Definition: tcpsocket.h:51
@ ConnectionRefusedError
Definition: tcpsocket.h:32
@ AddressInUseError
Definition: tcpsocket.h:40
@ SocketAddressNotAvailableError
Definition: tcpsocket.h:41
@ SslInvalidUserDataError
Definition: tcpsocket.h:53
@ SocketAccessError
Definition: tcpsocket.h:35
@ ProxyProtocolError
Definition: tcpsocket.h:50
@ NetworkError
Definition: tcpsocket.h:39
@ UnsupportedSocketOperationError
Definition: tcpsocket.h:42
@ ProxyAuthenticationRequiredError
Definition: tcpsocket.h:44
@ SocketTimeoutError
Definition: tcpsocket.h:37
@ ProxyConnectionTimeoutError
Definition: tcpsocket.h:48
@ SocketResourceError
Definition: tcpsocket.h:36
@ HostNotFoundError
Definition: tcpsocket.h:34
@ UnknownSocketError
Definition: tcpsocket.h:56
@ DatagramTooLargeError
Definition: tcpsocket.h:38
void emitErrorOccurred(SocketError error) const
Definition: tcpsocket.cpp:532
void emitDisconnected() const
Definition: tcpsocket.cpp:520
virtual void connected()
Definition: tcpsocket.cpp:485
SocketError error() const
Definition: tcpsocket.cpp:418
void emitData(const char *data, size_t size) const
Definition: tcpsocket.cpp:526
void setSocketError(SocketError socketError)
Definition: tcpsocket.cpp:538
int * socketDescriptor() const
Definition: tcpsocket.cpp:480
void onErrorOccurred(const std::function< void(SocketError)> &callback)
Definition: tcpsocket.cpp:443
void disconnectFromHost()
Definition: tcpsocket.cpp:347
ssize_t write(const char *data, size_t size)
Definition: tcpsocket.cpp:352
void onDisconnected(const std::function< void()> &callback)
Definition: tcpsocket.cpp:433
bool waitForConnected(int timeout=2000) const
Definition: tcpsocket.cpp:448
virtual void errorOccurred(SocketError)
Definition: tcpsocket.cpp:509
virtual void readyRead()
Definition: tcpsocket.cpp:495
void onConnected(const std::function< void()> &callback)
Definition: tcpsocket.cpp:428
int errno
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.h:23613
Definition: json.h:4973