Whamcloud - gitweb
LU-2675 obdclass: remove uses of lov_stripe_md
[fs/lustre-release.git] / lustre / mdt / mdt_hsm_cdt_client.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
24  *     alternatives
25  *
26  */
27 /*
28  * lustre/mdt/mdt_hsm_cdt_client.c
29  *
30  * Lustre HSM Coordinator
31  *
32  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <obd_support.h>
39 #include <lustre_net.h>
40 #include <lustre_export.h>
41 #include <obd.h>
42 #include <lprocfs_status.h>
43 #include <lustre_log.h>
44 #include "mdt_internal.h"
45
46 /**
47  * data passed to llog_cat_process() callback
48  * to find compatible requests
49  */
50 struct hsm_compat_data_cb {
51         struct coordinator      *cdt;
52         struct hsm_action_list  *hal;
53 };
54
55 /**
56  * llog_cat_process() callback, used to find record
57  * compatibles with a new hsm_action_list
58  * \param env [IN] environment
59  * \param llh [IN] llog handle
60  * \param hdr [IN] llog record
61  * \param data [IN] cb data = hsm_compat_data_cb
62  * \retval 0 success
63  * \retval -ve failure
64  */
65 static int hsm_find_compatible_cb(const struct lu_env *env,
66                                   struct llog_handle *llh,
67                                   struct llog_rec_hdr *hdr, void *data)
68 {
69         struct llog_agent_req_rec       *larr;
70         struct hsm_compat_data_cb       *hcdcb;
71         struct hsm_action_item          *hai;
72         int                              i;
73         ENTRY;
74
75         larr = (struct llog_agent_req_rec *)hdr;
76         hcdcb = data;
77         /* a compatible request must be WAITING or STARTED
78          * and not a cancel */
79         if ((larr->arr_status != ARS_WAITING &&
80              larr->arr_status != ARS_STARTED) ||
81             larr->arr_hai.hai_action == HSMA_CANCEL)
82                 RETURN(0);
83
84         hai = hai_first(hcdcb->hal);
85         for (i = 0; i < hcdcb->hal->hal_count; i++, hai = hai_next(hai)) {
86                 /* if request is a CANCEL:
87                  * if cookie set in the request, there is no need to find a
88                  * compatible one, the cookie in the request is directly used.
89                  * if cookie is not set, we use the FID to find the request
90                  * to cancel (the "compatible" one)
91                  * if the caller sets the cookie, we assume he also sets the
92                  * arr_archive_id
93                  */
94                 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
95                         continue;
96
97                 if (!lu_fid_eq(&hai->hai_fid, &larr->arr_hai.hai_fid))
98                         continue;
99
100                 /* HSMA_NONE is used to find running request for some FID */
101                 if (hai->hai_action == HSMA_NONE) {
102                         hcdcb->hal->hal_archive_id = larr->arr_archive_id;
103                         hcdcb->hal->hal_flags = larr->arr_flags;
104                         *hai = larr->arr_hai;
105                         continue;
106                 }
107                 /* in V1 we do not manage partial transfer
108                  * so extent is always whole file
109                  */
110                 hai->hai_cookie = larr->arr_hai.hai_cookie;
111                 /* we read the archive number from the request we cancel */
112                 if (hai->hai_action == HSMA_CANCEL &&
113                     hcdcb->hal->hal_archive_id == 0)
114                         hcdcb->hal->hal_archive_id = larr->arr_archive_id;
115         }
116         RETURN(0);
117 }
118
119 /**
120  * find compatible requests already recorded
121  * \param env [IN] environment
122  * \param mdt [IN] MDT device
123  * \param hal [IN/OUT] new request
124  *    cookie set to compatible found or to 0 if not found
125  *    for cancel request, see callback hsm_find_compatible_cb()
126  * \retval 0 success
127  * \retval -ve failure
128  */
129 static int hsm_find_compatible(const struct lu_env *env, struct mdt_device *mdt,
130                                struct hsm_action_list *hal)
131 {
132         struct hsm_action_item          *hai;
133         struct hsm_compat_data_cb        hcdcb;
134         int                              rc, i, ok_cnt;
135         ENTRY;
136
137         ok_cnt = 0;
138         hai = hai_first(hal);
139         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
140                 /* in a cancel request hai_cookie may be set by caller to
141                  * show the request to be canceled
142                  * if not we need to search by FID
143                  */
144                 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie != 0)
145                         ok_cnt++;
146                 else
147                         hai->hai_cookie = 0;
148         }
149
150         /* if all requests are cancel with cookie, no need to find compatible */
151         if (ok_cnt == hal->hal_count)
152                 RETURN(0);
153
154         hcdcb.cdt = &mdt->mdt_coordinator;
155         hcdcb.hal = hal;
156
157         rc = cdt_llog_process(env, mdt, hsm_find_compatible_cb, &hcdcb);
158
159         RETURN(rc);
160 }
161
162 /**
163  * check if an action is really needed
164  * \param hai [IN] request description
165  * \param hal_an [IN] request archive number (not used)
166  * \param rq_flags [IN] request flags
167  * \param hsm [IN] file HSM metadata
168  * \retval boolean
169  */
170 static bool hsm_action_is_needed(struct hsm_action_item *hai, int hal_an,
171                                  __u64 rq_flags, struct md_hsm *hsm)
172 {
173         bool     is_needed = false;
174         int      hsm_flags;
175         ENTRY;
176
177         if (rq_flags & HSM_FORCE_ACTION)
178                 RETURN(true);
179
180         hsm_flags = hsm->mh_flags;
181         switch (hai->hai_action) {
182         case HSMA_ARCHIVE:
183                 if (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED))
184                         is_needed = true;
185                 break;
186         case HSMA_RESTORE:
187                 /* if file is dirty we must return an error, this function
188                  * cannot, so we ask for an action and
189                  * mdt_hsm_is_action_compat() will return an error
190                  */
191                 if (hsm_flags & (HS_RELEASED | HS_DIRTY))
192                         is_needed = true;
193                 break;
194         case HSMA_REMOVE:
195                 if (hsm_flags & (HS_ARCHIVED | HS_EXISTS))
196                         is_needed = true;
197                 break;
198         case HSMA_CANCEL:
199                 is_needed = true;
200                 break;
201         }
202         CDEBUG(D_HSM, "fid="DFID" action=%s rq_flags="LPX64
203                       " extent="LPX64"-"LPX64" hsm_flags=%X %s\n",
204                       PFID(&hai->hai_fid),
205                       hsm_copytool_action2name(hai->hai_action), rq_flags,
206                       hai->hai_extent.offset, hai->hai_extent.length,
207                       hsm->mh_flags,
208                       (is_needed ? "action needed" : "no action needed"));
209
210         RETURN(is_needed);
211 }
212
213 /**
214  * test sanity of an hal
215  * FID must be valid
216  * action must be known
217  * \param hal [IN]
218  * \retval boolean
219  */
220 static bool hal_is_sane(struct hsm_action_list *hal)
221 {
222         int                      i;
223         struct hsm_action_item  *hai;
224         ENTRY;
225
226         if (hal->hal_count == 0)
227                 RETURN(false);
228
229         hai = hai_first(hal);
230         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
231                 if (!fid_is_sane(&hai->hai_fid))
232                         RETURN(false);
233                 switch (hai->hai_action) {
234                 case HSMA_NONE:
235                 case HSMA_ARCHIVE:
236                 case HSMA_RESTORE:
237                 case HSMA_REMOVE:
238                 case HSMA_CANCEL:
239                         break;
240                 default:
241                         RETURN(false);
242                 }
243         }
244         RETURN(true);
245 }
246
247 static int
248 hsm_action_permission(struct mdt_thread_info *mti,
249                       struct mdt_object *obj,
250                       enum hsm_copytool_action hsma)
251 {
252         struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
253         struct lu_ucred *uc = mdt_ucred(mti);
254         struct md_attr *ma = &mti->mti_attr;
255         const __u64 *mask;
256         int rc;
257         ENTRY;
258
259         if (hsma != HSMA_RESTORE &&
260             exp_connect_flags(mti->mti_exp) & OBD_CONNECT_RDONLY)
261                 RETURN(-EROFS);
262
263         if (md_capable(uc, CFS_CAP_SYS_ADMIN))
264                 RETURN(0);
265
266         ma->ma_need = MA_INODE;
267         rc = mdt_attr_get_complex(mti, obj, ma);
268         if (rc < 0)
269                 RETURN(rc);
270
271         if (uc->uc_fsuid == ma->ma_attr.la_uid)
272                 mask = &cdt->cdt_user_request_mask;
273         else if (lustre_in_group_p(uc, ma->ma_attr.la_gid))
274                 mask = &cdt->cdt_group_request_mask;
275         else
276                 mask = &cdt->cdt_other_request_mask;
277
278         if (!(0 <= hsma && hsma < 8 * sizeof(*mask)))
279                 RETURN(-EINVAL);
280
281         RETURN(*mask & (1UL << hsma) ? 0 : -EPERM);
282 }
283
284 /*
285  * Coordinator external API
286  */
287
288 /**
289  * register a list of requests
290  * \param mti [IN]
291  * \param hal [IN] list of requests
292  * \param compound_id [OUT] id of the compound request
293  * \retval 0 success
294  * \retval -ve failure
295  * in case of restore, caller must hold layout lock
296  */
297 int mdt_hsm_add_actions(struct mdt_thread_info *mti,
298                         struct hsm_action_list *hal, __u64 *compound_id)
299 {
300         struct mdt_device       *mdt = mti->mti_mdt;
301         struct coordinator      *cdt = &mdt->mdt_coordinator;
302         struct hsm_action_item  *hai;
303         struct mdt_object       *obj = NULL;
304         int                      rc = 0, i;
305         struct md_hsm            mh;
306         bool                     is_restore = false;
307         ENTRY;
308
309         /* no coordinator started, so we cannot serve requests */
310         if (cdt->cdt_state == CDT_STOPPED)
311                 RETURN(-EAGAIN);
312
313         if (!hal_is_sane(hal))
314                 RETURN(-EINVAL);
315
316         *compound_id = atomic_inc_return(&cdt->cdt_compound_id);
317
318         /* search for compatible request, if found hai_cookie is set
319          * to the request cookie
320          * it is also used to set the cookie for cancel request by FID
321          */
322         rc = hsm_find_compatible(mti->mti_env, mdt, hal);
323         if (rc)
324                 GOTO(out, rc);
325
326         hai = hai_first(hal);
327         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
328                 int archive_id;
329                 __u64 flags;
330
331                 /* default archive number is the one explicitly specified */
332                 archive_id = hal->hal_archive_id;
333                 flags = hal->hal_flags;
334
335                 /* by default, data FID is same as Lustre FID */
336                 /* the volatile data FID will be created by copy tool and
337                  * send from the agent through the progress call */
338                 hai->hai_dfid = hai->hai_fid;
339
340                 /* done here to manage first and redundant requests cases */
341                 if (hai->hai_action == HSMA_RESTORE)
342                         is_restore = true;
343
344                 /* test result of hsm_find_compatible()
345                  * if request redundant or cancel of nothing
346                  * do not record
347                  */
348                 /* redundant case */
349                 if (hai->hai_action != HSMA_CANCEL && hai->hai_cookie != 0)
350                         continue;
351                 /* cancel nothing case */
352                 if (hai->hai_action == HSMA_CANCEL && hai->hai_cookie == 0)
353                         continue;
354
355                 /* new request or cancel request
356                  * we search for HSM status flags to check for compatibility
357                  * if restore, we take the layout lock
358                  */
359
360                 /* Get HSM attributes and check permissions. */
361                 obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &mh);
362                 if (IS_ERR(obj)) {
363                         /* In case of REMOVE and CANCEL a Lustre file
364                          * is not mandatory, but restrict this
365                          * exception to admins. */
366                         if (md_capable(mdt_ucred(mti), CFS_CAP_SYS_ADMIN) &&
367                             (hai->hai_action == HSMA_REMOVE ||
368                              hai->hai_action == HSMA_CANCEL))
369                                 goto record;
370                         else
371                                 GOTO(out, rc = PTR_ERR(obj));
372                 }
373
374                 rc = hsm_action_permission(mti, obj, hai->hai_action);
375                 mdt_object_put(mti->mti_env, obj);
376
377                 if (rc < 0)
378                         GOTO(out, rc);
379
380                 /* if action is cancel, also no need to check */
381                 if (hai->hai_action == HSMA_CANCEL)
382                         goto record;
383
384                 /* Check if an action is needed, compare request
385                  * and HSM flags status */
386                 if (!hsm_action_is_needed(hai, archive_id, flags, &mh))
387                         continue;
388
389                 /* Check if file request is compatible with HSM flags status
390                  * and stop at first incompatible
391                  */
392                 if (!mdt_hsm_is_action_compat(hai, archive_id, flags, &mh))
393                         GOTO(out, rc = -EPERM);
394
395                 /* for cancel archive number is taken from canceled request
396                  * for other request, we take from lma if not specified,
397                  * or we use the default if none found in lma
398                  * this works also for archive because the default value is 0
399                  * /!\ there is a side effect: in case of restore on multiple
400                  * files which are in different backend, the initial compound
401                  * request will be split in multiple requests because we cannot
402                  * warranty an agent can serve any combinaison of archive
403                  * backend
404                  */
405                 if (hai->hai_action != HSMA_CANCEL && archive_id == 0) {
406                         if (mh.mh_arch_id != 0)
407                                 archive_id = mh.mh_arch_id;
408                         else
409                                 archive_id = cdt->cdt_default_archive_id;
410                 }
411
412                 /* if restore, take an exclusive lock on layout */
413                 if (hai->hai_action == HSMA_RESTORE) {
414                         struct cdt_restore_handle *crh;
415
416                         /* in V1 only whole file is supported. */
417                         if (hai->hai_extent.offset != 0)
418                                 GOTO(out, rc = -EPROTO);
419
420                         OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
421                         if (crh == NULL)
422                                 GOTO(out, rc = -ENOMEM);
423
424                         crh->crh_fid = hai->hai_fid;
425                         /* in V1 only whole file is supported. However the
426                          * restore may be due to truncate. */
427                         crh->crh_extent.start = 0;
428                         crh->crh_extent.end = hai->hai_extent.length;
429
430                         mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
431                         obj = mdt_object_find_lock(mti, &crh->crh_fid,
432                                                    &crh->crh_lh,
433                                                    MDS_INODELOCK_LAYOUT);
434                         if (IS_ERR(obj)) {
435                                 rc = PTR_ERR(obj);
436                                 CERROR("%s: cannot take layout lock for "
437                                        DFID": rc = %d\n", mdt_obd_name(mdt),
438                                        PFID(&crh->crh_fid), rc);
439                                 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
440                                 GOTO(out, rc);
441                         }
442
443                         /* we choose to not keep a keep a reference
444                          * on the object during the restore time which can be
445                          * very long */
446                         mdt_object_put(mti->mti_env, obj);
447
448                         mutex_lock(&cdt->cdt_restore_lock);
449                         list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
450                         mutex_unlock(&cdt->cdt_restore_lock);
451                 }
452 record:
453                 /* record request */
454                 rc = mdt_agent_record_add(mti->mti_env, mdt, *compound_id,
455                                           archive_id, flags, hai);
456                 if (rc)
457                         GOTO(out, rc);
458         }
459         if (is_restore &&
460             (cdt->cdt_policy & CDT_NONBLOCKING_RESTORE))
461                 rc = -ENODATA;
462         else
463                 rc = 0;
464
465         GOTO(out, rc);
466 out:
467         /* if work has been added, wake up coordinator */
468         if (rc == 0 || rc == -ENODATA)
469                 mdt_hsm_cdt_wakeup(mdt);
470
471         return rc;
472 }
473
474 /**
475  * get running action on a FID list or from cookie
476  * \param mti [IN]
477  * \param hal [IN/OUT] requests
478  * \retval 0 success
479  * \retval -ve failure
480  */
481 int mdt_hsm_get_running(struct mdt_thread_info *mti,
482                         struct hsm_action_list *hal)
483 {
484         struct mdt_device       *mdt = mti->mti_mdt;
485         struct coordinator      *cdt = &mdt->mdt_coordinator;
486         struct hsm_action_item  *hai;
487         int                      i;
488         ENTRY;
489
490         hai = hai_first(hal);
491         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
492                 struct cdt_agent_req *car;
493
494                 if (!fid_is_sane(&hai->hai_fid))
495                         RETURN(-EINVAL);
496
497                 car = mdt_cdt_find_request(cdt, 0, &hai->hai_fid);
498                 if (car == NULL) {
499                         hai->hai_cookie = 0;
500                         hai->hai_action = HSMA_NONE;
501                 } else {
502                         *hai = *car->car_hai;
503                         mdt_cdt_put_request(car);
504                 }
505         }
506         RETURN(0);
507 }
508
509 /**
510  * check if a restore is running on a FID
511  * this is redundant with mdt_hsm_coordinator_get_running()
512  * but as it can be called frequently when getting attr
513  * we make an optimized/simpler version only for a FID
514  * \param mti [IN]
515  * \param fid [IN] file FID
516  * \retval boolean
517  */
518 bool mdt_hsm_restore_is_running(struct mdt_thread_info *mti,
519                                 const struct lu_fid *fid)
520 {
521         struct mdt_device               *mdt = mti->mti_mdt;
522         struct coordinator              *cdt = &mdt->mdt_coordinator;
523         struct cdt_restore_handle       *crh;
524         bool                             rc = false;
525         ENTRY;
526
527         if (!fid_is_sane(fid))
528                 RETURN(rc);
529
530         mutex_lock(&cdt->cdt_restore_lock);
531         list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
532                 if (lu_fid_eq(&crh->crh_fid, fid)) {
533                         rc = true;
534                         break;
535                 }
536         }
537         mutex_unlock(&cdt->cdt_restore_lock);
538         RETURN(rc);
539 }
540
541 /**
542  * get registered action on a FID list
543  * \param mti [IN]
544  * \param hal [IN/OUT] requests
545  * \retval 0 success
546  * \retval -ve failure
547  */
548 int mdt_hsm_get_actions(struct mdt_thread_info *mti,
549                         struct hsm_action_list *hal)
550 {
551         struct mdt_device       *mdt = mti->mti_mdt;
552         struct coordinator      *cdt = &mdt->mdt_coordinator;
553         struct hsm_action_item  *hai;
554         int                      i, rc;
555         ENTRY;
556
557         hai = hai_first(hal);
558         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
559                 hai->hai_action = HSMA_NONE;
560                 if (!fid_is_sane(&hai->hai_fid))
561                         RETURN(-EINVAL);
562         }
563
564         /* 1st we search in recorded requests */
565         rc = hsm_find_compatible(mti->mti_env, mdt, hal);
566         /* if llog file is not created, no action is recorded */
567         if (rc == -ENOENT)
568                 RETURN(0);
569
570         if (rc)
571                 RETURN(rc);
572
573         /* 2nd we search if the request are running
574          * cookie is cleared to tell to caller, the request is
575          * waiting
576          * we could in place use the record status, but in the future
577          * we may want do give back dynamic informations on the
578          * running request
579          */
580         hai = hai_first(hal);
581         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
582                 struct cdt_agent_req *car;
583
584                 car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
585                 if (car == NULL) {
586                         hai->hai_cookie = 0;
587                 } else {
588                         __u64 data_moved;
589
590                         mdt_cdt_get_work_done(car, &data_moved);
591                         /* this is just to give the volume of data moved
592                          * it means data_moved data have been moved from the
593                          * original request but we do not know which one
594                          */
595                         hai->hai_extent.length = data_moved;
596                         mdt_cdt_put_request(car);
597                 }
598         }
599
600         RETURN(0);
601 }
602