Whamcloud - gitweb
- make compiler happy with initialized value
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_split.c
5  *  Lustre splitting dir
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Alex thomas <alex@clusterfs.com>
9  *           Wang Di     <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #ifndef EXPORT_SYMTAB
31 # define EXPORT_SYMTAB
32 #endif
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include <lustre/lustre_idl.h>
40 #include "cmm_internal.h"
41 #include "mdc_internal.h"
42
/* Verdicts returned by cmm_expect_splitting(). */
#define CMM_NO_SPLIT_EXPECTED   0
#define CMM_EXPECT_SPLIT        1
#define CMM_NO_SPLITTABLE       2

/* Directory size threshold (bytes) above which a split is attempted. */
enum {
        SPLIT_SIZE =  64*1024
};
50
51 static int cmm_expect_splitting(const struct lu_env *env,
52                                 struct md_object *mo,
53                                 struct md_attr *ma)
54 {
55         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
56         struct lu_fid *fid = NULL;
57         int rc = CMM_EXPECT_SPLIT;
58         ENTRY;
59
60         if (cmm->cmm_tgt_count == 0)
61                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
62
63         if (ma->ma_attr.la_size < SPLIT_SIZE)
64                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
65
66         if (ma->ma_lmv_size)
67                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
68         OBD_ALLOC_PTR(fid);
69         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid);
70         if (rc)
71                 GOTO(cleanup, rc);
72
73         rc = CMM_EXPECT_SPLIT;
74
75         if (lu_fid_eq(fid, cmm2fid(md2cmm_obj(mo))))
76                 GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED);
77
78         EXIT;
79 cleanup:
80         if (fid)
81                 OBD_FREE_PTR(fid);
82         return rc;
83 }
84
/* Byte size of an LMV EA carrying @stripes stripe FIDs. */
#define cmm_md_size(stripes) \
       (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
87
88 struct cmm_object *cmm_object_find(const struct lu_env *env,
89                                    struct cmm_device *d,
90                                    const struct lu_fid *f)
91 {
92         struct lu_object *o;
93         struct cmm_object *m;
94         ENTRY;
95
96         o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
97         if (IS_ERR(o))
98                 m = (struct cmm_object *)o;
99         else
100                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
101                                d->cmm_md_dev.md_lu_dev.ld_type));
102         RETURN(m);
103 }
104
/* Drop the reference on @o taken by cmm_object_find(). */
static inline void cmm_object_put(const struct lu_env *env,
                                  struct cmm_object *o)
{
        lu_object_put(env, &o->cmo_obj.mo_lu);
}
110
111 static int cmm_object_create(const struct lu_env *env,
112                              struct cmm_device *cmm,
113                              struct lu_fid *fid,
114                              struct md_attr *ma,
115                              struct lmv_stripe_md *lmv,
116                              int lmv_size)
117 {
118         struct md_create_spec *spec;
119         struct cmm_object *obj;
120         int rc;
121         ENTRY;
122
123         obj = cmm_object_find(env, cmm, fid);
124         if (IS_ERR(obj))
125                 RETURN(PTR_ERR(obj));
126
127         OBD_ALLOC_PTR(spec);
128
129         spec->u.sp_ea.fid = fid;
130         spec->u.sp_ea.eadata = lmv;
131         spec->u.sp_ea.eadatalen = lmv_size;
132         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
133         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
134                               spec, ma);
135         OBD_FREE_PTR(spec);
136
137         cmm_object_put(env, obj);
138         RETURN(rc);
139 }
140
/*
 * Allocate a new FID for a slave object on target @mc, serialized by
 * mc->mc_fid_sem.  When obd_fid_alloc() reports a new sequence
 * (rc > 0), the sequence -> MDT mapping is registered with the FLD.
 */
static int cmm_fid_alloc(const struct lu_env *env,
                         struct cmm_device *cmm,
                         struct mdc_device *mc,
                         struct lu_fid *fid)
{
        int rc;
        ENTRY;

