root/daemons/attrd/attrd_sync.c

/* [previous][next][first][last][top][bottom][index][help] */

DEFINITIONS

This source file includes following definitions.
  1. next_key
  2. free_waitlist_node
  3. sync_point_str
  4. attrd_add_client_to_waitlist
  5. attrd_free_waitlist
  6. attrd_remove_client_from_waitlist
  7. attrd_ack_waitlist_clients
  8. attrd_cluster_sync_point_update
  9. attrd_request_sync_point
  10. attrd_request_has_sync_point
  11. free_action
  12. confirmation_timeout_cb
  13. attrd_do_not_expect_from_peer
  14. attrd_do_not_wait_for_client
  15. attrd_expect_confirmations
  16. attrd_free_confirmations
  17. attrd_handle_confirmation

   1 /*
   2  * Copyright 2022-2023 the Pacemaker project contributors
   3  *
   4  * The version control history for this file may have further details.
   5  *
   6  * This source code is licensed under the GNU General Public License version 2
   7  * or later (GPLv2+) WITHOUT ANY WARRANTY.
   8  */
   9 
#include <crm_internal.h>

#include <limits.h>

#include <crm/msg_xml.h>
#include <crm/common/attrd_internal.h>

#include "pacemaker-attrd.h"
  16 
/* A hash table storing clients that are waiting on a sync point to be reached.
 * The key is waitlist_client - just a plain int.  The obvious key would be
 * the IPC client's ID, but this is not guaranteed to be unique.  A single client
 * could be waiting on a sync point for multiple attributes at the same time.
 *
 * The values are struct waitlist_node, freed by free_waitlist_node() when an
 * entry is removed or the table is destroyed.
 *
 * It is not expected that this hash table will ever be especially large.
 */
static GHashTable *waitlist = NULL;

/* The most recently assigned waitlist key.  next_key() advances it.  The
 * resulting value is also written into the request XML as its callid, so the
 * eventual ACK can be matched back to its waitlist entry.
 */
static int waitlist_client = 0;

/* One waitlist entry: a client whose ACK is being held until a sync point */
struct waitlist_node {
    /* What kind of sync point does this node describe? */
    enum attrd_sync_point sync_point;

