Whamcloud - gitweb
(1) fixing typo in mdd;
[fs/lustre-release.git] / lustre / mdd / mdd_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_lov.c
5  *  Lustre Metadata Server (mds) handling of striped file data
6  *
7  *  Copyright (C) 2001-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *           wangdi <wangdi@clusterfs.com>
10  *
11  *   This file is part of the Lustre file system, http://www.lustre.org
12  *   Lustre is a trademark of Cluster File Systems, Inc.
13  *
14  *   You may have signed or agreed to another license before downloading
15  *   this software.  If so, you are bound by the terms and conditions
16  *   of that agreement, and the following does not apply to you.  See the
17  *   LICENSE file included with this distribution for more information.
18  *
19  *   If you did not agree to a different license, then this copy of Lustre
20  *   is open source software; you can redistribute it and/or modify it
21  *   under the terms of version 2 of the GNU General Public License as
22  *   published by the Free Software Foundation.
23  *
24  *   In either case, Lustre is distributed in the hope that it will be
25  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
26  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
27  *   license text for more details.
28  */
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <lustre_ver.h>
39 #include <obd_support.h>
40 #include <obd_lov.h>
41 #include <lprocfs_status.h>
42
43 #include <lu_object.h>
44 #include <md_object.h>
45 #include <dt_object.h>
46 #include <lustre_mds.h>
47 #include <lustre_fid.h>
48 #include <lustre/lustre_idl.h>
49
50 #include "mdd_internal.h"
51
52 static int mdd_lov_update(struct obd_device *host,
53                           struct obd_device *watched,
54                           enum obd_notify_event ev, void *owner)
55 {
56         struct mdd_device *mdd = owner;
57         struct obd_device *obd;
58         struct md_device *upcall_dev;
59         int rc;
60         ENTRY;
61
62         LASSERT(owner != NULL);
63         obd = mdd2_obd(mdd);
64
65         upcall_dev = mdd->mdd_md_dev.md_upcall.mu_upcall_dev;
66
67         rc = upcall_dev->md_upcall.mu_upcall(NULL, upcall_dev, MD_LOV_SYNC);
68
69         RETURN(rc);
70 }
71
72 /*The obd is created for handling data stack for mdd*/
73 int mdd_init_obd(const struct lu_context *ctxt, struct mdd_device *mdd,
74                  char *dev)
75 {
76         struct lustre_cfg_bufs bufs;
77         struct lustre_cfg      *lcfg;
78         struct obd_device      *obd;
79         int rc;
80         ENTRY;
81
82         lustre_cfg_bufs_reset(&bufs, MDD_OBD_NAME);
83         lustre_cfg_bufs_set_string(&bufs, 1, MDD_OBD_TYPE);
84         lustre_cfg_bufs_set_string(&bufs, 2, MDD_OBD_UUID);
85         lustre_cfg_bufs_set_string(&bufs, 3, MDD_OBD_PROFILE);
86         lustre_cfg_bufs_set_string(&bufs, 4, (char*)dev);
87
88         lcfg = lustre_cfg_new(LCFG_ATTACH, &bufs);
89         if (!lcfg)
90                 RETURN(-ENOMEM);
91
92         rc = class_attach(lcfg);
93         if (rc)
94                 GOTO(lcfg_cleanup, rc);
95
96         obd = class_name2obd(MDD_OBD_NAME);
97         if (!obd) {
98                 CERROR("can not find obd %s \n", MDD_OBD_NAME);
99                 LBUG();
100         }
101
102         rc = class_setup(obd, lcfg);
103         if (rc)
104                 GOTO(class_detach, rc);
105         /*Add here for obd notify mechiasm,
106          *when adding a new ost, the mds will notify this mdd*/
107
108         obd->obd_upcall.onu_owner = mdd;
109         obd->obd_upcall.onu_upcall = mdd_lov_update;
110         mdd->mdd_md_dev.md_lu_dev.ld_obd = obd;
111 class_detach:
112         if (rc)
113                 class_detach(obd, lcfg);
114 lcfg_cleanup:
115         lustre_cfg_free(lcfg);
116         RETURN(rc);
117 }
118
119 int mdd_cleanup_obd(struct mdd_device *mdd)
120 {
121         struct lustre_cfg_bufs bufs;
122         struct lustre_cfg      *lcfg;
123         struct obd_device      *obd;
124         int rc;
125         ENTRY;
126
127         obd = mdd->mdd_md_dev.md_lu_dev.ld_obd;
128         LASSERT(obd);
129
130         lustre_cfg_bufs_reset(&bufs, MDD_OBD_NAME);
131         lcfg = lustre_cfg_new(LCFG_ATTACH, &bufs);
132         if (!lcfg)
133                 RETURN(-ENOMEM);
134
135         rc = class_cleanup(obd, lcfg);
136         if (rc)
137                 GOTO(lcfg_cleanup, rc);
138
139         rc = class_detach(obd, lcfg);
140         if (rc)
141                 GOTO(lcfg_cleanup, rc);
142         mdd->mdd_md_dev.md_lu_dev.ld_obd = NULL;
143 lcfg_cleanup:
144         lustre_cfg_free(lcfg);
145         RETURN(rc);
146 }
147
148 int mdd_get_md(const struct lu_context *ctxt, struct mdd_object *obj,
149                void *md, int *md_size, int need_locked)
150 {
151         struct dt_object *next;
152         int rc = 0;
153         ENTRY;
154
155         if (need_locked)
156                 mdd_lock(ctxt, obj, DT_READ_LOCK);
157         next = mdd_object_child(obj);
158         rc = next->do_ops->do_xattr_get(ctxt, next, md, *md_size,
159                                         MDS_LOV_MD_NAME);
160         /*
161          * XXX: handling of -ENODATA, the right way is to have ->do_md_get()
162          * exported by dt layer.
163          */
164         if (rc == 0 || rc == -ENODATA) {
165                 *md_size = 0;
166                 rc = 0;
167         } else if (rc < 0) {
168                 CERROR("Error %d reading eadata \n", rc);
169         } else if (rc > 0) {
170                 /*FIXME convert lov EA necessary for this version?*/
171                 *md_size = rc;
172         }
173
174         if (need_locked)
175                 mdd_unlock(ctxt, obj, DT_READ_LOCK);
176
177         RETURN (rc);
178 }
179
180 static int mdd_lov_set_stripe_md(const struct lu_context *ctxt,
181                                  struct mdd_object *obj, struct lov_mds_md *lmmp,
182                                  int lmm_size, struct thandle *handle)
183 {
184         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
185         struct obd_device       *obd = mdd2_obd(mdd);
186         struct obd_export       *lov_exp = obd->u.mds.mds_osc_exp;
187         struct lov_stripe_md    *lsm = NULL;
188         int rc;
189         ENTRY;
190         
191         LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj)));
192
193         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0, &lsm, lmmp);
194         if (rc)
195                 RETURN(rc);
196         obd_free_memmd(lov_exp, &lsm);
197
198         rc = mdd_xattr_set_txn(ctxt, obj, lmmp, lmm_size, MDS_LOV_MD_NAME, 0, 
199                                handle);
200         
201         CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc);
202         RETURN(rc);
203 }
204                 
205 static int mdd_lov_set_dir_md(const struct lu_context *ctxt, 
206                               struct mdd_object *obj, struct lov_mds_md *lmmp,
207                               int lmm_size, struct thandle *handle)
208 {
209         struct lov_user_md *lum = NULL;
210         int rc = 0;
211         ENTRY;
212
213         /*TODO check permission*/
214         LASSERT(S_ISDIR(mdd_object_type(obj)));
215         lum = (struct lov_user_md*)lmmp;
216
217         /* if { size, offset, count } = { 0, -1, 0 } (i.e. all default
218          * values specified) then delete default striping from dir. */
219         if ((lum->lmm_stripe_size == 0 && lum->lmm_stripe_count == 0 && 
220              lum->lmm_stripe_offset == (typeof(lum->lmm_stripe_offset))(-1)) ||
221              /* lmm_stripe_size == -1 is deprecated in 1.4.6 */
222              lum->lmm_stripe_size == (typeof(lum->lmm_stripe_size))(-1)){
223                 rc = mdd_xattr_set_txn(ctxt, obj, NULL, 0, MDS_LOV_MD_NAME, 0, 
224                                        handle);
225                 CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n",
226                                 PFID(mdo2fid(obj)), rc);
227         } else {
228                 rc = mdd_lov_set_stripe_md(ctxt, obj, lmmp, lmm_size, handle); 
229         }
230         RETURN(rc);
231 }
232         
233 int mdd_lov_set_md(const struct lu_context *ctxt, struct mdd_object *pobj,
234                    struct mdd_object *child, struct lov_mds_md *lmmp,
235                    int lmm_size, struct thandle *handle, int set_stripe)
236 {
237         int rc = 0;
238         ENTRY;
239
240         if (S_ISREG(mdd_object_type(child)) && lmm_size > 0) {
241                 if (set_stripe) {
242                         rc = mdd_lov_set_stripe_md(ctxt, child, lmmp, lmm_size,
243                                                    handle);
244                 } else {
245                         rc = mdd_xattr_set_txn(ctxt, child, lmmp, lmm_size,
246                                                MDS_LOV_MD_NAME, 0, handle);
247                 }
248         } else  if (S_ISDIR(mdd_object_type(child))) {
249                 if (lmmp == NULL && lmm_size == 0) {
250                         struct lov_mds_md *lmm = &mdd_ctx_info(ctxt)->mti_lmm;
251                         int size = sizeof(lmm);
252                         /*Get parent dir stripe and set*/
253                         rc = mdd_get_md(ctxt, pobj, &lmm, &size, 0);
254                         if (rc > 0) {
255                                 rc = mdd_xattr_set_txn(ctxt, child, lmm, size,
256                                                MDS_LOV_MD_NAME, 0, handle);
257                                 if (rc)
258                                         CERROR("error on copy stripe info: rc = %d\n",
259                                                 rc);
260                         }
261                 } else {
262                        LASSERT(lmmp != NULL && lmm_size > 0);
263                         /*delete lmm*/
264                        rc = mdd_lov_set_dir_md(ctxt, child, lmmp, lmm_size, handle);
265                 }
266         }
267         CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc%d\n",
268                         lmmp, lmm_size, PFID(mdo2fid(child)), rc);
269         RETURN(rc);
270 }
271
272 /*FIXME: this is for create lsm object id, which should identify the
273  * lsm object unique in the whole mds, as I see. But it seems, we
274  * still not need it now. right? so just borrow the ll_fid_build_ino
275  */
276 static obd_id mdd_lov_create_id(const struct lu_fid *fid)
277 {
278         return ((fid_seq(fid) - 1) * LUSTRE_SEQ_MAX_WIDTH + fid_oid(fid));
279 }
280
281 /*FIXME: it is just the helper function used by mdd lov obd to 
282  * get attr from obdo, copied from obdo_from_inode*/
283 static void obdo_from_la(struct obdo *dst, struct lu_attr *la, obd_flag valid)
284 {
285         obd_flag newvalid = 0;
286
287         if (valid & OBD_MD_FLATIME) {
288                 dst->o_atime = la->la_atime;
289                 newvalid |= OBD_MD_FLATIME;
290         }
291         if (valid & OBD_MD_FLMTIME) {
292                 dst->o_mtime = la->la_mtime;
293                 newvalid |= OBD_MD_FLMTIME;
294         }
295         if (valid & OBD_MD_FLCTIME) {
296                 dst->o_ctime = la->la_ctime;
297                 newvalid |= OBD_MD_FLCTIME;
298         }
299         if (valid & OBD_MD_FLSIZE) {
300                 dst->o_size = la->la_size;
301                 newvalid |= OBD_MD_FLSIZE;
302         }
303         if (valid & OBD_MD_FLBLOCKS) {  /* allocation of space (x512 bytes) */
304                 dst->o_blocks = la->la_blocks;
305                 newvalid |= OBD_MD_FLBLOCKS;
306         }
307         if (valid & OBD_MD_FLTYPE) {
308                 dst->o_mode = (la->la_mode & S_IALLUGO)|(la->la_mode & S_IFMT);
309                 newvalid |= OBD_MD_FLTYPE;
310         }
311         if (valid & OBD_MD_FLMODE) {
312                 dst->o_mode = (la->la_mode & S_IFMT)|(la->la_mode & S_IALLUGO);
313                 newvalid |= OBD_MD_FLMODE;
314         }
315         if (valid & OBD_MD_FLUID) {
316                 dst->o_uid = la->la_uid;
317                 newvalid |= OBD_MD_FLUID;
318         }
319         if (valid & OBD_MD_FLGID) {
320                 dst->o_gid = la->la_gid;
321                 newvalid |= OBD_MD_FLGID;
322         }
323         dst->o_valid |= newvalid;
324 }
325
326 int mdd_lov_create(const struct lu_context *ctxt, struct mdd_device *mdd,
327                    struct mdd_object *parent, struct mdd_object *child,
328                    struct lov_mds_md **lmm, int *lmm_size,
329                    const struct md_create_spec *spec, struct lu_attr *la)
330 {
331         struct obd_device       *obd = mdd2_obd(mdd);
332         struct obd_export       *lov_exp = obd->u.mds.mds_osc_exp;
333         struct obdo             *oa;
334         struct lov_stripe_md    *lsm = NULL;
335         const void              *eadata = spec->u.sp_ea.eadata;
336 /*      int                      eadatasize  = spec->u.sp_ea.eadatalen;*/
337         __u32                    create_flags = spec->sp_cr_flags;
338         int                      rc = 0;
339         ENTRY;
340
341         if (create_flags & MDS_OPEN_DELAY_CREATE ||
342                         !(create_flags & FMODE_WRITE))
343                 RETURN(0);
344
345         oa = obdo_alloc();
346
347         oa->o_uid = 0; /* must have 0 uid / gid on OST */
348         oa->o_gid = 0;
349         oa->o_mode = S_IFREG | 0600;
350         oa->o_id = mdd_lov_create_id(lu_object_fid(mdd2lu_obj(child)));
351         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
352                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID;
353         oa->o_size = 0;
354
355         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
356                 if (create_flags & MDS_OPEN_HAS_EA) {
357                         LASSERT(eadata != NULL);
358                         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp,
359                                            0, &lsm, (void*)eadata);
360                         if (rc)
361                                 GOTO(out_oa, rc);
362                 } else {
363                         /* get lov ea from parent and set to lov */
364                         struct lov_mds_md *__lmm;
365                         int __lmm_size, returned_lmm_size;
366                         __lmm_size = mdd_lov_mdsize(ctxt, mdd);
367
368                         OBD_ALLOC(__lmm, __lmm_size);
369                         if (__lmm == NULL)
370                                 GOTO(out_oa, rc = -ENOMEM);
371
372                         rc = mdd_get_md(ctxt, parent, __lmm,
373                                         &returned_lmm_size, 1);
374                         if (rc > 0)
375                                 rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
376                                                    lov_exp, 0, &lsm, __lmm);
377                         OBD_FREE(__lmm, __lmm_size);
378                         if (rc)
379                                 GOTO(out_oa, rc);
380                 }
381                 rc = obd_create(lov_exp, oa, &lsm, NULL);
382                 if (rc) {
383                         if (rc > 0) {
384                                 CERROR("create errro for "DFID": %d \n",
385                                        PFID(mdo2fid(child)), rc);
386                                 rc = -EIO;
387                         }
388                         GOTO(out_oa, rc);
389                 }
390         } else {
391                 LASSERT(eadata != NULL);
392                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
393                                    (void*)eadata);
394                 if (rc) 
395                         GOTO(out_oa, rc);
396                 lsm->lsm_object_id = oa->o_id;
397         }
398         /*Sometimes, we may truncate some object(without lsm) 
399          *then open (with write flags)it, so creating lsm above. 
400          *The Nonzero(truncated) size should tell ost. since size 
401          *attr is in charged by OST.
402          */
403         if (la->la_size && la->la_valid & LA_SIZE) {
404                 oa->o_size = la->la_size;
405                 obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME |
406                                 OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
407
408                 /* FIXME:pack lustre id to OST, in OST, it will be packed 
409                  * by filter_fid, but can not see what is the usages. So just 
410                  * pack o_seq o_ver here, maybe fix it after this cycle*/
411                 oa->o_fid = lu_object_fid(mdd2lu_obj(child))->f_seq;
412                 oa->o_generation = lu_object_fid(mdd2lu_obj(child))->f_oid;
413                 oa->o_valid |= OBD_MD_FLFID | OBD_MD_FLGENER;
414
415                 rc = obd_setattr(lov_exp, oa, lsm, NULL);
416                 if (rc) {
417                         CERROR("error setting attrs for "DFID": rc %d\n",
418                                PFID(mdo2fid(child)), rc);
419                         if (rc > 0) {
420                                 CERROR("obd_setattr for "DFID" rc %d\n", 
421                                         PFID(mdo2fid(child)), rc);
422                                 rc = -EIO;
423                         }
424                         GOTO(out_oa, rc);
425                 }
426         }
427         /*blksize should be changed after create data object*/
428         la->la_valid |= LA_BLKSIZE;
429         la->la_blksize = oa->o_blksize;
430
431         rc = obd_packmd(lov_exp, lmm, lsm);
432         if (rc < 0) {
433                 CERROR("cannot pack lsm, err = %d\n", rc);
434                 GOTO(out_oa, rc);
435         }
436         *lmm_size = rc;
437         rc = 0;
438 out_oa:
439         obdo_free(oa);
440         if (lsm)
441                 obd_free_memmd(lov_exp, &lsm);
442         RETURN(rc);
443 }
444
445 int mdd_unlink_log(const struct lu_context *ctxt, struct mdd_device *mdd,
446                    struct mdd_object *mdd_cobj, struct md_attr *ma)
447 {
448         struct obd_device *obd = mdd2_obd(mdd);
449
450         if (mds_log_op_unlink(obd, NULL, ma->ma_lmm, ma->ma_lmm_size,
451                                  ma->ma_cookie, ma->ma_cookie_size) > 0) {
452                 ma->ma_valid |= MA_COOKIE;
453         }
454         return 0;
455 }
456
457 int mdd_lov_setattr_async(const struct lu_context *ctxt, struct mdd_object *obj,
458                           struct lov_mds_md *lmm, int lmm_size)
459 {
460         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
461         struct obd_device       *obd = mdd2_obd(mdd);
462         struct lu_attr          *tmp_la = &mdd_ctx_info(ctxt)->mti_la;
463         struct dt_object        *next = mdd_object_child(obj);
464         __u32  seq  = lu_object_fid(mdd2lu_obj(obj))->f_seq;
465         __u32  oid  = lu_object_fid(mdd2lu_obj(obj))->f_oid;
466         int rc = 0;
467         ENTRY;
468
469         rc = next->do_ops->do_attr_get(ctxt, next, tmp_la);
470         if (rc)
471                 RETURN(rc);
472
473         rc = mds_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
474                                    lmm_size, NULL, seq, oid);
475
476         RETURN(rc);
477 }
478