4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
26 * Copyright (c) 2013, 2014, Intel Corporation.
29 * lustre/mdt/mdt_hsm_cdt_client.c
31 * Lustre HSM Coordinator
33 * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
34 * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
37 #define DEBUG_SUBSYSTEM S_MDS
39 #include <obd_support.h>
40 #include <lustre_net.h>
41 #include <lustre_export.h>
43 #include <lprocfs_status.h>
44 #include <lustre_log.h>
45 #include "mdt_internal.h"
48 * data passed to llog_cat_process() callback
49 * to find compatible requests
// Context object passed as the opaque data argument of hsm_find_compatible_cb().
51 struct hsm_compat_data_cb {
// Coordinator of the MDT; hsm_find_compatible() stores &mdt->mdt_coordinator here.
52 struct coordinator *cdt;
// Action list whose items the callback compares against each llog record.
53 struct hsm_action_list *hal;
57 * llog_cat_process() callback, used to find records
58 * compatible with a new hsm_action_list
59 * \param env [IN] environment
60 * \param llh [IN] llog handle
61 * \param hdr [IN] llog record
62 * \param data [IN] cb data = hsm_compat_data_cb
// Per-record callback: compares one agent request record against every item
// of the action list carried in the callback data and updates the matching
// items in place (cookie, archive id, flags, or the whole item).
// NOTE(review): several lines of this function are not visible in this
// excerpt, including the assignment of hcdcb and the statements guarded by
// the if tests; the notes below describe the visible tests only.
66 static int hsm_find_compatible_cb(const struct lu_env *env,
67 struct llog_handle *llh,
68 struct llog_rec_hdr *hdr, void *data)
70 struct llog_agent_req_rec *larr;
71 struct hsm_compat_data_cb *hcdcb;
72 struct hsm_action_item *hai;
// The generic record header is reinterpreted as an agent request record.
76 larr = (struct llog_agent_req_rec *)hdr;
// The test below is true for records whose status is neither ARS_WAITING nor
// ARS_STARTED, and for records that are themselves CANCEL requests.
78 /* a compatible request must be WAITING or STARTED
80 if ((larr->arr_status != ARS_WAITING &&
81 larr->arr_status != ARS_STARTED) ||
82 larr->arr_hai.hai_action == HSMA_CANCEL)
// Walk every item of the action list stored in the callback data.
85 hai = hai_first(hcdcb->hal);
86 for (i = 0; i < hcdcb->hal->hal_count; i++, hai = hai_next(hai)) {
87 /* if request is a CANCEL:
88 * if cookie set in the request, there is no need to find a
89 * compatible one, the cookie in the request is directly used.
90 * if cookie is not set, we use the FID to find the request
91 * to cancel (the "compatible" one)
92 * if the caller sets the cookie, we assume he also sets the
95 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
98 if (!lu_fid_eq(&hai->hai_fid, &larr->arr_hai.hai_fid))
101 /* HSMA_NONE is used to find running request for some FID */
102 if (hai->hai_action == HSMA_NONE) {
// Lookup mode: archive id and flags of the list, and the whole item, are
// overwritten with the values stored in the record.
103 hcdcb->hal->hal_archive_id = larr->arr_archive_id;
104 hcdcb->hal->hal_flags = larr->arr_flags;
105 *hai = larr->arr_hai;
// For the remaining items the cookie of the recorded request is copied into
// the item, which is how the caller learns that a matching record exists.
108 /* in V1 we do not manage partial transfer
109 * so extent is always whole file
111 hai->hai_cookie = larr->arr_hai.hai_cookie;
112 /* we read the archive number from the request we cancel */
113 if (hai->hai_action == HSMA_CANCEL &&
114 hcdcb->hal->hal_archive_id == 0)
115 hcdcb->hal->hal_archive_id = larr->arr_archive_id;
121 * find compatible requests already recorded
122 * \param env [IN] environment
123 * \param mdt [IN] MDT device
124 * \param hal [IN/OUT] new request
125 * cookie set to compatible found or to 0 if not found
126 * for cancel request, see callback hsm_find_compatible_cb()
128 * \retval -ve failure
// Searches for already recorded requests matching the items of hal by handing
// hsm_find_compatible_cb() to cdt_llog_process().
// NOTE(review): the declarations of i, rc and ok_cnt, the update of ok_cnt
// and the assignment of hcdcb.hal are not visible in this excerpt.
130 static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt,
131 struct hsm_action_list *hal)
133 struct hsm_action_item *hai;
134 struct hsm_compat_data_cb hcdcb;
// First pass over the list: detect CANCEL items that already carry a cookie.
139 hai = hai_first(hal);
140 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
141 /* in a cancel request hai_cookie may be set by caller to
142 * show the request to be canceled
143 * if not we need to search by FID
145 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
151 /* if all requests are cancel with cookie, no need to find compatible */
152 if (ok_cnt == hal->hal_count)
// The callback context points at the coordinator embedded in the MDT device;
// every record visited by cdt_llog_process() is passed to the callback
// together with this context.
155 hcdcb.cdt = &mdt->mdt_coordinator;
158 rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
164 * check if an action is really needed
165 * \param hai [IN] request description
166 * \param hal_an [IN] request archive number (not used)
167 * \param rq_flags [IN] request flags
168 * \param hsm [IN] file HSM metadata
// Decides whether the action described by hai has to be performed, from the
// request flags and the HSM flags of the file, and traces the decision.
// hal_an is not referenced by any visible line of this function.
// NOTE(review): the case labels of the switch and the assignments to
// is_needed are not visible in this excerpt.
171 static bool hsm_action_is_needed(struct hsm_action_item *hai, int hal_an,
172 __u64 rq_flags, struct md_hsm *hsm)
// Default answer when no test below says otherwise.
174 bool is_needed = false;
// HSM_FORCE_ACTION in the request flags is tested before any HSM state of
// the file is looked at.
178 if (rq_flags & HSM_FORCE_ACTION)
// Otherwise the outcome depends on the action type and the file HSM flags.
181 hsm_flags = hsm->mh_flags;
182 switch (hai->hai_action) {
// True when HS_DIRTY is set or HS_ARCHIVED is clear.
184 if (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED))
188 /* if file is dirty we must return an error, this function
189 * cannot, so we ask for an action and
190 * mdt_hsm_is_action_compat() will return an error
192 if (hsm_flags & (HS_RELEASED | HS_DIRTY))
// True when HS_ARCHIVED or HS_EXISTS is set.
196 if (hsm_flags & (HS_ARCHIVED | HS_EXISTS))
// Trace the action name, request flags, extent and the resulting decision.
203 CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
204 " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
206 hsm_copytool_action2name(hai->hai_action), rq_flags,
207 hai->hai_extent.offset, hai->hai_extent.length,
209 (is_needed ? "action needed" : "no action needed"));
215 * test sanity of an hal
217 * action must be known
// Sanity check of an action list. The visible tests cover an empty list, the
// FID of every item, and a switch on the action of every item.
// NOTE(review): the values returned and the accepted action values are not
// visible in this excerpt.
221 static bool hal_is_sane(struct hsm_action_list *hal)
224 struct hsm_action_item *hai;
// A list without any item is tested for first.
227 if (hal->hal_count == 0)
230 hai = hai_first(hal);
231 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
// Each item FID is checked with fid_is_sane().
232 if (!fid_is_sane(&hai->hai_fid))
234 switch (hai->hai_action) {
// Checks whether the caller may request HSM action hsma on obj. The decision
// uses one of three request masks of the coordinator (user, group, other),
// selected by comparing the caller credentials with the file owner and group.
// Visible result: 0 when the bit for hsma is set in the mask, -EPERM otherwise.
// NOTE(review): the declarations of mask and rc and the outcomes of the early
// tests are not visible in this excerpt.
249 hsm_action_permission(struct mdt_thread_info *mti,
250 struct mdt_object *obj,
251 enum hsm_copytool_action hsma)
253 struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
254 struct lu_ucred *uc = mdt_ucred(mti);
255 struct md_attr *ma = &mti->mti_attr;
// Any action other than RESTORE arriving over an export connected with
// OBD_CONNECT_RDONLY is singled out here.
260 if (hsma != HSMA_RESTORE &&
261 exp_connect_flags(mti->mti_exp) & OBD_CONNECT_RDONLY)
// Callers holding CFS_CAP_SYS_ADMIN are tested for before any mask is read.
264 if (md_capable(uc, CFS_CAP_SYS_ADMIN))
// Fetch the inode attributes; la_uid and la_gid are used just below.
267 ma->ma_need = MA_INODE;
268 rc = mdt_attr_get_complex(mti, obj, ma);
// Mask selection: user mask when the caller fsuid equals the file uid, group
// mask when the caller belongs to the file gid, other mask in all other cases.
272 if (uc->uc_fsuid == ma->ma_attr.la_uid)
273 mask = &cdt->cdt_user_request_mask;
274 else if (lustre_in_group_p(uc, ma->ma_attr.la_gid))
275 mask = &cdt->cdt_group_request_mask;
277 mask = &cdt->cdt_other_request_mask;
// Guard for the shift below: hsma must be a valid bit index for the mask type.
279 if (!(0 <= hsma && hsma < 8 * sizeof(*mask)))
// NOTE(review): the shifted constant is 1UL while the guard above is based on
// the width of *mask. If the mask type is wider than unsigned long, the guard
// admits shift counts that are undefined for 1UL in C; confirm the mask type.
282 RETURN(*mask & (1UL << hsma) ? 0 : -EPERM);
286 * Coordinator external API
290 * register a list of requests
292 * \param hal [IN] list of requests
293 * \param compound_id [OUT] id of the compound request
295 * \retval -ve failure
296 * in case of restore, caller must hold layout lock
// Registers the items of hal with the coordinator. Visible steps: validate
// the list, allocate a compound id, look for matching recorded requests with
// hsm_find_compatible(), then for each item check permission and HSM state,
// choose an archive id, set up a restore handle and layout lock for RESTORE
// items, and record the item with mdt_agent_record_add(). The coordinator is
// woken up when the final rc is 0 or -ENODATA.
// NOTE(review): many lines of this function are not visible in this excerpt
// (declarations of rc, i, archive_id, flags and mh, several guarded
// statements, labels and the final return); the notes below only describe
// what the visible lines do.
298 int mdt_hsm_add_actions(struct mdt_thread_info *mti,
299 struct hsm_action_list *hal, __u64 *compound_id)
301 struct mdt_device *mdt = mti->mti_mdt;
302 struct coordinator *cdt = &mdt->mdt_coordinator;
303 struct hsm_action_item *hai;
304 struct mdt_object *obj = NULL;
307 bool is_restore = false;
310 /* no coordinator started, so we cannot serve requests */
311 if (cdt->cdt_state == CDT_STOPPED)
// The list is validated before a compound id is allocated.
314 if (!hal_is_sane(hal))
// The compound id comes from an atomic counter kept in the coordinator and
// is returned to the caller through compound_id.
317 *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
319 /* search for compatible request, if found hai_cookie is set
320 * to the request cookie
321 * it is also used to set the cookie for cancel request by FID
323 rc = hsm_find_compatible(mti->mti_env, mdt, hal);
// Main loop: one iteration per item of the list.
327 hai = hai_first(hal);
328 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
332 /* default archive number is the one explicitly specified */
333 archive_id = hal->hal_archive_id;
334 flags = hal->hal_flags;
336 /* by default, data FID is same as Lustre FID */
337 /* the volatile data FID will be created by copy tool and
338 * send from the agent through the progress call */
339 hai->hai_dfid = hai->hai_fid;
341 /* done here to manage first and redundant requests cases */
342 if (hai->hai_action == HSMA_RESTORE)
345 /* test result of hsm_find_compatible()
346 * if request redundant or cancel of nothing
350 if (hai->hai_action != HSMA_CANCEL && hai->hai_cookie != 0)
352 /* cancel nothing case */
353 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie == 0)
356 /* new request or cancel request
357 * we search for HSM status flags to check for compatibility
358 * if restore, we take the layout lock
361 /* Get HSM attributes and check permissions. */
362 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
364 /* In case of REMOVE and CANCEL a Lustre file
365 * is not mandatory, but restrict this
366 * exception to admins. */
367 if (md_capable(mdt_ucred(mti), CFS_CAP_SYS_ADMIN) &&
368 (hai->hai_action == HSMA_REMOVE ||
369 hai->hai_action == HSMA_CANCEL))
372 GOTO(out, rc = PTR_ERR(obj));
// The object is passed to the permission check and mdt_object_put() is
// called on it right after, before rc is examined by any visible line.
375 rc = hsm_action_permission(mti, obj, hai->hai_action);
376 mdt_object_put(mti->mti_env, obj);
381 /* if action is cancel, also no need to check */
382 if (hai->hai_action == HSMA_CANCEL)
385 /* Check if an action is needed, compare request
386 * and HSM flags status */
387 if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
390 /* Check if file request is compatible with HSM flags status
391 * and stop at first incompatible
393 if (!mdt_hsm_is_action_compat(hai, archive_id, flags, &mh))
394 GOTO(out, rc = -EPERM);
396 /* for cancel archive number is taken from canceled request
397 * for other request, we take from lma if not specified,
398 * or we use the default if none found in lma
399 * this works also for archive because the default value is 0
400 * /!\ there is a side effect: in case of restore on multiple
401 * files which are in different backend, the initial compound
402 * request will be split in multiple requests because we cannot
403 * warranty an agent can serve any combinaison of archive
406 if (hai->hai_action != HSMA_CANCEL && archive_id == 0) {
407 if (mh.mh_arch_id != 0)
408 archive_id = mh.mh_arch_id;
410 archive_id = cdt->cdt_default_archive_id;
413 /* if restore, take an exclusive lock on layout */
414 if (hai->hai_action == HSMA_RESTORE) {
415 struct cdt_restore_handle *crh;
417 /* in V1 only whole file is supported. */
418 if (hai->hai_extent.offset != 0)
419 GOTO(out, rc = -EPROTO);
// The restore handle is allocated from the mdt_hsm_cdt_kmem slab.
421 OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
423 GOTO(out, rc = -ENOMEM);
425 crh->crh_fid = hai->hai_fid;
426 /* in V1 only whole file is supported. However the
427 * restore may be due to truncate. */
428 crh->crh_extent.start = 0;
429 crh->crh_extent.end = hai->hai_extent.length;
// The lock handle is stored inside the restore handle and initialised for
// LCK_EX; the object is then looked up and locked on MDS_INODELOCK_LAYOUT.
431 mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
432 obj = mdt_object_find_lock(mti, &crh->crh_fid,
434 MDS_INODELOCK_LAYOUT);
// Failure path of the lock: log and free the handle, which at this point has
// not yet been added to the restore list.
437 CERROR("%s: cannot take layout lock for "
438 DFID": rc = %d\n", mdt_obd_name(mdt),
439 PFID(&crh->crh_fid), rc);
440 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
444 /* we choose to not keep a keep a reference
445 * on the object during the restore time which can be
447 mdt_object_put(mti->mti_env, obj);
// The handle is appended to the coordinator restore list while holding
// cdt_restore_lock.
449 mutex_lock(&cdt->cdt_restore_lock);
450 list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
451 mutex_unlock(&cdt->cdt_restore_lock);
// The item is handed to mdt_agent_record_add() with the compound id, the
// selected archive id and the request flags.
// NOTE(review): the handling of a failure of this call is not visible in this
// excerpt; confirm that a restore handle queued just above is cleaned up.
455 rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
456 archive_id, flags, hai);
461 (cdt->cdt_policy & CDT_NONBLOCKING_RESTORE))
468 /* if work has been added, wake up coordinator */
469 if (rc == 0 || rc == -ENODATA)
470 mdt_hsm_cdt_wakeup(mdt);
476 * get running action on a FID list or from cookie
478 * \param hal [IN/OUT] requests
480 * \retval -ve failure
// For every item of hal, looks up a request on the item FID with
// mdt_cdt_find_request() and rewrites the item from the result.
// NOTE(review): the tests on the lookup result and the returned values are
// not visible in this excerpt.
482 int mdt_hsm_get_running(struct mdt_thread_info *mti,
483 struct hsm_action_list *hal)
485 struct mdt_device *mdt = mti->mti_mdt;
486 struct coordinator *cdt = &mdt->mdt_coordinator;
487 struct hsm_action_item *hai;
491 hai = hai_first(hal);
492 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
493 struct cdt_agent_req *car;
// Each item FID is checked with fid_is_sane() before the lookup.
495 if (!fid_is_sane(&hai->hai_fid))
// Lookup by FID: the cookie argument is 0.
498 car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
// Presumably the branch taken when no request was found: the item action
// is reset to HSMA_NONE.
501 hai->hai_action = HSMA_NONE;
// Otherwise the item is overwritten with the item of the request found, and
// the request is handed back through mdt_cdt_put_request().
503 *hai = *car->car_hai;
504 mdt_cdt_put_request(car);
511 * check if a restore is running on a FID
512 * this is redundant with mdt_hsm_coordinator_get_running()
513 * but as it can be called frequently when getting attr
514 * we make an optimized/simpler version only for a FID
516 * \param fid [IN] file FID
// Looks for a restore handle whose FID equals fid on the restore list of the
// coordinator.
// NOTE(review): the result variable, its assignment inside the loop and the
// returned value are not visible in this excerpt.
519 bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
520 const struct lu_fid *fid)
522 struct mdt_device *mdt = mti->mti_mdt;
523 struct coordinator *cdt = &mdt->mdt_coordinator;
524 struct cdt_restore_handle *crh;
// The FID is checked with fid_is_sane() before the list is touched.
528 if (!fid_is_sane(fid))
// The restore list is walked while holding cdt_restore_lock.
531 mutex_lock(&cdt->cdt_restore_lock);
532 list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
533 if (lu_fid_eq(&crh->crh_fid, fid)) {
538 mutex_unlock(&cdt->cdt_restore_lock);
543 * get registered action on a FID list
545 * \param hal [IN/OUT] requests
547 * \retval -ve failure
// Reports the registered action for each FID of hal. Visible steps: reset
// every item to HSMA_NONE, fill the items from recorded requests with
// hsm_find_compatible(), then look each item up by cookie and store the
// amount of data moved in the item extent length.
// NOTE(review): the handling of rc, the tests on the lookup result, the
// declaration of data_moved and the end of the function are not visible in
// this excerpt.
549 int mdt_hsm_get_actions(struct mdt_thread_info *mti,
550 struct hsm_action_list *hal)
552 struct mdt_device *mdt = mti->mti_mdt;
553 struct coordinator *cdt = &mdt->mdt_coordinator;
554 struct hsm_action_item *hai;
// With the action set to HSMA_NONE, hsm_find_compatible_cb() copies the
// whole recorded item into the entry whose FID matches.
558 hai = hai_first(hal);
559 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
560 hai->hai_action = HSMA_NONE;
561 if (!fid_is_sane(&hai->hai_fid))
565 /* 1st we search in recorded requests */
566 rc = hsm_find_compatible(mti->mti_env, mdt, hal);
567 /* if llog file is not created, no action is recorded */
574 /* 2nd we search if the request are running
575 * cookie is cleared to tell to caller, the request is
577 * we could in place use the record status, but in the future
578 * we may want do give back dynamic informations on the
581 hai = hai_first(hal);
582 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
583 struct cdt_agent_req *car;
// Lookup by cookie: the FID argument is NULL.
585 car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
// The progress obtained from mdt_cdt_get_work_done() is stored below in the
// extent length of the item.
591 mdt_cdt_get_work_done(car, &data_moved);
592 /* this is just to give the volume of data moved
593 * it means data_moved data have been moved from the
594 * original request but we do not know which one
596 hai->hai_extent.length = data_moved;
597 mdt_cdt_put_request(car);