Whamcloud - gitweb
Add client RPC xid to per-client last_rcvd data. If the MDS dies but the
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *
6  *  Lustre Metadata Server (mds) reintegration routines
7  *
8  *  Copyright (C) 2002  Cluster File Systems, Inc.
9  *  author: Peter Braam <braam@clusterfs.com>
10  *
11  *  This code is issued under the GNU General Public License.
12  *  See the file COPYING in this distribution
13  *
14  */
15
16 // XXX - add transaction sequence numbers
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/quotaops.h>
26 #include <asm/unistd.h>
27 #include <asm/uaccess.h>
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
38
39 int mds_update_last_rcvd(struct mds_obd *mds, struct ptlrpc_request *req)
40 {
41         /* get from req->rq_connection-> or req->rq_client */
42         struct mds_client_info mci_data, *mci = &mci_data;
43         struct mds_client_data mcd_data, *mcd = &mcd_data;
44         loff_t off;
45         int rc;
46
47         /* Just a placeholder until more gets committed */
48         memset(mcd, 0, sizeof(*mcd));
49         mci->mci_mcd = mcd;
50         mci->mci_off = off = MDS_LR_CLIENT;
51
52         ++mds->mds_last_rcvd;   /* lock this, or make it an LDLM function? */
53         mci->mci_mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
54         mci->mci_mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
55         mci->mci_mcd->mcd_xid = cpu_to_le32(req->rq_connection->c_xid_in);
56         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mci->mci_mcd,
57                            sizeof(*mci->mci_mcd), &off);
58         CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at %Ld: rc = %d\n",
59                mds->mds_last_rcvd, mci->mci_mcd->mcd_uuid, mci->mci_off, rc);
60         // store new value and last committed value in req struct
61
62         if (rc == sizeof(mci->mci_mcd))
63                 rc = 0;
64         else if (rc >= 0)
65                 rc = -EIO;
66
67         return rc;
68 }
69
70 static int mds_reint_setattr(struct mds_update_record *rec,
71                              struct ptlrpc_request *req)
72 {
73         struct mds_obd *mds = &req->rq_obd->u.mds;
74         struct dentry *de;
75         void *handle;
76         int rc = 0;
77
78         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
79         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_SETATTR)) {
80                 GOTO(out_setattr, rc = -ESTALE);
81         }
82
83         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
84
85         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
86                        de->d_inode->i_sb->s_dev);
87
88         handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
89         if (!handle)
90                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
91         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
92
93         if (!rc)
94                 rc = mds_update_last_rcvd(mds, req);
95
96         EXIT;
97
98         /* FIXME: keep rc intact */
99         rc = mds_fs_commit(mds, de->d_inode, handle);
100 out_setattr_de:
101         l_dput(de);
102 out_setattr:
103         req->rq_status = rc;
104         return(0);
105 }
106
107 static int mds_reint_create(struct mds_update_record *rec,
108                             struct ptlrpc_request *req)
109 {
110         struct dentry *de = NULL;
111         struct mds_obd *mds = &req->rq_obd->u.mds;
112         struct dentry *dchild = NULL;
113         struct inode *dir;
114         void *handle;
115         int rc = 0, type = rec->ur_mode & S_IFMT;
116         ENTRY;
117
118         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
119         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
120                 LBUG();
121                 GOTO(out_create_de, rc = -ESTALE);
122         }
123         dir = de->d_inode;
124         CDEBUG(D_INODE, "ino %ld\n", dir->i_ino);
125
126         down(&dir->i_sem);
127         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
128         if (IS_ERR(dchild)) {
129                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
130                 up(&dir->i_sem);
131                 LBUG();
132                 GOTO(out_create_dchild, rc = -ESTALE);
133         }
134
135         if (dchild->d_inode) {
136                 CERROR("child exists (dir %ld, name %s)\n",
137                        dir->i_ino, rec->ur_name);
138                 LBUG();
139                 GOTO(out_create_dchild, rc = -EEXIST);
140         }
141
142         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb->s_dev);
143
144         switch (type) {
145         case S_IFREG: {
146                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
147                 if (!handle)
148                         GOTO(out_create_dchild, PTR_ERR(handle));
149                 rc = vfs_create(dir, dchild, rec->ur_mode);
150                 EXIT;
151                 break;
152         }
153         case S_IFDIR: {
154                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
155                 if (!handle)
156                         GOTO(out_create_dchild, PTR_ERR(handle));
157                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
158                 EXIT;
159                 break;
160         }
161         case S_IFLNK: {
162                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
163                 if (!handle)
164                         GOTO(out_create_dchild, PTR_ERR(handle));
165                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
166                 EXIT;
167                 break;
168         }
169         case S_IFCHR:
170         case S_IFBLK:
171         case S_IFIFO:
172         case S_IFSOCK: {
173                 int rdev = rec->ur_id;
174                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
175                 if (!handle)
176                         GOTO(out_create_dchild, PTR_ERR(handle));
177                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
178                 EXIT;
179                 break;
180         }
181         default:
182                 CERROR("bad file type %d for create of %s\n",type,rec->ur_name);
183                 GOTO(out_create_dchild, rc = -EINVAL);
184         }
185
186         if (rc) {
187                 CERROR("error during create: %d\n", rc);
188                 LBUG();
189                 GOTO(out_create_commit, rc);
190         } else {
191                 struct iattr iattr;
192                 struct inode *inode = dchild->d_inode;
193                 struct mds_body *body;
194
195                 if (type == S_IFREG) {
196                         rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id);
197                         if (rc)
198                                 CERROR("error %d setting objid for %ld\n",
199                                        rc, inode->i_ino);
200                 }
201
202                 iattr.ia_atime = rec->ur_time;
203                 iattr.ia_ctime = rec->ur_time;
204                 iattr.ia_mtime = rec->ur_time;
205                 iattr.ia_uid = rec->ur_uid;
206                 iattr.ia_gid = rec->ur_gid;
207                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
208                         ATTR_MTIME | ATTR_CTIME;
209
210                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
211                 /* XXX should we abort here in case of error? */
212
213                 body = lustre_msg_buf(req->rq_repmsg, 0);
214                 body->ino = inode->i_ino;
215                 body->generation = inode->i_generation;
216         }
217
218         if (!rc)
219                 rc = mds_update_last_rcvd(mds, req);
220
221 out_create_commit:
222         /* FIXME: keep rc intact */
223         rc = mds_fs_commit(mds, dir, handle);
224 out_create_dchild:
225         l_dput(dchild);
226         up(&dir->i_sem);
227 out_create_de:
228         l_dput(de);
229         req->rq_status = rc;
230         return 0;
231 }
232
233 static int mds_reint_unlink(struct mds_update_record *rec,
234                             struct ptlrpc_request *req)
235 {
236         struct dentry *de = NULL;
237         struct dentry *dchild = NULL;
238         struct mds_obd *mds = &req->rq_obd->u.mds;
239         struct inode *dir, *inode;
240         void *handle;
241         int rc = 0;
242         ENTRY;
243
244         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
245         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) {
246                 LBUG();
247                 GOTO(out_unlink, rc = -ESTALE);
248         }
249         dir = de->d_inode;
250         CDEBUG(D_INODE, "ino %ld\n", dir->i_ino);
251
252         down(&dir->i_sem);
253         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
254         if (IS_ERR(dchild)) {
255                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
256                 LBUG();
257                 GOTO(out_unlink_de, rc = -ESTALE);
258         }
259
260         inode = dchild->d_inode;
261         if (!inode) {
262                 CERROR("child doesn't exist (dir %ld, name %s\n",
263                        dir->i_ino, rec->ur_name);
264                 LBUG();
265                 GOTO(out_unlink_dchild, rc = -ESTALE);
266         }
267
268         if (inode->i_ino != rec->ur_fid2->id) {
269                 CERROR("inode and FID ID do not match (%ld != %Ld)\n",
270                        inode->i_ino, rec->ur_fid2->id);
271                 LBUG();
272                 GOTO(out_unlink_dchild, rc = -ESTALE);
273         }
274         if (inode->i_generation != rec->ur_fid2->generation) {
275                 CERROR("inode and FID GENERATION do not match (%d != %d)\n",
276                        inode->i_generation, rec->ur_fid2->generation);
277                 LBUG();
278                 GOTO(out_unlink_dchild, rc = -ESTALE);
279         }
280
281         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev);
282
283         switch (dchild->d_inode->i_mode & S_IFMT) {
284         case S_IFDIR:
285                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
286                 if (!handle)
287                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
288                 rc = vfs_rmdir(dir, dchild);
289                 break;
290         default:
291                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
292                 if (!handle)
293                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
294                 rc = vfs_unlink(dir, dchild);
295                 break;
296         }
297
298         if (!rc)
299                 rc = mds_update_last_rcvd(mds, req);
300         /* FIXME: keep rc intact */
301         rc = mds_fs_commit(mds, dir, handle);
302
303         EXIT;
304 out_unlink_dchild:
305         l_dput(dchild);
306 out_unlink_de:
307         up(&dir->i_sem);
308         l_dput(de);
309 out_unlink:
310         req->rq_status = rc;
311         return 0;
312 }
313
314 static int mds_reint_link(struct mds_update_record *rec,
315                             struct ptlrpc_request *req)
316 {
317         struct dentry *de_src = NULL;
318         struct dentry *de_tgt_dir = NULL;
319         struct dentry *dchild = NULL;
320         struct mds_obd *mds = &req->rq_obd->u.mds;
321         void *handle;
322         int rc = 0;
323
324         ENTRY;
325         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
326         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
327                 GOTO(out_link, rc = -ESTALE);
328         }
329
330         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
331         if (IS_ERR(de_tgt_dir)) {
332                 GOTO(out_link_de_src, rc = -ESTALE);
333         }
334
335         down(&de_tgt_dir->d_inode->i_sem);
336         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
337         if (IS_ERR(dchild)) {
338                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
339                 GOTO(out_link_de_tgt_dir, rc = -ESTALE);
340         }
341
342         if (dchild->d_inode) {
343                 CERROR("child exists (dir %ld, name %s\n",
344                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
345                 GOTO(out_link_dchild, rc = -EEXIST);
346         }
347
348         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
349                        dchild->d_inode->i_sb->s_dev);
350
351         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
352         if (!handle)
353                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
354
355         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
356
357         if (!rc)
358                 rc = mds_update_last_rcvd(mds, req);
359
360         /* FIXME: keep rc intact */
361         rc = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
362         EXIT;
363
364 out_link_dchild:
365         l_dput(dchild);
366 out_link_de_tgt_dir:
367         up(&de_tgt_dir->d_inode->i_sem);
368         l_dput(de_tgt_dir);
369 out_link_de_src:
370         l_dput(de_src);
371 out_link:
372         req->rq_status = rc;
373         return 0;
374 }
375
376 static int mds_reint_rename(struct mds_update_record *rec,
377                             struct ptlrpc_request *req)
378 {
379         struct dentry *de_srcdir = NULL;
380         struct dentry *de_tgtdir = NULL;
381         struct dentry *de_old = NULL;
382         struct dentry *de_new = NULL;
383         struct mds_obd *mds = &req->rq_obd->u.mds;
384         void *handle;
385         int rc = 0;
386         ENTRY;
387
388         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
389         if (IS_ERR(de_srcdir)) {
390                 GOTO(out_rename, rc = -ESTALE);
391         }
392
393         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
394         if (IS_ERR(de_tgtdir)) {
395                 GOTO(out_rename_srcdir, rc = -ESTALE);
396         }
397
398         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
399         if (IS_ERR(de_old)) {
400                 CERROR("old child lookup error %ld\n", PTR_ERR(de_old));
401                 GOTO(out_rename_tgtdir, rc = -ESTALE);
402         }
403
404         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
405         if (IS_ERR(de_new)) {
406                 CERROR("new child lookup error %ld\n", PTR_ERR(de_new));
407                 GOTO(out_rename_deold, rc = -ESTALE);
408         }
409
410         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
411                        de_srcdir->d_inode->i_sb->s_dev);
412
413         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
414         if (!handle)
415                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
416         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
417
418         if (!rc)
419                 rc = mds_update_last_rcvd(mds, req);
420
421         /* FIXME: keep rc intact */
422         rc = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
423         EXIT;
424
425 out_rename_denew:
426         l_dput(de_new);
427 out_rename_deold:
428         l_dput(de_old);
429 out_rename_tgtdir:
430         l_dput(de_tgtdir);
431 out_rename_srcdir:
432         l_dput(de_srcdir);
433 out_rename:
434         req->rq_status = rc;
435         return 0;
436 }
437
438 typedef int (*mds_reinter)(struct mds_update_record *, struct ptlrpc_request*);
439
440 static mds_reinter reinters[REINT_MAX+1] = {
441         [REINT_SETATTR]   mds_reint_setattr,
442         [REINT_CREATE]    mds_reint_create,
443         [REINT_UNLINK]    mds_reint_unlink,
444         [REINT_LINK]      mds_reint_link,
445         [REINT_RENAME]    mds_reint_rename,
446 };
447
448 int mds_reint_rec(struct mds_update_record *rec, struct ptlrpc_request *req)
449 {
450         int rc, size = sizeof(struct mds_body);
451
452         if (rec->ur_opcode < 1 || rec->ur_opcode > REINT_MAX) {
453                 CERROR("opcode %d not valid\n", rec->ur_opcode);
454                 rc = req->rq_status = -EINVAL;
455                 RETURN(rc);
456         }
457
458         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
459         if (rc) {
460                 CERROR("mds: out of memory\n");
461                 rc = req->rq_status = -ENOMEM;
462                 RETURN(rc);
463         }
464
465         rc = reinters[rec->ur_opcode](rec, req);
466         return rc;
467 }