Instrument Neutral Distributed Interface INDI  2.0.2
indiserver.cpp
Go to the documentation of this file.
1 /* INDI Server for protocol version 1.7.
2  * Copyright (C) 2007 Elwood C. Downey <ecdowney@clearskyinstitute.com>
3  2013 Jasem Mutlaq <mutlaqja@ikarustech.com>
4  2022 Ludovic Pollet
5  This library is free software; you can redistribute it and/or
6  modify it under the terms of the GNU Lesser General Public
7  License as published by the Free Software Foundation; either
8  version 2.1 of the License, or (at your option) any later version.
9 
10  This library is distributed in the hope that it will be useful,
11  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13  Lesser General Public License for more details.
14 
15  You should have received a copy of the GNU Lesser General Public
16  License along with this library; if not, write to the Free Software
17  Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18 
19  * argv lists names of Driver programs to run or sockets to connect for Devices.
20  * Drivers are restarted if they exit or connection closes.
21  * Each local Driver's stdin/out are assumed to provide INDI traffic and are
22  * connected here via pipes. Local Drivers' stderr are connected to our
23  * stderr with date stamp and driver name prepended.
24  * We only support Drivers that advertise support for one Device. The problem
25  * with multiple Devices in one Driver is without a way to know what they
26  * _all_ are there is no way to avoid sending all messages to all Drivers.
27  * Outbound messages are limited to Devices and Properties seen inbound.
28  * Messages to Devices on sockets always include Device so the chained
29  * indiserver will only pass back info from that Device.
30  * All newXXX() received from one Client are echoed to all other Clients who
31  * have shown an interest in the same Device and property.
32  *
33  * 2017-01-29 JM: Added option to drop stream blobs if client blob queue is
34  * higher than maxstreamsiz bytes
35  *
36  * Implementation notes:
37  *
38  * We fork each driver and open a server socket listening for INDI clients.
39  * Then forever we listen for new clients and pass traffic between clients and
40  * drivers, subject to optimizations based on sniffing messages for matching
41  * Devices and Properties. Since one message might be destined to more than
42  * one client or device, they are queued and only removed after the last
43  * consumer is finished. XMLEle are converted to linear strings before being
44  * sent to optimize write system calls and avoid blocking to slow clients.
45  * Clients that get more than maxqsiz bytes behind are shut down.
46  */
47 #ifndef _GNU_SOURCE
48 #define _GNU_SOURCE // needed for siginfo_t and sigaction
49 #endif
50 
51 #include "config.h"
52 #include <set>
53 #include <string>
54 #include <list>
55 #include <map>
56 #include <unordered_map>
57 #include <vector>
58 #include <thread>
59 #include <mutex>
60 
61 #include <assert.h>
62 
63 #include "indiapi.h"
64 #include "indidevapi.h"
65 #include "sharedblob.h"
66 #include "lilxml.h"
67 #include "base64.h"
68 
69 #include <errno.h>
70 #include <fcntl.h>
71 #include <libgen.h>
72 #include <netdb.h>
73 #include <signal.h>
74 #include <stdio.h>
75 #include <stdlib.h>
76 #include <stdarg.h>
77 #include <stdint.h>
78 #include <string.h>
79 #include <time.h>
80 #include <poll.h>
81 #include <unistd.h>
82 #include <arpa/inet.h>
83 #include <netinet/in.h>
84 #include <sys/time.h>
85 #include <sys/ioctl.h>
86 #include <sys/types.h>
87 #include <sys/wait.h>
88 #include <sys/stat.h>
89 #include <sys/socket.h>
90 #include <sys/mman.h>
91 #include <unistd.h>
92 #include <sys/un.h>
93 #ifdef MSG_ERRQUEUE
94 #include <linux/errqueue.h>
95 #endif
96 
97 #include <ev++.h>
98 
99 #define INDIPORT 7624 /* default TCP/IP port to listen */
100 #define INDIUNIXSOCK "/tmp/indiserver" /* default unix socket path (local connections) */
101 #define MAXSBUF 512
102 #define MAXRBUF 49152 /* max read buffering here */
103 #define MAXWSIZ 49152 /* max bytes/write */
104 #define SHORTMSGSIZ 2048 /* buf size for most messages */
105 #define DEFMAXQSIZ 128 /* default max q behind, MB */
106 #define DEFMAXSSIZ 5 /* default max stream behind, MB */
107 #define DEFMAXRESTART 10 /* default max restarts */
108 #define MAXFD_PER_MESSAGE 16 /* No more than 16 buffer attached to a message */
109 #ifdef OSX_EMBEDED_MODE
110 #define LOGNAME "/Users/%s/Library/Logs/indiserver.log"
111 #define FIFONAME "/tmp/indiserverFIFO"
112 #endif
113 
114 #define STRINGIFY_TOK(x) #x
115 #define TO_STRING(x) STRINGIFY_TOK(x)
116 
117 static ev::default_loop loop;
118 
/* A collection of M* addressed by a numeric id.
 *
 * insert() assigns a fresh, never reused id to the item and records the
 * owning set in the item; erase() resets both. M must therefore expose
 * `id` and `current` members (see Collectable).
 *
 * Iteration works on a snapshot of the ids taken by begin(): entries that
 * are erased while a loop is running are skipped, and entries inserted
 * during the loop are not visited.
 */
template<class M>
class ConcurrentSet
{
        unsigned long identifier = 1;
        std::map<unsigned long, M*> items;

    public:
        /* Register item under a new id and make it point back to this set. */
        void insert(M* item)
        {
            item->id = identifier++;
            items[item->id] = item;
            // Items store the owning set type-erased as ConcurrentSet<void>.
            item->current = reinterpret_cast<ConcurrentSet<void>*>(this);
        }

        /* Forget item; its id becomes 0 and it no longer references a set. */
        void erase(M* item)
        {
            items.erase(item->id);
            item->id = 0;
            item->current = nullptr;
        }

        /* Return the ids currently stored, in increasing order. */
        std::vector<unsigned long> ids() const
        {
            std::vector<unsigned long> result;
            result.reserve(items.size());
            for (const auto &entry : items)
            {
                result.push_back(entry.first);
            }
            return result;
        }

        /* Return the item registered under id, or nullptr if there is none. */
        M* operator[](unsigned long id) const
        {
            const auto found = items.find(id);
            if (found == items.end())
            {
                return nullptr;
            }
            return found->second;
        }

        class iterator
        {
                friend class ConcurrentSet<M>;
                const ConcurrentSet<M> * parent;
                // Snapshot of the ids that existed when begin() was called
                std::vector<unsigned long> ids;
                // Will be -1 when done
                long int pos = 0;

                /* Move forward to the next id that is still present in parent */
                void skip()
                {
                    if (pos == -1) return;
                    while (pos < (long int)ids.size() && !(*parent)[ids[pos]])
                    {
                        pos++;
                    }
                    if (pos == (long int)ids.size())
                    {
                        pos = -1;
                    }
                }
            public:
                iterator(const ConcurrentSet<M> * parent) : parent(parent) {}

                /* Only positions are compared: every exhausted iterator equals end() */
                bool operator!=(const iterator &o) const
                {
                    return pos != o.pos;
                }

                iterator &operator++()
                {
                    if (pos != -1)
                    {
                        pos++;
                        skip();
                    }
                    return *this;
                }

                /* May be nullptr if the item was erased since the last skip() */
                M * operator*() const
                {
                    return (*parent)[ids[pos]];
                }
        };

        iterator begin() const
        {
            iterator result(this);
            result.ids = ids();
            result.skip();
            return result;
        }

        iterator end() const
        {
            iterator result(nullptr);
            result.pos = -1;
            return result;
        }
};
222 
223 /* An object that can be put in a ConcurrentSet, and provide a heartbeat
224  * to detect removal from ConcurrentSet
225  */
227 {
228  template<class P> friend class ConcurrentSet;
229  unsigned long id = 0;
230  const ConcurrentSet<void> * current;
231 
232  /* Keep the id */
233  class HeartBeat
234  {
235  friend class Collectable;
236  unsigned long id;
237  const ConcurrentSet<void> * current;
238  HeartBeat(unsigned long id, const ConcurrentSet<void> * current)
239  : id(id), current(current) {}
240  public:
241  bool alive() const
242  {
243  return id != 0 && (*current)[id] != nullptr;
244  }
245  };
246 
247  protected:
248  /* heartbeat.alive will return true as long as this item has not changed collection.
249  * Also detect deletion of the Collectable */
250  HeartBeat heartBeat() const
251  {
252  return HeartBeat(id, current);
253  }
254 };
255 
/* NOTE(review): the line opening this class is missing from this listing;
 * the constructors below show that the class is MsgChunck. Listing lines
 * 264-265 are missing as well -- confirm against the original file.
 *
 * Holds a pointer/length pair plus a list of shared buffer ids.
 */
262 {
263  friend class SerializedMsg;
266  friend class MsgChunckIterator;
267 
268  MsgChunck();
269  MsgChunck(char * content, unsigned long length);
270 
 /* Start of the chunk data; who owns the memory is not visible here */
271  char * content;
 /* presumably the number of bytes available at content -- confirm */
272  unsigned long contentLength;
273 
 /* presumably the shared buffers to hand over together with this chunk -- confirm */
274  std::vector<int> sharedBufferIdsToAttach;
275 };
276 
277 class Msg;
278 class MsgQueue;
279 class MsgChunckIterator;
280 
/* Describes which resources are still needed: the xml content and/or a set
 * of shared buffers (identified by int, see sharedBuffers).
 * Everything is private; only Msg and SerializedMsg use it.
 */
class SerializationRequirement
{
        friend class Msg;
        friend class SerializedMsg;

        // If the xml is still required
        bool xml;
        // Set of sharedBuffer that are still required
        std::set<int> sharedBuffers;

        /* Start with nothing required */
        SerializationRequirement() : sharedBuffers()
        {
            xml = false;
        }

        /* Merge: the result requires whatever either side requires */
        void add(const SerializationRequirement &from)
        {
            xml |= from.xml;
            sharedBuffers.insert(from.sharedBuffers.begin(), from.sharedBuffers.end());
        }

        bool operator==(const SerializationRequirement &sr) const
        {
            return (xml == sr.xml) && (sharedBuffers == sr.sharedBuffers);
        }
};
310 
312 
/* NOTE(review): the line opening this class is missing from this listing;
 * the constructor/destructor below show that the class is SerializedMsg.
 * Several member lines (listing lines 336, 345-346, 348, 364) are missing
 * too -- confirm against the original file.
 *
 * Declares a message serialization whose content is produced as a sequence
 * of MsgChunck, either directly (generateContent) or through the async_*
 * helpers, and which MsgQueue instances read through a MsgChunckIterator.
 */
314 {
315  friend class Msg;
316  friend class MsgChunckIterator;
317 
 /* presumably guards the state shared with the producing thread -- confirm */
318  std::recursive_mutex lock;
319  ev::async asyncProgress;
320 
321  // Start a thread for execution of asyncRun
322  void async_start();
323  void async_cancel();
324 
325  // Called within main loop when async task did some progress
326  void async_progressed();
327 
328  // The requirements. Prior to starting, everything is required.
329  SerializationRequirement requirements;
330 
331  void produce(bool sync);
332 
333  protected:
334  // These methods are to be called from asyncRun
335  bool async_canceled();
337  void async_pushChunck(const MsgChunck &m);
338  void async_done();
339 
340  // True if a producing thread is active
341  bool isAsyncRunning();
342 
343  protected:
344 
347 
349 
 /* presumably the queues registered through addAwaiter() -- confirm */
350  std::set<MsgQueue *> awaiters;
351  private:
352  std::vector<MsgChunck> chuncks;
353 
354  protected:
355  // Buffers malloced during asyncRun
356  std::list<void*> ownBuffers;
357 
358  // This will notify awaiters and possibly release the owner
359  void onDataReady();
360 
 /* Hooks implemented by the concrete serializations */
361  virtual bool generateContentAsync() const = 0;
362  virtual void generateContent() = 0;
363 
365 
366  // The task will cancel itself if all owner release it
367  void abort();
368 
369  // Make sure the given receiver will not be processed until this task complete
370  // TODO : to implement + make sure the task start when it actually block something
371  void blockReceiver(MsgQueue * toblock);
372 
373  public:
374  SerializedMsg(Msg * parent);
375  virtual ~SerializedMsg();
376 
377  // Calling requestContent will start production
378  // Return true if some content is available
379  bool requestContent(const MsgChunckIterator &position);
380 
381  // Return true if some content is available
382  // It is possible to have 0 to send, meaning end was actually reached
383  bool getContent(MsgChunckIterator &position, void * &data, ssize_t &nsend, std::vector<int> &sharedBuffers);
384 
 // Move position forward by s bytes -- presumably the amount actually sent; confirm
385  void advance(MsgChunckIterator &position, ssize_t s);
386 
387  // When a queue is done with sending this message
388  void release(MsgQueue * from);
389 
390  void addAwaiter(MsgQueue * awaiter);
391 
392  ssize_t queueSize();
393 };
394 
/* NOTE(review): the line opening this class and its constructor/destructor
 * lines (listing lines 395, 402-403) are missing from this listing. The
 * overridden hooks suggest a SerializedMsg specialisation, presumably the
 * one built by Msg::buildConvertionToSharedBuffer() -- confirm.
 */
396 {
 /* presumably the shared buffers created by this serialization -- confirm */
397  std::set<int> ownSharedBuffers;
398  protected:
399  bool detectInlineBlobs();
400 
401  public:
404 
405  virtual bool generateContentAsync() const;
406  virtual void generateContent();
407 };
408 
/* NOTE(review): the line opening this class and its constructor/destructor
 * lines (listing lines 409, 413-414) are missing from this listing. The
 * overridden hooks suggest a SerializedMsg specialisation, presumably the
 * one built by Msg::buildConvertionToInline() -- confirm.
 */
410 {
411 
412  public:
415 
416  virtual bool generateContentAsync() const;
417  virtual void generateContent();
418 };
419 
/* Position inside a serialized message: the index of a chunk, an offset
 * inside that chunk and an end marker. SerializedMsg (friend) moves it.
 */
class MsgChunckIterator
{
        friend class SerializedMsg;
        std::size_t chunckId;
        unsigned long chunckOffset;
        bool endReached;
    public:
        /* A new iterator points to the start of the message */
        MsgChunckIterator()
        {
            reset();
        }

        // Point to start of message.
        void reset()
        {
            chunckId = 0;
            chunckOffset = 0;
            // No risk of 0 length message, so always false here
            endReached = false;
        }

        /* True once the end marker has been set */
        bool done() const
        {
            return endReached;
        }
};
446 
447 
/* A message built from an XML element, together with the file descriptors
 * of its shared buffers and up to two serializations of it.
 * NOTE(review): listing lines 451-452 and 493-511 are missing from this
 * listing -- confirm the full declaration against the original file.
 */
448 class Msg
449 {
450  friend class SerializedMsg;
453  private:
454  // Present for sure until message queuing is done. Prune asap then
455  XMLEle * xmlContent;
456 
457  // Present until message was queued.
458  MsgQueue * from;
459 
460  int queueSize;
461  bool hasInlineBlobs;
462  bool hasSharedBufferBlobs;
463 
464  std::vector<int> sharedBuffers; /* fds of shared buffer */
465 
466  // Conversion tasks and the result of each task
467  SerializedMsg* convertionToSharedBuffer;
468  SerializedMsg* convertionToInline;
469 
470  SerializedMsg * buildConvertionToSharedBuffer();
471  SerializedMsg * buildConvertionToInline();
472 
473  bool fetchBlobs(std::list<int> &incomingSharedBuffers);
474 
475  void releaseXmlContent();
476  void releaseSharedBuffers(const std::set<int> &keep);
477 
478  // Remove resources that can be removed.
479  // Will be called when queuingDone is true and for every change of status from convertionToXXX
480  void prune();
481 
482  void releaseSerialization(SerializedMsg * form);
483 
 // Private destructor: instances cannot be deleted from outside the class (and its friends)
484  ~Msg();
485 
486  public:
487  /* Message will not be queued anymore. Release all possible resources, incl self */
488  void queuingDone();
489 
490  Msg(MsgQueue * from, XMLEle * root);
491 
492  static Msg * fromXml(MsgQueue * from, XMLEle * root, std::list<int> &incomingSharedBuffers);
493 
512  SerializedMsg * serialize(MsgQueue * from);
513 };
514 
/* Base class of the server endpoints (see ClInfo and DvrInfo): a pair of
 * file descriptors, an XML parsing context for incoming data and a queue of
 * serialized messages waiting to be written.
 * NOTE(review): listing lines 544 and 571 are missing from this listing;
 * acceptSharedBuffers() reads a `useSharedBuffer` member whose declaration
 * is not visible here -- confirm against the original file.
 */
