1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/cmm/cmm_split.c
38 * Lustre splitting dir
40 * Author: Alex Thomas <alex@clusterfs.com>
41 * Author: Wang Di <wangdi@clusterfs.com>
42 * Author: Yury Umanets <umka@clusterfs.com>
46 # define EXPORT_SYMTAB
49 #define DEBUG_SUBSYSTEM S_MDS
51 #include <obd_class.h>
52 #include <lustre_fid.h>
53 #include <lustre_mds.h>
54 #include <lustre/lustre_idl.h>
55 #include "cmm_internal.h"
56 #include "mdc_internal.h"
59 CMM_SPLIT_SIZE = 128 * 1024
63 * This function checks if passed @name come to correct server (local MDT). If
64 * not - return -ERESTART and let client know that dir was split and client
65 * needs to choose the correct stripe.
/*
 * Check that an operation on @name arrived at the correct MDT for a
 * possibly-split directory @mp.  Per the comment above, a miss returns
 * -ERESTART so the client re-targets the right stripe.
 * NOTE(review): this chunk is truncated -- braces, local declarations and
 * several statements between the numbered lines are missing; comments only
 * were added, code left byte-identical.
 */
67 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
70 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
71 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
72 struct cml_object *clo = md2cml_obj(mp);
76 cmm_lprocfs_time_start(env);
/* Fast path: split state already known to be "none" or "denied". */
79 if (clo->clo_split == CMM_SPLIT_NONE ||
80 clo->clo_split == CMM_SPLIT_DENIED)
/* Buffer sized for local MDT plus all remote targets. */
83 lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
85 /* Try to get the LMV EA */
86 memset(ma, 0, sizeof(*ma));
89 ma->ma_lmv_size = lmv_size;
90 OBD_ALLOC(ma->ma_lmv, lmv_size);
91 if (ma->ma_lmv == NULL)
92 GOTO(out, rc = -ENOMEM);
94 /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
95 rc = mo_attr_get(env, mp, ma);
99 /* No LMV just return */
100 if (!(ma->ma_valid & MA_LMV)) {
101 /* update split state if unknown */
102 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
103 clo->clo_split = CMM_SPLIT_NONE;
104 GOTO(cleanup, rc = 0);
107 /* Skip checking the slave dirs (mea_count is 0) */
108 if (ma->ma_lmv->mea_count != 0) {
112 * Get stripe by name to check the name belongs to master dir,
113 * otherwise return the -ERESTART
115 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
118 * Check if name came to correct MDT server. We suppose that if
119 * client does not know about split, it sends create operation
120 * to master MDT. And this is master job to say it that dir got
121 * split and client should forward request to correct MDT. This
122 * is why we check here if stripe zero or not. Zero stripe means
123 * master stripe. If stripe calculated from name is not zero -
129 /* update split state to DONE if unknown */
130 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
131 clo->clo_split = CMM_SPLIT_DONE;
133 /* split is denied for slave dir */
134 clo->clo_split = CMM_SPLIT_DENIED;
/* Cleanup path: release the LMV buffer and stop the lprocfs timer. */
138 OBD_FREE(ma->ma_lmv, lmv_size);
140 cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
145 * Return preferable access mode to caller taking into account possible split
146 * and the fact that non-splittable dirs may exist in principle.
/*
 * Decide the preferable lock mode for @mo, taking a possible split into
 * account (see the comment above this function).
 * NOTE(review): truncated chunk -- the RETURN statements for each branch
 * are missing from this extraction; comments only were added.
 */
148 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
151 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
155 memset(ma, 0, sizeof(*ma));
158 * Check only if we need protection from split. If not - mdt handles
161 rc = cmm_split_expect(env, mo, ma, &split);
163 CERROR("Can't check for possible split, rc %d\n", rc);
168 * Do not take PDO lock on non-splittable objects if this is not PW,
169 * this should speed things up a bit.
171 if (split == CMM_SPLIT_DONE && lm != MDL_PW)
174 /* Protect splitting by exclusive lock. */
175 if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
179 * Have no idea about lock mode, let it be what higher layer wants.
184 /* Check if split is expected for current thread. */
/*
 * Determine whether @mo should be split, caching the decision in
 * clo->clo_split and reporting it through @split.
 * NOTE(review): truncated chunk -- braces, RETURNs and part of the
 * mdo_root_get() call are missing; comments only were added.
 */
185 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
186 struct md_attr *ma, int *split)
188 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
189 struct cml_object *clo = md2cml_obj(mo);
190 struct lu_fid root_fid;
/* Cached terminal states need no re-evaluation. */
194 if (clo->clo_split == CMM_SPLIT_DONE ||
195 clo->clo_split == CMM_SPLIT_DENIED) {
196 *split = clo->clo_split;
199 /* CMM_SPLIT_UNKNOWN case below */
201 /* No need to split root object. */
202 rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
207 if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
208 /* update split state */
/*
 * NOTE(review): `==` below looks like a typo for `=` -- the sibling
 * branches further down use the chained assignment form
 * `*split = clo->clo_split = ...`; as written this stores the boolean
 * comparison result and never updates clo_split.  Confirm against the
 * upstream source before changing.
 */
209 *split = clo->clo_split == CMM_SPLIT_DENIED;
214 * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
217 LASSERT(ma->ma_valid == 0);
218 ma->ma_need = MA_INODE | MA_LMV;
219 rc = mo_attr_get(env, mo, ma);
223 /* No need split for already split object */
224 if (ma->ma_valid & MA_LMV) {
225 LASSERT(ma->ma_lmv_size > 0);
226 *split = clo->clo_split = CMM_SPLIT_DONE;
230 /* No need split for object whose size < CMM_SPLIT_SIZE */
231 if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
232 *split = clo->clo_split = CMM_SPLIT_NONE;
/* Otherwise the directory is big enough: split is needed. */
236 *split = clo->clo_split = CMM_SPLIT_NEEDED;
/*
 * Look up (and reference) the cmm_object for FID @f on device @d via the
 * lu_object cache, then locate the CMM slice in the object's header.
 * NOTE(review): truncated chunk -- the IS_ERR() check and RETURN are
 * missing from this extraction; comments only were added.
 */
240 struct cmm_object *cmm_object_find(const struct lu_env *env,
241 struct cmm_device *d,
242 const struct lu_fid *f)
245 struct cmm_object *m;
248 o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
/* Presumably the error case: propagate the ERR_PTR as-is -- TODO confirm. */
250 m = (struct cmm_object *)o;
/* Normal case: pick out this device type's slice of the compound object. */
252 m = lu2cmm_obj(lu_object_locate(o->lo_header,
253 d->cmm_md_dev.md_lu_dev.ld_type));
/* Drop the reference taken by cmm_object_find(). */
257 static inline void cmm_object_put(const struct lu_env *env,
258 struct cmm_object *o)
260 lu_object_put(env, &o->cmo_obj.mo_lu);
264 * Allocate a new fid on passed @mc for the slave object which is going to be created there
/*
 * Allocate a new FID on remote target @mc for a slave object, registering
 * the new sequence with FLD when required.  mc_fid_sem serializes
 * allocations on the same target.
 * NOTE(review): truncated chunk -- the up(&mc->mc_fid_sem) release and
 * RETURN are missing from this extraction; comments only were added.
 */
267 static int cmm_split_fid_alloc(const struct lu_env *env,
268 struct cmm_device *cmm,
269 struct mdc_device *mc,
275 LASSERT(cmm != NULL && mc != NULL && fid != NULL);
277 down(&mc->mc_fid_sem);
279 /* Alloc new fid on @mc. */
280 rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
282 /* Setup FLD for new sequence if needed. */
283 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
286 CERROR("Can't create fld entry, rc %d\n", rc);
293 /* Allocate new slave object on passed @mc */
/*
 * Create one slave object on remote target @mc: allocate its FID, find
 * the object and issue mo_object_create() with MDS_CREATE_SLAVE_OBJ and
 * the slave LMV EA attached via the creation spec.
 * NOTE(review): truncated chunk -- error-branch braces and the final
 * RETURN are missing; comments only were added.
 */
294 static int cmm_split_slave_create(const struct lu_env *env,
295 struct cmm_device *cmm,
296 struct mdc_device *mc,
299 struct lmv_stripe_md *lmv,
302 struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
303 struct cmm_object *obj;
307 /* Allocate new fid and store it to @fid */
308 rc = cmm_split_fid_alloc(env, cmm, mc, fid);
310 CERROR("Can't alloc new fid on "LPU64
311 ", rc %d\n", mc->mc_num, rc);
315 /* Allocate new object on @mc */
316 obj = cmm_object_find(env, cmm, fid);
318 RETURN(PTR_ERR(obj));
/* Fill the creation spec: slave EA data plus the slave flag. */
320 memset(spec, 0, sizeof *spec);
321 spec->u.sp_ea.fid = fid;
322 spec->u.sp_ea.eadata = lmv;
323 spec->u.sp_ea.eadatalen = lmv_size;
324 spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
325 rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
327 cmm_object_put(env, obj);
332 * Create as many slaves as the number of stripes. This is called at split time
333 * before sending pages to slaves.
/*
 * Create one slave object per remote target (stripe 0 stays local as the
 * master) and fill the master LMV/MEA with their FIDs.
 * NOTE(review): truncated chunk -- the lmv assignment before line 349,
 * loop-body error handling and RETURN are missing; comments only added.
 */
335 static int cmm_split_slaves_create(const struct lu_env *env,
336 struct md_object *mo,
339 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
340 struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
341 struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
342 struct mdc_device *mc, *tmp;
343 struct lmv_stripe_md *lmv;
347 /* Init the split MEA */
349 lmv->mea_master = cmm->cmm_local_num;
350 lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
351 lmv->mea_count = cmm->cmm_tgt_count + 1;
354 * Store master FID to local node idx number. Local node is always
355 * master and its stripe number is 0.
357 lmv->mea_ids[0] = *lf;
/* Slave EA template: same master/magic, but mea_count 0 marks a slave. */
359 memset(slave_lmv, 0, sizeof *slave_lmv);
360 slave_lmv->mea_master = cmm->cmm_local_num;
361 slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
362 slave_lmv->mea_count = 0;
/* One slave per configured remote target. */
364 list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
365 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
366 ma, slave_lmv, sizeof(*slave_lmv));
/*
 * Return nonzero for the "." and ".." directory entries, which must not
 * be migrated during split.  NOTE(review): the return statements are
 * missing from this truncated extraction.
 */
376 static inline int cmm_split_special_entry(struct lu_dirent *ent)
378 if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
379 !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
/*
 * Wrap a NUL-terminated @name of buffer length @buflen in the per-env
 * lu_name (cti_name).  The caller's buffer is referenced, not copied.
 * NOTE(review): the final RETURN/closing brace are missing from this
 * truncated extraction.
 */
384 static inline struct lu_name *cmm_name(const struct lu_env *env,
385 char *name, int buflen)
387 struct lu_name *lname;
388 struct cmm_thread_info *cmi;
/* Caller must pass a NUL-terminated buffer. */
391 LASSERT(name[buflen - 1] == '\0');
393 cmi = cmm_env_info(env);
394 lname = &cmi->cti_name;
395 lname->ln_name = name;
396 /* do NOT count the terminating '\0' of name for length */
397 lname->ln_namelen = buflen - 1;
402 * Remove one entry from local MDT. Do not corrupt byte order in page, it will
403 * be sent to remote MDT.
/*
 * Remove one directory entry @ent from the local MDT during split.  The
 * entry itself stays little-endian because the page is later shipped to a
 * remote MDT; only lde_hash gets the "is a directory" marker bit set.
 * NOTE(review): truncated chunk -- RETURNs, several declarations and
 * branch braces are missing; comments only were added.
 */
405 static int cmm_split_remove_entry(const struct lu_env *env,
406 struct md_object *mo,
407 struct lu_dirent *ent)
409 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
410 struct cmm_thread_info *cmi;
412 struct cmm_object *obj;
415 struct lu_name *lname;
/* "." and ".." are never migrated. */
418 if (cmm_split_special_entry(ent))
/* FID in the page is little-endian; convert into the per-env scratch FID. */
421 fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
422 obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
424 RETURN(PTR_ERR(obj));
426 cmi = cmm_env_info(env);
429 if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
430 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
433 * XXX: These days only cross-ref dirs are possible, so for the
434 * sake of simplicity, in split, we suppose that all cross-ref
435 * names point to directory and do not do additional getattr to
/* +1 for the NUL terminator cmm_name() asserts on. */
440 OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
442 GOTO(cleanup, rc = -ENOMEM);
444 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
445 lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
447 * When split, no need update parent's ctime,
448 * and no permission check for name_remove.
450 ma->ma_attr.la_ctime = 0;
452 ma->ma_attr.la_mode = S_IFDIR;
454 ma->ma_attr.la_mode = 0;
455 ma->ma_attr.la_valid = LA_MODE;
456 ma->ma_valid = MA_INODE;
458 ma->ma_attr_flags |= MDS_PERM_BYPASS;
459 rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
460 OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
465 * This @ent will be transferred to slave MDS and insert there, so in
466 * the slave MDS, we should know whether this object is dir or not, so
467 * use the highest bit of the hash to indicate that (because we do not
468 * use highest bit of hash).
471 ent->lde_hash = le64_to_cpu(ent->lde_hash);
472 ent->lde_hash = cpu_to_le64(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
476 cmm_object_put(env, obj);
481 * Remove all entries from passed page. These entries are going to remote MDT
482 * and thus should be removed locally.
/*
 * Walk the first page of @rdpg and locally remove every entry whose hash
 * is below @hash_end, accumulating the byte length of removed entries in
 * @len (plus the lu_dirpage header when anything was removed).
 * NOTE(review): truncated chunk -- error-branch braces and RETURN are
 * missing; comments only were added.
 */
484 static int cmm_split_remove_page(const struct lu_env *env,
485 struct md_object *mo,
486 struct lu_rdpg *rdpg,
487 __u64 hash_end, __u32 *len)
489 struct lu_dirpage *dp;
490 struct lu_dirent *ent;
495 cfs_kmap(rdpg->rp_pages[0]);
496 dp = page_address(rdpg->rp_pages[0]);
/* Entry hashes in the page are little-endian; convert before comparing. */
497 for (ent = lu_dirent_start(dp);
498 ent != NULL && le64_to_cpu(ent->lde_hash) < hash_end;
499 ent = lu_dirent_next(ent)) {
500 rc = cmm_split_remove_entry(env, mo, ent);
503 * XXX: Error handler to insert remove name back,
504 * currently we assumed it will success anyway in
507 CERROR("Can not del %*.*s, rc %d\n",
508 le16_to_cpu(ent->lde_namelen),
509 le16_to_cpu(ent->lde_namelen),
513 *len += lu_dirent_size(ent);
/* Account for the page header only if at least one entry was consumed. */
516 if (ent != lu_dirent_start(dp))
517 *len += sizeof(struct lu_dirpage);
520 cfs_kunmap(rdpg->rp_pages[0]);
524 /* Send one page to remote MDT for creating entries there. */
/*
 * Ship the first page of @rdpg (first @len bytes) to the remote MDT
 * identified by @fid so the entries can be re-created there.
 * NOTE(review): truncated chunk -- the IS_ERR() check around line 536 and
 * the final RETURN are missing; comments only were added.
 */
525 static int cmm_split_send_page(const struct lu_env *env,
526 struct md_object *mo,
527 struct lu_rdpg *rdpg,
528 struct lu_fid *fid, int len)
530 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
531 struct cmm_object *obj;
535 obj = cmm_object_find(env, cmm, fid);
537 RETURN(PTR_ERR(obj));
539 rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
540 rdpg->rp_pages[0], len);
541 cmm_object_put(env, obj);
545 /* Read one page of entries from local MDT. */
/*
 * Read one page of directory entries from the local MDT into @rdpg,
 * zeroing the page first so stale data never leaks into the transfer.
 * NOTE(review): the final RETURN/closing brace are missing from this
 * truncated extraction.
 */
546 static int cmm_split_read_page(const struct lu_env *env,
547 struct md_object *mo,
548 struct lu_rdpg *rdpg)
552 memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
553 cfs_kunmap(rdpg->rp_pages[0]);
554 rc = mo_readpage(env, md_object_next(mo), rdpg);
559 * This function performs migration of all pages with entries which fit into one
560 * stripe and one hash segment.
/*
 * Migrate all pages of entries belonging to one stripe/hash segment:
 * repeatedly read a page locally, remove its entries here, send the page
 * to the slave MDT, and advance rp_hash until the segment end is reached.
 * NOTE(review): truncated chunk -- the enclosing loop construct, error
 * braces and RETURNs are missing; comments only were added.
 */
562 static int cmm_split_process_stripe(const struct lu_env *env,
563 struct md_object *mo,
564 struct lu_rdpg *rdpg,
/* This path handles exactly one page per iteration. */
571 LASSERT(rdpg->rp_npages == 1);
573 struct lu_dirpage *ldp;
576 /* Read one page from local MDT. */
577 rc = cmm_split_read_page(env, mo, rdpg);
579 CERROR("Error in readpage: %d\n", rc);
583 /* Remove local entries which are going to remote MDT. */
584 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
586 CERROR("Error in remove stripe entries: %d\n", rc);
590 /* Send entries page to slave MDT. */
592 rc = cmm_split_send_page(env, mo, rdpg, lf, len);
594 CERROR("Error in sending page: %d\n", rc);
/* Check the on-page end hash to decide whether the segment is finished. */
599 cfs_kmap(rdpg->rp_pages[0]);
600 ldp = page_address(rdpg->rp_pages[0]);
601 if (le64_to_cpu(ldp->ldp_hash_end) >= end)
/* Not done yet: continue reading from where this page ended. */
604 rdpg->rp_hash = le64_to_cpu(ldp->ldp_hash_end);
605 cfs_kunmap(rdpg->rp_pages[0]);
/*
 * Drive the whole-directory migration: divide the hash space evenly among
 * cmm_tgt_count + 1 stripes and migrate each non-master stripe (index >= 1)
 * to the slave FID recorded in ma->ma_lmv->mea_ids[i].
 * NOTE(review): truncated chunk -- several declarations, braces and the
 * final RETURN are missing; comments only were added.
 */
611 static int cmm_split_process_dir(const struct lu_env *env,
612 struct md_object *mo,
615 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
616 struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
621 memset(rdpg, 0, sizeof *rdpg);
622 rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
623 rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
624 rdpg->rp_pages = cmm_env_info(env)->cmi_pages;
626 for (i = 0; i < rdpg->rp_npages; i++) {
627 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
628 if (rdpg->rp_pages[i] == NULL)
629 GOTO(cleanup, rc = -ENOMEM);
/* do_div: 64-bit division helper; hash_segment = MAX_HASH_SIZE / stripes. */
632 hash_segment = MAX_HASH_SIZE;
633 do_div(hash_segment, cmm->cmm_tgt_count + 1);
/* Stripe 0 is the local master; migrate stripes 1..tgt_count. */
634 for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
638 lf = &ma->ma_lmv->mea_ids[i];
640 rdpg->rp_hash = i * hash_segment;
/* The last stripe absorbs any rounding remainder up to MAX_HASH_SIZE. */
641 if (i == cmm->cmm_tgt_count)
642 hash_end = MAX_HASH_SIZE;
644 hash_end = rdpg->rp_hash + hash_segment;
645 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
647 CERROR("Error (rc = %d) while splitting for %d: fid="
648 DFID", "LPX64":"LPX64"\n", rc, i, PFID(lf),
649 rdpg->rp_hash, hash_end);
/* Cleanup: free every page that was successfully allocated. */
655 for (i = 0; i < rdpg->rp_npages; i++)
656 if (rdpg->rp_pages[i] != NULL)
657 __cfs_free_page(rdpg->rp_pages[i]);
/*
 * Top-level directory split entry point: decide whether @mo needs a
 * split, then perform the five steps commented inline (disable trans,
 * prepare md, create slaves, migrate entries, store the master MEA).
 * NOTE(review): truncated chunk -- error-branch braces, GOTOs and the
 * final RETURN are missing; comments only were added.
 */
661 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
663 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
664 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
669 cmm_lprocfs_time_start(env);
/* Only directories can be split. */
671 LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
672 memset(ma, 0, sizeof(*ma));
674 /* Step1: Checking whether the dir needs to be split. */
675 rc = cmm_split_expect(env, mo, ma, &split);
679 if (split != CMM_SPLIT_NEEDED) {
680 /* No split is needed, caller may proceed with create. */
684 /* Split should be done now, let's do it. */
685 CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
686 PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
689 * Disable transactions for split, since there will be so many trans in
690 * this one ops, conflict with current recovery design.
692 rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
694 CERROR("Can't disable trans for split, rc %d\n", rc);
698 /* Step2: Prepare the md memory */
699 ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
700 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
701 if (ma->ma_lmv == NULL)
702 GOTO(out, rc = -ENOMEM);
704 /* Step3: Create slave objects and fill the ma->ma_lmv */
705 rc = cmm_split_slaves_create(env, mo, ma);
707 CERROR("Can't create slaves for split, rc %d\n", rc);
711 /* Step4: Scan and split the object. */
712 rc = cmm_split_process_dir(env, mo, ma);
714 CERROR("Can't scan and split, rc %d\n", rc);
718 /* Step5: Set mea to the master object. */
719 buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
720 rc = mo_xattr_set(env, md_object_next(mo), buf,
723 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
727 /* set flag in cmm_object */
728 md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
731 * Finally, split succeeded, tell client to repeat operation on correct
734 CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
/* Cleanup: free the LMV buffer and stop the lprocfs timer. */
738 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
740 cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);