Whamcloud - gitweb
8c047df00875aaac08b8a19123d6b9a65dcc23c3
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_split.c
37  *
38  * Lustre splitting dir
39  *
40  * Author: Alex Thomas  <alex@clusterfs.com>
41  * Author: Wang Di      <wangdi@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <obd_class.h>
52 #include <lustre_fid.h>
53 #include <lustre_mds.h>
54 #include <lustre/lustre_idl.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57
58 enum {
59         CMM_SPLIT_SIZE =  128 * 1024
60 };
61
62 /*
63  * This function checks if passed @name come to correct server (local MDT). If
64  * not - return -ERESTART and let client know that dir was split and client
65  * needs to chose correct stripe.
66  */
67 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
68                     const char *name)
69 {
70         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
71         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
72         struct cml_object *clo = md2cml_obj(mp);
73         int rc, lmv_size;
74         ENTRY;
75
76         cmm_lprocfs_time_start(env);
77
78         /* Not split yet */
79         if (clo->clo_split == CMM_SPLIT_NONE ||
80             clo->clo_split == CMM_SPLIT_DENIED)
81                 GOTO(out, rc = 0);
82
83         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
84
85         /* Try to get the LMV EA */
86         memset(ma, 0, sizeof(*ma));
87
88         ma->ma_need = MA_LMV;
89         ma->ma_lmv_size = lmv_size;
90         OBD_ALLOC(ma->ma_lmv, lmv_size);
91         if (ma->ma_lmv == NULL)
92                 GOTO(out, rc = -ENOMEM);
93
94         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
95         rc = mo_attr_get(env, mp, ma);
96         if (rc)
97                 GOTO(cleanup, rc);
98
99         /* No LMV just return */
100         if (!(ma->ma_valid & MA_LMV)) {
101                 /* update split state if unknown */
102                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
103                         clo->clo_split = CMM_SPLIT_NONE;
104                 GOTO(cleanup, rc = 0);
105         }
106
107         /* Skip checking the slave dirs (mea_count is 0) */
108         if (ma->ma_lmv->mea_count != 0) {
109                 int idx;
110
111                 /*
112                  * Get stripe by name to check the name belongs to master dir,
113                  * otherwise return the -ERESTART
114                  */
115                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
116
117                 /*
118                  * Check if name came to correct MDT server. We suppose that if
119                  * client does not know about split, it sends create operation
120                  * to master MDT. And this is master job to say it that dir got
121                  * split and client should orward request to correct MDT. This
122                  * is why we check here if stripe zero or not. Zero stripe means
123                  * master stripe. If stripe calculated from name is not zero -
124                  * return -ERESTART.
125                  */
126                 if (idx != 0)
127                         rc = -ERESTART;
128
129                 /* update split state to DONE if unknown */
130                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131                         clo->clo_split = CMM_SPLIT_DONE;
132         } else {
133                 /* split is denied for slave dir */
134                 clo->clo_split = CMM_SPLIT_DENIED;
135         }
136         EXIT;
137 cleanup:
138         OBD_FREE(ma->ma_lmv, lmv_size);
139 out:
140         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
141         return rc;
142 }
143
144 /*
145  * Return preferable access mode to caller taking into account possible split
146  * and the fact of existing not splittable dirs in principle.
147  */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
149                      mdl_mode_t lm)
150 {
151         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
152         int rc, split;
153         ENTRY;
154
155         memset(ma, 0, sizeof(*ma));
156
157         /*
158          * Check only if we need protection from split.  If not - mdt handles
159          * other cases.
160          */
161         rc = cmm_split_expect(env, mo, ma, &split);
162         if (rc) {
163                 CERROR("Can't check for possible split, rc %d\n", rc);
164                 RETURN(MDL_MINMODE);
165         }
166
167         /*
168          * Do not take PDO lock on non-splittable objects if this is not PW,
169          * this should speed things up a bit.
170          */
171         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
172                 RETURN(MDL_NL);
173
174         /* Protect splitting by exclusive lock. */
175         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
176                 RETURN(MDL_EX);
177
178         /*
179          * Have no idea about lock mode, let it be what higher layer wants.
180          */
181         RETURN(MDL_MINMODE);
182 }
183
184 /* Check if split is expected for current thread. */
185 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
186                      struct md_attr *ma, int *split)
187 {
188         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
189         struct cml_object *clo = md2cml_obj(mo);
190         struct lu_fid root_fid;
191         int rc;
192         ENTRY;
193
194         if (clo->clo_split == CMM_SPLIT_DONE ||
195             clo->clo_split == CMM_SPLIT_DENIED) {
196                 *split = clo->clo_split;
197                 RETURN(0);
198         }
199         /* CMM_SPLIT_UNKNOWN case below */
200
201         /* No need to split root object. */
202         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
203                                               &root_fid);
204         if (rc)
205                 RETURN(rc);
206
207         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
208                 /* update split state */
209                 *split = clo->clo_split == CMM_SPLIT_DENIED;
210                 RETURN(0);
211         }
212
213         /*
214          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
215          * for this get_attr.
216          */
217         LASSERT(ma->ma_valid == 0);
218         ma->ma_need = MA_INODE | MA_LMV;
219         rc = mo_attr_get(env, mo, ma);
220         if (rc)
221                 RETURN(rc);
222
223         /* No need split for already split object */
224         if (ma->ma_valid & MA_LMV) {
225                 LASSERT(ma->ma_lmv_size > 0);
226                 *split = clo->clo_split = CMM_SPLIT_DONE;
227                 RETURN(0);
228         }
229
230         /* No need split for object whose size < CMM_SPLIT_SIZE */
231         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
232                 *split = clo->clo_split = CMM_SPLIT_NONE;
233                 RETURN(0);
234         }
235
236         *split = clo->clo_split = CMM_SPLIT_NEEDED;
237         RETURN(0);
238 }
239
240 struct cmm_object *cmm_object_find(const struct lu_env *env,
241                                    struct cmm_device *d,
242                                    const struct lu_fid *f)
243 {
244         return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
245 }
246
247 static inline void cmm_object_put(const struct lu_env *env,
248                                   struct cmm_object *o)
249 {
250         lu_object_put(env, &o->cmo_obj.mo_lu);
251 }
252
253 /*
254  * Allocate new on passed @mc for slave object which is going to create there
255  * soon.
256  */
257 static int cmm_split_fid_alloc(const struct lu_env *env,
258                                struct cmm_device *cmm,
259                                struct mdc_device *mc,
260                                struct lu_fid *fid)
261 {
262         int rc;
263         ENTRY;
264
265         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
266
267         cfs_down(&mc->mc_fid_sem);
268
269         /* Alloc new fid on @mc. */
270         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
271         if (rc > 0)
272                 rc = 0;
273         cfs_up(&mc->mc_fid_sem);
274
275         RETURN(rc);
276 }
277
278 /* Allocate new slave object on passed @mc */
279 static int cmm_split_slave_create(const struct lu_env *env,
280                                   struct cmm_device *cmm,
281                                   struct mdc_device *mc,
282                                   struct lu_fid *fid,
283                                   struct md_attr *ma,
284                                   struct lmv_stripe_md *lmv,
285                                   int lmv_size)
286 {
287         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
288         struct cmm_object *obj;
289         int rc;
290         ENTRY;
291
292         /* Allocate new fid and store it to @fid */
293         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
294         if (rc) {
295                 CERROR("Can't alloc new fid on "LPU64
296                        ", rc %d\n", mc->mc_num, rc);
297                 RETURN(rc);
298         }
299
300         /* Allocate new object on @mc */
301         obj = cmm_object_find(env, cmm, fid);
302         if (IS_ERR(obj))
303                 RETURN(PTR_ERR(obj));
304
305         memset(spec, 0, sizeof *spec);
306         spec->u.sp_ea.fid = fid;
307         spec->u.sp_ea.eadata = lmv;
308         spec->u.sp_ea.eadatalen = lmv_size;
309         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
310         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
311                               spec, ma);
312         cmm_object_put(env, obj);
313         RETURN(rc);
314 }
315
316 /*
317  * Create so many slaves as number of stripes. This is called in split time
318  * before sending pages to slaves.
319  */
320 static int cmm_split_slaves_create(const struct lu_env *env,
321                                    struct md_object *mo,
322                                    struct md_attr *ma)
323 {
324         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
325         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
326         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
327         struct mdc_device    *mc, *tmp;
328         struct lmv_stripe_md *lmv;
329         int i = 1, rc = 0;
330         ENTRY;
331
332         /* Init the split MEA */
333         lmv = ma->ma_lmv;
334         lmv->mea_master = cmm->cmm_local_num;
335         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
336         lmv->mea_count = cmm->cmm_tgt_count + 1;
337
338         /*
339          * Store master FID to local node idx number. Local node is always
340          * master and its stripe number if 0.
341          */
342         lmv->mea_ids[0] = *lf;
343
344         memset(slave_lmv, 0, sizeof *slave_lmv);
345         slave_lmv->mea_master = cmm->cmm_local_num;
346         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
347         slave_lmv->mea_count = 0;
348
349         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
350                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
351                                             ma, slave_lmv, sizeof(*slave_lmv));
352                 if (rc)
353                         GOTO(cleanup, rc);
354                 i++;
355         }
356         EXIT;
357 cleanup:
358         return rc;
359 }
360
361 static inline int cmm_split_special_entry(struct lu_dirent *ent)
362 {
363         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
364             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
365                 return 1;
366         return 0;
367 }
368
369 static inline struct lu_name *cmm_name(const struct lu_env *env,
370                                        char *name, int buflen)
371 {
372         struct lu_name *lname;
373         struct cmm_thread_info *cmi;
374
375         LASSERT(buflen > 0);
376         LASSERT(name[buflen - 1] == '\0');
377
378         cmi = cmm_env_info(env);
379         lname = &cmi->cti_name;
380         lname->ln_name = name;
381         /* do NOT count the terminating '\0' of name for length */
382         lname->ln_namelen = buflen - 1;
383         return lname;
384 }
385
386 /*
387  * Remove one entry from local MDT. Do not corrupt byte order in page, it will
388  * be sent to remote MDT.
389  */
390 static int cmm_split_remove_entry(const struct lu_env *env,
391                                   struct md_object *mo,
392                                   struct lu_dirent *ent)
393 {
394         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
395         struct cmm_thread_info *cmi;
396         struct md_attr *ma;
397         struct cmm_object *obj;
398         int is_dir, rc;
399         char *name;
400         struct lu_name *lname;
401         ENTRY;
402
403         if (cmm_split_special_entry(ent))
404                 RETURN(0);
405
406         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
407         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
408         if (IS_ERR(obj))
409                 RETURN(PTR_ERR(obj));
410
411         cmi = cmm_env_info(env);
412         ma = &cmi->cmi_ma;
413
414         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
415                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
416         else
417                 /*
418                  * XXX: These days only cross-ref dirs are possible, so for the
419                  * sake of simplicity, in split, we suppose that all cross-ref
420                  * names point to directory and do not do additional getattr to
421                  * remote MDT.
422                  */
423                 is_dir = 1;
424
425         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
426         if (!name)
427                 GOTO(cleanup, rc = -ENOMEM);
428
429         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
430         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
431         /*
432          * When split, no need update parent's ctime,
433          * and no permission check for name_remove.
434          */
435         ma->ma_attr.la_ctime = 0;
436         if (is_dir)
437                 ma->ma_attr.la_mode = S_IFDIR;
438         else
439                 ma->ma_attr.la_mode = 0;
440         ma->ma_attr.la_valid = LA_MODE;
441         ma->ma_valid = MA_INODE;
442
443         ma->ma_attr_flags |= MDS_PERM_BYPASS;
444         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
445         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
446         if (rc)
447                 GOTO(cleanup, rc);
448
449         /*
450          * This @ent will be transferred to slave MDS and insert there, so in
451          * the slave MDS, we should know whether this object is dir or not, so
452          * use the highest bit of the hash to indicate that (because we do not
453          * use highest bit of hash).
454          */
455         if (is_dir) {
456                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
457                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
458         }
459         EXIT;
460 cleanup:
461         cmm_object_put(env, obj);
462         return rc;
463 }
464
465 /*
466  * Remove all entries from passed page. These entries are going to remote MDT
467  * and thus should be removed locally.
468  */
469 static int cmm_split_remove_page(const struct lu_env *env,
470                                  struct md_object *mo,
471                                  struct lu_rdpg *rdpg,
472                                  __u64 hash_end, __u32 *len)
473 {
474         struct lu_dirpage *dp;
475         struct lu_dirent  *ent;
476         int rc = 0;
477         ENTRY;
478
479         *len = 0;
480         cfs_kmap(rdpg->rp_pages[0]);
481         dp = page_address(rdpg->rp_pages[0]);
482         for (ent = lu_dirent_start(dp);
483              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
484              ent = lu_dirent_next(ent)) {
485                 rc = cmm_split_remove_entry(env, mo, ent);
486                 if (rc) {
487                         /*
488                          * XXX: Error handler to insert remove name back,
489                          * currently we assumed it will success anyway in
490                          * verfication test.
491                          */
492                         CERROR("Can not del %*.*s, rc %d\n",
493                                le16_to_cpu(ent->lde_namelen),
494                                le16_to_cpu(ent->lde_namelen),
495                                ent->lde_name, rc);
496                         GOTO(unmap, rc);
497                 }
498                 *len += lu_dirent_size(ent);
499         }
500
501         if (ent != lu_dirent_start(dp))
502                 *len += sizeof(struct lu_dirpage);
503         EXIT;
504 unmap:
505         cfs_kunmap(rdpg->rp_pages[0]);
506         return rc;
507 }
508
509 /* Send one page to remote MDT for creating entries there. */
510 static int cmm_split_send_page(const struct lu_env *env,
511                                struct md_object *mo,
512                                struct lu_rdpg *rdpg,
513                                struct lu_fid *fid, int len)
514 {
515         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
516         struct cmm_object *obj;
517         int rc = 0;
518         ENTRY;
519
520         obj = cmm_object_find(env, cmm, fid);
521         if (IS_ERR(obj))
522                 RETURN(PTR_ERR(obj));
523
524         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
525                            rdpg->rp_pages[0], len);
526         cmm_object_put(env, obj);
527         RETURN(rc);
528 }
529
530 /* Read one page of entries from local MDT. */
531 static int cmm_split_read_page(const struct lu_env *env,
532                                struct md_object *mo,
533                                struct lu_rdpg *rdpg)
534 {
535         int rc;
536         ENTRY;
537         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
538         cfs_kunmap(rdpg->rp_pages[0]);
539         rc = mo_readpage(env, md_object_next(mo), rdpg);
540         RETURN(rc);
541 }
542
543 /*
544  * This function performs migration of all pages with entries which fit into one
545  * stripe and one hash segment.
546  */
547 static int cmm_split_process_stripe(const struct lu_env *env,
548                                     struct md_object *mo,
549                                     struct lu_rdpg *rdpg,
550                                     struct lu_fid *lf,
551                                     __u64 end)
552 {
553         int rc, done = 0;
554         ENTRY;
555
556         LASSERT(rdpg->rp_npages == 1);
557         do {
558                 struct lu_dirpage *ldp;
559                 __u32 len = 0;
560
561                 /* Read one page from local MDT. */
562                 rc = cmm_split_read_page(env, mo, rdpg);
563                 if (rc) {
564                         CERROR("Error in readpage: %d\n", rc);
565                         RETURN(rc);
566                 }
567
568                 /* Remove local entries which are going to remite MDT. */
569                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
570                 if (rc) {
571                         CERROR("Error in remove stripe entries: %d\n", rc);
572                         RETURN(rc);
573                 }
574
575                 /* Send entries page to slave MDT. */
576                 if (len > 0) {
577                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
578                         if (rc) {
579                                 CERROR("Error in sending page: %d\n", rc);
580                                 RETURN(rc);
581                         }
582                 }
583
584                 cfs_kmap(rdpg->rp_pages[0]);
585                 ldp = page_address(rdpg->rp_pages[0]);
586                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
587                         done = 1;
588
589                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
590                 cfs_kunmap(rdpg->rp_pages[0]);
591         } while (!done);
592
593         RETURN(rc);
594 }
595
596 static int cmm_split_process_dir(const struct lu_env *env,
597                                  struct md_object *mo,
598                                  struct md_attr *ma)
599 {
600         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
601         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
602         __u64 hash_segment;
603         int rc = 0, i;
604         ENTRY;
605
606         memset(rdpg, 0, sizeof *rdpg);
607         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
608         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
609         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
610
611         for (i = 0; i < rdpg->rp_npages; i++) {
612                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
613                 if (rdpg->rp_pages[i] == NULL)
614                         GOTO(cleanup, rc = -ENOMEM);
615         }
616
617         hash_segment = MAX_HASH_SIZE;
618         do_div(hash_segment, cmm->cmm_tgt_count + 1);
619         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
620                 struct lu_fid *lf;
621                 __u64 hash_end;
622
623                 lf = &ma->ma_lmv->mea_ids[i];
624
625                 rdpg->rp_hash = i * hash_segment;
626                 if (i == cmm->cmm_tgt_count)
627                         hash_end = MAX_HASH_SIZE;
628                 else
629                         hash_end = rdpg->rp_hash + hash_segment;
630                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
631                 if (rc) {
632                         CERROR("Error (rc = %d) while splitting for %d: fid="
633                                DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
634                                rdpg->rp_hash, hash_end);
635                         GOTO(cleanup, rc);
636                 }
637         }
638         EXIT;
639 cleanup:
640         for (i = 0; i < rdpg->rp_npages; i++)
641                 if (rdpg->rp_pages[i] != NULL)
642                         cfs_free_page(rdpg->rp_pages[i]);
643         return rc;
644 }
645
646 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
647 {
648         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
649         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
650         int                rc = 0, split;
651         struct lu_buf     *buf;
652         ENTRY;
653
654         cmm_lprocfs_time_start(env);
655
656         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
657         memset(ma, 0, sizeof(*ma));
658
659         /* Step1: Checking whether the dir needs to be split. */
660         rc = cmm_split_expect(env, mo, ma, &split);
661         if (rc)
662                 GOTO(out, rc);
663
664         if (split != CMM_SPLIT_NEEDED) {
665                 /* No split is needed, caller may proceed with create. */
666                 GOTO(out, rc = 0);
667         }
668
669         /* Split should be done now, let's do it. */
670         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
671               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
672
673         /*
674          * Disable transacrions for split, since there will be so many trans in
675          * this one ops, conflict with current recovery design.
676          */
677         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
678         if (rc) {
679                 CERROR("Can't disable trans for split, rc %d\n", rc);
680                 GOTO(out, rc);
681         }
682
683         /* Step2: Prepare the md memory */
684         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
685         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
686         if (ma->ma_lmv == NULL)
687                 GOTO(out, rc = -ENOMEM);
688
689         /* Step3: Create slave objects and fill the ma->ma_lmv */
690         rc = cmm_split_slaves_create(env, mo, ma);
691         if (rc) {
692                 CERROR("Can't create slaves for split, rc %d\n", rc);
693                 GOTO(cleanup, rc);
694         }
695
696         /* Step4: Scan and split the object. */
697         rc = cmm_split_process_dir(env, mo, ma);
698         if (rc) {
699                 CERROR("Can't scan and split, rc %d\n", rc);
700                 GOTO(cleanup, rc);
701         }
702
703         /* Step5: Set mea to the master object. */
704         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
705         rc = mo_xattr_set(env, md_object_next(mo), buf,
706                           MDS_LMV_MD_NAME, 0);
707         if (rc) {
708                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
709                 GOTO(cleanup, rc);
710         }
711
712         /* set flag in cmm_object */
713         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
714
715         /*
716          * Finally, split succeed, tell client to repeat opetartion on correct
717          * MDT.
718          */
719         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
720         rc = -ERESTART;
721         EXIT;
722 cleanup:
723         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
724 out:
725         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
726         return rc;
727 }