1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * lustre/cmm/cmm_split.c
7 * Copyright (c) 2006 Cluster File Systems, Inc.
8 * Author: Alex thomas <alex@clusterfs.com>
9 * Wang Di <wangdi@clusterfs.com>
11 * This file is part of the Lustre file system, http://www.lustre.org
12 * Lustre is a trademark of Cluster File Systems, Inc.
14 * You may have signed or agreed to another license before downloading
15 * this software. If so, you are bound by the terms and conditions
16 * of that agreement, and the following does not apply to you. See the
17 * LICENSE file included with this distribution for more information.
19 * If you did not agree to a different license, then this copy of Lustre
20 * is open source software; you can redistribute it and/or modify it
21 * under the terms of version 2 of the GNU General Public License as
22 * published by the Free Software Foundation.
24 * In either case, Lustre is distributed in the hope that it will be
25 * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26 * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * license text for more details.
31 # define EXPORT_SYMTAB
34 #define DEBUG_SUBSYSTEM S_MDS
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include "cmm_internal.h"
40 #include "mdc_internal.h"
/* Per-thread scratch state for CMM (Clustered Metadata) split operations,
 * fetched via the lu_context key below instead of stack allocation. */
42 struct cmm_thread_info {
        /* scratch md_attr reused across attr-get calls on this thread */
43 struct md_attr cti_ma;
/* NOTE(review): the struct's closing brace and the key's initializer are
 * not visible in this chunk — confirm against the full file. */
46 struct lu_context_key cmm_thread_key;
/* Return this context's cmm_thread_info. The key value must already have
 * been populated (key registration happens elsewhere); a missing value is
 * a programming error, hence the LASSERT rather than an error return. */
47 struct cmm_thread_info *cmm_ctx_info(const struct lu_context *ctx)
49 struct cmm_thread_info *info;
51 info = lu_context_key_get(ctx, &cmm_thread_key);
52 LASSERT(info != NULL);
/* NOTE(review): the `return info;` and closing brace are not visible in
 * this chunk. */
/* Result codes for cmm_expect_splitting(). */
56 #define CMM_NO_SPLIT_EXPECTED 0
57 #define CMM_EXPECT_SPLIT 1
58 #define CMM_NO_SPLITTABLE 2
/* Directory size threshold (bytes): below this, no split is attempted. */
60 #define SPLIT_SIZE 64*1024
/* Decide whether directory @mo is a candidate for splitting across MDS
 * targets. @ma must hold fresh attributes (la_size) for @mo. */
62 static int cmm_expect_splitting(const struct lu_context *ctx,
63 struct md_object *mo, struct md_attr *ma)
65 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        /* Only one target configured: nowhere to split to. */
68 if (cmm->cmm_tgt_count == 1)
69 RETURN(CMM_NO_SPLIT_EXPECTED);
        /* Directory still below the split threshold. */
71 if (ma->ma_attr.la_size < SPLIT_SIZE)
72 RETURN(CMM_NO_SPLIT_EXPECTED);
/* NOTE(review): the condition guarding this third early return is not
 * visible in this chunk (likely an already-split check). */
75 RETURN(CMM_NO_SPLIT_EXPECTED);
77 RETURN(CMM_EXPECT_SPLIT);
/* Return a pointer to the FID stored in the object's lu_object header. */
80 static inline struct lu_fid* cmm2_fid(struct cmm_object *obj)
82 return &(obj->cmo_obj.mo_lu.lo_header->loh_fid);
/* Byte size of an LMV stripe EA holding @stripes stripe FIDs: fixed
 * header plus one lu_fid per stripe. */
85 #define cmm_md_size(stripes) \
86 (sizeof(struct lmv_stripe_md) + stripes * sizeof(struct lu_fid))
/* Allocate one new FID from every remote MDC target into @fid[] (the
 * local target is skipped, so count-1 FIDs are actually filled in).
 * @count must equal the device's total target count. */
88 static int cmm_alloc_fid(const struct lu_context *ctx, struct cmm_device *cmm,
89 struct lu_fid *fid, int count)
91 struct mdc_device *mc, *tmp;
94 LASSERT(count == cmm->cmm_tgt_count);
95 /* FIXME: this spin_lock maybe not proper,
96 * because fid_alloc may need RPC */
97 spin_lock(&cmm->cmm_tgt_guard);
98 list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets,
                /* Skip the local MDS — it keeps the master stripe. */
100 if (cmm->cmm_local_num == mc->mc_num)
103 rc = obd_fid_alloc(mc->mc_desc.cl_exp, &fid[i++], NULL);
                        /* NOTE(review): this unlock appears to be the
                         * error-path exit inside the loop; the guarding
                         * `if (rc)` line is not visible here. */
105 spin_unlock(&cmm->cmm_tgt_guard);
109 spin_unlock(&cmm->cmm_tgt_guard);
        /* One FID per remote target: i must have reached count - 1. */
110 LASSERT(i + 1 == count);
/* Look up (or instantiate) the cmm_object for FID @f on device @d and
 * locate the CMM layer slice within the compound lu_object. The caller
 * must drop the reference with cmm_object_put(). */
116 struct cmm_object *cmm_object_find(const struct lu_context *ctxt,
117 struct cmm_device *d,
118 const struct lu_fid *f)
121 struct cmm_object *m;
124 o = lu_object_find(ctxt, d->cmm_md_dev.md_lu_dev.ld_site, f);
        /* NOTE(review): the IS_ERR(o) check branch is not visible in this
         * chunk; this cast propagates the error pointer unchanged. */
126 m = (struct cmm_object *)o;
128 m = lu2cmm_obj(lu_object_locate(o->lo_header,
129 d->cmm_md_dev.md_lu_dev.ld_type));
/* Release a reference obtained from cmm_object_find(). */
133 static inline void cmm_object_put(const struct lu_context *ctxt,
134 struct cmm_object *o)
136 lu_object_put(ctxt, &o->cmo_obj.mo_lu);
/* Create one remote slave directory object identified by @fid on its
 * owning MDS, used while building the stripes of a split directory. */
139 static int cmm_creat_remote_obj(const struct lu_context *ctx,
140 struct cmm_device *cmm,
141 struct lu_fid *fid, struct md_attr *ma)
143 struct cmm_object *obj;
144 struct md_create_spec *spec;
148 obj = cmm_object_find(ctx, cmm, fid);
        /* cmm_object_find() returned an error pointer. */
150 RETURN(PTR_ERR(obj));
/* NOTE(review): allocation of @spec is not visible in this chunk. */
153 spec->u.sp_pfid = fid;
154 rc = mo_object_create(ctx, md_object_next(&obj->cmo_obj),
158 cmm_object_put(ctx, obj);
/* Build the LMV stripe descriptor for directory @mo, allocate FIDs for
 * the remote stripes, create the remote slave objects, and store the
 * resulting LMV EA on the master object. On success the LMV buffer and
 * its size are handed back through @ma (freed by the caller). */
162 static int cmm_create_slave_objects(const struct lu_context *ctx,
163 struct md_object *mo, struct md_attr *ma)
165 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
166 struct lmv_stripe_md *lmv = NULL;
168 struct lu_fid *lf = cmm2_fid(md2cmm_obj(mo));
171 lmv_size = cmm_md_size(cmm->cmm_tgt_count);
173 /* This lmv will be free after finish splitting. */
174 OBD_ALLOC(lmv, lmv_size);
178 lmv->mea_master = -1;
179 lmv->mea_magic = MEA_MAGIC_ALL_CHARS;
180 lmv->mea_count = cmm->cmm_tgt_count;
        /* Stripe 0 is the master object itself. */
182 lmv->mea_ids[0] = *lf;
        /* Fill mea_ids[1..count-1] with freshly allocated remote FIDs. */
184 rc = cmm_alloc_fid(ctx, cmm, &lmv->mea_ids[1], cmm->cmm_tgt_count);
        /* Create one slave directory object per remote stripe. */
188 for (i = 1; i < cmm->cmm_tgt_count; i ++) {
189 rc = cmm_creat_remote_obj(ctx, cmm, &lmv->mea_ids[i], ma);
        /* Persist the stripe layout as an EA on the master directory. */
194 rc = mo_xattr_set(ctx, md_object_next(mo), lmv, lmv_size,
197 ma->ma_lmv_size = lmv_size;
/* Ship the directory pages collected in @rdpg to the slave object so the
 * migrated entries are inserted there.
 * NOTE(review): the parameter carrying the target fid is on a line not
 * visible in this chunk. */
203 static int cmm_send_split_pages(const struct lu_context *ctx,
204 struct md_object *mo, struct lu_rdpg *rdpg,
207 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
208 struct cmm_object *obj;
212 obj = cmm_object_find(ctx, cmm, fid);
        /* Lookup of the slave object failed. */
214 RETURN(PTR_ERR(obj));
        /* Send each filled page to the remote MDS via the MDC layer. */
216 for (i = 0; i < rdpg->rp_npages; i++) {
217 rc = mdc_send_page(ctx, md_object_next(&obj->cmo_obj),
223 cmm_object_put(ctx, obj);
/* Iterate over the hash range [rdpg->rp_hash, rdpg->rp_hash_end): read
 * directory pages from @mo and forward them to the slave identified by
 * @lf, advancing by the last page's ldp_hash_end until the range (or the
 * directory, hash_end == ~0) is exhausted. */
227 static int cmm_split_entries(const struct lu_context *ctx, struct md_object *mo,
228 struct lu_rdpg *rdpg, struct lu_fid *lf)
230 struct lu_dirpage *dp;
235 /* init page with '0' */
236 for (i = 0; i < rdpg->rp_npages; i++) {
237 memset(kmap(rdpg->rp_pages[i]), 0, CFS_PAGE_SIZE);
238 kunmap(rdpg->rp_pages[i]);
241 /* Read splitted page and send them to the slave master */
243 rc = mo_readpage(ctx, md_object_next(mo), rdpg);
247 rc = cmm_send_split_pages(ctx, mo, rdpg, lf);
                /* Pick up where this batch ended to continue the scan. */
251 dp = kmap(rdpg->rp_pages[0]);
252 hash_end = dp->ldp_hash_end;
253 kunmap(rdpg->rp_pages[0]);
                /* ~0 marks end-of-directory: nothing further to read. */
254 if (hash_end == ~0ul)
256 } while (hash_end < rdpg->rp_hash_end);
/* Remove from master directory @mo every entry found in the pages of
 * @rdpg (entries that have been migrated to a slave stripe). */
261 static int cmm_remove_entries(const struct lu_context *ctx,
262 struct md_object *mo, struct lu_rdpg *rdpg)
264 struct lu_dirpage *dp;
265 struct lu_dirent *ent;
269 for (i = 0; i < rdpg->rp_npages; i++) {
270 kmap(rdpg->rp_pages[i]);
271 dp = page_address(rdpg->rp_pages[i]);
                /* Walk each dirent in the page and unlink it by name. */
272 for (ent = lu_dirent_start(dp); ent != NULL;
273 ent = lu_dirent_next(ent)) {
274 rc = mdo_name_remove(ctx, md_object_next(mo),
                                /* Error path: unmap before bailing out.
                                 * NOTE(review): guarding `if (rc)` line is
                                 * not visible in this chunk. */
277 kunmap(rdpg->rp_pages[i]);
281 kunmap(rdpg->rp_pages[i]);
/* Directory name-hash space is divided into equal segments, one per
 * target, out of this total range. */
286 #define MAX_HASH_SIZE 0x3fffffff
/* Number of pages moved per readpage/send round. */
287 #define SPLIT_PAGE_COUNT 1
/* Drive the actual split: for each remote stripe i, remove the entries of
 * hash segment i from the master and re-send them to slave mea_ids[i],
 * using a small temporary page buffer. */
288 static int cmm_scan_and_split(const struct lu_context *ctx,
289 struct md_object *mo, struct md_attr *ma)
291 struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
293 struct lu_rdpg *rdpg = NULL;
/* NOTE(review): allocation of @rdpg itself is on lines not visible in
 * this chunk. */
300 rdpg->rp_npages = SPLIT_PAGE_COUNT;
301 rdpg->rp_count = CFS_PAGE_SIZE * rdpg->rp_npages;
303 OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
304 if (rdpg->rp_pages == NULL)
305 GOTO(free_rdpg, rc = -ENOMEM);
307 for (i = 0; i < rdpg->rp_npages; i++) {
308 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
309 if (rdpg->rp_pages[i] == NULL)
310 GOTO(cleanup, rc = -ENOMEM);
        /* Segment the hash space evenly across all targets; segment 0
         * (indices start at 1) stays on the master. */
313 hash_segement = MAX_HASH_SIZE / cmm->cmm_tgt_count;
314 for (i = 1; i < cmm->cmm_tgt_count; i++) {
315 struct lu_fid *lf = &ma->ma_lmv->mea_ids[i];
317 rdpg->rp_hash = i * hash_segement;
318 rdpg->rp_hash_end = rdpg->rp_hash + hash_segement;
319 rc = cmm_remove_entries(ctx, mo, rdpg);
323 rc = cmm_split_entries(ctx, mo, rdpg, lf);
        /* cleanup: free any pages that were successfully allocated. */
328 for (i = 0; i < rdpg->rp_npages; i++)
329 if (rdpg->rp_pages[i] != NULL)
330 __free_pages(rdpg->rp_pages[i], 0);
332 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
333 sizeof rdpg->rp_pages[0]);
/* Entry point: split directory @mo across MDS targets if it has grown
 * past the threshold. Returns 0 when no split is needed or on success;
 * caller sees errors from attr-get / slave creation / entry migration. */
341 int cml_try_to_split(const struct lu_context *ctx, struct md_object *mo)
        /* Splitting only applies to directories. */
347 LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
/* NOTE(review): acquisition of @ma (likely from cmm_ctx_info scratch) is
 * on lines not visible in this chunk. */
353 ma->ma_need = MA_INODE;
354 rc = mo_attr_get(ctx, mo, ma);
358 /* step1: checking whether the dir need to be splitted */
359 rc = cmm_expect_splitting(ctx, mo, ma);
360 if (rc != CMM_EXPECT_SPLIT)
361 GOTO(cleanup, rc = 0);
363 /* step2: create slave objects */
364 rc = cmm_create_slave_objects(ctx, mo, ma);
368 /* step3: scan and split the object */
369 rc = cmm_scan_and_split(ctx, mo, ma);
        /* Release the LMV buffer allocated in step2, if any. */
372 if (ma->ma_lmv_size && ma->ma_lmv)
373 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);