1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_split.c
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Alex Thomas <alex@clusterfs.com>
9 * Wang Di <wangdi@clusterfs.com>
10 * Yury Umanets <umka@clusterfs.com>
12 * This file is part of the Lustre file system, http://www.lustre.org
13 * Lustre is a trademark of Cluster File Systems, Inc.
15 * You may have signed or agreed to another license before downloading
16 * this software. If so, you are bound by the terms and conditions
17 * of that agreement, and the following does not apply to you. See the
18 * LICENSE file included with this distribution for more information.
20 * If you did not agree to a different license, then this copy of Lustre
21 * is open source software; you can redistribute it and/or modify it
22 * under the terms of version 2 of the GNU General Public License as
23 * published by the Free Software Foundation.
25 * In either case, Lustre is distributed in the hope that it will be
26 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
27 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
28 * license text for more details.
32 # define EXPORT_SYMTAB
35 #define DEBUG_SUBSYSTEM S_MDS
37 #include <obd_class.h>
38 #include <lustre_fid.h>
39 #include <lustre_mds.h>
40 #include <lustre/lustre_idl.h>
41 #include "cmm_internal.h"
42 #include "mdc_internal.h"
/* Directory size threshold (bytes); dirs at or above this get split. */
45 CMM_SPLIT_SIZE = 64 * 1024
49 * This function checks if passed @name comes to correct server (local MDT).
50 * If not - return -ERESTART and let client know that dir was split and
51 * client needs to choose the correct stripe.
/* NOTE(review): listing is elided here; the @name parameter, local
 * declarations (rc, start, lmv_size, idx) and several braces fall in the
 * omitted lines -- confirm against the full source. */
