Whamcloud - gitweb
b=19964 SOM cleanups, part2
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdt/mdt_reint.c
37  *
38  * Lustre Metadata Target (mdt) reintegration routines
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Huang Hua <huanghua@clusterfs.com>
44  * Author: Yury Umanets <umka@clusterfs.com>
45  */
46
47 #ifndef EXPORT_SYMTAB
48 # define EXPORT_SYMTAB
49 #endif
50 #define DEBUG_SUBSYSTEM S_MDS
51
52 #include "mdt_internal.h"
53 #include "lu_time.h"
54
55 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
56                                      struct md_attr *ma)
57 {
58         ma->ma_lmm = req_capsule_server_get(info->mti_pill, &RMF_MDT_MD);
59         ma->ma_lmm_size = req_capsule_get_size(info->mti_pill,
60                                                &RMF_MDT_MD, RCL_SERVER);
61
62         ma->ma_cookie = req_capsule_server_get(info->mti_pill,
63                                                &RMF_LOGCOOKIES);
64         ma->ma_cookie_size = req_capsule_get_size(info->mti_pill,
65                                                   &RMF_LOGCOOKIES,
66                                                   RCL_SERVER);
67
68         ma->ma_need = MA_INODE | MA_LOV | MA_COOKIE;
69         ma->ma_valid = 0;
70 }
71
72 static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc,
73                                 struct mdt_object *object,
74                                 struct mdt_body *repbody)
75 {
76         ENTRY;
77
78         /* for cross-ref mkdir, mds capa has been fetched from remote obj, then
79          * we won't go to below*/
80         if (repbody->valid & OBD_MD_FLMDSCAPA)
81                 RETURN(rc);
82
83         if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa &&
84             info->mti_exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) {
85                 struct lustre_capa *capa;
86
87                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
88                 LASSERT(capa);
89                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
90                 rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa,
91                                  0);
92                 if (rc == 0)
93                         repbody->valid |= OBD_MD_FLMDSCAPA;
94         }
95
96         RETURN(rc);
97 }
98
99 int mdt_version_get_check(struct mdt_thread_info *info, int index)
100 {
101         /** version recovery */
102         struct md_object *mo;
103         struct ptlrpc_request *req = mdt_info_req(info);
104         __u64 curr_version, *pre_versions;
105         ENTRY;
106
107         if (!exp_connect_vbr(req->rq_export))
108                 RETURN(0);
109
110         LASSERT(info->mti_mos[index]);
111         LASSERT(mdt_object_exists(info->mti_mos[index]));
112         mo = mdt_object_child(info->mti_mos[index]);
113
114         curr_version = mo_version_get(info->mti_env, mo);
115         CDEBUG(D_INODE, "Version is "LPX64"\n", curr_version);
116         /** VBR: version is checked always because costs nothing */
117         if (lustre_msg_get_transno(req->rq_reqmsg) != 0) {
118                 pre_versions = lustre_msg_get_versions(req->rq_reqmsg);
119                 LASSERT(index < PTLRPC_NUM_VERSIONS);
120                 /** Sanity check for malformed buffers */
121                 if (pre_versions == NULL) {
122                         CERROR("No versions in request buffer\n");
123                         spin_lock(&req->rq_export->exp_lock);
124                         req->rq_export->exp_vbr_failed = 1;
125                         spin_unlock(&req->rq_export->exp_lock);
126                         RETURN(-EOVERFLOW);
127                 } else if (pre_versions[index] != curr_version) {
128                         CDEBUG(D_INODE, "Version mismatch "LPX64" != "LPX64"\n",
129                                pre_versions[index], curr_version);
130                         spin_lock(&req->rq_export->exp_lock);
131                         req->rq_export->exp_vbr_failed = 1;
132                         spin_unlock(&req->rq_export->exp_lock);
133                         RETURN(-EOVERFLOW);
134                 }
135         }
136         /** save pre-versions in reply */
137         LASSERT(req->rq_repmsg != NULL);
138         pre_versions = lustre_msg_get_versions(req->rq_repmsg);
139         if (pre_versions)
140                 pre_versions[index] = curr_version;
141         RETURN(0);
142 }
143
144 static int mdt_md_create(struct mdt_thread_info *info)
145 {
146         struct mdt_device       *mdt = info->mti_mdt;
147         struct mdt_object       *parent;
148         struct mdt_object       *child;
149         struct mdt_lock_handle  *lh;
150         struct mdt_body         *repbody;
151         struct md_attr          *ma = &info->mti_attr;
152         struct mdt_reint_record *rr = &info->mti_rr;
153         struct lu_name          *lname;
154         int rc;
155         ENTRY;
156
157         DEBUG_REQ(D_INODE, mdt_info_req(info), "Create  (%s->"DFID") in "DFID,
158                   rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
159
160         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
161
162         lh = &info->mti_lh[MDT_LH_PARENT];
163         mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen);
164
165         parent = mdt_object_find_lock(info, rr->rr_fid1, lh,
166                                       MDS_INODELOCK_UPDATE);
167         if (IS_ERR(parent))
168                 RETURN(PTR_ERR(parent));
169
170         child = mdt_object_find(info->mti_env, mdt, rr->rr_fid2);
171         if (likely(!IS_ERR(child))) {
172                 struct md_object *next = mdt_object_child(parent);
173
174                 ma->ma_need = MA_INODE;
175                 ma->ma_valid = 0;
176                 /* capa for cross-ref will be stored here */
177                 ma->ma_capa = req_capsule_server_get(info->mti_pill,
178                                                      &RMF_CAPA1);
179                 LASSERT(ma->ma_capa);
180
181                 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
182                                OBD_FAIL_MDS_REINT_CREATE_WRITE);
183
184                 info->mti_mos[0] = parent;
185                 info->mti_mos[1] = child;
186                 rc = mdt_version_get_check(info, 0);
187                 if (rc)
188                         GOTO(out_put_child, rc);
189
190                 /* Let lower layer know current lock mode. */
191                 info->mti_spec.sp_cr_mode =
192                         mdt_dlm_mode2mdl_mode(lh->mlh_pdo_mode);
193
194                 /*
195                  * Do perform lookup sanity check. We do not know if name exists
196                  * or not.
197                  */
198                 info->mti_spec.sp_cr_lookup = 1;
199                 info->mti_spec.sp_feat = &dt_directory_features;
200
201                 lname = mdt_name(info->mti_env, (char *)rr->rr_name,
202                                  rr->rr_namelen);
203                 rc = mdo_create(info->mti_env, next, lname,
204                                 mdt_object_child(child),
205                                 &info->mti_spec, ma);
206                 if (rc == 0) {
207                         /* Return fid & attr to client. */
208                         if (ma->ma_valid & MA_INODE)
209                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
210                                                    mdt_object_fid(child));
211                 }
212 out_put_child:
213                 mdt_object_put(info->mti_env, child);
214         } else
215                 rc = PTR_ERR(child);
216
217         mdt_create_pack_capa(info, rc, child, repbody);
218         mdt_object_unlock_put(info, parent, lh, rc);
219         RETURN(rc);
220 }
221
222 /* Partial request to create object only */
223 static int mdt_md_mkobj(struct mdt_thread_info *info)
224 {
225         struct mdt_device      *mdt = info->mti_mdt;
226         struct mdt_object      *o;
227         struct mdt_body        *repbody;
228         struct md_attr         *ma = &info->mti_attr;
229         int rc;
230         ENTRY;
231
232         DEBUG_REQ(D_INODE, mdt_info_req(info), "Partial create "DFID"",
233                   PFID(info->mti_rr.rr_fid2));
234
235         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
236
237         o = mdt_object_find(info->mti_env, mdt, info->mti_rr.rr_fid2);
238         if (!IS_ERR(o)) {
239                 struct md_object *next = mdt_object_child(o);
240
241                 ma->ma_need = MA_INODE;
242                 ma->ma_valid = 0;
243
244                 /*
245                  * Cross-ref create can encounter already created obj in case of
246                  * recovery, just get attr in that case.
247                  */
248                 if (mdt_object_exists(o) == 1) {
249                         rc = mo_attr_get(info->mti_env, next, ma);
250                 } else {
251                         /*
252                          * Here, NO permission check for object_create,
253                          * such check has been done on the original MDS.
254                          */
255                         rc = mo_object_create(info->mti_env, next,
256                                               &info->mti_spec, ma);
257                 }
258                 if (rc == 0) {
259                         /* Return fid & attr to client. */
260                         if (ma->ma_valid & MA_INODE)
261                                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
262                                                    mdt_object_fid(o));
263                 }
264                 mdt_object_put(info->mti_env, o);
265         } else
266                 rc = PTR_ERR(o);
267
268         mdt_create_pack_capa(info, rc, o, repbody);
269         RETURN(rc);
270 }
271
272 /* In the raw-setattr case, we lock the child inode.
273  * In the write-back case or if being called from open,
274  *               the client holds a lock already.
275  * We use the ATTR_FROM_OPEN (translated into MRF_SETATTR_LOCKED by
276  * mdt_setattr_unpack()) flag to tell these cases apart. */
277 int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags)
278 {
279         struct md_attr          *ma = &info->mti_attr;
280         struct mdt_lock_handle  *lh;
281         int som_update = 0;
282         int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS);
283         int rc;
284         ENTRY;
285
286         /* attr shouldn't be set on remote object */
287         LASSERT(mdt_object_exists(mo) >= 0);
288
289         if (exp_connect_som(info->mti_exp) && info->mti_ioepoch)
290                 som_update = (info->mti_ioepoch->flags & MF_SOM_CHANGE);
291
292         /* Try to avoid object_lock if another epoch has been started
293          * already. */
294         if (som_update && (info->mti_ioepoch->ioepoch != mo->mot_ioepoch))
295                 RETURN(0);
296
297         lh = &info->mti_lh[MDT_LH_PARENT];
298         mdt_lock_reg_init(lh, LCK_PW);
299
300         if (!(flags & MRF_SETATTR_LOCKED)) {
301                 __u64 lockpart = MDS_INODELOCK_UPDATE;
302                 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
303                         lockpart |= MDS_INODELOCK_LOOKUP;
304
305                 rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK);
306                 if (rc != 0)
307                         RETURN(rc);
308         }
309
310         /* Setattrs are syncronized through dlm lock taken above. If another
311          * epoch started, its attributes may be already flushed on disk,
312          * skip setattr. */
313         if (som_update && (info->mti_ioepoch->ioepoch != mo->mot_ioepoch))
314                 GOTO(out_unlock, rc = 0);
315
316         if (mdt_object_exists(mo) == 0)
317                 GOTO(out_unlock, rc = -ENOENT);
318
319         /* all attrs are packed into mti_attr in unpack_setattr */
320         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
321                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
322
323         /* This is only for set ctime when rename's source is on remote MDS. */
324         if (unlikely(ma->ma_attr.la_valid == LA_CTIME))
325                 ma->ma_attr_flags |= MDS_VTX_BYPASS;
326
327         /* VBR: update version if attr changed are important for recovery */
328         if (do_vbr) {
329                 info->mti_mos[0] = mo;
330                 rc = mdt_version_get_check(info, 0);
331                 if (rc)
332                         GOTO(out_unlock, rc);
333         }
334
335         /* all attrs are packed into mti_attr in unpack_setattr */
336         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
337         if (rc != 0)
338                 GOTO(out_unlock, rc);
339
340         /* Re-enable SIZEONMDS. */
341         if (som_update) {
342                 CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID" size "LPU64
343                        ". Count %d\n", mo->mot_ioepoch,
344                        PFID(mdt_object_fid(mo)), ma->ma_attr.la_size,
345                        mo->mot_ioepoch_count);
346                 mdt_object_som_enable(info, mo);
347         }
348
349         EXIT;
350 out_unlock:
351         mdt_object_unlock(info, mo, lh, rc);
352         return rc;
353 }
354
355 static int mdt_reint_setattr(struct mdt_thread_info *info,
356                              struct mdt_lock_handle *lhc)
357 {
358         struct md_attr          *ma = &info->mti_attr;
359         struct mdt_reint_record *rr = &info->mti_rr;
360         struct ptlrpc_request   *req = mdt_info_req(info);
361         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
362         struct mdt_file_data    *mfd;
363         struct mdt_object       *mo;
364         struct md_object        *next;
365         struct mdt_body         *repbody;
366         int                      rc;
367         ENTRY;
368
369         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
370                   (unsigned int)ma->ma_attr.la_valid);
371
372         if (info->mti_dlm_req)
373                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
374
375         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
376         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
377         if (IS_ERR(mo))
378                 GOTO(out, rc = PTR_ERR(mo));
379
380         /* start a log jounal handle if needed */
381         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) {
382                 if ((ma->ma_attr.la_valid & LA_SIZE) ||
383                     (rr->rr_flags & MRF_SETATTR_LOCKED)) {
384                         /* Check write access for the O_TRUNC case */
385                         if (mdt_write_read(info->mti_mdt, mo) < 0)
386                                 GOTO(out_put, rc = -ETXTBSY);
387                 }
388         } else if (info->mti_ioepoch &&
389                    (info->mti_ioepoch->flags & MF_EPOCH_OPEN)) {
390                 /* Truncate case. */
391                 rc = mdt_write_get(info->mti_mdt, mo);
392                 if (rc)
393                         GOTO(out_put, rc);
394
395                 mfd = mdt_mfd_new();
396                 if (mfd == NULL)
397                         GOTO(out_put, rc = -ENOMEM);
398
399                 mdt_ioepoch_open(info, mo);
400                 repbody->ioepoch = mo->mot_ioepoch;
401
402                 mdt_object_get(info->mti_env, mo);
403                 mdt_mfd_set_mode(mfd, FMODE_TRUNC);
404                 mfd->mfd_object = mo;
405                 mfd->mfd_xid = req->rq_xid;
406
407                 spin_lock(&med->med_open_lock);
408                 list_add(&mfd->mfd_list, &med->med_open_head);
409                 spin_unlock(&med->med_open_lock);
410                 repbody->handle.cookie = mfd->mfd_handle.h_cookie;
411         }
412
413         if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_SOM_CHANGE))
414                 ma->ma_attr_flags |= MDS_PERM_BYPASS | MDS_SOM;
415
416         rc = mdt_attr_set(info, mo, rr->rr_flags);
417         if (rc)
418                 GOTO(out_put, rc);
419
420         if (info->mti_ioepoch && (info->mti_ioepoch->flags & MF_SOM_CHANGE)) {
421                 LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM);
422                 LASSERT(info->mti_ioepoch);
423
424                 spin_lock(&med->med_open_lock);
425                 /* Size-on-MDS Update. Find and free mfd. */
426                 mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle);
427                 if (mfd == NULL) {
428                         spin_unlock(&med->med_open_lock);
429                         CDEBUG(D_INODE | D_ERROR, "no handle for file close: "
430                                         "fid = "DFID": cookie = "LPX64"\n",
431                                         PFID(info->mti_rr.rr_fid1),
432                                         info->mti_ioepoch->handle.cookie);
433                         GOTO(out_put, rc = -ESTALE);
434                 }
435                 LASSERT(mfd->mfd_mode == FMODE_SOM);
436                 LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE));
437
438                 class_handle_unhash(&mfd->mfd_handle);
439                 list_del_init(&mfd->mfd_list);
440                 spin_unlock(&med->med_open_lock);
441
442                 mdt_mfd_close(info, mfd);
443         }
444
445         ma->ma_need = MA_INODE;
446         ma->ma_valid = 0;
447         next = mdt_object_child(mo);
448         rc = mo_attr_get(info->mti_env, next, ma);
449         if (rc != 0)
450                 GOTO(out_put, rc);
451
452         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
453
454         if (info->mti_mdt->mdt_opts.mo_oss_capa &&
455             info->mti_exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA &&
456             S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) &&
457             (ma->ma_attr.la_valid & LA_SIZE)) {
458                 struct lustre_capa *capa;
459
460                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
461                 LASSERT(capa);
462                 capa->lc_opc = CAPA_OPC_OSS_DEFAULT | CAPA_OPC_OSS_TRUNC;
463                 rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0);
464                 if (rc)
465                         GOTO(out_put, rc);
466                 repbody->valid |= OBD_MD_FLOSSCAPA;
467         }
468
469         EXIT;
470 out_put:
471         mdt_object_put(info->mti_env, mo);
472 out:
473         mdt_shrink_reply(info);
474         return rc;
475 }
476
477 static int mdt_reint_create(struct mdt_thread_info *info,
478                             struct mdt_lock_handle *lhc)
479 {
480         int rc;
481         ENTRY;
482
483         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
484                 RETURN(err_serious(-ESTALE));
485
486         if (info->mti_dlm_req)
487                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
488
489         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
490         case S_IFDIR:{
491                 /* Cross-ref case. */
492                 if (info->mti_cross_ref) {
493                         rc = mdt_md_mkobj(info);
494                         break;
495                 }
496         }
497         case S_IFREG:
498         case S_IFLNK:
499         case S_IFCHR:
500         case S_IFBLK:
501         case S_IFIFO:
502         case S_IFSOCK:{
503                 /* Special file should stay on the same node as parent. */
504                 LASSERT(info->mti_rr.rr_namelen > 0);
505                 rc = mdt_md_create(info);
506                 break;
507         }
508         default:
509                 rc = err_serious(-EOPNOTSUPP);
510         }
511         RETURN(rc);
512 }
513
514 static int mdt_reint_unlink(struct mdt_thread_info *info,
515                             struct mdt_lock_handle *lhc)
516 {
517         struct mdt_reint_record *rr = &info->mti_rr;
518         struct ptlrpc_request   *req = mdt_info_req(info);
519         struct md_attr          *ma = &info->mti_attr;
520         struct lu_fid           *child_fid = &info->mti_tmp_fid1;
521         struct mdt_object       *mp;
522         struct mdt_object       *mc;
523         struct mdt_lock_handle  *parent_lh;
524         struct mdt_lock_handle  *child_lh;
525         struct lu_name          *lname;
526         int                      rc;
527         ENTRY;
528
529         DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1),
530                   rr->rr_name);
531
532         if (info->mti_dlm_req)
533                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
534
535         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
536                 RETURN(err_serious(-ENOENT));
537
538         /*
539          * step 1: lock the parent. Note, this may be child in case of
540          * remote operation denoted by ->mti_cross_ref flag.
541          */
542         parent_lh = &info->mti_lh[MDT_LH_PARENT];
543         if (info->mti_cross_ref) {
544                 /*
545                  * Init reg lock for cross ref case when we need to do only
546                  * ref del locally.
547                  */
548                 mdt_lock_reg_init(parent_lh, LCK_PW);
549         } else {
550                 mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name,
551                                   rr->rr_namelen);
552         }
553         mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh,
554                                   MDS_INODELOCK_UPDATE);
555         if (IS_ERR(mp)) {
556                 rc = PTR_ERR(mp);
557                 /* errors are possible here in cross-ref cases, see below */
558                 if (info->mti_cross_ref)
559                         rc = 0;
560                 GOTO(out, rc);
561         }
562
563         info->mti_mos[0] = mp;
564         rc = mdt_version_get_check(info, 0);
565         if (rc)
566                 GOTO(out_unlock_parent, rc);
567
568         mdt_reint_init_ma(info, ma);
569         if (!ma->ma_lmm || !ma->ma_cookie)
570                 GOTO(out_unlock_parent, rc = -EINVAL);
571
572         if (info->mti_cross_ref) {
573                 /*
574                  * Remote partial operation. It is possible that replay may
575                  * happen on parent MDT and this operation will be repeated.
576                  * Therefore the object absense is allowed case and nothing
577                  * should be done here.
578                  */
579                 if (mdt_object_exists(mp) > 0) {
580                         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
581                         rc = mo_ref_del(info->mti_env,
582                                         mdt_object_child(mp), ma);
583                         if (rc == 0)
584                                 mdt_handle_last_unlink(info, mp, ma);
585                 } else
586                         rc = 0;
587                 GOTO(out_unlock_parent, rc);
588         }
589
590         /* step 2: find & lock the child */
591         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
592         rc = mdo_lookup(info->mti_env, mdt_object_child(mp),
593                         lname, child_fid, &info->mti_spec);
594         if (rc != 0)
595                  GOTO(out_unlock_parent, rc);
596
597         /* We will lock the child regardless it is local or remote. No harm. */
598         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
599         if (IS_ERR(mc))
600                 GOTO(out_unlock_parent, rc = PTR_ERR(mc));
601         child_lh = &info->mti_lh[MDT_LH_CHILD];
602         mdt_lock_reg_init(child_lh, LCK_EX);
603         rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL,
604                              MDT_CROSS_LOCK);
605         if (rc != 0) {
606                 mdt_object_put(info->mti_env, mc);
607                 GOTO(out_unlock_parent, rc);
608         }
609
610         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
611                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
612
613         info->mti_mos[1] = mc;
614         rc = mdt_version_get_check(info, 1);
615         if (rc)
616                 GOTO(out_unlock_child, rc);
617
618         /*
619          * Now we can only make sure we need MA_INODE, in mdd layer, will check
620          * whether need MA_LOV and MA_COOKIE.
621          */
622         ma->ma_need = MA_INODE;
623         ma->ma_valid = 0;
624         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
625         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
626                         mdt_object_child(mc), lname, ma);
627         if (rc == 0)
628                 mdt_handle_last_unlink(info, mc, ma);
629
630         EXIT;
631 out_unlock_child:
632         mdt_object_unlock_put(info, mc, child_lh, rc);
633 out_unlock_parent:
634         mdt_object_unlock_put(info, mp, parent_lh, rc);
635 out:
636         return rc;
637 }
638
639 static int mdt_reint_link(struct mdt_thread_info *info,
640                           struct mdt_lock_handle *lhc)
641 {
642         struct mdt_reint_record *rr = &info->mti_rr;
643         struct ptlrpc_request   *req = mdt_info_req(info);
644         struct md_attr          *ma = &info->mti_attr;
645         struct mdt_object       *ms;
646         struct mdt_object       *mp;
647         struct mdt_lock_handle  *lhs;
648         struct mdt_lock_handle  *lhp;
649         struct lu_name          *lname;
650         int rc;
651         ENTRY;
652
653         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s",
654                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name);
655
656         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
657                 RETURN(err_serious(-ENOENT));
658
659         if (info->mti_dlm_req)
660                 ldlm_request_cancel(req, info->mti_dlm_req, 0);
661
662         if (info->mti_cross_ref) {
663                 /* MDT holding name ask us to add ref. */
664                 lhs = &info->mti_lh[MDT_LH_CHILD];
665                 mdt_lock_reg_init(lhs, LCK_EX);
666                 ms = mdt_object_find_lock(info, rr->rr_fid1, lhs,
667                                           MDS_INODELOCK_UPDATE);
668                 if (IS_ERR(ms))
669                         RETURN(PTR_ERR(ms));
670
671                 mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
672                 rc = mo_ref_add(info->mti_env, mdt_object_child(ms), ma);
673                 mdt_object_unlock_put(info, ms, lhs, rc);
674                 RETURN(rc);
675         }
676
677         /* Invalid case so return error immediately instead of
678          * processing it */
679         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
680                 RETURN(-EPERM);
681
682         /* step 1: find & lock the target parent dir */
683         lhp = &info->mti_lh[MDT_LH_PARENT];
684         mdt_lock_pdo_init(lhp, LCK_EX, rr->rr_name,
685                           rr->rr_namelen);
686         mp = mdt_object_find_lock(info, rr->rr_fid2, lhp,
687                                   MDS_INODELOCK_UPDATE);
688         if (IS_ERR(mp))
689                 RETURN(PTR_ERR(mp));
690
691         info->mti_mos[0] = mp;
692         rc = mdt_version_get_check(info, 0);
693         if (rc)
694                 GOTO(out_unlock_parent, rc);
695
696         /* step 2: find & lock the source */
697         lhs = &info->mti_lh[MDT_LH_CHILD];
698         mdt_lock_reg_init(lhs, LCK_EX);
699
700         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
701         if (IS_ERR(ms))
702                 GOTO(out_unlock_parent, rc = PTR_ERR(ms));
703
704         rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE,
705                             MDT_CROSS_LOCK);
706         if (rc != 0) {
707                 mdt_object_put(info->mti_env, ms);
708                 GOTO(out_unlock_parent, rc);
709         }
710
711         /* step 3: link it */
712         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
713                        OBD_FAIL_MDS_REINT_LINK_WRITE);
714
715         info->mti_mos[1] = ms;
716         rc = mdt_version_get_check(info, 1);
717         if (rc)
718                 GOTO(out_unlock_child, rc);
719
720         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
721         rc = mdo_link(info->mti_env, mdt_object_child(mp),
722                       mdt_object_child(ms), lname, ma);
723
724         EXIT;
725 out_unlock_child:
726         mdt_object_unlock_put(info, ms, lhs, rc);
727 out_unlock_parent:
728         mdt_object_unlock_put(info, mp, lhp, rc);
729         return rc;
730 }
731
732 /* partial operation for rename */
733 static int mdt_reint_rename_tgt(struct mdt_thread_info *info)
734 {
735         struct mdt_reint_record *rr = &info->mti_rr;
736         struct ptlrpc_request   *req = mdt_info_req(info);
737         struct md_attr          *ma = &info->mti_attr;
738         struct mdt_object       *mtgtdir;
739         struct mdt_object       *mtgt = NULL;
740         struct mdt_lock_handle  *lh_tgtdir;
741         struct mdt_lock_handle  *lh_tgt = NULL;
742         struct lu_fid           *tgt_fid = &info->mti_tmp_fid1;
743         struct lu_name          *lname;
744         int                      rc;
745         ENTRY;
746
747         DEBUG_REQ(D_INODE, req, "rename_tgt: insert (%s->"DFID") in "DFID,
748                   rr->rr_tgt, PFID(rr->rr_fid2), PFID(rr->rr_fid1));
749
750         /* step 1: lookup & lock the tgt dir. */
751         lh_tgtdir = &info->mti_lh[MDT_LH_PARENT];
752         mdt_lock_pdo_init(lh_tgtdir, LCK_PW, rr->rr_tgt,
753                           rr->rr_tgtlen);
754         mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir,
755                                        MDS_INODELOCK_UPDATE);
756         if (IS_ERR(mtgtdir))
757                 RETURN(PTR_ERR(mtgtdir));
758
759         /* step 2: find & lock the target object if exists. */
760         mdt_set_capainfo(info, 0, rr->rr_fid1, BYPASS_CAPA);
761         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
762         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
763                         lname, tgt_fid, &info->mti_spec);
764         if (rc != 0 && rc != -ENOENT) {
765                 GOTO(out_unlock_tgtdir, rc);
766         } else if (rc == 0) {
767                 /*
768                  * In case of replay that name can be already inserted, check
769                  * that and do nothing if so.
770                  */
771                 if (lu_fid_eq(tgt_fid, rr->rr_fid2))
772                         GOTO(out_unlock_tgtdir, rc);
773
774                 lh_tgt = &info->mti_lh[MDT_LH_CHILD];
775                 mdt_lock_reg_init(lh_tgt, LCK_EX);
776
777                 mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt,
778                                             MDS_INODELOCK_LOOKUP);
779                 if (IS_ERR(mtgt))
780                         GOTO(out_unlock_tgtdir, rc = PTR_ERR(mtgt));
781
782                 mdt_reint_init_ma(info, ma);
783                 if (!ma->ma_lmm || !ma->ma_cookie)
784                         GOTO(out_unlock_tgt, rc = -EINVAL);
785
786                 rc = mdo_rename_tgt(info->mti_env, mdt_object_child(mtgtdir),
787                                     mdt_object_child(mtgt), rr->rr_fid2,
788                                     lname, ma);
789         } else /* -ENOENT */ {
790                 rc = mdo_name_insert(info->mti_env, mdt_object_child(mtgtdir),
791                                      lname, rr->rr_fid2, ma);
792         }
793
794         /* handle last link of tgt object */
795         if (rc == 0 && mtgt)
796                 mdt_handle_last_unlink(info, mtgt, ma);
797
798         EXIT;
799 out_unlock_tgt:
800         if (mtgt)
801                 mdt_object_unlock_put(info, mtgt, lh_tgt, rc);
802 out_unlock_tgtdir:
803         mdt_object_unlock_put(info, mtgtdir, lh_tgtdir, rc);
804         return rc;
805 }
806
807 static int mdt_rename_lock(struct mdt_thread_info *info,
808                            struct lustre_handle *lh)
809 {
810         struct ldlm_namespace *ns     = info->mti_mdt->mdt_namespace;
811         ldlm_policy_data_t    *policy = &info->mti_policy;
812         struct ldlm_res_id    *res_id = &info->mti_res_id;
813         struct md_site        *ms;
814         int rc;
815         ENTRY;
816
817         ms = mdt_md_site(info->mti_mdt);
818         fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
819
820         memset(policy, 0, sizeof *policy);
821         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
822
823         if (ms->ms_control_exp == NULL) {
824                 int flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
825
826                 /*
827                  * Current node is controller, that is mdt0, where we should
828                  * take BFL lock.
829                  */
830                 rc = ldlm_cli_enqueue_local(ns, res_id, LDLM_IBITS, policy,
831                                             LCK_EX, &flags, ldlm_blocking_ast,
832                                             ldlm_completion_ast, NULL, NULL, 0,
833                                             &info->mti_exp->exp_handle.h_cookie,
834                                             lh);
835         } else {
836                 struct ldlm_enqueue_info einfo = { LDLM_IBITS, LCK_EX,
837                      ldlm_blocking_ast, ldlm_completion_ast, NULL, NULL, NULL };
838                 int flags = 0;
839
840                 /*
841                  * This is the case mdt0 is remote node, issue DLM lock like
842                  * other clients.
843                  */
844                 rc = ldlm_cli_enqueue(ms->ms_control_exp, NULL, &einfo, res_id,
845                                       policy, &flags, NULL, 0, lh, 0);
846         }
847
848         RETURN(rc);
849 }
850
851 static void mdt_rename_unlock(struct lustre_handle *lh)
852 {
853         ENTRY;
854         LASSERT(lustre_handle_is_used(lh));
855         ldlm_lock_decref(lh, LCK_EX);
856         EXIT;
857 }
858
859 /*
860  * This is is_subdir() variant, it is CMD if cmm forwards it to correct
861  * target. Source should not be ancestor of target dir. May be other rename
862  * checks can be moved here later.
863  */
864 static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid)
865 {
866         struct mdt_reint_record *rr = &info->mti_rr;
867         struct lu_fid dst_fid = *rr->rr_fid2;
868         struct mdt_object *dst;
869         int rc = 0;
870         ENTRY;
871
872         do {
873                 LASSERT(fid_is_sane(&dst_fid));
874                 dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid);
875                 if (!IS_ERR(dst)) {
876                         rc = mdo_is_subdir(info->mti_env,
877                                            mdt_object_child(dst), fid,
878                                            &dst_fid);
879                         mdt_object_put(info->mti_env, dst);
880                         if (rc != -EREMOTE && rc < 0) {
881                                 CERROR("Failed mdo_is_subdir(), rc %d\n", rc);
882                         } else {
883                                 /* check the found fid */
884                                 if (lu_fid_eq(&dst_fid, fid))
885                                         rc = -EINVAL;
886                         }
887                 } else {
888                         rc = PTR_ERR(dst);
889                 }
890         } while (rc == -EREMOTE);
891
892         RETURN(rc);
893 }
894
895 static int mdt_reint_rename(struct mdt_thread_info *info,
896                             struct mdt_lock_handle *lhc)
897 {
898         struct mdt_reint_record *rr = &info->mti_rr;
899         struct md_attr          *ma = &info->mti_attr;
900         struct ptlrpc_request   *req = mdt_info_req(info);
901         struct mdt_object       *msrcdir;
902         struct mdt_object       *mtgtdir;
903         struct mdt_object       *mold;
904         struct mdt_object       *mnew = NULL;
905         struct mdt_lock_handle  *lh_srcdirp;
906         struct mdt_lock_handle  *lh_tgtdirp;
907         struct mdt_lock_handle  *lh_oldp;
908         struct mdt_lock_handle  *lh_newp;
909         struct lu_fid           *old_fid = &info->mti_tmp_fid1;
910         struct lu_fid           *new_fid = &info->mti_tmp_fid2;
911         struct lustre_handle     rename_lh = { 0 };
912         struct lu_name           slname = { 0 };
913         struct lu_name          *lname;
914         int                      rc;
915         ENTRY;
916
917         if (info->mti_dlm_req)
918                 ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0);
919
920         if (info->mti_cross_ref) {
921                 rc = mdt_reint_rename_tgt(info);
922                 RETURN(rc);
923         }
924
925         DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s",
926                   PFID(rr->rr_fid1), rr->rr_name,
927                   PFID(rr->rr_fid2), rr->rr_tgt);
928
929         rc = mdt_rename_lock(info, &rename_lh);
930         if (rc) {
931                 CERROR("Can't lock FS for rename, rc %d\n", rc);
932                 RETURN(rc);
933         }
934
935         lh_newp = &info->mti_lh[MDT_LH_NEW];
936
937         /* step 1: lock the source dir. */
938         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
939         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name,
940                           rr->rr_namelen);
941         msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp,
942                                        MDS_INODELOCK_UPDATE);
943         if (IS_ERR(msrcdir))
944                 GOTO(out_rename_lock, rc = PTR_ERR(msrcdir));
945
946         info->mti_mos[0] = msrcdir;
947         rc = mdt_version_get_check(info, 0);
948         if (rc)
949                 GOTO(out_unlock_source, rc);
950
951         /* step 2: find & lock the target dir. */
952         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
953         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt,
954                           rr->rr_tgtlen);
955         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
956                 mdt_object_get(info->mti_env, msrcdir);
957                 mtgtdir = msrcdir;
958         } else {
959                 mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt,
960                                           rr->rr_fid2);
961                 if (IS_ERR(mtgtdir))
962                         GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir));
963
964                 rc = mdt_object_exists(mtgtdir);
965                 if (rc == 0)
966                         GOTO(out_unlock_target, rc = -ESTALE);
967                 else if (rc > 0) {
968                         /* we lock the target dir if it is local */
969                         rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp,
970                                              MDS_INODELOCK_UPDATE,
971                                              MDT_LOCAL_LOCK);
972                         if (rc != 0) {
973                                 mdt_object_put(info->mti_env, mtgtdir);
974                                 GOTO(out_unlock_source, rc);
975                         }
976
977                         info->mti_mos[1] = mtgtdir;
978                         rc = mdt_version_get_check(info, 1);
979                         if (rc)
980                                 GOTO(out_unlock_target, rc);
981                 }
982         }
983
984         /* step 3: find & lock the old object. */
985         lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen);
986         mdt_name_copy(&slname, lname);
987         rc = mdo_lookup(info->mti_env, mdt_object_child(msrcdir),
988                         &slname, old_fid, &info->mti_spec);
989         if (rc != 0)
990                 GOTO(out_unlock_target, rc);
991
992         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
993                 GOTO(out_unlock_target, rc = -EINVAL);
994
995         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
996         if (IS_ERR(mold))
997                 GOTO(out_unlock_target, rc = PTR_ERR(mold));
998
999         lh_oldp = &info->mti_lh[MDT_LH_OLD];
1000         mdt_lock_reg_init(lh_oldp, LCK_EX);
1001         rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP,
1002                              MDT_CROSS_LOCK);
1003         if (rc != 0) {
1004                 mdt_object_put(info->mti_env, mold);
1005                 GOTO(out_unlock_target, rc);
1006         }
1007
1008         info->mti_mos[2] = mold;
1009         rc = mdt_version_get_check(info, 2);
1010         if (rc)
1011                 GOTO(out_unlock_old, rc);
1012
1013         mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA);
1014
1015         /* step 4: find & lock the new object. */
1016         /* new target object may not exist now */
1017         lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen);
1018         rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir),
1019                         lname, new_fid, &info->mti_spec);
1020         if (rc == 0) {
1021                 /* the new_fid should have been filled at this moment */
1022                 if (lu_fid_eq(old_fid, new_fid))
1023                        GOTO(out_unlock_old, rc);
1024
1025                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
1026                     lu_fid_eq(new_fid, rr->rr_fid2))
1027                         GOTO(out_unlock_old, rc = -EINVAL);
1028
1029                 mdt_lock_reg_init(lh_newp, LCK_EX);
1030                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
1031                 if (IS_ERR(mnew))
1032                         GOTO(out_unlock_old, rc = PTR_ERR(mnew));
1033
1034                 rc = mdt_object_lock(info, mnew, lh_newp,
1035                                      MDS_INODELOCK_FULL, MDT_CROSS_LOCK);
1036                 if (rc != 0) {
1037                         mdt_object_put(info->mti_env, mnew);
1038                         GOTO(out_unlock_old, rc);
1039                 }
1040
1041                 info->mti_mos[3] = mnew;
1042                 rc = mdt_version_get_check(info, 3);
1043                 if (rc)
1044                         GOTO(out_unlock_new, rc);
1045
1046                 mdt_set_capainfo(info, 3, new_fid, BYPASS_CAPA);
1047         } else if (rc != -EREMOTE && rc != -ENOENT)
1048                 GOTO(out_unlock_old, rc);
1049
1050         /* step 5: rename it */
1051         mdt_reint_init_ma(info, ma);
1052         if (!ma->ma_lmm || !ma->ma_cookie)
1053                 GOTO(out_unlock_new, rc = -EINVAL);
1054
1055         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1056                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
1057
1058
1059         /* Check if @dst is subdir of @src. */
1060         rc = mdt_rename_sanity(info, old_fid);
1061         if (rc)
1062                 GOTO(out_unlock_new, rc);
1063
1064         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
1065                         mdt_object_child(mtgtdir), old_fid, &slname,
1066                         (mnew ? mdt_object_child(mnew) : NULL),
1067                         lname, ma);
1068
1069         /* handle last link of tgt object */
1070         if (rc == 0 && mnew)
1071                 mdt_handle_last_unlink(info, mnew, ma);
1072
1073         EXIT;
1074 out_unlock_new:
1075         if (mnew)
1076                 mdt_object_unlock_put(info, mnew, lh_newp, rc);
1077 out_unlock_old:
1078         mdt_object_unlock_put(info, mold, lh_oldp, rc);
1079 out_unlock_target:
1080         mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc);
1081 out_unlock_source:
1082         mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc);
1083 out_rename_lock:
1084         mdt_rename_unlock(&rename_lh);
1085         return rc;
1086 }
1087
1088 typedef int (*mdt_reinter)(struct mdt_thread_info *info,
1089                            struct mdt_lock_handle *lhc);
1090
1091 static mdt_reinter reinters[REINT_MAX] = {
1092         [REINT_SETATTR]  = mdt_reint_setattr,
1093         [REINT_CREATE]   = mdt_reint_create,
1094         [REINT_LINK]     = mdt_reint_link,
1095         [REINT_UNLINK]   = mdt_reint_unlink,
1096         [REINT_RENAME]   = mdt_reint_rename,
1097         [REINT_OPEN]     = mdt_reint_open,
1098         [REINT_SETXATTR] = mdt_reint_setxattr
1099 };
1100
1101 int mdt_reint_rec(struct mdt_thread_info *info,
1102                   struct mdt_lock_handle *lhc)
1103 {
1104         int rc;
1105         ENTRY;
1106
1107         rc = reinters[info->mti_rr.rr_opcode](info, lhc);
1108
1109         RETURN(rc);
1110 }