Whamcloud - gitweb
fix error handling; kill possible memleak.
[fs/lustre-release.git] / lustre / mdd / mdd_dir.c
1 /* -*- MODE: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  mdd/mdd_handler.c
5  *  Lustre Metadata Server (mdd) routines
6  *
7  *  Copyright (C) 2006 Cluster File Systems, Inc.
8  *   Author: Wang Di <wangdi@clusterfs.com>
9  *
10  *   This file is part of the Lustre file system, http://www.lustre.org
11  *   Lustre is a trademark of Cluster File Systems, Inc.
12  *
13  *   You may have signed or agreed to another license before downloading
14  *   this software.  If so, you are bound by the terms and conditions
15  *   of that agreement, and the following does not apply to you.  See the
16  *   LICENSE file included with this distribution for more information.
17  *
18  *   If you did not agree to a different license, then this copy of Lustre
19  *   is open source software; you can redistribute it and/or modify it
20  *   under the terms of version 2 of the GNU General Public License as
21  *   published by the Free Software Foundation.
22  *
23  *   In either case, Lustre is distributed in the hope that it will be
24  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
25  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
26  *   license text for more details.
27  */
28 #ifndef EXPORT_SYMTAB
29 # define EXPORT_SYMTAB
30 #endif
31 #define DEBUG_SUBSYSTEM S_MDS
32
33 #include <linux/module.h>
34 #include <linux/jbd.h>
35 #include <obd.h>
36 #include <obd_class.h>
37 #include <lustre_ver.h>
38 #include <obd_support.h>
39 #include <lprocfs_status.h>
40
41 #include <linux/ldiskfs_fs.h>
42 #include <lustre_mds.h>
43 #include <lustre/lustre_idl.h>
44 #include <lustre_fid.h>
45
46 #include "mdd_internal.h"
47
48 static const char dot[] = ".";
49 static const char dotdot[] = "..";
50
51 static int __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
52                         const char *name, struct lu_fid* fid, int mask);
53 static int
54 __mdd_lookup_locked(const struct lu_env *env, struct md_object *pobj,
55                     const char *name, struct lu_fid* fid, int mask)
56 {
57         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
58         struct dynlock_handle *dlh;
59         int rc;
60
61         dlh = mdd_pdo_read_lock(env, mdd_obj, name);
62         if (dlh == NULL)
63                 return -ENOMEM;
64         rc = __mdd_lookup(env, pobj, name, fid, mask);
65         mdd_pdo_read_unlock(env, mdd_obj, dlh);
66
67         return rc;
68 }
69
70 static int mdd_lookup(const struct lu_env *env,
71                       struct md_object *pobj, const char *name,
72                       struct lu_fid* fid)
73 {
74         int rc;
75         ENTRY;
76         rc = __mdd_lookup_locked(env, pobj, name, fid, MAY_EXEC);
77         RETURN(rc);
78 }
79
80
81 static int mdd_parent_fid(const struct lu_env *env, struct mdd_object *obj,
82                           struct lu_fid *fid)
83 {
84         return __mdd_lookup_locked(env, &obj->mod_obj, dotdot, fid, 0);
85 }
86
87 /*
88  * For root fid use special function, whcih does not compare version component
89  * of fid. Vresion component is different for root fids on all MDTs.
90  */
91 static int mdd_is_root(struct mdd_device *mdd, const struct lu_fid *fid)
92 {
93         return fid_seq(&mdd->mdd_root_fid) == fid_seq(fid) &&
94                 fid_oid(&mdd->mdd_root_fid) == fid_oid(fid);
95 }
96
97 /*
98  * return 1: if lf is the fid of the ancestor of p1;
99  * return 0: if not;
100  *
101  * return -EREMOTE: if remote object is found, in this
102  * case fid of remote object is saved to @pf;
103  *
104  * otherwise: values < 0, errors.
105  */
106 static int mdd_is_parent(const struct lu_env *env,
107                          struct mdd_device *mdd,
108                          struct mdd_object *p1,
109                          const struct lu_fid *lf,
110                          struct lu_fid *pf)
111 {
112         struct mdd_object *parent = NULL;
113         struct lu_fid *pfid;
114         int rc;
115         ENTRY;
116
117         LASSERT(!lu_fid_eq(mdo2fid(p1), lf));
118         pfid = &mdd_env_info(env)->mti_fid;
119
120         /* Check for root first. */
121         if (mdd_is_root(mdd, mdo2fid(p1)))
122                 RETURN(0);
123
124         for(;;) {
125                 rc = mdd_parent_fid(env, p1, pfid);
126                 if (rc)
127                         GOTO(out, rc);
128                 if (mdd_is_root(mdd, pfid))
129                         GOTO(out, rc = 0);
130                 if (lu_fid_eq(pfid, lf))
131                         GOTO(out, rc = 1);
132                 if (parent)
133                         mdd_object_put(env, parent);
134                 parent = mdd_object_find(env, mdd, pfid);
135
136                 /* cross-ref parent */
137                 if (parent == NULL) {
138                         if (pf != NULL)
139                                 *pf = *pfid;
140                         GOTO(out, rc = -EREMOTE);
141                 } else if (IS_ERR(parent))
142                         GOTO(out, rc = PTR_ERR(parent));
143                 p1 = parent;
144         }
145         EXIT;
146 out:
147         if (parent && !IS_ERR(parent))
148                 mdd_object_put(env, parent);
149         return rc;
150 }
151
152 /*
153  * No permission check is needed.
154  *
155  * returns 1: if fid is ancestor of @mo;
156  * returns 0: if fid is not a ancestor of @mo;
157  *
158  * returns EREMOTE if remote object is found, fid of remote object is saved to
159  * @fid;
160  *
161  * returns < 0: if error
162  */
163 static int mdd_is_subdir(const struct lu_env *env,
164                          struct md_object *mo, const struct lu_fid *fid,
165                          struct lu_fid *sfid)
166 {
167         struct mdd_device *mdd = mdo2mdd(mo);
168         int rc;
169         ENTRY;
170
171         if (!S_ISDIR(mdd_object_type(md2mdd_obj(mo))))
172                 RETURN(0);
173
174         rc = mdd_is_parent(env, mdd, md2mdd_obj(mo), fid, sfid);
175         if (rc == 0) {
176                 /* found root */
177                 fid_zero(sfid);
178         } else if (rc == 1) {
179                 /* found @fid is parent */
180                 *sfid = *fid;
181                 rc = 0;
182         }
183         RETURN(rc);
184 }
185
186 /* Check whether it may create the cobj under the pobj */
187 static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj,
188                           struct mdd_object *cobj, int need_check)
189 {
190         int rc = 0;
191         ENTRY;
192
193         if (cobj && lu_object_exists(&cobj->mod_obj.mo_lu))
194                 RETURN(-EEXIST);
195
196         if (mdd_is_dead_obj(pobj))
197                 RETURN(-ENOENT);
198
199         if (need_check) {
200                 rc = mdd_permission_internal(env, pobj, NULL,
201                                              MAY_WRITE | MAY_EXEC, 1);
202         }
203         RETURN(rc);
204 }
205
206 /*
207  * It's inline, so penalty for filesystems that don't use sticky bit is
208  * minimal.
209  */
210 static inline int mdd_is_sticky(const struct lu_env *env,
211                                 struct mdd_object *pobj,
212                                 struct mdd_object *cobj)
213 {
214         struct lu_attr *tmp_la = &mdd_env_info(env)->mti_la;
215         struct md_ucred *uc = md_ucred(env);
216         int rc;
217
218         rc = mdd_la_get(env, cobj, tmp_la, BYPASS_CAPA);
219         if (rc) {
220                 return rc;
221         } else if (tmp_la->la_uid == uc->mu_fsuid) {
222                 return 0;
223         } else {
224                 rc = mdd_la_get(env, pobj, tmp_la, BYPASS_CAPA);
225                 if (rc)
226                         return rc;
227                 else if (!(tmp_la->la_mode & S_ISVTX))
228                         return 0;
229                 else if (tmp_la->la_uid == uc->mu_fsuid)
230                         return 0;
231                 else
232                         return !mdd_capable(uc, CAP_FOWNER);
233         }
234 }
235
236 /* Check whether it may delete the cobj under the pobj. */
237 static int mdd_may_delete(const struct lu_env *env,
238                           struct mdd_object *pobj,
239                           struct mdd_object *cobj,
240                           int is_dir, int need_check)
241 {
242         struct mdd_device *mdd = mdo2mdd(&cobj->mod_obj);
243         int rc = 0;
244         ENTRY;
245
246         LASSERT(cobj);
247
248         if (!lu_object_exists(&cobj->mod_obj.mo_lu))
249                 RETURN(-ENOENT);
250
251         if (mdd_is_immutable(cobj) || mdd_is_append(cobj))
252                 RETURN(-EPERM);
253
254         if (is_dir) {
255                 if (!S_ISDIR(mdd_object_type(cobj)))
256                         RETURN(-ENOTDIR);
257
258                 if (lu_fid_eq(mdo2fid(cobj), &mdd->mdd_root_fid))
259                         RETURN(-EBUSY);
260
261         } else if (S_ISDIR(mdd_object_type(cobj))) {
262                 RETURN(-EISDIR);
263         }
264
265         if (pobj) {
266                 if (mdd_is_dead_obj(pobj))
267                         RETURN(-ENOENT);
268
269                 if (mdd_is_sticky(env, pobj, cobj))
270                         RETURN(-EPERM);
271
272                 if (need_check)
273                         rc = mdd_permission_internal(env, pobj, NULL,
274                                                      MAY_WRITE | MAY_EXEC, 1);
275         }
276         RETURN(rc);
277 }
278
279 int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj,
280                           struct mdd_object *src_obj)
281 {
282         int rc = 0;
283         ENTRY;
284
285         if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj))
286                 RETURN(-EPERM);
287
288         if (S_ISDIR(mdd_object_type(src_obj)))
289                 RETURN(-EPERM);
290
291         LASSERT(src_obj != tgt_obj);
292         if (tgt_obj) {
293                 rc = mdd_may_create(env, tgt_obj, NULL, 1);
294                 if (rc)
295                         RETURN(rc);
296         }
297
298         RETURN(rc);
299 }
300
301 const struct dt_rec *__mdd_fid_rec(const struct lu_env *env,
302                                    const struct lu_fid *fid)
303 {
304         struct mdd_thread_info *info = mdd_env_info(env);
305
306         fid_cpu_to_be(&info->mti_fid2, fid);
307         return (const struct dt_rec *)&info->mti_fid2;
308 }
309
310
311 /* insert new index, add reference if isdir, update times */
312 static int __mdd_index_insert(const struct lu_env *env, struct mdd_object *pobj,
313                               const struct lu_fid *lf, const char *name, int is_dir,
314                               struct thandle *handle, struct lustre_capa *capa)
315 {
316         struct dt_object *next = mdd_object_child(pobj);
317         struct timeval    start;
318         int               rc;
319         ENTRY;
320
321         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
322                              LPROC_MDD_INDEX_INSERT);
323         if (dt_try_as_dir(env, next)) {
324                 rc = next->do_index_ops->dio_insert(env, next,
325                                                     __mdd_fid_rec(env, lf),
326                                                     (const struct dt_key *)name,
327                                                     handle, capa);
328         } else {
329                 rc = -ENOTDIR;
330         }
331
332         if (rc == 0) {
333                 if (is_dir) {
334                         mdd_write_lock(env, pobj);
335                         mdd_ref_add_internal(env, pobj, handle);
336                         mdd_write_unlock(env, pobj);
337                 }
338         }
339         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
340                            LPROC_MDD_INDEX_INSERT);
341         RETURN(rc);
342 }
343
344 static int __mdd_index_delete(const struct lu_env *env, struct mdd_object *pobj,
345                               const char *name, int is_dir, struct thandle *handle,
346                               struct lustre_capa *capa)
347 {
348         struct dt_object *next = mdd_object_child(pobj);
349         struct timeval    start;
350         int               rc;
351         ENTRY;
352
353         mdd_lproc_time_start(mdo2mdd(&pobj->mod_obj), &start,
354                              LPROC_MDD_INDEX_DELETE);
355
356         if (dt_try_as_dir(env, next)) {
357                 rc = next->do_index_ops->dio_delete(env, next,
358                                                     (struct dt_key *)name,
359                                                     handle, capa);
360                 if (rc == 0 && is_dir) {
361                         mdd_write_lock(env, pobj);
362                         mdd_ref_del_internal(env, pobj, handle);
363                         mdd_write_unlock(env, pobj);
364                 }
365         } else
366                 rc = -ENOTDIR;
367
368         mdd_lproc_time_end(mdo2mdd(&pobj->mod_obj), &start,
369                            LPROC_MDD_INDEX_DELETE);
370         RETURN(rc);
371 }
372
373 static int
374 __mdd_index_insert_only(const struct lu_env *env, struct mdd_object *pobj,
375                         const struct lu_fid *lf, const char *name,
376                         struct thandle *handle, struct lustre_capa *capa)
377 {
378         struct dt_object *next = mdd_object_child(pobj);
379         int               rc;
380         ENTRY;
381
382         if (dt_try_as_dir(env, next)) {
383                 rc = next->do_index_ops->dio_insert(env, next,
384                                                     __mdd_fid_rec(env, lf),
385                                                     (const struct dt_key *)name,
386                                                     handle, capa);
387         } else {
388                 rc = -ENOTDIR;
389         }
390         RETURN(rc);
391 }
392
393 static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj,
394                     struct md_object *src_obj, const char *name,
395                     struct md_attr *ma)
396 {
397         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
398         struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj);
399         struct mdd_object *mdd_sobj = md2mdd_obj(src_obj);
400         struct mdd_device *mdd = mdo2mdd(src_obj);
401         struct dynlock_handle *dlh;
402         struct thandle *handle;
403         int rc;
404         ENTRY;
405
406         mdd_txn_param_build(env, mdd, MDD_TXN_LINK_OP);
407         handle = mdd_trans_start(env, mdd);
408         if (IS_ERR(handle))
409                 RETURN(PTR_ERR(handle));
410
411         dlh = mdd_pdo_write_lock(env, mdd_tobj, name);
412         if (dlh == NULL)
413                 GOTO(out_trans, rc = -ENOMEM);
414         mdd_write_lock(env, mdd_sobj);
415
416         rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj);
417         if (rc)
418                 GOTO(out_unlock, rc);
419
420         rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj),
421                                      name, handle,
422                                      mdd_object_capa(env, mdd_tobj));
423         if (rc)
424                 GOTO(out_unlock, rc);
425
426         mdd_ref_add_internal(env, mdd_sobj, handle);
427
428         *la = ma->ma_attr;
429         la->la_valid = LA_CTIME | LA_MTIME;
430         rc = mdd_attr_set_internal_locked(env, mdd_tobj, la, handle, 0);
431         if (rc)
432                 GOTO(out_unlock, rc);
433
434         la->la_valid = LA_CTIME;
435         rc = mdd_attr_set_internal(env, mdd_sobj, la, handle, 0);
436         EXIT;
437 out_unlock:
438         mdd_write_unlock(env, mdd_sobj);
439         mdd_pdo_write_unlock(env, mdd_tobj, dlh);
440 out_trans:
441         mdd_trans_stop(env, mdd, rc, handle);
442         return rc;
443 }
444
445 /* caller should take a lock before calling */
446 int mdd_finish_unlink(const struct lu_env *env,
447                       struct mdd_object *obj, struct md_attr *ma,
448                       struct thandle *th)
449 {
450         int rc;
451         ENTRY;
452
453         rc = mdd_iattr_get(env, obj, ma);
454         if (rc == 0 && ma->ma_attr.la_nlink == 0) {
455                 /* add new orphan and the object
456                  * will be deleted during the object_put() */
457                 if (__mdd_orphan_add(env, obj, th) == 0)
458                         obj->mod_flags |= ORPHAN_OBJ;
459
460                 if (obj->mod_count == 0)
461                         rc = mdd_object_kill(env, obj, ma);
462                 else
463                         /* clear MA_LOV | MA_COOKIE, if we do not
464                          * unlink it in case we get it somewhere */
465                         ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
466         } else
467                 ma->ma_valid &= ~(MA_LOV | MA_COOKIE);
468
469         RETURN(rc);
470 }
471
472 /*
473  * Check that @dir contains no entries except (possibly) dot and dotdot.
474  *
475  * Returns:
476  *
477  *             0        empty
478  *    -ENOTEMPTY        not empty
479  *           -ve        other error
480  *
481  */
482 static int mdd_dir_is_empty(const struct lu_env *env,
483                             struct mdd_object *dir)
484 {
485         struct dt_it     *it;
486         struct dt_object *obj;
487         struct dt_it_ops *iops;
488         int result;
489         ENTRY;
490
491         obj = mdd_object_child(dir);
492         iops = &obj->do_index_ops->dio_it;
493         it = iops->init(env, obj, 0, BYPASS_CAPA);
494         if (it != NULL) {
495                 result = iops->get(env, it, (const void *)"");
496                 if (result > 0) {
497                         int i;
498                         for (result = 0, i = 0; result == 0 && i < 3; ++i)
499                                 result = iops->next(env, it);
500                         if (result == 0)
501                                 result = -ENOTEMPTY;
502                         else if (result == +1)
503                                 result = 0;
504                 } else if (result == 0)
505                         /*
506                          * Huh? Index contains no zero key?
507                          */
508                         result = -EIO;
509
510                 iops->put(env, it);
511                 iops->fini(env, it);
512         } else
513                 result = -ENOMEM;
514         RETURN(result);
515 }
516
517 int mdd_unlink_sanity_check(const struct lu_env *env, struct mdd_object *pobj,
518                             struct mdd_object *cobj, struct md_attr *ma)
519 {
520         struct dt_object  *dt_cobj  = mdd_object_child(cobj);
521         int rc = 0;
522         ENTRY;
523
524         rc = mdd_may_delete(env, pobj, cobj,
525                             S_ISDIR(ma->ma_attr.la_mode), 1);
526         if (rc)
527                 RETURN(rc);
528
529         if (S_ISDIR(mdd_object_type(cobj))) {
530                 if (dt_try_as_dir(env, dt_cobj))
531                         rc = mdd_dir_is_empty(env, cobj);
532                 else
533                         rc = -ENOTDIR;
534         }
535
536         RETURN(rc);
537 }
538
539 static int mdd_unlink(const struct lu_env *env, struct md_object *pobj,
540                       struct md_object *cobj, const char *name,
541                       struct md_attr *ma)
542 {
543         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
544         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
545         struct mdd_object *mdd_cobj = md2mdd_obj(cobj);
546         struct mdd_device *mdd = mdo2mdd(pobj);
547         struct dynlock_handle *dlh;
548         struct thandle    *handle;
549         int rc, is_dir;
550         ENTRY;
551
552         rc = mdd_log_txn_param_build(env, cobj, ma, MDD_TXN_UNLINK_OP);
553         if (rc)
554                 RETURN(rc);
555
556         handle = mdd_trans_start(env, mdd);
557         if (IS_ERR(handle))
558                 RETURN(PTR_ERR(handle));
559
560         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
561         if (dlh == NULL)
562                 GOTO(out_trans, rc = -ENOMEM);
563         mdd_write_lock(env, mdd_cobj);
564
565         rc = mdd_unlink_sanity_check(env, mdd_pobj, mdd_cobj, ma);
566         if (rc)
567                 GOTO(cleanup, rc);
568
569         is_dir = S_ISDIR(lu_object_attr(&cobj->mo_lu));
570         rc = __mdd_index_delete(env, mdd_pobj, name, is_dir, handle,
571                                 mdd_object_capa(env, mdd_pobj));
572         if (rc)
573                 GOTO(cleanup, rc);
574
575         mdd_ref_del_internal(env, mdd_cobj, handle);
576         if (is_dir) {
577                 /* unlink dot */
578                 mdd_ref_del_internal(env, mdd_cobj, handle);
579         }
580
581         *la = ma->ma_attr;
582         la->la_valid = LA_CTIME | LA_MTIME;
583         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
584         if (rc)
585                 GOTO(cleanup, rc);
586
587         la->la_valid = LA_CTIME;
588         rc = mdd_attr_set_internal(env, mdd_cobj, la, handle, 0);
589         if (rc)
590                 GOTO(cleanup, rc);
591
592         rc = mdd_finish_unlink(env, mdd_cobj, ma, handle);
593
594         if (rc == 0)
595                 obd_set_info_async(mdd2obd_dev(mdd)->u.mds.mds_osc_exp,
596                                    strlen("unlinked"), "unlinked", 0,
597                                    NULL, NULL);
598         EXIT;
599 cleanup:
600         mdd_write_unlock(env, mdd_cobj);
601         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
602 out_trans:
603         mdd_trans_stop(env, mdd, rc, handle);
604         return rc;
605 }
606
607 static int mdd_ni_sanity_check(const struct lu_env *env,
608                                struct md_object *pobj,
609                                const char *name,
610                                const struct lu_fid *fid)
611 {
612         struct mdd_object *obj = md2mdd_obj(pobj);
613         ENTRY;
614
615         /* EEXIST check */
616         if (mdd_is_dead_obj(obj))
617                 RETURN(-ENOENT);
618
619         /* The exist of the name will be checked in _index_insert. */
620         RETURN(mdd_permission_internal(env, obj, NULL,
621                                        MAY_WRITE | MAY_EXEC, 1));
622 }
623
624 /*
625  * Partial operation.
626  */
627 static int mdd_name_insert(const struct lu_env *env, struct md_object *pobj,
628                            const char *name, const struct lu_fid *fid,
629                            int is_dir)
630 {
631         struct lu_attr   *la = &mdd_env_info(env)->mti_la;
632         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
633         struct mdd_device *mdd = mdo2mdd(pobj);
634         struct dynlock_handle *dlh;
635         struct thandle *handle;
636         int rc;
637         ENTRY;
638
639         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_INSERT_OP);
640         handle = mdd_trans_start(env, mdo2mdd(pobj));
641         if (IS_ERR(handle))
642                 RETURN(PTR_ERR(handle));
643
644         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
645         if (dlh == NULL)
646                 GOTO(out_trans, rc = -ENOMEM);
647         rc = mdd_ni_sanity_check(env, pobj, name, fid);
648         if (rc)
649                 GOTO(out_unlock, rc);
650
651         rc = __mdd_index_insert(env, mdd_obj, fid, name, is_dir,
652                                 handle, BYPASS_CAPA);
653
654         la->la_ctime = la->la_atime = CURRENT_SECONDS;
655         la->la_valid = LA_ATIME | LA_CTIME;
656         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
657         EXIT;
658 out_unlock:
659         mdd_pdo_write_unlock(env, mdd_obj, dlh);
660 out_trans:
661         mdd_trans_stop(env, mdo2mdd(pobj), rc, handle);
662         return rc;
663 }
664
665 static int mdd_nr_sanity_check(const struct lu_env *env,
666                                struct md_object *pobj,
667                                const char *name)
668 {
669         struct mdd_object *obj = md2mdd_obj(pobj);
670         int rc;
671         ENTRY;
672
673         /* EEXIST check */
674         if (mdd_is_dead_obj(obj)) {
675                 CWARN("Dir "DFID" is dead?\n", PFID(mdo2fid(obj)));
676                 RETURN(-ENOENT);
677         }
678
679         /* Name presense will be checked in _index_delete. */
680         rc = mdd_permission_internal(env, obj, NULL, MAY_WRITE | MAY_EXEC, 1);
681         RETURN(rc);
682 }
683
684 /*
685  * Partial operation.
686  */
687 static int mdd_name_remove(const struct lu_env *env,
688                            struct md_object *pobj,
689                            const char *name, int is_dir)
690 {
691         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
692         struct mdd_object *mdd_obj = md2mdd_obj(pobj);
693         struct mdd_device *mdd = mdo2mdd(pobj);
694         struct dynlock_handle *dlh;
695         struct thandle *handle;
696         int rc;
697         ENTRY;
698
699         mdd_txn_param_build(env, mdd, MDD_TXN_INDEX_DELETE_OP);
700         handle = mdd_trans_start(env, mdd);
701         if (IS_ERR(handle))
702                 RETURN(PTR_ERR(handle));
703
704         dlh = mdd_pdo_write_lock(env, mdd_obj, name);
705         if (dlh == NULL)
706                 GOTO(out_trans, rc = -ENOMEM);
707         rc = mdd_nr_sanity_check(env, pobj, name);
708         if (rc)
709                 GOTO(out_unlock, rc);
710
711         rc = __mdd_index_delete(env, mdd_obj, name, is_dir,
712                                 handle, BYPASS_CAPA);
713         if (rc)
714                 GOTO(out_unlock, rc);
715
716         la->la_ctime = la->la_mtime = CURRENT_SECONDS;
717         la->la_valid = LA_CTIME | LA_MTIME;
718         rc = mdd_attr_set_internal_locked(env, mdd_obj, la, handle, 0);
719         EXIT;
720 out_unlock:
721         mdd_pdo_write_unlock(env, mdd_obj, dlh);
722 out_trans:
723         mdd_trans_stop(env, mdd, rc, handle);
724         return rc;
725 }
726
727 static int mdd_rt_sanity_check(const struct lu_env *env,
728                                struct mdd_object *tgt_pobj,
729                                struct mdd_object *tobj,
730                                const struct lu_fid *sfid,
731                                const char *name, struct md_attr *ma)
732 {
733         int rc, src_is_dir;
734         ENTRY;
735
736         /* EEXIST check */
737         if (mdd_is_dead_obj(tgt_pobj))
738                 RETURN(-ENOENT);
739
740         src_is_dir = S_ISDIR(ma->ma_attr.la_mode);
741         if (tobj) {
742                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, 1);
743                 if (!rc && S_ISDIR(mdd_object_type(tobj)) &&
744                      mdd_dir_is_empty(env, tobj))
745                                 RETURN(-ENOTEMPTY);
746         } else {
747                 rc = mdd_may_create(env, tgt_pobj, NULL, 1);
748         }
749
750         RETURN(rc);
751 }
752
753 static int mdd_rename_tgt(const struct lu_env *env,
754                           struct md_object *pobj, struct md_object *tobj,
755                           const struct lu_fid *lf, const char *name,
756                           struct md_attr *ma)
757 {
758         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
759         struct mdd_object *mdd_tpobj = md2mdd_obj(pobj);
760         struct mdd_object *mdd_tobj = md2mdd_obj(tobj);
761         struct mdd_device *mdd = mdo2mdd(pobj);
762         struct dynlock_handle *dlh;
763         struct thandle *handle;
764         int rc;
765         ENTRY;
766
767         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_TGT_OP);
768         handle = mdd_trans_start(env, mdd);
769         if (IS_ERR(handle))
770                 RETURN(PTR_ERR(handle));
771
772         dlh = mdd_pdo_write_lock(env, mdd_tpobj, name);
773         if (dlh == NULL)
774                 GOTO(out_trans, rc = -ENOMEM);
775         if (mdd_tobj)
776                 mdd_write_lock(env, mdd_tobj);
777
778         /* XXX: Rename sanity checking. */
779         rc = mdd_rt_sanity_check(env, mdd_tpobj, mdd_tobj, lf, name, ma);
780         if (rc)
781                 GOTO(cleanup, rc);
782
783         /*
784          * If rename_tgt is called then we should just re-insert name with
785          * correct fid, no need to dec/inc parent nlink if obj is dir.
786          */
787         rc = __mdd_index_delete(env, mdd_tpobj, name, 0, handle, BYPASS_CAPA);
788         if (rc)
789                 GOTO(cleanup, rc);
790
791         rc = __mdd_index_insert_only(env, mdd_tpobj, lf, name, handle,
792                                      BYPASS_CAPA);
793         if (rc)
794                 GOTO(cleanup, rc);
795
796         *la = ma->ma_attr;
797         la->la_valid = LA_CTIME | LA_MTIME;
798         rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la, handle, 0);
799         if (rc)
800                 GOTO(cleanup, rc);
801
802         if (tobj && lu_object_exists(&tobj->mo_lu)) {
803                 mdd_ref_del_internal(env, mdd_tobj, handle);
804                 la->la_valid = LA_CTIME;
805                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
806         }
807         EXIT;
808 cleanup:
809         if (tobj)
810                 mdd_write_unlock(env, mdd_tobj);
811         mdd_pdo_write_unlock(env, mdd_tpobj, dlh);
812 out_trans:
813         mdd_trans_stop(env, mdd, rc, handle);
814         return rc;
815 }
816
817 /*
818  * The permission has been checked when obj created, no need check again.
819  */
820 static int mdd_cd_sanity_check(const struct lu_env *env,
821                                struct mdd_object *obj)
822 {
823         ENTRY;
824
825         /* EEXIST check */
826         if (!obj || mdd_is_dead_obj(obj))
827                 RETURN(-ENOENT);
828
829         RETURN(0);
830
831 }
832
833 static int mdd_create_data(const struct lu_env *env,
834                            struct md_object *pobj, struct md_object *cobj,
835                            const struct md_create_spec *spec,
836                            struct md_attr *ma)
837 {
838         struct mdd_device *mdd = mdo2mdd(cobj);
839         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);/* XXX maybe NULL */
840         struct mdd_object *son = md2mdd_obj(cobj);
841         struct lu_attr    *attr = &ma->ma_attr;
842         struct lov_mds_md *lmm = NULL;
843         int                lmm_size = 0;
844         struct thandle    *handle;
845         int                rc;
846         ENTRY;
847
848         rc = mdd_cd_sanity_check(env, son);
849         if (rc)
850                 RETURN(rc);
851
852         if (spec->sp_cr_flags & MDS_OPEN_DELAY_CREATE ||
853                         !(spec->sp_cr_flags & FMODE_WRITE))
854                 RETURN(0);
855         rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size, spec,
856                             attr);
857         if (rc)
858                 RETURN(rc);
859
860         mdd_txn_param_build(env, mdd, MDD_TXN_CREATE_DATA_OP);
861         handle = mdd_trans_start(env, mdd);
862         if (IS_ERR(handle))
863                 RETURN(rc = PTR_ERR(handle));
864
865         /*
866          * XXX: Setting the lov ea is not locked but setting the attr is locked?
867          * Should this be fixed?
868          */
869
870         /* Replay creates has objects already */
871         if (spec->u.sp_ea.no_lov_create) {
872                 CDEBUG(D_INFO, "we already have lov ea\n");
873                 rc = mdd_lov_set_md(env, mdd_pobj, son,
874                                     (struct lov_mds_md *)spec->u.sp_ea.eadata,
875                                     spec->u.sp_ea.eadatalen, handle, 0);
876         } else
877                 rc = mdd_lov_set_md(env, mdd_pobj, son, lmm,
878                                     lmm_size, handle, 0);
879
880         if (rc == 0)
881                rc = mdd_attr_get_internal_locked(env, son, ma);
882
883         /* Finish mdd_lov_create() stuff. */
884         mdd_lov_create_finish(env, mdd, rc);
885         mdd_trans_stop(env, mdd, rc, handle);
886         if (lmm)
887                 OBD_FREE(lmm, lmm_size);
888         RETURN(rc);
889 }
890
891 static int
892 __mdd_lookup(const struct lu_env *env, struct md_object *pobj,
893              const char *name, struct lu_fid* fid, int mask)
894 {
895         const struct dt_key *key = (const struct dt_key *)name;
896         struct mdd_object   *mdd_obj = md2mdd_obj(pobj);
897         struct dt_object    *dir = mdd_object_child(mdd_obj);
898         struct dt_rec       *rec = (struct dt_rec *)fid;
899         struct timeval       start;
900         int rc;
901         ENTRY;
902
903         mdd_lproc_time_start(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
904         if (mdd_is_dead_obj(mdd_obj))
905                 RETURN(-ESTALE);
906
907         rc = lu_object_exists(mdd2lu_obj(mdd_obj));
908         if (rc == 0)
909                 RETURN(-ESTALE);
910         else if (rc < 0) {
911                 CERROR("Object "DFID" locates on remote server\n",
912                         PFID(mdo2fid(mdd_obj)));
913                 LBUG();
914         }
915
916         rc = mdd_permission_internal(env, mdd_obj, NULL, mask, 1);
917         if (rc)
918                 RETURN(rc);
919
920         if (S_ISDIR(mdd_object_type(mdd_obj)) && dt_try_as_dir(env, dir)) {
921                 rc = dir->do_index_ops->dio_lookup(env, dir, rec, key,
922                                                    mdd_object_capa(env, mdd_obj));
923                 if (rc == 0)
924                         fid_be_to_cpu(fid, fid);
925         } else
926                 rc = -ENOTDIR;
927
928         mdd_lproc_time_end(mdo2mdd(pobj), &start, LPROC_MDD_LOOKUP);
929         RETURN(rc);
930 }
931
932 int mdd_object_initialize(const struct lu_env *env, const struct lu_fid *pfid,
933                           struct mdd_object *child, struct md_attr *ma,
934                           struct thandle *handle)
935 {
936         int rc;
937         ENTRY;
938
939         /*
940          * Update attributes for child.
941          *
942          * FIXME:
943          *  (1) the valid bits should be converted between Lustre and Linux;
944          *  (2) maybe, the child attributes should be set in OSD when creation.
945          */
946
947         rc = mdd_attr_set_internal(env, child, &ma->ma_attr, handle, 0);
948         if (rc != 0)
949                 RETURN(rc);
950
951         if (S_ISDIR(ma->ma_attr.la_mode)) {
952                 /* Add "." and ".." for newly created dir */
953                 mdd_ref_add_internal(env, child, handle);
954                 rc = __mdd_index_insert_only(env, child, mdo2fid(child),
955                                              dot, handle, BYPASS_CAPA);
956                 if (rc == 0) {
957                         rc = __mdd_index_insert_only(env, child, pfid,
958                                                      dotdot, handle,
959                                                      BYPASS_CAPA);
960                         if (rc != 0) {
961                                 int rc2;
962
963                                 rc2 = __mdd_index_delete(env, child, dot, 0,
964                                                          handle, BYPASS_CAPA);
965                                 if (rc2 != 0)
966                                         CERROR("Failure to cleanup after dotdot"
967                                                " creation: %d (%d)\n", rc2, rc);
968                                 else
969                                         mdd_ref_del_internal(env, child, handle);
970                         }
971                 }
972         }
973         RETURN(rc);
974 }
975
976 static int mdd_create_sanity_check(const struct lu_env *env,
977                                    struct md_object *pobj,
978                                    const char *name,
979                                    struct md_attr *ma,
980                                    int lookup)
981 {
982         struct mdd_thread_info *info = mdd_env_info(env);
983         struct lu_attr    *la        = &info->mti_la;
984         struct lu_fid     *fid       = &info->mti_fid;
985         struct mdd_object *obj       = md2mdd_obj(pobj);
986         int rc;
987         ENTRY;
988
989         /* EEXIST check */
990         if (mdd_is_dead_obj(obj))
991                 RETURN(-ENOENT);
992
993         /*
994          * In some cases this lookup is not needed - we know before that if name
995          * exists or not.
996          */
997         if (lookup) {
998                 /*
999                  * Check if the name already exist, though it will be checked in
1000                  * _index_insert also, for avoiding rolling back if exists
1001                  * _index_insert.
1002                  */
1003                 rc = __mdd_lookup_locked(env, pobj, name, fid,
1004                                          MAY_WRITE | MAY_EXEC);
1005                 if (rc != -ENOENT)
1006                         RETURN(rc ? : -EEXIST);
1007         } else {
1008                 /*
1009                  * Check if has WRITE permission for the parent.
1010                  */
1011                 rc = mdd_permission_internal(env, obj, NULL, MAY_WRITE, 1);
1012                 if (rc)
1013                         RETURN(rc);
1014         }
1015         
1016         /* sgid check */
1017         rc = mdd_la_get(env, obj, la, BYPASS_CAPA);
1018         if (rc != 0)
1019                 RETURN(rc);
1020
1021         if (la->la_mode & S_ISGID) {
1022                 ma->ma_attr.la_gid = la->la_gid;
1023                 if (S_ISDIR(ma->ma_attr.la_mode)) {
1024                         ma->ma_attr.la_mode |= S_ISGID;
1025                         ma->ma_attr.la_valid |= LA_MODE;
1026                 }
1027         }
1028
1029         switch (ma->ma_attr.la_mode & S_IFMT) {
1030         case S_IFREG:
1031         case S_IFDIR:
1032         case S_IFLNK:
1033         case S_IFCHR:
1034         case S_IFBLK:
1035         case S_IFIFO:
1036         case S_IFSOCK:
1037                 rc = 0;
1038                 break;
1039         default:
1040                 rc = -EINVAL;
1041                 break;
1042         }
1043         RETURN(rc);
1044 }
1045
1046 /*
1047  * Create object and insert it into namespace.
1048  */
1049 static int mdd_create(const struct lu_env *env,
1050                       struct md_object *pobj, const char *name,
1051                       struct md_object *child,
1052                       struct md_create_spec *spec,
1053                       struct md_attr* ma)
1054 {
1055         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1056         struct mdd_object *mdd_pobj = md2mdd_obj(pobj);
1057         struct mdd_object *son = md2mdd_obj(child);
1058         struct mdd_device *mdd = mdo2mdd(pobj);
1059         struct lu_attr    *attr = &ma->ma_attr;
1060         struct lov_mds_md *lmm = NULL;
1061         struct thandle    *handle;
1062         int rc, created = 0, inserted = 0, lmm_size = 0;
1063         struct dynlock_handle *dlh;
1064         struct timeval  start;
1065         ENTRY;
1066
1067         mdd_lproc_time_start(mdd, &start, LPROC_MDD_CREATE);
1068
1069         /*
1070          * Two operations have to be performed:
1071          *
1072          *  - allocation of new object (->do_create()), and
1073          *
1074          *  - insertion into parent index (->dio_insert()).
1075          *
1076          * Due to locking, operation order is not important, when both are
1077          * successful, *but* error handling cases are quite different:
1078          *
1079          *  - if insertion is done first, and following object creation fails,
1080          *  insertion has to be rolled back, but this operation might fail
1081          *  also leaving us with dangling index entry.
1082          *
1083          *  - if creation is done first, is has to be undone if insertion
1084          *  fails, leaving us with leaked space, which is neither good, nor
1085          *  fatal.
1086          *
1087          * It seems that creation-first is simplest solution, but it is
1088          * sub-optimal in the frequent
1089          *
1090          *         $ mkdir foo
1091          *         $ mkdir foo
1092          *
1093          * case, because second mkdir is bound to create object, only to
1094          * destroy it immediately.
1095          *
1096          * To avoid this follow local file systems that do double lookup:
1097          *
1098          *     0. lookup -> -EEXIST (mdd_create_sanity_check())
1099          *
1100          *     1. create            (mdd_object_create_internal())
1101          *
1102          *     2. insert            (__mdd_index_insert(), lookup again)
1103          */
1104
1105         /* Sanity checks before big job. */
1106         rc = mdd_create_sanity_check(env, pobj, name, ma, spec->sp_cr_lookup);
1107         if (rc)
1108                 RETURN(rc);
1109
1110         /*
1111          * No RPC inside the transaction, so OST objects should be created at
1112          * first.
1113          */
1114         if (S_ISREG(attr->la_mode)) {
1115                 rc = mdd_lov_create(env, mdd, mdd_pobj, son, &lmm, &lmm_size,
1116                                     spec, attr);
1117                 if (rc)
1118                         RETURN(rc);
1119         }
1120
1121         mdd_txn_param_build(env, mdd, MDD_TXN_MKDIR_OP);
1122         handle = mdd_trans_start(env, mdd);
1123         if (IS_ERR(handle))
1124                 GOTO(out_free, rc = PTR_ERR(handle));
1125
1126         dlh = mdd_pdo_write_lock(env, mdd_pobj, name);
1127         if (dlh == NULL)
1128                 GOTO(out_trans, rc = -ENOMEM);
1129
1130         /*
1131          * XXX: Check that link can be added to the parent in mkdir case.
1132          */
1133
1134         mdd_write_lock(env, son);
1135         rc = mdd_object_create_internal(env, son, ma, handle);
1136         if (rc) {
1137                 mdd_write_unlock(env, son);
1138                 GOTO(cleanup, rc);
1139         }
1140
1141         created = 1;
1142
1143 #ifdef CONFIG_FS_POSIX_ACL
1144         mdd_read_lock(env, mdd_pobj);
1145         rc = mdd_acl_init(env, mdd_pobj, son, &ma->ma_attr.la_mode, handle);
1146         mdd_read_unlock(env, mdd_pobj);
1147         if (rc) {
1148                 mdd_write_unlock(env, son);
1149                 GOTO(cleanup, rc);
1150         } else {
1151                 ma->ma_attr.la_valid |= LA_MODE;
1152         }
1153 #endif
1154
1155         rc = mdd_object_initialize(env, mdo2fid(mdd_pobj),
1156                                    son, ma, handle);
1157         mdd_write_unlock(env, son);
1158         if (rc)
1159                 /*
1160                  * Object has no links, so it will be destroyed when last
1161                  * reference is released. (XXX not now.)
1162                  */
1163                 GOTO(cleanup, rc);
1164
1165         rc = __mdd_index_insert(env, mdd_pobj, mdo2fid(son),
1166                                 name, S_ISDIR(attr->la_mode), handle,
1167                                 mdd_object_capa(env, mdd_pobj));
1168
1169         if (rc)
1170                 GOTO(cleanup, rc);
1171
1172         inserted = 1;
1173
1174         /* Replay creates has objects already. */
1175         if (spec->u.sp_ea.no_lov_create) {
1176                 CDEBUG(D_INFO, "we already have lov ea\n");
1177                 LASSERT(lmm == NULL);
1178                 lmm = (struct lov_mds_md *)spec->u.sp_ea.eadata;
1179                 lmm_size = spec->u.sp_ea.eadatalen;
1180         }
1181         rc = mdd_lov_set_md(env, mdd_pobj, son, lmm, lmm_size, handle, 0);
1182         if (rc) {
1183                 CERROR("error on stripe info copy %d \n", rc);
1184                 GOTO(cleanup, rc);
1185         }
1186         if (lmm && lmm_size > 0) {
1187                 /* Set Lov here, do not get lmm again later */
1188                 memcpy(ma->ma_lmm, lmm, lmm_size);
1189                 ma->ma_lmm_size = lmm_size;
1190                 ma->ma_valid |= MA_LOV;
1191         }
1192
1193         if (S_ISLNK(attr->la_mode)) {
1194                 struct dt_object *dt = mdd_object_child(son);
1195                 const char *target_name = spec->u.sp_symname;
1196                 int sym_len = strlen(target_name);
1197                 const struct lu_buf *buf;
1198                 loff_t pos = 0;
1199
1200                 buf = mdd_buf_get_const(env, target_name, sym_len);
1201                 rc = dt->do_body_ops->dbo_write(env, dt, buf, &pos, handle,
1202                                                 mdd_object_capa(env, son));
1203
1204                 if (rc == sym_len)
1205                         rc = 0;
1206                 else
1207                         GOTO(cleanup, rc = -EFAULT);
1208         }
1209
1210         *la = ma->ma_attr;
1211         la->la_valid = LA_CTIME | LA_MTIME;
1212         rc = mdd_attr_set_internal_locked(env, mdd_pobj, la, handle, 0);
1213         if (rc)
1214                 GOTO(cleanup, rc);
1215
1216         /* Return attr back. */
1217         rc = mdd_attr_get_internal_locked(env, son, ma);
1218         EXIT;
1219 cleanup:
1220         if (rc && created) {
1221                 int rc2 = 0;
1222
1223                 if (inserted) {
1224                         rc2 = __mdd_index_delete(env, mdd_pobj, name,
1225                                                  S_ISDIR(attr->la_mode),
1226                                                  handle, BYPASS_CAPA);
1227                         if (rc2)
1228                                 CERROR("error can not cleanup destroy %d\n",
1229                                        rc2);
1230                 }
1231                 if (rc2 == 0) {
1232                         mdd_write_lock(env, son);
1233                         mdd_ref_del_internal(env, son, handle);
1234                         mdd_write_unlock(env, son);
1235                 }
1236         }
1237
1238         mdd_pdo_write_unlock(env, mdd_pobj, dlh);
1239 out_trans:
1240         mdd_trans_stop(env, mdd, rc, handle);
1241 out_free:
1242         if (lmm && !spec->u.sp_ea.no_lov_create)
1243                 OBD_FREE(lmm, lmm_size);
1244         /* Finish mdd_lov_create() stuff */
1245         mdd_lov_create_finish(env, mdd, rc);
1246         mdd_lproc_time_end(mdd, &start, LPROC_MDD_CREATE);
1247         return rc;
1248 }
1249
1250 /*
1251  * Get locks on parents in proper order
1252  * RETURN: < 0 - error, rename_order if successful
1253  */
1254 enum rename_order {
1255         MDD_RN_SAME,
1256         MDD_RN_SRCTGT,
1257         MDD_RN_TGTSRC
1258 };
1259
1260 static int mdd_rename_order(const struct lu_env *env,
1261                             struct mdd_device *mdd,
1262                             struct mdd_object *src_pobj,
1263                             struct mdd_object *tgt_pobj)
1264 {
1265         /* order of locking, 1 - tgt-src, 0 - src-tgt*/
1266         int rc;
1267         ENTRY;
1268
1269         if (src_pobj == tgt_pobj)
1270                 RETURN(MDD_RN_SAME);
1271
1272         /* compared the parent child relationship of src_p&tgt_p */
1273         if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(src_pobj))){
1274                 rc = MDD_RN_SRCTGT;
1275         } else if (lu_fid_eq(&mdd->mdd_root_fid, mdo2fid(tgt_pobj))) {
1276                 rc = MDD_RN_TGTSRC;
1277         } else {
1278                 rc = mdd_is_parent(env, mdd, src_pobj, mdo2fid(tgt_pobj), NULL);
1279                 if (rc == -EREMOTE)
1280                         rc = 0;
1281
1282                 if (rc == 1)
1283                         rc = MDD_RN_TGTSRC;
1284                 else if (rc == 0)
1285                         rc = MDD_RN_SRCTGT;
1286         }
1287
1288         RETURN(rc);
1289 }
1290
1291 static int mdd_rename_sanity_check(const struct lu_env *env,
1292                                    struct mdd_object *src_pobj,
1293                                    struct mdd_object *tgt_pobj,
1294                                    const struct lu_fid *sfid,
1295                                    int src_is_dir,
1296                                    struct mdd_object *tobj)
1297 {
1298         int rc;
1299         ENTRY;
1300
1301         if (mdd_is_dead_obj(src_pobj))
1302                 RETURN(-ENOENT);
1303
1304         /* The sobj maybe on the remote, check parent permission only here */
1305         rc = mdd_permission_internal(env, src_pobj, NULL,
1306                                      MAY_WRITE | MAY_EXEC, 1);
1307         if (rc)
1308                 RETURN(rc);
1309
1310         if (!tobj) {
1311                 rc = mdd_may_create(env, tgt_pobj, NULL,
1312                                     (src_pobj != tgt_pobj));
1313         } else {
1314                 rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir,
1315                                     (src_pobj != tgt_pobj));
1316                 if (rc == 0)
1317                         if (S_ISDIR(mdd_object_type(tobj))
1318                             && mdd_dir_is_empty(env, tobj))
1319                                 rc = -ENOTEMPTY;
1320         }
1321
1322         RETURN(rc);
1323 }
1324 /* src object can be remote that is why we use only fid and type of object */
1325 static int mdd_rename(const struct lu_env *env,
1326                       struct md_object *src_pobj, struct md_object *tgt_pobj,
1327                       const struct lu_fid *lf, const char *sname,
1328                       struct md_object *tobj, const char *tname,
1329                       struct md_attr *ma)
1330 {
1331         struct lu_attr    *la = &mdd_env_info(env)->mti_la_for_fix;
1332         struct mdd_object *mdd_spobj = md2mdd_obj(src_pobj);
1333         struct mdd_object *mdd_tpobj = md2mdd_obj(tgt_pobj);
1334         struct mdd_device *mdd = mdo2mdd(src_pobj);
1335         struct mdd_object *mdd_sobj = NULL;
1336         struct mdd_object *mdd_tobj = NULL;
1337         struct dynlock_handle *sdlh, *tdlh;
1338         struct thandle *handle;
1339         int is_dir;
1340         int rc;
1341         ENTRY;
1342
1343         LASSERT(ma->ma_attr.la_mode & S_IFMT);
1344         is_dir = S_ISDIR(ma->ma_attr.la_mode);
1345         if (ma->ma_attr.la_valid & LA_FLAGS &&
1346             ma->ma_attr.la_flags & (LUSTRE_APPEND_FL | LUSTRE_IMMUTABLE_FL))
1347                 RETURN(-EPERM);
1348
1349         if (tobj)
1350                 mdd_tobj = md2mdd_obj(tobj);
1351
1352         mdd_txn_param_build(env, mdd, MDD_TXN_RENAME_OP);
1353         handle = mdd_trans_start(env, mdd);
1354         if (IS_ERR(handle))
1355                 RETURN(PTR_ERR(handle));
1356
1357         /* FIXME: Should consider tobj and sobj too in rename_lock. */
1358         rc = mdd_rename_order(env, mdd, mdd_spobj, mdd_tpobj);
1359         if (rc < 0)
1360                 GOTO(cleanup_unlocked, rc);
1361
1362         /* Get locks in determined order */
1363         if (rc == MDD_RN_SAME) {
1364                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1365                 /* check hashes to determine do we need one lock or two */
1366                 if (mdd_name2hash(sname) != mdd_name2hash(tname))
1367                         tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1368                 else
1369                         tdlh = sdlh;
1370         } else if (rc == MDD_RN_SRCTGT) {
1371                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1372                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1373         } else {
1374                 tdlh = mdd_pdo_write_lock(env, mdd_tpobj, tname);
1375                 sdlh = mdd_pdo_write_lock(env, mdd_spobj, sname);
1376         }
1377         if (sdlh == NULL || tdlh == NULL)
1378                 GOTO(cleanup, rc = -ENOMEM);
1379
1380         rc = mdd_rename_sanity_check(env, mdd_spobj, mdd_tpobj,
1381                                      lf, is_dir, mdd_tobj);
1382         if (rc)
1383                 GOTO(cleanup, rc);
1384
1385         rc = __mdd_index_delete(env, mdd_spobj, sname, is_dir, handle,
1386                                 mdd_object_capa(env, mdd_spobj));
1387         if (rc)
1388                 GOTO(cleanup, rc);
1389
1390         /*
1391          * Here tobj can be remote one, so we do index_delete unconditionally
1392          * and -ENOENT is allowed.
1393          */
1394         rc = __mdd_index_delete(env, mdd_tpobj, tname, is_dir, handle,
1395                                 mdd_object_capa(env, mdd_tpobj));
1396         if (rc != 0 && rc != -ENOENT)
1397                 GOTO(cleanup, rc);
1398
1399         rc = __mdd_index_insert(env, mdd_tpobj, lf, tname, is_dir, handle,
1400                                 mdd_object_capa(env, mdd_tpobj));
1401         if (rc)
1402                 GOTO(cleanup, rc);
1403
1404         *la = ma->ma_attr;
1405         mdd_sobj = mdd_object_find(env, mdd, lf);
1406         if (mdd_sobj) {
1407                 la->la_valid = LA_CTIME;
1408
1409                 /* XXX: How to update ctime for remote sobj? */
1410                 rc = mdd_attr_set_internal_locked(env, mdd_sobj, la, handle, 1);
1411                 if (rc)
1412                         GOTO(cleanup, rc);
1413         }
1414         if (tobj && lu_object_exists(&tobj->mo_lu)) {
1415                 mdd_write_lock(env, mdd_tobj);
1416                 mdd_ref_del_internal(env, mdd_tobj, handle);
1417
1418                 /* Remove dot reference. */
1419                 if (is_dir)
1420                         mdd_ref_del_internal(env, mdd_tobj, handle);
1421
1422                 la->la_valid = LA_CTIME;
1423                 rc = mdd_attr_set_internal(env, mdd_tobj, la, handle, 0);
1424                 if (rc)
1425                         GOTO(cleanup, rc);
1426
1427                 rc = mdd_finish_unlink(env, mdd_tobj, ma, handle);
1428                 mdd_write_unlock(env, mdd_tobj);
1429                 if (rc)
1430                         GOTO(cleanup, rc);
1431         }
1432
1433         la->la_valid = LA_CTIME | LA_MTIME;
1434         rc = mdd_attr_set_internal_locked(env, mdd_spobj, la, handle, 0);
1435         if (rc)
1436                 GOTO(cleanup, rc);
1437
1438         if (mdd_spobj != mdd_tpobj) {
1439                 la->la_valid = LA_CTIME | LA_MTIME;
1440                 rc = mdd_attr_set_internal_locked(env, mdd_tpobj, la,
1441                                                   handle, 0);
1442         }
1443
1444         EXIT;
1445 cleanup:
1446         if (likely(tdlh) && sdlh != tdlh)
1447                 mdd_pdo_write_unlock(env, mdd_tpobj, tdlh);
1448         if (likely(sdlh))
1449                 mdd_pdo_write_unlock(env, mdd_spobj, sdlh);
1450 cleanup_unlocked:
1451         mdd_trans_stop(env, mdd, rc, handle);
1452         if (mdd_sobj)
1453                 mdd_object_put(env, mdd_sobj);
1454         return rc;
1455 }
1456
1457 struct md_dir_operations mdd_dir_ops = {
1458         .mdo_is_subdir     = mdd_is_subdir,
1459         .mdo_lookup        = mdd_lookup,
1460         .mdo_create        = mdd_create,
1461         .mdo_rename        = mdd_rename,
1462         .mdo_link          = mdd_link,
1463         .mdo_unlink        = mdd_unlink,
1464         .mdo_name_insert   = mdd_name_insert,
1465         .mdo_name_remove   = mdd_name_remove,
1466         .mdo_rename_tgt    = mdd_rename_tgt,
1467         .mdo_create_data   = mdd_create_data
1468 };