Whamcloud - gitweb
d77e77aad07f6b7374a358691e409dd0276cd813
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/cmm/cmm_split.c
37  *
38  * Lustre splitting dir
39  *
40  * Author: Alex Thomas  <alex@clusterfs.com>
41  * Author: Wang Di      <wangdi@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #ifndef EXPORT_SYMTAB
46 # define EXPORT_SYMTAB
47 #endif
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <obd_class.h>
52 #include <lustre_fid.h>
53 #include <lustre_mds.h>
54 #include <lustre/lustre_idl.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
57
58 enum {
59         CMM_SPLIT_SIZE =  128 * 1024
60 };
61
62 /*
63  * This function checks if passed @name come to correct server (local MDT). If
64  * not - return -ERESTART and let client know that dir was split and client
65  * needs to chose correct stripe.
66  */
67 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
68                     const char *name)
69 {
70         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
71         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
72         struct cml_object *clo = md2cml_obj(mp);
73         int rc, lmv_size;
74         ENTRY;
75
76         cmm_lprocfs_time_start(env);
77
78         /* Not split yet */
79         if (clo->clo_split == CMM_SPLIT_NONE ||
80             clo->clo_split == CMM_SPLIT_DENIED)
81                 GOTO(out, rc = 0);
82
83         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
84
85         /* Try to get the LMV EA */
86         memset(ma, 0, sizeof(*ma));
87
88         ma->ma_need = MA_LMV;
89         ma->ma_lmv_size = lmv_size;
90         OBD_ALLOC(ma->ma_lmv, lmv_size);
91         if (ma->ma_lmv == NULL)
92                 GOTO(out, rc = -ENOMEM);
93
94         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
95         rc = mo_attr_get(env, mp, ma);
96         if (rc)
97                 GOTO(cleanup, rc);
98
99         /* No LMV just return */
100         if (!(ma->ma_valid & MA_LMV)) {
101                 /* update split state if unknown */
102                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
103                         clo->clo_split = CMM_SPLIT_NONE;
104                 GOTO(cleanup, rc = 0);
105         }
106
107         /* Skip checking the slave dirs (mea_count is 0) */
108         if (ma->ma_lmv->mea_count != 0) {
109                 int idx;
110
111                 /*
112                  * Get stripe by name to check the name belongs to master dir,
113                  * otherwise return the -ERESTART
114                  */
115                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
116
117                 /*
118                  * Check if name came to correct MDT server. We suppose that if
119                  * client does not know about split, it sends create operation
120                  * to master MDT. And this is master job to say it that dir got
121                  * split and client should orward request to correct MDT. This
122                  * is why we check here if stripe zero or not. Zero stripe means
123                  * master stripe. If stripe calculated from name is not zero -
124                  * return -ERESTART.
125                  */
126                 if (idx != 0)
127                         rc = -ERESTART;
128
129                 /* update split state to DONE if unknown */
130                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131                         clo->clo_split = CMM_SPLIT_DONE;
132         } else {
133                 /* split is denied for slave dir */
134                 clo->clo_split = CMM_SPLIT_DENIED;
135         }
136         EXIT;
137 cleanup:
138         OBD_FREE(ma->ma_lmv, lmv_size);
139 out:
140         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
141         return rc;
142 }
143
144 /*
145  * Return preferable access mode to caller taking into account possible split
146  * and the fact of existing not splittable dirs in principle.
147  */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
149                      mdl_mode_t lm)
150 {
151         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
152         int rc, split;
153         ENTRY;
154
155         memset(ma, 0, sizeof(*ma));
156
157         /*
158          * Check only if we need protection from split.  If not - mdt handles
159          * other cases.
160          */
161         rc = cmm_split_expect(env, mo, ma, &split);
162         if (rc) {
163                 CERROR("Can't check for possible split, rc %d\n", rc);
164                 RETURN(MDL_MINMODE);
165         }
166
167         /*
168          * Do not take PDO lock on non-splittable objects if this is not PW,
169          * this should speed things up a bit.
170          */
171         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
172                 RETURN(MDL_NL);
173
174         /* Protect splitting by exclusive lock. */
175         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
176                 RETURN(MDL_EX);
177
178         /*
179          * Have no idea about lock mode, let it be what higher layer wants.
180          */
181         RETURN(MDL_MINMODE);
182 }
183
184 /* Check if split is expected for current thread. */
185 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
186                      struct md_attr *ma, int *split)
187 {
188         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
189         struct cml_object *clo = md2cml_obj(mo);
190         struct lu_fid root_fid;
191         int rc;
192         ENTRY;
193
194         if (clo->clo_split == CMM_SPLIT_DONE ||
195             clo->clo_split == CMM_SPLIT_DENIED) {
196                 *split = clo->clo_split;
197                 RETURN(0);
198         }
199         /* CMM_SPLIT_UNKNOWN case below */
200
201         /* No need to split root object. */
202         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
203                                               &root_fid);
204         if (rc)
205                 RETURN(rc);
206
207         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
208                 /* update split state */
209                 *split = clo->clo_split == CMM_SPLIT_DENIED;
210                 RETURN(0);
211         }
212
213         /*
214          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
215          * for this get_attr.
216          */
217         LASSERT(ma->ma_valid == 0);
218         ma->ma_need = MA_INODE | MA_LMV;
219         rc = mo_attr_get(env, mo, ma);
220         if (rc)
221                 RETURN(rc);
222
223         /* No need split for already split object */
224         if (ma->ma_valid & MA_LMV) {
225                 LASSERT(ma->ma_lmv_size > 0);
226                 *split = clo->clo_split = CMM_SPLIT_DONE;
227                 RETURN(0);
228         }
229
230         /* No need split for object whose size < CMM_SPLIT_SIZE */
231         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
232                 *split = clo->clo_split = CMM_SPLIT_NONE;
233                 RETURN(0);
234         }
235
236         *split = clo->clo_split = CMM_SPLIT_NEEDED;
237         RETURN(0);
238 }
239
240 struct cmm_object *cmm_object_find(const struct lu_env *env,
241                                    struct cmm_device *d,
242                                    const struct lu_fid *f)
243 {
244         struct lu_object *o;
245         struct cmm_object *m;
246         ENTRY;
247
248         o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
249         if (IS_ERR(o))
250                 m = (struct cmm_object *)o;
251         else
252                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
253                                d->cmm_md_dev.md_lu_dev.ld_type));
254         RETURN(m);
255 }
256
257 static inline void cmm_object_put(const struct lu_env *env,
258                                   struct cmm_object *o)
259 {
260         lu_object_put(env, &o->cmo_obj.mo_lu);
261 }
262
263 /*
264  * Allocate new on passed @mc for slave object which is going to create there
265  * soon.
266  */
267 static int cmm_split_fid_alloc(const struct lu_env *env,
268                                struct cmm_device *cmm,
269                                struct mdc_device *mc,
270                                struct lu_fid *fid)
271 {
272         int rc;
273         ENTRY;
274
275         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
276
277         down(&mc->mc_fid_sem);
278
279         /* Alloc new fid on @mc. */
280         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
281         if (rc > 0) {
282                 /* Setup FLD for new sequenceif needed. */
283                 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
284                                        mc->mc_num, env);
285                 if (rc)
286                         CERROR("Can't create fld entry, rc %d\n", rc);
287         }
288         up(&mc->mc_fid_sem);
289
290         RETURN(rc);
291 }
292
293 /* Allocate new slave object on passed @mc */
294 static int cmm_split_slave_create(const struct lu_env *env,
295                                   struct cmm_device *cmm,
296                                   struct mdc_device *mc,
297                                   struct lu_fid *fid,
298                                   struct md_attr *ma,
299                                   struct lmv_stripe_md *lmv,
300                                   int lmv_size)
301 {
302         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
303         struct cmm_object *obj;
304         int rc;
305         ENTRY;
306
307         /* Allocate new fid and store it to @fid */
308         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
309         if (rc) {
310                 CERROR("Can't alloc new fid on "LPU64
311                        ", rc %d\n", mc->mc_num, rc);
312                 RETURN(rc);
313         }
314
315         /* Allocate new object on @mc */
316         obj = cmm_object_find(env, cmm, fid);
317         if (IS_ERR(obj))
318                 RETURN(PTR_ERR(obj));
319
320         memset(spec, 0, sizeof *spec);
321         spec->u.sp_ea.fid = fid;
322         spec->u.sp_ea.eadata = lmv;
323         spec->u.sp_ea.eadatalen = lmv_size;
324         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
325         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
326                               spec, ma);
327         cmm_object_put(env, obj);
328         RETURN(rc);
329 }
330
331 /*
332  * Create so many slaves as number of stripes. This is called in split time
333  * before sending pages to slaves.
334  */
335 static int cmm_split_slaves_create(const struct lu_env *env,
336                                    struct md_object *mo,
337                                    struct md_attr *ma)
338 {
339         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
340         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
341         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
342         struct mdc_device    *mc, *tmp;
343         struct lmv_stripe_md *lmv;
344         int i = 1, rc = 0;
345         ENTRY;
346
347         /* Init the split MEA */
348         lmv = ma->ma_lmv;
349         lmv->mea_master = cmm->cmm_local_num;
350         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
351         lmv->mea_count = cmm->cmm_tgt_count + 1;
352
353         /*
354          * Store master FID to local node idx number. Local node is always
355          * master and its stripe number if 0.
356          */
357         lmv->mea_ids[0] = *lf;
358
359         memset(slave_lmv, 0, sizeof *slave_lmv);
360         slave_lmv->mea_master = cmm->cmm_local_num;
361         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
362         slave_lmv->mea_count = 0;
363
364         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
365                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
366                                             ma, slave_lmv, sizeof(*slave_lmv));
367                 if (rc)
368                         GOTO(cleanup, rc);
369                 i++;
370         }
371
372         ma->ma_valid |= MA_LMV;
373         EXIT;
374 cleanup:
375         return rc;
376 }
377
378 static inline int cmm_split_special_entry(struct lu_dirent *ent)
379 {
380         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
381             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
382                 return 1;
383         return 0;
384 }
385
386 static inline struct lu_name *cmm_name(const struct lu_env *env,
387                                        char *name, int buflen)
388 {
389         struct lu_name *lname;
390         struct cmm_thread_info *cmi;
391
392         LASSERT(buflen > 0);
393         LASSERT(name[buflen - 1] == '\0');
394
395         cmi = cmm_env_info(env);
396         lname = &cmi->cti_name;
397         lname->ln_name = name;
398         /* NOT count the terminating '\0' of name for length */
399         lname->ln_namelen = buflen - 1;
400         return lname;
401 }
402
403 /*
404  * Remove one entry from local MDT. Do not corrupt byte order in page, it will
405  * be sent to remote MDT.
406  */
407 static int cmm_split_remove_entry(const struct lu_env *env,
408                                   struct md_object *mo,
409                                   struct lu_dirent *ent)
410 {
411         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
412         struct cmm_thread_info *cmi;
413          struct md_attr *ma;
414         struct cmm_object *obj;
415         int is_dir, rc;
416         char *name;
417         struct lu_name *lname;
418         ENTRY;
419
420         if (cmm_split_special_entry(ent))
421                 RETURN(0);
422
423         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
424         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
425         if (IS_ERR(obj))
426                 RETURN(PTR_ERR(obj));
427
428         cmi = cmm_env_info(env);
429         ma = &cmi->cmi_ma;
430
431         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
432                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
433         else
434                 /*
435                  * XXX: These days only cross-ref dirs are possible, so for the
436                  * sake of simplicity, in split, we suppose that all cross-ref
437                  * names pint to directory and do not do additional getattr to
438                  * remote MDT.
439                  */
440                 is_dir = 1;
441
442         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
443         if (!name)
444                 GOTO(cleanup, rc = -ENOMEM);
445
446         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
447         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
448         /*
449          * When split, no need update parent's ctime,
450          * and no permission check for name_remove.
451          */
452         ma->ma_attr.la_ctime = 0;
453         if (is_dir)
454                 ma->ma_attr.la_mode = S_IFDIR;
455         else
456                 ma->ma_attr.la_mode = 0;
457         ma->ma_attr.la_valid = LA_MODE;
458         ma->ma_valid = MA_INODE;
459
460         ma->ma_attr_flags |= MDS_PERM_BYPASS;
461         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
462         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
463         if (rc)
464                 GOTO(cleanup, rc);
465
466         /*
467          * This @ent will be transferred to slave MDS and insert there, so in
468          * the slave MDS, we should know whether this object is dir or not, so
469          * use the highest bit of the hash to indicate that (because we do not
470          * use highest bit of hash).
471          */
472         if (is_dir) {
473                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
474                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
475         }
476         EXIT;
477 cleanup:
478         cmm_object_put(env, obj);
479         return rc;
480 }
481
482 /*
483  * Remove all entries from passed page. These entries are going to remote MDT
484  * and thus should be removed locally.
485  */
486 static int cmm_split_remove_page(const struct lu_env *env,
487                                  struct md_object *mo,
488                                  struct lu_rdpg *rdpg,
489                                  __u64 hash_end, __u32 *len)
490 {
491         struct lu_dirpage *dp;
492         struct lu_dirent  *ent;
493         int rc = 0;
494         ENTRY;
495
496         *len = 0;
497         cfs_kmap(rdpg->rp_pages[0]);
498         dp = page_address(rdpg->rp_pages[0]);
499         for (ent = lu_dirent_start(dp);
500              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
501              ent = lu_dirent_next(ent)) {
502                 rc = cmm_split_remove_entry(env, mo, ent);
503                 if (rc) {
504                         /*
505                          * XXX: Error handler to insert remove name back,
506                          * currently we assumed it will success anyway in
507                          * verfication test.
508                          */
509                         CERROR("Can not del %*.*s, rc %d\n",
510                                le16_to_cpu(ent->lde_namelen),
511                                le16_to_cpu(ent->lde_namelen),
512                                ent->lde_name, rc);
513                         GOTO(unmap, rc);
514                 }
515                 *len += lu_dirent_size(ent);
516         }
517
518         if (ent != lu_dirent_start(dp))
519                 *len += sizeof(struct lu_dirpage);
520         EXIT;
521 unmap:
522         cfs_kunmap(rdpg->rp_pages[0]);
523         return rc;
524 }
525
526 /* Send one page to remote MDT for creating entries there. */
527 static int cmm_split_send_page(const struct lu_env *env,
528                                struct md_object *mo,
529                                struct lu_rdpg *rdpg,
530                                struct lu_fid *fid, int len)
531 {
532         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
533         struct cmm_object *obj;
534         int rc = 0;
535         ENTRY;
536
537         obj = cmm_object_find(env, cmm, fid);
538         if (IS_ERR(obj))
539                 RETURN(PTR_ERR(obj));
540
541         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
542                            rdpg->rp_pages[0], len);
543         cmm_object_put(env, obj);
544         RETURN(rc);
545 }
546
547 /* Read one page of entries from local MDT. */
548 static int cmm_split_read_page(const struct lu_env *env,
549                                struct md_object *mo,
550                                struct lu_rdpg *rdpg)
551 {
552         int rc;
553         ENTRY;
554         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
555         cfs_kunmap(rdpg->rp_pages[0]);
556         rc = mo_readpage(env, md_object_next(mo), rdpg);
557         RETURN(rc);
558 }
559
560 /*
561  * This function performs migration of all pages with entries which fit into one
562  * stripe and one hash segment.
563  */
564 static int cmm_split_process_stripe(const struct lu_env *env,
565                                     struct md_object *mo,
566                                     struct lu_rdpg *rdpg,
567                                     struct lu_fid *lf,
568                                     __u64 end)
569 {
570         int rc, done = 0;
571         ENTRY;
572
573         LASSERT(rdpg->rp_npages == 1);
574         do {
575                 struct lu_dirpage *ldp;
576                 __u32 len = 0;
577
578                 /* Read one page from local MDT. */
579                 rc = cmm_split_read_page(env, mo, rdpg);
580                 if (rc) {
581                         CERROR("Error in readpage: %d\n", rc);
582                         RETURN(rc);
583                 }
584
585                 /* Remove local entries which are going to remite MDT. */
586                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
587                 if (rc) {
588                         CERROR("Error in remove stripe entries: %d\n", rc);
589                         RETURN(rc);
590                 }
591
592                 /* Send entries page to slave MDT. */
593                 if (len > 0) {
594                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
595                         if (rc) {
596                                 CERROR("Error in sending page: %d\n", rc);
597                                 RETURN(rc);
598                         }
599                 }
600
601                 cfs_kmap(rdpg->rp_pages[0]);
602                 ldp = page_address(rdpg->rp_pages[0]);
603                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
604                         done = 1;
605
606                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
607                 cfs_kunmap(rdpg->rp_pages[0]);
608         } while (!done);
609
610         RETURN(rc);
611 }
612
613 static int cmm_split_process_dir(const struct lu_env *env,
614                                  struct md_object *mo,
615                                  struct md_attr *ma)
616 {
617         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
618         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
619         __u64 hash_segement;
620         int rc = 0, i;
621         ENTRY;
622
623         memset(rdpg, 0, sizeof *rdpg);
624         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
625         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
626         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
627
628         for (i = 0; i < rdpg->rp_npages; i++) {
629                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
630                 if (rdpg->rp_pages[i] == NULL)
631                         GOTO(cleanup, rc = -ENOMEM);
632         }
633
634         LASSERT(ma->ma_valid & MA_LMV);
635         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
636         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
637                 struct lu_fid *lf;
638                 __u64 hash_end;
639
640                 lf = &ma->ma_lmv->mea_ids[i];
641
642                 rdpg->rp_hash = i * hash_segement;
643                 if (i == cmm->cmm_tgt_count)
644                         hash_end = MAX_HASH_SIZE;
645                 else
646                         hash_end = rdpg->rp_hash + hash_segement;
647                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
648                 if (rc) {
649                         CERROR("Error (rc = %d) while splitting for %d: fid="
650                                DFID", %08x:%08x\n", rc, i, PFID(lf),
651                                rdpg->rp_hash, hash_end);
652                         GOTO(cleanup, rc);
653                 }
654         }
655         EXIT;
656 cleanup:
657         for (i = 0; i < rdpg->rp_npages; i++)
658                 if (rdpg->rp_pages[i] != NULL)
659                         __cfs_free_page(rdpg->rp_pages[i]);
660         return rc;
661 }
662
663 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
664 {
665         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
666         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
667         int                rc = 0, split;
668         struct lu_buf     *buf;
669         ENTRY;
670
671         cmm_lprocfs_time_start(env);
672
673         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
674         memset(ma, 0, sizeof(*ma));
675
676         /* Step1: Checking whether the dir needs to be split. */
677         rc = cmm_split_expect(env, mo, ma, &split);
678         if (rc)
679                 GOTO(out, rc);
680
681         if (split != CMM_SPLIT_NEEDED) {
682                 /* No split is needed, caller may proceed with create. */
683                 GOTO(out, rc = 0);
684         }
685
686         /* Split should be done now, let's do it. */
687         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
688               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
689
690         /*
691          * Disable transacrions for split, since there will be so many trans in
692          * this one ops, conflict with current recovery design.
693          */
694         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
695         if (rc) {
696                 CERROR("Can't disable trans for split, rc %d\n", rc);
697                 GOTO(out, rc);
698         }
699
700         /* Step2: Prepare the md memory */
701         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
702         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
703         if (ma->ma_lmv == NULL)
704                 GOTO(out, rc = -ENOMEM);
705
706         /* Step3: Create slave objects and fill the ma->ma_lmv */
707         rc = cmm_split_slaves_create(env, mo, ma);
708         if (rc) {
709                 CERROR("Can't create slaves for split, rc %d\n", rc);
710                 GOTO(cleanup, rc);
711         }
712
713         /* Step4: Scan and split the object. */
714         rc = cmm_split_process_dir(env, mo, ma);
715         if (rc) {
716                 CERROR("Can't scan and split, rc %d\n", rc);
717                 GOTO(cleanup, rc);
718         }
719
720         /* Step5: Set mea to the master object. */
721         LASSERT(ma->ma_valid & MA_LMV);
722         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
723         rc = mo_xattr_set(env, md_object_next(mo), buf,
724                           MDS_LMV_MD_NAME, 0);
725         if (rc) {
726                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
727                 GOTO(cleanup, rc);
728         }
729
730         /* set flag in cmm_object */
731         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
732
733         /*
734          * Finally, split succeed, tell client to repeat opetartion on correct
735          * MDT.
736          */
737         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
738         rc = -ERESTART;
739         EXIT;
740 cleanup:
741         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
742 out:
743         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
744         return rc;
745 }