515 class MsgQueue: public Collectable
516 {
517  int rFd, wFd;
518  LilXML * lp; /* XML parsing context */
519  ev::io rio, wio; /* Event loop io events */
520  void ioCb(ev::io &watcher, int revents);
521 
522  // Update the status of FD read/write ability
523  void updateIos();
524 
525  std::set<SerializedMsg*> readBlocker; /* The message that block this queue */
526 
527  std::list<SerializedMsg*> msgq; /* To send msg queue */
528  std::list<int> incomingSharedBuffers; /* During reception, fds accumulate here */
529 
530  // Position in the head message
531  MsgChunckIterator nsent;
532 
533  // Handle fifo or socket case
534  size_t doRead(char * buff, size_t len);
535  void readFromFd();
536 
537  /* write the next chunk of the current message in the queue to the given
538  * client. pop message from queue when complete and free the message if we are
539  * the last one to use it. shut down this client if trouble.
540  */
541  void writeToFd();
542 
543  protected:
 /* Accessors for the two descriptors stored by this queue */
545  int getRFd() const
546  {
547  return rFd;
548  }
549  int getWFd() const
550  {
551  return wFd;
552  }
553 
554  /* print key attributes and values of the given xml to stderr. */
555  void traceMsg(const std::string &log, XMLEle *root);
556 
557  /* Close the connection. (May be restarted later depending on driver logic) */
558  virtual void close() = 0;
559 
560  /* Close the writing part of the connection. By default, shutdown the write part, but keep on reading. May delete this */
561  virtual void closeWritePart();
562 
563  /* Handle a message. root will be freed by caller. fds of buffers will be closed, unless set to -1 */
564  virtual void onMessage(XMLEle *root, std::list<int> &sharedBuffers) = 0;
565 
566  /* convert the string value of enableBLOB to our B_ state value.
567  * no change if unrecognized
568  */
569  static void crackBLOB(const char *enableBLOB, BLOBHandling *bp);
570 
572  public:
573  virtual ~MsgQueue();
574 
575  void pushMsg(Msg * msg);
576 
577  /* return storage size of all Msqs on the given q */
578  unsigned long msgQSize() const;
579 
580  SerializedMsg * headMsg() const;
581  void consumeHeadMsg();
582 
583  /* Remove all messages from queue */
584  void clearMsgQueue();
585 
586  void messageMayHaveProgressed(const SerializedMsg * msg);
587 
588  void setFds(int rFd, int wFd);
589 
 /* Base implementation reports the useSharedBuffer flag; DvrInfo overrides it */
590  virtual bool acceptSharedBuffers() const
591  {
592  return useSharedBuffer;
593  }
594 
595  virtual void log(const std::string &log) const;
596 };
597 
598 /* device + property name */
/* A device name / property name pair together with a BLOB handling mode.
 * Used as list element by ClInfo::props and DvrInfo::sprops.
 */
599 class Property
600 {
601  public:
602  std::string dev;
603  std::string name;
604  BLOBHandling blob = B_NEVER; /* when to snoop BLOBs */
605 
 /* blob keeps its B_NEVER default; only the two names are set */
606  Property(const std::string &dev, const std::string &name): dev(dev), name(name) {}
607 };
608 
609 
/* Command FIFO of the server: a named path, a file descriptor watched by
 * the event loop and a 1024 byte buffer for the data read from it.
 */
610 class Fifo
611 {
612  std::string name; /* Path to FIFO for dynamic startups & shutdowns of drivers */
613 
614  char buffer[1024];
 /* presumably the number of bytes currently held in buffer -- confirm */
615  int bufferPos = 0;
 /* -1 until a descriptor is opened */
616  int fd = -1;
617  ev::io fdev;
618 
619  void close();
620  void open();
621  void processLine(const char * line);
622 
623  /* Read commands from FIFO and process them. Start/stop drivers accordingly */
624  void read();
625  void ioCb(ev::io &watcher, int revents);
626  public:
627  Fifo(const std::string &name);
 /* Start listening: simply opens the FIFO */
628  void listen()
629  {
630  open();
631  }
632 };
633 
634 static Fifo * fifo = nullptr;
635 
636 
637 class DvrInfo;
638 
639 /* info for each connected client */
/* MsgQueue specialisation for a connected client: keeps the list of
 * device/property pairs the client wants and its BLOB handling mode.
 */
640 class ClInfo: public MsgQueue
641 {
642  protected:
643  /* send message to each appropriate driver.
644  * also send all newXXX() to all other interested clients.
645  */
646  virtual void onMessage(XMLEle *root, std::list<int> &sharedBuffers);
647 
648  /* Update the client property BLOB handling policy */
649  void crackBLOBHandling(const std::string &dev, const std::string &name, const char *enableBLOB);
650 
651  /* close down the given client */
652  virtual void close();
653 
654  public:
655  std::list<Property*> props; /* props we want */
656  int allprops = 0; /* saw getProperties w/o device */
657  BLOBHandling blob = B_NEVER; /* when to send setBLOBs */
658 
659  ClInfo(bool useSharedBuffer);
660  virtual ~ClInfo();
661 
662  /* return 0 if cp may be interested in dev/name else -1
663  */
664  int findDevice(const std::string &dev, const std::string &name) const;
665 
666  /* add the given device and property to the props[] list of client if new.
667  */
668  void addDevice(const std::string &dev, const std::string &name, int isblob);
669 
670  virtual void log(const std::string &log) const;
671 
672  /* put Msg mp on queue of each chained server client, except notme.
673  */
674  static void q2Servers(DvrInfo *me, Msg *mp, XMLEle *root);
675 
676  /* put Msg mp on queue of each client interested in dev/name, except notme.
677  * if BLOB always honor current mode.
678  */
679  static void q2Clients(ClInfo *notme, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root);
680 
681  /* Reference to all active clients */
 /* NOTE(review): the member this comment describes (listing line 682) is
  * missing from this listing -- confirm against the original file. */
683 };
684 
685 /* info for each connected driver */
/* Abstract MsgQueue specialisation for a driver: keeps the driver name,
 * the devices it serves, the properties it snoops and its restart state.
 * Concrete drivers implement start(), clone() and remoteServerUid().
 */
686 class DvrInfo: public MsgQueue
687 {
688  /* add dev/name to this device's snooping list.
689  * init with blob mode set to B_NEVER.
690  */
691  void addSDevice(const std::string &dev, const std::string &name);
692 
693  public:
694  /* return Property if dp is this driver is snooping dev/name, else NULL.
695  */
696  Property *findSDevice(const std::string &dev, const std::string &name) const;
697 
698  protected:
699  /* send message to each interested client
700  */
701  virtual void onMessage(XMLEle *root, std::list<int> &sharedBuffers);
702 
703  /* override to kill driver that are not reachable anymore */
704  virtual void closeWritePart();
705 
706 
707  /* Construct an instance that will start the same driver */
708  DvrInfo(const DvrInfo &model);
709 
710  public:
711  std::string name; /* persistent name */
712 
713  std::set<std::string> dev; /* device served by this driver */
714  std::list<Property*>sprops; /* props we snoop */
 /* NOTE(review): restarts has no in-class initializer; presumably set by the constructors -- confirm */
715  int restarts; /* times process has been restarted */
716  bool restart = true; /* Restart on shutdown */
717 
718  DvrInfo(bool useSharedBuffer);
719  virtual ~DvrInfo();
720 
721  bool isHandlingDevice(const std::string &dev) const;
722 
723  /* start the INDI driver process or connection.
724  * exit if trouble.
725  */
726  virtual void start() = 0;
727 
728  /* close down the given driver and restart if set*/
729  virtual void close();
730 
731  /* Allocate an instance that will start the same driver */
732  virtual DvrInfo * clone() const = 0;
733 
734  virtual void log(const std::string &log) const;
735 
 /* Identifier of the remote server; LocalDvrInfo returns "" and RemoteDvrInfo returns "host:port" */
736  virtual const std::string remoteServerUid() const = 0;
737 
738  /* put Msg mp on queue of each driver responsible for dev, or all drivers
739  * if dev empty.
740  */
741  static void q2RDrivers(const std::string &dev, Msg *mp, XMLEle *root);
742 
743  /* put Msg mp on queue of each driver snooping dev/name.
744  * if BLOB always honor current mode.
745  */
746  static void q2SDrivers(DvrInfo *me, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root);
747 
748  /* Reference to all active drivers */
 /* NOTE(review): the member this comment describes (listing line 749) is
  * missing from this listing -- confirm against the original file. */
750 
751  // decoding of attached blobs from driver is not supported ATM. Be conservative here
752  virtual bool acceptSharedBuffers() const
753  {
754  return false;
755  }
756 };
757 
/* DvrInfo for a driver run as a local child process: tracks the child pid
 * and the read end of its stderr pipe, and carries the optional
 * INDIDEV / INDICONFIG / INDISKEL / INDIPREFIX values applied by start().
 */
758 class LocalDvrInfo: public DvrInfo
759 {
760  char errbuff[1024]; /* buffer for stderr pipe. line too long will be clipped */
761  int errbuffpos = 0; /* first free pos in buffer */
762  ev::io eio; /* Event loop io events */
763  ev::child pidwatcher;
764  void onEfdEvent(ev::io &watcher, int revents); /* callback for data on efd */
765  void onPidEvent(ev::child &watcher, int revents);
766 
767  int pid = 0; /* process id or 0 for N/A (not started/terminated) */
768  int efd = -1; /* stderr from driver, or -1 when N/A */
769 
770  void closeEfd();
771  void closePid();
772  protected:
773  LocalDvrInfo(const LocalDvrInfo &model);
774 
775  public:
 /* Values exported to the child environment by start() when not empty */
776  std::string envDev;
777  std::string envConfig;
778  std::string envSkel;
779  std::string envPrefix;
780 
781  LocalDvrInfo();
782  virtual ~LocalDvrInfo();
783 
784  virtual void start();
785 
786  virtual LocalDvrInfo * clone() const;
787 
 /* A local driver has no remote server: always the empty string */
788  virtual const std::string remoteServerUid() const
789  {
790  return "";
791  }
792 };
793 
/* DvrInfo for a driver reached through another INDI server, identified by
 * host and port.
 */
794 class RemoteDvrInfo: public DvrInfo
795 {
796  /* open a connection to the given host and port or die.
797  * return socket fd.
798  */
799  int openINDIServer();
800 
 /* Split name into host, port and device (written to the o_* arguments) */
801  void extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const;
802 
803  protected:
804  RemoteDvrInfo(const RemoteDvrInfo &model);
805 
806  public:
807  std::string host;
 /* NOTE(review): no in-class initializer; presumably assigned before use -- confirm */
808  int port;
809 
810  RemoteDvrInfo();
811  virtual ~RemoteDvrInfo();
812 
813  virtual void start();
814 
815  virtual RemoteDvrInfo * clone() const;
816 
 /* Builds the "host:port" string from the two members above */
817  virtual const std::string remoteServerUid() const
818  {
819  return std::string(host) + ":" + std::to_string(port);
820  }
821 };
822 
/* NOTE(review): the line opening this class is missing from this listing;
 * the constructor below shows that the class is TcpServer.
 *
 * Listening TCP endpoint: a port, the server socket and its event loop watcher.
 */
824 {
825  int port;
 /* -1 until the server socket exists */
826  int sfd = -1;
827  ev::io sfdev;
828 
829  /* prepare for new client arriving on socket.
830  * exit if trouble.
831  */
832  void accept();
833  void ioCb(ev::io &watcher, int revents);
834  public:
835  TcpServer(int port);
836 
837  /* create the public INDI Driver endpoint lsocket on port.
838  * return server socket else exit.
839  */
840  void listen();
841 };
842 
843 #ifdef ENABLE_INDI_SHARED_MEMORY
844 
/* Listening UNIX domain endpoint, only compiled when
 * ENABLE_INDI_SHARED_MEMORY is defined: a socket path, the server socket
 * and its event loop watcher.
 */
845 class UnixServer
846 {
847  std::string path;
 /* -1 until the server socket exists */
848  int sfd = -1;
849  ev::io sfdev;
850 
851  void accept();
852  void ioCb(ev::io &watcher, int revents);
853 
854  virtual void log(const std::string &log) const;
855  public:
856  UnixServer(const std::string &path);
857 
858  /* create the public INDI Driver endpoint over UNIX (local) domain.
859  * exit on failure
860  */
861  void listen();
862 
 /* Socket path used by main(); defaults to INDIUNIXSOCK and is replaced by the -u option */
863  static std::string unixSocketPath;
864 };
865 
866 std::string UnixServer::unixSocketPath = INDIUNIXSOCK;
867 
868 #endif
869 
870 static void log(const std::string &log);
871 /* Turn a printf format into std::string */
872 static std::string fmt(const char * fmt, ...) __attribute__ ((format (printf, 1, 0)));
873 
874 static char *indi_tstamp(char *s);
875 
876 static const char *me; /* our name */
877 static int port = INDIPORT; /* public INDI port */
878 static int verbose; /* chattiness */
879 static char *ldir; /* where to log driver messages */
880 static unsigned int maxqsiz = (DEFMAXQSIZ * 1024 * 1024); /* kill if these bytes behind */
881 static unsigned int maxstreamsiz = (DEFMAXSSIZ * 1024 * 1024); /* drop blobs if these bytes behind while streaming*/
882 static int maxrestarts = DEFMAXRESTART;
883 
884 static std::vector<XMLEle *> findBlobElements(XMLEle * root);
885 
886 static void logStartup(int ac, char *av[]);
887 static void usage(void);
888 static void noSIGPIPE(void);
889 static char *indi_tstamp(char *s);
890 static void logDMsg(XMLEle *root, const char *dev);
891 static void Bye(void);
892 
893 static int readFdError(int
894  fd); /* Read a pending error condition on the given fd. Return errno value or 0 if none */
895 
896 static void * attachSharedBuffer(int fd, size_t &size);
897 static void dettachSharedBuffer(int fd, void * ptr, size_t size);
898 
899 int main(int ac, char *av[])
900 {
901  /* log startup */
902  logStartup(ac, av);
903 
904  /* save our name */
905  me = av[0];
906 
907 #ifdef OSX_EMBEDED_MODE
908 
909  char logname[128];
910  snprintf(logname, 128, LOGNAME, getlogin());
911  fprintf(stderr, "switching stderr to %s", logname);
912  freopen(logname, "w", stderr);
913 
914  fifo = new Fifo();
915  fifo->name = FIFONAME;
916  verbose = 1;
917  ac = 0;
918 
919 #else
920 
921  /* crack args */
922  while ((--ac > 0) && ((*++av)[0] == '-'))
923  {
924  char *s;
925  for (s = av[0] + 1; *s != '\0'; s++)
926  switch (*s)
927  {
928  case 'l':
929  if (ac < 2)
930  {
931  fprintf(stderr, "-l requires log directory\n");
932  usage();
933  }
934  ldir = *++av;
935  ac--;
936  break;
937  case 'm':
938  if (ac < 2)
939  {
940  fprintf(stderr, "-m requires max MB behind\n");
941  usage();
942  }
943  maxqsiz = 1024 * 1024 * atoi(*++av);
944  ac--;
945  break;
946  case 'p':
947  if (ac < 2)
948  {
949  fprintf(stderr, "-p requires port value\n");
950  usage();
951  }
952  port = atoi(*++av);
953  ac--;
954  break;
955  case 'd':
956  if (ac < 2)
957  {
958  fprintf(stderr, "-d requires max stream MB behind\n");
959  usage();
960  }
961  maxstreamsiz = 1024 * 1024 * atoi(*++av);
962  ac--;
963  break;
964 #ifdef ENABLE_INDI_SHARED_MEMORY
965  case 'u':
966  if (ac < 2)
967  {
968  fprintf(stderr, "-u requires local socket path\n");
969  usage();
970  }
971  UnixServer::unixSocketPath = *++av;
972  ac--;
973  break;
974 #endif // ENABLE_INDI_SHARED_MEMORY
975  case 'f':
976  if (ac < 2)
977  {
978  fprintf(stderr, "-f requires fifo node\n");
979  usage();
980  }
981  fifo = new Fifo(*++av);
982  ac--;
983  break;
984  case 'r':
985  if (ac < 2)
986  {
987  fprintf(stderr, "-r requires number of restarts\n");
988  usage();
989  }
990  maxrestarts = atoi(*++av);
991  if (maxrestarts < 0)
992  maxrestarts = 0;
993  ac--;
994  break;
995  case 'v':
996  verbose++;
997  break;
998  default:
999  usage();
1000  }
1001  }
1002 #endif
1003 
1004  /* at this point there are ac args in av[] to name our drivers */
1005  if (ac == 0 && !fifo)
1006  usage();
1007 
1008  /* take care of some unixisms */
1009  noSIGPIPE();
1010 
1011  /* start each driver */
1012  while (ac-- > 0)
1013  {
1014  std::string dvrName = *av++;
1015  DvrInfo * dr;
1016  if (dvrName.find('@') != std::string::npos)
1017  {
1018  dr = new RemoteDvrInfo();
1019  }
1020  else
1021  {
1022  dr = new LocalDvrInfo();
1023  }
1024  dr->name = dvrName;
1025  dr->start();
1026  }
1027 
1028  /* announce we are online */
1029  (new TcpServer(port))->listen();
1030 
1031 #ifdef ENABLE_INDI_SHARED_MEMORY
1032  /* create a new unix server */
1033  (new UnixServer(UnixServer::unixSocketPath))->listen();
1034 #endif
1035  /* Load up FIFO, if available */
1036  if (fifo)
1037  {
1038  // New started drivers will not inherit server's prefix anymore
1039 
1040  // JM 2022.07.23: This causes an issue on MacOS. Disabled for now until investigated further.
1041  //unsetenv("INDIPREFIX");
1042  fifo->listen();
1043  }
1044 
1045  /* handle new clients and all io */
1046  loop.loop();
1047 
1048  /* will not happen unless no more listener left ! */
1049  log("unexpected return from event loop\n");
1050  return (1);
1051 }
1052 
1053 /* record we have started and our args */
1054 static void logStartup(int ac, char *av[])
1055 {
1056  int i;
1057 
1058  std::string startupMsg = "startup:";
1059  for (i = 0; i < ac; i++)
1060  {
1061  startupMsg += " ";
1062  startupMsg += av[i];
1063  }
1064  startupMsg += '\n';
1065  log(startupMsg);
1066 }
1067 
1068 /* print usage message and exit (2) */
1069 static void usage(void)
1070 {
1071  fprintf(stderr, "Usage: %s [options] driver [driver ...]\n", me);
1072  fprintf(stderr, "Purpose: server for local and remote INDI drivers\n");
1073  fprintf(stderr, "INDI Library: %s\nCode %s. Protocol %g.\n", CMAKE_INDI_VERSION_STRING, GIT_TAG_STRING, INDIV);
1074  fprintf(stderr, "Options:\n");
1075  fprintf(stderr, " -l d : log driver messages to <d>/YYYY-MM-DD.islog\n");
1076  fprintf(stderr, " -m m : kill client if gets more than this many MB behind, default %d\n", DEFMAXQSIZ);
1077  fprintf(stderr,
1078  " -d m : drop streaming blobs if client gets more than this many MB behind, default %d. 0 to disable\n",
1079  DEFMAXSSIZ);
1080 #ifdef ENABLE_INDI_SHARED_MEMORY
1081  fprintf(stderr, " -u path : Path for the local connection socket (abstract), default %s\n", INDIUNIXSOCK);
1082 #endif
1083  fprintf(stderr, " -p p : alternate IP port, default %d\n", INDIPORT);
1084  fprintf(stderr, " -r r : maximum driver restarts on error, default %d\n", DEFMAXRESTART);
1085  fprintf(stderr, " -f path : Path to fifo for dynamic startup and shutdown of drivers.\n");
1086  fprintf(stderr, " -v : show key events, no traffic\n");
1087  fprintf(stderr, " -vv : -v + key message content\n");
1088  fprintf(stderr, " -vvv : -vv + complete xml\n");
1089  fprintf(stderr, "driver : executable or [device]@host[:port]\n");
1090 
1091  exit(2);
1092 }
1093 
/* Ignore SIGPIPE so that a write to a closed peer reports an error to the
 * caller instead of raising the signal. A failure to install the
 * disposition is reported on stderr; the server keeps going regardless.
 */
