4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
28 * lustre/mdt/mdt_hsm_cdt_agent.c
30 * Lustre HSM Coordinator
32 * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33 * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
36 #define DEBUG_SUBSYSTEM S_MDS
39 #include <obd_support.h>
40 #include <lustre_export.h>
41 #include <lustre/lustre_user.h>
42 #include <lprocfs_status.h>
43 #include "mdt_internal.h"
50 * find a hsm_agent by uuid
51 * cdt_agent_lock must be held by the caller
52 * \param cdt [IN] coordinator
53 * \param uuid [IN] agent UUID
54 * \retval hsm_agent pointer or NULL if not found
/*
 * Scan the coordinator's agent list (cdt->cdt_agents) for the entry whose
 * ha_uuid equals uuid, using obd_uuid_equals() for the comparison.
 * The list is walked without taking any lock here.
 * NOTE(review): the statements executed on a match and after the scan are
 * not visible in this excerpt; presumably the matching entry is returned,
 * and NULL when nothing matches - confirm against the full file.
 */
56 static struct hsm_agent *mdt_hsm_agent_lookup(struct coordinator *cdt,
57 const struct obd_uuid *uuid)
61 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
62 if (obd_uuid_equals(&ha->ha_uuid, uuid))
69 * register a copy tool
70 * \param mti [IN] MDT context
71 * \param uuid [IN] client UUID to be registered
72 * \param nr_archives [IN] number of archives the agent serves
73 * \param archive_id [IN] vector of archive numbers served by the copytool
/*
 * Register an agent (copy tool) with the HSM coordinator.
 *
 * A new hsm_agent is filled with nr_archives, a private copy of the
 * archive_id vector (only when nr_archives != 0) and zeroed request /
 * success / failure counters, then appended to cdt->cdt_agents.
 * Registration is refused with a console warning while the coordinator
 * state is CDT_STOPPED, and with -EEXIST when an agent with the same uuid
 * is already on the list.  Allocation failures yield -ENOMEM.
 * An archive count of 0 is logged as "registered for all archives".
 * NOTE(review): the value returned for a stopped coordinator is not
 * visible in this excerpt.
 */