        LASSERT(cmm != NULL);
        LASSERT(mc != NULL);
        LASSERT(fid != NULL);

        down(&mc->mc_fid_sem);

        /* Alloc new fid on @mc. */
        rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
        if (rc > 0) {
                /* Setup FLD for new sequence if needed. */
                rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
                                       mc->mc_num, env);
                if (rc)
                        CERROR("Can't create fld entry, rc %d\n", rc);
        }
        up(&mc->mc_fid_sem);

        RETURN(rc);
}
168
/*
 * Create one slave object on every target MDT and build the master LMV
 * (striping) EA describing master + slaves.
 *
 * On success ma->ma_lmv / ma->ma_lmv_size carry the freshly allocated
 * master LMV; the caller owns it and frees it once splitting finishes.
 * On error the LMV is freed here and ma is reset.
 */
static int cmm_slaves_create(const struct lu_env *env,
                             struct md_object *mo,
                             struct md_attr *ma)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct lmv_stripe_md *lmv = NULL, *slave_lmv = NULL;
        struct lu_fid *lf = cmm2fid(md2cmm_obj(mo));
        struct mdc_device *mc, *tmp;
        int lmv_size, i = 1, rc = 0;
        ENTRY;

        /* One stripe per target plus one for the master itself. */
        lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);

        /* This lmv will free after finish splitting. */
        OBD_ALLOC(lmv, lmv_size);
        if (!lmv)
                RETURN(-ENOMEM);

        lmv->mea_master = cmm->cmm_local_num;
        lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
        lmv->mea_count = cmm->cmm_tgt_count + 1;

        /* Store master FID to local node idx number.
         * Slot 0 is the master; slave FIDs start at index 1. */
        lmv->mea_ids[0] = *lf;

        /* Per-slave EA: empty stripe list that names the master MDT. */
        OBD_ALLOC_PTR(slave_lmv);
        if (!slave_lmv)
                GOTO(cleanup, rc = -ENOMEM);

        slave_lmv->mea_master = cmm->cmm_local_num;
        slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
        slave_lmv->mea_count = 0;

        list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
                /* Alloc fid for slave object. */
                rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
                if (rc) {
                        CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
                               mc->mc_num, rc);
                        GOTO(cleanup, rc);
                }

                /* Create slave on remote MDT. */
                rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
                                       slave_lmv, sizeof(*slave_lmv));
                if (rc)
                        GOTO(cleanup, rc);
                i++;
        }

        /* Hand the master LMV back to the caller. */
        ma->ma_lmv_size = lmv_size;
        ma->ma_lmv = lmv;
        EXIT;
cleanup:
        if (slave_lmv)
                OBD_FREE_PTR(slave_lmv);
        if (rc && lmv) {
                OBD_FREE(lmv, lmv_size);
                ma->ma_lmv = NULL;
                ma->ma_lmv_size = 0;
        }
        return rc;
}
232
233 static int cmm_send_split_pages(const struct lu_env *env,
234                                 struct md_object *mo,
235                                 struct lu_rdpg *rdpg,
236                                 struct lu_fid *fid, int len)
237 {
238         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
239         struct cmm_object *obj;
240         int rc = 0;
241         ENTRY;
242
243         obj = cmm_object_find(env, cmm, fid);
244         if (IS_ERR(obj))
245                 RETURN(PTR_ERR(obj));
246
247         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
248                            rdpg->rp_pages[0], len);
249         cmm_object_put(env, obj);
250         RETURN(rc);
251 }
252
/*
 * Unlink directory entry @ent from master directory @mo prior to its
 * migration to a slave.  "." and ".." are never migrated.  For
 * directory entries the highest hash bit in @ent is set so the
 * receiving slave can distinguish them.
 */
static int cmm_remove_dir_ent(const struct lu_env *env,
                              struct md_object *mo,
                              struct lu_dirent *ent)
{
        struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
        struct cmm_object *obj;
        char *name;
        int is_dir, rc;
        ENTRY;