    /* Information required to construct and send a reply to the client.
     * client_id is a heap-allocated copy owned by this node.
     */
    char *client_id;
    uint32_t ipc_id;
    uint32_t flags;
};
  36 
  37 /* A hash table storing information on in-progress IPC requests that are awaiting
  38  * confirmations.  These requests are currently being processed by peer attrds and
  39  * we are waiting to receive confirmation messages from each peer indicating that
  40  * processing is complete.
  41  *
  42  * Multiple requests could be waiting on confirmations at the same time.
  43  *
  44  * The key is the unique callid for the IPC request, and the value is a
  45  * confirmation_action struct.
  46  */
  47 static GHashTable *expected_confirmations = NULL;
  48 
  49 /*!
  50  * \internal
  51  * \brief A structure describing a single IPC request that is awaiting confirmations
  52  */
  53 struct confirmation_action {
  54     /*!
  55      * \brief A list of peer attrds that we are waiting to receive confirmation
  56      *        messages from
  57      *
  58      * This list is dynamic - as confirmations arrive from peer attrds, they will
  59      * be removed from this list.  When the list is empty, all peers have processed
  60      * the request and the associated confirmation action will be taken.
  61      */
  62     GList *respondents;
  63 
  64     /*!
  65      * \brief A timer that will be used to remove the client should it time out
  66      *        before receiving all confirmations
  67      */
  68     mainloop_timer_t *timer;
  69 
  70     /*!
  71      * \brief A function to run when all confirmations have been received
  72      */
  73     attrd_confirmation_action_fn fn;
  74 
  75     /*!
  76      * \brief Information required to construct and send a reply to the client
  77      */
  78     char *client_id;
  79     uint32_t ipc_id;
  80     uint32_t flags;
  81 
  82     /*!
  83      * \brief The XML request containing the callid associated with this action
  84      */
  85     void *xml;
  86 };
  87 
  88 static void
  89 next_key(void)
     /* [previous][next][first][last][top][bottom][index][help] */
  90 {
  91     do {
  92         waitlist_client++;
  93         if (waitlist_client < 0) {
  94             waitlist_client = 1;
  95         }
  96     } while (g_hash_table_contains(waitlist, GINT_TO_POINTER(waitlist_client)));
  97 }
  98 
  99 static void
 100 free_waitlist_node(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 101 {
 102     struct waitlist_node *wl = (struct waitlist_node *) data;
 103 
 104     free(wl->client_id);
 105     free(wl);
 106 }
 107 
 108 static const char *
 109 sync_point_str(enum attrd_sync_point sync_point)
     /* [previous][next][first][last][top][bottom][index][help] */
 110 {
 111     if (sync_point == attrd_sync_point_local) {
 112         return PCMK__VALUE_LOCAL;
 113     } else if  (sync_point == attrd_sync_point_cluster) {
 114         return PCMK__VALUE_CLUSTER;
 115     } else {
 116         return "unknown";
 117     }
 118 }
 119 
 120 /*!
 121  * \internal
 122  * \brief Add a client to the attrd waitlist
 123  *
 124  * Typically, a client receives an ACK for its XML IPC request immediately.  However,
 125  * some clients want to wait until their request has been processed and taken effect.
 126  * This is called a sync point.  Any client placed on this waitlist will have its
 127  * ACK message delayed until either its requested sync point is hit, or until it
 128  * times out.
 129  *
 130  * The XML IPC request must specify the type of sync point it wants to wait for.
 131  *
 132  * \param[in,out] request   The request describing the client to place on the waitlist.
 133  */
 134 void
 135 attrd_add_client_to_waitlist(pcmk__request_t *request)
     /* [previous][next][first][last][top][bottom][index][help] */
 136 {
 137     const char *sync_point = attrd_request_sync_point(request->xml);
 138     struct waitlist_node *wl = NULL;
 139 
 140     if (sync_point == NULL) {
 141         return;
 142     }
 143 
 144     if (waitlist == NULL) {
 145         waitlist = pcmk__intkey_table(free_waitlist_node);
 146     }
 147 
 148     wl = calloc(sizeof(struct waitlist_node), 1);
 149 
 150     CRM_ASSERT(wl != NULL);
 151 
 152     wl->client_id = strdup(request->ipc_client->id);
 153 
 154     CRM_ASSERT(wl->client_id);
 155 
 156     if (pcmk__str_eq(sync_point, PCMK__VALUE_LOCAL, pcmk__str_none)) {
 157         wl->sync_point = attrd_sync_point_local;
 158     } else if (pcmk__str_eq(sync_point, PCMK__VALUE_CLUSTER, pcmk__str_none)) {
 159         wl->sync_point = attrd_sync_point_cluster;
 160     } else {
 161         free_waitlist_node(wl);
 162         return;
 163     }
 164 
 165     wl->ipc_id = request->ipc_id;
 166     wl->flags = request->flags;
 167 
 168     next_key();
 169     pcmk__intkey_table_insert(waitlist, waitlist_client, wl);
 170 
 171     crm_trace("Added client %s to waitlist for %s sync point",
 172               wl->client_id, sync_point_str(wl->sync_point));
 173     crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
 174 
 175     /* And then add the key to the request XML so we can uniquely identify
 176      * it when it comes time to issue the ACK.
 177      */
 178     crm_xml_add_int(request->xml, XML_LRM_ATTR_CALLID, waitlist_client);
 179 }
 180 
 181 /*!
 182  * \internal
 183  * \brief Free all memory associated with the waitlist.  This is most typically
 184  *        used when attrd shuts down.
 185  */
 186 void
 187 attrd_free_waitlist(void)
     /* [previous][next][first][last][top][bottom][index][help] */
 188 {
 189     if (waitlist == NULL) {
 190         return;
 191     }
 192 
 193     g_hash_table_destroy(waitlist);
 194     waitlist = NULL;
 195 }
 196 
 197 /*!
 198  * \internal
 199  * \brief Unconditionally remove a client from the waitlist, such as when the client
 200  *        node disconnects from the cluster
 201  *
 202  * \param[in] client    The client to remove
 203  */
 204 void
 205 attrd_remove_client_from_waitlist(pcmk__client_t *client)
     /* [previous][next][first][last][top][bottom][index][help] */
 206 {
 207     GHashTableIter iter;
 208     gpointer value;
 209 
 210     if (waitlist == NULL) {
 211         return;
 212     }
 213 
 214     g_hash_table_iter_init(&iter, waitlist);
 215 
 216     while (g_hash_table_iter_next(&iter, NULL, &value)) {
 217         struct waitlist_node *wl = (struct waitlist_node *) value;
 218 
 219         if (pcmk__str_eq(wl->client_id, client->id, pcmk__str_none)) {
 220             g_hash_table_iter_remove(&iter);
 221             crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
 222         }
 223     }
 224 }
 225 
 226 /*!
 227  * \internal
 228  * \brief Send an IPC ACK message to all awaiting clients
 229  *
 230  * This function will search the waitlist for all clients that are currently awaiting
 231  * an ACK indicating their attrd operation is complete.  Only those clients with a
 232  * matching sync point type and callid from their original XML IPC request will be
 233  * ACKed.  Once they have received an ACK, they will be removed from the waitlist.
 234  *
 235  * \param[in] sync_point What kind of sync point have we hit?
 236  * \param[in] xml        The original XML IPC request.
 237  */
 238 void
 239 attrd_ack_waitlist_clients(enum attrd_sync_point sync_point, const xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 240 {
 241     int callid;
 242     gpointer value;
 243 
 244     if (waitlist == NULL) {
 245         return;
 246     }
 247 
 248     if (crm_element_value_int(xml, XML_LRM_ATTR_CALLID, &callid) == -1) {
 249         crm_warn("Could not get callid from request XML");
 250         return;
 251     }
 252 
 253     value = pcmk__intkey_table_lookup(waitlist, callid);
 254     if (value != NULL) {
 255         struct waitlist_node *wl = (struct waitlist_node *) value;
 256         pcmk__client_t *client = NULL;
 257 
 258         if (wl->sync_point != sync_point) {
 259             return;
 260         }
 261 
 262         crm_notice("Alerting client %s for reached %s sync point",
 263                    wl->client_id, sync_point_str(wl->sync_point));
 264 
 265         client = pcmk__find_client_by_id(wl->client_id);
 266         if (client == NULL) {
 267             return;
 268         }
 269 
 270         attrd_send_ack(client, wl->ipc_id, wl->flags | crm_ipc_client_response);
 271 
 272         /* And then remove the client so it doesn't get alerted again. */
 273         pcmk__intkey_table_remove(waitlist, callid);
 274 
 275         crm_trace("%d clients now on waitlist", g_hash_table_size(waitlist));
 276     }
 277 }
 278 
 279 /*!
 280  * \internal
 281  * \brief Action to take when a cluster sync point is hit for a
 282  *        PCMK__ATTRD_CMD_UPDATE* message.
 283  *
 284  * \param[in] xml  The request that should be passed along to
 285  *                 attrd_ack_waitlist_clients.  This should be the original
 286  *                 IPC request containing the callid for this update message.
 287  */
 288 int
 289 attrd_cluster_sync_point_update(xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 290 {
 291     crm_trace("Hit cluster sync point for attribute update");
 292     attrd_ack_waitlist_clients(attrd_sync_point_cluster, xml);
 293     return pcmk_rc_ok;
 294 }
 295 
 296 /*!
 297  * \internal
 298  * \brief Return the sync point attribute for an IPC request
 299  *
 300  * This function will check both the top-level element of \p xml for a sync
 301  * point attribute, as well as all of its \p op children, if any.  The latter
 302  * is useful for newer versions of attrd that can put multiple IPC requests
 303  * into a single message.
 304  *
 305  * \param[in] xml   An XML IPC request
 306  *
 307  * \note It is assumed that if one child element has a sync point attribute,
 308  *       all will have a sync point attribute and they will all be the same
 309  *       sync point.  No other configuration is supported.
 310  *
 311  * \return The sync point attribute of \p xml, or NULL if none.
 312  */
 313 const char *
 314 attrd_request_sync_point(xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 315 {
 316     CRM_CHECK(xml != NULL, return NULL);
 317 
 318     if (xml->children != NULL) {
 319         xmlNode *child = pcmk__xe_match(xml, XML_ATTR_OP, PCMK__XA_ATTR_SYNC_POINT, NULL);
 320 
 321         if (child) {
 322             return crm_element_value(child, PCMK__XA_ATTR_SYNC_POINT);
 323         } else {
 324             return NULL;
 325         }
 326 
 327     } else {
 328         return crm_element_value(xml, PCMK__XA_ATTR_SYNC_POINT);
 329     }
 330 }
 331 
 332 /*!
 333  * \internal
 334  * \brief Does an IPC request contain any sync point attribute?
 335  *
 336  * \param[in] xml   An XML IPC request
 337  *
 338  * \return true if there's a sync point attribute, false otherwise
 339  */
 340 bool
 341 attrd_request_has_sync_point(xmlNode *xml)
     /* [previous][next][first][last][top][bottom][index][help] */
 342 {
 343     return attrd_request_sync_point(xml) != NULL;
 344 }
 345 
 346 static void
 347 free_action(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 348 {
 349     struct confirmation_action *action = (struct confirmation_action *) data;
 350     g_list_free_full(action->respondents, free);
 351     mainloop_timer_del(action->timer);
 352     free_xml(action->xml);
 353     free(action->client_id);
 354     free(action);
 355 }
 356 
 357 /* Remove an IPC request from the expected_confirmations table if the peer attrds
 358  * don't respond before the timeout is hit.  We set the timeout to 15s.  The exact
 359  * number isn't critical - we just want to make sure that the table eventually gets
 360  * cleared of things that didn't complete.
 361  */
 362 static gboolean
 363 confirmation_timeout_cb(gpointer data)
     /* [previous][next][first][last][top][bottom][index][help] */
 364 {
 365     struct confirmation_action *action = (struct confirmation_action *) data;
 366 
 367     GHashTableIter iter;
 368     gpointer value;
 369 
 370     if (expected_confirmations == NULL) {
 371         return G_SOURCE_REMOVE;
 372     }
 373 
 374     g_hash_table_iter_init(&iter, expected_confirmations);
 375 
 376     while (g_hash_table_iter_next(&iter, NULL, &value)) {
 377         if (value == action) {
 378             pcmk__client_t *client = pcmk__find_client_by_id(action->client_id);
 379             if (client == NULL) {
 380                 return G_SOURCE_REMOVE;
 381             }
 382 
 383             crm_trace("Timed out waiting for confirmations for client %s", client->id);
 384             pcmk__ipc_send_ack(client, action->ipc_id, action->flags | crm_ipc_client_response,
 385                                "ack", ATTRD_PROTOCOL_VERSION, CRM_EX_TIMEOUT);
 386 
 387             g_hash_table_iter_remove(&iter);
 388             crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
 389             break;
 390         }
 391     }
 392 
 393     return G_SOURCE_REMOVE;
 394 }
 395 
 396 /*!
 397  * \internal
 398  * \brief When a peer disconnects from the cluster, no longer wait for its confirmation
 399  *        for any IPC action.  If this peer is the last one being waited on, this will
 400  *        trigger the confirmation action.
 401  *
 402  * \param[in] host   The disconnecting peer attrd's uname
 403  */
 404 void
 405 attrd_do_not_expect_from_peer(const char *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 406 {
 407     GList *keys = NULL;
 408 
 409     if (expected_confirmations == NULL) {
 410         return;
 411     }
 412 
 413     keys = g_hash_table_get_keys(expected_confirmations);
 414 
 415     crm_trace("Removing peer %s from expected confirmations", host);
 416 
 417     for (GList *node = keys; node != NULL; node = node->next) {
 418         int callid = *(int *) node->data;
 419         attrd_handle_confirmation(callid, host);
 420     }
 421 
 422     g_list_free(keys);
 423 }
 424 
 425 /*!
 426  * \internal
 427  * \brief When a client disconnects from the cluster, no longer wait on confirmations
 428  *        for it.  Because the peer attrds may still be processing the original IPC
 429  *        message, they may still send us confirmations.  However, we will take no
 430  *        action on them.
 431  *
 432  * \param[in] client    The disconnecting client
 433  */
 434 void
 435 attrd_do_not_wait_for_client(pcmk__client_t *client)
     /* [previous][next][first][last][top][bottom][index][help] */
 436 {
 437     GHashTableIter iter;
 438     gpointer value;
 439 
 440     if (expected_confirmations == NULL) {
 441         return;
 442     }
 443 
 444     g_hash_table_iter_init(&iter, expected_confirmations);
 445 
 446     while (g_hash_table_iter_next(&iter, NULL, &value)) {
 447         struct confirmation_action *action = (struct confirmation_action *) value;
 448 
 449         if (pcmk__str_eq(action->client_id, client->id, pcmk__str_none)) {
 450             crm_trace("Removing client %s from expected confirmations", client->id);
 451             g_hash_table_iter_remove(&iter);
 452             crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
 453             break;
 454         }
 455     }
 456 }
 457 
 458 /*!
 459  * \internal
 460  * \brief Register some action to be taken when IPC request confirmations are
 461  *        received
 462  *
 463  * When this function is called, a list of all peer attrds that support confirming
 464  * requests is generated.  As confirmations from these peer attrds are received,
 465  * they are removed from this list.  When the list is empty, the registered action
 466  * will be called.
 467  *
 468  * \note This function should always be called before attrd_send_message is called
 469  *       to broadcast to the peers to ensure that we know what replies we are
 470  *       waiting on.  Otherwise, it is possible the peer could finish and confirm
 471  *       before we know to expect it.
 472  *
 473  * \param[in] request The request that is awaiting confirmations
 474  * \param[in] fn      A function to be run after all confirmations are received
 475  */
 476 void
 477 attrd_expect_confirmations(pcmk__request_t *request, attrd_confirmation_action_fn fn)
     /* [previous][next][first][last][top][bottom][index][help] */
 478 {
 479     struct confirmation_action *action = NULL;
 480     GHashTableIter iter;
 481     gpointer host, ver;
 482     GList *respondents = NULL;
 483     int callid;
 484 
 485     if (expected_confirmations == NULL) {
 486         expected_confirmations = pcmk__intkey_table((GDestroyNotify) free_action);
 487     }
 488 
 489     if (crm_element_value_int(request->xml, XML_LRM_ATTR_CALLID, &callid) == -1) {
 490         crm_err("Could not get callid from xml");
 491         return;
 492     }
 493 
 494     if (pcmk__intkey_table_lookup(expected_confirmations, callid)) {
 495         crm_err("Already waiting on confirmations for call id %d", callid);
 496         return;
 497     }
 498 
 499     g_hash_table_iter_init(&iter, peer_protocol_vers);
 500     while (g_hash_table_iter_next(&iter, &host, &ver)) {
 501         if (ATTRD_SUPPORTS_CONFIRMATION(GPOINTER_TO_INT(ver))) {
 502             char *s = strdup((char *) host);
 503 
 504             CRM_ASSERT(s != NULL);
 505             respondents = g_list_prepend(respondents, s);
 506         }
 507     }
 508 
 509     action = calloc(1, sizeof(struct confirmation_action));
 510     CRM_ASSERT(action != NULL);
 511 
 512     action->respondents = respondents;
 513     action->fn = fn;
 514     action->xml = copy_xml(request->xml);
 515 
 516     action->client_id = strdup(request->ipc_client->id);
 517     CRM_ASSERT(action->client_id != NULL);
 518 
 519     action->ipc_id = request->ipc_id;
 520     action->flags = request->flags;
 521 
 522     action->timer = mainloop_timer_add(NULL, 15000, FALSE, confirmation_timeout_cb, action);
 523     mainloop_timer_start(action->timer);
 524 
 525     pcmk__intkey_table_insert(expected_confirmations, callid, action);
 526     crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(respondents));
 527     crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
 528 }
 529 
 530 void
 531 attrd_free_confirmations(void)
     /* [previous][next][first][last][top][bottom][index][help] */
 532 {
 533     if (expected_confirmations != NULL) {
 534         g_hash_table_destroy(expected_confirmations);
 535         expected_confirmations = NULL;
 536     }
 537 }
 538 
 539 /*!
 540  * \internal
 541  * \brief Process a confirmation message from a peer attrd
 542  *
 543  * This function is called every time a PCMK__ATTRD_CMD_CONFIRM message is
 544  * received from a peer attrd.  If this is the last confirmation we are waiting
 545  * on for a given operation, the registered action will be called.
 546  *
 547  * \param[in] callid The unique callid for the XML IPC request
 548  * \param[in] host   The confirming peer attrd's uname
 549  */
 550 void
 551 attrd_handle_confirmation(int callid, const char *host)
     /* [previous][next][first][last][top][bottom][index][help] */
 552 {
 553     struct confirmation_action *action = NULL;
 554     GList *node = NULL;
 555 
 556     if (expected_confirmations == NULL) {
 557         return;
 558     }
 559 
 560     action = pcmk__intkey_table_lookup(expected_confirmations, callid);
 561     if (action == NULL) {
 562         return;
 563     }
 564 
 565     node = g_list_find_custom(action->respondents, host, (GCompareFunc) strcasecmp);
 566 
 567     if (node == NULL) {
 568         return;
 569     }
 570 
 571     action->respondents = g_list_remove(action->respondents, node->data);
 572     crm_trace("Callid %d now waiting on %d confirmations", callid, g_list_length(action->respondents));
 573 
 574     if (action->respondents == NULL) {
 575         action->fn(action->xml);
 576         pcmk__intkey_table_remove(expected_confirmations, callid);
 577         crm_trace("%d requests now in expected confirmations table", g_hash_table_size(expected_confirmations));
 578     }
 579 }

/* [previous][next][first][last][top][bottom][index][help] */