Whamcloud - gitweb
merge b_devel into HEAD (20030703)
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #include <linux/fs.h>
39 #include <linux/jbd.h>
40 #include <linux/ext3_fs.h>
41 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 # include <linux/smp_lock.h>
43 # include <linux/buffer_head.h>
44 # include <linux/workqueue.h>
45 # include <linux/mount.h>
46 #else
47 # include <linux/locks.h>
48 #endif
49 #include <linux/obd_lov.h>
50 #include <linux/lustre_mds.h>
51 #include <linux/lustre_fsfilt.h>
52 #include <linux/lprocfs_status.h>
53 #include "mds_internal.h"
54
/* Prototypes for functions that are not defined in this part of the file. */
extern int mds_get_lovtgts(struct mds_obd *obd, int tgt_count,
                           struct obd_uuid *uuidarray);
extern int mds_get_lovdesc(struct mds_obd *obd, struct lov_desc *desc);
int mds_finish_transno(struct mds_obd *mds, struct inode *i, void *handle,
                       struct ptlrpc_request *req, int rc, int disp);

/* Forward declaration of a file-local function. */
static int mds_cleanup(struct obd_device *obddev, int force, int failover);
61
62 inline struct mds_obd *mds_req2mds(struct ptlrpc_request *req)
63 {
64         return &req->rq_export->exp_obd->u.mds;
65 }
66
67 static int mds_bulk_timeout(void *data)
68 {
69         struct ptlrpc_bulk_desc *desc = data;
70         struct obd_export *exp = desc->bd_export;
71
72         CERROR("bulk send timed out: evicting %s@%s\n",
73                exp->exp_client_uuid.uuid,
74                exp->exp_connection->c_remote_uuid.uuid);
75         ptlrpc_fail_export(exp);
76         ptlrpc_abort_bulk (desc);
77         RETURN(1);
78 }
79
80 /* Assumes caller has already pushed into the kernel filesystem context */
81 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
82                         __u64 offset, __u64 xid)
83 {
84         struct ptlrpc_bulk_desc *desc;
85         struct l_wait_info lwi;
86         struct page *page;
87         int rc = 0;
88         ENTRY;
89
90         LASSERT ((offset & (PAGE_CACHE_SIZE - 1)) == 0);
91
92         desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, MDS_BULK_PORTAL);
93         if (desc == NULL)
94                 GOTO(out, rc = -ENOMEM);
95
96         LASSERT (PAGE_SIZE == PAGE_CACHE_SIZE);
97         page = alloc_pages (GFP_KERNEL, 0);
98         if (page == NULL)
99                 GOTO(cleanup_bulk, rc = -ENOMEM);
100
101         rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
102         if (rc != 0)
103                 GOTO(cleanup_buf, rc);
104
105         CDEBUG(D_EXT2, "reading %lu@"LPU64" from dir %lu (size %llu)\n",
106                PAGE_CACHE_SIZE, offset, file->f_dentry->d_inode->i_ino,
107                file->f_dentry->d_inode->i_size);
108         rc = fsfilt_readpage(req->rq_export->exp_obd, file, page_address (page),
109                              PAGE_CACHE_SIZE, (loff_t *)&offset);
110
111         if (rc != PAGE_CACHE_SIZE)
112                 GOTO(cleanup_buf, rc = -EIO);
113
114         rc = ptlrpc_bulk_put(desc);
115         if (rc)
116                 GOTO(cleanup_buf, rc);
117
118         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
119                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
120                        OBD_FAIL_MDS_SENDPAGE, rc);
121                 ptlrpc_abort_bulk(desc);
122                 GOTO(cleanup_buf, rc);
123         }
124
125         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
126         rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete (desc), &lwi);
127         if (rc) {
128                 LASSERT (rc == -ETIMEDOUT);
129                 GOTO(cleanup_buf, rc);
130         }
131
132         EXIT;
133  cleanup_buf:
134         __free_pages (page, 0);
135  cleanup_bulk:
136         ptlrpc_free_bulk (desc);
137  out:
138         return rc;
139 }
140
141 /* only valid locked dentries or errors should be returned */
142 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
143                                      struct vfsmount **mnt, int lock_mode,
144                                      struct lustre_handle *lockh)
145 {
146         struct mds_obd *mds = &obd->u.mds;
147         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
148         struct ldlm_res_id res_id = { .name = {0} };
149         int flags = 0, rc;
150         ENTRY;
151
152         if (IS_ERR(de))
153                 RETURN(de);
154
155         res_id.name[0] = de->d_inode->i_ino;
156         res_id.name[1] = de->d_inode->i_generation;
157         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
158                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
159                               &flags, ldlm_completion_ast,
160                               mds_blocking_ast, NULL, lockh);
161         if (rc != ELDLM_OK) {
162                 l_dput(de);
163                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
164         }
165
166         RETURN(retval);
167 }
168
/* Fall back to the NFSD-specific flag name when the kernel headers do not
 * define DCACHE_DISCONNECTED. */
