56 #include <unordered_map>
82 #include <arpa/inet.h>
83 #include <netinet/in.h>
85 #include <sys/ioctl.h>
86 #include <sys/types.h>
89 #include <sys/socket.h>
94 #include <linux/errqueue.h>
100 #define INDIUNIXSOCK "/tmp/indiserver"
102 #define MAXRBUF 49152
103 #define MAXWSIZ 49152
104 #define SHORTMSGSIZ 2048
105 #define DEFMAXQSIZ 128
107 #define DEFMAXRESTART 10
108 #define MAXFD_PER_MESSAGE 16
109 #ifdef OSX_EMBEDED_MODE
110 #define LOGNAME "/Users/%s/Library/Logs/indiserver.log"
111 #define FIFONAME "/tmp/indiserverFIFO"
114 #define STRINGIFY_TOK(x) #x
115 #define TO_STRING(x) STRINGIFY_TOK(x)
117 static ev::default_loop loop;
122 unsigned long identifier = 1;
123 std::map<unsigned long, M*> items;
128 item->id = identifier++;
129 items[item->id] = item;
135 items.erase(item->id);
137 item->current =
nullptr;
140 std::vector<unsigned long>
ids()
const
142 std::vector<unsigned long> result;
143 for(
auto item : items)
145 result.push_back(item.first);
152 auto e = items.find(
id);
153 if (e == items.end())
164 std::vector<unsigned long>
ids;
170 if (pos == -1)
return;
171 while(pos < (
long int)
ids.size() && !(*parent)[
ids[pos]])
175 if (pos == (
long int)
ids.size())
200 return (*parent)[
ids[pos]];
207 for(
auto item : items)
209 result.ids.push_back(item.first);
229 unsigned long id = 0;
239 : id(id), current(current) {}
243 return id != 0 && (*current)[id] !=
nullptr;
252 return HeartBeat(
id, current);
269 MsgChunck(
char * content,
unsigned long length);
272 unsigned long contentLength;
274 std::vector<int> sharedBufferIdsToAttach;
289 std::set<int> sharedBuffers;
299 for(
auto fd : from.sharedBuffers)
301 sharedBuffers.insert(
fd);
307 return (xml == sr.xml) && (sharedBuffers == sr.sharedBuffers);
318 std::recursive_mutex lock;
319 ev::async asyncProgress;
326 void async_progressed();
331 void produce(
bool sync);
352 std::vector<MsgChunck> chuncks;
397 std::set<int> ownSharedBuffers;
423 std::size_t chunckId;
424 unsigned long chunckOffset;
462 bool hasSharedBufferBlobs;
464 std::vector<int> sharedBuffers;
473 bool fetchBlobs(std::list<int> &incomingSharedBuffers);
475 void releaseXmlContent();
476 void releaseSharedBuffers(
const std::set<int> &keep);
520 void ioCb(ev::io &watcher,
int revents);
525 std::set<SerializedMsg*> readBlocker;
527 std::list<SerializedMsg*> msgq;
528 std::list<int> incomingSharedBuffers;
534 size_t doRead(
char * buff,
size_t len);
588 void setFds(
int rFd,
int wFd);
595 virtual void log(
const std::string &
log)
const;
621 void processLine(
const char * line);
625 void ioCb(ev::io &watcher,
int revents);
627 Fifo(
const std::string &name);
634 static Fifo * fifo =
nullptr;
649 void crackBLOBHandling(
const std::string &dev,
const std::string &name,
const char *enableBLOB);
652 virtual void close();
664 int findDevice(
const std::string &dev,
const std::string &name)
const;
668 void addDevice(
const std::string &dev,
const std::string &name,
int isblob);
670 virtual void log(
const std::string &
log)
const;
679 static void q2Clients(
ClInfo *notme,
int isblob,
const std::string &dev,
const std::string &name,
Msg *mp,
XMLEle *root);
691 void addSDevice(
const std::string &
dev,
const std::string &
name);
713 std::set<std::string>
dev;
729 virtual void close();
734 virtual void log(
const std::string &
log)
const;
763 ev::child pidwatcher;
764 void onEfdEvent(ev::io &watcher,
int revents);
765 void onPidEvent(ev::child &watcher,
int revents);
784 virtual void start();
799 int openINDIServer();
801 void extractRemoteId(
const std::string &
name, std::string &o_host,
int &o_port, std::string &o_dev)
const;
813 virtual void start();
833 void ioCb(ev::io &watcher,
int revents);
843 #ifdef ENABLE_INDI_SHARED_MEMORY
852 void ioCb(ev::io &watcher,
int revents);
854 virtual void log(
const std::string &log)
const;
856 UnixServer(
const std::string &path);
863 static std::string unixSocketPath;
870 static void log(
const std::string &log);
872 static std::string fmt(
const char * fmt, ...)
__attribute__ ((format (printf, 1, 0)));
874 static
char *indi_tstamp(
char *s);
876 static const
char *me;
880 static
unsigned int maxqsiz = (
DEFMAXQSIZ * 1024 * 1024);
881 static
unsigned int maxstreamsiz = (
DEFMAXSSIZ * 1024 * 1024);
886 static
void logStartup(
int ac,
char *av[]);
887 static
void usage(
void);
888 static
void noSIGPIPE(
void);
889 static
char *indi_tstamp(
char *s);
890 static
void logDMsg(
XMLEle *root, const
char *dev);
891 static
void Bye(
void);
893 static
int readFdError(
int
896 static
void * attachSharedBuffer(
int fd,
size_t &size);
897 static
void dettachSharedBuffer(
int fd,
void * ptr,
size_t size);
907 #ifdef OSX_EMBEDED_MODE
910 snprintf(logname, 128, LOGNAME, getlogin());
911 fprintf(stderr,
"switching stderr to %s", logname);
912 freopen(logname,
"w", stderr);
915 fifo->name = FIFONAME;
922 while ((--ac > 0) && ((*++av)[0] ==
'-'))
925 for (s = av[0] + 1; *s !=
'\0'; s++)
931 fprintf(stderr,
"-l requires log directory\n");
940 fprintf(stderr,
"-m requires max MB behind\n");
943 maxqsiz = 1024 * 1024 * atoi(*++av);
949 fprintf(stderr,
"-p requires port value\n");
958 fprintf(stderr,
"-d requires max stream MB behind\n");
961 maxstreamsiz = 1024 * 1024 * atoi(*++av);
964 #ifdef ENABLE_INDI_SHARED_MEMORY
968 fprintf(stderr,
"-u requires local socket path\n");
971 UnixServer::unixSocketPath = *++av;
978 fprintf(stderr,
"-f requires fifo node\n");
981 fifo =
new Fifo(*++av);
987 fprintf(stderr,
"-r requires number of restarts\n");
990 maxrestarts = atoi(*++av);
1005 if (ac == 0 && !fifo)
1014 std::string dvrName = *av++;
1016 if (dvrName.find(
'@') != std::string::npos)
1031 #ifdef ENABLE_INDI_SHARED_MEMORY
1033 (
new UnixServer(UnixServer::unixSocketPath))->listen();
1049 log(
"unexpected return from event loop\n");
1054 static void logStartup(
int ac,
char *av[])
1058 std::string startupMsg =
"startup:";
1059 for (i = 0; i < ac; i++)
1062 startupMsg += av[i];
1069 static void usage(
void)
1071 fprintf(stderr,
"Usage: %s [options] driver [driver ...]\n", me);
1072 fprintf(stderr,
"Purpose: server for local and remote INDI drivers\n");
1073 fprintf(stderr,
"INDI Library: %s\nCode %s. Protocol %g.\n", CMAKE_INDI_VERSION_STRING, GIT_TAG_STRING,
INDIV);
1074 fprintf(stderr,
"Options:\n");
1075 fprintf(stderr,
" -l d : log driver messages to <d>/YYYY-MM-DD.islog\n");
1076 fprintf(stderr,
" -m m : kill client if gets more than this many MB behind, default %d\n",
DEFMAXQSIZ);
1078 " -d m : drop streaming blobs if client gets more than this many MB behind, default %d. 0 to disable\n",
1080 #ifdef ENABLE_INDI_SHARED_MEMORY
1081 fprintf(stderr,
" -u path : Path for the local connection socket (abstract), default %s\n",
INDIUNIXSOCK);
1083 fprintf(stderr,
" -p p : alternate IP port, default %d\n",
INDIPORT);
1084 fprintf(stderr,
" -r r : maximum driver restarts on error, default %d\n",
DEFMAXRESTART);
1085 fprintf(stderr,
" -f path : Path to fifo for dynamic startup and shutdown of drivers.\n");
1086 fprintf(stderr,
" -v : show key events, no traffic\n");
1087 fprintf(stderr,
" -vv : -v + key message content\n");
1088 fprintf(stderr,
" -vvv : -vv + complete xml\n");
1089 fprintf(stderr,
"driver : executable or [device]@host[:port]\n");
1095 static void noSIGPIPE()
1097 struct sigaction sa;
1098 memset(&sa, 0,
sizeof(sa));
1099 sa.sa_handler = SIG_IGN;
1100 sigemptyset(&sa.sa_mask);
1101 (void)sigaction(SIGPIPE, &sa, NULL);
1110 int rp[2], wp[2], ep[2];
1114 #ifdef OSX_EMBEDED_MODE
1115 fprintf(stderr,
"STARTING \"%s\"\n",
name.c_str());
1123 if (socketpair(AF_UNIX, SOCK_STREAM, 0, ux) == -1)
1125 log(fmt(
"socketpair: %s\n", strerror(
errno)));
1133 log(fmt(
"read pipe: %s\n", strerror(
errno)));
1138 log(fmt(
"write pipe: %s\n", strerror(
errno)));
1144 log(fmt(
"stderr pipe: %s\n", strerror(
errno)));
1152 log(fmt(
"fork: %s\n", strerror(
errno)));
1175 for (
fd = 3;
fd < 100;
fd++)
1179 setenv(
"INDIDEV",
envDev.c_str(), 1);
1182 unsetenv(
"INDIDEV");
1184 setenv(
"INDICONFIG",
envConfig.c_str(), 1);
1186 unsetenv(
"INDICONFIG");
1188 setenv(
"INDISKEL",
envSkel.c_str(), 1);
1190 unsetenv(
"INDISKEL");
1191 std::string executable;
1194 setenv(
"INDIPREFIX",
envPrefix.c_str(), 1);
1195 #if defined(OSX_EMBEDED_MODE)
1197 #elif defined(__APPLE__)
1203 fprintf(stderr,
"%s\n", executable.c_str());
1205 execlp(executable.c_str(),
name.c_str(), NULL);
1211 executable = std::string(dirname((
char*)me)) +
"/" +
name;
1212 execlp(executable.c_str(),
name.c_str(), NULL);
1216 execlp(
name.c_str(),
name.c_str(), NULL);
1220 #ifdef OSX_EMBEDED_MODE
1221 fprintf(stderr,
"FAILED \"%s\"\n",
name.c_str());
1224 log(fmt(
"execlp %s: %s\n", executable.c_str(), strerror(
errno)));
1252 this->pidwatcher.set(pid);
1253 this->pidwatcher.start();
1257 fcntl(this->efd, F_SETFL, fcntl(this->efd, F_GETFL, 0) | O_NONBLOCK);
1258 this->eio.start(this->efd,
ev::READ);
1264 log(fmt(
"pid=%d rfd=%d wfd=%d efd=%d\n", pid, rp[0], wp[1], ep[0]));
1268 mp =
new Msg(
nullptr, root);
1274 void RemoteDvrInfo::extractRemoteId(
const std::string &name, std::string &o_host,
int &o_port, std::string &o_dev)
const
1281 if (sscanf(
name.c_str(),
"%[^@]@%[^:]:%d",
dev,
host, &indi_port) < 2)
1284 if (sscanf(
name.c_str(),
"@%[^:]:%d",
host, &indi_port) < 1)
1286 log(fmt(
"Bad remote device syntax: %s\n",
name.c_str()));
1306 sockfd = openINDIServer();
1310 this->
setFds(sockfd, sockfd);
1313 log(fmt(
"socket=%d\n", sockfd));
1319 this->dev.insert(
dev);
1340 Msg *mp =
new Msg(
nullptr, root);
1346 int RemoteDvrInfo::openINDIServer()
1348 struct sockaddr_in serv_addr;
1353 hp = gethostbyname(
host.c_str());
1356 log(fmt(
"gethostbyname(%s): %s\n",
host.c_str(), strerror(
errno)));
1361 (void)memset((
char *)&serv_addr, 0,
sizeof(serv_addr));
1362 serv_addr.sin_family = AF_INET;
1363 serv_addr.sin_addr.s_addr = ((
struct in_addr *)(hp->h_addr_list[0]))->s_addr;
1364 serv_addr.sin_port = htons(
port);
1365 if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1372 if (connect(sockfd, (
struct sockaddr *)&serv_addr,
sizeof(serv_addr)) < 0)
1382 #ifdef ENABLE_INDI_SHARED_MEMORY
1384 UnixServer::UnixServer(
const std::string &path): path(path)
1386 sfdev.set<UnixServer, &UnixServer::ioCb>(
this);
1389 void UnixServer::log(
const std::string &str)
const
1391 std::string logLine =
"Local server: ";
1396 void UnixServer::ioCb(ev::io &,
int revents)
1398 if (revents & EV_ERROR)
1400 int sockErrno = readFdError(this->sfd);
1403 log(fmt(
"Error on unix socket: %s\n", strerror(sockErrno)));
1407 if (revents & EV_READ)
1413 static void initUnixSocketAddr(
const std::string &unixAddr,
struct sockaddr_un &serv_addr_un, socklen_t &addrlen,
bool bind)
1415 memset(&serv_addr_un, 0,
sizeof(serv_addr_un));
1416 serv_addr_un.sun_family = AF_UNIX;
1422 strncpy(serv_addr_un.sun_path + 1, unixAddr.c_str(),
sizeof(serv_addr_un.sun_path) - 1);
1424 int len = offsetof(
struct sockaddr_un, sun_path) + unixAddr.size() + 1;
1429 strncpy(serv_addr_un.sun_path, unixAddr.c_str(),
sizeof(serv_addr_un.sun_path) - 1);
1431 int len = offsetof(
struct sockaddr_un, sun_path) + unixAddr.size();
1435 unlink(unixAddr.c_str());
1441 void UnixServer::listen()
1443 struct sockaddr_un serv_socket;
1446 if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
1448 log(fmt(
"socket: %s\n", strerror(
errno)));
1453 if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse,
sizeof(reuse)) < 0)
1455 log(fmt(
"setsockopt: %s\n", strerror(
errno)));
1461 initUnixSocketAddr(path, serv_socket, len,
true);
1463 if (bind(sfd, (
struct sockaddr *)&serv_socket, len) < 0)
1465 log(fmt(
"bind: %s\n", strerror(
errno)));
1470 if (::listen(sfd, 5) < 0)
1472 log(fmt(
"listen: %s\n", strerror(
errno)));
1476 fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
1477 sfdev.start(sfd, EV_READ);
1481 log(fmt(
"listening on local domain at: @%s\n", path.c_str()));
1484 void UnixServer::accept()
1489 cli_fd = ::accept(sfd, 0, 0);
1492 if (
errno == EAGAIN ||
errno == EWOULDBLOCK)
return;
1494 log(fmt(
"accept: %s\n", strerror(
errno)));
1501 cp->
setFds(cli_fd, cli_fd);
1508 socklen_t len =
sizeof(
struct ucred);
1509 if (getsockopt(cli_fd, SOL_SOCKET, SO_PEERCRED, &ucred, &len) == -1)
1511 log(fmt(
"getsockopt failed: %s\n", strerror(
errno)));
1515 cp->
log(fmt(
"new arrival from local pid %ld (user: %ld:%ld) - welcome!\n", (
long)ucred.pid, (
long)ucred.uid,
1518 cp->
log(fmt(
"new arrival from local domain - welcome!\n"));
1522 #ifdef OSX_EMBEDED_MODE
1523 fprintf(stderr,
"CLIENTS %d\n", clients.size());
1532 sfdev.set<
TcpServer, &TcpServer::ioCb>(
this);
1535 void TcpServer::ioCb(ev::io &,
int revents)
1537 if (revents & EV_ERROR)
1539 int sockErrno = readFdError(this->sfd);
1542 log(fmt(
"Error on tcp server socket: %s\n", strerror(sockErrno)));
1546 if (revents & EV_READ)
1554 struct sockaddr_in serv_socket;
1558 if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1560 log(fmt(
"socket: %s\n", strerror(
errno)));
1565 memset(&serv_socket, 0,
sizeof(serv_socket));
1566 serv_socket.sin_family = AF_INET;
1568 serv_socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1570 serv_socket.sin_addr.s_addr = htonl(INADDR_ANY);
1572 serv_socket.sin_port = htons((
unsigned short)port);
1573 if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse,
sizeof(reuse)) < 0)
1575 log(fmt(
"setsockopt: %s\n", strerror(
errno)));
1578 if (bind(sfd, (
struct sockaddr *)&serv_socket,
sizeof(serv_socket)) < 0)
1580 log(fmt(
"bind: %s\n", strerror(
errno)));
1585 if (::
listen(sfd, 5) < 0)
1587 log(fmt(
"listen: %s\n", strerror(
errno)));
1591 fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
1592 sfdev.start(sfd, EV_READ);
1596 log(fmt(
"listening to port %d on fd %d\n", port, sfd));
1599 void TcpServer::accept()
1601 struct sockaddr_in cli_socket;
1606 cli_len =
sizeof(cli_socket);
1607 cli_fd = ::accept(sfd, (
struct sockaddr *)&cli_socket, &cli_len);
1610 if (
errno == EAGAIN ||
errno == EWOULDBLOCK)
return;
1612 log(fmt(
"accept: %s\n", strerror(
errno)));
1619 cp->
setFds(cli_fd, cli_fd);
1623 cp->
log(fmt(
"new arrival from %s:%d - welcome!\n",
1624 inet_ntoa(cli_socket.sin_addr), ntohs(cli_socket.sin_port)));
1626 #ifdef OSX_EMBEDED_MODE
1627 fprintf(stderr,
"CLIENTS %d\n", clients.size());
1634 fdev.set<
Fifo, &Fifo::ioCb>(
this);
1638 void Fifo::close(
void)
1652 fd = ::open(name.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC);
1656 log(fmt(
"open(%s): %s.\n", name.c_str(), strerror(
errno)));
1660 fdev.start(
fd, EV_READ);
1665 void Fifo::processLine(
const char * line)
1669 log(fmt(
"FIFO: %s\n", line));
1674 memset(&tDriver[0], 0,
sizeof(
char) *
MAXSBUF);
1675 memset(&tName[0], 0,
sizeof(
char) *
MAXSBUF);
1676 memset(&envConfig[0], 0,
sizeof(
char) *
MAXSBUF);
1677 memset(&envSkel[0], 0,
sizeof(
char) *
MAXSBUF);
1678 memset(&envPrefix[0], 0,
sizeof(
char) *
MAXSBUF);
1682 bool remoteDriver = !!strstr(line,
"@");
1687 n = sscanf(line,
"%s %511[^\n]",
cmd, tDriver);
1690 char *ptr = tDriver;
1691 int len = strlen(tDriver);
1692 while ((ptr = strstr(tDriver,
"\"")))
1694 memmove(ptr, ptr + 1, --len);
1701 n = sscanf(line,
"%s %s -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\"",
cmd,
1702 tDriver, arg[0], var[0], arg[1], var[1], arg[2], var[2], arg[3], var[3]);
1705 int n_args = (n - 2) / 2;
1708 for (j = 0; j < n_args; j++)
1710 if (arg[j][0] ==
'n')
1712 strncpy(tName, var[j],
MAXSBUF - 1);
1716 log(fmt(
"With name: %s\n", tName));
1718 else if (arg[j][0] ==
'c')
1720 strncpy(envConfig, var[j],
MAXSBUF - 1);
1721 envConfig[
MAXSBUF - 1] =
'\0';
1724 log(fmt(
"With config: %s\n", envConfig));
1726 else if (arg[j][0] ==
's')
1728 strncpy(envSkel, var[j],
MAXSBUF - 1);
1732 log(fmt(
"With skeketon: %s\n", envSkel));
1734 else if (arg[j][0] ==
'p')
1736 strncpy(envPrefix, var[j],
MAXSBUF - 1);
1737 envPrefix[
MAXSBUF - 1] =
'\0';
1740 log(fmt(
"With prefix: %s\n", envPrefix));
1745 if (!strcmp(
cmd,
"start"))
1753 log(fmt(
"FIFO: Starting driver %s\n", tDriver));
1756 if (remoteDriver == 0)
1761 localDp->envDev = tName;
1762 localDp->envConfig = envConfig;
1763 localDp->envSkel = envSkel;
1764 localDp->envPrefix = envPrefix;
1777 if (dp ==
nullptr)
continue;
1779 if (dp->
name == tDriver)
1785 log(fmt(
"FIFO: Shutting down driver: %s\n", tDriver));
1795 void Fifo::read(
void)
1797 int rd = ::read(
fd,
buffer + bufferPos,
sizeof(
buffer) - 1 - bufferPos);
1802 buffer[bufferPos] =
'\0';
1811 if (
errno == EAGAIN ||
errno == EWOULDBLOCK)
return;
1813 log(fmt(
"Fifo error: %s\n", strerror(
errno)));
1821 for(
int i = 0; i < bufferPos; ++i)
1835 if ((
unsigned)bufferPos >=
sizeof(
buffer) - 1)
1837 log(fmt(
"Fifo overflow"));
1843 void Fifo::ioCb(ev::io &,
int revents)
1845 if (EV_ERROR & revents)
1847 int sockErrno = readFdError(this->
fd);
1850 log(fmt(
"Error on fifo: %s\n", strerror(sockErrno)));
1855 else if (revents & EV_READ)
1868 int isblob = !strcmp(
tagXMLEle(root),
"setBLOBVector");
1878 if (dev[0] ==
'*' && !this->
props.size())
1883 else if (!strcmp(roottag,
"getProperties") && !this->
props.size() && this->allprops != 2)
1887 if (!strcmp(roottag,
"enableBLOB"))
1890 if (!strcmp(roottag,
"pingRequest"))
1894 Msg * mp =
new Msg(
this, root);
1904 log(
"Closing after malformed message\n");
1916 if (!strncmp(roottag,
"set", 3))
1920 if (!strncmp(roottag,
"new", 3))
1922 q2Clients(
this, isblob, dev, name, mp, root);
1933 int isblob = !strcmp(
tagXMLEle(root),
"setBLOBVector");
1937 else if (verbose > 1)
1939 log(fmt(
"read <%s device='%s' name='%s'>\n",
1945 if (!strcmp(roottag,
"getProperties"))
1948 Msg *mp =
new Msg(
this, root);
1962 if (!strcmp(roottag,
"enableBLOB"))
1974 #ifdef OSX_EMBEDED_MODE
1975 if (this->dev.empty())
1976 fprintf(stderr,
"STARTED \"%s\"\n", dp->
name.c_str());
1979 this->dev.insert(
dev);
1986 if (!strcmp(roottag,
"pingRequest"))
1990 Msg * mp =
new Msg(
this, root);
2023 log(
"shut down complete - bye!\n");
2027 #ifdef OSX_EMBEDED_MODE
2028 fprintf(stderr,
"CLIENTS %d\n",
clients.size());
2043 Msg *mp =
new Msg(
this, root);
2058 log(fmt(
"Terminated after #%d restarts.\n",
restarts));
2069 #ifdef OSX_EMBEDED_MODE
2070 fprintf(stderr,
"STOPPED \"%s\"\n",
name.c_str());
2098 std::set<std::string> remoteAdvertised;
2102 if (dp ==
nullptr)
continue;
2105 bool isRemote = !remoteUid.empty();
2113 if (
dev.empty() && isRemote)
2115 if (remoteAdvertised.find(remoteUid) != remoteAdvertised.end())
2120 remoteAdvertised.insert(remoteUid);
2124 if (isRemote == 0 && !strcmp(roottag,
"enableBLOB"))
2130 dp->
log(fmt(
"queuing responsible for <%s device='%s' name='%s'>\n",
2141 std::string meRemoteServerUid = me ? me->remoteServerUid() :
"";
2145 if (dp ==
nullptr)
continue;
2157 if ((!meRemoteServerUid.empty()) && dp->
remoteServerUid() == meRemoteServerUid)
2163 dp->
log(fmt(
"queuing snooped <%s device='%s' name='%s'>\n",
2172 void DvrInfo::addSDevice(
const std::string &dev,
const std::string &name)
2187 log(fmt(
"snooping on %s.%s\n",
dev.c_str(),
name.c_str()));
2207 if (cp ==
nullptr)
continue;
2221 if (cp->
props.size() > 0)
2224 for (
auto pp : cp->
props)
2226 if (pp->dev == dev && pp->name == name)
2242 if (isblob && maxstreamsiz > 0 && ql > maxstreamsiz)
2247 int streamFound = 0;
2250 if (strcmp(
tagXMLEle(ep),
"oneBLOB") == 0)
2264 cp->
log(fmt(
"%ld bytes behind. Dropping stream BLOB...\n", ql));
2271 cp->
log(fmt(
"%ld bytes behind, shutting down\n", ql));
2277 cp->
log(fmt(
"queuing <%s device='%s' name='%s'>\n",
2295 if (cp ==
nullptr)
continue;
2302 for (
auto pp : cp->
props)
2304 if (me->dev.find(pp->dev) != me->dev.end())
2330 cp->
log(fmt(
"%ld bytes behind, shutting down\n", ql));
2337 cp->
log(fmt(
"queuing <%s device='%s' name='%s'>\n",
2345 void MsgQueue::writeToFd()
2350 std::vector<int> sharedBuffers;
2356 log(
"Unexpected write notification");
2363 if (!mp->getContent(nsent, data, nsend, sharedBuffers))
2387 nw = write(wFd, data, nsend);
2392 struct iovec iov[1];
2394 struct cmsghdr * cmsgh;
2396 int fdCount = sharedBuffers.size();
2401 log(fmt(
"attempt to send too many FD\n"));
2406 cmsghdrlength = CMSG_SPACE((fdCount *
sizeof(
int)));
2408 cmsgh = (
struct cmsghdr*)malloc(cmsghdrlength);
2409 memset(cmsgh, 0, cmsghdrlength);
2412 cmsgh->cmsg_len = CMSG_LEN(fdCount *
sizeof(
int));
2413 cmsgh->cmsg_level = SOL_SOCKET;
2414 cmsgh->cmsg_type = SCM_RIGHTS;
2415 msgh.msg_control = cmsgh;
2416 msgh.msg_controllen = cmsghdrlength;
2417 for(
int i = 0; i < fdCount; ++i)
2419 ((
int *) CMSG_DATA(CMSG_FIRSTHDR(&msgh)))[i] = sharedBuffers[i];
2426 msgh.msg_control = cmsgh;
2427 msgh.msg_controllen = cmsghdrlength;
2430 iov[0].iov_base = data;
2431 iov[0].iov_len = nsend;
2434 msgh.msg_name = NULL;
2435 msgh.msg_namelen = 0;
2437 msgh.msg_iovlen = 1;
2439 nw = sendmsg(wFd, &msgh, MSG_NOSIGNAL);
2448 log(
"write returned 0\n");
2450 log(fmt(
"write: %s\n", strerror(
errno)));
2460 log(fmt(
"sending msg nq %ld:\n%.*s\n",
2461 msgq.size(), (
int)nw, data));
2463 else if (verbose > 1)
2465 log(fmt(
"sending %.*s\n", (
int)nw, data));
2471 mp->advance(nsent, nw);
2479 std::string logLine =
"Dying Connection ";
2489 for (
auto pp :
props)
2491 if ((pp->dev == dev) && (pp->name.empty() || (pp->name == name)))
2501 for (
auto pp :
props)
2503 if (pp->dev == dev && pp->name == name)
2513 props.push_back(pp);
2518 if (!strcmp(enableBLOB,
"Also"))
2520 else if (!strcmp(enableBLOB,
"Only"))
2522 else if (!strcmp(enableBLOB,
"Never"))
2537 for (
auto pp :
props)
2541 else if (pp->dev == dev && pp->name == name)
2553 static const char *prtags[] =
2555 "defNumber",
"oneNumber",
"defText",
"oneText",
"defSwitch",
"oneSwitch",
"defLight",
"oneLight",
2558 const char *msg, *perm, *pcd;
2566 fprintf(stderr,
" %s", pcd);
2569 fprintf(stderr,
" %s", perm);
2572 fprintf(stderr,
" '%s'", msg);
2576 for (i = 0; i <
sizeof(prtags) /
sizeof(prtags[0]); i++)
2577 if (strcmp(prtags[i],
tagXMLEle(e)) == 0)
2580 fprintf(stderr,
"\n");
2588 static char *indi_tstamp(
char *s)
2590 static char sbuf[64];
2598 strftime(s,
sizeof(sbuf),
"%Y-%m-%dT%H:%M:%S", tp);
2604 static void logDMsg(
XMLEle *root,
const char *dev)
2608 const char *ts, *ms;
2625 sprintf(logfn,
"%s/%.10s.islog", ldir, ts);
2626 fp = fopen(logfn,
"a");
2629 fprintf(fp,
"%s: %s: %s\n", ts, dev, ms);
2636 fprintf(stderr,
"%s: good bye\n", indi_tstamp(NULL));
2650 restarts(model.restarts)
2666 return this->dev.find(
dev) != this->dev.end();
2671 std::string logLine =
"Driver ";
2682 eio.set<
LocalDvrInfo, &LocalDvrInfo::onEfdEvent>(
this);
2683 pidwatcher.set<
LocalDvrInfo, &LocalDvrInfo::onPidEvent>(
this);
2688 envDev(model.envDev),
2689 envConfig(model.envConfig),
2690 envSkel(model.envSkel),
2691 envPrefix(model.envPrefix)
2693 eio.set<
LocalDvrInfo, &LocalDvrInfo::onEfdEvent>(
this);
2694 pidwatcher.set<
LocalDvrInfo, &LocalDvrInfo::onPidEvent>(
this);
2713 void LocalDvrInfo::closeEfd()
2720 void LocalDvrInfo::closePid()
2726 void LocalDvrInfo::onEfdEvent(ev::io &,
int revents)
2728 if (EV_ERROR & revents)
2730 int sockErrno = readFdError(this->efd);
2733 log(fmt(
"Error on stderr: %s\n", strerror(sockErrno)));
2739 if (revents & EV_READ)
2744 nr = read(efd, errbuff + errbuffpos,
sizeof(errbuff) - errbuffpos);
2749 if (
errno == EAGAIN ||
errno == EWOULDBLOCK)
return;
2751 log(fmt(
"stderr %s\n", strerror(
errno)));
2754 log(
"stderr EOF\n");
2760 for(
int i = 0; i < errbuffpos; ++i)
2762 if (errbuff[i] ==
'\n')
2764 log(fmt(
"%.*s\n", (
int)i, errbuff));
2767 memmove(errbuff, errbuff + i, errbuffpos);
2774 void LocalDvrInfo::onPidEvent(ev::child &,
int revents)
2776 if (revents & EV_CHILD)
2778 if (WIFEXITED(pidwatcher.rstatus))
2780 log(fmt(
"process %d exited with status %d\n", pid, WEXITSTATUS(pidwatcher.rstatus)));
2782 else if (WIFSIGNALED(pidwatcher.rstatus))
2784 int signum = WTERMSIG(pidwatcher.rstatus);
2785 log(fmt(
"process %d killed with signal %d - %s\n", pid, signum, strsignal(signum)));
2788 this->pidwatcher.stop();
2816 for(
auto prop :
props)
2826 std::string logLine = fmt(
"Client %d: ", this->
getRFd());
2837 for(
auto fd : parent->sharedBuffers)
2841 requirements.sharedBuffers.insert(
fd);
2844 requirements.xml =
true;
2846 asyncProgress.set<
SerializedMsg, &SerializedMsg::async_progressed>(
this);
2860 std::lock_guard<std::recursive_mutex> guard(lock);
2866 std::lock_guard<std::recursive_mutex> guard(lock);
2867 if (this->requirements == req)
2871 this->requirements = req;
2872 asyncProgress.send();
2877 std::lock_guard<std::recursive_mutex> guard(lock);
2879 this->chuncks.push_back(m);
2880 asyncProgress.send();
2885 std::lock_guard<std::recursive_mutex> guard(lock);
2887 asyncProgress.send();
2890 void SerializedMsg::async_start()
2892 std::lock_guard<std::recursive_mutex> guard(lock);
2901 asyncProgress.start();
2903 std::thread t([
this]()
2915 void SerializedMsg::async_progressed()
2917 std::lock_guard<std::recursive_mutex> guard(lock);
2922 asyncProgress.stop();
2928 awaiter->messageMayHaveProgressed(
this);
2937 std::lock_guard<std::recursive_mutex> guard(lock);
2945 std::lock_guard<std::recursive_mutex> guard(lock);
2958 if (position.chunckId < chuncks.size())
2967 std::vector<
int, std::allocator<int> > &sharedBuffers)
2969 std::lock_guard<std::recursive_mutex> guard(lock);
2977 if (from.chunckId == chuncks.size())
2982 from.endReached =
true;
2986 const MsgChunck &ck = chuncks[from.chunckId];
2988 if (from.chunckOffset == 0)
2990 sharedBuffers = ck.sharedBufferIdsToAttach;
2994 sharedBuffers.clear();
2997 data = ck.content + from.chunckOffset;
2998 size = ck.contentLength - from.chunckOffset;
3004 std::lock_guard<std::recursive_mutex> guard(lock);
3006 MsgChunck &cur = chuncks[iter.chunckId];
3007 iter.chunckOffset += s;
3008 if (iter.chunckOffset >= cur.contentLength)
3011 iter.chunckOffset = 0;
3014 iter.endReached =
true;
3029 owner->releaseSerialization(
this);
3035 sr.add(requirements);
3047 return owner->queueSize;
3064 for(
auto id : ownSharedBuffers)
3071 MsgChunck::MsgChunck() : sharedBufferIdsToAttach()
3077 MsgChunck::MsgChunck(
char * content,
unsigned long length) : sharedBufferIdsToAttach()
3079 this->content = content;
3080 this->contentLength = length;
3087 hasInlineBlobs =
false;
3088 hasSharedBufferBlobs =
false;
3090 convertionToSharedBuffer =
nullptr;
3091 convertionToInline =
nullptr;
3094 for(
auto blobContent : findBlobElements(xmlContent))
3097 if (attached ==
"true")
3099 hasSharedBufferBlobs =
true;
3103 hasInlineBlobs =
true;
3111 assert(convertionToSharedBuffer ==
nullptr);
3112 assert(convertionToInline ==
nullptr);
3114 releaseXmlContent();
3115 releaseSharedBuffers(std::set<int>());
3120 if (msg == convertionToSharedBuffer)
3122 convertionToSharedBuffer =
nullptr;
3125 if (msg == convertionToInline)
3127 convertionToInline =
nullptr;
3134 void Msg::releaseXmlContent()
3136 if (xmlContent !=
nullptr)
3139 xmlContent =
nullptr;
3143 void Msg::releaseSharedBuffers(
const std::set<int> &keep)
3145 for(std::size_t i = 0; i < sharedBuffers.size(); ++i)
3147 auto fd = sharedBuffers[i];
3148 if (
fd != -1 && keep.find(
fd) == keep.end())
3150 if (close(
fd) == -1)
3152 perror(
"Releasing shared buffer");
3154 sharedBuffers[i] = -1;
3163 if (convertionToSharedBuffer)
3167 if (convertionToInline)
3174 releaseXmlContent();
3177 releaseSharedBuffers(req.sharedBuffers);
3180 if (convertionToSharedBuffer ==
nullptr && convertionToInline ==
nullptr)
3188 std::string sizeStr =
findXMLAttValu(blobWithAttachedBuffer,
"size");
3194 size = std::stoll(sizeStr, &pos, 10);
3195 if (pos != sizeStr.size())
3197 log(
"Invalid size attribute value " + sizeStr);
3204 bool Msg::fetchBlobs(std::list<int> &incomingSharedBuffers)
3207 for(
auto blobContent : findBlobElements(xmlContent))
3212 log(
"Attached blob misses the size attribute");
3217 if (attached ==
"true")
3219 if (incomingSharedBuffers.empty())
3221 log(
"Missing shared buffer...\n");
3225 queueSize += blobSize;
3227 int fd = *incomingSharedBuffers.begin();
3228 incomingSharedBuffers.pop_front();
3230 sharedBuffers.push_back(
fd);
3247 Msg * m =
new Msg(from, root);
3248 if (!m->fetchBlobs(incomingSharedBuffers))
3258 if (convertionToSharedBuffer)
3260 return convertionToSharedBuffer;
3264 if (hasInlineBlobs && from)
3268 return convertionToSharedBuffer;
3273 if (convertionToInline)
3275 return convertionToInline;
3283 if (hasSharedBufferBlobs || hasInlineBlobs)
3287 return buildConvertionToSharedBuffer();
3291 return buildConvertionToInline();
3297 return buildConvertionToInline();
3303 for(
auto blobContent : findBlobElements(
owner->xmlContent))
3307 if (attached !=
"true")
3315 static int xmlReplacementMapFind(
void *
self,
XMLEle * source,
XMLEle * * replace)
3317 auto map = (
const std::unordered_map<XMLEle*, XMLEle*> *)
self;
3318 auto idx = map->find(source);
3319 if (idx == map->end())
3329 return cloneXMLEle(root, &xmlReplacementMapFind, (
void*)&replacement);
3334 return owner->hasInlineBlobs ||
owner->hasSharedBufferBlobs;
3340 auto xmlContent =
owner->xmlContent;
3342 std::vector<XMLEle*> cdata;
3344 std::vector<int> sharedBuffers;
3345 std::vector<ssize_t> xmlSizes;
3346 std::vector<XMLEle *> sharedCData;
3348 std::unordered_map<XMLEle*, XMLEle*> replacement;
3350 int ownerSharedBufferId = 0;
3354 for(
auto blobContent : findBlobElements(xmlContent))
3367 replacement[blobContent] = clone;
3368 cdata.push_back(clone);
3370 if (attached ==
"true")
3381 sharedBuffers.push_back(
owner->sharedBuffers[ownerSharedBufferId++]);
3382 xmlSizes.push_back(size);
3383 sharedCData.push_back(
nullptr);
3387 sharedBuffers.push_back(-1);
3388 xmlSizes.push_back(-1);
3389 sharedCData.push_back(blobContent);
3393 if (replacement.empty())
3397 char * model = (
char*)malloc(
sprlXMLEle(xmlContent, 0) + 1);
3398 int modelSize =
sprXMLEle(model, xmlContent, 0);
3414 std::vector<size_t> modelCdataOffset(cdata.size());
3416 char * model = (
char*)malloc(
sprlXMLEle(xmlContent, 0) + 1);
3417 int modelSize =
sprXMLEle(model, xmlContent, 0);
3422 for(std::size_t i = 0; i < cdata.size(); ++i)
3428 std::vector<int> fds(cdata.size());
3429 std::vector<void*> blobs(cdata.size());
3430 std::vector<size_t> sizes(cdata.size());
3431 std::vector<size_t> attachedSizes(cdata.size());
3434 for(std::size_t i = 0; i < cdata.size(); ++i)
3436 if (sharedBuffers[i] != -1)
3438 fds[i] = sharedBuffers[i];
3441 blobs[i] = attachSharedBuffer(fds[i], dataSize);
3442 attachedSizes[i] = dataSize;
3446 if (xmlSizes[i] != -1 && ((
size_t)xmlSizes[i]) <= dataSize)
3448 dataSize = xmlSizes[i];
3450 sizes[i] = dataSize;
3459 int modelOffset = 0;
3460 for(std::size_t i = 0; i < cdata.size(); ++i)
3462 int cdataOffset = modelCdataOffset[i];
3463 if (cdataOffset > modelOffset)
3468 modelOffset = cdataOffset + 1;
3477 unsigned long buffSze = sizes[i];
3478 const unsigned char* src = (
const unsigned char*)blobs[i];
3485 unsigned long sze = buffSze > 3 * 16384 ? 3 * 16384 : buffSze;
3487 char*
buffer = (
char*) malloc(4 * sze / 3 + 4);
3499 dettachSharedBuffer(fds[i], blobs[i], attachedSizes[i]);
3513 if (modelOffset < modelSize)
3516 modelOffset = modelSize;
3524 return owner->hasInlineBlobs;
3530 auto xmlContent =
owner->xmlContent;
3532 std::vector<int> sharedBuffers =
owner->sharedBuffers;
3534 std::unordered_map<XMLEle*, XMLEle*> replacement;
3536 for(
auto blobContent : findBlobElements(
owner->xmlContent))
3543 if (attached !=
"true")
3551 replacement[blobContent] = clone;
3560 log(
"Missing size value for blob");
3565 if (blob ==
nullptr)
3567 log(fmt(
"Unable to allocate shared buffer of size %d : %s\n", size, strerror(
errno)));
3570 log(fmt(
"Blob allocated at %p\n", blob));
3574 if (actualLen != size)
3577 log(fmt(
"Blob size mismatch after base64dec: %lld vs %lld\n", (
long long int)actualLen, (
long long int)size));
3581 ownSharedBuffers.insert(newFd);
3585 sharedBuffers.insert(sharedBuffers.begin() + blobPos, newFd);
3590 if (!replacement.empty())
3599 chunck.content = (
char*)malloc(
sprlXMLEle(xmlContent, 0) + 1);
3601 chunck.contentLength =
sprXMLEle(chunck.content, xmlContent, 0);
3602 chunck.sharedBufferIdsToAttach = sharedBuffers;
3606 if (!replacement.empty())
3616 rio.set<
MsgQueue, &MsgQueue::ioCb>(
this);
3617 wio.set<
MsgQueue, &MsgQueue::ioCb>(
this);
3636 for(
auto mp : msgqcp)
3658 if (shutdown(oldWFd, SHUT_WR) == -1)
3660 if (
errno != ENOTCONN)
3662 log(fmt(
"socket shutdown failed: %s\n", strerror(
errno)));
3669 if (::
close(oldWFd) == -1)
3671 log(fmt(
"socket close failed: %s\n", strerror(
errno)));
3679 if (this->rFd != -1)
3684 if (this->rFd != this->wFd)
3689 else if (this->wFd != -1)
3697 this->nsent.
reset();
3701 fcntl(rFd, F_SETFL, fcntl(rFd, F_GETFL, 0) | O_NONBLOCK);
3704 fcntl(wFd, F_SETFL, fcntl(wFd, F_GETFL, 0) | O_NONBLOCK);
3708 wio.set(wFd, ev::WRITE);
3715 if (msgq.empty())
return nullptr;
3716 return *(msgq.begin());
3739 msgq.push_back(serialized);
3740 serialized->addAwaiter(
this);
3746 void MsgQueue::updateIos()
3750 if (msgq.empty() || !msgq.front()->requestContent(nsent))
3767 if ((!msgq.empty()) && (msgq.front() == msg))
3777 auto queueCopy = msgq;
3778 for(
auto mp : queueCopy)
3791 unsigned long l = 0;
3793 for (
auto mp : msgq)
3796 l += mp->queueSize();
3802 void MsgQueue::ioCb(ev::io &,
int revents)
3804 if (EV_ERROR & revents)
3806 int sockErrno = readFdError(this->rFd);
3807 if ((!sockErrno) && this->wFd != this->rFd)
3809 sockErrno = readFdError(this->wFd);
3814 log(fmt(
"Communication error: %s\n", strerror(sockErrno)));
3820 if (revents & EV_READ)
3823 if (revents & EV_WRITE)
3827 size_t MsgQueue::doRead(
char * buf,
size_t nr)
3832 return read(rFd, buf,
sizeof(buf));
3842 struct cmsghdr cmsgh;
3850 msgh.msg_name = NULL;
3851 msgh.msg_namelen = 0;
3852 msgh.msg_iov = &iov;
3853 msgh.msg_iovlen = 1;
3855 msgh.msg_control = control_un.control;
3856 msgh.msg_controllen =
sizeof(control_un.control);
3860 recvflag = MSG_CMSG_CLOEXEC;
3864 int size = recvmsg(rFd, &msgh, recvflag);
3870 for (
struct cmsghdr * cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(&msgh, cmsg))
3872 if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS)
3875 while(cmsg->cmsg_len >= CMSG_LEN((fdCount + 1) *
sizeof(
int)))
3880 int * fds = (
int*)CMSG_DATA(cmsg);
3881 for(
int i = 0; i < fdCount; ++i)
3884 fcntl(fds[i], F_SETFD, FD_CLOEXEC);
3886 incomingSharedBuffers.push_back(fds[i]);
3891 log(fmt(
"Ignoring ancillary data level %d, type %d\n", cmsg->cmsg_level, cmsg->cmsg_type));
3898 void MsgQueue::readFromFd()
3904 nr = doRead(buf,
sizeof(buf));
3907 if (
errno == EAGAIN ||
errno == EWOULDBLOCK)
return;
3910 log(fmt(
"read: %s\n", strerror(
errno)));
3911 else if (verbose > 0)
3912 log(fmt(
"read EOF\n"));
3922 log(fmt(
"XML error: %s\n", err));
3923 log(fmt(
"XML read: %.*s\n", (
int)nr, buf));
3930 XMLEle *root = nodes[inode];
3939 else if (verbose > 1)
3941 log(fmt(
"read <%s device='%s' name='%s'>\n",
3953 root = nodes[inode];
3959 static std::vector<XMLEle *> findBlobElements(
XMLEle * root)
3961 std::vector<XMLEle *> result;
3964 if (strcmp(
tagXMLEle(ep),
"oneBLOB") == 0)
3966 result.push_back(ep);
3972 static void log(
const std::string &log)
3974 fprintf(stderr,
"%s: ", indi_tstamp(NULL));
3975 fprintf(stderr,
"%s", log.c_str());
3978 static int readFdError(
int fd)
3986 iov.iov_base = &rcvbuf;
3987 iov.iov_len =
sizeof(rcvbuf);
3989 msg.msg_name =
nullptr;
3990 msg.msg_namelen = 0;
3994 msg.msg_control = cbuf;
3995 msg.msg_controllen =
sizeof(cbuf);
3997 int recv_bytes = recvmsg(
fd, &msg, MSG_ERRQUEUE | MSG_DONTWAIT);
3998 if (recv_bytes == -1)
4000 if (
errno == EAGAIN ||
errno == EWOULDBLOCK)
return 0;
4005 for (
struct cmsghdr * cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg))
4007 fprintf(stderr,
"cmsg_len=%zu, cmsg_level=%u, cmsg_type=%u\n", cmsg->cmsg_len, cmsg->cmsg_level, cmsg->cmsg_type);
4009 if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR)
4011 return ((
struct sock_extended_err *)CMSG_DATA(cmsg))->ee_errno;
4022 static void * attachSharedBuffer(
int fd,
size_t &size)
4025 if (fstat(
fd, &sb) == -1)
4027 perror(
"invalid shared buffer fd");
4031 void * ret = mmap(0, size, PROT_READ, MAP_SHARED,
fd, 0);
4033 if (ret == MAP_FAILED)
4042 static void dettachSharedBuffer(
int fd,
void * ptr,
size_t size)
4045 if (munmap(ptr, size) == -1)
4047 perror(
"shared buffer munmap");
4052 static std::string fmt(
const char *fmt, ...)
4055 int size =
sizeof(
buffer);
4061 size = vsnprintf(p, size, fmt, ap);
4066 perror(
"vnsprintf");
4069 if ((
unsigned)size <
sizeof(
buffer))
4071 return std::string(
buffer);
4075 p = (
char*)malloc(size);
4083 size = vsnprintf(p, size, fmt, ap);
4089 perror(
"vnsprintf");
int to64frombits_s(unsigned char *out, const unsigned char *in, int inlen, size_t outlen)
Convert bytes array to base64.
int from64tobits_fast(char *out, const char *in, int inlen)
__attribute__((__format__(__printf__, 2, 0))) void CelestronDriver
static ConcurrentSet< ClInfo > clients
virtual void log(const std::string &log) const
static void q2Clients(ClInfo *notme, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root)
static void q2Servers(DvrInfo *me, Msg *mp, XMLEle *root)
ClInfo(bool useSharedBuffer)
void crackBLOBHandling(const std::string &dev, const std::string &name, const char *enableBLOB)
int findDevice(const std::string &dev, const std::string &name) const
std::list< Property * > props
virtual void onMessage(XMLEle *root, std::list< int > &sharedBuffers)
void addDevice(const std::string &dev, const std::string &name, int isblob)
HeartBeat heartBeat() const
bool operator!=(const iterator &o)
iterator(const ConcurrentSet< M > *parent)
M * operator[](unsigned long id) const
std::vector< unsigned long > ids() const
virtual void log(const std::string &log) const
std::set< std::string > dev
static void q2RDrivers(const std::string &dev, Msg *mp, XMLEle *root)
bool isHandlingDevice(const std::string &dev) const
static void q2SDrivers(DvrInfo *me, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root)
virtual bool acceptSharedBuffers() const
virtual const std::string remoteServerUid() const =0
static ConcurrentSet< DvrInfo > drivers
Property * findSDevice(const std::string &dev, const std::string &name) const
virtual DvrInfo * clone() const =0
virtual void onMessage(XMLEle *root, std::list< int > &sharedBuffers)
virtual void closeWritePart()
std::list< Property * > sprops
DvrInfo(const DvrInfo &model)
Fifo(const std::string &name)
virtual LocalDvrInfo * clone() const
virtual const std::string remoteServerUid() const
static void crackBLOB(const char *enableBLOB, BLOBHandling *bp)
void messageMayHaveProgressed(const SerializedMsg *msg)
MsgQueue(bool useSharedBuffer)
virtual void onMessage(XMLEle *root, std::list< int > &sharedBuffers)=0
virtual bool acceptSharedBuffers() const
void traceMsg(const std::string &log, XMLEle *root)
virtual void closeWritePart()
unsigned long msgQSize() const
virtual void log(const std::string &log) const
SerializedMsg * headMsg() const
void setFds(int rFd, int wFd)
SerializedMsg * serialize(MsgQueue *from)
Msg(MsgQueue *from, XMLEle *root)
friend class SerializedMsgWithSharedBuffer
friend class SerializedMsgWithoutSharedBuffer
static Msg * fromXml(MsgQueue *from, XMLEle *root, std::list< int > &incomingSharedBuffers)
Property(const std::string &dev, const std::string &name)
virtual const std::string remoteServerUid() const
virtual RemoteDvrInfo * clone() const
virtual ~SerializedMsgWithSharedBuffer()
SerializedMsgWithSharedBuffer(Msg *parent)
virtual bool generateContentAsync() const
virtual void generateContent()
virtual bool generateContentAsync() const
SerializedMsgWithoutSharedBuffer(Msg *parent)
virtual void generateContent()
virtual ~SerializedMsgWithoutSharedBuffer()
void async_updateRequirement(const SerializationRequirement &n)
SerializationStatus asyncStatus
bool getContent(MsgChunckIterator &position, void *&data, ssize_t &nsend, std::vector< int > &sharedBuffers)
virtual bool generateContentAsync() const =0
MsgQueue * blockedProducer
void collectRequirements(SerializationRequirement &req)
std::list< void * > ownBuffers
void blockReceiver(MsgQueue *toblock)
bool requestContent(const MsgChunckIterator &position)
std::set< MsgQueue * > awaiters
virtual void generateContent()=0
SerializedMsg(Msg *parent)
void advance(MsgChunckIterator &position, ssize_t s)
void addAwaiter(MsgQueue *awaiter)
void release(MsgQueue *from)
void async_pushChunck(const MsgChunck &m)
Constants and Data structure definitions for the interface to the reference INDI C API implementation...
Interface to the reference INDI C API device implementation on the Device Driver side.
BLOBHandling
How drivers handle BLOBs incoming from snooping drivers.
bool parseBlobSize(XMLEle *blobWithAttachedBuffer, ssize_t &size)
#define MAXFD_PER_MESSAGE
int main(int ac, char *av[])
XMLEle * cloneXMLEleWithReplacementMap(XMLEle *root, const std::unordered_map< XMLEle *, XMLEle * > &replacement)
XMLAtt * findXMLAtt(XMLEle *ep, const char *name)
Find an XML attribute within an XML element.
XMLEle * shallowCloneXMLEle(XMLEle *ele)
return a surface copy of a node. Don't copy childs or cdata.
LilXML * newLilXML()
Create a new lilxml parser.
XMLEle * cloneXMLEle(XMLEle *ep)
XMLEle ** parseXMLChunk(LilXML *lp, char *buf, int size, char ynot[])
Process an XML chunk.
size_t sprXMLEle(char *s, XMLEle *ep, int level)
sample print ep to string s. N.B. s must be at least as large as that reported by sprlXMLEle()+1....
const char * findXMLAttValu(XMLEle *ep, const char *name)
Find an XML element's attribute value.
XMLAtt * addXMLAtt(XMLEle *ep, const char *name, const char *valu)
Add an XML attribute to an existing XML element.
char * pcdataXMLEle(XMLEle *ep)
Return the pcdata of an XML element.
void editXMLEle(XMLEle *ep, const char *pcdata)
set the pcdata of the given element
char * tagXMLEle(XMLEle *ep)
Return the tag of an XML element.
size_t sprXMLCDataOffset(XMLEle *root, XMLEle *ep, int level)
return exact position of cdata of child in printed representation of root N.B. set level = 0 on first...
void rmXMLAtt(XMLEle *ep, const char *name)
Remove an XML attribute from an XML element.
void prXMLEle(FILE *fp, XMLEle *ep, int level)
Print an XML element.
size_t sprlXMLEle(XMLEle *ep, int level)
return number of bytes in a string guaranteed able to hold result of sprXLMEle(ep) (sans trailing \0@...
XMLEle * nextXMLEle(XMLEle *ep, int init)
Iterate an XML element for a list of nesetd XML elements.
void delXMLEle(XMLEle *ep)
delXMLEle Delete XML element.
void delLilXML(LilXML *lp)
Delete a lilxml parser.
XMLEle * setXMLEleTag(XMLEle *ep, const char *tag)
Update the tag of an element.
XMLEle * addXMLEle(XMLEle *parent, const char *tag)
add an element with the given tag to the given element. parent can be NULL to make a new root.
int pcdatalenXMLEle(XMLEle *ep)
Return the number of characters in pcdata in an XML element.
char * valuXMLAtt(XMLAtt *ap)
Return the value of an XML attribute.
A little DOM-style library to handle parsing and processing an XML file.
std::vector< uint8_t > buffer
@ replace
replace invalid UTF-8 sequences with U+FFFD
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
bool operator==(json_pointer< RefStringTypeLhs > const &lhs, json_pointer< RefStringTypeRhs > const &rhs) noexcept
std::unique_ptr< RadioSim > receiver(new RadioSim())
void IDSharedBlobDettach(void *ptr)
void * IDSharedBlobAlloc(size_t size)
int IDSharedBlobGetFd(void *ptr)