static void noSIGPIPE()
{
    struct sigaction sa;
    memset(&sa, 0, sizeof(sa));
    sa.sa_handler = SIG_IGN;
    sigemptyset(&sa.sa_mask);
    if (sigaction(SIGPIPE, &sa, NULL) < 0)
        fprintf(stderr, "sigaction(SIGPIPE): %s\n", strerror(errno));
}
1103 
1104 /* start the given local INDI driver process.
1105  * exit if trouble.
1106  */
1108 {
1109  Msg *mp;
1110  int rp[2], wp[2], ep[2];
1111  int ux[2];
1112  int pid;
1113 
1114 #ifdef OSX_EMBEDED_MODE
1115  fprintf(stderr, "STARTING \"%s\"\n", name.c_str());
1116  fflush(stderr);
1117 #endif
1118 
1119  /* build three pipes: r, w and error*/
1120  if (useSharedBuffer)
1121  {
1122  // FIXME: lots of FD are opened by indiserver. FD_CLOEXEC is a must + check other fds
1123  if (socketpair(AF_UNIX, SOCK_STREAM, 0, ux) == -1)
1124  {
1125  log(fmt("socketpair: %s\n", strerror(errno)));
1126  Bye();
1127  }
1128  }
1129  else
1130  {
1131  if (pipe(rp) < 0)
1132  {
1133  log(fmt("read pipe: %s\n", strerror(errno)));
1134  Bye();
1135  }
1136  if (pipe(wp) < 0)
1137  {
1138  log(fmt("write pipe: %s\n", strerror(errno)));
1139  Bye();
1140  }
1141  }
1142  if (pipe(ep) < 0)
1143  {
1144  log(fmt("stderr pipe: %s\n", strerror(errno)));
1145  Bye();
1146  }
1147 
1148  /* fork&exec new process */
1149  pid = fork();
1150  if (pid < 0)
1151  {
1152  log(fmt("fork: %s\n", strerror(errno)));
1153  Bye();
1154  }
1155  if (pid == 0)
1156  {
1157  /* child: exec name */
1158  int fd;
1159 
1160  /* rig up pipes */
1161  if (useSharedBuffer)
1162  {
1163  // For unix socket, the same socket end can be used for both read & write
1164  dup2(ux[0], 0); /* driver stdin reads from ux[0] */
1165  dup2(ux[0], 1); /* driver stdout writes to ux[0] */
1166  ::close(ux[0]);
1167  ::close(ux[1]);
1168  }
1169  else
1170  {
1171  dup2(wp[0], 0); /* driver stdin reads from wp[0] */
1172  dup2(rp[1], 1); /* driver stdout writes to rp[1] */
1173  }
1174  dup2(ep[1], 2); /* driver stderr writes to e[]1] */
1175  for (fd = 3; fd < 100; fd++)
1176  (void)::close(fd);
1177 
1178  if (!envDev.empty())
1179  setenv("INDIDEV", envDev.c_str(), 1);
1180  /* Only reset environment variable in case of FIFO */
1181  else if (fifo)
1182  unsetenv("INDIDEV");
1183  if (!envConfig.empty())
1184  setenv("INDICONFIG", envConfig.c_str(), 1);
1185  else if (fifo)
1186  unsetenv("INDICONFIG");
1187  if (!envSkel.empty())
1188  setenv("INDISKEL", envSkel.c_str(), 1);
1189  else if (fifo)
1190  unsetenv("INDISKEL");
1191  std::string executable;
1192  if (!envPrefix.empty())
1193  {
1194  setenv("INDIPREFIX", envPrefix.c_str(), 1);
1195 #if defined(OSX_EMBEDED_MODE)
1196  executable = envPrefix + "/Contents/MacOS/" + name;
1197 #elif defined(__APPLE__)
1198  executable = envPrefix + "/" + name;
1199 #else
1200  executable = envPrefix + "/bin/" + name;
1201 #endif
1202 
1203  fprintf(stderr, "%s\n", executable.c_str());
1204 
1205  execlp(executable.c_str(), name.c_str(), NULL);
1206  }
1207  else
1208  {
1209  if (name[0] == '.')
1210  {
1211  executable = std::string(dirname((char*)me)) + "/" + name;
1212  execlp(executable.c_str(), name.c_str(), NULL);
1213  }
1214  else
1215  {
1216  execlp(name.c_str(), name.c_str(), NULL);
1217  }
1218  }
1219 
1220 #ifdef OSX_EMBEDED_MODE
1221  fprintf(stderr, "FAILED \"%s\"\n", name.c_str());
1222  fflush(stderr);
1223 #endif
1224  log(fmt("execlp %s: %s\n", executable.c_str(), strerror(errno)));
1225  _exit(1); /* parent will notice EOF shortly */
1226  }
1227 
1228  if (useSharedBuffer)
1229  {
1230  /* don't need child's other socket end */
1231  ::close(ux[0]);
1232 
1233  /* record pid, io channels, init lp and snoop list */
1234  setFds(ux[1], ux[1]);
1235  rp[0] = ux[1];
1236  wp[1] = ux[1];
1237  }
1238  else
1239  {
1240  /* don't need child's side of pipes */
1241  ::close(wp[0]);
1242  ::close(rp[1]);
1243 
1244  /* record pid, io channels, init lp and snoop list */
1245  setFds(rp[0], wp[1]);
1246  }
1247 
1248  ::close(ep[1]);
1249 
1250  // Watch pid
1251  this->pid = pid;
1252  this->pidwatcher.set(pid);
1253  this->pidwatcher.start();
1254 
1255  // Watch input on efd
1256  this->efd = ep[0];
1257  fcntl(this->efd, F_SETFL, fcntl(this->efd, F_GETFL, 0) | O_NONBLOCK);
1258  this->eio.start(this->efd, ev::READ);
1259 
1260  /* first message primes driver to report its properties -- dev known
1261  * if restarting
1262  */
1263  if (verbose > 0)
1264  log(fmt("pid=%d rfd=%d wfd=%d efd=%d\n", pid, rp[0], wp[1], ep[0]));
1265 
1266  XMLEle *root = addXMLEle(NULL, "getProperties");
1267  addXMLAtt(root, "version", TO_STRING(INDIV));
1268  mp = new Msg(nullptr, root);
1269 
1270  // pushmsg can kill mp. do at end
1271  pushMsg(mp);
1272 }
1273 
1274 void RemoteDvrInfo::extractRemoteId(const std::string &name, std::string &o_host, int &o_port, std::string &o_dev) const
1275 {
1276  char dev[MAXINDIDEVICE] = {0};
1277  char host[MAXSBUF] = {0};
1278 
1279  /* extract host and port from name*/
1280  int indi_port = INDIPORT;
1281  if (sscanf(name.c_str(), "%[^@]@%[^:]:%d", dev, host, &indi_port) < 2)
1282  {
1283  // Device missing? Try a different syntax for all devices
1284  if (sscanf(name.c_str(), "@%[^:]:%d", host, &indi_port) < 1)
1285  {
1286  log(fmt("Bad remote device syntax: %s\n", name.c_str()));
1287  Bye();
1288  }
1289  }
1290 
1291  o_host = host;
1292  o_port = indi_port;
1293  o_dev = dev;
1294 }
1295 
1296 /* start the given remote INDI driver connection.
1297  * exit if trouble.
1298  */
1300 {
1301  int sockfd;
1302  std::string dev;
1303  extractRemoteId(name, host, port, dev);
1304 
1305  /* connect */
1306  sockfd = openINDIServer();
1307 
1308  /* record flag pid, io channels, init lp and snoop list */
1309 
1310  this->setFds(sockfd, sockfd);
1311 
1312  if (verbose > 0)
1313  log(fmt("socket=%d\n", sockfd));
1314 
1315  /* N.B. storing name now is key to limiting outbound traffic to this
1316  * dev.
1317  */
1318  if (!dev.empty())
1319  this->dev.insert(dev);
1320 
1321  /* Sending getProperties with device lets remote server limit its
1322  * outbound (and our inbound) traffic on this socket to this device.
1323  */
1324  XMLEle *root = addXMLEle(NULL, "getProperties");
1325 
1326  if (!dev.empty())
1327  {
1328  addXMLAtt(root, "device", dev.c_str());
1329  addXMLAtt(root, "version", TO_STRING(INDIV));
1330  }
1331  else
1332  {
1333  // This informs downstream server that it is connecting to an upstream server
1334  // and not a regular client. The difference is in how it treats snooping properties
1335  // among properties.
1336  addXMLAtt(root, "device", "*");
1337  addXMLAtt(root, "version", TO_STRING(INDIV));
1338  }
1339 
1340  Msg *mp = new Msg(nullptr, root);
1341 
1342  // pushmsg can kill this. do at end
1343  pushMsg(mp);
1344 }
1345 
1346 int RemoteDvrInfo::openINDIServer()
1347 {
1348  struct sockaddr_in serv_addr;
1349  struct hostent *hp;
1350  int sockfd;
1351 
1352  /* lookup host address */
1353  hp = gethostbyname(host.c_str());
1354  if (!hp)
1355  {
1356  log(fmt("gethostbyname(%s): %s\n", host.c_str(), strerror(errno)));
1357  Bye();
1358  }
1359 
1360  /* create a socket to the INDI server */
1361  (void)memset((char *)&serv_addr, 0, sizeof(serv_addr));
1362  serv_addr.sin_family = AF_INET;
1363  serv_addr.sin_addr.s_addr = ((struct in_addr *)(hp->h_addr_list[0]))->s_addr;
1364  serv_addr.sin_port = htons(port);
1365  if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1366  {
1367  log(fmt("socket(%s,%d): %s\n", host.c_str(), port, strerror(errno)));
1368  Bye();
1369  }
1370 
1371  /* connect */
1372  if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
1373  {
1374  log(fmt("connect(%s,%d): %s\n", host.c_str(), port, strerror(errno)));
1375  Bye();
1376  }
1377 
1378  /* ok */
1379  return (sockfd);
1380 }
1381 
1382 #ifdef ENABLE_INDI_SHARED_MEMORY
1383 
1384 UnixServer::UnixServer(const std::string &path): path(path)
1385 {
1386  sfdev.set<UnixServer, &UnixServer::ioCb>(this);
1387 }
1388 
1389 void UnixServer::log(const std::string &str) const
1390 {
1391  std::string logLine = "Local server: ";
1392  logLine += str;
1393  ::log(logLine);
1394 }
1395 
1396 void UnixServer::ioCb(ev::io &, int revents)
1397 {
1398  if (revents & EV_ERROR)
1399  {
1400  int sockErrno = readFdError(this->sfd);
1401  if (sockErrno)
1402  {
1403  log(fmt("Error on unix socket: %s\n", strerror(sockErrno)));
1404  Bye();
1405  }
1406  }
1407  if (revents & EV_READ)
1408  {
1409  accept();
1410  }
1411 }
1412 
/* Fill in a unix-domain socket address for the given name.
 *
 * unixAddr:     socket name.
 * serv_addr_un: address structure to initialise (fully overwritten).
 * addrlen:      receives the length to hand to bind()/connect().
 * bind:         when true on non-Linux systems, any stale filesystem entry
 *               with that name is unlinked first; ignored on Linux.
 *
 * On Linux the name lives in the abstract namespace (leading NUL byte), so
 * no filesystem entry is involved; elsewhere it is a filesystem path.
 *
 * A name that does not fit in sun_path is truncated, and addrlen describes
 * exactly the bytes stored, so it can never exceed sizeof(sockaddr_un).
 */
