1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_split.c
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Alex thomas <alex@clusterfs.com>
9 * Wang Di <wangdi@clusterfs.com>
11 * This file is part of the Lustre file system, http://www.lustre.org
12 * Lustre is a trademark of Cluster File Systems, Inc.
14 * You may have signed or agreed to another license before downloading
15 * this software. If so, you are bound by the terms and conditions
16 * of that agreement, and the following does not apply to you. See the
17 * LICENSE file included with this distribution for more information.
19 * If you did not agree to a different license, then this copy of Lustre
20 * is open source software; you can redistribute it and/or modify it
21 * under the terms of version 2 of the GNU General Public License as
22 * published by the Free Software Foundation.
24 * In either case, Lustre is distributed in the hope that it will be
25 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * license text for more details.
31 # define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_MDS
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include <lustre/lustre_idl.h>
40 #include "cmm_internal.h"
41 #include "mdc_internal.h"
/* Result codes produced by cmm_expect_splitting(): the directory is not
 * expected to be split, or it is. */
43 #define CMM_NO_SPLIT_EXPECTED 0
44 #define CMM_EXPECT_SPLIT 1
/* NOTE(review): no use of this value is visible in this part of the file. */
45 #define CMM_NO_SPLITTABLE 2
/* Return a pointer to the FID stored in the lu_object header of @obj. */
51 static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
53 return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
/*
 * Decide whether the directory @mo is expected to be split.
 *
 * rc starts as CMM_EXPECT_SPLIT. CMM_NO_SPLIT_EXPECTED is chosen when the
 * device has no targets (cmm_tgt_count == 0), when the size recorded in
 * @ma is below SPLIT_SIZE, and when the FID obtained through
 * mdo_root_get() equals the FID of @mo. One further
 * CMM_NO_SPLIT_EXPECTED exit exists whose condition is not visible here.
 */
