4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
25 * Copyright (c) 2016, 2017, Intel Corporation.
30 * lustre/mdt/mdt_hsm_cdt_agent.c
32 * Lustre HSM Coordinator
34 * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
35 * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
38 #define DEBUG_SUBSYSTEM S_MDS
41 #include <obd_support.h>
42 #include <lustre_export.h>
43 #include <lprocfs_status.h>
44 #include <lustre_kernelcomm.h>
45 #include "mdt_internal.h"
52 * find an hsm_agent by UUID
53 * cdt_agent_lock must be held by the caller
54 * \param cdt [IN] coordinator
55 * \param uuid [IN] agent UUID
56 * \retval hsm_agent pointer or NULL if not found
/* Scan the coordinator's agent list (cdt->cdt_agents) for an entry whose
 * ha_uuid equals \a uuid. The list is walked without taking any lock here. */
58 static struct hsm_agent *mdt_hsm_agent_lookup(struct coordinator *cdt,
59 const struct obd_uuid *uuid)
63 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
/* agents are identified by their UUID */
64 if (obd_uuid_equals(&ha->ha_uuid, uuid))
71 * register a copy tool
72 * \param mti [IN] MDT context
73 * \param uuid [IN] client UUID to be registered
74 * \param nr_archives [IN] number of archives the agent serves
75 * \param archive_id [IN] vector of archive number served by the copytool
/* Create an hsm_agent entry for \a uuid, copy the caller's archive id
 * vector into it and append it to cdt->cdt_agents. Registration is refused
 * with -EEXIST when an agent with the same UUID is already listed, and with
 * -ENOMEM when an allocation fails. */
