Whamcloud - gitweb
25cadbe27f50277bc4e4d38f74b50e86bcce970d
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mdt/mdt_reint.c
5  *  Lustre Metadata Target (mdt) reintegration routines
6  *
7  *  Copyright (C) 2002-2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Huang Hua <huanghua@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include "mdt_internal.h"
38
39 static int mdt_md_create(struct mdt_thread_info *info)
40 {
41         struct mdt_device      *mdt = info->mti_mdt;
42         struct mdt_object      *parent;
43         struct mdt_object      *child;
44         struct mdt_lock_handle *lh;
45         struct mdt_body        *repbody;
46         struct md_attr         *ma = &info->mti_attr;
47         struct mdt_reint_record *rr = &info->mti_rr;
48         int rc;
49         ENTRY;
50
51         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
52
53         lh = &info->mti_lh[MDT_LH_PARENT];
54         lh->mlh_mode = LCK_EX;
55
56         parent = mdt_object_find_lock(info, rr->rr_fid1,
57                                       lh, MDS_INODELOCK_UPDATE,
58                                       rr->rr_capa1);
59         if (IS_ERR(parent))
60                 RETURN(PTR_ERR(parent));
61
62         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2, BYPASS_CAPA);
63         if (!IS_ERR(child)) {
64                 struct md_object *next = mdt_object_child(parent);
65
66                 ma->ma_need = MA_INODE;
67                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
68                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
69
70                 rc = mdo_create(info->mti_env, next, rr->rr_name,
71                                 mdt_object_child(child),
72                                 &info->mti_spec, ma);
73                 if (rc == 0) {
74                         /* return fid & attr to client. */
75                         if (ma->ma_valid & MA_INODE)
76                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
77                                                    mdt_object_fid(child));
78                                 mdt_body_reverse_idmap(info, repbody);
79                 }
80                 mdt_object_put(info->mti_env, child);
81         } else
82                 rc = PTR_ERR(child);
83         mdt_object_unlock_put(info, parent, lh, rc);
84         RETURN(rc);
85 }
86
87 /* partial request to create object only */
88 static int mdt_md_mkobj(struct mdt_thread_info *info)
89 {
90         struct mdt_device      *mdt = info->mti_mdt;
91         struct mdt_object      *o;
92         struct mdt_body        *repbody;
93         struct md_attr         *ma = &info->mti_attr;
94         int rc;
95         ENTRY;
96
97         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
98
99         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2,
100                             BYPASS_CAPA);
101         if (!IS_ERR(o)) {
102                 struct md_object *next = mdt_object_child(o);
103
104                 ma->ma_need = MA_INODE;
105                 /* Cross-ref create can encounter already created obj in case
106                  * of recovery, just get attr in that case */
107                 if (lu_object_exists(&o->mot_obj.mo_lu) == 1) {
108                         rc = mo_attr_get(info->mti_env, next, ma);
109                 } else {
110                         rc = mo_object_create(info->mti_env, next, &info->mti_spec,
111                                       ma);
112                 }
113                 if (rc == 0) {
114                         /* return fid & attr to client. */
115                         if (ma->ma_valid & MA_INODE)
116                                 mdt_pack_attr2body(repbody, &ma->ma_attr,
117                                                    mdt_object_fid(o));
118                                 mdt_body_reverse_idmap(info, repbody);
119                 }
120                 mdt_object_put(info->mti_env, o);
121         } else
122                 rc = PTR_ERR(o);
123
124         RETURN(rc);
125 }
126
127 /* In the raw-setattr case, we lock the child inode.
128  * In the write-back case or if being called from open,
129  *               the client holds a lock already.
130  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
131  * mdt_setattr_unpack()) flag to tell these cases apart. */
132 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
133 {
134         struct md_attr          *ma = &info->mti_attr;
135         struct mdt_lock_handle  *lh;
136         int som_update = 0;
137         int rc;
138         ENTRY;
139
140         if (info->mti_epoch)
141                 som_update = (info->mti_epoch->flags & MF_SOM_CHANGE);
142
143         /* Try to avoid object_lock if another epoch has been started
144          * already. */
145         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
146                 RETURN(0);
147
148         lh = &info->mti_lh[MDT_LH_PARENT];
149         lh->mlh_mode = LCK_EX;
150
151         if (!(flags & MRF_SETATTR_LOCKED)) {
152                 __u64 lockpart = MDS_INODELOCK_UPDATE;
153                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
154                         lockpart |= MDS_INODELOCK_LOOKUP;
155
156                 rc = mdt_object_lock(info, mo, lh, lockpart);
157                 if (rc != 0)
158                         GOTO(out, rc);
159         }
160
161         /* Setattrs are syncronized through dlm lock taken above. If another
162          * epoch started, its attributes may be already flushed on disk,
163          * skip setattr. */
164         if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch))
165                 GOTO(out, rc = 0);
166
167         if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu))
168                 GOTO(out, rc = -ENOENT);
169
170         /* all attrs are packed into mti_attr in unpack_setattr */
171         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
172                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
173
174         /* all attrs are packed into mti_attr in unpack_setattr */
175         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
176         if (rc != 0)
177                 GOTO(out, rc);
178
179         /* Re-enable SIZEONMDS. */
180         if (som_update) {
181                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n",
182                        mo->mot_ioepoch, PFID(mdt_object_fid(mo)),
183                        mo->mot_epochcount);
184
185                 mdt_sizeonmds_enable(info, mo);
186         }
187
188         EXIT;
189 out:
190         mdt_object_unlock(info, mo, lh, rc);
191         return(rc);
192 }
193
194 static int mdt_reint_setattr(struct mdt_thread_info *info,
195                              struct mdt_lock_handle *lhc)
196 {
197         struct mdt_device       *mdt = info->mti_mdt;
198         struct md_attr          *ma = &info->mti_attr;
199         struct mdt_reint_record *rr = &info->mti_rr;
200         struct ptlrpc_request   *req = mdt_info_req(info);
201         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
202         struct mdt_file_data    *mfd;
203         struct mdt_object       *mo;
204         struct md_object        *next;
205         struct mdt_body         *repbody;
206         int                      rc;
207
208         ENTRY;
209
210         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
211                   (unsigned int)ma->ma_attr.la_valid);
212
213         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
214         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1,
215                              rr->rr_capa1);
216         if (IS_ERR(mo))
217                 RETURN(rc = PTR_ERR(mo));
218
219         if (info->mti_epoch && (info->mti_epoch->flags & MF_EPOCH_OPEN)) {
220                 /* Truncate case. */
221                 rc = mdt_write_get(info->mti_mdt, mo);
222                 if (rc)
223                         GOTO(out, rc);
224
225                 mfd = mdt_mfd_new();
226                 if (mfd == NULL)
227                         GOTO(out, rc = -ENOMEM);
228
229                 /* FIXME: in recovery, need to pass old epoch here */
230                 mdt_epoch_open(info, mo, 0);
231                 repbody->ioepoch = mo->mot_ioepoch;
232
233                 mdt_object_get(info->mti_env, mo);
234                 mfd->mfd_mode = FMODE_EPOCHLCK;
235                 mfd->mfd_object = mo;
236                 mfd->mfd_xid = req->rq_xid;
237
238                 spin_lock(&med->med_open_lock);
239                 list_add(&mfd->mfd_list, &med->med_open_head);
240                 spin_unlock(&med->med_open_lock);
241                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
242         }
243
244         rc = mdt_attr_set(info, mo, rr->rr_flags);
245         if (rc)
246                 GOTO(out, rc);
247
248         if (info->mti_epoch && (info->mti_epoch->flags & MF_SOM_CHANGE)) {
249                 LASSERT(info->mti_epoch);
250
251                 /* Size-on-MDS Update. Find and free mfd. */
252                 spin_lock(&med->med_open_lock);
253                 mfd = mdt_handle2mfd(&(info->mti_epoch->handle));
254                 if (mfd == NULL) {
255                         spin_unlock(&med->med_open_lock);
256                         CDEBUG(D_INODE, "no handle for file close: "
257                                "fid = "DFID": cookie = "LPX64"\n",
258                                PFID(info->mti_rr.rr_fid1),
259                                info->mti_epoch->handle.cookie);
260                         GOTO(out, rc = -ESTALE);
261                 }
262
263                 LASSERT(mfd->mfd_mode == FMODE_SOM);
264                 LASSERT(ma->ma_attr.la_valid & LA_SIZE);
265                 LASSERT(!(info->mti_epoch->flags & MF_EPOCH_CLOSE));
266
267                 class_handle_unhash(&mfd->mfd_handle);
268                 list_del_init(&mfd->mfd_list);
269                 spin_unlock(&med->med_open_lock);
270                 mdt_mfd_close(info, mfd);
271         }
272
273         ma->ma_need = MA_INODE;
274         next = mdt_object_child(mo);
275         rc = mo_attr_get(info->mti_env, next, ma);
276         if (rc != 0)
277                 GOTO(out, rc);
278
279         mdt_pack_attr2body(repbody, &ma->ma_attr, mdt_object_fid(mo));
280
281         if (mdt->mdt_opts.mo_oss_capa) {
282                 struct lustre_capa *capa;
283
284                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
285                 LASSERT(capa);
286                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
287                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa);
288                 if (rc)
289                         RETURN(rc);
290                 repbody->valid |= OBD_MD_FLOSSCAPA;
291         }
292
293         mdt_body_reverse_idmap(info, repbody);
294         EXIT;
295 out:
296         mdt_object_put(info->mti_env, mo);
297         return rc;
298 }
299
300 static int mdt_reint_create(struct mdt_thread_info *info,
301                             struct mdt_lock_handle *lhc)
302 {
303         int rc;
304         ENTRY;
305
306         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
307                 RETURN(-ESTALE);
308
309         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
310         case S_IFREG:
311         case S_IFDIR:{
312                 if (strlen(info->mti_rr.rr_name) == 0) {
313                         rc = mdt_md_mkobj(info);
314                         break;
315                 }
316         }
317         case S_IFLNK:
318         case S_IFCHR:
319         case S_IFBLK:
320         case S_IFIFO:
321         case S_IFSOCK:{
322                 /* special file should stay on the same node as parent */
323                 LASSERT(strlen(info->mti_rr.rr_name) > 0);
324                 rc = mdt_md_create(info);
325                 break;
326         }
327         default:
328                 rc = -EOPNOTSUPP;
329         }
330         RETURN(rc);
331 }
332
333
334 static int mdt_reint_unlink(struct mdt_thread_info *info,
335                             struct mdt_lock_handle *lhc)
336 {
337         struct mdt_reint_record *rr = &info->mti_rr;
338         struct ptlrpc_request   *req = mdt_info_req(info);
339         struct md_attr          *ma = &info->mti_attr;
340         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
341         struct mdt_object       *mp;
342         struct mdt_object       *mc;
343         struct mdt_lock_handle  *parent_lh;
344         struct mdt_lock_handle  *child_lh;
345         int                      rc;
346         ENTRY;
347
348         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s\n", PFID(rr->rr_fid1),
349                   rr->rr_name);
350
351         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
352                 GOTO(out, rc = -ENOENT);
353
354         /* step 1: lock the parent */
355         parent_lh = &info->mti_lh[MDT_LH_PARENT];
356         parent_lh->mlh_mode = LCK_EX;
357         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
358                                   MDS_INODELOCK_UPDATE, rr->rr_capa1);
359         if (IS_ERR(mp))
360                 GOTO(out, rc = PTR_ERR(mp));
361
362         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
363         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
364                                                &RMF_MDT_MD, RCL_SERVER);
365
366         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
367                                                &RMF_LOGCOOKIES);
368         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
369                                                   &RMF_LOGCOOKIES,
370                                                   RCL_SERVER);
371         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
372
373         if (!ma->ma_lmm || !ma->ma_cookie)
374                 GOTO(out_unlock_parent, rc = -EINVAL);
375
376         if (strlen(rr->rr_name) == 0) {
377                 /* remote partial operation */
378                 rc = mo_ref_del(info->mti_env, mdt_object_child(mp), ma);
379                 
380                 mdt_handle_last_unlink(info, mp, ma);
381                 GOTO(out_unlock_parent, rc);
382         }
383
384         /* step 2: find & lock the child */
385         rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
386                         rr->rr_name, child_fid);
387         if (rc != 0)
388                  GOTO(out_unlock_parent, rc);
389
390         /* we will lock the child regardless it is local or remote. No harm. */
391         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid,
392                                   BYPASS_CAPA);
393         if (IS_ERR(mc))
394                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
395         child_lh = &info->mti_lh[MDT_LH_CHILD];
396         child_lh->mlh_mode = LCK_EX;
397         rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL);
398         if (rc != 0)
399                 GOTO(out_put_child, rc);
400
401         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
402                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
403
404         /*
405          * Now we can only make sure we need MA_INODE, in mdd layer, will check
406          * whether need MA_LOV and MA_COOKIE.
407          */
408         ma->ma_need = MA_INODE;
409         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
410                         mdt_object_child(mc), rr->rr_name, ma);
411         if (rc)
412                 GOTO(out_unlock_child, rc);
413
414         mdt_handle_last_unlink(info, mc, ma);
415
416         GOTO(out_unlock_child, rc);
417 out_unlock_child:
418         mdt_object_unlock(info, mc, child_lh, rc);
419 out_put_child:
420         mdt_object_put(info->mti_env, mc);
421 out_unlock_parent:
422         mdt_object_unlock_put(info, mp, parent_lh, rc);
423 out:
424         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
425         return rc;
426 }
427
428 static int mdt_reint_link(struct mdt_thread_info *info,
429                           struct mdt_lock_handle *lhc)
430 {
431         struct mdt_reint_record *rr = &info->mti_rr;
432         struct ptlrpc_request   *req = mdt_info_req(info);
433         struct md_attr          *ma = &info->mti_attr;
434         struct mdt_object       *ms;
435         struct mdt_object       *mp;
436         struct mdt_lock_handle  *lhs;
437         struct mdt_lock_handle  *lhp;
438         int rc;
439
440         ENTRY;
441
442         DEBUG_REQ(D_INODE, req, "link original "DFID" to "DFID" %s",
443                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
444
445         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
446                 RETURN(-ENOENT);
447
448         /* step 1: lock the source */
449         lhs = &info->mti_lh[MDT_LH_PARENT];
450         lhs->mlh_mode = LCK_EX;
451         ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
452                                   MDS_INODELOCK_UPDATE, rr->rr_capa1);
453         if (IS_ERR(ms))
454                 RETURN(PTR_ERR(ms));
455
456         if (strlen(rr->rr_name) == 0) {
457                 /* remote partial operation */
458                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms));
459                 GOTO(out_unlock_source, rc);
460         }
461         /*step 2: find & lock the target parent dir*/
462         lhp = &info->mti_lh[MDT_LH_CHILD];
463         lhp->mlh_mode = LCK_EX;
464         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
465                                   MDS_INODELOCK_UPDATE, rr->rr_capa2);
466         if (IS_ERR(mp))
467                 GOTO(out_unlock_source, rc = PTR_ERR(mp));
468
469         /* step 4: link it */
470
471         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
472                        OBD_FAIL_MDS_REINT_LINK_WRITE);
473
474         rc = mdo_link(info->mti_env, mdt_object_child(mp),
475                       mdt_object_child(ms), rr->rr_name, ma);
476         GOTO(out_unlock_target, rc);
477
478 out_unlock_target:
479         mdt_object_unlock_put(info, mp, lhp, rc);
480 out_unlock_source:
481         mdt_object_unlock_put(info, ms, lhs, rc);
482         return rc;
483 }
484
485 /* partial operation for rename */
486 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
487 {
488         struct mdt_reint_record *rr = &info->mti_rr;
489         struct ptlrpc_request   *req = mdt_info_req(info);
490         struct md_attr          *ma = &info->mti_attr;
491         struct mdt_object       *mtgtdir;
492         struct mdt_object       *mtgt = NULL;
493         struct mdt_lock_handle  *lh_tgtdir;
494         struct mdt_lock_handle  *lh_tgt;
495         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
496         int                      rc;
497
498         ENTRY;
499
500         DEBUG_REQ(D_INODE, req, "rename_tgt "DFID" to "DFID" %s",
501                   PFID(rr->rr_fid2),
502                   PFID(rr->rr_fid1), rr->rr_tgt);
503
504         /* step 1: lookup & lock the tgt dir */
505         lh_tgt = &info->mti_lh[MDT_LH_CHILD];
506         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
507         lh_tgtdir->mlh_mode = LCK_EX;
508         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
509                                        MDS_INODELOCK_UPDATE, rr->rr_capa1);
510         if (IS_ERR(mtgtdir))
511                 GOTO(out, rc = PTR_ERR(mtgtdir));
512
513         /*step 2: find & lock the target object if exists*/
514         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
515                         rr->rr_tgt, tgt_fid);
516         if (rc != 0 && rc != -ENOENT) {
517                 GOTO(out_unlock_tgtdir, rc);
518         } else if (rc == 0) {
519                 lh_tgt->mlh_mode = LCK_EX;
520
521                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
522                                             MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
523                 if (IS_ERR(mtgt))
524                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
525
526                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
527                                     mdt_object_child(mtgt), rr->rr_fid2,
528                                     rr->rr_tgt, ma);
529         } else /* -ENOENT */ {
530                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
531                                      rr->rr_tgt, rr->rr_fid2,
532                                      S_ISDIR(ma->ma_attr.la_mode));
533         }
534
535         /* handle last link of tgt object */
536         if (rc == 0 && mtgt)
537                 mdt_handle_last_unlink(info, mtgt, ma);
538
539         EXIT;
540         if (mtgt) {
541                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
542         }
543 out_unlock_tgtdir:
544         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
545 out:
546         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
547         return rc;
548 }
549
550 static int mdt_rename_lock(struct mdt_thread_info *info,
551                            struct lustre_handle *lh)
552 {
553         ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_UPDATE } };
554         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
555         int flags = LDLM_FL_ATOMIC_CB;
556         struct ldlm_res_id res_id;
557         struct lu_site *ls;
558         int rc;
559         ENTRY;
560
561         ls = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
562         fid_build_res_name(&LUSTRE_BFL_FID, &res_id);
563
564         if (ls->ls_control_exp == NULL) {
565                 /*
566                  * Current node is controller, that is mdt0 where we should take
567                  * BFL lock.
568                  */
569                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, &policy,
570                                             LCK_EX, &flags, ldlm_blocking_ast,
571                                             ldlm_completion_ast, NULL, NULL, 0,
572                                             NULL, lh);
573         } else {
574                 /*
575                  * This is the case mdt0 is remote node, issue DLM lock like
576                  * other clients.
577                  */
578                 rc = ldlm_cli_enqueue(ls->ls_control_exp, NULL, res_id,
579                                       LDLM_IBITS, &policy, LCK_EX, &flags,
580                                       ldlm_blocking_ast, ldlm_completion_ast,
581                                       NULL, NULL, NULL, 0, NULL, lh, 0);
582         }
583
584         RETURN(rc);
585 }
586
587 static void mdt_rename_unlock(struct lustre_handle *lh)
588 {
589         ENTRY;
590         ldlm_lock_decref(lh, LCK_EX);
591         EXIT;
592 }
593
594 /*
595  * This is is_subdir() variant, it is CMD is cmm forwards it to correct
596  * target. Source should not be ancestor of target dir. May be other rename
597  * checks can be moved here later.
598  */
599 static int mdt_rename_check(struct mdt_thread_info *info, struct lu_fid *fid)
600 {
601         struct mdt_reint_record *rr = &info->mti_rr;
602         struct lu_fid dst_fid = *rr->rr_fid2;
603         struct mdt_object *dst;
604         int rc = 0;
605         ENTRY;
606
607         do {
608                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid,
609                                       BYPASS_CAPA);
610                 if (!IS_ERR(dst)) {
611                         rc = mdo_is_subdir(info->mti_env, mdt_object_child(dst),
612                                            fid, &dst_fid);
613                         mdt_object_put(info->mti_env, dst);
614                         if (rc < 0) {
615                                 CERROR("Error while doing mdo_is_subdir(), rc %d\n",
616                                        rc);
617                         } else if (rc == 1) {
618                                 rc = -EINVAL;
619                         }
620                 } else {
621                         rc = PTR_ERR(dst);
622                 }
623         } while (rc == EREMOTE);
624
625         RETURN(rc);
626 }
627
628 static int mdt_reint_rename(struct mdt_thread_info *info,
629                             struct mdt_lock_handle *lhc)
630 {
631         struct mdt_reint_record *rr = &info->mti_rr;
632         struct req_capsule      *pill = &info->mti_pill;
633         struct md_attr          *ma = &info->mti_attr;
634         struct mdt_object       *msrcdir;
635         struct mdt_object       *mtgtdir;
636         struct mdt_object       *mold;
637         struct mdt_object       *mnew = NULL;
638         struct mdt_lock_handle  *lh_srcdirp;
639         struct mdt_lock_handle  *lh_tgtdirp;
640         struct mdt_lock_handle  *lh_oldp;
641         struct mdt_lock_handle  *lh_newp;
642         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
643         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
644         struct lustre_handle     rename_lh = { 0 };
645         int                      rc;
646
647         ENTRY;
648
649         rc = req_capsule_get_size(pill, &RMF_NAME, RCL_CLIENT);
650         if (rc == 1) {
651         /* if (rr->rr_name[0] == 0) {*/
652                 rc = mdt_reint_rename_tgt(info);
653                 RETURN(rc);
654         }
655
656         rc = mdt_rename_lock(info, &rename_lh);
657         if (rc) {
658                 CERROR("can't lock FS for rename, rc %d\n", rc);
659                 GOTO(out, rc);
660         }
661
662         lh_newp = &info->mti_lh[MDT_LH_NEW];
663
664         /* step 1: lock the source dir */
665         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
666         lh_srcdirp->mlh_mode = LCK_EX;
667         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
668                                        MDS_INODELOCK_UPDATE, rr->rr_capa1);
669         if (IS_ERR(msrcdir))
670                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
671
672         /*step 2: find & lock the target dir*/
673         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
674         lh_tgtdirp->mlh_mode = LCK_EX;
675         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
676                 mdt_object_get(info->mti_env, msrcdir);
677                 mtgtdir = msrcdir;
678         } else {
679                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
680                                           rr->rr_fid2, rr->rr_capa2);
681                 if (IS_ERR(mtgtdir))
682                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
683
684                 rc = mdt_object_cr_lock(info, mtgtdir, lh_tgtdirp,
685                                         MDS_INODELOCK_UPDATE);
686                 if (rc != 0) {
687                         mdt_object_put(info->mti_env, mtgtdir);
688                         GOTO(out_unlock_source, rc);
689                 }
690
691         }
692
693         /*step 3: find & lock the old object*/
694         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
695                         rr->rr_name, old_fid);
696         if (rc != 0)
697                 GOTO(out_unlock_target, rc);
698
699         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
700                 GOTO(out_unlock_target, rc = -EINVAL);
701
702         lh_oldp = &info->mti_lh[MDT_LH_OLD];
703         lh_oldp->mlh_mode = LCK_EX;
704         mold = mdt_object_find_lock(info, old_fid, lh_oldp,
705                                     MDS_INODELOCK_LOOKUP, BYPASS_CAPA);
706         if (IS_ERR(mold))
707                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
708
709         /*step 4: find & lock the new object*/
710         /* new target object may not exist now */
711         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
712                         rr->rr_tgt, new_fid);
713         if (rc == 0) {
714                 /* the new_fid should have been filled at this moment*/
715                 if (lu_fid_eq(old_fid, new_fid))
716                        GOTO(out_unlock_old, rc);
717
718                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
719                     lu_fid_eq(new_fid, rr->rr_fid2))
720                         GOTO(out_unlock_old, rc = -EINVAL);
721
722                 lh_newp->mlh_mode = LCK_EX;
723                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid,
724                                        BYPASS_CAPA);
725                 if (IS_ERR(mnew))
726                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
727
728                 rc = mdt_object_cr_lock(info, mnew, lh_newp,
729                                         MDS_INODELOCK_FULL);
730                 if (rc != 0) {
731                         mdt_object_put(info->mti_env, mnew);
732                         GOTO(out_unlock_old, rc);
733                 }
734         } else if (rc != -EREMOTE && rc != -ENOENT)
735                 GOTO(out_unlock_old, rc);
736
737         /* step 5: dome some checking ...*/
738         /* step 6: rename it */
739         ma->ma_lmm = req_capsule_server_get(&info->mti_pill, &RMF_MDT_MD);
740         ma->ma_lmm_size = req_capsule_get_size(&info->mti_pill,
741                                                &RMF_MDT_MD, RCL_SERVER);
742
743         ma->ma_cookie = req_capsule_server_get(&info->mti_pill,
744                                                 &RMF_LOGCOOKIES);
745         ma->ma_cookie_size = req_capsule_get_size(&info->mti_pill,
746                                                &RMF_LOGCOOKIES, RCL_SERVER);
747
748         if (!ma->ma_lmm || !ma->ma_cookie)
749                 GOTO(out_unlock_new, rc = -EINVAL);
750
751         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
752
753         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
754                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
755
756         /* Check if @dst is subdir of @src. */
757         rc = mdt_rename_check(info, old_fid);
758         if (rc)
759                 GOTO(out_unlock_new, rc);
760
761         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
762                         mdt_object_child(mtgtdir), old_fid, rr->rr_name,
763                         (mnew ? mdt_object_child(mnew) : NULL),
764                         rr->rr_tgt, ma);
765
766         /* handle last link of tgt object */
767         if (rc == 0 && mnew)
768                 mdt_handle_last_unlink(info, mnew, ma);
769
770 out_unlock_new:
771         if (mnew) {
772                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
773         }
774 out_unlock_old:
775         mdt_object_unlock_put(info, mold, lh_oldp, rc);
776 out_unlock_target:
777         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
778 out_unlock_source:
779         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
780 out_rename_lock:
781         mdt_rename_unlock(&rename_lh);
782 out:
783         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
784         return rc;
785 }
786
787 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
788                            struct mdt_lock_handle *lhc);
789
790 static mdt_reinter reinters[REINT_MAX] = {
791         [REINT_SETATTR]  = mdt_reint_setattr,
792         [REINT_CREATE] = mdt_reint_create,
793         [REINT_LINK] = mdt_reint_link,
794         [REINT_UNLINK] = mdt_reint_unlink,
795         [REINT_RENAME] = mdt_reint_rename,
796         [REINT_OPEN] = mdt_reint_open
797 };
798
799 int mdt_reint_rec(struct mdt_thread_info *info,
800                   struct mdt_lock_handle *lhc)
801 {
802         int rc;
803         ENTRY;
804
805         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
806
807         RETURN(rc);
808 }