Whamcloud - gitweb
Don't crash when an outstanding reply is pending on an export.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #include <linux/fs.h>
39 #include <linux/jbd.h>
40 #include <linux/ext3_fs.h>
41 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 # include <linux/smp_lock.h>
43 # include <linux/buffer_head.h>
44 # include <linux/workqueue.h>
45 # include <linux/mount.h>
46 #else
47 # include <linux/locks.h>
48 #endif
49 #include <linux/obd_lov.h>
50 #include <linux/lustre_mds.h>
51 #include <linux/lustre_fsfilt.h>
52 #include <linux/lprocfs_status.h>
53 #include <linux/lustre_commit_confd.h>
54
55 #include "mds_internal.h"
56
57 static int mds_cleanup(struct obd_device *obd, int flags);
58
59 static int mds_bulk_timeout(void *data)
60 {
61         struct ptlrpc_bulk_desc *desc = data;
62         struct obd_export *exp = desc->bd_export;
63
64         CERROR("bulk send timed out: evicting %s@%s\n",
65                exp->exp_client_uuid.uuid,
66                exp->exp_connection->c_remote_uuid.uuid);
67         ptlrpc_fail_export(exp);
68         ptlrpc_abort_bulk (desc);
69         RETURN(1);
70 }
71
72 /* Assumes caller has already pushed into the kernel filesystem context */
73 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
74                         __u64 offset, __u64 xid)
75 {
76         struct ptlrpc_bulk_desc *desc;
77         struct l_wait_info lwi;
78         struct page *page;
79         int rc = 0;
80         ENTRY;
81
82         LASSERT ((offset & (PAGE_CACHE_SIZE - 1)) == 0);
83
84         desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, MDS_BULK_PORTAL);
85         if (desc == NULL)
86                 GOTO(out, rc = -ENOMEM);
87
88         LASSERT (PAGE_SIZE == PAGE_CACHE_SIZE);
89         page = alloc_pages (GFP_KERNEL, 0);
90         if (page == NULL)
91                 GOTO(cleanup_bulk, rc = -ENOMEM);
92
93         rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
94         if (rc != 0)
95                 GOTO(cleanup_buf, rc);
96
97         CDEBUG(D_EXT2, "reading %lu@"LPU64" from dir %lu (size %llu)\n",
98                PAGE_CACHE_SIZE, offset, file->f_dentry->d_inode->i_ino,
99                file->f_dentry->d_inode->i_size);
100         rc = fsfilt_readpage(req->rq_export->exp_obd, file, page_address (page),
101                              PAGE_CACHE_SIZE, (loff_t *)&offset);
102
103         if (rc != PAGE_CACHE_SIZE)
104                 GOTO(cleanup_buf, rc = -EIO);
105
106         rc = ptlrpc_bulk_put(desc);
107         if (rc)
108                 GOTO(cleanup_buf, rc);
109
110         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
111                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
112                        OBD_FAIL_MDS_SENDPAGE, rc);
113                 ptlrpc_abort_bulk(desc);
114                 GOTO(cleanup_buf, rc);
115         }
116
117         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
118         rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete (desc), &lwi);
119         if (rc) {
120                 LASSERT (rc == -ETIMEDOUT);
121                 GOTO(cleanup_buf, rc);
122         }
123
124         EXIT;
125  cleanup_buf:
126         __free_pages (page, 0);
127  cleanup_bulk:
128         ptlrpc_free_bulk (desc);
129  out:
130         return rc;
131 }
132
133 /* only valid locked dentries or errors should be returned */
134 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
135                                      struct vfsmount **mnt, int lock_mode,
136                                      struct lustre_handle *lockh)
137 {
138         struct mds_obd *mds = &obd->u.mds;
139         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
140         struct ldlm_res_id res_id = { .name = {0} };
141         int flags = 0, rc;
142         ENTRY;
143
144         if (IS_ERR(de))
145                 RETURN(de);
146
147         res_id.name[0] = de->d_inode->i_ino;
148         res_id.name[1] = de->d_inode->i_generation;
149         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
150                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
151                               &flags, ldlm_completion_ast,
152                               mds_blocking_ast, NULL, lockh);
153         if (rc != ELDLM_OK) {
154                 l_dput(de);
155                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
156         }
157
158         RETURN(retval);
159 }
160
/* Compatibility: where the kernel headers do not provide
 * DCACHE_DISCONNECTED, map it onto DCACHE_NFSD_DISCONNECTED. */
