Whamcloud - gitweb
2ec38a768a214dd304ef8eb3c657bc3bb1b2f4ae
[fs/lustre-release.git] / lustre / mdd / mdd_lov.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_lov.c
37  *
38  * Lustre Metadata Server (mds) handling of striped file data
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: wangdi <wangdi@clusterfs.com>
42  */
43
44 #ifndef EXPORT_SYMTAB
45 # define EXPORT_SYMTAB
46 #endif
47 #define DEBUG_SUBSYSTEM S_MDS
48
49 #include <linux/module.h>
50 #include <obd.h>
51 #include <obd_class.h>
52 #include <lustre_ver.h>
53 #include <obd_support.h>
54 #include <obd_lov.h>
55 #include <lprocfs_status.h>
56 #include <lustre_mds.h>
57 #include <lustre_fid.h>
58 #include <lustre/lustre_idl.h>
59
60 #include "mdd_internal.h"
61
62 static int mdd_notify(struct obd_device *host, struct obd_device *watched,
63                       enum obd_notify_event ev, void *owner, void *data)
64 {
65         struct mdd_device *mdd = owner;
66         int rc = 0;
67         ENTRY;
68
69         LASSERT(owner != NULL);
70         switch (ev)
71         {
72                 case OBD_NOTIFY_ACTIVE:
73                 case OBD_NOTIFY_SYNC:
74                 case OBD_NOTIFY_SYNC_NONBLOCK:
75                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
76                                           MD_LOV_SYNC, data);
77                         break;
78                 case OBD_NOTIFY_CONFIG:
79                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
80                                           MD_LOV_CONFIG, data);
81                         break;
82 #ifdef HAVE_QUOTA_SUPPORT
83                 case OBD_NOTIFY_QUOTA:
84                         rc = md_do_upcall(NULL, &mdd->mdd_md_dev,
85                                           MD_LOV_QUOTA, data);
86                         break;
87 #endif
88                 default:
89                         CDEBUG(D_INFO, "Unhandled notification %#x\n", ev);
90         }
91
92         RETURN(rc);
93 }
94
95 /* The obd is created for handling data stack for mdd */
96 int mdd_init_obd(const struct lu_env *env, struct mdd_device *mdd,
97                  struct lustre_cfg *cfg)
98 {
99         char                   *dev = lustre_cfg_string(cfg, 0);
100         int                     rc, name_size, uuid_size;
101         char                   *name, *uuid;
102         __u32                   mds_id;
103         struct lustre_cfg_bufs *bufs;
104         struct lustre_cfg      *lcfg;
105         struct obd_device      *obd;
106         ENTRY;
107
108         mds_id = lu_site2md(mdd2lu_dev(mdd)->ld_site)->ms_node_id;
109         name_size = strlen(MDD_OBD_NAME) + 35;
110         uuid_size = strlen(MDD_OBD_UUID) + 35;
111
112         OBD_ALLOC(name, name_size);
113         OBD_ALLOC(uuid, uuid_size);
114         if (name == NULL || uuid == NULL)
115                 GOTO(cleanup_mem, rc = -ENOMEM);
116
117         OBD_ALLOC_PTR(bufs);
118         if (!bufs)
119                 GOTO(cleanup_mem, rc = -ENOMEM);
120
121         snprintf(name, strlen(MDD_OBD_NAME) + 35, "%s-%s-%d",
122                  MDD_OBD_NAME, dev, mds_id);
123
124         snprintf(uuid, strlen(MDD_OBD_UUID) + 35, "%s-%s-%d",
125                  MDD_OBD_UUID, dev, mds_id);
126
127         lustre_cfg_bufs_reset(bufs, name);
128         lustre_cfg_bufs_set_string(bufs, 1, MDD_OBD_TYPE);
129         lustre_cfg_bufs_set_string(bufs, 2, uuid);
130         lustre_cfg_bufs_set_string(bufs, 3, (char*)dev/* MDD_OBD_PROFILE */);
131         lustre_cfg_bufs_set_string(bufs, 4, (char*)dev);
132
133         lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
134         OBD_FREE_PTR(bufs);
135         if (!lcfg)
136                 GOTO(cleanup_mem, rc = -ENOMEM);
137
138         rc = class_attach(lcfg);
139         if (rc)
140                 GOTO(lcfg_cleanup, rc);
141
142         obd = class_name2obd(name);
143         if (!obd) {
144                 CERROR("Can not find obd %s\n", MDD_OBD_NAME);
145                 LBUG();
146         }
147
148         cfs_spin_lock(&obd->obd_dev_lock);
149         obd->obd_recovering = 1;
150         cfs_spin_unlock(&obd->obd_dev_lock);
151         obd->u.mds.mds_id = mds_id;
152         rc = class_setup(obd, lcfg);
153         if (rc)
154                 GOTO(class_detach, rc);
155
156         /*
157          * Add here for obd notify mechanism, when adding a new ost, the mds
158          * will notify this mdd. The mds will be used for quota also.
159          */
160         obd->obd_upcall.onu_upcall = mdd_notify;
161         obd->obd_upcall.onu_owner = mdd;
162         mdd->mdd_obd_dev = obd;
163         EXIT;
164 class_detach:
165         if (rc)
166                 class_detach(obd, lcfg);
167 lcfg_cleanup:
168         lustre_cfg_free(lcfg);
169 cleanup_mem:
170         if (name)
171                 OBD_FREE(name, name_size);
172         if (uuid)
173                 OBD_FREE(uuid, uuid_size);
174         return rc;
175 }
176
177 int mdd_fini_obd(const struct lu_env *env, struct mdd_device *mdd,
178                  struct lustre_cfg *lcfg)
179 {
180         struct obd_device      *obd;
181         int rc;
182         ENTRY;
183
184         obd = mdd2obd_dev(mdd);
185         LASSERT(obd);
186
187         rc = class_cleanup(obd, lcfg);
188         if (rc)
189                 GOTO(lcfg_cleanup, rc);
190
191         obd->obd_upcall.onu_upcall = NULL;
192         obd->obd_upcall.onu_owner = NULL;
193         rc = class_detach(obd, lcfg);
194         if (rc)
195                 GOTO(lcfg_cleanup, rc);
196         mdd->mdd_obd_dev = NULL;
197
198         EXIT;
199 lcfg_cleanup:
200         return rc;
201 }
202
203 int mdd_get_md(const struct lu_env *env, struct mdd_object *obj,
204                void *md, int *md_size, const char *name)
205 {
206         int rc;
207         ENTRY;
208
209         rc = mdo_xattr_get(env, obj, mdd_buf_get(env, md, *md_size), name,
210                            mdd_object_capa(env, obj));
211         /*
212          * XXX: Handling of -ENODATA, the right way is to have ->do_md_get()
213          * exported by dt layer.
214          */
215         if (rc == 0 || rc == -ENODATA) {
216                 *md_size = 0;
217                 rc = 0;
218         } else if (rc < 0) {
219                 CERROR("Error %d reading eadata - %d\n", rc, *md_size);
220         } else {
221                 /* XXX: Convert lov EA but fixed after verification test. */
222                 *md_size = rc;
223         }
224
225         RETURN(rc);
226 }
227
228 int mdd_get_md_locked(const struct lu_env *env, struct mdd_object *obj,
229                       void *md, int *md_size, const char *name)
230 {
231         int rc = 0;
232         mdd_read_lock(env, obj, MOR_TGT_CHILD);
233         rc = mdd_get_md(env, obj, md, md_size, name);
234         mdd_read_unlock(env, obj);
235         return rc;
236 }
237
238 static int mdd_lov_set_stripe_md(const struct lu_env *env,
239                                  struct mdd_object *obj, struct lu_buf *buf,
240                                  struct thandle *handle)
241 {
242         struct mdd_device       *mdd = mdo2mdd(&obj->mod_obj);
243         struct obd_device       *obd = mdd2obd_dev(mdd);
244         struct obd_export       *lov_exp = obd->u.mds.mds_lov_exp;
245         struct lov_stripe_md    *lsm = NULL;
246         int rc;
247         ENTRY;
248
249         LASSERT(S_ISDIR(mdd_object_type(obj)) || S_ISREG(mdd_object_type(obj)));
250         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp, 0,
251                            &lsm, buf->lb_buf);
252         if (rc)
253                 RETURN(rc);
254         obd_free_memmd(lov_exp, &lsm);
255
256         rc = mdd_xattr_set_txn(env, obj, buf, XATTR_NAME_LOV, 0, handle);
257
258         CDEBUG(D_INFO, "set lov ea of "DFID" rc %d \n", PFID(mdo2fid(obj)), rc);
259         RETURN(rc);
260 }
261
262 /*
263  * Permission check is done before call it,
264  * no need check again.
265  */
266 static int mdd_lov_set_dir_md(const struct lu_env *env,
267                               struct mdd_object *obj, struct lu_buf *buf,
268                               struct thandle *handle)
269 {
270         struct lov_user_md *lum = NULL;
271         int rc = 0;
272         ENTRY;
273
274         LASSERT(S_ISDIR(mdd_object_type(obj)));
275         lum = (struct lov_user_md*)buf->lb_buf;
276
277         /* if { size, offset, count } = { 0, -1, 0 } and no pool (i.e. all default
278          * values specified) then delete default striping from dir. */
279         if (LOVEA_DELETE_VALUES(lum->lmm_stripe_size, lum->lmm_stripe_count,
280                                 lum->lmm_stripe_offset) &&
281             lum->lmm_magic != LOV_USER_MAGIC_V3) {
282                 rc = mdd_xattr_set_txn(env, obj, &LU_BUF_NULL,
283                                        XATTR_NAME_LOV, 0, handle);
284                 if (rc == -ENODATA)
285                         rc = 0;
286                 CDEBUG(D_INFO, "delete lov ea of "DFID" rc %d \n",
287                                 PFID(mdo2fid(obj)), rc);
288         } else {
289                 rc = mdd_lov_set_stripe_md(env, obj, buf, handle);
290         }
291         RETURN(rc);
292 }
293
294 int mdd_lsm_sanity_check(const struct lu_env *env,  struct mdd_object *obj)
295 {
296         struct lu_attr   *tmp_la = &mdd_env_info(env)->mti_la;
297         struct md_ucred  *uc     = md_ucred(env);
298         int rc;
299         ENTRY;
300
301         rc = mdd_la_get(env, obj, tmp_la, BYPASS_CAPA);
302         if (rc)
303                 RETURN(rc);
304
305         if ((uc->mu_fsuid != tmp_la->la_uid) &&
306             !mdd_capable(uc, CFS_CAP_FOWNER))
307                 rc = mdd_permission_internal_locked(env, obj, tmp_la,
308                                                     MAY_WRITE, MOR_TGT_CHILD);
309
310         RETURN(rc);
311 }
312
313 int mdd_lov_set_md(const struct lu_env *env, struct mdd_object *pobj,
314                    struct mdd_object *child, struct lov_mds_md *lmmp,
315                    int lmm_size, struct thandle *handle, int set_stripe)
316 {
317         struct lu_buf *buf;
318         cfs_umode_t mode;
319         int rc = 0;
320         ENTRY;
321
322         buf = mdd_buf_get(env, lmmp, lmm_size);
323         mode = mdd_object_type(child);
324         if (S_ISREG(mode) && lmm_size > 0) {
325                 if (set_stripe) {
326                         rc = mdd_lov_set_stripe_md(env, child, buf, handle);
327                 } else {
328                         rc = mdd_xattr_set_txn(env, child, buf,
329                                                XATTR_NAME_LOV, 0, handle);
330                 }
331         } else if (S_ISDIR(mode)) {
332                 if (lmmp == NULL && lmm_size == 0) {
333                         struct mdd_device *mdd = mdd_obj2mdd_dev(child);
334                         struct lov_mds_md *lmm = mdd_max_lmm_get(env, mdd);
335                         int size = sizeof(struct lov_mds_md_v3);
336
337                         /* Get parent dir stripe and set */
338                         if (pobj != NULL)
339                                 rc = mdd_get_md_locked(env, pobj, lmm, &size,
340                                                        XATTR_NAME_LOV);
341                         if (rc > 0) {
342                                 buf = mdd_buf_get(env, lmm, size);
343                                 rc = mdd_xattr_set_txn(env, child, buf,
344                                                XATTR_NAME_LOV, 0, handle);
345                                 if (rc)
346                                         CERROR("error on copy stripe info: rc "
347                                                 "= %d\n", rc);
348                         }
349                 } else {
350                         LASSERT(lmmp != NULL && lmm_size > 0);
351                         rc = mdd_lov_set_dir_md(env, child, buf, handle);
352                 }
353         }
354         CDEBUG(D_INFO, "Set lov md %p size %d for fid "DFID" rc %d\n",
355                         lmmp, lmm_size, PFID(mdo2fid(child)), rc);
356         RETURN(rc);
357 }
358
359 int mdd_lov_objid_prepare(struct mdd_device *mdd, struct lov_mds_md *lmm)
360 {
361         /* copy mds_lov code is using wrong layer */
362         return mds_lov_prepare_objids(mdd->mdd_obd_dev, lmm);
363 }
364
365 void mdd_lov_objid_update(struct mdd_device *mdd, struct lov_mds_md *lmm)
366 {
367         /* copy mds_lov code is using wrong layer */
368         mds_lov_update_objids(mdd->mdd_obd_dev, lmm);
369 }
370
371 void mdd_lov_create_finish(const struct lu_env *env, struct mdd_device *mdd,
372                            struct lov_mds_md *lmm, int lmm_size,
373                            const struct md_op_spec *spec)
374 {
375         if (lmm && !spec->no_create)
376                 OBD_FREE(lmm, lmm_size);
377 }
378
379 int mdd_lov_create(const struct lu_env *env, struct mdd_device *mdd,
380                    struct mdd_object *parent, struct mdd_object *child,
381                    struct lov_mds_md **lmm, int *lmm_size,
382                    const struct md_op_spec *spec, struct lu_attr *la)
383 {
384         struct obd_device     *obd = mdd2obd_dev(mdd);
385         struct obd_export     *lov_exp = obd->u.mds.mds_lov_exp;
386         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
387         struct obdo           *oa;
388         struct lov_stripe_md  *lsm = NULL;
389         const void            *eadata = spec->u.sp_ea.eadata;
390         __u32                  create_flags = spec->sp_cr_flags;
391         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
392         int                    rc = 0;
393         ENTRY;
394
395         if (!md_should_create(create_flags)) {
396                 *lmm_size = 0;
397                 RETURN(0);
398         }
399         oti_init(oti, NULL);
400
401         /* replay case, has objects already, only get lov from eadata */
402         if (spec->no_create != 0) {
403                 *lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
404                 *lmm_size = spec->u.sp_ea.eadatalen;
405                 if (*lmm_size == lov_mds_md_size((*lmm)->lmm_stripe_count,
406                                                  (*lmm)->lmm_magic)) {
407                         RETURN(0);
408                 } else {
409                         CERROR("incorrect lsm received during recovery\n");
410                         RETURN(-EPROTO);
411                 }
412         }
413
414         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_ALLOC_OBDO))
415                 GOTO(out_ids, rc = -ENOMEM);
416
417         LASSERT(lov_exp != NULL);
418         oa = &mdd_env_info(env)->mti_oa;
419
420         oa->o_uid = 0; /* must have 0 uid / gid on OST */
421         oa->o_gid = 0;
422         oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id);
423         oa->o_mode = S_IFREG | 0600;
424         oa->o_id = fid_ver_oid(mdd_object_fid(child));
425         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLFLAGS |
426                 OBD_MD_FLMODE | OBD_MD_FLUID | OBD_MD_FLGID | OBD_MD_FLGROUP;
427         oa->o_size = 0;
428
429         if (!(create_flags & MDS_OPEN_HAS_OBJS)) {
430                 if (create_flags & MDS_OPEN_HAS_EA) {
431                         LASSERT(eadata != NULL);
432                         rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE, lov_exp,
433                                            0, &lsm, (void*)eadata);
434                         if (rc)
435                                 GOTO(out_oti, rc);
436                 } else if (parent != NULL) {
437                         /* get lov ea from parent and set to lov */
438                         struct lov_mds_md *_lmm;
439                         int _lmm_size;
440
441                         _lmm_size = mdd_lov_mdsize(env, mdd);
442                         _lmm = mdd_max_lmm_get(env, mdd);
443
444                         if (_lmm == NULL)
445                                 GOTO(out_oti, rc = -ENOMEM);
446
447                         rc = mdd_get_md_locked(env, parent, _lmm,
448                                                &_lmm_size,
449                                                XATTR_NAME_LOV);
450                         if (rc > 0)
451                                 rc = obd_iocontrol(OBD_IOC_LOV_SETSTRIPE,
452                                                    lov_exp, *lmm_size,
453                                                    &lsm, _lmm);
454
455                         if (rc)
456                                 GOTO(out_oti, rc);
457                 }
458
459                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_OPEN_WAIT_CREATE, 10);
460                 rc = obd_create(lov_exp, oa, &lsm, oti);
461                 if (rc) {
462                         if (rc > 0) {
463                                 CERROR("Create error for "DFID": %d\n",
464                                        PFID(mdo2fid(child)), rc);
465                                 rc = -EIO;
466                         }
467                         GOTO(out_oti, rc);
468                 }
469                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
470         } else {
471                 LASSERT(eadata != NULL);
472                 rc = obd_iocontrol(OBD_IOC_LOV_SETEA, lov_exp, 0, &lsm,
473                                    (void*)eadata);
474                 if (rc)
475                         GOTO(out_oti, rc);
476
477         }
478
479         lsm->lsm_object_id = fid_ver_oid(mdd_object_fid(child));
480         lsm->lsm_object_seq = fid_seq(mdd_object_fid(child));
481         /*
482          * Sometimes, we may truncate some object(without lsm) then open it
483          * (with write flags), so creating lsm above.  The Nonzero(truncated)
484          * size should tell ost, since size attr is in charge by OST.
485          */
486         if (la->la_size && la->la_valid & LA_SIZE) {
487                 struct obd_info *oinfo = &mdd_env_info(env)->mti_oi;
488
489                 memset(oinfo, 0, sizeof(*oinfo));
490
491                 /* When setting attr to ost, FLBKSZ is not needed. */
492                 oa->o_valid &= ~OBD_MD_FLBLKSZ;
493                 obdo_from_la(oa, la, OBD_MD_FLTYPE | OBD_MD_FLATIME |
494                              OBD_MD_FLMTIME | OBD_MD_FLCTIME | OBD_MD_FLSIZE);
495
496                 /*
497                  * XXX: Pack lustre id to OST, in OST, it will be packed by
498                  * filter_fid, but can not see what is the usages. So just pack
499                  * o_seq o_ver here, maybe fix it after this cycle.
500                  */
501                 obdo_from_inode(oa, NULL,
502                                 (struct lu_fid *)mdd_object_fid(child), 0);
503                 oinfo->oi_oa = oa;
504                 oinfo->oi_md = lsm;
505                 oinfo->oi_capa = NULL;
506                 oinfo->oi_policy.l_extent.start = la->la_size;
507                 oinfo->oi_policy.l_extent.end = OBD_OBJECT_EOF;
508
509                 rc = obd_punch_rqset(lov_exp, oinfo, oti);
510                 if (rc) {
511                         CERROR("Error setting attrs for "DFID": rc %d\n",
512                                PFID(mdo2fid(child)), rc);
513                         if (rc > 0) {
514                                 CERROR("obd_setattr for "DFID" rc %d\n",
515                                         PFID(mdo2fid(child)), rc);
516                                 rc = -EIO;
517                         }
518                         GOTO(out_oti, rc);
519                 }
520         }
521         /* blksize should be changed after create data object */
522         la->la_valid |= LA_BLKSIZE;
523         la->la_blksize = oa->o_blksize;
524         *lmm = NULL;
525         rc = obd_packmd(lov_exp, lmm, lsm);
526         if (rc < 0) {
527                 CERROR("Cannot pack lsm, err = %d\n", rc);
528                 GOTO(out_oti, rc);
529         }
530         if (mdd_lov_objid_prepare(mdd, *lmm) != 0) {
531                 CERROR("Not have memory for update objid\n");
532                 OBD_FREE(*lmm, rc);
533                 *lmm = NULL;
534                 GOTO(out_oti, rc = -ENOMEM);
535         }
536         *lmm_size = rc;
537         rc = 0;
538         EXIT;
539 out_oti:
540         oti_free_cookies(oti);
541 out_ids:
542         if (lsm)
543                 obd_free_memmd(lov_exp, &lsm);
544
545         return rc;
546 }
547
548 /*
549  * used when destroying orphans and from mds_reint_unlink() when MDS wants to
550  * destroy objects on OSS.
551  */
552 static
553 int mdd_lovobj_unlink(const struct lu_env *env, struct mdd_device *mdd,
554                       struct mdd_object *obj, struct lu_attr *la,
555                       struct lov_mds_md *lmm, int lmm_size,
556                       struct llog_cookie *logcookies,
557                       int log_unlink)
558 {
559         struct obd_device     *obd = mdd2obd_dev(mdd);
560         struct obd_export     *lov_exp = obd->u.mds.mds_lov_exp;
561         struct lov_stripe_md  *lsm = NULL;
562         struct obd_trans_info *oti = &mdd_env_info(env)->mti_oti;
563         struct obdo           *oa = &mdd_env_info(env)->mti_oa;
564         struct lu_site        *site = mdd2lu_dev(mdd)->ld_site;
565         int rc;
566         ENTRY;
567
568         if (lmm_size == 0)
569                 RETURN(0);
570
571         rc = obd_unpackmd(lov_exp, &lsm, lmm, lmm_size);
572         if (rc < 0) {
573                 CERROR("Error unpack md %p\n", lmm);
574                 RETURN(rc);
575         } else {
576                 LASSERT(rc >= sizeof(*lsm));
577                 rc = 0;
578         }
579
580         oa->o_id = lsm->lsm_object_id;
581         oa->o_seq = mdt_to_obd_objseq(lu_site2md(site)->ms_node_id);
582         oa->o_mode = la->la_mode & S_IFMT;
583         oa->o_valid = OBD_MD_FLID | OBD_MD_FLTYPE | OBD_MD_FLGROUP;
584
585         oti_init(oti, NULL);
586         if (log_unlink && logcookies) {
587                 oa->o_valid |= OBD_MD_FLCOOKIE;
588                 oti->oti_logcookies = logcookies;
589         }
590
591         CDEBUG(D_INFO, "destroying OSS object "LPU64":"LPU64"\n", oa->o_seq,
592                oa->o_id);
593
594         rc = obd_destroy(lov_exp, oa, lsm, oti, NULL, NULL);
595
596         obd_free_memmd(lov_exp, &lsm);
597         RETURN(rc);
598 }
599
600 /*
601  * called with obj locked. 
602  */
603 int mdd_lov_destroy(const struct lu_env *env, struct mdd_device *mdd,
604                     struct mdd_object *obj, struct lu_attr *la)
605 {
606         struct md_attr    *ma = &mdd_env_info(env)->mti_ma;
607         int                rc;
608         ENTRY;
609
610         LASSERT(mdd_write_locked(env, obj) != 0);
611
612         if (unlikely(!S_ISREG(mdd_object_type(obj))))
613                 RETURN(0);
614
615         if (unlikely(la->la_nlink != 0)) {
616                 CWARN("Attempt to destroy OSS object when nlink == %d\n",
617                       la->la_nlink);
618                 RETURN(0);
619         }
620
621         ma->ma_lmm_size = mdd_lov_mdsize(env, mdd);
622         ma->ma_lmm = mdd_max_lmm_get(env, mdd);
623         ma->ma_cookie_size = mdd_lov_cookiesize(env, mdd);
624         ma->ma_cookie = mdd_max_cookie_get(env, mdd);
625         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
626                 RETURN(rc = -ENOMEM);
627
628         /* get lov ea */
629
630         rc = mdd_get_md(env, obj, ma->ma_lmm, &ma->ma_lmm_size,
631                         XATTR_NAME_LOV);
632
633         if (rc <= 0) {
634                 CWARN("Get lov ea failed for "DFID" rc = %d\n",
635                          PFID(mdo2fid(obj)), rc);
636                 if (rc == 0)
637                         rc = -ENOENT;
638                 RETURN(rc);
639         }
640
641         ma->ma_valid = MA_LOV;
642
643         rc = mdd_unlink_log(env, mdd, obj, ma);
644         if (rc) {
645                 CWARN("mds unlink log for "DFID" failed: %d\n",
646                        PFID(mdo2fid(obj)), rc);
647                 RETURN(rc);
648         }
649
650         if (ma->ma_valid & MA_COOKIE)
651                 rc = mdd_lovobj_unlink(env, mdd, obj, la,
652                                        ma->ma_lmm, ma->ma_lmm_size,
653                                        ma->ma_cookie, 1);
654         RETURN(rc);
655 }
656
657 int mdd_unlink_log(const struct lu_env *env, struct mdd_device *mdd,
658                    struct mdd_object *mdd_cobj, struct md_attr *ma)
659 {
660         LASSERT(ma->ma_valid & MA_LOV);
661
662         if ((ma->ma_cookie_size > 0) &&
663             (mds_log_op_unlink(mdd2obd_dev(mdd), ma->ma_lmm, ma->ma_lmm_size,
664                                ma->ma_cookie, ma->ma_cookie_size) > 0)) {
665                 CDEBUG(D_HA, "DEBUG: unlink log is added for object "DFID"\n",
666                        PFID(mdd_object_fid(mdd_cobj)));
667                 ma->ma_valid |= MA_COOKIE;
668         }
669         return 0;
670 }
671
672 int mdd_log_op_setattr(struct obd_device *obd, __u32 uid, __u32 gid,
673                        struct lov_mds_md *lmm, int lmm_size,
674                        struct llog_cookie *logcookies, int cookies_size)
675 {
676         struct mds_obd *mds = &obd->u.mds;
677         struct lov_stripe_md *lsm = NULL;
678         struct llog_setattr64_rec *lsr;
679         struct llog_ctxt *ctxt;
680         int rc;
681         ENTRY;
682
683         if (IS_ERR(mds->mds_lov_obd))
684                 RETURN(PTR_ERR(mds->mds_lov_obd));
685
686         rc = obd_unpackmd(mds->mds_lov_exp, &lsm, lmm, lmm_size);
687         if (rc < 0)
688                 RETURN(rc);
689
690         OBD_ALLOC(lsr, sizeof(*lsr));
691         if (!lsr)
692                 GOTO(out, rc = -ENOMEM);
693
694         /* prepare setattr log record */
695         lsr->lsr_hdr.lrh_len = lsr->lsr_tail.lrt_len = sizeof(*lsr);
696         lsr->lsr_hdr.lrh_type = MDS_SETATTR64_REC;
697         lsr->lsr_uid = uid;
698         lsr->lsr_gid = gid;
699
700         /* write setattr log */
701         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
702         rc = llog_add(ctxt, &lsr->lsr_hdr, lsm, logcookies,
703                       cookies_size / sizeof(struct llog_cookie));
704
705         llog_ctxt_put(ctxt);
706
707         OBD_FREE(lsr, sizeof(*lsr));
708  out:
709         obd_free_memmd(mds->mds_lov_exp, &lsm);
710         RETURN(rc);
711 }
712
713 int mdd_setattr_log(const struct lu_env *env, struct mdd_device *mdd,
714                     const struct md_attr *ma,
715                     struct lov_mds_md *lmm, int lmm_size,
716                     struct llog_cookie *logcookies, int cookies_size)
717 {
718         struct obd_device *obd = mdd2obd_dev(mdd);
719
720         /* journal chown/chgrp in llog, just like unlink */
721         if (lmm_size > 0) {
722                 CDEBUG(D_INFO, "setattr llog for uid/gid=%lu/%lu\n",
723                         (unsigned long)ma->ma_attr.la_uid,
724                         (unsigned long)ma->ma_attr.la_gid);
725                 return mdd_log_op_setattr(obd, ma->ma_attr.la_uid,
726                                           ma->ma_attr.la_gid, lmm,
727                                           lmm_size, logcookies,
728                                           cookies_size);
729         } else
730                 return 0;
731 }
732
733 static int mdd_osc_setattr_async(struct obd_device *obd, __u32 uid, __u32 gid,
734                           struct lov_mds_md *lmm, int lmm_size,
735                           struct llog_cookie *logcookies, const struct lu_fid *parent,
736                           struct obd_capa *oc)
737 {
738         struct mds_obd *mds = &obd->u.mds;
739         struct obd_trans_info oti = { 0 };
740         struct obd_info oinfo = { { { 0 } } };
741         int rc;
742         ENTRY;
743
744         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_OST_SETATTR))
745                 RETURN(0);
746
747         /* first get memory EA */
748         OBDO_ALLOC(oinfo.oi_oa);
749         if (!oinfo.oi_oa)
750                 RETURN(-ENOMEM);
751
752         LASSERT(lmm);
753
754         rc = obd_unpackmd(mds->mds_lov_exp, &oinfo.oi_md, lmm, lmm_size);
755         if (rc < 0) {
756                 CERROR("Error unpack md %p for obj "DFID"\n", lmm,
757                         PFID(parent));
758                 GOTO(out, rc);
759         }
760
761         /* then fill oa */
762         oinfo.oi_oa->o_uid = uid;
763         oinfo.oi_oa->o_gid = gid;
764         oinfo.oi_oa->o_id = oinfo.oi_md->lsm_object_id;
765         oinfo.oi_oa->o_seq = oinfo.oi_md->lsm_object_seq;
766         oinfo.oi_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP |
767                                 OBD_MD_FLUID | OBD_MD_FLGID;
768         if (logcookies) {
769                 oinfo.oi_oa->o_valid |= OBD_MD_FLCOOKIE;
770                 oti.oti_logcookies = logcookies;
771         }
772
773         obdo_from_inode(oinfo.oi_oa, NULL, (struct lu_fid *)parent, 0);
774         oinfo.oi_capa = oc;
775
776         /* do async setattr from mds to ost not waiting for responses. */
777         rc = obd_setattr_async(mds->mds_lov_exp, &oinfo, &oti, NULL);
778         if (rc)
779                 CDEBUG(D_INODE, "mds to ost setattr objid 0x"LPX64
780                        " on ost error %d\n", oinfo.oi_md->lsm_object_id, rc);
781 out:
782         if (oinfo.oi_md)
783                 obd_free_memmd(mds->mds_lov_exp, &oinfo.oi_md);
784         OBDO_FREE(oinfo.oi_oa);
785         RETURN(rc);
786 }
787
788 int mdd_lov_setattr_async(const struct lu_env *env, struct mdd_object *obj,
789                           struct lov_mds_md *lmm, int lmm_size,
790                           struct llog_cookie *logcookies)
791 {
792         struct mdd_device   *mdd = mdo2mdd(&obj->mod_obj);
793         struct obd_device   *obd = mdd2obd_dev(mdd);
794         struct lu_attr      *tmp_la = &mdd_env_info(env)->mti_la;
795         const struct lu_fid *fid = mdd_object_fid(obj);
796         int rc = 0;
797         ENTRY;
798
799         mdd_read_lock(env, obj, MOR_TGT_CHILD);
800         rc = mdo_attr_get(env, obj, tmp_la, mdd_object_capa(env, obj));
801         mdd_read_unlock(env, obj);
802         if (rc)
803                 RETURN(rc);
804
805         rc = mdd_osc_setattr_async(obd, tmp_la->la_uid, tmp_la->la_gid, lmm,
806                                    lmm_size, logcookies, fid, NULL);
807         RETURN(rc);
808 }
809
810 static int grouplock_blocking_ast(struct ldlm_lock *lock,
811                                   struct ldlm_lock_desc *desc,
812                                   void *data, int flag)
813 {
814         struct md_attr *ma = data;
815         struct lustre_handle lockh;
816         int rc = 0;
817         ENTRY;
818
819         switch (flag)
820         {
821                 case LDLM_CB_BLOCKING :
822                         /* lock is canceled */
823                         CDEBUG(D_DLMTRACE, "Lock %p is canceled\n", lock);
824
825                         ldlm_lock2handle(lock, &lockh);
826                         rc = ldlm_cli_cancel(&lockh);
827
828                         break;
829                 case LDLM_CB_CANCELING :
830                         CDEBUG(D_DLMTRACE,
831                                "Lock %p has been canceled, do cleaning\n",
832                                lock);
833
834                         if (ma && ma->ma_som)
835                                 OBD_FREE_PTR(ma->ma_som);
836                         if (ma)
837                                 OBD_FREE_PTR(ma);
838                         break;
839                 default:
840                         LBUG();
841         }
842         RETURN(rc);
843 }
844
845 static int grouplock_glimpse_ast(struct ldlm_lock *lock, void *data)
846 {
847         struct ptlrpc_request *req = data;
848         struct ost_lvb *lvb;
849         int rc;
850         struct md_attr *ma;
851         ENTRY;
852
853         ma = lock->l_ast_data;
854
855         req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK);
856         req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
857                              sizeof(*lvb));
858         rc = req_capsule_server_pack(&req->rq_pill);
859         if (rc) {
860                 CERROR("failed pack reply: %d\n", rc);
861                 GOTO(out, rc);
862         }
863
864         lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
865
866         if ((ma) && (ma->ma_valid & MA_SOM)) {
867                 lvb->lvb_size = ma->ma_som->msd_size;
868                 lvb->lvb_blocks = ma->ma_som->msd_blocks;
869         } else if ((ma) && (ma->ma_valid & MA_INODE)) {
870                 lvb->lvb_size = ma->ma_attr.la_size;
871                 lvb->lvb_blocks = ma->ma_attr.la_blocks;
872         } else {
873                 lvb->lvb_size = 0;
874                 rc = -ELDLM_NO_LOCK_DATA;
875         }
876
877         EXIT;
878 out:
879         if (rc == -ELDLM_NO_LOCK_DATA)
880                 lustre_pack_reply(req, 1, NULL, NULL);
881
882         req->rq_status = rc;
883         return rc;
884 }
885
886 int mdd_file_lock(const struct lu_env *env, struct md_object *obj,
887                   struct lov_mds_md *lmm, struct ldlm_extent *extent,
888                   struct lustre_handle *lockh)
889 {
890         struct ldlm_enqueue_info einfo = { 0 };
891         struct obd_info oinfo = { { { 0 } } };
892         struct obd_device *obd;
893         struct obd_export *lov_exp;
894         struct lov_stripe_md *lsm = NULL;
895         struct md_attr *ma = NULL;
896         int rc;
897         ENTRY;
898
899         obd = mdo2mdd(obj)->mdd_obd_dev;
900         lov_exp = obd->u.mds.mds_lov_exp;
901
902         obd_unpackmd(lov_exp, &lsm, lmm,
903                      lov_mds_md_size(lmm->lmm_stripe_count, lmm->lmm_magic));
904
905         OBD_ALLOC_PTR(ma);
906         if (ma == NULL)
907                 GOTO(out, rc = -ENOMEM);
908
909         OBD_ALLOC_PTR(ma->ma_som);
910         if (ma->ma_som == NULL)
911                 GOTO(out, rc = -ENOMEM);
912
913         ma->ma_need = MA_SOM | MA_INODE;
914         mo_attr_get(env, obj, ma);
915
916         einfo.ei_type = LDLM_EXTENT;
917         einfo.ei_mode = LCK_GROUP;
918         einfo.ei_cb_bl = grouplock_blocking_ast;
919         einfo.ei_cb_cp = ldlm_completion_ast;
920         einfo.ei_cb_gl = grouplock_glimpse_ast;
921
922         if (ma->ma_valid & (MA_SOM | MA_INODE))
923                 einfo.ei_cbdata = ma;
924         else
925                 einfo.ei_cbdata = NULL;
926
927         memset(&oinfo.oi_policy, 0, sizeof(oinfo.oi_policy));
928         oinfo.oi_policy.l_extent = *extent;
929         oinfo.oi_lockh = lockh;
930         oinfo.oi_md = lsm;
931         oinfo.oi_flags = 0;
932
933         rc = obd_enqueue(lov_exp, &oinfo, &einfo, NULL);
934         /* ei_cbdata is used as a free flag at exit */
935         if (rc)
936                 einfo.ei_cbdata = NULL;
937
938         obd_unpackmd(lov_exp, &lsm, NULL, 0);
939
940 out:
941         /* ma is freed if not used as callback data */
942         if ((einfo.ei_cbdata == NULL) && ma && ma->ma_som)
943                 OBD_FREE_PTR(ma->ma_som);
944         if ((einfo.ei_cbdata == NULL) && ma)
945                 OBD_FREE_PTR(ma);
946
947         RETURN(rc);
948 }
949
950 int mdd_file_unlock(const struct lu_env *env, struct md_object *obj,
951                     struct lov_mds_md *lmm, struct lustre_handle *lockh)
952 {
953         struct obd_device *obd;
954         struct obd_export *lov_exp;
955         struct lov_stripe_md *lsm = NULL;
956         int rc;
957         ENTRY;
958
959         LASSERT(lustre_handle_is_used(lockh));
960
961         obd = mdo2mdd(obj)->mdd_obd_dev;
962         lov_exp = obd->u.mds.mds_lov_exp;
963
964         obd_unpackmd(lov_exp, &lsm, lmm,
965                      lov_mds_md_size(lmm->lmm_stripe_count, lmm->lmm_magic));
966
967         rc = obd_cancel(lov_exp, lsm, LCK_GROUP, lockh);
968
969         obd_unpackmd(lov_exp, &lsm, NULL, 0);
970
971         RETURN(rc);
972 }