56 static int cmm_expect_splitting(const struct lu_env *env,
60 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
61 struct lu_fid *fid = NULL;
62 int rc = CMM_EXPECT_SPLIT;
/* No targets configured on this device. */
65 if (cmm->cmm_tgt_count == 0)
66 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
/* The directory has not yet reached SPLIT_SIZE. */
68 if (ma->ma_attr.la_size < SPLIT_SIZE)
69 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
72 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
/* Fetch a FID from the child device; presumably the root FID, judging by
 * the method name -- confirm. */
74 rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
/* rc was overwritten by the call above; restore the default verdict. */
78 rc = CMM_EXPECT_SPLIT;
/* An object whose FID matches the fetched one is not split. */
80 if (lu_fid_eq(fid, cmm2_fid(md2cmm_obj(mo))))
81 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
/* Size in bytes of a struct lmv_stripe_md followed by @stripes FIDs. */
89 #define cmm_md_size(stripes) \
90 (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
/*
 * Allocate FIDs for the targets of @cmm into the array @fid.
 *
 * @count must equal cmm->cmm_tgt_count (asserted). The target list
 * cmm->cmm_targets is walked while cmm_tgt_guard is held; a FID is
 * obtained with obd_fid_alloc() on the target's export and
 * fld_client_create() is called on the site's client FLD.
 * NOTE(review): two spin_unlock() calls are visible; presumably one of
 * them belongs to an error path inside the loop -- confirm.
 */
92 static int cmm_alloc_fid(const struct lu_env *env, struct cmm_device *cmm,
93 struct lu_fid *fid, int count)
95 struct mdc_device *mc, *tmp;
98 LASSERT(count == cmm->cmm_tgt_count);
99 /* FIXME: this spin_lock may not be appropriate,
100 * because fid_alloc may need an RPC */
101 spin_lock(&cmm->cmm_tgt_guard);
102 list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
/* The local node must never appear in the target list. */
104 LASSERT(cmm->cmm_local_num != mc->mc_num);
106 rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i], NULL);
110 ls = cmm->cmm_md_dev.md_lu_dev.ld_site;
111 rc = fld_client_create(ls->ls_client_fld,
116 spin_unlock(&cmm->cmm_tgt_guard);
121 spin_unlock(&cmm->cmm_tgt_guard);
/*
 * Find the object with FID @f on the site of device @d.
 *
 * The lookup is done with lu_object_find(). Two result paths are
 * visible: the raw result cast to a cmm_object, and the layer located
 * with lu_object_locate() for the device's type. NOTE(review): the cast
 * presumably passes an error pointer through unchanged (callers apply
 * PTR_ERR() to the result) -- the selecting condition is not visible.
 */
128 struct cmm_object *cmm_object_find(const struct lu_env *env,
129 struct cmm_device *d,
130 const struct lu_fid *f,
131 struct lustre_capa *capa)
134 struct cmm_object *m;
137 o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f,
140 m = (struct cmm_object *)o;
142 m = lu2cmm_obj(lu_object_locate(o->lo_header,
143 d->cmm_md_dev.md_lu_dev.ld_type));
/* Release @o by calling lu_object_put() on its embedded lu_object. */
147 static inline void cmm_object_put(const struct lu_env *env,
148 struct cmm_object *o)
150 lu_object_put(env, &o->cmo_obj.mo_lu);
/*
 * Create the object identified by @fid as a slave object.
 *
 * The object is looked up with cmm_object_find(). The create spec is
 * filled with @fid, the EA data @lmv and its length, and the
 * MDS_CREATE_SLAVE_OBJ flag; creation is delegated to mo_object_create()
 * on the next layer. The object is put afterwards. The visible error
 * return is PTR_ERR() of the lookup result.
 */
153 static int cmm_creat_remote_obj(const struct lu_env *env,
154 struct cmm_device *cmm,
155 struct lu_fid *fid, struct md_attr *ma,
156 const struct lmv_stripe_md *lmv,
159 struct cmm_object *obj;
160 struct md_create_spec *spec;
164 /* XXX Since capabilities will not work with split, we
165 * pass a NULL capability here */
166 obj = cmm_object_find(env, cmm, fid, NULL);
168 RETURN(PTR_ERR(obj));
/* Describe the slave: its FID plus the EA that is stored with it. */
172 spec->u.sp_ea.fid = fid;
173 spec->u.sp_ea.eadata = lmv;
174 spec->u.sp_ea.eadatalen = lmv_size;
175 spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
176 rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
180 cmm_object_put(env, obj);
/*
 * Build the striping EA for directory @mo and create one slave object
 * per target.
 *
 * An lmv sized for cmm_tgt_count + 1 stripes is allocated; stripe 0 is
 * the FID of @mo itself and the FIDs from index 1 on are filled in by
 * cmm_alloc_fid(). Each slave is then created with
 * cmm_creat_remote_obj(), which is handed a separate slave EA whose
 * mea_count is 0. The lmv size is recorded in @ma->ma_lmv_size and the
 * temporary slave EA is freed.
 */
184 static int cmm_create_slave_objects(const struct lu_env *env,
185 struct md_object *mo, struct md_attr *ma)
187 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
188 struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
190 struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
/* One stripe for the master plus one per target. */
193 lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
195 /* This lmv will be freed after splitting has finished. */
196 OBD_ALLOC(lmv, lmv_size);
200 lmv->mea_master = cmm->cmm_local_num;
201 lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
202 lmv->mea_count = cmm->cmm_tgt_count + 1;
/* Stripe 0 is the master object itself. */
204 lmv->mea_ids[0] = *lf;
206 rc = cmm_alloc_fid(env, cmm, &lmv->mea_ids[1],
211 OBD_ALLOC_PTR(slave_lmv);
213 GOTO(cleanup, rc = -ENOMEM);
215 slave_lmv->mea_master = cmm->cmm_local_num;
216 slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
217 slave_lmv->mea_count = 0;
218 for (i = 1; i < cmm->cmm_tgt_count + 1; i ++) {
/* The EA length must be the size of the structure that slave_lmv points
 * to, not the size of the pointer. */
219 rc = cmm_creat_remote_obj(env, cmm, &lmv->mea_ids[i], ma,
220 slave_lmv, sizeof(*slave_lmv));
225 ma->ma_lmv_size = lmv_size;
229 OBD_FREE_PTR(slave_lmv);
/*
 * Send the first page of @rdpg to the object identified by @fid.
 *
 * The object is looked up with cmm_object_find() (NULL capability), the
 * page and @len are handed to mdc_send_page() on the next layer, and the
 * object is put afterwards. The visible error return is PTR_ERR() of the
 * lookup result.
 */
233 static int cmm_send_split_pages(const struct lu_env *env,
234 struct md_object *mo, struct lu_rdpg *rdpg,
235 struct lu_fid *fid, int len)
237 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
238 struct cmm_object *obj;
242 obj = cmm_object_find(env, cmm, fid, NULL);
244 RETURN(PTR_ERR(obj));
246 rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
247 rdpg->rp_pages[0], len);
248 cmm_object_put(env, obj);
/*
 * Remove the name described by @ent from directory @mo.
 *
 * The name is first compared against "." and ".." (the action taken on a
 * match is not visible here). Otherwise the entry's object is looked up
 * to learn whether it is a directory, the name is copied into a buffer
 * of lde_namelen + 1 bytes, and mdo_name_remove() is called on the next
 * layer. MAX_HASH_HIGHEST_BIT is OR-ed into the entry's hash; the comment
 * in the body explains why, and the guarding condition is not visible.
 * The looked-up object is put before returning.
 */
252 static int cmm_remove_dir_ent(const struct lu_env *env, struct md_object *mo,
253 struct lu_dirent *ent)
255 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
256 struct cmm_object *obj;
261 if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
262 !strncmp(ent->lde_name, "..", ent->lde_namelen))
265 obj = cmm_object_find(env, cmm, &ent->lde_fid, NULL);
267 RETURN(PTR_ERR(obj));
269 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
/* NOTE(review): only lde_namelen bytes are copied below, so NUL
 * termination relies on OBD_ALLOC returning zeroed memory -- confirm. */
270 OBD_ALLOC(name, ent->lde_namelen + 1);
272 GOTO(cleanup, rc = -ENOMEM);
274 memcpy(name, ent->lde_name, ent->lde_namelen);
275 rc = mdo_name_remove(env, md_object_next(mo),
277 OBD_FREE(name, ent->lde_namelen + 1);
281 /* Because this ent will be transferred to slave MDS and
282 * insert it there, so in the slave MDS, we should know whether
283 * this object is dir or not, so use the highest bit of the hash
284 * to indicate that (because we do not use highest bit of hash)
287 ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
289 cmm_object_put(env, obj);
/*
 * Remove from @mo the entries held in the first page of @rdpg whose hash
 * is below @hash_end, and report a length through @len.
 *
 * The page is mapped with kmap() for the duration of the walk. Entries
 * with lde_hash < @hash_end are removed with cmm_remove_dir_ent(); a
 * failure is logged with CERROR. Two assignments to *@len are visible:
 * the byte offset of an entry from the start of the page (when that
 * entry is not the first one), and CFS_PAGE_SIZE. NOTE(review): the
 * branch structure selecting between them is not visible here.
 */
294 static int cmm_remove_entries(const struct lu_env *env,
295 struct md_object *mo, struct lu_rdpg *rdpg,
296 __u32 hash_end, __u32 *len)
298 struct lu_dirpage *dp;
299 struct lu_dirent *ent;
303 kmap(rdpg->rp_pages[0]);
304 dp = page_address(rdpg->rp_pages[0]);
305 for (ent = lu_dirent_start(dp); ent != NULL;
306 ent = lu_dirent_next(ent)) {
307 if (ent->lde_hash < hash_end) {
308 rc = cmm_remove_dir_ent(env, mo, ent);
310 CERROR("Can not del %s rc %d\n", ent->lde_name,
315 if (ent != lu_dirent_start(dp))
/* Byte offset of ent within the page. Subtract as char pointers rather
 * than casting the pointers to a 32-bit integer, which is not portable
 * to 64-bit builds. */
316 *len = (int)((char *)ent - (char *)dp);
322 *len = CFS_PAGE_SIZE;
324 kunmap(rdpg->rp_pages[0]);
/*
 * Transfer directory entries of @mo to the slave object @lf, using @end
 * as the hash bound.
 *
 * Only a single-page @rdpg is supported (asserted). Visible steps: the
 * page is zeroed, filled with mo_readpage() on the next layer, processed
 * by cmm_remove_entries() with @end as the hash limit, and sent with
 * cmm_send_split_pages(). rdpg->rp_hash is then advanced to the page's
 * ldp_hash_end, and ldp_hash_end >= @end is tested (the action taken is
 * not visible here).
 */
328 static int cmm_split_entries(const struct lu_env *env,
329 struct md_object *mo, struct lu_rdpg *rdpg,
330 struct lu_fid *lf, __u32 end)
/* Adjacent string literals are concatenated, so the first one ends with a
 * separator to keep the assertion message readable. */
335 LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time, "
336 "npages %d \n", rdpg->rp_npages);
337 /* Read the pages to be split and send them to the slave */
339 struct lu_dirpage *ldp;
342 /* init page with '0' */
343 memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
344 kunmap(rdpg->rp_pages[0]);
346 rc = mo_readpage(env, md_object_next(mo), rdpg);
347 /* -E2BIG means the end of the dir has already been reached */
356 /* Remove the old entries */
357 rc = cmm_remove_entries(env, mo, rdpg, end, &len);
361 /* Send page to slave object */
363 rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
/* Inspect the page header to decide where the next read starts. */
368 kmap(rdpg->rp_pages[0]);
369 ldp = page_address(rdpg->rp_pages[0]);
370 if (ldp->ldp_hash_end >= end) {
373 rdpg->rp_hash = ldp->ldp_hash_end;
374 kunmap(rdpg->rp_pages[0]);
/* Number of pages in the read buffer used while splitting. */
379 #define SPLIT_PAGE_COUNT 1
/*
 * Walk directory @mo and hand each hash segment to its slave object.
 *
 * A read descriptor with SPLIT_PAGE_COUNT pages is set up. The hash
 * space MAX_HASH_SIZE is divided into cmm_tgt_count + 1 equal segments;
 * for i from 1 to cmm_tgt_count, segment i
 * [i * segment, i * segment + segment) is passed to cmm_split_entries()
 * together with the slave FID ma->ma_lmv->mea_ids[i]. Pages and the page
 * array are released at the end; allocation failures yield -ENOMEM.
 */
380 static int cmm_scan_and_split(const struct lu_env *env,
381 struct md_object *mo, struct md_attr *ma)
383 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
385 struct lu_rdpg *rdpg = NULL;
392 rdpg->rp_npages = SPLIT_PAGE_COUNT;
393 rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
395 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
396 if (rdpg->rp_pages == NULL)
397 GOTO(free_rdpg, rc = -ENOMEM);
/* Order-0 allocations: one page per slot. */
399 for (i = 0; i < rdpg->rp_npages; i++) {
400 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
401 if (rdpg->rp_pages[i] == NULL)
402 GOTO(cleanup, rc = -ENOMEM);
/* Index 0 is skipped: the loop below starts at the second segment. */
405 hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
406 for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
407 struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
410 rdpg->rp_hash = i * hash_segement;
411 hash_end = rdpg->rp_hash + hash_segement;
412 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
/* Free only the slots that were actually filled. */
417 for (i = 0; i < rdpg->rp_npages; i++)
418 if (rdpg->rp_pages[i] != NULL)
419 __free_pages(rdpg->rp_pages[i], 0);
421 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
422 sizeof rdpg->rp_pages[0]);
/*
 * Return a lu_buf for @area. The buffer used is the cmi_buf member of the
 * per-environment CMM info. NOTE(review): presumably it is filled with
 * @area and a length before being returned -- that part is not visible
 * here.
 */
430 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
435 buf = &cmm_env_info(env)->cmi_buf;
/*
 * Try to split directory @mo across the CMM targets.
 *
 * @mo must be a directory (asserted). Visible steps: fetch the
 * MA_INODE | MA_LMV attributes; ask cmm_expect_splitting() for a verdict
 * and leave with rc = 0 unless it is CMM_EXPECT_SPLIT; issue the
 * MD_NO_TRANS upcall (a further exit with rc = 0 follows it, its
 * condition is not visible); create the slave objects; scan and split
 * the entries; store the LMV EA on the next-layer object with
 * mo_xattr_set() under MDS_LMV_MD_NAME; log that the directory has been
 * split. ma->ma_lmv is freed when both it and ma->ma_lmv_size are set.
 */
441 int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
443 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
449 LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
455 ma->ma_need = MA_INODE|MA_LMV;
456 rc = mo_attr_get(env, mo, ma);
460 /* step1: check whether the dir needs to be split */
461 rc = cmm_expect_splitting(env, mo, ma);
462 if (rc != CMM_EXPECT_SPLIT)
463 GOTO(cleanup, rc = 0);
465 /* Disable trans for splitting, since there will be
466 * so many trans in this one ops, conflict with current
468 rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
470 GOTO(cleanup, rc = 0);
472 /* step2: create slave objects */
473 rc = cmm_create_slave_objects(env, mo, ma);
477 /* step3: scan and split the object */
478 rc = cmm_scan_and_split(env, mo, ma);
482 buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
483 /* step4: set mea to the master object */
484 rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
486 CWARN("Dir"DFID" has been split \n",
487 PFID(lu_object_fid(&mo->mo_lu)));
/* Release the striping EA allocated while creating the slaves. */
489 if (ma->ma_lmv_size && ma->ma_lmv)
490 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);