Whamcloud - gitweb
e799fa9ab3044610230f7455ea460bf54bd225cc
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/cmm/cmm_split.c
35  *
36  * Lustre splitting dir
37  *
38  * Author: Alex Thomas  <alex@clusterfs.com>
39  * Author: Wang Di      <wangdi@clusterfs.com>
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <lustre_fid.h>
47 #include <lustre_mds.h>
48 #include <lustre/lustre_idl.h>
49 #include "cmm_internal.h"
50 #include "mdc_internal.h"
51
52 /**
53  * \addtogroup split
54  * @{
55  */
56 enum {
57         CMM_SPLIT_SIZE =  128 * 1024
58 };
59
60 /**
61  * This function checks if passed \a name come to correct server (local MDT).
62  *
63  * \param mp Parent directory
64  * \param name Name to lookup
65  * \retval  -ERESTART Let client know that dir was split and client needs to
66  * chose correct stripe.
67  */
68 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
69                     const char *name)
70 {
71         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
72         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
73         struct cml_object *clo = md2cml_obj(mp);
74         int rc, lmv_size;
75         ENTRY;
76
77         cmm_lprocfs_time_start(env);
78
79         /* Not split yet */
80         if (clo->clo_split == CMM_SPLIT_NONE ||
81             clo->clo_split == CMM_SPLIT_DENIED)
82                 GOTO(out, rc = 0);
83
84         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
85
86         /* Try to get the LMV EA */
87         memset(ma, 0, sizeof(*ma));
88
89         ma->ma_need = MA_LMV;
90         ma->ma_lmv_size = lmv_size;
91         OBD_ALLOC(ma->ma_lmv, lmv_size);
92         if (ma->ma_lmv == NULL)
93                 GOTO(out, rc = -ENOMEM);
94
95         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
96         rc = mo_attr_get(env, mp, ma);
97         if (rc)
98                 GOTO(cleanup, rc);
99
100         /* No LMV just return */
101         if (!(ma->ma_valid & MA_LMV)) {
102                 /* update split state if unknown */
103                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
104                         clo->clo_split = CMM_SPLIT_NONE;
105                 GOTO(cleanup, rc = 0);
106         }
107
108         /* Skip checking the slave dirs (mea_count is 0) */
109         if (ma->ma_lmv->mea_count != 0) {
110                 int idx;
111
112                 /**
113                  * This gets stripe by name to check the name belongs to master
114                  * dir, otherwise return the -ERESTART
115                  */
116                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
117
118                 /**
119                  * When client does not know about split, it sends create() to
120                  * the master MDT and master replay back if directory is split.
121                  * So client should orward request to correct MDT. This
122                  * is why we check here if stripe zero or not. Zero stripe means
123                  * master stripe. If stripe calculated from name is not zero -
124                  * return -ERESTART.
125                  */
126                 if (idx != 0)
127                         rc = -ERESTART;
128
129                 /* update split state to DONE if unknown */
130                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131                         clo->clo_split = CMM_SPLIT_DONE;
132         } else {
133                 /* split is denied for slave dir */
134                 clo->clo_split = CMM_SPLIT_DENIED;
135         }
136         EXIT;
137 cleanup:
138         OBD_FREE(ma->ma_lmv, lmv_size);
139 out:
140         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
141         return rc;
142 }
143
144 /**
145  * Return preferable access mode to the caller taking into account the split
146  * case and the fact of existing not splittable dirs.
147  */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
149                      mdl_mode_t lm)
150 {
151         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
152         int rc, split;
153         ENTRY;
154
155         memset(ma, 0, sizeof(*ma));
156
157         /*
158          * Check only if we need protection from split.  If not - mdt handles
159          * other cases.
160          */
161         rc = cmm_split_expect(env, mo, ma, &split);
162         if (rc) {
163                 CERROR("Can't check for possible split, rc %d\n", rc);
164                 RETURN(MDL_MINMODE);
165         }
166
167         /*
168          * Do not take PDO lock on non-splittable objects if this is not PW,
169          * this should speed things up a bit.
170          */
171         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
172                 RETURN(MDL_NL);
173
174         /* Protect splitting by exclusive lock. */
175         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
176                 RETURN(MDL_EX);
177
178         /*
179          * Have no idea about lock mode, let it be what higher layer wants.
180          */
181         RETURN(MDL_MINMODE);
182 }
183
184 /**
185  * Check if split is expected for current thread.
186  *
187  * \param mo Directory to split.
188  * \param ma md attributes.
189  * \param split Flag to save split information.
190  */
191 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
192                      struct md_attr *ma, int *split)
193 {
194         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
195         struct cml_object *clo = md2cml_obj(mo);
196         struct lu_fid root_fid;
197         int rc;
198         ENTRY;
199
200         if (clo->clo_split == CMM_SPLIT_DONE ||
201             clo->clo_split == CMM_SPLIT_DENIED) {
202                 *split = clo->clo_split;
203                 RETURN(0);
204         }
205         /* CMM_SPLIT_UNKNOWN case below */
206
207         /* No need to split root object. */
208         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
209                                               &root_fid);
210         if (rc)
211                 RETURN(rc);
212
213         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
214                 /* update split state */
215                 *split = clo->clo_split == CMM_SPLIT_DENIED;
216                 RETURN(0);
217         }
218
219         /*
220          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
221          * for this get_attr.
222          */
223         LASSERT(ma->ma_valid == 0);
224         ma->ma_need = MA_INODE | MA_LMV;
225         rc = mo_attr_get(env, mo, ma);
226         if (rc)
227                 RETURN(rc);
228
229         /* No need split for already split object */
230         if (ma->ma_valid & MA_LMV) {
231                 LASSERT(ma->ma_lmv_size > 0);
232                 *split = clo->clo_split = CMM_SPLIT_DONE;
233                 RETURN(0);
234         }
235
236         /* No need split for object whose size < CMM_SPLIT_SIZE */
237         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
238                 *split = clo->clo_split = CMM_SPLIT_NONE;
239                 RETURN(0);
240         }
241
242         *split = clo->clo_split = CMM_SPLIT_NEEDED;
243         RETURN(0);
244 }
245
246 struct cmm_object *cmm_object_find(const struct lu_env *env,
247                                    struct cmm_device *d,
248                                    const struct lu_fid *f)
249 {
250         return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
251 }
252
253 static inline void cmm_object_put(const struct lu_env *env,
254                                   struct cmm_object *o)
255 {
256         lu_object_put(env, &o->cmo_obj.mo_lu);
257 }
258
259 /**
260  * Allocate new FID on passed \a mc for slave object which is going to
261  * create there soon.
262  */
263 static int cmm_split_fid_alloc(const struct lu_env *env,
264                                struct cmm_device *cmm,
265                                struct mdc_device *mc,
266                                struct lu_fid *fid)
267 {
268         int rc;
269         ENTRY;
270
271         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
272
273         cfs_down(&mc->mc_fid_sem);
274
275         /* Alloc new fid on \a mc. */
276         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
277         if (rc > 0)
278                 rc = 0;
279         cfs_up(&mc->mc_fid_sem);
280
281         RETURN(rc);
282 }
283
284 /**
285  * Allocate new slave object on passed \a mc.
286  */
287 static int cmm_split_slave_create(const struct lu_env *env,
288                                   struct cmm_device *cmm,
289                                   struct mdc_device *mc,
290                                   struct lu_fid *fid,
291                                   struct md_attr *ma,
292                                   struct lmv_stripe_md *lmv,
293                                   int lmv_size)
294 {
295         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
296         struct cmm_object *obj;
297         int rc;
298         ENTRY;
299
300         /* Allocate new fid and store it to @fid */
301         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
302         if (rc) {
303                 CERROR("Can't alloc new fid on "LPU64
304                        ", rc %d\n", mc->mc_num, rc);
305                 RETURN(rc);
306         }
307
308         /* Allocate new object on @mc */
309         obj = cmm_object_find(env, cmm, fid);
310         if (IS_ERR(obj))
311                 RETURN(PTR_ERR(obj));
312
313         memset(spec, 0, sizeof *spec);
314         spec->u.sp_ea.fid = fid;
315         spec->u.sp_ea.eadata = lmv;
316         spec->u.sp_ea.eadatalen = lmv_size;
317         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
318         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
319                               spec, ma);
320         cmm_object_put(env, obj);
321         RETURN(rc);
322 }
323
324 /**
325  * Create so many slaves as number of stripes.
326  * This is called in split time before sending pages to slaves.
327  */
328 static int cmm_split_slaves_create(const struct lu_env *env,
329                                    struct md_object *mo,
330                                    struct md_attr *ma)
331 {
332         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
333         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
334         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
335         struct mdc_device    *mc, *tmp;
336         struct lmv_stripe_md *lmv;
337         int i = 1, rc = 0;
338         ENTRY;
339
340         /* Init the split MEA */
341         lmv = ma->ma_lmv;
342         lmv->mea_master = cmm->cmm_local_num;
343         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
344         lmv->mea_count = cmm->cmm_tgt_count + 1;
345
346         /*
347          * Store master FID to local node idx number. Local node is always
348          * master and its stripe number if 0.
349          */
350         lmv->mea_ids[0] = *lf;
351
352         memset(slave_lmv, 0, sizeof *slave_lmv);
353         slave_lmv->mea_master = cmm->cmm_local_num;
354         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
355         slave_lmv->mea_count = 0;
356
357         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
358                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
359                                             ma, slave_lmv, sizeof(*slave_lmv));
360                 if (rc)
361                         GOTO(cleanup, rc);
362                 i++;
363         }
364         EXIT;
365 cleanup:
366         return rc;
367 }
368
369 static inline int cmm_split_special_entry(struct lu_dirent *ent)
370 {
371         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
372             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
373                 return 1;
374         return 0;
375 }
376
377 /**
378  * Convert string to the lu_name structure.
379  */
380 static inline struct lu_name *cmm_name(const struct lu_env *env,
381                                        char *name, int buflen)
382 {
383         struct lu_name *lname;
384         struct cmm_thread_info *cmi;
385
386         LASSERT(buflen > 0);
387         LASSERT(name[buflen - 1] == '\0');
388
389         cmi = cmm_env_info(env);
390         lname = &cmi->cti_name;
391         lname->ln_name = name;
392         /* do NOT count the terminating '\0' of name for length */
393         lname->ln_namelen = buflen - 1;
394         return lname;
395 }
396
397 /**
398  * Helper for cmm_split_remove_page(). It removes one entry from local MDT.
399  * Do not corrupt byte order in page, it will be sent to remote MDT.
400  */
401 static int cmm_split_remove_entry(const struct lu_env *env,
402                                   struct md_object *mo,
403                                   struct lu_dirent *ent)
404 {
405         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
406         struct cmm_thread_info *cmi;
407         struct md_attr *ma;
408         struct cmm_object *obj;
409         int is_dir, rc;
410         char *name;
411         struct lu_name *lname;
412         ENTRY;
413
414         if (cmm_split_special_entry(ent))
415                 RETURN(0);
416
417         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
418         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
419         if (IS_ERR(obj))
420                 RETURN(PTR_ERR(obj));
421
422         cmi = cmm_env_info(env);
423         ma = &cmi->cmi_ma;
424
425         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
426                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
427         else
428                 /**
429                  * \note These days only cross-ref dirs are possible, so for the
430                  * sake of simplicity, in split, we suppose that all cross-ref
431                  * names point to directory and do not do additional getattr to
432                  * remote MDT.
433                  */
434                 is_dir = 1;
435
436         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
437         if (!name)
438                 GOTO(cleanup, rc = -ENOMEM);
439
440         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
441         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
442         /**
443          * \note When split, no need update parent's ctime,
444          * and no permission check for name_remove.
445          */
446         ma->ma_attr.la_ctime = 0;
447         if (is_dir)
448                 ma->ma_attr.la_mode = S_IFDIR;
449         else
450                 ma->ma_attr.la_mode = 0;
451         ma->ma_attr.la_valid = LA_MODE;
452         ma->ma_valid = MA_INODE;
453
454         ma->ma_attr_flags |= MDS_PERM_BYPASS;
455         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
456         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
457         if (rc)
458                 GOTO(cleanup, rc);
459
460         /**
461          * \note For each entry transferred to the slave MDS we should know
462          * whether this object is dir or not. Therefore the highest bit of the
463          * hash is used to indicate that (it is unused for hash purposes anyway).
464          */
465         if (is_dir) {
466                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
467                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
468         }
469         EXIT;
470 cleanup:
471         cmm_object_put(env, obj);
472         return rc;
473 }
474
475 /**
476  * Remove all entries from passed page.
477  * These entries are going to remote MDT and thus should be removed locally.
478  */
479 static int cmm_split_remove_page(const struct lu_env *env,
480                                  struct md_object *mo,
481                                  struct lu_rdpg *rdpg,
482                                  __u64 hash_end, __u32 *len)
483 {
484         struct lu_dirpage *dp;
485         struct lu_dirent  *ent;
486         int rc = 0;
487         ENTRY;
488
489         *len = 0;
490         cfs_kmap(rdpg->rp_pages[0]);
491         dp = page_address(rdpg->rp_pages[0]);
492         for (ent = lu_dirent_start(dp);
493              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
494              ent = lu_dirent_next(ent)) {
495                 rc = cmm_split_remove_entry(env, mo, ent);
496                 if (rc) {
497                         /*
498                          * XXX: Error handler to insert remove name back,
499                          * currently we assumed it will success anyway in
500                          * verfication test.
501                          */
502                         CERROR("Can not del %*.*s, rc %d\n",
503                                le16_to_cpu(ent->lde_namelen),
504                                le16_to_cpu(ent->lde_namelen),
505                                ent->lde_name, rc);
506                         GOTO(unmap, rc);
507                 }
508                 *len += lu_dirent_size(ent);
509         }
510
511         if (ent != lu_dirent_start(dp))
512                 *len += sizeof(struct lu_dirpage);
513         EXIT;
514 unmap:
515         cfs_kunmap(rdpg->rp_pages[0]);
516         return rc;
517 }
518
519 /**
520  * Send one page of entries to the slave MDT.
521  * This page contains entries to be created there.
522  */
523 static int cmm_split_send_page(const struct lu_env *env,
524                                struct md_object *mo,
525                                struct lu_rdpg *rdpg,
526                                struct lu_fid *fid, int len)
527 {
528         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
529         struct cmm_object *obj;
530         int rc = 0;
531         ENTRY;
532
533         obj = cmm_object_find(env, cmm, fid);
534         if (IS_ERR(obj))
535                 RETURN(PTR_ERR(obj));
536
537         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
538                            rdpg->rp_pages[0], len);
539         cmm_object_put(env, obj);
540         RETURN(rc);
541 }
542
543 /** Read one page of entries from local MDT. */
544 static int cmm_split_read_page(const struct lu_env *env,
545                                struct md_object *mo,
546                                struct lu_rdpg *rdpg)
547 {
548         int rc;
549         ENTRY;
550         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
551         cfs_kunmap(rdpg->rp_pages[0]);
552         rc = mo_readpage(env, md_object_next(mo), rdpg);
553         RETURN(rc);
554 }
555
556 /**
557  * This function performs migration of each directory stripe to its MDS.
558  */
559 static int cmm_split_process_stripe(const struct lu_env *env,
560                                     struct md_object *mo,
561                                     struct lu_rdpg *rdpg,
562                                     struct lu_fid *lf,
563                                     __u64 end)
564 {
565         int rc, done = 0;
566         ENTRY;
567
568         LASSERT(rdpg->rp_npages == 1);
569         do {
570                 struct lu_dirpage *ldp;
571                 __u32 len = 0;
572
573                 /** - Read one page of entries from local MDT. */
574                 rc = cmm_split_read_page(env, mo, rdpg);
575                 if (rc) {
576                         CERROR("Error in readpage: %d\n", rc);
577                         RETURN(rc);
578                 }
579
580                 /** - Remove local entries which are going to remite MDT. */
581                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
582                 if (rc) {
583                         CERROR("Error in remove stripe entries: %d\n", rc);
584                         RETURN(rc);
585                 }
586
587                 /**
588                  * - Send entries page to slave MDT and repeat while there are
589                  * more pages.
590                  */
591                 if (len > 0) {
592                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
593                         if (rc) {
594                                 CERROR("Error in sending page: %d\n", rc);
595                                 RETURN(rc);
596                         }
597                 }
598
599                 cfs_kmap(rdpg->rp_pages[0]);
600                 ldp = page_address(rdpg->rp_pages[0]);
601                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
602                         done = 1;
603
604                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
605                 cfs_kunmap(rdpg->rp_pages[0]);
606         } while (!done);
607
608         RETURN(rc);
609 }
610
611 /**
612  * Directory scanner for split operation.
613  *
614  * It calculates hashes for names and organizes files to stripes.
615  */
616 static int cmm_split_process_dir(const struct lu_env *env,
617                                  struct md_object *mo,
618                                  struct md_attr *ma)
619 {
620         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
621         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
622         __u64 hash_segment;
623         int rc = 0, i;
624         ENTRY;
625
626         memset(rdpg, 0, sizeof *rdpg);
627         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
628         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
629         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
630
631         for (i = 0; i < rdpg->rp_npages; i++) {
632                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
633                 if (rdpg->rp_pages[i] == NULL)
634                         GOTO(cleanup, rc = -ENOMEM);
635         }
636
637         hash_segment = MAX_HASH_SIZE;
638         /** Whole hash range is divided on segments by number of MDS-es. */
639         do_div(hash_segment, cmm->cmm_tgt_count + 1);
640         /**
641          * For each segment the cmm_split_process_stripe() is called to move
642          * entries on new server.
643          */
644         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
645                 struct lu_fid *lf;
646                 __u64 hash_end;
647
648                 lf = &ma->ma_lmv->mea_ids[i];
649
650                 rdpg->rp_hash = i * hash_segment;
651                 if (i == cmm->cmm_tgt_count)
652                         hash_end = MAX_HASH_SIZE;
653                 else
654                         hash_end = rdpg->rp_hash + hash_segment;
655                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
656                 if (rc) {
657                         CERROR("Error (rc = %d) while splitting for %d: fid="
658                                DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
659                                rdpg->rp_hash, hash_end);
660                         GOTO(cleanup, rc);
661                 }
662         }
663         EXIT;
664 cleanup:
665         for (i = 0; i < rdpg->rp_npages; i++)
666                 if (rdpg->rp_pages[i] != NULL)
667                         cfs_free_page(rdpg->rp_pages[i]);
668         return rc;
669 }
670
671 /**
672  * Directory splitting.
673  *
674  * Big directory can be split eventually.
675  */
676 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
677 {
678         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
679         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
680         int                rc = 0, split;
681         struct lu_buf     *buf;
682         ENTRY;
683
684         cmm_lprocfs_time_start(env);
685
686         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
687         memset(ma, 0, sizeof(*ma));
688
689         /** - Step1: Checking whether the dir needs to be split. */
690         rc = cmm_split_expect(env, mo, ma, &split);
691         if (rc)
692                 GOTO(out, rc);
693
694         if (split != CMM_SPLIT_NEEDED) {
695                 /* No split is needed, caller may proceed with create. */
696                 GOTO(out, rc = 0);
697         }
698
699         /* Split should be done now, let's do it. */
700         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
701               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
702
703         /**
704          * /note Disable transactions for split, since there will be so many trans in
705          * this one ops, conflict with current recovery design.
706          */
707         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
708         if (rc) {
709                 CERROR("Can't disable trans for split, rc %d\n", rc);
710                 GOTO(out, rc);
711         }
712
713         /** - Step2: Prepare the md memory */
714         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
715         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
716         if (ma->ma_lmv == NULL)
717                 GOTO(out, rc = -ENOMEM);
718
719         /** - Step3: Create slave objects and fill the ma->ma_lmv */
720         rc = cmm_split_slaves_create(env, mo, ma);
721         if (rc) {
722                 CERROR("Can't create slaves for split, rc %d\n", rc);
723                 GOTO(cleanup, rc);
724         }
725
726         /** - Step4: Scan and split the object. */
727         rc = cmm_split_process_dir(env, mo, ma);
728         if (rc) {
729                 CERROR("Can't scan and split, rc %d\n", rc);
730                 GOTO(cleanup, rc);
731         }
732
733         /** - Step5: Set mea to the master object. */
734         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
735         rc = mo_xattr_set(env, md_object_next(mo), buf,
736                           MDS_LMV_MD_NAME, 0);
737         if (rc) {
738                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
739                 GOTO(cleanup, rc);
740         }
741
742         /* set flag in cmm_object */
743         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
744
745         /**
746          * - Finally, split succeed, tell client to repeat opetartion on correct
747          * MDT.
748          */
749         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
750         rc = -ERESTART;
751         EXIT;
752 cleanup:
753         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
754 out:
755         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
756         return rc;
757 }
758 /** @} */