Whamcloud - gitweb
b=24037 Changes of 2.6.32 kernel.
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_split.c
37  *
38  * Lustre splitting dir
39  *
40  * Author: Alex Thomas  <alex@clusterfs.com>
41  * Author: Wang Di      <wangdi@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <obd_class.h>
52 #include <lustre_fid.h>
53 #include <lustre_mds.h>
54 #include <lustre/lustre_idl.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57
58 /**
59  * \addtogroup split
60  * @{
61  */
62 enum {
63         CMM_SPLIT_SIZE =  128 * 1024
64 };
65
66 /**
67  * This function checks if passed \a name come to correct server (local MDT).
68  *
69  * \param mp Parent directory
70  * \param name Name to lookup
71  * \retval  -ERESTART Let client know that dir was split and client needs to
72  * chose correct stripe.
73  */
74 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
75                     const char *name)
76 {
77         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
78         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
79         struct cml_object *clo = md2cml_obj(mp);
80         int rc, lmv_size;
81         ENTRY;
82
83         cmm_lprocfs_time_start(env);
84
85         /* Not split yet */
86         if (clo->clo_split == CMM_SPLIT_NONE ||
87             clo->clo_split == CMM_SPLIT_DENIED)
88                 GOTO(out, rc = 0);
89
90         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
91
92         /* Try to get the LMV EA */
93         memset(ma, 0, sizeof(*ma));
94
95         ma->ma_need = MA_LMV;
96         ma->ma_lmv_size = lmv_size;
97         OBD_ALLOC(ma->ma_lmv, lmv_size);
98         if (ma->ma_lmv == NULL)
99                 GOTO(out, rc = -ENOMEM);
100
101         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
102         rc = mo_attr_get(env, mp, ma);
103         if (rc)
104                 GOTO(cleanup, rc);
105
106         /* No LMV just return */
107         if (!(ma->ma_valid & MA_LMV)) {
108                 /* update split state if unknown */
109                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
110                         clo->clo_split = CMM_SPLIT_NONE;
111                 GOTO(cleanup, rc = 0);
112         }
113
114         /* Skip checking the slave dirs (mea_count is 0) */
115         if (ma->ma_lmv->mea_count != 0) {
116                 int idx;
117
118                 /**
119                  * This gets stripe by name to check the name belongs to master
120                  * dir, otherwise return the -ERESTART
121                  */
122                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
123
124                 /**
125                  * When client does not know about split, it sends create() to
126                  * the master MDT and master replay back if directory is split.
127                  * So client should orward request to correct MDT. This
128                  * is why we check here if stripe zero or not. Zero stripe means
129                  * master stripe. If stripe calculated from name is not zero -
130                  * return -ERESTART.
131                  */
132                 if (idx != 0)
133                         rc = -ERESTART;
134
135                 /* update split state to DONE if unknown */
136                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
137                         clo->clo_split = CMM_SPLIT_DONE;
138         } else {
139                 /* split is denied for slave dir */
140                 clo->clo_split = CMM_SPLIT_DENIED;
141         }
142         EXIT;
143 cleanup:
144         OBD_FREE(ma->ma_lmv, lmv_size);
145 out:
146         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
147         return rc;
148 }
149
150 /**
151  * Return preferable access mode to the caller taking into account the split
152  * case and the fact of existing not splittable dirs.
153  */
154 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
155                      mdl_mode_t lm)
156 {
157         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
158         int rc, split;
159         ENTRY;
160
161         memset(ma, 0, sizeof(*ma));
162
163         /*
164          * Check only if we need protection from split.  If not - mdt handles
165          * other cases.
166          */
167         rc = cmm_split_expect(env, mo, ma, &split);
168         if (rc) {
169                 CERROR("Can't check for possible split, rc %d\n", rc);
170                 RETURN(MDL_MINMODE);
171         }
172
173         /*
174          * Do not take PDO lock on non-splittable objects if this is not PW,
175          * this should speed things up a bit.
176          */
177         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
178                 RETURN(MDL_NL);
179
180         /* Protect splitting by exclusive lock. */
181         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
182                 RETURN(MDL_EX);
183
184         /*
185          * Have no idea about lock mode, let it be what higher layer wants.
186          */
187         RETURN(MDL_MINMODE);
188 }
189
190 /**
191  * Check if split is expected for current thread.
192  *
193  * \param mo Directory to split.
194  * \param ma md attributes.
195  * \param split Flag to save split information.
196  */
197 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
198                      struct md_attr *ma, int *split)
199 {
200         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
201         struct cml_object *clo = md2cml_obj(mo);
202         struct lu_fid root_fid;
203         int rc;
204         ENTRY;
205
206         if (clo->clo_split == CMM_SPLIT_DONE ||
207             clo->clo_split == CMM_SPLIT_DENIED) {
208                 *split = clo->clo_split;
209                 RETURN(0);
210         }
211         /* CMM_SPLIT_UNKNOWN case below */
212
213         /* No need to split root object. */
214         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
215                                               &root_fid);
216         if (rc)
217                 RETURN(rc);
218
219         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
220                 /* update split state */
221                 *split = clo->clo_split == CMM_SPLIT_DENIED;
222                 RETURN(0);
223         }
224
225         /*
226          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
227          * for this get_attr.
228          */
229         LASSERT(ma->ma_valid == 0);
230         ma->ma_need = MA_INODE | MA_LMV;
231         rc = mo_attr_get(env, mo, ma);
232         if (rc)
233                 RETURN(rc);
234
235         /* No need split for already split object */
236         if (ma->ma_valid & MA_LMV) {
237                 LASSERT(ma->ma_lmv_size > 0);
238                 *split = clo->clo_split = CMM_SPLIT_DONE;
239                 RETURN(0);
240         }
241
242         /* No need split for object whose size < CMM_SPLIT_SIZE */
243         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
244                 *split = clo->clo_split = CMM_SPLIT_NONE;
245                 RETURN(0);
246         }
247
248         *split = clo->clo_split = CMM_SPLIT_NEEDED;
249         RETURN(0);
250 }
251
252 struct cmm_object *cmm_object_find(const struct lu_env *env,
253                                    struct cmm_device *d,
254                                    const struct lu_fid *f)
255 {
256         return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
257 }
258
259 static inline void cmm_object_put(const struct lu_env *env,
260                                   struct cmm_object *o)
261 {
262         lu_object_put(env, &o->cmo_obj.mo_lu);
263 }
264
265 /**
266  * Allocate new FID on passed \a mc for slave object which is going to
267  * create there soon.
268  */
269 static int cmm_split_fid_alloc(const struct lu_env *env,
270                                struct cmm_device *cmm,
271                                struct mdc_device *mc,
272                                struct lu_fid *fid)
273 {
274         int rc;
275         ENTRY;
276
277         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
278
279         cfs_down(&mc->mc_fid_sem);
280
281         /* Alloc new fid on \a mc. */
282         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
283         if (rc > 0)
284                 rc = 0;
285         cfs_up(&mc->mc_fid_sem);
286
287         RETURN(rc);
288 }
289
290 /**
291  * Allocate new slave object on passed \a mc.
292  */
293 static int cmm_split_slave_create(const struct lu_env *env,
294                                   struct cmm_device *cmm,
295                                   struct mdc_device *mc,
296                                   struct lu_fid *fid,
297                                   struct md_attr *ma,
298                                   struct lmv_stripe_md *lmv,
299                                   int lmv_size)
300 {
301         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
302         struct cmm_object *obj;
303         int rc;
304         ENTRY;
305
306         /* Allocate new fid and store it to @fid */
307         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
308         if (rc) {
309                 CERROR("Can't alloc new fid on "LPU64
310                        ", rc %d\n", mc->mc_num, rc);
311                 RETURN(rc);
312         }
313
314         /* Allocate new object on @mc */
315         obj = cmm_object_find(env, cmm, fid);
316         if (IS_ERR(obj))
317                 RETURN(PTR_ERR(obj));
318
319         memset(spec, 0, sizeof *spec);
320         spec->u.sp_ea.fid = fid;
321         spec->u.sp_ea.eadata = lmv;
322         spec->u.sp_ea.eadatalen = lmv_size;
323         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
324         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
325                               spec, ma);
326         cmm_object_put(env, obj);
327         RETURN(rc);
328 }
329
330 /**
331  * Create so many slaves as number of stripes.
332  * This is called in split time before sending pages to slaves.
333  */
334 static int cmm_split_slaves_create(const struct lu_env *env,
335                                    struct md_object *mo,
336                                    struct md_attr *ma)
337 {
338         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
339         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
340         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
341         struct mdc_device    *mc, *tmp;
342         struct lmv_stripe_md *lmv;
343         int i = 1, rc = 0;
344         ENTRY;
345
346         /* Init the split MEA */
347         lmv = ma->ma_lmv;
348         lmv->mea_master = cmm->cmm_local_num;
349         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
350         lmv->mea_count = cmm->cmm_tgt_count + 1;
351
352         /*
353          * Store master FID to local node idx number. Local node is always
354          * master and its stripe number if 0.
355          */
356         lmv->mea_ids[0] = *lf;
357
358         memset(slave_lmv, 0, sizeof *slave_lmv);
359         slave_lmv->mea_master = cmm->cmm_local_num;
360         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
361         slave_lmv->mea_count = 0;
362
363         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
364                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
365                                             ma, slave_lmv, sizeof(*slave_lmv));
366                 if (rc)
367                         GOTO(cleanup, rc);
368                 i++;
369         }
370         EXIT;
371 cleanup:
372         return rc;
373 }
374
375 static inline int cmm_split_special_entry(struct lu_dirent *ent)
376 {
377         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
378             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
379                 return 1;
380         return 0;
381 }
382
383 /**
384  * Convert string to the lu_name structure.
385  */
386 static inline struct lu_name *cmm_name(const struct lu_env *env,
387                                        char *name, int buflen)
388 {
389         struct lu_name *lname;
390         struct cmm_thread_info *cmi;
391
392         LASSERT(buflen > 0);
393         LASSERT(name[buflen - 1] == '\0');
394
395         cmi = cmm_env_info(env);
396         lname = &cmi->cti_name;
397         lname->ln_name = name;
398         /* do NOT count the terminating '\0' of name for length */
399         lname->ln_namelen = buflen - 1;
400         return lname;
401 }
402
403 /**
404  * Helper for cmm_split_remove_page(). It removes one entry from local MDT.
405  * Do not corrupt byte order in page, it will be sent to remote MDT.
406  */
407 static int cmm_split_remove_entry(const struct lu_env *env,
408                                   struct md_object *mo,
409                                   struct lu_dirent *ent)
410 {
411         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
412         struct cmm_thread_info *cmi;
413         struct md_attr *ma;
414         struct cmm_object *obj;
415         int is_dir, rc;
416         char *name;
417         struct lu_name *lname;
418         ENTRY;
419
420         if (cmm_split_special_entry(ent))
421                 RETURN(0);
422
423         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
424         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
425         if (IS_ERR(obj))
426                 RETURN(PTR_ERR(obj));
427
428         cmi = cmm_env_info(env);
429         ma = &cmi->cmi_ma;
430
431         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
432                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
433         else
434                 /**
435                  * \note These days only cross-ref dirs are possible, so for the
436                  * sake of simplicity, in split, we suppose that all cross-ref
437                  * names point to directory and do not do additional getattr to
438                  * remote MDT.
439                  */
440                 is_dir = 1;
441
442         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
443         if (!name)
444                 GOTO(cleanup, rc = -ENOMEM);
445
446         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
447         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
448         /**
449          * \note When split, no need update parent's ctime,
450          * and no permission check for name_remove.
451          */
452         ma->ma_attr.la_ctime = 0;
453         if (is_dir)
454                 ma->ma_attr.la_mode = S_IFDIR;
455         else
456                 ma->ma_attr.la_mode = 0;
457         ma->ma_attr.la_valid = LA_MODE;
458         ma->ma_valid = MA_INODE;
459
460         ma->ma_attr_flags |= MDS_PERM_BYPASS;
461         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
462         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
463         if (rc)
464                 GOTO(cleanup, rc);
465
466         /**
467          * \note For each entry transferred to the slave MDS we should know
468          * whether this object is dir or not. Therefore the highest bit of the
469          * hash is used to indicate that (it is unused for hash purposes anyway).
470          */
471         if (is_dir) {
472                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
473                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
474         }
475         EXIT;
476 cleanup:
477         cmm_object_put(env, obj);
478         return rc;
479 }
480
481 /**
482  * Remove all entries from passed page.
483  * These entries are going to remote MDT and thus should be removed locally.
484  */
485 static int cmm_split_remove_page(const struct lu_env *env,
486                                  struct md_object *mo,
487                                  struct lu_rdpg *rdpg,
488                                  __u64 hash_end, __u32 *len)
489 {
490         struct lu_dirpage *dp;
491         struct lu_dirent  *ent;
492         int rc = 0;
493         ENTRY;
494
495         *len = 0;
496         cfs_kmap(rdpg->rp_pages[0]);
497         dp = page_address(rdpg->rp_pages[0]);
498         for (ent = lu_dirent_start(dp);
499              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
500              ent = lu_dirent_next(ent)) {
501                 rc = cmm_split_remove_entry(env, mo, ent);
502                 if (rc) {
503                         /*
504                          * XXX: Error handler to insert remove name back,
505                          * currently we assumed it will success anyway in
506                          * verfication test.
507                          */
508                         CERROR("Can not del %*.*s, rc %d\n",
509                                le16_to_cpu(ent->lde_namelen),
510                                le16_to_cpu(ent->lde_namelen),
511                                ent->lde_name, rc);
512                         GOTO(unmap, rc);
513                 }
514                 *len += lu_dirent_size(ent);
515         }
516
517         if (ent != lu_dirent_start(dp))
518                 *len += sizeof(struct lu_dirpage);
519         EXIT;
520 unmap:
521         cfs_kunmap(rdpg->rp_pages[0]);
522         return rc;
523 }
524
525 /**
526  * Send one page of entries to the slave MDT.
527  * This page contains entries to be created there.
528  */
529 static int cmm_split_send_page(const struct lu_env *env,
530                                struct md_object *mo,
531                                struct lu_rdpg *rdpg,
532                                struct lu_fid *fid, int len)
533 {
534         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
535         struct cmm_object *obj;
536         int rc = 0;
537         ENTRY;
538
539         obj = cmm_object_find(env, cmm, fid);
540         if (IS_ERR(obj))
541                 RETURN(PTR_ERR(obj));
542
543         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
544                            rdpg->rp_pages[0], len);
545         cmm_object_put(env, obj);
546         RETURN(rc);
547 }
548
549 /** Read one page of entries from local MDT. */
550 static int cmm_split_read_page(const struct lu_env *env,
551                                struct md_object *mo,
552                                struct lu_rdpg *rdpg)
553 {
554         int rc;
555         ENTRY;
556         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
557         cfs_kunmap(rdpg->rp_pages[0]);
558         rc = mo_readpage(env, md_object_next(mo), rdpg);
559         RETURN(rc);
560 }
561
562 /**
563  * This function performs migration of each directory stripe to its MDS.
564  */
565 static int cmm_split_process_stripe(const struct lu_env *env,
566                                     struct md_object *mo,
567                                     struct lu_rdpg *rdpg,
568                                     struct lu_fid *lf,
569                                     __u64 end)
570 {
571         int rc, done = 0;
572         ENTRY;
573
574         LASSERT(rdpg->rp_npages == 1);
575         do {
576                 struct lu_dirpage *ldp;
577                 __u32 len = 0;
578
579                 /** - Read one page of entries from local MDT. */
580                 rc = cmm_split_read_page(env, mo, rdpg);
581                 if (rc) {
582                         CERROR("Error in readpage: %d\n", rc);
583                         RETURN(rc);
584                 }
585
586                 /** - Remove local entries which are going to remite MDT. */
587                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
588                 if (rc) {
589                         CERROR("Error in remove stripe entries: %d\n", rc);
590                         RETURN(rc);
591                 }
592
593                 /**
594                  * - Send entries page to slave MDT and repeat while there are
595                  * more pages.
596                  */
597                 if (len > 0) {
598                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
599                         if (rc) {
600                                 CERROR("Error in sending page: %d\n", rc);
601                                 RETURN(rc);
602                         }
603                 }
604
605                 cfs_kmap(rdpg->rp_pages[0]);
606                 ldp = page_address(rdpg->rp_pages[0]);
607                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
608                         done = 1;
609
610                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
611                 cfs_kunmap(rdpg->rp_pages[0]);
612         } while (!done);
613
614         RETURN(rc);
615 }
616
617 /**
618  * Directory scanner for split operation.
619  *
620  * It calculates hashes for names and organizes files to stripes.
621  */
622 static int cmm_split_process_dir(const struct lu_env *env,
623                                  struct md_object *mo,
624                                  struct md_attr *ma)
625 {
626         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
627         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
628         __u64 hash_segment;
629         int rc = 0, i;
630         ENTRY;
631
632         memset(rdpg, 0, sizeof *rdpg);
633         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
634         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
635         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
636
637         for (i = 0; i < rdpg->rp_npages; i++) {
638                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
639                 if (rdpg->rp_pages[i] == NULL)
640                         GOTO(cleanup, rc = -ENOMEM);
641         }
642
643         hash_segment = MAX_HASH_SIZE;
644         /** Whole hash range is divided on segments by number of MDS-es. */
645         do_div(hash_segment, cmm->cmm_tgt_count + 1);
646         /**
647          * For each segment the cmm_split_process_stripe() is called to move
648          * entries on new server.
649          */
650         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
651                 struct lu_fid *lf;
652                 __u64 hash_end;
653
654                 lf = &ma->ma_lmv->mea_ids[i];
655
656                 rdpg->rp_hash = i * hash_segment;
657                 if (i == cmm->cmm_tgt_count)
658                         hash_end = MAX_HASH_SIZE;
659                 else
660                         hash_end = rdpg->rp_hash + hash_segment;
661                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
662                 if (rc) {
663                         CERROR("Error (rc = %d) while splitting for %d: fid="
664                                DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
665                                rdpg->rp_hash, hash_end);
666                         GOTO(cleanup, rc);
667                 }
668         }
669         EXIT;
670 cleanup:
671         for (i = 0; i < rdpg->rp_npages; i++)
672                 if (rdpg->rp_pages[i] != NULL)
673                         cfs_free_page(rdpg->rp_pages[i]);
674         return rc;
675 }
676
677 /**
678  * Directory splitting.
679  *
680  * Big directory can be split eventually.
681  */
682 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
683 {
684         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
685         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
686         int                rc = 0, split;
687         struct lu_buf     *buf;
688         ENTRY;
689
690         cmm_lprocfs_time_start(env);
691
692         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
693         memset(ma, 0, sizeof(*ma));
694
695         /** - Step1: Checking whether the dir needs to be split. */
696         rc = cmm_split_expect(env, mo, ma, &split);
697         if (rc)
698                 GOTO(out, rc);
699
700         if (split != CMM_SPLIT_NEEDED) {
701                 /* No split is needed, caller may proceed with create. */
702                 GOTO(out, rc = 0);
703         }
704
705         /* Split should be done now, let's do it. */
706         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
707               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
708
709         /**
710          * /note Disable transactions for split, since there will be so many trans in
711          * this one ops, conflict with current recovery design.
712          */
713         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
714         if (rc) {
715                 CERROR("Can't disable trans for split, rc %d\n", rc);
716                 GOTO(out, rc);
717         }
718
719         /** - Step2: Prepare the md memory */
720         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
721         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
722         if (ma->ma_lmv == NULL)
723                 GOTO(out, rc = -ENOMEM);
724
725         /** - Step3: Create slave objects and fill the ma->ma_lmv */
726         rc = cmm_split_slaves_create(env, mo, ma);
727         if (rc) {
728                 CERROR("Can't create slaves for split, rc %d\n", rc);
729                 GOTO(cleanup, rc);
730         }
731
732         /** - Step4: Scan and split the object. */
733         rc = cmm_split_process_dir(env, mo, ma);
734         if (rc) {
735                 CERROR("Can't scan and split, rc %d\n", rc);
736                 GOTO(cleanup, rc);
737         }
738
739         /** - Step5: Set mea to the master object. */
740         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
741         rc = mo_xattr_set(env, md_object_next(mo), buf,
742                           MDS_LMV_MD_NAME, 0);
743         if (rc) {
744                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
745                 GOTO(cleanup, rc);
746         }
747
748         /* set flag in cmm_object */
749         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
750
751         /**
752          * - Finally, split succeed, tell client to repeat opetartion on correct
753          * MDT.
754          */
755         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
756         rc = -ERESTART;
757         EXIT;
758 cleanup:
759         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
760 out:
761         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
762         return rc;
763 }
764 /** @} */