1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_split.c
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Alex Thomas <alex@clusterfs.com>
9 * Wang Di <wangdi@clusterfs.com>
10 * Yury Umanets <umka@clusterfs.com>
12 * This file is part of the Lustre file system, http://www.lustre.org
13 * Lustre is a trademark of Cluster File Systems, Inc.
15 * You may have signed or agreed to another license before downloading
16 * this software. If so, you are bound by the terms and conditions
17 * of that agreement, and the following does not apply to you. See the
18 * LICENSE file included with this distribution for more information.
20 * If you did not agree to a different license, then this copy of Lustre
21 * is open source software; you can redistribute it and/or modify it
22 * under the terms of version 2 of the GNU General Public License as
23 * published by the Free Software Foundation.
25 * In either case, Lustre is distributed in the hope that it will be
26 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
27 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * license text for more details.
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39 #include <lustre_mds.h>
40 #include <lustre/lustre_idl.h>
41 #include "cmm_internal.h"
42 #include "mdc_internal.h"
45 CMM_SPLIT_SIZE = 128 * 1024
49 * This function checks if passed @name comes to the correct server (local MDT).
50 * If not - return -ERESTART and let client know that dir was split and client
51 * needs to choose the correct stripe.
/*
 * NOTE(review): the embedded numbering of this dump has gaps -- the
 * function's braces, the out:/cleanup: labels and some declarations are
 * not visible here.  Comments below describe only the visible lines;
 * confirm the full body against the original file.
 */
