Whamcloud - gitweb
LU-1347 build: remove the vim/emacs modelines
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lustre/cmm/cmm_split.c
35  *
36  * Lustre splitting dir
37  *
38  * Author: Alex Thomas  <alex@clusterfs.com>
39  * Author: Wang Di      <wangdi@clusterfs.com>
40  * Author: Yury Umanets <umka@clusterfs.com>
41  */
42
43 #ifndef EXPORT_SYMTAB
44 # define EXPORT_SYMTAB
45 #endif
46
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <obd_class.h>
50 #include <lustre_fid.h>
51 #include <lustre_mds.h>
52 #include <lustre/lustre_idl.h>
53 #include "cmm_internal.h"
54 #include "mdc_internal.h"
55
56 /**
57  * \addtogroup split
58  * @{
59  */
60 enum {
61         CMM_SPLIT_SIZE =  128 * 1024
62 };
63
64 /**
65  * This function checks if passed \a name come to correct server (local MDT).
66  *
67  * \param mp Parent directory
68  * \param name Name to lookup
69  * \retval  -ERESTART Let client know that dir was split and client needs to
70  * chose correct stripe.
71  */
72 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
73                     const char *name)
74 {
75         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
76         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
77         struct cml_object *clo = md2cml_obj(mp);
78         int rc, lmv_size;
79         ENTRY;
80
81         cmm_lprocfs_time_start(env);
82
83         /* Not split yet */
84         if (clo->clo_split == CMM_SPLIT_NONE ||
85             clo->clo_split == CMM_SPLIT_DENIED)
86                 GOTO(out, rc = 0);
87
88         lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
89
90         /* Try to get the LMV EA */
91         memset(ma, 0, sizeof(*ma));
92
93         ma->ma_need = MA_LMV;
94         ma->ma_lmv_size = lmv_size;
95         OBD_ALLOC(ma->ma_lmv, lmv_size);
96         if (ma->ma_lmv == NULL)
97                 GOTO(out, rc = -ENOMEM);
98
99         /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
100         rc = mo_attr_get(env, mp, ma);
101         if (rc)
102                 GOTO(cleanup, rc);
103
104         /* No LMV just return */
105         if (!(ma->ma_valid & MA_LMV)) {
106                 /* update split state if unknown */
107                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
108                         clo->clo_split = CMM_SPLIT_NONE;
109                 GOTO(cleanup, rc = 0);
110         }
111
112         /* Skip checking the slave dirs (mea_count is 0) */
113         if (ma->ma_lmv->mea_count != 0) {
114                 int idx;
115
116                 /**
117                  * This gets stripe by name to check the name belongs to master
118                  * dir, otherwise return the -ERESTART
119                  */
120                 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
121
122                 /**
123                  * When client does not know about split, it sends create() to
124                  * the master MDT and master replay back if directory is split.
125                  * So client should orward request to correct MDT. This
126                  * is why we check here if stripe zero or not. Zero stripe means
127                  * master stripe. If stripe calculated from name is not zero -
128                  * return -ERESTART.
129                  */
130                 if (idx != 0)
131                         rc = -ERESTART;
132
133                 /* update split state to DONE if unknown */
134                 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
135                         clo->clo_split = CMM_SPLIT_DONE;
136         } else {
137                 /* split is denied for slave dir */
138                 clo->clo_split = CMM_SPLIT_DENIED;
139         }
140         EXIT;
141 cleanup:
142         OBD_FREE(ma->ma_lmv, lmv_size);
143 out:
144         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
145         return rc;
146 }
147
148 /**
149  * Return preferable access mode to the caller taking into account the split
150  * case and the fact of existing not splittable dirs.
151  */
152 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
153                      mdl_mode_t lm)
154 {
155         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
156         int rc, split;
157         ENTRY;
158
159         memset(ma, 0, sizeof(*ma));
160
161         /*
162          * Check only if we need protection from split.  If not - mdt handles
163          * other cases.
164          */
165         rc = cmm_split_expect(env, mo, ma, &split);
166         if (rc) {
167                 CERROR("Can't check for possible split, rc %d\n", rc);
168                 RETURN(MDL_MINMODE);
169         }
170
171         /*
172          * Do not take PDO lock on non-splittable objects if this is not PW,
173          * this should speed things up a bit.
174          */
175         if (split == CMM_SPLIT_DONE && lm != MDL_PW)
176                 RETURN(MDL_NL);
177
178         /* Protect splitting by exclusive lock. */
179         if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
180                 RETURN(MDL_EX);
181
182         /*
183          * Have no idea about lock mode, let it be what higher layer wants.
184          */
185         RETURN(MDL_MINMODE);
186 }
187
188 /**
189  * Check if split is expected for current thread.
190  *
191  * \param mo Directory to split.
192  * \param ma md attributes.
193  * \param split Flag to save split information.
194  */
195 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
196                      struct md_attr *ma, int *split)
197 {
198         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
199         struct cml_object *clo = md2cml_obj(mo);
200         struct lu_fid root_fid;
201         int rc;
202         ENTRY;
203
204         if (clo->clo_split == CMM_SPLIT_DONE ||
205             clo->clo_split == CMM_SPLIT_DENIED) {
206                 *split = clo->clo_split;
207                 RETURN(0);
208         }
209         /* CMM_SPLIT_UNKNOWN case below */
210
211         /* No need to split root object. */
212         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
213                                               &root_fid);
214         if (rc)
215                 RETURN(rc);
216
217         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
218                 /* update split state */
219                 *split = clo->clo_split == CMM_SPLIT_DENIED;
220                 RETURN(0);
221         }
222
223         /*
224          * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
225          * for this get_attr.
226          */
227         LASSERT(ma->ma_valid == 0);
228         ma->ma_need = MA_INODE | MA_LMV;
229         rc = mo_attr_get(env, mo, ma);
230         if (rc)
231                 RETURN(rc);
232
233         /* No need split for already split object */
234         if (ma->ma_valid & MA_LMV) {
235                 LASSERT(ma->ma_lmv_size > 0);
236                 *split = clo->clo_split = CMM_SPLIT_DONE;
237                 RETURN(0);
238         }
239
240         /* No need split for object whose size < CMM_SPLIT_SIZE */
241         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
242                 *split = clo->clo_split = CMM_SPLIT_NONE;
243                 RETURN(0);
244         }
245
246         *split = clo->clo_split = CMM_SPLIT_NEEDED;
247         RETURN(0);
248 }
249
250 struct cmm_object *cmm_object_find(const struct lu_env *env,
251                                    struct cmm_device *d,
252                                    const struct lu_fid *f)
253 {
254         return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
255 }
256
257 static inline void cmm_object_put(const struct lu_env *env,
258                                   struct cmm_object *o)
259 {
260         lu_object_put(env, &o->cmo_obj.mo_lu);
261 }
262
263 /**
264  * Allocate new FID on passed \a mc for slave object which is going to
265  * create there soon.
266  */
267 static int cmm_split_fid_alloc(const struct lu_env *env,
268                                struct cmm_device *cmm,
269                                struct mdc_device *mc,
270                                struct lu_fid *fid)
271 {
272         int rc;
273         ENTRY;
274
275         LASSERT(cmm != NULL && mc != NULL && fid != NULL);
276
277         cfs_down(&mc->mc_fid_sem);
278
279         /* Alloc new fid on \a mc. */
280         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
281         if (rc > 0)
282                 rc = 0;
283         cfs_up(&mc->mc_fid_sem);
284
285         RETURN(rc);
286 }
287
288 /**
289  * Allocate new slave object on passed \a mc.
290  */
291 static int cmm_split_slave_create(const struct lu_env *env,
292                                   struct cmm_device *cmm,
293                                   struct mdc_device *mc,
294                                   struct lu_fid *fid,
295                                   struct md_attr *ma,
296                                   struct lmv_stripe_md *lmv,
297                                   int lmv_size)
298 {
299         struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
300         struct cmm_object *obj;
301         int rc;
302         ENTRY;
303
304         /* Allocate new fid and store it to @fid */
305         rc = cmm_split_fid_alloc(env, cmm, mc, fid);
306         if (rc) {
307                 CERROR("Can't alloc new fid on "LPU64
308                        ", rc %d\n", mc->mc_num, rc);
309                 RETURN(rc);
310         }
311
312         /* Allocate new object on @mc */
313         obj = cmm_object_find(env, cmm, fid);
314         if (IS_ERR(obj))
315                 RETURN(PTR_ERR(obj));
316
317         memset(spec, 0, sizeof *spec);
318         spec->u.sp_ea.fid = fid;
319         spec->u.sp_ea.eadata = lmv;
320         spec->u.sp_ea.eadatalen = lmv_size;
321         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
322         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
323                               spec, ma);
324         cmm_object_put(env, obj);
325         RETURN(rc);
326 }
327
328 /**
329  * Create so many slaves as number of stripes.
330  * This is called in split time before sending pages to slaves.
331  */
332 static int cmm_split_slaves_create(const struct lu_env *env,
333                                    struct md_object *mo,
334                                    struct md_attr *ma)
335 {
336         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
337         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
338         struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
339         struct mdc_device    *mc, *tmp;
340         struct lmv_stripe_md *lmv;
341         int i = 1, rc = 0;
342         ENTRY;
343
344         /* Init the split MEA */
345         lmv = ma->ma_lmv;
346         lmv->mea_master = cmm->cmm_local_num;
347         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
348         lmv->mea_count = cmm->cmm_tgt_count + 1;
349
350         /*
351          * Store master FID to local node idx number. Local node is always
352          * master and its stripe number if 0.
353          */
354         lmv->mea_ids[0] = *lf;
355
356         memset(slave_lmv, 0, sizeof *slave_lmv);
357         slave_lmv->mea_master = cmm->cmm_local_num;
358         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
359         slave_lmv->mea_count = 0;
360
361         cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
362                 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
363                                             ma, slave_lmv, sizeof(*slave_lmv));
364                 if (rc)
365                         GOTO(cleanup, rc);
366                 i++;
367         }
368         EXIT;
369 cleanup:
370         return rc;
371 }
372
373 static inline int cmm_split_special_entry(struct lu_dirent *ent)
374 {
375         if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
376             !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
377                 return 1;
378         return 0;
379 }
380
381 /**
382  * Convert string to the lu_name structure.
383  */
384 static inline struct lu_name *cmm_name(const struct lu_env *env,
385                                        char *name, int buflen)
386 {
387         struct lu_name *lname;
388         struct cmm_thread_info *cmi;
389
390         LASSERT(buflen > 0);
391         LASSERT(name[buflen - 1] == '\0');
392
393         cmi = cmm_env_info(env);
394         lname = &cmi->cti_name;
395         lname->ln_name = name;
396         /* do NOT count the terminating '\0' of name for length */
397         lname->ln_namelen = buflen - 1;
398         return lname;
399 }
400
401 /**
402  * Helper for cmm_split_remove_page(). It removes one entry from local MDT.
403  * Do not corrupt byte order in page, it will be sent to remote MDT.
404  */
405 static int cmm_split_remove_entry(const struct lu_env *env,
406                                   struct md_object *mo,
407                                   struct lu_dirent *ent)
408 {
409         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
410         struct cmm_thread_info *cmi;
411         struct md_attr *ma;
412         struct cmm_object *obj;
413         int is_dir, rc;
414         char *name;
415         struct lu_name *lname;
416         ENTRY;
417
418         if (cmm_split_special_entry(ent))
419                 RETURN(0);
420
421         fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
422         obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
423         if (IS_ERR(obj))
424                 RETURN(PTR_ERR(obj));
425
426         cmi = cmm_env_info(env);
427         ma = &cmi->cmi_ma;
428
429         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
430                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
431         else
432                 /**
433                  * \note These days only cross-ref dirs are possible, so for the
434                  * sake of simplicity, in split, we suppose that all cross-ref
435                  * names point to directory and do not do additional getattr to
436                  * remote MDT.
437                  */
438                 is_dir = 1;
439
440         OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
441         if (!name)
442                 GOTO(cleanup, rc = -ENOMEM);
443
444         memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
445         lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
446         /**
447          * \note When split, no need update parent's ctime,
448          * and no permission check for name_remove.
449          */
450         ma->ma_attr.la_ctime = 0;
451         if (is_dir)
452                 ma->ma_attr.la_mode = S_IFDIR;
453         else
454                 ma->ma_attr.la_mode = 0;
455         ma->ma_attr.la_valid = LA_MODE;
456         ma->ma_valid = MA_INODE;
457
458         ma->ma_attr_flags |= MDS_PERM_BYPASS;
459         rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
460         OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
461         if (rc)
462                 GOTO(cleanup, rc);
463
464         /**
465          * \note For each entry transferred to the slave MDS we should know
466          * whether this object is dir or not. Therefore the highest bit of the
467          * hash is used to indicate that (it is unused for hash purposes anyway).
468          */
469         if (is_dir) {
470                 ent->lde_hash = le64_to_cpu(ent->lde_hash);
471                 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
472         }
473         EXIT;
474 cleanup:
475         cmm_object_put(env, obj);
476         return rc;
477 }
478
479 /**
480  * Remove all entries from passed page.
481  * These entries are going to remote MDT and thus should be removed locally.
482  */
483 static int cmm_split_remove_page(const struct lu_env *env,
484                                  struct md_object *mo,
485                                  struct lu_rdpg *rdpg,
486                                  __u64 hash_end, __u32 *len)
487 {
488         struct lu_dirpage *dp;
489         struct lu_dirent  *ent;
490         int rc = 0;
491         ENTRY;
492
493         *len = 0;
494         cfs_kmap(rdpg->rp_pages[0]);
495         dp = page_address(rdpg->rp_pages[0]);
496         for (ent = lu_dirent_start(dp);
497              ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
498              ent = lu_dirent_next(ent)) {
499                 rc = cmm_split_remove_entry(env, mo, ent);
500                 if (rc) {
501                         /*
502                          * XXX: Error handler to insert remove name back,
503                          * currently we assumed it will success anyway in
504                          * verfication test.
505                          */
506                         CERROR("Can not del %*.*s, rc %d\n",
507                                le16_to_cpu(ent->lde_namelen),
508                                le16_to_cpu(ent->lde_namelen),
509                                ent->lde_name, rc);
510                         GOTO(unmap, rc);
511                 }
512                 *len += lu_dirent_size(ent);
513         }
514
515         if (ent != lu_dirent_start(dp))
516                 *len += sizeof(struct lu_dirpage);
517         EXIT;
518 unmap:
519         cfs_kunmap(rdpg->rp_pages[0]);
520         return rc;
521 }
522
523 /**
524  * Send one page of entries to the slave MDT.
525  * This page contains entries to be created there.
526  */
527 static int cmm_split_send_page(const struct lu_env *env,
528                                struct md_object *mo,
529                                struct lu_rdpg *rdpg,
530                                struct lu_fid *fid, int len)
531 {
532         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
533         struct cmm_object *obj;
534         int rc = 0;
535         ENTRY;
536
537         obj = cmm_object_find(env, cmm, fid);
538         if (IS_ERR(obj))
539                 RETURN(PTR_ERR(obj));
540
541         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
542                            rdpg->rp_pages[0], len);
543         cmm_object_put(env, obj);
544         RETURN(rc);
545 }
546
547 /** Read one page of entries from local MDT. */
548 static int cmm_split_read_page(const struct lu_env *env,
549                                struct md_object *mo,
550                                struct lu_rdpg *rdpg)
551 {
552         int rc;
553         ENTRY;
554         memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
555         cfs_kunmap(rdpg->rp_pages[0]);
556         rc = mo_readpage(env, md_object_next(mo), rdpg);
557         RETURN(rc);
558 }
559
560 /**
561  * This function performs migration of each directory stripe to its MDS.
562  */
563 static int cmm_split_process_stripe(const struct lu_env *env,
564                                     struct md_object *mo,
565                                     struct lu_rdpg *rdpg,
566                                     struct lu_fid *lf,
567                                     __u64 end)
568 {
569         int rc, done = 0;
570         ENTRY;
571
572         LASSERT(rdpg->rp_npages == 1);
573         do {
574                 struct lu_dirpage *ldp;
575                 __u32 len = 0;
576
577                 /** - Read one page of entries from local MDT. */
578                 rc = cmm_split_read_page(env, mo, rdpg);
579                 if (rc) {
580                         CERROR("Error in readpage: %d\n", rc);
581                         RETURN(rc);
582                 }
583
584                 /** - Remove local entries which are going to remite MDT. */
585                 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
586                 if (rc) {
587                         CERROR("Error in remove stripe entries: %d\n", rc);
588                         RETURN(rc);
589                 }
590
591                 /**
592                  * - Send entries page to slave MDT and repeat while there are
593                  * more pages.
594                  */
595                 if (len > 0) {
596                         rc = cmm_split_send_page(env, mo, rdpg, lf, len);
597                         if (rc) {
598                                 CERROR("Error in sending page: %d\n", rc);
599                                 RETURN(rc);
600                         }
601                 }
602
603                 cfs_kmap(rdpg->rp_pages[0]);
604                 ldp = page_address(rdpg->rp_pages[0]);
605                 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
606                         done = 1;
607
608                 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
609                 cfs_kunmap(rdpg->rp_pages[0]);
610         } while (!done);
611
612         RETURN(rc);
613 }
614
615 /**
616  * Directory scanner for split operation.
617  *
618  * It calculates hashes for names and organizes files to stripes.
619  */
620 static int cmm_split_process_dir(const struct lu_env *env,
621                                  struct md_object *mo,
622                                  struct md_attr *ma)
623 {
624         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
625         struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
626         __u64 hash_segment;
627         int rc = 0, i;
628         ENTRY;
629
630         memset(rdpg, 0, sizeof *rdpg);
631         rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
632         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
633         rdpg->rp_pages  = cmm_env_info(env)->cmi_pages;
634
635         for (i = 0; i < rdpg->rp_npages; i++) {
636                 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
637                 if (rdpg->rp_pages[i] == NULL)
638                         GOTO(cleanup, rc = -ENOMEM);
639         }
640
641         hash_segment = MAX_HASH_SIZE;
642         /** Whole hash range is divided on segments by number of MDS-es. */
643         do_div(hash_segment, cmm->cmm_tgt_count + 1);
644         /**
645          * For each segment the cmm_split_process_stripe() is called to move
646          * entries on new server.
647          */
648         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
649                 struct lu_fid *lf;
650                 __u64 hash_end;
651
652                 lf = &ma->ma_lmv->mea_ids[i];
653
654                 rdpg->rp_hash = i * hash_segment;
655                 if (i == cmm->cmm_tgt_count)
656                         hash_end = MAX_HASH_SIZE;
657                 else
658                         hash_end = rdpg->rp_hash + hash_segment;
659                 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
660                 if (rc) {
661                         CERROR("Error (rc = %d) while splitting for %d: fid="
662                                DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
663                                rdpg->rp_hash, hash_end);
664                         GOTO(cleanup, rc);
665                 }
666         }
667         EXIT;
668 cleanup:
669         for (i = 0; i < rdpg->rp_npages; i++)
670                 if (rdpg->rp_pages[i] != NULL)
671                         cfs_free_page(rdpg->rp_pages[i]);
672         return rc;
673 }
674
675 /**
676  * Directory splitting.
677  *
678  * Big directory can be split eventually.
679  */
680 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
681 {
682         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
683         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
684         int                rc = 0, split;
685         struct lu_buf     *buf;
686         ENTRY;
687
688         cmm_lprocfs_time_start(env);
689
690         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
691         memset(ma, 0, sizeof(*ma));
692
693         /** - Step1: Checking whether the dir needs to be split. */
694         rc = cmm_split_expect(env, mo, ma, &split);
695         if (rc)
696                 GOTO(out, rc);
697
698         if (split != CMM_SPLIT_NEEDED) {
699                 /* No split is needed, caller may proceed with create. */
700                 GOTO(out, rc = 0);
701         }
702
703         /* Split should be done now, let's do it. */
704         CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
705               PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
706
707         /**
708          * /note Disable transactions for split, since there will be so many trans in
709          * this one ops, conflict with current recovery design.
710          */
711         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
712         if (rc) {
713                 CERROR("Can't disable trans for split, rc %d\n", rc);
714                 GOTO(out, rc);
715         }
716
717         /** - Step2: Prepare the md memory */
718         ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
719         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
720         if (ma->ma_lmv == NULL)
721                 GOTO(out, rc = -ENOMEM);
722
723         /** - Step3: Create slave objects and fill the ma->ma_lmv */
724         rc = cmm_split_slaves_create(env, mo, ma);
725         if (rc) {
726                 CERROR("Can't create slaves for split, rc %d\n", rc);
727                 GOTO(cleanup, rc);
728         }
729
730         /** - Step4: Scan and split the object. */
731         rc = cmm_split_process_dir(env, mo, ma);
732         if (rc) {
733                 CERROR("Can't scan and split, rc %d\n", rc);
734                 GOTO(cleanup, rc);
735         }
736
737         /** - Step5: Set mea to the master object. */
738         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
739         rc = mo_xattr_set(env, md_object_next(mo), buf,
740                           MDS_LMV_MD_NAME, 0);
741         if (rc) {
742                 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
743                 GOTO(cleanup, rc);
744         }
745
746         /* set flag in cmm_object */
747         md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
748
749         /**
750          * - Finally, split succeed, tell client to repeat opetartion on correct
751          * MDT.
752          */
753         CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
754         rc = -ERESTART;
755         EXIT;
756 cleanup:
757         OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
758 out:
759         cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);
760         return rc;
761 }
762 /** @} */