pacemaker  1.1.18-7fdfbbe
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Groups Pages
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netinet/tcp.h>
32 #include <netdb.h>
33 
34 #include <stdlib.h>
35 #include <errno.h>
36 #include <glib.h>
37 
38 #include <bzlib.h>
39 
40 #include <crm/common/ipcs.h>
41 #include <crm/common/xml.h>
42 #include <crm/common/mainloop.h>
43 
44 #ifdef HAVE_GNUTLS_GNUTLS_H
45 # undef KEYFILE
46 # include <gnutls/gnutls.h>
47 
48 const int psk_tls_kx_order[] = {
49  GNUTLS_KX_DHE_PSK,
50  GNUTLS_KX_PSK,
51 };
52 
53 const int anon_tls_kx_order[] = {
54  GNUTLS_KX_ANON_DH,
55  GNUTLS_KX_DHE_RSA,
56  GNUTLS_KX_DHE_DSS,
57  GNUTLS_KX_RSA,
58  0
59 };
60 #endif
61 
62 /* Swab macros from linux/swab.h */
63 #ifdef HAVE_LINUX_SWAB_H
64 # include <linux/swab.h>
65 #else
66 /*
67  * casts are necessary for constants, because we never know how for sure
68  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
69  */
70 #define __swab16(x) ((uint16_t)( \
71  (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
72  (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
73 
74 #define __swab32(x) ((uint32_t)( \
75  (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
76  (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
77  (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
78  (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
79 
80 #define __swab64(x) ((uint64_t)( \
81  (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
82  (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
83  (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
84  (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
85  (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
86  (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
87  (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
88  (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
89 #endif
90 
91 #define REMOTE_MSG_VERSION 1
92 #define ENDIAN_LOCAL 0xBADADBBD
93 
94 struct crm_remote_header_v0
95 {
96  uint32_t endian; /* Detect messages from hosts with different endian-ness */
98  uint64_t id;
99  uint64_t flags;
104 
105  /* New fields get added here */
106 
107 } __attribute__ ((packed));
108 
109 static struct crm_remote_header_v0 *
110 crm_remote_header(crm_remote_t * remote)
111 {
112  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
113  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
114  return NULL;
115 
116  } else if(header->endian != ENDIAN_LOCAL) {
117  uint32_t endian = __swab32(header->endian);
118 
119  CRM_LOG_ASSERT(endian == ENDIAN_LOCAL);
120  if(endian != ENDIAN_LOCAL) {
121  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
122  ENDIAN_LOCAL, header->endian, endian);
123  return NULL;
124  }
125 
126  header->id = __swab64(header->id);
127  header->flags = __swab64(header->flags);
128  header->endian = __swab32(header->endian);
129 
130  header->version = __swab32(header->version);
131  header->size_total = __swab32(header->size_total);
132  header->payload_offset = __swab32(header->payload_offset);
133  header->payload_compressed = __swab32(header->payload_compressed);
134  header->payload_uncompressed = __swab32(header->payload_uncompressed);
135  }
136 
137  return header;
138 }
139 
140 #ifdef HAVE_GNUTLS_GNUTLS_H
141 
142 int
143 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
144 {
145  int rc = 0;
146  int pollrc = 0;
147  time_t start = time(NULL);
148 
149  do {
150  rc = gnutls_handshake(*remote->tls_session);
151  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
152  pollrc = crm_remote_ready(remote, 1000);
153  if (pollrc < 0) {
154  /* poll returned error, there is no hope */
155  rc = -1;
156  }
157  }
159  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
160  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
161 
162  if (rc < 0) {
163  crm_trace("gnutls_handshake() failed with %d", rc);
164  }
165  return rc;
166 }
167 
168 void *
169 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
170  void *credentials)
171 {
172  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
173 
174  gnutls_init(session, type);
175 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
176 /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
177  gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
178 /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
179 # else
180  gnutls_set_default_priority(*session);
181  gnutls_kx_set_priority(*session, anon_tls_kx_order);
182 # endif
183  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
184  switch (type) {
185  case GNUTLS_SERVER:
186  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
187  (gnutls_anon_server_credentials_t) credentials);
188  break;
189  case GNUTLS_CLIENT:
190  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
191  (gnutls_anon_client_credentials_t) credentials);
192  break;
193  }
194 
195  return session;
196 }
197 
198 void *
199 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
200 {
201  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
202 
203  gnutls_init(session, type);
204 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
205  gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
206 # else
207  gnutls_set_default_priority(*session);
208  gnutls_kx_set_priority(*session, psk_tls_kx_order);
209 # endif
210  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
211  switch (type) {
212  case GNUTLS_SERVER:
213  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
214  (gnutls_psk_server_credentials_t) credentials);
215  break;
216  case GNUTLS_CLIENT:
217  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
218  (gnutls_psk_client_credentials_t) credentials);
219  break;
220  }
221 
222  return session;
223 }
224 
225 static int
226 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
227 {
228  const char *unsent = buf;
229  int rc = 0;
230  int total_send;
231 
232  if (buf == NULL) {
233  return -EINVAL;
234  }
235 
236  total_send = len;
237  crm_trace("Message size: %llu", (unsigned long long) len);
238 
239  while (TRUE) {
240  rc = gnutls_record_send(*session, unsent, len);
241 
242  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
243  crm_trace("Retrying to send %llu bytes",
244  (unsigned long long) len);
245 
246  } else if (rc < 0) {
247  crm_err("Connection terminated: %s " CRM_XS " rc=%d",
248  gnutls_strerror(rc), rc);
249  rc = -ECONNABORTED;
250  break;
251 
252  } else if (rc < len) {
253  crm_debug("Sent %d of %llu bytes", rc, (unsigned long long) len);
254  len -= rc;
255  unsent += rc;
256  } else {
257  crm_trace("Sent all %d bytes", rc);
258  break;
259  }
260  }
261 
262  return rc < 0 ? rc : total_send;
263 }
264 #endif
265 
266 static int
267 crm_send_plaintext(int sock, const char *buf, size_t len)
268 {
269 
270  int rc = 0;
271  const char *unsent = buf;
272  int total_send;
273 
274  if (buf == NULL) {
275  return -EINVAL;
276  }
277  total_send = len;
278 
279  crm_trace("Message on socket %d: size=%llu",
280  sock, (unsigned long long) len);
281  retry:
282  rc = write(sock, unsent, len);
283  if (rc < 0) {
284  rc = -errno;
285  switch (errno) {
286  case EINTR:
287  case EAGAIN:
288  crm_trace("Retry");
289  goto retry;
290  default:
291  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
292  break;
293  }
294 
295  } else if (rc < len) {
296  crm_trace("Only sent %d of %llu remaining bytes",
297  rc, (unsigned long long) len);
298  len -= rc;
299  unsent += rc;
300  goto retry;
301 
302  } else {
303  crm_trace("Sent %d bytes: %.100s", rc, buf);
304  }
305 
306  return rc < 0 ? rc : total_send;
307 
308 }
309 
310 static int
311 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
312 {
313  int lpc = 0;
314  int rc = -ESOCKTNOSUPPORT;
315 
316  for(; lpc < iovs; lpc++) {
317 
318 #ifdef HAVE_GNUTLS_GNUTLS_H
319  if (remote->tls_session) {
320  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
321  } else if (remote->tcp_socket) {
322 #else
323  if (remote->tcp_socket) {
324 #endif
325  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
326 
327  } else {
328  crm_err("Unsupported connection type");
329  }
330  }
331  return rc;
332 }
333 
334 int
335 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
336 {
337  int rc = pcmk_ok;
338  static uint64_t id = 0;
339  char *xml_text = dump_xml_unformatted(msg);
340 
341  struct iovec iov[2];
342  struct crm_remote_header_v0 *header;
343 
344  if (xml_text == NULL) {
345  crm_err("Could not send remote message: no message provided");
346  return -EINVAL;
347  }
348 
349  header = calloc(1, sizeof(struct crm_remote_header_v0));
350  iov[0].iov_base = header;
351  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
352 
353  iov[1].iov_base = xml_text;
354  iov[1].iov_len = 1 + strlen(xml_text);
355 
356  id++;
357  header->id = id;
358  header->endian = ENDIAN_LOCAL;
359  header->version = REMOTE_MSG_VERSION;
360  header->payload_offset = iov[0].iov_len;
361  header->payload_uncompressed = iov[1].iov_len;
362  header->size_total = iov[0].iov_len + iov[1].iov_len;
363 
364  crm_trace("Sending len[0]=%d, start=%x",
365  (int)iov[0].iov_len, *(int*)(void*)xml_text);
366  rc = crm_remote_sendv(remote, iov, 2);
367  if (rc < 0) {
368  crm_err("Could not send remote message: %s " CRM_XS " rc=%d",
369  pcmk_strerror(rc), rc);
370  }
371 
372  free(iov[0].iov_base);
373  free(iov[1].iov_base);
374  return rc;
375 }
376 
377 
383 xmlNode *
385 {
386  xmlNode *xml = NULL;
387  struct crm_remote_header_v0 *header = crm_remote_header(remote);
388 
389  if (remote->buffer == NULL || header == NULL) {
390  return NULL;
391  }
392 
393  /* Support compression on the receiving end now, in case we ever want to add it later */
394  if (header->payload_compressed) {
395  int rc = 0;
396  unsigned int size_u = 1 + header->payload_uncompressed;
397  char *uncompressed = calloc(1, header->payload_offset + size_u);
398 
399  crm_trace("Decompressing message data %d bytes into %d bytes",
400  header->payload_compressed, size_u);
401 
402  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
403  remote->buffer + header->payload_offset,
404  header->payload_compressed, 1, 0);
405 
406  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
407  crm_warn("Couldn't decompress v%d message, we only understand v%d",
408  header->version, REMOTE_MSG_VERSION);
409  free(uncompressed);
410  return NULL;
411 
412  } else if (rc != BZ_OK) {
413  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
414  free(uncompressed);
415  return NULL;
416  }
417 
418  CRM_ASSERT(size_u == header->payload_uncompressed);
419 
420  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
421  remote->buffer_size = header->payload_offset + size_u;
422 
423  free(remote->buffer);
424  remote->buffer = uncompressed;
425  header = crm_remote_header(remote);
426  }
427 
428  /* take ownership of the buffer */
429  remote->buffer_offset = 0;
430 
431  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
432 
433  xml = string2xml(remote->buffer + header->payload_offset);
434  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
435  crm_warn("Couldn't parse v%d message, we only understand v%d",
436  header->version, REMOTE_MSG_VERSION);
437 
438  } else if (xml == NULL) {
439  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
440  }
441 
442  return xml;
443 }
444 
454 int
455 crm_remote_ready(crm_remote_t *remote, int total_timeout)
456 {
457  struct pollfd fds = { 0, };
458  int sock = 0;
459  int rc = 0;
460  time_t start;
461  int timeout = total_timeout;
462 
463 #ifdef HAVE_GNUTLS_GNUTLS_H
464  if (remote->tls_session) {
465  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
466 
467  sock = GPOINTER_TO_INT(sock_ptr);
468  } else if (remote->tcp_socket) {
469 #else
470  if (remote->tcp_socket) {
471 #endif
472  sock = remote->tcp_socket;
473  } else {
474  crm_err("Unsupported connection type");
475  }
476 
477  if (sock <= 0) {
478  crm_trace("No longer connected");
479  return -ENOTCONN;
480  }
481 
482  start = time(NULL);
483  errno = 0;
484  do {
485  fds.fd = sock;
486  fds.events = POLLIN;
487 
488  /* If we got an EINTR while polling, and we have a
489  * specific timeout we are trying to honor, attempt
490  * to adjust the timeout to the closest second. */
491  if (errno == EINTR && (timeout > 0)) {
492  timeout = total_timeout - ((time(NULL) - start) * 1000);
493  if (timeout < 1000) {
494  timeout = 1000;
495  }
496  }
497 
498  rc = poll(&fds, 1, timeout);
499  } while (rc < 0 && errno == EINTR);
500 
501  return (rc < 0)? -errno : rc;
502 }
503 
504 
515 static size_t
516 crm_remote_recv_once(crm_remote_t * remote)
517 {
518  int rc = 0;
519  size_t read_len = sizeof(struct crm_remote_header_v0);
520  struct crm_remote_header_v0 *header = crm_remote_header(remote);
521 
522  if(header) {
523  /* Stop at the end of the current message */
524  read_len = header->size_total;
525  }
526 
527  /* automatically grow the buffer when needed */
528  if(remote->buffer_size < read_len) {
529  remote->buffer_size = 2 * read_len;
530  crm_trace("Expanding buffer to %llu bytes",
531  (unsigned long long) remote->buffer_size);
532 
533  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
534  CRM_ASSERT(remote->buffer != NULL);
535  }
536 
537 #ifdef HAVE_GNUTLS_GNUTLS_H
538  if (remote->tls_session) {
539  rc = gnutls_record_recv(*(remote->tls_session),
540  remote->buffer + remote->buffer_offset,
541  remote->buffer_size - remote->buffer_offset);
542  if (rc == GNUTLS_E_INTERRUPTED) {
543  rc = -EINTR;
544  } else if (rc == GNUTLS_E_AGAIN) {
545  rc = -EAGAIN;
546  } else if (rc < 0) {
547  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
548  rc = -pcmk_err_generic;
549  }
550  } else if (remote->tcp_socket) {
551 #else
552  if (remote->tcp_socket) {
553 #endif
554  errno = 0;
555  rc = read(remote->tcp_socket,
556  remote->buffer + remote->buffer_offset,
557  remote->buffer_size - remote->buffer_offset);
558  if(rc < 0) {
559  rc = -errno;
560  }
561 
562  } else {
563  crm_err("Unsupported connection type");
564  return -ESOCKTNOSUPPORT;
565  }
566 
567  /* process any errors. */
568  if (rc > 0) {
569  remote->buffer_offset += rc;
570  /* always null terminate buffer, the +1 to alloc always allows for this. */
571  remote->buffer[remote->buffer_offset] = '\0';
572  crm_trace("Received %u more bytes, %llu total",
573  rc, (unsigned long long) remote->buffer_offset);
574 
575  } else if (rc == -EINTR || rc == -EAGAIN) {
576  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
577 
578  } else if (rc == 0) {
579  crm_debug("EOF encoutered after %llu bytes",
580  (unsigned long long) remote->buffer_offset);
581  return -ENOTCONN;
582 
583  } else {
584  crm_debug("Error receiving message after %llu bytes: %s (%d)",
585  (unsigned long long) remote->buffer_offset,
586  pcmk_strerror(rc), rc);
587  return -ENOTCONN;
588  }
589 
590  header = crm_remote_header(remote);
591  if(header) {
592  if(remote->buffer_offset < header->size_total) {
593  crm_trace("Read less than the advertised length: %llu < %u bytes",
594  (unsigned long long) remote->buffer_offset,
595  header->size_total);
596  } else {
597  crm_trace("Read full message of %llu bytes",
598  (unsigned long long) remote->buffer_offset);
599  return remote->buffer_offset;
600  }
601  }
602 
603  return -EAGAIN;
604 }
605 
616 gboolean
617 crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
618 {
619  int rc;
620  time_t start = time(NULL);
621  int remaining_timeout = 0;
622 
623  if (total_timeout == 0) {
624  total_timeout = 10000;
625  } else if (total_timeout < 0) {
626  total_timeout = 60000;
627  }
628  *disconnected = 0;
629 
630  remaining_timeout = total_timeout;
631  while ((remaining_timeout > 0) && !(*disconnected)) {
632 
633  crm_trace("Waiting for remote data (%d of %d ms timeout remaining)",
634  remaining_timeout, total_timeout);
635  rc = crm_remote_ready(remote, remaining_timeout);
636 
637  if (rc == 0) {
638  crm_err("Timed out (%d ms) while waiting for remote data",
639  remaining_timeout);
640  return FALSE;
641 
642  } else if (rc < 0) {
643  crm_debug("Wait for remote data aborted, will try again: %s "
644  CRM_XS " rc=%d", pcmk_strerror(rc), rc);
645 
646  } else {
647  rc = crm_remote_recv_once(remote);
648  if (rc > 0) {
649  return TRUE;
650  } else if (rc == -EAGAIN) {
651  crm_trace("Still waiting for remote data");
652  } else if (rc < 0) {
653  crm_debug("Could not receive remote data: %s " CRM_XS " rc=%d",
654  pcmk_strerror(rc), rc);
655  }
656  }
657 
658  if (rc == -ENOTCONN) {
659  *disconnected = 1;
660  return FALSE;
661  }
662 
663  remaining_timeout = total_timeout - ((time(NULL) - start) * 1000);
664  }
665 
666  return FALSE;
667 }
668 
669 struct tcp_async_cb_data {
670  gboolean success;
671  int sock;
672  void *userdata;
673  void (*callback) (void *userdata, int sock);
674  int timeout; /*ms */
675  time_t start;
676 };
677 
678 static gboolean
679 check_connect_finished(gpointer userdata)
680 {
681  struct tcp_async_cb_data *cb_data = userdata;
682  int cb_arg = 0; // socket fd on success, -errno on error
683  int sock = cb_data->sock;
684  int error = 0;
685 
686  fd_set rset, wset;
687  socklen_t len = sizeof(error);
688  struct timeval ts = { 0, };
689 
690  if (cb_data->success == TRUE) {
691  goto dispatch_done;
692  }
693 
694  FD_ZERO(&rset);
695  FD_SET(sock, &rset);
696  wset = rset;
697 
698  crm_trace("fd %d: checking to see if connect finished", sock);
699  cb_arg = select(sock + 1, &rset, &wset, NULL, &ts);
700 
701  if (cb_arg < 0) {
702  cb_arg = -errno;
703  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
704  /* reschedule if there is still time left */
705  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
706  goto reschedule;
707  } else {
708  cb_arg = -ETIMEDOUT;
709  }
710  }
711  crm_trace("fd %d: select failed %d connect dispatch ", sock, cb_arg);
712  goto dispatch_done;
713  } else if (cb_arg == 0) {
714  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
715  goto reschedule;
716  }
717  crm_debug("fd %d: timeout during select", sock);
718  cb_arg = -ETIMEDOUT;
719  goto dispatch_done;
720  } else {
721  crm_trace("fd %d: select returned success", sock);
722  cb_arg = 0;
723  }
724 
725  /* can we read or write to the socket now? */
726  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
727  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
728  cb_arg = -errno;
729  crm_trace("fd %d: call to getsockopt failed", sock);
730  goto dispatch_done;
731  }
732  if (error) {
733  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
734  cb_arg = -error;
735  goto dispatch_done;
736  }
737  } else {
738  crm_trace("neither read nor write set after select");
739  cb_arg = -EAGAIN;
740  goto dispatch_done;
741  }
742 
743  dispatch_done:
744  if (!cb_arg) {
745  crm_trace("fd %d: connected", sock);
746  /* Success, set the return code to the sock to report to the callback */
747  cb_arg = cb_data->sock;
748  cb_data->sock = 0;
749  } else {
750  close(sock);
751  }
752 
753  if (cb_data->callback) {
754  cb_data->callback(cb_data->userdata, cb_arg);
755  }
756  free(cb_data);
757  return FALSE;
758 
759  reschedule:
760 
761  /* will check again next interval */
762  return TRUE;
763 }
764 
765 static int
766 internal_tcp_connect_async(int sock,
767  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
768  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
769 {
770  int rc = 0;
771  int interval = 500;
772  int timer;
773  struct tcp_async_cb_data *cb_data = NULL;
774 
775  rc = crm_set_nonblocking(sock);
776  if (rc < 0) {
777  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
778  pcmk_strerror(rc), rc);
779  close(sock);
780  return -1;
781  }
782 
783  rc = connect(sock, addr, addrlen);
784  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
785  crm_perror(LOG_WARNING, "connect");
786  return -1;
787  }
788 
789  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
790  cb_data->userdata = userdata;
791  cb_data->callback = callback;
792  cb_data->sock = sock;
793  cb_data->timeout = timeout;
794  cb_data->start = time(NULL);
795 
796  if (rc == 0) {
797  /* The connect was successful immediately, we still return to mainloop
798  * and let this callback get called later. This avoids the user of this api
799  * to have to account for the fact the callback could be invoked within this
800  * function before returning. */
801  cb_data->success = TRUE;
802  interval = 1;
803  }
804 
805  /* Check connect finished is mostly doing a non-block poll on the socket
806  * to see if we can read/write to it. Once we can, the connect has completed.
807  * This method allows us to connect to the server without blocking mainloop.
808  *
809  * This is a poor man's way of polling to see when the connection finished.
810  * At some point we should figure out a way to use a mainloop fd callback for this.
811  * Something about the way mainloop is currently polling prevents this from working at the
812  * moment though. */
813  crm_trace("Scheduling check in %dms for whether connect to fd %d finished",
814  interval, sock);
815  timer = g_timeout_add(interval, check_connect_finished, cb_data);
816  if (timer_id) {
817  *timer_id = timer;
818  }
819 
820  return 0;
821 }
822 
823 static int
824 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
825 {
826  int rc = connect(sock, addr, addrlen);
827 
828  if (rc < 0) {
829  rc = -errno;
830  crm_warn("Could not connect socket: %s " CRM_XS " rc=%d",
831  pcmk_strerror(rc), rc);
832  return rc;
833  }
834 
835  rc = crm_set_nonblocking(sock);
836  if (rc < 0) {
837  crm_warn("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
838  pcmk_strerror(rc), rc);
839  return rc;
840  }
841 
842  return pcmk_ok;
843 }
844 
858 int
859 crm_remote_tcp_connect_async(const char *host, int port, int timeout,
860  int *timer_id, void *userdata,
861  void (*callback) (void *userdata, int sock))
862 {
863  char buffer[INET6_ADDRSTRLEN];
864  struct addrinfo *res = NULL;
865  struct addrinfo *rp = NULL;
866  struct addrinfo hints;
867  const char *server = host;
868  int ret_ga;
869  int sock = -ENOTCONN;
870 
871  // Get host's IP address(es)
872  memset(&hints, 0, sizeof(struct addrinfo));
873  hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
874  hints.ai_socktype = SOCK_STREAM;
875  hints.ai_flags = AI_CANONNAME;
876  ret_ga = getaddrinfo(server, NULL, &hints, &res);
877  if (ret_ga) {
878  crm_err("Unable to get IP address info for %s: %s",
879  server, gai_strerror(ret_ga));
880  goto async_cleanup;
881  }
882  if (!res || !res->ai_addr) {
883  crm_err("Unable to get IP address info for %s: no result", server);
884  goto async_cleanup;
885  }
886 
887  // getaddrinfo() returns a list of host's addresses, try them in order
888  for (rp = res; rp != NULL; rp = rp->ai_next) {
889  struct sockaddr *addr = rp->ai_addr;
890 
891  if (!addr) {
892  continue;
893  }
894 
895  if (rp->ai_canonname) {
896  server = res->ai_canonname;
897  }
898  crm_debug("Got canonical name %s for %s", server, host);
899 
900  sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
901  if (sock == -1) {
902  crm_perror(LOG_WARNING, "creating socket for connection to %s",
903  server);
904  sock = -ENOTCONN;
905  continue;
906  }
907 
908  /* Set port appropriately for address family */
909  /* (void*) casts avoid false-positive compiler alignment warnings */
910  if (addr->sa_family == AF_INET6) {
911  ((struct sockaddr_in6 *)(void*)addr)->sin6_port = htons(port);
912  } else {
913  ((struct sockaddr_in *)(void*)addr)->sin_port = htons(port);
914  }
915 
916  memset(buffer, 0, DIMOF(buffer));
917  crm_sockaddr2str(addr, buffer);
918  crm_info("Attempting TCP connection to %s:%d", buffer, port);
919 
920  if (callback) {
921  if (internal_tcp_connect_async
922  (sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
923  goto async_cleanup; /* Success for now, we'll hear back later in the callback */
924  }
925 
926  } else if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
927  break; /* Success */
928  }
929 
930  close(sock);
931  sock = -ENOTCONN;
932  }
933 
934 async_cleanup:
935 
936  if (res) {
937  freeaddrinfo(res);
938  }
939  return sock;
940 }
941 
942 int
943 crm_remote_tcp_connect(const char *host, int port)
944 {
945  return crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);
946 }
947 
958 void
959 crm_sockaddr2str(void *sa, char *s)
960 {
961  switch (((struct sockaddr*)sa)->sa_family) {
962  case AF_INET:
963  inet_ntop(AF_INET, &(((struct sockaddr_in *)sa)->sin_addr),
964  s, INET6_ADDRSTRLEN);
965  break;
966 
967  case AF_INET6:
968  inet_ntop(AF_INET6, &(((struct sockaddr_in6 *)sa)->sin6_addr),
969  s, INET6_ADDRSTRLEN);
970  break;
971 
972  default:
973  strcpy(s, "<invalid>");
974  }
975 }
976 
977 int
979 {
980  int csock = 0;
981  int rc = 0;
982  unsigned laddr = 0;
983  struct sockaddr_storage addr;
984  char addr_str[INET6_ADDRSTRLEN];
985 #ifdef TCP_USER_TIMEOUT
986  int optval;
987  long sbd_timeout = crm_get_sbd_timeout();
988 #endif
989 
990  /* accept the connection */
991  laddr = sizeof(addr);
992  memset(&addr, 0, sizeof(addr));
993  csock = accept(ssock, (struct sockaddr *)&addr, &laddr);
994  crm_sockaddr2str(&addr, addr_str);
995  crm_info("New remote connection from %s", addr_str);
996 
997  if (csock == -1) {
998  crm_err("accept socket failed");
999  return -1;
1000  }
1001 
1002  rc = crm_set_nonblocking(csock);
1003  if (rc < 0) {
1004  crm_err("Could not set socket non-blocking: %s " CRM_XS " rc=%d",
1005  pcmk_strerror(rc), rc);
1006  close(csock);
1007  return rc;
1008  }
1009 
1010 #ifdef TCP_USER_TIMEOUT
1011  if (sbd_timeout > 0) {
1012  optval = sbd_timeout / 2; /* time to fail and retry before watchdog */
1013  rc = setsockopt(csock, SOL_TCP, TCP_USER_TIMEOUT,
1014  &optval, sizeof(optval));
1015  if (rc < 0) {
1016  crm_err("setting TCP_USER_TIMEOUT (%d) on client socket failed",
1017  optval);
1018  close(csock);
1019  return rc;
1020  }
1021  }
1022 #endif
1023 
1024  return csock;
1025 }
1026 
1032 int
1034 {
1035  static int port = 0;
1036 
1037  if (port == 0) {
1038  const char *env = getenv("PCMK_remote_port");
1039 
1040  if (env) {
1041  errno = 0;
1042  port = strtol(env, NULL, 10);
1043  if (errno || (port < 1) || (port > 65535)) {
1044  crm_warn("Environment variable PCMK_remote_port has invalid value '%s', using %d instead",
1045  env, DEFAULT_REMOTE_PORT);
1046  port = DEFAULT_REMOTE_PORT;
1047  }
1048  } else {
1049  port = DEFAULT_REMOTE_PORT;
1050  }
1051  }
1052  return port;
1053 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:46
int crm_remote_ready(crm_remote_t *remote, int total_timeout)
Definition: remote.c:455
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:617
long crm_get_sbd_timeout(void)
Definition: watchdog.c:246
uint32_t payload_compressed
Definition: remote.c:159
const char * pcmk_strerror(int rc)
Definition: logging.c:1135
#define pcmk_ok
Definition: error.h:42
char * buffer
Definition: ipcs.h:44
AIS_Host host
Definition: internal.h:52
uint32_t payload_uncompressed
Definition: remote.c:160
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:150
int crm_remote_accept(int ssock)
Definition: remote.c:978
#define ENDIAN_LOCAL
Definition: remote.c:92
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:943
Wrappers for and extensions to glib mainloop.
uint32_t endian
Definition: remote.c:153
xmlNode * string2xml(const char *input)
Definition: xml.c:2750
char version[256]
Definition: plugin.c:84
#define DEFAULT_REMOTE_PORT
Definition: lrmd.h:54
void crm_sockaddr2str(void *sa, char *s)
Convert an IP address (IPv4 or IPv6) to a string for logging.
Definition: remote.c:959
#define crm_warn(fmt, args...)
Definition: logging.h:249
uint32_t id
Definition: internal.h:48
#define crm_debug(fmt, args...)
Definition: logging.h:253
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
void gnutls_session_t
Definition: cib_remote.c:52
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:384
#define crm_trace(fmt, args...)
Definition: logging.h:254
enum crm_proc_flag __attribute__
int crm_set_nonblocking(int fd)
Definition: io.c:471
#define __swab64(x)
Definition: remote.c:80
Wrappers for and extensions to libxml2.
#define pcmk_err_generic
Definition: error.h:45
uint32_t payload_offset
Definition: remote.c:158
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:859
uint32_t size_total
Definition: remote.c:157
#define CRM_XS
Definition: logging.h:42
#define __swab32(x)
Definition: remote.c:74
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
size_t buffer_size
Definition: ipcs.h:45
#define REMOTE_MSG_VERSION
Definition: remote.c:91
#define crm_err(fmt, args...)
Definition: logging.h:248
const char * bz2_strerror(int rc)
Definition: logging.c:1198
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3825
#define DIMOF(a)
Definition: crm.h:39
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
void * create_psk_tls_session(int csock, int type, void *credentials)
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int tcp_socket
Definition: ipcs.h:48
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:335
#define crm_info(fmt, args...)
Definition: logging.h:251
uint64_t flags
Definition: remote.c:156
int crm_default_remote_port(void)
Get the default remote connection TCP port on this host.
Definition: remote.c:1033
enum crm_ais_msg_types type
Definition: internal.h:51