Whamcloud - gitweb
add per-thread debugging flags. Use then to control CDEBUG().
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid, struct md_op_spec *spec)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175         if (rc == 0) {
176                 /* found root */
177                 fid_zero(sfid);
178         } else if (rc == 1) {
179                 /* found @fid is parent */
180                 *sfid = *fid;
181                 rc = 0;
182         }
183         RETURN(rc);
184 }
185
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188                           struct mdd_object *cobj, int need_check)
189 {
190         int rc = 0;
191         ENTRY;
192
193         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
194                 RETURN(-EEXIST);
195
196         if (mdd_is_dead_obj(pobj))
197                 RETURN(-ENOENT);
198
199         if (need_check)
200                 rc = mdd_permission_internal_locked(env, pobj, NULL,
201                                                     MAY_WRITE | MAY_EXEC);
202
203         RETURN(rc);
204 }
205
206 static inline int mdd_is_sticky(const struct lu_env *env,
207                                 struct mdd_object *pobj,
208                                 struct mdd_object *cobj)
209 {
210         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
211         struct md_ucred *uc = md_ucred(env);
212         int rc;
213
214         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
215         if (rc) {
216                 return rc;
217         } else if (tmp_la->la_uid == uc->mu_fsuid) {
218                 return 0;
219         } else {
220                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
221                 if (rc)
222                         return rc;
223                 else if (!(tmp_la->la_mode & S_ISVTX) ||
224                          (tmp_la->la_uid == uc->mu_fsuid))
225                         return 0;
226                 else
227                         return !mdd_capable(uc, CAP_FOWNER);
228         }
229 }
230
231 /* Check whether it may delete the cobj under the pobj. */
232 static int mdd_may_delete(const struct lu_env *env,
233                           struct mdd_object *pobj,
234                           struct mdd_object *cobj,
235                           int is_dir, int need_check)
236 {
237         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
238         int rc = 0;
239         ENTRY;
240
241         LASSERT(cobj);
242
243         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
244                 RETURN(-ENOENT);
245
246         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
247                 RETURN(-EPERM);
248
249         if (is_dir) {
250                 if (!S_ISDIR(mdd_object_type(cobj)))
251                         RETURN(-ENOTDIR);
252
253                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
254                         RETURN(-EBUSY);
255
256         } else if (S_ISDIR(mdd_object_type(cobj))) {
257                 RETURN(-EISDIR);
258         }
259
260         if (pobj) {
261                 if (mdd_is_dead_obj(pobj))
262                         RETURN(-ENOENT);
263
264                 if (mdd_is_sticky(env, pobj, cobj))
265                         RETURN(-EPERM);
266
267                 if (need_check)
268                         rc = mdd_permission_internal_locked(env, pobj, NULL,
269                                                      MAY_WRITE | MAY_EXEC);
270         }
271         RETURN(rc);
272 }
273
274 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
275                           struct mdd_object *src_obj)
276 {
277         int rc = 0;
278         ENTRY;
279
280         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
281                 RETURN(-EPERM);
282
283         if (S_ISDIR(mdd_object_type(src_obj)))
284                 RETURN(-EPERM);
285
286         LASSERT(src_obj != tgt_obj);
287         if (tgt_obj) {
288                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
289                 if (rc)
290                         RETURN(rc);
291         }
292
293         RETURN(rc);
294 }
295
296 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
297                                    const struct lu_fid *fid)
298 {
299         struct mdd_thread_info *info = mdd_env_info(env);
300
301         fid_cpu_to_be(&info->mti_fid2, fid);
302         return (const struct dt_rec *)&info->mti_fid2;
303 }
304
305
306 /* insert new index, add reference if isdir, update times */
307 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
308                               const struct lu_fid *lf, const char *name, int is_dir,
309                               struct thandle *handle, struct lustre_capa *capa)
310 {
311         struct dt_object *next = mdd_object_child(pobj);
312         struct timeval    start;
313         int               rc;
314         ENTRY;
315
316         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
317                                LPROC_MDD_INDEX_INSERT);
318         if (dt_try_as_dir(env, next)) {
319                 rc = next->do_index_ops->dio_insert(env, next,
320                                                     __mdd_fid_rec(env, lf),
321                                                     (const struct dt_key *)name,
322                                                     handle, capa);
323         } else {
324                 rc = -ENOTDIR;
325         }
326
327         if (rc == 0) {
328                 if (is_dir) {
329                         mdd_write_lock(env, pobj);
330                         mdd_ref_add_internal(env, pobj, handle);
331                         mdd_write_unlock(env, pobj);
332                 }
333         }
334         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
335                              LPROC_MDD_INDEX_INSERT);
336         RETURN(rc);
337 }
338
339 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
340                               const char *name, int is_dir, struct thandle *handle,
341                               struct lustre_capa *capa)
342 {
343         struct dt_object *next = mdd_object_child(pobj);
344         struct timeval    start;
345         int               rc;
346         ENTRY;
347
348         mdd_lprocfs_time_start(mdo2mdd(&pobj->mod_obj), &start,
349                                LPROC_MDD_INDEX_DELETE);
350
351         if (dt_try_as_dir(env, next)) {
352                 rc = next->do_index_ops->dio_delete(env, next,
353                                                     (struct dt_key *)name,
354                                                     handle, capa);
355                 if (rc == 0 && is_dir) {
356                         mdd_write_lock(env, pobj);
357                         mdd_ref_del_internal(env, pobj, handle);
358                         mdd_write_unlock(env, pobj);
359                 }
360         } else
361                 rc = -ENOTDIR;
362
363         mdd_lprocfs_time_end(mdo2mdd(&pobj->mod_obj), &start,
364                              LPROC_MDD_INDEX_DELETE);
365         RETURN(rc);
366 }
367
368 static int
369 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
370                         const struct lu_fid *lf, const char *name,
371                         struct thandle *handle, struct lustre_capa *capa)
372 {
373         struct dt_object *next = mdd_object_child(pobj);
374         int               rc;
375         ENTRY;
376
377         if (dt_try_as_dir(env, next)) {
378                 rc = next->do_index_ops->dio_insert(env, next,
379                                                     __mdd_fid_rec(env, lf),
380                                                     (const struct dt_key *)name,
381                                                     handle, capa);
382         } else {
383                 rc = -ENOTDIR;
384         }
385         RETURN(rc);
386 }
387
388 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
389                     struct md_object *src_obj, const char *name,
390                     struct md_attr *ma)
391 {
392         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
393         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
394         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
395         struct mdd_device *mdd = mdo2mdd(src_obj);
396         struct dynlock_handle *dlh;
397         struct thandle *handle;
398         int rc;
399         ENTRY;
400
401         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
402         handle = mdd_trans_start(env, mdd);
403         if (IS_ERR(handle))
404                 RETURN(PTR_ERR(handle));
405
406         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
407         if (dlh == NULL)
408                 GOTO(out_trans, rc = -ENOMEM);
409         mdd_write_lock(env, mdd_sobj);
410
411         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
412         if (rc)
413                 GOTO(out_unlock, rc);
414
415         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
416                                      name, handle,
417                                      mdd_object_capa(env, mdd_tobj));
418         if (rc)
419                 GOTO(out_unlock, rc);
420
421         mdd_ref_add_internal(env, mdd_sobj, handle);
422
423         *la = ma->ma_attr;
424         la->la_valid = LA_CTIME | LA_MTIME;
425         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
426         if (rc)
427                 GOTO(out_unlock, rc);
428
429         la->la_valid = LA_CTIME;
430         rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
431         EXIT;
432 out_unlock:
433         mdd_write_unlock(env, mdd_sobj);
434         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
435 out_trans:
436         mdd_trans_stop(env, mdd, rc, handle);
437         return rc;
438 }
439
440 static inline void mdd_set_dead_obj(struct mdd_object *obj)
441 {
442         if (obj)
443                 obj->mod_flags |= DEAD_OBJ;
444 }
445
446 /* caller should take a lock before calling */
447 int mdd_finish_unlink(const struct lu_env *env,
448                       struct mdd_object *obj, struct md_attr *ma,
449                       struct thandle *th)
450 {
451         int rc;
452         ENTRY;
453
454         rc = mdd_iattr_get(env, obj, ma);
455         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
456                 /* add new orphan and the object
457                  * will be deleted during the object_put() */
458                 if (__mdd_orphan_add(env, obj, th) == 0)
459                         obj->mod_flags |= ORPHAN_OBJ;
460
461                 mdd_set_dead_obj(obj);
462                 if (obj->mod_count == 0)
463                         rc = mdd_object_kill(env, obj, ma);
464                 else
465                         /* clear MA_LOV | MA_COOKIE, if we do not
466                          * unlink it in case we get it somewhere */
467                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
468         } else
469                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
470
471         RETURN(rc);
472 }
473
474 /*
475  * Check that @dir contains no entries except (possibly) dot and dotdot.
476  *
477  * Returns:
478  *
479  *             0        empty
480  *    -ENOTEMPTY        not empty
481  *           -ve        other error
482  *
483  */
484 static int mdd_dir_is_empty(const struct lu_env *env,
485                             struct mdd_object *dir)
486 {
487         struct dt_it     *it;
488         struct dt_object *obj;
489         struct dt_it_ops *iops;
490         int result;
491         ENTRY;
492
493         obj = mdd_object_child(dir);
494         iops = &obj->do_index_ops->dio_it;
495         it = iops->init(env, obj, 0, BYPASS_CAPA);
496         if (it != NULL) {
497                 result = iops->get(env, it, (const void *)"");
498                 if (result > 0) {
499                         int i;
500                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
501                                 result = iops->next(env, it);
502                         if (result == 0)
503                                 result = -ENOTEMPTY;
504                         else if (result == +1)
505                                 result = 0;
506                 } else if (result == 0)
507                         /*
508                          * Huh? Index contains no zero key?
509                          */
510                         result = -EIO;
511
512                 iops->put(env, it);
513                 iops->fini(env, it);
514         } else
515                 result = -ENOMEM;
516         RETURN(result);
517 }
518
519 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
520                             struct mdd_object *cobj, struct md_attr *ma)
521 {
522         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
523         int rc = 0;
524         ENTRY;
525
526         rc = mdd_may_delete(env, pobj, cobj,
527                             S_ISDIR(ma->ma_attr.la_mode), 1);
528         if (rc)
529                 RETURN(rc);
530
531         if (S_ISDIR(mdd_object_type(cobj))) {
532                 if (dt_try_as_dir(env, dt_cobj))
533                         rc = mdd_dir_is_empty(env, cobj);
534                 else
535                         rc = -ENOTDIR;
536         }
537
538         RETURN(rc);
539 }
540
541 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
542                       struct md_object *cobj, const char *name,
543                       struct md_attr *ma)
544 {
545         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
546         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
547         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
548         struct mdd_device *mdd = mdo2mdd(pobj);
549         struct dynlock_handle *dlh;
550         struct thandle    *handle;
551         int rc, is_dir;
552         ENTRY;
553
554         /*
555          * Check -ENOENT early here because we need to get object type
556          * to calculate credits before transaction start
557          */
558         if (!lu_object_exists(&cobj->mo_lu)) {
559                 LU_OBJECT_DEBUG(D_ERROR, env, &cobj->mo_lu,
560                                 "unlinking as `%s'", name);
561                 RETURN(-ENOENT);
562         }
563
564         LASSERTF(lu_object_exists(&cobj->mo_lu) > 0, "FID is "DFID"\n",
565                  PFID(lu_object_fid(&cobj->mo_lu)));
566
567         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
568         if (rc)
569                 RETURN(rc);
570
571         handle = mdd_trans_start(env, mdd);
572         if (IS_ERR(handle))
573                 RETURN(PTR_ERR(handle));
574
575         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
576         if (dlh == NULL)
577                 GOTO(out_trans, rc = -ENOMEM);
578         mdd_write_lock(env, mdd_cobj);
579
580         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
581         if (rc)
582                 GOTO(cleanup, rc);
583
584         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
585
586         current->debugging1 |= 0x1; /* XXX enable lvar_enoent_debug
587                                      * debugging */
588         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
589                                 mdd_object_capa(env, mdd_pobj));
590         current->debugging1 &= ~0x1;
591         if (rc)
592                 GOTO(cleanup, rc);
593
594         mdd_ref_del_internal(env, mdd_cobj, handle);
595         if (is_dir) {
596                 /* unlink dot */
597                 mdd_ref_del_internal(env, mdd_cobj, handle);
598         }
599
600         *la = ma->ma_attr;
601         la->la_valid = LA_CTIME | LA_MTIME;
602         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
603         if (rc)
604                 GOTO(cleanup, rc);
605
606         la->la_valid = LA_CTIME;
607         rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
608         if (rc)
609                 GOTO(cleanup, rc);
610
611         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
612
613         if (rc == 0)
614                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
615                                    strlen("unlinked"), "unlinked", 0,
616                                    NULL, NULL);
617         EXIT;
618 cleanup:
619         mdd_write_unlock(env, mdd_cobj);
620         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
621 out_trans:
622         mdd_trans_stop(env, mdd, rc, handle);
623         return rc;
624 }
625
626 static int mdd_ni_sanity_check(const struct lu_env *env,
627                                struct md_object *pobj,
628                                const char *name,
629                                const struct lu_fid *fid)
630 {
631         struct mdd_object *obj = md2mdd_obj(pobj);
632         ENTRY;
633
634         /* EEXIST check */
635         if (mdd_is_dead_obj(obj))
636                 RETURN(-ENOENT);
637
638         /* The exist of the name will be checked in _index_insert. */
639         RETURN(mdd_permission_internal_locked(env, obj, NULL,
640                                               MAY_WRITE | MAY_EXEC));
641 }
642
643 /*
644  * Partial operation.
645  */
646 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
647                            const char *name, const struct lu_fid *fid,
648                            int is_dir)
649 {
650         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
651         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
652         struct mdd_device *mdd = mdo2mdd(pobj);
653         struct dynlock_handle *dlh;
654         struct thandle *handle;
655         int rc;
656         ENTRY;
657
658         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
659         handle = mdd_trans_start(env, mdo2mdd(pobj));
660         if (IS_ERR(handle))
661                 RETURN(PTR_ERR(handle));
662
663         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
664         if (dlh == NULL)
665                 GOTO(out_trans, rc = -ENOMEM);
666         rc = mdd_ni_sanity_check(env, pobj, name, fid);
667         if (rc)
668                 GOTO(out_unlock, rc);
669
670         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
671                                 handle, BYPASS_CAPA);
672         if (rc == 0) {
673                 la->la_ctime = la->la_atime = CURRENT_SECONDS;
674                 la->la_valid = LA_ATIME | LA_CTIME;
675                 rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
676         }
677         EXIT;
678 out_unlock:
679         mdd_pdo_write_unlock(env, mdd_obj, dlh);
680 out_trans:
681         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
682         return rc;
683 }
684
685 static int mdd_nr_sanity_check(const struct lu_env *env,
686                                struct md_object *pobj,
687                                const char *name)
688 {
689         struct mdd_object *obj = md2mdd_obj(pobj);
690         ENTRY;
691
692         /* EEXIST check */
693         if (mdd_is_dead_obj(obj)) {
694                 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
695                 RETURN(-ENOENT);
696         }
697
698         /* Name presense will be checked in _index_delete. */
699         RETURN(mdd_permission_internal_locked(env, obj, NULL,
700                                               MAY_WRITE | MAY_EXEC));
701 }
702
703 /*
704  * Partial operation.
705  */
706 static int mdd_name_remove(const struct lu_env *env,
707                            struct md_object *pobj,
708                            const char *name, int is_dir)
709 {
710         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
711         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
712         struct mdd_device *mdd = mdo2mdd(pobj);
713         struct dynlock_handle *dlh;
714         struct thandle *handle;
715         int rc;
716         ENTRY;
717
718         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
719         handle = mdd_trans_start(env, mdd);
720         if (IS_ERR(handle))
721                 RETURN(PTR_ERR(handle));
722
723         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
724         if (dlh == NULL)
725                 GOTO(out_trans, rc = -ENOMEM);
726         rc = mdd_nr_sanity_check(env, pobj, name);
727         if (rc)
728                 GOTO(out_unlock, rc);
729
730         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
731                                 handle, BYPASS_CAPA);
732         if (rc)
733                 GOTO(out_unlock, rc);
734
735         la->la_ctime = la->la_mtime = CURRENT_SECONDS;
736         la->la_valid = LA_CTIME | LA_MTIME;
737         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
738         EXIT;
739 out_unlock:
740         mdd_pdo_write_unlock(env, mdd_obj, dlh);
741 out_trans:
742         mdd_trans_stop(env, mdd, rc, handle);
743         return rc;
744 }
745
746 static int mdd_rt_sanity_check(const struct lu_env *env,
747                                struct mdd_object *tgt_pobj,
748                                struct mdd_object *tobj,
749                                const struct lu_fid *sfid,
750                                const char *name, struct md_attr *ma)
751 {
752         int rc, src_is_dir;
753         ENTRY;
754
755         /* EEXIST check */
756         if (mdd_is_dead_obj(tgt_pobj))
757                 RETURN(-ENOENT);
758
759         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
760         if (tobj) {
761                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
762                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
763                      mdd_dir_is_empty(env, tobj))
764                                 RETURN(-ENOTEMPTY);
765         } else {
766                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
767         }
768
769         RETURN(rc);
770 }
771
772 static int mdd_rename_tgt(const struct lu_env *env,
773                           struct md_object *pobj, struct md_object *tobj,
774                           const struct lu_fid *lf, const char *name,
775                           struct md_attr *ma)
776 {
777         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
778         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
779         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
780         struct mdd_device *mdd = mdo2mdd(pobj);
781         struct dynlock_handle *dlh;
782         struct thandle *handle;
783         int rc;
784         ENTRY;
785
786         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
787         handle = mdd_trans_start(env, mdd);
788         if (IS_ERR(handle))
789                 RETURN(PTR_ERR(handle));
790
791         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
792         if (dlh == NULL)
793                 GOTO(out_trans, rc = -ENOMEM);
794         if (mdd_tobj)
795                 mdd_write_lock(env, mdd_tobj);
796
797         /* XXX: Rename sanity checking. */
798         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
799         if (rc)
800                 GOTO(cleanup, rc);
801
802         /*
803          * If rename_tgt is called then we should just re-insert name with
804          * correct fid, no need to dec/inc parent nlink if obj is dir.
805          */
806         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
807         if (rc)
808                 GOTO(cleanup, rc);
809
810         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
811                                      BYPASS_CAPA);
812         if (rc)
813                 GOTO(cleanup, rc);
814
815         *la = ma->ma_attr;
816         la->la_valid = LA_CTIME | LA_MTIME;
817         rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
818         if (rc)
819                 GOTO(cleanup, rc);
820
821         if (tobj && lu_object_exists(&tobj->mo_lu)) {
822                 mdd_ref_del_internal(env, mdd_tobj, handle);
823                 la->la_valid = LA_CTIME;
824                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
825         }
826         EXIT;
827 cleanup:
828         if (tobj)
829                 mdd_write_unlock(env, mdd_tobj);
830         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
831 out_trans:
832         mdd_trans_stop(env, mdd, rc, handle);
833         return rc;
834 }
835
836 /*
837  * The permission has been checked when obj created, no need check again.
838  */
839 static int mdd_cd_sanity_check(const struct lu_env *env,
840                                struct mdd_object *obj)
841 {
842         ENTRY;
843
844         /* EEXIST check */
845         if (!obj || mdd_is_dead_obj(obj))
846                 RETURN(-ENOENT);
847
848         RETURN(0);
849
850 }
851
852 static int mdd_create_data(const struct lu_env *env, struct md_object *pobj,
853                            struct md_object *cobj, const struct md_op_spec *spec,
854                            struct md_attr *ma)
855 {
856         struct mdd_device *mdd = mdo2mdd(cobj);
857         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
858         struct mdd_object *son = md2mdd_obj(cobj);
859         struct lu_attr    *attr = &ma->ma_attr;
860         struct lov_mds_md *lmm = NULL;
861         int                lmm_size = 0;
862         struct thandle    *handle;
863         int                rc;
864         ENTRY;
865
866         rc = mdd_cd_sanity_check(env, son);
867         if (rc)
868                 RETURN(rc);
869
870         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
871             !(spec->sp_cr_flags & FMODE_WRITE))
872                 RETURN(0);
873
874         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
875                             spec, attr);
876         if (rc)
877                 RETURN(rc);
878
879         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
880         handle = mdd_trans_start(env, mdd);
881         if (IS_ERR(handle))
882                 GOTO(out_free, rc = PTR_ERR(handle));
883
884         /*
885          * XXX: Setting the lov ea is not locked but setting the attr is locked?
886          * Should this be fixed?
887          */
888
889         /* Replay creates has objects already */
890         if (spec->u.sp_ea.no_lov_create) {
891                 CDEBUG(D_INFO, "we already have lov ea\n");
892                 rc = mdd_lov_set_md(env, mdd_pobj, son,
893                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
894                                     spec->u.sp_ea.eadatalen, handle, 0);
895         } else
896                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
897                                     lmm_size, handle, 0);
898
899         if (rc == 0)
900                rc = mdd_attr_get_internal_locked(env, son, ma);
901
902         mdd_trans_stop(env, mdd, rc, handle);
903 out_free:
904         /* Finish mdd_lov_create() stuff. */
905         mdd_lov_create_finish(env, mdd, rc);
906         if (lmm)
907                 OBD_FREE(lmm, lmm_size);
908         RETURN(rc);
909 }
910
911 static int
912 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
913              const char *name, struct lu_fid* fid, int mask)
914 {
915         const struct dt_key *key = (const struct dt_key *)name;
916         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
917         struct dt_object    *dir = mdd_object_child(mdd_obj);
918         struct dt_rec       *rec = (struct dt_rec *)fid;
919         struct timeval       start;
920         int rc;
921         ENTRY;
922
923         mdd_lprocfs_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
924         if (mdd_is_dead_obj(mdd_obj))
925                 RETURN(-ESTALE);
926
927         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
928         if (rc == 0)
929                 RETURN(-ESTALE);
930         else if (rc < 0) {
931                 CERROR("Object "DFID" locates on remote server\n",
932                         PFID(mdo2fid(mdd_obj)));
933                 LBUG();
934         }
935
936         rc = mdd_permission_internal_locked(env, mdd_obj, NULL, mask);
937         if (rc)
938                 RETURN(rc);
939
940         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
941                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
942                                                    mdd_object_capa(env, mdd_obj));
943                 if (rc == 0)
944                         fid_be_to_cpu(fid, fid);
945         } else
946                 rc = -ENOTDIR;
947
948         mdd_lprocfs_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
949         RETURN(rc);
950 }
951
952 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
953                           struct mdd_object *child, struct md_attr *ma,
954                           struct thandle *handle)
955 {
956         int rc;
957         ENTRY;
958
959         /*
960          * Update attributes for child.
961          *
962          * FIXME:
963          *  (1) the valid bits should be converted between Lustre and Linux;
964          *  (2) maybe, the child attributes should be set in OSD when creation.
965          */
966
967         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
968         if (rc != 0)
969                 RETURN(rc);
970
971         if (S_ISDIR(ma->ma_attr.la_mode)) {
972                 /* Add "." and ".." for newly created dir */
973                 mdd_ref_add_internal(env, child, handle);
974                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
975                                              dot, handle, BYPASS_CAPA);
976                 if (rc == 0) {
977                         rc = __mdd_index_insert_only(env, child, pfid,
978                                                      dotdot, handle,
979                                                      BYPASS_CAPA);
980                         if (rc != 0) {
981                                 int rc2;
982
983                                 rc2 = __mdd_index_delete(env, child, dot, 0,
984                                                          handle, BYPASS_CAPA);
985                                 if (rc2 != 0)
986                                         CERROR("Failure to cleanup after dotdot"
987                                                " creation: %d (%d)\n", rc2, rc);
988                                 else
989                                         mdd_ref_del_internal(env, child, handle);
990                         }
991                 }
992         }
993         RETURN(rc);
994 }
995
996 static int mdd_create_sanity_check(const struct lu_env *env,
997                                    struct md_object *pobj,
998                                    const char *name,
999                                    struct md_attr *ma,
1000                                    int lookup)
1001 {
1002         struct mdd_thread_info *info = mdd_env_info(env);
1003         struct lu_attr    *la        = &info->mti_la;
1004         struct lu_fid     *fid       = &info->mti_fid;
1005         struct mdd_object *obj       = md2mdd_obj(pobj);
1006         int rc;
1007         ENTRY;
1008
1009         /* EEXIST check */
1010         if (mdd_is_dead_obj(obj))
1011                 RETURN(-ENOENT);
1012
1013         /*
1014          * In some cases this lookup is not needed - we know before if name
1015          * exists or not because MDT performs lookup for it.
1016          */
1017         /* XXX disable that lookup temporary */
1018         if (0 && lookup) {
1019                 /*
1020                  * Check if the name already exist, though it will be checked in
1021                  * _index_insert also, for avoiding rolling back if exists
1022                  * _index_insert.
1023                  */
1024                 rc = __mdd_lookup_locked(env, pobj, name, fid,
1025                                          MAY_WRITE | MAY_EXEC);
1026                 if (rc != -ENOENT)
1027                         RETURN(rc ? : -EEXIST);
1028         } else {
1029                 /*
1030                  * Check if has WRITE permission for the parent.
1031                  */
1032                 rc = mdd_permission_internal_locked(env, obj, NULL, MAY_WRITE);
1033                 if (rc)
1034                         RETURN(rc);
1035         }
1036
1037         /* sgid check */
1038         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1039         if (rc != 0)
1040                 RETURN(rc);
1041
1042         if (la->la_mode & S_ISGID) {
1043                 ma->ma_attr.la_gid = la->la_gid;
1044                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1045                         ma->ma_attr.la_mode |= S_ISGID;
1046                         ma->ma_attr.la_valid |= LA_MODE;
1047                 }
1048         }
1049
1050         switch (ma->ma_attr.la_mode & S_IFMT) {
1051         case S_IFDIR: {
1052                 struct mdd_device *mdd = mdo2mdd(pobj);
1053                 if (la->la_nlink >= mdd->mdd_dt_conf.ddp_max_nlink)
1054                         RETURN(-EMLINK);
1055         }
1056         case S_IFREG:
1057         case S_IFLNK:
1058         case S_IFCHR:
1059         case S_IFBLK:
1060         case S_IFIFO:
1061         case S_IFSOCK:
1062                 rc = 0;
1063                 break;
1064         default:
1065                 rc = -EINVAL;
1066                 break;
1067         }
1068         RETURN(rc);
1069 }
1070
1071 /*
1072  * Create object and insert it into namespace.
1073  */
1074 static int mdd_create(const struct lu_env *env,
1075                       struct md_object *pobj, const char *name,
1076                       struct md_object *child,
1077                       struct md_op_spec *spec,
1078                       struct md_attr* ma)
1079 {
1080         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1081         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1082         struct mdd_object *son = md2mdd_obj(child);
1083         struct mdd_device *mdd = mdo2mdd(pobj);
1084         struct lu_attr    *attr = &ma->ma_attr;
1085         struct lov_mds_md *lmm = NULL;
1086         struct thandle    *handle;
1087         int rc, created = 0, inserted = 0, lmm_size = 0;
1088         struct dynlock_handle *dlh;
1089         struct timeval  start;
1090         ENTRY;
1091
1092         mdd_lprocfs_time_start(mdd, &start, LPROC_MDD_CREATE);
1093
1094         /*
1095          * Two operations have to be performed:
1096          *
1097          *  - allocation of new object (->do_create()), and
1098          *
1099          *  - insertion into parent index (->dio_insert()).
1100          *
1101          * Due to locking, operation order is not important, when both are
1102          * successful, *but* error handling cases are quite different:
1103          *
1104          *  - if insertion is done first, and following object creation fails,
1105          *  insertion has to be rolled back, but this operation might fail
1106          *  also leaving us with dangling index entry.
1107          *
1108          *  - if creation is done first, is has to be undone if insertion
1109          *  fails, leaving us with leaked space, which is neither good, nor
1110          *  fatal.
1111          *
1112          * It seems that creation-first is simplest solution, but it is
1113          * sub-optimal in the frequent
1114          *
1115          *         $ mkdir foo
1116          *         $ mkdir foo
1117          *
1118          * case, because second mkdir is bound to create object, only to
1119          * destroy it immediately.
1120          *
1121          * To avoid this follow local file systems that do double lookup:
1122          *
1123          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1124          *
1125          *     1. create            (mdd_object_create_internal())
1126          *
1127          *     2. insert            (__mdd_index_insert(), lookup again)
1128          */
1129
1130         /* Sanity checks before big job. */
1131         rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1132         if (rc)
1133                 RETURN(rc);
1134
1135         /*
1136          * No RPC inside the transaction, so OST objects should be created at
1137          * first.
1138          */
1139         if (S_ISREG(attr->la_mode)) {
1140                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1141                                     spec, attr);
1142                 if (rc)
1143                         RETURN(rc);
1144         }
1145
1146         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1147         handle = mdd_trans_start(env, mdd);
1148         if (IS_ERR(handle))
1149                 GOTO(out_free, rc = PTR_ERR(handle));
1150
1151         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1152         if (dlh == NULL)
1153                 GOTO(out_trans, rc = -ENOMEM);
1154
1155         /*
1156          * XXX: Check that link can be added to the parent in mkdir case.
1157          */
1158
1159         mdd_write_lock(env, son);
1160         rc = mdd_object_create_internal(env, son, ma, handle);
1161         if (rc) {
1162                 mdd_write_unlock(env, son);
1163                 GOTO(cleanup, rc);
1164         }
1165
1166         created = 1;
1167
1168 #ifdef CONFIG_FS_POSIX_ACL
1169         mdd_read_lock(env, mdd_pobj);
1170         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1171         mdd_read_unlock(env, mdd_pobj);
1172         if (rc) {
1173                 mdd_write_unlock(env, son);
1174                 GOTO(cleanup, rc);
1175         } else {
1176                 ma->ma_attr.la_valid |= LA_MODE;
1177         }
1178 #endif
1179
1180         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1181                                    son, ma, handle);
1182         mdd_write_unlock(env, son);
1183         if (rc)
1184                 /*
1185                  * Object has no links, so it will be destroyed when last
1186                  * reference is released. (XXX not now.)
1187                  */
1188                 GOTO(cleanup, rc);
1189
1190         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1191                                 name, S_ISDIR(attr->la_mode), handle,
1192                                 mdd_object_capa(env, mdd_pobj));
1193
1194         if (rc)
1195                 GOTO(cleanup, rc);
1196
1197         inserted = 1;
1198
1199         /* Replay creates has objects already. */
1200         if (spec->u.sp_ea.no_lov_create) {
1201                 CDEBUG(D_INFO, "we already have lov ea\n");
1202                 LASSERT(lmm == NULL);
1203                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1204                 lmm_size = spec->u.sp_ea.eadatalen;
1205         }
1206         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1207         if (rc) {
1208                 CERROR("error on stripe info copy %d \n", rc);
1209                 GOTO(cleanup, rc);
1210         }
1211         if (lmm && lmm_size > 0) {
1212                 /* Set Lov here, do not get lmm again later */
1213                 memcpy(ma->ma_lmm, lmm, lmm_size);
1214                 ma->ma_lmm_size = lmm_size;
1215                 ma->ma_valid |= MA_LOV;
1216         }
1217
1218         if (S_ISLNK(attr->la_mode)) {
1219                 struct dt_object *dt = mdd_object_child(son);
1220                 const char *target_name = spec->u.sp_symname;
1221                 int sym_len = strlen(target_name);
1222                 const struct lu_buf *buf;
1223                 loff_t pos = 0;
1224
1225                 buf = mdd_buf_get_const(env, target_name, sym_len);
1226                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1227                                                 mdd_object_capa(env, son));
1228
1229                 if (rc == sym_len)
1230                         rc = 0;
1231                 else
1232                         GOTO(cleanup, rc = -EFAULT);
1233         }
1234
1235         *la = ma->ma_attr;
1236         la->la_valid = LA_CTIME | LA_MTIME;
1237         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1238         if (rc)
1239                 GOTO(cleanup, rc);
1240
1241         /* Return attr back. */
1242         rc = mdd_attr_get_internal_locked(env, son, ma);
1243         EXIT;
1244 cleanup:
1245         if (rc && created) {
1246                 int rc2 = 0;
1247
1248                 if (inserted) {
1249                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1250                                                  S_ISDIR(attr->la_mode),
1251                                                  handle, BYPASS_CAPA);
1252                         if (rc2)
1253                                 CERROR("error can not cleanup destroy %d\n",
1254                                        rc2);
1255                 }
1256                 if (rc2 == 0) {
1257                         mdd_write_lock(env, son);
1258                         mdd_ref_del_internal(env, son, handle);
1259                         mdd_write_unlock(env, son);
1260                 }
1261         }
1262
1263         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1264 out_trans:
1265         mdd_trans_stop(env, mdd, rc, handle);
1266 out_free:
1267         if (lmm && !spec->u.sp_ea.no_lov_create)
1268                 OBD_FREE(lmm, lmm_size);
1269         /* Finish mdd_lov_create() stuff */
1270         mdd_lov_create_finish(env, mdd, rc);
1271         mdd_lprocfs_time_end(mdd, &start, LPROC_MDD_CREATE);
1272         return rc;
1273 }
1274
1275 /*
1276  * Get locks on parents in proper order
1277  * RETURN: < 0 - error, rename_order if successful
1278  */
1279 enum rename_order {
1280         MDD_RN_SAME,
1281         MDD_RN_SRCTGT,
1282         MDD_RN_TGTSRC
1283 };
1284
1285 static int mdd_rename_order(const struct lu_env *env,
1286                             struct mdd_device *mdd,
1287                             struct mdd_object *src_pobj,
1288                             struct mdd_object *tgt_pobj)
1289 {
1290         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1291         int rc;
1292         ENTRY;
1293
1294         if (src_pobj == tgt_pobj)
1295                 RETURN(MDD_RN_SAME);
1296
1297         /* compared the parent child relationship of src_p&tgt_p */
1298         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1299                 rc = MDD_RN_SRCTGT;
1300         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1301                 rc = MDD_RN_TGTSRC;
1302         } else {
1303                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1304                 if (rc == -EREMOTE)
1305                         rc = 0;
1306
1307                 if (rc == 1)
1308                         rc = MDD_RN_TGTSRC;
1309                 else if (rc == 0)
1310                         rc = MDD_RN_SRCTGT;
1311         }
1312
1313         RETURN(rc);
1314 }
1315
1316 static int mdd_rename_sanity_check(const struct lu_env *env,
1317                                    struct mdd_object *src_pobj,
1318                                    struct mdd_object *tgt_pobj,
1319                                    const struct lu_fid *sfid,
1320                                    int src_is_dir,
1321                                    struct mdd_object *tobj)
1322 {
1323         int rc;
1324         ENTRY;
1325
1326         if (mdd_is_dead_obj(src_pobj))
1327                 RETURN(-ENOENT);
1328
1329         /* The sobj maybe on the remote, check parent permission only here */
1330         rc = mdd_permission_internal_locked(env, src_pobj, NULL,
1331                                             MAY_WRITE | MAY_EXEC);
1332         if (rc)
1333                 RETURN(rc);
1334
1335         if (!tobj) {
1336                 rc = mdd_may_create(env, tgt_pobj, NULL,
1337                                     (src_pobj != tgt_pobj));
1338         } else {
1339                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1340                                     (src_pobj != tgt_pobj));
1341                 if (rc == 0)
1342                         if (S_ISDIR(mdd_object_type(tobj))
1343                             && mdd_dir_is_empty(env, tobj))
1344                                 rc = -ENOTEMPTY;
1345         }
1346
1347         RETURN(rc);
1348 }
1349 /* src object can be remote that is why we use only fid and type of object */
1350 static int mdd_rename(const struct lu_env *env,
1351                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1352                       const struct lu_fid *lf, const char *sname,
1353                       struct md_object *tobj, const char *tname,
1354                       struct md_attr *ma)
1355 {
1356         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1357         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1358         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1359         struct mdd_device *mdd = mdo2mdd(src_pobj);
1360         struct mdd_object *mdd_sobj = NULL;
1361         struct mdd_object *mdd_tobj = NULL;
1362         struct dynlock_handle *sdlh, *tdlh;
1363         struct thandle *handle;
1364         int is_dir;
1365         int rc;
1366         ENTRY;
1367
1368         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1369         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1370         if (ma->ma_attr.la_valid & LA_FLAGS &&
1371             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1372                 RETURN(-EPERM);
1373
1374         if (tobj)
1375                 mdd_tobj = md2mdd_obj(tobj);
1376
1377         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1378         handle = mdd_trans_start(env, mdd);
1379         if (IS_ERR(handle))
1380                 RETURN(PTR_ERR(handle));
1381
1382         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1383         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1384         if (rc < 0)
1385                 GOTO(cleanup_unlocked, rc);
1386
1387         /* Get locks in determined order */
1388         if (rc == MDD_RN_SAME) {
1389                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1390                 /* check hashes to determine do we need one lock or two */
1391                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1392                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1393                 else
1394                         tdlh = sdlh;
1395         } else if (rc == MDD_RN_SRCTGT) {
1396                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1397                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1398         } else {
1399                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1400                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1401         }
1402         if (sdlh == NULL || tdlh == NULL)
1403                 GOTO(cleanup, rc = -ENOMEM);
1404
1405         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1406                                      lf, is_dir, mdd_tobj);
1407         if (rc)
1408                 GOTO(cleanup, rc);
1409
1410         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1411                                 mdd_object_capa(env, mdd_spobj));
1412         if (rc)
1413                 GOTO(cleanup, rc);
1414
1415         /*
1416          * Here tobj can be remote one, so we do index_delete unconditionally
1417          * and -ENOENT is allowed.
1418          */
1419         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1420                                 mdd_object_capa(env, mdd_tpobj));
1421         if (rc != 0 && rc != -ENOENT)
1422                 GOTO(cleanup, rc);
1423
1424         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1425                                 mdd_object_capa(env, mdd_tpobj));
1426         if (rc)
1427                 GOTO(cleanup, rc);
1428
1429         *la = ma->ma_attr;
1430         mdd_sobj = mdd_object_find(env, mdd, lf);
1431         if (mdd_sobj) {
1432                 la->la_valid = LA_CTIME;
1433
1434                 /* XXX: How to update ctime for remote sobj? */
1435                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1436                 if (rc)
1437                         GOTO(cleanup, rc);
1438         }
1439         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1440                 mdd_write_lock(env, mdd_tobj);
1441                 mdd_ref_del_internal(env, mdd_tobj, handle);
1442
1443                 /* Remove dot reference. */
1444                 if (is_dir)
1445                         mdd_ref_del_internal(env, mdd_tobj, handle);
1446
1447                 la->la_valid = LA_CTIME;
1448                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1449                 if (rc)
1450                         GOTO(cleanup, rc);
1451
1452                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1453                 mdd_write_unlock(env, mdd_tobj);
1454                 if (rc)
1455                         GOTO(cleanup, rc);
1456         }
1457
1458         la->la_valid = LA_CTIME | LA_MTIME;
1459         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1460         if (rc)
1461                 GOTO(cleanup, rc);
1462
1463         if (mdd_spobj != mdd_tpobj) {
1464                 la->la_valid = LA_CTIME | LA_MTIME;
1465                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1466                                                   handle, 0);
1467         }
1468
1469         EXIT;
1470 cleanup:
1471         if (likely(tdlh) && sdlh != tdlh)
1472                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1473         if (likely(sdlh))
1474                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1475 cleanup_unlocked:
1476         mdd_trans_stop(env, mdd, rc, handle);
1477         if (mdd_sobj)
1478                 mdd_object_put(env, mdd_sobj);
1479         return rc;
1480 }
1481
1482 struct md_dir_operations mdd_dir_ops = {
1483         .mdo_is_subdir     = mdd_is_subdir,
1484         .mdo_lookup        = mdd_lookup,
1485         .mdo_create        = mdd_create,
1486         .mdo_rename        = mdd_rename,
1487         .mdo_link          = mdd_link,
1488         .mdo_unlink        = mdd_unlink,
1489         .mdo_name_insert   = mdd_name_insert,
1490         .mdo_name_remove   = mdd_name_remove,
1491         .mdo_rename_tgt    = mdd_rename_tgt,
1492         .mdo_create_data   = mdd_create_data
1493 };