#if !defined(DCACHE_DISCONNECTED)
# define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
#endif
172
173
174 /* Look up an entry by inode number. */
175 /* this function ONLY returns valid dget'd dentries with an initialized inode
176    or errors */
177 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
178                               struct vfsmount **mnt)
179 {
180         char fid_name[32];
181         unsigned long ino = fid->id;
182         __u32 generation = fid->generation;
183         struct inode *inode;
184         struct dentry *result;
185
186         if (ino == 0)
187                 RETURN(ERR_PTR(-ESTALE));
188
189         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
190
191         /* under ext3 this is neither supposed to return bad inodes
192            nor NULL inodes. */
193         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
194         if (IS_ERR(result))
195                 RETURN(result);
196
197         inode = result->d_inode;
198         if (!inode)
199                 RETURN(ERR_PTR(-ENOENT));
200
201         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino %lu, gen %u, sb %p\n",
202                inode->i_ino, inode->i_generation, inode->i_sb);
203
204         if (generation && inode->i_generation != generation) {
205                 /* we didn't find the right inode.. */
206                 CERROR("bad inode %lu, link: %d ct: %d or generation %u/%u\n",
207                        inode->i_ino, inode->i_nlink,
208                        atomic_read(&inode->i_count), inode->i_generation,
209                        generation);
210                 dput(result);
211                 RETURN(ERR_PTR(-ENOENT));
212         }
213
214         if (mnt) {
215                 *mnt = mds->mds_vfsmnt;
216                 mntget(*mnt);
217         }
218
219         RETURN(result);
220 }
221
222
223 /* Establish a connection to the MDS.
224  *
225  * This will set up an export structure for the client to hold state data
226  * about that client, like open files, the last operation number it did
227  * on the server, etc.
228  */
229 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
230                        struct obd_uuid *cluuid)
231 {
232         struct obd_export *exp;
233         struct mds_export_data *med;
234         struct mds_client_data *mcd;
235         int rc, abort_recovery;
236         ENTRY;
237
238         if (!conn || !obd || !cluuid)
239                 RETURN(-EINVAL);
240
241         /* Check for aborted recovery. */
242         spin_lock_bh(&obd->obd_processing_task_lock);
243         abort_recovery = obd->obd_abort_recovery;
244         spin_unlock_bh(&obd->obd_processing_task_lock);
245         if (abort_recovery)
246                 target_abort_recovery(obd);
247
248         /* XXX There is a small race between checking the list and adding a
249          * new connection for the same UUID, but the real threat (list
250          * corruption when multiple different clients connect) is solved.
251          *
252          * There is a second race between adding the export to the list,
253          * and filling in the client data below.  Hence skipping the case
254          * of NULL mcd above.  We should already be controlling multiple
255          * connects at the client, and we can't hold the spinlock over
256          * memory allocations without risk of deadlocking.
257          */
258         rc = class_connect(conn, obd, cluuid);
259         if (rc)
260                 RETURN(rc);
261         exp = class_conn2export(conn);
262         LASSERT(exp);
263         med = &exp->exp_mds_data;
264         class_export_put(exp);
265
266         OBD_ALLOC(mcd, sizeof(*mcd));
267         if (!mcd) {
268                 CERROR("mds: out of memory for client data\n");
269                 GOTO(out_export, rc = -ENOMEM);
270         }
271
272         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
273         med->med_mcd = mcd;
274
275         INIT_LIST_HEAD(&med->med_open_head);
276         spin_lock_init(&med->med_open_lock);
277
278         rc = mds_client_add(obd, &obd->u.mds, med, -1);
279         if (rc)
280                 GOTO(out_mcd, rc);
281
282         RETURN(0);
283
284 out_mcd:
285         OBD_FREE(mcd, sizeof(*mcd));
286 out_export:
287         class_disconnect(conn, 0);
288
289         return rc;
290 }
291
292 static void mds_mfd_addref(void *mfdp)
293 {
294         struct mds_file_data *mfd = mfdp;
295
296         atomic_inc(&mfd->mfd_refcount);
297         CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
298                atomic_read(&mfd->mfd_refcount));
299 }
300
301 struct mds_file_data *mds_mfd_new(void)
302 {
303         struct mds_file_data *mfd;
304
305         OBD_ALLOC(mfd, sizeof *mfd);
306         if (mfd == NULL) {
307                 CERROR("mds: out of memory\n");
308                 return NULL;
309         }
310
311         atomic_set(&mfd->mfd_refcount, 2);
312
313         INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
314         class_handle_hash(&mfd->mfd_handle, mds_mfd_addref);
315
316         return mfd;
317 }
318
319 static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
320 {
321         ENTRY;
322         LASSERT(handle != NULL);
323         RETURN(class_handle2object(handle->cookie));
324 }
325
326 void mds_mfd_put(struct mds_file_data *mfd)
327 {
328         CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
329                atomic_read(&mfd->mfd_refcount) - 1);
330         LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
331                 atomic_read(&mfd->mfd_refcount) < 0x5a5a);
332         if (atomic_dec_and_test(&mfd->mfd_refcount)) {
333                 LASSERT(list_empty(&mfd->mfd_handle.h_link));
334                 OBD_FREE(mfd, sizeof *mfd);
335         }
336 }
337
338 void mds_mfd_destroy(struct mds_file_data *mfd)
339 {
340         class_handle_unhash(&mfd->mfd_handle);
341         mds_mfd_put(mfd);
342 }
343
344 /* Call with med->med_open_lock held, please. */
345 static int mds_close_mfd(struct mds_file_data *mfd, struct mds_export_data *med)
346 {
347         struct dentry *de = NULL;
348
349 #ifdef CONFIG_SMP
350         LASSERT(spin_is_locked(&med->med_open_lock));
351 #endif
352         list_del(&mfd->mfd_list);
353
354         if (mfd->mfd_dentry->d_parent) {
355                 LASSERT(atomic_read(&mfd->mfd_dentry->d_parent->d_count));
356                 de = dget(mfd->mfd_dentry->d_parent);
357         }
358
359         /* this is the actual "close" */
360         l_dput(mfd->mfd_dentry);
361
362         if (de)
363                 l_dput(de);
364
365         mds_mfd_destroy(mfd);
366         RETURN(0);
367 }
368
369 static int mds_disconnect(struct lustre_handle *conn, int failover)
370 {
371         struct obd_export *export = class_conn2export(conn);
372         int rc;
373         unsigned long flags;
374         ENTRY;
375
376         ldlm_cancel_locks_for_export(export);
377
378         spin_lock_irqsave(&export->exp_lock, flags);
379         export->exp_failover = failover;
380         spin_unlock_irqrestore(&export->exp_lock, flags);
381
382         rc = class_disconnect(conn, failover);
383         class_export_put(export);
384
385         RETURN(rc);
386 }
387
388 static void mds_destroy_export(struct obd_export *export)
389 {
390         struct mds_export_data *med = &export->exp_mds_data;
391         struct list_head *tmp, *n;
392         int rc;
393
394         ENTRY;
395         LASSERT(!strcmp(export->exp_obd->obd_type->typ_name,
396                         LUSTRE_MDS_NAME));
397
398         /*
399          * Close any open files.
400          */
401         spin_lock(&med->med_open_lock);
402         list_for_each_safe(tmp, n, &med->med_open_head) {
403                 struct mds_file_data *mfd =
404                         list_entry(tmp, struct mds_file_data, mfd_list);
405 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
406                 struct dentry *dentry = mfd->mfd_dentry;
407                 CERROR("force closing client file handle for %*s (%s:%lu)\n",
408                        dentry->d_name.len, dentry->d_name.name,
409                        kdevname(dentry->d_inode->i_sb->s_dev),
410                        dentry->d_inode->i_ino);
411 #endif
412                 rc = mds_close_mfd(mfd, med);
413                 if (rc)
414                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
415         }
416         spin_unlock(&med->med_open_lock);
417
418         if (export->exp_outstanding_reply) {
419                 struct ptlrpc_request *req = export->exp_outstanding_reply;
420                 unsigned long          flags;
421
422                 /* Fake the ack, so the locks get cancelled. */
423                 LBUG ();
424                 /* Actually we can't do this because it prevents us knowing
425                  * if the ACK callback ran or not */
426                 spin_lock_irqsave (&req->rq_lock, flags);
427                 req->rq_want_ack = 0;
428                 req->rq_err = 1;
429                 wake_up(&req->rq_wait_for_rep);
430                 spin_unlock_irqrestore (&req->rq_lock, flags);
431
432                 export->exp_outstanding_reply = NULL;
433         }
434
435         if (!export->exp_failover)
436                 mds_client_free(export);
437         EXIT;
438 }
439
440 /*
441  * XXX This is NOT guaranteed to flush all transactions to disk (even though
442  *     it is equivalent to calling sync()) because it only _starts_ the flush
443  *     and does not wait for completion.  It's better than nothing though.
444  *     What we really want is a mild form of fsync_dev_lockfs(), but it is
445  *     non-standard, or enabling do_sync_supers in ext3, just for this call.
446  */
447 static void mds_fsync_super(struct super_block *sb)
448 {
449         lock_kernel();
450         lock_super(sb);
451         if (sb->s_dirt && sb->s_op && sb->s_op->write_super)
452                 sb->s_op->write_super(sb);
453         unlock_super(sb);
454         unlock_kernel();
455 }
456
457 static int mds_getstatus(struct ptlrpc_request *req)
458 {
459         struct mds_obd *mds = mds_req2mds(req);
460         struct mds_body *body;
461         int rc, size = sizeof(*body);
462         ENTRY;
463
464         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
465         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
466                 CERROR("mds: out of memory for message: size=%d\n", size);
467                 req->rq_status = -ENOMEM;       /* superfluous? */
468                 RETURN(-ENOMEM);
469         }
470
471         /* Flush any outstanding transactions to disk so the client will
472          * get the latest last_committed value and can drop their local
473          * requests if they have any.  This would be fsync_super() if it
474          * was exported.
475          */
476         mds_fsync_super(mds->mds_sb);
477
478         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
479         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
480
481         /* the last_committed and last_xid fields are filled in for all
482          * replies already - no need to do so here also.
483          */
484         RETURN(0);
485 }
486
487 static int mds_getlovinfo(struct ptlrpc_request *req)
488 {
489         struct mds_obd *mds = mds_req2mds(req);
490         struct mds_status_req *streq;
491         struct lov_desc *desc;
492         struct obd_uuid *uuid0;
493         int tgt_count;
494         int rc, size[2] = {sizeof(*desc)};
495         ENTRY;
496
497         streq = lustre_swab_reqbuf (req, 0, sizeof (*streq),
498                                     lustre_swab_mds_status_req);
499         if (streq == NULL) {
500                 CERROR ("Can't unpack mds_status_req\n");
501                 RETURN (-EFAULT);
502         }
503
504         if (streq->repbuf > LOV_MAX_UUID_BUFFER_SIZE) {
505                 CERROR ("Illegal request for uuid array > %d\n",
506                         streq->repbuf);
507                 RETURN (-EINVAL);
508         }
509         size[1] = streq->repbuf;
510
511         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
512         if (rc) {
513                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
514                 RETURN(-ENOMEM);
515         }
516
517         if (!mds->mds_has_lov_desc) {
518                 req->rq_status = -ENOENT;
519                 RETURN(0);
520         }
521
522         /* XXX We're sending the lov_desc in my byte order.
523          * Receiver will swab... */
524         desc = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*desc));
525         memcpy(desc, &mds->mds_lov_desc, sizeof (*desc));
526
527         tgt_count = mds->mds_lov_desc.ld_tgt_count;
528         uuid0 = lustre_msg_buf (req->rq_repmsg, 1,
529                                 tgt_count * sizeof (*uuid0));
530         if (uuid0 == NULL) {
531                 CERROR("too many targets, enlarge client buffers\n");
532                 req->rq_status = -ENOSPC;
533                 RETURN(0);
534         }
535
536         rc = mds_get_lovtgts(mds, tgt_count, uuid0);
537         if (rc) {
538                 CERROR("get_lovtgts error %d\n", rc);
539                 req->rq_status = rc;
540                 RETURN(0);
541         }
542         RETURN(0);
543 }
544
545 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
546                      void *data, int flag)
547 {
548         int do_ast;
549         ENTRY;
550
551         if (flag == LDLM_CB_CANCELING) {
552                 /* Don't need to do anything here. */
553                 RETURN(0);
554         }
555
556         /* XXX layering violation!  -phil */
557         l_lock(&lock->l_resource->lr_namespace->ns_lock);
558         /* Get this: if mds_blocking_ast is racing with ldlm_intent_policy,
559          * such that mds_blocking_ast is called just before l_i_p takes the
560          * ns_lock, then by the time we get the lock, we might not be the
561          * correct blocking function anymore.  So check, and return early, if
562          * so. */
563         if (lock->l_blocking_ast != mds_blocking_ast) {
564                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
565                 RETURN(0);
566         }
567
568         lock->l_flags |= LDLM_FL_CBPENDING;
569         do_ast = (!lock->l_readers && !lock->l_writers);
570         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
571
572         if (do_ast) {
573                 struct lustre_handle lockh;
574                 int rc;
575
576                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
577                 ldlm_lock2handle(lock, &lockh);
578                 rc = ldlm_cli_cancel(&lockh);
579                 if (rc < 0)
580                         CERROR("ldlm_cli_cancel: %d\n", rc);
581         } else {
582                 LDLM_DEBUG(lock, "Lock still has references, will be "
583                            "cancelled later");
584         }
585         RETURN(0);
586 }
587
588 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
589                 int offset, struct mds_body *body, struct inode *inode)
590 {
591         struct mds_obd *mds = &obd->u.mds;
592         struct lov_mds_md *lmm;
593         int lmm_size;
594         int rc;
595         ENTRY;
596
597         lmm = lustre_msg_buf(msg, offset, 0);
598         if (lmm == NULL) {
599                 /* Some problem with getting eadata when I sized the reply
600                  * buffer... */
601                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
602                        inode->i_ino);
603                 RETURN(0);
604         }
605         lmm_size = msg->buflens[offset];
606
607         /* I don't really like this, but it is a sanity check on the client
608          * MD request.  However, if the client doesn't know how much space
609          * to reserve for the MD, this shouldn't be fatal either...
610          */
611         if (lmm_size > mds->mds_max_mdsize) {
612                 CERROR("Reading MD for inode %lu of %d bytes > max %d\n",
613                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
614                 // RETURN(-EINVAL);
615         }
616
617         rc = fsfilt_get_md(obd, inode, lmm, lmm_size);
618         if (rc < 0) {
619                 CERROR ("Error %d reading eadata for ino %lu\n",
620                         rc, inode->i_ino);
621         } else if (rc > 0) {
622                 body->valid |= OBD_MD_FLEASIZE;
623                 body->eadatasize = rc;
624                 rc = 0;
625         }
626
627         RETURN(rc);
628 }
629
630 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
631                                 struct ptlrpc_request *req,
632                                 struct mds_body *reqbody, int reply_off)
633 {
634         struct mds_body *body;
635         struct inode *inode = dentry->d_inode;
636         int rc = 0;
637         ENTRY;
638
639         if (inode == NULL)
640                 RETURN(-ENOENT);
641
642         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof (*body));
643         LASSERT (body != NULL);                 /* caller prepped reply */
644
645         mds_pack_inode2fid(&body->fid1, inode);
646         mds_pack_inode2body(body, inode);
647
648         if (S_ISREG(inode->i_mode) &&
649             (reqbody->valid & OBD_MD_FLEASIZE) != 0) {
650                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1,
651                                  body, inode);
652         } else if (S_ISLNK(inode->i_mode) &&
653                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
654                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1, 0);
655                 int len;
656
657                 LASSERT (symname != NULL);       /* caller prepped reply */
658                 len = req->rq_repmsg->buflens[reply_off + 1];
659
660                 rc = inode->i_op->readlink(dentry, symname, len);
661                 if (rc < 0) {
662                         CERROR("readlink failed: %d\n", rc);
663                 } else if (rc != len - 1) {
664                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
665                                 rc, len - 1);
666                         rc = -EINVAL;
667                 } else {
668                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
669                         body->valid |= OBD_MD_LINKNAME;
670                         body->eadatasize = rc + 1;
671                         symname[rc] = 0;        /* NULL terminate */
672                         rc = 0;
673                 }
674         }
675         RETURN(rc);
676 }
677
678 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
679                                 int offset)
680 {
681         struct mds_obd *mds = mds_req2mds(req);
682         struct mds_body *body;
683         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
684         ENTRY;
685
686         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
687         LASSERT (body != NULL);                 /* checked by caller */
688         LASSERT_REQSWABBED (req, offset);       /* swabbed by caller */
689
690         if (S_ISREG(inode->i_mode) &&
691             (body->valid & OBD_MD_FLEASIZE) != 0) {
692                 int rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
693                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
694                        rc, inode->i_ino);
695                 if (rc < 0) {
696                         if (rc != -ENODATA)
697                                 CERROR("error getting inode %lu MD: rc = %d\n",
698                                        inode->i_ino, rc);
699                         size[bufcount] = 0;
700                 } else if (rc > mds->mds_max_mdsize) {
701                         size[bufcount] = 0;
702                         CERROR("MD size %d larger than maximum possible %u\n",
703                                rc, mds->mds_max_mdsize);
704                 } else
705                         size[bufcount] = rc;
706                 bufcount++;
707         } else if (S_ISLNK (inode->i_mode) &&
708                    (body->valid & OBD_MD_LINKNAME) != 0) {
709                 if (inode->i_size + 1 != body->eadatasize)
710                         CERROR ("symlink size: %Lu, reply space: %d\n",
711                                 inode->i_size + 1, body->eadatasize);
712                 size[bufcount] = MIN(inode->i_size + 1, body->eadatasize);
713                 bufcount++;
714                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
715                        inode->i_size + 1, body->eadatasize);
716         }
717
718         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
719                 CERROR("failed MDS_GETATTR_PACK test\n");
720                 req->rq_status = -ENOMEM;
721                 GOTO(out, rc = -ENOMEM);
722         }
723
724         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
725                              &req->rq_repmsg);
726         if (rc) {
727                 CERROR("out of memoryK\n");
728                 req->rq_status = rc;
729                 GOTO(out, rc);
730         }
731
732         EXIT;
733  out:
734         return(rc);
735 }
736
737 /* This is more copy-and-paste from getattr_name than I'd like. */
738 static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
739                                      struct lustre_handle *client_lockh)
740 {
741         struct obd_device *obd = req->rq_export->exp_obd;
742         struct mds_obd *mds = mds_req2mds(req);
743         struct dentry *parent, *child;
744         struct mds_body *body;
745         struct inode *dir;
746         struct obd_run_ctxt saved;
747         struct obd_ucred uc;
748         int namelen, rc = 0;
749         char *name;
750
751         if (req->rq_export->exp_outstanding_reply)
752                 mds_steal_ack_locks(req->rq_export, req);
753
754         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
755         LASSERT (body != NULL);                 /* checked by caller */
756         LASSERT_REQSWABBED (req, offset);       /* swabbed by caller */
757
758         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
759         LASSERT (name != NULL);                 /* checked by caller */
760         LASSERT_REQSWABBED (req, offset + 1);   /* swabbed by caller */
761         namelen = req->rq_reqmsg->buflens[offset + 1];
762
763         LASSERT (offset == 2 || offset == 0);
764         /* requests were at offset 2, replies go back at 1 */
765         if (offset)
766                 offset = 1;
767
768         uc.ouc_fsuid = body->fsuid;
769         uc.ouc_fsgid = body->fsgid;
770         uc.ouc_cap = body->capability;
771         uc.ouc_suppgid1 = body->suppgid;
772         uc.ouc_suppgid2 = -1;
773         push_ctxt(&saved, &mds->mds_ctxt, &uc);
774         parent = mds_fid2dentry(mds, &body->fid1, NULL);
775         LASSERT(!IS_ERR(parent));
776         dir = parent->d_inode;
777         LASSERT(dir);
778         child = ll_lookup_one_len(name, parent, namelen - 1);
779         LASSERT(!IS_ERR(child));
780
781         if (req->rq_repmsg == NULL) {
782                 rc = mds_getattr_pack_msg(req, child->d_inode, offset);
783                 /* XXX need to handle error here */
784                 LASSERT (rc == 0);
785         }
786
787         rc = mds_getattr_internal(obd, child, req, body, offset);
788         req->rq_status = rc;
789         l_dput(child);
790         l_dput(parent);
791 }
792
793 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
794                             struct lustre_handle *child_lockh)
795 {
796         struct mds_obd *mds = mds_req2mds(req);
797         struct obd_device *obd = req->rq_export->exp_obd;
798         struct obd_run_ctxt saved;
799         struct mds_body *body;
800         struct dentry *de = NULL, *dchild = NULL;
801         struct inode *dir;
802         struct obd_ucred uc;
803         struct ldlm_res_id child_res_id = { .name = {0} };
804         struct lustre_handle parent_lockh;
805         int namesize;
806         int flags = 0, rc = 0, cleanup_phase = 0, req_was_resent;
807         char *name;
808         ENTRY;
809
810         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
811
812         /* Swab now, before anyone looks inside the request */
813
814         body = lustre_swab_reqbuf (req, offset, sizeof (*body),
815                                    lustre_swab_mds_body);
816         if (body == NULL) {
817                 CERROR ("Can't swab mds_body\n");
818                 GOTO (cleanup, rc = -EFAULT);
819         }
820
821         LASSERT_REQSWAB (req, offset + 1);
822         name = lustre_msg_string (req->rq_reqmsg, offset + 1, 0);
823         if (name == NULL) {
824                 CERROR ("Can't unpack name\n");
825                 GOTO (cleanup, rc = -EFAULT);
826         }
827         namesize = req->rq_reqmsg->buflens[offset + 1];
828
829         req_was_resent = lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT;
830         if (child_lockh->cookie) {
831                 LASSERT(req_was_resent);
832                 reconstruct_getattr_name(offset, req, child_lockh);
833                 RETURN(0);
834         } else if (req_was_resent) {
835                 DEBUG_REQ(D_HA, req, "no reply for RESENT req");
836         }
837
838         LASSERT (offset == 0 || offset == 2);
839         /* if requests were at offset 2, replies go back at 1 */
840         if (offset)
841                 offset = 1;
842
843         uc.ouc_fsuid = body->fsuid;
844         uc.ouc_fsgid = body->fsgid;
845         uc.ouc_cap = body->capability;
846         uc.ouc_suppgid1 = body->suppgid;
847         uc.ouc_suppgid2 = -1;
848         push_ctxt(&saved, &mds->mds_ctxt, &uc);
849         /* Step 1: Lookup/lock parent */
850         de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
851                                    &parent_lockh);
852         if (IS_ERR(de))
853                 GOTO(cleanup, rc = PTR_ERR(de));
854         dir = de->d_inode;
855         LASSERT(dir);
856
857         cleanup_phase = 1; /* parent dentry and lock */
858
859         CDEBUG(D_INODE, "parent ino %lu, name %s\n", dir->i_ino, name);
860
861         /* Step 2: Lookup child */
862         dchild = ll_lookup_one_len(name, de, namesize - 1);
863         if (IS_ERR(dchild)) {
864                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
865                 GOTO(cleanup, rc = PTR_ERR(dchild));
866         }
867
868         cleanup_phase = 2; /* child dentry */
869
870         if (dchild->d_inode == NULL) {
871                 GOTO(cleanup, rc = -ENOENT);
872         }
873
874         /* Step 3: Lock child */
875         child_res_id.name[0] = dchild->d_inode->i_ino;
876         child_res_id.name[1] = dchild->d_inode->i_generation;
877         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
878                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
879                               &flags, ldlm_completion_ast, mds_blocking_ast,
880                               NULL, child_lockh);
881         if (rc != ELDLM_OK) {
882                 CERROR("ldlm_cli_enqueue: %d\n", rc);
883                 GOTO(cleanup, rc = -EIO);
884         }
885
886         cleanup_phase = 3; /* child lock */
887
888         if (req->rq_repmsg == NULL) {
889                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
890                 if (rc != 0) {
891                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
892                         GOTO (cleanup, rc);
893                 }
894         }
895
896         rc = mds_getattr_internal(obd, dchild, req, body, offset);
897         GOTO(cleanup, rc); /* returns the lock to the client */
898
899  cleanup:
900         switch (cleanup_phase) {
901         case 3:
902                 if (rc)
903                         ldlm_lock_decref(child_lockh, LCK_PR);
904         case 2:
905                 l_dput(dchild);
906
907         case 1:
908                 if (rc) {
909                         ldlm_lock_decref(&parent_lockh, LCK_PR);
910                 } else {
911                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
912                                sizeof(parent_lockh));
913                         req->rq_ack_locks[0].mode = LCK_PR;
914                 }
915                 l_dput(de);
916         default: ;
917         }
918         req->rq_status = rc;
919         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
920         return rc;
921 }
922
923 static int mds_getattr(int offset, struct ptlrpc_request *req)
924 {
925         struct mds_obd *mds = mds_req2mds(req);
926         struct obd_device *obd = req->rq_export->exp_obd;
927         struct obd_run_ctxt saved;
928         struct dentry *de;
929         struct mds_body *body;
930         struct obd_ucred uc;
931         int rc = 0;
932         ENTRY;
933
934         body = lustre_swab_reqbuf (req, offset, sizeof (*body),
935                                    lustre_swab_mds_body);
936         if (body == NULL) {
937                 CERROR ("Can't unpack body\n");
938                 RETURN (-EFAULT);
939         }
940
941         uc.ouc_fsuid = body->fsuid;
942         uc.ouc_fsgid = body->fsgid;
943         uc.ouc_cap = body->capability;
944         push_ctxt(&saved, &mds->mds_ctxt, &uc);
945         de = mds_fid2dentry(mds, &body->fid1, NULL);
946         if (IS_ERR(de)) {
947                 rc = req->rq_status = -ENOENT;
948                 GOTO(out_pop, PTR_ERR(de));
949         }
950
951         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
952         if (rc != 0) {
953                 CERROR ("mds_getattr_pack_msg: %d\n", rc);
954                 GOTO (out_pop, rc);
955         }
956
957         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
958
959         l_dput(de);
960         GOTO(out_pop, rc);
961 out_pop:
962         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
963         return rc;
964 }
965
966 static int mds_statfs(struct ptlrpc_request *req)
967 {
968         struct obd_device *obd = req->rq_export->exp_obd;
969         struct obd_statfs *osfs;
970         int rc, size = sizeof(*osfs);
971         ENTRY;
972
973         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
974         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
975                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
976                 GOTO(out, rc);
977         }
978
979         osfs = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*osfs));
980         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
981         if (rc) {
982                 CERROR("mds: statfs failed: rc %d\n", rc);
983                 GOTO(out, rc);
984         }
985
986         EXIT;
987 out:
988         req->rq_status = rc;
989         return 0;
990 }
991
992 static void reconstruct_close(struct ptlrpc_request *req)
993 {
994         struct mds_export_data *med = &req->rq_export->exp_mds_data;
995         struct mds_client_data *mcd = med->med_mcd;
996
997         req->rq_transno = mcd->mcd_last_transno;
998         req->rq_status = mcd->mcd_last_result;
999
1000         /* XXX When open-unlink is working, we'll need to steal ack locks as
1001          * XXX well, and make sure that we do the right unlinking after we
1002          * XXX get the ack back.
1003          */
1004 }
1005
1006 static int mds_close(struct ptlrpc_request *req)
1007 {
1008         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1009         struct mds_body *body;
1010         struct mds_file_data *mfd;
1011         int rc;
1012         ENTRY;
1013
1014         MDS_CHECK_RESENT(req, reconstruct_close(req));
1015
1016         body = lustre_swab_reqbuf(req, 0, sizeof (*body),
1017                                   lustre_swab_mds_body);
1018         if (body == NULL) {
1019                 CERROR ("Can't unpack body\n");
1020                 RETURN (-EFAULT);
1021         }
1022
1023         mfd = mds_handle2mfd(&body->handle);
1024         if (mfd == NULL) {
1025                 DEBUG_REQ(D_ERROR, req, "no handle for file close "LPD64
1026                           ": cookie "LPX64"\n", body->fid1.id,
1027                           body->handle.cookie);
1028                 RETURN(-ESTALE);
1029         }
1030
1031         spin_lock(&med->med_open_lock);
1032         req->rq_status = mds_close_mfd(mfd, med);
1033         spin_unlock(&med->med_open_lock);
1034
1035         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
1036                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
1037                 req->rq_status = -ENOMEM;
1038                 mds_mfd_put(mfd);
1039                 RETURN(-ENOMEM);
1040         }
1041
1042         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
1043         if (rc) {
1044                 CERROR("mds: lustre_pack_msg: rc = %d\n", rc);
1045                 req->rq_status = rc;
1046         }
1047
1048         mds_mfd_put(mfd);
1049         RETURN(0);
1050 }
1051
1052 static int mds_readpage(struct ptlrpc_request *req)
1053 {
1054         struct mds_obd *mds = mds_req2mds(req);
1055         struct vfsmount *mnt;
1056         struct dentry *de;
1057         struct file *file;
1058         struct mds_body *body, *repbody;
1059         struct obd_run_ctxt saved;
1060         int rc, size = sizeof(*repbody);
1061         struct obd_ucred uc;
1062         ENTRY;
1063
1064         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1065         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1066                 CERROR("mds: out of memory\n");
1067                 GOTO(out, rc = -ENOMEM);
1068         }
1069
1070         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
1071                                    lustre_swab_mds_body);
1072         if (body == NULL)
1073                 GOTO (out, rc = -EFAULT);
1074
1075         /* body->size is actually the offset -eeb */
1076         if ((body->size & (PAGE_SIZE - 1)) != 0) {
1077                 CERROR ("offset "LPU64"not on a page boundary\n", body->size);
1078                 GOTO (out, rc = -EFAULT);
1079         }
1080
1081         /* body->nlink is actually the #bytes to read -eeb */
1082         if (body->nlink != PAGE_SIZE) {
1083                 CERROR ("size %d is not PAGE_SIZE\n", body->nlink);
1084                 GOTO (out, rc = -EFAULT);
1085         }
1086
1087         uc.ouc_fsuid = body->fsuid;
1088         uc.ouc_fsgid = body->fsgid;
1089         uc.ouc_cap = body->capability;
1090         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1091         de = mds_fid2dentry(mds, &body->fid1, &mnt);
1092         if (IS_ERR(de))
1093                 GOTO(out_pop, rc = PTR_ERR(de));
1094
1095         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1096
1097         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1098         /* note: in case of an error, dentry_open puts dentry */
1099         if (IS_ERR(file))
1100                 GOTO(out_pop, rc = PTR_ERR(file));
1101
1102         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1103         repbody->size = file->f_dentry->d_inode->i_size;
1104         repbody->valid = OBD_MD_FLSIZE;
1105
1106         /* to make this asynchronous make sure that the handling function
1107            doesn't send a reply when this function completes. Instead a
1108            callback function would send the reply */
1109         /* body->blocks is actually the xid -phil */
1110         /* body->size is actually the offset -eeb */
1111         rc = mds_sendpage(req, file, body->size, body->blocks);
1112
1113         filp_close(file, 0);
1114 out_pop:
1115         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1116 out:
1117         req->rq_status = rc;
1118         RETURN(0);
1119 }
1120
1121 int mds_reint(struct ptlrpc_request *req, int offset,
1122               struct lustre_handle *lockh)
1123 {
1124         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1125         int rc;
1126
1127         OBD_ALLOC(rec, sizeof(*rec));
1128         if (rec == NULL)
1129                 RETURN(-ENOMEM);
1130
1131         rc = mds_update_unpack(req, offset, rec);
1132         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1133                 CERROR("invalid record\n");
1134                 GOTO(out, req->rq_status = -EINVAL);
1135         }
1136         /* rc will be used to interrupt a for loop over multiple records */
1137         rc = mds_reint_rec(rec, offset, req, lockh);
1138  out:
1139         OBD_FREE(rec, sizeof(*rec));
1140         return rc;
1141 }
1142
1143 static int filter_recovery_request(struct ptlrpc_request *req,
1144                                    struct obd_device *obd, int *process)
1145 {
1146         switch (req->rq_reqmsg->opc) {
1147         case MDS_CONNECT: /* This will never get here, but for completeness. */
1148         case OST_CONNECT: /* This will never get here, but for completeness. */
1149         case MDS_DISCONNECT:
1150         case OST_DISCONNECT:
1151                *process = 1;
1152                RETURN(0);
1153
1154         case MDS_CLOSE:
1155         case MDS_GETSTATUS: /* used in unmounting */
1156         case OBD_PING:
1157         case MDS_REINT:
1158         case LDLM_ENQUEUE:
1159                 *process = target_queue_recovery_request(req, obd);
1160                 RETURN(0);
1161
1162         default:
1163                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1164                 *process = 0;
1165                 /* XXX what should we set rq_status to here? */
1166                 req->rq_status = -EAGAIN;
1167                 RETURN(ptlrpc_error(req));
1168         }
1169 }
1170
1171 static char *reint_names[] = {
1172         [REINT_SETATTR] "setattr",
1173         [REINT_CREATE]  "create",
1174         [REINT_LINK]    "link",
1175         [REINT_UNLINK]  "unlink",
1176         [REINT_RENAME]  "rename",
1177         [REINT_OPEN]    "open",
1178 };
1179
1180 void mds_steal_ack_locks(struct obd_export *exp,
1181                          struct ptlrpc_request *req)
1182 {
1183         unsigned long  flags;
1184
1185         struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
1186         memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
1187                sizeof req->rq_ack_locks);
1188         spin_lock_irqsave (&req->rq_lock, flags);
1189         oldrep->rq_resent = 1;
1190         wake_up(&oldrep->rq_wait_for_rep);
1191         spin_unlock_irqrestore (&req->rq_lock, flags);
1192         DEBUG_REQ(D_HA, oldrep, "stole locks from");
1193         DEBUG_REQ(D_HA, req, "stole locks for");
1194 }
1195
1196 int mds_handle(struct ptlrpc_request *req)
1197 {
1198         int should_process;
1199         int rc = 0;
1200         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1201         struct obd_device *obd = NULL;
1202         ENTRY;
1203
1204         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1205
1206         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
1207
1208         /* XXX identical to OST */
1209         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1210                 struct mds_export_data *med;
1211                 int recovering, abort_recovery;
1212
1213                 if (req->rq_export == NULL) {
1214                         CERROR("lustre_mds: operation %d on unconnected MDS\n",
1215                                req->rq_reqmsg->opc);
1216                         req->rq_status = -ENOTCONN;
1217                         GOTO(out, rc = -ENOTCONN);
1218                 }
1219
1220                 med = &req->rq_export->exp_mds_data;
1221                 obd = req->rq_export->exp_obd;
1222                 mds = &obd->u.mds;
1223
1224                 /* Check for aborted recovery. */
1225                 spin_lock_bh(&obd->obd_processing_task_lock);
1226                 abort_recovery = obd->obd_abort_recovery;
1227                 recovering = obd->obd_recovering;
1228                 spin_unlock_bh(&obd->obd_processing_task_lock);
1229                 if (abort_recovery) {
1230                         target_abort_recovery(obd);
1231                 } else if (recovering) {
1232                         rc = filter_recovery_request(req, obd, &should_process);
1233                         if (rc || !should_process)
1234                                 RETURN(rc);
1235                 }
1236         }
1237
1238         switch (req->rq_reqmsg->opc) {
1239         case MDS_CONNECT:
1240                 DEBUG_REQ(D_INODE, req, "connect");
1241                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1242                 rc = target_handle_connect(req, mds_handle);
1243                 /* Make sure that last_rcvd is correct. */
1244                 if (!rc) {
1245                         /* Now that we have an export, set mds. */
1246                         mds = mds_req2mds(req);
1247                         mds_fsync_super(mds->mds_sb);
1248                 }
1249                 break;
1250
1251         case MDS_DISCONNECT:
1252                 DEBUG_REQ(D_INODE, req, "disconnect");
1253                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1254                 rc = target_handle_disconnect(req);
1255                 /* Make sure that last_rcvd is correct. */
1256                 if (!rc)
1257                         mds_fsync_super(mds->mds_sb);
1258                 req->rq_status = rc;            /* superfluous? */
1259                 break;
1260
1261         case MDS_GETSTATUS:
1262                 DEBUG_REQ(D_INODE, req, "getstatus");
1263                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1264                 rc = mds_getstatus(req);
1265                 break;
1266
1267         case MDS_GETLOVINFO:
1268                 DEBUG_REQ(D_INODE, req, "getlovinfo");
1269                 rc = mds_getlovinfo(req);
1270                 break;
1271
1272         case MDS_GETATTR:
1273                 DEBUG_REQ(D_INODE, req, "getattr");
1274                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1275                 rc = mds_getattr(0, req);
1276                 break;
1277
1278         case MDS_GETATTR_NAME: {
1279                 struct lustre_handle lockh;
1280                 DEBUG_REQ(D_INODE, req, "getattr_name");
1281                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1282
1283                 /* If this request gets a reconstructed reply, we won't be
1284                  * acquiring any new locks in mds_getattr_name, so we don't
1285                  * want to cancel.
1286                  */
1287                 lockh.cookie = 0;
1288                 rc = mds_getattr_name(0, req, &lockh);
1289                 if (rc == 0 && lockh.cookie)
1290                         ldlm_lock_decref(&lockh, LCK_PR);
1291                 break;
1292         }
1293         case MDS_STATFS:
1294                 DEBUG_REQ(D_INODE, req, "statfs");
1295                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1296                 rc = mds_statfs(req);
1297                 break;
1298
1299         case MDS_READPAGE:
1300                 DEBUG_REQ(D_INODE, req, "readpage");
1301                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1302                 rc = mds_readpage(req);
1303
1304                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1305                         return 0;
1306                 break;
1307
1308         case MDS_REINT: {
1309                 __u32 *opcp = lustre_msg_buf (req->rq_reqmsg, 0, sizeof (*opcp));
1310                 __u32  opc;
1311                 int size[2] = {sizeof(struct mds_body), mds->mds_max_mdsize};
1312                 int bufcount;
1313
1314                 /* NB only peek inside req now; mds_reint() will swab it */
1315                 if (opcp == NULL) {
1316                         CERROR ("Can't inspect opcode\n");
1317                         rc = -EINVAL;
1318                         break;
1319                 }
1320                 opc = *opcp;
1321                 if (lustre_msg_swabbed (req->rq_reqmsg))
1322                         __swab32s (&opc);
1323
1324                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1325                           (opc < sizeof (reint_names) / sizeof (reint_names[0]) ||
1326                            reint_names[opc] == NULL) ? reint_names[opc] : "unknown opcode");
1327
1328                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1329
1330                 if (opc == REINT_UNLINK)
1331                         bufcount = 2;
1332                 else
1333                         bufcount = 1;
1334
1335                 rc = lustre_pack_msg(bufcount, size, NULL,
1336                                      &req->rq_replen, &req->rq_repmsg);
1337                 if (rc)
1338                         break;
1339
1340                 rc = mds_reint(req, 0, NULL);
1341                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
1342                 break;
1343         }
1344
1345         case MDS_CLOSE:
1346                 DEBUG_REQ(D_INODE, req, "close");
1347                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1348                 rc = mds_close(req);
1349                 break;
1350
1351         case OBD_PING:
1352                 DEBUG_REQ(D_INODE, req, "ping");
1353                 rc = target_handle_ping(req);
1354                 break;
1355
1356         case LDLM_ENQUEUE:
1357                 DEBUG_REQ(D_INODE, req, "enqueue");
1358                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1359                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1360                                          ldlm_server_blocking_ast);
1361                 break;
1362         case LDLM_CONVERT:
1363                 DEBUG_REQ(D_INODE, req, "convert");
1364                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1365                 rc = ldlm_handle_convert(req);
1366                 break;
1367         case LDLM_BL_CALLBACK:
1368         case LDLM_CP_CALLBACK:
1369                 DEBUG_REQ(D_INODE, req, "callback");
1370                 CERROR("callbacks should not happen on MDS\n");
1371                 LBUG();
1372                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1373                 break;
1374         default:
1375                 req->rq_status = -ENOTSUPP;
1376                 rc = ptlrpc_error(req);
1377                 RETURN(rc);
1378         }
1379
1380         EXIT;
1381
1382         /* If we're DISCONNECTing, the mds_export_data is already freed */
1383         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1384                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1385                 struct obd_device *obd = list_entry(mds, struct obd_device,
1386                                                     u.mds);
1387                 req->rq_repmsg->last_xid =
1388                         le64_to_cpu (med->med_mcd->mcd_last_xid);
1389
1390                 if (!obd->obd_no_transno) {
1391                         req->rq_repmsg->last_committed =
1392                                 obd->obd_last_committed;
1393                 } else {
1394                         DEBUG_REQ(D_IOCTL, req,
1395                                   "not sending last_committed update");
1396                 }
1397                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
1398                        ", xid "LPU64"\n",
1399                        mds->mds_last_transno, obd->obd_last_committed,
1400                        req->rq_xid);
1401         }
1402  out:
1403
1404         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1405                 if (obd && obd->obd_recovering) {
1406                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1407                         return target_queue_final_reply(req, rc);
1408                 }
1409                 /* Lost a race with recovery; let the error path DTRT. */
1410                 rc = req->rq_status = -ENOTCONN;
1411         }
1412
1413         target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
1414         return 0;
1415 }
1416
1417 /* Update the server data on disk.  This stores the new mount_count and
1418  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1419  * then the server last_rcvd value may be less than that of the clients.
1420  * This will alert us that we may need to do client recovery.
1421  *
1422  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1423  */
1424 int mds_update_server_data(struct mds_obd *mds)
1425 {
1426         struct mds_server_data *msd = mds->mds_server_data;
1427         struct file *filp = mds->mds_rcvd_filp;
1428         struct obd_run_ctxt saved;
1429         loff_t off = 0;
1430         int rc;
1431
1432         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1433         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
1434         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
1435
1436         CDEBUG(D_SUPER, "MDS mount_count is %Lu, last_transno is %Lu\n",
1437                (unsigned long long)mds->mds_mount_count,
1438                (unsigned long long)mds->mds_last_transno);
1439         rc = lustre_fwrite(filp, (char *)msd, sizeof(*msd), &off);
1440         if (rc != sizeof(*msd)) {
1441                 CERROR("error writing MDS server data: rc = %d\n", rc);
1442                 if (rc > 0)
1443                         rc = -EIO;
1444                 GOTO(out, rc);
1445         }
1446 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1447         rc = fsync_dev(filp->f_dentry->d_inode->i_rdev);
1448 #else
1449         rc = file_fsync(filp, filp->f_dentry, 1);
1450 #endif
1451         if (rc)
1452                 CERROR("error flushing MDS server data: rc = %d\n", rc);
1453
1454 out:
1455         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
1456         RETURN(rc);
1457 }
1458
1459 /* mount the file system (secretly) */
1460 static int mds_setup(struct obd_device *obddev, obd_count len, void *buf)
1461 {
1462         struct obd_ioctl_data* data = buf;
1463         struct mds_obd *mds = &obddev->u.mds;
1464         struct vfsmount *mnt;
1465         int rc = 0;
1466         unsigned long page;
1467         ENTRY;
1468
1469
1470 #ifdef CONFIG_DEV_RDONLY
1471         dev_clear_rdonly(2);
1472 #endif
1473         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1474                 RETURN(rc = -EINVAL);
1475
1476         obddev->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1477         if (IS_ERR(obddev->obd_fsops))
1478                 RETURN(rc = PTR_ERR(obddev->obd_fsops));
1479
1480
1481         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1482                 if (*data->ioc_inlbuf3 == '/') {
1483                         CERROR("mds namespace mount: %s\n", 
1484                                data->ioc_inlbuf3);
1485 //                        mds->mds_nspath = strdup(ioc->inlbuf4);
1486                 } else {
1487                         CERROR("namespace mount must be absolute path: '%s'\n",
1488                                data->ioc_inlbuf3);
1489                 }
1490         }
1491
1492         if (!(page = __get_free_page(GFP_KERNEL)))
1493                 return -ENOMEM;
1494
1495         memset((void *)page, 0, PAGE_SIZE);
1496         sprintf((char *)page, "iopen_nopriv");
1497
1498         mnt = do_kern_mount(data->ioc_inlbuf2, 0,
1499                             data->ioc_inlbuf1, (void *)page);
1500         free_page(page);
1501         if (IS_ERR(mnt)) {
1502                 rc = PTR_ERR(mnt);
1503                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1504                 GOTO(err_ops, rc);
1505         }
1506
1507         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1508         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
1509         if (!mds->mds_sb)
1510                 GOTO(err_put, rc = -ENODEV);
1511
1512         spin_lock_init(&mds->mds_transno_lock);
1513         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1514         rc = mds_fs_setup(obddev, mnt);
1515         if (rc) {
1516                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1517                 GOTO(err_put, rc);
1518         }
1519
1520         obddev->obd_namespace =
1521                 ldlm_namespace_new("mds_server", LDLM_NAMESPACE_SERVER);
1522         if (obddev->obd_namespace == NULL) {
1523                 mds_cleanup(obddev, 0, 0);
1524                 GOTO(err_fs, rc = -ENOMEM);
1525         }
1526
1527         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1528                            "mds_ldlm_client", &obddev->obd_ldlm_client);
1529
1530         mds->mds_has_lov_desc = 0;
1531
1532         RETURN(0);
1533
1534 err_fs:
1535         mds_fs_cleanup(obddev, 0);
1536 err_put:
1537         unlock_kernel();
1538         mntput(mds->mds_vfsmnt);
1539         mds->mds_sb = 0;
1540         lock_kernel();
1541 err_ops:
1542         fsfilt_put_ops(obddev->obd_fsops);
1543         return rc;
1544 }
1545
1546 static int mds_cleanup(struct obd_device *obddev, int force, int failover)
1547 {
1548         struct super_block *sb;
1549         struct mds_obd *mds = &obddev->u.mds;
1550         ENTRY;
1551
1552         sb = mds->mds_sb;
1553         if (!mds->mds_sb)
1554                 RETURN(0);
1555
1556         mds_update_server_data(mds);
1557         mds_fs_cleanup(obddev, failover);
1558
1559         unlock_kernel();
1560
1561         /* 2 seems normal on mds, (may_umount() also expects 2
1562           fwiw), but we only see 1 at this point in obdfilter. */
1563         if (atomic_read(&obddev->u.mds.mds_vfsmnt->mnt_count) > 2){
1564                 CERROR("%s: mount point busy, mnt_count: %d\n",
1565                        obddev->obd_name,
1566                        atomic_read(&obddev->u.mds.mds_vfsmnt->mnt_count));
1567         }
1568
1569         mntput(mds->mds_vfsmnt);
1570         mds->mds_sb = 0;
1571
1572         ldlm_namespace_free(obddev->obd_namespace);
1573
1574         if (obddev->obd_recovering)
1575                 target_cancel_recovery_timer(obddev);
1576         lock_kernel();
1577 #ifdef CONFIG_DEV_RDONLY
1578         dev_clear_rdonly(2);
1579 #endif
1580         fsfilt_put_ops(obddev->obd_fsops);
1581
1582         RETURN(0);
1583 }
1584
1585 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
1586                                         struct ldlm_lock *new_lock,
1587                                         struct lustre_handle *lockh)
1588 {
1589         struct obd_export *exp = req->rq_export;
1590         struct obd_device *obd = exp->exp_obd;
1591         struct ldlm_request *dlmreq =
1592                 lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
1593         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
1594         struct list_head *iter;
1595
1596         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1597                 return;
1598
1599         l_lock(&obd->obd_namespace->ns_lock);
1600         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1601                 struct ldlm_lock *lock;
1602                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1603                 if (lock == new_lock)
1604                         continue;
1605                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1606                         lockh->cookie = lock->l_handle.h_cookie;
1607                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1608                                   lockh->cookie);
1609                         l_unlock(&obd->obd_namespace->ns_lock);
1610                         return;
1611                 }
1612
1613         }
1614         l_unlock(&obd->obd_namespace->ns_lock);
1615         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1616                   remote_hdl.cookie);
1617 }
1618
1619 static int ldlm_intent_policy(struct ldlm_namespace *ns,
1620                               struct ldlm_lock **lockp, void *req_cookie,
1621                               ldlm_mode_t mode, int flags, void *data)
1622 {
1623         struct ptlrpc_request *req = req_cookie;
1624         struct ldlm_lock *lock = *lockp;
1625         int rc = 0;
1626         ENTRY;
1627
1628         if (!req_cookie)
1629                 RETURN(0);
1630
1631         if (req->rq_reqmsg->bufcount > 1) {
1632                 /* an intent needs to be considered */
1633                 struct ldlm_intent *it;
1634                 struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
1635                 struct mds_body *mds_body;
1636                 struct ldlm_reply *rep;
1637                 struct lustre_handle lockh = { 0 };
1638                 struct ldlm_lock *new_lock;
1639                 int rc, offset = 2, repsize[3] = {sizeof(struct ldlm_reply),
1640                                                   sizeof(struct mds_body),
1641                                                   mds->mds_max_mdsize};
1642
1643                 it = lustre_swab_reqbuf (req, 1, sizeof (*it),
1644                                          lustre_swab_ldlm_intent);
1645                 if (it == NULL) {
1646                         CERROR ("Intent missing\n");
1647                         rc = req->rq_status = -EFAULT;
1648                         RETURN (rc);
1649                 }
1650
1651                 LDLM_DEBUG(lock, "intent policy, opc: %s",
1652                            ldlm_it2str(it->opc));
1653
1654                 rc = lustre_pack_msg(3, repsize, NULL, &req->rq_replen,
1655                                      &req->rq_repmsg);
1656                 if (rc) {
1657                         rc = req->rq_status = -ENOMEM;
1658                         RETURN(rc);
1659                 }
1660
1661                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1662                 rep->lock_policy_res1 = IT_INTENT_EXEC;
1663
1664                 fixup_handle_for_resent_req(req, lock, &lockh);
1665
1666                 /* execute policy */
1667                 switch ((long)it->opc) {
1668                 case IT_OPEN:
1669                 case IT_CREAT|IT_OPEN:
1670                         rc = mds_reint(req, offset, &lockh);
1671                         /* We return a dentry to the client if IT_OPEN_POS is
1672                          * set, or if we make it to the OPEN portion of the
1673                          * programme (which implies that we created) */
1674                         if (!(rep->lock_policy_res1 & IT_OPEN_POS ||
1675                               rep->lock_policy_res1 & IT_OPEN_OPEN)) {
1676                                 rep->lock_policy_res2 = rc;
1677                                 RETURN(ELDLM_LOCK_ABORTED);
1678                         }
1679                         break;
1680                 case IT_UNLINK:
1681                         rc = mds_reint(req, offset, &lockh);
1682                         /* Don't return a lock if the unlink failed, or if we're
1683                          * not sending back an EA */
1684                         if (rc) {
1685                                 rep->lock_policy_res2 = rc;
1686                                 RETURN(ELDLM_LOCK_ABORTED);
1687                         }
1688                         if (req->rq_status != 0) {
1689                                 rep->lock_policy_res2 = req->rq_status;
1690                                 RETURN(ELDLM_LOCK_ABORTED);
1691                         }
1692                         mds_body = lustre_msg_buf(req->rq_repmsg, 1, sizeof (*mds_body));
1693                         if (!(mds_body->valid & OBD_MD_FLEASIZE)) {
1694                                 rep->lock_policy_res2 = rc;
1695                                 RETURN(ELDLM_LOCK_ABORTED);
1696                         }
1697                         break;
1698                 case IT_GETATTR:
1699                 case IT_LOOKUP:
1700                 case IT_READDIR:
1701                         rc = mds_getattr_name(offset, req, &lockh);
1702                         /* FIXME: we need to sit down and decide on who should
1703                          * set req->rq_status, who should return negative and
1704                          * positive return values, and what they all mean. */
1705                         if (rc) {
1706                                 rep->lock_policy_res2 = rc;
1707                                 RETURN(ELDLM_LOCK_ABORTED);
1708                         }
1709                         if (req->rq_status != 0) {
1710                                 rep->lock_policy_res2 = req->rq_status;
1711                                 RETURN(ELDLM_LOCK_ABORTED);
1712                         }
1713                         break;
1714                 default:
1715                         CERROR("Unhandled intent "LPD64"\n", it->opc);
1716                         LBUG();
1717                 }
1718
1719                 /* By this point, whatever function we called above must have
1720                  * filled in 'lockh' or returned an error.  We want to give the
1721                  * new lock to the client instead of whatever lock it was about
1722                  * to get. */
1723                 new_lock = ldlm_handle2lock(&lockh);
1724                 LASSERT(new_lock != NULL);
1725
1726                 /* If we've already given this lock to a client once, then we
1727                  * should have no readers or writers.  Otherwise, we should
1728                  * have one reader _or_ writer ref (which will be zeroed below
1729                  * before returning the lock to a client.
1730                  */
1731                 if (new_lock->l_export == req->rq_export)
1732                         LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1733                 else
1734                         LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1735
1736                 /* If we're running an intent only, we want to abort the new
1737                  * lock, and let the client abort the original lock. */
1738                 if (flags & LDLM_FL_INTENT_ONLY) {
1739                         LDLM_DEBUG(lock, "INTENT_ONLY, aborting locks");
1740                         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1741                         if (new_lock->l_readers)
1742                                 ldlm_lock_decref(&lockh, LCK_PR);
1743                         else
1744                                 ldlm_lock_decref(&lockh, LCK_PW);
1745                         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1746                         LDLM_LOCK_PUT(new_lock);
1747                         RETURN(ELDLM_LOCK_ABORTED);
1748                 }
1749
1750                 *lockp = new_lock;
1751
1752                 rep->lock_policy_res2 = req->rq_status;
1753
1754                 if (new_lock->l_export == req->rq_export) {
1755                         /* Already gave this to the client, which means that we
1756                          * reconstructed a reply. */
1757                         LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1758                                 MSG_RESENT);
1759                         RETURN(ELDLM_LOCK_REPLACED);
1760                 }
1761
1762                 /* Fixup the lock to be given to the client */
1763                 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1764                 new_lock->l_readers = 0;
1765                 new_lock->l_writers = 0;
1766
1767                 new_lock->l_export = req->rq_export;
1768                 list_add(&new_lock->l_export_chain,
1769                          &new_lock->l_export->exp_ldlm_data.led_held_locks);
1770
1771                 /* We don't need to worry about completion_ast (which isn't set
1772                  * in 'lock' yet anyways), because this lock is already
1773                  * granted. */
1774                 new_lock->l_blocking_ast = lock->l_blocking_ast;
1775
1776                 memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1777                        sizeof(lock->l_remote_handle));
1778
1779                 new_lock->l_flags &= ~(LDLM_FL_LOCAL | LDLM_FL_AST_SENT |
1780                                        LDLM_FL_CBPENDING);
1781
1782                 LDLM_LOCK_PUT(new_lock);
1783                 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1784
1785                 RETURN(ELDLM_LOCK_REPLACED);
1786         } else {
1787                 int size = sizeof(struct ldlm_reply);
1788                 rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1789                                      &req->rq_repmsg);
1790                 if (rc) {
1791                         LBUG();
1792                         RETURN(-ENOMEM);
1793                 }
1794         }
1795         RETURN(rc);
1796 }
1797
1798 int mds_attach(struct obd_device *dev, obd_count len, void *data)
1799 {
1800         struct lprocfs_static_vars lvars;
1801
1802         lprocfs_init_multi_vars(0, &lvars);
1803         return lprocfs_obd_attach(dev, lvars.obd_vars);
1804 }
1805
/* Detach method of the MDS obd type: hands the device to
 * lprocfs_obd_detach() and returns its result. */