79 int mdt_hsm_agent_register(struct mdt_thread_info *mti,
80 const struct obd_uuid *uuid,
81 int nr_archives, __u32 *archive_id)
83 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
84 struct hsm_agent *ha, *tmp;
88 /* no coordinator started, so we cannot serve requests */
89 if (!cdt_getref_try(cdt)) {
90 LCONSOLE_WARN("HSM coordinator thread is not running - "
91 "denying agent registration.\n");
/* NOTE(review): the comment below says the client resends "if starting",
 * yet -EINPROGRESS is returned when cdt_state is CDT_RUNNING. Confirm
 * whether a "starting" state was meant here instead. */
92 /* The client will resend the request if starting */
93 RETURN(cdt->cdt_state == CDT_RUNNING ? -EINPROGRESS : -ENXIO);
98 GOTO(out, rc = -ENOMEM);
/* an archive count of 0 means no id vector is allocated at all */
101 ha->ha_archive_cnt = nr_archives;
102 if (ha->ha_archive_cnt != 0) {
/* NOTE(review): nr_archives is a signed int and is used as a size here;
 * no range check is visible in this function - confirm callers validate it. */
105 sz = ha->ha_archive_cnt * sizeof(*ha->ha_archive_id);
106 OBD_ALLOC(ha->ha_archive_id, sz);
107 if (ha->ha_archive_id == NULL)
108 GOTO(out_free, rc = -ENOMEM);
/* private copy: the agent entry does not keep a pointer to archive_id */
109 memcpy(ha->ha_archive_id, archive_id, sz);
/* a new agent starts with all counters at zero */
111 atomic_set(&ha->ha_requests, 0);
112 atomic_set(&ha->ha_success, 0);
113 atomic_set(&ha->ha_failure, 0);
/* the duplicate lookup and the list insertion both run under the
 * write side of cdt_agent_lock */
115 down_write(&cdt->cdt_agent_lock);
116 tmp = mdt_hsm_agent_lookup(cdt, uuid);
118 LCONSOLE_WARN("HSM agent %s already registered\n",
120 up_write(&cdt->cdt_agent_lock);
121 GOTO(out_free, rc = -EEXIST);
124 list_add_tail(&ha->ha_list, &cdt->cdt_agents);
126 if (ha->ha_archive_cnt == 0)
127 CDEBUG(D_HSM, "agent %s registered for all archives\n",
128 obd_uuid2str(&ha->ha_uuid));
130 CDEBUG(D_HSM, "agent %s registered for %d archives\n",
131 obd_uuid2str(&ha->ha_uuid), ha->ha_archive_cnt);
133 up_write(&cdt->cdt_agent_lock);
/* release the archive id vector if one was allocated */
138 if (ha != NULL && ha->ha_archive_id != NULL)
139 OBD_FREE_PTR_ARRAY(ha->ha_archive_id, ha->ha_archive_cnt);
/* the coordinator is signalled on success and also when the agent was
 * already registered (-EEXIST) */
143 /* wake the coordinator to potentially schedule requests */
144 if (rc == -EEXIST || rc == 0)
145 mdt_hsm_cdt_event(cdt);
152 * register a copy tool from a bitmask of archive numbers
153 * \param mti [IN] MDT context
154 * \param uuid [IN] uuid to be registered
155 * \param archive_mask [IN] bitmask of archive number served by the copytool
157 * \retval -ve failure
/* Expand a 32-bit archive bitmask into a vector of archive numbers and
 * pass that vector to mdt_hsm_agent_register(). A mask of 0 yields no
 * vector (archive_id stays NULL). */
159 int mdt_hsm_agent_register_mask(struct mdt_thread_info *mti,
160 const struct obd_uuid *uuid, __u32 archive_mask)
162 int rc, i, nr_archives = 0;
163 __u32 *archive_id = NULL;
/* number of set bits == number of archives to list */
166 nr_archives = hweight32(archive_mask);
168 if (nr_archives != 0) {
169 OBD_ALLOC_PTR_ARRAY(archive_id, nr_archives);
/* walk every bit position of the mask */
174 for (i = 0; i < sizeof(archive_mask) * 8; i++) {
175 if (BIT(i) & archive_mask) {
/* bit i stands for archive number i + 1.
 * NOTE(review): nr_archives is reused as the write index here -
 * presumably reset before the loop and incremented per set bit; confirm. */
176 archive_id[nr_archives] = i + 1;
182 rc = mdt_hsm_agent_register(mti, uuid, nr_archives, archive_id);
/* the temporary vector is released whatever the registration result */
184 if (archive_id != NULL)
185 OBD_FREE_PTR_ARRAY(archive_id, nr_archives);
191 * unregister a copy tool
192 * \param mti [IN] MDT context
193 * \param uuid [IN] uuid to be unregistered
195 * \retval -ve failure
/* Remove the agent identified by \a uuid from cdt->cdt_agents and release
 * its archive id vector. -ENOENT is reported when no such agent is found. */
197 int mdt_hsm_agent_unregister(struct mdt_thread_info *mti,
198 const struct obd_uuid *uuid)
200 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
201 struct hsm_agent *ha;
205 /* no coordinator started, so we cannot serve requests */
206 if (!cdt_getref_try(cdt))
/* lookup and unlink both happen under the write side of cdt_agent_lock */
209 down_write(&cdt->cdt_agent_lock);
211 ha = mdt_hsm_agent_lookup(cdt, uuid);
213 list_del_init(&ha->ha_list);
215 up_write(&cdt->cdt_agent_lock);
218 GOTO(out, rc = -ENOENT);
/* an archive count of 0 means there is no id vector to free */
220 if (ha->ha_archive_cnt != 0)
221 OBD_FREE_PTR_ARRAY(ha->ha_archive_id, ha->ha_archive_cnt);
/* the outcome is traced at D_HSM level */
226 CDEBUG(D_HSM, "agent %s unregistration: %d\n", obd_uuid2str(uuid), rc);
233 * update agent statistics
234 * \param cdt [IN] coordinator
235 * \param succ_rq [IN] number of success
236 * \param fail_rq [IN] number of failure
237 * \param new_rq [IN] number of new requests
238 * \param uuid [IN] agent uuid
239 * if all counters == 0, clear counters
241 * \retval -ve failure
/* Adjust the ha_success / ha_failure / ha_requests counters of the agent
 * matching \a uuid. Passing 0 for all three request counts resets the
 * agent's counters instead of adding to them. */
243 int mdt_hsm_agent_update_statistics(struct coordinator *cdt,
244 int succ_rq, int fail_rq, int new_rq,
245 const struct obd_uuid *uuid)
247 struct hsm_agent *ha;
/* the read side of cdt_agent_lock covers the list walk; the counters
 * themselves are changed with atomic operations */
251 down_read(&cdt->cdt_agent_lock);
252 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
253 if (obd_uuid_equals(&ha->ha_uuid, uuid)) {
/* all-zero input: clear every counter */
254 if (succ_rq == 0 && fail_rq == 0 && new_rq == 0) {
255 atomic_set(&ha->ha_success, 0);
256 atomic_set(&ha->ha_failure, 0);
257 atomic_set(&ha->ha_requests, 0);
/* otherwise accumulate results; ha_requests grows by the new requests
 * and shrinks by the completed ones (successes plus failures) */
259 atomic_add(succ_rq, &ha->ha_success);
260 atomic_add(fail_rq, &ha->ha_failure);
261 atomic_add(new_rq, &ha->ha_requests);
262 atomic_sub(succ_rq, &ha->ha_requests);
263 atomic_sub(fail_rq, &ha->ha_requests);
271 up_read(&cdt->cdt_agent_lock);
276 * find the best agent
277 * \param cdt [IN] coordinator
278 * \param archive [IN] archive number
279 * \param uuid [OUT] agent who can serve archive
281 * \retval -ve failure
/* Walk the registered agents under the read side of cdt_agent_lock and
 * compare their ha_requests counters to pick one able to serve
 * \a archive. rc starts at -EAGAIN. */
283 int mdt_hsm_find_best_agent(struct coordinator *cdt, __u32 archive,
284 struct obd_uuid *uuid)
/* load == -1 means no candidate has been considered yet */
286 int rc = -EAGAIN, i, load = -1;
287 struct hsm_agent *ha;
290 /* Choose an export to send a copytool req to */
291 down_read(&cdt->cdt_agent_lock);
292 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
/* advance i to the first slot holding \a archive; i ends up equal to
 * ha_archive_cnt when the agent does not list it */
293 for (i = 0; (i < ha->ha_archive_cnt) &&
294 (ha->ha_archive_id[i] != archive); i++) {
295 /* nothing to do, just skip unmatching records */
298 /* archive count == 0 means copy tool serves any backend */
299 if (ha->ha_archive_cnt != 0 && i == ha->ha_archive_cnt)
/* first candidate, or one with fewer requests than the current best */
302 if (load == -1 || load > atomic_read(&ha->ha_requests)) {
303 load = atomic_read(&ha->ha_requests);
/* an agent with no request at all is tested for separately */
307 if (atomic_read(&ha->ha_requests) == 0)
310 up_read(&cdt->cdt_agent_lock);
/* For every archive id listed by any registered agent, add one agent
 * record for \a hai through mdt_agent_record_add(). archive_mask remembers
 * which ids were already handled so each id gets a single record. The
 * agent list is walked under the read side of cdt_agent_lock. */
315 static int mdt_hsm_send_action_to_each_archive(struct mdt_thread_info *mti,
316 struct hsm_action_item *hai)
318 struct hsm_agent *ha;
319 __u32 archive_mask = 0;
320 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
322 /* return error by default in case all archive_ids have unregistered */
326 /* send action to all registered archive_ids */
327 down_read(&cdt->cdt_agent_lock);
328 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
329 for (i = 0; (i < ha->ha_archive_cnt); i++) {
/* NOTE(review): archive_mask is a __u32, so this test can only track
 * ids below 32, while mdt_hsm_agent_register_mask() can produce id 32
 * (bit 31 + 1) - confirm such an id cannot reach this loop. */
330 /* only send once for each archive_id */
331 if (BIT(ha->ha_archive_id[i]) & archive_mask)
333 archive_mask |= BIT(ha->ha_archive_id[i]);
335 /* XXX: it could make sense to gather all
336 * actions for the same archive_id like in
337 * mdt_hsm_add_actions() ?? */
338 rc = mdt_agent_record_add(mti->mti_env, mti->mti_mdt,
339 ha->ha_archive_id[i], 0,
342 CERROR("%s: unable to add HSM remove request "
343 "for "DFID": rc=%d\n",
344 mdt_obd_name(mti->mti_mdt),
345 PFID(&hai->hai_fid), rc);
348 CDEBUG(D_HSM, "%s: added HSM remove request "
349 "for "DFID", archive_id=%d\n",
350 mdt_obd_name(mti->mti_mdt),
352 ha->ha_archive_id[i]);
/* i only differs from ha_archive_cnt when the inner loop stopped early */
355 /* early exit from loop due to error? */
356 if (i != ha->ha_archive_cnt)
359 up_read(&cdt->cdt_agent_lock);
365 * send a HAL to the agent
366 * \param mti [IN] context
367 * \param hal [IN] request (can be a kuc payload)
368 * \param purge [IN] purge mode (no record)
370 * \retval -ve failure
371 * This function supposes:
372 * - all actions are for the same archive number
373 * - in case of cancel, all cancels are for the same agent
374 * This implies that any request split has to be done
375 * beforehand, when building the HAL
/* Steps visible below: select an agent for hal->hal_archive_id, copy the
 * HAL into a kuc buffer, check every item against the HSM state of its
 * file, register the HAL with mdt_hsm_add_hal(), then push the buffer to
 * the agent through the reverse import of its export. When sending fails
 * after registration, the registered requests are removed again. */
377 int mdt_hsm_agent_send(struct mdt_thread_info *mti,
378 struct hsm_action_list *hal, bool purge)
380 struct obd_export *exp;
381 struct mdt_device *mdt = mti->mti_mdt;
382 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
383 struct hsm_action_list *buf = NULL;
384 struct hsm_action_item *hai;
385 struct obd_uuid uuid;
388 bool is_registered = false;
/* choose the agent that will receive this HAL; its UUID is stored in uuid */
391 rc = mdt_hsm_find_best_agent(cdt, hal->hal_archive_id, &uuid);
392 if (rc && hal->hal_archive_id == 0) {
396 /* special case of remove requests with no archive_id specified,
397 * and no agent registered to serve all archives, then create a
398 * set of new requests, each to be sent to each registered
400 * To do so, find all HSMA_REMOVE entries, and then :
401 * _ set completed status as SUCCESS (or FAIL?)
402 * _ create a new LLOG record for each archive_id
403 * presently being served by any CT
405 hai = hai_first(hal);
406 for (i = 0; i < hal->hal_count; i++,
407 hai = hai_next(hai)) {
408 struct hsm_record_update update;
410 /* only removes are concerned */
411 if (hai->hai_action != HSMA_REMOVE) {
412 /* count if other actions than HSMA_REMOVE,
413 * to return original error/rc */
418 /* send remove request to all registered archive_ids */
419 rc2 = mdt_hsm_send_action_to_each_archive(mti, hai);
423 /* only update original request as SUCCEED if it has
424 * been successfully broadcasted to all available
426 * XXX: this should only cause duplicates to be sent,
427 * unless a method to record already successfully
428 * reached archive_ids is implemented */
430 update.cookie = hai->hai_cookie;
431 update.status = ARS_SUCCEED;
432 rc2 = mdt_agent_record_update(mti, &update, 1);
434 CERROR("%s: mdt_agent_record_update() "
435 "failed, cannot update "
436 "status to %s for cookie "
439 agent_req_status2name(ARS_SUCCEED),
440 hai->hai_cookie, rc2);
444 /* only remove requests with archive_id=0 */
451 CERROR("%s: Cannot find agent for archive %d: rc = %d\n",
452 mdt_obd_name(mdt), hal->hal_archive_id, rc);
456 CDEBUG(D_HSM, "Agent %s selected for archive %d\n", obd_uuid2str(&uuid),
457 hal->hal_archive_id);
/* duplicate the HAL into a kuc buffer of the same length */
460 buf = kuc_alloc(len, KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
462 RETURN(PTR_ERR(buf));
463 memcpy(buf, hal, len);
465 /* Check if request is still valid (cf file hsm flags) */
466 fail_request = false;
467 hai = hai_first(hal);
468 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
469 struct mdt_object *obj;
472 if (hai->hai_action == HSMA_CANCEL)
/* fetch the object for this item's FID together with its HSM attributes */
475 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
477 mdt_object_put(mti->mti_env, obj);
478 } else if (PTR_ERR(obj) == -ENOENT) {
/* object not found: an ARS_FAILED update is prepared for this cookie */
479 struct hsm_record_update update = {
480 .cookie = hai->hai_cookie,
481 .status = ARS_FAILED,
484 if (hai->hai_action == HSMA_REMOVE)
488 rc = mdt_agent_record_update(mti, &update, 1);
490 CERROR("%s: mdt_agent_record_update() failed, "
491 "cannot update status to %s for cookie "
494 agent_req_status2name(ARS_FAILED),
495 hai->hai_cookie, rc);
501 GOTO(out_buf, rc = PTR_ERR(obj));
504 if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
505 hal->hal_flags, &hsm)) {
506 struct hsm_record_update update = {
507 .cookie = hai->hai_cookie,
508 .status = ARS_FAILED,
511 /* incompatible request, we abort the request */
512 /* next time coordinator will wake up, it will
513 * make the same HAL with valid only
516 rc = mdt_agent_record_update(mti, &update, 1);
518 CERROR("%s: mdt_agent_record_update() failed, "
519 "cannot update status to %s for cookie "
522 agent_req_status2name(ARS_FAILED),
523 hai->hai_cookie, rc);
527 /* if restore and record status updated, give
528 * back granted layout lock */
529 if (hai->hai_action == HSMA_RESTORE)
530 cdt_restore_handle_del(mti, cdt, &hai->hai_fid);
534 /* we found incompatible requests, so the HAL cannot be sent
535 * as is. Bad records have been invalidated in llog.
536 * Valid ones will be rescheduled next time the coordinator wakes up
537 * So no need to rebuild a full valid HAL now
540 GOTO(out_buf, rc = -EAGAIN);
542 /* Cancel memory registration is useless for purge
543 * non registration avoid a deadlock :
544 * in case of failure we have to take the write lock
545 * to remove the entry, which conflicts with the read lock needed
549 /* set is_registered even if failure because we may have
550 * partial work done */
551 is_registered = true;
552 rc = mdt_hsm_add_hal(mti, hal, &uuid);
557 /* Uses the ldlm reverse import; this rpc will be seen by
558 * the ldlm_callback_handler. Note this sends a request RPC
559 * from a server (MDT) to a client (MDC), backwards of normal comms.
561 exp = obd_uuid_lookup(mdt2obd_dev(mdt), &uuid);
562 if (exp == NULL || exp->exp_disconnected) {
564 class_export_put(exp);
565 /* This should clean up agents on evicted exports */
567 CERROR("%s: agent uuid (%s) not found, unregistering:"
569 mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
570 mdt_hsm_agent_unregister(mti, &uuid);
574 /* send request to agent */
575 rc = do_set_info_async(exp->exp_imp_reverse, LDLM_SET_INFO,
577 sizeof(KEY_HSM_COPYTOOL_SEND),
578 KEY_HSM_COPYTOOL_SEND,
579 kuc_len(len), kuc_ptr(buf), NULL);
582 CERROR("%s: cannot send request to agent '%s': rc = %d\n",
583 mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
/* drop the export reference obtained from obd_uuid_lookup() */
585 class_export_put(exp);
588 CDEBUG(D_HSM, "Lost connection to agent '%s', unregistering\n",
589 obd_uuid2str(&uuid));
590 mdt_hsm_agent_unregister(mti, &uuid);
/* roll back only when something may have been registered above */
594 if (rc != 0 && is_registered) {
595 /* in case of error, we have to unregister requests */
596 hai = hai_first(hal);
597 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
598 if (hai->hai_action == HSMA_CANCEL)
600 mdt_cdt_remove_request(cdt, hai->hai_cookie);
611 * seq_file method called to start access to debugfs file
/* seq_file start callback for the agent list. Takes the read side of
 * cdt_agent_lock; the matching up_read() is done in
 * mdt_hsm_agent_debugfs_stop(). */
613 static void *mdt_hsm_agent_debugfs_start(struct seq_file *s, loff_t *off)
/* s->private carries the mdt_device set up at open time */
615 struct mdt_device *mdt = s->private;
616 struct coordinator *cdt = &mdt->mdt_coordinator;
617 struct list_head *pos;
621 down_read(&cdt->cdt_agent_lock);
/* the empty-list case is tested first */
623 if (list_empty(&cdt->cdt_agents))
/* SEQ_START_TOKEN is the marker the next/show callbacks compare against */
627 RETURN(SEQ_START_TOKEN);
/* otherwise walk the raw list nodes of the agent list */
630 list_for_each(pos, &cdt->cdt_agents) {
640 * seq_file method called to get next item
641 * just returns NULL at eof
/* seq_file next callback: \a v is either SEQ_START_TOKEN or a list node
 * of cdt->cdt_agents. */
643 static void *mdt_hsm_agent_debugfs_next(struct seq_file *s, void *v, loff_t *p)
645 struct mdt_device *mdt = s->private;
646 struct coordinator *cdt = &mdt->mdt_coordinator;
647 struct list_head *pos = v;
/* from the start token, move on to the first node of the agent list */
650 if (pos == SEQ_START_TOKEN)
651 pos = cdt->cdt_agents.next;
/* reaching the list head again means every agent has been visited */
656 if (pos != &cdt->cdt_agents)
/* seq_file show callback: print one line per agent with its UUID, the
 * archive ids it serves ("ANY" when it lists none) and its request
 * counters. */
664 static int mdt_hsm_agent_debugfs_show(struct seq_file *s, void *v)
666 struct list_head *pos = v;
667 struct hsm_agent *ha;
/* the start token is not a list node and is tested for first */
671 if (pos == SEQ_START_TOKEN)
/* convert the list node back to its enclosing hsm_agent */
674 ha = list_entry(pos, struct hsm_agent, ha_list);
675 seq_printf(s, "uuid=%s archive_id=", ha->ha_uuid.uuid);
676 if (ha->ha_archive_cnt == 0) {
677 seq_printf(s, "ANY");
/* first id printed bare, the remaining ones with a leading comma */
679 seq_printf(s, "%d", ha->ha_archive_id[0]);
680 for (i = 1; i < ha->ha_archive_cnt; i++)
681 seq_printf(s, ",%d", ha->ha_archive_id[i]);
/* counters are read with atomic_read() */
684 seq_printf(s, " requests=[current:%d ok:%d errors:%d]\n",
685 atomic_read(&ha->ha_requests),
686 atomic_read(&ha->ha_success),
687 atomic_read(&ha->ha_failure));
692 * seq_file method called to stop access to debugfs file
/* seq_file stop callback: releases the read side of cdt_agent_lock that
 * mdt_hsm_agent_debugfs_start() acquired. */
694 static void mdt_hsm_agent_debugfs_stop(struct seq_file *s, void *v)
696 struct mdt_device *mdt = s->private;
697 struct coordinator *cdt = &mdt->mdt_coordinator;
699 up_read(&cdt->cdt_agent_lock);
/* Iterator callbacks handed to seq_open() by ldebugfs_open_hsm_agent(). */
702 /* hsm agent list debugfs functions */
703 static const struct seq_operations mdt_hsm_agent_debugfs_ops = {
704 .start = mdt_hsm_agent_debugfs_start,
705 .next = mdt_hsm_agent_debugfs_next,
706 .show = mdt_hsm_agent_debugfs_show,
707 .stop = mdt_hsm_agent_debugfs_stop,
711 * public function called at open of debugfs file to get
/* Open handler: attach the agent-list iterator to \a file and make the
 * inode's private pointer available to the iterator callbacks. */
714 static int ldebugfs_open_hsm_agent(struct inode *inode, struct file *file)
720 rc = seq_open(file, &mdt_hsm_agent_debugfs_ops);
/* after seq_open(), file->private_data is used as the seq_file; its
 * private field is what the callbacks read back as struct mdt_device * */
724 s = file->private_data;
725 s->private = inode->i_private;
/* File operations for the agent list file; open goes through
 * ldebugfs_open_hsm_agent() and release through seq_release(). */
730 /* methods to access hsm agent list */
731 const struct file_operations mdt_hsm_agent_fops = {
732 .owner = THIS_MODULE,
733 .open = ldebugfs_open_hsm_agent,
736 .release = seq_release,