4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
28 * lustre/mdt/mdt_hsm_cdt_agent.c
30 * Lustre HSM Coordinator
32 * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33 * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
36 #define DEBUG_SUBSYSTEM S_MDS
39 #include <obd_support.h>
40 #include <lustre_export.h>
41 #include <lustre/lustre_user.h>
42 #include <lprocfs_status.h>
43 #include <lustre_kernelcomm.h>
44 #include "mdt_internal.h"
51 * find a hsm_agent by uuid
52 * lock cdt_agent_lock needs to be held by caller
53 * \param cdt [IN] coordinator
54 * \param uuid [IN] agent UUID
55 * \retval hsm_agent pointer or NULL if not found
57 static struct hsm_agent *mdt_hsm_agent_lookup(struct coordinator *cdt,
58 const struct obd_uuid *uuid)
62 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
63 if (obd_uuid_equals(&ha->ha_uuid, uuid))
70 * register a copy tool
71 * \param mti [IN] MDT context
72 * \param uuid [IN] client UUID to be registered
73 * \param count [IN] number of archives agent serves
74 * \param archive_id [IN] vector of archive number served by the copytool
78 int mdt_hsm_agent_register(struct mdt_thread_info *mti,
79 const struct obd_uuid *uuid,
80 int nr_archives, __u32 *archive_id)
82 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
83 struct hsm_agent *ha, *tmp;
87 /* no coordinator started, so we cannot serve requests */
88 if (cdt->cdt_state == CDT_STOPPED) {
89 LCONSOLE_WARN("HSM coordinator thread is not running - "
90 "denying agent registration.\n");
96 GOTO(out, rc = -ENOMEM);
99 ha->ha_archive_cnt = nr_archives;
100 if (ha->ha_archive_cnt != 0) {
103 sz = ha->ha_archive_cnt * sizeof(*ha->ha_archive_id);
104 OBD_ALLOC(ha->ha_archive_id, sz);
105 if (ha->ha_archive_id == NULL)
106 GOTO(out_free, rc = -ENOMEM);
107 memcpy(ha->ha_archive_id, archive_id, sz);
109 atomic_set(&ha->ha_requests, 0);
110 atomic_set(&ha->ha_success, 0);
111 atomic_set(&ha->ha_failure, 0);
113 down_write(&cdt->cdt_agent_lock);
114 tmp = mdt_hsm_agent_lookup(cdt, uuid);
116 LCONSOLE_WARN("HSM agent %s already registered\n",
118 up_write(&cdt->cdt_agent_lock);
119 GOTO(out_free, rc = -EEXIST);
122 list_add_tail(&ha->ha_list, &cdt->cdt_agents);
124 if (ha->ha_archive_cnt == 0)
125 CDEBUG(D_HSM, "agent %s registered for all archives\n",
126 obd_uuid2str(&ha->ha_uuid));
128 CDEBUG(D_HSM, "agent %s registered for %d archives\n",
129 obd_uuid2str(&ha->ha_uuid), ha->ha_archive_cnt);
131 up_write(&cdt->cdt_agent_lock);
136 if (ha != NULL && ha->ha_archive_id != NULL)
137 OBD_FREE(ha->ha_archive_id,
138 ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
146 * register a copy tool
147 * \param mti [IN] MDT context
148 * \param uuid [IN] uuid to be registered
149 * \param archive_mask [IN] bitmask of archive number served by the copytool
151 * \retval -ve failure
153 int mdt_hsm_agent_register_mask(struct mdt_thread_info *mti,
154 const struct obd_uuid *uuid, __u32 archive_mask)
156 int rc, i, nr_archives = 0;
157 __u32 *archive_id = NULL;
160 nr_archives = hweight32(archive_mask);
162 if (nr_archives != 0) {
163 OBD_ALLOC(archive_id, nr_archives * sizeof(*archive_id));
168 for (i = 0; i < sizeof(archive_mask) * 8; i++) {
169 if ((1 << i) & archive_mask) {
170 archive_id[nr_archives] = i + 1;
176 rc = mdt_hsm_agent_register(mti, uuid, nr_archives, archive_id);
178 if (archive_id != NULL)
179 OBD_FREE(archive_id, nr_archives * sizeof(*archive_id));
185 * unregister a copy tool
186 * \param mti [IN] MDT context
187 * \param uuid [IN] uuid to be unregistered
189 * \retval -ve failure
191 int mdt_hsm_agent_unregister(struct mdt_thread_info *mti,
192 const struct obd_uuid *uuid)
194 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
195 struct hsm_agent *ha;
199 /* no coordinator started, so we cannot serve requests */
200 if (cdt->cdt_state == CDT_STOPPED)
203 down_write(&cdt->cdt_agent_lock);
205 ha = mdt_hsm_agent_lookup(cdt, uuid);
207 list_del_init(&ha->ha_list);
209 up_write(&cdt->cdt_agent_lock);
212 GOTO(out, rc = -ENOENT);
214 if (ha->ha_archive_cnt != 0)
215 OBD_FREE(ha->ha_archive_id,
216 ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
221 CDEBUG(D_HSM, "agent %s unregistration: %d\n", obd_uuid2str(uuid), rc);
227 * update agent statistics
228 * \param mdt [IN] MDT device
229 * \param succ_rq [IN] number of success
230 * \param fail_rq [IN] number of failure
231 * \param new_rq [IN] number of new requests
232 * \param uuid [IN] agent uuid
233 * if all counters == 0, clear counters
235 * \retval -ve failure
237 int mdt_hsm_agent_update_statistics(struct coordinator *cdt,
238 int succ_rq, int fail_rq, int new_rq,
239 const struct obd_uuid *uuid)
241 struct hsm_agent *ha;
245 down_read(&cdt->cdt_agent_lock);
246 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
247 if (obd_uuid_equals(&ha->ha_uuid, uuid)) {
248 if (succ_rq == 0 && fail_rq == 0 && new_rq == 0) {
249 atomic_set(&ha->ha_success, 0);
250 atomic_set(&ha->ha_failure, 0);
251 atomic_set(&ha->ha_requests, 0);
253 atomic_add(succ_rq, &ha->ha_success);
254 atomic_add(fail_rq, &ha->ha_failure);
255 atomic_add(new_rq, &ha->ha_requests);
256 atomic_sub(succ_rq, &ha->ha_requests);
257 atomic_sub(fail_rq, &ha->ha_requests);
265 up_read(&cdt->cdt_agent_lock);
270 * find the best agent
271 * \param cdt [IN] coordinator
272 * \param archive [IN] archive number
273 * \param uuid [OUT] agent who can serve archive
275 * \retval -ve failure
277 int mdt_hsm_find_best_agent(struct coordinator *cdt, __u32 archive,
278 struct obd_uuid *uuid)
280 int rc = -EAGAIN, i, load = -1;
281 struct hsm_agent *ha;
284 /* Choose an export to send a copytool req to */
285 down_read(&cdt->cdt_agent_lock);
286 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
287 for (i = 0; (i < ha->ha_archive_cnt) &&
288 (ha->ha_archive_id[i] != archive); i++) {
289 /* nothing to do, just skip unmatching records */
292 /* archive count == 0 means copy tool serves any backend */
293 if (ha->ha_archive_cnt != 0 && i == ha->ha_archive_cnt)
296 if (load == -1 || load > atomic_read(&ha->ha_requests)) {
297 load = atomic_read(&ha->ha_requests);
301 if (atomic_read(&ha->ha_requests) == 0)
304 up_read(&cdt->cdt_agent_lock);
309 int mdt_hsm_send_action_to_each_archive(struct mdt_thread_info *mti,
310 struct hsm_action_item *hai)
313 struct hsm_agent *ha;
314 __u32 archive_mask = 0;
315 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
317 /* return error by default in case all archive_ids have unregistered */
321 /* send action to all registered archive_ids */
322 down_read(&cdt->cdt_agent_lock);
323 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
324 for (i = 0; (i < ha->ha_archive_cnt); i++) {
325 /* only send once for each archive_id */
326 if ((1 << ha->ha_archive_id[i]) & archive_mask)
328 archive_mask |= (1 << ha->ha_archive_id[i]);
330 /* XXX: instead of creating one request record per
331 * new action, it could make sense to gather
332 * all for the same archive_id as one compound
333 * request/id, like in mdt_hsm_add_actions() ?? */
334 compound_id = atomic_inc_return(&cdt->cdt_compound_id);
335 rc = mdt_agent_record_add(mti->mti_env, mti->mti_mdt,
337 ha->ha_archive_id[i], 0,
340 CERROR("%s: unable to add HSM remove request "
341 "for "DFID": rc=%d\n",
342 mdt_obd_name(mti->mti_mdt),
343 PFID(&hai->hai_fid), rc);
346 CDEBUG(D_HSM, "%s: added HSM remove request "
347 "for "DFID", archive_id=%d\n",
348 mdt_obd_name(mti->mti_mdt),
350 ha->ha_archive_id[i]);
353 /* early exit from loop due to error? */
354 if (i != ha->ha_archive_cnt)
357 up_read(&cdt->cdt_agent_lock);
363 * send a compound request to the agent
364 * \param mti [IN] context
365 * \param hal [IN] request (can be a kuc payload)
366 * \param purge [IN] purge mode (no record)
368 * \retval -ve failure
369 * This function supposes:
370 * - all actions are for the same archive number
371 * - in case of cancel, all cancel are for the same agent
372 * This implies that request split has to be done
373 * before when building the hal
/*
 * mdt_hsm_agent_send(): choose an agent for hal->hal_archive_id and push
 * the action list to it.
 *
 * Outline of what the statements below show. NOTE(review): a number of
 * statements, call arguments and comment terminators are missing from this
 * copy of the function, so this is an outline and not a full contract.
 *  - an agent is selected with mdt_hsm_find_best_agent();
 *  - when that fails and hal_archive_id is 0, every HSMA_REMOVE item is
 *    handed to mdt_hsm_send_action_to_each_archive() and its record is then
 *    updated with mdt_agent_record_update();
 *  - the list is copied into a buffer obtained from kuc_alloc();
 *  - every item that is not HSMA_CANCEL is checked again with
 *    mdt_hsm_get_md_hsm() and mdt_hsm_is_action_compat(); records are
 *    updated through mdt_agent_record_update() on the failure paths;
 *  - the list is recorded with mdt_hsm_add_hal(), the export of the chosen
 *    agent is looked up by uuid and the buffer is sent with
 *    do_set_info_async() on the export's reverse import;
 *  - when rc != 0 and is_registered is set, requests are removed again with
 *    mdt_cdt_remove_request().
 */
375 int mdt_hsm_agent_send(struct mdt_thread_info *mti,
376 struct hsm_action_list *hal, bool purge)
378 struct obd_export *exp;
379 struct mdt_device *mdt = mti->mti_mdt;
380 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
381 struct hsm_action_list *buf = NULL;
382 struct hsm_action_item *hai;
383 struct obd_uuid uuid;
386 bool is_registered = false;
/* ask for the least loaded agent able to serve this archive id; on success
 * uuid identifies it */
389 rc = mdt_hsm_find_best_agent(cdt, hal->hal_archive_id, &uuid);
390 if (rc && hal->hal_archive_id == 0) {
394 /* special case of remove requests with no archive_id specified,
395 * and no agent registered to serve all archives, then create a
396 * set of new requests, each to be sent to each registered
398 * Todo so, find all HSMA_REMOVE entries, and then :
399 * _ set completed status as SUCCESS (or FAIL?)
400 * _ create a new LLOG record for each archive_id
401 * presently being served by any CT
403 hai = hai_first(hal);
404 for (i = 0; i < hal->hal_count; i++,
405 hai = hai_next(hai)) {
406 /* only removes are concerned */
407 if (hai->hai_action != HSMA_REMOVE) {
408 /* count if other actions than HSMA_REMOVE,
409 * to return original error/rc */
414 /* send remove request to all registered archive_ids */
415 rc2 = mdt_hsm_send_action_to_each_archive(mti, hai);
419 /* only update original request as SUCCEED if it has
420 * been successfully broadcasted to all available
422 * XXX: this should only cause duplicates to be sent,
423 * unless a method to record already successfully
424 * reached archive_ids is implemented */
425 rc2 = mdt_agent_record_update(mti->mti_env, mdt,
429 CERROR("%s: mdt_agent_record_update() "
430 "failed, cannot update "
431 "status to %s for cookie "
434 agent_req_status2name(ARS_SUCCEED),
435 hai->hai_cookie, rc2);
439 /* only remove requests with archive_id=0 */
446 CERROR("%s: Cannot find agent for archive %d: rc = %d\n",
447 mdt_obd_name(mdt), hal->hal_archive_id, rc);
451 CDEBUG(D_HSM, "Agent %s selected for archive %d\n", obd_uuid2str(&uuid),
452 hal->hal_archive_id);
/* buf receives a copy of hal; NOTE(review): the computation of len is not
 * visible in this copy */
455 buf = kuc_alloc(len, KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
457 RETURN(PTR_ERR(buf));
458 memcpy(buf, hal, len);
460 /* Check if request is still valid (cf file hsm flags) */
461 fail_request = false;
462 hai = hai_first(hal);
463 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
464 if (hai->hai_action != HSMA_CANCEL) {
465 struct mdt_object *obj;
468 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
469 if (!IS_ERR(obj) && obj != NULL) {
470 mdt_object_put(mti->mti_env, obj);
472 if (hai->hai_action == HSMA_REMOVE)
477 rc = mdt_agent_record_update(
483 "%s: mdt_agent_record_update() "
484 "failed, cannot update "
485 "status to %s for cookie "
488 agent_req_status2name(ARS_FAILED),
489 hai->hai_cookie, rc);
494 GOTO(out_buf, rc = PTR_ERR(obj));
497 if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
498 hal->hal_flags, &hsm)) {
499 /* incompatible request, we abort the request */
500 /* next time coordinator will wake up, it will
501 * make the same compound with valid only
504 rc = mdt_agent_record_update(mti->mti_env, mdt,
508 CERROR("%s: mdt_agent_record_update() "
509 "failed, cannot update "
510 "status to %s for cookie "
513 agent_req_status2name(ARS_FAILED),
514 hai->hai_cookie, rc);
518 /* if restore and record status updated, give
519 * back granted layout lock */
520 if (hai->hai_action == HSMA_RESTORE) {
521 struct cdt_restore_handle *crh = NULL;
522 struct mdt_object *obj = NULL;
524 mutex_lock(&cdt->cdt_restore_lock);
525 crh = mdt_hsm_restore_hdl_find(cdt,
528 list_del(&crh->crh_list);
529 mutex_unlock(&cdt->cdt_restore_lock);
530 obj = mdt_object_find(mti->mti_env,
533 if (!IS_ERR(obj) && crh != NULL)
534 mdt_object_unlock(mti, obj,
538 OBD_SLAB_FREE_PTR(crh,
541 mdt_object_put(mti->mti_env,
548 /* we found incompatible requests, so the compound cannot be send
549 * as is. Bad records have been invalidated in llog.
550 * Valid one will be reschedule next time coordinator will wake up
551 * So no need the rebuild a full valid compound request now
554 GOTO(out_buf, rc = 0);
556 /* Cancel memory registration is useless for purge
557 * non registration avoid a deadlock :
558 * in case of failure we have to take the write lock
559 * to remove entry which conflict with the read loack needed
563 /* set is_registered even if failure because we may have
564 * partial work done */
565 is_registered = true;
566 rc = mdt_hsm_add_hal(mti, hal, &uuid);
571 /* Uses the ldlm reverse import; this rpc will be seen by
572 * the ldlm_callback_handler. Note this sends a request RPC
573 * from a server (MDT) to a client (MDC), backwards of normal comms.
575 exp = cfs_hash_lookup(mdt2obd_dev(mdt)->obd_uuid_hash, &uuid);
576 if (exp == NULL || exp->exp_disconnected) {
578 class_export_put(exp);
579 /* This should clean up agents on evicted exports */
/* no usable export for this uuid: report it and drop the agent
 * registration */
581 CERROR("%s: agent uuid (%s) not found, unregistering:"
583 mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
584 mdt_hsm_agent_unregister(mti, &uuid);
588 /* send request to agent */
589 rc = do_set_info_async(exp->exp_imp_reverse, LDLM_SET_INFO,
591 sizeof(KEY_HSM_COPYTOOL_SEND),
592 KEY_HSM_COPYTOOL_SEND,
593 kuc_len(len), kuc_ptr(buf), NULL);
596 CERROR("%s: cannot send request to agent '%s': rc = %d\n",
597 mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
599 class_export_put(exp);
/* NOTE(review): the condition guarding the unregistration below is not
 * visible in this copy of the function */
602 CDEBUG(D_HSM, "Lost connection to agent '%s', unregistering\n",
603 obd_uuid2str(&uuid));
604 mdt_hsm_agent_unregister(mti, &uuid);
608 if (rc != 0 && is_registered) {
609 /* in case of error, we have to unregister requests */
610 hai = hai_first(hal);
611 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
612 if (hai->hai_action == HSMA_CANCEL)
614 mdt_cdt_remove_request(cdt, hai->hai_cookie);
625 * update status of a request
627 * \param pgs [IN] progress of the copy tool
629 * \retval -ve failure
631 int mdt_hsm_coordinator_update(struct mdt_thread_info *mti,
632 struct hsm_progress_kernel *pgs)
637 /* ask to coordinator to update request state and
638 * to record on disk the result */
639 rc = mdt_hsm_update_request_state(mti, pgs, 1);
644 * seq_file method called to start access to /proc file
646 static void *mdt_hsm_agent_proc_start(struct seq_file *s, loff_t *off)
648 struct mdt_device *mdt = s->private;
649 struct coordinator *cdt = &mdt->mdt_coordinator;
650 struct list_head *pos;
654 down_read(&cdt->cdt_agent_lock);
656 if (list_empty(&cdt->cdt_agents))
660 RETURN(SEQ_START_TOKEN);
663 list_for_each(pos, &cdt->cdt_agents) {
673 * seq_file method called to get next item
674 * just returns NULL at eof
676 static void *mdt_hsm_agent_proc_next(struct seq_file *s, void *v, loff_t *p)
678 struct mdt_device *mdt = s->private;
679 struct coordinator *cdt = &mdt->mdt_coordinator;
680 struct list_head *pos = v;
683 if (pos == SEQ_START_TOKEN)
684 pos = cdt->cdt_agents.next;
689 if (pos != &cdt->cdt_agents)
697 static int mdt_hsm_agent_proc_show(struct seq_file *s, void *v)
699 struct list_head *pos = v;
700 struct hsm_agent *ha;
704 if (pos == SEQ_START_TOKEN)
707 ha = list_entry(pos, struct hsm_agent, ha_list);
708 seq_printf(s, "uuid=%s archive_id=", ha->ha_uuid.uuid);
709 if (ha->ha_archive_cnt == 0) {
710 seq_printf(s, "ANY");
712 seq_printf(s, "%d", ha->ha_archive_id[0]);
713 for (i = 1; i < ha->ha_archive_cnt; i++)
714 seq_printf(s, ",%d", ha->ha_archive_id[i]);
717 seq_printf(s, " requests=[current:%d ok:%d errors:%d]\n",
718 atomic_read(&ha->ha_requests),
719 atomic_read(&ha->ha_success),
720 atomic_read(&ha->ha_failure));
725 * seq_file method called to stop access to /proc file
727 static void mdt_hsm_agent_proc_stop(struct seq_file *s, void *v)
729 struct mdt_device *mdt = s->private;
730 struct coordinator *cdt = &mdt->mdt_coordinator;
732 up_read(&cdt->cdt_agent_lock);
735 /* hsm agent list proc functions */
736 static const struct seq_operations mdt_hsm_agent_proc_ops = {
737 .start = mdt_hsm_agent_proc_start,
738 .next = mdt_hsm_agent_proc_next,
739 .show = mdt_hsm_agent_proc_show,
740 .stop = mdt_hsm_agent_proc_stop,
744 * public function called at open of /proc file to get
747 static int lprocfs_open_hsm_agent(struct inode *inode, struct file *file)
753 rc = seq_open(file, &mdt_hsm_agent_proc_ops);
757 s = file->private_data;
758 s->private = PDE_DATA(inode);
763 /* methods to access hsm agent list */
764 const struct file_operations mdt_hsm_agent_fops = {
765 .owner = THIS_MODULE,
766 .open = lprocfs_open_hsm_agent,
769 .release = lprocfs_seq_release,