Whamcloud - gitweb
3255b7ee893a07e3e41dfaf1d8564ee27579c36c
[fs/lustre-release.git] / lustre / mds / mds_reint.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/mds_reint.c
5  *
6  *  Lustre Metadata Server (mds) reintegration routines
7  *
8  *  Copyright (C) 2002  Cluster File Systems, Inc.
9  *  author: Peter Braam <braam@clusterfs.com>
10  *
11  *  This code is issued under the GNU General Public License.
12  *  See the file COPYING in this distribution
13  *
14  */
15
16 // XXX - add transaction sequence numbers
17
18 #define EXPORT_SYMTAB
19
20 #include <linux/version.h>
21 #include <linux/module.h>
22 #include <linux/fs.h>
23 #include <linux/stat.h>
24 #include <linux/locks.h>
25 #include <linux/quotaops.h>
26 #include <asm/unistd.h>
27 #include <asm/uaccess.h>
28
29 #define DEBUG_SUBSYSTEM S_MDS
30
31 #include <linux/obd_support.h>
32 #include <linux/obd_class.h>
33 #include <linux/obd.h>
34 #include <linux/lustre_lib.h>
35 #include <linux/lustre_idl.h>
36 #include <linux/lustre_mds.h>
37 #include <linux/obd_class.h>
38
39 struct mds_client_info *mds_uuid_to_mci(struct mds_obd *mds, __u8 *uuid)
40 {
41         struct list_head *p;
42
43         if (!uuid)
44                 return NULL;
45
46         list_for_each(p, &mds->mds_client_info) {
47                 struct mds_client_info *mci;
48
49                 mci = list_entry(p, struct mds_client_info, mci_list);
50                 CDEBUG(D_INFO, "checking client UUID '%s'\n",
51                        mci->mci_mcd->mcd_uuid);
52                 if (!strncmp(mci->mci_mcd->mcd_uuid, uuid,
53                              sizeof(mci->mci_mcd->mcd_uuid)))
54                         return mci;
55         }
56         CDEBUG(D_INFO, "no client UUID found for '%s'\n", uuid);
57         return NULL;
58 }
59
60 int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
61                          struct ptlrpc_request *req)
62 {
63         /* get from req->rq_connection-> or req->rq_client */
64         struct mds_client_info *mci;
65         loff_t off;
66         int rc;
67
68         mci = mds_uuid_to_mci(mds, req->rq_connection->c_remote_uuid);
69         if (!mci) {
70                 CERROR("unable to locate MDS client data for UUID '%s'\n",
71                        ptlrpc_req_to_uuid(req));
72                 /* This will be a real error once everything is working */
73                 //LBUG();
74                 RETURN(0);
75         }
76
77         off = MDS_LR_CLIENT + mci->mci_off * MDS_LR_SIZE;
78
79         ++mds->mds_last_rcvd;   /* lock this, or make it an LDLM function? */
80         mci->mci_mcd->mcd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
81         mci->mci_mcd->mcd_mount_count = cpu_to_le64(mds->mds_mount_count);
82         mci->mci_mcd->mcd_last_xid = cpu_to_le32(req->rq_reqmsg->xid);
83
84         mds_fs_set_last_rcvd(mds, handle);
85         rc = lustre_fwrite(mds->mds_rcvd_filp, (char *)mci->mci_mcd,
86                            sizeof(*mci->mci_mcd), &off);
87         CDEBUG(D_INODE, "wrote trans #%Ld for client '%s' at #%d: rc = %d\n",
88                mds->mds_last_rcvd, mci->mci_mcd->mcd_uuid, mci->mci_off, rc);
89         // store new value and last committed value in req struct
90
91         if (rc == sizeof(mci->mci_mcd))
92                 rc = 0;
93         else if (rc >= 0)
94                 rc = -EIO;
95
96         return rc;
97 }
98
99 static int mds_reint_setattr(struct mds_update_record *rec,
100                              struct ptlrpc_request *req)
101 {
102         struct mds_obd *mds = &req->rq_obd->u.mds;
103         struct dentry *de;
104         void *handle;
105         int rc = 0;
106
107         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
108         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_SETATTR)) {
109                 GOTO(out_setattr, rc = -ESTALE);
110         }
111
112         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
113
114         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_SETATTR_WRITE,
115                        de->d_inode->i_sb->s_dev);
116
117         handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
118         if (!handle)
119                 GOTO(out_setattr_de, rc = PTR_ERR(handle));
120         rc = mds_fs_setattr(mds, de, handle, &rec->ur_iattr);
121
122         if (!rc)
123                 rc = mds_update_last_rcvd(mds, handle, req);
124         /* FIXME: need to return last_rcvd, last_committed */
125
126         EXIT;
127
128         /* FIXME: keep rc intact */
129         rc = mds_fs_commit(mds, de->d_inode, handle);
130 out_setattr_de:
131         l_dput(de);
132 out_setattr:
133         req->rq_status = rc;
134         return(0);
135 }
136
137 static int mds_reint_create(struct mds_update_record *rec,
138                             struct ptlrpc_request *req)
139 {
140         struct dentry *de = NULL;
141         struct mds_obd *mds = &req->rq_obd->u.mds;
142         struct dentry *dchild = NULL;
143         struct inode *dir;
144         void *handle;
145         int rc = 0, type = rec->ur_mode & S_IFMT;
146         ENTRY;
147
148         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
149         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) {
150                 LBUG();
151                 GOTO(out_create_de, rc = -ESTALE);
152         }
153         dir = de->d_inode;
154         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
155
156         down(&dir->i_sem);
157         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
158         if (IS_ERR(dchild)) {
159                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
160                 up(&dir->i_sem);
161                 LBUG();
162                 GOTO(out_create_dchild, rc = -ESTALE);
163         }
164
165         if (dchild->d_inode) {
166                 CERROR("child exists (dir %ld, name %s, ino %ld)\n",
167                        dir->i_ino, rec->ur_name, dchild->d_inode->i_ino);
168                 LBUG();
169                 GOTO(out_create_dchild, rc = -EEXIST);
170         }
171
172         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb->s_dev);
173
174         switch (type) {
175         case S_IFREG: {
176                 handle = mds_fs_start(mds, dir, MDS_FSOP_CREATE);
177                 if (!handle)
178                         GOTO(out_create_dchild, PTR_ERR(handle));
179                 rc = vfs_create(dir, dchild, rec->ur_mode);
180                 EXIT;
181                 break;
182         }
183         case S_IFDIR: {
184                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKDIR);
185                 if (!handle)
186                         GOTO(out_create_dchild, PTR_ERR(handle));
187                 rc = vfs_mkdir(dir, dchild, rec->ur_mode);
188                 EXIT;
189                 break;
190         }
191         case S_IFLNK: {
192                 handle = mds_fs_start(mds, dir, MDS_FSOP_SYMLINK);
193                 if (!handle)
194                         GOTO(out_create_dchild, PTR_ERR(handle));
195                 rc = vfs_symlink(dir, dchild, rec->ur_tgt);
196                 EXIT;
197                 break;
198         }
199         case S_IFCHR:
200         case S_IFBLK:
201         case S_IFIFO:
202         case S_IFSOCK: {
203                 int rdev = rec->ur_id;
204                 handle = mds_fs_start(mds, dir, MDS_FSOP_MKNOD);
205                 if (!handle)
206                         GOTO(out_create_dchild, PTR_ERR(handle));
207                 rc = vfs_mknod(dir, dchild, rec->ur_mode, rdev);
208                 EXIT;
209                 break;
210         }
211         default:
212                 CERROR("bad file type %d for create of %s\n",type,rec->ur_name);
213                 GOTO(out_create_dchild, rc = -EINVAL);
214         }
215
216         if (rc) {
217                 CERROR("error during create: %d\n", rc);
218                 LBUG();
219                 GOTO(out_create_commit, rc);
220         } else {
221                 struct iattr iattr;
222                 struct inode *inode = dchild->d_inode;
223                 struct mds_body *body;
224
225                 CDEBUG(D_INODE, "created ino %ld\n", dchild->d_inode->i_ino);
226                 if (type == S_IFREG) {
227                         rc = mds_fs_set_objid(mds, inode, handle, rec->ur_id);
228                         if (rc)
229                                 CERROR("error %d setting objid for %ld\n",
230                                        rc, inode->i_ino);
231                 }
232
233                 iattr.ia_atime = rec->ur_time;
234                 iattr.ia_ctime = rec->ur_time;
235                 iattr.ia_mtime = rec->ur_time;
236                 iattr.ia_uid = rec->ur_uid;
237                 iattr.ia_gid = rec->ur_gid;
238                 iattr.ia_valid = ATTR_UID | ATTR_GID | ATTR_ATIME |
239                         ATTR_MTIME | ATTR_CTIME;
240
241                 rc = mds_fs_setattr(mds, dchild, handle, &iattr);
242                 /* XXX should we abort here in case of error? */
243
244                 //if (!rc)
245                 rc = mds_update_last_rcvd(mds, handle, req);
246
247                 body = lustre_msg_buf(req->rq_repmsg, 0);
248                 body->ino = inode->i_ino;
249                 body->generation = inode->i_generation;
250                 body->last_rcvd = mds->mds_last_rcvd;
251                 body->last_committed = mds->mds_last_committed;
252         }
253
254 out_create_commit:
255         /* FIXME: keep rc intact */
256         rc = mds_fs_commit(mds, dir, handle);
257 out_create_dchild:
258         l_dput(dchild);
259         up(&dir->i_sem);
260 out_create_de:
261         l_dput(de);
262         req->rq_status = rc;
263         return 0;
264 }
265
266 static int mds_reint_unlink(struct mds_update_record *rec,
267                             struct ptlrpc_request *req)
268 {
269         struct dentry *de = NULL;
270         struct dentry *dchild = NULL;
271         struct mds_obd *mds = &req->rq_obd->u.mds;
272         struct inode *dir, *inode;
273         void *handle;
274         int rc = 0;
275         ENTRY;
276
277         de = mds_fid2dentry(mds, rec->ur_fid1, NULL);
278         if (IS_ERR(de) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) {
279                 LBUG();
280                 GOTO(out_unlink, rc = -ESTALE);
281         }
282         dir = de->d_inode;
283         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
284
285         down(&dir->i_sem);
286         dchild = lookup_one_len(rec->ur_name, de, rec->ur_namelen - 1);
287         if (IS_ERR(dchild)) {
288                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
289                 LBUG();
290                 GOTO(out_unlink_de, rc = -ESTALE);
291         }
292
293         inode = dchild->d_inode;
294         if (!inode) {
295                 CERROR("child doesn't exist (dir %ld, name %s\n",
296                        dir->i_ino, rec->ur_name);
297                 LBUG();
298                 GOTO(out_unlink_dchild, rc = -ESTALE);
299         }
300
301         if (inode->i_ino != rec->ur_fid2->id) {
302                 CERROR("inode and FID ID do not match (%ld != %Ld)\n",
303                        inode->i_ino, rec->ur_fid2->id);
304                 LBUG();
305                 GOTO(out_unlink_dchild, rc = -ESTALE);
306         }
307         if (inode->i_generation != rec->ur_fid2->generation) {
308                 CERROR("inode and FID GENERATION do not match (%d != %d)\n",
309                        inode->i_generation, rec->ur_fid2->generation);
310                 LBUG();
311                 GOTO(out_unlink_dchild, rc = -ESTALE);
312         }
313
314         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_UNLINK_WRITE, dir->i_sb->s_dev);
315
316         switch (dchild->d_inode->i_mode & S_IFMT) {
317         case S_IFDIR:
318                 handle = mds_fs_start(mds, dir, MDS_FSOP_RMDIR);
319                 if (!handle)
320                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
321                 rc = vfs_rmdir(dir, dchild);
322                 break;
323         default:
324                 handle = mds_fs_start(mds, dir, MDS_FSOP_UNLINK);
325                 if (!handle)
326                         GOTO(out_unlink_dchild, rc = PTR_ERR(handle));
327                 rc = vfs_unlink(dir, dchild);
328                 break;
329         }
330
331         if (!rc)
332                 rc = mds_update_last_rcvd(mds, handle, req);
333         /* FIXME: need to return last_rcvd, last_committed */
334         /* FIXME: keep rc intact */
335         rc = mds_fs_commit(mds, dir, handle);
336
337         EXIT;
338 out_unlink_dchild:
339         l_dput(dchild);
340 out_unlink_de:
341         up(&dir->i_sem);
342         l_dput(de);
343 out_unlink:
344         req->rq_status = rc;
345         return 0;
346 }
347
348 static int mds_reint_link(struct mds_update_record *rec,
349                             struct ptlrpc_request *req)
350 {
351         struct dentry *de_src = NULL;
352         struct dentry *de_tgt_dir = NULL;
353         struct dentry *dchild = NULL;
354         struct mds_obd *mds = &req->rq_obd->u.mds;
355         void *handle;
356         int rc = 0;
357
358         ENTRY;
359         de_src = mds_fid2dentry(mds, rec->ur_fid1, NULL);
360         if (IS_ERR(de_src) || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) {
361                 GOTO(out_link, rc = -ESTALE);
362         }
363
364         de_tgt_dir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
365         if (IS_ERR(de_tgt_dir)) {
366                 GOTO(out_link_de_src, rc = -ESTALE);
367         }
368
369         down(&de_tgt_dir->d_inode->i_sem);
370         dchild = lookup_one_len(rec->ur_name, de_tgt_dir, rec->ur_namelen - 1);
371         if (IS_ERR(dchild)) {
372                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
373                 GOTO(out_link_de_tgt_dir, rc = -ESTALE);
374         }
375
376         if (dchild->d_inode) {
377                 CERROR("child exists (dir %ld, name %s\n",
378                        de_tgt_dir->d_inode->i_ino, rec->ur_name);
379                 GOTO(out_link_dchild, rc = -EEXIST);
380         }
381
382         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_LINK_WRITE,
383                        dchild->d_inode->i_sb->s_dev);
384
385         handle = mds_fs_start(mds, de_tgt_dir->d_inode, MDS_FSOP_LINK);
386         if (!handle)
387                 GOTO(out_link_dchild, rc = PTR_ERR(handle));
388
389         rc = vfs_link(de_src, de_tgt_dir->d_inode, dchild);
390
391         if (!rc)
392                 rc = mds_update_last_rcvd(mds, handle, req);
393
394         /* FIXME: need to return last_rcvd, last_committed */
395         /* FIXME: keep rc intact */
396         rc = mds_fs_commit(mds, de_tgt_dir->d_inode, handle);
397         EXIT;
398
399 out_link_dchild:
400         l_dput(dchild);
401 out_link_de_tgt_dir:
402         up(&de_tgt_dir->d_inode->i_sem);
403         l_dput(de_tgt_dir);
404 out_link_de_src:
405         l_dput(de_src);
406 out_link:
407         req->rq_status = rc;
408         return 0;
409 }
410
411 static int mds_reint_rename(struct mds_update_record *rec,
412                             struct ptlrpc_request *req)
413 {
414         struct dentry *de_srcdir = NULL;
415         struct dentry *de_tgtdir = NULL;
416         struct dentry *de_old = NULL;
417         struct dentry *de_new = NULL;
418         struct mds_obd *mds = &req->rq_obd->u.mds;
419         void *handle;
420         int rc = 0;
421         ENTRY;
422
423         de_srcdir = mds_fid2dentry(mds, rec->ur_fid1, NULL);
424         if (IS_ERR(de_srcdir)) {
425                 GOTO(out_rename, rc = -ESTALE);
426         }
427
428         de_tgtdir = mds_fid2dentry(mds, rec->ur_fid2, NULL);
429         if (IS_ERR(de_tgtdir)) {
430                 GOTO(out_rename_srcdir, rc = -ESTALE);
431         }
432
433         de_old = lookup_one_len(rec->ur_name, de_srcdir, rec->ur_namelen - 1);
434         if (IS_ERR(de_old)) {
435                 CERROR("old child lookup error %ld\n", PTR_ERR(de_old));
436                 GOTO(out_rename_tgtdir, rc = -ESTALE);
437         }
438
439         de_new = lookup_one_len(rec->ur_tgt, de_tgtdir, rec->ur_tgtlen - 1);
440         if (IS_ERR(de_new)) {
441                 CERROR("new child lookup error %ld\n", PTR_ERR(de_new));
442                 GOTO(out_rename_deold, rc = -ESTALE);
443         }
444
445         OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_RENAME_WRITE,
446                        de_srcdir->d_inode->i_sb->s_dev);
447
448         handle = mds_fs_start(mds, de_tgtdir->d_inode, MDS_FSOP_RENAME);
449         if (!handle)
450                 GOTO(out_rename_denew, rc = PTR_ERR(handle));
451         rc = vfs_rename(de_srcdir->d_inode, de_old, de_tgtdir->d_inode, de_new);
452
453         if (!rc)
454                 rc = mds_update_last_rcvd(mds, handle, req);
455
456         /* FIXME: need to return last_rcvd, last_committed */
457         /* FIXME: keep rc intact */
458         rc = mds_fs_commit(mds, de_tgtdir->d_inode, handle);
459         EXIT;
460
461 out_rename_denew:
462         l_dput(de_new);
463 out_rename_deold:
464         l_dput(de_old);
465 out_rename_tgtdir:
466         l_dput(de_tgtdir);
467 out_rename_srcdir:
468         l_dput(de_srcdir);
469 out_rename:
470         req->rq_status = rc;
471         return 0;
472 }
473
474 typedef int (*mds_reinter)(struct mds_update_record *, struct ptlrpc_request*);
475
476 static mds_reinter reinters[REINT_MAX+1] = {
477         [REINT_SETATTR]   mds_reint_setattr,
478         [REINT_CREATE]    mds_reint_create,
479         [REINT_UNLINK]    mds_reint_unlink,
480         [REINT_LINK]      mds_reint_link,
481         [REINT_RENAME]    mds_reint_rename,
482 };
483
484 int mds_reint_rec(struct mds_update_record *rec, struct ptlrpc_request *req)
485 {
486         int rc, size = sizeof(struct mds_body);
487
488         if (rec->ur_opcode < 1 || rec->ur_opcode > REINT_MAX) {
489                 CERROR("opcode %d not valid\n", rec->ur_opcode);
490                 rc = req->rq_status = -EINVAL;
491                 RETURN(rc);
492         }
493
494         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
495         if (rc) {
496                 CERROR("mds: out of memory\n");
497                 rc = req->rq_status = -ENOMEM;
498                 RETURN(rc);
499         }
500
501         rc = reinters[rec->ur_opcode](rec, req);
502         return rc;
503 }