4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lustre/cmm/cmm_split.c
36 * Lustre splitting dir
38 * Author: Alex Thomas <alex@clusterfs.com>
39 * Author: Wang Di <wangdi@clusterfs.com>
40 * Author: Yury Umanets <umka@clusterfs.com>
43 #define DEBUG_SUBSYSTEM S_MDS
45 #include <obd_class.h>
46 #include <lustre_fid.h>
47 #include <lustre_mds.h>
48 #include <lustre/lustre_idl.h>
49 #include "cmm_internal.h"
50 #include "mdc_internal.h"
57 CMM_SPLIT_SIZE = 128 * 1024
61 * This function checks whether the passed \a name came to the correct server
63 * \param mp Parent directory
64 * \param name Name to lookup
65 * \retval -ERESTART Let client know that dir was split and client needs to
66 * choose the correct stripe.
/*
 * Decide whether \a name may be served by this (master) MDT, or whether the
 * client must resend the request to the stripe that owns the name.
 *
 * NOTE(review): this extract is missing lines (braces, local declarations and
 * RETURN/GOTO targets were dropped) — verify against upstream
 * lustre/cmm/cmm_split.c before compiling.
 */
68 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
71 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
72 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
73 struct cml_object *clo = md2cml_obj(mp);
77 cmm_lprocfs_time_start(env);
/* Fast path: split state already cached as NONE or DENIED — nothing to do. */
80 if (clo->clo_split == CMM_SPLIT_NONE ||
81 clo->clo_split == CMM_SPLIT_DENIED)
/* One MEA slot per remote target plus one for the local (master) MDT. */
84 lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
86 /* Try to get the LMV EA */
87 memset(ma, 0, sizeof(*ma));
90 ma->ma_lmv_size = lmv_size;
91 OBD_ALLOC(ma->ma_lmv, lmv_size);
92 if (ma->ma_lmv == NULL)
93 GOTO(out, rc = -ENOMEM);
95 /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
96 rc = mo_attr_get(env, mp, ma);
100 /* No LMV EA present: the directory was never split — just return. */
101 if (!(ma->ma_valid & MA_LMV)) {
102 /* update split state if unknown */
103 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
104 clo->clo_split = CMM_SPLIT_NONE;
105 GOTO(cleanup, rc = 0);
108 /* Skip checking the slave dirs (mea_count is 0) */
109 if (ma->ma_lmv->mea_count != 0) {
113 * This gets stripe by name to check the name belongs to master
114 * dir, otherwise return the -ERESTART
116 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
119 * When client does not know about split, it sends create() to
120 * the master MDT and master replay back if directory is split.
121 * So client should forward request to correct MDT. This
122 * is why we check here if stripe zero or not. Zero stripe means
123 * master stripe. If stripe calculated from name is not zero -
129 /* update split state to DONE if unknown */
130 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131 clo->clo_split = CMM_SPLIT_DONE;
133 /* split is denied for slave dir */
134 clo->clo_split = CMM_SPLIT_DENIED;
138 OBD_FREE(ma->ma_lmv, lmv_size);
140 cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
145 * Return preferable access mode to the caller taking into account the split
146 * case and the fact of existing not splittable dirs.
/*
 * Choose the preferable lock mode for an access to \a mo, taking the split
 * state into account (uses cmm_split_expect() to classify the directory).
 *
 * NOTE(review): extract is missing lines (RETURN paths and the lock-mode
 * parameter declaration were dropped) — verify against upstream source.
 */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
151 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
155 memset(ma, 0, sizeof(*ma));
158 * Check only if we need protection from split. If not - mdt handles
161 rc = cmm_split_expect(env, mo, ma, &split);
163 CERROR("Can't check for possible split, rc %d\n", rc);
168 * Do not take PDO lock on non-splittable objects if this is not PW,
169 * this should speed things up a bit.
171 if (split == CMM_SPLIT_DONE && lm != MDL_PW)
174 /* Protect splitting by exclusive lock. */
175 if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
179 * Have no idea about lock mode, let it be what higher layer wants.
185 * Check if split is expected for current thread.
187 * \param mo Directory to split.
188 * \param ma md attributes.
189 * \param split Flag to save split information.
191 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
192 struct md_attr *ma, int *split)
194 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
195 struct cml_object *clo = md2cml_obj(mo);
196 struct lu_fid root_fid;
200 if (clo->clo_split == CMM_SPLIT_DONE ||
201 clo->clo_split == CMM_SPLIT_DENIED) {
202 *split = clo->clo_split;
205 /* CMM_SPLIT_UNKNOWN case below */
207 /* No need to split root object. */
208 rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
213 if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
214 /* update split state */
215 *split = clo->clo_split == CMM_SPLIT_DENIED;
220 * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
223 LASSERT(ma->ma_valid == 0);
224 ma->ma_need = MA_INODE | MA_LMV;
225 rc = mo_attr_get(env, mo, ma);
229 /* No need split for already split object */
230 if (ma->ma_valid & MA_LMV) {
231 LASSERT(ma->ma_lmv_size > 0);
232 *split = clo->clo_split = CMM_SPLIT_DONE;
236 /* No need split for object whose size < CMM_SPLIT_SIZE */
237 if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
238 *split = clo->clo_split = CMM_SPLIT_NONE;
242 *split = clo->clo_split = CMM_SPLIT_NEEDED;
246 struct cmm_object *cmm_object_find(const struct lu_env *env,
247 struct cmm_device *d,
248 const struct lu_fid *f)
250 return md2cmm_obj(md_object_find_slice(env, &d->cmm_md_dev, fid));
253 static inline void cmm_object_put(const struct lu_env *env,
254 struct cmm_object *o)
256 lu_object_put(env, &o->cmo_obj.mo_lu);
260 * Allocate new FID on passed \a mc for slave object which is going to
/*
 * Allocate a new FID on slave device \a mc for a slave object, serialized by
 * mc_fid_sem.
 *
 * NOTE(review): extract is missing lines (the fid parameter declaration, rc
 * declaration and RETURN were dropped) — verify against upstream source.
 */
263 static int cmm_split_fid_alloc(const struct lu_env *env,
264 struct cmm_device *cmm,
265 struct mdc_device *mc,
271 LASSERT(cmm != NULL && mc != NULL && fid != NULL);
/* Serialize FID allocation on this slave with mc_fid_sem. */
273 cfs_down(&mc->mc_fid_sem);
275 /* Alloc new fid on \a mc. */
276 rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
279 cfs_up(&mc->mc_fid_sem);
285 * Allocate new slave object on passed \a mc.
/*
 * Allocate a new slave object on target \a mc: allocate a FID for it, then
 * create the remote object with the slave LMV EA attached via md_op_spec.
 *
 * NOTE(review): extract is missing lines (some parameters, error checks and
 * RETURN were dropped) — verify against upstream source.
 */
287 static int cmm_split_slave_create(const struct lu_env *env,
288 struct cmm_device *cmm,
289 struct mdc_device *mc,
292 struct lmv_stripe_md *lmv,
295 struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
296 struct cmm_object *obj;
300 /* Allocate new fid and store it to @fid */
301 rc = cmm_split_fid_alloc(env, cmm, mc, fid);
303 CERROR("Can't alloc new fid on "LPU64
304 ", rc %d\n", mc->mc_num, rc);
308 /* Allocate new object on @mc */
309 obj = cmm_object_find(env, cmm, fid);
311 RETURN(PTR_ERR(obj));
/* Describe the slave object: its FID plus the slave LMV EA payload. */
313 memset(spec, 0, sizeof *spec);
314 spec->u.sp_ea.fid = fid;
315 spec->u.sp_ea.eadata = lmv;
316 spec->u.sp_ea.eadatalen = lmv_size;
317 spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
318 rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
320 cmm_object_put(env, obj);
325 * Create so many slaves as number of stripes.
326 * This is called in split time before sending pages to slaves.
/*
 * Create one slave object per stripe (one per remote target) and fill the
 * master MEA: stripe 0 is the local master dir, stripes 1..tgt_count are
 * freshly allocated slave objects.
 *
 * NOTE(review): extract is missing lines (lmv initialization from ma, loop
 * index, error handling and RETURN were dropped) — verify against upstream.
 */
328 static int cmm_split_slaves_create(const struct lu_env *env,
329 struct md_object *mo,
332 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
333 struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
334 struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
335 struct mdc_device *mc, *tmp;
336 struct lmv_stripe_md *lmv;
340 /* Init the split MEA */
342 lmv->mea_master = cmm->cmm_local_num;
343 lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
344 lmv->mea_count = cmm->cmm_tgt_count + 1;
347 * Store master FID to local node idx number. Local node is always
348 * master and its stripe number is 0.
350 lmv->mea_ids[0] = *lf;
/* Slave EA template: same master/magic, mea_count 0 marks a slave dir. */
352 memset(slave_lmv, 0, sizeof *slave_lmv);
353 slave_lmv->mea_master = cmm->cmm_local_num;
354 slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
355 slave_lmv->mea_count = 0;
357 cfs_list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
358 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
359 ma, slave_lmv, sizeof(*slave_lmv));
/*
 * Return non-zero when \a ent is "." or ".." — these entries stay on the
 * master and must not be migrated. lde_namelen is on-disk little-endian.
 *
 * NOTE(review): braces and return statements were dropped from this extract.
 */
369 static inline int cmm_split_special_entry(struct lu_dirent *ent)
371 if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
372 !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
378 * Convert string to the lu_name structure.
/*
 * Wrap a NUL-terminated string into the per-thread lu_name structure.
 * \param name   NUL-terminated string; buffer stays owned by the caller.
 * \param buflen Buffer length including the terminating '\0'.
 *
 * NOTE(review): the RETURN(lname) line and braces were dropped from this
 * extract — verify against upstream source.
 */
380 static inline struct lu_name *cmm_name(const struct lu_env *env,
381 char *name, int buflen)
383 struct lu_name *lname;
384 struct cmm_thread_info *cmi;
387 LASSERT(name[buflen - 1] == '\0');
389 cmi = cmm_env_info(env);
/* lu_name lives in thread context: valid only until the next cmm_name(). */
390 lname = &cmi->cti_name;
391 lname->ln_name = name;
392 /* do NOT count the terminating '\0' of name for length */
393 lname->ln_namelen = buflen - 1;
398 * Helper for cmm_split_remove_page(). It removes one entry from local MDT.
399 * Do not corrupt byte order in page, it will be sent to remote MDT.
/*
 * Helper for cmm_split_remove_page(): remove one directory entry from the
 * local MDT while keeping the page itself (still little-endian) intact for
 * transfer to the remote MDT. "." and ".." are skipped.
 *
 * NOTE(review): extract is missing lines (is_dir/rc/name declarations, the
 * if/else structure around la_mode, and RETURN were dropped) — verify
 * against upstream source.
 */
401 static int cmm_split_remove_entry(const struct lu_env *env,
402 struct md_object *mo,
403 struct lu_dirent *ent)
405 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
406 struct cmm_thread_info *cmi;
408 struct cmm_object *obj;
411 struct lu_name *lname;
414 if (cmm_split_special_entry(ent))
/* lde_fid is stored little-endian on the page; convert before lookup. */
417 fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
418 obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
420 RETURN(PTR_ERR(obj));
422 cmi = cmm_env_info(env);
425 if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
426 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
429 * \note These days only cross-ref dirs are possible, so for the
430 * sake of simplicity, in split, we suppose that all cross-ref
431 * names point to directory and do not do additional getattr to
/* +1 for the NUL terminator that cmm_name() asserts on. */
436 OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
438 GOTO(cleanup, rc = -ENOMEM);
440 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
441 lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
443 * \note When split, no need update parent's ctime,
444 * and no permission check for name_remove.
446 ma->ma_attr.la_ctime = 0;
448 ma->ma_attr.la_mode = S_IFDIR;
450 ma->ma_attr.la_mode = 0;
451 ma->ma_attr.la_valid = LA_MODE;
452 ma->ma_valid = MA_INODE;
454 ma->ma_attr_flags |= MDS_PERM_BYPASS;
455 rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
456 OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
461 * \note For each entry transferred to the slave MDS we should know
462 * whether this object is dir or not. Therefore the highest bit of the
463 * hash is used to indicate that (it is unused for hash purposes anyway).
466 ent->lde_hash = le64_to_cpu(ent->lde_hash);
467 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
471 cmm_object_put(env, obj);
476 * Remove all entries from passed page.
477 * These entries are going to remote MDT and thus should be removed locally.
/*
 * Remove from the local MDT every entry on page 0 of \a rdpg whose hash is
 * below \a hash_end; those entries will be re-created on the remote MDT.
 * \param len Out: accumulated byte size of the removed entries (plus the
 *            lu_dirpage header when at least one entry was processed).
 *
 * NOTE(review): extract is missing lines (rc declaration, error-branch
 * braces and RETURN were dropped) — verify against upstream source.
 */
479 static int cmm_split_remove_page(const struct lu_env *env,
480 struct md_object *mo,
481 struct lu_rdpg *rdpg,
482 __u64 hash_end, __u32 *len)
484 struct lu_dirpage *dp;
485 struct lu_dirent *ent;
490 cfs_kmap(rdpg->rp_pages[0]);
491 dp = page_address(rdpg->rp_pages[0]);
/* Walk entries in hash order until hash_end (hashes are little-endian). */
492 for (ent = lu_dirent_start(dp);
493 ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
494 ent = lu_dirent_next(ent)) {
495 rc = cmm_split_remove_entry(env, mo, ent);
498 * XXX: Error handler to insert remove name back,
499 * currently we assumed it will success anyway in
502 CERROR("Can not del %*.*s, rc %d\n",
503 le16_to_cpu(ent->lde_namelen),
504 le16_to_cpu(ent->lde_namelen),
508 *len += lu_dirent_size(ent);
/* Count the page header only if at least one entry was consumed. */
511 if (ent != lu_dirent_start(dp))
512 *len += sizeof(struct lu_dirpage);
515 cfs_kunmap(rdpg->rp_pages[0]);
520 * Send one page of entries to the slave MDT.
521 * This page contains entries to be created there.
/*
 * Send page 0 of \a rdpg (\a len bytes of entries to be re-created) to the
 * slave MDT that owns object \a fid.
 *
 * NOTE(review): extract is missing lines (rc declaration, IS_ERR check and
 * RETURN were dropped) — verify against upstream source.
 */
523 static int cmm_split_send_page(const struct lu_env *env,
524 struct md_object *mo,
525 struct lu_rdpg *rdpg,
526 struct lu_fid *fid, int len)
528 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
529 struct cmm_object *obj;
533 obj = cmm_object_find(env, cmm, fid);
535 RETURN(PTR_ERR(obj));
537 rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
538 rdpg->rp_pages[0], len);
539 cmm_object_put(env, obj);
543 /** Read one page of entries from local MDT. */
/*
 * Read one page of directory entries from the local MDT into page 0 of
 * \a rdpg, zeroing the page first so stale data never leaks to the slave.
 *
 * NOTE(review): braces, rc declaration and RETURN were dropped from this
 * extract — verify against upstream source.
 */
544 static int cmm_split_read_page(const struct lu_env *env,
545 struct md_object *mo,
546 struct lu_rdpg *rdpg)
550 memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
551 cfs_kunmap(rdpg->rp_pages[0]);
552 rc = mo_readpage(env, md_object_next(mo), rdpg);
557 * This function performs migration of each directory stripe to its MDS.
/*
 * Migrate one hash segment [rp_hash, end) to its slave MDT: loop reading a
 * page locally, removing its entries here, and sending the page to the
 * slave, until the page's hash_end reaches \a end.
 *
 * NOTE(review): extract is missing lines (the lf/end parameters, loop
 * construct, len/rc declarations and RETURN were dropped) — verify against
 * upstream source.
 */
559 static int cmm_split_process_stripe(const struct lu_env *env,
560 struct md_object *mo,
561 struct lu_rdpg *rdpg,
568 LASSERT(rdpg->rp_npages == 1);
570 struct lu_dirpage *ldp;
573 /** - Read one page of entries from local MDT. */
574 rc = cmm_split_read_page(env, mo, rdpg);
576 CERROR("Error in readpage: %d\n", rc);
580 /** - Remove local entries which are going to remote MDT. */
581 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
583 CERROR("Error in remove stripe entries: %d\n", rc);
588 * - Send entries page to slave MDT and repeat while there are
592 rc = cmm_split_send_page(env, mo, rdpg, lf, len);
594 CERROR("Error in sending page: %d\n", rc);
/* Advance to the next page: continue from this page's ending hash. */
599 cfs_kmap(rdpg->rp_pages[0]);
600 ldp = page_address(rdpg->rp_pages[0]);
601 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
604 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
605 cfs_kunmap(rdpg->rp_pages[0]);
612 * Directory scanner for split operation.
614 * It calculates hashes for names and organizes files to stripes.
/*
 * Directory scanner for the split: divide the hash space into tgt_count+1
 * equal segments (segment 0 stays on the master) and migrate each remaining
 * segment to the slave named by ma->ma_lmv->mea_ids[i].
 *
 * NOTE(review): extract is missing lines (the ma parameter, i/rc/lf/
 * hash_segment/hash_end declarations, else branch and RETURN were dropped)
 * — verify against upstream source.
 */
616 static int cmm_split_process_dir(const struct lu_env *env,
617 struct md_object *mo,
620 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
621 struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
626 memset(rdpg, 0, sizeof *rdpg);
627 rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
628 rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
629 rdpg->rp_pages = cmm_env_info(env)->cmi_pages;
631 for (i = 0; i < rdpg->rp_npages; i++) {
632 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
633 if (rdpg->rp_pages[i] == NULL)
634 GOTO(cleanup, rc = -ENOMEM);
637 hash_segment = MAX_HASH_SIZE;
638 /** Whole hash range is divided on segments by number of MDS-es. */
639 do_div(hash_segment, cmm->cmm_tgt_count + 1);
641 * For each segment the cmm_split_process_stripe() is called to move
642 * entries on new server.
644 for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
648 lf = &ma->ma_lmv->mea_ids[i];
650 rdpg->rp_hash = i * hash_segment;
/* Last segment absorbs any rounding remainder up to MAX_HASH_SIZE. */
651 if (i == cmm->cmm_tgt_count)
652 hash_end = MAX_HASH_SIZE;
654 hash_end = rdpg->rp_hash + hash_segment;
655 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
657 CERROR("Error (rc = %d) while splitting for %d: fid="
658 DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
659 rdpg->rp_hash, hash_end);
665 for (i = 0; i < rdpg->rp_npages; i++)
666 if (rdpg->rp_pages[i] != NULL)
667 cfs_free_page(rdpg->rp_pages[i]);
672 * Directory splitting.
674 * Big directory can be split eventually.
/*
 * Top-level directory split entry point. Steps: check need, disable
 * transactions, allocate the MEA buffer, create slave objects, migrate the
 * entries, then store the MEA on the master and mark the object DONE.
 * Returns -ERESTART semantics to the client via the caller once split
 * completes (see the trailing comment).
 *
 * NOTE(review): extract is missing lines (rc/split/buf declarations, GOTO
 * targets and RETURN were dropped) — verify against upstream source.
 */
676 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
678 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
679 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
684 cmm_lprocfs_time_start(env);
686 LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
687 memset(ma, 0, sizeof(*ma));
689 /** - Step1: Checking whether the dir needs to be split. */
690 rc = cmm_split_expect(env, mo, ma, &split);
694 if (split != CMM_SPLIT_NEEDED) {
695 /* No split is needed, caller may proceed with create. */
699 /* Split should be done now, let's do it. */
700 CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
701 PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
704 * \note Disable transactions for split, since there will be so many trans in
705 * this one ops, conflict with current recovery design.
707 rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS, NULL);
709 CERROR("Can't disable trans for split, rc %d\n", rc);
713 /** - Step2: Prepare the md memory */
714 ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
715 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
716 if (ma->ma_lmv == NULL)
717 GOTO(out, rc = -ENOMEM);
719 /** - Step3: Create slave objects and fill the ma->ma_lmv */
720 rc = cmm_split_slaves_create(env, mo, ma);
722 CERROR("Can't create slaves for split, rc %d\n", rc);
726 /** - Step4: Scan and split the object. */
727 rc = cmm_split_process_dir(env, mo, ma);
729 CERROR("Can't scan and split, rc %d\n", rc);
733 /** - Step5: Set mea to the master object. */
734 buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
735 rc = mo_xattr_set(env, md_object_next(mo), buf,
738 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
742 /* set flag in cmm_object */
743 md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
746 * - Finally, split succeeded, tell client to repeat operation on correct
749 CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
753 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
755 cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);