Whamcloud - gitweb
a28954dae88eb5cb9b593e591123953e670f1b60
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  linux/mds/handler.c
5  *
6  *  Lustre Metadata Server (mds) request handler
7  *
8  *  Copyright (C) 2001, 2002 Cluster File Systems, Inc.
9  *
10  *  This code is issued under the GNU General Public License.
11  *  See the file COPYING in this distribution
12  *
13  *  by Peter Braam <braam@clusterfs.com> &
14  *     Andreas Dilger <braam@clusterfs.com>
15  *
16  *  This server is single threaded at present (but can easily be multi threaded)
17  *
18  */
19
20 #define EXPORT_SYMTAB
21 #define DEBUG_SUBSYSTEM S_MDS
22
23 #include <linux/module.h>
24 #include <linux/lustre_mds.h>
25 #include <linux/lustre_dlm.h>
26 extern int mds_update_last_rcvd(struct mds_obd *mds, void *handle,
27                          struct ptlrpc_request *req);
28 static int mds_cleanup(struct obd_device * obddev);
29
30 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
31                         __u64 offset)
32 {
33         int rc = 0;
34         mm_segment_t oldfs = get_fs();
35         struct ptlrpc_bulk_desc *desc;
36         struct ptlrpc_bulk_page *bulk;
37         char *buf;
38         ENTRY;
39
40         desc = ptlrpc_prep_bulk(req->rq_connection);
41         if (desc == NULL)
42                 GOTO(out, rc = -ENOMEM);
43
44         bulk = ptlrpc_prep_bulk_page(desc);
45         if (bulk == NULL)
46                 GOTO(cleanup_bulk, rc = -ENOMEM);
47
48         OBD_ALLOC(buf, PAGE_SIZE);
49         if (buf == NULL)
50                 GOTO(cleanup_bulk, rc = -ENOMEM);
51
52         set_fs(KERNEL_DS);
53         rc = mds_fs_readpage(&req->rq_obd->u.mds, file, buf, PAGE_SIZE,
54                              (loff_t *)&offset);
55         set_fs(oldfs);
56
57         if (rc != PAGE_SIZE)
58                 GOTO(cleanup_buf, rc = -EIO);
59
60         bulk->b_xid = req->rq_xid;
61         bulk->b_buf = buf;
62         bulk->b_buflen = PAGE_SIZE;
63         desc->b_portal = MDS_BULK_PORTAL;
64
65         rc = ptlrpc_send_bulk(desc);
66         if (rc)
67                 GOTO(cleanup_buf, rc);
68
69         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
70                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
71                        OBD_FAIL_MDS_SENDPAGE, rc);
72                 ptlrpc_abort_bulk(desc);
73                 GOTO(cleanup_buf, rc);
74         }
75
76         wait_event_interruptible(desc->b_waitq, ptlrpc_check_bulk_sent(desc));
77         if (desc->b_flags & PTL_RPC_FL_INTR)
78                 GOTO(cleanup_buf, rc = -EINTR);
79
80         EXIT;
81  cleanup_buf:
82         OBD_FREE(buf, PAGE_SIZE);
83  cleanup_bulk:
84         ptlrpc_free_bulk(desc);
85  out:
86         return rc;
87 }
88
89 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
90                               struct vfsmount **mnt)
91 {
92         /* stolen from NFS */
93         struct super_block *sb = mds->mds_sb;
94         unsigned long ino = fid->id;
95         __u32 generation = fid->generation;
96         struct inode *inode;
97         struct list_head *lp;
98         struct dentry *result;
99
100         if (ino == 0)
101                 RETURN(ERR_PTR(-ESTALE));
102
103         inode = iget(sb, ino);
104         if (inode == NULL)
105                 RETURN(ERR_PTR(-ENOMEM));
106
107         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
108
109         if (is_bad_inode(inode) ||
110             (generation && inode->i_generation != generation)) {
111                 /* we didn't find the right inode.. */
112                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
113                        inode->i_ino, inode->i_nlink,
114                        atomic_read(&inode->i_count), inode->i_generation,
115                        generation);
116                 LBUG();
117                 iput(inode);
118                 RETURN(ERR_PTR(-ESTALE));
119         }
120
121         /* now to find a dentry.
122          * If possible, get a well-connected one
123          */
124         if (mnt)
125                 *mnt = mds->mds_vfsmnt;
126         spin_lock(&dcache_lock);
127         for (lp = inode->i_dentry.next; lp != &inode->i_dentry ; lp=lp->next) {
128                 result = list_entry(lp,struct dentry, d_alias);
129                 if (! (result->d_flags & DCACHE_NFSD_DISCONNECTED)) {
130                         dget_locked(result);
131                         result->d_vfs_flags |= DCACHE_REFERENCED;
132                         spin_unlock(&dcache_lock);
133                         iput(inode);
134                         if (mnt)
135                                 mntget(*mnt);
136                         return result;
137                 }
138         }
139         spin_unlock(&dcache_lock);
140         result = d_alloc_root(inode);
141         if (result == NULL) {
142                 iput(inode);
143                 return ERR_PTR(-ENOMEM);
144         }
145         if (mnt)
146                 mntget(*mnt);
147         result->d_flags |= DCACHE_NFSD_DISCONNECTED;
148         return result;
149 }
150
151 static int mds_connect(struct ptlrpc_request *req, struct mds_obd **mdsp)
152 {
153         struct mds_obd *mds;
154         char *uuid;
155         int rc, i;
156         ENTRY;
157
158         uuid = lustre_msg_buf(req->rq_reqmsg, 0);
159         if (req->rq_reqmsg->buflens[0] > 37) {
160                 /* Invalid UUID */
161                 req->rq_status = -EINVAL;
162                 RETURN(-EINVAL);
163         }
164
165         i = obd_class_name2dev(uuid);
166         if (i == -1) {
167                 req->rq_status = -ENODEV;
168                 RETURN(-ENODEV);
169         }
170
171         *mdsp = mds = &(obd_dev[i].u.mds);
172         if (mds != &(req->rq_obd->u.mds)) {
173                 CERROR("device %d (%s) is not an mds\n", i, uuid);
174                 req->rq_status = -ENODEV;
175                 RETURN(-ENODEV);
176         }
177
178         CDEBUG(D_INFO, "MDS connect from UUID '%s'\n",
179                ptlrpc_req_to_uuid(req));
180         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
181         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_PACK)) {
182                 req->rq_status = -ENOMEM;
183                 RETURN(-ENOMEM);
184         }
185         req->rq_repmsg->target_id = i;
186
187         RETURN(0);
188 }
189
190 /* FIXME: the error cases need fixing to avoid leaks */
191 static int mds_getstatus(struct mds_obd *mds, struct ptlrpc_request *req)
192 {
193         struct mds_body *body;
194         struct mds_client_info *mci;
195         struct mds_client_data *mcd;
196         int rc, size = sizeof(*body);
197         ENTRY;
198
199         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
200         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
201                 CERROR("mds: out of memory for message: size=%d\n", size);
202                 req->rq_status = -ENOMEM;
203                 RETURN(0);
204         }
205
206         body = lustre_msg_buf(req->rq_reqmsg, 0);
207         mds_unpack_body(body);
208         /* Anything we need to do here with the client's trans no or so? */
209
210         body = lustre_msg_buf(req->rq_repmsg, 0);
211         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
212
213         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
214         if (!mci) {
215                 /* We don't have any old connection data for this client */
216                 int rc;
217
218                 CDEBUG(D_INFO, "allocating new client data for UUID '%s'",
219                        ptlrpc_req_to_uuid(req));
220
221                 OBD_ALLOC(mcd, sizeof(*mcd));
222                 if (!mcd) {
223                         CERROR("mds: out of memory for client data\n");
224                         req->rq_status = -ENOMEM;
225                         RETURN(0);
226                 }
227                 memcpy(mcd->mcd_uuid, ptlrpc_req_to_uuid(req),
228                        sizeof(mcd->mcd_uuid));
229                 rc = mds_client_add(mds, mcd, -1);
230                 if (rc) {
231                         req->rq_status = rc;
232                         RETURN(0);
233                 }
234         } else {
235                 /* We have old connection data for this client... */
236                 mcd = mci->mci_mcd;
237                 CDEBUG(D_INFO, "found existing data for UUID '%s' at #%d\n",
238                        mcd->mcd_uuid, mci->mci_off);
239         }
240         /* mcd_last_xid is is stored in little endian on the disk and
241            mds_pack_rep_body converts it to network order */
242         body->last_xid = le32_to_cpu(mcd->mcd_last_xid);
243         mds_pack_rep_body(req);
244         RETURN(0);
245 }
246
247 static int mds_disconnect(struct mds_obd *mds, struct ptlrpc_request *req)
248 {
249         struct mds_body *body;
250         int rc;
251         ENTRY;
252
253         body = lustre_msg_buf(req->rq_reqmsg, 0);
254
255         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
256         if (rc)
257                 RETURN(rc);
258
259         RETURN(0);
260 }
261
262 int mds_lock_callback(struct ldlm_lock *lock, struct ldlm_lock *new,
263                       void *data, int data_len, struct ptlrpc_request **reqp)
264 {
265         ENTRY;
266
267         if (new == NULL) {
268                 /* Completion AST.  Do nothing */
269                 RETURN(0);
270         }
271
272         if (ldlm_cli_cancel(lock->l_client, lock) < 0)
273                 LBUG();
274         RETURN(0);
275 }
276
277 static int mds_getattr_name(int offset, struct ptlrpc_request *req)
278 {
279         struct mds_obd *mds = &req->rq_obd->u.mds;
280         struct mds_body *body;
281         struct dentry *de = NULL, *dchild = NULL;
282         struct inode *dir;
283         struct ldlm_lock *lock;
284         struct lustre_handle lockh;
285         char *name;
286         int namelen, flags, lock_mode, rc = 0;
287         __u64 res_id[3] = {0, 0, 0};
288         ENTRY;
289
290         if (strcmp(req->rq_obd->obd_type->typ_name, "mds") != 0)
291                 LBUG();
292
293         if (req->rq_reqmsg->bufcount <= offset + 1) {
294                 LBUG();
295                 GOTO(out_pre_de, rc = -EINVAL);
296         }
297
298         body = lustre_msg_buf(req->rq_reqmsg, offset);
299         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
300         namelen = req->rq_reqmsg->buflens[offset + 1];
301         /* requests were at offset 2, replies go back at 1 */
302         if (offset)
303                 offset = 1;
304
305         de = mds_fid2dentry(mds, &body->fid1, NULL);
306         if (IS_ERR(de)) {
307                 LBUG();
308                 GOTO(out_pre_de, rc = -ESTALE);
309         }
310
311         dir = de->d_inode;
312         CDEBUG(D_INODE, "parent ino %ld\n", dir->i_ino);
313
314         lock_mode = (req->rq_reqmsg->opc == MDS_REINT) ? LCK_CW : LCK_PW;
315         res_id[0] = dir->i_ino;
316
317         rc = ldlm_local_lock_match(mds->mds_local_namespace, res_id, LDLM_PLAIN,
318                                    NULL, 0, lock_mode, &lockh);
319         if (rc == 0) {
320                 LDLM_DEBUG_NOLOCK("enqueue res %Lu", res_id[0]);
321                 rc = ldlm_cli_enqueue(mds->mds_ldlm_client, mds->mds_ldlm_conn,
322                                       NULL, mds->mds_local_namespace, NULL,
323                                       res_id, LDLM_PLAIN, NULL, 0, lock_mode,
324                                       &flags, (void *)mds_lock_callback,
325                                       NULL, 0, &lockh);
326                 if (rc != ELDLM_OK) {
327                         CERROR("lock enqueue: err: %d\n", rc);
328                         GOTO(out_create_de, rc = -EIO);
329                 }
330         } else {
331                 lock = lustre_handle2object(&lockh);
332                 LDLM_DEBUG(lock, "matched");
333         }
334         ldlm_lock_dump((void *)(unsigned long)lockh.addr);
335
336         down(&dir->i_sem);
337         dchild = lookup_one_len(name, de, namelen - 1);
338         if (IS_ERR(dchild)) {
339                 CERROR("child lookup error %ld\n", PTR_ERR(dchild));
340                 up(&dir->i_sem);
341                 LBUG();
342                 GOTO(out_create_dchild, rc = -ESTALE);
343         }
344
345         if (dchild->d_inode) {
346                 struct mds_body *body;
347                 struct obdo *obdo;
348                 struct inode *inode = dchild->d_inode;
349                 CERROR("child exists (dir %ld, name %s, ino %ld)\n",
350                        dir->i_ino, name, dchild->d_inode->i_ino);
351
352                 body = lustre_msg_buf(req->rq_repmsg, offset);
353                 mds_pack_inode2fid(&body->fid1, inode);
354                 mds_pack_inode2body(body, inode);
355                 if (S_ISREG(inode->i_mode)) {
356                         obdo = lustre_msg_buf(req->rq_repmsg, offset + 1);
357                         mds_fs_get_obdo(mds, inode, obdo);
358                 }
359                 /* now a normal case for intent locking */
360                 rc = 0;
361         } else {
362                 rc = -ENOENT;
363         }
364
365         EXIT;
366 out_create_dchild:
367         l_dput(dchild);
368         up(&dir->i_sem);
369         lock = lustre_handle2object(&lockh);
370         ldlm_lock_decref(lock, lock_mode);
371 out_create_de:
372         l_dput(de);
373  out_pre_de:
374         req->rq_status = rc;
375         return 0;
376 }
377
378
379 static int mds_getattr(int offset, struct ptlrpc_request *req)
380 {
381         struct mds_obd *mds = &req->rq_obd->u.mds;
382         struct dentry *de;
383         struct inode *inode;
384         struct mds_body *body;
385         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
386         ENTRY;
387
388         body = lustre_msg_buf(req->rq_reqmsg, offset);
389         de = mds_fid2dentry(mds, &body->fid1, NULL);
390         if (IS_ERR(de)) {
391                 req->rq_status = -ENOENT;
392                 RETURN(-ENOENT);
393         }
394
395         inode = de->d_inode;
396         if (S_ISREG(body->fid1.f_type)) {
397                 bufcount = 2;
398                 size[1] = sizeof(struct obdo);
399         } else if (body->valid & OBD_MD_LINKNAME) {
400                 bufcount = 2;
401                 size[1] = inode->i_size;
402         }
403
404         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
405                              &req->rq_repmsg);
406         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
407                 CERROR("mds: out of memory\n");
408                 req->rq_status = -ENOMEM;
409                 GOTO(out, rc = -ENOMEM);
410         }
411
412         if (body->valid & OBD_MD_LINKNAME) {
413                 char *tmp = lustre_msg_buf(req->rq_repmsg, 1);
414                 mm_segment_t oldfs;
415
416                 oldfs = get_fs();
417                 set_fs(KERNEL_DS);
418                 rc = inode->i_op->readlink(de, tmp, size[1]);
419                 set_fs(oldfs);
420
421                 if (rc < 0) {
422                         req->rq_status = rc;
423                         CERROR("readlink failed: %d\n", rc);
424                         GOTO(out, rc);
425                 }
426         }
427
428         body = lustre_msg_buf(req->rq_repmsg, 0);
429         body->ino = inode->i_ino;
430         body->generation = inode->i_generation;
431         body->atime = inode->i_atime;
432         body->ctime = inode->i_ctime;
433         body->mtime = inode->i_mtime;
434         body->uid = inode->i_uid;
435         body->gid = inode->i_gid;
436         body->size = inode->i_size;
437         body->mode = inode->i_mode;
438         body->nlink = inode->i_nlink;
439         body->valid = ~0; /* FIXME: should be more selective */
440
441         if (S_ISREG(inode->i_mode)) {
442                 rc = mds_fs_get_obdo(mds, inode,
443                                      lustre_msg_buf(req->rq_repmsg, 1));
444                 if (rc < 0) {
445                         req->rq_status = rc;
446                         CERROR("mds_fs_get_obdo failed: %d\n", rc);
447                         GOTO(out, rc);
448                 }
449         }
450  out:
451         l_dput(de);
452         RETURN(rc);
453 }
454
455 static int mds_open(struct ptlrpc_request *req)
456 {
457         struct dentry *de;
458         struct mds_body *body;
459         struct file *file;
460         struct vfsmount *mnt;
461         struct mds_obd *mds = &req->rq_obd->u.mds;
462         struct mds_client_info *mci;
463         __u32 flags;
464         struct list_head *tmp;
465         struct mds_file_data *mfd;
466         int rc, size = sizeof(*body);
467         ENTRY;
468
469         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
470         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_OPEN_PACK)) {
471                 CERROR("mds: out of memory\n");
472                 req->rq_status = -ENOMEM;
473                 RETURN(0);
474         }
475
476         mci = mds_uuid_to_mci(mds, ptlrpc_req_to_uuid(req));
477         if (!mci) {
478                 CERROR("mds: no mci!\n");
479                 req->rq_status = -ENOTCONN;
480                 RETURN(0);
481         }
482
483         body = lustre_msg_buf(req->rq_reqmsg, 0);
484
485         /* was this animal open already? */
486         list_for_each(tmp, &mci->mci_open_head) {
487                 struct mds_file_data *fd;
488                 fd = list_entry(tmp, struct mds_file_data, mfd_list);
489                 if (body->extra == fd->mfd_clientfd &&
490                     body->fid1.id == fd->mfd_file->f_dentry->d_inode->i_ino) {
491                         CERROR("Re opening %Ld\n", body->fid1.id);
492                         RETURN(0);
493                 }
494         }
495
496         OBD_ALLOC(mfd, sizeof(*mfd));
497         if (!mfd) {
498                 CERROR("mds: out of memory\n");
499                 req->rq_status = -ENOMEM;
500                 RETURN(0);
501         }
502
503         de = mds_fid2dentry(mds, &body->fid1, &mnt);
504         if (IS_ERR(de)) {
505                 req->rq_status = -ENOENT;
506                 RETURN(0);
507         }
508
509         /* check if this inode has seen a delayed object creation */
510         if (req->rq_reqmsg->bufcount > 1) {
511                 void *handle;
512                 struct inode *inode = de->d_inode;
513                 //struct iattr iattr;
514                 struct obdo *obdo;
515                 int rc;
516
517                 obdo = lustre_msg_buf(req->rq_reqmsg, 1);
518                 //iattr.ia_valid = ATTR_MODE;
519                 //iattr.ia_mode = inode->i_mode;
520
521                 handle = mds_fs_start(mds, de->d_inode, MDS_FSOP_SETATTR);
522                 if (!handle) {
523                         req->rq_status = -ENOMEM;
524                         RETURN(0);
525                 }
526
527                 /* XXX error handling */
528                 rc = mds_fs_set_obdo(mds, inode, handle, obdo);
529                 //                rc = mds_fs_setattr(mds, de, handle, &iattr);
530                 if (!rc)
531                         rc = mds_update_last_rcvd(mds, handle, req);
532                 else {
533                         req->rq_status = rc;
534                         RETURN(0);
535                 }
536                 /* FIXME: need to return last_rcvd, last_committed */
537
538                 /* FIXME: keep rc intact */
539                 rc = mds_fs_commit(mds, de->d_inode, handle);
540                 if (rc) {
541                         req->rq_status = rc;
542                         RETURN(0);
543                 }
544         }
545
546         flags = body->flags;
547         file = dentry_open(de, mnt, flags & ~O_DIRECT);
548         if (!file || IS_ERR(file)) {
549                 req->rq_status = -EINVAL;
550                 OBD_FREE(mfd, sizeof(*mfd));
551                 RETURN(0);
552         }
553
554         file->private_data = mfd;
555         mfd->mfd_file = file;
556         mfd->mfd_clientfd = body->extra;
557         list_add(&mfd->mfd_list, &mci->mci_open_head);
558
559         body = lustre_msg_buf(req->rq_repmsg, 0);
560         body->extra = (__u64) (unsigned long)file;
561         RETURN(0);
562 }
563
564 static
565 int mds_close(struct ptlrpc_request *req)
566 {
567         struct dentry *de;
568         struct mds_body *body;
569         struct file *file;
570         struct mds_obd *mds = &req->rq_obd->u.mds;
571         struct vfsmount *mnt;
572         struct mds_file_data *mfd;
573         int rc;
574         ENTRY;
575
576         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
577         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
578                 CERROR("mds: out of memory\n");
579                 req->rq_status = -ENOMEM;
580                 RETURN(0);
581         }
582
583         body = lustre_msg_buf(req->rq_reqmsg, 0);
584         de = mds_fid2dentry(mds, &body->fid1, &mnt);
585         if (IS_ERR(de)) {
586                 req->rq_status = -ENOENT;
587                 RETURN(0);
588         }
589
590         file = (struct file *)(unsigned long)body->extra;
591         if (!file->f_dentry)
592                 LBUG();
593         mfd = (struct mds_file_data *)file->private_data;
594         list_del(&mfd->mfd_list);
595         OBD_FREE(mfd, sizeof(*mfd));
596
597         req->rq_status = filp_close(file, 0);
598         l_dput(de);
599         mntput(mnt);
600
601         RETURN(0);
602 }
603
604 static int mds_readpage(struct mds_obd *mds, struct ptlrpc_request *req)
605 {
606         struct vfsmount *mnt;
607         struct dentry *de;
608         struct file *file;
609         struct mds_body *body;
610         int rc, size = sizeof(*body);
611         ENTRY;
612
613         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
614         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
615                 CERROR("mds: out of memory\n");
616                 req->rq_status = -ENOMEM;
617                 RETURN(0);
618         }
619
620         body = lustre_msg_buf(req->rq_reqmsg, 0);
621         de = mds_fid2dentry(mds, &body->fid1, &mnt);
622         if (IS_ERR(de)) {
623                 req->rq_status = PTR_ERR(de);
624                 RETURN(0);
625         }
626
627         CDEBUG(D_INODE, "ino %ld\n", de->d_inode->i_ino);
628
629         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
630         /* note: in case of an error, dentry_open puts dentry */
631         if (IS_ERR(file)) {
632                 req->rq_status = PTR_ERR(file);
633                 RETURN(0);
634         }
635
636         /* to make this asynchronous make sure that the handling function
637            doesn't send a reply when this function completes. Instead a
638            callback function would send the reply */
639         rc = mds_sendpage(req, file, body->size);
640
641         filp_close(file, 0);
642         req->rq_status = rc;
643         RETURN(0);
644 }
645
646 int mds_reint(int offset, struct ptlrpc_request *req)
647 {
648         int rc;
649         struct mds_update_record rec;
650
651         rc = mds_update_unpack(req, offset, &rec);
652         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
653                 CERROR("invalid record\n");
654                 req->rq_status = -EINVAL;
655                 RETURN(0);
656         }
657         /* rc will be used to interrupt a for loop over multiple records */
658         rc = mds_reint_rec(&rec, offset, req);
659         return rc;
660 }
661
662 int mds_handle(struct obd_device *dev, struct ptlrpc_service *svc,
663                struct ptlrpc_request *req)
664 {
665         struct mds_obd *mds = NULL;
666         int rc;
667         ENTRY;
668
669         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
670         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
671                 CERROR("lustre_mds: Invalid request\n");
672                 GOTO(out, rc);
673         }
674
675         if (req->rq_reqmsg->type != PTL_RPC_MSG_REQUEST) {
676                 CERROR("lustre_mds: wrong packet type sent %d\n",
677                        req->rq_reqmsg->type);
678                 GOTO(out, rc = -EINVAL);
679         }
680
681         if (req->rq_reqmsg->opc != MDS_CONNECT) {
682                 int id = req->rq_reqmsg->target_id;
683                 struct obd_device *obddev;
684                 if (id < 0 || id > MAX_OBD_DEVICES)
685                         GOTO(out, rc = -ENODEV);
686                 obddev = &obd_dev[id];
687                 if (strcmp(obddev->obd_type->typ_name, "mds") != 0)
688                         GOTO(out, rc = -EINVAL);
689                 mds = &obddev->u.mds;
690                 req->rq_obd = obddev;
691         }
692
693         switch (req->rq_reqmsg->opc) {
694         case MDS_CONNECT:
695                 CDEBUG(D_INODE, "connect\n");
696                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
697                 rc = mds_connect(req, &mds);
698                 break;
699
700         case MDS_DISCONNECT:
701                 CDEBUG(D_INODE, "disconnect\n");
702                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
703                 rc = mds_disconnect(mds, req);
704                 break;
705
706         case MDS_GETSTATUS:
707                 CDEBUG(D_INODE, "getstatus\n");
708                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
709                 rc = mds_getstatus(mds, req);
710                 break;
711
712         case MDS_GETATTR:
713                 CDEBUG(D_INODE, "getattr\n");
714                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
715                 rc = mds_getattr(0, req);
716                 break;
717
718         case MDS_READPAGE:
719                 CDEBUG(D_INODE, "readpage\n");
720                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
721                 rc = mds_readpage(mds, req);
722
723                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
724                         return 0;
725                 break;
726
727         case MDS_REINT: {
728                 int size = sizeof(struct mds_body);
729                 CDEBUG(D_INODE, "reint\n");
730                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
731
732                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
733                                      &req->rq_repmsg);
734                 if (rc) {
735                         rc = req->rq_status = -ENOMEM;
736                         break;
737                 }
738                 rc = mds_reint(0, req);
739                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
740                 break;
741         }
742
743         case MDS_OPEN:
744                 CDEBUG(D_INODE, "open\n");
745                 OBD_FAIL_RETURN(OBD_FAIL_MDS_OPEN_NET, 0);
746                 rc = mds_open(req);
747                 break;
748
749         case MDS_CLOSE:
750                 CDEBUG(D_INODE, "close\n");
751                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
752                 rc = mds_close(req);
753                 break;
754
755         default:
756                 rc = ptlrpc_error(svc, req);
757                 RETURN(rc);
758         }
759
760         EXIT;
761 out:
762         /* Still not 100% sure whether we should reply with the server
763          * last_rcvd or that of this client.  I'm not sure it even makes
764          * a difference on a per-client basis, because last_rcvd is global
765          * and we are not supposed to allow transactions while in recovery.
766          */
767         if (rc) {
768                 ptlrpc_error(svc, req);
769         } else {
770                 req->rq_repmsg->last_rcvd = HTON__u64(mds->mds_last_rcvd);
771                 req->rq_repmsg->last_committed =
772                         HTON__u64(mds->mds_last_committed);
773                 CDEBUG(D_INFO, "last_rcvd %Lu, last_committed %Lu, xid %d\n",
774                        (unsigned long long)mds->mds_last_rcvd,
775                        (unsigned long long)mds->mds_last_committed,
776                        cpu_to_le32(req->rq_xid));
777                 CDEBUG(D_NET, "sending reply\n");
778                 ptlrpc_reply(svc, req);
779         }
780         return 0;
781 }
782
783 /* Update the server data on disk.  This stores the new mount_count and
784  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
785  * then the server last_rcvd value may be less than that of the clients.
786  * This will alert us that we may need to do client recovery.
787  */
788 static
789 int mds_update_server_data(struct mds_obd *mds)
790 {
791         struct obd_run_ctxt saved;
792         struct mds_server_data *msd = mds->mds_server_data;
793         struct file *filp = mds->mds_rcvd_filp;
794         loff_t off = 0;
795         int rc;
796
797         msd->msd_last_rcvd = cpu_to_le64(mds->mds_last_rcvd);
798         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
799
800         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_rcvd is %Lu\n",
801                (unsigned long long)mds->mds_mount_count,
802                (unsigned long long)mds->mds_last_rcvd);
803         push_ctxt(&saved, &mds->mds_ctxt);
804         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
805         if (rc != sizeof(*msd)) {
806                 CERROR("error writing MDS server data: rc = %d\n", rc);
807                 if (rc > 0)
808                         RETURN(-EIO);
809                 RETURN(rc);
810         }
811         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
812         pop_ctxt(&saved);
813         if (rc)
814                 CERROR("error flushing MDS server data: rc = %d\n", rc);
815
816         return 0;
817 }
818
819 /* Do recovery actions for the MDS */
820 static int mds_recover(struct obd_device *obddev)
821 {
822         struct mds_obd *mds = &obddev->u.mds;
823         int rc;
824
825         /* This happens at the end when recovery is complete */
826         ++mds->mds_mount_count;
827         rc = mds_update_server_data(mds);
828
829         return rc;
830 }
831
832
833 /* mount the file system (secretly) */
834 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
835 {
836         struct obd_ioctl_data* data = buf;
837         struct mds_obd *mds = &obddev->u.mds;
838         struct vfsmount *mnt;
839         int rc = 0;
840         ENTRY;
841
842         MOD_INC_USE_COUNT;
843 #ifdef CONFIG_DEV_RDONLY
844         dev_clear_rdonly(2);
845 #endif
846         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
847                 GOTO(err_dec, rc = -EINVAL);
848
849         mds->mds_fstype = strdup(data->ioc_inlbuf2);
850
851         mnt = do_kern_mount(mds->mds_fstype, 0, data->ioc_inlbuf1, NULL);
852         if (IS_ERR(mnt)) {
853                 rc = PTR_ERR(mnt);
854                 CERROR("do_kern_mount failed: rc = %d\n", rc);
855                 GOTO(err_kfree, rc);
856         }
857
858         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
859         if (!mds->mds_sb)
860                 GOTO(err_put, rc = -ENODEV);
861
862         rc = mds_fs_setup(mds, mnt);
863         if (rc) {
864                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
865                 GOTO(err_put, rc);
866         }
867
868         mds->mds_service = ptlrpc_init_svc(64 * 1024,
869                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,                                           "self", mds_handle);
870         if (!mds->mds_service) {
871                 CERROR("failed to start service\n");
872                 GOTO(err_fs, rc = -EINVAL);
873         }
874
875         rc = ptlrpc_start_thread(obddev, mds->mds_service, "lustre_mds");
876         if (rc) {
877                 CERROR("cannot start thread: rc = %d\n", rc);
878                 GOTO(err_svc, rc);
879         }
880
881         rc = -ENOENT;
882         mds->mds_ldlm_conn = ptlrpc_uuid_to_connection("self");
883         if (!mds->mds_ldlm_conn) {
884                 mds_cleanup(obddev);
885                 GOTO(err_thread, rc);
886         }
887
888         obddev->obd_namespace =
889                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
890         if (obddev->obd_namespace == NULL) {
891                 LBUG();
892                 mds_cleanup(obddev);
893                 GOTO(err_thread, rc);
894         }
895
896         mds->mds_local_namespace =
897                 ldlm_namespace_new("mds_client", LDLM_NAMESPACE_CLIENT);
898         if (mds->mds_local_namespace == NULL) {
899                 LBUG();
900                 mds_cleanup(obddev);
901                 GOTO(err_thread, rc);
902         }
903
904         OBD_ALLOC(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
905         if (mds->mds_ldlm_client == NULL) {
906                 LBUG();
907                 mds_cleanup(obddev);
908                 GOTO(err_thread, rc);
909         }
910         ptlrpc_init_client(NULL, NULL, LDLM_REQUEST_PORTAL, LDLM_REPLY_PORTAL,
911                            mds->mds_ldlm_client);
912         mds->mds_ldlm_client->cli_target_devno = obddev->obd_minor;
913         mds->mds_ldlm_client->cli_name = "mds ldlm";
914
915         rc = mds_recover(obddev);
916         if (rc)
917                 GOTO(err_thread, rc);
918
919         RETURN(0);
920
921 err_thread:
922         ptlrpc_stop_all_threads(mds->mds_service);
923 err_svc:
924         rpc_unregister_service(mds->mds_service);
925         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
926 err_fs:
927         mds_fs_cleanup(mds);
928 err_put:
929         unlock_kernel();
930         mntput(mds->mds_vfsmnt);
931         mds->mds_sb = 0;
932         lock_kernel();
933 err_kfree:
934         kfree(mds->mds_fstype);
935 err_dec:
936         MOD_DEC_USE_COUNT;
937         return rc;
938 }
939
940 static int mds_cleanup(struct obd_device * obddev)
941 {
942         struct super_block *sb;
943         struct mds_obd *mds = &obddev->u.mds;
944
945         ENTRY;
946
947         if ( !list_empty(&obddev->obd_gen_clients) ) {
948                 CERROR("still has clients!\n");
949                 RETURN(-EBUSY);
950         }
951
952         ptlrpc_stop_all_threads(mds->mds_service);
953         rpc_unregister_service(mds->mds_service);
954         if (!list_empty(&mds->mds_service->srv_reqs)) {
955                 // XXX reply with errors and clean up
956                 CERROR("Request list not empty!\n");
957         }
958         OBD_FREE(mds->mds_service, sizeof(*mds->mds_service));
959
960         sb = mds->mds_sb;
961         if (!mds->mds_sb)
962                 RETURN(0);
963
964         mds_update_server_data(mds);
965
966         if (mds->mds_rcvd_filp) {
967                 int rc = filp_close(mds->mds_rcvd_filp, 0);
968                 mds->mds_rcvd_filp = NULL;
969
970                 if (rc)
971                         CERROR("last_rcvd file won't close, rc=%d\n", rc);
972         }
973
974         unlock_kernel();
975         mntput(mds->mds_vfsmnt);
976         mds->mds_sb = 0;
977         kfree(mds->mds_fstype);
978
979         ldlm_namespace_free(mds->mds_local_namespace);
980         ldlm_namespace_free(obddev->obd_namespace);
981
982         if (mds->mds_ldlm_conn != NULL)
983                 ptlrpc_put_connection(mds->mds_ldlm_conn);
984
985         OBD_FREE(mds->mds_ldlm_client, sizeof(*mds->mds_ldlm_client));
986
987         lock_kernel();
988 #ifdef CONFIG_DEV_RDONLY
989         dev_clear_rdonly(2);
990 #endif
991         mds_fs_cleanup(mds);
992
993         MOD_DEC_USE_COUNT;
994         RETURN(0);
995 }
996
997 /* use obd ops to offer management infrastructure */
998 static struct obd_ops mds_obd_ops = {
999         o_setup:       mds_setup,
1000         o_cleanup:     mds_cleanup,
1001 };
1002
1003 static int __init mds_init(void)
1004 {
1005         inter_module_register("mds_reint", THIS_MODULE, &mds_reint);
1006         inter_module_register("mds_getattr_name", THIS_MODULE,
1007                               &mds_getattr_name);
1008         obd_register_type(&mds_obd_ops, LUSTRE_MDS_NAME);
1009         return 0;
1010 }
1011
1012 static void __exit mds_exit(void)
1013 {
1014         inter_module_unregister("mds_reint");
1015         inter_module_unregister("mds_getattr_name");
1016         obd_unregister_type(LUSTRE_MDS_NAME);
1017 }
1018
1019 MODULE_AUTHOR("Cluster File Systems <info@clusterfs.com>");
1020 MODULE_DESCRIPTION("Lustre Metadata Server (MDS) v0.01");
1021 MODULE_LICENSE("GPL");
1022
1023 module_init(mds_init);
1024 module_exit(mds_exit);