static void initUnixSocketAddr(const std::string &unixAddr, struct sockaddr_un &serv_addr_un, socklen_t &addrlen, bool bind)
{
    memset(&serv_addr_un, 0, sizeof(serv_addr_un));
    serv_addr_un.sun_family = AF_UNIX;

    /* one byte of sun_path is always reserved (leading or trailing NUL) */
    const size_t capacity = sizeof(serv_addr_un.sun_path) - 1;
    const size_t stored   = unixAddr.size() < capacity ? unixAddr.size() : capacity;
    const size_t header   = offsetof(struct sockaddr_un, sun_path);

#ifdef __linux__
    (void) bind;

    // Using abstract socket path to avoid filesystem boilerplate
    strncpy(serv_addr_un.sun_path + 1, unixAddr.c_str(), capacity);

    addrlen = header + 1 + stored;
#else
    // Using filesystem socket path
    strncpy(serv_addr_un.sun_path, unixAddr.c_str(), capacity);

    if (bind)
    {
        unlink(unixAddr.c_str());
    }

    addrlen = header + stored;
#endif
}
1440 
1441 void UnixServer::listen()
1442 {
1443  struct sockaddr_un serv_socket;
1444 
1445  /* make socket endpoint */
1446  if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
1447  {
1448  log(fmt("socket: %s\n", strerror(errno)));
1449  Bye();
1450  }
1451 
1452  int reuse = 1;
1453  if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
1454  {
1455  log(fmt("setsockopt: %s\n", strerror(errno)));
1456  Bye();
1457  }
1458 
1459  /* bind to given path as unix address */
1460  socklen_t len;
1461  initUnixSocketAddr(path, serv_socket, len, true);
1462 
1463  if (bind(sfd, (struct sockaddr *)&serv_socket, len) < 0)
1464  {
1465  log(fmt("bind: %s\n", strerror(errno)));
1466  Bye();
1467  }
1468 
1469  /* willing to accept connections with a backlog of 5 pending */
1470  if (::listen(sfd, 5) < 0)
1471  {
1472  log(fmt("listen: %s\n", strerror(errno)));
1473  Bye();
1474  }
1475 
1476  fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
1477  sfdev.start(sfd, EV_READ);
1478 
1479  /* ok */
1480  if (verbose > 0)
1481  log(fmt("listening on local domain at: @%s\n", path.c_str()));
1482 }
1483 
1484 void UnixServer::accept()
1485 {
1486  int cli_fd;
1487 
1488  /* get a private connection to new client */
1489  cli_fd = ::accept(sfd, 0, 0);
1490  if (cli_fd < 0)
1491  {
1492  if (errno == EAGAIN || errno == EWOULDBLOCK) return;
1493 
1494  log(fmt("accept: %s\n", strerror(errno)));
1495  Bye();
1496  }
1497 
1498  ClInfo * cp = new ClInfo(true);
1499 
1500  /* rig up new clinfo entry */
1501  cp->setFds(cli_fd, cli_fd);
1502 
1503  if (verbose > 0)
1504  {
1505 #ifdef SO_PEERCRED
1506  struct ucred ucred;
1507 
1508  socklen_t len = sizeof(struct ucred);
1509  if (getsockopt(cli_fd, SOL_SOCKET, SO_PEERCRED, &ucred, &len) == -1)
1510  {
1511  log(fmt("getsockopt failed: %s\n", strerror(errno)));
1512  Bye();
1513  }
1514 
1515  cp->log(fmt("new arrival from local pid %ld (user: %ld:%ld) - welcome!\n", (long)ucred.pid, (long)ucred.uid,
1516  (long)ucred.gid));
1517 #else
1518  cp->log(fmt("new arrival from local domain - welcome!\n"));
1519 #endif
1520  }
1521 
1522 #ifdef OSX_EMBEDED_MODE
1523  fprintf(stderr, "CLIENTS %d\n", clients.size());
1524  fflush(stderr);
1525 #endif
1526 }
1527 
1528 #endif // ENABLE_INDI_SHARED_MEMORY
1529 
1530 TcpServer::TcpServer(int port): port(port)
1531 {
1532  sfdev.set<TcpServer, &TcpServer::ioCb>(this);
1533 }
1534 
1535 void TcpServer::ioCb(ev::io &, int revents)
1536 {
1537  if (revents & EV_ERROR)
1538  {
1539  int sockErrno = readFdError(this->sfd);
1540  if (sockErrno)
1541  {
1542  log(fmt("Error on tcp server socket: %s\n", strerror(sockErrno)));
1543  Bye();
1544  }
1545  }
1546  if (revents & EV_READ)
1547  {
1548  accept();
1549  }
1550 }
1551 
1553 {
1554  struct sockaddr_in serv_socket;
1555  int reuse = 1;
1556 
1557  /* make socket endpoint */
1558  if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
1559  {
1560  log(fmt("socket: %s\n", strerror(errno)));
1561  Bye();
1562  }
1563 
1564  /* bind to given port for any IP address */
1565  memset(&serv_socket, 0, sizeof(serv_socket));
1566  serv_socket.sin_family = AF_INET;
1567 #ifdef SSH_TUNNEL
1568  serv_socket.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
1569 #else
1570  serv_socket.sin_addr.s_addr = htonl(INADDR_ANY);
1571 #endif
1572  serv_socket.sin_port = htons((unsigned short)port);
1573  if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
1574  {
1575  log(fmt("setsockopt: %s\n", strerror(errno)));
1576  Bye();
1577  }
1578  if (bind(sfd, (struct sockaddr *)&serv_socket, sizeof(serv_socket)) < 0)
1579  {
1580  log(fmt("bind: %s\n", strerror(errno)));
1581  Bye();
1582  }
1583 
1584  /* willing to accept connections with a backlog of 5 pending */
1585  if (::listen(sfd, 5) < 0)
1586  {
1587  log(fmt("listen: %s\n", strerror(errno)));
1588  Bye();
1589  }
1590 
1591  fcntl(sfd, F_SETFL, fcntl(sfd, F_GETFL, 0) | O_NONBLOCK);
1592  sfdev.start(sfd, EV_READ);
1593 
1594  /* ok */
1595  if (verbose > 0)
1596  log(fmt("listening to port %d on fd %d\n", port, sfd));
1597 }
1598 
1599 void TcpServer::accept()
1600 {
1601  struct sockaddr_in cli_socket;
1602  socklen_t cli_len;
1603  int cli_fd;
1604 
1605  /* get a private connection to new client */
1606  cli_len = sizeof(cli_socket);
1607  cli_fd = ::accept(sfd, (struct sockaddr *)&cli_socket, &cli_len);
1608  if (cli_fd < 0)
1609  {
1610  if (errno == EAGAIN || errno == EWOULDBLOCK) return;
1611 
1612  log(fmt("accept: %s\n", strerror(errno)));
1613  Bye();
1614  }
1615 
1616  ClInfo * cp = new ClInfo(false);
1617 
1618  /* rig up new clinfo entry */
1619  cp->setFds(cli_fd, cli_fd);
1620 
1621  if (verbose > 0)
1622  {
1623  cp->log(fmt("new arrival from %s:%d - welcome!\n",
1624  inet_ntoa(cli_socket.sin_addr), ntohs(cli_socket.sin_port)));
1625  }
1626 #ifdef OSX_EMBEDED_MODE
1627  fprintf(stderr, "CLIENTS %d\n", clients.size());
1628  fflush(stderr);
1629 #endif
1630 }
1631 
1632 Fifo::Fifo(const std::string &name) : name(name)
1633 {
1634  fdev.set<Fifo, &Fifo::ioCb>(this);
1635 }
1636 
1637 /* Attempt to open up FIFO */
1638 void Fifo::close(void)
1639 {
1640  if (fd != -1)
1641  {
1642  ::close(fd);
1643  fd = -1;
1644  fdev.stop();
1645  }
1646  bufferPos = 0;
1647 }
1648 
1649 void Fifo::open()
1650 {
1651  /* Open up FIFO, if available */
1652  fd = ::open(name.c_str(), O_RDONLY | O_NONBLOCK | O_CLOEXEC);
1653 
1654  if (fd < 0)
1655  {
1656  log(fmt("open(%s): %s.\n", name.c_str(), strerror(errno)));
1657  Bye();
1658  }
1659 
1660  fdev.start(fd, EV_READ);
1661 }
1662 
1663 
1664 /* Handle one fifo command. Start/stop drivers accordingly */
1665 void Fifo::processLine(const char * line)
1666 {
1667 
1668  if (verbose)
1669  log(fmt("FIFO: %s\n", line));
1670 
1671  char cmd[MAXSBUF], arg[4][1], var[4][MAXSBUF], tDriver[MAXSBUF], tName[MAXSBUF], envConfig[MAXSBUF],
1672  envSkel[MAXSBUF], envPrefix[MAXSBUF];
1673 
1674  memset(&tDriver[0], 0, sizeof(char) * MAXSBUF);
1675  memset(&tName[0], 0, sizeof(char) * MAXSBUF);
1676  memset(&envConfig[0], 0, sizeof(char) * MAXSBUF);
1677  memset(&envSkel[0], 0, sizeof(char) * MAXSBUF);
1678  memset(&envPrefix[0], 0, sizeof(char) * MAXSBUF);
1679 
1680  int n = 0;
1681 
1682  bool remoteDriver = !!strstr(line, "@");
1683 
1684  // If remote driver
1685  if (remoteDriver)
1686  {
1687  n = sscanf(line, "%s %511[^\n]", cmd, tDriver);
1688 
1689  // Remove quotes if any
1690  char *ptr = tDriver;
1691  int len = strlen(tDriver);
1692  while ((ptr = strstr(tDriver, "\"")))
1693  {
1694  memmove(ptr, ptr + 1, --len);
1695  ptr[len] = '\0';
1696  }
1697  }
1698  // If local driver
1699  else
1700  {
1701  n = sscanf(line, "%s %s -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\" -%1c \"%511[^\"]\"", cmd,
1702  tDriver, arg[0], var[0], arg[1], var[1], arg[2], var[2], arg[3], var[3]);
1703  }
1704 
1705  int n_args = (n - 2) / 2;
1706 
1707  int j = 0;
1708  for (j = 0; j < n_args; j++)
1709  {
1710  if (arg[j][0] == 'n')
1711  {
1712  strncpy(tName, var[j], MAXSBUF - 1);
1713  tName[MAXSBUF - 1] = '\0';
1714 
1715  if (verbose)
1716  log(fmt("With name: %s\n", tName));
1717  }
1718  else if (arg[j][0] == 'c')
1719  {
1720  strncpy(envConfig, var[j], MAXSBUF - 1);
1721  envConfig[MAXSBUF - 1] = '\0';
1722 
1723  if (verbose)
1724  log(fmt("With config: %s\n", envConfig));
1725  }
1726  else if (arg[j][0] == 's')
1727  {
1728  strncpy(envSkel, var[j], MAXSBUF - 1);
1729  envSkel[MAXSBUF - 1] = '\0';
1730 
1731  if (verbose)
1732  log(fmt("With skeketon: %s\n", envSkel));
1733  }
1734  else if (arg[j][0] == 'p')
1735  {
1736  strncpy(envPrefix, var[j], MAXSBUF - 1);
1737  envPrefix[MAXSBUF - 1] = '\0';
1738 
1739  if (verbose)
1740  log(fmt("With prefix: %s\n", envPrefix));
1741  }
1742  }
1743 
1744  bool startCmd;
1745  if (!strcmp(cmd, "start"))
1746  startCmd = 1;
1747  else
1748  startCmd = 0;
1749 
1750  if (startCmd)
1751  {
1752  if (verbose)
1753  log(fmt("FIFO: Starting driver %s\n", tDriver));
1754 
1755  DvrInfo * dp;
1756  if (remoteDriver == 0)
1757  {
1758  auto * localDp = new LocalDvrInfo();
1759  dp = localDp;
1760  //strncpy(dp->dev, tName, MAXINDIDEVICE);
1761  localDp->envDev = tName;
1762  localDp->envConfig = envConfig;
1763  localDp->envSkel = envSkel;
1764  localDp->envPrefix = envPrefix;
1765  }
1766  else
1767  {
1768  dp = new RemoteDvrInfo();
1769  }
1770  dp->name = tDriver;
1771  dp->start();
1772  }
1773  else
1774  {
1775  for (auto dp : DvrInfo::drivers)
1776  {
1777  if (dp == nullptr) continue;
1778 
1779  if (dp->name == tDriver)
1780  {
1781  /* If device name is given, check against it before shutting down */
1782  if (tName[0] && !dp->isHandlingDevice(tName))
1783  continue;
1784  if (verbose)
1785  log(fmt("FIFO: Shutting down driver: %s\n", tDriver));
1786 
1787  dp->restart = false;
1788  dp->close();
1789  break;
1790  }
1791  }
1792  }
1793 }
1794 
1795 void Fifo::read(void)
1796 {
1797  int rd = ::read(fd, buffer + bufferPos, sizeof(buffer) - 1 - bufferPos);
1798  if (rd == 0)
1799  {
1800  if (bufferPos > 0)
1801  {
1802  buffer[bufferPos] = '\0';
1803  processLine(buffer);
1804  }
1805  close();
1806  open();
1807  return;
1808  }
1809  if (rd == -1)
1810  {
1811  if (errno == EAGAIN || errno == EWOULDBLOCK) return;
1812 
1813  log(fmt("Fifo error: %s\n", strerror(errno)));
1814  close();
1815  open();
1816  return;
1817  }
1818 
1819  bufferPos += rd;
1820 
1821  for(int i = 0; i < bufferPos; ++i)
1822  {
1823  if (buffer[i] == '\n')
1824  {
1825  buffer[i] = 0;
1826  processLine(buffer);
1827  // shift the buffer
1828  i++; /* count including nl */
1829  bufferPos -= i; /* remove from nexbuf */
1830  memmove(buffer, buffer + i, bufferPos); /* slide remaining to front */
1831  i = -1; /* restart for loop scan */
1832  }
1833  }
1834 
1835  if ((unsigned)bufferPos >= sizeof(buffer) - 1)
1836  {
1837  log(fmt("Fifo overflow"));
1838  close();
1839  open();
1840  }
1841 }
1842 
1843 void Fifo::ioCb(ev::io &, int revents)
1844 {
1845  if (EV_ERROR & revents)
1846  {
1847  int sockErrno = readFdError(this->fd);
1848  if (sockErrno)
1849  {
1850  log(fmt("Error on fifo: %s\n", strerror(sockErrno)));
1851  close();
1852  open();
1853  }
1854  }
1855  else if (revents & EV_READ)
1856  {
1857  read();
1858  }
1859 }
1860 
1861 // root will be released
1862 void ClInfo::onMessage(XMLEle * root, std::list<int> &sharedBuffers)
1863 {
1864  char *roottag = tagXMLEle(root);
1865 
1866  const char *dev = findXMLAttValu(root, "device");
1867  const char *name = findXMLAttValu(root, "name");
1868  int isblob = !strcmp(tagXMLEle(root), "setBLOBVector");
1869 
1870  /* snag interested properties.
1871  * N.B. don't open to alldevs if seen specific dev already, else
1872  * remote client connections start returning too much.
1873  */
1874  if (dev[0])
1875  {
1876  // Signature for CHAINED SERVER
1877  // Not a regular client.
1878  if (dev[0] == '*' && !this->props.size())
1879  this->allprops = 2;
1880  else
1881  addDevice(dev, name, isblob);
1882  }
1883  else if (!strcmp(roottag, "getProperties") && !this->props.size() && this->allprops != 2)
1884  this->allprops = 1;
1885 
1886  /* snag enableBLOB -- send to remote drivers too */
1887  if (!strcmp(roottag, "enableBLOB"))
1888  crackBLOBHandling(dev, name, pcdataXMLEle(root));
1889 
1890  if (!strcmp(roottag, "pingRequest"))
1891  {
1892  setXMLEleTag(root, "pingReply");
1893 
1894  Msg * mp = new Msg(this, root);
1895  pushMsg(mp);
1896  mp->queuingDone();
1897  return;
1898  }
1899 
1900  /* build a new message -- set content iff anyone cares */
1901  Msg* mp = Msg::fromXml(this, root, sharedBuffers);
1902  if (!mp)
1903  {
1904  log("Closing after malformed message\n");
1905  close();
1906  return;
1907  }
1908 
1909  /* send message to driver(s) responsible for dev */
1910  DvrInfo::q2RDrivers(dev, mp, root);
1911 
1912  /* JM 2016-05-18: Upstream client can be a chained INDI server. If any driver locally is snooping
1913  * on any remote drivers, we should catch it and forward it to the responsible snooping driver. */
1914  /* send to snooping drivers. */
1915  // JM 2016-05-26: Only forward setXXX messages
1916  if (!strncmp(roottag, "set", 3))
1917  DvrInfo::q2SDrivers(NULL, isblob, dev, name, mp, root);
1918 
1919  /* echo new* commands back to other clients */
1920  if (!strncmp(roottag, "new", 3))
1921  {
1922  q2Clients(this, isblob, dev, name, mp, root);
1923  }
1924 
1925  mp->queuingDone();
1926 }
1927 
1928 void DvrInfo::onMessage(XMLEle * root, std::list<int> &sharedBuffers)
1929 {
1930  char *roottag = tagXMLEle(root);
1931  const char *dev = findXMLAttValu(root, "device");
1932  const char *name = findXMLAttValu(root, "name");
1933  int isblob = !strcmp(tagXMLEle(root), "setBLOBVector");
1934 
1935  if (verbose > 2)
1936  traceMsg("read ", root);
1937  else if (verbose > 1)
1938  {
1939  log(fmt("read <%s device='%s' name='%s'>\n",
1940  tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")));
1941  }
1942 
1943  /* that's all if driver is just registering a snoop */
1944  /* JM 2016-05-18: Send getProperties to upstream chained servers as well.*/
1945  if (!strcmp(roottag, "getProperties"))
1946  {
1947  this->addSDevice(dev, name);
1948  Msg *mp = new Msg(this, root);
1949  /* send to interested chained servers upstream */
1950  // FIXME: no use of root here
1951  ClInfo::q2Servers(this, mp, root);
1952  /* Send to snooped drivers if they exist so that they can echo back the snooped propertly immediately */
1953  // FIXME: no use of root here
1954  q2RDrivers(dev, mp, root);
1955 
1956  mp->queuingDone();
1957 
1958  return;
1959  }
1960 
1961  /* that's all if driver desires to snoop BLOBs from other drivers */
1962  if (!strcmp(roottag, "enableBLOB"))
1963  {
1964  Property *sp = findSDevice(dev, name);
1965  if (sp)
1966  crackBLOB(pcdataXMLEle(root), &sp->blob);
1967  delXMLEle(root);
1968  return;
1969  }
1970 
1971  /* Found a new device? Let's add it to driver info */
1972  if (dev[0] && !this->isHandlingDevice(dev))
1973  {
1974 #ifdef OSX_EMBEDED_MODE
1975  if (this->dev.empty())
1976  fprintf(stderr, "STARTED \"%s\"\n", dp->name.c_str());
1977  fflush(stderr);
1978 #endif
1979  this->dev.insert(dev);
1980  }
1981 
1982  /* log messages if any and wanted */
1983  if (ldir)
1984  logDMsg(root, dev);
1985 
1986  if (!strcmp(roottag, "pingRequest"))
1987  {
1988  setXMLEleTag(root, "pingReply");
1989 
1990  Msg * mp = new Msg(this, root);
1991  pushMsg(mp);
1992  mp->queuingDone();
1993  return;
1994  }
1995 
1996  /* build a new message -- set content iff anyone cares */
1997  Msg * mp = Msg::fromXml(this, root, sharedBuffers);
1998  if (!mp)
1999  {
2000  close();
2001  return;
2002  }
2003 
2004  /* send to interested clients */
2005  ClInfo::q2Clients(NULL, isblob, dev, name, mp, root);
2006 
2007  /* send to snooping drivers */
2008  DvrInfo::q2SDrivers(this, isblob, dev, name, mp, root);
2009 
2010  /* set message content if anyone cares else forget it */
2011  mp->queuingDone();
2012 }
2013 
2015 {
2016  // Don't want any half-dead drivers
2017  close();
2018 }
2019 
2021 {
2022  if (verbose > 0)
2023  log("shut down complete - bye!\n");
2024 
2025  delete(this);
2026 
2027 #ifdef OSX_EMBEDED_MODE
2028  fprintf(stderr, "CLIENTS %d\n", clients.size());
2029  fflush(stderr);
2030 #endif
2031 }
2032 
2034 {
2035  // Tell client driver is dead.
2036  for (auto dev : dev)
2037  {
2038  /* Inform clients that this driver is dead */
2039  XMLEle *root = addXMLEle(NULL, "delProperty");
2040  addXMLAtt(root, "device", dev.c_str());
2041 
2042  prXMLEle(stderr, root, 0);
2043  Msg *mp = new Msg(this, root);
2044 
2045  ClInfo::q2Clients(NULL, 0, dev.c_str(), "", mp, root);
2046  mp->queuingDone();
2047  }
2048 
2049  bool terminate;
2050  if (!restart)
2051  {
2052  terminate = true;
2053  }
2054  else
2055  {
2056  if (restarts >= maxrestarts)
2057  {
2058  log(fmt("Terminated after #%d restarts.\n", restarts));
2059  terminate = true;
2060  }
2061  else
2062  {
2063  log(fmt("restart #%d\n", restarts));
2064  ++restarts;
2065  terminate = false;
2066  }
2067  }
2068 
2069 #ifdef OSX_EMBEDED_MODE
2070  fprintf(stderr, "STOPPED \"%s\"\n", name.c_str());
2071  fflush(stderr);
2072 #endif
2073 
2074  // FIXME: we loose stderr from dying driver
2075  if (terminate)
2076  {
2077  delete(this);
2078  if ((!fifo) && (drivers.ids().empty()))
2079  Bye();
2080  return;
2081  }
2082  else
2083  {
2084  DvrInfo * restarted = this->clone();
2085  delete(this);
2086  restarted->start();
2087  }
2088 }
2089 
2090 void DvrInfo::q2RDrivers(const std::string &dev, Msg *mp, XMLEle *root)
2091 {
2092  char *roottag = tagXMLEle(root);
2093 
2094  /* queue message to each interested driver.
2095  * N.B. don't send generic getProps to more than one remote driver,
2096  * otherwise they all fan out and we get multiple responses back.
2097  */
2098  std::set<std::string> remoteAdvertised;
2099  for (auto dpId : drivers.ids())
2100  {
2101  auto dp = drivers[dpId];
2102  if (dp == nullptr) continue;
2103 
2104  std::string remoteUid = dp->remoteServerUid();
2105  bool isRemote = !remoteUid.empty();
2106 
2107  /* driver known to not support this dev */
2108  if ((!dev.empty()) && dev[0] != '*' && !dp->isHandlingDevice(dev))
2109  continue;
2110 
2111  /* Only send message to each *unique* remote driver at a particular host:port
2112  * Since it will be propogated to all other devices there */
2113  if (dev.empty() && isRemote)
2114  {
2115  if (remoteAdvertised.find(remoteUid) != remoteAdvertised.end())
2116  continue;
2117 
2118  /* Retain last remote driver data so that we do not send the same info again to a driver
2119  * residing on the same host:port */
2120  remoteAdvertised.insert(remoteUid);
2121  }
2122 
2123  /* JM 2016-10-30: Only send enableBLOB to remote drivers */
2124  if (isRemote == 0 && !strcmp(roottag, "enableBLOB"))
2125  continue;
2126 
2127  /* ok: queue message to this driver */
2128  if (verbose > 1)
2129  {
2130  dp->log(fmt("queuing responsible for <%s device='%s' name='%s'>\n",
2131  tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")));
2132  }
2133 
2134  // pushmsg can kill dp. do at end
2135  dp->pushMsg(mp);
2136  }
2137 }
2138 
2139 void DvrInfo::q2SDrivers(DvrInfo *me, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root)
2140 {
2141  std::string meRemoteServerUid = me ? me->remoteServerUid() : "";
2142  for (auto dpId : drivers.ids())
2143  {
2144  auto dp = drivers[dpId];
2145  if (dp == nullptr) continue;
2146 
2147  Property *sp = dp->findSDevice(dev, name);
2148 
2149  /* nothing for dp if not snooping for dev/name or wrong BLOB mode */
2150  if (!sp)
2151  continue;
2152  if ((isblob && sp->blob == B_NEVER) || (!isblob && sp->blob == B_ONLY))
2153  continue;
2154 
2155  // Do not send snoop data to remote drivers at the same host
2156  // since they will manage their own snoops remotely
2157  if ((!meRemoteServerUid.empty()) && dp->remoteServerUid() == meRemoteServerUid)
2158  continue;
2159 
2160  /* ok: queue message to this device */
2161  if (verbose > 1)
2162  {
2163  dp->log(fmt("queuing snooped <%s device='%s' name='%s'>\n",
2164  tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")));
2165  }
2166 
2167  // pushmsg can kill dp. do at end
2168  dp->pushMsg(mp);
2169  }
2170 }
2171 
2172 void DvrInfo::addSDevice(const std::string &dev, const std::string &name)
2173 {
2174  Property *sp;
2175 
2176  /* no dups */
2177  sp = findSDevice(dev, name);
2178  if (sp)
2179  return;
2180 
2181  /* add dev to sdevs list */
2182  sp = new Property(dev, name);
2183  sp->blob = B_NEVER;
2184  sprops.push_back(sp);
2185 
2186  if (verbose)
2187  log(fmt("snooping on %s.%s\n", dev.c_str(), name.c_str()));
2188 }
2189 
2190 Property * DvrInfo::findSDevice(const std::string &dev, const std::string &name) const
2191 {
2192  for(auto sp : sprops)
2193  {
2194  if ((sp->dev == dev) && (sp->name.empty() || sp->name == name))
2195  return (sp);
2196  }
2197 
2198  return nullptr;
2199 }
2200 
2201 void ClInfo::q2Clients(ClInfo *notme, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root)
2202 {
2203  /* queue message to each interested client */
2204  for (auto cpId : clients.ids())
2205  {
2206  auto cp = clients[cpId];
2207  if (cp == nullptr) continue;
2208 
2209  /* cp in use? notme? want this dev/name? blob? */
2210  if (cp == notme)
2211  continue;
2212  if (cp->findDevice(dev, name) < 0)
2213  continue;
2214 
2215  //if ((isblob && cp->blob==B_NEVER) || (!isblob && cp->blob==B_ONLY))
2216  if (!isblob && cp->blob == B_ONLY)
2217  continue;
2218 
2219  if (isblob)
2220  {
2221  if (cp->props.size() > 0)
2222  {
2223  Property *blobp = nullptr;
2224  for (auto pp : cp->props)
2225  {
2226  if (pp->dev == dev && pp->name == name)
2227  {
2228  blobp = pp;
2229  break;
2230  }
2231  }
2232 
2233  if ((blobp && blobp->blob == B_NEVER) || (!blobp && cp->blob == B_NEVER))
2234  continue;
2235  }
2236  else if (cp->blob == B_NEVER)
2237  continue;
2238  }
2239 
2240  /* shut down this client if its q is already too large */
2241  unsigned long ql = cp->msgQSize();
2242  if (isblob && maxstreamsiz > 0 && ql > maxstreamsiz)
2243  {
2244  // Drop frames for streaming blobs
2245  /* pull out each name/BLOB pair, decode */
2246  XMLEle *ep = NULL;
2247  int streamFound = 0;
2248  for (ep = nextXMLEle(root, 1); ep; ep = nextXMLEle(root, 0))
2249  {
2250  if (strcmp(tagXMLEle(ep), "oneBLOB") == 0)
2251  {
2252  XMLAtt *fa = findXMLAtt(ep, "format");
2253 
2254  if (fa && strstr(valuXMLAtt(fa), "stream"))
2255  {
2256  streamFound = 1;
2257  break;
2258  }
2259  }
2260  }
2261  if (streamFound)
2262  {
2263  if (verbose > 1)
2264  cp->log(fmt("%ld bytes behind. Dropping stream BLOB...\n", ql));
2265  continue;
2266  }
2267  }
2268  if (ql > maxqsiz)
2269  {
2270  if (verbose)
2271  cp->log(fmt("%ld bytes behind, shutting down\n", ql));
2272  cp->close();
2273  continue;
2274  }
2275 
2276  if (verbose > 1)
2277  cp->log(fmt("queuing <%s device='%s' name='%s'>\n",
2278  tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")));
2279 
2280  // pushmsg can kill cp. do at end
2281  cp->pushMsg(mp);
2282  }
2283 
2284  return;
2285 }
2286 
2287 void ClInfo::q2Servers(DvrInfo *me, Msg *mp, XMLEle *root)
2288 {
2289  int devFound = 0;
2290 
2291  /* queue message to each interested client */
2292  for (auto cpId : clients.ids())
2293  {
2294  auto cp = clients[cpId];
2295  if (cp == nullptr) continue;
2296 
2297  // Only send the message to the upstream server that is connected specfically to the device in driver dp
2298  switch (cp->allprops)
2299  {
2300  // 0 --> not all props are requested. Check for specific combination
2301  case 0:
2302  for (auto pp : cp->props)
2303  {
2304  if (me->dev.find(pp->dev) != me->dev.end())
2305  {
2306  devFound = 1;
2307  break;
2308  }
2309  }
2310  break;
2311 
2312  // All props are requested. This is client-only mode (not upstream server)
2313  case 1:
2314  break;
2315  // Upstream server mode
2316  case 2:
2317  devFound = 1;
2318  break;
2319  }
2320 
2321  // If no matching device found, continue
2322  if (devFound == 0)
2323  continue;
2324 
2325  /* shut down this client if its q is already too large */
2326  unsigned long ql = cp->msgQSize();
2327  if (ql > maxqsiz)
2328  {
2329  if (verbose)
2330  cp->log(fmt("%ld bytes behind, shutting down\n", ql));
2331  cp->close();
2332  continue;
2333  }
2334 
2335  /* ok: queue message to this client */
2336  if (verbose > 1)
2337  cp->log(fmt("queuing <%s device='%s' name='%s'>\n",
2338  tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")));
2339 
2340  // pushmsg can kill cp. do at end
2341  cp->pushMsg(mp);
2342  }
2343 }
2344 
2345 void MsgQueue::writeToFd()
2346 {
2347  ssize_t nw;
2348  void * data;
2349  ssize_t nsend;
2350  std::vector<int> sharedBuffers;
2351 
2352  /* get current message */
2353  auto mp = headMsg();
2354  if (mp == nullptr)
2355  {
2356  log("Unexpected write notification");
2357  return;
2358  }
2359 
2360 
2361  do
2362  {
2363  if (!mp->getContent(nsent, data, nsend, sharedBuffers))
2364  {
2365  wio.stop();
2366  return;
2367  }
2368 
2369  if (nsend == 0)
2370  {
2371  consumeHeadMsg();
2372  mp = headMsg();
2373  if (mp == nullptr)
2374  {
2375  return;
2376  }
2377  }
2378  }
2379  while(nsend == 0);
2380 
2381  /* send next chunk, never more than MAXWSIZ to reduce blocking */
2382  if (nsend > MAXWSIZ)
2383  nsend = MAXWSIZ;
2384 
2385  if (!useSharedBuffer)
2386  {
2387  nw = write(wFd, data, nsend);
2388  }
2389  else
2390  {
2391  struct msghdr msgh;
2392  struct iovec iov[1];
2393  int cmsghdrlength;
2394  struct cmsghdr * cmsgh;
2395 
2396  int fdCount = sharedBuffers.size();
2397  if (fdCount > 0)
2398  {
2399  if (fdCount > MAXFD_PER_MESSAGE)
2400  {
2401  log(fmt("attempt to send too many FD\n"));
2402  close();
2403  return;
2404  }
2405 
2406  cmsghdrlength = CMSG_SPACE((fdCount * sizeof(int)));
2407  // FIXME: abort on alloc error here
2408  cmsgh = (struct cmsghdr*)malloc(cmsghdrlength);
2409  memset(cmsgh, 0, cmsghdrlength);
2410 
2411  /* Write the fd as ancillary data */
2412  cmsgh->cmsg_len = CMSG_LEN(fdCount * sizeof(int));
2413  cmsgh->cmsg_level = SOL_SOCKET;
2414  cmsgh->cmsg_type = SCM_RIGHTS;
2415  msgh.msg_control = cmsgh;
2416  msgh.msg_controllen = cmsghdrlength;
2417  for(int i = 0; i < fdCount; ++i)
2418  {
2419  ((int *) CMSG_DATA(CMSG_FIRSTHDR(&msgh)))[i] = sharedBuffers[i];
2420  }
2421  }
2422  else
2423  {
2424  cmsgh = NULL;
2425  cmsghdrlength = 0;
2426  msgh.msg_control = cmsgh;
2427  msgh.msg_controllen = cmsghdrlength;
2428  }
2429 
2430  iov[0].iov_base = data;
2431  iov[0].iov_len = nsend;
2432 
2433  msgh.msg_flags = 0;
2434  msgh.msg_name = NULL;
2435  msgh.msg_namelen = 0;
2436  msgh.msg_iov = iov;
2437  msgh.msg_iovlen = 1;
2438 
2439  nw = sendmsg(wFd, &msgh, MSG_NOSIGNAL);
2440 
2441  free(cmsgh);
2442  }
2443 
2444  /* shut down if trouble */
2445  if (nw <= 0)
2446  {
2447  if (nw == 0)
2448  log("write returned 0\n");
2449  else
2450  log(fmt("write: %s\n", strerror(errno)));
2451 
2452  // Keep the read part open
2453  closeWritePart();
2454  return;
2455  }
2456 
2457  /* trace */
2458  if (verbose > 2)
2459  {
2460  log(fmt("sending msg nq %ld:\n%.*s\n",
2461  msgq.size(), (int)nw, data));
2462  }
2463  else if (verbose > 1)
2464  {
2465  log(fmt("sending %.*s\n", (int)nw, data));
2466  }
2467 
2468  /* update amount sent. when complete: free message if we are the last
2469  * to use it and pop from our queue.
2470  */
2471  mp->advance(nsent, nw);
2472  if (nsent.done())
2473  consumeHeadMsg();
2474 }
2475 
2476 void MsgQueue::log(const std::string &str) const
2477 {
2478  // This is only invoked from destructor
2479  std::string logLine = "Dying Connection ";
2480  logLine += ": ";
2481  logLine += str;
2482  ::log(logLine);
2483 }
2484 
2485 int ClInfo::findDevice(const std::string &dev, const std::string &name) const
2486 {
2487  if (allprops >= 1 || dev.empty())
2488  return (0);
2489  for (auto pp : props)
2490  {
2491  if ((pp->dev == dev) && (pp->name.empty() || (pp->name == name)))
2492  return (0);
2493  }
2494  return (-1);
2495 }
2496 
2497 void ClInfo::addDevice(const std::string &dev, const std::string &name, int isblob)
2498 {
2499  if (isblob)
2500  {
2501  for (auto pp : props)
2502  {
2503  if (pp->dev == dev && pp->name == name)
2504  return;
2505  }
2506  }
2507  /* no dups */
2508  else if (!findDevice(dev, name))
2509  return;
2510 
2511  /* add */
2512  Property *pp = new Property(dev, name);
2513  props.push_back(pp);
2514 }
2515 
2516 void MsgQueue::crackBLOB(const char *enableBLOB, BLOBHandling *bp)
2517 {
2518  if (!strcmp(enableBLOB, "Also"))
2519  *bp = B_ALSO;
2520  else if (!strcmp(enableBLOB, "Only"))
2521  *bp = B_ONLY;
2522  else if (!strcmp(enableBLOB, "Never"))
2523  *bp = B_NEVER;
2524 }
2525 
2526 void ClInfo::crackBLOBHandling(const std::string &dev, const std::string &name, const char *enableBLOB)
2527 {
2528  /* If we have EnableBLOB with property name, we add it to Client device list */
2529  if (!name.empty())
2530  addDevice(dev, name, 1);
2531  else
2532  /* Otherwise, we set the whole client blob handling to what's passed (enableBLOB) */
2533  crackBLOB(enableBLOB, &blob);
2534 
2535  /* If whole client blob handling policy was updated, we need to pass that also to all children
2536  and if the request was for a specific property, then we apply the policy to it */
2537  for (auto pp : props)
2538  {
2539  if (name.empty())
2540  crackBLOB(enableBLOB, &pp->blob);
2541  else if (pp->dev == dev && pp->name == name)
2542  {
2543  crackBLOB(enableBLOB, &pp->blob);
2544  return;
2545  }
2546  }
2547 }
2548 
2549 void MsgQueue::traceMsg(const std::string &logMsg, XMLEle *root)
2550 {
2551  log(logMsg);
2552 
2553  static const char *prtags[] =
2554  {
2555  "defNumber", "oneNumber", "defText", "oneText", "defSwitch", "oneSwitch", "defLight", "oneLight",
2556  };
2557  XMLEle *e;
2558  const char *msg, *perm, *pcd;
2559  unsigned int i;
2560 
2561  /* print tag header */
2562  fprintf(stderr, "%s %s %s %s", tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name"),
2563  findXMLAttValu(root, "state"));
2564  pcd = pcdataXMLEle(root);
2565  if (pcd[0])
2566  fprintf(stderr, " %s", pcd);
2567  perm = findXMLAttValu(root, "perm");
2568  if (perm[0])
2569  fprintf(stderr, " %s", perm);
2570  msg = findXMLAttValu(root, "message");
2571  if (msg[0])
2572  fprintf(stderr, " '%s'", msg);
2573 
2574  /* print each array value */
2575  for (e = nextXMLEle(root, 1); e; e = nextXMLEle(root, 0))
2576  for (i = 0; i < sizeof(prtags) / sizeof(prtags[0]); i++)
2577  if (strcmp(prtags[i], tagXMLEle(e)) == 0)
2578  fprintf(stderr, "\n %10s='%s'", findXMLAttValu(e, "name"), pcdataXMLEle(e));
2579 
2580  fprintf(stderr, "\n");
2581 }
2582 
/* fill s with current UT string, formatted as YYYY-MM-DDTHH:MM:SS.
 * if no s, use a static buffer
 * return s or buffer.
 * N.B. if use our buffer, be sure to use before calling again
 * N.B. a caller-supplied s must hold at least 64 bytes: that is the limit
 *      given to strftime().
 * If the time cannot be converted or formatted, the result is an empty string.
 */
static char *indi_tstamp(char *s)
{
    static char sbuf[64];
    struct tm utc;
    time_t t;

    if (!s)
        s = sbuf;

    time(&t);

    /* gmtime_r writes into our own struct tm instead of the storage shared by
     * all gmtime() callers.
     */
    if (gmtime_r(&t, &utc) == NULL || strftime(s, sizeof(sbuf), "%Y-%m-%dT%H:%M:%S", &utc) == 0)
        s[0] = '\0';

    return (s);
}
2601 
2602 /* log message in root known to be from device dev to ldir, if any.
2603  */
2604 static void logDMsg(XMLEle *root, const char *dev)
2605 {
2606  char stamp[64];
2607  char logfn[1024];
2608  const char *ts, *ms;
2609  FILE *fp;
2610 
2611  /* get message, if any */
2612  ms = findXMLAttValu(root, "message");
2613  if (!ms[0])
2614  return;
2615 
2616  /* get timestamp now if not provided */
2617  ts = findXMLAttValu(root, "timestamp");
2618  if (!ts[0])
2619  {
2620  indi_tstamp(stamp);
2621  ts = stamp;
2622  }
2623 
2624  /* append to log file, name is date portion of time stamp */
2625  sprintf(logfn, "%s/%.10s.islog", ldir, ts);
2626  fp = fopen(logfn, "a");
2627  if (!fp)
2628  return; /* oh well */
2629  fprintf(fp, "%s: %s: %s\n", ts, dev, ms);
2630  fclose(fp);
2631 }
2632 
2633 /* log when then exit */
2634 static void Bye()
2635 {
2636  fprintf(stderr, "%s: good bye\n", indi_tstamp(NULL));
2637  exit(1);
2638 }
2639 
2640 DvrInfo::DvrInfo(bool useSharedBuffer) :
2641  MsgQueue(useSharedBuffer),
2642  restarts(0)
2643 {
2644  drivers.insert(this);
2645 }
2646 
2648  MsgQueue(model.useSharedBuffer),
2649  name(model.name),
2650  restarts(model.restarts)
2651 {
2652  drivers.insert(this);
2653 }
2654 
2656 {
2657  drivers.erase(this);
2658  for(auto prop : sprops)
2659  {
2660  delete prop;
2661  }
2662 }
2663 
2664 bool DvrInfo::isHandlingDevice(const std::string &dev) const
2665 {
2666  return this->dev.find(dev) != this->dev.end();
2667 }
2668 
2669 void DvrInfo::log(const std::string &str) const
2670 {
2671  std::string logLine = "Driver ";
2672  logLine += name;
2673  logLine += ": ";
2674  logLine += str;
2675  ::log(logLine);
2676 }
2677 
2679 
2681 {
2682  eio.set<LocalDvrInfo, &LocalDvrInfo::onEfdEvent>(this);
2683  pidwatcher.set<LocalDvrInfo, &LocalDvrInfo::onPidEvent>(this);
2684 }
2685 
2687  DvrInfo(model),
2688  envDev(model.envDev),
2689  envConfig(model.envConfig),
2690  envSkel(model.envSkel),
2691  envPrefix(model.envPrefix)
2692 {
2693  eio.set<LocalDvrInfo, &LocalDvrInfo::onEfdEvent>(this);
2694  pidwatcher.set<LocalDvrInfo, &LocalDvrInfo::onPidEvent>(this);
2695 }
2696 
2698 {
2699  closeEfd();
2700  if (pid != 0)
2701  {
2702  kill(pid, SIGKILL); /* libev insures there will be no zombies */
2703  pid = 0;
2704  }
2705  closePid();
2706 }
2707 
2709 {
2710  return new LocalDvrInfo(*this);
2711 }
2712 
2713 void LocalDvrInfo::closeEfd()
2714 {
2715  ::close(efd);
2716  efd = -1;
2717  eio.stop();
2718 }
2719 
2720 void LocalDvrInfo::closePid()
2721 {
2722  pid = 0;
2723  pidwatcher.stop();
2724 }
2725 
2726 void LocalDvrInfo::onEfdEvent(ev::io &, int revents)
2727 {
2728  if (EV_ERROR & revents)
2729  {
2730  int sockErrno = readFdError(this->efd);
2731  if (sockErrno)
2732  {
2733  log(fmt("Error on stderr: %s\n", strerror(sockErrno)));
2734  closeEfd();
2735  }
2736  return;
2737  }
2738 
2739  if (revents & EV_READ)
2740  {
2741  ssize_t nr;
2742 
2743  /* read more */
2744  nr = read(efd, errbuff + errbuffpos, sizeof(errbuff) - errbuffpos);
2745  if (nr <= 0)
2746  {
2747  if (nr < 0)
2748  {
2749  if (errno == EAGAIN || errno == EWOULDBLOCK) return;
2750 
2751  log(fmt("stderr %s\n", strerror(errno)));
2752  }
2753  else
2754  log("stderr EOF\n");
2755  closeEfd();
2756  return;
2757  }
2758  errbuffpos += nr;
2759 
2760  for(int i = 0; i < errbuffpos; ++i)
2761  {
2762  if (errbuff[i] == '\n')
2763  {
2764  log(fmt("%.*s\n", (int)i, errbuff));
2765  i++; /* count including nl */
2766  errbuffpos -= i; /* remove from nexbuf */
2767  memmove(errbuff, errbuff + i, errbuffpos); /* slide remaining to front */
2768  i = -1; /* restart for loop scan */
2769  }
2770  }
2771  }
2772 }
2773 
2774 void LocalDvrInfo::onPidEvent(ev::child &, int revents)
2775 {
2776  if (revents & EV_CHILD)
2777  {
2778  if (WIFEXITED(pidwatcher.rstatus))
2779  {
2780  log(fmt("process %d exited with status %d\n", pid, WEXITSTATUS(pidwatcher.rstatus)));
2781  }
2782  else if (WIFSIGNALED(pidwatcher.rstatus))
2783  {
2784  int signum = WTERMSIG(pidwatcher.rstatus);
2785  log(fmt("process %d killed with signal %d - %s\n", pid, signum, strsignal(signum)));
2786  }
2787  pid = 0;
2788  this->pidwatcher.stop();
2789  }
2790 }
2791 
2793 {}
2794 
2796  DvrInfo(model),
2797  host(model.host),
2798  port(model.port)
2799 {}
2800 
2802 {}
2803 
2805 {
2806  return new RemoteDvrInfo(*this);
2807 }
2808 
2809 ClInfo::ClInfo(bool useSharedBuffer) : MsgQueue(useSharedBuffer)
2810 {
2811  clients.insert(this);
2812 }
2813 
2815 {
2816  for(auto prop : props)
2817  {
2818  delete prop;
2819  }
2820 
2821  clients.erase(this);
2822 }
2823 
2824 void ClInfo::log(const std::string &str) const
2825 {
2826  std::string logLine = fmt("Client %d: ", this->getRFd());
2827  logLine += str;
2828  ::log(logLine);
2829 }
2830 
2832 
2833 SerializedMsg::SerializedMsg(Msg * parent) : asyncProgress(), owner(parent), awaiters(), chuncks(), ownBuffers()
2834 {
2835  blockedProducer = nullptr;
2836  // At first, everything is required.
2837  for(auto fd : parent->sharedBuffers)
2838  {
2839  if (fd != -1)
2840  {
2841  requirements.sharedBuffers.insert(fd);
2842  }
2843  }
2844  requirements.xml = true;
2845  asyncStatus = PENDING;
2846  asyncProgress.set<SerializedMsg, &SerializedMsg::async_progressed>(this);
2847 }
2848 
2849 // Delete occurs when no async task is running and no awaiters are left
2851 {
2852  for(auto buff : ownBuffers)
2853  {
2854  free(buff);
2855  }
2856 }
2857 
2859 {
2860  std::lock_guard<std::recursive_mutex> guard(lock);
2861  return asyncStatus == CANCELING;
2862 }
2863 
2865 {
2866  std::lock_guard<std::recursive_mutex> guard(lock);
2867  if (this->requirements == req)
2868  {
2869  return;
2870  }
2871  this->requirements = req;
2872  asyncProgress.send();
2873 }
2874 
2876 {
2877  std::lock_guard<std::recursive_mutex> guard(lock);
2878 
2879  this->chuncks.push_back(m);
2880  asyncProgress.send();
2881 }
2882 
2884 {
2885  std::lock_guard<std::recursive_mutex> guard(lock);
2887  asyncProgress.send();
2888 }
2889 
2890 void SerializedMsg::async_start()
2891 {
2892  std::lock_guard<std::recursive_mutex> guard(lock);
2893  if (asyncStatus != PENDING)
2894  {
2895  return;
2896  }
2897 
2898  asyncStatus = RUNNING;
2899  if (generateContentAsync())
2900  {
2901  asyncProgress.start();
2902 
2903  std::thread t([this]()
2904  {
2905  generateContent();
2906  });
2907  t.detach();
2908  }
2909  else
2910  {
2911  generateContent();
2912  }
2913 }
2914 
2915 void SerializedMsg::async_progressed()
2916 {
2917  std::lock_guard<std::recursive_mutex> guard(lock);
2918 
2919  if (asyncStatus == TERMINATED)
2920  {
2921  // FIXME: unblock ?
2922  asyncProgress.stop();
2923  }
2924 
2925  // Update ios of awaiters
2926  for(auto awaiter : awaiters)
2927  {
2928  awaiter->messageMayHaveProgressed(this);
2929  }
2930 
2931  // Then prune
2932  owner->prune();
2933 }
2934 
2936 {
2937  std::lock_guard<std::recursive_mutex> guard(lock);
2938 
2939  return (asyncStatus == RUNNING) || (asyncStatus == CANCELING);
2940 }
2941 
2942 
2944 {
2945  std::lock_guard<std::recursive_mutex> guard(lock);
2946 
2947  if (asyncStatus == PENDING)
2948  {
2949  async_start();
2950  }
2951 
2952  if (asyncStatus == TERMINATED)
2953  {
2954  return true;
2955  }
2956 
2957  // Not reached the last chunck
2958  if (position.chunckId < chuncks.size())
2959  {
2960  return true;
2961  }
2962 
2963  return false;
2964 }
2965 
2966 bool SerializedMsg::getContent(MsgChunckIterator &from, void* &data, ssize_t &size,
2967  std::vector<int, std::allocator<int> > &sharedBuffers)
2968 {
2969  std::lock_guard<std::recursive_mutex> guard(lock);
2970 
2971  if (asyncStatus != TERMINATED && from.chunckId >= chuncks.size())
2972  {
2973  // Not ready yet
2974  return false;
2975  }
2976 
2977  if (from.chunckId == chuncks.size())
2978  {
2979  // Done
2980  data = 0;
2981  size = 0;
2982  from.endReached = true;
2983  return true;
2984  }
2985 
2986  const MsgChunck &ck = chuncks[from.chunckId];
2987 
2988  if (from.chunckOffset == 0)
2989  {
2990  sharedBuffers = ck.sharedBufferIdsToAttach;
2991  }
2992  else
2993  {
2994  sharedBuffers.clear();
2995  }
2996 
2997  data = ck.content + from.chunckOffset;
2998  size = ck.contentLength - from.chunckOffset;
2999  return true;
3000 }
3001 
3003 {
3004  std::lock_guard<std::recursive_mutex> guard(lock);
3005 
3006  MsgChunck &cur = chuncks[iter.chunckId];
3007  iter.chunckOffset += s;
3008  if (iter.chunckOffset >= cur.contentLength)
3009  {
3010  iter.chunckId ++ ;
3011  iter.chunckOffset = 0;
3012  if (iter.chunckId >= chuncks.size() && asyncStatus == TERMINATED)
3013  {
3014  iter.endReached = true;
3015  }
3016  }
3017 }
3018 
3020 {
3021  awaiters.insert(q);
3022 }
3023 
3025 {
3026  awaiters.erase(q);
3027  if (awaiters.empty() && !isAsyncRunning())
3028  {
3029  owner->releaseSerialization(this);
3030  }
3031 }
3032 
3034 {
3035  sr.add(requirements);
3036 }
3037 
3038 // This is called when a received message require additional // work, to avoid overflow
3040 {
3041  // TODO : implement or remove
3042  (void) receiver;
3043 }
3044 
3046 {
3047  return owner->queueSize;
3048 }
3049 
3051 {
3052 }
3053 
3055 {
3056 }
3057 
3059 {
3060 }
3061 
3063 {
3064  for(auto id : ownSharedBuffers)
3065  {
3066  close(id);
3067  }
3068 }
3069 
3070 
3071 MsgChunck::MsgChunck() : sharedBufferIdsToAttach()
3072 {
3073  content = nullptr;
3074  contentLength = 0;
3075 }
3076 
3077 MsgChunck::MsgChunck(char * content, unsigned long length) : sharedBufferIdsToAttach()
3078 {
3079  this->content = content;
3080  this->contentLength = length;
3081 }
3082 
3083 Msg::Msg(MsgQueue * from, XMLEle * ele): sharedBuffers()
3084 {
3085  this->from = from;
3086  xmlContent = ele;
3087  hasInlineBlobs = false;
3088  hasSharedBufferBlobs = false;
3089 
3090  convertionToSharedBuffer = nullptr;
3091  convertionToInline = nullptr;
3092 
3093  queueSize = sprlXMLEle(xmlContent, 0);
3094  for(auto blobContent : findBlobElements(xmlContent))
3095  {
3096  std::string attached = findXMLAttValu(blobContent, "attached");
3097  if (attached == "true")
3098  {
3099  hasSharedBufferBlobs = true;
3100  }
3101  else
3102  {
3103  hasInlineBlobs = true;
3104  }
3105  }
3106 }
3107 
3108 Msg::~Msg()
3109 {
3110  // Assume convertionToSharedBlob and convertionToInlineBlob were already droped
3111  assert(convertionToSharedBuffer == nullptr);
3112  assert(convertionToInline == nullptr);
3113 
3114  releaseXmlContent();
3115  releaseSharedBuffers(std::set<int>());
3116 }
3117 
3118 void Msg::releaseSerialization(SerializedMsg * msg)
3119 {
3120  if (msg == convertionToSharedBuffer)
3121  {
3122  convertionToSharedBuffer = nullptr;
3123  }
3124 
3125  if (msg == convertionToInline)
3126  {
3127  convertionToInline = nullptr;
3128  }
3129 
3130  delete(msg);
3131  prune();
3132 }
3133 
3134 void Msg::releaseXmlContent()
3135 {
3136  if (xmlContent != nullptr)
3137  {
3138  delXMLEle(xmlContent);
3139  xmlContent = nullptr;
3140  }
3141 }
3142 
3143 void Msg::releaseSharedBuffers(const std::set<int> &keep)
3144 {
3145  for(std::size_t i = 0; i < sharedBuffers.size(); ++i)
3146  {
3147  auto fd = sharedBuffers[i];
3148  if (fd != -1 && keep.find(fd) == keep.end())
3149  {
3150  if (close(fd) == -1)
3151  {
3152  perror("Releasing shared buffer");
3153  }
3154  sharedBuffers[i] = -1;
3155  }
3156  }
3157 }
3158 
3159 void Msg::prune()
3160 {
3161  // Collect ressources required.
3163  if (convertionToSharedBuffer)
3164  {
3165  convertionToSharedBuffer->collectRequirements(req);
3166  }
3167  if (convertionToInline)
3168  {
3169  convertionToInline->collectRequirements(req);
3170  }
3171  // Free the resources.
3172  if (!req.xml)
3173  {
3174  releaseXmlContent();
3175  }
3176 
3177  releaseSharedBuffers(req.sharedBuffers);
3178 
3179  // Nobody cares anymore ?
3180  if (convertionToSharedBuffer == nullptr && convertionToInline == nullptr)
3181  {
3182  delete(this);
3183  }
3184 }
3185 
3186 bool parseBlobSize(XMLEle * blobWithAttachedBuffer, ssize_t &size)
3187 {
3188  std::string sizeStr = findXMLAttValu(blobWithAttachedBuffer, "size");
3189  if (sizeStr == "")
3190  {
3191  return false;
3192  }
3193  std::size_t pos;
3194  size = std::stoll(sizeStr, &pos, 10);
3195  if (pos != sizeStr.size())
3196  {
3197  log("Invalid size attribute value " + sizeStr);
3198  return false;
3199  }
3200  return true;
3201 }
3202 
3204 bool Msg::fetchBlobs(std::list<int> &incomingSharedBuffers)
3205 {
3206  /* Consume every buffers */
3207  for(auto blobContent : findBlobElements(xmlContent))
3208  {
3209  ssize_t blobSize;
3210  if (!parseBlobSize(blobContent, blobSize))
3211  {
3212  log("Attached blob misses the size attribute");
3213  return false;
3214  }
3215 
3216  std::string attached = findXMLAttValu(blobContent, "attached");
3217  if (attached == "true")
3218  {
3219  if (incomingSharedBuffers.empty())
3220  {
3221  log("Missing shared buffer...\n");
3222  return false;
3223  }
3224 
3225  queueSize += blobSize;
3226  //log("Found one fd !\n");
3227  int fd = *incomingSharedBuffers.begin();
3228  incomingSharedBuffers.pop_front();
3229 
3230  sharedBuffers.push_back(fd);
3231  }
3232  else
3233  {
3234  // Check cdata length vs blobSize ?
3235  }
3236  }
3237  return true;
3238 }
3239 
3241 {
3242  prune();
3243 }
3244 
3245 Msg * Msg::fromXml(MsgQueue * from, XMLEle * root, std::list<int> &incomingSharedBuffers)
3246 {
3247  Msg * m = new Msg(from, root);
3248  if (!m->fetchBlobs(incomingSharedBuffers))
3249  {
3250  delete(m);
3251  return nullptr;
3252  }
3253  return m;
3254 }
3255 
3256 SerializedMsg * Msg::buildConvertionToSharedBuffer()
3257 {
3258  if (convertionToSharedBuffer)
3259  {
3260  return convertionToSharedBuffer;
3261  }
3262 
3263  convertionToSharedBuffer = new SerializedMsgWithSharedBuffer(this);
3264  if (hasInlineBlobs && from)
3265  {
3266  convertionToSharedBuffer->blockReceiver(from);
3267  }
3268  return convertionToSharedBuffer;
3269 }
3270 
3271 SerializedMsg * Msg::buildConvertionToInline()
3272 {
3273  if (convertionToInline)
3274  {
3275  return convertionToInline;
3276  }
3277 
3278  return convertionToInline = new SerializedMsgWithoutSharedBuffer(this);
3279 }
3280 
3282 {
3283  if (hasSharedBufferBlobs || hasInlineBlobs)
3284  {
3285  if (to->acceptSharedBuffers())
3286  {
3287  return buildConvertionToSharedBuffer();
3288  }
3289  else
3290  {
3291  return buildConvertionToInline();
3292  }
3293  }
3294  else
3295  {
3296  // Just serialize using copy
3297  return buildConvertionToInline();
3298  }
3299 }
3300 
3302 {
3303  for(auto blobContent : findBlobElements(owner->xmlContent))
3304  {
3305  // C'est pas trivial, dans ce cas, car il faut les réattacher
3306  std::string attached = findXMLAttValu(blobContent, "attached");
3307  if (attached != "true")
3308  {
3309  return true;
3310  }
3311  }
3312  return false;
3313 }
3314 
3315 static int xmlReplacementMapFind(void * self, XMLEle * source, XMLEle * * replace)
3316 {
3317  auto map = (const std::unordered_map<XMLEle*, XMLEle*> *) self;
3318  auto idx = map->find(source);
3319  if (idx == map->end())
3320  {
3321  return 0;
3322  }
3323  *replace = (XMLEle*)idx->second;
3324  return 1;
3325 }
3326 
3327 XMLEle * cloneXMLEleWithReplacementMap(XMLEle * root, const std::unordered_map<XMLEle*, XMLEle*> &replacement)
3328 {
3329  return cloneXMLEle(root, &xmlReplacementMapFind, (void*)&replacement);
3330 }
3331 
3333 {
3334  return owner->hasInlineBlobs || owner->hasSharedBufferBlobs;
3335 }
3336 
3338 {
3339  // Convert every shared buffer into an inline base64
3340  auto xmlContent = owner->xmlContent;
3341 
3342  std::vector<XMLEle*> cdata;
3343  // Every cdata will have either sharedBuffer or sharedCData
3344  std::vector<int> sharedBuffers;
3345  std::vector<ssize_t> xmlSizes;
3346  std::vector<XMLEle *> sharedCData;
3347 
3348  std::unordered_map<XMLEle*, XMLEle*> replacement;
3349 
3350  int ownerSharedBufferId = 0;
3351 
3352  // Identify shared buffer blob to base64 them
3353  // Identify base64 blob to avoid copying them (we'll copy the cdata)
3354  for(auto blobContent : findBlobElements(xmlContent))
3355  {
3356  std::string attached = findXMLAttValu(blobContent, "attached");
3357 
3358  if (attached != "true" && pcdatalenXMLEle(blobContent) == 0)
3359  {
3360  continue;
3361  }
3362 
3363  XMLEle * clone = shallowCloneXMLEle(blobContent);
3364  rmXMLAtt(clone, "attached");
3365  editXMLEle(clone, "_");
3366 
3367  replacement[blobContent] = clone;
3368  cdata.push_back(clone);
3369 
3370  if (attached == "true")
3371  {
3372  rmXMLAtt(clone, "enclen");
3373 
3374  // Get the size if present
3375  ssize_t size = -1;
3376  parseBlobSize(clone, size);
3377 
3378  // FIXME: we could add enclen there
3379 
3380  // Put something here for later replacement
3381  sharedBuffers.push_back(owner->sharedBuffers[ownerSharedBufferId++]);
3382  xmlSizes.push_back(size);
3383  sharedCData.push_back(nullptr);
3384  }
3385  else
3386  {
3387  sharedBuffers.push_back(-1);
3388  xmlSizes.push_back(-1);
3389  sharedCData.push_back(blobContent);
3390  }
3391  }
3392 
3393  if (replacement.empty())
3394  {
3395  // Just print the content as is...
3396 
3397  char * model = (char*)malloc(sprlXMLEle(xmlContent, 0) + 1);
3398  int modelSize = sprXMLEle(model, xmlContent, 0);
3399 
3400  ownBuffers.push_back(model);
3401 
3402  async_pushChunck(MsgChunck(model, modelSize));
3403 
3404  // FIXME: lower requirements asap... how to do that ?
3405  // requirements.xml = false;
3406  // requirements.sharedBuffers.clear();
3407 
3408  }
3409  else
3410  {
3411  // Create a replacement that shares original CData buffers
3412  xmlContent = cloneXMLEleWithReplacementMap(xmlContent, replacement);
3413 
3414  std::vector<size_t> modelCdataOffset(cdata.size());
3415 
3416  char * model = (char*)malloc(sprlXMLEle(xmlContent, 0) + 1);
3417  int modelSize = sprXMLEle(model, xmlContent, 0);
3418 
3419  ownBuffers.push_back(model);
3420 
3421  // Get the element offset
3422  for(std::size_t i = 0; i < cdata.size(); ++i)
3423  {
3424  modelCdataOffset[i] = sprXMLCDataOffset(xmlContent, cdata[i], 0);
3425  }
3426  delXMLEle(xmlContent);
3427 
3428  std::vector<int> fds(cdata.size());
3429  std::vector<void*> blobs(cdata.size());
3430  std::vector<size_t> sizes(cdata.size());
3431  std::vector<size_t> attachedSizes(cdata.size());
3432 
3433  // Attach all blobs
3434  for(std::size_t i = 0; i < cdata.size(); ++i)
3435  {
3436  if (sharedBuffers[i] != -1)
3437  {
3438  fds[i] = sharedBuffers[i];
3439 
3440  size_t dataSize;
3441  blobs[i] = attachSharedBuffer(fds[i], dataSize);
3442  attachedSizes[i] = dataSize;
3443 
3444  // check dataSize is compatible with the blob element's size
3445  // It's mandatory for attached blob to give their size
3446  if (xmlSizes[i] != -1 && ((size_t)xmlSizes[i]) <= dataSize)
3447  {
3448  dataSize = xmlSizes[i];
3449  }
3450  sizes[i] = dataSize;
3451  }
3452  else
3453  {
3454  fds[i] = -1;
3455  }
3456  }
3457 
3458  // Copy from model or blob (streaming base64 encode)
3459  int modelOffset = 0;
3460  for(std::size_t i = 0; i < cdata.size(); ++i)
3461  {
3462  int cdataOffset = modelCdataOffset[i];
3463  if (cdataOffset > modelOffset)
3464  {
3465  async_pushChunck(MsgChunck(model + modelOffset, cdataOffset - modelOffset));
3466  }
3467  // Skip the dummy cdata completly
3468  modelOffset = cdataOffset + 1;
3469 
3470  // Perform inplace base64
3471  // FIXME: could be streamed/splitted
3472 
3473  if (fds[i] != -1)
3474  {
3475  // Add a binary chunck. This needs base64 convertion
3476  // FIXME: the size here should be the size of the blob element
3477  unsigned long buffSze = sizes[i];
3478  const unsigned char* src = (const unsigned char*)blobs[i];
3479 
3480  // split here in smaller chuncks for faster startup
3481  // This allow starting write before the whole blob is converted
3482  while(buffSze > 0)
3483  {
3484  // We need a block size multiple of 24 bits (3 bytes)
3485  unsigned long sze = buffSze > 3 * 16384 ? 3 * 16384 : buffSze;
3486 
3487  char* buffer = (char*) malloc(4 * sze / 3 + 4);
3488  ownBuffers.push_back(buffer);
3489  int base64Count = to64frombits_s((unsigned char*)buffer, src, sze, (4 * sze / 3 + 4));
3490 
3491  async_pushChunck(MsgChunck(buffer, base64Count));
3492 
3493  buffSze -= sze;
3494  src += sze;
3495  }
3496 
3497 
3498  // Dettach blobs ASAP
3499  dettachSharedBuffer(fds[i], blobs[i], attachedSizes[i]);
3500 
3501  // requirements.sharedBuffers.erase(fds[i]);
3502  }
3503  else
3504  {
3505  // Add an already ready cdata section
3506 
3507  auto len = pcdatalenXMLEle(sharedCData[i]);
3508  auto data = pcdataXMLEle(sharedCData[i]);
3509  async_pushChunck(MsgChunck(data, len));
3510  }
3511  }
3512 
3513  if (modelOffset < modelSize)
3514  {
3515  async_pushChunck(MsgChunck(model + modelOffset, modelSize - modelOffset));
3516  modelOffset = modelSize;
3517  }
3518  }
3519  async_done();
3520 }
3521 
3523 {
3524  return owner->hasInlineBlobs;
3525 }
3526 
3528 {
3529  // Convert every inline base64 blob from xml into an attached buffer
3530  auto xmlContent = owner->xmlContent;
3531 
3532  std::vector<int> sharedBuffers = owner->sharedBuffers;
3533 
3534  std::unordered_map<XMLEle*, XMLEle*> replacement;
3535  int blobPos = 0;
3536  for(auto blobContent : findBlobElements(owner->xmlContent))
3537  {
3538  if (!pcdatalenXMLEle(blobContent))
3539  {
3540  continue;
3541  }
3542  std::string attached = findXMLAttValu(blobContent, "attached");
3543  if (attached != "true")
3544  {
3545  // We need to replace.
3546  XMLEle * clone = shallowCloneXMLEle(blobContent);
3547  rmXMLAtt(clone, "enclen");
3548  rmXMLAtt(clone, "attached");
3549  addXMLAtt(clone, "attached", "true");
3550 
3551  replacement[blobContent] = clone;
3552 
3553  int base64datalen = pcdatalenXMLEle(blobContent);
3554  char * base64data = pcdataXMLEle(blobContent);
3555  // Shall we really trust the size here ?
3556 
3557  ssize_t size;
3558  if (!parseBlobSize(blobContent, size))
3559  {
3560  log("Missing size value for blob");
3561  size = 1;
3562  }
3563 
3564  void * blob = IDSharedBlobAlloc(size);
3565  if (blob == nullptr)
3566  {
3567  log(fmt("Unable to allocate shared buffer of size %d : %s\n", size, strerror(errno)));
3568  ::exit(1);
3569  }
3570  log(fmt("Blob allocated at %p\n", blob));
3571 
3572  int actualLen = from64tobits_fast((char*)blob, base64data, base64datalen);
3573 
3574  if (actualLen != size)
3575  {
3576  // FIXME: WTF ? at least prevent overflow ???
3577  log(fmt("Blob size mismatch after base64dec: %lld vs %lld\n", (long long int)actualLen, (long long int)size));
3578  }
3579 
3580  int newFd = IDSharedBlobGetFd(blob);
3581  ownSharedBuffers.insert(newFd);
3582 
3583  IDSharedBlobDettach(blob);
3584 
3585  sharedBuffers.insert(sharedBuffers.begin() + blobPos, newFd);
3586  }
3587  blobPos++;
3588  }
3589 
3590  if (!replacement.empty())
3591  {
3592  // Work on a copy --- but we don't want to copy the blob !!!
3593  xmlContent = cloneXMLEleWithReplacementMap(xmlContent, replacement);
3594  }
3595 
3596  // Now create a Chunk from xmlContent
3597  MsgChunck chunck;
3598 
3599  chunck.content = (char*)malloc(sprlXMLEle(xmlContent, 0) + 1);
3600  ownBuffers.push_back(chunck.content);
3601  chunck.contentLength = sprXMLEle(chunck.content, xmlContent, 0);
3602  chunck.sharedBufferIdsToAttach = sharedBuffers;
3603 
3604  async_pushChunck(chunck);
3605 
3606  if (!replacement.empty())
3607  {
3608  delXMLEle(xmlContent);
3609  }
3610  async_done();
3611 }
3612 
3613 MsgQueue::MsgQueue(bool useSharedBuffer): useSharedBuffer(useSharedBuffer)
3614 {
3615  lp = newLilXML();
3616  rio.set<MsgQueue, &MsgQueue::ioCb>(this);
3617  wio.set<MsgQueue, &MsgQueue::ioCb>(this);
3618  rFd = -1;
3619  wFd = -1;
3620 }
3621 
3623 {
3624  rio.stop();
3625  wio.stop();
3626 
3627  clearMsgQueue();
3628  delLilXML(lp);
3629  lp = nullptr;
3630 
3631  setFds(-1, -1);
3632 
3633  /* unreference messages queue for this client */
3634  auto msgqcp = msgq;
3635  msgq.clear();
3636  for(auto mp : msgqcp)
3637  {
3638  mp->release(this);
3639  }
3640 }
3641 
3643 {
3644  if (wFd == -1)
3645  {
3646  // Already closed
3647  return;
3648  }
3649 
3650  int oldWFd = wFd;
3651 
3652  wFd = -1;
3653  // Clear the queue and stop the io slot
3654  clearMsgQueue();
3655 
3656  if (oldWFd == rFd)
3657  {
3658  if (shutdown(oldWFd, SHUT_WR) == -1)
3659  {
3660  if (errno != ENOTCONN)
3661  {
3662  log(fmt("socket shutdown failed: %s\n", strerror(errno)));
3663  close();
3664  }
3665  }
3666  }
3667  else
3668  {
3669  if (::close(oldWFd) == -1)
3670  {
3671  log(fmt("socket close failed: %s\n", strerror(errno)));
3672  close();
3673  }
3674  }
3675 }
3676 
3677 void MsgQueue::setFds(int rFd, int wFd)
3678 {
3679  if (this->rFd != -1)
3680  {
3681  rio.stop();
3682  wio.stop();
3683  ::close(this->rFd);
3684  if (this->rFd != this->wFd)
3685  {
3686  ::close(this->wFd);
3687  }
3688  }
3689  else if (this->wFd != -1)
3690  {
3691  wio.stop();
3692  ::close(this->wFd);
3693  }
3694 
3695  this->rFd = rFd;
3696  this->wFd = wFd;
3697  this->nsent.reset();
3698 
3699  if (rFd != -1)
3700  {
3701  fcntl(rFd, F_SETFL, fcntl(rFd, F_GETFL, 0) | O_NONBLOCK);
3702  if (wFd != rFd)
3703  {
3704  fcntl(wFd, F_SETFL, fcntl(wFd, F_GETFL, 0) | O_NONBLOCK);
3705  }
3706 
3707  rio.set(rFd, ev::READ);
3708  wio.set(wFd, ev::WRITE);
3709  updateIos();
3710  }
3711 }
3712 
3714 {
3715  if (msgq.empty()) return nullptr;
3716  return *(msgq.begin());
3717 }
3718 
3720 {
3721  auto msg = headMsg();
3722  msgq.pop_front();
3723  msg->release(this);
3724  nsent.reset();
3725 
3726  updateIos();
3727 }
3728 
3730 {
3731  // Don't write messages to client that have been disconnected
3732  if (wFd == -1)
3733  {
3734  return;
3735  }
3736 
3737  auto serialized = mp->serialize(this);
3738 
3739  msgq.push_back(serialized);
3740  serialized->addAwaiter(this);
3741 
3742  // Register for client write
3743  updateIos();
3744 }
3745 
3746 void MsgQueue::updateIos()
3747 {
3748  if (wFd != -1)
3749  {
3750  if (msgq.empty() || !msgq.front()->requestContent(nsent))
3751  {
3752  wio.stop();
3753  }
3754  else
3755  {
3756  wio.start();
3757  }
3758  }
3759  if (rFd != -1)
3760  {
3761  rio.start();
3762  }
3763 }
3764 
3766 {
3767  if ((!msgq.empty()) && (msgq.front() == msg))
3768  {
3769  updateIos();
3770  }
3771 }
3772 
3774 {
3775  nsent.reset();
3776 
3777  auto queueCopy = msgq;
3778  for(auto mp : queueCopy)
3779  {
3780  mp->release(this);
3781  }
3782  msgq.clear();
3783 
3784  // Cancel io write events
3785  updateIos();
3786  wio.stop();
3787 }
3788 
3789 unsigned long MsgQueue::msgQSize() const
3790 {
3791  unsigned long l = 0;
3792 
3793  for (auto mp : msgq)
3794  {
3795  l += sizeof(Msg);
3796  l += mp->queueSize();
3797  }
3798 
3799  return (l);
3800 }
3801 
3802 void MsgQueue::ioCb(ev::io &, int revents)
3803 {
3804  if (EV_ERROR & revents)
3805  {
3806  int sockErrno = readFdError(this->rFd);
3807  if ((!sockErrno) && this->wFd != this->rFd)
3808  {
3809  sockErrno = readFdError(this->wFd);
3810  }
3811 
3812  if (sockErrno)
3813  {
3814  log(fmt("Communication error: %s\n", strerror(sockErrno)));
3815  close();
3816  return;
3817  }
3818  }
3819 
3820  if (revents & EV_READ)
3821  readFromFd();
3822 
3823  if (revents & EV_WRITE)
3824  writeToFd();
3825 }
3826 
3827 size_t MsgQueue::doRead(char * buf, size_t nr)
3828 {
3829  if (!useSharedBuffer)
3830  {
3831  /* read client - works for all kinds of fds incl pipe*/
3832  return read(rFd, buf, sizeof(buf));
3833  }
3834  else
3835  {
3836  // Use recvmsg for ancillary data
3837  struct msghdr msgh;
3838  struct iovec iov;
3839 
3840  union
3841  {
3842  struct cmsghdr cmsgh;
3843  /* Space large enough to hold an 'int' */
3844  char control[CMSG_SPACE(MAXFD_PER_MESSAGE * sizeof(int))];
3845  } control_un;
3846 
3847  iov.iov_base = buf;
3848  iov.iov_len = nr;
3849 
3850  msgh.msg_name = NULL;
3851  msgh.msg_namelen = 0;
3852  msgh.msg_iov = &iov;
3853  msgh.msg_iovlen = 1;
3854  msgh.msg_flags = 0;
3855  msgh.msg_control = control_un.control;
3856  msgh.msg_controllen = sizeof(control_un.control);
3857 
3858  int recvflag;
3859 #ifdef __linux__
3860  recvflag = MSG_CMSG_CLOEXEC;
3861 #else
3862  recvflag = 0;
3863 #endif
3864  int size = recvmsg(rFd, &msgh, recvflag);
3865  if (size == -1)
3866  {
3867  return -1;
3868  }
3869 
3870  for (struct cmsghdr * cmsg = CMSG_FIRSTHDR(&msgh); cmsg != NULL; cmsg = CMSG_NXTHDR(&msgh, cmsg))
3871  {
3872  if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS)
3873  {
3874  int fdCount = 0;
3875  while(cmsg->cmsg_len >= CMSG_LEN((fdCount + 1) * sizeof(int)))
3876  {
3877  fdCount++;
3878  }
3879  //log(fmt("Received %d fds\n", fdCount));
3880  int * fds = (int*)CMSG_DATA(cmsg);
3881  for(int i = 0; i < fdCount; ++i)
3882  {
3883 #ifndef __linux__
3884  fcntl(fds[i], F_SETFD, FD_CLOEXEC);
3885 #endif
3886  incomingSharedBuffers.push_back(fds[i]);
3887  }
3888  }
3889  else
3890  {
3891  log(fmt("Ignoring ancillary data level %d, type %d\n", cmsg->cmsg_level, cmsg->cmsg_type));
3892  }
3893  }
3894  return size;
3895  }
3896 }
3897 
3898 void MsgQueue::readFromFd()
3899 {
3900  char buf[MAXRBUF];
3901  ssize_t nr;
3902 
3903  /* read client */
3904  nr = doRead(buf, sizeof(buf));
3905  if (nr <= 0)
3906  {
3907  if (errno == EAGAIN || errno == EWOULDBLOCK) return;
3908 
3909  if (nr < 0)
3910  log(fmt("read: %s\n", strerror(errno)));
3911  else if (verbose > 0)
3912  log(fmt("read EOF\n"));
3913  close();
3914  return;
3915  }
3916 
3917  /* process XML chunk */
3918  char err[1024];
3919  XMLEle **nodes = parseXMLChunk(lp, buf, nr, err);
3920  if (!nodes)
3921  {
3922  log(fmt("XML error: %s\n", err));
3923  log(fmt("XML read: %.*s\n", (int)nr, buf));
3924  close();
3925  return;
3926  }
3927 
3928  int inode = 0;
3929 
3930  XMLEle *root = nodes[inode];
3931  // Stop processing message in case of deletion...
3932  auto hb = heartBeat();
3933  while (root)
3934  {
3935  if (hb.alive())
3936  {
3937  if (verbose > 2)
3938  traceMsg("read ", root);
3939  else if (verbose > 1)
3940  {
3941  log(fmt("read <%s device='%s' name='%s'>\n",
3942  tagXMLEle(root), findXMLAttValu(root, "device"), findXMLAttValu(root, "name")));
3943  }
3944 
3945  onMessage(root, incomingSharedBuffers);
3946  }
3947  else
3948  {
3949  // Otherwise, client got killed. Just release pending messages
3950  delXMLEle(root);
3951  }
3952  inode++;
3953  root = nodes[inode];
3954  }
3955 
3956  free(nodes);
3957 }
3958 
3959 static std::vector<XMLEle *> findBlobElements(XMLEle * root)
3960 {
3961  std::vector<XMLEle *> result;
3962  for (auto ep = nextXMLEle(root, 1); ep; ep = nextXMLEle(root, 0))
3963  {
3964  if (strcmp(tagXMLEle(ep), "oneBLOB") == 0)
3965  {
3966  result.push_back(ep);
3967  }
3968  }
3969  return result;
3970 }
3971 
3972 static void log(const std::string &log)
3973 {
3974  fprintf(stderr, "%s: ", indi_tstamp(NULL));
3975  fprintf(stderr, "%s", log.c_str());
3976 }
3977 
/**
 * Fetch the pending error of a file descriptor.
 *
 * Where MSG_ERRQUEUE is available, the fd's error queue is read without
 * blocking and the ee_errno of an IP_RECVERR control message is returned.
 *
 * @param fd descriptor to query
 * @return 0 when the error queue has nothing to report (EAGAIN/EWOULDBLOCK),
 *         the errno of a failing recvmsg(), the queued error number, or EIO
 *         when no specific error could be determined.
 */
static int readFdError(int fd)
{
#ifdef MSG_ERRQUEUE
    char payload[128];   /* Buffer for normal data (not expected here...) */
    char ancillary[512]; /* Buffer for ancillary data (errors) */

    struct iovec vec;
    vec.iov_base = payload;
    vec.iov_len = sizeof(payload);

    struct msghdr hdr;
    hdr.msg_name = nullptr;
    hdr.msg_namelen = 0;
    hdr.msg_iov = &vec;
    hdr.msg_iovlen = 1;
    hdr.msg_flags = 0;
    hdr.msg_control = ancillary;
    hdr.msg_controllen = sizeof(ancillary);

    if (recvmsg(fd, &hdr, MSG_ERRQUEUE | MSG_DONTWAIT) == -1)
    {
        const bool queueEmpty = (errno == EAGAIN || errno == EWOULDBLOCK);
        return queueEmpty ? 0 : errno;
    }

    /* Walk the ancillary data looking for the error report */
    struct cmsghdr * cmsg = CMSG_FIRSTHDR(&hdr);
    while (cmsg != NULL)
    {
        fprintf(stderr, "cmsg_len=%zu, cmsg_level=%u, cmsg_type=%u\n", cmsg->cmsg_len, cmsg->cmsg_level, cmsg->cmsg_type);

        if (cmsg->cmsg_level == SOL_IP && cmsg->cmsg_type == IP_RECVERR)
        {
            return ((struct sock_extended_err *)CMSG_DATA(cmsg))->ee_errno;
        }

        cmsg = CMSG_NXTHDR(&hdr, cmsg);
    }
#else
    (void)fd;
#endif

    // Default to EIO as a generic error path
    return EIO;
}
4021 
4022 static void * attachSharedBuffer(int fd, size_t &size)
4023 {
4024  struct stat sb;
4025  if (fstat(fd, &sb) == -1)
4026  {
4027  perror("invalid shared buffer fd");
4028  Bye();
4029  }
4030  size = sb.st_size;
4031  void * ret = mmap(0, size, PROT_READ, MAP_SHARED, fd, 0);
4032 
4033  if (ret == MAP_FAILED)
4034  {
4035  perror("mmap");
4036  Bye();
4037  }
4038 
4039  return ret;
4040 }
4041 
4042 static void dettachSharedBuffer(int fd, void * ptr, size_t size)
4043 {
4044  (void)fd;
4045  if (munmap(ptr, size) == -1)
4046  {
4047  perror("shared buffer munmap");
4048  Bye();
4049  }
4050 }
4051 
/**
 * printf-style formatting into a std::string.
 *
 * Short results are formatted in a stack buffer; longer ones are formatted a
 * second time into a heap buffer of exactly the required size.
 *
 * @param fmt printf format string, followed by its arguments
 * @return the formatted text, or an empty string if vsnprintf() reports an
 *         error (the error is also reported with perror()).
 */
static std::string fmt(const char *fmt, ...)
{
    char buffer[128];
    va_list ap;

    /* First pass: format into the stack buffer and learn the required size */
    va_start(ap, fmt);
    int size = vsnprintf(buffer, sizeof(buffer), fmt, ap);
    va_end(ap);

    if (size < 0)
    {
        // Nothing usable was produced: do not fall through with a negative size
        perror("vsnprintf");
        return std::string();
    }

    if ((unsigned)size < sizeof(buffer))
    {
        return std::string(buffer);
    }

    /* Output was truncated: format again with room for size chars plus '\0' */
    std::vector<char> big((size_t)size + 1);

    va_start(ap, fmt);
    size = vsnprintf(big.data(), big.size(), fmt, ap);
    va_end(ap);

    if (size < 0)
    {
        perror("vsnprintf");
        return std::string();
    }

    return std::string(big.data());
}
int to64frombits_s(unsigned char *out, const unsigned char *in, int inlen, size_t outlen)
Convert bytes array to base64.
Definition: base64.c:63
int from64tobits_fast(char *out, const char *in, int inlen)
Definition: base64.c:122
__attribute__((__format__(__printf__, 2, 0))) void CelestronDriver
int allprops
Definition: indiserver.cpp:656
static ConcurrentSet< ClInfo > clients
Definition: indiserver.cpp:682
virtual void log(const std::string &log) const
static void q2Clients(ClInfo *notme, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root)
static void q2Servers(DvrInfo *me, Msg *mp, XMLEle *root)
ClInfo(bool useSharedBuffer)
void crackBLOBHandling(const std::string &dev, const std::string &name, const char *enableBLOB)
BLOBHandling blob
Definition: indiserver.cpp:657
virtual void close()
int findDevice(const std::string &dev, const std::string &name) const
std::list< Property * > props
Definition: indiserver.cpp:655
virtual ~ClInfo()
virtual void onMessage(XMLEle *root, std::list< int > &sharedBuffers)
void addDevice(const std::string &dev, const std::string &name, int isblob)
HeartBeat heartBeat() const
Definition: indiserver.cpp:250
bool operator!=(const iterator &o)
Definition: indiserver.cpp:183
iterator(const ConcurrentSet< M > *parent)
Definition: indiserver.cpp:181
iterator end() const
Definition: indiserver.cpp:215
M * operator[](unsigned long id) const
Definition: indiserver.cpp:150
iterator begin() const
Definition: indiserver.cpp:204
void erase(M *item)
Definition: indiserver.cpp:133
void insert(M *item)
Definition: indiserver.cpp:126
std::vector< unsigned long > ids() const
Definition: indiserver.cpp:140
bool restart
Definition: indiserver.cpp:716
virtual void log(const std::string &log) const
std::set< std::string > dev
Definition: indiserver.cpp:713
static void q2RDrivers(const std::string &dev, Msg *mp, XMLEle *root)
bool isHandlingDevice(const std::string &dev) const
static void q2SDrivers(DvrInfo *me, int isblob, const std::string &dev, const std::string &name, Msg *mp, XMLEle *root)
virtual bool acceptSharedBuffers() const
Definition: indiserver.cpp:752
virtual const std::string remoteServerUid() const =0
virtual void start()=0
static ConcurrentSet< DvrInfo > drivers
Definition: indiserver.cpp:749
Property * findSDevice(const std::string &dev, const std::string &name) const
virtual DvrInfo * clone() const =0
virtual ~DvrInfo()
virtual void onMessage(XMLEle *root, std::list< int > &sharedBuffers)
virtual void close()
virtual void closeWritePart()
std::string name
Definition: indiserver.cpp:711
std::list< Property * > sprops
Definition: indiserver.cpp:714
int restarts
Definition: indiserver.cpp:715
DvrInfo(const DvrInfo &model)
void listen()
Definition: indiserver.cpp:628
Fifo(const std::string &name)
std::string envPrefix
Definition: indiserver.cpp:779
virtual LocalDvrInfo * clone() const
virtual void start()
std::string envConfig
Definition: indiserver.cpp:777
std::string envSkel
Definition: indiserver.cpp:778
virtual const std::string remoteServerUid() const
Definition: indiserver.cpp:788
virtual ~LocalDvrInfo()
std::string envDev
Definition: indiserver.cpp:776
bool done() const
Definition: indiserver.cpp:441
virtual ~MsgQueue()
static void crackBLOB(const char *enableBLOB, BLOBHandling *bp)
void consumeHeadMsg()
void messageMayHaveProgressed(const SerializedMsg *msg)
MsgQueue(bool useSharedBuffer)
virtual void onMessage(XMLEle *root, std::list< int > &sharedBuffers)=0
void clearMsgQueue()
virtual bool acceptSharedBuffers() const
Definition: indiserver.cpp:590
void traceMsg(const std::string &log, XMLEle *root)
virtual void closeWritePart()
void pushMsg(Msg *msg)
virtual void close()=0
unsigned long msgQSize() const
virtual void log(const std::string &log) const
bool useSharedBuffer
Definition: indiserver.cpp:544
int getWFd() const
Definition: indiserver.cpp:549
SerializedMsg * headMsg() const
void setFds(int rFd, int wFd)
int getRFd() const
Definition: indiserver.cpp:545
SerializedMsg * serialize(MsgQueue *from)
Msg(MsgQueue *from, XMLEle *root)
friend class SerializedMsgWithSharedBuffer
Definition: indiserver.cpp:451
friend class SerializedMsgWithoutSharedBuffer
Definition: indiserver.cpp:452
static Msg * fromXml(MsgQueue *from, XMLEle *root, std::list< int > &incomingSharedBuffers)
void queuingDone()
Property(const std::string &dev, const std::string &name)
Definition: indiserver.cpp:606
BLOBHandling blob
Definition: indiserver.cpp:604
std::string dev
Definition: indiserver.cpp:602
std::string name
Definition: indiserver.cpp:603
virtual const std::string remoteServerUid() const
Definition: indiserver.cpp:817
virtual void start()
virtual ~RemoteDvrInfo()
virtual RemoteDvrInfo * clone() const
std::string host
Definition: indiserver.cpp:807
SerializedMsgWithSharedBuffer(Msg *parent)
virtual bool generateContentAsync() const
virtual bool generateContentAsync() const
SerializedMsgWithoutSharedBuffer(Msg *parent)
void async_updateRequirement(const SerializationRequirement &n)
bool async_canceled()
SerializationStatus asyncStatus
Definition: indiserver.cpp:345
bool getContent(MsgChunckIterator &position, void *&data, ssize_t &nsend, std::vector< int > &sharedBuffers)
virtual bool generateContentAsync() const =0
MsgQueue * blockedProducer
Definition: indiserver.cpp:348
void collectRequirements(SerializationRequirement &req)
std::list< void * > ownBuffers
Definition: indiserver.cpp:356
void blockReceiver(MsgQueue *toblock)
virtual ~SerializedMsg()
ssize_t queueSize()
bool requestContent(const MsgChunckIterator &position)
void onDataReady()
std::set< MsgQueue * > awaiters
Definition: indiserver.cpp:350
virtual void generateContent()=0
SerializedMsg(Msg *parent)
void advance(MsgChunckIterator &position, ssize_t s)
void addAwaiter(MsgQueue *awaiter)
void release(MsgQueue *from)
void async_pushChunck(const MsgChunck &m)
bool isAsyncRunning()
void listen()
TcpServer(int port)
int errno
Constants and Data structure definitions for the interface to the reference INDI C API implementation...
#define MAXINDIDEVICE
Definition: indiapi.h:193
#define INDIV
Definition: indiapi.h:134
Interface to the reference INDI C API device implementation on the Device Driver side.
BLOBHandling
How drivers handle BLOBs incoming from snooping drivers.
Definition: indidevapi.h:266
@ B_ONLY
Definition: indidevapi.h:269
@ B_ALSO
Definition: indidevapi.h:268
@ B_NEVER
Definition: indidevapi.h:267
#define DEFMAXRESTART
Definition: indiserver.cpp:107
bool parseBlobSize(XMLEle *blobWithAttachedBuffer, ssize_t &size)
#define MAXSBUF
Definition: indiserver.cpp:101
#define MAXRBUF
Definition: indiserver.cpp:102
#define DEFMAXSSIZ
Definition: indiserver.cpp:106
#define DEFMAXQSIZ
Definition: indiserver.cpp:105
#define TO_STRING(x)
Definition: indiserver.cpp:115
#define MAXWSIZ
Definition: indiserver.cpp:103
#define MAXFD_PER_MESSAGE
Definition: indiserver.cpp:108
#define INDIUNIXSOCK
Definition: indiserver.cpp:100
#define INDIPORT
Definition: indiserver.cpp:99
int main(int ac, char *av[])
Definition: indiserver.cpp:899
SerializationStatus
Definition: indiserver.cpp:311
@ RUNNING
Definition: indiserver.cpp:311
@ PENDING
Definition: indiserver.cpp:311
@ TERMINATED
Definition: indiserver.cpp:311
@ CANCELING
Definition: indiserver.cpp:311
XMLEle * cloneXMLEleWithReplacementMap(XMLEle *root, const std::unordered_map< XMLEle *, XMLEle * > &replacement)
int fd
Definition: intelliscope.c:43
XMLAtt * findXMLAtt(XMLEle *ep, const char *name)
Find an XML attribute within an XML element.
Definition: lilxml.cpp:524
XMLEle * shallowCloneXMLEle(XMLEle *ele)
Return a shallow copy of a node. Children and cdata are not copied.
Definition: lilxml.cpp:731
LilXML * newLilXML()
Create a new lilxml parser.
Definition: lilxml.cpp:150
XMLEle * cloneXMLEle(XMLEle *ep)
Definition: lilxml.cpp:479
XMLEle ** parseXMLChunk(LilXML *lp, char *buf, int size, char ynot[])
Process an XML chunk.
Definition: lilxml.cpp:215
size_t sprXMLEle(char *s, XMLEle *ep, int level)
sample print ep to string s. N.B. s must be at least as large as that reported by sprlXMLEle()+1....
Definition: lilxml.cpp:874
const char * findXMLAttValu(XMLEle *ep, const char *name)
Find an XML element's attribute value.
Definition: lilxml.cpp:644
XMLAtt * addXMLAtt(XMLEle *ep, const char *name, const char *valu)
Add an XML attribute to an existing XML element.
Definition: lilxml.cpp:706
char * pcdataXMLEle(XMLEle *ep)
Return the pcdata of an XML element.
Definition: lilxml.cpp:606
void editXMLEle(XMLEle *ep, const char *pcdata)
set the pcdata of the given element
Definition: lilxml.cpp:698
char * tagXMLEle(XMLEle *ep)
Return the tag of an XML element.
Definition: lilxml.cpp:600
size_t sprXMLCDataOffset(XMLEle *root, XMLEle *ep, int level)
return exact position of cdata of child in printed representation of root N.B. set level = 0 on first...
Definition: lilxml.cpp:930
void rmXMLAtt(XMLEle *ep, const char *name)
Remove an XML attribute from an XML element.
Definition: lilxml.cpp:715
void prXMLEle(FILE *fp, XMLEle *ep, int level)
Print an XML element.
Definition: lilxml.cpp:844
size_t sprlXMLEle(XMLEle *ep, int level)
return number of bytes in a string guaranteed able to hold result of sprXLMEle(ep) (sans trailing \0@...
Definition: lilxml.cpp:922
XMLEle * nextXMLEle(XMLEle *ep, int init)
Iterate an XML element for a list of nested XML elements.
Definition: lilxml.cpp:555
void delXMLEle(XMLEle *ep)
delXMLEle Delete XML element.
Definition: lilxml.cpp:167
void delLilXML(LilXML *lp)
Delete a lilxml parser.
Definition: lilxml.cpp:159
XMLEle * setXMLEleTag(XMLEle *ep, const char *tag)
Update the tag of an element.
Definition: lilxml.cpp:688
XMLEle * addXMLEle(XMLEle *parent, const char *tag)
add an element with the given tag to the given element. parent can be NULL to make a new root.
Definition: lilxml.cpp:670
int pcdatalenXMLEle(XMLEle *ep)
Return the number of characters in pcdata in an XML element.
Definition: lilxml.cpp:612
char * valuXMLAtt(XMLAtt *ap)
Return the value of an XML attribute.
Definition: lilxml.cpp:624
A little DOM-style library to handle parsing and processing an XML file.
std::vector< uint8_t > buffer
bool sync(const int fd)
@ replace
replace invalid UTF-8 sequences with U+FFFD
NLOHMANN_BASIC_JSON_TPL_DECLARATION std::string to_string(const NLOHMANN_BASIC_JSON_TPL &j)
user-defined to_string function for JSON values
Definition: json.h:23613
bool operator==(json_pointer< RefStringTypeLhs > const &lhs, json_pointer< RefStringTypeRhs > const &rhs) noexcept
Definition: json.h:14083
Definition: json.h:4973
__u8 cmd[4]
Definition: pwc-ioctl.h:2
std::unique_ptr< RadioSim > receiver(new RadioSim())
void IDSharedBlobDettach(void *ptr)
Definition: sharedblob.c:154
void * IDSharedBlobAlloc(size_t size)
Definition: sharedblob.c:70
int IDSharedBlobGetFd(void *ptr)
Definition: sharedblob.c:241