4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
28 * lustre/mdt/mdt_hsm_cdt_client.c
30 * Lustre HSM Coordinator
32 * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33 * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
36 #define DEBUG_SUBSYSTEM S_MDS
38 #include <obd_support.h>
39 #include <lustre_net.h>
40 #include <lustre_export.h>
43 #include <lprocfs_status.h>
44 #include <lustre_log.h>
45 #include "mdt_internal.h"
48 * data passed to llog_cat_process() callback
49 * to find compatible requests
/* context handed to hsm_find_compatible_cb() through its void *data argument */
51 struct hsm_compat_data_cb {
/* coordinator; hsm_find_compatible() points it at &mdt->mdt_coordinator */
52 struct coordinator *cdt;
/* action list whose items the callback compares with each llog record */
53 struct hsm_action_list *hal;
57 * llog_cat_process() callback, used to find record
58 * compatible with a new hsm_action_list
59 * \param env [IN] environment
60 * \param llh [IN] llog handle
61 * \param hdr [IN] llog record
62 * \param data [IN] cb data = hsm_compat_data_cb
/*
 * Compare one agent request record from the llog with every item of the
 * action list found in the callback data, and update the matching items.
 */
66 static int hsm_find_compatible_cb(const struct lu_env *env,
67 struct llog_handle *llh,
68 struct llog_rec_hdr *hdr, void *data)
70 struct llog_agent_req_rec *larr;
71 struct hsm_compat_data_cb *hcdcb;
72 struct hsm_action_item *hai;
/* the record header is reinterpreted as the full agent request record */
76 larr = (struct llog_agent_req_rec *)hdr;
/* NOTE(review): hcdcb is dereferenced below, but the line that sets it
 * (presumably from the data argument) is not visible here -- confirm */
78 /* a compatible request must be WAITING or STARTED
80 if ((larr->arr_status != ARS_WAITING &&
81 larr->arr_status != ARS_STARTED) ||
82 larr->arr_hai.hai_action == HSMA_CANCEL)
85 hai = hai_first(hcdcb->hal);
86 for (i = 0; i < hcdcb->hal->hal_count; i++, hai = hai_next(hai)) {
87 /* if request is a CANCEL:
88 * if cookie set in the request, there is no need to find a
89 * compatible one, the cookie in the request is directly used.
90 * if cookie is not set, we use the FID to find the request
91 * to cancel (the "compatible" one)
92 * if the caller sets the cookie, we assume he also sets the
95 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
98 if (!lu_fid_eq(&hai->hai_fid, &larr->arr_hai.hai_fid))
101 /* HSMA_NONE is used to find running request for some FID */
102 if (hai->hai_action == HSMA_NONE) {
/* hand the recorded archive id, flags and whole item back to the caller */
103 hcdcb->hal->hal_archive_id = larr->arr_archive_id;
104 hcdcb->hal->hal_flags = larr->arr_flags;
105 *hai = larr->arr_hai;
108 /* in V1 we do not manage partial transfer
109 * so extent is always whole file
111 hai->hai_cookie = larr->arr_hai.hai_cookie;
112 /* we read the archive number from the request we cancel */
/* a non-zero hal_archive_id supplied by the caller is left untouched */
113 if (hai->hai_action == HSMA_CANCEL &&
114 hcdcb->hal->hal_archive_id == 0)
115 hcdcb->hal->hal_archive_id = larr->arr_archive_id;
121 * find compatible requests already recorded
122 * \param env [IN] environment
123 * \param mdt [IN] MDT device
124 * \param hal [IN/OUT] new request
125 * cookie set to that of the compatible request found, or to 0 if none is found
126 * for cancel request, see callback hsm_find_compatible_cb()
128 * \retval -ve failure
/*
 * Search the recorded requests for ones compatible with the items of hal,
 * by running hsm_find_compatible_cb() through cdt_llog_process().
 */
130 static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt,
131 struct hsm_action_list *hal)
133 struct hsm_action_item *hai;
134 struct hsm_compat_data_cb hcdcb;
/* first pass over the list: look for cancel items that already carry a cookie */
139 hai = hai_first(hal);
140 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
141 /* in a cancel request hai_cookie may be set by caller to
142 * show the request to be canceled
143 * if not we need to search by FID
145 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
151 /* if all requests are cancel with cookie, no need to find compatible */
152 if (ok_cnt == hal->hal_count)
/* NOTE(review): only the cdt member is visibly set here; the callback also
 * reads hcdcb.hal, presumably assigned on a line not shown -- confirm */
155 hcdcb.cdt = &mdt->mdt_coordinator;
158 rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
164 * check if an action is really needed
165 * \param hai [IN] request description
166 * \param hal_an [IN] request archive number (not used)
167 * \param rq_flags [IN] request flags
168 * \param hsm [IN] file HSM metadata
/*
 * Tell from the request flags and the file HSM flags whether the action
 * described by hai has to be performed. hal_an is not read by the
 * visible code.
 */
171 static bool hsm_action_is_needed(struct hsm_action_item *hai, int hal_an,
172 __u64 rq_flags, struct md_hsm *hsm)
174 bool is_needed = false;
/* HSM_FORCE_ACTION is tested before any HSM state is looked at */
178 if (rq_flags & HSM_FORCE_ACTION)
181 hsm_flags = hsm->mh_flags;
/* NOTE(review): the case labels of this switch are not visible here, so
 * which action each flag test below belongs to must be confirmed */
182 switch (hai->hai_action) {
184 if (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED))
188 /* if file is dirty we must return an error, this function
189 * cannot, so we ask for an action and
190 * mdt_hsm_is_action_compat() will return an error
192 if (hsm_flags & (HS_RELEASED | HS_DIRTY))
196 if (hsm_flags & (HS_ARCHIVED | HS_EXISTS))
203 CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
204 " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
206 hsm_copytool_action2name(hai->hai_action), rq_flags,
207 hai->hai_extent.offset, hai->hai_extent.length,
209 (is_needed ? "action needed" : "no action needed"));
215 * test sanity of an hal
217 * action must be known
/* Examine an action list: its item count, then the FID and action of each item. */
221 static bool hal_is_sane(struct hsm_action_list *hal)
224 struct hsm_action_item *hai;
/* the item count is tested against zero before the list is walked */
227 if (hal->hal_count == 0)
230 hai = hai_first(hal);
231 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
/* each item's FID is checked with fid_is_sane() */
232 if (!fid_is_sane(&hai->hai_fid))
/* NOTE(review): the accepted action values (case labels) are not visible here */
234 switch (hai->hai_action) {
249 * Coordinator external API
253 * register a list of requests
255 * \param hal [IN] list of requests
256 * \param compound_id [OUT] id of the compound request
258 * \retval -ve failure
259 * in case of restore, caller must hold layout lock
/*
 * Register the requests of hal with the coordinator: look for already
 * recorded compatible requests, check each item against the file HSM
 * state, then record it with mdt_agent_record_add().
 */
261 int mdt_hsm_add_actions(struct mdt_thread_info *mti,
262 struct hsm_action_list *hal, __u64 *compound_id)
264 struct mdt_device *mdt = mti->mti_mdt;
265 struct coordinator *cdt = &mdt->mdt_coordinator;
266 struct hsm_action_item *hai;
267 struct mdt_object *obj = NULL;
270 bool is_restore = false;
273 /* no coordinator started, so we cannot serve requests */
274 if (cdt->cdt_state == CDT_STOPPED)
/* hal_is_sane() is consulted before a compound id is allocated */
277 if (!hal_is_sane(hal))
/* the compound id is the next value of the coordinator's atomic counter */
280 *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
282 /* search for compatible request, if found hai_cookie is set
283 * to the request cookie
284 * it is also used to set the cookie for cancel request by FID
286 rc = hsm_find_compatible(mti->mti_env, mdt, hal);
290 hai = hai_first(hal);
291 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
295 /* default archive number is the one explicitly specified */
296 archive_id = hal->hal_archive_id;
297 flags = hal->hal_flags;
299 /* by default, data FID is same as Lustre FID */
300 /* the volatile data FID will be created by copy tool and
301 * send from the agent through the progress call */
302 hai->hai_dfid = hai->hai_fid;
304 /* done here to manage first and redundant requests cases */
305 if (hai->hai_action == HSMA_RESTORE)
308 /* test result of hsm_find_compatible()
309 * if request redundant or cancel of nothing
313 if (hai->hai_action != HSMA_CANCEL && hai->hai_cookie != 0)
315 /* cancel nothing case */
316 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie == 0)
319 /* new request or cancel request
320 * we search for HSM status flags to check for compatibility
321 * if restore, we take the layout lock
324 /* if action is cancel, also no need to check */
325 if (hai->hai_action == HSMA_CANCEL)
328 /* get HSM attributes */
329 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
/* both an error pointer and NULL are handled as a failed lookup */
330 if (IS_ERR(obj) || obj == NULL) {
331 /* in case of archive remove, Lustre file
332 * is not mandatory */
333 if (hai->hai_action == HSMA_REMOVE)
/* NOTE(review): the test choosing between the two exits below is not
 * visible here; presumably -ENOENT is for the NULL case -- confirm */
336 GOTO(out, rc = -ENOENT);
337 GOTO(out, rc = PTR_ERR(obj));
/* obj is not referenced again in the visible code; mh is what the checks below read */
339 mdt_object_put(mti->mti_env, obj);
341 /* Check if an action is needed, compare request
342 * and HSM flags status */
343 if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
346 /* Check if file request is compatible with HSM flags status
347 * and stop at first incompatible
349 if (!mdt_hsm_is_action_compat(hai, archive_id, flags, &mh))
350 GOTO(out, rc = -EPERM);
352 /* for cancel archive number is taken from canceled request
353 * for other request, we take from lma if not specified,
354 * this works also for archive because the default value is 0
355 * /!\ there is a side effect: in case of restore on multiple
356 * files which are in different backend, the initial compound
357 * request will be split in multiple requests because we cannot
358 * warranty an agent can serve any combinaison of archive
361 if (hai->hai_action != HSMA_CANCEL && archive_id == 0)
362 archive_id = mh.mh_arch_id;
364 /* if restore, take an exclusive lock on layout */
365 if (hai->hai_action == HSMA_RESTORE) {
366 struct cdt_restore_handle *crh;
367 struct mdt_object *child;
/* the restore handle comes from the mdt_hsm_cdt_kmem slab */
369 OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
/* NOTE(review): the allocation check guarding this exit is not visible here */
371 GOTO(out, rc = -ENOMEM);
373 crh->crh_fid = hai->hai_fid;
374 /* in V1 only whole file is supported
375 crh->extent.start = hai->hai_extent.offset;
376 crh->extent.end = hai->hai_extent.offset +
377 hai->hai_extent.length;
379 crh->crh_extent.start = 0;
380 crh->crh_extent.end = OBD_OBJECT_EOF;
382 mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
383 child = mdt_object_find_lock(mti, &crh->crh_fid,
385 MDS_INODELOCK_LAYOUT);
388 CERROR("%s: cannot take layout lock for "
389 DFID": rc = %d\n", mdt_obd_name(mdt),
390 PFID(&crh->crh_fid), rc);
391 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
394 /* we choose to not keep a keep a reference
395 * on the object during the restore time which can be
397 mdt_object_put(mti->mti_env, child);
399 mutex_lock(&cdt->cdt_restore_lock);
400 list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
401 mutex_unlock(&cdt->cdt_restore_lock);
405 rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
406 archive_id, flags, hai);
411 (cdt->cdt_policy & CDT_NONBLOCKING_RESTORE))
418 /* if work has been added, wake up coordinator */
/* -ENODATA also wakes the coordinator, like success */
419 if (rc == 0 || rc == -ENODATA)
420 mdt_hsm_cdt_wakeup(mdt);
426 * get running action on a FID list or from cookie
428 * \param hal [IN/OUT] requests
430 * \retval -ve failure
/*
 * For every item of hal, look up a request for the item's FID in the
 * coordinator and copy what is found back into the item.
 */
432 int mdt_hsm_get_running(struct mdt_thread_info *mti,
433 struct hsm_action_list *hal)
435 struct mdt_device *mdt = mti->mti_mdt;
436 struct coordinator *cdt = &mdt->mdt_coordinator;
437 struct hsm_action_item *hai;
441 hai = hai_first(hal);
442 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
443 struct cdt_agent_req *car;
/* the FID is checked with fid_is_sane() before any lookup */
445 if (!fid_is_sane(&hai->hai_fid))
/* lookup by FID only: the cookie argument is 0 */
448 car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
/* NOTE(review): the test on car selecting between the two outcomes below
 * is not visible here; presumably HSMA_NONE means nothing was found */
451 hai->hai_action = HSMA_NONE;
/* the item stored in the request overwrites the caller's item */
453 *hai = *car->car_hai;
454 mdt_cdt_put_request(car);
461 * check if a restore is running on a FID
462 * this is redundant with mdt_hsm_get_running()
463 * but as it can be called frequently when getting attr
464 * we make an optimized/simpler version only for a FID
466 * \param fid [IN] file FID
/* Look for fid in the coordinator's list of restore handles. */
469 bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
470 const struct lu_fid *fid)
472 struct mdt_device *mdt = mti->mti_mdt;
473 struct coordinator *cdt = &mdt->mdt_coordinator;
474 struct cdt_restore_handle *crh;
/* the FID is checked with fid_is_sane() before the list is searched */
478 if (!fid_is_sane(fid))
/* cdt_restore_lock is taken before cdt_restore_hdl is walked and released after */
481 mutex_lock(&cdt->cdt_restore_lock);
482 list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
/* NOTE(review): the body run on a FID match is not visible here */
483 if (lu_fid_eq(&crh->crh_fid, fid)) {
488 mutex_unlock(&cdt->cdt_restore_lock);
493 * get registered action on a FID list
495 * \param hal [IN/OUT] requests
497 * \retval -ve failure
/*
 * Fill hal with the actions registered for its FIDs: recorded requests are
 * searched first, then the coordinator's requests are looked up by cookie.
 */
499 int mdt_hsm_get_actions(struct mdt_thread_info *mti,
500 struct hsm_action_list *hal)
502 struct mdt_device *mdt = mti->mti_mdt;
503 struct coordinator *cdt = &mdt->mdt_coordinator;
504 struct hsm_action_item *hai;
508 hai = hai_first(hal);
509 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
/* HSMA_NONE is the action value for which hsm_find_compatible_cb() copies
 * the whole matching record into the item */
510 hai->hai_action = HSMA_NONE;
511 if (!fid_is_sane(&hai->hai_fid))
515 /* 1st we search in recorded requests */
516 rc = hsm_find_compatible(mti->mti_env, mdt, hal);
517 /* if llog file is not created, no action is recorded */
524 /* 2nd we search if the request are running
525 * cookie is cleared to tell to caller, the request is
527 * we could in place use the record status, but in the future
528 * we may want do give back dynamic informations on the
531 hai = hai_first(hal);
532 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
533 struct cdt_agent_req *car;
535 car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
541 mdt_cdt_get_work_done(car, &data_moved);
542 /* this is just to give the volume of data moved
543 * it means data_moved data have been moved from the
544 * original request but we do not know which one
546 hai->hai_extent.length = data_moved;
547 mdt_cdt_put_request(car);