pacemaker  1.1.14-70404b0
Scalable High-Availability cluster resource manager
 All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
remote.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2008 Andrew Beekhof
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  *
18  */
19 #include <crm_internal.h>
20 #include <crm/crm.h>
21 
22 #include <sys/param.h>
23 #include <stdio.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <unistd.h>
27 #include <sys/socket.h>
28 #include <arpa/inet.h>
29 #include <netinet/in.h>
30 #include <netinet/ip.h>
31 #include <netdb.h>
32 
33 #include <stdlib.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <glib.h>
37 
38 #include <bzlib.h>
39 
40 #include <crm/common/ipcs.h>
41 #include <crm/common/xml.h>
42 #include <crm/common/mainloop.h>
43 
44 #ifdef HAVE_GNUTLS_GNUTLS_H
45 # undef KEYFILE
46 # include <gnutls/gnutls.h>
47 
48 const int psk_tls_kx_order[] = {
49  GNUTLS_KX_DHE_PSK,
50  GNUTLS_KX_PSK,
51 };
52 
53 const int anon_tls_kx_order[] = {
54  GNUTLS_KX_ANON_DH,
55  GNUTLS_KX_DHE_RSA,
56  GNUTLS_KX_DHE_DSS,
57  GNUTLS_KX_RSA,
58  0
59 };
60 #endif
61 
62 /* Swab macros from linux/swab.h */
63 #ifdef HAVE_LINUX_SWAB_H
64 # include <linux/swab.h>
65 #else
66 /*
67  * casts are necessary for constants, because we never know how for sure
68  * how U/UL/ULL map to __u16, __u32, __u64. At least not in a portable way.
69  */
70 #define __swab16(x) ((uint16_t)( \
71  (((uint16_t)(x) & (uint16_t)0x00ffU) << 8) | \
72  (((uint16_t)(x) & (uint16_t)0xff00U) >> 8)))
73 
74 #define __swab32(x) ((uint32_t)( \
75  (((uint32_t)(x) & (uint32_t)0x000000ffUL) << 24) | \
76  (((uint32_t)(x) & (uint32_t)0x0000ff00UL) << 8) | \
77  (((uint32_t)(x) & (uint32_t)0x00ff0000UL) >> 8) | \
78  (((uint32_t)(x) & (uint32_t)0xff000000UL) >> 24)))
79 
80 #define __swab64(x) ((uint64_t)( \
81  (((uint64_t)(x) & (uint64_t)0x00000000000000ffULL) << 56) | \
82  (((uint64_t)(x) & (uint64_t)0x000000000000ff00ULL) << 40) | \
83  (((uint64_t)(x) & (uint64_t)0x0000000000ff0000ULL) << 24) | \
84  (((uint64_t)(x) & (uint64_t)0x00000000ff000000ULL) << 8) | \
85  (((uint64_t)(x) & (uint64_t)0x000000ff00000000ULL) >> 8) | \
86  (((uint64_t)(x) & (uint64_t)0x0000ff0000000000ULL) >> 24) | \
87  (((uint64_t)(x) & (uint64_t)0x00ff000000000000ULL) >> 40) | \
88  (((uint64_t)(x) & (uint64_t)0xff00000000000000ULL) >> 56)))
89 #endif
90 
91 #define REMOTE_MSG_VERSION 1
92 #define ENDIAN_LOCAL 0xBADADBBD
93 
94 struct crm_remote_header_v0
95 {
96  uint32_t endian; /* Detect messages from hosts with different endian-ness */
98  uint64_t id;
99  uint64_t flags;
104 
105  /* New fields get added here */
106 
107 } __attribute__ ((packed));
108 
109 static struct crm_remote_header_v0 *
110 crm_remote_header(crm_remote_t * remote)
111 {
112  struct crm_remote_header_v0 *header = (struct crm_remote_header_v0 *)remote->buffer;
113  if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
114  return NULL;
115 
116  } else if(header->endian != ENDIAN_LOCAL) {
117  uint32_t endian = __swab32(header->endian);
120  if(endian != ENDIAN_LOCAL) {
121  crm_err("Invalid message detected, endian mismatch: %lx is neither %lx nor the swab'd %lx",
122  ENDIAN_LOCAL, header->endian, endian);
123  return NULL;
124  }
126  header->id = __swab64(header->id);
127  header->flags = __swab64(header->flags);
128  header->endian = __swab32(header->endian);
129 
130  header->version = __swab32(header->version);
131  header->size_total = __swab32(header->size_total);
132  header->payload_offset = __swab32(header->payload_offset);
133  header->payload_compressed = __swab32(header->payload_compressed);
134  header->payload_uncompressed = __swab32(header->payload_uncompressed);
135  }
136 
137  return header;
138 }
139 
140 #ifdef HAVE_GNUTLS_GNUTLS_H
141 
142 int
143 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
144 {
145  int rc = 0;
146  int pollrc = 0;
147  time_t start = time(NULL);
148 
149  do {
150  rc = gnutls_handshake(*remote->tls_session);
151  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
152  pollrc = crm_remote_ready(remote, 1000);
153  if (pollrc < 0) {
154  /* poll returned error, there is no hope */
155  rc = -1;
156  }
157  }
158 
159  } while (((time(NULL) - start) < (timeout_ms / 1000)) &&
160  (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN));
161 
162  if (rc < 0) {
163  crm_trace("gnutls_handshake() failed with %d", rc);
164  }
165  return rc;
166 }
167 
168 void *
169 crm_create_anon_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ ,
170  void *credentials)
171 {
172  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
173 
174  gnutls_init(session, type);
175 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
176 /* http://www.manpagez.com/info/gnutls/gnutls-2.10.4/gnutls_81.php#Echo-Server-with-anonymous-authentication */
177  gnutls_priority_set_direct(*session, "NORMAL:+ANON-DH", NULL);
178 /* gnutls_priority_set_direct (*session, "NONE:+VERS-TLS-ALL:+CIPHER-ALL:+MAC-ALL:+SIGN-ALL:+COMP-ALL:+ANON-DH", NULL); */
179 # else
180  gnutls_set_default_priority(*session);
181  gnutls_kx_set_priority(*session, anon_tls_kx_order);
182 # endif
183  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
184  switch (type) {
185  case GNUTLS_SERVER:
186  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
187  (gnutls_anon_server_credentials_t) credentials);
188  break;
189  case GNUTLS_CLIENT:
190  gnutls_credentials_set(*session, GNUTLS_CRD_ANON,
191  (gnutls_anon_client_credentials_t) credentials);
192  break;
193  }
194 
195  return session;
196 }
197 
198 void *
199 create_psk_tls_session(int csock, int type /* GNUTLS_SERVER, GNUTLS_CLIENT */ , void *credentials)
200 {
201  gnutls_session_t *session = gnutls_malloc(sizeof(gnutls_session_t));
202 
203  gnutls_init(session, type);
204 # ifdef HAVE_GNUTLS_PRIORITY_SET_DIRECT
205  gnutls_priority_set_direct(*session, "NORMAL:+DHE-PSK:+PSK", NULL);
206 # else
207  gnutls_set_default_priority(*session);
208  gnutls_kx_set_priority(*session, psk_tls_kx_order);
209 # endif
210  gnutls_transport_set_ptr(*session, (gnutls_transport_ptr_t) GINT_TO_POINTER(csock));
211  switch (type) {
212  case GNUTLS_SERVER:
213  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
214  (gnutls_psk_server_credentials_t) credentials);
215  break;
216  case GNUTLS_CLIENT:
217  gnutls_credentials_set(*session, GNUTLS_CRD_PSK,
218  (gnutls_psk_client_credentials_t) credentials);
219  break;
220  }
221 
222  return session;
223 }
224 
225 static int
226 crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
227 {
228  const char *unsent = buf;
229  int rc = 0;
230  int total_send;
231 
232  if (buf == NULL) {
233  return -1;
234  }
235 
236  total_send = len;
237  crm_trace("Message size: %d", len);
238 
239  while (TRUE) {
240  rc = gnutls_record_send(*session, unsent, len);
241 
242  if (rc == GNUTLS_E_INTERRUPTED || rc == GNUTLS_E_AGAIN) {
243  crm_debug("Retry");
244 
245  } else if (rc < 0) {
246  crm_err("Connection terminated rc = %d", rc);
247  break;
248 
249  } else if (rc < len) {
250  crm_debug("Sent %d of %d bytes", rc, len);
251  len -= rc;
252  unsent += rc;
253  } else {
254  crm_trace("Sent all %d bytes", rc);
255  break;
256  }
257  }
258 
259  return rc < 0 ? rc : total_send;
260 }
261 #endif
262 
263 static int
264 crm_send_plaintext(int sock, const char *buf, size_t len)
265 {
266 
267  int rc = 0;
268  const char *unsent = buf;
269  int total_send;
270 
271  if (buf == NULL) {
272  return -1;
273  }
274  total_send = len;
275 
276  crm_trace("Message on socket %d: size=%d", sock, len);
277  retry:
278  rc = write(sock, unsent, len);
279  if (rc < 0) {
280  switch (errno) {
281  case EINTR:
282  case EAGAIN:
283  crm_trace("Retry");
284  goto retry;
285  default:
286  crm_perror(LOG_ERR, "Could only write %d of the remaining %d bytes", rc, (int)len);
287  break;
288  }
289 
290  } else if (rc < len) {
291  crm_trace("Only sent %d of %d remaining bytes", rc, len);
292  len -= rc;
293  unsent += rc;
294  goto retry;
295 
296  } else {
297  crm_trace("Sent %d bytes: %.100s", rc, buf);
298  }
299 
300  return rc < 0 ? rc : total_send;
301 
302 }
303 
304 static int
305 crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
306 {
307  int lpc = 0;
308  int rc = -ESOCKTNOSUPPORT;
309 
310  for(; lpc < iovs; lpc++) {
311 
312 #ifdef HAVE_GNUTLS_GNUTLS_H
313  if (remote->tls_session) {
314  rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
315  } else if (remote->tcp_socket) {
316 #else
317  if (remote->tcp_socket) {
318 #endif
319  rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
320 
321  } else {
322  crm_err("Unsupported connection type");
323  }
324  }
325  return rc;
326 }
327 
328 int
329 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
330 {
331  int rc = -1;
332  static uint64_t id = 0;
333  char *xml_text = dump_xml_unformatted(msg);
334 
335  struct iovec iov[2];
336  struct crm_remote_header_v0 *header;
337 
338  if (xml_text == NULL) {
339  crm_err("Invalid XML, can not send msg");
340  return -1;
341  }
342 
343  header = calloc(1, sizeof(struct crm_remote_header_v0));
344  iov[0].iov_base = header;
345  iov[0].iov_len = sizeof(struct crm_remote_header_v0);
346 
347  iov[1].iov_base = xml_text;
348  iov[1].iov_len = 1 + strlen(xml_text);
349 
350  id++;
351  header->id = id;
352  header->endian = ENDIAN_LOCAL;
353  header->version = REMOTE_MSG_VERSION;
354  header->payload_offset = iov[0].iov_len;
355  header->payload_uncompressed = iov[1].iov_len;
356  header->size_total = iov[0].iov_len + iov[1].iov_len;
357 
358  crm_trace("Sending len[0]=%d, start=%x\n",
359  (int)iov[0].iov_len, *(int*)(void*)xml_text);
360  rc = crm_remote_sendv(remote, iov, 2);
361  if (rc < 0) {
362  crm_err("Failed to send remote msg, rc = %d", rc);
363  }
364 
365  free(iov[0].iov_base);
366  free(iov[1].iov_base);
367  return rc;
368 }
369 
370 
376 xmlNode *
378 {
379  xmlNode *xml = NULL;
380  struct crm_remote_header_v0 *header = crm_remote_header(remote);
381 
382  if (remote->buffer == NULL || header == NULL) {
383  return NULL;
384  }
385 
386  /* take ownership of the buffer */
387  remote->buffer_offset = 0;
388 
389  /* Support compression on the receiving end now, in case we ever want to add it later */
390  if (header->payload_compressed) {
391  int rc = 0;
392  unsigned int size_u = 1 + header->payload_uncompressed;
393  char *uncompressed = calloc(1, header->payload_offset + size_u);
394 
395  crm_trace("Decompressing message data %d bytes into %d bytes",
396  header->payload_compressed, size_u);
397 
398  rc = BZ2_bzBuffToBuffDecompress(uncompressed + header->payload_offset, &size_u,
399  remote->buffer + header->payload_offset,
400  header->payload_compressed, 1, 0);
401 
402  if (rc != BZ_OK && header->version > REMOTE_MSG_VERSION) {
403  crm_warn("Couldn't decompress v%d message, we only understand v%d",
404  header->version, REMOTE_MSG_VERSION);
405  free(uncompressed);
406  return NULL;
407 
408  } else if (rc != BZ_OK) {
409  crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
410  free(uncompressed);
411  return NULL;
412  }
413 
414  CRM_ASSERT(size_u == header->payload_uncompressed);
415 
416  memcpy(uncompressed, remote->buffer, header->payload_offset); /* Preserve the header */
417  remote->buffer_size = header->payload_offset + size_u;
418 
419  free(remote->buffer);
420  remote->buffer = uncompressed;
421  header = crm_remote_header(remote);
422  }
423 
424  CRM_LOG_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
425 
426  xml = string2xml(remote->buffer + header->payload_offset);
427  if (xml == NULL && header->version > REMOTE_MSG_VERSION) {
428  crm_warn("Couldn't parse v%d message, we only understand v%d",
429  header->version, REMOTE_MSG_VERSION);
430 
431  } else if (xml == NULL) {
432  crm_err("Couldn't parse: '%.120s'", remote->buffer + header->payload_offset);
433  }
434 
435  return xml;
436 }
437 
446 int
447 crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
448 {
449  struct pollfd fds = { 0, };
450  int sock = 0;
451  int rc = 0;
452  time_t start;
453 
454 #ifdef HAVE_GNUTLS_GNUTLS_H
455  if (remote->tls_session) {
456  void *sock_ptr = gnutls_transport_get_ptr(*remote->tls_session);
457 
458  sock = GPOINTER_TO_INT(sock_ptr);
459  } else if (remote->tcp_socket) {
460 #else
461  if (remote->tcp_socket) {
462 #endif
463  sock = remote->tcp_socket;
464  } else {
465  crm_err("Unsupported connection type");
466  }
467 
468  if (sock <= 0) {
469  crm_trace("No longer connected");
470  return -ENOTCONN;
471  }
472 
473  start = time(NULL);
474  errno = 0;
475  do {
476  fds.fd = sock;
477  fds.events = POLLIN;
478 
479  /* If we got an EINTR while polling, and we have a
480  * specific timeout we are trying to honor, attempt
481  * to adjust the timeout to the closest second. */
482  if (errno == EINTR && (timeout > 0)) {
483  timeout = timeout - ((time(NULL) - start) * 1000);
484  if (timeout < 1000) {
485  timeout = 1000;
486  }
487  }
488 
489  rc = poll(&fds, 1, timeout);
490  } while (rc < 0 && errno == EINTR);
491 
492  return rc;
493 }
494 
495 
506 static size_t
507 crm_remote_recv_once(crm_remote_t * remote)
508 {
509  int rc = 0;
510  size_t read_len = sizeof(struct crm_remote_header_v0);
511  struct crm_remote_header_v0 *header = crm_remote_header(remote);
512 
513  if(header) {
514  /* Stop at the end of the current message */
515  read_len = header->size_total;
516  }
517 
518  /* automatically grow the buffer when needed */
519  if(remote->buffer_size < read_len) {
520  remote->buffer_size = 2 * read_len;
521  crm_trace("Expanding buffer to %u bytes", remote->buffer_size);
522 
523  remote->buffer = realloc_safe(remote->buffer, remote->buffer_size + 1);
524  CRM_ASSERT(remote->buffer != NULL);
525  }
526 
527 #ifdef HAVE_GNUTLS_GNUTLS_H
528  if (remote->tls_session) {
529  rc = gnutls_record_recv(*(remote->tls_session),
530  remote->buffer + remote->buffer_offset,
531  remote->buffer_size - remote->buffer_offset);
532  if (rc == GNUTLS_E_INTERRUPTED) {
533  rc = -EINTR;
534  } else if (rc == GNUTLS_E_AGAIN) {
535  rc = -EAGAIN;
536  } else if (rc < 0) {
537  crm_debug("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
538  rc = -pcmk_err_generic;
539  }
540  } else if (remote->tcp_socket) {
541 #else
542  if (remote->tcp_socket) {
543 #endif
544  errno = 0;
545  rc = read(remote->tcp_socket,
546  remote->buffer + remote->buffer_offset,
547  remote->buffer_size - remote->buffer_offset);
548  if(rc < 0) {
549  rc = -errno;
550  }
551 
552  } else {
553  crm_err("Unsupported connection type");
554  return -ESOCKTNOSUPPORT;
555  }
556 
557  /* process any errors. */
558  if (rc > 0) {
559  remote->buffer_offset += rc;
560  /* always null terminate buffer, the +1 to alloc always allows for this. */
561  remote->buffer[remote->buffer_offset] = '\0';
562  crm_trace("Received %u more bytes, %u total", rc, remote->buffer_offset);
563 
564  } else if (rc == -EINTR || rc == -EAGAIN) {
565  crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
566 
567  } else if (rc == 0) {
568  crm_debug("EOF encoutered after %u bytes", remote->buffer_offset);
569  return -ENOTCONN;
570 
571  } else {
572  crm_debug("Error receiving message after %u bytes: %s (%d)",
573  remote->buffer_offset, pcmk_strerror(rc), rc);
574  return -ENOTCONN;
575  }
576 
577  header = crm_remote_header(remote);
578  if(header) {
579  if(remote->buffer_offset < header->size_total) {
580  crm_trace("Read less than the advertised length: %u < %u bytes",
581  remote->buffer_offset, header->size_total);
582  } else {
583  crm_trace("Read full message of %u bytes", remote->buffer_offset);
584  return remote->buffer_offset;
585  }
586  }
587 
588  return -EAGAIN;
589 }
590 
598 gboolean
599 crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
600 {
601  int rc;
602  time_t start = time(NULL);
603  int remaining_timeout = 0;
604 
605  if (total_timeout == 0) {
606  total_timeout = 10000;
607  } else if (total_timeout < 0) {
608  total_timeout = 60000;
609  }
610  *disconnected = 0;
611 
612  remaining_timeout = total_timeout;
613  while ((remaining_timeout > 0) && !(*disconnected)) {
614 
615  /* read some more off the tls buffer if we still have time left. */
616  crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
617  total_timeout, remaining_timeout);
618  rc = crm_remote_ready(remote, remaining_timeout);
619 
620  if (rc == 0) {
621  crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
622  return FALSE;
623 
624  } else if (rc == -EAGAIN) {
625  crm_trace("waiting for remote connection data (up to %dms)",
626  remaining_timeout);
627 
628  } else if(rc < 0) {
629  crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);
630 
631  } else {
632  rc = crm_remote_recv_once(remote);
633  if(rc > 0) {
634  return TRUE;
635  } else if (rc < 0) {
636  crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
637  }
638  }
639 
640  if(rc == -ENOTCONN) {
641  *disconnected = 1;
642  return FALSE;
643  }
644 
645  remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
646  }
647 
648  return FALSE;
649 }
650 
651 struct tcp_async_cb_data {
652  gboolean success;
653  int sock;
654  void *userdata;
655  void (*callback) (void *userdata, int sock);
656  int timeout; /*ms */
657  time_t start;
658 };
659 
660 static gboolean
661 check_connect_finished(gpointer userdata)
662 {
663  struct tcp_async_cb_data *cb_data = userdata;
664  int rc = 0;
665  int sock = cb_data->sock;
666  int error = 0;
667 
668  fd_set rset, wset;
669  socklen_t len = sizeof(error);
670  struct timeval ts = { 0, };
671 
672  if (cb_data->success == TRUE) {
673  goto dispatch_done;
674  }
675 
676  FD_ZERO(&rset);
677  FD_SET(sock, &rset);
678  wset = rset;
679 
680  crm_trace("fd %d: checking to see if connect finished", sock);
681  rc = select(sock + 1, &rset, &wset, NULL, &ts);
682 
683  if (rc < 0) {
684  rc = errno;
685  if ((errno == EINPROGRESS) || (errno == EAGAIN)) {
686  /* reschedule if there is still time left */
687  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
688  goto reschedule;
689  } else {
690  rc = -ETIMEDOUT;
691  }
692  }
693  crm_trace("fd %d: select failed %d connect dispatch ", rc);
694  goto dispatch_done;
695  } else if (rc == 0) {
696  if ((time(NULL) - cb_data->start) < (cb_data->timeout / 1000)) {
697  goto reschedule;
698  }
699  crm_debug("fd %d: timeout during select", sock);
700  rc = -ETIMEDOUT;
701  goto dispatch_done;
702  } else {
703  crm_trace("fd %d: select returned success", sock);
704  rc = 0;
705  }
706 
707  /* can we read or write to the socket now? */
708  if (FD_ISSET(sock, &rset) || FD_ISSET(sock, &wset)) {
709  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &error, &len) < 0) {
710  crm_trace("fd %d: call to getsockopt failed", sock);
711  rc = -1;
712  goto dispatch_done;
713  }
714 
715  if (error) {
716  crm_trace("fd %d: error returned from getsockopt: %d", sock, error);
717  rc = -1;
718  goto dispatch_done;
719  }
720  } else {
721  crm_trace("neither read nor write set after select");
722  rc = -1;
723  goto dispatch_done;
724  }
725 
726  dispatch_done:
727  if (!rc) {
728  crm_trace("fd %d: connected", sock);
729  /* Success, set the return code to the sock to report to the callback */
730  rc = cb_data->sock;
731  cb_data->sock = 0;
732  } else {
733  close(sock);
734  }
735 
736  if (cb_data->callback) {
737  cb_data->callback(cb_data->userdata, rc);
738  }
739  free(cb_data);
740  return FALSE;
741 
742  reschedule:
743 
744  /* will check again next interval */
745  return TRUE;
746 }
747 
748 static int
749 internal_tcp_connect_async(int sock,
750  const struct sockaddr *addr, socklen_t addrlen, int timeout /* ms */ ,
751  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
752 {
753  int rc = 0;
754  int flag = 0;
755  int interval = 500;
756  int timer;
757  struct tcp_async_cb_data *cb_data = NULL;
758 
759  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
760  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
761  crm_err("fcntl() write failed");
762  return -1;
763  }
764  }
765 
766  rc = connect(sock, addr, addrlen);
767 
768  if (rc < 0 && (errno != EINPROGRESS) && (errno != EAGAIN)) {
769  return -1;
770  }
771 
772  cb_data = calloc(1, sizeof(struct tcp_async_cb_data));
773  cb_data->userdata = userdata;
774  cb_data->callback = callback;
775  cb_data->sock = sock;
776  cb_data->timeout = timeout;
777  cb_data->start = time(NULL);
778 
779  if (rc == 0) {
780  /* The connect was successful immediately, we still return to mainloop
781  * and let this callback get called later. This avoids the user of this api
782  * to have to account for the fact the callback could be invoked within this
783  * function before returning. */
784  cb_data->success = TRUE;
785  interval = 1;
786  }
787 
788  /* Check connect finished is mostly doing a non-block poll on the socket
789  * to see if we can read/write to it. Once we can, the connect has completed.
790  * This method allows us to connect to the server without blocking mainloop.
791  *
792  * This is a poor man's way of polling to see when the connection finished.
793  * At some point we should figure out a way to use a mainloop fd callback for this.
794  * Something about the way mainloop is currently polling prevents this from working at the
795  * moment though. */
796  crm_trace("fd %d: scheduling to check if connect finished in %dms second", sock, interval);
797  timer = g_timeout_add(interval, check_connect_finished, cb_data);
798  if (timer_id) {
799  *timer_id = timer;
800  }
801 
802  return 0;
803 }
804 
805 static int
806 internal_tcp_connect(int sock, const struct sockaddr *addr, socklen_t addrlen)
807 {
808  int flag = 0;
809  int rc = connect(sock, addr, addrlen);
810 
811  if (rc == 0) {
812  if ((flag = fcntl(sock, F_GETFL)) >= 0) {
813  if (fcntl(sock, F_SETFL, flag | O_NONBLOCK) < 0) {
814  crm_err("fcntl() write failed");
815  return -1;
816  }
817  }
818  }
819 
820  return rc;
821 }
822 
829 int
830 crm_remote_tcp_connect_async(const char *host, int port, int timeout, /*ms */
831  int *timer_id, void *userdata, void (*callback) (void *userdata, int sock))
832 {
833  char buffer[256];
834  struct addrinfo *res = NULL;
835  struct addrinfo *rp = NULL;
836  struct addrinfo hints;
837  const char *server = host;
838  int ret_ga;
839  int sock = -1;
840 
841  /* getaddrinfo */
842  memset(&hints, 0, sizeof(struct addrinfo));
843  hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
844  hints.ai_socktype = SOCK_STREAM;
845  hints.ai_flags = AI_CANONNAME;
846 
847  crm_debug("Looking up %s", server);
848  ret_ga = getaddrinfo(server, NULL, &hints, &res);
849  if (ret_ga) {
850  crm_err("getaddrinfo: %s", gai_strerror(ret_ga));
851  return -1;
852  }
853 
854  if (!res || !res->ai_addr) {
855  crm_err("getaddrinfo failed");
856  goto async_cleanup;
857  }
858 
859  for (rp = res; rp != NULL; rp = rp->ai_next) {
860  struct sockaddr *addr = rp->ai_addr;
861 
862  if (!addr) {
863  continue;
864  }
865 
866  if (rp->ai_canonname) {
867  server = res->ai_canonname;
868  }
869  crm_debug("Got address %s for %s", server, host);
870 
871  /* create socket */
872  sock = socket(rp->ai_family, SOCK_STREAM, IPPROTO_TCP);
873  if (sock == -1) {
874  crm_err("Socket creation failed for remote client connection.");
875  continue;
876  }
877 
878  memset(buffer, 0, DIMOF(buffer));
879  if (addr->sa_family == AF_INET6) {
880  struct sockaddr_in6 *addr_in = (struct sockaddr_in6 *)(void*)addr;
881 
882  addr_in->sin6_port = htons(port);
883  inet_ntop(addr->sa_family, &addr_in->sin6_addr, buffer, DIMOF(buffer));
884 
885  } else {
886  struct sockaddr_in *addr_in = (struct sockaddr_in *)(void*)addr;
887 
888  addr_in->sin_port = htons(port);
889  inet_ntop(addr->sa_family, &addr_in->sin_addr, buffer, DIMOF(buffer));
890  }
891 
892  crm_info("Attempting to connect to remote server at %s:%d", buffer, port);
893 
894  if (callback) {
895  if (internal_tcp_connect_async
896  (sock, rp->ai_addr, rp->ai_addrlen, timeout, timer_id, userdata, callback) == 0) {
897  goto async_cleanup; /* Success for now, we'll hear back later in the callback */
898  }
899 
900  } else {
901  if (internal_tcp_connect(sock, rp->ai_addr, rp->ai_addrlen) == 0) {
902  break; /* Success */
903  }
904  }
905 
906  close(sock);
907  sock = -1;
908  }
909 
910 async_cleanup:
911 
912  if (res) {
913  freeaddrinfo(res);
914  }
915  return sock;
916 }
917 
918 int
919 crm_remote_tcp_connect(const char *host, int port)
920 {
921  return crm_remote_tcp_connect_async(host, port, -1, NULL, NULL, NULL);
922 }
A dumping ground.
size_t buffer_offset
Definition: ipcs.h:45
uint64_t id
Definition: remote.c:120
#define REMOTE_MSG_VERSION
Definition: remote.c:91
const char * pcmk_strerror(int rc)
Definition: logging.c:1113
struct tcp_async_cb_data __attribute__
char * buffer
Definition: ipcs.h:43
uint32_t payload_offset
Definition: remote.c:123
AIS_Host host
Definition: internal.h:52
#define CRM_LOG_ASSERT(expr)
Definition: logging.h:150
gboolean crm_remote_recv(crm_remote_t *remote, int total_timeout, int *disconnected)
Definition: remote.c:599
#define __swab32(x)
Definition: remote.c:74
Wrappers for and extensions to glib mainloop.
xmlNode * string2xml(const char *input)
Definition: xml.c:2957
uint32_t endian
Definition: remote.c:118
#define crm_warn(fmt, args...)
Definition: logging.h:249
uint32_t payload_compressed
Definition: remote.c:124
uint64_t flags
Definition: remote.c:121
#define crm_debug(fmt, args...)
Definition: logging.h:253
int crm_initiate_client_tls_handshake(crm_remote_t *remote, int timeout_ms)
void gnutls_session_t
Definition: cib_remote.c:52
int crm_remote_tcp_connect(const char *host, int port)
Definition: remote.c:919
uint32_t size_total
Definition: remote.c:122
#define crm_trace(fmt, args...)
Definition: logging.h:254
Wrappers for and extensions to libxml2.
int crm_remote_ready(crm_remote_t *remote, int timeout)
Definition: remote.c:447
#define pcmk_err_generic
Definition: error.h:45
#define __swab64(x)
Definition: remote.c:80
int crm_remote_tcp_connect_async(const char *host, int port, int timeout, int *timer_id, void *userdata, void(*callback)(void *userdata, int sock))
Definition: remote.c:830
int crm_remote_send(crm_remote_t *remote, xmlNode *msg)
Definition: remote.c:329
#define ENDIAN_LOCAL
Definition: remote.c:92
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:226
size_t buffer_size
Definition: ipcs.h:44
#define crm_err(fmt, args...)
Definition: logging.h:248
const char * bz2_strerror(int rc)
Definition: logging.c:1176
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3987
#define DIMOF(a)
Definition: crm.h:41
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:35
void * create_psk_tls_session(int csock, int type, void *credentials)
uint32_t version
Definition: remote.c:119
xmlNode * crm_remote_parse_buffer(crm_remote_t *remote)
Definition: remote.c:377
uint32_t payload_uncompressed
Definition: remote.c:125
void * crm_create_anon_tls_session(int sock, int type, void *credentials)
int tcp_socket
Definition: ipcs.h:47
#define crm_info(fmt, args...)
Definition: logging.h:251
enum crm_ais_msg_types type
Definition: internal.h:51