Whamcloud - gitweb
land 0.5.20.3 b_devel onto HEAD (b_devel will remain)
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
39 #include <linux/smp_lock.h>
40 #include <linux/buffer_head.h>
41 #include <linux/workqueue.h>
42 #include <linux/mount.h>
43 #else 
44 #include <linux/locks.h>
45 #endif
46 #include <linux/obd_lov.h>
47 #include <linux/lustre_mds.h>
48 #include <linux/lustre_fsfilt.h>
49 #include <linux/lprocfs_status.h>
50
/* Slab cache backing struct mds_file_data objects; entries are released in
 * mds_close_mfd() and client-supplied handles are validated against it in
 * mds_handle2mfd(). */
kmem_cache_t *mds_file_cache;

/* LOV helpers implemented outside this file. */
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                           struct obd_uuid *uuidarray);
extern int mds_get_lovdesc(struct mds_obd  *obd, struct lov_desc *desc);
/* Not defined in this part of the file; called from mds_getattr_name(),
 * which uses its return value as the request status. */
int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
                       struct ptlrpc_request *req, int rc, int disp);
/* Forward declaration; the definition is not in this part of the file. */
static int mds_cleanup(struct obd_device * obddev);
59
60 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
61 {
62         return &req->rq_export->exp_obd->u.mds;
63 }
64
65 static int mds_bulk_timeout(void *data)
66 {
67         struct ptlrpc_bulk_desc *desc = data;
68
69         ENTRY;
70         recovd_conn_fail(desc->bd_connection);
71         RETURN(1);
72 }
73
74 /* Assumes caller has already pushed into the kernel filesystem context */
75 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
76                         __u64 offset, __u64 xid)
77 {
78         struct ptlrpc_bulk_desc *desc;
79         struct ptlrpc_bulk_page *bulk;
80         struct l_wait_info lwi;
81         char *buf;
82         int rc = 0;
83         ENTRY;
84
85         desc = ptlrpc_prep_bulk(req->rq_connection);
86         if (desc == NULL)
87                 GOTO(out, rc = -ENOMEM);
88
89         bulk = ptlrpc_prep_bulk_page(desc);
90         if (bulk == NULL)
91                 GOTO(cleanup_bulk, rc = -ENOMEM);
92
93         OBD_ALLOC(buf, PAGE_CACHE_SIZE);
94         if (buf == NULL)
95                 GOTO(cleanup_bulk, rc = -ENOMEM);
96
97         CDEBUG(D_EXT2, "reading %lu@"LPU64" from dir %lu (size %llu)\n",
98                PAGE_CACHE_SIZE, offset, file->f_dentry->d_inode->i_ino,
99                file->f_dentry->d_inode->i_size);
100         rc = fsfilt_readpage(req->rq_export->exp_obd, file, buf,
101                              PAGE_CACHE_SIZE, (loff_t *)&offset);
102
103         if (rc != PAGE_CACHE_SIZE)
104                 GOTO(cleanup_buf, rc = -EIO);
105
106         bulk->bp_xid = xid;
107         bulk->bp_buf = buf;
108         bulk->bp_buflen = PAGE_CACHE_SIZE;
109         desc->bd_ptl_ev_hdlr = NULL;
110         desc->bd_portal = MDS_BULK_PORTAL;
111
112         rc = ptlrpc_bulk_put(desc);
113         if (rc)
114                 GOTO(cleanup_buf, rc);
115
116         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
117                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
118                        OBD_FAIL_MDS_SENDPAGE, rc);
119                 ptlrpc_abort_bulk(desc);
120                 GOTO(cleanup_buf, rc);
121         }
122
123         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
124         rc = l_wait_event(desc->bd_waitq, desc->bd_flags & PTL_BULK_FL_SENT,
125                           &lwi);
126         if (rc) {
127                 if (rc != -ETIMEDOUT)
128                         LBUG();
129                 GOTO(cleanup_buf, rc);
130         }
131
132         EXIT;
133  cleanup_buf:
134         OBD_FREE(buf, PAGE_SIZE);
135  cleanup_bulk:
136         ptlrpc_bulk_decref(desc);
137  out:
138         return rc;
139 }
140
141 /* only valid locked dentries or errors should be returned */
142 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
143                                      struct vfsmount **mnt, int lock_mode,
144                                      struct lustre_handle *lockh)
145 {
146         struct mds_obd *mds = &obd->u.mds;
147         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
148         struct ldlm_res_id res_id = { .name = {0} };
149         int flags = 0, rc;
150         ENTRY;
151
152         if (IS_ERR(de))
153                 RETURN(de);
154
155         res_id.name[0] = de->d_inode->i_ino;
156         res_id.name[1] = de->d_inode->i_generation;
157         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
158                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
159                               &flags, ldlm_completion_ast,
160                               mds_blocking_ast, NULL, NULL, lockh);
161         if (rc != ELDLM_OK) {
162                 l_dput(de);
163                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
164         }
165
166         RETURN(retval);
167 }
168
169 #ifndef DCACHE_DISCONNECTED
170 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
171 #endif
172
173
174
175 /* Look up an entry by inode number. */
176 /* this function ONLY returns valid dget'd dentries with an initialized inode
177    or errors */
178 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
179                               struct vfsmount **mnt)
180 {
181         /* stolen from NFS */
182         struct super_block *sb = mds->mds_sb;
183         unsigned long ino = fid->id;
184         __u32 generation = fid->generation;
185         struct inode *inode;
186         struct list_head *lp;
187         struct dentry *result;
188
189         if (ino == 0)
190                 RETURN(ERR_PTR(-ESTALE));
191
192         inode = iget(sb, ino);
193         if (inode == NULL)
194                 RETURN(ERR_PTR(-ENOMEM));
195
196         CDEBUG(D_DENTRY, "--> mds_fid2dentry: sb %p\n", inode->i_sb);
197
198         if (is_bad_inode(inode) ||
199             (generation && inode->i_generation != generation)) {
200                 /* we didn't find the right inode.. */
201                 CERROR("bad inode %lu, link: %d ct: %d or version  %u/%u\n",
202                        inode->i_ino, inode->i_nlink,
203                        atomic_read(&inode->i_count), inode->i_generation,
204                        generation);
205                 iput(inode);
206                 RETURN(ERR_PTR(-ENOENT));
207         }
208
209         /* now to find a dentry. If possible, get a well-connected one */
210         if (mnt)
211                 *mnt = mds->mds_vfsmnt;
212         spin_lock(&dcache_lock);
213         list_for_each(lp, &inode->i_dentry) {
214                 result = list_entry(lp, struct dentry, d_alias);
215                 if (!(result->d_flags & DCACHE_DISCONNECTED)) {
216                         dget_locked(result);
217                         result->d_vfs_flags |= DCACHE_REFERENCED;
218                         spin_unlock(&dcache_lock);
219                         iput(inode);
220                         if (mnt)
221                                 mntget(*mnt);
222                         return result;
223                 }
224         }
225         spin_unlock(&dcache_lock);
226         result = d_alloc_root(inode);
227         if (result == NULL) {
228                 iput(inode);
229                 return ERR_PTR(-ENOMEM);
230         }
231         if (mnt)
232                 mntget(*mnt);
233         result->d_flags |= DCACHE_DISCONNECTED;
234         return result;
235 }
236
237
238 /* Establish a connection to the MDS.
239  *
240  * This will set up an export structure for the client to hold state data
241  * about that client, like open files, the last operation number it did
242  * on the server, etc.
243  */
244 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
245                        struct obd_uuid *cluuid, struct recovd_obd *recovd,
246                        ptlrpc_recovery_cb_t recover)
247 {
248         struct obd_export *exp;
249         struct mds_export_data *med;
250         struct mds_client_data *mcd;
251         int rc;
252         ENTRY;
253
254         if (!conn || !obd || !cluuid)
255                 RETURN(-EINVAL);
256
257         /* Check for aborted recovery. */
258         spin_lock_bh(&obd->obd_processing_task_lock);
259         if (obd->obd_flags & OBD_ABORT_RECOVERY)
260                 target_abort_recovery(obd);
261         spin_unlock_bh(&obd->obd_processing_task_lock);
262
263         /* XXX There is a small race between checking the list and adding a
264          * new connection for the same UUID, but the real threat (list
265          * corruption when multiple different clients connect) is solved.
266          *
267          * There is a second race between adding the export to the list,
268          * and filling in the client data below.  Hence skipping the case
269          * of NULL mcd above.  We should already be controlling multiple
270          * connects at the client, and we can't hold the spinlock over
271          * memory allocations without risk of deadlocking.
272          */
273         rc = class_connect(conn, obd, cluuid);
274         if (rc)
275                 RETURN(rc);
276         exp = class_conn2export(conn);
277         LASSERT(exp);
278         med = &exp->exp_mds_data;
279
280         OBD_ALLOC(mcd, sizeof(*mcd));
281         if (!mcd) {
282                 CERROR("mds: out of memory for client data\n");
283                 GOTO(out_export, rc = -ENOMEM);
284         }
285
286         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
287         med->med_mcd = mcd;
288
289         INIT_LIST_HEAD(&med->med_open_head);
290         spin_lock_init(&med->med_open_lock);
291
292         rc = mds_client_add(&obd->u.mds, med, -1);
293         if (rc)
294                 GOTO(out_mcd, rc);
295
296         RETURN(0);
297
298 out_mcd:
299         OBD_FREE(mcd, sizeof(*mcd));
300 out_export:
301         class_disconnect(conn);
302
303         return rc;
304 }
305
306 /* Call with med->med_open_lock held, please. */
307 inline int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
308 {
309         struct file *file = mfd->mfd_file;
310         int rc;
311         struct dentry *de = NULL;
312         LASSERT(file->private_data == mfd);
313
314         LASSERT(mfd->mfd_servercookie != DEAD_HANDLE_MAGIC);
315
316         list_del(&mfd->mfd_list);
317         mfd->mfd_servercookie = DEAD_HANDLE_MAGIC;
318         kmem_cache_free(mds_file_cache, mfd);
319
320         if (file->f_dentry->d_parent) {
321                 LASSERT(atomic_read(&file->f_dentry->d_parent->d_count));
322                 de = dget(file->f_dentry->d_parent);
323         }
324         rc = filp_close(file, 0);
325         if (de)
326                 l_dput(de);
327         RETURN(rc);
328 }
329
330 static int mds_disconnect(struct lustre_handle *conn)
331 {
332         struct obd_export *export = class_conn2export(conn);
333         struct list_head *tmp, *n;
334         struct mds_export_data *med = &export->exp_mds_data;
335         int rc;
336         ENTRY;
337
338         /*
339          * Close any open files.
340          */
341         spin_lock(&med->med_open_lock);
342         list_for_each_safe(tmp, n, &med->med_open_head) {
343                 struct mds_file_data *mfd =
344                         list_entry(tmp, struct mds_file_data, mfd_list);
345                 CERROR("force closing client file handle for %*s\n",
346                        mfd->mfd_file->f_dentry->d_name.len,
347                        mfd->mfd_file->f_dentry->d_name.name);
348                 rc = mds_close_mfd(mfd, med);
349                 if (rc)
350                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
351         }
352         spin_unlock(&med->med_open_lock);
353
354         ldlm_cancel_locks_for_export(export);
355         if (med->med_outstanding_reply) {
356                 /* Fake the ack, so the locks get cancelled. */
357                 med->med_outstanding_reply->rq_flags &= ~PTL_RPC_FL_WANT_ACK;
358                 med->med_outstanding_reply->rq_flags |= PTL_RPC_FL_ERR;
359                 wake_up(&med->med_outstanding_reply->rq_wait_for_rep);
360                 med->med_outstanding_reply = NULL;
361         }
362         mds_client_free(export);
363
364         rc = class_disconnect(conn);
365
366         RETURN(rc);
367 }
368
369 /*
370  * XXX This is NOT guaranteed to flush all transactions to disk (even though
371  *     it is equivalent to calling sync()) because it only _starts_ the flush
372  *     and does not wait for completion.  It's better than nothing though.
373  *     What we really want is a mild form of fsync_dev_lockfs(), but it is
374  *     non-standard, or enabling do_sync_supers in ext3, just for this call.
375  */
376 static void mds_fsync_super(struct super_block *sb)
377 {
378         lock_kernel();
379         lock_super(sb);
380         if (sb->s_dirt && sb->s_op && sb->s_op->write_super)
381                 sb->s_op->write_super(sb);
382         unlock_super(sb);
383         unlock_kernel();
384 }
385
386 static int mds_getstatus(struct ptlrpc_request *req)
387 {
388         struct mds_obd *mds = mds_req2mds(req);
389         struct mds_body *body;
390         int rc, size = sizeof(*body);
391         ENTRY;
392
393         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
394         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
395                 CERROR("mds: out of memory for message: size=%d\n", size);
396                 req->rq_status = -ENOMEM;
397                 RETURN(-ENOMEM);
398         }
399
400         /* Flush any outstanding transactions to disk so the client will
401          * get the latest last_committed value and can drop their local
402          * requests if they have any.  This would be fsync_super() if it
403          * was exported.
404          */
405         mds_fsync_super(mds->mds_sb);
406
407         body = lustre_msg_buf(req->rq_repmsg, 0);
408         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
409
410         /* the last_committed and last_xid fields are filled in for all
411          * replies already - no need to do so here also.
412          */
413         RETURN(0);
414 }
415
416 static int mds_getlovinfo(struct ptlrpc_request *req)
417 {
418         struct mds_obd *mds = mds_req2mds(req);
419         struct mds_status_req *streq;
420         struct lov_desc *desc;
421         int tgt_count;
422         int rc, size[2] = {sizeof(*desc)};
423         ENTRY;
424
425         streq = lustre_msg_buf(req->rq_reqmsg, 0);
426         streq->flags = NTOH__u32(streq->flags);
427         streq->repbuf = NTOH__u32(streq->repbuf);
428         size[1] = streq->repbuf;
429
430         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
431         if (rc) {
432                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
433                 req->rq_status = -ENOMEM;
434                 RETURN(-ENOMEM);
435         }
436
437         if (!mds->mds_has_lov_desc) {
438                 req->rq_status = -ENOENT;
439                 RETURN(0);
440         }
441
442         desc = lustre_msg_buf(req->rq_repmsg, 0);
443         memcpy(desc, &mds->mds_lov_desc, sizeof *desc);
444         lov_packdesc(desc);
445         tgt_count = le32_to_cpu(desc->ld_tgt_count);
446         if (tgt_count * sizeof(struct obd_uuid) > streq->repbuf) {
447                 CERROR("too many targets, enlarge client buffers\n");
448                 req->rq_status = -ENOSPC;
449                 RETURN(0);
450         }
451
452         rc = mds_get_lovtgts(mds, tgt_count,
453                              lustre_msg_buf(req->rq_repmsg, 1));
454         if (rc) {
455                 CERROR("get_lovtgts error %d\n", rc);
456                 req->rq_status = rc;
457                 RETURN(0);
458         }
459         RETURN(0);
460 }
461
462 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
463                      void *data, int flag)
464 {
465         int do_ast;
466         ENTRY;
467
468         if (flag == LDLM_CB_CANCELING) {
469                 /* Don't need to do anything here. */
470                 RETURN(0);
471         }
472
473         /* XXX layering violation!  -phil */
474         l_lock(&lock->l_resource->lr_namespace->ns_lock);
475         /* Get this: if mds_blocking_ast is racing with ldlm_intent_policy,
476          * such that mds_blocking_ast is called just before l_i_p takes the
477          * ns_lock, then by the time we get the lock, we might not be the
478          * correct blocking function anymore.  So check, and return early, if
479          * so. */
480         if (lock->l_blocking_ast != mds_blocking_ast) {
481                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
482                 RETURN(0);
483         }
484
485         lock->l_flags |= LDLM_FL_CBPENDING;
486         do_ast = (!lock->l_readers && !lock->l_writers);
487         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
488
489         if (do_ast) {
490                 struct lustre_handle lockh;
491                 int rc;
492
493                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
494                 ldlm_lock2handle(lock, &lockh);
495                 rc = ldlm_cli_cancel(&lockh);
496                 if (rc < 0)
497                         CERROR("ldlm_cli_cancel: %d\n", rc);
498         } else {
499                 LDLM_DEBUG(lock, "Lock still has references, will be "
500                            "cancelled later");
501         }
502         RETURN(0);
503 }
504
505 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
506                 int offset, struct mds_body *body, struct inode *inode)
507 {
508         struct mds_obd *mds = &obd->u.mds;
509         struct lov_mds_md *lmm;
510         int lmm_size = msg->buflens[offset];
511         int rc;
512         ENTRY;
513
514         if (lmm_size == 0) {
515                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
516                        inode->i_ino);
517                 RETURN(0);
518         }
519
520         lmm = lustre_msg_buf(msg, offset);
521
522         /* I don't really like this, but it is a sanity check on the client
523          * MD request.  However, if the client doesn't know how much space
524          * to reserve for the MD, this shouldn't be fatal either...
525          */
526         if (lmm_size > mds->mds_max_mdsize) {
527                 CERROR("Reading MD for inode %lu of %d bytes > max %d\n",
528                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
529                 // RETURN(-EINVAL);
530         }
531
532         /* We don't need to store the reply size, because this buffer is
533          * discarded right after unpacking, and the LOV can figure out the
534          * size itself from the ost count.
535          */
536         if ((rc = fsfilt_get_md(obd, inode, lmm, lmm_size)) < 0) {
537                 CDEBUG(D_INFO, "No md for ino %lu: rc = %d\n",
538                        inode->i_ino, rc);
539         } else if (rc > 0) {
540                 body->valid |= OBD_MD_FLEASIZE;
541                 rc = 0;
542         }
543
544         RETURN(rc);
545 }
546
547 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
548                                 struct ptlrpc_request *req,
549                                 struct mds_body *reqbody, int reply_off)
550 {
551         struct mds_body *body;
552         struct inode *inode = dentry->d_inode;
553         int rc = 0;
554         ENTRY;
555
556         if (inode == NULL)
557                 RETURN(-ENOENT);
558
559         body = lustre_msg_buf(req->rq_repmsg, reply_off);
560
561         mds_pack_inode2fid(&body->fid1, inode);
562         mds_pack_inode2body(body, inode);
563
564         if (S_ISREG(inode->i_mode) && reqbody->valid & OBD_MD_FLEASIZE) {
565                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1,
566                                  body, inode);
567         } else if (S_ISLNK(inode->i_mode) && reqbody->valid & OBD_MD_LINKNAME) {
568                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1);
569                 int len = req->rq_repmsg->buflens[reply_off + 1];
570
571                 rc = inode->i_op->readlink(dentry, symname, len);
572                 if (rc < 0) {
573                         CERROR("readlink failed: %d\n", rc);
574                 } else {
575                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
576                         body->valid |= OBD_MD_LINKNAME;
577                         rc = 0;
578                 }
579         }
580         RETURN(rc);
581 }
582
583 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
584                                 int offset)
585 {
586         struct mds_obd *mds = mds_req2mds(req);
587         struct mds_body *body;
588         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
589         ENTRY;
590
591         body = lustre_msg_buf(req->rq_reqmsg, offset);
592
593         if (S_ISREG(inode->i_mode) && body->valid & OBD_MD_FLEASIZE) {
594                 int rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
595                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
596                        rc, inode->i_ino);
597                 if (rc < 0) {
598                         if (rc != -ENODATA)
599                                 CERROR("error getting inode %lu MD: rc = %d\n",
600                                        inode->i_ino, rc);
601                         size[bufcount] = 0;
602                 } else if (rc > mds->mds_max_mdsize) {
603                         size[bufcount] = 0;
604                         CERROR("MD size %d larger than maximum possible %u\n",
605                                rc, mds->mds_max_mdsize);
606                 } else
607                         size[bufcount] = rc;
608                 bufcount++;
609         } else if (body->valid & OBD_MD_LINKNAME) {
610                 size[bufcount] = MIN(inode->i_size + 1, body->size);
611                 bufcount++;
612                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: "LPU64"\n",
613                        inode->i_size + 1, body->size);
614         }
615
616         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
617                 CERROR("failed MDS_GETATTR_PACK test\n");
618                 req->rq_status = -ENOMEM;
619                 GOTO(out, rc = -ENOMEM);
620         }
621
622         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
623                              &req->rq_repmsg);
624         if (rc) {
625                 CERROR("out of memoryK\n");
626                 req->rq_status = rc;
627                 GOTO(out, rc);
628         }
629
630         EXIT;
631  out:
632         return(rc);
633 }
634
635 /* This is more copy-and-paste from getattr_name than I'd like. */
636 static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
637                                      struct lustre_handle *client_lockh)
638 {
639         struct mds_export_data *med = &req->rq_export->exp_mds_data;
640         struct mds_client_data *mcd = med->med_mcd;
641         struct obd_device *obd = req->rq_export->exp_obd;
642         struct mds_obd *mds = mds_req2mds(req);
643         struct dentry *parent, *child;
644         struct mds_body *body;
645         struct inode *dir;
646         struct obd_run_ctxt saved;
647         struct obd_ucred uc;
648         int namelen, rc = 0;
649         char *name;
650
651         req->rq_transno = mcd->mcd_last_transno;
652         req->rq_status = mcd->mcd_last_result;
653
654         if (med->med_outstanding_reply)
655                 mds_steal_ack_locks(med, req);
656
657         if (req->rq_status)
658                 return;
659
660         body = lustre_msg_buf(req->rq_reqmsg, offset);
661         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
662         namelen = req->rq_reqmsg->buflens[offset + 1];
663         /* requests were at offset 2, replies go back at 1 */
664         if (offset)
665                 offset = 1;
666
667         uc.ouc_fsuid = body->fsuid;
668         uc.ouc_fsgid = body->fsgid;
669         uc.ouc_cap = body->capability;
670         uc.ouc_suppgid1 = body->suppgid;
671         uc.ouc_suppgid2 = -1;
672         push_ctxt(&saved, &mds->mds_ctxt, &uc);
673         parent = mds_fid2dentry(mds, &body->fid1, NULL);
674         LASSERT(!IS_ERR(parent));
675         dir = parent->d_inode;
676         LASSERT(dir);
677         child = lookup_one_len(name, parent, namelen - 1);
678         LASSERT(!IS_ERR(child));
679
680         if (!med->med_outstanding_reply) {
681                 /* XXX need to enqueue client lock */
682                 LBUG();
683         }
684
685         if (req->rq_repmsg == NULL)
686                 mds_getattr_pack_msg(req, child->d_inode, offset);
687         
688         rc = mds_getattr_internal(obd, child, req, body, offset);
689         LASSERT(!rc);
690         l_dput(child);
691         l_dput(parent);
692 }
693
694 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
695                             struct lustre_handle *child_lockh)
696 {
697         struct mds_obd *mds = mds_req2mds(req);
698         struct obd_device *obd = req->rq_export->exp_obd;
699         struct obd_run_ctxt saved;
700         struct mds_body *body;
701         struct dentry *de = NULL, *dchild = NULL;
702         struct inode *dir;
703         struct obd_ucred uc;
704         struct ldlm_res_id child_res_id = { .name = {0} };
705         struct lustre_handle parent_lockh;
706         int namelen, flags = 0, rc = 0, cleanup_phase = 0;
707         char *name;
708         ENTRY;
709
710         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
711
712         MDS_CHECK_RESENT(req, 
713                          reconstruct_getattr_name(offset, req, child_lockh));
714
715         if (req->rq_reqmsg->bufcount <= offset + 1) {
716                 LBUG();
717                 GOTO(cleanup, rc = -EINVAL);
718         }
719
720         body = lustre_msg_buf(req->rq_reqmsg, offset);
721         name = lustre_msg_buf(req->rq_reqmsg, offset + 1);
722         namelen = req->rq_reqmsg->buflens[offset + 1];
723         /* requests were at offset 2, replies go back at 1 */
724         if (offset)
725                 offset = 1;
726
727         uc.ouc_fsuid = body->fsuid;
728         uc.ouc_fsgid = body->fsgid;
729         uc.ouc_cap = body->capability;
730         uc.ouc_suppgid1 = body->suppgid;
731         uc.ouc_suppgid2 = -1;
732         push_ctxt(&saved, &mds->mds_ctxt, &uc);
733         /* Step 1: Lookup/lock parent */
734         de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
735                                    &parent_lockh);
736         if (IS_ERR(de))
737                 GOTO(cleanup, rc = PTR_ERR(de));
738         dir = de->d_inode;
739         LASSERT(dir);
740
741         cleanup_phase = 1; /* parent dentry and lock */
742
743         CDEBUG(D_INODE, "parent ino %lu, name %*s\n", dir->i_ino,namelen,name);
744
745         /* Step 2: Lookup child */
746         dchild = lookup_one_len(name, de, namelen - 1);
747         if (IS_ERR(dchild)) {
748                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
749                 GOTO(cleanup, rc = PTR_ERR(dchild));
750         }
751
752         cleanup_phase = 2; /* child dentry */
753
754         if (dchild->d_inode == NULL) {
755                 GOTO(cleanup, rc = -ENOENT);
756         }
757
758         /* Step 3: Lock child */
759         child_res_id.name[0] = dchild->d_inode->i_ino;
760         child_res_id.name[1] = dchild->d_inode->i_generation;
761         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
762                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
763                               &flags, ldlm_completion_ast, mds_blocking_ast,
764                               NULL, NULL, child_lockh);
765         if (rc != ELDLM_OK) {
766                 CERROR("ldlm_cli_enqueue: %d\n", rc);
767                 GOTO(cleanup, rc = -EIO);
768         }
769
770         cleanup_phase = 3; /* child lock */
771
772         if (req->rq_repmsg == NULL)
773                 mds_getattr_pack_msg(req, dchild->d_inode, offset);
774
775         rc = mds_getattr_internal(obd, dchild, req, body, offset);
776         GOTO(cleanup, rc); /* returns the lock to the client */
777         
778  cleanup:
779         rc = mds_finish_transno(mds, dchild ? dchild->d_inode : NULL, NULL,
780                                 req, rc, 0);
781         switch (cleanup_phase) {
782         case 3:
783                 if (rc)
784                         ldlm_lock_decref(child_lockh, LCK_PR);
785         case 2:
786                 l_dput(dchild);
787
788         case 1:
789                 if (rc) {
790                         ldlm_lock_decref(&parent_lockh, LCK_PR);
791                 } else {
792                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
793                                sizeof(parent_lockh));
794                         req->rq_ack_locks[0].mode = LCK_PR;
795                 }
796                 l_dput(de);
797         default: ;
798         }
799         req->rq_status = rc;
800         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
801         return rc;
802 }
803
804 static int mds_getattr(int offset, struct ptlrpc_request *req)
805 {
806         struct mds_obd *mds = mds_req2mds(req);
807         struct obd_device *obd = req->rq_export->exp_obd;
808         struct obd_run_ctxt saved;
809         struct dentry *de;
810         struct mds_body *body;
811         struct obd_ucred uc;
812         int rc = 0;
813         ENTRY;
814
815         body = lustre_msg_buf(req->rq_reqmsg, offset);
816         uc.ouc_fsuid = body->fsuid;
817         uc.ouc_fsgid = body->fsgid;
818         uc.ouc_cap = body->capability;
819         push_ctxt(&saved, &mds->mds_ctxt, &uc);
820         de = mds_fid2dentry(mds, &body->fid1, NULL);
821         if (IS_ERR(de)) {
822                 rc = req->rq_status = -ENOENT;
823                 GOTO(out_pop, PTR_ERR(de));
824         }
825
826         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
827
828         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
829
830         l_dput(de);
831         GOTO(out_pop, rc);
832 out_pop:
833         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
834         return rc;
835 }
836
837 static int mds_statfs(struct ptlrpc_request *req)
838 {
839         struct obd_device *obd = req->rq_export->exp_obd;
840         struct obd_statfs *osfs;
841         int rc, size = sizeof(*osfs);
842         ENTRY;
843
844         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
845         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
846                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
847                 GOTO(out, rc);
848         }
849
850         osfs = lustre_msg_buf(req->rq_repmsg, 0);
851         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
852         if (rc) {
853                 CERROR("mds: statfs failed: rc %d\n", rc);
854                 GOTO(out, rc);
855         }
856         obd_statfs_pack(osfs, osfs);
857
858         EXIT;
859 out:
860         req->rq_status = rc;
861         return 0;
862 }
863
864 static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
865 {
866         struct mds_file_data *mfd = NULL;
867         ENTRY;
868
869         if (!handle || !handle->addr)
870                 RETURN(NULL);
871
872         mfd = (struct mds_file_data *)(unsigned long)(handle->addr);
873         if (!kmem_cache_validate(mds_file_cache, mfd))
874                 RETURN(NULL);
875
876         if (mfd->mfd_servercookie != handle->cookie)
877                 RETURN(NULL);
878
879         RETURN(mfd);
880 }
881
882 #if 0
883
884 static int mds_store_md(struct mds_obd *mds, struct ptlrpc_request *req,
885                         int offset, struct mds_body *body, struct inode *inode)
886 {
887         struct obd_device *obd = req->rq_export->exp_obd;
888         struct lov_mds_md *lmm = lustre_msg_buf(req->rq_reqmsg, offset);
889         int lmm_size = req->rq_reqmsg->buflens[offset];
890         struct obd_run_ctxt saved;
891         struct obd_ucred uc;
892         void *handle;
893         int rc, rc2;
894         ENTRY;
895
896         /* I don't really like this, but it is a sanity check on the client
897          * MD request.
898          */
899         if (lmm_size > mds->mds_max_mdsize) {
900                 CERROR("Saving MD for inode %lu of %d bytes > max %d\n",
901                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
902                 //RETURN(-EINVAL);
903         }
904
905         CDEBUG(D_INODE, "storing %d bytes MD for inode %lu\n",
906                lmm_size, inode->i_ino);
907         uc.ouc_fsuid = body->fsuid;
908         uc.ouc_fsgid = body->fsgid;
909         uc.ouc_cap = body->capability;
910         push_ctxt(&saved, &mds->mds_ctxt, &uc);
911         handle = fsfilt_start(obd, inode, FSFILT_OP_SETATTR);
912         if (IS_ERR(handle)) {
913                 rc = PTR_ERR(handle);
914                 GOTO(out_ea, rc);
915         }
916
917         rc = fsfilt_set_md(obd, inode,handle,lmm,lmm_size);
918         rc = mds_finish_transno(mds, inode, handle, req, rc, 0);
919 out_ea:
920         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
921
922         RETURN(rc);
923 }
924
925 #endif
926
927 static void reconstruct_close(struct ptlrpc_request *req)
928 {
929         struct mds_export_data *med = &req->rq_export->exp_mds_data;
930         struct mds_client_data *mcd = med->med_mcd;
931
932         req->rq_transno = mcd->mcd_last_transno;
933         req->rq_status = mcd->mcd_last_result;
934
935         /* XXX When open-unlink is working, we'll need to steal ack locks as
936          * XXX well, and make sure that we do the right unlinking after we
937          * XXX get the ack back.
938          */
939 }
940
941 static int mds_close(struct ptlrpc_request *req)
942 {
943         struct mds_export_data *med = &req->rq_export->exp_mds_data;
944         struct mds_body *body;
945         struct mds_file_data *mfd;
946         int rc;
947         ENTRY;
948
949         MDS_CHECK_RESENT(req, reconstruct_close(req));
950
951         body = lustre_msg_buf(req->rq_reqmsg, 0);
952
953         mfd = mds_handle2mfd(&body->handle);
954         if (mfd == NULL) {
955                 DEBUG_REQ(D_ERROR, req, "no handle for file close "LPD64
956                           ": addr "LPX64", cookie "LPX64"\n",
957                           body->fid1.id, body->handle.addr,
958                           body->handle.cookie);
959                 RETURN(-ESTALE);
960         }
961
962         spin_lock(&med->med_open_lock);
963         req->rq_status = mds_close_mfd(mfd, med);
964         spin_unlock(&med->med_open_lock);
965
966         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
967                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
968                 req->rq_status = -ENOMEM;
969                 RETURN(-ENOMEM);
970         }
971
972         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
973         if (rc) {
974                 CERROR("mds: lustre_pack_msg: rc = %d\n", rc);
975                 req->rq_status = rc;
976         }
977
978         RETURN(0);
979 }
980
981 static int mds_readpage(struct ptlrpc_request *req)
982 {
983         struct mds_obd *mds = mds_req2mds(req);
984         struct vfsmount *mnt;
985         struct dentry *de;
986         struct file *file;
987         struct mds_body *body, *repbody;
988         struct obd_run_ctxt saved;
989         int rc, size = sizeof(*body);
990         struct obd_ucred uc;
991         ENTRY;
992
993         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
994         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
995                 CERROR("mds: out of memory\n");
996                 GOTO(out, rc = -ENOMEM);
997         }
998
999         body = lustre_msg_buf(req->rq_reqmsg, 0);
1000         uc.ouc_fsuid = body->fsuid;
1001         uc.ouc_fsgid = body->fsgid;
1002         uc.ouc_cap = body->capability;
1003         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1004         de = mds_fid2dentry(mds, &body->fid1, &mnt);
1005         if (IS_ERR(de))
1006                 GOTO(out_pop, rc = PTR_ERR(de));
1007
1008         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1009
1010         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1011         /* note: in case of an error, dentry_open puts dentry */
1012         if (IS_ERR(file))
1013                 GOTO(out_pop, rc = PTR_ERR(file));
1014
1015         repbody = lustre_msg_buf(req->rq_repmsg, 0);
1016         repbody->size = file->f_dentry->d_inode->i_size;
1017         repbody->valid = OBD_MD_FLSIZE;
1018
1019         /* to make this asynchronous make sure that the handling function
1020            doesn't send a reply when this function completes. Instead a
1021            callback function would send the reply */
1022         /* body->blocks is actually the xid -phil */
1023         rc = mds_sendpage(req, file, body->size, body->blocks);
1024
1025         filp_close(file, 0);
1026 out_pop:
1027         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1028 out:
1029         req->rq_status = rc;
1030         RETURN(0);
1031 }
1032
1033 int mds_reint(struct ptlrpc_request *req, int offset,
1034               struct lustre_handle *lockh)
1035 {
1036         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1037         int rc;
1038
1039         OBD_ALLOC(rec, sizeof(*rec));
1040         if (rec == NULL)
1041                 RETURN(-ENOMEM);
1042
1043         rc = mds_update_unpack(req, offset, rec);
1044         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1045                 CERROR("invalid record\n");
1046                 GOTO(out, req->rq_status = -EINVAL);
1047         }
1048         /* rc will be used to interrupt a for loop over multiple records */
1049         rc = mds_reint_rec(rec, offset, req, lockh);
1050  out:
1051         OBD_FREE(rec, sizeof(*rec));
1052         return rc;
1053 }
1054
1055 static int filter_recovery_request(struct ptlrpc_request *req,
1056                                    struct obd_device *obd, int *process)
1057 {
1058         switch (req->rq_reqmsg->opc) {
1059         case MDS_CONNECT: /* This will never get here, but for completeness. */
1060         case MDS_DISCONNECT:
1061                *process = 1;
1062                RETURN(0);
1063
1064         case MDS_CLOSE:
1065         case MDS_GETSTATUS: /* used in unmounting */
1066         case MDS_REINT:
1067         case LDLM_ENQUEUE:
1068                 *process = target_queue_recovery_request(req, obd);
1069                 RETURN(0);
1070
1071         default:
1072                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1073                 *process = 0;
1074                 /* XXX what should we set rq_status to here? */
1075                 RETURN(ptlrpc_error(req->rq_svc, req));
1076         }
1077 }
1078
1079 static char *reint_names[] = {
1080         [REINT_SETATTR] "setattr",
1081         [REINT_CREATE]  "create",
1082         [REINT_LINK]    "link",
1083         [REINT_UNLINK]  "unlink",
1084         [REINT_RENAME]  "rename",
1085         [REINT_OPEN]    "open",
1086 };
1087
1088 void mds_steal_ack_locks(struct mds_export_data *med,
1089                          struct ptlrpc_request *req)
1090 {
1091         struct ptlrpc_request *oldrep = med->med_outstanding_reply;
1092         memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
1093                sizeof req->rq_ack_locks);
1094         oldrep->rq_flags |= PTL_RPC_FL_RESENT;
1095         wake_up(&oldrep->rq_wait_for_rep);
1096         DEBUG_REQ(D_HA, oldrep, "stole locks from");
1097         DEBUG_REQ(D_HA, req, "stole locks for");
1098 }
1099
1100 static void mds_send_reply(struct ptlrpc_request *req, int rc)
1101 {
1102         int i;
1103         struct ptlrpc_req_ack_lock *ack_lock;
1104         struct l_wait_info lwi;
1105         struct mds_export_data *med =
1106                 (req->rq_export && req->rq_ack_locks[0].mode) ?
1107                 &req->rq_export->exp_mds_data : NULL;
1108
1109         if (med) {
1110                 med->med_outstanding_reply = req;
1111                 req->rq_flags |= PTL_RPC_FL_WANT_ACK;
1112                 init_waitqueue_head(&req->rq_wait_for_rep);
1113         }
1114
1115         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_ALL_REPLY_NET | OBD_FAIL_ONCE)) {
1116                 if (rc) {
1117                         DEBUG_REQ(D_ERROR, req, "processing error (%d)", rc);
1118                         ptlrpc_error(req->rq_svc, req);
1119                 } else {
1120                         DEBUG_REQ(D_NET, req, "sending reply");
1121                         ptlrpc_reply(req->rq_svc, req);
1122                 }
1123         } else {
1124                 obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED;
1125                 DEBUG_REQ(D_ERROR, req, "dropping reply");
1126                 if (!med && req->rq_repmsg)
1127                         OBD_FREE(req->rq_repmsg, req->rq_replen);
1128         }
1129
1130         if (!med) {
1131                 DEBUG_REQ(D_HA, req, "not waiting for ack");
1132                 return;
1133         }
1134
1135         lwi = LWI_TIMEOUT(obd_timeout / 2 * HZ, NULL, NULL);
1136         rc = l_wait_event(req->rq_wait_for_rep, 
1137                           (req->rq_flags & PTL_RPC_FL_WANT_ACK) == 0 ||
1138                           (req->rq_flags & PTL_RPC_FL_RESENT),
1139                           &lwi);
1140
1141         if (req->rq_flags & PTL_RPC_FL_RESENT) {
1142                 /* The client resent this request, so abort the
1143                  * waiting-ack portals stuff, and don't decref the
1144                  * locks.
1145                  */
1146                 DEBUG_REQ(D_HA, req, "resent: not cancelling locks");
1147                 ptlrpc_abort(req);
1148                 return;
1149         }
1150
1151         if (rc == -ETIMEDOUT) {
1152                 ptlrpc_abort(req);
1153                 recovd_conn_fail(req->rq_export->exp_connection);
1154                 DEBUG_REQ(D_HA, req, "cancelling locks for timeout");
1155         } else {
1156                 DEBUG_REQ(D_HA, req, "cancelling locks for ack");
1157         }
1158         
1159         med->med_outstanding_reply = NULL;
1160         
1161         for (ack_lock = req->rq_ack_locks, i = 0; i < 4; i++, ack_lock++) {
1162                 if (!ack_lock->mode)
1163                         break;
1164                 ldlm_lock_decref(&ack_lock->lock, ack_lock->mode);
1165         }
1166 }
1167
1168 int mds_handle(struct ptlrpc_request *req)
1169 {
1170         int should_process, rc;
1171         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1172         struct obd_device *obd = NULL;
1173         ENTRY;
1174
1175         rc = lustre_unpack_msg(req->rq_reqmsg, req->rq_reqlen);
1176         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_HANDLE_UNPACK)) {
1177                 DEBUG_REQ(D_ERROR, req, "invalid request (%d)", rc);
1178                 GOTO(out, rc);
1179         }
1180
1181         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1182
1183         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
1184
1185         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1186                 struct mds_export_data *med;
1187                 if (req->rq_export == NULL) {
1188                         req->rq_status = -ENOTCONN;
1189                         GOTO(out, rc = -ENOTCONN);
1190                 }
1191
1192                 med = &req->rq_export->exp_mds_data;
1193                 obd = req->rq_export->exp_obd;
1194                 mds = &obd->u.mds;
1195                 spin_lock_bh(&obd->obd_processing_task_lock);
1196                 if (obd->obd_flags & OBD_ABORT_RECOVERY)
1197                         target_abort_recovery(obd);
1198                 spin_unlock_bh(&obd->obd_processing_task_lock);
1199
1200                 if (obd->obd_flags & OBD_RECOVERING) {
1201                         rc = filter_recovery_request(req, obd, &should_process);
1202                         if (rc || !should_process)
1203                                 RETURN(rc);
1204                 }
1205         }
1206
1207         switch (req->rq_reqmsg->opc) {
1208         case MDS_CONNECT:
1209                 DEBUG_REQ(D_INODE, req, "connect");
1210                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1211                 rc = target_handle_connect(req, mds_handle);
1212                 /* Make sure that last_rcvd is correct. */
1213                 if (!rc) {
1214                         /* Now that we have an export, set mds. */
1215                         mds = mds_req2mds(req);
1216                         mds_fsync_super(mds->mds_sb);
1217                 }
1218                 break;
1219
1220         case MDS_DISCONNECT:
1221                 DEBUG_REQ(D_INODE, req, "disconnect");
1222                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1223                 rc = target_handle_disconnect(req);
1224                 /* Make sure that last_rcvd is correct. */
1225                 if (!rc)
1226                         mds_fsync_super(mds->mds_sb);
1227                 req->rq_status = rc;
1228                 break;
1229
1230         case MDS_GETSTATUS:
1231                 DEBUG_REQ(D_INODE, req, "getstatus");
1232                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1233                 rc = mds_getstatus(req);
1234                 break;
1235
1236         case MDS_GETLOVINFO:
1237                 DEBUG_REQ(D_INODE, req, "getlovinfo");
1238                 rc = mds_getlovinfo(req);
1239                 break;
1240
1241         case MDS_GETATTR:
1242                 DEBUG_REQ(D_INODE, req, "getattr");
1243                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1244                 rc = mds_getattr(0, req);
1245                 break;
1246
1247         case MDS_GETATTR_NAME: {
1248                 struct lustre_handle lockh;
1249                 DEBUG_REQ(D_INODE, req, "getattr_name");
1250                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1251
1252                 /* If this request gets a reconstructed reply, we won't be
1253                  * acquiring any new locks in mds_getattr_name, so we don't
1254                  * want to cancel.
1255                  */
1256                 lockh.addr = 0;
1257                 rc = mds_getattr_name(0, req, &lockh);
1258                 if (rc == 0 && lockh.addr)
1259                         ldlm_lock_decref(&lockh, LCK_PR);
1260                 break;
1261         }
1262         case MDS_STATFS:
1263                 DEBUG_REQ(D_INODE, req, "statfs");
1264                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1265                 rc = mds_statfs(req);
1266                 break;
1267
1268         case MDS_READPAGE:
1269                 DEBUG_REQ(D_INODE, req, "readpage");
1270                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1271                 rc = mds_readpage(req);
1272
1273                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1274                         return 0;
1275                 break;
1276
1277         case MDS_REINT: {
1278                 int opc = *(u32 *)lustre_msg_buf(req->rq_reqmsg, 0);
1279                 int size[2] = {sizeof(struct mds_body), mds->mds_max_mdsize};
1280                 int bufcount;
1281
1282                 DEBUG_REQ(D_INODE, req, "reint (%s%s)",
1283                           reint_names[opc & REINT_OPCODE_MASK],
1284                           opc & REINT_REPLAYING ? "|REPLAYING" : "");
1285
1286                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1287
1288                 if (opc == REINT_UNLINK)
1289                         bufcount = 2;
1290                 else
1291                         bufcount = 1;
1292
1293                 rc = lustre_pack_msg(bufcount, size, NULL,
1294                                      &req->rq_replen, &req->rq_repmsg);
1295                 if (rc)
1296                         break;
1297
1298                 rc = mds_reint(req, 0, NULL);
1299                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
1300                 break;
1301         }
1302
1303         case MDS_CLOSE:
1304                 DEBUG_REQ(D_INODE, req, "close");
1305                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1306                 rc = mds_close(req);
1307                 break;
1308
1309         case LDLM_ENQUEUE:
1310                 DEBUG_REQ(D_INODE, req, "enqueue");
1311                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1312                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1313                                          ldlm_server_blocking_ast);
1314                 break;
1315         case LDLM_CONVERT:
1316                 DEBUG_REQ(D_INODE, req, "convert");
1317                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1318                 rc = ldlm_handle_convert(req);
1319                 break;
1320         case LDLM_BL_CALLBACK:
1321         case LDLM_CP_CALLBACK:
1322                 DEBUG_REQ(D_INODE, req, "callback");
1323                 CERROR("callbacks should not happen on MDS\n");
1324                 LBUG();
1325                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1326                 break;
1327         default:
1328                 rc = ptlrpc_error(req->rq_svc, req);
1329                 RETURN(rc);
1330         }
1331
1332         EXIT;
1333
1334         /* If we're DISCONNECTing, the mds_export_data is already freed */
1335         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1336                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1337                 struct obd_device *obd = list_entry(mds, struct obd_device,
1338                                                     u.mds);
1339                 req->rq_repmsg->last_xid =
1340                         HTON__u64(le64_to_cpu(med->med_mcd->mcd_last_xid));
1341                 if ((obd->obd_flags & OBD_NO_TRANSNO) == 0) {
1342                         req->rq_repmsg->last_committed =
1343                                 HTON__u64(obd->obd_last_committed);
1344                 } else {
1345                         DEBUG_REQ(D_IOCTL, req,
1346                                   "not sending last_committed update");
1347                 }
1348                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
1349                        ", xid "LPU64"\n",
1350                        mds->mds_last_transno, obd->obd_last_committed,
1351                        NTOH__u64(req->rq_xid));
1352         }
1353  out:
1354
1355         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1356                 if (obd && (obd->obd_flags & OBD_RECOVERING)) {
1357                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1358                         return target_queue_final_reply(req, rc);
1359                 }
1360                 /* Lost a race with recovery; let the error path DTRT. */
1361                 rc = req->rq_status = -ENOTCONN;
1362         }
1363
1364         mds_send_reply(req, rc);
1365         return 0;
1366 }
1367
1368 /* Update the server data on disk.  This stores the new mount_count and
1369  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1370  * then the server last_rcvd value may be less than that of the clients.
1371  * This will alert us that we may need to do client recovery.
1372  *
1373  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1374  */
1375 int mds_update_server_data(struct mds_obd *mds)
1376 {
1377         struct mds_server_data *msd = mds->mds_server_data;
1378         struct file *filp = mds->mds_rcvd_filp;
1379         struct obd_run_ctxt saved;
1380         loff_t off = 0;
1381         int rc;
1382
1383         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1384         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
1385         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
1386
1387         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_transno is %Lu\n",
1388                (unsigned long long)mds->mds_mount_count,
1389                (unsigned long long)mds->mds_last_transno);
1390         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
1391         if (rc != sizeof(*msd)) {
1392                 CERROR("error writing MDS server data: rc = %d\n", rc);
1393                 if (rc > 0)
1394                         rc = -EIO;
1395                 GOTO(out, rc);
1396         }
1397 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1398         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
1399 #else
1400         rc = file_fsync(filp, filp->f_dentry, 1);
1401 #endif
1402         if (rc)
1403                 CERROR("error flushing MDS server data: rc = %d\n", rc);
1404
1405 out:
1406         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
1407         RETURN(rc);
1408 }
1409
1410 /* mount the file system (secretly) */
1411 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
1412 {
1413         struct obd_ioctl_data* data = buf;
1414         struct mds_obd *mds = &obddev->u.mds;
1415         struct vfsmount *mnt;
1416         int rc = 0;
1417         ENTRY;
1418
1419 #ifdef CONFIG_DEV_RDONLY
1420         dev_clear_rdonly(2);
1421 #endif
1422         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1423                 RETURN(rc = -EINVAL);
1424
1425         obddev->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1426         if (IS_ERR(obddev->obd_fsops))
1427                 RETURN(rc = PTR_ERR(obddev->obd_fsops));
1428
1429         mnt = do_kern_mount(data->ioc_inlbuf2, 0, data->ioc_inlbuf1, NULL);
1430         if (IS_ERR(mnt)) {
1431                 rc = PTR_ERR(mnt);
1432                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1433                 GOTO(err_ops, rc);
1434         }
1435
1436         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1437         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
1438         if (!mds->mds_sb)
1439                 GOTO(err_put, rc = -ENODEV);
1440
1441         spin_lock_init(&mds->mds_transno_lock);
1442         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1443         rc = mds_fs_setup(obddev, mnt);
1444         if (rc) {
1445                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1446                 GOTO(err_put, rc);
1447         }
1448
1449         obddev->obd_namespace =
1450                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
1451         if (obddev->obd_namespace == NULL) {
1452                 mds_cleanup(obddev);
1453                 GOTO(err_fs, rc = -ENOMEM);
1454         }
1455
1456         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1457                            "mds_ldlm_client", &obddev->obd_ldlm_client);
1458
1459         mds->mds_has_lov_desc = 0;
1460
1461         RETURN(0);
1462
1463 err_fs:
1464         mds_fs_cleanup(obddev);
1465 err_put:
1466         unlock_kernel();
1467         mntput(mds->mds_vfsmnt);
1468         mds->mds_sb = 0;
1469         lock_kernel();
1470 err_ops:
1471         fsfilt_put_ops(obddev->obd_fsops);
1472         return rc;
1473 }
1474
1475 static int mds_cleanup(struct obd_device *obddev)
1476 {
1477         struct super_block *sb;
1478         struct mds_obd *mds = &obddev->u.mds;
1479         ENTRY;
1480
1481         sb = mds->mds_sb;
1482         if (!mds->mds_sb)
1483                 RETURN(0);
1484
1485         mds_update_server_data(mds);
1486         mds_fs_cleanup(obddev);
1487
1488         unlock_kernel();
1489         mntput(mds->mds_vfsmnt);
1490         mds->mds_sb = 0;
1491
1492         ldlm_namespace_free(obddev->obd_namespace);
1493
1494         lock_kernel();
1495 #ifdef CONFIG_DEV_RDONLY
1496         dev_clear_rdonly(2);
1497 #endif
1498         fsfilt_put_ops(obddev->obd_fsops);
1499
1500         RETURN(0);
1501 }
1502
1503 inline void fixup_handle_for_resent_req(struct ptlrpc_request *req,
1504                                         struct lustre_handle *lockh)
1505 {
1506         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1507         struct mds_client_data *mcd = med->med_mcd;
1508         struct ptlrpc_request *oldrep = med->med_outstanding_reply;
1509         struct ldlm_reply *dlm_rep;
1510
1511         if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) &&
1512             (mcd->mcd_last_xid == req->rq_xid) && (oldrep != NULL)) {
1513                 DEBUG_REQ(D_HA, req, "restoring lock handle from %p", oldrep);
1514                 dlm_rep = lustre_msg_buf(oldrep->rq_repmsg, 0);
1515                 lockh->addr = dlm_rep->lock_handle.addr;
1516                 lockh->cookie = dlm_rep->lock_handle.cookie;
1517         }
1518 }
1519
1520 static int ldlm_intent_policy(struct ldlm_namespace *ns,
1521                               struct ldlm_lock **lockp, void *req_cookie,
1522                               ldlm_mode_t mode, int flags, void *data)
1523 {
1524         struct ptlrpc_request *req = req_cookie;
1525         struct ldlm_lock *lock = *lockp;
1526         int rc = 0;
1527         ENTRY;
1528
1529         if (!req_cookie)
1530                 RETURN(0);
1531
1532         if (req->rq_reqmsg->bufcount > 1) {
1533                 /* an intent needs to be considered */
1534                 struct ldlm_intent *it = lustre_msg_buf(req->rq_reqmsg, 1);
1535                 struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
1536                 struct mds_body *mds_body;
1537                 struct ldlm_reply *rep;
1538                 struct lustre_handle lockh;
1539                 struct ldlm_lock *new_lock;
1540                 int rc, offset = 2, repsize[3] = {sizeof(struct ldlm_reply),
1541                                                   sizeof(struct mds_body),
1542                                                   mds->mds_max_mdsize};
1543
1544                 it->opc = NTOH__u64(it->opc);
1545
1546                 LDLM_DEBUG(lock, "intent policy, opc: %s",
1547                            ldlm_it2str(it->opc));
1548
1549                 rc = lustre_pack_msg(3, repsize, NULL, &req->rq_replen,
1550                                      &req->rq_repmsg);
1551                 if (rc) {
1552                         rc = req->rq_status = -ENOMEM;
1553                         RETURN(rc);
1554                 }
1555
1556                 rep = lustre_msg_buf(req->rq_repmsg, 0);
1557                 rep->lock_policy_res1 = IT_INTENT_EXEC;
1558
1559                 fixup_handle_for_resent_req(req, &lockh);
1560
1561                 /* execute policy */
1562                 switch ((long)it->opc) {
1563                 case IT_OPEN:
1564                 case IT_CREAT|IT_OPEN:
1565                         rc = mds_reint(req, offset, &lockh);
1566                         /* We return a dentry to the client if IT_OPEN_POS is
1567                          * set, or if we make it to the OPEN portion of the
1568                          * programme (which implies that we created) */
1569                         if (!(rep->lock_policy_res1 & IT_OPEN_POS ||
1570                               rep->lock_policy_res1 & IT_OPEN_OPEN)) {
1571                                 rep->lock_policy_res2 = rc;
1572                                 RETURN(ELDLM_LOCK_ABORTED);
1573                         }
1574                         break;
1575                 case IT_UNLINK:
1576                         rc = mds_reint(req, offset, &lockh);
1577                         /* Don't return a lock if the unlink failed, or if we're
1578                          * not sending back an EA */
1579                         if (rc) {
1580                                 rep->lock_policy_res2 = rc;
1581                                 RETURN(ELDLM_LOCK_ABORTED);
1582                         }
1583                         if (req->rq_status != 0) {
1584                                 rep->lock_policy_res2 = req->rq_status;
1585                                 RETURN(ELDLM_LOCK_ABORTED);
1586                         }
1587                         mds_body = lustre_msg_buf(req->rq_repmsg, 1);
1588                         if (!(mds_body->valid & OBD_MD_FLEASIZE)) {
1589                                 rep->lock_policy_res2 = rc;
1590                                 RETURN(ELDLM_LOCK_ABORTED);
1591                         }
1592                         break;
1593                 case IT_GETATTR:
1594                 case IT_LOOKUP:
1595                 case IT_READDIR:
1596                         rc = mds_getattr_name(offset, req, &lockh);
1597                         /* FIXME: we need to sit down and decide on who should
1598                          * set req->rq_status, who should return negative and
1599                          * positive return values, and what they all mean. */
1600                         if (rc) {
1601                                 rep->lock_policy_res2 = rc;
1602                                 RETURN(ELDLM_LOCK_ABORTED);
1603                         }
1604                         if (req->rq_status != 0) {
1605                                 rep->lock_policy_res2 = req->rq_status;
1606                                 RETURN(ELDLM_LOCK_ABORTED);
1607                         }
1608                         break;
1609                 default:
1610                         CERROR("Unhandled intent "LPD64"\n", it->opc);
1611                         LBUG();
1612                 }
1613
1614                 if (flags & LDLM_FL_INTENT_ONLY) {
1615                         LDLM_DEBUG(lock, "INTENT_ONLY, aborting lock");
1616                         RETURN(ELDLM_LOCK_ABORTED);
1617                 }
1618
1619                 /* By this point, whatever function we called above must have
1620                  * filled in 'lockh' or returned an error.  We want to give the
1621                  * new lock to the client instead of whatever lock it was about
1622                  * to get. */
1623                 new_lock = ldlm_handle2lock(&lockh);
1624                 LASSERT(new_lock != NULL);
1625                 *lockp = new_lock;
1626
1627                 rep->lock_policy_res2 = req->rq_status;
1628
1629                 if (new_lock->l_export == req->rq_export) {
1630                         /* Already gave this to the client, which means that we
1631                          * reconstructed a reply. */
1632                         LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & 
1633                                 MSG_RESENT);
1634                         RETURN(ELDLM_LOCK_REPLACED);
1635                 }
1636
1637                 /* Fixup the lock to be given to the client */
1638                 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1639                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1640                 new_lock->l_readers = 0;
1641                 new_lock->l_writers = 0;
1642
1643                 new_lock->l_export = req->rq_export;
1644                 list_add(&new_lock->l_export_chain,
1645                          &new_lock->l_export->exp_ldlm_data.led_held_locks);
1646
1647                 /* We don't need to worry about completion_ast (which isn't set
1648                  * in 'lock' yet anyways), because this lock is already
1649                  * granted. */
1650                 new_lock->l_blocking_ast = lock->l_blocking_ast;
1651
1652                 memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1653                        sizeof(lock->l_remote_handle));
1654
1655                 new_lock->l_flags &= ~(LDLM_FL_LOCAL | LDLM_FL_AST_SENT |
1656                                        LDLM_FL_CBPENDING);
1657
1658                 LDLM_LOCK_PUT(new_lock);
1659                 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1660
1661                 RETURN(ELDLM_LOCK_REPLACED);
1662         } else {
1663                 int size = sizeof(struct ldlm_reply);
1664                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1665                                      &req->rq_repmsg);
1666                 if (rc) {
1667                         LBUG();
1668                         RETURN(-ENOMEM);
1669                 }
1670         }
1671         RETURN(rc);
1672 }
1673
1674 int mds_attach(struct obd_device *dev, obd_count len, void *data)
1675 {
1676         struct lprocfs_static_vars lvars;
1677
1678         lprocfs_init_multi_vars(0, &lvars);
1679         return lprocfs_obd_attach(dev, lvars.obd_vars);
1680 }
1681
/*
 * Detach hook for the MDS device type: hands @dev to
 * lprocfs_obd_detach() and returns its result unchanged.
 */
