Whamcloud - gitweb
361b38de396eccda36b2ff6b7a1cadef34224aa3
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_split.c
37  *
38  * Lustre splitting dir
39  *
40  * Author: Alex Thomas  <alex@clusterfs.com>
41  * Author: Wang Di      <wangdi@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <obd_class.h>
52 #include <lustre_fid.h>
53 #include <lustre_mds.h>
54 #include <lustre/lustre_idl.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57
58 enum {
59         CMM_SPLIT_SIZE =  128 * 1024
60 };
61
62 /*
63  * This function checks if passed @name come to correct server (local MDT). If
64  * not - return -ERESTART and let client know that dir was split and client
65  * needs to chose correct stripe.
66  */
67 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
68                     const char *name)
69 {
70         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
71         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
72         struct cml_object *clo = md2cml_obj(mp);
73         int rc, lmv_size;
74         ENTRY;
75
76         cmm_lprocfs_time_start(env);
77
78         /* Not split yet */
79         if (clo->clo_split == CMM_SPLIT_NONE ||
80             clo->clo_split == CMM_SPLIT_DENIED)
81                 GOTO(out, rc = 0);
82
83         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
84
85         /* Try to get the LMV EA */
86         memset(ma, 0, sizeof(*ma));
87
88         ma->ma_need = MA_LMV;
89         ma->ma_lmv_size = lmv_size;
90         OBD_ALLOC(ma->ma_lmv, lmv_size);
91         if (ma->ma_lmv == NULL)
92                 GOTO(out, rc = -ENOMEM);
93
94         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
95         rc = mo_attr_get(env, mp, ma);
96         if (rc)
97                 GOTO(cleanup, rc);
98
99         /* No LMV just return */
100         if (!(ma->ma_valid & MA_LMV)) {
101                 /* update split state if unknown */
102                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
103                         clo->clo_split = CMM_SPLIT_NONE;
104                 GOTO(cleanup, rc = 0);
105         }
106
107         /* Skip checking the slave dirs (mea_count is 0) */
108         if (ma->ma_lmv->mea_count != 0) {
109                 int idx;
110
111                 /*
112                  * Get stripe by name to check the name belongs to master dir,
113                  * otherwise return the -ERESTART
114                  */
115                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
116
117                 /*
118                  * Check if name came to correct MDT server. We suppose that if
119                  * client does not know about split, it sends create operation
120                  * to master MDT. And this is master job to say it that dir got
121                  * split and client should orward request to correct MDT. This
122                  * is why we check here if stripe zero or not. Zero stripe means
123                  * master stripe. If stripe calculated from name is not zero -
124                  * return -ERESTART.
125                  */
126                 if (idx != 0)
127                         rc = -ERESTART;
128
129                 /* update split state to DONE if unknown */
130                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131                         clo->clo_split = CMM_SPLIT_DONE;
132         } else {
133                 /* split is denied for slave dir */
134                 clo->clo_split = CMM_SPLIT_DENIED;
135         }
136         EXIT;
137 cleanup:
138         OBD_FREE(ma->ma_lmv, lmv_size);
139 out:
140         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
141         return rc;
142 }
143
144 /*
145  * Return preferable access mode to caller taking into account possible split
146  * and the fact of existing not splittable dirs in principle.
147  */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
149                      mdl_mode_t lm)
150 {
151         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
152         int rc, split;
153         ENTRY;
154
155         memset(ma, 0, sizeof(*ma));
156
157         /*
158          * Check only if we need protection from split.  If not - mdt handles
159          * other cases.
160          */
161         rc = cmm_split_expect(env, mo, ma, &split);
162         if (rc) {
163                 CERROR("Can't check for possible split, rc %d\n", rc);
164                 RETURN(MDL_MINMODE);
165         }
166
167         /*
168          * Do not take PDO lock on non-splittable objects if this is not PW,
169          * this should speed things up a bit.
170          */
171         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
172                 RETURN(MDL_NL);
173
174         /* Protect splitting by exclusive lock. */
175         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
176                 RETURN(MDL_EX);
177
178         /*
179          * Have no idea about lock mode, let it be what higher layer wants.
180          */
181         RETURN(MDL_MINMODE);
182 }
183
184 /* Check if split is expected for current thread. */
185 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
186                      struct md_attr *ma, int *split)
187 {
188         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
189         struct cml_object *clo = md2cml_obj(mo);
190         struct lu_fid root_fid;
191         int rc;
192         ENTRY;
193
194         if (clo->clo_split == CMM_SPLIT_DONE ||
195             clo->clo_split == CMM_SPLIT_DENIED) {
196                 *split = clo->clo_split;
197                 RETURN(0);
198         }
199         /* CMM_SPLIT_UNKNOWN case below */
200
201         /* No need to split root object. */
202         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
203                                               &root_fid);
204         if (rc)
205                 RETURN(rc);
206
207         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
208                 /* update split state */
209                 *split = clo->clo_split == CMM_SPLIT_DENIED;
210                 RETURN(0);
211         }
212
213         /*
214          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
215          * for this get_attr.
216          */
217         LASSERT(ma->ma_valid == 0);
218         ma->ma_need = MA_INODE | MA_LMV;
219         rc = mo_attr_get(env, mo, ma);
220         if (rc)
221                 RETURN(rc);
222
223         /* No need split for already split object */
224         if (ma->ma_valid & MA_LMV) {
225                 LASSERT(ma->ma_lmv_size > 0);
226                 *split = clo->clo_split = CMM_SPLIT_DONE;
227                 RETURN(0);
228         }
229
230         /* No need split for object whose size < CMM_SPLIT_SIZE */
231         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
232                 *split = clo->clo_split = CMM_SPLIT_NONE;
233                 RETURN(0);
234         }
235
236         *split = clo->clo_split = CMM_SPLIT_NEEDED;
237         RETURN(0);
238 }
239
240 struct cmm_object *cmm_object_find(const struct lu_env *env,
241                                    struct cmm_device *d,
242                                    const struct lu_fid *f)
243 {
244         return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
245 }
246
247 static inline void cmm_object_put(const struct lu_env *env,
248                                   struct cmm_object *o)
249 {
250         lu_object_put(env, &o->cmo_obj.mo_lu);
251 }
252
253 /*
254  * Allocate new on passed @mc for slave object which is going to create there
255  * soon.
256  */
257 static int cmm_split_fid_alloc(const struct lu_env *env,
258                                struct cmm_device *cmm,
259                                struct mdc_device *mc,
260                                struct lu_fid *fid)
261 {
262         int rc;
263         ENTRY;
264
265         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
266
267         down(&mc->mc_fid_sem);
268
269         /* Alloc new fid on @mc. */
270         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
271         if (rc > 0) {
272                 /* Setup FLD for new sequenceif needed. */
273                 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
274                                        mc->mc_num, env);
275                 if (rc)
276                         CERROR("Can't create fld entry, rc %d\n", rc);
277         }
278         up(&mc->mc_fid_sem);
279
280         RETURN(rc);
281 }
282
283 /* Allocate new slave object on passed @mc */
284 static int cmm_split_slave_create(const struct lu_env *env,
285                                   struct cmm_device *cmm,
286                                   struct mdc_device *mc,
287                                   struct lu_fid *fid,
288                                   struct md_attr *ma,
289                                   struct lmv_stripe_md *lmv,
290                                   int lmv_size)
291 {
292         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
293         struct cmm_object *obj;
294         int rc;
295         ENTRY;
296
297         /* Allocate new fid and store it to @fid */
298         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
299         if (rc) {
300                 CERROR("Can't alloc new fid on "LPU64
301                        ", rc %d\n", mc->mc_num, rc);
302                 RETURN(rc);
303         }
304
305         /* Allocate new object on @mc */
306         obj = cmm_object_find(env, cmm, fid);
307         if (IS_ERR(obj))
308                 RETURN(PTR_ERR(obj));
309
310         memset(spec, 0, sizeof *spec);
311         spec->u.sp_ea.fid = fid;
312         spec->u.sp_ea.eadata = lmv;
313         spec->u.sp_ea.eadatalen = lmv_size;
314         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
315         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
316                               spec, ma);
317         cmm_object_put(env, obj);
318         RETURN(rc);
319 }
320
321 /*
322  * Create so many slaves as number of stripes. This is called in split time
323  * before sending pages to slaves.
324  */
325 static int cmm_split_slaves_create(const struct lu_env *env,
326                                    struct md_object *mo,
327                                    struct md_attr *ma)
328 {
329         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
330         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
331         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
332         struct mdc_device    *mc, *tmp;
333         struct lmv_stripe_md *lmv;
334         int i = 1, rc = 0;
335         ENTRY;
336
337         /* Init the split MEA */
338         lmv = ma->ma_lmv;
339         lmv->mea_master = cmm->cmm_local_num;
340         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
341         lmv->mea_count = cmm->cmm_tgt_count + 1;
342
343         /*
344          * Store master FID to local node idx number. Local node is always
345          * master and its stripe number if 0.
346          */
347         lmv->mea_ids[0] = *lf;
348
349         memset(slave_lmv, 0, sizeof *slave_lmv);
350         slave_lmv->mea_master = cmm->cmm_local_num;
351         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
352         slave_lmv->mea_count = 0;
353
354         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
355                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
356                                             ma, slave_lmv, sizeof(*slave_lmv));
357                 if (rc)
358                         GOTO(cleanup, rc);
359                 i++;
360         }
361         EXIT;
362 cleanup:
363         return rc;
364 }
365
366 static inline int cmm_split_special_entry(struct lu_dirent *ent)
367 {
368         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
369             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
370                 return 1;
371         return 0;
372 }
373
374 static inline struct lu_name *cmm_name(const struct lu_env *env,
375                                        char *name, int buflen)
376 {
377         struct lu_name *lname;
378         struct cmm_thread_info *cmi;
379
380         LASSERT(buflen > 0);
381         LASSERT(name[buflen - 1] == '\0');
382
383         cmi = cmm_env_info(env);
384         lname = &cmi->cti_name;
385         lname->ln_name = name;
386         /* do NOT count the terminating '\0' of name for length */
387         lname->ln_namelen = buflen - 1;
388         return lname;
389 }
390
391 /*
392  * Remove one entry from local MDT. Do not corrupt byte order in page, it will
393  * be sent to remote MDT.
394  */
395 static int cmm_split_remove_entry(const struct lu_env *env,
396                                   struct md_object *mo,
397                                   struct lu_dirent *ent)
398 {
399         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
400         struct cmm_thread_info *cmi;
401         struct md_attr *ma;
402         struct cmm_object *obj;
403         int is_dir, rc;
404         char *name;
405         struct lu_name *lname;
406         ENTRY;
407
408         if (cmm_split_special_entry(ent))
409                 RETURN(0);
410
411         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
412         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
413         if (IS_ERR(obj))
414                 RETURN(PTR_ERR(obj));
415
416         cmi = cmm_env_info(env);
417         ma = &cmi->cmi_ma;
418
419         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
420                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
421         else
422                 /*
423                  * XXX: These days only cross-ref dirs are possible, so for the
424                  * sake of simplicity, in split, we suppose that all cross-ref
425                  * names point to directory and do not do additional getattr to
426                  * remote MDT.
427                  */
428                 is_dir = 1;
429
430         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
431         if (!name)
432                 GOTO(cleanup, rc = -ENOMEM);
433
434         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
435         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
436         /*
437          * When split, no need update parent's ctime,
438          * and no permission check for name_remove.
439          */
440         ma->ma_attr.la_ctime = 0;
441         if (is_dir)
442                 ma->ma_attr.la_mode = S_IFDIR;
443         else
444                 ma->ma_attr.la_mode = 0;
445         ma->ma_attr.la_valid = LA_MODE;
446         ma->ma_valid = MA_INODE;
447
448         ma->ma_attr_flags |= MDS_PERM_BYPASS;
449         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
450         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
451         if (rc)
452                 GOTO(cleanup, rc);
453
454         /*
455          * This @ent will be transferred to slave MDS and insert there, so in
456          * the slave MDS, we should know whether this object is dir or not, so
457          * use the highest bit of the hash to indicate that (because we do not
458          * use highest bit of hash).
459          */
460         if (is_dir) {
461                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
462                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
463         }
464         EXIT;
465 cleanup:
466         cmm_object_put(env, obj);
467         return rc;
468 }
469
470 /*
471  * Remove all entries from passed page. These entries are going to remote MDT
472  * and thus should be removed locally.
473  */
474 static int cmm_split_remove_page(const struct lu_env *env,
475                                  struct md_object *mo,
476                                  struct lu_rdpg *rdpg,
477                                  __u64 hash_end, __u32 *len)
478 {
479         struct lu_dirpage *dp;
480         struct lu_dirent  *ent;
481         int rc = 0;
482         ENTRY;
483
484         *len = 0;
485         cfs_kmap(rdpg->rp_pages[0]);
486         dp = page_address(rdpg->rp_pages[0]);
487         for (ent = lu_dirent_start(dp);
488              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
489              ent = lu_dirent_next(ent)) {
490                 rc = cmm_split_remove_entry(env, mo, ent);
491                 if (rc) {
492                         /*
493                          * XXX: Error handler to insert remove name back,
494                          * currently we assumed it will success anyway in
495                          * verfication test.
496                          */
497                         CERROR("Can not del %*.*s, rc %d\n",
498                                le16_to_cpu(ent->lde_namelen),
499                                le16_to_cpu(ent->lde_namelen),
500                                ent->lde_name, rc);
501                         GOTO(unmap, rc);
502                 }
503                 *len += lu_dirent_size(ent);
504         }
505
506         if (ent != lu_dirent_start(dp))
507                 *len += sizeof(struct lu_dirpage);
508         EXIT;
509 unmap:
510         cfs_kunmap(rdpg->rp_pages[0]);
511         return rc;
512 }
513
514 /* Send one page to remote MDT for creating entries there. */
515 static int cmm_split_send_page(const struct lu_env *env,
516                                struct md_object *mo,
517                                struct lu_rdpg *rdpg,
518                                struct lu_fid *fid, int len)
519 {
520         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
521         struct cmm_object *obj;
522         int rc = 0;
523         ENTRY;
524
525         obj = cmm_object_find(env, cmm, fid);
526         if (IS_ERR(obj))
527                 RETURN(PTR_ERR(obj));
528
529         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
530                            rdpg->rp_pages[0], len);
531         cmm_object_put(env, obj);
532         RETURN(rc);
533 }
534
535 /* Read one page of entries from local MDT. */
536 static int cmm_split_read_page(const struct lu_env *env,
537                                struct md_object *mo,
538                                struct lu_rdpg *rdpg)
539 {
540         int rc;
541         ENTRY;
542         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
543         cfs_kunmap(rdpg->rp_pages[0]);
544         rc = mo_readpage(env, md_object_next(mo), rdpg);
545         RETURN(rc);
546 }
547
548 /*
549  * This function performs migration of all pages with entries which fit into one
550  * stripe and one hash segment.
551  */
552 static int cmm_split_process_stripe(const struct lu_env *env,
553                                     struct md_object *mo,
554                                     struct lu_rdpg *rdpg,
555                                     struct lu_fid *lf,
556                                     __u64 end)
557 {
558         int rc, done = 0;
559         ENTRY;
560
561         LASSERT(rdpg->rp_npages == 1);
562         do {
563                 struct lu_dirpage *ldp;
564                 __u32 len = 0;
565
566                 /* Read one page from local MDT. */
567                 rc = cmm_split_read_page(env, mo, rdpg);
568                 if (rc) {
569                         CERROR("Error in readpage: %d\n", rc);
570                         RETURN(rc);
571                 }
572
573                 /* Remove local entries which are going to remite MDT. */
574                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
575                 if (rc) {
576                         CERROR("Error in remove stripe entries: %d\n", rc);
577                         RETURN(rc);
578                 }
579
580                 /* Send entries page to slave MDT. */
581                 if (len > 0) {
582                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
583                         if (rc) {
584                                 CERROR("Error in sending page: %d\n", rc);
585                                 RETURN(rc);
586                         }
587                 }
588
589                 cfs_kmap(rdpg->rp_pages[0]);
590                 ldp = page_address(rdpg->rp_pages[0]);
591                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
592                         done = 1;
593
594                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
595                 cfs_kunmap(rdpg->rp_pages[0]);
596         } while (!done);
597
598         RETURN(rc);
599 }
600
601 static int cmm_split_process_dir(const struct lu_env *env,
602                                  struct md_object *mo,
603                                  struct md_attr *ma)
604 {
605         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
606         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
607         __u64 hash_segment;
608         int rc = 0, i;
609         ENTRY;
610
611         memset(rdpg, 0, sizeof *rdpg);
612         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
613         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
614         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
615
616         for (i = 0; i < rdpg->rp_npages; i++) {
617                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
618                 if (rdpg->rp_pages[i] == NULL)
619                         GOTO(cleanup, rc = -ENOMEM);
620         }
621
622         hash_segment = MAX_HASH_SIZE;
623         do_div(hash_segment, cmm->cmm_tgt_count + 1);
624         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
625                 struct lu_fid *lf;
626                 __u64 hash_end;
627
628                 lf = &ma->ma_lmv->mea_ids[i];
629
630                 rdpg->rp_hash = i * hash_segment;
631                 if (i == cmm->cmm_tgt_count)
632                         hash_end = MAX_HASH_SIZE;
633                 else
634                         hash_end = rdpg->rp_hash + hash_segment;
635                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
636                 if (rc) {
637                         CERROR("Error (rc = %d) while splitting for %d: fid="
638                                DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
639                                rdpg->rp_hash, hash_end);
640                         GOTO(cleanup, rc);
641                 }
642         }
643         EXIT;
644 cleanup:
645         for (i = 0; i < rdpg->rp_npages; i++)
646                 if (rdpg->rp_pages[i] != NULL)
647                         __cfs_free_page(rdpg->rp_pages[i]);
648         return rc;
649 }
650
651 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
652 {
653         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
654         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
655         int                rc = 0, split;
656         struct lu_buf     *buf;
657         ENTRY;
658
659         cmm_lprocfs_time_start(env);
660
661         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
662         memset(ma, 0, sizeof(*ma));
663
664         /* Step1: Checking whether the dir needs to be split. */
665         rc = cmm_split_expect(env, mo, ma, &split);
666         if (rc)
667                 GOTO(out, rc);
668
669         if (split != CMM_SPLIT_NEEDED) {
670                 /* No split is needed, caller may proceed with create. */
671                 GOTO(out, rc = 0);
672         }
673
674         /* Split should be done now, let's do it. */
675         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
676               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
677
678         /*
679          * Disable transacrions for split, since there will be so many trans in
680          * this one ops, conflict with current recovery design.
681          */
682         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
683         if (rc) {
684                 CERROR("Can't disable trans for split, rc %d\n", rc);
685                 GOTO(out, rc);
686         }
687
688         /* Step2: Prepare the md memory */
689         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
690         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
691         if (ma->ma_lmv == NULL)
692                 GOTO(out, rc = -ENOMEM);
693
694         /* Step3: Create slave objects and fill the ma->ma_lmv */
695         rc = cmm_split_slaves_create(env, mo, ma);
696         if (rc) {
697                 CERROR("Can't create slaves for split, rc %d\n", rc);
698                 GOTO(cleanup, rc);
699         }
700
701         /* Step4: Scan and split the object. */
702         rc = cmm_split_process_dir(env, mo, ma);
703         if (rc) {
704                 CERROR("Can't scan and split, rc %d\n", rc);
705                 GOTO(cleanup, rc);
706         }
707
708         /* Step5: Set mea to the master object. */
709         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
710         rc = mo_xattr_set(env, md_object_next(mo), buf,
711                           MDS_LMV_MD_NAME, 0);
712         if (rc) {
713                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
714                 GOTO(cleanup, rc);
715         }
716
717         /* set flag in cmm_object */
718         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
719
720         /*
721          * Finally, split succeed, tell client to repeat opetartion on correct
722          * MDT.
723          */
724         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
725         rc = -ERESTART;
726         EXIT;
727 cleanup:
728         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
729 out:
730         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
731         return rc;
732 }