77 int mdt_hsm_agent_register(struct mdt_thread_info *mti,
78 const struct obd_uuid *uuid,
79 int nr_archives, __u32 *archive_id)
81 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
82 struct hsm_agent *ha, *tmp;
86 /* no coordinator started, so we cannot serve requests */
87 if (cdt->cdt_state == CDT_STOPPED) {
88 LCONSOLE_WARN("HSM coordinator thread is not running - "
89 "denying agent registration.\n");
95 GOTO(out, rc = -ENOMEM);
98 ha->ha_archive_cnt = nr_archives;
99 if (ha->ha_archive_cnt != 0) {
/* keep a private copy of the caller's archive vector */
102 sz = ha->ha_archive_cnt * sizeof(*ha->ha_archive_id);
103 OBD_ALLOC(ha->ha_archive_id, sz);
104 if (ha->ha_archive_id == NULL)
105 GOTO(out_free, rc = -ENOMEM);
106 memcpy(ha->ha_archive_id, archive_id, sz);
108 atomic_set(&ha->ha_requests, 0);
109 atomic_set(&ha->ha_success, 0);
110 atomic_set(&ha->ha_failure, 0);
/* the duplicate check and the insertion share one write-locked section */
112 down_write(&cdt->cdt_agent_lock);
113 tmp = mdt_hsm_agent_lookup(cdt, uuid);
115 LCONSOLE_WARN("HSM agent %s already registered\n",
117 up_write(&cdt->cdt_agent_lock);
118 GOTO(out_free, rc = -EEXIST);
121 list_add_tail(&ha->ha_list, &cdt->cdt_agents);
123 if (ha->ha_archive_cnt == 0)
124 CDEBUG(D_HSM, "agent %s registered for all archives\n",
125 obd_uuid2str(&ha->ha_uuid));
127 CDEBUG(D_HSM, "agent %s registered for %d archives\n",
128 obd_uuid2str(&ha->ha_uuid), ha->ha_archive_cnt);
130 up_write(&cdt->cdt_agent_lock);
/* release the copied archive vector, if one was allocated */
135 if (ha != NULL && ha->ha_archive_id != NULL)
136 OBD_FREE(ha->ha_archive_id,
137 ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
145 * register a copy tool whose served archives are given as a bitmask
146 * \param mti [IN] MDT context
147 * \param uuid [IN] uuid to be registered
148 * \param archive_mask [IN] bitmask of archive numbers served by the copytool
150 * \retval -ve failure
152 int mdt_hsm_agent_register_mask(struct mdt_thread_info *mti,
153 const struct obd_uuid *uuid, __u32 archive_mask)
155 int rc, i, nr_archives = 0;
156 __u32 *archive_id = NULL;
159 nr_archives = hweight32(archive_mask);
161 if (nr_archives != 0) {
162 OBD_ALLOC(archive_id, nr_archives * sizeof(*archive_id));
167 for (i = 0; i < sizeof(archive_mask) * 8; i++) {
168 if ((1 << i) & archive_mask) {
169 archive_id[nr_archives] = i + 1;
175 rc = mdt_hsm_agent_register(mti, uuid, nr_archives, archive_id);
177 if (archive_id != NULL)
178 OBD_FREE(archive_id, nr_archives * sizeof(*archive_id));
184 * unregister a copy tool
185 * \param mti [IN] MDT context
186 * \param uuid [IN] uuid to be unregistered
188 * \retval -ve failure
/*
 * Unregister an agent (copy tool) from the HSM coordinator.
 *
 * The agent is looked up by uuid and unlinked from cdt->cdt_agents while
 * cdt_agent_lock is held for writing; its archive vector is freed after
 * the lock has been dropped.  The outcome is logged at D_HSM level.
 * NOTE(review): the conditions guarding list_del_init() and the -ENOENT
 * exit, and the value returned while the coordinator is CDT_STOPPED, are
 * not visible in this excerpt; presumably -ENOENT means the uuid was not
 * registered - confirm against the full file.
 */
190 int mdt_hsm_agent_unregister(struct mdt_thread_info *mti,
191 const struct obd_uuid *uuid)
193 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
194 struct hsm_agent *ha;
198 /* no coordinator started, so we cannot serve requests */
199 if (cdt->cdt_state == CDT_STOPPED)
202 down_write(&cdt->cdt_agent_lock);
204 ha = mdt_hsm_agent_lookup(cdt, uuid);
206 list_del_init(&ha->ha_list);
208 up_write(&cdt->cdt_agent_lock);
211 GOTO(out, rc = -ENOENT);
/* the archive vector only exists when the agent listed specific archives */
213 if (ha->ha_archive_cnt != 0)
214 OBD_FREE(ha->ha_archive_id,
215 ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
220 CDEBUG(D_HSM, "agent %s unregistration: %d\n", obd_uuid2str(uuid), rc);
226 * update agent statistics
227 * \param cdt [IN] coordinator
228 * \param succ_rq [IN] number of success
229 * \param fail_rq [IN] number of failure
230 * \param new_rq [IN] number of new requests
231 * \param uuid [IN] agent uuid
232 * if all counters == 0, clear counters
234 * \retval -ve failure
/*
 * Update the counters of the agent identified by uuid.
 *
 * The agent list is scanned with cdt_agent_lock held for reading.  For the
 * matching agent, a call with succ_rq, fail_rq and new_rq all equal to 0
 * resets ha_success, ha_failure and ha_requests to 0.  The add/sub
 * sequence adds succ_rq and fail_rq to their counters and changes
 * ha_requests by new_rq - succ_rq - fail_rq.
 * NOTE(review): the return values, including the one used when no agent
 * matches, are not visible in this excerpt.
 */
236 int mdt_hsm_agent_update_statistics(struct coordinator *cdt,
237 int succ_rq, int fail_rq, int new_rq,
238 const struct obd_uuid *uuid)
240 struct hsm_agent *ha;
244 down_read(&cdt->cdt_agent_lock);
245 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
246 if (obd_uuid_equals(&ha->ha_uuid, uuid)) {
247 if (succ_rq == 0 && fail_rq == 0 && new_rq == 0) {
248 atomic_set(&ha->ha_success, 0);
249 atomic_set(&ha->ha_failure, 0);
250 atomic_set(&ha->ha_requests, 0);
252 atomic_add(succ_rq, &ha->ha_success);
253 atomic_add(fail_rq, &ha->ha_failure);
/* finished requests, successful or failed, leave the pending count */
254 atomic_add(new_rq, &ha->ha_requests);
255 atomic_sub(succ_rq, &ha->ha_requests);
256 atomic_sub(fail_rq, &ha->ha_requests);
264 up_read(&cdt->cdt_agent_lock);
269 * find the best agent
270 * \param cdt [IN] coordinator
271 * \param archive [IN] archive number
272 * \param uuid [OUT] agent who can serve archive
274 * \retval -ve failure
/*
 * Select an agent able to serve the given archive number.
 *
 * The agent list is walked with cdt_agent_lock held for reading.  For each
 * agent the inner loop searches ha_archive_id[] for archive; an agent with
 * ha_archive_cnt == 0 serves any archive.  "load" tracks the smallest
 * ha_requests value seen so far (-1 meaning none yet).  rc starts at
 * -EAGAIN.
 * NOTE(review): the statements that skip a non-matching agent, store the
 * chosen uuid and end the walk on an idle agent are not visible in this
 * excerpt.
 */
276 int mdt_hsm_find_best_agent(struct coordinator *cdt, __u32 archive,
277 struct obd_uuid *uuid)
279 int rc = -EAGAIN, i, load = -1;
280 struct hsm_agent *ha;
283 /* Choose an export to send a copytool req to */
284 down_read(&cdt->cdt_agent_lock);
285 list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
286 for (i = 0; (i < ha->ha_archive_cnt) &&
287 (ha->ha_archive_id[i] != archive); i++) {
288 /* nothing to do, just skip unmatching records */
291 /* archive count == 0 means copy tool serves any backend */
/* i == ha_archive_cnt means the search above ran off the end: no match */
292 if (ha->ha_archive_cnt != 0 && i == ha->ha_archive_cnt)
295 if (load == -1 || load > atomic_read(&ha->ha_requests)) {
296 load = atomic_read(&ha->ha_requests);
300 if (atomic_read(&ha->ha_requests) == 0)
303 up_read(&cdt->cdt_agent_lock);
309 * send a compound request to the agent
310 * \param mti [IN] context
311 * \param hal [IN] request (can be a kuc payload)
312 * \param purge [IN] purge mode (no record)
314 * \retval -ve failure
315 * This function assumes that:
316 * - all actions are for the same archive number
317 * - in case of cancel, all cancels are for the same agent
318 * This implies that requests have to be split accordingly
319 * beforehand, when building the hal
/*
 * Send a compound HSM request (hal) to one agent.
 *
 * Steps visible in this block, in order:
 *  - pick an agent for hal->hal_archive_id with mdt_hsm_find_best_agent();
 *    failure is reported with CERROR
 *  - obtain a kuc payload: hal itself when kuc_ispayload(hal) says it is
 *    one already, otherwise a buffer from kuc_alloc() filled by memcpy()
 *  - walk the action items; for every item that is not HSMA_CANCEL fetch
 *    the file HSM state with mdt_hsm_get_md_hsm() and check it with
 *    mdt_hsm_is_action_compat(); bad records are updated through
 *    mdt_agent_record_update() (the messages name status ARS_FAILED)
 *  - when incompatible requests were found, leave with rc = 0 without
 *    sending anything
 *  - record the requests with mdt_hsm_add_hal(); is_registered is set
 *    beforehand so that partial work can be undone
 *  - look the agent export up in obd_uuid_hash; an export that is missing
 *    or disconnected causes the agent to be unregistered
 *  - send the payload with do_set_info_async() on exp->exp_imp_reverse
 *    under KEY_HSM_COPYTOOL_SEND, then drop the export reference
 *  - if rc != 0 and is_registered, walk the items again and call
 *    mdt_cdt_remove_request() on their cookies
 * NOTE(review): several conditions, declarations and labels of this
 * function are not visible in this excerpt (for instance how "purge"
 * gates the registration step and how HSMA_CANCEL items are treated in
 * the final loop); the list above only states what the visible lines show.
 */
321 int mdt_hsm_agent_send(struct mdt_thread_info *mti,
322 struct hsm_action_list *hal, bool purge)
324 struct obd_export *exp;
325 struct mdt_device *mdt = mti->mti_mdt;
326 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
327 struct hsm_action_list *buf = NULL;
328 struct hsm_action_item *hai;
329 struct obd_uuid uuid;
332 bool is_registered = false;
335 rc = mdt_hsm_find_best_agent(cdt, hal->hal_archive_id, &uuid);
337 CERROR("%s: Cannot find agent for archive %d: rc = %d\n",
338 mdt_obd_name(mdt), hal->hal_archive_id, rc);
342 CDEBUG(D_HSM, "Agent %s selected for archive %d\n", obd_uuid2str(&uuid),
343 hal->hal_archive_id);
346 if (kuc_ispayload(hal)) {
347 /* hal is already a kuc payload
348 * we do not need to alloc a new one
349 * this avoid a alloc/memcpy/free
353 buf = kuc_alloc(len, KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
355 RETURN(PTR_ERR(buf));
356 memcpy(buf, hal, len);
359 /* Check if request is still valid (cf file hsm flags) */
360 fail_request = false;
361 hai = hai_first(hal);
362 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
363 if (hai->hai_action != HSMA_CANCEL) {
364 struct mdt_object *obj;
367 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
368 if (!IS_ERR(obj) && obj != NULL) {
369 mdt_object_put(mti->mti_env, obj);
371 if (hai->hai_action == HSMA_REMOVE)
376 rc = mdt_agent_record_update(
382 "%s: mdt_agent_record_update() "
383 "failed, cannot update "
384 "status to %s for cookie "
387 agent_req_status2name(ARS_FAILED),
388 hai->hai_cookie, rc);
393 GOTO(out_buf, rc = PTR_ERR(obj));
396 if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
397 hal->hal_flags, &hsm)) {
398 /* incompatible request, we abort the request */
399 /* next time coordinator will wake up, it will
400 * make the same compound with valid only
403 rc = mdt_agent_record_update(mti->mti_env, mdt,
407 CERROR("%s: mdt_agent_record_update() "
408 "failed, cannot update "
409 "status to %s for cookie "
412 agent_req_status2name(ARS_FAILED),
413 hai->hai_cookie, rc);
420 /* we found incompatible requests, so the compound cannot be send
421 * as is. Bad records have been invalidated in llog.
422 * Valid one will be reschedule next time coordinator will wake up
423 * So no need the rebuild a full valid compound request now
426 GOTO(out_buf, rc = 0);
428 /* Cancel memory registration is useless for purge
429 * non registration avoid a deadlock :
430 * in case of failure we have to take the write lock
431 * to remove entry which conflict with the read loack needed
435 /* set is_registered even if failure because we may have
436 * partial work done */
437 is_registered = true;
438 rc = mdt_hsm_add_hal(mti, hal, &uuid);
443 /* Uses the ldlm reverse import; this rpc will be seen by
444 * the ldlm_callback_handler. Note this sends a request RPC
445 * from a server (MDT) to a client (MDC), backwards of normal comms.
447 exp = cfs_hash_lookup(mdt2obd_dev(mdt)->obd_uuid_hash, &uuid);
448 if (exp == NULL || exp->exp_disconnected) {
449 /* This should clean up agents on evicted exports */
451 CERROR("%s: agent uuid (%s) not found, unregistering:"
453 mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
454 mdt_hsm_agent_unregister(mti, &uuid);
458 /* send request to agent */
459 rc = do_set_info_async(exp->exp_imp_reverse, LDLM_SET_INFO,
461 sizeof(KEY_HSM_COPYTOOL_SEND),
462 KEY_HSM_COPYTOOL_SEND,
463 kuc_len(len), kuc_ptr(buf), NULL);
466 CERROR("%s: cannot send request to agent '%s': rc = %d\n",
467 mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
469 class_export_put(exp);
472 CDEBUG(D_HSM, "Lost connection to agent '%s', unregistering\n",
473 obd_uuid2str(&uuid));
474 mdt_hsm_agent_unregister(mti, &uuid);
478 if (rc != 0 && is_registered) {
479 /* in case of error, we have to unregister requests */
480 hai = hai_first(hal);
481 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
482 if (hai->hai_action == HSMA_CANCEL)
484 mdt_cdt_remove_request(cdt, hai->hai_cookie);
496 * update status of a request
498 * \param pgs [IN] progress of the copy tool
500 * \retval -ve failure
/*
 * Update the status of a request from the progress report pgs sent by a
 * copy tool.  The work is delegated to mdt_hsm_update_request_state(),
 * called with 1 as its third argument.
 * NOTE(review): the meaning of that third argument is not visible here;
 * the comment below suggests it requests the on-disk record update -
 * confirm against mdt_hsm_update_request_state().
 */
502 int mdt_hsm_coordinator_update(struct mdt_thread_info *mti,
503 struct hsm_progress_kernel *pgs)
508 /* ask the coordinator to update request state and
509 * to record on disk the result */
510 rc = mdt_hsm_update_request_state(mti, pgs, 1);
515 * seq_file method called to start access to /proc file
/*
 * seq_file start callback for the agent list.
 *
 * The mdt_device is taken from s->private.  cdt_agent_lock is taken for
 * reading here; the matching up_read() is in mdt_hsm_agent_proc_stop().
 * SEQ_START_TOKEN is one of the values returned, and the agent list is
 * walked with list_for_each().
 * NOTE(review): the conditions selecting each return value (empty list,
 * the value of *off) are not visible in this excerpt.
 */
517 static void *mdt_hsm_agent_proc_start(struct seq_file *s, loff_t *off)
519 struct mdt_device *mdt = s->private;
520 struct coordinator *cdt = &mdt->mdt_coordinator;
521 struct list_head *pos;
525 down_read(&cdt->cdt_agent_lock);
527 if (list_empty(&cdt->cdt_agents))
531 RETURN(SEQ_START_TOKEN);
534 list_for_each(pos, &cdt->cdt_agents) {
544 * seq_file method called to get next item
545 * just returns NULL at eof
/*
 * seq_file next callback for the agent list.
 *
 * v is the current position: either SEQ_START_TOKEN, in which case the
 * walk begins at the first list entry (cdt->cdt_agents.next), or a
 * list_head inside the agent list.  A position equal to the list head
 * itself (&cdt->cdt_agents) marks the end of the list.
 * NOTE(review): the step to the following entry, the update of *p and the
 * return statements are not visible in this excerpt.
 */
547 static void *mdt_hsm_agent_proc_next(struct seq_file *s, void *v, loff_t *p)
549 struct mdt_device *mdt = s->private;
550 struct coordinator *cdt = &mdt->mdt_coordinator;
551 struct list_head *pos = v;
554 if (pos == SEQ_START_TOKEN)
555 pos = cdt->cdt_agents.next;
560 if (pos != &cdt->cdt_agents)
/*
 * seq_file show callback: print one line describing one agent.
 *
 * v is either SEQ_START_TOKEN or the ha_list member of an hsm_agent.
 * Output format, as built by the seq_printf() calls below:
 *   uuid=<uuid> archive#=<n> (<ids>) r=<requests> s=<success> f=<failure>
 * where <ids> is "all" when ha_archive_cnt == 0; the per-archive loop
 * prints each archive number followed by one space.
 * NOTE(review): the handling of SEQ_START_TOKEN and the return values are
 * not visible in this excerpt.
 */
568 static int mdt_hsm_agent_proc_show(struct seq_file *s, void *v)
570 struct list_head *pos = v;
571 struct hsm_agent *ha;
575 if (pos == SEQ_START_TOKEN)
578 ha = list_entry(pos, struct hsm_agent, ha_list);
579 seq_printf(s, "uuid=%s archive#=%d (", ha->ha_uuid.uuid,
581 if (ha->ha_archive_cnt == 0)
582 seq_printf(s, "all");
584 for (i = 0; i < ha->ha_archive_cnt; i++)
585 seq_printf(s, "%d ", ha->ha_archive_id[i]);
587 seq_printf(s, ") r=%d s=%d f=%d\n",
588 atomic_read(&ha->ha_requests),
589 atomic_read(&ha->ha_success),
590 atomic_read(&ha->ha_failure));
595 * seq_file method called to stop access to /proc file
/*
 * seq_file stop callback: release the read side of cdt_agent_lock, which
 * mdt_hsm_agent_proc_start() acquires with down_read().  The position
 * argument v is not used by the visible code.
 */
597 static void mdt_hsm_agent_proc_stop(struct seq_file *s, void *v)
599 struct mdt_device *mdt = s->private;
600 struct coordinator *cdt = &mdt->mdt_coordinator;
602 up_read(&cdt->cdt_agent_lock);
605 /* seq_file iterator callbacks (start/next/show/stop) for the hsm agent
 * list proc file; passed to seq_open() by lprocfs_open_hsm_agent() */
606 static const struct seq_operations mdt_hsm_agent_proc_ops = {
607 .start = mdt_hsm_agent_proc_start,
608 .next = mdt_hsm_agent_proc_next,
609 .show = mdt_hsm_agent_proc_show,
610 .stop = mdt_hsm_agent_proc_stop,
614 * public function called at open of /proc file to get
/*
 * open() handler of the agent list proc file.
 *
 * After LPROCFS_ENTRY_CHECK() on the proc entry, the file is opened as a
 * seq_file driven by mdt_hsm_agent_proc_ops.  The proc entry's data
 * pointer is then stored as the seq_file private pointer, which the
 * iterator callbacks read back as a struct mdt_device.
 * NOTE(review): the return values, including the one used when the entry
 * check or seq_open() fails, are not visible in this excerpt.
 */
617 static int lprocfs_open_hsm_agent(struct inode *inode, struct file *file)
623 if (LPROCFS_ENTRY_CHECK(PDE(inode)))
626 rc = seq_open(file, &mdt_hsm_agent_proc_ops);
630 s = file->private_data;
631 s->private = PDE(inode)->data;
636 /* file operations of the hsm agent list proc file: opened through
 * lprocfs_open_hsm_agent(), released through lprocfs_seq_release() */
637 const struct file_operations mdt_hsm_agent_fops = {
638 .owner = THIS_MODULE,
639 .open = lprocfs_open_hsm_agent,
642 .release = lprocfs_seq_release,