corosync  3.1.2
exec/cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2006-2019 Red Hat, Inc.
3  *
4  * All rights reserved.
5  *
6  * Author: Christine Caulfield (ccaulfie@redhat.com)
7  * Author: Jan Friesse (jfriesse@redhat.com)
8  *
9  * This software licensed under BSD license, the text of which follows:
10  *
11  * Redistribution and use in source and binary forms, with or without
12  * modification, are permitted provided that the following conditions are met:
13  *
14  * - Redistributions of source code must retain the above copyright notice,
15  * this list of conditions and the following disclaimer.
16  * - Redistributions in binary form must reproduce the above copyright notice,
17  * this list of conditions and the following disclaimer in the documentation
18  * and/or other materials provided with the distribution.
19  * - Neither the name of the MontaVista Software, Inc. nor the names of its
20  * contributors may be used to endorse or promote products derived from this
21  * software without specific prior written permission.
22  *
23  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTIBUTORS "AS IS"
24  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
25  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
26  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
27  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
28  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
29  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
30  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
31  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
32  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
33  * THE POSSIBILITY OF SUCH DAMAGE.
34  */
35 
36 #include <config.h>
37 
38 #ifdef HAVE_ALLOCA_H
39 #include <alloca.h>
40 #endif
41 #include <sys/types.h>
42 #include <sys/socket.h>
43 #include <sys/un.h>
44 #include <sys/ioctl.h>
45 #include <netinet/in.h>
46 #include <sys/uio.h>
47 #include <unistd.h>
48 #include <fcntl.h>
49 #include <stdlib.h>
50 #include <stdio.h>
51 #include <errno.h>
52 #include <time.h>
53 #include <assert.h>
54 #include <arpa/inet.h>
55 #include <sys/mman.h>
56 
57 #include <qb/qblist.h>
58 #include <qb/qbmap.h>
59 
60 #include <corosync/corotypes.h>
61 #include <qb/qbipc_common.h>
62 #include <corosync/corodefs.h>
63 #include <corosync/logsys.h>
64 #include <corosync/coroapi.h>
65 
66 #include <corosync/cpg.h>
67 #include <corosync/ipc_cpg.h>
68 
69 #ifndef MAP_ANONYMOUS
70 #define MAP_ANONYMOUS MAP_ANON
71 #endif
72 
73 #include "service.h"
74 
76 
77 #define GROUP_HASH_SIZE 32
78 
87 };
88 
89 struct zcb_mapped {
90  struct qb_list_head list;
91  void *addr;
92  size_t size;
93 };
94 /*
95  * state` exec deliver
96  * match group name, pid -> if matched deliver for YES:
97  * XXX indicates impossible state
98  *
99  * join leave mcast
100  * UNJOINED XXX XXX NO
101  * LEAVE_STARTED XXX YES(unjoined_enter) YES
102  * JOIN_STARTED YES(join_started_enter) XXX NO
103  * JOIN_COMPLETED XXX NO YES
104  *
105  * join_started_enter
106  * set JOIN_COMPLETED
107  * add entry to process_info list
108  * unjoined_enter
109  * set UNJOINED
110  * delete entry from process_info list
111  *
112  *
113  * library accept join error codes
114  * UNJOINED YES(CS_OK) set JOIN_STARTED
115  * LEAVE_STARTED NO(CS_ERR_BUSY)
116  * JOIN_STARTED NO(CS_ERR_EXIST)
117  * JOIN_COMPlETED NO(CS_ERR_EXIST)
118  *
119  * library accept leave error codes
120  * UNJOINED NO(CS_ERR_NOT_EXIST)
121  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
122  * JOIN_STARTED NO(CS_ERR_BUSY)
123  * JOIN_COMPLETED YES(CS_OK) set LEAVE_STARTED
124  *
125  * library accept mcast
126  * UNJOINED NO(CS_ERR_NOT_EXIST)
127  * LEAVE_STARTED NO(CS_ERR_NOT_EXIST)
128  * JOIN_STARTED YES(CS_OK)
129  * JOIN_COMPLETED YES(CS_OK)
130  */
131 enum cpd_state {
136 };
137 
141 };
142 
143 static struct qb_list_head joinlist_messages_head;
144 
145 struct cpg_pd {
146  void *conn;
148  uint32_t pid;
149  enum cpd_state cpd_state;
150  unsigned int flags;
152  uint64_t transition_counter; /* These two are used when sending fragmented messages */
154  struct qb_list_head list;
155  struct qb_list_head iteration_instance_list_head;
156  struct qb_list_head zcb_mapped_list_head;
157 };
158 
161  struct qb_list_head list;
162  struct qb_list_head items_list_head; /* List of process_info */
163  struct qb_list_head *current_pointer;
164 };
165 
166 DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db,NULL);
167 
168 QB_LIST_DECLARE (cpg_pd_list_head);
169 
170 static unsigned int my_member_list[PROCESSOR_COUNT_MAX];
171 
172 static unsigned int my_member_list_entries;
173 
174 static unsigned int my_old_member_list[PROCESSOR_COUNT_MAX];
175 
176 static unsigned int my_old_member_list_entries = 0;
177 
178 static struct corosync_api_v1 *api = NULL;
179 
180 static enum cpg_sync_state my_sync_state = CPGSYNC_DOWNLIST;
181 
182 static mar_cpg_ring_id_t last_sync_ring_id;
183 
184 struct process_info {
185  unsigned int nodeid;
186  uint32_t pid;
188  struct qb_list_head list; /* on the group_info members list */
189 };
190 QB_LIST_DECLARE (process_info_list_head);
191 
193  uint32_t pid;
195 };
196 
201 };
202 
203 /*
204  * Service Interfaces required by service_message_handler struct
205  */
206 static char *cpg_exec_init_fn (struct corosync_api_v1 *);
207 
208 static int cpg_lib_init_fn (void *conn);
209 
210 static int cpg_lib_exit_fn (void *conn);
211 
212 static void message_handler_req_exec_cpg_procjoin (
213  const void *message,
214  unsigned int nodeid);
215 
216 static void message_handler_req_exec_cpg_procleave (
217  const void *message,
218  unsigned int nodeid);
219 
220 static void message_handler_req_exec_cpg_joinlist (
221  const void *message,
222  unsigned int nodeid);
223 
224 static void message_handler_req_exec_cpg_mcast (
225  const void *message,
226  unsigned int nodeid);
227 
228 static void message_handler_req_exec_cpg_partial_mcast (
229  const void *message,
230  unsigned int nodeid);
231 
232 static void message_handler_req_exec_cpg_downlist_old (
233  const void *message,
234  unsigned int nodeid);
235 
236 static void message_handler_req_exec_cpg_downlist (
237  const void *message,
238  unsigned int nodeid);
239 
240 static void exec_cpg_procjoin_endian_convert (void *msg);
241 
242 static void exec_cpg_joinlist_endian_convert (void *msg);
243 
244 static void exec_cpg_mcast_endian_convert (void *msg);
245 
246 static void exec_cpg_partial_mcast_endian_convert (void *msg);
247 
248 static void exec_cpg_downlist_endian_convert_old (void *msg);
249 
250 static void exec_cpg_downlist_endian_convert (void *msg);
251 
252 static void message_handler_req_lib_cpg_join (void *conn, const void *message);
253 
254 static void message_handler_req_lib_cpg_leave (void *conn, const void *message);
255 
256 static void message_handler_req_lib_cpg_finalize (void *conn, const void *message);
257 
258 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
259 
260 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
261 
262 static void message_handler_req_lib_cpg_membership (void *conn,
263  const void *message);
264 
265 static void message_handler_req_lib_cpg_local_get (void *conn,
266  const void *message);
267 
268 static void message_handler_req_lib_cpg_iteration_initialize (
269  void *conn,
270  const void *message);
271 
272 static void message_handler_req_lib_cpg_iteration_next (
273  void *conn,
274  const void *message);
275 
276 static void message_handler_req_lib_cpg_iteration_finalize (
277  void *conn,
278  const void *message);
279 
280 static void message_handler_req_lib_cpg_zc_alloc (
281  void *conn,
282  const void *message);
283 
284 static void message_handler_req_lib_cpg_zc_free (
285  void *conn,
286  const void *message);
287 
288 static void message_handler_req_lib_cpg_zc_execute (
289  void *conn,
290  const void *message);
291 
292 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason);
293 
294 static int cpg_exec_send_downlist(void);
295 
296 static int cpg_exec_send_joinlist(void);
297 
298 static void downlist_inform_clients (void);
299 
300 static void joinlist_inform_clients (void);
301 
302 static void joinlist_messages_delete (void);
303 
304 static void cpg_sync_init (
305  const unsigned int *trans_list,
306  size_t trans_list_entries,
307  const unsigned int *member_list,
308  size_t member_list_entries,
309  const struct memb_ring_id *ring_id);
310 
311 static int cpg_sync_process (void);
312 
313 static void cpg_sync_activate (void);
314 
315 static void cpg_sync_abort (void);
316 
317 static void do_proc_join(
318  const mar_cpg_name_t *name,
319  uint32_t pid,
320  unsigned int nodeid,
321  int reason,
322  qb_map_t *group_notify_map);
323 
324 static void do_proc_leave(
325  const mar_cpg_name_t *name,
326  uint32_t pid,
327  unsigned int nodeid,
328  int reason);
329 
330 static int notify_lib_totem_membership (
331  void *conn,
332  int member_list_entries,
333  const unsigned int *member_list);
334 
335 static inline int zcb_all_free (
336  struct cpg_pd *cpd);
337 
338 static char *cpg_print_group_name (
339  const mar_cpg_name_t *group);
340 
341 /*
342  * Library Handler Definition
343  */
344 static struct corosync_lib_handler cpg_lib_engine[] =
345 {
346  { /* 0 - MESSAGE_REQ_CPG_JOIN */
347  .lib_handler_fn = message_handler_req_lib_cpg_join,
348  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
349  },
350  { /* 1 - MESSAGE_REQ_CPG_LEAVE */
351  .lib_handler_fn = message_handler_req_lib_cpg_leave,
352  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
353  },
354  { /* 2 - MESSAGE_REQ_CPG_MCAST */
355  .lib_handler_fn = message_handler_req_lib_cpg_mcast,
356  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
357  },
358  { /* 3 - MESSAGE_REQ_CPG_MEMBERSHIP */
359  .lib_handler_fn = message_handler_req_lib_cpg_membership,
360  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
361  },
362  { /* 4 - MESSAGE_REQ_CPG_LOCAL_GET */
363  .lib_handler_fn = message_handler_req_lib_cpg_local_get,
364  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
365  },
366  { /* 5 - MESSAGE_REQ_CPG_ITERATIONINITIALIZE */
367  .lib_handler_fn = message_handler_req_lib_cpg_iteration_initialize,
368  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
369  },
370  { /* 6 - MESSAGE_REQ_CPG_ITERATIONNEXT */
371  .lib_handler_fn = message_handler_req_lib_cpg_iteration_next,
372  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
373  },
374  { /* 7 - MESSAGE_REQ_CPG_ITERATIONFINALIZE */
375  .lib_handler_fn = message_handler_req_lib_cpg_iteration_finalize,
376  .flow_control = CS_LIB_FLOW_CONTROL_NOT_REQUIRED
377  },
378  { /* 8 - MESSAGE_REQ_CPG_FINALIZE */
379  .lib_handler_fn = message_handler_req_lib_cpg_finalize,
380  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
381  },
382  { /* 9 */
383  .lib_handler_fn = message_handler_req_lib_cpg_zc_alloc,
384  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
385  },
386  { /* 10 */
387  .lib_handler_fn = message_handler_req_lib_cpg_zc_free,
388  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
389  },
390  { /* 11 */
391  .lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
392  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
393  },
394  { /* 12 */
395  .lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
396  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
397  },
398 
399 };
400 
401 static struct corosync_exec_handler cpg_exec_engine[] =
402 {
403  { /* 0 - MESSAGE_REQ_EXEC_CPG_PROCJOIN */
404  .exec_handler_fn = message_handler_req_exec_cpg_procjoin,
405  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
406  },
407  { /* 1 - MESSAGE_REQ_EXEC_CPG_PROCLEAVE */
408  .exec_handler_fn = message_handler_req_exec_cpg_procleave,
409  .exec_endian_convert_fn = exec_cpg_procjoin_endian_convert
410  },
411  { /* 2 - MESSAGE_REQ_EXEC_CPG_JOINLIST */
412  .exec_handler_fn = message_handler_req_exec_cpg_joinlist,
413  .exec_endian_convert_fn = exec_cpg_joinlist_endian_convert
414  },
415  { /* 3 - MESSAGE_REQ_EXEC_CPG_MCAST */
416  .exec_handler_fn = message_handler_req_exec_cpg_mcast,
417  .exec_endian_convert_fn = exec_cpg_mcast_endian_convert
418  },
419  { /* 4 - MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD */
420  .exec_handler_fn = message_handler_req_exec_cpg_downlist_old,
421  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert_old
422  },
423  { /* 5 - MESSAGE_REQ_EXEC_CPG_DOWNLIST */
424  .exec_handler_fn = message_handler_req_exec_cpg_downlist,
425  .exec_endian_convert_fn = exec_cpg_downlist_endian_convert
426  },
427  { /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
428  .exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
429  .exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
430  },
431 };
432 
434  .name = "corosync cluster closed process group service v1.01",
435  .id = CPG_SERVICE,
436  .priority = 1,
437  .private_data_size = sizeof (struct cpg_pd),
438  .flow_control = CS_LIB_FLOW_CONTROL_REQUIRED,
439  .allow_inquorate = CS_LIB_ALLOW_INQUORATE,
440  .lib_init_fn = cpg_lib_init_fn,
441  .lib_exit_fn = cpg_lib_exit_fn,
442  .lib_engine = cpg_lib_engine,
443  .lib_engine_count = sizeof (cpg_lib_engine) / sizeof (struct corosync_lib_handler),
444  .exec_init_fn = cpg_exec_init_fn,
445  .exec_dump_fn = NULL,
446  .exec_engine = cpg_exec_engine,
447  .exec_engine_count = sizeof (cpg_exec_engine) / sizeof (struct corosync_exec_handler),
448  .sync_init = cpg_sync_init,
449  .sync_process = cpg_sync_process,
450  .sync_activate = cpg_sync_activate,
451  .sync_abort = cpg_sync_abort
452 };
453 
455 {
456  return (&cpg_service_engine);
457 }
458 
460  struct qb_ipc_request_header header __attribute__((aligned(8)));
461  mar_cpg_name_t group_name __attribute__((aligned(8)));
462  mar_uint32_t pid __attribute__((aligned(8)));
463  mar_uint32_t reason __attribute__((aligned(8)));
464 };
465 
467  struct qb_ipc_request_header header __attribute__((aligned(8)));
468  mar_cpg_name_t group_name __attribute__((aligned(8)));
469  mar_uint32_t msglen __attribute__((aligned(8)));
470  mar_uint32_t pid __attribute__((aligned(8)));
471  mar_message_source_t source __attribute__((aligned(8)));
472  mar_uint8_t message[] __attribute__((aligned(8)));
473 };
474 
476  struct qb_ipc_request_header header __attribute__((aligned(8)));
477  mar_cpg_name_t group_name __attribute__((aligned(8)));
478  mar_uint32_t msglen __attribute__((aligned(8)));
479  mar_uint32_t fraglen __attribute__((aligned(8)));
480  mar_uint32_t pid __attribute__((aligned(8)));
482  mar_message_source_t source __attribute__((aligned(8)));
483  mar_uint8_t message[] __attribute__((aligned(8)));
484 };
485 
487  struct qb_ipc_request_header header __attribute__((aligned(8)));
488  mar_uint32_t left_nodes __attribute__((aligned(8)));
490 };
491 
493  struct qb_ipc_request_header header __attribute__((aligned(8)));
494  /* merge decisions */
495  mar_uint32_t old_members __attribute__((aligned(8)));
496  /* downlist below */
497  mar_uint32_t left_nodes __attribute__((aligned(8)));
499 };
500 
501 struct joinlist_msg {
503  uint32_t pid;
505  struct qb_list_head list;
506 };
507 
508 static struct req_exec_cpg_downlist g_req_exec_cpg_downlist;
509 
510 /*
511  * Function print group name. It's not reentrant
512  */
513 static char *cpg_print_group_name(const mar_cpg_name_t *group)
514 {
515  static char res[CPG_MAX_NAME_LENGTH * 4 + 1];
516  int dest_pos = 0;
517  char c;
518  int i;
519 
520  for (i = 0; i < group->length; i++) {
521  c = group->value[i];
522 
523  if (c >= ' ' && c < 0x7f && c != '\\') {
524  res[dest_pos++] = c;
525  } else {
526  if (c == '\\') {
527  res[dest_pos++] = '\\';
528  res[dest_pos++] = '\\';
529  } else {
530  snprintf(res + dest_pos, sizeof(res) - dest_pos, "\\x%02X", c);
531  dest_pos += 4;
532  }
533  }
534  }
535  res[dest_pos] = 0;
536 
537  return (res);
538 }
539 
540 static void cpg_sync_init (
541  const unsigned int *trans_list,
542  size_t trans_list_entries,
543  const unsigned int *member_list,
544  size_t member_list_entries,
545  const struct memb_ring_id *ring_id)
546 {
547  int entries;
548  int i, j;
549  int found;
550 
551  my_sync_state = CPGSYNC_DOWNLIST;
552 
553  memcpy (my_member_list, member_list, member_list_entries *
554  sizeof (unsigned int));
555  my_member_list_entries = member_list_entries;
556 
557  last_sync_ring_id.nodeid = ring_id->nodeid;
558  last_sync_ring_id.seq = ring_id->seq;
559 
560  entries = 0;
561  /*
562  * Determine list of nodeids for downlist message
563  */
564  for (i = 0; i < my_old_member_list_entries; i++) {
565  found = 0;
566  for (j = 0; j < trans_list_entries; j++) {
567  if (my_old_member_list[i] == trans_list[j]) {
568  found = 1;
569  break;
570  }
571  }
572  if (found == 0) {
573  g_req_exec_cpg_downlist.nodeids[entries++] =
574  my_old_member_list[i];
575  }
576  }
577  g_req_exec_cpg_downlist.left_nodes = entries;
578 }
579 
580 static int cpg_sync_process (void)
581 {
582  int res = -1;
583 
584  if (my_sync_state == CPGSYNC_DOWNLIST) {
585  res = cpg_exec_send_downlist();
586  if (res == -1) {
587  return (-1);
588  }
589  my_sync_state = CPGSYNC_JOINLIST;
590  }
591  if (my_sync_state == CPGSYNC_JOINLIST) {
592  res = cpg_exec_send_joinlist();
593  }
594  return (res);
595 }
596 
597 static void cpg_sync_activate (void)
598 {
599  memcpy (my_old_member_list, my_member_list,
600  my_member_list_entries * sizeof (unsigned int));
601  my_old_member_list_entries = my_member_list_entries;
602 
603  downlist_inform_clients ();
604 
605  joinlist_inform_clients ();
606 
607  joinlist_messages_delete ();
608 
609  notify_lib_totem_membership (NULL, my_member_list_entries, my_member_list);
610 }
611 
612 static void cpg_sync_abort (void)
613 {
614 
615  joinlist_messages_delete ();
616 }
617 
618 static int notify_lib_totem_membership (
619  void *conn,
620  int member_list_entries,
621  const unsigned int *member_list)
622 {
623  struct qb_list_head *iter;
624  char *buf;
625  int size;
627 
628  size = sizeof(struct res_lib_cpg_totem_confchg_callback) +
629  sizeof(mar_uint32_t) * (member_list_entries);
630  buf = alloca(size);
631  if (!buf)
632  return CS_ERR_LIBRARY;
633 
634  res = (struct res_lib_cpg_totem_confchg_callback *)buf;
635  res->member_list_entries = member_list_entries;
636  res->header.size = size;
638  res->header.error = CS_OK;
639 
640  memcpy (&res->ring_id, &last_sync_ring_id, sizeof (mar_cpg_ring_id_t));
641  memcpy (res->member_list, member_list, res->member_list_entries * sizeof (mar_uint32_t));
642 
643  if (conn == NULL) {
644  qb_list_for_each(iter, &cpg_pd_list_head) {
645  struct cpg_pd *cpg_pd = qb_list_entry (iter, struct cpg_pd, list);
646  api->ipc_dispatch_send (cpg_pd->conn, buf, size);
647  }
648  } else {
649  api->ipc_dispatch_send (conn, buf, size);
650  }
651 
652  return CS_OK;
653 }
654 
655 /*
656  * Helper function for notify_lib_joinlist which prepares member_list using
657  * process_info_list with removed left_list items.
658  * member_list_entries - When not NULL it contains number of member_list entries
659  * member_list - When not NULL it is used as pointer to start of preallocated
660  * array of members. Pointer is adjusted to the end of array on
661  * exit.
662  */
663 static void notify_lib_joinlist_fill_member_list(
664  const mar_cpg_name_t *group_name,
665  int left_list_entries,
666  const mar_cpg_address_t *left_list,
667  int *member_list_entries,
668  mar_cpg_address_t **member_list)
669 {
670  struct qb_list_head *iter;
671  int i;
672 
673  if (member_list_entries != NULL) {
674  *member_list_entries = 0;
675  }
676 
677  qb_list_for_each(iter, &process_info_list_head) {
678  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
679 
680  if (mar_name_compare (&pi->group, group_name) == 0) {
681  int in_left_list = 0;
682 
683  for (i = 0; i < left_list_entries; i++) {
684  if (left_list[i].nodeid == pi->nodeid && left_list[i].pid == pi->pid) {
685  in_left_list = 1;
686  break ;
687  }
688  }
689 
690  if (!in_left_list) {
691  if (member_list_entries != NULL) {
692  (*member_list_entries)++;
693  }
694 
695  if (member_list != NULL) {
696  (*member_list)->nodeid = pi->nodeid;
697  (*member_list)->pid = pi->pid;
698  (*member_list)->reason = CPG_REASON_UNDEFINED;
699  (*member_list)++;
700  }
701  }
702  }
703  }
704 }
705 
706 static int notify_lib_joinlist(
707  const mar_cpg_name_t *group_name,
708  int joined_list_entries,
709  mar_cpg_address_t *joined_list,
710  int left_list_entries,
711  mar_cpg_address_t *left_list,
712  int id)
713 {
714  int size;
715  char *buf;
716  struct qb_list_head *iter;
717  int member_list_entries;
718  struct res_lib_cpg_confchg_callback *res;
719  mar_cpg_address_t *retgi;
720  int i;
721 
722  /*
723  * Find size of member_list (use process_info_list but remove items in left_list)
724  */
725  notify_lib_joinlist_fill_member_list(group_name, left_list_entries, left_list,
726  &member_list_entries, NULL);
727 
728  size = sizeof(struct res_lib_cpg_confchg_callback) +
729  sizeof(mar_cpg_address_t) * (member_list_entries + left_list_entries + joined_list_entries);
730  buf = alloca(size);
731  if (!buf)
732  return CS_ERR_LIBRARY;
733 
734  res = (struct res_lib_cpg_confchg_callback *)buf;
735  res->joined_list_entries = joined_list_entries;
736  res->left_list_entries = left_list_entries;
737  res->member_list_entries = member_list_entries;
738  retgi = res->member_list;
739  res->header.size = size;
740  res->header.id = id;
741  res->header.error = CS_OK;
742  memcpy(&res->group_name, group_name, sizeof(mar_cpg_name_t));
743 
744  /*
745  * Fill res->memberlist. Use process_info_list but remove items in left_list.
746  */
747  notify_lib_joinlist_fill_member_list(group_name, left_list_entries, left_list,
748  NULL, &retgi);
749 
750  /*
751  * Fill res->left_list
752  */
753  if (left_list_entries) {
754  memcpy (retgi, left_list, left_list_entries * sizeof(mar_cpg_address_t));
755  retgi += left_list_entries;
756  }
757 
758  if (joined_list_entries) {
759  /*
760  * Fill res->joined_list
761  */
762  memcpy (retgi, joined_list, joined_list_entries * sizeof(mar_cpg_address_t));
763  retgi += joined_list_entries;
764 
765  /*
766  * Update cpd_state for all local joined processes in group
767  */
768  for (i = 0; i < joined_list_entries; i++) {
769  if (joined_list[i].nodeid == api->totem_nodeid_get()) {
770  qb_list_for_each(iter, &cpg_pd_list_head) {
771  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
772  if (joined_list[i].pid == cpd->pid &&
773  mar_name_compare (&cpd->group_name, group_name) == 0) {
775  }
776  }
777  }
778  }
779  }
780 
781  /*
782  * Send notification to all ipc clients joined in group_name
783  */
784  qb_list_for_each(iter, &cpg_pd_list_head) {
785  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
786  if (mar_name_compare (&cpd->group_name, group_name) == 0) {
787  if (cpd->cpd_state == CPD_STATE_JOIN_COMPLETED ||
789 
790  api->ipc_dispatch_send (cpd->conn, buf, size);
791  cpd->transition_counter++;
792  }
793  }
794  }
795 
796  if (left_list_entries) {
797  /*
798  * Zero internal cpd state for all local processes leaving group
799  * (this loop is not strictly needed because left_list always either
800  * contains exactly one process running on local node or more items
801  * but none of them is running on local node)
802  */
803  for (i = 0; i < joined_list_entries; i++) {
804  if (left_list[i].nodeid == api->totem_nodeid_get() &&
805  left_list[i].reason == CONFCHG_CPG_REASON_LEAVE) {
806  qb_list_for_each(iter, &cpg_pd_list_head) {
807  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
808  if (left_list[i].pid == cpd->pid &&
809  mar_name_compare (&cpd->group_name, group_name) == 0) {
810  cpd->pid = 0;
811  memset (&cpd->group_name, 0, sizeof(cpd->group_name));
813  }
814  }
815  }
816  }
817  }
818 
819  /*
820  * Traverse thru cpds and send totem membership for cpd, where it is not send yet
821  */
822  qb_list_for_each(iter, &cpg_pd_list_head) {
823  struct cpg_pd *cpd = qb_list_entry (iter, struct cpg_pd, list);
824 
826  cpd->initial_totem_conf_sent = 1;
827 
828  notify_lib_totem_membership (cpd->conn, my_old_member_list_entries, my_old_member_list);
829  }
830  }
831 
832  return CS_OK;
833 }
834 
835 static void downlist_log(const char *msg, struct req_exec_cpg_downlist *dl)
836 {
837  log_printf (LOG_DEBUG,
838  "%s: members(old:%d left:%d)",
839  msg,
840  dl->old_members,
841  dl->left_nodes);
842 }
843 
844 static void downlist_inform_clients (void)
845 {
846  struct qb_list_head *iter, *tmp_iter;
847  struct process_info *left_pi;
848  qb_map_t *group_map;
849  struct cpg_name cpg_group;
850  mar_cpg_name_t group;
851  struct confchg_data{
852  struct cpg_name cpg_group;
854  int left_list_entries;
855  struct qb_list_head list;
856  } *pcd;
857  qb_map_iter_t *miter;
858  int i, size;
859 
860  downlist_log("my downlist", &g_req_exec_cpg_downlist);
861 
862  group_map = qb_skiplist_create();
863 
864  /*
865  * only the cpg groups included in left nodes should receive
866  * confchg event, so we will collect these cpg groups and
867  * relative left_lists here.
868  */
869  qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
870  struct process_info *pi = qb_list_entry(iter, struct process_info, list);
871 
872  left_pi = NULL;
873  for (i = 0; i < g_req_exec_cpg_downlist.left_nodes; i++) {
874 
875  if (pi->nodeid == g_req_exec_cpg_downlist.nodeids[i]) {
876  left_pi = pi;
877  break;
878  }
879  }
880 
881  if (left_pi) {
882  marshall_from_mar_cpg_name_t(&cpg_group, &left_pi->group);
883  cpg_group.value[cpg_group.length] = 0;
884 
885  pcd = (struct confchg_data *)qb_map_get(group_map, cpg_group.value);
886  if (pcd == NULL) {
887  pcd = (struct confchg_data *)calloc(1, sizeof(struct confchg_data));
888  memcpy(&pcd->cpg_group, &cpg_group, sizeof(struct cpg_name));
889  qb_map_put(group_map, pcd->cpg_group.value, pcd);
890  }
891  size = pcd->left_list_entries;
892  pcd->left_list[size].nodeid = left_pi->nodeid;
893  pcd->left_list[size].pid = left_pi->pid;
894  pcd->left_list[size].reason = CONFCHG_CPG_REASON_NODEDOWN;
895  pcd->left_list_entries++;
896  qb_list_del (&left_pi->list);
897  free (left_pi);
898  }
899  }
900 
901  /* send only one confchg event per cpg group */
902  miter = qb_map_iter_create(group_map);
903  while (qb_map_iter_next(miter, (void **)&pcd)) {
904  marshall_to_mar_cpg_name_t(&group, &pcd->cpg_group);
905 
906  log_printf (LOG_DEBUG, "left_list_entries:%d", pcd->left_list_entries);
907  for (i=0; i<pcd->left_list_entries; i++) {
908  log_printf (LOG_DEBUG, "left_list[%d] group:%s, ip:%s, pid:%d",
909  i, cpg_print_group_name(&group),
910  (char*)api->totem_ifaces_print(pcd->left_list[i].nodeid),
911  pcd->left_list[i].pid);
912  }
913 
914  /* send confchg event */
915  notify_lib_joinlist(&group,
916  0, NULL,
917  pcd->left_list_entries,
918  pcd->left_list,
920 
921  free(pcd);
922  }
923  qb_map_iter_free(miter);
924  qb_map_destroy(group_map);
925 }
926 
927 /*
928  * Remove processes that might have left the group while we were suspended.
929  */
930 static void joinlist_remove_zombie_pi_entries (void)
931 {
932  struct qb_list_head *pi_iter, *tmp_iter;
933  struct qb_list_head *jl_iter;
934  struct process_info *pi;
935  struct joinlist_msg *stored_msg;
936  int found;
937 
938  qb_list_for_each_safe(pi_iter, tmp_iter, &process_info_list_head) {
939  pi = qb_list_entry (pi_iter, struct process_info, list);
940 
941  /*
942  * Ignore local node
943  */
944  if (pi->nodeid == api->totem_nodeid_get()) {
945  continue ;
946  }
947 
948  /*
949  * Try to find message in joinlist messages
950  */
951  found = 0;
952  qb_list_for_each(jl_iter, &joinlist_messages_head) {
953  stored_msg = qb_list_entry(jl_iter, struct joinlist_msg, list);
954 
955  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
956  continue ;
957  }
958 
959  if (pi->nodeid == stored_msg->sender_nodeid &&
960  pi->pid == stored_msg->pid &&
961  mar_name_compare (&pi->group, &stored_msg->group_name) == 0) {
962  found = 1;
963  break ;
964  }
965  }
966 
967  if (!found) {
968  do_proc_leave(&pi->group, pi->pid, pi->nodeid, CONFCHG_CPG_REASON_PROCDOWN);
969  }
970  }
971 }
972 
973 static void joinlist_inform_clients (void)
974 {
975  struct joinlist_msg *stored_msg;
976  struct qb_list_head *iter;
977  unsigned int i;
978  qb_map_t *group_notify_map;
979  qb_map_iter_t *miter;
980  struct join_list_confchg_data *jld;
981 
982  group_notify_map = qb_skiplist_create();
983 
984  i = 0;
985  qb_list_for_each(iter, &joinlist_messages_head) {
986  stored_msg = qb_list_entry(iter, struct joinlist_msg, list);
987 
988  log_printf (LOG_DEBUG, "joinlist_messages[%u] group:%s, ip:%s, pid:%d",
989  i++, cpg_print_group_name(&stored_msg->group_name),
990  (char*)api->totem_ifaces_print(stored_msg->sender_nodeid),
991  stored_msg->pid);
992 
993  /* Ignore our own messages */
994  if (stored_msg->sender_nodeid == api->totem_nodeid_get()) {
995  continue ;
996  }
997 
998  do_proc_join (&stored_msg->group_name, stored_msg->pid, stored_msg->sender_nodeid,
999  CONFCHG_CPG_REASON_NODEUP, group_notify_map);
1000  }
1001 
1002  miter = qb_map_iter_create(group_notify_map);
1003  while (qb_map_iter_next(miter, (void **)&jld)) {
1004  notify_lib_joinlist(&jld->cpg_group,
1005  jld->join_list_entries, jld->join_list,
1006  0, NULL,
1008  free(jld);
1009  }
1010  qb_map_iter_free(miter);
1011  qb_map_destroy(group_notify_map);
1012 
1013  joinlist_remove_zombie_pi_entries ();
1014 }
1015 
1016 static void joinlist_messages_delete (void)
1017 {
1018  struct joinlist_msg *stored_msg;
1019  struct qb_list_head *iter, *tmp_iter;
1020 
1021  qb_list_for_each_safe(iter, tmp_iter, &joinlist_messages_head) {
1022  stored_msg = qb_list_entry(iter, struct joinlist_msg, list);
1023  qb_list_del (&stored_msg->list);
1024  free (stored_msg);
1025  }
1026  qb_list_init (&joinlist_messages_head);
1027 }
1028 
1029 static char *cpg_exec_init_fn (struct corosync_api_v1 *corosync_api)
1030 {
1031  qb_list_init (&joinlist_messages_head);
1032  api = corosync_api;
1033  return (NULL);
1034 }
1035 
1036 static void cpg_iteration_instance_finalize (struct cpg_iteration_instance *cpg_iteration_instance)
1037 {
1038  struct qb_list_head *iter, *tmp_iter;
1039  struct process_info *pi;
1040 
1041  qb_list_for_each_safe(iter, tmp_iter, &(cpg_iteration_instance->items_list_head)) {
1042  pi = qb_list_entry (iter, struct process_info, list);
1043  qb_list_del (&pi->list);
1044  free (pi);
1045  }
1046 
1047  qb_list_del (&cpg_iteration_instance->list);
1048  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
1049 }
1050 
1051 static void cpg_pd_finalize (struct cpg_pd *cpd)
1052 {
1053  struct qb_list_head *iter, *tmp_iter;
1054  struct cpg_iteration_instance *cpii;
1055 
1056  zcb_all_free(cpd);
1057  qb_list_for_each_safe(iter, tmp_iter, &(cpd->iteration_instance_list_head)) {
1058  cpii = qb_list_entry (iter, struct cpg_iteration_instance, list);
1059 
1060  cpg_iteration_instance_finalize (cpii);
1061  }
1062 
1063  qb_list_del (&cpd->list);
1064 }
1065 
1066 static int cpg_lib_exit_fn (void *conn)
1067 {
1068  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1069 
1070  log_printf(LOGSYS_LEVEL_DEBUG, "exit_fn for conn=%p", conn);
1071 
1072  if (cpd->group_name.length > 0 && cpd->cpd_state != CPD_STATE_LEAVE_STARTED) {
1073  cpg_node_joinleave_send (cpd->pid, &cpd->group_name,
1075  }
1076 
1077  cpg_pd_finalize (cpd);
1078 
1079  api->ipc_refcnt_dec (conn);
1080  return (0);
1081 }
1082 
1083 static int cpg_node_joinleave_send (unsigned int pid, const mar_cpg_name_t *group_name, int fn, int reason)
1084 {
1086  struct iovec req_exec_cpg_iovec;
1087  int result;
1088 
1089  memset(&req_exec_cpg_procjoin, 0, sizeof(req_exec_cpg_procjoin));
1090 
1091  memcpy(&req_exec_cpg_procjoin.group_name, group_name, sizeof(mar_cpg_name_t));
1092  req_exec_cpg_procjoin.pid = pid;
1093  req_exec_cpg_procjoin.reason = reason;
1094 
1095  req_exec_cpg_procjoin.header.size = sizeof(req_exec_cpg_procjoin);
1097 
1098  req_exec_cpg_iovec.iov_base = (char *)&req_exec_cpg_procjoin;
1099  req_exec_cpg_iovec.iov_len = sizeof(req_exec_cpg_procjoin);
1100 
1101  result = api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED);
1102 
1103  return (result);
1104 }
1105 
1106 /* Can byteswap join & leave messages */
1107 static void exec_cpg_procjoin_endian_convert (void *msg)
1108 {
1110 
1112  swab_mar_cpg_name_t (&req_exec_cpg_procjoin->group_name);
1114 }
1115 
1116 static void exec_cpg_joinlist_endian_convert (void *msg_v)
1117 {
1118  char *msg = msg_v;
1119  struct qb_ipc_response_header *res = (struct qb_ipc_response_header *)msg;
1120  struct join_list_entry *jle = (struct join_list_entry *)(msg + sizeof(struct qb_ipc_response_header));
1121 
1122  swab_mar_int32_t (&res->size);
1123 
1124  while ((const char*)jle < msg + res->size) {
1125  jle->pid = swab32(jle->pid);
1126  swab_mar_cpg_name_t (&jle->group_name);
1127  jle++;
1128  }
1129 }
1130 
1131 static void exec_cpg_downlist_endian_convert_old (void *msg)
1132 {
1133 }
1134 
1135 static void exec_cpg_downlist_endian_convert (void *msg)
1136 {
1138  unsigned int i;
1139 
1140  req_exec_cpg_downlist->left_nodes = swab32(req_exec_cpg_downlist->left_nodes);
1141  req_exec_cpg_downlist->old_members = swab32(req_exec_cpg_downlist->old_members);
1142 
1143  for (i = 0; i < req_exec_cpg_downlist->left_nodes; i++) {
1144  req_exec_cpg_downlist->nodeids[i] = swab32(req_exec_cpg_downlist->nodeids[i]);
1145  }
1146 }
1147 
1148 
1149 static void exec_cpg_mcast_endian_convert (void *msg)
1150 {
1151  struct req_exec_cpg_mcast *req_exec_cpg_mcast = msg;
1152 
1153  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1154  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1156  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1157  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1158 }
1159 
1160 static void exec_cpg_partial_mcast_endian_convert (void *msg)
1161 {
1163 
1164  swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
1165  swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
1167  req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
1168  req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
1170  swab_mar_message_source_t (&req_exec_cpg_mcast->source);
1171 }
1172 
1173 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
1174  struct qb_list_head *iter;
1175 
1176  qb_list_for_each(iter, &process_info_list_head) {
1177  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1178 
1179  if (pi->pid == pid && pi->nodeid == nodeid &&
1180  mar_name_compare (&pi->group, group_name) == 0) {
1181  return pi;
1182  }
1183  }
1184 
1185  return NULL;
1186 }
1187 
1188 static void do_proc_join(
1189  const mar_cpg_name_t *name,
1190  uint32_t pid,
1191  unsigned int nodeid,
1192  int reason,
1193  qb_map_t *group_notify_map)
1194 {
1195  struct process_info *pi;
1196  struct process_info *pi_entry;
1197  mar_cpg_address_t notify_info;
1198  struct qb_list_head *list;
1199  struct qb_list_head *list_to_add = NULL;
1200  int size;
1201 
1202  if (process_info_find (name, pid, nodeid) != NULL) {
1203  return ;
1204  }
1205  pi = malloc (sizeof (struct process_info));
1206  if (!pi) {
1207  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
1208  return;
1209  }
1210  pi->nodeid = nodeid;
1211  pi->pid = pid;
1212  memcpy(&pi->group, name, sizeof(*name));
1213  qb_list_init(&pi->list);
1214 
1215  /*
1216  * Insert new process in sorted order so synchronization works properly
1217  */
1218  list_to_add = &process_info_list_head;
1219  qb_list_for_each(list, &process_info_list_head) {
1220  pi_entry = qb_list_entry(list, struct process_info, list);
1221  if (pi_entry->nodeid > pi->nodeid ||
1222  (pi_entry->nodeid == pi->nodeid && pi_entry->pid > pi->pid)) {
1223 
1224  break;
1225  }
1226  list_to_add = list;
1227  }
1228  qb_list_add (&pi->list, list_to_add);
1229 
1230  notify_info.pid = pi->pid;
1231  notify_info.nodeid = nodeid;
1232  notify_info.reason = reason;
1233 
1234  if (group_notify_map == NULL) {
1235  notify_lib_joinlist(&pi->group,
1236  1, &notify_info,
1237  0, NULL,
1239  } else {
1240  struct join_list_confchg_data *jld = qb_map_get(group_notify_map, pi->group.value);
1241  if (jld == NULL) {
1242  jld = (struct join_list_confchg_data *)calloc(1, sizeof(struct join_list_confchg_data));
1243  memcpy(&jld->cpg_group, &pi->group, sizeof(mar_cpg_name_t));
1244  qb_map_put(group_notify_map, jld->cpg_group.value, jld);
1245  }
1246  size = jld->join_list_entries;
1247  jld->join_list[size].nodeid = notify_info.nodeid;
1248  jld->join_list[size].pid = notify_info.pid;
1249  jld->join_list[size].reason = notify_info.reason;
1250  jld->join_list_entries++;
1251  }
1252 }
1253 
1254 static void do_proc_leave(
1255  const mar_cpg_name_t *name,
1256  uint32_t pid,
1257  unsigned int nodeid,
1258  int reason)
1259 {
1260  struct process_info *pi;
1261  struct qb_list_head *iter, *tmp_iter;
1262  mar_cpg_address_t notify_info;
1263 
1264  notify_info.pid = pid;
1265  notify_info.nodeid = nodeid;
1266  notify_info.reason = reason;
1267 
1268  notify_lib_joinlist(name,
1269  0, NULL,
1270  1, &notify_info,
1272 
1273  qb_list_for_each_safe(iter, tmp_iter, &process_info_list_head) {
1274  pi = qb_list_entry(iter, struct process_info, list);
1275 
1276  if (pi->pid == pid && pi->nodeid == nodeid &&
1277  mar_name_compare (&pi->group, name)==0) {
1278  qb_list_del (&pi->list);
1279  free (pi);
1280  }
1281  }
1282 }
1283 
1284 static void message_handler_req_exec_cpg_downlist_old (
1285  const void *message,
1286  unsigned int nodeid)
1287 {
1288  log_printf (LOGSYS_LEVEL_DEBUG, "downlist OLD from node " CS_PRI_NODE_ID,
1289  nodeid);
1290 }
1291 
1292 static void message_handler_req_exec_cpg_downlist(
1293  const void *message,
1294  unsigned int nodeid)
1295 {
1296  const struct req_exec_cpg_downlist *req_exec_cpg_downlist = message;
1297 
1298  log_printf (LOGSYS_LEVEL_DEBUG, "downlist left_list: %d received",
1299  req_exec_cpg_downlist->left_nodes);
1300 }
1301 
1302 
1303 static void message_handler_req_exec_cpg_procjoin (
1304  const void *message,
1305  unsigned int nodeid)
1306 {
1307  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1308 
1309  log_printf(LOGSYS_LEVEL_DEBUG, "got procjoin message from cluster node " CS_PRI_NODE_ID " (%s) for pid %u",
1310  nodeid,
1311  api->totem_ifaces_print(nodeid),
1312  (unsigned int)req_exec_cpg_procjoin->pid);
1313 
1314  do_proc_join (&req_exec_cpg_procjoin->group_name,
1316  CONFCHG_CPG_REASON_JOIN, NULL);
1317 }
1318 
1319 static void message_handler_req_exec_cpg_procleave (
1320  const void *message,
1321  unsigned int nodeid)
1322 {
1323  const struct req_exec_cpg_procjoin *req_exec_cpg_procjoin = message;
1324 
1325  log_printf(LOGSYS_LEVEL_DEBUG, "got procleave message from cluster node " CS_PRI_NODE_ID " (%s) for pid %u",
1326  nodeid,
1327  api->totem_ifaces_print(nodeid),
1328  (unsigned int)req_exec_cpg_procjoin->pid);
1329 
1330  do_proc_leave (&req_exec_cpg_procjoin->group_name,
1332  req_exec_cpg_procjoin->reason);
1333 }
1334 
1335 
1336 /* Got a proclist from another node */
1337 static void message_handler_req_exec_cpg_joinlist (
1338  const void *message_v,
1339  unsigned int nodeid)
1340 {
1341  const char *message = message_v;
1342  const struct qb_ipc_response_header *res = (const struct qb_ipc_response_header *)message;
1343  const struct join_list_entry *jle = (const struct join_list_entry *)(message + sizeof(struct qb_ipc_response_header));
1344  struct joinlist_msg *stored_msg;
1345 
1346  log_printf(LOGSYS_LEVEL_DEBUG, "got joinlist message from node " CS_PRI_NODE_ID,
1347  nodeid);
1348 
1349  while ((const char*)jle < message + res->size) {
1350  stored_msg = malloc (sizeof (struct joinlist_msg));
1351  memset(stored_msg, 0, sizeof (struct joinlist_msg));
1352  stored_msg->sender_nodeid = nodeid;
1353  stored_msg->pid = jle->pid;
1354  memcpy(&stored_msg->group_name, &jle->group_name, sizeof(mar_cpg_name_t));
1355  qb_list_init (&stored_msg->list);
1356  qb_list_add (&stored_msg->list, &joinlist_messages_head);
1357  jle++;
1358  }
1359 }
1360 
1361 static void message_handler_req_exec_cpg_mcast (
1362  const void *message,
1363  unsigned int nodeid)
1364 {
1365  const struct req_exec_cpg_mcast *req_exec_cpg_mcast = message;
1367  int msglen = req_exec_cpg_mcast->msglen;
1368  struct qb_list_head *iter, *pi_iter, *tmp_iter;
1369  struct cpg_pd *cpd;
1370  struct iovec iovec[2];
1371  int known_node = 0;
1372 
1374  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1375  res_lib_cpg_mcast.msglen = msglen;
1377  res_lib_cpg_mcast.nodeid = nodeid;
1378 
1379  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1380  sizeof(mar_cpg_name_t));
1381  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1382  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1383 
1384  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1385  iovec[1].iov_len = msglen;
1386 
1387  qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1388  cpd = qb_list_entry(iter, struct cpg_pd, list);
1390  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1391 
1392  if (!known_node) {
1393  /* Try to find, if we know the node */
1394  qb_list_for_each(pi_iter, &process_info_list_head) {
1395  struct process_info *pi = qb_list_entry (pi_iter, struct process_info, list);
1396 
1397  if (pi->nodeid == nodeid &&
1398  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1399  known_node = 1;
1400  break;
1401  }
1402  }
1403  }
1404 
1405  if (!known_node) {
1406  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1407  return ;
1408  }
1409 
1410  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1411  }
1412  }
1413 }
1414 
1415 static void message_handler_req_exec_cpg_partial_mcast (
1416  const void *message,
1417  unsigned int nodeid)
1418 {
1419  const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
1421  int msglen = req_exec_cpg_mcast->fraglen;
1422  struct qb_list_head *iter, *pi_iter, *tmp_iter;
1423  struct cpg_pd *cpd;
1424  struct iovec iovec[2];
1425  int known_node = 0;
1426 
1427  log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node " CS_PRI_NODE_ID ", size = %d bytes\n", nodeid, msglen);
1428 
1430  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
1431  res_lib_cpg_mcast.fraglen = msglen;
1432  res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
1435  res_lib_cpg_mcast.nodeid = nodeid;
1436 
1437  memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
1438  sizeof(mar_cpg_name_t));
1439  iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
1440  iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
1441 
1442  iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
1443  iovec[1].iov_len = msglen;
1444 
1445  qb_list_for_each_safe(iter, tmp_iter, &cpg_pd_list_head) {
1446  cpd = qb_list_entry(iter, struct cpg_pd, list);
1447 
1449  && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
1450 
1451  if (!known_node) {
1452  /* Try to find, if we know the node */
1453  qb_list_for_each(pi_iter, &process_info_list_head) {
1454  struct process_info *pi = qb_list_entry (pi_iter, struct process_info, list);
1455 
1456  if (pi->nodeid == nodeid &&
1457  mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
1458  known_node = 1;
1459  break;
1460  }
1461  }
1462  }
1463 
1464  if (!known_node) {
1465  log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
1466  return ;
1467  }
1468 
1469  api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
1470  }
1471  }
1472 }
1473 
1474 
1475 static int cpg_exec_send_downlist(void)
1476 {
1477  struct iovec iov;
1478 
1479  g_req_exec_cpg_downlist.header.id = SERVICE_ID_MAKE(CPG_SERVICE, MESSAGE_REQ_EXEC_CPG_DOWNLIST);
1480  g_req_exec_cpg_downlist.header.size = sizeof(struct req_exec_cpg_downlist);
1481 
1482  g_req_exec_cpg_downlist.old_members = my_old_member_list_entries;
1483 
1484  iov.iov_base = (void *)&g_req_exec_cpg_downlist;
1485  iov.iov_len = g_req_exec_cpg_downlist.header.size;
1486 
1487  return (api->totem_mcast (&iov, 1, TOTEM_AGREED));
1488 }
1489 
1490 static int cpg_exec_send_joinlist(void)
1491 {
1492  int count = 0;
1493  struct qb_list_head *iter;
1494  struct qb_ipc_response_header *res;
1495  char *buf;
1496  size_t buf_size;
1497  struct join_list_entry *jle;
1498  struct iovec req_exec_cpg_iovec;
1499 
1500  qb_list_for_each(iter, &process_info_list_head) {
1501  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1502 
1503  if (pi->nodeid == api->totem_nodeid_get ()) {
1504  count++;
1505  }
1506  }
1507 
1508  /* Nothing to send */
1509  if (!count)
1510  return 0;
1511 
1512  buf_size = sizeof(struct qb_ipc_response_header) + sizeof(struct join_list_entry) * count;
1513  buf = alloca(buf_size);
1514  if (!buf) {
1515  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate joinlist buffer");
1516  return -1;
1517  }
1518  memset(buf, 0, buf_size);
1519 
1520  jle = (struct join_list_entry *)(buf + sizeof(struct qb_ipc_response_header));
1521  res = (struct qb_ipc_response_header *)buf;
1522 
1523  qb_list_for_each(iter, &process_info_list_head) {
1524  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1525 
1526  if (pi->nodeid == api->totem_nodeid_get ()) {
1527  memcpy (&jle->group_name, &pi->group, sizeof (mar_cpg_name_t));
1528  jle->pid = pi->pid;
1529  jle++;
1530  }
1531  }
1532 
1534  res->size = sizeof(struct qb_ipc_response_header)+sizeof(struct join_list_entry) * count;
1535 
1536  req_exec_cpg_iovec.iov_base = buf;
1537  req_exec_cpg_iovec.iov_len = res->size;
1538 
1539  return (api->totem_mcast (&req_exec_cpg_iovec, 1, TOTEM_AGREED));
1540 }
1541 
1542 static int cpg_lib_init_fn (void *conn)
1543 {
1544  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1545  memset (cpd, 0, sizeof(struct cpg_pd));
1546  cpd->conn = conn;
1547  qb_list_add (&cpd->list, &cpg_pd_list_head);
1548 
1549  qb_list_init (&cpd->iteration_instance_list_head);
1550  qb_list_init (&cpd->zcb_mapped_list_head);
1551 
1552  api->ipc_refcnt_inc (conn);
1553  log_printf(LOGSYS_LEVEL_DEBUG, "lib_init_fn: conn=%p, cpd=%p", conn, cpd);
1554  return (0);
1555 }
1556 
1557 /* Join message from the library */
1558 static void message_handler_req_lib_cpg_join (void *conn, const void *message)
1559 {
1560  const struct req_lib_cpg_join *req_lib_cpg_join = message;
1561  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1563  cs_error_t error = CS_OK;
1564  struct qb_list_head *iter;
1565 
1566  /* Test, if we don't have same pid and group name joined */
1567  qb_list_for_each(iter, &cpg_pd_list_head) {
1568  struct cpg_pd *cpd_item = qb_list_entry (iter, struct cpg_pd, list);
1569 
1570  if (cpd_item->pid == req_lib_cpg_join->pid &&
1571  mar_name_compare(&req_lib_cpg_join->group_name, &cpd_item->group_name) == 0) {
1572 
1573  /* We have same pid and group name joined -> return error */
1574  error = CS_ERR_EXIST;
1575  goto response_send;
1576  }
1577  }
1578 
1579  /*
1580  * Same check must be done in process info list, because there may be not yet delivered
1581  * leave of client.
1582  */
1583  qb_list_for_each(iter, &process_info_list_head) {
1584  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
1585 
1586  if (pi->nodeid == api->totem_nodeid_get () && pi->pid == req_lib_cpg_join->pid &&
1587  mar_name_compare(&req_lib_cpg_join->group_name, &pi->group) == 0) {
1588  /* We have same pid and group name joined -> return error */
1589  error = CS_ERR_TRY_AGAIN;
1590  goto response_send;
1591  }
1592  }
1593 
1594  if (req_lib_cpg_join->group_name.length > CPG_MAX_NAME_LENGTH) {
1595  error = CS_ERR_NAME_TOO_LONG;
1596  goto response_send;
1597  }
1598 
1599  switch (cpd->cpd_state) {
1600  case CPD_STATE_UNJOINED:
1601  error = CS_OK;
1603  cpd->pid = req_lib_cpg_join->pid;
1604  cpd->flags = req_lib_cpg_join->flags;
1605  memcpy (&cpd->group_name, &req_lib_cpg_join->group_name,
1606  sizeof (cpd->group_name));
1607 
1608  cpg_node_joinleave_send (req_lib_cpg_join->pid,
1609  &req_lib_cpg_join->group_name,
1611  break;
1613  error = CS_ERR_BUSY;
1614  break;
1616  error = CS_ERR_EXIST;
1617  break;
1619  error = CS_ERR_EXIST;
1620  break;
1621  }
1622 
1623 response_send:
1624  res_lib_cpg_join.header.size = sizeof(res_lib_cpg_join);
1626  res_lib_cpg_join.header.error = error;
1627  api->ipc_response_send (conn, &res_lib_cpg_join, sizeof(res_lib_cpg_join));
1628 }
1629 
1630 /* Leave message from the library */
1631 static void message_handler_req_lib_cpg_leave (void *conn, const void *message)
1632 {
1634  cs_error_t error = CS_OK;
1635  struct req_lib_cpg_leave *req_lib_cpg_leave = (struct req_lib_cpg_leave *)message;
1636  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1637 
1638  log_printf(LOGSYS_LEVEL_DEBUG, "got leave request on %p", conn);
1639 
1640  switch (cpd->cpd_state) {
1641  case CPD_STATE_UNJOINED:
1642  error = CS_ERR_NOT_EXIST;
1643  break;
1645  error = CS_ERR_NOT_EXIST;
1646  break;
1648  error = CS_ERR_BUSY;
1649  break;
1651  error = CS_OK;
1653  cpg_node_joinleave_send (req_lib_cpg_leave->pid,
1654  &req_lib_cpg_leave->group_name,
1657  break;
1658  }
1659 
1660  /* send return */
1661  res_lib_cpg_leave.header.size = sizeof(res_lib_cpg_leave);
1663  res_lib_cpg_leave.header.error = error;
1665 }
1666 
1667 /* Finalize message from library */
1668 static void message_handler_req_lib_cpg_finalize (
1669  void *conn,
1670  const void *message)
1671 {
1672  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1674  cs_error_t error = CS_OK;
1675 
1676  log_printf (LOGSYS_LEVEL_DEBUG, "cpg finalize for conn=%p", conn);
1677 
1678  /*
1679  * We will just remove cpd from list. After this call, connection will be
1680  * closed on lib side, and cpg_lib_exit_fn will be called
1681  */
1682  qb_list_del (&cpd->list);
1683  qb_list_init (&cpd->list);
1684 
1685  res_lib_cpg_finalize.header.size = sizeof (res_lib_cpg_finalize);
1687  res_lib_cpg_finalize.header.error = error;
1688 
1690  sizeof (res_lib_cpg_finalize));
1691 }
1692 
1693 static int
1694 memory_map (
1695  const char *path,
1696  size_t bytes,
1697  void **buf)
1698 {
1699  int32_t fd;
1700  void *addr;
1701  int32_t res;
1702 
1703  fd = open (path, O_RDWR, 0600);
1704 
1705  unlink (path);
1706 
1707  if (fd == -1) {
1708  return (-1);
1709  }
1710 
1711  res = ftruncate (fd, bytes);
1712  if (res == -1) {
1713  goto error_close_unlink;
1714  }
1715 
1716  addr = mmap (NULL, bytes, PROT_READ | PROT_WRITE,
1717  MAP_SHARED, fd, 0);
1718 
1719  if (addr == MAP_FAILED) {
1720  goto error_close_unlink;
1721  }
1722 #ifdef MADV_NOSYNC
1723  madvise(addr, bytes, MADV_NOSYNC);
1724 #endif
1725 
1726  res = close (fd);
1727  if (res) {
1728  munmap (addr, bytes);
1729  return (-1);
1730  }
1731  *buf = addr;
1732  return (0);
1733 
1734 error_close_unlink:
1735  close (fd);
1736  unlink(path);
1737  return -1;
1738 }
1739 
1740 static inline int zcb_alloc (
1741  struct cpg_pd *cpd,
1742  const char *path_to_file,
1743  size_t size,
1744  void **addr)
1745 {
1746  struct zcb_mapped *zcb_mapped;
1747  unsigned int res;
1748 
1749  zcb_mapped = malloc (sizeof (struct zcb_mapped));
1750  if (zcb_mapped == NULL) {
1751  return (-1);
1752  }
1753 
1754  res = memory_map (
1755  path_to_file,
1756  size,
1757  addr);
1758  if (res == -1) {
1759  free (zcb_mapped);
1760  return (-1);
1761  }
1762 
1763  qb_list_init (&zcb_mapped->list);
1764  zcb_mapped->addr = *addr;
1765  zcb_mapped->size = size;
1766  qb_list_add_tail (&zcb_mapped->list, &cpd->zcb_mapped_list_head);
1767  return (0);
1768 }
1769 
1770 
1771 static inline int zcb_free (struct zcb_mapped *zcb_mapped)
1772 {
1773  unsigned int res;
1774 
1775  res = munmap (zcb_mapped->addr, zcb_mapped->size);
1776  qb_list_del (&zcb_mapped->list);
1777  free (zcb_mapped);
1778  return (res);
1779 }
1780 
1781 static inline int zcb_by_addr_free (struct cpg_pd *cpd, void *addr)
1782 {
1783  struct qb_list_head *list, *tmp_iter;
1784  struct zcb_mapped *zcb_mapped;
1785  unsigned int res = 0;
1786 
1787  qb_list_for_each_safe(list, tmp_iter, &(cpd->zcb_mapped_list_head)) {
1788  zcb_mapped = qb_list_entry (list, struct zcb_mapped, list);
1789 
1790  if (zcb_mapped->addr == addr) {
1791  res = zcb_free (zcb_mapped);
1792  break;
1793  }
1794 
1795  }
1796  return (res);
1797 }
1798 
1799 static inline int zcb_all_free (
1800  struct cpg_pd *cpd)
1801 {
1802  struct qb_list_head *list, *tmp_iter;
1803  struct zcb_mapped *zcb_mapped;
1804 
1805  qb_list_for_each_safe(list, tmp_iter, &(cpd->zcb_mapped_list_head)) {
1806  zcb_mapped = qb_list_entry (list, struct zcb_mapped, list);
1807 
1808  zcb_free (zcb_mapped);
1809  }
1810  return (0);
1811 }
1812 
1813 union u {
1814  uint64_t server_addr;
1815  void *server_ptr;
1816 };
1817 
1818 static uint64_t void2serveraddr (void *server_ptr)
1819 {
1820  union u u;
1821 
1823  return (u.server_addr);
1824 }
1825 
1826 static void *serveraddr2void (uint64_t server_addr)
1827 {
1828  union u u;
1829 
1831  return (u.server_ptr);
1832 };
1833 
1834 static void message_handler_req_lib_cpg_zc_alloc (
1835  void *conn,
1836  const void *message)
1837 {
1839  struct qb_ipc_response_header res_header;
1840  void *addr = NULL;
1841  struct coroipcs_zc_header *zc_header;
1842  unsigned int res;
1843  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1844 
1845  log_printf(LOGSYS_LEVEL_DEBUG, "path: %s", hdr->path_to_file);
1846 
1847  res = zcb_alloc (cpd, hdr->path_to_file, hdr->map_size,
1848  &addr);
1849  assert(res == 0);
1850 
1851  zc_header = (struct coroipcs_zc_header *)addr;
1852  zc_header->server_address = void2serveraddr(addr);
1853 
1854  res_header.size = sizeof (struct qb_ipc_response_header);
1855  res_header.id = 0;
1856  api->ipc_response_send (conn,
1857  &res_header,
1858  res_header.size);
1859 }
1860 
1861 static void message_handler_req_lib_cpg_zc_free (
1862  void *conn,
1863  const void *message)
1864 {
1866  struct qb_ipc_response_header res_header;
1867  void *addr = NULL;
1868  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1869 
1870  log_printf(LOGSYS_LEVEL_DEBUG, " free'ing");
1871 
1872  addr = serveraddr2void (hdr->server_address);
1873 
1874  zcb_by_addr_free (cpd, addr);
1875 
1876  res_header.size = sizeof (struct qb_ipc_response_header);
1877  res_header.id = 0;
1878  api->ipc_response_send (
1879  conn, &res_header,
1880  res_header.size);
1881 }
1882 
1883 /* Fragmented mcast message from the library */
1884 static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
1885 {
1886  const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
1887  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1889 
1890  struct iovec req_exec_cpg_iovec[2];
1893  int msglen = req_lib_cpg_mcast->fraglen;
1894  int result;
1895  cs_error_t error = CS_ERR_NOT_EXIST;
1896 
1897  log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
1898  log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
1899 
1900  switch (cpd->cpd_state) {
1901  case CPD_STATE_UNJOINED:
1902  error = CS_ERR_NOT_EXIST;
1903  break;
1905  error = CS_ERR_NOT_EXIST;
1906  break;
1908  error = CS_OK;
1909  break;
1911  error = CS_OK;
1912  break;
1913  }
1914 
1915  res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
1917 
1918  if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
1920  }
1922  error = CS_ERR_INTERRUPT;
1923  }
1924 
1925  if (error == CS_OK) {
1926  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1929  req_exec_cpg_mcast.pid = cpd->pid;
1930  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
1932  req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
1933  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1934  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1935  sizeof(mar_cpg_name_t));
1936 
1937  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1938  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1939  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1940  req_exec_cpg_iovec[1].iov_len = msglen;
1941 
1942  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
1943  assert(result == 0);
1944  } else {
1945  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
1946  conn, group_name.value, cpd->cpd_state, error);
1947  }
1948 
1949  res_lib_cpg_partial_send.header.error = error;
1951  sizeof (res_lib_cpg_partial_send));
1952 }
1953 
1954 /* Mcast message from the library */
1955 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
1956 {
1957  const struct req_lib_cpg_mcast *req_lib_cpg_mcast = message;
1958  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
1960 
1961  struct iovec req_exec_cpg_iovec[2];
1963  int msglen = req_lib_cpg_mcast->msglen;
1964  int result;
1965  cs_error_t error = CS_ERR_NOT_EXIST;
1966 
1967  log_printf(LOGSYS_LEVEL_TRACE, "got mcast request on %p", conn);
1968 
1969  switch (cpd->cpd_state) {
1970  case CPD_STATE_UNJOINED:
1971  error = CS_ERR_NOT_EXIST;
1972  break;
1974  error = CS_ERR_NOT_EXIST;
1975  break;
1977  error = CS_OK;
1978  break;
1980  error = CS_OK;
1981  break;
1982  }
1983 
1984  if (error == CS_OK) {
1985  memset(&req_exec_cpg_mcast, 0, sizeof(req_exec_cpg_mcast));
1986 
1987  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
1990  req_exec_cpg_mcast.pid = cpd->pid;
1991  req_exec_cpg_mcast.msglen = msglen;
1992  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
1993  memcpy(&req_exec_cpg_mcast.group_name, &group_name,
1994  sizeof(mar_cpg_name_t));
1995 
1996  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
1997  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
1998  req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
1999  req_exec_cpg_iovec[1].iov_len = msglen;
2000 
2001  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2002  assert(result == 0);
2003  } else {
2004  log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
2005  conn, group_name.value, cpd->cpd_state, error);
2006  }
2007 }
2008 
2009 static void message_handler_req_lib_cpg_zc_execute (
2010  void *conn,
2011  const void *message)
2012 {
2014  struct qb_ipc_request_header *header;
2016  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2017  struct iovec req_exec_cpg_iovec[2];
2020  int result;
2021  cs_error_t error = CS_ERR_NOT_EXIST;
2022 
2023  log_printf(LOGSYS_LEVEL_TRACE, "got ZC mcast request on %p", conn);
2024 
2025  header = (struct qb_ipc_request_header *)(((char *)serveraddr2void(hdr->server_address) + sizeof (struct coroipcs_zc_header)));
2027 
2028  switch (cpd->cpd_state) {
2029  case CPD_STATE_UNJOINED:
2030  error = CS_ERR_NOT_EXIST;
2031  break;
2033  error = CS_ERR_NOT_EXIST;
2034  break;
2036  error = CS_OK;
2037  break;
2039  error = CS_OK;
2040  break;
2041  }
2042 
2043  res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast);
2045  if (error == CS_OK) {
2046  req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + req_lib_cpg_mcast->msglen;
2049  req_exec_cpg_mcast.pid = cpd->pid;
2050  req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
2051  api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
2052  memcpy(&req_exec_cpg_mcast.group_name, &cpd->group_name,
2053  sizeof(mar_cpg_name_t));
2054 
2055  req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
2056  req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
2057  req_exec_cpg_iovec[1].iov_base = (char *)header + sizeof(struct req_lib_cpg_mcast);
2058  req_exec_cpg_iovec[1].iov_len = req_exec_cpg_mcast.msglen;
2059 
2060  result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
2061  if (result == 0) {
2062  res_lib_cpg_mcast.header.error = CS_OK;
2063  } else {
2064  res_lib_cpg_mcast.header.error = CS_ERR_TRY_AGAIN;
2065  }
2066  } else {
2067  res_lib_cpg_mcast.header.error = error;
2068  }
2069 
2070  api->ipc_response_send (conn, &res_lib_cpg_mcast,
2071  sizeof (res_lib_cpg_mcast));
2072 
2073 }
2074 
2075 static void message_handler_req_lib_cpg_membership (void *conn,
2076  const void *message)
2077 {
2079  (struct req_lib_cpg_membership_get *)message;
2081  struct qb_list_head *iter;
2082  int member_count = 0;
2083 
2085  res_lib_cpg_membership_get.header.error = CS_OK;
2086  res_lib_cpg_membership_get.header.size =
2087  sizeof (struct res_lib_cpg_membership_get);
2088 
2089  qb_list_for_each(iter, &process_info_list_head) {
2090  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
2091  if (mar_name_compare (&pi->group, &req_lib_cpg_membership_get->group_name) == 0) {
2092  res_lib_cpg_membership_get.member_list[member_count].nodeid = pi->nodeid;
2093  res_lib_cpg_membership_get.member_list[member_count].pid = pi->pid;
2094  member_count += 1;
2095  }
2096  }
2097  res_lib_cpg_membership_get.member_count = member_count;
2098 
2100  sizeof (res_lib_cpg_membership_get));
2101 }
2102 
2103 static void message_handler_req_lib_cpg_local_get (void *conn,
2104  const void *message)
2105 {
2107 
2108  res_lib_cpg_local_get.header.size = sizeof (res_lib_cpg_local_get);
2110  res_lib_cpg_local_get.header.error = CS_OK;
2111  res_lib_cpg_local_get.local_nodeid = api->totem_nodeid_get ();
2112 
2114  sizeof (res_lib_cpg_local_get));
2115 }
2116 
2117 static void message_handler_req_lib_cpg_iteration_initialize (
2118  void *conn,
2119  const void *message)
2120 {
2122  struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
2123  hdb_handle_t cpg_iteration_handle = 0;
2125  struct qb_list_head *iter, *iter2;
2127  cs_error_t error = CS_OK;
2128  int res;
2129 
2130  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration initialize");
2131 
2132  /* Because between calling this function and *next can be some operations which will
2133  * change list, we must do full copy.
2134  */
2135 
2136  /*
2137  * Create new iteration instance
2138  */
2139  res = hdb_handle_create (&cpg_iteration_handle_t_db, sizeof (struct cpg_iteration_instance),
2140  &cpg_iteration_handle);
2141 
2142  if (res != 0) {
2143  error = CS_ERR_NO_MEMORY;
2144  goto response_send;
2145  }
2146 
2147  res = hdb_handle_get (&cpg_iteration_handle_t_db, cpg_iteration_handle, (void *)&cpg_iteration_instance);
2148 
2149  if (res != 0) {
2150  error = CS_ERR_BAD_HANDLE;
2151  goto error_destroy;
2152  }
2153 
2154  qb_list_init (&cpg_iteration_instance->items_list_head);
2155  cpg_iteration_instance->handle = cpg_iteration_handle;
2156 
2157  /*
2158  * Create copy of process_info list "grouped by" group name
2159  */
2160  qb_list_for_each(iter, &process_info_list_head) {
2161  struct process_info *pi = qb_list_entry (iter, struct process_info, list);
2162  struct process_info *new_pi;
2163 
2165  /*
2166  * Try to find processed group name in our list new list
2167  */
2168  int found = 0;
2169 
2170  qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2171  struct process_info *pi2 = qb_list_entry (iter2, struct process_info, list);
2172 
2173  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2174  found = 1;
2175  break;
2176  }
2177  }
2178 
2179  if (found) {
2180  /*
2181  * We have this name in list -> don't add
2182  */
2183  continue ;
2184  }
2185  } else if (req_lib_cpg_iterationinitialize->iteration_type == CPG_ITERATION_ONE_GROUP) {
2186  /*
2187  * Test pi group name with request
2188  */
2189  if (mar_name_compare (&pi->group, &req_lib_cpg_iterationinitialize->group_name) != 0)
2190  /*
2191  * Not same -> don't add
2192  */
2193  continue ;
2194  }
2195 
2196  new_pi = malloc (sizeof (struct process_info));
2197  if (!new_pi) {
2198  log_printf(LOGSYS_LEVEL_WARNING, "Unable to allocate process_info struct");
2199 
2200  error = CS_ERR_NO_MEMORY;
2201 
2202  goto error_put_destroy;
2203  }
2204 
2205  memcpy (new_pi, pi, sizeof (struct process_info));
2206  qb_list_init (&new_pi->list);
2207 
2209  /*
2210  * pid and nodeid -> undefined
2211  */
2212  new_pi->pid = new_pi->nodeid = 0;
2213  }
2214 
2215  /*
2216  * We will return list "grouped" by "group name", so try to find right place to add
2217  */
2218  qb_list_for_each(iter2, &(cpg_iteration_instance->items_list_head)) {
2219  struct process_info *pi2 = qb_list_entry (iter2, struct process_info, list);
2220 
2221  if (mar_name_compare (&pi2->group, &pi->group) == 0) {
2222  break;
2223  }
2224  }
2225 
2226  qb_list_add (&new_pi->list, iter2);
2227  }
2228 
2229  /*
2230  * Now we have a full "grouped by" copy of process_info list
2231  */
2232 
2233  /*
2234  * Add instance to current cpd list
2235  */
2236  qb_list_init (&cpg_iteration_instance->list);
2238 
2240 
2241 error_put_destroy:
2242  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2243 error_destroy:
2244  if (error != CS_OK) {
2245  hdb_handle_destroy (&cpg_iteration_handle_t_db, cpg_iteration_handle);
2246  }
2247 
2248 response_send:
2251  res_lib_cpg_iterationinitialize.header.error = error;
2252  res_lib_cpg_iterationinitialize.iteration_handle = cpg_iteration_handle;
2253 
2256 }
2257 
2258 static void message_handler_req_lib_cpg_iteration_next (
2259  void *conn,
2260  const void *message)
2261 {
2262  const struct req_lib_cpg_iterationnext *req_lib_cpg_iterationnext = message;
2265  cs_error_t error = CS_OK;
2266  int res;
2267  struct process_info *pi;
2268 
2269  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration next");
2270 
2271  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2272  req_lib_cpg_iterationnext->iteration_handle,
2273  (void *)&cpg_iteration_instance);
2274 
2275  if (res != 0) {
2276  error = CS_ERR_LIBRARY;
2277  goto error_exit;
2278  }
2279 
2280  assert (cpg_iteration_instance);
2281 
2283 
2285  error = CS_ERR_NO_SECTIONS;
2286  goto error_put;
2287  }
2288 
2289  pi = qb_list_entry (cpg_iteration_instance->current_pointer, struct process_info, list);
2290 
2291  /*
2292  * Copy iteration data
2293  */
2294  res_lib_cpg_iterationnext.description.nodeid = pi->nodeid;
2295  res_lib_cpg_iterationnext.description.pid = pi->pid;
2296  memcpy (&res_lib_cpg_iterationnext.description.group,
2297  &pi->group,
2298  sizeof (mar_cpg_name_t));
2299 
2300 error_put:
2301  hdb_handle_put (&cpg_iteration_handle_t_db, req_lib_cpg_iterationnext->iteration_handle);
2302 error_exit:
2305  res_lib_cpg_iterationnext.header.error = error;
2306 
2308  sizeof (res_lib_cpg_iterationnext));
2309 }
2310 
2311 static void message_handler_req_lib_cpg_iteration_finalize (
2312  void *conn,
2313  const void *message)
2314 {
2318  cs_error_t error = CS_OK;
2319  int res;
2320 
2321  log_printf (LOGSYS_LEVEL_DEBUG, "cpg iteration finalize");
2322 
2323  res = hdb_handle_get (&cpg_iteration_handle_t_db,
2324  req_lib_cpg_iterationfinalize->iteration_handle,
2325  (void *)&cpg_iteration_instance);
2326 
2327  if (res != 0) {
2328  error = CS_ERR_LIBRARY;
2329  goto error_exit;
2330  }
2331 
2332  assert (cpg_iteration_instance);
2333 
2334  cpg_iteration_instance_finalize (cpg_iteration_instance);
2335  hdb_handle_put (&cpg_iteration_handle_t_db, cpg_iteration_instance->handle);
2336 
2337 error_exit:
2340  res_lib_cpg_iterationfinalize.header.error = error;
2341 
2344 }
#define SERVICE_ID_MAKE(a, b)
Definition: coroapi.h:458
@ CS_LIB_ALLOW_INQUORATE
Definition: coroapi.h:164
unsigned int nodeid
Definition: coroapi.h:0
unsigned char addr[TOTEMIP_ADDRLEN]
Definition: coroapi.h:2
@ CS_LIB_FLOW_CONTROL_REQUIRED
Definition: coroapi.h:152
@ CS_LIB_FLOW_CONTROL_NOT_REQUIRED
Definition: coroapi.h:153
#define TOTEM_AGREED
Definition: coroapi.h:102
#define PROCESSOR_COUNT_MAX
Definition: coroapi.h:96
@ CPG_SERVICE
Definition: corodefs.h:46
#define CS_PRI_NODE_ID
Definition: corotypes.h:59
cs_error_t
The cs_error_t enum.
Definition: corotypes.h:97
@ CS_ERR_NO_SECTIONS
Definition: corotypes.h:124
@ CS_ERR_NAME_TOO_LONG
Definition: corotypes.h:110
@ CS_ERR_NO_MEMORY
Definition: corotypes.h:105
@ CS_ERR_BUSY
Definition: corotypes.h:107
@ CS_ERR_BAD_HANDLE
Definition: corotypes.h:106
@ CS_ERR_TRY_AGAIN
Definition: corotypes.h:103
@ CS_OK
Definition: corotypes.h:98
@ CS_ERR_LIBRARY
Definition: corotypes.h:99
@ CS_ERR_INTERRUPT
Definition: corotypes.h:113
@ CS_ERR_NOT_EXIST
Definition: corotypes.h:109
@ CS_ERR_EXIST
Definition: corotypes.h:111
struct corosync_service_engine cpg_service_engine
Definition: exec/cpg.c:433
QB_LIST_DECLARE(cpg_pd_list_head)
cpg_message_req_types
Definition: exec/cpg.c:79
@ MESSAGE_REQ_EXEC_CPG_JOINLIST
Definition: exec/cpg.c:82
@ MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST
Definition: exec/cpg.c:86
@ MESSAGE_REQ_EXEC_CPG_PROCLEAVE
Definition: exec/cpg.c:81
@ MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD
Definition: exec/cpg.c:84
@ MESSAGE_REQ_EXEC_CPG_MCAST
Definition: exec/cpg.c:83
@ MESSAGE_REQ_EXEC_CPG_PROCJOIN
Definition: exec/cpg.c:80
@ MESSAGE_REQ_EXEC_CPG_DOWNLIST
Definition: exec/cpg.c:85
DECLARE_HDB_DATABASE(cpg_iteration_handle_t_db, NULL)
struct corosync_service_engine * cpg_get_service_engine_ver0(void)
Definition: exec/cpg.c:454
cpd_state
Definition: exec/cpg.c:131
@ CPD_STATE_JOIN_STARTED
Definition: exec/cpg.c:134
@ CPD_STATE_LEAVE_STARTED
Definition: exec/cpg.c:133
@ CPD_STATE_UNJOINED
Definition: exec/cpg.c:132
@ CPD_STATE_JOIN_COMPLETED
Definition: exec/cpg.c:135
LOGSYS_DECLARE_SUBSYS("CPG")
cpg_sync_state
Definition: exec/cpg.c:138
@ CPGSYNC_DOWNLIST
Definition: exec/cpg.c:139
@ CPGSYNC_JOINLIST
Definition: exec/cpg.c:140
#define CPG_MAX_NAME_LENGTH
Definition: cpg.h:116
#define CPG_MODEL_V1_DELIVER_INITIAL_TOTEM_CONF
Definition: cpg.h:193
#define CPG_MEMBERS_MAX
Definition: cpg.h:125
@ CPG_ITERATION_ONE_GROUP
Definition: cpg.h:96
@ CPG_ITERATION_NAME_ONLY
Definition: cpg.h:95
@ CPG_REASON_UNDEFINED
Definition: cpg.h:83
qb_handle_t hdb_handle_t
Definition: hdb.h:52
@ LIBCPG_PARTIAL_FIRST
Definition: ipc_cpg.h:104
@ MESSAGE_RES_CPG_ITERATIONFINALIZE
Definition: ipc_cpg.h:79
@ MESSAGE_RES_CPG_MEMBERSHIP
Definition: ipc_cpg.h:71
@ MESSAGE_RES_CPG_LEAVE
Definition: ipc_cpg.h:69
@ MESSAGE_RES_CPG_MCAST
Definition: ipc_cpg.h:70
@ MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK
Definition: ipc_cpg.h:85
@ MESSAGE_RES_CPG_FINALIZE
Definition: ipc_cpg.h:80
@ MESSAGE_RES_CPG_DELIVER_CALLBACK
Definition: ipc_cpg.h:73
@ MESSAGE_RES_CPG_TOTEM_CONFCHG_CALLBACK
Definition: ipc_cpg.h:81
@ MESSAGE_RES_CPG_ITERATIONNEXT
Definition: ipc_cpg.h:78
@ MESSAGE_RES_CPG_CONFCHG_CALLBACK
Definition: ipc_cpg.h:72
@ MESSAGE_RES_CPG_PARTIAL_SEND
Definition: ipc_cpg.h:86
@ MESSAGE_RES_CPG_ITERATIONINITIALIZE
Definition: ipc_cpg.h:77
@ MESSAGE_RES_CPG_LOCAL_GET
Definition: ipc_cpg.h:75
@ MESSAGE_RES_CPG_JOIN
Definition: ipc_cpg.h:68
@ CONFCHG_CPG_REASON_LEAVE
Definition: ipc_cpg.h:94
@ CONFCHG_CPG_REASON_PROCDOWN
Definition: ipc_cpg.h:97
@ CONFCHG_CPG_REASON_NODEUP
Definition: ipc_cpg.h:96
@ CONFCHG_CPG_REASON_JOIN
Definition: ipc_cpg.h:93
@ CONFCHG_CPG_REASON_NODEDOWN
Definition: ipc_cpg.h:95
#define LOGSYS_LEVEL_ERROR
Definition: logsys.h:72
#define log_printf(level, format, args...)
Definition: logsys.h:323
#define LOGSYS_LEVEL_WARNING
Definition: logsys.h:73
#define LOGSYS_LEVEL_DEBUG
Definition: logsys.h:76
#define LOGSYS_LEVEL_TRACE
Definition: logsys.h:77
uint32_t mar_uint32_t
Definition: mar_gen.h:53
uint8_t mar_uint8_t
Definition: mar_gen.h:51
coroipcs_zc_header struct
Definition: ipc_cpg.h:498
uint64_t server_address
Definition: ipc_cpg.h:500
The corosync_api_v1 struct.
Definition: coroapi.h:225
int(* totem_mcast)(const struct iovec *iovec, unsigned int iov_len, unsigned int guarantee)
Definition: coroapi.h:279
unsigned int(* totem_nodeid_get)(void)
Definition: coroapi.h:275
void(* ipc_refcnt_dec)(void *conn)
Definition: coroapi.h:270
const char *(* totem_ifaces_print)(unsigned int nodeid)
Definition: coroapi.h:290
void(* ipc_source_set)(mar_message_source_t *source, void *conn)
Definition: coroapi.h:252
void(* ipc_refcnt_inc)(void *conn)
Definition: coroapi.h:268
int(* ipc_dispatch_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:263
int(* ipc_response_send)(void *conn, const void *msg, size_t mlen)
Definition: coroapi.h:258
int(* ipc_dispatch_iov_send)(void *conn, const struct iovec *iov, unsigned int iov_len)
Definition: coroapi.h:265
void *(* ipc_private_data_get)(void *conn)
Definition: coroapi.h:256
The corosync_exec_handler struct.
Definition: coroapi.h:475
void(* exec_handler_fn)(const void *msg, unsigned int nodeid)
Definition: coroapi.h:476
The corosync_lib_handler struct.
Definition: coroapi.h:467
void(* lib_handler_fn)(void *conn, const void *msg)
Definition: coroapi.h:468
The corosync_service_engine struct.
Definition: coroapi.h:490
const char * name
Definition: coroapi.h:491
hdb_handle_t handle
Definition: exec/cpg.c:160
struct qb_list_head list
Definition: exec/cpg.c:161
struct qb_list_head items_list_head
Definition: exec/cpg.c:162
struct qb_list_head * current_pointer
Definition: exec/cpg.c:163
The cpg_name struct.
Definition: cpg.h:120
enum cpd_state cpd_state
Definition: exec/cpg.c:149
uint32_t pid
Definition: exec/cpg.c:148
uint64_t transition_counter
Definition: exec/cpg.c:152
int initial_totem_conf_sent
Definition: exec/cpg.c:151
mar_cpg_name_t group_name
Definition: exec/cpg.c:147
struct qb_list_head iteration_instance_list_head
Definition: exec/cpg.c:155
uint64_t initial_transition_counter
Definition: exec/cpg.c:153
struct qb_list_head list
Definition: exec/cpg.c:154
struct qb_list_head zcb_mapped_list_head
Definition: exec/cpg.c:156
void * conn
Definition: exec/cpg.c:146
unsigned int flags
Definition: exec/cpg.c:150
mar_cpg_address_t join_list[CPG_MEMBERS_MAX]
Definition: exec/cpg.c:199
mar_cpg_name_t cpg_group
Definition: exec/cpg.c:198
Definition: exec/cpg.c:192
uint32_t pid
Definition: exec/cpg.c:193
mar_cpg_name_t group_name
Definition: exec/cpg.c:194
mar_uint32_t sender_nodeid
Definition: exec/cpg.c:502
uint32_t pid
Definition: exec/cpg.c:503
struct qb_list_head list
Definition: exec/cpg.c:505
mar_cpg_name_t group_name
Definition: exec/cpg.c:504
mar_cpg_address_t struct
Definition: ipc_cpg.h:155
mar_cpg_name_t struct
Definition: ipc_cpg.h:112
mar_cpg_ring_id_t struct
Definition: ipc_cpg.h:230
The mar_message_source_t struct.
Definition: coroapi.h:50
mar_req_coroipcc_zc_alloc_t struct
Definition: ipc_cpg.h:472
mar_req_coroipcc_zc_execute_t struct
Definition: ipc_cpg.h:490
mar_req_coroipcc_zc_free_t struct
Definition: ipc_cpg.h:481
The memb_ring_id struct.
Definition: coroapi.h:122
unsigned long long seq
Definition: coroapi.h:124
unsigned int nodeid
Definition: coroapi.h:123
struct qb_list_head list
Definition: exec/cpg.c:188
uint32_t pid
Definition: exec/cpg.c:186
mar_cpg_name_t group
Definition: exec/cpg.c:187
unsigned int nodeid
Definition: exec/cpg.c:185
struct qb_ipc_request_header header __attribute__((aligned(8)))
mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)))
mar_uint32_t left_nodes __attribute__((aligned(8)))
mar_uint32_t old_members __attribute__((aligned(8)))
mar_uint32_t left_nodes __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
mar_uint32_t nodeids[PROCESSOR_COUNT_MAX] __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
mar_cpg_name_t group_name __attribute__((aligned(8)))
mar_uint32_t pid __attribute__((aligned(8)))
mar_uint32_t msglen __attribute__((aligned(8)))
mar_message_source_t source __attribute__((aligned(8)))
mar_uint8_t message[] __attribute__((aligned(8)))
mar_uint32_t type __attribute__((aligned(8)))
mar_message_source_t source __attribute__((aligned(8)))
mar_cpg_name_t group_name __attribute__((aligned(8)))
mar_uint32_t pid __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
mar_uint32_t msglen __attribute__((aligned(8)))
mar_uint8_t message[] __attribute__((aligned(8)))
mar_uint32_t fraglen __attribute__((aligned(8)))
mar_uint32_t pid __attribute__((aligned(8)))
mar_uint32_t reason __attribute__((aligned(8)))
struct qb_ipc_request_header header __attribute__((aligned(8)))
mar_cpg_name_t group_name __attribute__((aligned(8)))
The req_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:457
The req_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:424
The req_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:441
The req_lib_cpg_join struct.
Definition: ipc_cpg.h:251
The req_lib_cpg_leave struct.
Definition: ipc_cpg.h:408
The req_lib_cpg_mcast struct.
Definition: ipc_cpg.h:304
The req_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:367
The req_lib_cpg_partial_mcast struct.
Definition: ipc_cpg.h:314
The res_lib_cpg_confchg_callback struct.
Definition: ipc_cpg.h:384
mar_cpg_address_t member_list[]
Definition: ipc_cpg.h:390
Message from another node.
Definition: ipc_cpg.h:333
The res_lib_cpg_finalize struct.
Definition: ipc_cpg.h:275
The res_lib_cpg_iterationfinalize struct.
Definition: ipc_cpg.h:465
The res_lib_cpg_iterationinitialize struct.
Definition: ipc_cpg.h:433
The res_lib_cpg_iterationnext struct.
Definition: ipc_cpg.h:449
The res_lib_cpg_join struct.
Definition: ipc_cpg.h:261
The res_lib_cpg_leave struct.
Definition: ipc_cpg.h:417
The res_lib_cpg_local_get struct.
Definition: ipc_cpg.h:289
The res_lib_cpg_mcast struct.
Definition: ipc_cpg.h:326
The res_lib_cpg_membership_get struct.
Definition: ipc_cpg.h:375
mar_cpg_address_t member_list[PROCESSOR_COUNT_MAX]
Definition: ipc_cpg.h:378
The res_lib_cpg_partial_deliver_callback struct.
Definition: ipc_cpg.h:345
The res_lib_cpg_partial_send struct.
Definition: ipc_cpg.h:297
The res_lib_cpg_totem_confchg_callback struct.
Definition: ipc_cpg.h:398
size_t size
Definition: exec/cpg.c:92
void * addr
Definition: exec/cpg.c:91
struct qb_list_head list
Definition: exec/cpg.c:90
#define swab32(x)
The swab32 macro.
Definition: swab.h:51
char type
Definition: totem.h:2
struct memb_ring_id ring_id
Definition: totemsrp.c:4
struct totem_message_header header
Definition: totemsrp.c:0
Definition: exec/cpg.c:1813
uint64_t server_addr
Definition: exec/cpg.c:1814
void * server_ptr
Definition: exec/cpg.c:1815