Whamcloud - gitweb
1d105fbbac0f3fc64bcb15e935ef635e7a896c1c
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_split.c
37  *
38  * Lustre splitting dir
39  *
40  * Author: Alex Thomas  <alex@clusterfs.com>
41  * Author: Wang Di      <wangdi@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <obd_class.h>
52 #include <lustre_fid.h>
53 #include <lustre_mds.h>
54 #include <lustre/lustre_idl.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57
58 enum {
59         CMM_SPLIT_SIZE =  128 * 1024
60 };
61
62 /*
63  * This function checks if passed @name come to correct server (local MDT). If
64  * not - return -ERESTART and let client know that dir was split and client
65  * needs to chose correct stripe.
66  */
67 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
68                     const char *name)
69 {
70         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
71         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
72         struct cml_object *clo = md2cml_obj(mp);
73         int rc, lmv_size;
74         ENTRY;
75
76         cmm_lprocfs_time_start(env);
77
78         /* Not split yet */
79         if (clo->clo_split == CMM_SPLIT_NONE ||
80             clo->clo_split == CMM_SPLIT_DENIED)
81                 GOTO(out, rc = 0);
82
83         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
84
85         /* Try to get the LMV EA */
86         memset(ma, 0, sizeof(*ma));
87
88         ma->ma_need = MA_LMV;
89         ma->ma_lmv_size = lmv_size;
90         OBD_ALLOC(ma->ma_lmv, lmv_size);
91         if (ma->ma_lmv == NULL)
92                 GOTO(out, rc = -ENOMEM);
93
94         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
95         rc = mo_attr_get(env, mp, ma);
96         if (rc)
97                 GOTO(cleanup, rc);
98
99         /* No LMV just return */
100         if (!(ma->ma_valid & MA_LMV)) {
101                 /* update split state if unknown */
102                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
103                         clo->clo_split = CMM_SPLIT_NONE;
104                 GOTO(cleanup, rc = 0);
105         }
106
107         /* Skip checking the slave dirs (mea_count is 0) */
108         if (ma->ma_lmv->mea_count != 0) {
109                 int idx;
110
111                 /*
112                  * Get stripe by name to check the name belongs to master dir,
113                  * otherwise return the -ERESTART
114                  */
115                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
116
117                 /*
118                  * Check if name came to correct MDT server. We suppose that if
119                  * client does not know about split, it sends create operation
120                  * to master MDT. And this is master job to say it that dir got
121                  * split and client should orward request to correct MDT. This
122                  * is why we check here if stripe zero or not. Zero stripe means
123                  * master stripe. If stripe calculated from name is not zero -
124                  * return -ERESTART.
125                  */
126                 if (idx != 0)
127                         rc = -ERESTART;
128
129                 /* update split state to DONE if unknown */
130                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131                         clo->clo_split = CMM_SPLIT_DONE;
132         } else {
133                 /* split is denied for slave dir */
134                 clo->clo_split = CMM_SPLIT_DENIED;
135         }
136         EXIT;
137 cleanup:
138         OBD_FREE(ma->ma_lmv, lmv_size);
139 out:
140         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
141         return rc;
142 }
143
144 /*
145  * Return preferable access mode to caller taking into account possible split
146  * and the fact of existing not splittable dirs in principle.
147  */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
149                      mdl_mode_t lm)
150 {
151         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
152         int rc, split;
153         ENTRY;
154
155         memset(ma, 0, sizeof(*ma));
156
157         /*
158          * Check only if we need protection from split.  If not - mdt handles
159          * other cases.
160          */
161         rc = cmm_split_expect(env, mo, ma, &split);
162         if (rc) {
163                 CERROR("Can't check for possible split, rc %d\n", rc);
164                 RETURN(MDL_MINMODE);
165         }
166
167         /*
168          * Do not take PDO lock on non-splittable objects if this is not PW,
169          * this should speed things up a bit.
170          */
171         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
172                 RETURN(MDL_NL);
173
174         /* Protect splitting by exclusive lock. */
175         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
176                 RETURN(MDL_EX);
177
178         /*
179          * Have no idea about lock mode, let it be what higher layer wants.
180          */
181         RETURN(MDL_MINMODE);
182 }
183
184 /* Check if split is expected for current thread. */
185 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
186                      struct md_attr *ma, int *split)
187 {
188         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
189         struct cml_object *clo = md2cml_obj(mo);
190         struct lu_fid root_fid;
191         int rc;
192         ENTRY;
193
194         if (clo->clo_split == CMM_SPLIT_DONE ||
195             clo->clo_split == CMM_SPLIT_DENIED) {
196                 *split = clo->clo_split;
197                 RETURN(0);
198         }
199         /* CMM_SPLIT_UNKNOWN case below */
200
201         /* No need to split root object. */
202         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
203                                               &root_fid);
204         if (rc)
205                 RETURN(rc);
206
207         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
208                 /* update split state */
209                 *split = clo->clo_split == CMM_SPLIT_DENIED;
210                 RETURN(0);
211         }
212
213         /*
214          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
215          * for this get_attr.
216          */
217         LASSERT(ma->ma_valid == 0);
218         ma->ma_need = MA_INODE | MA_LMV;
219         rc = mo_attr_get(env, mo, ma);
220         if (rc)
221                 RETURN(rc);
222
223         /* No need split for already split object */
224         if (ma->ma_valid & MA_LMV) {
225                 LASSERT(ma->ma_lmv_size > 0);
226                 *split = clo->clo_split = CMM_SPLIT_DONE;
227                 RETURN(0);
228         }
229
230         /* No need split for object whose size < CMM_SPLIT_SIZE */
231         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
232                 *split = clo->clo_split = CMM_SPLIT_NONE;
233                 RETURN(0);
234         }
235
236         *split = clo->clo_split = CMM_SPLIT_NEEDED;
237         RETURN(0);
238 }
239
240 struct cmm_object *cmm_object_find(const struct lu_env *env,
241                                    struct cmm_device *d,
242                                    const struct lu_fid *f)
243 {
244         struct lu_object *o;
245         struct cmm_object *m;
246         ENTRY;
247
248         o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
249         if (IS_ERR(o))
250                 m = (struct cmm_object *)o;
251         else
252                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
253                                d->cmm_md_dev.md_lu_dev.ld_type));
254         RETURN(m);
255 }
256
257 static inline void cmm_object_put(const struct lu_env *env,
258                                   struct cmm_object *o)
259 {
260         lu_object_put(env, &o->cmo_obj.mo_lu);
261 }
262
263 /*
264  * Allocate new on passed @mc for slave object which is going to create there
265  * soon.
266  */
267 static int cmm_split_fid_alloc(const struct lu_env *env,
268                                struct cmm_device *cmm,
269                                struct mdc_device *mc,
270                                struct lu_fid *fid)
271 {
272         int rc;
273         ENTRY;
274
275         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
276
277         down(&mc->mc_fid_sem);
278
279         /* Alloc new fid on @mc. */
280         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
281         if (rc > 0) {
282                 /* Setup FLD for new sequenceif needed. */
283                 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
284                                        mc->mc_num, env);
285                 if (rc)
286                         CERROR("Can't create fld entry, rc %d\n", rc);
287         }
288         up(&mc->mc_fid_sem);
289
290         RETURN(rc);
291 }
292
293 /* Allocate new slave object on passed @mc */
294 static int cmm_split_slave_create(const struct lu_env *env,
295                                   struct cmm_device *cmm,
296                                   struct mdc_device *mc,
297                                   struct lu_fid *fid,
298                                   struct md_attr *ma,
299                                   struct lmv_stripe_md *lmv,
300                                   int lmv_size)
301 {
302         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
303         struct cmm_object *obj;
304         int rc;
305         ENTRY;
306
307         /* Allocate new fid and store it to @fid */
308         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
309         if (rc) {
310                 CERROR("Can't alloc new fid on "LPU64
311                        ", rc %d\n", mc->mc_num, rc);
312                 RETURN(rc);
313         }
314
315         /* Allocate new object on @mc */
316         obj = cmm_object_find(env, cmm, fid);
317         if (IS_ERR(obj))
318                 RETURN(PTR_ERR(obj));
319
320         memset(spec, 0, sizeof *spec);
321         spec->u.sp_ea.fid = fid;
322         spec->u.sp_ea.eadata = lmv;
323         spec->u.sp_ea.eadatalen = lmv_size;
324         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
325         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
326                               spec, ma);
327         cmm_object_put(env, obj);
328         RETURN(rc);
329 }
330
331 /*
332  * Create so many slaves as number of stripes. This is called in split time
333  * before sending pages to slaves.
334  */
335 static int cmm_split_slaves_create(const struct lu_env *env,
336                                    struct md_object *mo,
337                                    struct md_attr *ma)
338 {
339         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
340         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
341         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
342         struct mdc_device    *mc, *tmp;
343         struct lmv_stripe_md *lmv;
344         int i = 1, rc = 0;
345         ENTRY;
346
347         /* Init the split MEA */
348         lmv = ma->ma_lmv;
349         lmv->mea_master = cmm->cmm_local_num;
350         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
351         lmv->mea_count = cmm->cmm_tgt_count + 1;
352
353         /*
354          * Store master FID to local node idx number. Local node is always
355          * master and its stripe number if 0.
356          */
357         lmv->mea_ids[0] = *lf;
358
359         memset(slave_lmv, 0, sizeof *slave_lmv);
360         slave_lmv->mea_master = cmm->cmm_local_num;
361         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
362         slave_lmv->mea_count = 0;
363
364         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
365                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
366                                             ma, slave_lmv, sizeof(*slave_lmv));
367                 if (rc)
368                         GOTO(cleanup, rc);
369                 i++;
370         }
371         EXIT;
372 cleanup:
373         return rc;
374 }
375
376 static inline int cmm_split_special_entry(struct lu_dirent *ent)
377 {
378         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
379             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
380                 return 1;
381         return 0;
382 }
383
384 static inline struct lu_name *cmm_name(const struct lu_env *env,
385                                        char *name, int buflen)
386 {
387         struct lu_name *lname;
388         struct cmm_thread_info *cmi;
389
390         LASSERT(buflen > 0);
391         LASSERT(name[buflen - 1] == '\0');
392
393         cmi = cmm_env_info(env);
394         lname = &cmi->cti_name;
395         lname->ln_name = name;
396         /* do NOT count the terminating '\0' of name for length */
397         lname->ln_namelen = buflen - 1;
398         return lname;
399 }
400
401 /*
402  * Remove one entry from local MDT. Do not corrupt byte order in page, it will
403  * be sent to remote MDT.
404  */
405 static int cmm_split_remove_entry(const struct lu_env *env,
406                                   struct md_object *mo,
407                                   struct lu_dirent *ent)
408 {
409         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
410         struct cmm_thread_info *cmi;
411         struct md_attr *ma;
412         struct cmm_object *obj;
413         int is_dir, rc;
414         char *name;
415         struct lu_name *lname;
416         ENTRY;
417
418         if (cmm_split_special_entry(ent))
419                 RETURN(0);
420
421         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
422         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
423         if (IS_ERR(obj))
424                 RETURN(PTR_ERR(obj));
425
426         cmi = cmm_env_info(env);
427         ma = &cmi->cmi_ma;
428
429         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
430                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
431         else
432                 /*
433                  * XXX: These days only cross-ref dirs are possible, so for the
434                  * sake of simplicity, in split, we suppose that all cross-ref
435                  * names point to directory and do not do additional getattr to
436                  * remote MDT.
437                  */
438                 is_dir = 1;
439
440         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
441         if (!name)
442                 GOTO(cleanup, rc = -ENOMEM);
443
444         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
445         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
446         /*
447          * When split, no need update parent's ctime,
448          * and no permission check for name_remove.
449          */
450         ma->ma_attr.la_ctime = 0;
451         if (is_dir)
452                 ma->ma_attr.la_mode = S_IFDIR;
453         else
454                 ma->ma_attr.la_mode = 0;
455         ma->ma_attr.la_valid = LA_MODE;
456         ma->ma_valid = MA_INODE;
457
458         ma->ma_attr_flags |= MDS_PERM_BYPASS;
459         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
460         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
461         if (rc)
462                 GOTO(cleanup, rc);
463
464         /*
465          * This @ent will be transferred to slave MDS and insert there, so in
466          * the slave MDS, we should know whether this object is dir or not, so
467          * use the highest bit of the hash to indicate that (because we do not
468          * use highest bit of hash).
469          */
470         if (is_dir) {
471                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
472                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
473         }
474         EXIT;
475 cleanup:
476         cmm_object_put(env, obj);
477         return rc;
478 }
479
480 /*
481  * Remove all entries from passed page. These entries are going to remote MDT
482  * and thus should be removed locally.
483  */
484 static int cmm_split_remove_page(const struct lu_env *env,
485                                  struct md_object *mo,
486                                  struct lu_rdpg *rdpg,
487                                  __u64 hash_end, __u32 *len)
488 {
489         struct lu_dirpage *dp;
490         struct lu_dirent  *ent;
491         int rc = 0;
492         ENTRY;
493
494         *len = 0;
495         cfs_kmap(rdpg->rp_pages[0]);
496         dp = page_address(rdpg->rp_pages[0]);
497         for (ent = lu_dirent_start(dp);
498              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
499              ent = lu_dirent_next(ent)) {
500                 rc = cmm_split_remove_entry(env, mo, ent);
501                 if (rc) {
502                         /*
503                          * XXX: Error handler to insert remove name back,
504                          * currently we assumed it will success anyway in
505                          * verfication test.
506                          */
507                         CERROR("Can not del %*.*s, rc %d\n",
508                                le16_to_cpu(ent->lde_namelen),
509                                le16_to_cpu(ent->lde_namelen),
510                                ent->lde_name, rc);
511                         GOTO(unmap, rc);
512                 }
513                 *len += lu_dirent_size(ent);
514         }
515
516         if (ent != lu_dirent_start(dp))
517                 *len += sizeof(struct lu_dirpage);
518         EXIT;
519 unmap:
520         cfs_kunmap(rdpg->rp_pages[0]);
521         return rc;
522 }
523
524 /* Send one page to remote MDT for creating entries there. */
525 static int cmm_split_send_page(const struct lu_env *env,
526                                struct md_object *mo,
527                                struct lu_rdpg *rdpg,
528                                struct lu_fid *fid, int len)
529 {
530         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
531         struct cmm_object *obj;
532         int rc = 0;
533         ENTRY;
534
535         obj = cmm_object_find(env, cmm, fid);
536         if (IS_ERR(obj))
537                 RETURN(PTR_ERR(obj));
538
539         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
540                            rdpg->rp_pages[0], len);
541         cmm_object_put(env, obj);
542         RETURN(rc);
543 }
544
545 /* Read one page of entries from local MDT. */
546 static int cmm_split_read_page(const struct lu_env *env,
547                                struct md_object *mo,
548                                struct lu_rdpg *rdpg)
549 {
550         int rc;
551         ENTRY;
552         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
553         cfs_kunmap(rdpg->rp_pages[0]);
554         rc = mo_readpage(env, md_object_next(mo), rdpg);
555         RETURN(rc);
556 }
557
558 /*
559  * This function performs migration of all pages with entries which fit into one
560  * stripe and one hash segment.
561  */
562 static int cmm_split_process_stripe(const struct lu_env *env,
563                                     struct md_object *mo,
564                                     struct lu_rdpg *rdpg,
565                                     struct lu_fid *lf,
566                                     __u64 end)
567 {
568         int rc, done = 0;
569         ENTRY;
570
571         LASSERT(rdpg->rp_npages == 1);
572         do {
573                 struct lu_dirpage *ldp;
574                 __u32 len = 0;
575
576                 /* Read one page from local MDT. */
577                 rc = cmm_split_read_page(env, mo, rdpg);
578                 if (rc) {
579                         CERROR("Error in readpage: %d\n", rc);
580                         RETURN(rc);
581                 }
582
583                 /* Remove local entries which are going to remite MDT. */
584                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
585                 if (rc) {
586                         CERROR("Error in remove stripe entries: %d\n", rc);
587                         RETURN(rc);
588                 }
589
590                 /* Send entries page to slave MDT. */
591                 if (len > 0) {
592                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
593                         if (rc) {
594                                 CERROR("Error in sending page: %d\n", rc);
595                                 RETURN(rc);
596                         }
597                 }
598
599                 cfs_kmap(rdpg->rp_pages[0]);
600                 ldp = page_address(rdpg->rp_pages[0]);
601                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
602                         done = 1;
603
604                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
605                 cfs_kunmap(rdpg->rp_pages[0]);
606         } while (!done);
607
608         RETURN(rc);
609 }
610
611 static int cmm_split_process_dir(const struct lu_env *env,
612                                  struct md_object *mo,
613                                  struct md_attr *ma)
614 {
615         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
616         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
617         __u64 hash_segment;
618         int rc = 0, i;
619         ENTRY;
620
621         memset(rdpg, 0, sizeof *rdpg);
622         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
623         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
624         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
625
626         for (i = 0; i < rdpg->rp_npages; i++) {
627                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
628                 if (rdpg->rp_pages[i] == NULL)
629                         GOTO(cleanup, rc = -ENOMEM);
630         }
631
632         hash_segment = MAX_HASH_SIZE;
633         do_div(hash_segment, cmm->cmm_tgt_count + 1);
634         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
635                 struct lu_fid *lf;
636                 __u64 hash_end;
637
638                 lf = &ma->ma_lmv->mea_ids[i];
639
640                 rdpg->rp_hash = i * hash_segment;
641                 if (i == cmm->cmm_tgt_count)
642                         hash_end = MAX_HASH_SIZE;
643                 else
644                         hash_end = rdpg->rp_hash + hash_segment;
645                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
646                 if (rc) {
647                         CERROR("Error (rc = %d) while splitting for %d: fid="
648                                DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
649                                rdpg->rp_hash, hash_end);
650                         GOTO(cleanup, rc);
651                 }
652         }
653         EXIT;
654 cleanup:
655         for (i = 0; i < rdpg->rp_npages; i++)
656                 if (rdpg->rp_pages[i] != NULL)
657                         __cfs_free_page(rdpg->rp_pages[i]);
658         return rc;
659 }
660
661 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
662 {
663         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
664         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
665         int                rc = 0, split;
666         struct lu_buf     *buf;
667         ENTRY;
668
669         cmm_lprocfs_time_start(env);
670
671         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
672         memset(ma, 0, sizeof(*ma));
673
674         /* Step1: Checking whether the dir needs to be split. */
675         rc = cmm_split_expect(env, mo, ma, &split);
676         if (rc)
677                 GOTO(out, rc);
678
679         if (split != CMM_SPLIT_NEEDED) {
680                 /* No split is needed, caller may proceed with create. */
681                 GOTO(out, rc = 0);
682         }
683
684         /* Split should be done now, let's do it. */
685         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
686               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
687
688         /*
689          * Disable transacrions for split, since there will be so many trans in
690          * this one ops, conflict with current recovery design.
691          */
692         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
693         if (rc) {
694                 CERROR("Can't disable trans for split, rc %d\n", rc);
695                 GOTO(out, rc);
696         }
697
698         /* Step2: Prepare the md memory */
699         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
700         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
701         if (ma->ma_lmv == NULL)
702                 GOTO(out, rc = -ENOMEM);
703
704         /* Step3: Create slave objects and fill the ma->ma_lmv */
705         rc = cmm_split_slaves_create(env, mo, ma);
706         if (rc) {
707                 CERROR("Can't create slaves for split, rc %d\n", rc);
708                 GOTO(cleanup, rc);
709         }
710
711         /* Step4: Scan and split the object. */
712         rc = cmm_split_process_dir(env, mo, ma);
713         if (rc) {
714                 CERROR("Can't scan and split, rc %d\n", rc);
715                 GOTO(cleanup, rc);
716         }
717
718         /* Step5: Set mea to the master object. */
719         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
720         rc = mo_xattr_set(env, md_object_next(mo), buf,
721                           MDS_LMV_MD_NAME, 0);
722         if (rc) {
723                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
724                 GOTO(cleanup, rc);
725         }
726
727         /* set flag in cmm_object */
728         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
729
730         /*
731          * Finally, split succeed, tell client to repeat opetartion on correct
732          * MDT.
733          */
734         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
735         rc = -ERESTART;
736         EXIT;
737 cleanup:
738         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
739 out:
740         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
741         return rc;
742 }