int mds_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);

        return rc;
}
1686
1687 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
1688 {
1689         struct lprocfs_static_vars lvars;
1690
1691         lprocfs_init_multi_vars(1, &lvars);
1692         return lprocfs_obd_attach(dev, lvars.obd_vars);
1693 }
1694
/*
 * Detach hook for the MDT device type: hands @dev to
 * lprocfs_obd_detach() and returns its result unchanged.
 */
int mdt_detach(struct obd_device *dev)
{
        int rc;

        rc = lprocfs_obd_detach(dev);

        return rc;
}
1699
1700 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
1701 {
1702         struct mds_obd *mds = &obddev->u.mds;
1703         int i, rc = 0;
1704         ENTRY;
1705
1706         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1707                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
1708                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1709                                            mds_handle, "mds");
1710         if (!mds->mds_service) {
1711                 CERROR("failed to start service\n");
1712                 RETURN(rc = -ENOMEM);
1713         }
1714
1715         for (i = 0; i < MDT_NUM_THREADS; i++) {
1716                 char name[32];
1717                 sprintf(name, "ll_mdt_%02d", i);
1718                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
1719                 if (rc) {
1720                         CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
1721                         GOTO(err_thread, rc);
1722                 }
1723         }
1724
1725         mds->mds_setattr_service =
1726                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1727                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1728                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
1729                                 mds_handle, "mds");
1730         if (!mds->mds_setattr_service) {
1731                 CERROR("failed to start getattr service\n");
1732                 GOTO(err_thread, rc = -ENOMEM);
1733         }
1734
1735         for (i = 0; i < MDT_NUM_THREADS; i++) {
1736                 char name[32];
1737                 sprintf(name, "ll_mdt_attr_%02d", i);
1738                 rc = ptlrpc_start_thread(obddev, mds->mds_setattr_service,
1739                                          name);
1740                 if (rc) {
1741                         CERROR("cannot start MDT setattr thread #%d: rc %d\n",
1742                                i, rc);
1743                         GOTO(err_thread2, rc);
1744                 }
1745         }
1746
1747         mds->mds_readpage_service =
1748                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1749                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1750                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
1751                                 mds_handle, "mds");
1752         if (!mds->mds_readpage_service) {
1753                 CERROR("failed to start readpage service\n");
1754                 GOTO(err_thread2, rc = -ENOMEM);
1755         }
1756
1757         for (i = 0; i < MDT_NUM_THREADS; i++) {
1758                 char name[32];
1759                 sprintf(name, "ll_mdt_rdpg_%02d", i);
1760                 rc = ptlrpc_start_thread(obddev, mds->mds_readpage_service,
1761                                          name);
1762                 if (rc) {
1763                         CERROR("cannot start MDT readpage thread #%d: rc %d\n",
1764                                i, rc);
1765                         GOTO(err_thread3, rc);
1766                 }
1767         }
1768
1769         RETURN(0);
1770
1771 err_thread3:
1772         ptlrpc_stop_all_threads(mds->mds_readpage_service);
1773         ptlrpc_unregister_service(mds->mds_readpage_service);
1774 err_thread2:
1775         ptlrpc_stop_all_threads(mds->mds_setattr_service);
1776         ptlrpc_unregister_service(mds->mds_setattr_service);
1777 err_thread:
1778         ptlrpc_stop_all_threads(mds->mds_service);
1779         ptlrpc_unregister_service(mds->mds_service);
1780         return rc;
1781 }
1782
1783
1784 static int mdt_cleanup(struct obd_device *obddev)
1785 {
1786         struct mds_obd *mds = &obddev->u.mds;
1787         ENTRY;
1788
1789         ptlrpc_stop_all_threads(mds->mds_readpage_service);
1790         ptlrpc_unregister_service(mds->mds_readpage_service);
1791
1792         ptlrpc_stop_all_threads(mds->mds_setattr_service);
1793         ptlrpc_unregister_service(mds->mds_setattr_service);
1794
1795         ptlrpc_stop_all_threads(mds->mds_service);
1796         ptlrpc_unregister_service(mds->mds_service);
1797
1798         RETURN(0);
1799 }
1800
1801 extern int mds_iocontrol(unsigned int cmd, struct lustre_handle *conn,
1802                          int len, void *karg, void *uarg);
1803
1804 /* use obd ops to offer management infrastructure */
1805 static struct obd_ops mds_obd_ops = {
1806         o_owner:       THIS_MODULE,
1807         o_attach:      mds_attach,
1808         o_detach:      mds_detach,
1809         o_connect:     mds_connect,
1810         o_disconnect:  mds_disconnect,
1811         o_setup:       mds_setup,
1812         o_cleanup:     mds_cleanup,
1813         o_iocontrol:   mds_iocontrol
1814 };
1815
1816 static struct obd_ops mdt_obd_ops = {
1817         o_owner:       THIS_MODULE,
1818         o_attach:      mdt_attach,
1819         o_detach:      mdt_detach,
1820         o_setup:       mdt_setup,
1821         o_cleanup:     mdt_cleanup,
1822 };
1823
1824
1825 static int __init mds_init(void)
1826 {
1827         struct lprocfs_static_vars lvars;
1828         mds_file_cache = kmem_cache_create("ll_mds_file_data",
1829                                            sizeof(struct mds_file_data),
1830                                            0, 0, NULL, NULL);
1831         if (mds_file_cache == NULL)
1832                 return -ENOMEM;
1833
1834         lprocfs_init_multi_vars(0, &lvars);
1835         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
1836         lprocfs_init_multi_vars(1, &lvars);
1837         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
1838         ldlm_register_intent(ldlm_intent_policy);
1839
1840         return 0;
1841 }
1842
1843 static void __exit mds_exit(void)
1844 {
1845         ldlm_unregister_intent();
1846         class_unregister_type(LUSTRE_MDS_NAME);
1847         class_unregister_type(LUSTRE_MDT_NAME);
1848         if (kmem_cache_destroy(mds_file_cache))
1849                 CERROR("couldn't free MDS file cache\n");
1850 }
1851
1852 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1853 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
1854 MODULE_LICENSE("GPL");
1855
1856 module_init(mds_init);
1857 module_exit(mds_exit);