Whamcloud - gitweb
LU-15720 dne: add crush2 hash type
[fs/lustre-release.git] / lustre / mdt / mdt_restripe.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * This file is part of Lustre, http://www.lustre.org/
24  *
25  * lustre/mdt/mdt_restripe.c
26  *
27  * Lustre directory restripe and auto-split
28  */
29
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/sched.h>
33 #include <linux/kthread.h>
34 #include "mdt_internal.h"
35
36 /* add directory into splitting list and wake up restripe thread */
37 void mdt_auto_split_add(struct mdt_thread_info *info, struct mdt_object *o)
38 {
39         struct mdt_device *mdt = info->mti_mdt;
40         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
41
42         spin_lock(&restriper->mdr_lock);
43         if (mdt->mdt_enable_dir_auto_split && !o->mot_restriping) {
44                 o->mot_restriping = 1;
45                 mdt_object_get(NULL, o);
46                 LASSERT(list_empty(&o->mot_restripe_linkage));
47                 list_add_tail(&o->mot_restripe_linkage,
48                               &restriper->mdr_auto_splitting);
49
50                 CDEBUG(D_INFO, "add "DFID" into auto split list.\n",
51                        PFID(mdt_object_fid(o)));
52         }
53         spin_unlock(&restriper->mdr_lock);
54
55         wake_up_process(restriper->mdr_task);
56 }
57
58 void mdt_restripe_migrate_add(struct mdt_thread_info *info,
59                               struct mdt_object *o)
60 {
61         struct mdt_device *mdt = info->mti_mdt;
62         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
63
64         spin_lock(&restriper->mdr_lock);
65         if (!o->mot_restriping) {
66                 o->mot_restriping = 1;
67                 o->mot_restripe_offset = 0;
68                 mdt_object_get(NULL, o);
69                 LASSERT(list_empty(&o->mot_restripe_linkage));
70                 list_add_tail(&o->mot_restripe_linkage,
71                               &restriper->mdr_migrating);
72
73                 CDEBUG(D_INFO, "add "DFID" into migrate list.\n",
74                        PFID(mdt_object_fid(o)));
75         }
76         spin_unlock(&restriper->mdr_lock);
77
78         wake_up_process(restriper->mdr_task);
79 }
80
81 void mdt_restripe_update_add(struct mdt_thread_info *info,
82                              struct mdt_object *o)
83 {
84         struct mdt_device *mdt = info->mti_mdt;
85         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
86
87         spin_lock(&restriper->mdr_lock);
88         if (!o->mot_restriping) {
89                 /* update LMV */
90                 o->mot_restriping = 1;
91                 mdt_object_get(NULL, o);
92                 if (list_empty(&restriper->mdr_updating))
93                         restriper->mdr_update_time = ktime_get_real_seconds();
94                 LASSERT(list_empty(&o->mot_restripe_linkage));
95                 list_add_tail(&o->mot_restripe_linkage,
96                               &restriper->mdr_updating);
97
98                 CDEBUG(D_INFO, "add "DFID" into update list.\n",
99                        PFID(mdt_object_fid(o)));
100         }
101         spin_unlock(&restriper->mdr_lock);
102
103         wake_up_process(restriper->mdr_task);
104 }
105
106 static inline int mdt_fid_alloc(const struct lu_env *env,
107                                 struct mdt_device *mdt,
108                                 struct lu_fid *fid,
109                                 struct mdt_object *parent,
110                                 const struct lu_name *name)
111 {
112         struct lu_device *next = &mdt->mdt_child->md_lu_dev;
113         struct lu_object *o = lu_object_next(&parent->mot_obj);
114
115         return next->ld_ops->ldo_fid_alloc(env, next, fid, o, name);
116 }
117
118 static void mdt_auto_split_prep(struct mdt_thread_info *info,
119                                 struct md_op_spec *spec,
120                                 struct md_attr *ma,
121                                 u32 lum_stripe_count)
122 {
123         struct lu_attr *attr = &ma->ma_attr;
124         struct lmv_user_md_v1 *lum;
125
126         attr->la_ctime = attr->la_mtime = ktime_get_real_seconds();
127         attr->la_valid = LA_CTIME | LA_MTIME;
128
129         lum = &info->mti_mdt->mdt_restriper.mdr_lmv.lmv_user_md;
130         lum->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
131         lum->lum_stripe_count = cpu_to_le32(lum_stripe_count);
132         lum->lum_stripe_offset = cpu_to_le32(LMV_OFFSET_DEFAULT);
133         lum->lum_hash_type = 0;
134
135         spec->u.sp_ea.eadatalen = sizeof(*lum);
136         spec->u.sp_ea.eadata = lum;
137         spec->sp_cr_flags = MDS_OPEN_HAS_EA;
138         spec->no_create = 0;
139         spec->sp_migrate_close = 0;
140 }
141
142 /* restripe directory: split or merge stripes */
143 int mdt_restripe_internal(struct mdt_thread_info *info,
144                           struct mdt_object *parent,
145                           struct mdt_object *child,
146                           const struct lu_name *lname,
147                           struct lu_fid *tfid,
148                           struct md_op_spec *spec,
149                           struct md_attr *ma)
150 {
151         const struct lu_env *env = info->mti_env;
152         struct mdt_device *mdt = info->mti_mdt;
153         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
154         struct lmv_mds_md_v1 *lmv;
155         u32 lmv_stripe_count = 0;
156         int rc;
157
158         ENTRY;
159
160         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
161         if (rc)
162                 RETURN(rc);
163
164         if (ma->ma_valid & MA_LMV) {
165                 lmv = &ma->ma_lmv->lmv_md_v1;
166                 if (!lmv_is_sane(lmv))
167                         RETURN(-EBADF);
168
169                 /* don't allow restripe if dir layout is changing */
170                 if (lmv_is_layout_changing(lmv))
171                         RETURN(-EBUSY);
172
173                 /* check whether stripe count and hash unchanged */
174                 if (lum->lum_stripe_count == lmv->lmv_stripe_count &&
175                     lum->lum_hash_type == lmv->lmv_hash_type)
176                         RETURN(-EALREADY);
177
178                 lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
179         } else if (le32_to_cpu(lum->lum_stripe_count) < 2) {
180                 /* stripe count unchanged for plain directory */
181                 RETURN(-EALREADY);
182         }
183
184         if (le32_to_cpu(lum->lum_stripe_count) > lmv_stripe_count) {
185                 /* split */
186                 struct md_layout_change *mlc = &info->mti_mlc;
187                 struct mdt_object *tobj = NULL;
188                 s64 mtime = ma->ma_attr.la_mtime;
189
190                 ma->ma_need = MA_INODE;
191                 ma->ma_valid = 0;
192                 rc = mdt_attr_get_complex(info, child, ma);
193                 if (rc)
194                         RETURN(rc);
195
196                 if (!(ma->ma_valid & MA_INODE))
197                         RETURN(-EBADF);
198
199                 /* mtime is from from client or set outside */
200                 ma->ma_attr.la_mtime = mtime;
201
202                 if (!lmv_stripe_count) {
203                         /* if child is plain directory, allocate @tobj as the
204                          * master object, and make child the first stripe of
205                          * @tobj.
206                          */
207                         tobj = mdt_object_new(env, mdt, tfid);
208                         if (unlikely(IS_ERR(tobj)))
209                                 RETURN(PTR_ERR(tobj));
210                 }
211
212                 mlc->mlc_opc = MD_LAYOUT_SPLIT;
213                 mlc->mlc_parent = mdt_object_child(parent);
214                 mlc->mlc_target = tobj ? mdt_object_child(tobj) : NULL;
215                 mlc->mlc_attr = &ma->ma_attr;
216                 mlc->mlc_name = lname;
217                 mlc->mlc_spec = spec;
218                 rc = mo_layout_change(env, mdt_object_child(child), mlc);
219                 if (!rc) {
220                         /* FID and attr need to be replied to client for manual
221                          * restripe.
222                          */
223                         ma->ma_need = MA_INODE;
224                         ma->ma_valid = 0;
225                         rc = mdt_attr_get_complex(info,
226                                         lmv_stripe_count ? child : tobj, ma);
227                 }
228                 if (tobj)
229                         mdt_object_put(env, tobj);
230                 else
231                         *tfid = *mdt_object_fid(child);
232         } else {
233                 /* merge only needs to override LMV */
234                 struct lu_buf *buf = &info->mti_buf;
235                 __u32 version;
236
237                 LASSERT(ma->ma_valid & MA_LMV);
238                 lmv = &ma->ma_lmv->lmv_md_v1;
239                 version = cpu_to_le32(lmv->lmv_layout_version);
240
241                 /* adjust 0 to 1 */
242                 if (lum->lum_stripe_count == 0)
243                         lum->lum_stripe_count = cpu_to_le32(1);
244
245                 lmv->lmv_hash_type |= cpu_to_le32(LMV_HASH_FLAG_MERGE |
246                                                   LMV_HASH_FLAG_MIGRATION);
247                 lmv->lmv_hash_type |= lum->lum_hash_type &
248                                       cpu_to_le32(LMV_HASH_FLAG_FIXED);
249                 lmv->lmv_merge_offset = lum->lum_stripe_count;
250                 lmv->lmv_merge_hash = lum->lum_hash_type;
251                 lmv->lmv_layout_version = cpu_to_le32(++version);
252
253                 buf->lb_buf = lmv;
254                 buf->lb_len = sizeof(*lmv);
255                 rc = mo_xattr_set(env, mdt_object_child(child), buf,
256                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
257                 if (rc)
258                         RETURN(rc);
259
260                 *tfid = *mdt_object_fid(child);
261                 ma->ma_need = MA_INODE;
262                 ma->ma_valid = 0;
263                 rc = mdt_attr_get_complex(info, child, ma);
264         }
265
266         RETURN(rc);
267 }
268
269 static int mdt_auto_split(struct mdt_thread_info *info)
270 {
271         const struct lu_env *env = info->mti_env;
272         struct mdt_device *mdt = info->mti_mdt;
273         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
274         struct md_attr *ma = &info->mti_attr;
275         struct md_op_spec *spec = &info->mti_spec;
276         struct lu_name *lname = &info->mti_name;
277         struct lu_fid *fid = &info->mti_tmp_fid2;
278         struct mdt_object *parent = NULL;
279         struct mdt_object *child = NULL;
280         struct mdt_object *stripe = NULL;
281         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
282         struct mdt_lock_handle *lhp;
283         struct mdt_lock_handle *lhc;
284         u32 lmv_stripe_count = 0;
285         u32 lum_stripe_count = 0;
286         int rc;
287
288         ENTRY;
289
290         if (!atomic_read(&mdt->mdt_mds_mds_conns))
291                 RETURN(-EINVAL);
292
293         spin_lock(&restriper->mdr_lock);
294         if (!list_empty(&restriper->mdr_auto_splitting)) {
295                 child = list_entry(restriper->mdr_auto_splitting.next,
296                                    typeof(*child), mot_restripe_linkage);
297                 list_del_init(&child->mot_restripe_linkage);
298         }
299         spin_unlock(&restriper->mdr_lock);
300
301         if (!child)
302                 RETURN(0);
303
304         LASSERT(child->mot_restriping);
305
306         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
307         if (rc)
308                 GOTO(out, rc);
309
310         if (ma->ma_valid & MA_LMV) {
311                 /* stripe dirent exceeds threshold, find its master object */
312                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
313
314                 /* auto-split won't be done on striped directory master object
315                  * directly, because it's triggered when dirent count exceeds
316                  * threshold, however dirent count of master object is its
317                  * stripe count.
318                  */
319                 if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_STRIPE)
320                         GOTO(out, rc = -EINVAL);
321
322                 /* race with migrate? */
323                 if (lmv_hash_is_migrating(cpu_to_le32(lmv->lmv_hash_type)))
324                         GOTO(out, rc = -EBUSY);
325
326                 lmv_stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
327
328                 /* save stripe to clear 'restriping' flag in the end to avoid
329                  * trigger auto-split multiple times.
330                  */
331                 stripe = child;
332                 child = NULL;
333
334                 /* get master object FID from linkea */
335                 rc = mdt_attr_get_pfid(info, stripe, &ma->ma_pfid);
336                 if (rc)
337                         GOTO(out, rc);
338
339                 child = mdt_object_find(env, mdt, &ma->ma_pfid);
340                 if (IS_ERR(child))
341                         GOTO(out, rc = PTR_ERR(child));
342
343                 spin_lock(&restriper->mdr_lock);
344                 if (child->mot_restriping) {
345                         /* race? */
346                         spin_unlock(&restriper->mdr_lock);
347                         GOTO(out, rc = -EBUSY);
348                 }
349                 child->mot_restriping = 1;
350                 spin_unlock(&restriper->mdr_lock);
351
352                 /* skip if master object is remote, let the first stripe
353                  * to start splitting because dir split needs to be done
354                  * on where master object is.
355                  */
356                 if (mdt_object_remote(child))
357                         GOTO(restriping_clear, rc = -EREMOTE);
358         }
359
360         /* striped directory split adds mdr_auto_split_delta stripes */
361         lum_stripe_count = min_t(unsigned int,
362                                 lmv_stripe_count +
363                                         mdt->mdt_restriper.mdr_dir_split_delta,
364                                 atomic_read(&mdt->mdt_mds_mds_conns) + 1);
365         if (lmv_stripe_count >= lum_stripe_count)
366                 GOTO(restriping_clear, rc = -EALREADY);
367
368         /* get dir name and parent FID */
369         rc = mdt_attr_get_pfid_name(info, child, fid, lname);
370         if (rc)
371                 GOTO(restriping_clear, rc);
372
373         /* copy name out because mti_linkea will be used later, and name should
374          * end with '\0'
375          */
376         memcpy(info->mti_filename, lname->ln_name, lname->ln_namelen);
377         info->mti_filename[lname->ln_namelen] = '\0';
378         lname->ln_name = info->mti_filename;
379         CDEBUG(D_INFO, "split "DFID"/"DNAME" to count %u (MDT count %d)\n",
380                PFID(fid), PNAME(lname), lum_stripe_count,
381                atomic_read(&mdt->mdt_mds_mds_conns) + 1);
382
383         parent = mdt_object_find(env, mdt, fid);
384         if (IS_ERR(parent))
385                 GOTO(restriping_clear, rc = PTR_ERR(parent));
386
387         rc = mdt_fid_alloc(env, mdt, fid, child, NULL);
388         if (rc < 0)
389                 GOTO(restriping_clear, rc);
390
391         lhp = &info->mti_lh[MDT_LH_PARENT];
392         mdt_lock_pdo_init(lhp, LCK_PW, lname);
393         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
394                                    true);
395         if (rc)
396                 GOTO(restriping_clear, rc);
397
398         lhc = &info->mti_lh[MDT_LH_CHILD];
399         mdt_lock_reg_init(lhc, LCK_EX);
400         if (mdt_object_remote(parent)) {
401                 /* enqueue object remote LOOKUP lock */
402                 rc = mdt_remote_object_lock(info, parent, mdt_object_fid(child),
403                                             &lhc->mlh_rreg_lh,
404                                             lhc->mlh_rreg_mode,
405                                             MDS_INODELOCK_LOOKUP, false);
406                 if (rc != ELDLM_OK)
407                         GOTO(unlock_parent, rc);
408         }
409
410         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
411                                     true);
412         if (rc)
413                 GOTO(unlock_child, rc);
414
415         mdt_auto_split_prep(info, spec, ma, lum_stripe_count);
416
417         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
418         EXIT;
419
420 unlock_child:
421         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
422 unlock_parent:
423         mdt_object_unlock(info, parent, lhp, rc);
424 restriping_clear:
425         child->mot_restriping = 0;
426         LASSERT(list_empty(&child->mot_restripe_linkage));
427 out:
428         /* -EALREADY:   dir is split already.
429          * -EBUSY:      dir is opened, or is splitting by others.
430          * -EREMOTE:    dir is remote.
431          */
432         if (rc && rc != -EALREADY && rc != -EBUSY && rc != -EREMOTE)
433                 CERROR("%s: split "DFID"/"DNAME" to count %u failed: rc = %d\n",
434                        mdt_obd_name(mdt), PFID(mdt_object_fid(child)),
435                        PNAME(lname), lum_stripe_count, rc);
436
437         if (!IS_ERR_OR_NULL(child))
438                 mdt_object_put(env, child);
439
440         if (stripe) {
441                 LASSERT(stripe->mot_restriping);
442                 LASSERT(list_empty(&stripe->mot_restripe_linkage));
443                 stripe->mot_restriping = 0;
444                 /* lock may not be taken, don't cache stripe LMV */
445                 mo_invalidate(env, mdt_object_child(stripe));
446                 mdt_object_put(env, stripe);
447         }
448
449         if (!IS_ERR_OR_NULL(parent))
450                 mdt_object_put(env, parent);
451
452         return rc;
453 }
454
455 /* sub-files under one stripe are migrated, clear MIGRATION flag in its LMV */
456 static int mdt_restripe_migrate_finish(struct mdt_thread_info *info,
457                                        struct mdt_object *stripe,
458                                        struct lmv_mds_md_v1 *lmv)
459 {
460         struct mdt_device *mdt = info->mti_mdt;
461         struct lu_buf buf;
462         struct mdt_lock_handle *lh;
463         int rc;
464
465         ENTRY;
466
467         LASSERT(le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE);
468         LASSERT(lmv_is_restriping(lmv));
469
470         lmv->lmv_hash_type &= ~cpu_to_le32(LMV_HASH_FLAG_MIGRATION);
471         buf.lb_buf = lmv;
472         buf.lb_len = sizeof(*lmv);
473
474         lh = &info->mti_lh[MDT_LH_PARENT];
475         mdt_lock_reg_init(lh, LCK_EX);
476         rc = mdt_reint_object_lock(info, stripe, lh, MDS_INODELOCK_XATTR,
477                                    false);
478         if (!rc)
479                 rc = mo_xattr_set(info->mti_env, mdt_object_child(stripe), &buf,
480                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
481         mdt_object_unlock(info, stripe, lh, rc);
482         if (rc)
483                 CERROR("%s: update "DFID" LMV failed: rc = %d\n",
484                        mdt_obd_name(mdt), PFID(mdt_object_fid(stripe)), rc);
485
486         LASSERT(!list_empty(&stripe->mot_restripe_linkage));
487         LASSERT(stripe->mot_restriping);
488
489         spin_lock(&mdt->mdt_lock);
490         stripe->mot_restriping = 0;
491         list_del_init(&stripe->mot_restripe_linkage);
492         spin_unlock(&mdt->mdt_lock);
493
494         mdt_object_put(info->mti_env, stripe);
495
496         RETURN(rc);
497 }
498
499 static void mdt_restripe_migrate_prep(struct mdt_thread_info *info,
500                                       const struct lu_fid *fid1,
501                                       const struct lu_fid *fid2,
502                                       const struct lu_name *lname,
503                                       __u16 type,
504                                       const struct lmv_mds_md_v1 *lmv)
505 {
506         struct lu_attr *attr = &info->mti_attr.ma_attr;
507         struct mdt_reint_record *rr = &info->mti_rr;
508         struct md_op_spec *spec = &info->mti_spec;
509         struct lmv_user_md_v1 *lum;
510
511         attr->la_ctime = attr->la_mtime = ktime_get_real_seconds();
512         attr->la_valid = LA_CTIME | LA_MTIME;
513
514         rr->rr_fid1 = fid1;
515         rr->rr_fid2 = fid2;
516         rr->rr_name = *lname;
517
518         lum = &info->mti_mdt->mdt_restriper.mdr_lmv.lmv_user_md;
519         lum->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
520         lum->lum_stripe_offset = cpu_to_le32(LMV_OFFSET_DEFAULT);
521         if (lmv_is_splitting(lmv)) {
522                 lum->lum_stripe_count = lmv->lmv_stripe_count;
523                 lum->lum_hash_type =
524                         lmv->lmv_hash_type & le32_to_cpu(LMV_HASH_TYPE_MASK);
525         } else if (lmv_is_merging(lmv)) {
526                 lum->lum_stripe_count = lmv->lmv_merge_offset;
527                 lum->lum_hash_type = lmv->lmv_merge_hash;
528         }
529
530         spec->u.sp_ea.eadatalen = sizeof(*lum);
531         spec->u.sp_ea.eadata = lum;
532         spec->sp_cr_flags = MDS_OPEN_HAS_EA;
533         spec->no_create = 0;
534         spec->sp_migrate_close = 0;
535         /* if 'nsonly' is set, don't migrate inode */
536         if (S_ISDIR(type))
537                 spec->sp_migrate_nsonly = 1;
538         else
539                 spec->sp_migrate_nsonly =
540                         info->mti_mdt->mdt_dir_restripe_nsonly;
541 }
542
543 /* migrate sub-file from @mdr_restripe_offset */
544 static int mdt_restripe_migrate(struct mdt_thread_info *info)
545 {
546         const struct lu_env *env = info->mti_env;
547         struct mdt_device *mdt = info->mti_mdt;
548         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
549         struct mdt_object *stripe = NULL;
550         struct mdt_object *master = NULL;
551         struct md_attr *ma = &info->mti_attr;
552         struct lmv_mds_md_v1 *lmv;
553         struct lu_name *lname = &info->mti_name;
554         struct lu_rdpg *rdpg = &info->mti_u.rdpg.mti_rdpg;
555         struct lu_fid fid1;
556         struct lu_fid fid2;
557         struct lu_dirpage *dp;
558         struct lu_dirent *ent;
559         const char *name = NULL;
560         int namelen = 0;
561         __u16 type;
562         int idx = 0;
563         int len;
564         int rc;
565
566         ENTRY;
567
568         if (list_empty(&restriper->mdr_migrating))
569                 RETURN(0);
570
571         stripe = list_entry(restriper->mdr_migrating.next, typeof(*stripe),
572                             mot_restripe_linkage);
573
574         /* get master object FID and stripe name */
575         rc = mdt_attr_get_pfid_name(info, stripe, &fid1, lname);
576         if (rc)
577                 GOTO(out, rc);
578
579         snprintf(info->mti_filename, sizeof(info->mti_filename), DFID,
580                  PFID(mdt_object_fid(stripe)));
581         len = strlen(info->mti_filename) + 1;
582         if (len >= lname->ln_namelen)
583                 GOTO(out, rc = -EBADF);
584
585         while (len < lname->ln_namelen) {
586                 if (!isdigit(lname->ln_name[len]))
587                         GOTO(out, rc = -EBADF);
588
589                 idx = idx * 10 + lname->ln_name[len++] - '0';
590         };
591
592         /* check whether stripe is newly created in split */
593         rc = mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV);
594         if (rc)
595                 GOTO(out, rc);
596
597         if (!(ma->ma_valid & MA_LMV))
598                 GOTO(out, rc = -ENODATA);
599
600         lmv = &ma->ma_lmv->lmv_md_v1;
601         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_STRIPE)
602                 GOTO(out, rc = -EBADF);
603
604         if (!lmv_is_restriping(lmv))
605                 GOTO(out, rc = -EINVAL);
606
607         if ((lmv_is_splitting(lmv) &&
608              idx >= le32_to_cpu(lmv->lmv_split_offset)) ||
609             (lmv_is_merging(lmv) &&
610              ((le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_TYPE_MASK) ==
611                                                  LMV_HASH_TYPE_CRUSH ||
612               (le32_to_cpu(lmv->lmv_hash_type) & LMV_HASH_TYPE_MASK) ==
613                                                  LMV_HASH_TYPE_CRUSH2) &&
614              idx < le32_to_cpu(lmv->lmv_merge_offset))) {
615                 /* new stripes doesn't need to migrate sub files in dir
616                  * split, neither for target stripes in dir merge if hash type
617                  * is CRUSH or CRUSH2.
618                  */
619                 rc = mdt_restripe_migrate_finish(info, stripe, lmv);
620                 RETURN(rc);
621         }
622
623         /* get sub file name @mot_restripe_offset.
624          * TODO: read one dirent instead of whole page.
625          */
626         rdpg->rp_hash = stripe->mot_restripe_offset;
627         rdpg->rp_count = PAGE_SIZE;
628         rdpg->rp_npages = 1;
629         rdpg->rp_attrs = LUDA_64BITHASH | LUDA_FID | LUDA_TYPE;
630         rdpg->rp_pages = &restriper->mdr_page;
631         rc = mo_readpage(env, mdt_object_child(stripe), rdpg);
632         if (rc < 0)
633                 GOTO(out, rc);
634
635         dp = page_address(restriper->mdr_page);
636         for (ent = lu_dirent_start(dp); ent; ent = lu_dirent_next(ent)) {
637                 LASSERT(le64_to_cpu(ent->lde_hash) >= rdpg->rp_hash);
638
639                 if (unlikely(!(le32_to_cpu(ent->lde_attrs) & LUDA_TYPE)))
640                         GOTO(out, rc = -EINVAL);
641
642                 namelen = le16_to_cpu(ent->lde_namelen);
643                 if (!namelen)
644                         continue;
645
646                 if (name_is_dot_or_dotdot(ent->lde_name, namelen))
647                         continue;
648
649                 name = ent->lde_name;
650                 type = lu_dirent_type_get(ent);
651                 break;
652         }
653
654         if (!name) {
655                 if (le64_to_cpu(dp->ldp_hash_end) == MDS_DIR_END_OFF) {
656                         rc = mdt_restripe_migrate_finish(info, stripe, lmv);
657                         RETURN(rc);
658                 }
659
660                 GOTO(out, rc = -EBADF);
661         }
662
663         /* copy name out because it should end with '\0' */
664         memcpy(info->mti_filename, name, namelen);
665         info->mti_filename[namelen] = '\0';
666         lname->ln_name = info->mti_filename;
667         lname->ln_namelen = namelen;
668
669         CDEBUG(D_INFO, "migrate "DFID"/"DNAME" type %ho\n",
670                PFID(&fid1), PNAME(lname), type);
671
672         master = mdt_object_find(env, mdt, &fid1);
673         if (IS_ERR(master))
674                 GOTO(out, rc = PTR_ERR(master));
675
676         rc = mdt_fid_alloc(env, mdt, &fid2, master, lname);
677         mdt_object_put(env, master);
678         if (rc < 0)
679                 GOTO(out, rc);
680
681         mdt_restripe_migrate_prep(info, &fid1, &fid2, lname, type, lmv);
682
683         rc = mdt_reint_migrate(info, NULL);
684         /* mti_big_buf is allocated in XATTR migration */
685         if (unlikely(info->mti_big_buf.lb_buf))
686                 lu_buf_free(&info->mti_big_buf);
687         if (rc == -EALREADY)
688                 rc = 0;
689         if (rc)
690                 GOTO(out, rc);
691
692         LASSERT(ent);
693         do {
694                 ent = lu_dirent_next(ent);
695                 if (!ent)
696                         break;
697
698                 namelen = le16_to_cpu(ent->lde_namelen);
699         } while (namelen == 0); /* Skip dummy record */
700
701         if (ent)
702                 stripe->mot_restripe_offset = le64_to_cpu(ent->lde_hash);
703         else
704                 stripe->mot_restripe_offset = le64_to_cpu(dp->ldp_hash_end);
705
706         EXIT;
707 out:
708         if (rc) {
709                 /* -EBUSY: file is opened by others */
710                 if (rc != -EBUSY)
711                         CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
712                                mdt_obd_name(mdt), PFID(&fid1), PNAME(lname),
713                                rc);
714
715                 spin_lock(&mdt->mdt_lock);
716                 stripe->mot_restriping = 0;
717                 list_del_init(&stripe->mot_restripe_linkage);
718                 spin_unlock(&mdt->mdt_lock);
719
720                 mdt_object_put(env, stripe);
721         }
722
723         return rc;
724 }
725
726 static inline bool mdt_restripe_update_pending(struct mdt_thread_info *info)
727 {
728         struct mdt_device *mdt = info->mti_mdt;
729
730         if (list_empty(&mdt->mdt_restriper.mdr_updating))
731                 return false;
732
733         return mdt->mdt_restriper.mdr_update_time < ktime_get_real_seconds();
734 }
735
736 static void mdt_restripe_layout_update_prep(struct mdt_thread_info *info,
737                                             const struct lu_fid *fid,
738                                             const struct lmv_mds_md_v1 *lmv)
739 {
740         struct lu_attr *attr = &info->mti_attr.ma_attr;
741         struct mdt_reint_record *rr = &info->mti_rr;
742         struct lmv_user_md_v1 *lum;
743
744         attr->la_ctime = attr->la_mtime = ktime_get_real_seconds();
745         attr->la_valid = LA_CTIME | LA_MTIME;
746
747         strncpy(info->mti_filename, XATTR_NAME_LMV,
748                 sizeof(info->mti_filename));
749
750         lum = &info->mti_mdt->mdt_restriper.mdr_lmv.lmv_user_md;
751         lum->lum_magic = cpu_to_le32(LMV_USER_MAGIC);
752         lum->lum_stripe_offset = cpu_to_le32(LMV_OFFSET_DEFAULT);
753         if (lmv_is_splitting(lmv)) {
754                 lum->lum_stripe_count = lmv->lmv_stripe_count;
755                 lum->lum_hash_type =
756                         lmv->lmv_hash_type & le32_to_cpu(LMV_HASH_TYPE_MASK);
757         } else if (lmv_is_merging(lmv)) {
758                 lum->lum_stripe_count = lmv->lmv_merge_offset;
759                 lum->lum_hash_type = lmv->lmv_merge_hash;
760         }
761
762         rr->rr_opcode = REINT_SETXATTR;
763         rr->rr_fid1 = fid;
764         rr->rr_name.ln_name = info->mti_filename;
765         rr->rr_name.ln_namelen = strlen(info->mti_filename);
766         rr->rr_eadata = lum;
767         rr->rr_eadatalen = sizeof(*lum);
768 }
769
770 static int mdt_restripe_layout_update(struct mdt_thread_info *info)
771 {
772         const struct lu_env *env = info->mti_env;
773         struct mdt_device *mdt = info->mti_mdt;
774         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
775         struct md_attr *ma = &info->mti_attr;
776         struct lu_fid *fid = &info->mti_tmp_fid1;
777         struct mdt_object *master;
778         struct mdt_object *stripe;
779         struct lmv_mds_md_v1 *lmv;
780         int i;
781         int rc;
782
783         ENTRY;
784
785         if (list_empty(&restriper->mdr_updating))
786                 RETURN(0);
787
788         master = list_entry(restriper->mdr_updating.next, typeof(*master),
789                             mot_restripe_linkage);
790
791         rc = mdt_stripe_get(info, master, ma, XATTR_NAME_LMV);
792         if (rc)
793                 GOTO(out, rc);
794
795         if (!(ma->ma_valid & MA_LMV))
796                 GOTO(out, rc = -ENODATA);
797
798         lmv = &ma->ma_lmv->lmv_md_v1;
799         if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1)
800                 GOTO(out, rc = -EBADF);
801
802         if (!lmv_is_restriping(lmv))
803                 GOTO(out, rc = -EINVAL);
804
805         /* use different buffer to store stripe LMV */
806         ma->ma_lmv = &restriper->mdr_lmv;
807         ma->ma_lmv_size = sizeof(restriper->mdr_lmv);
808         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
809                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
810                 stripe = mdt_object_find(env, mdt, fid);
811                 if (IS_ERR(stripe))
812                         GOTO(out, rc = PTR_ERR(stripe));
813
814                 ma->ma_valid = 0;
815                 rc = __mdt_stripe_get(info, stripe, ma, XATTR_NAME_LMV);
816                 /* LMV is checked without lock, don't cache it */
817                 mo_invalidate(env, mdt_object_child(stripe));
818                 mdt_object_put(env, stripe);
819                 if (rc)
820                         GOTO(out, rc);
821
822                 if (!(ma->ma_valid & MA_LMV))
823                         GOTO(out, rc = -ENODATA);
824
825                 /* check MIGRATION flag cleared on all stripes */
826                 if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
827                         GOTO(out, rc = -EINPROGRESS);
828         }
829
830         mdt_restripe_layout_update_prep(info, mdt_object_fid(master), lmv);
831
832         rc = mdt_dir_layout_update(info);
833         if (rc) {
834                 CERROR("update "DFID" layout failed: rc = %d\n",
835                        PFID(mdt_object_fid(master)), rc);
836                 GOTO(out, rc);
837         }
838
839 out:
840         LASSERT(!list_empty(&master->mot_restripe_linkage));
841         if (rc == -EINPROGRESS) {
842                 restriper->mdr_update_time = ktime_get_real_seconds() + 5;
843         } else {
844                 spin_lock(&restriper->mdr_lock);
845                 master->mot_restriping = 0;
846                 list_del_init(&master->mot_restripe_linkage);
847                 spin_unlock(&restriper->mdr_lock);
848
849                 mdt_object_put(env, master);
850         }
851
852         return rc;
853 }
854
855 static int mdt_restriper_main(void *arg)
856 {
857         struct mdt_thread_info *info = arg;
858         struct mdt_device *mdt = info->mti_mdt;
859         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
860
861         ENTRY;
862
863         while (({set_current_state(TASK_IDLE);
864                  !kthread_should_stop(); })) {
865                 if (!list_empty(&restriper->mdr_auto_splitting)) {
866                         __set_current_state(TASK_RUNNING);
867                         mdt_auto_split(info);
868                         cond_resched();
869                 } else if (mdt_restripe_update_pending(info)) {
870                         __set_current_state(TASK_RUNNING);
871                         mdt_restripe_layout_update(info);
872                         cond_resched();
873                 } else if (!list_empty(&restriper->mdr_migrating)) {
874                         __set_current_state(TASK_RUNNING);
875                         mdt_restripe_migrate(info);
876                         cond_resched();
877                 } else {
878                         schedule();
879                 }
880         }
881         __set_current_state(TASK_RUNNING);
882
883         RETURN(0);
884 }
885
886 int mdt_restriper_start(struct mdt_device *mdt)
887 {
888         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
889         struct task_struct *task;
890         struct mdt_thread_info *info;
891         struct lu_ucred *uc;
892         int rc;
893
894         ENTRY;
895
896         spin_lock_init(&restriper->mdr_lock);
897         INIT_LIST_HEAD(&restriper->mdr_auto_splitting);
898         INIT_LIST_HEAD(&restriper->mdr_migrating);
899         INIT_LIST_HEAD(&restriper->mdr_updating);
900         restriper->mdr_dir_split_count = DIR_SPLIT_COUNT_DEFAULT;
901         restriper->mdr_dir_split_delta = DIR_SPLIT_DELTA_DEFAULT;
902
903         restriper->mdr_page = alloc_page(GFP_KERNEL);
904         if (!restriper->mdr_page)
905                 RETURN(-ENOMEM);
906
907         rc = lu_env_init(&restriper->mdr_env, LCT_MD_THREAD);
908         if (rc)
909                 GOTO(out_page, rc);
910
911         rc = lu_context_init(&restriper->mdr_session, LCT_SERVER_SESSION);
912         if (rc)
913                 GOTO(out_env, rc);
914
915         lu_context_enter(&restriper->mdr_session);
916         restriper->mdr_env.le_ses = &restriper->mdr_session;
917
918         info = lu_context_key_get(&restriper->mdr_env.le_ctx, &mdt_thread_key);
919         info->mti_env = &restriper->mdr_env;
920         info->mti_mdt = mdt;
921         info->mti_pill = NULL;
922         info->mti_dlm_req = NULL;
923
924         uc = mdt_ucred(info);
925         uc->uc_valid = UCRED_OLD;
926         uc->uc_o_uid = 0;
927         uc->uc_o_gid = 0;
928         uc->uc_o_fsuid = 0;
929         uc->uc_o_fsgid = 0;
930         uc->uc_uid = 0;
931         uc->uc_gid = 0;
932         uc->uc_fsuid = 0;
933         uc->uc_fsgid = 0;
934         uc->uc_suppgids[0] = -1;
935         uc->uc_suppgids[1] = -1;
936         uc->uc_cap = cap_combine(CAP_FS_SET, CAP_NFSD_SET);
937         uc->uc_umask = 0644;
938         uc->uc_ginfo = NULL;
939         uc->uc_identity = NULL;
940
941         task = kthread_create(mdt_restriper_main, info, "mdt_restriper_%03d",
942                               mdt_seq_site(mdt)->ss_node_id);
943         if (IS_ERR(task)) {
944                 rc = PTR_ERR(task);
945                 CERROR("%s: Can't start directory restripe thread: rc %d\n",
946                        mdt_obd_name(mdt), rc);
947                 GOTO(out_ses, rc);
948         }
949         restriper->mdr_task = task;
950         wake_up_process(task);
951
952         RETURN(0);
953
954 out_ses:
955         lu_context_exit(restriper->mdr_env.le_ses);
956         lu_context_fini(restriper->mdr_env.le_ses);
957 out_env:
958         lu_env_fini(&restriper->mdr_env);
959 out_page:
960         __free_page(restriper->mdr_page);
961
962         return rc;
963 }
964
965 void mdt_restriper_stop(struct mdt_device *mdt)
966 {
967         struct mdt_dir_restriper *restriper = &mdt->mdt_restriper;
968         struct lu_env *env = &restriper->mdr_env;
969         struct mdt_object *mo, *next;
970
971         if (!restriper->mdr_task)
972                 return;
973
974         kthread_stop(restriper->mdr_task);
975         restriper->mdr_task = NULL;
976
977         list_for_each_entry_safe(mo, next, &restriper->mdr_auto_splitting,
978                                  mot_restripe_linkage) {
979                 list_del_init(&mo->mot_restripe_linkage);
980                 mdt_object_put(env, mo);
981         }
982
983         list_for_each_entry_safe(mo, next, &restriper->mdr_migrating,
984                                  mot_restripe_linkage) {
985                 list_del_init(&mo->mot_restripe_linkage);
986                 mdt_object_put(env, mo);
987         }
988
989         list_for_each_entry_safe(mo, next, &restriper->mdr_updating,
990                                  mot_restripe_linkage) {
991                 list_del_init(&mo->mot_restripe_linkage);
992                 mdt_object_put(env, mo);
993         }
994
995         __free_page(restriper->mdr_page);
996
997         lu_context_exit(env->le_ses);
998         lu_context_fini(env->le_ses);
999         lu_env_fini(env);
1000 }