53 int cmm_split_check(const struct lu_env *env, struct md_object *mp,
56 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
57 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
58 struct cml_object *clo = md2cml_obj(mp);
/* Record elapsed time of this check in /proc stats. */
63 cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT_CHECK);
/* Fast path: no EA lookup needed when split state is already final. */
66 if (clo->clo_split == CMM_SPLIT_NONE ||
67 clo->clo_split == CMM_SPLIT_DENIED)
/* Buffer sized for an LMV EA covering all targets plus the master. */
70 lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
72 /* Try to get the LMV EA */
73 memset(ma, 0, sizeof(*ma));
76 ma->ma_lmv_size = lmv_size;
77 OBD_ALLOC(ma->ma_lmv, lmv_size);
78 if (ma->ma_lmv == NULL)
79 GOTO(out, rc = -ENOMEM);
81 /* Get LMV EA, Note: refresh valid here for getting LMV_EA */
82 rc = mo_attr_get(env, mp, ma);
86 /* No LMV EA present - the dir was never split; just return */
87 if (!(ma->ma_valid & MA_LMV)) {
88 /* update split state if unknown */
89 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
90 clo->clo_split = CMM_SPLIT_NONE;
91 GOTO(cleanup, rc = 0);
94 /* Skip checking the slave dirs (mea_count is 0) */
95 if (ma->ma_lmv->mea_count != 0) {
99 * Get stripe by name to check the name belongs to master dir,
100 * otherwise return the -ERESTART
102 idx = mea_name2idx(ma->ma_lmv, name, strlen(name));
105 * Check if name came to correct MDT server. We suppose that if
106 * client does not know about split, it sends create operation
107 * to master MDT. And it is the master's job to tell it that dir
108 * got split and client should forward request to correct MDT.
109 * This is why we check here if stripe is zero or not. Zero
110 * stripe means master stripe. If stripe calculated from name is
116 /* update split state to DONE if unknown */
117 if (clo->clo_split == CMM_SPLIT_UNKNOWN)
118 clo->clo_split = CMM_SPLIT_DONE;
120 /* split is denied for slave dir */
121 clo->clo_split = CMM_SPLIT_DENIED;
/* Free the temporary LMV buffer allocated above. */
125 OBD_FREE(ma->ma_lmv, lmv_size);
127 cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT_CHECK);
132 * Return preferable access mode to caller taking into account possible split
133 * and the fact of existing not splittable dirs in principle.
/* NOTE(review): the lock-mode parameter (lm), local rc/split declarations
 * and the function braces lie in elided lines of this listing. */
135 int cmm_split_access(const struct lu_env *env, struct md_object *mo,
138 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
142 memset(ma, 0, sizeof(*ma));
145 * Check only if we need protection from split. If not - mdt handles
148 rc = cmm_split_expect(env, mo, ma, &split);
150 CERROR("Can't check for possible split, rc %d\n", rc);
155 * Do not take PDO lock on non-splittable objects if this is not PW,
156 * this should speed things up a bit.
158 if (split == CMM_SPLIT_DONE && lm != MDL_PW)
161 /* Protect splitting by exclusive lock. */
162 if (split == CMM_SPLIT_NEEDED && lm == MDL_PW)
166 * Have no idea about lock mode, let it be what higher layer wants.
171 /* Check if split is expected for current thread. */
172 int cmm_split_expect(const struct lu_env *env, struct md_object *mo,
173 struct md_attr *ma, int *split)
175 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
176 struct cml_object *clo = md2cml_obj(mo);
177 struct lu_fid root_fid;
/* Cached state: DONE and DENIED are final, report them directly. */
181 if (clo->clo_split == CMM_SPLIT_DONE ||
182 clo->clo_split == CMM_SPLIT_DENIED) {
183 *split = clo->clo_split;
186 /* CMM_SPLIT_UNKNOWN case below */
188 /* No need to split root object. */
189 rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
194 if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
195 /* update split state */
/* NOTE(review): this stores the boolean result of a comparison into
 * *split instead of a CMM_SPLIT_* constant; sibling branches below use
 * chained assignment (*split = clo->clo_split = CONST). Looks like a
 * possible `==` vs `=` defect -- confirm against the full source. */
196 *split = clo->clo_split == CMM_SPLIT_DENIED;
201 * Assumption: ma_valid = 0 here, we only need get inode and lmv_size
204 LASSERT(ma->ma_valid == 0);
205 ma->ma_need = MA_INODE | MA_LMV;
206 rc = mo_attr_get(env, mo, ma);
210 /* No need split for already split object */
211 if (ma->ma_valid & MA_LMV) {
212 LASSERT(ma->ma_lmv_size > 0);
213 *split = clo->clo_split = CMM_SPLIT_DONE;
217 /* No need split for object whose size < CMM_SPLIT_SIZE */
218 if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
219 *split = clo->clo_split = CMM_SPLIT_NONE;
223 *split = clo->clo_split = CMM_SPLIT_NEEDED;
/* Look up (or allocate) the compound lu_object for fid @f and return the
 * CMM layer's slice of it; caller must drop the ref via cmm_object_put(). */
227 struct cmm_object *cmm_object_find(const struct lu_env *env,
228 struct cmm_device *d,
229 const struct lu_fid *f)
232 struct cmm_object *m;
235 o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
/* On error lu_object_find() returns ERR_PTR(), which passes through. */
237 m = (struct cmm_object *)o;
/* Locate this device type's slice within the compound object. */
239 m = lu2cmm_obj(lu_object_locate(o->lo_header,
240 d->cmm_md_dev.md_lu_dev.ld_type));
/* Drop the reference taken by cmm_object_find(). */
244 static inline void cmm_object_put(const struct lu_env *env,
245 struct cmm_object *o)
247 lu_object_put(env, &o->cmo_obj.mo_lu);
251 * Allocate a new fid on passed @mc for the slave object which is going to
 * be created there.
254 static int cmm_split_fid_alloc(const struct lu_env *env,
255 struct cmm_device *cmm,
256 struct mdc_device *mc,
262 LASSERT(cmm != NULL && mc != NULL && fid != NULL);
/* Serialize fid allocation against other users of this target. */
264 down(&mc->mc_fid_sem);
266 /* Alloc new fid on @mc. */
267 rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
269 /* Setup FLD for new sequence if needed. */
270 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
273 CERROR("Can't create fld entry, rc %d\n", rc);
280 /* Allocate new slave object on passed @mc */
281 static int cmm_split_slave_create(const struct lu_env *env,
282 struct cmm_device *cmm,
283 struct mdc_device *mc,
286 struct lmv_stripe_md *lmv,
289 struct md_op_spec *spec = &cmm_env_info(env)->cmi_spec;
290 struct cmm_object *obj;
294 /* Allocate new fid and store it to @fid */
295 rc = cmm_split_fid_alloc(env, cmm, mc, fid);
297 CERROR("Can't alloc new fid on "LPU64
298 ", rc %d\n", mc->mc_num, rc);
302 /* Allocate new object on @mc */
303 obj = cmm_object_find(env, cmm, fid);
305 RETURN(PTR_ERR(obj));
/* Fill creation spec: mark as slave object carrying the given LMV EA. */
307 memset(spec, 0, sizeof *spec);
308 spec->u.sp_ea.fid = fid;
309 spec->u.sp_ea.eadata = lmv;
310 spec->u.sp_ea.eadatalen = lmv_size;
311 spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
312 rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
/* Release the ref taken by cmm_object_find() above. */
314 cmm_object_put(env, obj);
319 * Create as many slaves as the number of stripes. This is called in split
320 * time before sending pages to slaves.
322 static int cmm_split_slaves_create(const struct lu_env *env,
323 struct md_object *mo,
326 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
327 struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
328 struct lmv_stripe_md *slave_lmv = &cmm_env_info(env)->cmi_lmv;
329 struct mdc_device *mc, *tmp;
330 struct lmv_stripe_md *lmv;
334 /* Init the split MEA */
336 lmv->mea_master = cmm->cmm_local_num;
337 lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
338 lmv->mea_count = cmm->cmm_tgt_count + 1;
341 * Store master FID to local node idx number. Local node is always
342 * master and its stripe number is 0.
344 lmv->mea_ids[0] = *lf;
/* Slave dirs carry an empty MEA: mea_count == 0 marks them as slaves. */
346 memset(slave_lmv, 0, sizeof *slave_lmv);
347 slave_lmv->mea_master = cmm->cmm_local_num;
348 slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
349 slave_lmv->mea_count = 0;
/* Create one slave object per remote target; new FIDs fill mea_ids[]. */
351 list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
352 rc = cmm_split_slave_create(env, cmm, mc, &lmv->mea_ids[i],
353 ma, slave_lmv, sizeof(*slave_lmv));
/* The assembled master MEA is now valid in @ma. */
359 ma->ma_valid |= MA_LMV;
/* Return non-zero for the "." and ".." entries, which must never migrate. */
365 static inline int cmm_split_special_entry(struct lu_dirent *ent)
367 if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) ||
368 !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen)))
374 * Remove one entry from local MDT. Do not corrupt byte order in page, it will
375 * be sent to remote MDT.
377 static int cmm_split_remove_entry(const struct lu_env *env,
378 struct md_object *mo,
379 struct lu_dirent *ent)
381 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
382 struct cmm_object *obj;
/* "." and ".." stay on the master; nothing to do for them. */
387 if (cmm_split_special_entry(ent))
/* Entry fid is little-endian on the page; convert a private copy. */
390 fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid);
391 obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid);
393 RETURN(PTR_ERR(obj));
395 if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
396 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
399 * XXX: These days only cross-ref dirs are possible, so for the
400 * sake of simplicity, in split, we suppose that all cross-ref
401 * names point to directory and do not do additional getattr to
/* Copy the name into a NUL-terminated buffer for mdo_name_remove(). */
406 OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
408 GOTO(cleanup, rc = -ENOMEM);
410 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
411 /* No permission check for name_remove when split */
412 rc = mdo_name_remove(env, md_object_next(mo),
414 OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
419 * This @ent will be transferred to slave MDS and inserted there, so in
420 * the slave MDS, we should know whether this object is dir or not, so
421 * use the highest bit of the hash to indicate that (because we do not
422 * use highest bit of hash).
425 ent->lde_hash = le32_to_cpu(ent->lde_hash);
426 ent->lde_hash = cpu_to_le32(ent->lde_hash | MAX_HASH_HIGHEST_BIT);
430 cmm_object_put(env, obj);
435 * Remove all entries from passed page. These entries are going to remote MDT
436 * and thus should be removed locally. On success, *len holds the number of
 * bytes of the page that must be shipped to the slave.
438 static int cmm_split_remove_page(const struct lu_env *env,
439 struct md_object *mo,
440 struct lu_rdpg *rdpg,
441 __u32 hash_end, __u32 *len)
443 struct lu_dirpage *dp;
444 struct lu_dirent *ent;
/* Map the single page and walk its entries up to @hash_end. */
449 kmap(rdpg->rp_pages[0]);
450 dp = page_address(rdpg->rp_pages[0]);
451 for (ent = lu_dirent_start(dp);
452 ent != NULL && le32_to_cpu(ent->lde_hash) < hash_end;
453 ent = lu_dirent_next(ent)) {
454 rc = cmm_split_remove_entry(env, mo, ent);
457 * XXX: Error handler should insert the removed name back;
458 * currently we assume removal will succeed anyway in
461 CERROR("Can not del %*.*s, rc %d\n",
462 le16_to_cpu(ent->lde_namelen),
463 le16_to_cpu(ent->lde_namelen),
/* Account the bytes that will be shipped to the slave. */
467 *len += lu_dirent_size(ent);
/* If anything was consumed, the page header is shipped too. */
470 if (ent != lu_dirent_start(dp))
471 *len += sizeof(struct lu_dirpage);
474 kunmap(rdpg->rp_pages[0]);
478 /* Send one page to remote MDT for creating entries there. */
479 static int cmm_split_send_page(const struct lu_env *env,
480 struct md_object *mo,
481 struct lu_rdpg *rdpg,
482 struct lu_fid *fid, int len)
484 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
485 struct cmm_object *obj;
/* Resolve the slave object by fid, then push the page via mdc. */
489 obj = cmm_object_find(env, cmm, fid);
491 RETURN(PTR_ERR(obj));
493 rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
494 rdpg->rp_pages[0], len);
495 cmm_object_put(env, obj);
499 /* Read one page of entries from local MDT. */
500 static int cmm_split_read_page(const struct lu_env *env,
501 struct md_object *mo,
502 struct lu_rdpg *rdpg)
/* Zero the page first so stale data never reaches the slave. */
506 memset(cfs_kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
507 cfs_kunmap(rdpg->rp_pages[0]);
508 rc = mo_readpage(env, md_object_next(mo), rdpg);
513 * This function performs migration of all pages with entries which fit into
514 * one stripe and one hash segment (read page, strip local names, send the
 * page to the slave identified by @lf, repeat until the segment ends).
516 static int cmm_split_process_stripe(const struct lu_env *env,
517 struct md_object *mo,
518 struct lu_rdpg *rdpg,
525 LASSERT(rdpg->rp_npages == 1);
527 struct lu_dirpage *ldp;
530 /* Read one page from local MDT. */
531 rc = cmm_split_read_page(env, mo, rdpg);
533 CERROR("Error in readpage: %d\n", rc);
537 /* Remove local entries which are going to remote MDT. */
538 rc = cmm_split_remove_page(env, mo, rdpg, end, &len);
540 CERROR("Error in remove stripe entries: %d\n", rc);
544 /* Send entries page to slave MDT. */
546 rc = cmm_split_send_page(env, mo, rdpg, lf, len);
548 CERROR("Error in sending page: %d\n", rc);
/* Stop once this page covers the end of the hash segment; otherwise
 * continue the scan from the page's last hash value. */
553 kmap(rdpg->rp_pages[0]);
554 ldp = page_address(rdpg->rp_pages[0]);
555 if (le32_to_cpu(ldp->ldp_hash_end) >= end)
558 rdpg->rp_hash = le32_to_cpu(ldp->ldp_hash_end);
559 kunmap(rdpg->rp_pages[0]);
/* Scan the whole directory and migrate each hash segment to its slave. */
567 static int cmm_split_process_dir(const struct lu_env *env,
568 struct md_object *mo,
571 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
572 struct lu_rdpg *rdpg = &cmm_env_info(env)->cmi_rdpg;
577 memset(rdpg, 0, sizeof *rdpg);
578 rdpg->rp_npages = CMM_SPLIT_PAGE_COUNT;
579 rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
580 rdpg->rp_pages = cmm_env_info(env)->cmi_pages;
582 for (i = 0; i < rdpg->rp_npages; i++) {
583 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
584 if (rdpg->rp_pages[i] == NULL)
585 GOTO(cleanup, rc = -ENOMEM);
/* Hash space is divided evenly among master + all remote targets. */
588 LASSERT(ma->ma_valid & MA_LMV);
589 hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
/* Stripe 0 (the master) keeps its entries; start from stripe 1. */
590 for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
594 lf = &ma->ma_lmv->mea_ids[i];
596 rdpg->rp_hash = i * hash_segement;
/* Last segment absorbs any division remainder of the hash space. */
597 if (i == cmm->cmm_tgt_count)
598 hash_end = MAX_HASH_SIZE;
600 hash_end = rdpg->rp_hash + hash_segement;
601 rc = cmm_split_process_stripe(env, mo, rdpg, lf, hash_end);
603 CERROR("Error (rc = %d) while splitting for %d: fid="
604 DFID", %08x:%08x\n", rc, i, PFID(lf),
605 rdpg->rp_hash, hash_end);
/* Free all pages allocated above, on both success and error paths. */
612 for (i = 0; i < rdpg->rp_npages; i++)
613 if (rdpg->rp_pages[i] != NULL)
614 __free_pages(rdpg->rp_pages[i], 0);
/* Main entry point: split directory @mo across all MDTs when it has grown
 * past CMM_SPLIT_SIZE. Returns 0 when no split is needed or split succeeded
 * (steps are numbered in the comments below). */
618 int cmm_split_dir(const struct lu_env *env, struct md_object *mo)
620 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
621 struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
624 struct timeval start;
627 cmm_lprocfs_time_start(cmm, &start, LPROC_CMM_SPLIT);
629 LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
630 memset(ma, 0, sizeof(*ma));
632 /* Step1: Checking whether the dir needs to be split. */
633 rc = cmm_split_expect(env, mo, ma, &split);
637 if (split != CMM_SPLIT_NEEDED) {
638 /* No split is needed, caller may proceed with create. */
642 /* Split should be done now, let's do it. */
643 CWARN("Dir "DFID" is going to split (size: "LPU64")\n",
644 PFID(lu_object_fid(&mo->mo_lu)), ma->ma_attr.la_size);
647 * Disable transactions for split, since there will be so many trans in
648 * this one op, conflicting with current recovery design.
650 rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
652 CERROR("Can't disable trans for split, rc %d\n", rc);
656 /* Step2: Prepare the md memory */
657 ma->ma_lmv_size = CMM_MD_SIZE(cmm->cmm_tgt_count + 1);
658 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
659 if (ma->ma_lmv == NULL)
660 GOTO(out, rc = -ENOMEM);
662 /* Step3: Create slave objects and fill the ma->ma_lmv */
663 rc = cmm_split_slaves_create(env, mo, ma);
665 CERROR("Can't create slaves for split, rc %d\n", rc);
669 /* Step4: Scan and split the object. */
670 rc = cmm_split_process_dir(env, mo, ma);
672 CERROR("Can't scan and split, rc %d\n", rc);
676 /* Step5: Set mea to the master object. */
677 LASSERT(ma->ma_valid & MA_LMV);
678 buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
679 rc = mo_xattr_set(env, md_object_next(mo), buf,
682 CERROR("Can't set MEA to master dir, " "rc %d\n", rc);
686 /* set flag in cmm_object */
687 md2cml_obj(mo)->clo_split = CMM_SPLIT_DONE;
690 * Finally, split succeeded; tell client to repeat operation on correct
693 CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu)));
697 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
699 cmm_lprocfs_time_end(cmm, &start, LPROC_CMM_SPLIT);