/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  lustre/cmm/cmm_split.c
 *
 *  Copyright (c) 2006 Cluster File Systems, Inc.
 *   Author: Alex Thomas <alex@clusterfs.com>
 *           Wang Di     <wangdi@clusterfs.com>
 *
 *   This file is part of the Lustre file system, http://www.lustre.org
 *   Lustre is a trademark of Cluster File Systems, Inc.
 *
 *   You may have signed or agreed to another license before downloading
 *   this software. If so, you are bound by the terms and conditions
 *   of that agreement, and the following does not apply to you. See the
 *   LICENSE file included with this distribution for more information.
 *
 *   If you did not agree to a different license, then this copy of Lustre
 *   is open source software; you can redistribute it and/or modify it
 *   under the terms of version 2 of the GNU General Public License as
 *   published by the Free Software Foundation.
 *
 *   In either case, Lustre is distributed in the hope that it will be
 *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
 *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 *   license text for more details.
 */
#ifndef EXPORT_SYMTAB
# define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_MDS

#include <obd_class.h>
#include <lustre_fid.h>
#include <lustre_mds.h>
#include <lustre/lustre_idl.h>
#include "cmm_internal.h"
#include "mdc_internal.h"
static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
                                  ssize_t len)
{
        struct lu_buf *buf;

        buf = &cmm_env_info(env)->cmi_buf;
        buf->lb_buf = area;
        buf->lb_len = len;
        return buf;
}
int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
                     const char *name)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
        int rc;
        ENTRY;

        /* Nothing to check in a single-MDT setup. */
        if (cmm->cmm_tgt_count == 0)
                RETURN(0);

        /* Try to get the LMV EA size */
        memset(ma, 0, sizeof(*ma));
        ma->ma_need = MA_LMV;
        rc = mo_attr_get(env, mp, ma);
        if (rc)
                RETURN(rc);

        if (ma->ma_valid & MA_LMV) {
                int stripe;

                /*
                 * Clear MA_LMV in ->ma_valid, otherwise mdd will do nothing,
                 * assuming the EA has already been fetched.
                 */
                ma->ma_valid &= ~MA_LMV;
                LASSERT(ma->ma_lmv_size > 0);
                OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
                if (ma->ma_lmv == NULL)
                        RETURN(-ENOMEM);

                /* Get the LMV EA itself. */
                ma->ma_need = MA_LMV;
                rc = mo_attr_get(env, mp, ma);

                /* Skip checking the slave dirs (mea_count is 0) */
                if (rc == 0 && ma->ma_lmv->mea_count != 0) {
                        /*
                         * Get stripe by name to check whether the name
                         * belongs to the master dir, otherwise return
                         * -ERESTART.
                         */
                        stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));

                        /* Master stripe is always 0 */
                        if (stripe != 0)
                                rc = -ERESTART;
                }
                OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
        }
        RETURN(rc);
}
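
/*
 * Decide whether directory @mo should be split. The answer is returned in
 * *split: CMM_NOT_SPLITTABLE (root or already split), CMM_NO_SPLIT_EXPECTED
 * (no targets, or directory still below CMM_SPLIT_SIZE), or
 * CMM_EXPECT_SPLIT.
 */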
int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
                         struct md_attr *ma, int *split)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lu_fid root_fid;
        int rc;
        ENTRY;

        /*
         * Check the cheapest things first (tgt count, root fid); in most
         * cases this ordering yields better performance.
         */
        if (cmm->cmm_tgt_count == 0) {
                *split = CMM_NO_SPLIT_EXPECTED;
                RETURN(0);
        }

        rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
                                              &root_fid);
        if (rc)
                RETURN(rc);

        /* The root directory can not be split. */
        if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
                *split = CMM_NOT_SPLITTABLE;
                RETURN(0);
        }

        /*
         * MA_INODE is needed to check the inode size.
         * Memory is prepared by the caller.
         */
        ma->ma_need = MA_INODE | MA_LMV;
        rc = mo_attr_get(env, mo, ma);
        if (rc)
                RETURN(rc);

        /* An LMV EA means the directory is already split. */
        if (ma->ma_valid & MA_LMV) {
                *split = CMM_NOT_SPLITTABLE;
                RETURN(0);
        }

        if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
                *split = CMM_NO_SPLIT_EXPECTED;
                RETURN(0);
        }

        *split = CMM_EXPECT_SPLIT;
        RETURN(0);
}
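
/*
 * Find the cmm slice of the object with fid @f: lu_object_find() returns
 * the compound object, lu_object_locate() picks out the layer that belongs
 * to this device type.
 */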
struct cmm_object *cmm_object_find(const struct lu_env *env,
                                   struct cmm_device *d,
                                   const struct lu_fid *f)
{
        struct lu_object *o;
        struct cmm_object *m;
        ENTRY;

        o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
        if (IS_ERR(o))
                m = (struct cmm_object *)o;
        else
                m = lu2cmm_obj(lu_object_locate(o->lo_header,
                                                d->cmm_md_dev.md_lu_dev.ld_type));
        RETURN(m);
}
static inline void cmm_object_put(const struct lu_env *env,
                                  struct cmm_object *o)
{
        lu_object_put(env, &o->cmo_obj.mo_lu);
}
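
/*
 * Create a slave object with @fid on a remote MDT. The slave LMV EA is
 * passed through md_create_spec together with MDS_CREATE_SLAVE_OBJ so the
 * remote MDT knows it is creating a stripe rather than a regular directory.
 */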
static int cmm_object_create(const struct lu_env *env,
                             struct cmm_device *cmm,
                             const struct lu_fid *fid,
                             struct md_attr *ma,
                             struct lmv_stripe_md *lmv,
                             int lmv_size)
{
        struct md_create_spec *spec;
        struct cmm_object *obj;
        int rc;
        ENTRY;

        obj = cmm_object_find(env, cmm, fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        OBD_ALLOC_PTR(spec);
        if (spec == NULL) {
                cmm_object_put(env, obj);
                RETURN(-ENOMEM);
        }

        spec->u.sp_ea.fid = fid;
        spec->u.sp_ea.eadata = lmv;
        spec->u.sp_ea.eadatalen = lmv_size;
        spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
        rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
                              spec, ma);

        OBD_FREE_PTR(spec);
        cmm_object_put(env, obj);
        RETURN(rc);
}
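
/*
 * Allocate a new FID on target @mc. mc_fid_sem serializes FID allocation
 * with the FLD update, so a freshly started sequence is published in the
 * FLD before anyone can see FIDs from it.
 */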
static int cmm_fid_alloc(const struct lu_env *env,
                         struct cmm_device *cmm,
                         struct mdc_device *mc,
                         struct lu_fid *fid)
{
        int rc;
        ENTRY;

        LASSERT(cmm != NULL);
        LASSERT(mc != NULL);
        LASSERT(fid != NULL);

        down(&mc->mc_fid_sem);

        /* Alloc new fid on @mc. */
        rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
        if (rc > 0) {
                /* Set up FLD for the new sequence if needed. */
                rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
                                       mc->mc_num, env);
                if (rc)
                        CERROR("Can't create fld entry, rc %d\n", rc);
        }
        up(&mc->mc_fid_sem);
        RETURN(rc);
}
static int cmm_slaves_create(const struct lu_env *env,
                             struct md_object *mo,
                             struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
        struct lmv_stripe_md *lmv;
        struct lmv_stripe_md *slave_lmv = NULL;
        struct mdc_device *mc, *tmp;
        int i = 1, rc = 0;
        ENTRY;

        lmv = ma->ma_lmv;
        lmv->mea_master = cmm->cmm_local_num;
        lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
        lmv->mea_count = cmm->cmm_tgt_count + 1;

        /* Store the master FID at the local node's index (0). */
        lmv->mea_ids[0] = *lf;

        OBD_ALLOC_PTR(slave_lmv);
        if (slave_lmv == NULL)
                RETURN(-ENOMEM);

        slave_lmv->mea_master = cmm->cmm_local_num;
        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
        slave_lmv->mea_count = 0;

        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
                /* Alloc fid for slave object. */
                rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
                if (rc) {
                        CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
                               mc->mc_num, rc);
                        GOTO(cleanup, rc);
                }

                /* Create slave on remote MDT. */
                rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
                                       slave_lmv, sizeof(*slave_lmv));
                if (rc)
                        GOTO(cleanup, rc);
                i++;
        }
        EXIT;
cleanup:
        OBD_FREE_PTR(slave_lmv);
        RETURN(rc);
}
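
/*
 * Send the first @len bytes of the prepared readdir page to the slave
 * object identified by @fid.
 */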
static int cmm_send_split_pages(const struct lu_env *env,
                                struct md_object *mo,
                                struct lu_rdpg *rdpg,
                                struct lu_fid *fid, int len)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct cmm_object *obj;
        int rc;
        ENTRY;

        obj = cmm_object_find(env, cmm, fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
                           rdpg->rp_pages[0], len);
        cmm_object_put(env, obj);
        RETURN(rc);
}
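
/*
 * Remove one directory entry from the master as it migrates to a slave.
 * "." and ".." are never moved. Entries that refer to directories are
 * additionally tagged by setting the highest hash bit, so the receiving
 * MDT can tell files from directories when inserting the entry.
 */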
static int cmm_remove_dir_ent(const struct lu_env *env,
                              struct md_object *mo,
                              struct lu_dirent *ent)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct cmm_object *obj;
        char *name;
        int is_dir = 0;
        int rc;
        ENTRY;

        if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
            !strncmp(ent->lde_name, "..", ent->lde_namelen))
                RETURN(0);

        obj = cmm_object_find(env, cmm, &ent->lde_fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
                is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
        else
                /* XXX: is this correct? */
                is_dir = 1;

        OBD_ALLOC(name, ent->lde_namelen + 1);
        if (name == NULL)
                GOTO(cleanup, rc = -ENOMEM);

        memcpy(name, ent->lde_name, ent->lde_namelen);
        rc = mdo_name_remove(env, md_object_next(mo),
                             name, is_dir);
        OBD_FREE(name, ent->lde_namelen + 1);
        if (rc)
                GOTO(cleanup, rc);

        /*
         * This ent will be transferred to the slave MDS and inserted there,
         * so the slave MDS must know whether this object is a dir or not.
         * Use the highest bit of the hash to carry that flag (the directory
         * hash itself never sets the highest bit).
         */
        if (is_dir)
                ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
        EXIT;
cleanup:
        cmm_object_put(env, obj);
        RETURN(rc);
}
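
/*
 * Remove all entries on the current page whose hash is below @hash_end and
 * report in *len how many bytes at the start of the page belong to the
 * migrated range (0 if nothing moved, CFS_PAGE_SIZE if the whole page did).
 */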
static int cmm_remove_entries(const struct lu_env *env,
                              struct md_object *mo, struct lu_rdpg *rdpg,
                              __u32 hash_end, __u32 *len)
{
        struct lu_dirpage *dp;
        struct lu_dirent *ent;
        int rc = 0;
        ENTRY;

        kmap(rdpg->rp_pages[0]);
        dp = page_address(rdpg->rp_pages[0]);
        for (ent = lu_dirent_start(dp); ent != NULL;
             ent = lu_dirent_next(ent)) {
                if (ent->lde_hash < hash_end) {
                        rc = cmm_remove_dir_ent(env, mo, ent);
                        if (rc) {
                                CERROR("Can not del %s, rc %d\n",
                                       ent->lde_name, rc);
                                GOTO(unmap, rc);
                        }
                } else {
                        /* Only part of the page belongs to the split. */
                        if (ent != lu_dirent_start(dp))
                                *len = (__u32)((char *)ent - (char *)dp);
                        else
                                *len = 0;
                        GOTO(unmap, rc);
                }
        }
        *len = CFS_PAGE_SIZE;
        EXIT;
unmap:
        kunmap(rdpg->rp_pages[0]);
        RETURN(rc);
}
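
/*
 * Walk the hash range [rdpg->rp_hash, end) one page at a time: read a
 * directory page, strip the migrating entries from the master, and ship
 * them to the slave @lf until the page's end hash reaches @end.
 */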
static int cmm_split_entries(const struct lu_env *env,
                             struct md_object *mo, struct lu_rdpg *rdpg,
                             struct lu_fid *lf, __u32 end)
{
        int rc = 0, done = 0;
        ENTRY;

        LASSERTF(rdpg->rp_npages == 1, "Only one page can be split at a "
                 "time, npages %d\n", rdpg->rp_npages);

        /* Read pages in the split range and send them to the slave. */
        do {
                struct lu_dirpage *ldp;
                __u32 len = 0;

                /* Init page with '0'. */
                memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
                kunmap(rdpg->rp_pages[0]);

                rc = mo_readpage(env, md_object_next(mo), rdpg);
                if (rc) {
                        CERROR("Error in readpage: %d\n", rc);
                        RETURN(rc);
                }

                /* Remove the old entries */
                rc = cmm_remove_entries(env, mo, rdpg, end, &len);
                if (rc) {
                        CERROR("Error in remove entry: %d\n", rc);
                        RETURN(rc);
                }

                /* Send page to slave object */
                if (len > 0) {
                        rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
                        if (rc) {
                                CERROR("Error in sending pages: %d\n", rc);
                                RETURN(rc);
                        }
                }

                kmap(rdpg->rp_pages[0]);
                ldp = page_address(rdpg->rp_pages[0]);
                if (ldp->ldp_hash_end >= end)
                        done = 1;
                rdpg->rp_hash = ldp->ldp_hash_end;
                kunmap(rdpg->rp_pages[0]);
        } while (!done);
        RETURN(rc);
}
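
/*
 * The name hash space [0, MAX_HASH_SIZE) is divided into cmm_tgt_count + 1
 * equal segments. Segment 0 stays on the master; for each i in
 * [1, cmm_tgt_count] the entries whose hash falls into
 * [i * hash_segment, (i + 1) * hash_segment) move to the slave with FID
 * ma_lmv->mea_ids[i].
 */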
#define SPLIT_PAGE_COUNT 1

static int cmm_scan_and_split(const struct lu_env *env,
                              struct md_object *mo,
                              struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lu_rdpg *rdpg = NULL;
        __u32 hash_segment;
        int rc = 0, i;
        ENTRY;

        OBD_ALLOC_PTR(rdpg);
        if (rdpg == NULL)
                RETURN(-ENOMEM);

        rdpg->rp_npages = SPLIT_PAGE_COUNT;
        rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;

        OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
        if (rdpg->rp_pages == NULL)
                GOTO(free_rdpg, rc = -ENOMEM);

        for (i = 0; i < rdpg->rp_npages; i++) {
                rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
                if (rdpg->rp_pages[i] == NULL)
                        GOTO(cleanup, rc = -ENOMEM);
        }

        hash_segment = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
        for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
                struct lu_fid *lf;
                __u32 hash_end;

                lf = &ma->ma_lmv->mea_ids[i];

                rdpg->rp_hash = i * hash_segment;
                hash_end = rdpg->rp_hash + hash_segment;
                rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
                if (rc) {
                        CERROR("Error (rc=%d) while splitting for %d: fid="
                               DFID", %08x:%08x\n", rc, i, PFID(lf),
                               rdpg->rp_hash, hash_end);
                        GOTO(cleanup, rc);
                }
        }
        EXIT;
cleanup:
        for (i = 0; i < rdpg->rp_npages; i++)
                if (rdpg->rp_pages[i] != NULL)
                        __free_pages(rdpg->rp_pages[i], 0);
        OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
                 sizeof(rdpg->rp_pages[0]));
free_rdpg:
        OBD_FREE_PTR(rdpg);
        RETURN(rc);
}
#define cmm_md_size(stripes) \
        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
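
/*
 * For example, with two remote targets (cmm_tgt_count == 2), the master
 * allocates cmm_md_size(3): one lmv_stripe_md header followed by three
 * lu_fids, one per stripe including the master itself.
 *
 * Splitting proceeds in four steps: check whether a split is needed, create
 * the slave objects, migrate entries by hash segment, then store the MEA on
 * the master object.
 */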
int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
        struct lu_buf *buf;
        int rc = 0, split, lmv_size;
        ENTRY;

        LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));

        memset(ma, 0, sizeof(*ma));
        lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);

        /*
         * Prepare memory for the LMV EA. It is freed when splitting is
         * finished.
         */
        OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
        if (ma->ma_lmv == NULL)
                RETURN(-ENOMEM);

        /* Step 1: Check whether the dir needs to be split. */
        rc = cmm_expect_splitting(env, mo, ma, &split);
        if (rc)
                GOTO(cleanup, rc);

        if (split != CMM_EXPECT_SPLIT)
                GOTO(cleanup, rc = 0);

        LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
                 "dir is protected by MDL_EX lock. Lock mode 0x%x\n",
                 (int)mo->mo_pdo_mode);

        /*
         * Disable transactions for splitting, since this one operation
         * produces so many transactions that it conflicts with the current
         * recovery design.
         */
        rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
        if (rc) {
                CERROR("Can't disable trans for split, rc %d\n", rc);
                GOTO(cleanup, rc);
        }

        /* Step 2: Create slave objects (on slave MDTs). */
        ma->ma_lmv_size = lmv_size;
        rc = cmm_slaves_create(env, mo, ma);
        if (rc) {
                CERROR("Can't create slaves for split, rc %d\n", rc);
                GOTO(cleanup, rc);
        }

        /* Step 3: Scan and split the object. */
        rc = cmm_scan_and_split(env, mo, ma);
        if (rc) {
                CERROR("Can't scan and split, rc %d\n", rc);
                GOTO(cleanup, rc);
        }

        buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);

        /* Step 4: Set the MEA on the master object. */
        rc = mo_xattr_set(env, md_object_next(mo), buf,
                          MDS_LMV_MD_NAME, 0);
        if (rc == 0)
                CWARN("Dir "DFID" has been split\n",
                      PFID(lu_object_fid(&mo->mo_lu)));
        else
                CERROR("Can't set MEA to master dir, rc %d\n", rc);
        EXIT;
cleanup:
        OBD_FREE(ma->ma_lmv, lmv_size);
        RETURN(rc);
}