int mds_detach(struct obd_device *dev)
{
        int status = lprocfs_obd_detach(dev);

        return status;
}
1810
1811 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
1812 {
1813         struct lprocfs_static_vars lvars;
1814
1815         lprocfs_init_multi_vars(1, &lvars);
1816         return lprocfs_obd_attach(dev, lvars.obd_vars);
1817 }
1818
/* Detach method of the MDT obd type: hands the device to
 * lprocfs_obd_detach() and returns its result. */
int mdt_detach(struct obd_device *dev)
{
        int status = lprocfs_obd_detach(dev);

        return status;
}
1823
1824 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
1825 {
1826         struct mds_obd *mds = &obddev->u.mds;
1827         int i, rc = 0;
1828         ENTRY;
1829
1830         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1831                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
1832                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1833                                            mds_handle, "mds", obddev);
1834
1835         if (!mds->mds_service) {
1836                 CERROR("failed to start service\n");
1837                 RETURN(rc = -ENOMEM);
1838         }
1839
1840         for (i = 0; i < MDT_NUM_THREADS; i++) {
1841                 char name[32];
1842                 sprintf(name, "ll_mdt_%02d", i);
1843                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
1844                 if (rc) {
1845                         CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
1846                         GOTO(err_thread, rc);
1847                 }
1848         }
1849
1850         mds->mds_setattr_service =
1851                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1852                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1853                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
1854                                 mds_handle, "mds_setattr", obddev);
1855         if (!mds->mds_setattr_service) {
1856                 CERROR("failed to start getattr service\n");
1857                 GOTO(err_thread, rc = -ENOMEM);
1858         }
1859
1860         for (i = 0; i < MDT_NUM_THREADS; i++) {
1861                 char name[32];
1862                 sprintf(name, "ll_mdt_attr_%02d", i);
1863                 rc = ptlrpc_start_thread(obddev, mds->mds_setattr_service,
1864                                          name);
1865                 if (rc) {
1866                         CERROR("cannot start MDT setattr thread #%d: rc %d\n",
1867                                i, rc);
1868                         GOTO(err_thread2, rc);
1869                 }
1870         }
1871
1872         mds->mds_readpage_service =
1873                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1874                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1875                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
1876                                 mds_handle, "mds_readpage", obddev);
1877         if (!mds->mds_readpage_service) {
1878                 CERROR("failed to start readpage service\n");
1879                 GOTO(err_thread2, rc = -ENOMEM);
1880         }
1881
1882         for (i = 0; i < MDT_NUM_THREADS; i++) {
1883                 char name[32];
1884                 sprintf(name, "ll_mdt_rdpg_%02d", i);
1885                 rc = ptlrpc_start_thread(obddev, mds->mds_readpage_service,
1886                                          name);
1887                 if (rc) {
1888                         CERROR("cannot start MDT readpage thread #%d: rc %d\n",
1889                                i, rc);
1890                         GOTO(err_thread3, rc);
1891                 }
1892         }
1893
1894         RETURN(0);
1895
1896 err_thread3:
1897         ptlrpc_stop_all_threads(mds->mds_readpage_service);
1898         ptlrpc_unregister_service(mds->mds_readpage_service);
1899 err_thread2:
1900         ptlrpc_stop_all_threads(mds->mds_setattr_service);
1901         ptlrpc_unregister_service(mds->mds_setattr_service);
1902 err_thread:
1903         ptlrpc_stop_all_threads(mds->mds_service);
1904         ptlrpc_unregister_service(mds->mds_service);
1905         return rc;
1906 }
1907
1908
1909 static int mdt_cleanup(struct obd_device *obddev, int force, int failover)
1910 {
1911         struct mds_obd *mds = &obddev->u.mds;
1912         ENTRY;
1913
1914         ptlrpc_stop_all_threads(mds->mds_readpage_service);
1915         ptlrpc_unregister_service(mds->mds_readpage_service);
1916
1917         ptlrpc_stop_all_threads(mds->mds_setattr_service);
1918         ptlrpc_unregister_service(mds->mds_setattr_service);
1919
1920         ptlrpc_stop_all_threads(mds->mds_service);
1921         ptlrpc_unregister_service(mds->mds_service);
1922
1923         RETURN(0);
1924 }
1925
1926 extern int mds_iocontrol(unsigned int cmd, struct lustre_handle *conn,
1927                          int len, void *karg, void *uarg);
1928
1929 /* use obd ops to offer management infrastructure */
1930 static struct obd_ops mds_obd_ops = {
1931         o_owner:          THIS_MODULE,
1932         o_attach:         mds_attach,
1933         o_detach:         mds_detach,
1934         o_connect:        mds_connect,
1935         o_disconnect:     mds_disconnect,
1936         o_setup:          mds_setup,
1937         o_cleanup:        mds_cleanup,
1938         o_iocontrol:      mds_iocontrol,
1939         o_destroy_export: mds_destroy_export
1940 };
1941
1942 static struct obd_ops mdt_obd_ops = {
1943         o_owner:       THIS_MODULE,
1944         o_attach:      mdt_attach,
1945         o_detach:      mdt_detach,
1946         o_setup:       mdt_setup,
1947         o_cleanup:     mdt_cleanup,
1948 };
1949
1950
1951 static int __init mds_init(void)
1952 {
1953         struct lprocfs_static_vars lvars;
1954
1955         lprocfs_init_multi_vars(0, &lvars);
1956         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
1957         lprocfs_init_multi_vars(1, &lvars);
1958         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
1959         ldlm_register_intent(ldlm_intent_policy);
1960
1961         return 0;
1962 }
1963
1964 static void __exit mds_exit(void)
1965 {
1966         ldlm_unregister_intent();
1967         class_unregister_type(LUSTRE_MDS_NAME);
1968         class_unregister_type(LUSTRE_MDT_NAME);
1969 }
1970
1971 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1972 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
1973 MODULE_LICENSE("GPL");
1974
1975 module_init(mds_init);
1976 module_exit(mds_exit);