Whamcloud - gitweb
33a9657c449691954898e0c21e11aa7cb4d058a6
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_split.c
5  *  Lustre splitting dir
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Alex Thomas  <alex@clusterfs.com>
9  *           Wang Di      <wangdi@clusterfs.com>
10  *           Yury Umanets <umka@clusterfs.com>
11  *
12  *   This file is part of the Lustre file system, http://www.lustre.org
13  *   Lustre is a trademark of Cluster File Systems, Inc.
14  *
15  *   You may have signed or agreed to another license before downloading
16  *   this software.  If so, you are bound by the terms and conditions
17  *   of that agreement, and the following does not apply to you.  See the
18  *   LICENSE file included with this distribution for more information.
19  *
20  *   If you did not agree to a different license, then this copy of Lustre
21  *   is open source software; you can redistribute it and/or modify it
22  *   under the terms of version 2 of the GNU General Public License as
23  *   published by the Free Software Foundation.
24  *
25  *   In either case, Lustre is distributed in the hope that it will be
26  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
27  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
28  *   license text for more details.
29  */
30
31 #ifndef EXPORT_SYMTAB
32 # define EXPORT_SYMTAB
33 #endif
34
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39 #include <lustre_mds.h>
40 #include <lustre/lustre_idl.h>
41 #include "cmm_internal.h"
42 #include "mdc_internal.h"
43
44 enum {
45         CMM_SPLIT_SIZE =  128 * 1024
46 };
47
48 /*
49  * This function checks if passed @name come to correct server (local MDT). If
50  * not - return -ERESTART and let client know that dir was split and client
51  * needs to chose correct stripe.
52  */
53 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
54                     const char *name)
55 {
56         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
57         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
58         struct cml_object *clo = md2cml_obj(mp);
59         int rc, lmv_size;
60         ENTRY;
61
62         cmm_lprocfs_time_start(env);
63
64         /* Not split yet */
65         if (clo->clo_split == CMM_SPLIT_NONE ||
66             clo->clo_split == CMM_SPLIT_DENIED)
67                 GOTO(out, rc = 0);
68
69         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
70
71         /* Try to get the LMV EA */
72         memset(ma, 0, sizeof(*ma));
73
74         ma->ma_need = MA_LMV;
75         ma->ma_lmv_size = lmv_size;
76         OBD_ALLOC(ma->ma_lmv, lmv_size);
77         if (ma->ma_lmv == NULL)
78                 GOTO(out, rc = -ENOMEM);
79
80         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
81         rc = mo_attr_get(env, mp, ma);
82         if (rc)
83                 GOTO(cleanup, rc);
84
85         /* No LMV just return */
86         if (!(ma->ma_valid & MA_LMV)) {
87                 /* update split state if unknown */
88                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
89                         clo->clo_split = CMM_SPLIT_NONE;
90                 GOTO(cleanup, rc = 0);
91         }
92
93         /* Skip checking the slave dirs (mea_count is 0) */
94         if (ma->ma_lmv->mea_count != 0) {
95                 int idx;
96
97                 /*
98                  * Get stripe by name to check the name belongs to master dir,
99                  * otherwise return the -ERESTART
100                  */
101                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
102
103                 /*
104                  * Check if name came to correct MDT server. We suppose that if
105                  * client does not know about split, it sends create operation
106                  * to master MDT. And this is master job to say it that dir got
107                  * split and client should orward request to correct MDT. This
108                  * is why we check here if stripe zero or not. Zero stripe means
109                  * master stripe. If stripe calculated from name is not zero -
110                  * return -ERESTART.
111                  */
112                 if (idx != 0)
113                         rc = -ERESTART;
114
115                 /* update split state to DONE if unknown */
116                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
117                         clo->clo_split = CMM_SPLIT_DONE;
118         } else {
119                 /* split is denied for slave dir */
120                 clo->clo_split = CMM_SPLIT_DENIED;
121         }
122         EXIT;
123 cleanup:
124         OBD_FREE(ma->ma_lmv, lmv_size);
125 out:
126         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
127         return rc;
128 }
129
130 /*
131  * Return preferable access mode to caller taking into account possible split
132  * and the fact of existing not splittable dirs in principle.
133  */
134 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
135                      mdl_mode_t lm)
136 {
137         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
138         int rc, split;
139         ENTRY;
140
141         memset(ma, 0, sizeof(*ma));
142
143         /*
144          * Check only if we need protection from split.  If not - mdt handles
145          * other cases.
146          */
147         rc = cmm_split_expect(env, mo, ma, &split);
148         if (rc) {
149                 CERROR("Can't check for possible split, rc %d\n", rc);
150                 RETURN(MDL_MINMODE);
151         }
152
153         /*
154          * Do not take PDO lock on non-splittable objects if this is not PW,
155          * this should speed things up a bit.
156          */
157         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
158                 RETURN(MDL_NL);
159
160         /* Protect splitting by exclusive lock. */
161         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
162                 RETURN(MDL_EX);
163
164         /*
165          * Have no idea about lock mode, let it be what higher layer wants.
166          */
167         RETURN(MDL_MINMODE);
168 }
169
170 /* Check if split is expected for current thread. */
171 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
172                      struct md_attr *ma, int *split)
173 {
174         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
175         struct cml_object *clo = md2cml_obj(mo);
176         struct lu_fid root_fid;
177         int rc;
178         ENTRY;
179
180         if (clo->clo_split == CMM_SPLIT_DONE ||
181             clo->clo_split == CMM_SPLIT_DENIED) {
182                 *split = clo->clo_split;
183                 RETURN(0);
184         }
185         /* CMM_SPLIT_UNKNOWN case below */
186
187         /* No need to split root object. */
188         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
189                                               &root_fid);
190         if (rc)
191                 RETURN(rc);
192
193         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
194                 /* update split state */
195                 *split = clo->clo_split == CMM_SPLIT_DENIED;
196                 RETURN(0);
197         }
198
199         /*
200          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
201          * for this get_attr.
202          */
203         LASSERT(ma->ma_valid == 0);
204         ma->ma_need = MA_INODE | MA_LMV;
205         rc = mo_attr_get(env, mo, ma);
206         if (rc)
207                 RETURN(rc);
208
209         /* No need split for already split object */
210         if (ma->ma_valid & MA_LMV) {
211                 LASSERT(ma->ma_lmv_size > 0);
212                 *split = clo->clo_split = CMM_SPLIT_DONE;
213                 RETURN(0);
214         }
215
216         /* No need split for object whose size < CMM_SPLIT_SIZE */
217         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
218                 *split = clo->clo_split = CMM_SPLIT_NONE;
219                 RETURN(0);
220         }
221
222         *split = clo->clo_split = CMM_SPLIT_NEEDED;
223         RETURN(0);
224 }
225
226 struct cmm_object *cmm_object_find(const struct lu_env *env,
227                                    struct cmm_device *d,
228                                    const struct lu_fid *f)
229 {
230         struct lu_object *o;
231         struct cmm_object *m;
232         ENTRY;
233
234         o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
235         if (IS_ERR(o))
236                 m = (struct cmm_object *)o;
237         else
238                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
239                                d->cmm_md_dev.md_lu_dev.ld_type));
240         RETURN(m);
241 }
242
243 static inline void cmm_object_put(const struct lu_env *env,
244                                   struct cmm_object *o)
245 {
246         lu_object_put(env, &o->cmo_obj.mo_lu);
247 }
248
249 /*
250  * Allocate new on passed @mc for slave object which is going to create there
251  * soon.
252  */
253 static int cmm_split_fid_alloc(const struct lu_env *env,
254                                struct cmm_device *cmm,
255                                struct mdc_device *mc,
256                                struct lu_fid *fid)
257 {
258         int rc;
259         ENTRY;
260
261         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
262
263         down(&mc->mc_fid_sem);
264
265         /* Alloc new fid on @mc. */
266         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
267         if (rc > 0) {
268                 /* Setup FLD for new sequenceif needed. */
269                 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
270                                        mc->mc_num, env);
271                 if (rc)
272                         CERROR("Can't create fld entry, rc %d\n", rc);
273         }
274         up(&mc->mc_fid_sem);
275
276         RETURN(rc);
277 }
278
279 /* Allocate new slave object on passed @mc */
280 static int cmm_split_slave_create(const struct lu_env *env,
281                                   struct cmm_device *cmm,
282                                   struct mdc_device *mc,
283                                   struct lu_fid *fid,
284                                   struct md_attr *ma,
285                                   struct lmv_stripe_md *lmv,
286                                   int lmv_size)
287 {
288         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
289         struct cmm_object *obj;
290         int rc;
291         ENTRY;
292
293         /* Allocate new fid and store it to @fid */
294         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
295         if (rc) {
296                 CERROR("Can't alloc new fid on "LPU64
297                        ", rc %d\n", mc->mc_num, rc);
298                 RETURN(rc);
299         }
300
301         /* Allocate new object on @mc */
302         obj = cmm_object_find(env, cmm, fid);
303         if (IS_ERR(obj))
304                 RETURN(PTR_ERR(obj));
305
306         memset(spec, 0, sizeof *spec);
307         spec->u.sp_ea.fid = fid;
308         spec->u.sp_ea.eadata = lmv;
309         spec->u.sp_ea.eadatalen = lmv_size;
310         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
311         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
312                               spec, ma);
313         cmm_object_put(env, obj);
314         RETURN(rc);
315 }
316
317 /*
318  * Create so many slaves as number of stripes. This is called in split time
319  * before sending pages to slaves.
320  */
321 static int cmm_split_slaves_create(const struct lu_env *env,
322                                    struct md_object *mo,
323                                    struct md_attr *ma)
324 {
325         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
326         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
327         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
328         struct mdc_device    *mc, *tmp;
329         struct lmv_stripe_md *lmv;
330         int i = 1, rc = 0;
331         ENTRY;
332
333         /* Init the split MEA */
334         lmv = ma->ma_lmv;
335         lmv->mea_master = cmm->cmm_local_num;
336         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
337         lmv->mea_count = cmm->cmm_tgt_count + 1;
338
339         /*
340          * Store master FID to local node idx number. Local node is always
341          * master and its stripe number if 0.
342          */
343         lmv->mea_ids[0] = *lf;
344
345         memset(slave_lmv, 0, sizeof *slave_lmv);
346         slave_lmv->mea_master = cmm->cmm_local_num;
347         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
348         slave_lmv->mea_count = 0;
349
350         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
351                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
352                                             ma, slave_lmv, sizeof(*slave_lmv));
353                 if (rc)
354                         GOTO(cleanup, rc);
355                 i++;
356         }
357
358         ma->ma_valid |= MA_LMV;
359         EXIT;
360 cleanup:
361         return rc;
362 }
363
364 static inline int cmm_split_special_entry(struct lu_dirent *ent)
365 {
366         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
367             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
368                 return 1;
369         return 0;
370 }
371
372 static inline struct lu_name *cmm_name(const struct lu_env *env,
373                                        char *name, int buflen)
374 {
375         struct lu_name *lname;
376         struct cmm_thread_info *cmi;
377
378         LASSERT(buflen > 0);
379         LASSERT(name[buflen - 1] == '\0');
380
381         cmi = cmm_env_info(env);
382         lname = &cmi->cti_name;
383         lname->ln_name = name;
384         /* NOT count the terminating '\0' of name for length */
385         lname->ln_namelen = buflen - 1;
386         return lname;
387 }
388
389 /*
390  * Remove one entry from local MDT. Do not corrupt byte order in page, it will
391  * be sent to remote MDT.
392  */
393 static int cmm_split_remove_entry(const struct lu_env *env,
394                                   struct md_object *mo,
395                                   struct lu_dirent *ent)
396 {
397         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
398         struct cmm_thread_info *cmi;
399          struct md_attr *ma;
400         struct cmm_object *obj;
401         int is_dir, rc;
402         char *name;
403         struct lu_name *lname;
404         ENTRY;
405
406         if (cmm_split_special_entry(ent))
407                 RETURN(0);
408
409         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
410         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
411         if (IS_ERR(obj))
412                 RETURN(PTR_ERR(obj));
413
414         cmi = cmm_env_info(env);
415         ma = &cmi->cmi_ma;
416
417         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
418                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
419         else
420                 /*
421                  * XXX: These days only cross-ref dirs are possible, so for the
422                  * sake of simplicity, in split, we suppose that all cross-ref
423                  * names pint to directory and do not do additional getattr to
424                  * remote MDT.
425                  */
426                 is_dir = 1;
427
428         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
429         if (!name)
430                 GOTO(cleanup, rc = -ENOMEM);
431
432         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
433         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
434         /*
435          * When split, no need update parent's ctime,
436          * and no permission check for name_remove.
437          */
438         ma->ma_attr.la_ctime = 0;
439         if (is_dir)
440                 ma->ma_attr.la_mode = S_IFDIR;
441         else
442                 ma->ma_attr.la_mode = 0;
443         ma->ma_attr.la_valid = LA_MODE;
444         ma->ma_valid = MA_INODE;
445
446         ma->ma_attr_flags |= MDS_PERM_BYPASS;
447         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
448         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
449         if (rc)
450                 GOTO(cleanup, rc);
451
452         /*
453          * This @ent will be transferred to slave MDS and insert there, so in
454          * the slave MDS, we should know whether this object is dir or not, so
455          * use the highest bit of the hash to indicate that (because we do not
456          * use highest bit of hash).
457          */
458         if (is_dir) {
459                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
460                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
461         }
462         EXIT;
463 cleanup:
464         cmm_object_put(env, obj);
465         return rc;
466 }
467
468 /*
469  * Remove all entries from passed page. These entries are going to remote MDT
470  * and thus should be removed locally.
471  */
472 static int cmm_split_remove_page(const struct lu_env *env,
473                                  struct md_object *mo,
474                                  struct lu_rdpg *rdpg,
475                                  __u64 hash_end, __u32 *len)
476 {
477         struct lu_dirpage *dp;
478         struct lu_dirent  *ent;
479         int rc = 0;
480         ENTRY;
481
482         *len = 0;
483         kmap(rdpg->rp_pages[0]);
484         dp = page_address(rdpg->rp_pages[0]);
485         for (ent = lu_dirent_start(dp);
486              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
487              ent = lu_dirent_next(ent)) {
488                 rc = cmm_split_remove_entry(env, mo, ent);
489                 if (rc) {
490                         /*
491                          * XXX: Error handler to insert remove name back,
492                          * currently we assumed it will success anyway in
493                          * verfication test.
494                          */
495                         CERROR("Can not del %*.*s, rc %d\n",
496                                le16_to_cpu(ent->lde_namelen),
497                                le16_to_cpu(ent->lde_namelen),
498                                ent->lde_name, rc);
499                         GOTO(unmap, rc);
500                 }
501                 *len += lu_dirent_size(ent);
502         }
503
504         if (ent != lu_dirent_start(dp))
505                 *len += sizeof(struct lu_dirpage);
506         EXIT;
507 unmap:
508         kunmap(rdpg->rp_pages[0]);
509         return rc;
510 }
511
512 /* Send one page to remote MDT for creating entries there. */
513 static int cmm_split_send_page(const struct lu_env *env,
514                                struct md_object *mo,
515                                struct lu_rdpg *rdpg,
516                                struct lu_fid *fid, int len)
517 {
518         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
519         struct cmm_object *obj;
520         int rc = 0;
521         ENTRY;
522
523         obj = cmm_object_find(env, cmm, fid);
524         if (IS_ERR(obj))
525                 RETURN(PTR_ERR(obj));
526
527         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
528                            rdpg->rp_pages[0], len);
529         cmm_object_put(env, obj);
530         RETURN(rc);
531 }
532
533 /* Read one page of entries from local MDT. */
534 static int cmm_split_read_page(const struct lu_env *env,
535                                struct md_object *mo,
536                                struct lu_rdpg *rdpg)
537 {
538         int rc;
539         ENTRY;
540         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
541         cfs_kunmap(rdpg->rp_pages[0]);
542         rc = mo_readpage(env, md_object_next(mo), rdpg);
543         RETURN(rc);
544 }
545
546 /*
547  * This function performs migration of all pages with entries which fit into one
548  * stripe and one hash segment.
549  */
550 static int cmm_split_process_stripe(const struct lu_env *env,
551                                     struct md_object *mo,
552                                     struct lu_rdpg *rdpg,
553                                     struct lu_fid *lf,
554                                     __u64 end)
555 {
556         int rc, done = 0;
557         ENTRY;
558
559         LASSERT(rdpg->rp_npages == 1);
560         do {
561                 struct lu_dirpage *ldp;
562                 __u32 len = 0;
563
564                 /* Read one page from local MDT. */
565                 rc = cmm_split_read_page(env, mo, rdpg);
566                 if (rc) {
567                         CERROR("Error in readpage: %d\n", rc);
568                         RETURN(rc);
569                 }
570
571                 /* Remove local entries which are going to remite MDT. */
572                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
573                 if (rc) {
574                         CERROR("Error in remove stripe entries: %d\n", rc);
575                         RETURN(rc);
576                 }
577
578                 /* Send entries page to slave MDT. */
579                 if (len > 0) {
580                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
581                         if (rc) {
582                                 CERROR("Error in sending page: %d\n", rc);
583                                 RETURN(rc);
584                         }
585                 }
586
587                 kmap(rdpg->rp_pages[0]);
588                 ldp = page_address(rdpg->rp_pages[0]);
589                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
590                         done = 1;
591
592                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
593                 kunmap(rdpg->rp_pages[0]);
594         } while (!done);
595
596         RETURN(rc);
597 }
598
599 static int cmm_split_process_dir(const struct lu_env *env,
600                                  struct md_object *mo,
601                                  struct md_attr *ma)
602 {
603         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
604         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
605         __u64 hash_segement;
606         int rc = 0, i;
607         ENTRY;
608
609         memset(rdpg, 0, sizeof *rdpg);
610         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
611         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
612         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
613
614         for (i = 0; i < rdpg->rp_npages; i++) {
615                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
616                 if (rdpg->rp_pages[i] == NULL)
617                         GOTO(cleanup, rc = -ENOMEM);
618         }
619
620         LASSERT(ma->ma_valid & MA_LMV);
621         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
622         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
623                 struct lu_fid *lf;
624                 __u64 hash_end;
625
626                 lf = &ma->ma_lmv->mea_ids[i];
627
628                 rdpg->rp_hash = i * hash_segement;
629                 if (i == cmm->cmm_tgt_count)
630                         hash_end = MAX_HASH_SIZE;
631                 else
632                         hash_end = rdpg->rp_hash + hash_segement;
633                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
634                 if (rc) {
635                         CERROR("Error (rc = %d) while splitting for %d: fid="
636                                DFID", %08x:%08x\n", rc, i, PFID(lf),
637                                rdpg->rp_hash, hash_end);
638                         GOTO(cleanup, rc);
639                 }
640         }
641         EXIT;
642 cleanup:
643         for (i = 0; i < rdpg->rp_npages; i++)
644                 if (rdpg->rp_pages[i] != NULL)
645                         __cfs_free_page(rdpg->rp_pages[i]);
646         return rc;
647 }
648
649 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
650 {
651         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
652         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
653         int                rc = 0, split;
654         struct lu_buf     *buf;
655         ENTRY;
656
657         cmm_lprocfs_time_start(env);
658
659         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
660         memset(ma, 0, sizeof(*ma));
661
662         /* Step1: Checking whether the dir needs to be split. */
663         rc = cmm_split_expect(env, mo, ma, &split);
664         if (rc)
665                 GOTO(out, rc);
666
667         if (split != CMM_SPLIT_NEEDED) {
668                 /* No split is needed, caller may proceed with create. */
669                 GOTO(out, rc = 0);
670         }
671
672         /* Split should be done now, let's do it. */
673         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
674               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
675
676         /*
677          * Disable transacrions for split, since there will be so many trans in
678          * this one ops, conflict with current recovery design.
679          */
680         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
681         if (rc) {
682                 CERROR("Can't disable trans for split, rc %d\n", rc);
683                 GOTO(out, rc);
684         }
685
686         /* Step2: Prepare the md memory */
687         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
688         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
689         if (ma->ma_lmv == NULL)
690                 GOTO(out, rc = -ENOMEM);
691
692         /* Step3: Create slave objects and fill the ma->ma_lmv */
693         rc = cmm_split_slaves_create(env, mo, ma);
694         if (rc) {
695                 CERROR("Can't create slaves for split, rc %d\n", rc);
696                 GOTO(cleanup, rc);
697         }
698
699         /* Step4: Scan and split the object. */
700         rc = cmm_split_process_dir(env, mo, ma);
701         if (rc) {
702                 CERROR("Can't scan and split, rc %d\n", rc);
703                 GOTO(cleanup, rc);
704         }
705
706         /* Step5: Set mea to the master object. */
707         LASSERT(ma->ma_valid & MA_LMV);
708         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
709         rc = mo_xattr_set(env, md_object_next(mo), buf,
710                           MDS_LMV_MD_NAME, 0);
711         if (rc) {
712                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
713                 GOTO(cleanup, rc);
714         }
715
716         /* set flag in cmm_object */
717         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
718
719         /*
720          * Finally, split succeed, tell client to repeat opetartion on correct
721          * MDT.
722          */
723         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
724         rc = -ERESTART;
725         EXIT;
726 cleanup:
727         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
728 out:
729         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
730         return rc;
731 }