4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
28 * lustre/mdt/mdt_hsm_cdt_client.c
30 * Lustre HSM Coordinator
32 * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33 * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <obd_support.h>
39 #include <lustre_net.h>
40 #include <lustre_export.h>
42 #include <lprocfs_status.h>
43 #include <lustre_log.h>
44 #include "mdt_internal.h"
47 * data passed to llog_cat_process() callback
48 * to find compatible requests
/* Context handed to the llog callback that looks for compatible requests. */
50 struct hsm_compat_data_cb {
/* coordinator; filled from mdt->mdt_coordinator by hsm_find_compatible() */
51 struct coordinator *cdt;
/* action list whose items the callback compares with each llog record */
52 struct hsm_action_list *hal;
56 * llog_cat_process() callback, used to find records
57 * compatible with a new hsm_action_list
58 * \param env [IN] environment
59 * \param llh [IN] llog handle
60 * \param hdr [IN] llog record
61 * \param data [IN] cb data = hsm_compat_data_cb
/*
 * Per-record callback of the llog scan: views hdr as a
 * struct llog_agent_req_rec and compares it with every item of the action
 * list referenced by the hsm_compat_data_cb callback data.
 *
 * Statements visible in this listing, reached after the record state test,
 * the CANCEL-with-cookie test and the FID comparison:
 *  - for an HSMA_NONE item, the list archive id and flags are copied from
 *    the record and the whole item is overwritten with the recorded item;
 *  - the item cookie is set to the recorded cookie;
 *  - for an HSMA_CANCEL item, the list archive id is taken from the record
 *    when it is still 0.
 *
 * NOTE(review): the embedded listing numbers jump inside this body (for
 * example 71 to 75, 81 to 84, 94 to 97 and 104 to 107), so the assignment of
 * hcdcb, the declaration of i and the statements guarded by the first three
 * if conditions are not visible; presumably they return or move on to the
 * next item. Confirm against the complete file.
 */
65 static int hsm_find_compatible_cb(const struct lu_env *env,
66 struct llog_handle *llh,
67 struct llog_rec_hdr *hdr, void *data)
69 struct llog_agent_req_rec *larr;
70 struct hsm_compat_data_cb *hcdcb;
71 struct hsm_action_item *hai;
75 larr = (struct llog_agent_req_rec *)hdr;
77 /* a compatible request must be WAITING or STARTED
79 if ((larr->arr_status != ARS_WAITING &&
80 larr->arr_status != ARS_STARTED) ||
81 larr->arr_hai.hai_action == HSMA_CANCEL)
84 hai = hai_first(hcdcb->hal);
85 for (i = 0; i < hcdcb->hal->hal_count; i++, hai = hai_next(hai)) {
86 /* if request is a CANCEL:
87 * if cookie set in the request, there is no need to find a
88 * compatible one, the cookie in the request is directly used.
89 * if cookie is not set, we use the FID to find the request
90 * to cancel (the "compatible" one)
91 * if the caller sets the cookie, we assume he also sets the
94 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
97 if (!lu_fid_eq(&hai->hai_fid, &larr->arr_hai.hai_fid))
100 /* HSMA_NONE is used to find running request for some FID */
101 if (hai->hai_action == HSMA_NONE) {
102 hcdcb->hal->hal_archive_id = larr->arr_archive_id;
103 hcdcb->hal->hal_flags = larr->arr_flags;
/* the whole item is replaced by the recorded one, not only its cookie */
104 *hai = larr->arr_hai;
107 /* in V1 we do not manage partial transfer
108 * so extent is always whole file
110 hai->hai_cookie = larr->arr_hai.hai_cookie;
111 /* we read the archive number from the request we cancel */
112 if (hai->hai_action == HSMA_CANCEL &&
113 hcdcb->hal->hal_archive_id == 0)
114 hcdcb->hal->hal_archive_id = larr->arr_archive_id;
120 * find compatible requests already recorded
121 * \param env [IN] environment
122 * \param mdt [IN] MDT device
123 * \param hal [IN/OUT] new request
124 * cookie is set to that of the compatible request found, or to 0 if none is found
125 * for cancel request, see callback hsm_find_compatible_cb()
127 * \retval -ve failure
/*
 * Looks for already recorded requests matching the items of hal by running
 * hsm_find_compatible_cb() over the llog through cdt_llog_process().
 *
 * The first loop tests each item for the CANCEL-with-cookie case; per the
 * comment in the body, when every item is such a cancel (ok_cnt equals
 * hal->hal_count) no search is needed.
 *
 * NOTE(review): the embedded listing numbers jump inside this body (133 to
 * 138, 144 to 150, 151 to 154, 154 to 157), so the declarations of i, rc and
 * ok_cnt, the update of ok_cnt, the assignment of hcdcb.hal and the return
 * statements are not visible here. Confirm against the complete file.
 */
129 static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt,
130 struct hsm_action_list *hal)
132 struct hsm_action_item *hai;
133 struct hsm_compat_data_cb hcdcb;
138 hai = hai_first(hal);
139 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
140 /* in a cancel request hai_cookie may be set by caller to
141 * show the request to be canceled
142 * if not we need to search by FID
144 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
150 /* if all requests are cancel with cookie, no need to find compatible */
151 if (ok_cnt == hal->hal_count)
154 hcdcb.cdt = &mdt->mdt_coordinator;
/* scan the llog, passing hcdcb to hsm_find_compatible_cb() as its data */
157 rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
163 * check if an action is really needed
164 * \param hai [IN] request description
165 * \param hal_an [IN] request archive number (not used)
166 * \param rq_flags [IN] request flags
167 * \param hsm [IN] file HSM metadata
/*
 * Decides whether the request item hai requires work, given the request
 * flags and the HSM flags of the file (hsm->mh_flags).
 *
 * Visible logic: is_needed starts as false; HSM_FORCE_ACTION in rq_flags is
 * tested first; then a switch on hai->hai_action tests, in this order,
 * (HS_DIRTY set or HS_ARCHIVED clear), (HS_RELEASED or HS_DIRTY set) and
 * (HS_ARCHIVED or HS_EXISTS set). The outcome is logged with CDEBUG(D_HSM).
 *
 * NOTE(review): the case labels, the statements guarded by each test and the
 * return statement are on lines missing from this listing (numbering jumps
 * 173 to 177, 181 to 183, 183 to 187, 195 to 202), so which action each
 * test belongs to cannot be told from here. Confirm against the complete
 * file.
 */
170 static bool hsm_action_is_needed(struct hsm_action_item *hai, int hal_an,
171 __u64 rq_flags, struct md_hsm *hsm)
173 bool is_needed = false;
177 if (rq_flags & HSM_FORCE_ACTION)
180 hsm_flags = hsm->mh_flags;
181 switch (hai->hai_action) {
183 if (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED))
187 /* if file is dirty we must return an error, this function
188 * cannot, so we ask for an action and
189 * mdt_hsm_is_action_compat() will return an error
191 if (hsm_flags & (HS_RELEASED | HS_DIRTY))
195 if (hsm_flags & (HS_ARCHIVED | HS_EXISTS))
202 CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
203 " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
205 hsm_copytool_action2name(hai->hai_action), rq_flags,
206 hai->hai_extent.offset, hai->hai_extent.length,
208 (is_needed ? "action needed" : "no action needed"));
214 * test sanity of an hal
216 * action must be known
// Sanity test of an action list: checks for an empty list (hal_count == 0),
// then walks the items, testing each FID with fid_is_sane() and switching on
// hai_action.
// NOTE(review): the values returned, the case labels of the switch and the
// end of the function are on lines missing from this listing (numbering
// jumps 220 to 223, 226 to 229 and 233 to 248). Confirm against the
// complete file.
220 static bool hal_is_sane(struct hsm_action_list *hal)
223 struct hsm_action_item *hai;
226 if (hal->hal_count == 0)
229 hai = hai_first(hal);
230 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
231 if (!fid_is_sane(&hai->hai_fid))
233 switch (hai->hai_action) {
// Checks whether the caller may request HSM action hsma on obj.
// Visible steps: test for a non-RESTORE action on an export connected with
// OBD_CONNECT_RDONLY; test for CFS_CAP_SYS_ADMIN; fetch the inode attributes
// with mdt_attr_get_complex(); select a coordinator request mask; range-check
// hsma against the number of bits in the mask; return 0 when bit hsma is set
// in the selected mask and -EPERM otherwise.
// NOTE(review): the return type line, the declarations of mask and rc and the
// statements guarded by the first tests are on lines missing from this
// listing (numbering jumps 254 to 259, 260 to 263, 267 to 271, 278 to 281).
// Confirm against the complete file.
248 hsm_action_permission(struct mdt_thread_info *mti,
249 struct mdt_object *obj,
250 enum hsm_copytool_action hsma)
252 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
253 struct lu_ucred *uc = mdt_ucred(mti);
254 struct md_attr *ma = &mti->mti_attr;
259 if (hsma != HSMA_RESTORE &&
260 exp_connect_flags(mti->mti_exp) & OBD_CONNECT_RDONLY)
263 if (md_capable(uc, CFS_CAP_SYS_ADMIN))
266 ma->ma_need = MA_INODE;
267 rc = mdt_attr_get_complex(mti, obj, ma);
// mask selection by ownership: file owner, then group member, else other
271 if (uc->uc_fsuid == ma->ma_attr.la_uid)
272 mask = &cdt->cdt_user_request_mask;
273 else if (lustre_in_group_p(uc, ma->ma_attr.la_gid))
274 mask = &cdt->cdt_group_request_mask;
276 mask = &cdt->cdt_other_request_mask;
// hsma must be a valid bit number for the type mask points to
278 if (!(0 <= hsma && hsma < 8 * sizeof(*mask)))
// NOTE(review): the range check above is relative to the width of the mask
// type, while the shift below is done on 1UL; if the mask type is wider than
// unsigned long on some target, the shift count could exceed the width of
// 1UL. The declaration of mask is not visible here -- confirm.
281 RETURN(*mask & (1UL << hsma) ? 0 : -EPERM);
285 * Coordinator external API
289 * register a list of requests
291 * \param hal [IN] list of requests
292 * \param compound_id [OUT] id of the compound request
294 * \retval -ve failure
295 * in case of restore, caller must hold layout lock
// Registers the requests of hal with the coordinator.
// Visible flow: test that the coordinator is not CDT_STOPPED and that hal
// passes hal_is_sane(); take *compound_id from cdt_compound_id with
// atomic_inc_return(); call hsm_find_compatible() so that hai_cookie is set
// for items matching a recorded request; then, for each item:
//  - default the data FID to the Lustre FID;
//  - read the HSM attributes with mdt_hsm_get_md_hsm() and check
//    hsm_action_permission();
//  - check hsm_action_is_needed() and mdt_hsm_is_action_compat(), the latter
//    failing the whole call with -EPERM;
//  - when no archive id was given and the action is not CANCEL, use the one
//    of the file (mh.mh_arch_id) or else cdt_default_archive_id;
//  - for RESTORE, reject a non-zero extent offset with -EPROTO, allocate a
//    cdt_restore_handle (-ENOMEM on failure), take an LCK_EX lock with
//    MDS_INODELOCK_LAYOUT through mdt_object_find_lock(), and queue the
//    handle on cdt_restore_hdl under cdt_restore_lock;
//  - record the item with mdt_agent_record_add().
// The coordinator is woken up when rc is 0 or -ENODATA.
// NOTE(review): many lines are missing from this listing (the embedded
// numbering jumps, for example 303 to 306, 310 to 313, 341 to 344, 349 to
// 351, 455 to 460, 460 to 467), including the declarations of mh, rc, i,
// archive_id and flags, the statements guarded by most tests, the out label
// and the return. Confirm against the complete file before relying on this
// summary.
297 int mdt_hsm_add_actions(struct mdt_thread_info *mti,
298 struct hsm_action_list *hal, __u64 *compound_id)
300 struct mdt_device *mdt = mti->mti_mdt;
301 struct coordinator *cdt = &mdt->mdt_coordinator;
302 struct hsm_action_item *hai;
303 struct mdt_object *obj = NULL;
306 bool is_restore = false;
309 /* no coordinator started, so we cannot serve requests */
310 if (cdt->cdt_state == CDT_STOPPED)
313 if (!hal_is_sane(hal))
316 *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
318 /* search for compatible request, if found hai_cookie is set
319 * to the request cookie
320 * it is also used to set the cookie for cancel request by FID
322 rc = hsm_find_compatible(mti->mti_env, mdt, hal);
326 hai = hai_first(hal);
327 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
331 /* default archive number is the one explicitly specified */
332 archive_id = hal->hal_archive_id;
333 flags = hal->hal_flags;
335 /* by default, data FID is same as Lustre FID */
336 /* the volatile data FID will be created by copy tool and
337 * send from the agent through the progress call */
338 hai->hai_dfid = hai->hai_fid;
340 /* done here to manage first and redundant requests cases */
341 if (hai->hai_action == HSMA_RESTORE)
344 /* test result of hsm_find_compatible()
345 * if request redundant or cancel of nothing
349 if (hai->hai_action != HSMA_CANCEL && hai->hai_cookie != 0)
351 /* cancel nothing case */
352 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie == 0)
355 /* new request or cancel request
356 * we search for HSM status flags to check for compatibility
357 * if restore, we take the layout lock
360 /* Get HSM attributes and check permissions. */
361 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
363 /* In case of REMOVE and CANCEL a Lustre file
364 * is not mandatory, but restrict this
365 * exception to admins. */
366 if (md_capable(mdt_ucred(mti), CFS_CAP_SYS_ADMIN) &&
367 (hai->hai_action == HSMA_REMOVE ||
368 hai->hai_action == HSMA_CANCEL))
371 GOTO(out, rc = PTR_ERR(obj));
374 rc = hsm_action_permission(mti, obj, hai->hai_action);
375 mdt_object_put(mti->mti_env, obj);
380 /* if action is cancel, also no need to check */
381 if (hai->hai_action == HSMA_CANCEL)
384 /* Check if an action is needed, compare request
385 * and HSM flags status */
386 if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
389 /* Check if file request is compatible with HSM flags status
390 * and stop at first incompatible
392 if (!mdt_hsm_is_action_compat(hai, archive_id, flags, &mh))
393 GOTO(out, rc = -EPERM);
395 /* for cancel archive number is taken from canceled request
396 * for other request, we take from lma if not specified,
397 * or we use the default if none found in lma
398 * this works also for archive because the default value is 0
399 * /!\ there is a side effect: in case of restore on multiple
400 * files which are in different backend, the initial compound
401 * request will be split in multiple requests because we cannot
402 * guarantee an agent can serve any combination of archive
405 if (hai->hai_action != HSMA_CANCEL && archive_id == 0) {
406 if (mh.mh_arch_id != 0)
407 archive_id = mh.mh_arch_id;
409 archive_id = cdt->cdt_default_archive_id;
412 /* if restore, take an exclusive lock on layout */
413 if (hai->hai_action == HSMA_RESTORE) {
414 struct cdt_restore_handle *crh;
416 /* in V1 only whole file is supported. */
417 if (hai->hai_extent.offset != 0)
418 GOTO(out, rc = -EPROTO);
420 OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
422 GOTO(out, rc = -ENOMEM);
424 crh->crh_fid = hai->hai_fid;
425 /* in V1 only whole file is supported. However the
426 * restore may be due to truncate. */
427 crh->crh_extent.start = 0;
428 crh->crh_extent.end = hai->hai_extent.length;
430 mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
431 obj = mdt_object_find_lock(mti, &crh->crh_fid,
433 MDS_INODELOCK_LAYOUT);
436 CERROR("%s: cannot take layout lock for "
437 DFID": rc = %d\n", mdt_obd_name(mdt),
438 PFID(&crh->crh_fid), rc);
439 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
443 /* we choose to not keep a reference
444 * on the object during the restore time which can be
446 mdt_object_put(mti->mti_env, obj);
448 mutex_lock(&cdt->cdt_restore_lock);
449 list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
450 mutex_unlock(&cdt->cdt_restore_lock);
454 rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
455 archive_id, flags, hai);
460 (cdt->cdt_policy & CDT_NONBLOCKING_RESTORE))
467 /* if work has been added, wake up coordinator */
468 if (rc == 0 || rc == -ENODATA)
469 mdt_hsm_cdt_wakeup(mdt);
475 * get running action on a FID list or from cookie
477 * \param hal [IN/OUT] requests
479 * \retval -ve failure
/*
 * For each item of hal, looks up a coordinator request for the item FID with
 * mdt_cdt_find_request() and updates the item in place.
 *
 * Two updates are visible: the item action is set to HSMA_NONE, or the item
 * is overwritten with the item of the request found (*car->car_hai), after
 * which the request is handed back with mdt_cdt_put_request().
 *
 * NOTE(review): the tests selecting between these two updates, the statement
 * guarded by the fid_is_sane() test, the declaration of i and the return are
 * on lines missing from this listing (numbering jumps 486 to 490, 494 to
 * 497, 497 to 500, 500 to 502); presumably HSMA_NONE is used when no request
 * is found. Confirm against the complete file.
 */
481 int mdt_hsm_get_running(struct mdt_thread_info *mti,
482 struct hsm_action_list *hal)
484 struct mdt_device *mdt = mti->mti_mdt;
485 struct coordinator *cdt = &mdt->mdt_coordinator;
486 struct hsm_action_item *hai;
490 hai = hai_first(hal);
491 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
492 struct cdt_agent_req *car;
494 if (!fid_is_sane(&hai->hai_fid))
/* lookup by FID: the cookie argument is passed as 0 */
497 car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
500 hai->hai_action = HSMA_NONE;
502 *hai = *car->car_hai;
503 mdt_cdt_put_request(car);
510 * check if a restore is running on a FID
511 * this is redundant with mdt_hsm_coordinator_get_running()
512 * but as it can be called frequently when getting attr
513 * we make an optimized/simpler version only for a FID
515 * \param fid [IN] file FID
/*
 * Tells whether a restore is registered for fid by walking the coordinator
 * list cdt_restore_hdl and comparing each handle FID (crh_fid) with fid.
 * The walk is done with cdt_restore_lock held.
 *
 * NOTE(review): the result variable, the statement guarded by the
 * fid_is_sane() test, the body of the match branch and the return are on
 * lines missing from this listing (numbering jumps 523 to 527, 527 to 530,
 * 532 to 537). Confirm against the complete file.
 */
518 bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
519 const struct lu_fid *fid)
521 struct mdt_device *mdt = mti->mti_mdt;
522 struct coordinator *cdt = &mdt->mdt_coordinator;
523 struct cdt_restore_handle *crh;
527 if (!fid_is_sane(fid))
530 mutex_lock(&cdt->cdt_restore_lock);
531 list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
532 if (lu_fid_eq(&crh->crh_fid, fid)) {
537 mutex_unlock(&cdt->cdt_restore_lock);
542 * get registered action on a FID list
544 * \param hal [IN/OUT] requests
546 * \retval -ve failure
548 int mdt_hsm_get_actions(struct mdt_thread_info *mti,
549 struct hsm_action_list *hal)
551 struct mdt_device *mdt = mti->mti_mdt;
552 struct coordinator *cdt = &mdt->mdt_coordinator;
553 struct hsm_action_item *hai;
557 hai = hai_first(hal);
558 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
559 hai->hai_action = HSMA_NONE;
560 if (!fid_is_sane(&hai->hai_fid))
564 /* 1st we search in recorded requests */
565 rc = hsm_find_compatible(mti->mti_env, mdt, hal);
566 /* if llog file is not created, no action is recorded */
573 /* 2nd we search if the request are running
574 * cookie is cleared to tell to caller, the request is
576 * we could instead use the record status, but in the future
577 * we may want to give back dynamic information on the
580 hai = hai_first(hal);
581 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
582 struct cdt_agent_req *car;
584 car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
590 mdt_cdt_get_work_done(car, &data_moved);
591 /* this is just to give the volume of data moved
592 * it means data_moved data have been moved from the
593 * original request but we do not know which one
595 hai->hai_extent.length = data_moved;
596 mdt_cdt_put_request(car);