        /* Skip the "." and ".." entries. */
        if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
            !strncmp(ent->lde_name, "..", ent->lde_namelen))
                RETURN(0);

        obj = cmm_object_find(env, cmm, &ent->lde_fid);
        if (IS_ERR(obj))
                RETURN(PTR_ERR(obj));

        if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
                is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
        else
                /* XXX: is this correct? */
                is_dir = 1;

        /* Copy lde_name into a NUL-terminated buffer for mdo_name_remove(). */
        OBD_ALLOC(name, ent->lde_namelen + 1);
        if (!name)
                GOTO(cleanup, rc = -ENOMEM);

        memcpy(name, ent->lde_name, ent->lde_namelen);
        rc = mdo_name_remove(env, md_object_next(mo),
                             name, is_dir);
        OBD_FREE(name, ent->lde_namelen + 1);
        if (rc)
                GOTO(cleanup, rc);

        /*
         * This ent will be transferred to slave MDS and insert it there, so in
         * the slave MDS, we should know whether this object is dir or not, so
         * use the highest bit of the hash to indicate that (because we do not
         * use highest bit of hash).
         */
        if (is_dir)
                ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
cleanup:
        cmm_object_put(env, obj);

        RETURN(rc);
}
301
302 static int cmm_remove_entries(const struct lu_env *env,
303                               struct md_object *mo, struct lu_rdpg *rdpg,
304                               __u32 hash_end, __u32 *len)
305 {
306         struct lu_dirpage *dp;
307         struct lu_dirent  *ent;
308         int rc = 0;
309         ENTRY;
310
311         kmap(rdpg->rp_pages[0]);
312         dp = page_address(rdpg->rp_pages[0]);
313         for (ent = lu_dirent_start(dp); ent != NULL;
314              ent = lu_dirent_next(ent)) {
315                 if (ent->lde_hash < hash_end) {
316                         rc = cmm_remove_dir_ent(env, mo, ent);
317                         if (rc) {
318                                 CERROR("Can not del %s rc %d\n", ent->lde_name,
319                                                                  rc);
320                                 GOTO(unmap, rc);
321                         }
322                 } else {
323                         if (ent != lu_dirent_start(dp))
324                                 *len = (int)((__u32)ent - (__u32)dp);
325                         else
326                                 *len = 0;
327                         GOTO(unmap, rc);
328                 }
329         }
330         *len = CFS_PAGE_SIZE;
331         EXIT;
332 unmap:
333         kunmap(rdpg->rp_pages[0]);
334         return rc;
335 }
336
/*
 * Migrate all entries of @mo whose hash lies in [rdpg->rp_hash, @end)
 * to the slave object @lf: read one page of entries at a time, unlink
 * them locally, ship the page to the slave, and advance the readdir
 * window until it reaches @end.
 */
static int cmm_split_entries(const struct lu_env *env,
                             struct md_object *mo, struct lu_rdpg *rdpg,
                             struct lu_fid *lf, __u32 end)
{
        int rc, done = 0;
        ENTRY;

        LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
                 "npages %d\n", rdpg->rp_npages);

        /* Read split page and send them to the slave master. */
        do {
                struct lu_dirpage *ldp;
                __u32  len = 0;

                /* init page with '0' */
                memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
                kunmap(rdpg->rp_pages[0]);

                rc = mo_readpage(env, md_object_next(mo), rdpg);
                if (rc)
                        RETURN(rc);

                /* Remove the old entries; @len becomes the number of page
                 * bytes occupied by the migrated entries. */
                rc = cmm_remove_entries(env, mo, rdpg, end, &len);
                if (rc)
                        RETURN(rc);

                /* Send page to slave object */
                if (len > 0) {
                        rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
                        if (rc)
                                RETURN(rc);
                }

                /* Advance the readdir window; stop once this page's end
                 * hash covers @end. */
                kmap(rdpg->rp_pages[0]);
                ldp = page_address(rdpg->rp_pages[0]);
                if (ldp->ldp_hash_end >= end) {
                        done = 1;
                }
                rdpg->rp_hash = ldp->ldp_hash_end;
                kunmap(rdpg->rp_pages[0]);
        } while (!done);

        RETURN(rc);
}
383
384 #define SPLIT_PAGE_COUNT 1
385
386 static int cmm_scan_and_split(const struct lu_env *env,
387                               struct md_object *mo,
388                               struct md_attr *ma)
389 {
390         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
391         struct lu_rdpg *rdpg = NULL;
392         __u32 hash_segement;
393         int rc = 0, i;
394
395         OBD_ALLOC_PTR(rdpg);
396         if (!rdpg)
397                 RETURN(-ENOMEM);
398
399         rdpg->rp_npages = SPLIT_PAGE_COUNT;
400         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
401
402         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
403         if (rdpg->rp_pages == NULL)
404                 GOTO(free_rdpg, rc = -ENOMEM);
405
406         for (i = 0; i < rdpg->rp_npages; i++) {
407                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
408                 if (rdpg->rp_pages[i] == NULL)
409                         GOTO(cleanup, rc = -ENOMEM);
410         }
411
412         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
413         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
414                 struct lu_fid *lf;
415                 __u32 hash_end;
416
417                 lf = &ma->ma_lmv->mea_ids[i];
418
419                 rdpg->rp_hash = i * hash_segement;
420                 hash_end = rdpg->rp_hash + hash_segement;
421                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
422                 if (rc)
423                         GOTO(cleanup, rc);
424         }
425         EXIT;
426 cleanup:
427         for (i = 0; i < rdpg->rp_npages; i++)
428                 if (rdpg->rp_pages[i] != NULL)
429                         __free_pages(rdpg->rp_pages[i], 0);
430         if (rdpg->rp_pages)
431                 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
432                          sizeof rdpg->rp_pages[0]);
433 free_rdpg:
434         if (rdpg)
435                 OBD_FREE_PTR(rdpg);
436
437         return rc;
438 }
439
440 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
441                                   ssize_t len)
442 {
443         struct lu_buf *buf;
444
445         buf = &cmm_env_info(env)->cmi_buf;
446         buf->lb_buf = area;
447         buf->lb_len = len;
448         return buf;
449 }
450
451 int cml_try_to_split(const struct lu_env *env, struct md_object *mo)
452 {
453         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
454         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
455         struct lu_buf *buf;
456         int rc = 0;
457         ENTRY;
458
459         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
460         
461         memset(ma, 0, sizeof(*ma));
462         ma->ma_need = MA_INODE | MA_LMV;
463         rc = mo_attr_get(env, mo, ma);
464         if (rc)
465                 GOTO(cleanup, ma);
466
467         /* step1: checking whether the dir need to be splitted */
468         rc = cmm_expect_splitting(env, mo, ma);
469         if (rc != CMM_EXPECT_SPLIT)
470                 GOTO(cleanup, rc = 0);
471
472         /*
473          * Disable trans for splitting, since there will be so many trans in
474          * this one ops, confilct with current recovery design.
475          */
476         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
477         if (rc)
478                 GOTO(cleanup, rc = 0);
479
480         /* step2: create slave objects */
481         rc = cmm_slaves_create(env, mo, ma);
482         if (rc)
483                 GOTO(cleanup, ma);
484
485         /* step3: scan and split the object */
486         rc = cmm_scan_and_split(env, mo, ma);
487         if (rc)
488                 GOTO(cleanup, ma);
489
490         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
491         
492         /* step4: set mea to the master object */
493         rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0);
494         if (rc == -ERESTART)
495                 CWARN("Dir "DFID" has been split\n",
496                       PFID(lu_object_fid(&mo->mo_lu)));
497         EXIT;
498 cleanup:
499         if (ma->ma_lmv_size && ma->ma_lmv)
500                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
501         
502         return rc;
503 }
504