#if !defined(DCACHE_DISCONNECTED)
# define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
#endif
164
165
166 /* Look up an entry by inode number. */
167 /* this function ONLY returns valid dget'd dentries with an initialized inode
168    or errors */
169 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
170                               struct vfsmount **mnt)
171 {
172         char fid_name[32];
173         unsigned long ino = fid->id;
174         __u32 generation = fid->generation;
175         struct inode *inode;
176         struct dentry *result;
177
178         if (ino == 0)
179                 RETURN(ERR_PTR(-ESTALE));
180
181         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
182
183         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino %lu, gen %u, sb %p\n",
184                ino, generation, mds->mds_sb);
185
186         /* under ext3 this is neither supposed to return bad inodes
187            nor NULL inodes. */
188         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
189         if (IS_ERR(result))
190                 RETURN(result);
191
192         inode = result->d_inode;
193         if (!inode)
194                 RETURN(ERR_PTR(-ENOENT));
195
196         if (generation && inode->i_generation != generation) {
197                 /* we didn't find the right inode.. */
198                 CERROR("bad inode %lu, link: %d ct: %d or generation %u/%u\n",
199                        inode->i_ino, inode->i_nlink,
200                        atomic_read(&inode->i_count), inode->i_generation,
201                        generation);
202                 dput(result);
203                 RETURN(ERR_PTR(-ENOENT));
204         }
205
206         if (mnt) {
207                 *mnt = mds->mds_vfsmnt;
208                 mntget(*mnt);
209         }
210
211         RETURN(result);
212 }
213
214
215 /* Establish a connection to the MDS.
216  *
217  * This will set up an export structure for the client to hold state data
218  * about that client, like open files, the last operation number it did
219  * on the server, etc.
220  */
221 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
222                        struct obd_uuid *cluuid)
223 {
224         struct obd_export *exp;
225         struct mds_export_data *med;
226         struct mds_client_data *mcd;
227         int rc, abort_recovery;
228         ENTRY;
229
230         if (!conn || !obd || !cluuid)
231                 RETURN(-EINVAL);
232
233         /* Check for aborted recovery. */
234         spin_lock_bh(&obd->obd_processing_task_lock);
235         abort_recovery = obd->obd_abort_recovery;
236         spin_unlock_bh(&obd->obd_processing_task_lock);
237         if (abort_recovery)
238                 target_abort_recovery(obd);
239
240         /* XXX There is a small race between checking the list and adding a
241          * new connection for the same UUID, but the real threat (list
242          * corruption when multiple different clients connect) is solved.
243          *
244          * There is a second race between adding the export to the list,
245          * and filling in the client data below.  Hence skipping the case
246          * of NULL mcd above.  We should already be controlling multiple
247          * connects at the client, and we can't hold the spinlock over
248          * memory allocations without risk of deadlocking.
249          */
250         rc = class_connect(conn, obd, cluuid);
251         if (rc)
252                 RETURN(rc);
253         exp = class_conn2export(conn);
254         LASSERT(exp);
255         med = &exp->exp_mds_data;
256         class_export_put(exp);
257
258         OBD_ALLOC(mcd, sizeof(*mcd));
259         if (!mcd) {
260                 CERROR("mds: out of memory for client data\n");
261                 GOTO(out_export, rc = -ENOMEM);
262         }
263
264         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
265         med->med_mcd = mcd;
266
267         INIT_LIST_HEAD(&med->med_open_head);
268         spin_lock_init(&med->med_open_lock);
269
270         rc = mds_client_add(obd, &obd->u.mds, med, -1);
271         if (rc)
272                 GOTO(out_mcd, rc);
273
274         RETURN(0);
275
276 out_mcd:
277         OBD_FREE(mcd, sizeof(*mcd));
278 out_export:
279         class_disconnect(conn, 0);
280
281         return rc;
282 }
283
284 static void mds_mfd_addref(void *mfdp)
285 {
286         struct mds_file_data *mfd = mfdp;
287
288         atomic_inc(&mfd->mfd_refcount);
289         CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
290                atomic_read(&mfd->mfd_refcount));
291 }
292
293 struct mds_file_data *mds_mfd_new(void)
294 {
295         struct mds_file_data *mfd;
296
297         OBD_ALLOC(mfd, sizeof *mfd);
298         if (mfd == NULL) {
299                 CERROR("mds: out of memory\n");
300                 return NULL;
301         }
302
303         atomic_set(&mfd->mfd_refcount, 2);
304
305         INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
306         class_handle_hash(&mfd->mfd_handle, mds_mfd_addref);
307
308         return mfd;
309 }
310
311 static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
312 {
313         ENTRY;
314         LASSERT(handle != NULL);
315         RETURN(class_handle2object(handle->cookie));
316 }
317
318 void mds_mfd_put(struct mds_file_data *mfd)
319 {
320         CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
321                atomic_read(&mfd->mfd_refcount) - 1);
322         LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
323                 atomic_read(&mfd->mfd_refcount) < 0x5a5a);
324         if (atomic_dec_and_test(&mfd->mfd_refcount)) {
325                 LASSERT(list_empty(&mfd->mfd_handle.h_link));
326                 OBD_FREE(mfd, sizeof *mfd);
327         }
328 }
329
330 void mds_mfd_destroy(struct mds_file_data *mfd)
331 {
332         class_handle_unhash(&mfd->mfd_handle);
333         mds_mfd_put(mfd);
334 }
335
336 /* Close a "file descriptor" and possibly unlink an orphan from the
337  * PENDING directory.
338  *
339  * If we are being called from mds_disconnect() because the client has
340  * disappeared, then req == NULL and we do not update last_rcvd because
341  * there is nothing that could be recovered by the client at this stage
342  * (it will not even _have_ an entry in last_rcvd anymore).
343  */
344 static int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
345                          struct mds_file_data *mfd)
346 {
347         struct dentry *dparent = mfd->mfd_dentry->d_parent;
348         struct inode *child_inode = mfd->mfd_dentry->d_inode;
349         char fidname[LL_FID_NAMELEN];
350         int last_orphan, fidlen, rc = 0;
351         ENTRY;
352
353         if (dparent) {
354                 LASSERT(atomic_read(&dparent->d_count) > 0);
355                 dparent = dget(dparent);
356         }
357
358         fidlen = ll_fid2str(fidname, child_inode->i_ino,
359                             child_inode->i_generation);
360
361         last_orphan = mds_open_orphan_dec_test(child_inode) &&
362                 mds_inode_is_orphan(child_inode);
363
364         /* this is the actual "close" */
365         l_dput(mfd->mfd_dentry);
366         mds_mfd_destroy(mfd);
367
368         if (dparent)
369                 l_dput(dparent);
370
371         if (last_orphan) {
372                 struct mds_obd *mds = &obd->u.mds;
373                 struct inode *pending_dir = mds->mds_pending_dir->d_inode;
374                 struct dentry *pending_child = NULL;
375                 void *handle;
376
377                 CDEBUG(D_ERROR, "destroying orphan object %s\n", fidname);
378
379                 /* Sadly, there is no easy way to save pending_child from
380                  * mds_reint_unlink() into mfd, so we need to re-lookup,
381                  * but normally it will still be in the dcache.
382                  */
383                 down(&pending_dir->i_sem);
384                 pending_child = lookup_one_len(fidname, mds->mds_pending_dir,
385                                                fidlen);
386                 if (IS_ERR(pending_child))
387                         GOTO(out_lock, rc = PTR_ERR(pending_child));
388                 LASSERT(pending_child->d_inode != NULL);
389
390                 handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL);
391                 if (IS_ERR(handle))
392                         GOTO(out_dput, rc = PTR_ERR(handle));
393                 rc = vfs_unlink(pending_dir, pending_child);
394                 if (rc)
395                         CERROR("error unlinking orphan %s: rc %d\n",fidname,rc);
396
397                 if (req) {
398                         rc = mds_finish_transno(mds, pending_dir, handle, req,
399                                                 rc, 0);
400                 } else {
401                         int err = fsfilt_commit(obd, pending_dir, handle, 0);
402                         if (err) {
403                                 CERROR("error committing orphan unlink: %d\n",
404                                        err);
405                                 if (!rc)
406                                         rc = err;
407                         }
408                 }
409         out_dput:
410                 dput(pending_child);
411         out_lock:
412                 up(&pending_dir->i_sem);
413         }
414
415         RETURN(rc);
416 }
417
418 static int mds_disconnect(struct lustre_handle *conn, int flags)
419 {
420         struct obd_export *export = class_conn2export(conn);
421         struct mds_export_data *med = &export->exp_mds_data;
422         struct obd_device *obd = export->exp_obd;
423         struct obd_run_ctxt saved;
424         int rc;
425         ENTRY;
426
427         push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
428         /* Close any open files (which may also cause orphan unlinking). */
429         spin_lock(&med->med_open_lock);
430         while (!list_empty(&med->med_open_head)) {
431                 struct list_head *tmp = med->med_open_head.next;
432                 struct mds_file_data *mfd =
433                         list_entry(tmp, struct mds_file_data, mfd_list);
434 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
435                 /* bug 1579: fix force-closing for 2.5 */
436                 struct dentry *dentry = mfd->mfd_dentry;
437
438                 list_del(&mfd->mfd_list);
439                 spin_unlock(&med->med_open_lock);
440
441                 CERROR("force closing client file handle for %*s (%s:%lu)\n",
442                        dentry->d_name.len, dentry->d_name.name,
443                        kdevname(dentry->d_inode->i_sb->s_dev),
444                        dentry->d_inode->i_ino);
445                 rc = mds_mfd_close(NULL, obd, mfd);
446 #endif
447                 if (rc)
448                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
449                 spin_lock(&med->med_open_lock);
450         }
451         spin_unlock(&med->med_open_lock);
452         pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
453
454         ldlm_cancel_locks_for_export(export);
455         if (!(flags & OBD_OPT_FAILOVER))
456                 mds_client_free(export);
457
458         rc = class_disconnect(conn, flags);
459         class_export_put(export);
460
461         RETURN(rc);
462 }
463
464 /*
465  * XXX This is NOT guaranteed to flush all transactions to disk (even though
466  *     it is equivalent to calling sync()) because it only _starts_ the flush
467  *     and does not wait for completion.  It's better than nothing though.
468  *     What we really want is a mild form of fsync_dev_lockfs(), but it is
469  *     non-standard, or enabling do_sync_supers in ext3, just for this call.
470  */
471 static void mds_fsync_super(struct super_block *sb)
472 {
473         lock_kernel();
474         lock_super(sb);
475 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
476         if (sb->s_dirt && sb->s_op && sb->s_op->write_super)
477                 sb->s_op->write_super(sb);
478 #else
479         if (sb->s_dirt && sb->s_op) {
480                 if (sb->s_op->sync_fs)
481                         sb->s_op->sync_fs(sb, 1);
482                 else if (sb->s_op->write_super)
483                         sb->s_op->write_super(sb);
484         }
485 #endif
486         unlock_super(sb);
487         unlock_kernel();
488 }
489
490 static int mds_getstatus(struct ptlrpc_request *req)
491 {
492         struct obd_device *obd = req->rq_export->exp_obd;
493         struct mds_obd *mds = mds_req2mds(req);
494         struct mds_body *body;
495         int rc, size = sizeof(*body);
496         ENTRY;
497
498         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
499         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
500                 CERROR("mds: out of memory for message: size=%d\n", size);
501                 req->rq_status = -ENOMEM;       /* superfluous? */
502                 RETURN(-ENOMEM);
503         }
504
505         /* Flush any outstanding transactions to disk so the client will
506          * get the latest last_committed value and can drop their local
507          * requests if they have any.  This would be fsync_super() if it
508          * was exported.
509          */
510         fsfilt_sync(obd, mds->mds_sb);
511
512         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
513         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
514
515         /* the last_committed and last_xid fields are filled in for all
516          * replies already - no need to do so here also.
517          */
518         RETURN(0);
519 }
520
521 static int mds_getlovinfo(struct ptlrpc_request *req)
522 {
523         struct mds_obd *mds = mds_req2mds(req);
524         struct mds_status_req *streq;
525         struct lov_desc *desc;
526         struct obd_uuid *uuid0;
527         int tgt_count;
528         int rc, size[2] = {sizeof(*desc)};
529         ENTRY;
530
531         streq = lustre_swab_reqbuf (req, 0, sizeof (*streq),
532                                     lustre_swab_mds_status_req);
533         if (streq == NULL) {
534                 CERROR ("Can't unpack mds_status_req\n");
535                 RETURN (-EFAULT);
536         }
537
538         if (streq->repbuf > LOV_MAX_UUID_BUFFER_SIZE) {
539                 CERROR ("Illegal request for uuid array > %d\n",
540                         streq->repbuf);
541                 RETURN (-EINVAL);
542         }
543         size[1] = streq->repbuf;
544
545         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
546         if (rc) {
547                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
548                 RETURN(-ENOMEM);
549         }
550
551         if (!mds->mds_has_lov_desc) {
552                 req->rq_status = -ENOENT;
553                 RETURN(0);
554         }
555
556         /* XXX We're sending the lov_desc in my byte order.
557          * Receiver will swab... */
558         desc = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*desc));
559         memcpy(desc, &mds->mds_lov_desc, sizeof (*desc));
560
561         tgt_count = mds->mds_lov_desc.ld_tgt_count;
562         uuid0 = lustre_msg_buf(req->rq_repmsg, 1, tgt_count * sizeof (*uuid0));
563         if (uuid0 == NULL) {
564                 CERROR("too many targets, enlarge client buffers\n");
565                 req->rq_status = -ENOSPC;
566                 RETURN(0);
567         }
568
569         rc = mds_get_lovtgts(mds, tgt_count, uuid0);
570         if (rc) {
571                 CERROR("get_lovtgts error %d\n", rc);
572                 req->rq_status = rc;
573                 RETURN(0);
574         }
575         memcpy(&mds->mds_osc_uuid, &mds->mds_lov_desc.ld_uuid,
576                sizeof(mds->mds_osc_uuid));
577         RETURN(0);
578 }
579
580 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
581                      void *data, int flag)
582 {
583         int do_ast;
584         ENTRY;
585
586         if (flag == LDLM_CB_CANCELING) {
587                 /* Don't need to do anything here. */
588                 RETURN(0);
589         }
590
591         /* XXX layering violation!  -phil */
592         l_lock(&lock->l_resource->lr_namespace->ns_lock);
593         /* Get this: if mds_blocking_ast is racing with ldlm_intent_policy,
594          * such that mds_blocking_ast is called just before l_i_p takes the
595          * ns_lock, then by the time we get the lock, we might not be the
596          * correct blocking function anymore.  So check, and return early, if
597          * so. */
598         if (lock->l_blocking_ast != mds_blocking_ast) {
599                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
600                 RETURN(0);
601         }
602
603         lock->l_flags |= LDLM_FL_CBPENDING;
604         do_ast = (!lock->l_readers && !lock->l_writers);
605         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
606
607         if (do_ast) {
608                 struct lustre_handle lockh;
609                 int rc;
610
611                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
612                 ldlm_lock2handle(lock, &lockh);
613                 rc = ldlm_cli_cancel(&lockh);
614                 if (rc < 0)
615                         CERROR("ldlm_cli_cancel: %d\n", rc);
616         } else {
617                 LDLM_DEBUG(lock, "Lock still has references, will be "
618                            "cancelled later");
619         }
620         RETURN(0);
621 }
622
623 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
624                 int offset, struct mds_body *body, struct inode *inode)
625 {
626         struct mds_obd *mds = &obd->u.mds;
627         struct lov_mds_md *lmm;
628         int lmm_size;
629         int rc;
630         ENTRY;
631
632         lmm = lustre_msg_buf(msg, offset, 0);
633         if (lmm == NULL) {
634                 /* Some problem with getting eadata when I sized the reply
635                  * buffer... */
636                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
637                        inode->i_ino);
638                 RETURN(0);
639         }
640         lmm_size = msg->buflens[offset];
641
642         /* I don't really like this, but it is a sanity check on the client
643          * MD request.  However, if the client doesn't know how much space
644          * to reserve for the MD, this shouldn't be fatal either...
645          */
646         if (lmm_size > mds->mds_max_mdsize) {
647                 CERROR("Reading MD for inode %lu of %d bytes > max %d\n",
648                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
649                 // RETURN(-EINVAL);
650         }
651
652         rc = fsfilt_get_md(obd, inode, lmm, lmm_size);
653         if (rc < 0) {
654                 CERROR("Error %d reading eadata for ino %lu\n",
655                        rc, inode->i_ino);
656         } else if (rc > 0) {
657                 body->valid |= OBD_MD_FLEASIZE;
658                 body->eadatasize = rc;
659                 rc = 0;
660         }
661
662         RETURN(rc);
663 }
664
665 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
666                                 struct ptlrpc_request *req,
667                                 struct mds_body *reqbody, int reply_off)
668 {
669         struct mds_body *body;
670         struct inode *inode = dentry->d_inode;
671         int rc = 0;
672         ENTRY;
673
674         if (inode == NULL)
675                 RETURN(-ENOENT);
676
677         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
678         LASSERT(body != NULL);                 /* caller prepped reply */
679
680         mds_pack_inode2fid(&body->fid1, inode);
681         mds_pack_inode2body(body, inode);
682
683         if (S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE) != 0) {
684                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off+1, body, inode);
685
686                 /* If we have LOV EA data, the OST holds size, atime, mtime */
687                 if (!(body->valid & OBD_MD_FLEASIZE))
688                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
689                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
690         } else if (S_ISLNK(inode->i_mode) &&
691                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
692                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
693                 int len;
694
695                 LASSERT (symname != NULL);       /* caller prepped reply */
696                 len = req->rq_repmsg->buflens[reply_off + 1];
697
698                 rc = inode->i_op->readlink(dentry, symname, len);
699                 if (rc < 0) {
700                         CERROR("readlink failed: %d\n", rc);
701                 } else if (rc != len - 1) {
702                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
703                                 rc, len - 1);
704                         rc = -EINVAL;
705                 } else {
706                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
707                         body->valid |= OBD_MD_LINKNAME;
708                         body->eadatasize = rc + 1;
709                         symname[rc] = 0;        /* NULL terminate */
710                         rc = 0;
711                 }
712         }
713
714         RETURN(rc);
715 }
716
717 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
718                                 int offset)
719 {
720         struct mds_obd *mds = mds_req2mds(req);
721         struct mds_body *body;
722         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
723         ENTRY;
724
725         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
726         LASSERT(body != NULL);                 /* checked by caller */
727         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
728
729         if (S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) {
730                 int rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
731                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
732                        rc, inode->i_ino);
733                 if (rc < 0) {
734                         if (rc != -ENODATA)
735                                 CERROR("error getting inode %lu MD: rc = %d\n",
736                                        inode->i_ino, rc);
737                         size[bufcount] = 0;
738                 } else if (rc > mds->mds_max_mdsize) {
739                         size[bufcount] = 0;
740                         CERROR("MD size %d larger than maximum possible %u\n",
741                                rc, mds->mds_max_mdsize);
742                 } else {
743                         size[bufcount] = rc;
744                 }
745                 bufcount++;
746         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
747                 if (inode->i_size + 1 != body->eadatasize)
748                         CERROR("symlink size: %Lu, reply space: %d\n",
749                                inode->i_size + 1, body->eadatasize);
750                 size[bufcount] = MIN(inode->i_size + 1, body->eadatasize);
751                 bufcount++;
752                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
753                        inode->i_size + 1, body->eadatasize);
754         }
755
756         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
757                 CERROR("failed MDS_GETATTR_PACK test\n");
758                 req->rq_status = -ENOMEM;
759                 GOTO(out, rc = -ENOMEM);
760         }
761
762         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
763                              &req->rq_repmsg);
764         if (rc) {
765                 CERROR("out of memory\n");
766                 GOTO(out, req->rq_status = rc);
767         }
768
769         EXIT;
770  out:
771         return(rc);
772 }
773
774 /* This is more copy-and-paste from getattr_name than I'd like. */
775 static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
776                                      struct lustre_handle *client_lockh)
777 {
778         struct mds_export_data *med = &req->rq_export->exp_mds_data;
779         struct mds_client_data *mcd = med->med_mcd;
780         struct obd_device *obd = req->rq_export->exp_obd;
781         struct mds_obd *mds = mds_req2mds(req);
782         struct dentry *parent, *child;
783         struct mds_body *body;
784         struct inode *dir;
785         struct obd_run_ctxt saved;
786         struct obd_ucred uc;
787         int namelen, rc = 0;
788         char *name;
789
790         req->rq_transno = mcd->mcd_last_transno;
791         req->rq_status = mcd->mcd_last_result;
792
793         LASSERT (req->rq_export->exp_outstanding_reply);
794
795         mds_steal_ack_locks(req->rq_export, req);
796
797         if (req->rq_status)
798                 return;
799
800         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
801         LASSERT (body != NULL);                 /* checked by caller */
802         LASSERT_REQSWABBED (req, offset);       /* swabbed by caller */
803
804         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
805         LASSERT (name != NULL);                 /* checked by caller */
806         LASSERT_REQSWABBED (req, offset + 1);   /* swabbed by caller */
807         namelen = req->rq_reqmsg->buflens[offset + 1];
808
809         LASSERT (offset == 2 || offset == 0);
810         /* requests were at offset 2, replies go back at 1 */
811         if (offset)
812                 offset = 1;
813
814         uc.ouc_fsuid = body->fsuid;
815         uc.ouc_fsgid = body->fsgid;
816         uc.ouc_cap = body->capability;
817         uc.ouc_suppgid1 = body->suppgid;
818         uc.ouc_suppgid2 = -1;
819
820         push_ctxt(&saved, &mds->mds_ctxt, &uc);
821         parent = mds_fid2dentry(mds, &body->fid1, NULL);
822         LASSERT(!IS_ERR(parent));
823         dir = parent->d_inode;
824         LASSERT(dir);
825         child = ll_lookup_one_len(name, parent, namelen - 1);
826         LASSERT(!IS_ERR(child));
827
828         if (req->rq_repmsg == NULL) {
829                 rc = mds_getattr_pack_msg(req, child->d_inode, offset);
830                 /* XXX need to handle error here */
831                 LASSERT (rc == 0);
832         }
833
834         rc = mds_getattr_internal(obd, child, req, body, offset);
835         /* XXX need to handle error here */
836         LASSERT(!rc);
837         l_dput(child);
838         l_dput(parent);
839 }
840
841 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
842                             struct lustre_handle *child_lockh)
843 {
844         struct mds_obd *mds = mds_req2mds(req);
845         struct obd_device *obd = req->rq_export->exp_obd;
846         struct ldlm_reply *rep = NULL;
847         struct obd_run_ctxt saved;
848         struct mds_body *body;
849         struct dentry *de = NULL, *dchild = NULL;
850         struct inode *dir;
851         struct obd_ucred uc;
852         struct ldlm_res_id child_res_id = { .name = {0} };
853         struct lustre_handle parent_lockh;
854         int namesize;
855         int flags = 0, rc = 0, cleanup_phase = 0;
856         char *name;
857         ENTRY;
858
859         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
860
861         /* Swab now, before anyone looks inside the request */
862
863         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
864                                   lustre_swab_mds_body);
865         if (body == NULL) {
866                 CERROR("Can't swab mds_body\n");
867                 GOTO(cleanup, rc = -EFAULT);
868         }
869
870         LASSERT_REQSWAB(req, offset + 1);
871         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
872         if (name == NULL) {
873                 CERROR("Can't unpack name\n");
874                 GOTO(cleanup, rc = -EFAULT);
875         }
876         namesize = req->rq_reqmsg->buflens[offset + 1];
877
878         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
879                 struct obd_export *exp = req->rq_export;
880                 if (exp->exp_outstanding_reply &&
881                     exp->exp_outstanding_reply->rq_xid == req->rq_xid) {
882                         reconstruct_getattr_name(offset, req, child_lockh);
883                         RETURN(0);
884                 }
885                 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
886                           exp->exp_outstanding_reply ?
887                           exp->exp_outstanding_reply->rq_xid : (u64)0);
888         }
889
890         LASSERT (offset == 0 || offset == 2);
891         /* if requests were at offset 2, the getattr reply goes back at 1 */
892         if (offset) { 
893                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
894                 offset = 1;
895         }
896
897         uc.ouc_fsuid = body->fsuid;
898         uc.ouc_fsgid = body->fsgid;
899         uc.ouc_cap = body->capability;
900         uc.ouc_suppgid1 = body->suppgid;
901         uc.ouc_suppgid2 = -1;
902         push_ctxt(&saved, &mds->mds_ctxt, &uc);
903         /* Step 1: Lookup/lock parent */
904         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
905         de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
906                                    &parent_lockh);
907         if (IS_ERR(de))
908                 GOTO(cleanup, rc = PTR_ERR(de));
909         dir = de->d_inode;
910         LASSERT(dir);
911
912         cleanup_phase = 1; /* parent dentry and lock */
913
914         CDEBUG(D_INODE, "parent ino %lu, name %s\n", dir->i_ino, name);
915
916         /* Step 2: Lookup child */
917         dchild = ll_lookup_one_len(name, de, namesize - 1);
918         if (IS_ERR(dchild)) {
919                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
920                 GOTO(cleanup, rc = PTR_ERR(dchild));
921         }
922
923         cleanup_phase = 2; /* child dentry */
924
925         if (dchild->d_inode == NULL) {
926                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
927                 GOTO(cleanup, rc = -ENOENT);
928         } else {
929                 intent_set_disposition(rep, DISP_LOOKUP_POS);
930         }
931
932         /* Step 3: Lock child */
933         child_res_id.name[0] = dchild->d_inode->i_ino;
934         child_res_id.name[1] = dchild->d_inode->i_generation;
935         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
936                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
937                               &flags, ldlm_completion_ast, mds_blocking_ast,
938                               NULL, child_lockh);
939         if (rc != ELDLM_OK) {
940                 CERROR("ldlm_cli_enqueue: %d\n", rc);
941                 GOTO(cleanup, rc = -EIO);
942         }
943
944         cleanup_phase = 3; /* child lock */
945
946         if (req->rq_repmsg == NULL) {
947                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
948                 if (rc != 0) {
949                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
950                         GOTO (cleanup, rc);
951                 }
952         }
953
954         rc = mds_getattr_internal(obd, dchild, req, body, offset);
955         GOTO(cleanup, rc); /* returns the lock to the client */
956
957  cleanup:
958         switch (cleanup_phase) {
959         case 3:
960                 if (rc)
961                         ldlm_lock_decref(child_lockh, LCK_PR);
962         case 2:
963                 l_dput(dchild);
964
965         case 1:
966                 if (rc) {
967                         ldlm_lock_decref(&parent_lockh, LCK_PR);
968                 } else {
969                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
970                                sizeof(parent_lockh));
971                         req->rq_ack_locks[0].mode = LCK_PR;
972                 }
973                 l_dput(de);
974         default: ;
975         }
976         req->rq_status = rc;
977         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
978         return rc;
979 }
980
981 static int mds_getattr(int offset, struct ptlrpc_request *req)
982 {
983         struct mds_obd *mds = mds_req2mds(req);
984         struct obd_device *obd = req->rq_export->exp_obd;
985         struct obd_run_ctxt saved;
986         struct dentry *de;
987         struct mds_body *body;
988         struct obd_ucred uc;
989         int rc = 0;
990         ENTRY;
991
992         body = lustre_swab_reqbuf (req, offset, sizeof (*body),
993                                    lustre_swab_mds_body);
994         if (body == NULL) {
995                 CERROR ("Can't unpack body\n");
996                 RETURN (-EFAULT);
997         }
998
999         uc.ouc_fsuid = body->fsuid;
1000         uc.ouc_fsgid = body->fsgid;
1001         uc.ouc_cap = body->capability;
1002         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1003         de = mds_fid2dentry(mds, &body->fid1, NULL);
1004         if (IS_ERR(de)) {
1005                 rc = req->rq_status = -ENOENT;
1006                 GOTO(out_pop, PTR_ERR(de));
1007         }
1008
1009         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1010         if (rc != 0) {
1011                 CERROR ("mds_getattr_pack_msg: %d\n", rc);
1012                 GOTO (out_pop, rc);
1013         }
1014
1015         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
1016
1017         l_dput(de);
1018         GOTO(out_pop, rc);
1019 out_pop:
1020         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1021         return rc;
1022 }
1023
1024
1025 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1026                           unsigned long max_age)
1027 {
1028         return fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
1029 }
1030
1031 static int mds_statfs(struct ptlrpc_request *req)
1032 {
1033         struct obd_device *obd = req->rq_export->exp_obd;
1034         int rc, size = sizeof(struct obd_statfs);
1035         ENTRY;
1036
1037         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1038         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1039                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
1040                 GOTO(out, rc);
1041         }
1042
1043         /* We call this so that we can cache a bit - 1 jiffie worth */
1044         rc = obd_statfs(obd, lustre_msg_buf(req->rq_repmsg,0,size),jiffies-HZ);
1045         if (rc) {
1046                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1047                 GOTO(out, rc);
1048         }
1049
1050         EXIT;
1051 out:
1052         req->rq_status = rc;
1053         return 0;
1054 }
1055
1056 static void reconstruct_close(struct ptlrpc_request *req)
1057 {
1058         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1059         struct mds_client_data *mcd = med->med_mcd;
1060
1061         req->rq_transno = mcd->mcd_last_transno;
1062         req->rq_status = mcd->mcd_last_result;
1063
1064         /* XXX When open-unlink is working, we'll need to steal ack locks as
1065          * XXX well, and make sure that we do the right unlinking after we
1066          * XXX get the ack back.
1067          */
1068 }
1069
1070 static int mds_close(struct ptlrpc_request *req)
1071 {
1072         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1073         struct obd_device *obd = req->rq_export->exp_obd;
1074         struct mds_body *body;
1075         struct mds_file_data *mfd;
1076         struct obd_run_ctxt saved;
1077         int rc;
1078         ENTRY;
1079
1080         MDS_CHECK_RESENT(req, reconstruct_close(req));
1081
1082         body = lustre_swab_reqbuf(req, 0, sizeof (*body),
1083                                   lustre_swab_mds_body);
1084         if (body == NULL) {
1085                 CERROR ("Can't unpack body\n");
1086                 RETURN (-EFAULT);
1087         }
1088
1089         mfd = mds_handle2mfd(&body->handle);
1090         if (mfd == NULL) {
1091                 DEBUG_REQ(D_ERROR, req, "no handle for file close "LPD64
1092                           ": cookie "LPX64"\n", body->fid1.id,
1093                           body->handle.cookie);
1094                 RETURN(-ESTALE);
1095         }
1096
1097         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
1098         if (rc) {
1099                 CERROR("lustre_pack_msg: rc = %d\n", rc);
1100                 req->rq_status = rc;
1101         }
1102
1103         spin_lock(&med->med_open_lock);
1104         list_del(&mfd->mfd_list);
1105         spin_unlock(&med->med_open_lock);
1106
1107         push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
1108         req->rq_status = mds_mfd_close(rc ? NULL : req, obd, mfd);
1109         pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
1110
1111         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
1112                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
1113                 req->rq_status = -ENOMEM;
1114                 mds_mfd_put(mfd);
1115                 RETURN(-ENOMEM);
1116         }
1117
1118         mds_mfd_put(mfd);
1119         RETURN(0);
1120 }
1121
1122 static int mds_readpage(struct ptlrpc_request *req)
1123 {
1124         struct mds_obd *mds = mds_req2mds(req);
1125         struct vfsmount *mnt;
1126         struct dentry *de;
1127         struct file *file;
1128         struct mds_body *body, *repbody;
1129         struct obd_run_ctxt saved;
1130         int rc, size = sizeof(*repbody);
1131         struct obd_ucred uc;
1132         ENTRY;
1133
1134         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1135         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1136                 CERROR("mds: out of memory\n");
1137                 GOTO(out, rc = -ENOMEM);
1138         }
1139
1140         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
1141                                    lustre_swab_mds_body);
1142         if (body == NULL)
1143                 GOTO (out, rc = -EFAULT);
1144
1145         /* body->size is actually the offset -eeb */
1146         if ((body->size & ~PAGE_MASK) != 0) {
1147                 CERROR ("offset "LPU64"not on a page boundary\n", body->size);
1148                 GOTO (out, rc = -EFAULT);
1149         }
1150
1151         /* body->nlink is actually the #bytes to read -eeb */
1152         if (body->nlink != PAGE_SIZE) {
1153                 CERROR ("size %d is not PAGE_SIZE\n", body->nlink);
1154                 GOTO (out, rc = -EFAULT);
1155         }
1156
1157         uc.ouc_fsuid = body->fsuid;
1158         uc.ouc_fsgid = body->fsgid;
1159         uc.ouc_cap = body->capability;
1160         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1161         de = mds_fid2dentry(mds, &body->fid1, &mnt);
1162         if (IS_ERR(de))
1163                 GOTO(out_pop, rc = PTR_ERR(de));
1164
1165         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1166
1167         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1168         /* note: in case of an error, dentry_open puts dentry */
1169         if (IS_ERR(file))
1170                 GOTO(out_pop, rc = PTR_ERR(file));
1171
1172         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1173         repbody->size = file->f_dentry->d_inode->i_size;
1174         repbody->valid = OBD_MD_FLSIZE;
1175
1176         /* to make this asynchronous make sure that the handling function
1177            doesn't send a reply when this function completes. Instead a
1178            callback function would send the reply */
1179         /* body->blocks is actually the xid -phil */
1180         /* body->size is actually the offset -eeb */
1181         rc = mds_sendpage(req, file, body->size, body->blocks);
1182
1183         filp_close(file, 0);
1184 out_pop:
1185         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1186 out:
1187         req->rq_status = rc;
1188         RETURN(0);
1189 }
1190
1191 int mds_reint(struct ptlrpc_request *req, int offset,
1192               struct lustre_handle *lockh)
1193 {
1194         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1195         int rc;
1196
1197         OBD_ALLOC(rec, sizeof(*rec));
1198         if (rec == NULL)
1199                 RETURN(-ENOMEM);
1200
1201         rc = mds_update_unpack(req, offset, rec);
1202         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1203                 CERROR("invalid record\n");
1204                 GOTO(out, req->rq_status = -EINVAL);
1205         }
1206         /* rc will be used to interrupt a for loop over multiple records */
1207         rc = mds_reint_rec(rec, offset, req, lockh);
1208  out:
1209         OBD_FREE(rec, sizeof(*rec));
1210         return rc;
1211 }
1212
1213 static int filter_recovery_request(struct ptlrpc_request *req,
1214                                    struct obd_device *obd, int *process)
1215 {
1216         switch (req->rq_reqmsg->opc) {
1217         case MDS_CONNECT: /* This will never get here, but for completeness. */
1218         case OST_CONNECT: /* This will never get here, but for completeness. */
1219         case MDS_DISCONNECT:
1220         case OST_DISCONNECT:
1221                *process = 1;
1222                RETURN(0);
1223
1224         case MDS_CLOSE:
1225         case MDS_GETSTATUS: /* used in unmounting */
1226         case OBD_PING:
1227         case MDS_REINT:
1228         case LDLM_ENQUEUE:
1229                 *process = target_queue_recovery_request(req, obd);
1230                 RETURN(0);
1231
1232         default:
1233                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1234                 *process = 0;
1235                 /* XXX what should we set rq_status to here? */
1236                 req->rq_status = -EAGAIN;
1237                 RETURN(ptlrpc_error(req));
1238         }
1239 }
1240
1241 static char *reint_names[] = {
1242         [REINT_SETATTR] "setattr",
1243         [REINT_CREATE]  "create",
1244         [REINT_LINK]    "link",
1245         [REINT_UNLINK]  "unlink",
1246         [REINT_RENAME]  "rename",
1247         [REINT_OPEN]    "open",
1248 };
1249
1250 void mds_steal_ack_locks(struct obd_export *exp,
1251                          struct ptlrpc_request *req)
1252 {
1253         unsigned long  flags;
1254
1255         struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
1256         memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
1257                sizeof req->rq_ack_locks);
1258         spin_lock_irqsave (&req->rq_lock, flags);
1259         oldrep->rq_resent = 1;
1260         wake_up(&oldrep->rq_wait_for_rep);
1261         spin_unlock_irqrestore (&req->rq_lock, flags);
1262         DEBUG_REQ(D_HA, oldrep, "stole locks from");
1263         DEBUG_REQ(D_HA, req, "stole locks for");
1264 }
1265
1266 int mds_handle(struct ptlrpc_request *req)
1267 {
1268         int should_process;
1269         int rc = 0;
1270         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1271         struct obd_device *obd = NULL;
1272         ENTRY;
1273
1274         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1275
1276         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
1277
1278         /* XXX identical to OST */
1279         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1280                 struct mds_export_data *med;
1281                 int recovering, abort_recovery;
1282
1283                 if (req->rq_export == NULL) {
1284                         CERROR("lustre_mds: operation %d on unconnected MDS\n",
1285                                req->rq_reqmsg->opc);
1286                         req->rq_status = -ENOTCONN;
1287                         GOTO(out, rc = -ENOTCONN);
1288                 }
1289
1290                 med = &req->rq_export->exp_mds_data;
1291                 obd = req->rq_export->exp_obd;
1292                 mds = &obd->u.mds;
1293
1294                 /* Check for aborted recovery. */
1295                 spin_lock_bh(&obd->obd_processing_task_lock);
1296                 abort_recovery = obd->obd_abort_recovery;
1297                 recovering = obd->obd_recovering;
1298                 spin_unlock_bh(&obd->obd_processing_task_lock);
1299                 if (abort_recovery) {
1300                         target_abort_recovery(obd);
1301                 } else if (recovering) {
1302                         rc = filter_recovery_request(req, obd, &should_process);
1303                         if (rc || !should_process)
1304                                 RETURN(rc);
1305                 }
1306         }
1307
1308         switch (req->rq_reqmsg->opc) {
1309         case MDS_CONNECT:
1310                 DEBUG_REQ(D_INODE, req, "connect");
1311                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1312                 rc = target_handle_connect(req, mds_handle);
1313                 /* Make sure that last_rcvd is correct. */
1314                 if (!rc) {
1315                         /* Now that we have an export, set mds. */
1316                         mds = mds_req2mds(req);
1317                         mds_fsync_super(mds->mds_sb);
1318                 }
1319                 break;
1320
1321         case MDS_DISCONNECT:
1322                 DEBUG_REQ(D_INODE, req, "disconnect");
1323                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1324                 rc = target_handle_disconnect(req);
1325                 /* Make sure that last_rcvd is correct. */
1326                 if (!rc)
1327                         mds_fsync_super(mds->mds_sb);
1328                 req->rq_status = rc;            /* superfluous? */
1329                 break;
1330
1331         case MDS_GETSTATUS:
1332                 DEBUG_REQ(D_INODE, req, "getstatus");
1333                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1334                 rc = mds_getstatus(req);
1335                 break;
1336
1337         case MDS_GETLOVINFO:
1338                 DEBUG_REQ(D_INODE, req, "getlovinfo");
1339                 rc = mds_getlovinfo(req);
1340                 break;
1341
1342         case MDS_GETATTR:
1343                 DEBUG_REQ(D_INODE, req, "getattr");
1344                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1345                 rc = mds_getattr(0, req);
1346                 break;
1347
1348         case MDS_GETATTR_NAME: {
1349                 struct lustre_handle lockh;
1350                 DEBUG_REQ(D_INODE, req, "getattr_name");
1351                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1352
1353                 /* If this request gets a reconstructed reply, we won't be
1354                  * acquiring any new locks in mds_getattr_name, so we don't
1355                  * want to cancel.
1356                  */
1357                 lockh.cookie = 0;
1358                 rc = mds_getattr_name(0, req, &lockh);
1359                 if (rc == 0 && lockh.cookie)
1360                         ldlm_lock_decref(&lockh, LCK_PR);
1361                 break;
1362         }
1363         case MDS_STATFS:
1364                 DEBUG_REQ(D_INODE, req, "statfs");
1365                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1366                 rc = mds_statfs(req);
1367                 break;
1368
1369         case MDS_READPAGE:
1370                 DEBUG_REQ(D_INODE, req, "readpage");
1371                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1372                 rc = mds_readpage(req);
1373
1374                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1375                         return 0;
1376                 break;
1377
1378         case MDS_REINT: {
1379                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
1380                 __u32  opc;
1381                 int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
1382                                mds->mds_max_cookiesize};
1383                 int bufcount;
1384
1385                 /* NB only peek inside req now; mds_reint() will swab it */
1386                 if (opcp == NULL) {
1387                         CERROR ("Can't inspect opcode\n");
1388                         rc = -EINVAL;
1389                         break;
1390                 }
1391                 opc = *opcp;
1392                 if (lustre_msg_swabbed (req->rq_reqmsg))
1393                         __swab32s(&opc);
1394
1395                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1396                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1397                            reint_names[opc] == NULL) ? reint_names[opc] :
1398                                                        "unknown opcode");
1399
1400                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1401
1402                 if (opc == REINT_UNLINK)
1403                         bufcount = 3;
1404                 else if (opc == REINT_OPEN)
1405                         bufcount = 2;
1406                 else
1407                         bufcount = 1;
1408
1409                 rc = lustre_pack_msg(bufcount, size, NULL,
1410                                      &req->rq_replen, &req->rq_repmsg);
1411                 if (rc)
1412                         break;
1413
1414                 rc = mds_reint(req, 0, NULL);
1415                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
1416                 break;
1417         }
1418
1419         case MDS_CLOSE:
1420                 DEBUG_REQ(D_INODE, req, "close");
1421                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1422                 rc = mds_close(req);
1423                 break;
1424
1425         case MDS_PIN:
1426                 DEBUG_REQ(D_INODE, req, "pin");
1427                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1428                 rc = mds_pin(req);
1429                 break;
1430
1431         case OBD_PING:
1432                 DEBUG_REQ(D_INODE, req, "ping");
1433                 rc = target_handle_ping(req);
1434                 break;
1435
1436         case OBD_LOG_CANCEL:
1437                 CDEBUG(D_INODE, "log cancel\n");
1438                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1439                 rc = -ENOTSUPP; /* la la la */
1440                 break;
1441
1442         case LDLM_ENQUEUE:
1443                 DEBUG_REQ(D_INODE, req, "enqueue");
1444                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1445                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1446                                          ldlm_server_blocking_ast);
1447                 break;
1448         case LDLM_CONVERT:
1449                 DEBUG_REQ(D_INODE, req, "convert");
1450                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1451                 rc = ldlm_handle_convert(req);
1452                 break;
1453         case LDLM_BL_CALLBACK:
1454         case LDLM_CP_CALLBACK:
1455                 DEBUG_REQ(D_INODE, req, "callback");
1456                 CERROR("callbacks should not happen on MDS\n");
1457                 LBUG();
1458                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1459                 break;
1460         default:
1461                 req->rq_status = -ENOTSUPP;
1462                 rc = ptlrpc_error(req);
1463                 RETURN(rc);
1464         }
1465
1466         EXIT;
1467
1468         /* If we're DISCONNECTing, the mds_export_data is already freed */
1469         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1470                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1471                 struct obd_device *obd = list_entry(mds, struct obd_device,
1472                                                     u.mds);
1473                 req->rq_repmsg->last_xid =
1474                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1475
1476                 if (!obd->obd_no_transno) {
1477                         req->rq_repmsg->last_committed =
1478                                 obd->obd_last_committed;
1479                 } else {
1480                         DEBUG_REQ(D_IOCTL, req,
1481                                   "not sending last_committed update");
1482                 }
1483                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
1484                        ", xid "LPU64"\n",
1485                        mds->mds_last_transno, obd->obd_last_committed,
1486                        req->rq_xid);
1487         }
1488  out:
1489
1490         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1491                 if (obd && obd->obd_recovering) {
1492                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1493                         return target_queue_final_reply(req, rc);
1494                 }
1495                 /* Lost a race with recovery; let the error path DTRT. */
1496                 rc = req->rq_status = -ENOTCONN;
1497         }
1498
1499         target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
1500         return 0;
1501 }
1502
1503 /* Update the server data on disk.  This stores the new mount_count and
1504  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1505  * then the server last_rcvd value may be less than that of the clients.
1506  * This will alert us that we may need to do client recovery.
1507  *
1508  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1509  */
1510 int mds_update_server_data(struct obd_device *obd)
1511 {
1512         struct mds_obd *mds = &obd->u.mds;
1513         struct mds_server_data *msd = mds->mds_server_data;
1514         struct file *filp = mds->mds_rcvd_filp;
1515         struct obd_run_ctxt saved;
1516         loff_t off = 0;
1517         int rc;
1518
1519         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1520         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
1521         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
1522
1523         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1524                mds->mds_mount_count, mds->mds_last_transno);
1525         rc = fsfilt_write_record(obd, filp, msd, sizeof(*msd), &off);
1526         if (rc != sizeof(*msd)) {
1527                 CERROR("error writing MDS server data: rc = %d\n", rc);
1528                 if (rc > 0)
1529                         rc = -EIO;
1530                 GOTO(out, rc);
1531         }
1532         rc = file_fsync(filp, filp->f_dentry, 1);
1533         if (rc)
1534                 CERROR("error flushing MDS server data: rc = %d\n", rc);
1535
1536 out:
1537         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
1538         RETURN(rc);
1539 }
1540
1541 /* mount the file system (secretly) */
1542 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1543 {
1544         struct obd_ioctl_data* data = buf;
1545         struct mds_obd *mds = &obd->u.mds;
1546         struct vfsmount *mnt;
1547         int rc = 0;
1548         unsigned long page;
1549         ENTRY;
1550
1551
1552         dev_clear_rdonly(2);
1553         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1554                 RETURN(rc = -EINVAL);
1555
1556         if (data->ioc_inlbuf4)
1557                 obd_str2uuid(&mds->mds_osc_uuid, data->ioc_inlbuf4);
1558
1559         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1560         if (IS_ERR(obd->obd_fsops))
1561                 RETURN(rc = PTR_ERR(obd->obd_fsops));
1562
1563
1564         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1565                 if (*data->ioc_inlbuf3 == '/') {
1566                         CERROR("mds namespace mount: %s\n", 
1567                                data->ioc_inlbuf3);
1568 //                        mds->mds_nspath = strdup(ioc->inlbuf4);
1569                 } else {
1570                         CERROR("namespace mount must be absolute path: '%s'\n",
1571                                data->ioc_inlbuf3);
1572                 }
1573         }
1574
1575         if (!(page = __get_free_page(GFP_KERNEL)))
1576                 return -ENOMEM;
1577
1578         memset((void *)page, 0, PAGE_SIZE);
1579         sprintf((char *)page, "iopen_nopriv");
1580
1581         mnt = do_kern_mount(data->ioc_inlbuf2, 0,
1582                             data->ioc_inlbuf1, (void *)page);
1583         free_page(page);
1584         if (IS_ERR(mnt)) {
1585                 rc = PTR_ERR(mnt);
1586                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1587                 GOTO(err_ops, rc);
1588         }
1589
1590         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1591         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
1592         if (!mds->mds_sb)
1593                 GOTO(err_put, rc = -ENODEV);
1594
1595         spin_lock_init(&mds->mds_transno_lock);
1596         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1597         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1598         rc = mds_fs_setup(obd, mnt);
1599         if (rc) {
1600                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1601                 GOTO(err_put, rc);
1602         }
1603
1604 #ifdef ENABLE_ORPHANS
1605         rc = llog_start_commit_thread();
1606         if (rc < 0)
1607                 GOTO(err_fs, rc);
1608 #endif
1609
1610 #ifdef ENABLE_ORPHANS
1611         mds->mds_catalog = mds_get_catalog(obd);
1612         if (IS_ERR(mds->mds_catalog))
1613                 GOTO(err_fs, rc = PTR_ERR(mds->mds_catalog));
1614 #endif
1615
1616         obd->obd_namespace = ldlm_namespace_new("mds_server",
1617                                                 LDLM_NAMESPACE_SERVER);
1618         if (obd->obd_namespace == NULL) {
1619                 mds_cleanup(obd, 0);
1620                 GOTO(err_log, rc = -ENOMEM);
1621         }
1622
1623         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1624                            "mds_ldlm_client", &obd->obd_ldlm_client);
1625
1626         mds->mds_has_lov_desc = 0;
1627         obd->obd_replayable = 1;
1628
1629         RETURN(0);
1630
1631 err_log:
1632 #ifdef ENABLE_ORPHANS
1633         mds_put_catalog(mds->mds_catalog);
1634         /* No extra cleanup needed for llog_init_commit_thread() */
1635 err_fs:
1636 #endif
1637         mds_fs_cleanup(obd, 0);
1638 err_put:
1639         unlock_kernel();
1640         mntput(mds->mds_vfsmnt);
1641         mds->mds_sb = 0;
1642         lock_kernel();
1643 err_ops:
1644         fsfilt_put_ops(obd->obd_fsops);
1645         return rc;
1646 }
1647
1648 static int mds_cleanup(struct obd_device *obd, int flags)
1649 {
1650         struct mds_obd *mds = &obd->u.mds;
1651         ENTRY;
1652
1653         if (mds->mds_sb == NULL)
1654                 RETURN(0);
1655
1656 #ifdef ENABLE_ORPHANS
1657         mds_put_catalog(mds->mds_catalog);
1658 #endif
1659         if (mds->mds_osc_obd)
1660                 obd_disconnect(&mds->mds_osc_conn, flags);
1661         mds_update_server_data(obd);
1662         mds_fs_cleanup(obd, flags);
1663
1664         unlock_kernel();
1665
1666         /* 2 seems normal on mds, (may_umount() also expects 2
1667           fwiw), but we only see 1 at this point in obdfilter. */
1668         if (atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count) > 2)
1669                 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1670                        atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count));
1671
1672         mntput(mds->mds_vfsmnt);
1673         mds->mds_sb = 0;
1674
1675         ldlm_namespace_free(obd->obd_namespace);
1676
1677         if (obd->obd_recovering)
1678                 target_cancel_recovery_timer(obd);
1679         lock_kernel();
1680         dev_clear_rdonly(2);
1681         fsfilt_put_ops(obd->obd_fsops);
1682
1683         RETURN(0);
1684 }
1685
1686 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
1687                                         struct ldlm_lock *new_lock,
1688                                         struct lustre_handle *lockh)
1689 {
1690         struct obd_export *exp = req->rq_export;
1691         struct obd_device *obd = exp->exp_obd;
1692         struct ldlm_request *dlmreq =
1693                 lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
1694         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
1695         struct list_head *iter;
1696
1697         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1698                 return;
1699
1700         l_lock(&obd->obd_namespace->ns_lock);
1701         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1702                 struct ldlm_lock *lock;
1703                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1704                 if (lock == new_lock)
1705                         continue;
1706                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1707                         lockh->cookie = lock->l_handle.h_cookie;
1708                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1709                                   lockh->cookie);
1710                         l_unlock(&obd->obd_namespace->ns_lock);
1711                         return;
1712                 }
1713
1714         }
1715         l_unlock(&obd->obd_namespace->ns_lock);
1716         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1717                   remote_hdl.cookie);
1718 }
1719
1720 int intent_disposition(struct ldlm_reply *rep, int flag)
1721 {
1722         if (!rep)
1723                 return 0;
1724         return (rep->lock_policy_res1 & flag);
1725 }
1726
1727 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1728 {
1729         if (!rep)
1730                 return;
1731         rep->lock_policy_res1 |= flag;
1732 }
1733
1734 static int ldlm_intent_policy(struct ldlm_namespace *ns,
1735                               struct ldlm_lock **lockp, void *req_cookie,
1736                               ldlm_mode_t mode, int flags, void *data)
1737 {
1738         struct ptlrpc_request *req = req_cookie;
1739         struct ldlm_lock *lock = *lockp;
1740         ENTRY;
1741
1742         if (!req_cookie)
1743                 RETURN(0);
1744
1745         if (req->rq_reqmsg->bufcount > 1) {
1746                 /* an intent needs to be considered */
1747                 struct ldlm_intent *it;
1748                 struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
1749                 struct ldlm_reply *rep;
1750                 struct lustre_handle lockh;
1751                 struct ldlm_lock *new_lock;
1752                 int offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
1753                                               sizeof(struct mds_body),
1754                                               mds->mds_max_mdsize,
1755                                               mds->mds_max_cookiesize};
1756
1757                 it = lustre_swab_reqbuf(req, 1, sizeof (*it),
1758                                         lustre_swab_ldlm_intent);
1759                 if (it == NULL) {
1760                         CERROR ("Intent missing\n");
1761                         req->rq_status = -EFAULT;
1762                         RETURN(req->rq_status);
1763                 }
1764
1765                 LDLM_DEBUG(lock, "intent policy, opc: %s",
1766                            ldlm_it2str(it->opc));
1767
1768                 req->rq_status = lustre_pack_msg(it->opc == IT_UNLINK ? 4 : 3,
1769                                                  repsize, NULL, &req->rq_replen,
1770                                                  &req->rq_repmsg);
1771                 if (req->rq_status)
1772                         RETURN(req->rq_status);
1773
1774                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1775                 intent_set_disposition(rep, DISP_IT_EXECD);
1776
1777                 fixup_handle_for_resent_req(req, lock, &lockh);
1778
1779                 /* execute policy */
1780                 switch ((long)it->opc) {
1781                 case IT_OPEN:
1782                 case IT_CREAT|IT_OPEN:
1783                         /* XXX swab here to assert that an mds_open reint
1784                          * packet is following */
1785                         rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
1786                         /* We abort the lock if the lookup was negative and
1787                          * we did not make it to the OPEN portion */
1788                         if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
1789                                 RETURN(ELDLM_LOCK_ABORTED);
1790                         if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
1791                             !intent_disposition(rep, DISP_OPEN_OPEN))
1792                                 RETURN(ELDLM_LOCK_ABORTED);
1793                         break;
1794                 case IT_GETATTR:
1795                 case IT_LOOKUP:
1796                 case IT_READDIR:
1797                         rep->lock_policy_res2 = mds_getattr_name(offset, req,
1798                                                                  &lockh);
1799                         /* FIXME: we need to sit down and decide on who should
1800                          * set req->rq_status, who should return negative and
1801                          * positive return values, and what they all mean. 
1802                          * - replay: returns 0 & req->status is old status
1803                          * - otherwise: returns req->status */
1804                         if (!intent_disposition(rep, DISP_LOOKUP_POS) || 
1805                             rep->lock_policy_res2)
1806                                 RETURN(ELDLM_LOCK_ABORTED);
1807                         if (req->rq_status != 0) {
1808                                 rep->lock_policy_res2 = req->rq_status;
1809                                 RETURN(ELDLM_LOCK_ABORTED);
1810                         }
1811                         break;
1812                 default:
1813                         CERROR("Unhandled intent "LPD64"\n", it->opc);
1814                         LBUG();
1815                 }
1816
1817                 /* By this point, whatever function we called above must have
1818                  * either filled in 'lockh', been an intent replay, or returned
1819                  * an error.  We want to allow replayed RPCs to not get a lock,
1820                  * since we would just drop it below anyways because lock replay
1821                  * is done separately by the client afterwards.  For regular
1822                  * RPCs we want to give the new lock to the client instead of
1823                  * whatever lock it was about to get.
1824                  */
1825                 new_lock = ldlm_handle2lock(&lockh);
1826                 if (flags & LDLM_FL_INTENT_ONLY && !new_lock)
1827                         RETURN(ELDLM_LOCK_ABORTED);
1828
1829                 LASSERT(new_lock != NULL);
1830
1831                 /* If we've already given this lock to a client once, then we
1832                  * should have no readers or writers.  Otherwise, we should
1833                  * have one reader _or_ writer ref (which will be zeroed below
1834                  * before returning the lock to a client.
1835                  */
1836                 if (new_lock->l_export == req->rq_export)
1837                         LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1838                 else {
1839                         LASSERT(new_lock->l_export == NULL);
1840                         LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1841                 }
1842
1843                 /* If we're running an intent only, we want to abort the new
1844                  * lock, and let the client abort the original lock. */
1845                 if (flags & LDLM_FL_INTENT_ONLY) {
1846                         LDLM_DEBUG(lock, "INTENT_ONLY, aborting locks");
1847                         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1848                         if (new_lock->l_readers)
1849                                 ldlm_lock_decref(&lockh, LCK_PR);
1850                         else
1851                                 ldlm_lock_decref(&lockh, LCK_PW);
1852                         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1853                         LDLM_LOCK_PUT(new_lock);
1854                         RETURN(ELDLM_LOCK_ABORTED);
1855                 }
1856
1857                 *lockp = new_lock;
1858
1859                 rep->lock_policy_res2 = req->rq_status;
1860
1861                 if (new_lock->l_export == req->rq_export) {
1862                         /* Already gave this to the client, which means that we
1863                          * reconstructed a reply. */
1864                         LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1865                                 MSG_RESENT);
1866                         RETURN(ELDLM_LOCK_REPLACED);
1867                 }
1868
1869                 /* Fixup the lock to be given to the client */
1870                 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1871                 new_lock->l_readers = 0;
1872                 new_lock->l_writers = 0;
1873
1874                 new_lock->l_export = class_export_get(req->rq_export);
1875                 list_add(&new_lock->l_export_chain,
1876                          &new_lock->l_export->exp_ldlm_data.led_held_locks);
1877
1878                 /* We don't need to worry about completion_ast (which isn't set
1879                  * in 'lock' yet anyways), because this lock is already
1880                  * granted. */
1881                 new_lock->l_blocking_ast = lock->l_blocking_ast;
1882
1883                 memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1884                        sizeof(lock->l_remote_handle));
1885
1886                 new_lock->l_flags &= ~(LDLM_FL_LOCAL | LDLM_FL_AST_SENT |
1887                                        LDLM_FL_CBPENDING);
1888
1889                 LDLM_LOCK_PUT(new_lock);
1890                 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1891
1892                 RETURN(ELDLM_LOCK_REPLACED);
1893         } else {
1894                 int size = sizeof(struct ldlm_reply);
1895                 if (lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1896                                     &req->rq_repmsg)) {
1897                         LBUG();
1898                         RETURN(-ENOMEM);
1899                 }
1900         }
1901         RETURN(0);
1902 }
1903
1904 int mds_attach(struct obd_device *dev, obd_count len, void *data)
1905 {
1906         struct lprocfs_static_vars lvars;
1907
1908         lprocfs_init_multi_vars(0, &lvars);
1909         return lprocfs_obd_attach(dev, lvars.obd_vars);
1910 }
1911
/*
 * Detach handler for the MDS obd type: removes the device's lprocfs
 * entries and returns the lprocfs_obd_detach() result.
 */
