Whamcloud - gitweb
Merge b_md into HEAD
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *  Lustre Metadata Server (mds) reintegration routines
6  *
7  *  Copyright (C) 2002, 2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *
12  *   This file is part of Lustre, http://www.lustre.org.
13  *
14  *   Lustre is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Lustre is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Lustre; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  */
27
28 #define EXPORT_SYMTAB
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/lustre_dlm.h>
38 #include <linux/lustre_fsfilt.h>
39
40 extern inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req);
41
42 static void mds_last_rcvd_cb(struct obd_device *obd, __u64 last_rcvd, int error)
43 {
44         CDEBUG(D_HA, "got callback for last_rcvd "LPD64": rc = %d\n",
45                last_rcvd, error);
46         if (!error && last_rcvd > obd->obd_last_committed)
47                 obd->obd_last_committed = last_rcvd;
48 }
49
50 void mds_start_transno(struct mds_obd *mds)
51 {
52         ENTRY;
53         down(&mds->mds_transno_sem);
54 }
55
56 /* Assumes caller has already pushed us into the kernel context. */
57 int mds_finish_transno(struct mds_obd *mds, void *handle,
58                        struct ptlrpc_request *req, int rc)
59 {
60         struct mds_export_data *med = &req->rq_export->exp_mds_data;
61         struct mds_client_data *mcd = med->med_mcd;
62         __u64 last_rcvd;
63         loff_t off;
64         ssize_t written;
65
66         /* Propagate error code. */
67         if (rc)
68                 GOTO(out, rc);
69
70         /* we don't allocate new transnos for replayed requests */
71         if (req->rq_level == LUSTRE_CONN_RECOVD)
72                 GOTO(out, rc = 0);
73
74         off = MDS_LR_CLIENT + med->med_off * MDS_LR_SIZE;
75
76         last_rcvd = ++mds->mds_last_rcvd;
77         req->rq_repmsg->transno = HTON__u64(last_rcvd);
78         mcd->mcd_last_rcvd = cpu_to_le64(last_rcvd);
79         mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
80         mcd->mcd_last_xid = cpu_to_le64(req->rq_xid);
81
82         fsfilt_set_last_rcvd(req->rq_export->exp_obd, last_rcvd, handle,
83                              mds_last_rcvd_cb);
84         written = lustre_fwrite(mds->mds_rcvd_filp, (char *)mcd, sizeof(*mcd),
85                                 &off);
86         CDEBUG(D_INODE, "wrote trans #"LPD64" for client %s at #%d: written = "
87                LPSZ"\n", last_rcvd, mcd->mcd_uuid, med->med_off, written);
88
89         if (written == sizeof(*mcd))
90                 GOTO(out, rc = 0);
91         CERROR("error writing to last_rcvd file: rc = %d\n", rc);
92         if (written >= 0)
93                 GOTO(out, rc = -EIO);
94
95         rc = 0;
96
97         EXIT;
98  out:
99         up(&mds->mds_transno_sem);
100         return rc;
101 }
102
103 /* In the write-back case, the client holds a lock on a subtree (not supported).
104  * In the intent case, the client holds a lock on the child inode. */
105 static int mds_reint_setattr(struct mds_update_record *rec, int offset,
106                              struct ptlrpc_request *req,
107                              struct lustre_handle *lh)
108 {
109         struct mds_obd *mds = mds_req2mds(req);
110         struct obd_device *obd = req->rq_export->exp_obd;
111         struct mds_body *body;
112         struct dentry *de;
113         struct inode *inode;
114         void *handle;
115         int rc = 0, err;
116
117         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
118         if (IS_ERR(de))
119                 GOTO(out_setattr, rc = PTR_ERR(de));
120         inode = de->d_inode;
121
122         LASSERT(inode);
123         CDEBUG(D_INODE, "ino %lu\n", inode->i_ino);
124
125         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
126                        to_kdev_t(inode->i_sb->s_dev));
127
128         mds_start_transno(mds);
129         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
130         if (IS_ERR(handle)) {
131                 rc = PTR_ERR(handle);
132                 (void)mds_finish_transno(mds, handle, req, rc);
133                 GOTO(out_setattr_de, rc);
134         }
135
136         rc = fsfilt_setattr(obd, de, handle, &rec->ur_iattr);
137         if (rc == 0 && S_ISREG(inode->i_mode) &&
138             req->rq_reqmsg->bufcount > 1) {
139                 rc = fsfilt_set_md(obd, inode, handle,
140                                    lustre_msg_buf(req->rq_reqmsg, 1),
141                                    req->rq_reqmsg->buflens[1]);
142         }
143
144         body = lustre_msg_buf(req->rq_repmsg, 0);
145         mds_pack_inode2fid(&body->fid1, inode);
146         mds_pack_inode2body(body, inode);
147
148         rc = mds_finish_transno(mds, handle, req, rc);
149         err = fsfilt_commit(obd, de->d_inode, handle);
150         if (err) {
151                 CERROR("error on commit: err = %d\n", err);
152                 if (!rc)
153                         rc = err;
154         }
155
156         EXIT;
157 out_setattr_de:
158         l_dput(de);
159 out_setattr:
160         req->rq_status = rc;
161         return 0;
162 }
163
164 static int mds_reint_create(struct mds_update_record *rec, int offset,
165                             struct ptlrpc_request *req,
166                             struct lustre_handle *lh)
167 {
168         struct dentry *de = NULL;
169         struct mds_obd *mds = mds_req2mds(req);
170         struct obd_device *obd = req->rq_export->exp_obd;
171         struct dentry *dchild = NULL;
172         struct inode *dir;
173         void *handle;
174         struct lustre_handle lockh;
175         int rc = 0, err, type = rec->ur_mode & S_IFMT;
176         ENTRY;
177
178         LASSERT(offset == 0);
179         LASSERT(!strcmp(req->rq_export->exp_obd->obd_type->typ_name, "mds"));
180
181         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
182                 GOTO(out_create, rc = -ESTALE);
183
184         de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW, &lockh);
185         if (IS_ERR(de)) {
186                 rc = PTR_ERR(de);
187                 CERROR("parent lookup error %d\n", rc);
188                 LBUG();
189                 GOTO(out_create, rc);
190         }
191         dir = de->d_inode;
192         LASSERT(dir);
193         CDEBUG(D_INODE, "parent ino %lu creating name %s mode %o\n",
194                dir->i_ino, rec->ur_name, rec->ur_mode);
195
196         ldlm_lock_dump_handle(D_OTHER, &lockh);
197
198         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
199         if (IS_ERR(dchild)) {
200                 rc = PTR_ERR(dchild);
201                 CERROR("child lookup error %d\n", rc);
202                 GOTO(out_create_de, rc);
203         }
204
205         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE,
206                        to_kdev_t(dir->i_sb->s_dev));
207
208         if (dir->i_mode & S_ISGID) {
209                 rec->ur_gid = dir->i_gid;
210                 if (S_ISDIR(rec->ur_mode))
211                         rec->ur_mode |= S_ISGID;
212         }
213
214         if (rec->ur_fid2->id)
215                 dchild->d_fsdata = (void *)(unsigned long)rec->ur_fid2->id;
216         else
217                 LASSERT(!(rec->ur_opcode & REINT_REPLAYING));
218
219         /* From here on, we must exit via a path that calls mds_finish_transno,
220          * so that we release the mds_transno_sem (and, in the case of success,
221          * update the transno correctly).  out_create_commit and
222          * out_transno_dchild are good candidates.
223          */
224         mds_start_transno(mds);
225
226         switch (type) {
227         case S_IFREG:{
228                 handle = fsfilt_start(obd, dir, FSFILT_OP_CREATE);
229                 if (IS_ERR(handle))
230                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
231                 rc = vfs_create(dir, dchild, rec->ur_mode);
232                 EXIT;
233                 break;
234         }
235         case S_IFDIR:{
236                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKDIR);
237                 if (IS_ERR(handle))
238                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
239                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
240                 EXIT;
241                 break;
242         }
243         case S_IFLNK:{
244                 handle = fsfilt_start(obd, dir, FSFILT_OP_SYMLINK);
245                 if (IS_ERR(handle))
246                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
247                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
248                 EXIT;
249                 break;
250         }
251         case S_IFCHR:
252         case S_IFBLK:
253         case S_IFIFO:
254         case S_IFSOCK:{
255                 int rdev = rec->ur_rdev;
256                 handle = fsfilt_start(obd, dir, FSFILT_OP_MKNOD);
257                 if (IS_ERR(handle))
258                         GOTO(out_transno_dchild, rc = PTR_ERR(handle));
259                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
260                 EXIT;
261                 break;
262         }
263         default:
264                 CERROR("bad file type %o creating %s\n", type, rec->ur_name);
265                 handle = NULL; /* quell uninitialized warning */
266                 GOTO(out_transno_dchild, rc = -EINVAL);
267         }
268
269         /* In case we stored the desired inum in here, we want to clean up.
270          * We also do this in the out_transno_dchild block, for the error cases.
271          */
272         dchild->d_fsdata = NULL;
273
274         if (rc) {
275                 CDEBUG(D_INODE, "error during create: %d\n", rc);
276                 GOTO(out_create_commit, rc);
277         } else {
278                 struct iattr iattr;
279                 struct inode *inode = dchild->d_inode;
280                 struct mds_body *body;
281
282                 iattr.ia_atime = rec->ur_time;
283                 iattr.ia_ctime = rec->ur_time;
284                 iattr.ia_mtime = rec->ur_time;
285                 iattr.ia_uid = rec->ur_uid;
286                 iattr.ia_gid = rec->ur_gid;
287                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
288                         ATTR_MTIME | ATTR_CTIME;
289
290                 if (rec->ur_fid2->id) {
291                         LASSERT(rec->ur_fid2->id == inode->i_ino);
292                         inode->i_generation = rec->ur_fid2->generation;
293                         /* Dirtied and committed by the upcoming setattr. */
294                         CDEBUG(D_INODE, "recreated ino %lu with gen %x\n",
295                                inode->i_ino, inode->i_generation);
296                 } else {
297                         CDEBUG(D_INODE, "created ino %lu with gen %x\n",
298                                inode->i_ino, inode->i_generation);
299                 }
300
301                 rc = fsfilt_setattr(obd, dchild, handle, &iattr);
302                 if (rc) {
303                         CERROR("error on setattr: rc = %d\n", rc);
304                         /* XXX should we abort here in case of error? */
305                 }
306
307                 body = lustre_msg_buf(req->rq_repmsg, offset);
308                 mds_pack_inode2fid(&body->fid1, inode);
309                 mds_pack_inode2body(body, inode);
310         }
311         EXIT;
312 out_create_commit:
313         if (rc) {
314                 rc = mds_finish_transno(mds, handle, req, rc);
315         } else {
316                 rc = mds_finish_transno(mds, handle, req, rc);
317                 if (rc)
318                         GOTO(out_create_unlink, rc);
319         }
320         err = fsfilt_commit(obd, dir, handle);
321         if (err) {
322                 CERROR("error on commit: err = %d\n", err);
323                 if (!rc)
324                         rc = err;
325         }
326 out_create_dchild:
327         l_dput(dchild);
328 out_create_de:
329         ldlm_lock_decref(&lockh, LCK_PW);
330         l_dput(de);
331 out_create:
332         req->rq_status = rc;
333         return 0;
334
335 out_transno_dchild:
336         dchild->d_fsdata = NULL;
337         /* Need to release the transno lock, and then put the dchild. */
338         LASSERT(rc);
339         mds_finish_transno(mds, handle, req, rc);
340         goto out_create_dchild;
341
342 out_create_unlink:
343         /* Destroy the file we just created.  This should not need extra
344          * journal credits, as we have already modified all of the blocks
345          * needed in order to create the file in the first place.
346          */
347         switch (type) {
348         case S_IFDIR:
349                 err = vfs_rmdir(dir, dchild);
350                 if (err)
351                         CERROR("failed rmdir in error path: rc = %d\n", err);
352                 break;
353         default:
354                 err = vfs_unlink(dir, dchild);
355                 if (err)
356                         CERROR("failed unlink in error path: rc = %d\n", err);
357                 break;
358         }
359
360         goto out_create_commit;
361 }
362
363 /* This function doesn't use ldlm_match_or_enqueue because we're always called
364  * with EX or PW locks, and the MDS is no longer allowed to match write locks,
365  * because they take the place of local semaphores.
366  *
367  * Two locks are taken in numerical order */
368 int enqueue_ordered_locks(int lock_mode, struct obd_device *obd,
369                           struct ldlm_res_id *p1_res_id,
370                           struct ldlm_res_id *p2_res_id,
371                           struct lustre_handle *p1_lockh,
372                           struct lustre_handle *p2_lockh)
373 {
374         struct ldlm_res_id res_id[2];
375         struct lustre_handle *handles[2] = {p1_lockh, p2_lockh};
376         int rc, flags;
377         ENTRY;
378
379         LASSERT(p1_res_id != NULL && p2_res_id != NULL);
380
381         CDEBUG(D_INFO, "locks before: "LPU64"/"LPU64"\n",
382                p1_res_id[0].name[0], p2_res_id[0].name[0]);
383
384         if (p1_res_id->name[0] < p2_res_id->name[0]) {
385                 handles[0] = p1_lockh;
386                 handles[1] = p2_lockh;
387                 res_id[0] = *p1_res_id;
388                 res_id[1] = *p2_res_id;
389         } else {
390                 handles[1] = p1_lockh;
391                 handles[0] = p2_lockh;
392                 res_id[1] = *p1_res_id;
393                 res_id[0] = *p2_res_id;
394         }
395
396         CDEBUG(D_INFO, "lock order: "LPU64"/"LPU64"\n",
397                p1_res_id[0].name[0], p2_res_id[0].name[0]);
398
399         flags = LDLM_FL_LOCAL_ONLY;
400         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL, res_id[0],
401                               LDLM_PLAIN, NULL, 0, lock_mode, &flags,
402                               ldlm_completion_ast, mds_blocking_ast, NULL,
403                               NULL, handles[0]);
404         if (rc != ELDLM_OK)
405                 RETURN(-EIO);
406         ldlm_lock_dump_handle(D_OTHER, handles[0]);
407
408         if (memcmp(&res_id[0], &res_id[1], sizeof(res_id[0])) == 0) {
409                 memcpy(handles[1], handles[0], sizeof(*(handles[1])));
410                 ldlm_lock_addref(handles[1], lock_mode);
411         } else {
412                 flags = LDLM_FL_LOCAL_ONLY;
413                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
414                                       res_id[1], LDLM_PLAIN, NULL, 0, lock_mode,
415                                       &flags, ldlm_completion_ast,
416                                       mds_blocking_ast, NULL, 0, handles[1]);
417                 if (rc != ELDLM_OK) {
418                         ldlm_lock_decref(handles[0], lock_mode);
419                         RETURN(-EIO);
420                 }
421         }
422         ldlm_lock_dump_handle(D_OTHER, handles[1]);
423
424         RETURN(0);
425 }
426
427 static int mds_reint_unlink(struct mds_update_record *rec, int offset,
428                             struct ptlrpc_request *req,
429                             struct lustre_handle *child_lockh)
430 {
431         struct dentry *dir_de = NULL;
432         struct dentry *dchild = NULL;
433         struct mds_obd *mds = mds_req2mds(req);
434         struct obd_device *obd = req->rq_export->exp_obd;
435         struct mds_body *body = NULL;
436         struct inode *dir_inode, *child_inode;
437         struct lustre_handle *handle, parent_lockh;
438         struct ldlm_res_id child_res_id = { .name = {0} };
439         char *name;
440         int namelen, err, rc = 0, flags = 0, return_lock = 0;
441         ENTRY;
442
443         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
444                 GOTO(out, rc = -ENOENT);
445
446         /* Step 1: Lookup the parent by FID */
447         dir_de = mds_fid2locked_dentry(obd, rec->ur_fid1, NULL, LCK_PW,
448                                        &parent_lockh);
449         if (IS_ERR(dir_de))
450                 GOTO(out, rc = PTR_ERR(dir_de));
451         dir_inode = dir_de->d_inode;
452         LASSERT(dir_inode);
453
454         /* Step 2: Lookup the child */
455         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
456         namelen = req->rq_reqmsg->buflens[offset + 1] - 1;
457
458         dchild = lookup_one_len(name, dir_de, namelen);
459         if (IS_ERR(dchild))
460                 GOTO(out_step_2a, rc = PTR_ERR(dchild));
461         child_inode = dchild->d_inode;
462         if (child_inode == NULL) {
463                 if (rec->ur_opcode & REINT_REPLAYING) {
464                         CDEBUG(D_INODE,
465                                "child missing (%lu/%s); OK for REPLAYING\n",
466                                dir_inode->i_ino, rec->ur_name);
467                         rc = 0;
468                 } else {
469                         CDEBUG(D_INODE,
470                                "child doesn't exist (dir %lu, name %s)\n",
471                                dir_inode->i_ino, rec->ur_name);
472                         rc = -ENOENT;
473                 }
474                 GOTO(out_step_2b, rc);
475         }
476
477         DEBUG_REQ(D_INODE, req, "parent ino %lu, child ino %lu",
478                   dir_inode->i_ino, child_inode->i_ino);
479
480         /* Step 3: Get lock a lock on the child */
481         child_res_id.name[0] = child_inode->i_ino;
482         child_res_id.name[1] = child_inode->i_generation;
483
484         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
485                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
486                               &flags, ldlm_completion_ast, mds_blocking_ast,
487                               NULL, NULL, child_lockh);
488         if (rc != ELDLM_OK)
489                 GOTO(out_step_2b, rc);
490
491         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE,
492                        to_kdev_t(dir_inode->i_sb->s_dev));
493
494         /* Slightly magical; see ldlm_intent_policy */
495         if (offset)
496                 offset = 1;
497
498         body = lustre_msg_buf(req->rq_repmsg, offset);
499
500         /* Step 4: Do the unlink: client decides between rmdir/unlink!
501          * (bug 72) */
502         mds_start_transno(mds);
503         switch (rec->ur_mode & S_IFMT) {
504         case S_IFDIR:
505                 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_RMDIR);
506                 if (IS_ERR(handle))
507                         GOTO(out_cancel_transno, rc = PTR_ERR(handle));
508                 rc = vfs_rmdir(dir_inode, dchild);
509                 break;
510         case S_IFREG:
511                 /* If this is the last reference to this inode, get the OBD EA
512                  * data first so the client can destroy OST objects */
513                 if ((child_inode->i_mode & S_IFMT) == S_IFREG &&
514                     child_inode->i_nlink == 1) {
515                         mds_pack_inode2fid(&body->fid1, child_inode);
516                         mds_pack_inode2body(body, child_inode);
517                         mds_pack_md(obd, req->rq_repmsg, offset + 1,
518                                     body, child_inode);
519                         if (body->valid & OBD_MD_FLEASIZE)
520                                 return_lock = 1;
521                 }
522                 /* no break */
523         case S_IFLNK:
524         case S_IFCHR:
525         case S_IFBLK:
526         case S_IFIFO:
527         case S_IFSOCK:
528                 handle = fsfilt_start(obd, dir_inode, FSFILT_OP_UNLINK);
529                 if (IS_ERR(handle))
530                         GOTO(out_cancel_transno, rc = PTR_ERR(handle));
531                 rc = vfs_unlink(dir_inode, dchild);
532                 break;
533         default:
534                 CERROR("bad file type %o unlinking %s\n", rec->ur_mode, name);
535                 handle = NULL;
536                 LBUG();
537                 GOTO(out_cancel_transno, rc = -EINVAL);
538         }
539
540         rc = mds_finish_transno(mds, handle, req, rc);
541         err = fsfilt_commit(obd, dir_inode, handle);
542         if (rc != 0 || err != 0) {
543                 /* Don't unlink the OST objects if the MDS unlink failed */
544                 body->valid = 0;
545         }
546         if (err) {
547                 CERROR("error on commit: err = %d\n", err);
548                 if (!rc)
549                         rc = err;
550         }
551
552         GOTO(out_step_4, rc);
553  out_step_4:
554         if (rc != 0 || return_lock == 0)
555                 ldlm_lock_decref(child_lockh, LCK_EX);
556  out_step_2b:
557         l_dput(dchild);
558  out_step_2a:
559         ldlm_lock_decref(&parent_lockh, LCK_EX);
560         l_dput(dir_de);
561  out:
562         req->rq_status = rc;
563         return 0;
564
565  out_cancel_transno:
566         rc = mds_finish_transno(mds, handle, req, rc);
567         goto out_step_4;
568 }
569
570 static int mds_reint_link(struct mds_update_record *rec, int offset,
571                           struct ptlrpc_request *req, struct lustre_handle *lh)
572 {
573         struct obd_device *obd = req->rq_export->exp_obd;
574         struct dentry *de_src = NULL;
575         struct dentry *de_tgt_dir = NULL;
576         struct dentry *dchild = NULL;
577         struct mds_obd *mds = mds_req2mds(req);
578         struct lustre_handle *handle, tgt_dir_lockh, src_lockh;
579         struct ldlm_res_id src_res_id = { .name = {0} };
580         struct ldlm_res_id tgt_dir_res_id = { .name = {0} };
581         int lock_mode, rc = 0, err;
582         ENTRY;
583
584         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
585                 GOTO(out, rc = -ENOENT);
586
587         /* Step 1: Lookup the source inode and target directory by FID */
588         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
589         if (IS_ERR(de_src))
590                 GOTO(out, rc = PTR_ERR(de_src));
591
592         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
593         if (IS_ERR(de_tgt_dir))
594                 GOTO(out_de_src, rc = PTR_ERR(de_tgt_dir));
595
596         CDEBUG(D_INODE, "linking %*s/%s to inode %lu\n",
597                de_tgt_dir->d_name.len, de_tgt_dir->d_name.name, rec->ur_name,
598                de_src->d_inode->i_ino);
599
600         /* Step 2: Take the two locks */
601         lock_mode = LCK_EX;
602         src_res_id.name[0] = de_src->d_inode->i_ino;
603         src_res_id.name[1] = de_src->d_inode->i_generation;
604         tgt_dir_res_id.name[0] = de_tgt_dir->d_inode->i_ino;
605         tgt_dir_res_id.name[1] = de_tgt_dir->d_inode->i_generation;
606
607         rc = enqueue_ordered_locks(LCK_EX, obd, &src_res_id, &tgt_dir_res_id,
608                                    &src_lockh, &tgt_dir_lockh);
609         if (rc != ELDLM_OK)
610                 GOTO(out_tgt_dir, rc = -EIO);
611
612         /* Step 3: Lookup the child */
613         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
614         if (IS_ERR(dchild)) {
615                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
616                 GOTO(out_drop_locks, rc = PTR_ERR(dchild));
617         }
618
619         if (dchild->d_inode) {
620                 if (rec->ur_opcode & REINT_REPLAYING) {
621                         /* XXX verify that the link is to the the right file? */
622                         CDEBUG(D_INODE,
623                                "child exists (dir %lu, name %s) (REPLAYING)\n",
624                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
625                         rc = 0;
626                 } else {
627                         CDEBUG(D_INODE, "child exists (dir %lu, name %s)\n",
628                                de_tgt_dir->d_inode->i_ino, rec->ur_name);
629                         rc = -EEXIST;
630                 }
631                 GOTO(out_drop_child, rc);
632         }
633
634         /* Step 4: Do it. */
635         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
636                        to_kdev_t(de_src->d_inode->i_sb->s_dev));
637
638         mds_start_transno(mds);
639         handle = fsfilt_start(obd, de_tgt_dir->d_inode, FSFILT_OP_LINK);
640         if (IS_ERR(handle)) {
641                 rc = PTR_ERR(handle);
642                 mds_finish_transno(mds, handle, req, rc);
643                 GOTO(out_drop_child, rc);
644         }
645
646         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
647         if (rc)
648                 CERROR("link error %d\n", rc);
649         rc = mds_finish_transno(mds, handle, req, rc);
650
651         err = fsfilt_commit(obd, de_tgt_dir->d_inode, handle);
652         if (err) {
653                 CERROR("error on commit: err = %d\n", err);
654                 if (!rc)
655                         rc = err;
656         }
657
658         EXIT;
659
660 out_drop_child:
661         l_dput(dchild);
662 out_drop_locks:
663         ldlm_lock_decref(&src_lockh, lock_mode);
664         ldlm_lock_decref(&tgt_dir_lockh, lock_mode);
665 out_tgt_dir:
666         l_dput(de_tgt_dir);
667 out_de_src:
668         l_dput(de_src);
669 out:
670         req->rq_status = rc;
671         return 0;
672 }
673
674 static int mds_reint_rename(struct mds_update_record *rec, int offset,
675                             struct ptlrpc_request *req,
676                             struct lustre_handle *lockh)
677 {
678         struct obd_device *obd = req->rq_export->exp_obd;
679         struct dentry *de_srcdir = NULL;
680         struct dentry *de_tgtdir = NULL;
681         struct dentry *de_old = NULL;
682         struct dentry *de_new = NULL;
683         struct mds_obd *mds = mds_req2mds(req);
684         struct lustre_handle dlm_handles[4];
685         struct ldlm_res_id p1_res_id = { .name = {0} };
686         struct ldlm_res_id p2_res_id = { .name = {0} };
687         struct ldlm_res_id c1_res_id = { .name = {0} };
688         struct ldlm_res_id c2_res_id = { .name = {0} };
689         int rc = 0, err, lock_count = 3, flags = LDLM_FL_LOCAL_ONLY;
690         void *handle;
691         ENTRY;
692
693         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
694         if (IS_ERR(de_srcdir))
695                 GOTO(out, rc = PTR_ERR(de_srcdir));
696         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
697         if (IS_ERR(de_tgtdir))
698                 GOTO(out_put_srcdir, rc = PTR_ERR(de_tgtdir));
699
700         /* The idea here is that we need to get four locks in the end:
701          * one on each parent directory, one on each child.  We need to take
702          * these locks in some kind of order (to avoid deadlocks), and the order
703          * I selected is "increasing resource number" order.  We need to take
704          * the locks on the parent directories, however, before we can lookup
705          * the children.  Thus the following plan:
706          *
707          * 1. Take locks on the parent(s), in order
708          * 2. Lookup the children
709          * 3. Take locks on the children, in order
710          * 4. Execute the rename
711          */
712
713         /* Step 1: Take locks on the parent(s), in order */
714         p1_res_id.name[0] = de_srcdir->d_inode->i_ino;
715         p1_res_id.name[1] = de_srcdir->d_inode->i_generation;
716
717         p2_res_id.name[0] = de_tgtdir->d_inode->i_ino;
718         p2_res_id.name[1] = de_tgtdir->d_inode->i_generation;
719
720         rc = enqueue_ordered_locks(LCK_EX, obd, &p1_res_id, &p2_res_id,
721                                    &(dlm_handles[0]), &(dlm_handles[1]));
722         if (rc != ELDLM_OK)
723                 GOTO(out_put_tgtdir, rc);
724
725         /* Step 2: Lookup the children */
726         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
727         if (IS_ERR(de_old)) {
728                 CERROR("old child lookup error (%*s): %ld\n",
729                        rec->ur_namelen - 1, rec->ur_name, PTR_ERR(de_old));
730                 GOTO(out_step_2a, rc = PTR_ERR(de_old));
731         }
732
733         if (de_old->d_inode == NULL)
734                 GOTO(out_step_2b, rc = -ENOENT);
735
736         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
737         if (IS_ERR(de_new)) {
738                 CERROR("new child lookup error (%*s): %ld\n",
739                        rec->ur_tgtlen - 1, rec->ur_tgt, PTR_ERR(de_new));
740                 GOTO(out_step_2b, rc = PTR_ERR(de_new));
741         }
742
743         /* Step 3: Take locks on the children */
744         c1_res_id.name[0] = de_old->d_inode->i_ino;
745         c1_res_id.name[1] = de_old->d_inode->i_generation;
746         if (de_new->d_inode == NULL) {
747                 flags = LDLM_FL_LOCAL_ONLY;
748                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
749                                       c1_res_id, LDLM_PLAIN, NULL, 0, LCK_EX,
750                                       &flags, ldlm_completion_ast,
751                                       mds_blocking_ast, NULL, NULL,
752                                       &(dlm_handles[2]));
753                 lock_count = 3;
754         } else {
755                 c2_res_id.name[0] = de_new->d_inode->i_ino;
756                 c2_res_id.name[1] = de_new->d_inode->i_generation;
757                 rc = enqueue_ordered_locks(LCK_EX, obd, &c1_res_id, &c2_res_id,
758                                            &(dlm_handles[2]),
759                                            &(dlm_handles[3]));
760                 lock_count = 4;
761         }
762         if (rc != ELDLM_OK)
763                 GOTO(out_step_3, rc);
764
765         /* Step 4: Execute the rename */
766         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
767                        to_kdev_t(de_srcdir->d_inode->i_sb->s_dev));
768
769         mds_start_transno(mds);
770         handle = fsfilt_start(obd, de_tgtdir->d_inode, FSFILT_OP_RENAME);
771         if (IS_ERR(handle)) {
772                 rc = PTR_ERR(handle);
773                 mds_finish_transno(mds, handle, req, rc);
774                 GOTO(out_step_4, rc);
775         }
776
777         lock_kernel();
778         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new,
779                         NULL);
780         unlock_kernel();
781
782         rc = mds_finish_transno(mds, handle, req, rc);
783
784         err = fsfilt_commit(obd, de_tgtdir->d_inode, handle);
785         if (err) {
786                 CERROR("error on commit: err = %d\n", err);
787                 if (!rc)
788                         rc = err;
789         }
790
791         EXIT;
792  out_step_4:
793         ldlm_lock_decref(&(dlm_handles[2]), LCK_EX);
794         if (lock_count == 4)
795                 ldlm_lock_decref(&(dlm_handles[3]), LCK_EX);
796  out_step_3:
797         l_dput(de_new);
798  out_step_2b:
799         l_dput(de_old);
800  out_step_2a:
801         ldlm_lock_decref(&(dlm_handles[0]), LCK_EX);
802         ldlm_lock_decref(&(dlm_handles[1]), LCK_EX);
803  out_put_tgtdir:
804         l_dput(de_tgtdir);
805  out_put_srcdir:
806         l_dput(de_srcdir);
807  out:
808         req->rq_status = rc;
809         return 0;
810 }
811
812 typedef int (*mds_reinter)(struct mds_update_record *, int offset,
813                            struct ptlrpc_request *, struct lustre_handle *);
814
815 static mds_reinter reinters[REINT_MAX + 1] = {
816         [REINT_SETATTR] mds_reint_setattr,
817         [REINT_CREATE] mds_reint_create,
818         [REINT_UNLINK] mds_reint_unlink,
819         [REINT_LINK] mds_reint_link,
820         [REINT_RENAME] mds_reint_rename,
821         [REINT_OPEN] mds_open
822 };
823
824 int mds_reint_rec(struct mds_update_record *rec, int offset,
825                   struct ptlrpc_request *req, struct lustre_handle *lockh)
826 {
827         struct mds_obd *mds = mds_req2mds(req);
828         struct obd_run_ctxt saved;
829         struct obd_ucred uc;
830         int realop = rec->ur_opcode & REINT_OPCODE_MASK, rc;
831         ENTRY;
832
833         if (realop < 1 || realop > REINT_MAX) {
834                 CERROR("opcode %d not valid (%sREPLAYING)\n", realop,
835                        rec->ur_opcode & REINT_REPLAYING ? "" : "not ");
836                 rc = req->rq_status = -EINVAL;
837                 RETURN(rc);
838         }
839
840         uc.ouc_fsuid = rec->ur_fsuid;
841         uc.ouc_fsgid = rec->ur_fsgid;
842         uc.ouc_cap = rec->ur_cap;
843         uc.ouc_suppgid = rec->ur_suppgid;
844
845         push_ctxt(&saved, &mds->mds_ctxt, &uc);
846         rc = reinters[realop] (rec, offset, req, lockh);
847         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
848
849         RETURN(rc);
850 }