53 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
56 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
57 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
58 struct cml_object *clo = md2cml_obj(mp);
62 cmm_lprocfs_time_start(env);
/* Fast path: cached split state already says "not split" or "denied". */
65 if (clo->clo_split == CMM_SPLIT_NONE ||
66 clo->clo_split == CMM_SPLIT_DENIED)
/* One LMV slot per remote target plus one for the local (master) MDT. */
69 lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
71 /* Try to get the LMV EA */
72 memset(ma, 0, sizeof(*ma));
75 ma->ma_lmv_size = lmv_size;
76 OBD_ALLOC(ma->ma_lmv, lmv_size);
77 if (ma->ma_lmv == NULL)
78 GOTO(out, rc = -ENOMEM);
80 /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
81 rc = mo_attr_get(env, mp, ma);
85 /* No LMV EA present: dir is not split, just return success. */
86 if (!(ma->ma_valid & MA_LMV)) {
87 /* update split state if unknown */
88 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
89 clo->clo_split = CMM_SPLIT_NONE;
90 GOTO(cleanup, rc = 0);
93 /* Skip checking the slave dirs (mea_count is 0) */
94 if (ma->ma_lmv->mea_count != 0) {
98 * Get stripe by name to check the name belongs to master dir,
99 * otherwise return the -ERESTART
101 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
104 * Check if name came to correct MDT server. We suppose that if
105 * client does not know about split, it sends create operation
106 * to master MDT. And this is master job to say it that dir got
107 * split and client should forward request to correct MDT. This
108 * is why we check here if stripe zero or not. Zero stripe means
109 * master stripe. If stripe calculated from name is not zero -
115 /* update split state to DONE if unknown */
116 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
117 clo->clo_split = CMM_SPLIT_DONE;
119 /* split is denied for slave dir */
120 clo->clo_split = CMM_SPLIT_DENIED;
/* Free the LMV buffer allocated above (cleanup path). */
124 OBD_FREE(ma->ma_lmv, lmv_size);
126 cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT_CHECK);
131 * Return preferable access mode to caller, taking into account a possible
132 * split and the fact that non-splittable dirs exist in principle.
/*
 * Choose the lock mode for @mo based on the split state reported by
 * cmm_split_expect().  NOTE(review): the dump is missing lines (braces,
 * RETURN statements) -- confirm against the original file.
 */
134 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
137 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
141 memset(ma, 0, sizeof(*ma));
144 * Check only if we need protection from split. If not - mdt handles
147 rc = cmm_split_expect(env, mo, ma, &split);
149 CERROR("Can't check for possible split, rc %d\n", rc);
154 * Do not take PDO lock on non-splittable objects if this is not PW,
155 * this should speed things up a bit.
157 if (split == CMM_SPLIT_DONE && lm != MDL_PW)
160 /* Protect splitting by exclusive lock. */
161 if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
165 * Have no idea about lock mode, let it be what higher layer wants.
170 /* Check if split is expected for current thread. */
/*
 * Decide whether @mo should be split now; the verdict is stored both in
 * *split and cached in clo->clo_split.  Root is never split (DENIED);
 * an object already carrying an LMV EA is DONE; small dirs
 * (la_size < CMM_SPLIT_SIZE) are NONE; otherwise NEEDED.
 * NOTE(review): this dump has gaps (braces/RETURNs not visible).
 */
171 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
172 struct md_attr *ma, int *split)
174 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
175 struct cml_object *clo = md2cml_obj(mo);
176 struct lu_fid root_fid;
/* DONE and DENIED are final states; report the cached value directly. */
180 if (clo->clo_split == CMM_SPLIT_DONE ||
181 clo->clo_split == CMM_SPLIT_DENIED) {
182 *split = clo->clo_split;
185 /* CMM_SPLIT_UNKNOWN case below */
187 /* No need to split root object. */
188 rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
193 if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
194 /* update split state */
/*
 * Fix: was '... == CMM_SPLIT_DENIED', which only compared (leaving
 * clo_split unchanged and storing a 0/1 boolean into *split).  Use
 * the assignment chain as at the other state-update sites below.
 */
195 *split = clo->clo_split = CMM_SPLIT_DENIED;
200 * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
203 LASSERT(ma->ma_valid == 0);
204 ma->ma_need = MA_INODE | MA_LMV;
205 rc = mo_attr_get(env, mo, ma);
209 /* No need split for already split object */
210 if (ma->ma_valid & MA_LMV) {
211 LASSERT(ma->ma_lmv_size > 0);
212 *split = clo->clo_split = CMM_SPLIT_DONE;
216 /* No need split for object whose size < CMM_SPLIT_SIZE */
217 if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
218 *split = clo->clo_split = CMM_SPLIT_NONE;
222 *split = clo->clo_split = CMM_SPLIT_NEEDED;
/*
 * Look up (and reference) the cmm object for fid @f on device @d.
 * On lookup failure the error pointer from lu_object_find() is passed
 * through as a cmm_object pointer; otherwise the cmm slice is located
 * in the compound lu object.  NOTE(review): error-check lines are
 * missing from this dump -- confirm against the original file.
 */
226 struct cmm_object *cmm_object_find(const struct lu_env *env,
227 struct cmm_device *d,
228 const struct lu_fid *f)
231 struct cmm_object *m;
234 o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
/* Propagate IS_ERR pointer unchanged to the caller. */
236 m = (struct cmm_object *)o;
238 m = lu2cmm_obj(lu_object_locate(o->lo_header,
239 d->cmm_md_dev.md_lu_dev.ld_type));
/* Drop the reference taken by cmm_object_find(). */
243 static inline void cmm_object_put(const struct lu_env *env,
244 struct cmm_object *o)
246 lu_object_put(env, &o->cmo_obj.mo_lu);
250 * Allocate a new fid on passed @mc for the slave object which is going to be created there
/*
 * Allocate a new fid on remote target @mc and register its sequence in
 * FLD.  mc_fid_sem serializes fid allocation per target; the matching
 * up() is not visible in this dump -- confirm against the original file.
 */
253 static int cmm_split_fid_alloc(const struct lu_env *env,
254 struct cmm_device *cmm,
255 struct mdc_device *mc,
261 LASSERT(cmm != NULL && mc != NULL && fid != NULL);
263 down(&mc->mc_fid_sem);
265 /* Alloc new fid on @mc. */
266 rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
268 /* Setup FLD for new sequence if needed. */
269 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
272 CERROR("Can't create fld entry, rc %d\n", rc);
279 /* Allocate new slave object on passed @mc */
/*
 * Create one slave object on remote target @mc: allocate a fid for it,
 * then issue an object create with the slave LMV EA attached via
 * MDS_CREATE_SLAVE_OBJ.  The allocated fid is returned through @fid.
 */
280 static int cmm_split_slave_create(const struct lu_env *env,
281 struct cmm_device *cmm,
282 struct mdc_device *mc,
285 struct lmv_stripe_md *lmv,
288 struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
289 struct cmm_object *obj;
293 /* Allocate new fid and store it to @fid */
294 rc = cmm_split_fid_alloc(env, cmm, mc, fid);
296 CERROR("Can't alloc new fid on "LPU64
297 ", rc %d\n", mc->mc_num, rc);
301 /* Allocate new object on @mc */
302 obj = cmm_object_find(env, cmm, fid);
304 RETURN(PTR_ERR(obj));
/* Build the create spec carrying the slave LMV EA. */
306 memset(spec, 0, sizeof *spec);
307 spec->u.sp_ea.fid = fid;
308 spec->u.sp_ea.eadata = lmv;
309 spec->u.sp_ea.eadatalen = lmv_size;
310 spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
311 rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
313 cmm_object_put(env, obj);
318 * Create so many slaves as number of stripes. This is called in split time
319 * before sending pages to slaves.
/*
 * Build the master MEA (stripe 0 = local master FID) and create one
 * slave object per remote target, filling ma->ma_lmv.  On success
 * MA_LMV is set in ma->ma_valid so callers can use the new EA.
 * NOTE(review): lmv assignment and loop index handling are partly
 * missing from this dump -- confirm against the original file.
 */
321 static int cmm_split_slaves_create(const struct lu_env *env,
322 struct md_object *mo,
325 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
326 struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
327 struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
328 struct mdc_device *mc, *tmp;
329 struct lmv_stripe_md *lmv;
333 /* Init the split MEA */
335 lmv->mea_master = cmm->cmm_local_num;
336 lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
337 lmv->mea_count = cmm->cmm_tgt_count + 1;
340 * Store master FID to local node idx number. Local node is always
341 * master and its stripe number is 0.
343 lmv->mea_ids[0] = *lf;
/* Slaves get an empty (count 0) MEA so they identify as slave dirs. */
345 memset(slave_lmv, 0, sizeof *slave_lmv);
346 slave_lmv->mea_master = cmm->cmm_local_num;
347 slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
348 slave_lmv->mea_count = 0;
350 list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
351 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
352 ma, slave_lmv, sizeof(*slave_lmv));
358 ma->ma_valid |= MA_LMV;
/*
 * Return non-zero for the "." and ".." entries, which must never be
 * migrated during split.  lde_name is not NUL-terminated, hence the
 * namelen-bounded strncmp.
 */
364 static inline int cmm_split_special_entry(struct lu_dirent *ent)
366 if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
367 !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
/*
 * Wrap a NUL-terminated buffer in the per-thread lu_name.  @buflen
 * includes the terminating '\0'; the stored length does not.  The
 * returned lu_name lives in env thread info, so it is overwritten by
 * the next cmm_name() call on the same env.
 */
372 static inline struct lu_name *cmm_name(const struct lu_env *env,
373 char *name, int buflen)
375 struct lu_name *lname;
376 struct cmm_thread_info *cmi;
379 LASSERT(name[buflen - 1] == '\0');
381 cmi = cmm_env_info(env);
382 lname = &cmi->cti_name;
383 lname->ln_name = name;
384 /* NOT count the terminating '\0' of name for length */
385 lname->ln_namelen = buflen - 1;
390 * Remove one entry from local MDT. Do not corrupt byte order in page, it will
391 * be sent to remote MDT.
/*
 * Remove one directory entry from the local MDT during split.  "." and
 * ".." are skipped.  The entry stays in the page (to be shipped to the
 * slave MDT); only the local name is removed.  NOTE(review): dump gaps
 * hide the is_dir guard around the hash-bit marking below -- confirm
 * against the original file.
 */
393 static int cmm_split_remove_entry(const struct lu_env *env,
394 struct md_object *mo,
395 struct lu_dirent *ent)
397 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
398 struct cmm_thread_info *cmi;
400 struct cmm_object *obj;
403 struct lu_name *lname;
406 if (cmm_split_special_entry(ent))
/* lde_fid is stored little-endian on disk; convert before lookup. */
409 fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
410 obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
412 RETURN(PTR_ERR(obj));
414 cmi = cmm_env_info(env);
417 if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
418 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
421 * XXX: These days only cross-ref dirs are possible, so for the
422 * sake of simplicity, in split, we suppose that all cross-ref
423 * names point to directory and do not do additional getattr to
/* +1 for the NUL terminator required by cmm_name(). */
428 OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
430 GOTO(cleanup, rc = -ENOMEM);
432 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
433 lname = cmm_name(env, name, le16_to_cpu(ent->lde_namelen) + 1);
435 * When split, no need update parent's ctime,
436 * and no permission check for name_remove.
438 ma->ma_attr.la_ctime = 0;
440 ma->ma_attr.la_mode = S_IFDIR;
442 ma->ma_attr.la_mode = 0;
443 ma->ma_attr.la_valid = LA_MODE;
444 ma->ma_valid = MA_INODE;
446 ma->ma_attr_flags |= MDS_PERM_BYPASS;
447 rc = mdo_name_remove(env, md_object_next(mo), lname, ma);
448 OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
453 * This @ent will be transferred to slave MDS and insert there, so in
454 * the slave MDS, we should know whether this object is dir or not, so
455 * use the highest bit of the hash to indicate that (because we do not
456 * use highest bit of hash).
459 ent->lde_hash = le32_to_cpu(ent->lde_hash);
460 ent->lde_hash = cpu_to_le32(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
464 cmm_object_put(env, obj);
469 * Remove all entries from passed page. These entries are going to remote MDT
470 * and thus should be removed locally.
/*
 * Remove from the local MDT all entries of page 0 whose hash is below
 * @hash_end (they migrate to a remote MDT), accumulating into *len the
 * byte count that will be sent.  Entries' on-disk byte order is left
 * intact because the page is forwarded as-is.
 */
472 static int cmm_split_remove_page(const struct lu_env *env,
473 struct md_object *mo,
474 struct lu_rdpg *rdpg,
475 __u32 hash_end, __u32 *len)
477 struct lu_dirpage *dp;
478 struct lu_dirent *ent;
483 kmap(rdpg->rp_pages[0]);
484 dp = page_address(rdpg->rp_pages[0]);
485 for (ent = lu_dirent_start(dp);
486 ent != NULL && le32_to_cpu(ent->lde_hash) < hash_end;
487 ent = lu_dirent_next(ent)) {
488 rc = cmm_split_remove_entry(env, mo, ent);
491 * XXX: Error handler to insert remove name back,
492 * currently we assumed it will success anyway in
495 CERROR("Can not del %*.*s, rc %d\n",
496 le16_to_cpu(ent->lde_namelen),
497 le16_to_cpu(ent->lde_namelen),
501 *len += lu_dirent_size(ent);
/* If anything was consumed, account for the page header too. */
504 if (ent != lu_dirent_start(dp))
505 *len += sizeof(struct lu_dirpage);
508 kunmap(rdpg->rp_pages[0]);
512 /* Send one page to remote MDT for creating entries there. */
/*
 * Ship page 0 (first @len bytes of entries) to the remote MDT that owns
 * slave object @fid, so the entries can be inserted there.
 */
513 static int cmm_split_send_page(const struct lu_env *env,
514 struct md_object *mo,
515 struct lu_rdpg *rdpg,
516 struct lu_fid *fid, int len)
518 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
519 struct cmm_object *obj;
523 obj = cmm_object_find(env, cmm, fid);
525 RETURN(PTR_ERR(obj));
527 rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
528 rdpg->rp_pages[0], len);
529 cmm_object_put(env, obj);
533 /* Read one page of entries from local MDT. */
/*
 * Read one page of directory entries from the local MDT into
 * rdpg->rp_pages[0].  The page is zeroed first so stale data never
 * leaks into the part mo_readpage() does not fill.
 */
534 static int cmm_split_read_page(const struct lu_env *env,
535 struct md_object *mo,
536 struct lu_rdpg *rdpg)
540 memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
541 cfs_kunmap(rdpg->rp_pages[0]);
542 rc = mo_readpage(env, md_object_next(mo), rdpg);
547 * This function performs migration of all pages with entries which fit into one
548 * stripe and one hash segment.
/*
 * Migrate all entries of one hash segment (up to hash @end) to the
 * slave identified by @lf: repeatedly read a page locally, remove the
 * migrating entries, send the page, then advance rp_hash to the page's
 * ldp_hash_end until the segment is exhausted.  NOTE(review): the
 * enclosing loop construct is not visible in this dump.
 */
550 static int cmm_split_process_stripe(const struct lu_env *env,
551 struct md_object *mo,
552 struct lu_rdpg *rdpg,
559 LASSERT(rdpg->rp_npages == 1);
561 struct lu_dirpage *ldp;
564 /* Read one page from local MDT. */
565 rc = cmm_split_read_page(env, mo, rdpg);
567 CERROR("Error in readpage: %d\n", rc);
571 /* Remove local entries which are going to remote MDT. */
572 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
574 CERROR("Error in remove stripe entries: %d\n", rc);
578 /* Send entries page to slave MDT. */
580 rc = cmm_split_send_page(env, mo, rdpg, lf, len);
582 CERROR("Error in sending page: %d\n", rc);
/* Check whether the segment is finished; if not, continue from the
 * hash where this page ended. */
587 kmap(rdpg->rp_pages[0]);
588 ldp = page_address(rdpg->rp_pages[0]);
589 if (le32_to_cpu(ldp->ldp_hash_end) >= end)
592 rdpg->rp_hash = le32_to_cpu(ldp->ldp_hash_end);
593 kunmap(rdpg->rp_pages[0]);
/*
 * Split the whole directory: partition the hash space into
 * (cmm_tgt_count + 1) equal segments and migrate segments 1..N to the
 * slave fids recorded in ma->ma_lmv (segment 0 stays on the master).
 * Pages used for the transfer are allocated here and freed on cleanup.
 */
599 static int cmm_split_process_dir(const struct lu_env *env,
600 struct md_object *mo,
603 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
604 struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
609 memset(rdpg, 0, sizeof *rdpg);
610 rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
611 rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
612 rdpg->rp_pages = cmm_env_info(env)->cmi_pages;
614 for (i = 0; i < rdpg->rp_npages; i++) {
615 rdpg->rp_pages[i] = cfs_alloc_page(CFS_ALLOC_STD);
616 if (rdpg->rp_pages[i] == NULL)
617 GOTO(cleanup, rc = -ENOMEM);
620 LASSERT(ma->ma_valid & MA_LMV);
621 hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
/* Segment 0 belongs to the master; slaves start at index 1. */
622 for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
626 lf = &ma->ma_lmv->mea_ids[i];
628 rdpg->rp_hash = i * hash_segement;
/* Last segment absorbs any rounding remainder up to MAX_HASH_SIZE. */
629 if (i == cmm->cmm_tgt_count)
630 hash_end = MAX_HASH_SIZE;
632 hash_end = rdpg->rp_hash + hash_segement;
633 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
635 CERROR("Error (rc = %d) while splitting for %d: fid="
636 DFID", %08x:%08x\n", rc, i, PFID(lf),
637 rdpg->rp_hash, hash_end);
643 for (i = 0; i < rdpg->rp_npages; i++)
644 if (rdpg->rp_pages[i] != NULL)
645 __cfs_free_page(rdpg->rp_pages[i]);
/*
 * Top-level split entry point: if cmm_split_expect() says NEEDED,
 * disable transactions, create slave objects, migrate entries, store
 * the MEA on the master and mark the object CMM_SPLIT_DONE.
 * NOTE(review): dump gaps hide some RETURN/GOTO lines -- confirm the
 * error paths against the original file.
 */
649 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
651 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
652 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
657 cmm_lprocfs_time_start(env);
659 LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
660 memset(ma, 0, sizeof(*ma));
662 /* Step1: Checking whether the dir needs to be split. */
663 rc = cmm_split_expect(env, mo, ma, &split);
667 if (split != CMM_SPLIT_NEEDED) {
668 /* No split is needed, caller may proceed with create. */
672 /* Split should be done now, let's do it. */
673 CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
674 PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
677 * Disable transactions for split, since there will be so many trans in
678 * this one ops, conflict with current recovery design.
680 rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
682 CERROR("Can't disable trans for split, rc %d\n", rc);
686 /* Step2: Prepare the md memory */
687 ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
688 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
689 if (ma->ma_lmv == NULL)
690 GOTO(out, rc = -ENOMEM);
692 /* Step3: Create slave objects and fill the ma->ma_lmv */
693 rc = cmm_split_slaves_create(env, mo, ma);
695 CERROR("Can't create slaves for split, rc %d\n", rc);
699 /* Step4: Scan and split the object. */
700 rc = cmm_split_process_dir(env, mo, ma);
702 CERROR("Can't scan and split, rc %d\n", rc);
706 /* Step5: Set mea to the master object. */
707 LASSERT(ma->ma_valid & MA_LMV);
708 buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
709 rc = mo_xattr_set(env, md_object_next(mo), buf,
712 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
716 /* set flag in cmm_object */
717 md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
720 * Finally, split succeed, tell client to repeat operation on correct
723 CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
727 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
729 cmm_lprocfs_time_end(env, cmm, LPROC_CMM_SPLIT);