int mds_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1916
1917 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
1918 {
1919         struct lprocfs_static_vars lvars;
1920
1921         lprocfs_init_multi_vars(1, &lvars);
1922         return lprocfs_obd_attach(dev, lvars.obd_vars);
1923 }
1924
/*
 * Detach handler for the MDT obd type: removes the device's lprocfs
 * entries and returns the lprocfs_obd_detach() result.
 */
int mdt_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1929
1930 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
1931 {
1932         struct mds_obd *mds = &obddev->u.mds;
1933         int i, rc = 0;
1934         ENTRY;
1935
1936         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1937                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
1938                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1939                                            mds_handle, "mds", obddev);
1940
1941         if (!mds->mds_service) {
1942                 CERROR("failed to start service\n");
1943                 RETURN(rc = -ENOMEM);
1944         }
1945
1946         for (i = 0; i < MDT_NUM_THREADS; i++) {
1947                 char name[32];
1948                 sprintf(name, "ll_mdt_%02d", i);
1949                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
1950                 if (rc) {
1951                         CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
1952                         GOTO(err_thread, rc);
1953                 }
1954         }
1955
1956         mds->mds_setattr_service =
1957                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1958                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1959                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
1960                                 mds_handle, "mds_setattr", obddev);
1961         if (!mds->mds_setattr_service) {
1962                 CERROR("failed to start getattr service\n");
1963                 GOTO(err_thread, rc = -ENOMEM);
1964         }
1965
1966         for (i = 0; i < MDT_NUM_THREADS; i++) {
1967                 char name[32];
1968                 sprintf(name, "ll_mdt_attr_%02d", i);
1969                 rc = ptlrpc_start_thread(obddev, mds->mds_setattr_service,
1970                                          name);
1971                 if (rc) {
1972                         CERROR("cannot start MDT setattr thread #%d: rc %d\n",
1973                                i, rc);
1974                         GOTO(err_thread2, rc);
1975                 }
1976         }
1977
1978         mds->mds_readpage_service =
1979                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1980                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1981                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
1982                                 mds_handle, "mds_readpage", obddev);
1983         if (!mds->mds_readpage_service) {
1984                 CERROR("failed to start readpage service\n");
1985                 GOTO(err_thread2, rc = -ENOMEM);
1986         }
1987
1988         for (i = 0; i < MDT_NUM_THREADS; i++) {
1989                 char name[32];
1990                 sprintf(name, "ll_mdt_rdpg_%02d", i);
1991                 rc = ptlrpc_start_thread(obddev, mds->mds_readpage_service,
1992                                          name);
1993                 if (rc) {
1994                         CERROR("cannot start MDT readpage thread #%d: rc %d\n",
1995                                i, rc);
1996                         GOTO(err_thread3, rc);
1997                 }
1998         }
1999
2000         RETURN(0);
2001
2002 err_thread3:
2003         ptlrpc_stop_all_threads(mds->mds_readpage_service);
2004         ptlrpc_unregister_service(mds->mds_readpage_service);
2005 err_thread2:
2006         ptlrpc_stop_all_threads(mds->mds_setattr_service);
2007         ptlrpc_unregister_service(mds->mds_setattr_service);
2008 err_thread:
2009         ptlrpc_stop_all_threads(mds->mds_service);
2010         ptlrpc_unregister_service(mds->mds_service);
2011         return rc;
2012 }
2013
2014
2015 static int mdt_cleanup(struct obd_device *obddev, int flags)
2016 {
2017         struct mds_obd *mds = &obddev->u.mds;
2018         ENTRY;
2019
2020         ptlrpc_stop_all_threads(mds->mds_readpage_service);
2021         ptlrpc_unregister_service(mds->mds_readpage_service);
2022
2023         ptlrpc_stop_all_threads(mds->mds_setattr_service);
2024         ptlrpc_unregister_service(mds->mds_setattr_service);
2025
2026         ptlrpc_stop_all_threads(mds->mds_service);
2027         ptlrpc_unregister_service(mds->mds_service);
2028
2029         RETURN(0);
2030 }
2031
2032 extern int mds_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2033                          int len, void *karg, void *uarg);
2034
2035 /* use obd ops to offer management infrastructure */
2036 static struct obd_ops mds_obd_ops = {
2037         o_owner:       THIS_MODULE,
2038         o_attach:      mds_attach,
2039         o_detach:      mds_detach,
2040         o_connect:     mds_connect,
2041         o_disconnect:  mds_disconnect,
2042         o_setup:       mds_setup,
2043         o_cleanup:     mds_cleanup,
2044         o_statfs:      mds_obd_statfs,
2045         o_iocontrol:   mds_iocontrol
2046 };
2047
2048 static struct obd_ops mdt_obd_ops = {
2049         o_owner:       THIS_MODULE,
2050         o_attach:      mdt_attach,
2051         o_detach:      mdt_detach,
2052         o_setup:       mdt_setup,
2053         o_cleanup:     mdt_cleanup,
2054 };
2055
2056
2057 static int __init mds_init(void)
2058 {
2059         struct lprocfs_static_vars lvars;
2060
2061         lprocfs_init_multi_vars(0, &lvars);
2062         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
2063         lprocfs_init_multi_vars(1, &lvars);
2064         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
2065         ldlm_register_intent(ldlm_intent_policy);
2066
2067         return 0;
2068 }
2069
2070 static void /*__exit*/ mds_exit(void)
2071 {
2072         ldlm_unregister_intent();
2073         class_unregister_type(LUSTRE_MDS_NAME);
2074         class_unregister_type(LUSTRE_MDT_NAME);
2075 }
2076
2077 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2078 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2079 MODULE_LICENSE("GPL");
2080
2081 module_init(mds_init);
2082 module_exit(mds_exit);