Whamcloud - gitweb
some code cleanup in split.
[fs/lustre-release.git] / lustre / cmm / cmm_split.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/cmm/cmm_split.c
5  *  Lustre splitting dir
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Alex thomas <alex@clusterfs.com>
9  *           Wang Di     <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29
30 #ifndef EXPORT_SYMTAB
31 # define EXPORT_SYMTAB
32 #endif
33
34 #define DEBUG_SUBSYSTEM S_MDS
35
36 #include <obd_class.h>
37 #include <lustre_fid.h>
38 #include <lustre_mds.h>
39 #include <lustre/lustre_idl.h>
40 #include "cmm_internal.h"
41 #include "mdc_internal.h"
42
43 static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area,
44                                   ssize_t len)
45 {
46         struct lu_buf *buf;
47
48         buf = &cmm_env_info(env)->cmi_buf;
49         buf->lb_buf = area;
50         buf->lb_len = len;
51         return buf;
52 }
53
54 int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp,
55                      const char *name)
56 {
57         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp));
58         struct md_attr *ma = &cmm_env_info(env)->cmi_ma;
59         int rc;
60         ENTRY;
61         
62         if (cmm->cmm_tgt_count == 0)
63                 RETURN(0);
64
65         /* Try to get the LMV EA size */
66         memset(ma, 0, sizeof(*ma));
67         ma->ma_need = MA_LMV;
68         rc = mo_attr_get(env, mp, ma);
69         if (rc)
70                 RETURN(rc);
71
72         if (ma->ma_valid & MA_LMV) {
73                 int stripe;
74
75                 /* 
76                  * Clean MA_LMV in ->ma_valid because mdd will do nothing
77                  * counting that EA is already taken.
78                  */
79                 ma->ma_valid &= ~MA_LMV;
80
81                 LASSERT(ma->ma_lmv_size > 0);
82                 OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
83                 if (ma->ma_lmv == NULL)
84                         RETURN(-ENOMEM);
85
86                 /* Get LMV EA */
87                 ma->ma_need = MA_LMV;
88                 rc = mo_attr_get(env, mp, ma);
89                 
90                 /* Skip checking the slave dirs (mea_count is 0) */
91                 if (rc == 0 && ma->ma_lmv->mea_count != 0) {
92                         /* 
93                          * Get stripe by name to check the name belongs to
94                          * master dir, otherwise return the -ERESTART
95                          */
96                         stripe = mea_name2idx(ma->ma_lmv, name, strlen(name));
97                 
98                         /* Master stripe is always 0 */
99                         if (stripe != 0)
100                                 rc = -ERESTART;
101                 }
102                 OBD_FREE(ma->ma_lmv, ma->ma_lmv_size);
103         }
104         RETURN(rc);
105 }
106
107 int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo,
108                          struct md_attr *ma, int *split)
109 {
110         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
111         struct lu_fid root_fid;
112         int rc;
113         ENTRY;
114
115         /* 
116          * Check first most light things like tgt count and root fid. For some
117          * case this style should yeild better performance.
118          */
119         if (cmm->cmm_tgt_count == 0) {
120                 *split = CMM_NO_SPLIT_EXPECTED;
121                 RETURN(0);
122         }
123
124         rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child,
125                                               &root_fid);
126         if (rc)
127                 RETURN(rc);
128
129         if (lu_fid_eq(&root_fid, cmm2fid(md2cmm_obj(mo)))) {
130                 *split = CMM_NOT_SPLITTABLE;
131                 RETURN(0);
132         }
133
134         /* 
135          * MA_INODE is needed to check inode size. 
136          * Memory is prepared by caller.
137          */
138         ma->ma_need = MA_INODE | MA_LMV;
139         rc = mo_attr_get(env, mo, ma);
140         if (rc)
141                 RETURN(rc);
142         
143         if (ma->ma_valid & MA_LMV) {
144                 *split = CMM_NOT_SPLITTABLE;
145                 RETURN(0);
146         }
147                 
148         if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) {
149                 *split = CMM_NO_SPLIT_EXPECTED;
150                 RETURN(0);
151         }
152         
153         *split = CMM_EXPECT_SPLIT;
154         RETURN(0);
155 }
156
157 struct cmm_object *cmm_object_find(const struct lu_env *env,
158                                    struct cmm_device *d,
159                                    const struct lu_fid *f)
160 {
161         struct lu_object *o;
162         struct cmm_object *m;
163         ENTRY;
164
165         o = lu_object_find(env, d->cmm_md_dev.md_lu_dev.ld_site, f);
166         if (IS_ERR(o))
167                 m = (struct cmm_object *)o;
168         else
169                 m = lu2cmm_obj(lu_object_locate(o->lo_header,
170                                d->cmm_md_dev.md_lu_dev.ld_type));
171         RETURN(m);
172 }
173
174 static inline void cmm_object_put(const struct lu_env *env,
175                                   struct cmm_object *o)
176 {
177         lu_object_put(env, &o->cmo_obj.mo_lu);
178 }
179
180 static int cmm_object_create(const struct lu_env *env,
181                              struct cmm_device *cmm,
182                              struct lu_fid *fid,
183                              struct md_attr *ma,
184                              struct lmv_stripe_md *lmv,
185                              int lmv_size)
186 {
187         struct md_create_spec *spec;
188         struct cmm_object *obj;
189         int rc;
190         ENTRY;
191
192         obj = cmm_object_find(env, cmm, fid);
193         if (IS_ERR(obj))
194                 RETURN(PTR_ERR(obj));
195
196         OBD_ALLOC_PTR(spec);
197
198         spec->u.sp_ea.fid = fid;
199         spec->u.sp_ea.eadata = lmv;
200         spec->u.sp_ea.eadatalen = lmv_size;
201         spec->sp_cr_flags |= MDS_CREATE_SLAVE_OBJ;
202         rc = mo_object_create(env, md_object_next(&obj->cmo_obj),
203                               spec, ma);
204         OBD_FREE_PTR(spec);
205
206         cmm_object_put(env, obj);
207         RETURN(rc);
208 }
209
210 static int cmm_fid_alloc(const struct lu_env *env,
211                          struct cmm_device *cmm,
212                          struct mdc_device *mc,
213                          struct lu_fid *fid)
214 {
215         int rc;
216         ENTRY;
217
218         LASSERT(cmm != NULL);
219         LASSERT(mc != NULL);
220         LASSERT(fid != NULL);
221
222         down(&mc->mc_fid_sem);
223
224         /* Alloc new fid on @mc. */
225         rc = obd_fid_alloc(mc->mc_desc.cl_exp, fid, NULL);
226         if (rc > 0) {
227                 /* Setup FLD for new sequenceif needed. */
228                 rc = fld_client_create(cmm->cmm_fld, fid_seq(fid),
229                                        mc->mc_num, env);
230                 if (rc)
231                         CERROR("Can't create fld entry, rc %d\n", rc);
232         }
233         up(&mc->mc_fid_sem);
234         
235         RETURN(rc);
236 }
237
238 static int cmm_slaves_create(const struct lu_env *env,
239                              struct md_object *mo,
240                              struct md_attr *ma)
241 {
242         struct cmm_device    *cmm = cmm_obj2dev(md2cmm_obj(mo));
243         struct lu_fid        *lf  = cmm2fid(md2cmm_obj(mo));
244         struct lmv_stripe_md *lmv;
245         struct lmv_stripe_md *slave_lmv = NULL;
246         struct mdc_device    *mc, *tmp;
247         int i = 1, rc = 0;
248         ENTRY;
249
250         lmv = ma->ma_lmv;
251         lmv->mea_master = cmm->cmm_local_num;
252         lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
253         lmv->mea_count = cmm->cmm_tgt_count + 1;
254
255         /* Store master FID to local node idx number. */
256         lmv->mea_ids[0] = *lf;
257
258         OBD_ALLOC_PTR(slave_lmv);
259         if (slave_lmv == NULL)
260                 RETURN(-ENOMEM);
261
262         slave_lmv->mea_master = cmm->cmm_local_num;
263         slave_lmv->mea_magic = MEA_MAGIC_HASH_SEGMENT;
264         slave_lmv->mea_count = 0;
265
266         list_for_each_entry_safe(mc, tmp, &cmm->cmm_targets, mc_linkage) {
267                 /* Alloc fid for slave object. */
268                 rc = cmm_fid_alloc(env, cmm, mc, &lmv->mea_ids[i]);
269                 if (rc) {
270                         CERROR("Can't alloc fid for slave "LPU64", rc %d\n",
271                                mc->mc_num, rc);
272                         GOTO(cleanup, rc);
273                 }
274
275                 /* Create slave on remote MDT. */
276                 rc = cmm_object_create(env, cmm, &lmv->mea_ids[i], ma,
277                                        slave_lmv, sizeof(*slave_lmv));
278                 if (rc)
279                         GOTO(cleanup, rc);
280                 i++;
281         }
282
283         EXIT;
284 cleanup:
285         OBD_FREE_PTR(slave_lmv);
286         return rc;
287 }
288
289 static int cmm_send_split_pages(const struct lu_env *env,
290                                 struct md_object *mo,
291                                 struct lu_rdpg *rdpg,
292                                 struct lu_fid *fid, int len)
293 {
294         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
295         struct cmm_object *obj;
296         int rc = 0;
297         ENTRY;
298
299         obj = cmm_object_find(env, cmm, fid);
300         if (IS_ERR(obj))
301                 RETURN(PTR_ERR(obj));
302
303         rc = mdc_send_page(cmm, env, md_object_next(&obj->cmo_obj),
304                            rdpg->rp_pages[0], len);
305         cmm_object_put(env, obj);
306         RETURN(rc);
307 }
308
309 static int cmm_remove_dir_ent(const struct lu_env *env,
310                               struct md_object *mo,
311                               struct lu_dirent *ent)
312 {
313         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
314         struct cmm_object *obj;
315         char *name;
316         int is_dir, rc;
317         ENTRY;
318
319         if (!strncmp(ent->lde_name, ".", ent->lde_namelen) ||
320             !strncmp(ent->lde_name, "..", ent->lde_namelen))
321                 RETURN(0);
322
323         obj = cmm_object_find(env, cmm, &ent->lde_fid);
324         if (IS_ERR(obj))
325                 RETURN(PTR_ERR(obj));
326
327         if (lu_object_exists(&obj->cmo_obj.mo_lu) > 0)
328                 is_dir = S_ISDIR(lu_object_attr(&obj->cmo_obj.mo_lu));
329         else
330                 /* XXX: is this correct? */
331                 is_dir = 1;
332
333         OBD_ALLOC(name, ent->lde_namelen + 1);
334         if (!name)
335                 GOTO(cleanup, rc = -ENOMEM);
336
337         memcpy(name, ent->lde_name, ent->lde_namelen);
338         rc = mdo_name_remove(env, md_object_next(mo),
339                              name, is_dir);
340         OBD_FREE(name, ent->lde_namelen + 1);
341         if (rc)
342                 GOTO(cleanup, rc);
343
344         /*
345          * This ent will be transferred to slave MDS and insert it there, so in
346          * the slave MDS, we should know whether this object is dir or not, so
347          * use the highest bit of the hash to indicate that (because we do not
348          * use highest bit of hash).
349          */
350         if (is_dir)
351                 ent->lde_hash |= MAX_HASH_HIGHEST_BIT;
352 cleanup:
353         cmm_object_put(env, obj);
354
355         RETURN(rc);
356 }
357
358 static int cmm_remove_entries(const struct lu_env *env,
359                               struct md_object *mo, struct lu_rdpg *rdpg,
360                               __u32 hash_end, __u32 *len)
361 {
362         struct lu_dirpage *dp;
363         struct lu_dirent  *ent;
364         int rc = 0;
365         ENTRY;
366
367         kmap(rdpg->rp_pages[0]);
368         dp = page_address(rdpg->rp_pages[0]);
369         for (ent = lu_dirent_start(dp); ent != NULL;
370              ent = lu_dirent_next(ent)) {
371                 if (ent->lde_hash < hash_end) {
372                         rc = cmm_remove_dir_ent(env, mo, ent);
373                         if (rc) {
374                                 CERROR("Can not del %s rc %d\n", ent->lde_name,
375                                                                  rc);
376                                 GOTO(unmap, rc);
377                         }
378                 } else {
379                         if (ent != lu_dirent_start(dp))
380                                 *len = (int)((__u32)ent - (__u32)dp);
381                         else
382                                 *len = 0;
383                         GOTO(unmap, rc);
384                 }
385         }
386         *len = CFS_PAGE_SIZE;
387         EXIT;
388 unmap:
389         kunmap(rdpg->rp_pages[0]);
390         return rc;
391 }
392
393 static int cmm_split_entries(const struct lu_env *env,
394                              struct md_object *mo, struct lu_rdpg *rdpg,
395                              struct lu_fid *lf, __u32 end)
396 {
397         int rc, done = 0;
398         ENTRY;
399
400         LASSERTF(rdpg->rp_npages == 1, "Now Only support split 1 page each time"
401                  "npages %d\n", rdpg->rp_npages);
402
403         /* Read split page and send them to the slave master. */
404         do {
405                 struct lu_dirpage *ldp;
406                 __u32  len = 0;
407
408                 /* init page with '0' */
409                 memset(kmap(rdpg->rp_pages[0]), 0, CFS_PAGE_SIZE);
410                 kunmap(rdpg->rp_pages[0]);
411
412                 rc = mo_readpage(env, md_object_next(mo), rdpg);
413                 if (rc) {
414                         CERROR("Error in readpage: %d\n", rc);
415                         RETURN(rc);
416                 }
417
418                 /* Remove the old entries */
419                 rc = cmm_remove_entries(env, mo, rdpg, end, &len);
420                 if (rc) {
421                         CERROR("Error in remove entry: %d\n", rc);
422                         RETURN(rc);
423                 }
424
425                 /* Send page to slave object */
426                 if (len > 0) {
427                         rc = cmm_send_split_pages(env, mo, rdpg, lf, len);
428                         if (rc) {
429                                 CERROR("Error in sending pages: %d\n", rc);
430                                 RETURN(rc);
431                         }
432                 }
433
434                 kmap(rdpg->rp_pages[0]);
435                 ldp = page_address(rdpg->rp_pages[0]);
436                 if (ldp->ldp_hash_end >= end) {
437                         done = 1;
438                 }
439                 rdpg->rp_hash = ldp->ldp_hash_end;
440                 kunmap(rdpg->rp_pages[0]);
441         } while (!done);
442
443         RETURN(rc);
444 }
445
446 #define SPLIT_PAGE_COUNT 1
447
448 static int cmm_scan_and_split(const struct lu_env *env,
449                               struct md_object *mo,
450                               struct md_attr *ma)
451 {
452         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
453         struct lu_rdpg *rdpg = NULL;
454         __u32 hash_segement;
455         int rc = 0, i;
456
457         OBD_ALLOC_PTR(rdpg);
458         if (!rdpg)
459                 RETURN(-ENOMEM);
460
461         rdpg->rp_npages = SPLIT_PAGE_COUNT;
462         rdpg->rp_count  = CFS_PAGE_SIZE * rdpg->rp_npages;
463
464         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof(rdpg->rp_pages[0]));
465         if (rdpg->rp_pages == NULL)
466                 GOTO(free_rdpg, rc = -ENOMEM);
467
468         for (i = 0; i < rdpg->rp_npages; i++) {
469                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
470                 if (rdpg->rp_pages[i] == NULL)
471                         GOTO(cleanup, rc = -ENOMEM);
472         }
473
474         hash_segement = MAX_HASH_SIZE / (cmm->cmm_tgt_count + 1);
475         for (i = 1; i < cmm->cmm_tgt_count + 1; i++) {
476                 struct lu_fid *lf;
477                 __u32 hash_end;
478
479                 lf = &ma->ma_lmv->mea_ids[i];
480
481                 rdpg->rp_hash = i * hash_segement;
482                 hash_end = rdpg->rp_hash + hash_segement;
483                 rc = cmm_split_entries(env, mo, rdpg, lf, hash_end);
484                 if (rc) {
485                         CERROR("Error (rc=%d) while splitting for %d: fid="
486                                 DFID", %08x:%08x\n", rc, i, PFID(lf), 
487                                 rdpg->rp_hash, hash_end);
488                         GOTO(cleanup, rc);
489                 }
490         }
491         EXIT;
492 cleanup:
493         for (i = 0; i < rdpg->rp_npages; i++)
494                 if (rdpg->rp_pages[i] != NULL)
495                         __free_pages(rdpg->rp_pages[i], 0);
496         if (rdpg->rp_pages)
497                 OBD_FREE(rdpg->rp_pages, rdpg->rp_npages *
498                          sizeof rdpg->rp_pages[0]);
499 free_rdpg:
500         if (rdpg)
501                 OBD_FREE_PTR(rdpg);
502
503         return rc;
504 }
505
506 #define cmm_md_size(stripes) \
507        (sizeof(struct lmv_stripe_md) + (stripes) * sizeof(struct lu_fid))
508
509 int cmm_try_to_split(const struct lu_env *env, struct md_object *mo)
510 {
511         struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo));
512         struct md_attr    *ma = &cmm_env_info(env)->cmi_ma;
513         struct lu_buf     *buf;
514         int rc = 0, split, lmv_size;
515         ENTRY;
516
517         LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu)));
518
519         memset(ma, 0, sizeof(*ma));
520         lmv_size = ma->ma_lmv_size = cmm_md_size(cmm->cmm_tgt_count + 1);
521
522         /* 
523          * Preparing memory for LMV. This will be freed after finish splitting.
524          */
525         OBD_ALLOC(ma->ma_lmv, ma->ma_lmv_size);
526         if (ma->ma_lmv == NULL)
527                 RETURN(-ENOMEM);
528
529         /* Step1: Checking whether the dir needs to be split. */
530         rc = cmm_expect_splitting(env, mo, ma, &split);
531         if (rc)
532                 GOTO(cleanup, rc);
533         
534         if (split != CMM_EXPECT_SPLIT)
535                 GOTO(cleanup, rc = 0);
536
537         LASSERTF(mo->mo_pdo_mode == MDL_EX, "Split is only valid if "
538                  "dir is protected by MDL_EX lock. Lock mode 0x%x\n",
539                  (int)mo->mo_pdo_mode);
540         
541         /*
542          * Disable trans for splitting, since there will be so many trans in
543          * this one ops, confilct with current recovery design.
544          */
545         rc = cmm_upcall(env, &cmm->cmm_md_dev, MD_NO_TRANS);
546         if (rc) {
547                 CERROR("Can't disable trans for split, rc %d\n", rc);
548                 GOTO(cleanup, rc);
549         }
550
551         /* Step2: Create slave objects (on slave MDTs) */
552         ma->ma_valid = 0;
553         ma->ma_lmv_size = lmv_size;
554         rc = cmm_slaves_create(env, mo, ma);
555         if (rc) {
556                 CERROR("Can't create slaves for split, rc %d\n", rc);
557                 GOTO(cleanup, rc);
558         }
559
560         /* Step3: Scan and split the object. */
561         rc = cmm_scan_and_split(env, mo, ma);
562         if (rc) {
563                 CERROR("Can't scan and split, rc %d\n", rc);
564                 GOTO(cleanup, rc);
565         }
566
567         buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size);
568         
569         /* Step4: Set mea to the master object. */
570         rc = mo_xattr_set(env, md_object_next(mo), buf,
571                           MDS_LMV_MD_NAME, 0);
572         if (rc == 0) {
573                 CWARN("Dir "DFID" has been split\n",
574                       PFID(lu_object_fid(&mo->mo_lu)));
575                 rc = -ERESTART;
576         } else {
577                 CERROR("Can't set MEA to master dir, "
578                        "rc %d\n", rc);
579         }
580         EXIT;
581 cleanup:
582         OBD_FREE(ma->ma_lmv, lmv_size);
583         return rc;
584 }
585