Whamcloud - gitweb
Exit early from mds_open() if we get an error.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #define EXPORT_SYMTAB
30 #define DEBUG_SUBSYSTEM S_MDS
31
32 #include <linux/module.h>
33 #include <linux/lustre_mds.h>
34 #include <linux/lustre_dlm.h>
35 #include <linux/init.h>
36 #include <linux/obd_class.h>
37 #include <linux/random.h>
38 #include <linux/fs.h>
39 #include <linux/jbd.h>
40 #include <linux/ext3_fs.h>
41 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
42 # include <linux/smp_lock.h>
43 # include <linux/buffer_head.h>
44 # include <linux/workqueue.h>
45 # include <linux/mount.h>
46 #else
47 # include <linux/locks.h>
48 #endif
49 #include <linux/obd_lov.h>
50 #include <linux/lustre_mds.h>
51 #include <linux/lustre_fsfilt.h>
52 #include <linux/lprocfs_status.h>
53 #include <linux/lustre_commit_confd.h>
54
55 #include "mds_internal.h"
56
57 static int mds_cleanup(struct obd_device *obd, int flags);
58
59 static int mds_bulk_timeout(void *data)
60 {
61         struct ptlrpc_bulk_desc *desc = data;
62         struct obd_export *exp = desc->bd_export;
63
64         CERROR("bulk send timed out: evicting %s@%s\n",
65                exp->exp_client_uuid.uuid,
66                exp->exp_connection->c_remote_uuid.uuid);
67         ptlrpc_fail_export(exp);
68         ptlrpc_abort_bulk (desc);
69         RETURN(1);
70 }
71
72 /* Assumes caller has already pushed into the kernel filesystem context */
73 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
74                         __u64 offset, __u64 xid)
75 {
76         struct ptlrpc_bulk_desc *desc;
77         struct l_wait_info lwi;
78         struct page *page;
79         int rc = 0;
80         ENTRY;
81
82         LASSERT ((offset & (PAGE_CACHE_SIZE - 1)) == 0);
83
84         desc = ptlrpc_prep_bulk_exp (req, BULK_PUT_SOURCE, MDS_BULK_PORTAL);
85         if (desc == NULL)
86                 GOTO(out, rc = -ENOMEM);
87
88         LASSERT (PAGE_SIZE == PAGE_CACHE_SIZE);
89         page = alloc_pages (GFP_KERNEL, 0);
90         if (page == NULL)
91                 GOTO(cleanup_bulk, rc = -ENOMEM);
92
93         rc = ptlrpc_prep_bulk_page(desc, page, 0, PAGE_CACHE_SIZE);
94         if (rc != 0)
95                 GOTO(cleanup_buf, rc);
96
97         CDEBUG(D_EXT2, "reading %lu@"LPU64" from dir %lu (size %llu)\n",
98                PAGE_CACHE_SIZE, offset, file->f_dentry->d_inode->i_ino,
99                file->f_dentry->d_inode->i_size);
100         rc = fsfilt_readpage(req->rq_export->exp_obd, file, page_address (page),
101                              PAGE_CACHE_SIZE, (loff_t *)&offset);
102
103         if (rc != PAGE_CACHE_SIZE)
104                 GOTO(cleanup_buf, rc = -EIO);
105
106         rc = ptlrpc_bulk_put(desc);
107         if (rc)
108                 GOTO(cleanup_buf, rc);
109
110         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
111                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
112                        OBD_FAIL_MDS_SENDPAGE, rc);
113                 ptlrpc_abort_bulk(desc);
114                 GOTO(cleanup_buf, rc);
115         }
116
117         lwi = LWI_TIMEOUT(obd_timeout * HZ, mds_bulk_timeout, desc);
118         rc = l_wait_event(desc->bd_waitq, ptlrpc_bulk_complete (desc), &lwi);
119         if (rc) {
120                 LASSERT (rc == -ETIMEDOUT);
121                 GOTO(cleanup_buf, rc);
122         }
123
124         EXIT;
125  cleanup_buf:
126         __free_pages (page, 0);
127  cleanup_bulk:
128         ptlrpc_free_bulk (desc);
129  out:
130         return rc;
131 }
132
133 /* only valid locked dentries or errors should be returned */
134 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
135                                      struct vfsmount **mnt, int lock_mode,
136                                      struct lustre_handle *lockh)
137 {
138         struct mds_obd *mds = &obd->u.mds;
139         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
140         struct ldlm_res_id res_id = { .name = {0} };
141         int flags = 0, rc;
142         ENTRY;
143
144         if (IS_ERR(de))
145                 RETURN(de);
146
147         res_id.name[0] = de->d_inode->i_ino;
148         res_id.name[1] = de->d_inode->i_generation;
149         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
150                               res_id, LDLM_PLAIN, NULL, 0, lock_mode,
151                               &flags, ldlm_completion_ast,
152                               mds_blocking_ast, NULL, lockh);
153         if (rc != ELDLM_OK) {
154                 l_dput(de);
155                 retval = ERR_PTR(-ENOLCK); /* XXX translate ldlm code */
156         }
157
158         RETURN(retval);
159 }
160
161 #ifndef DCACHE_DISCONNECTED
162 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
163 #endif
164
165
166 /* Look up an entry by inode number. */
167 /* this function ONLY returns valid dget'd dentries with an initialized inode
168    or errors */
169 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
170                               struct vfsmount **mnt)
171 {
172         char fid_name[32];
173         unsigned long ino = fid->id;
174         __u32 generation = fid->generation;
175         struct inode *inode;
176         struct dentry *result;
177
178         if (ino == 0)
179                 RETURN(ERR_PTR(-ESTALE));
180
181         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
182
183         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino %lu, gen %u, sb %p\n",
184                ino, generation, mds->mds_sb);
185
186         /* under ext3 this is neither supposed to return bad inodes
187            nor NULL inodes. */
188         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
189         if (IS_ERR(result))
190                 RETURN(result);
191
192         inode = result->d_inode;
193         if (!inode)
194                 RETURN(ERR_PTR(-ENOENT));
195
196         if (generation && inode->i_generation != generation) {
197                 /* we didn't find the right inode.. */
198                 CERROR("bad inode %lu, link: %d ct: %d or generation %u/%u\n",
199                        inode->i_ino, inode->i_nlink,
200                        atomic_read(&inode->i_count), inode->i_generation,
201                        generation);
202                 dput(result);
203                 RETURN(ERR_PTR(-ENOENT));
204         }
205
206         if (mnt) {
207                 *mnt = mds->mds_vfsmnt;
208                 mntget(*mnt);
209         }
210
211         RETURN(result);
212 }
213
214
215 /* Establish a connection to the MDS.
216  *
217  * This will set up an export structure for the client to hold state data
218  * about that client, like open files, the last operation number it did
219  * on the server, etc.
220  */
221 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
222                        struct obd_uuid *cluuid)
223 {
224         struct obd_export *exp;
225         struct mds_export_data *med;
226         struct mds_client_data *mcd;
227         int rc, abort_recovery;
228         ENTRY;
229
230         if (!conn || !obd || !cluuid)
231                 RETURN(-EINVAL);
232
233         /* Check for aborted recovery. */
234         spin_lock_bh(&obd->obd_processing_task_lock);
235         abort_recovery = obd->obd_abort_recovery;
236         spin_unlock_bh(&obd->obd_processing_task_lock);
237         if (abort_recovery)
238                 target_abort_recovery(obd);
239
240         /* XXX There is a small race between checking the list and adding a
241          * new connection for the same UUID, but the real threat (list
242          * corruption when multiple different clients connect) is solved.
243          *
244          * There is a second race between adding the export to the list,
245          * and filling in the client data below.  Hence skipping the case
246          * of NULL mcd above.  We should already be controlling multiple
247          * connects at the client, and we can't hold the spinlock over
248          * memory allocations without risk of deadlocking.
249          */
250         rc = class_connect(conn, obd, cluuid);
251         if (rc)
252                 RETURN(rc);
253         exp = class_conn2export(conn);
254         LASSERT(exp);
255         med = &exp->exp_mds_data;
256         class_export_put(exp);
257
258         OBD_ALLOC(mcd, sizeof(*mcd));
259         if (!mcd) {
260                 CERROR("mds: out of memory for client data\n");
261                 GOTO(out_export, rc = -ENOMEM);
262         }
263
264         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
265         med->med_mcd = mcd;
266
267         INIT_LIST_HEAD(&med->med_open_head);
268         spin_lock_init(&med->med_open_lock);
269
270         rc = mds_client_add(obd, &obd->u.mds, med, -1);
271         if (rc)
272                 GOTO(out_mcd, rc);
273
274         RETURN(0);
275
276 out_mcd:
277         OBD_FREE(mcd, sizeof(*mcd));
278 out_export:
279         class_disconnect(conn, 0);
280
281         return rc;
282 }
283
284 static void mds_mfd_addref(void *mfdp)
285 {
286         struct mds_file_data *mfd = mfdp;
287
288         atomic_inc(&mfd->mfd_refcount);
289         CDEBUG(D_INFO, "GETting mfd %p : new refcount %d\n", mfd,
290                atomic_read(&mfd->mfd_refcount));
291 }
292
293 struct mds_file_data *mds_mfd_new(void)
294 {
295         struct mds_file_data *mfd;
296
297         OBD_ALLOC(mfd, sizeof *mfd);
298         if (mfd == NULL) {
299                 CERROR("mds: out of memory\n");
300                 return NULL;
301         }
302
303         atomic_set(&mfd->mfd_refcount, 2);
304
305         INIT_LIST_HEAD(&mfd->mfd_handle.h_link);
306         class_handle_hash(&mfd->mfd_handle, mds_mfd_addref);
307
308         return mfd;
309 }
310
311 static struct mds_file_data *mds_handle2mfd(struct lustre_handle *handle)
312 {
313         ENTRY;
314         LASSERT(handle != NULL);
315         RETURN(class_handle2object(handle->cookie));
316 }
317
318 void mds_mfd_put(struct mds_file_data *mfd)
319 {
320         CDEBUG(D_INFO, "PUTting mfd %p : new refcount %d\n", mfd,
321                atomic_read(&mfd->mfd_refcount) - 1);
322         LASSERT(atomic_read(&mfd->mfd_refcount) > 0 &&
323                 atomic_read(&mfd->mfd_refcount) < 0x5a5a);
324         if (atomic_dec_and_test(&mfd->mfd_refcount)) {
325                 LASSERT(list_empty(&mfd->mfd_handle.h_link));
326                 OBD_FREE(mfd, sizeof *mfd);
327         }
328 }
329
330 void mds_mfd_destroy(struct mds_file_data *mfd)
331 {
332         class_handle_unhash(&mfd->mfd_handle);
333         mds_mfd_put(mfd);
334 }
335
336 /* Close a "file descriptor" and possibly unlink an orphan from the
337  * PENDING directory.
338  *
339  * If we are being called from mds_disconnect() because the client has
340  * disappeared, then req == NULL and we do not update last_rcvd because
341  * there is nothing that could be recovered by the client at this stage
342  * (it will not even _have_ an entry in last_rcvd anymore).
343  */
344 static int mds_mfd_close(struct ptlrpc_request *req, struct obd_device *obd,
345                          struct mds_file_data *mfd)
346 {
347         struct dentry *dparent = mfd->mfd_dentry->d_parent;
348         struct inode *child_inode = mfd->mfd_dentry->d_inode;
349         char fidname[LL_FID_NAMELEN];
350         int last_orphan, fidlen, rc = 0;
351         ENTRY;
352
353         if (dparent) {
354                 LASSERT(atomic_read(&dparent->d_count) > 0);
355                 dparent = dget(dparent);
356         }
357
358         fidlen = ll_fid2str(fidname, child_inode->i_ino,
359                             child_inode->i_generation);
360
361         last_orphan = mds_open_orphan_dec_test(child_inode) &&
362                 mds_inode_is_orphan(child_inode);
363
364         /* this is the actual "close" */
365         l_dput(mfd->mfd_dentry);
366         mds_mfd_destroy(mfd);
367
368         if (dparent)
369                 l_dput(dparent);
370
371         if (last_orphan) {
372                 struct mds_obd *mds = &obd->u.mds;
373                 struct inode *pending_dir = mds->mds_pending_dir->d_inode;
374                 struct dentry *pending_child = NULL;
375                 void *handle;
376
377                 CDEBUG(D_ERROR, "destroying orphan object %s\n", fidname);
378
379                 /* Sadly, there is no easy way to save pending_child from
380                  * mds_reint_unlink() into mfd, so we need to re-lookup,
381                  * but normally it will still be in the dcache.
382                  */
383                 down(&pending_dir->i_sem);
384                 pending_child = lookup_one_len(fidname, mds->mds_pending_dir,
385                                                fidlen);
386                 if (IS_ERR(pending_child))
387                         GOTO(out_lock, rc = PTR_ERR(pending_child));
388                 LASSERT(pending_child->d_inode != NULL);
389
390                 handle = fsfilt_start(obd, pending_dir, FSFILT_OP_UNLINK, NULL);
391                 if (IS_ERR(handle))
392                         GOTO(out_dput, rc = PTR_ERR(handle));
393                 rc = vfs_unlink(pending_dir, pending_child);
394                 if (rc)
395                         CERROR("error unlinking orphan %s: rc %d\n",fidname,rc);
396
397                 if (req) {
398                         rc = mds_finish_transno(mds, pending_dir, handle, req,
399                                                 rc, 0);
400                 } else {
401                         int err = fsfilt_commit(obd, pending_dir, handle, 0);
402                         if (err) {
403                                 CERROR("error committing orphan unlink: %d\n",
404                                        err);
405                                 if (!rc)
406                                         rc = err;
407                         }
408                 }
409         out_dput:
410                 dput(pending_child);
411         out_lock:
412                 up(&pending_dir->i_sem);
413         }
414
415         RETURN(rc);
416 }
417
418 static int mds_disconnect(struct lustre_handle *conn, int flags)
419 {
420         struct obd_export *export = class_conn2export(conn);
421         struct mds_export_data *med = &export->exp_mds_data;
422         struct obd_device *obd = export->exp_obd;
423         struct obd_run_ctxt saved;
424         int rc;
425         ENTRY;
426
427         push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
428         /* Close any open files (which may also cause orphan unlinking). */
429         spin_lock(&med->med_open_lock);
430         while (!list_empty(&med->med_open_head)) {
431                 struct list_head *tmp = med->med_open_head.next;
432                 struct mds_file_data *mfd =
433                         list_entry(tmp, struct mds_file_data, mfd_list);
434 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
435                 /* bug 1579: fix force-closing for 2.5 */
436                 struct dentry *dentry = mfd->mfd_dentry;
437
438                 list_del(&mfd->mfd_list);
439                 spin_unlock(&med->med_open_lock);
440
441                 CERROR("force closing client file handle for %*s (%s:%lu)\n",
442                        dentry->d_name.len, dentry->d_name.name,
443                        kdevname(dentry->d_inode->i_sb->s_dev),
444                        dentry->d_inode->i_ino);
445                 rc = mds_mfd_close(NULL, obd, mfd);
446 #endif
447                 if (rc)
448                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
449                 spin_lock(&med->med_open_lock);
450         }
451         spin_unlock(&med->med_open_lock);
452         pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
453
454         ldlm_cancel_locks_for_export(export);
455         if (export->exp_outstanding_reply) {
456                 struct ptlrpc_request *req = export->exp_outstanding_reply;
457                 unsigned long          flags;
458
459                 /* Fake the ack, so the locks get cancelled. */
460                 LBUG ();
461                 /* Actually we can't do this because it prevents us knowing
462                  * if the ACK callback ran or not */
463                 spin_lock_irqsave (&req->rq_lock, flags);
464                 req->rq_want_ack = 0;
465                 req->rq_err = 1;
466                 wake_up(&req->rq_wait_for_rep);
467                 spin_unlock_irqrestore (&req->rq_lock, flags);
468
469                 export->exp_outstanding_reply = NULL;
470         }
471
472         if (!(flags & OBD_OPT_FAILOVER))
473                 mds_client_free(export);
474
475         rc = class_disconnect(conn, flags);
476         class_export_put(export);
477
478         RETURN(rc);
479 }
480
481 /*
482  * XXX This is NOT guaranteed to flush all transactions to disk (even though
483  *     it is equivalent to calling sync()) because it only _starts_ the flush
484  *     and does not wait for completion.  It's better than nothing though.
485  *     What we really want is a mild form of fsync_dev_lockfs(), but it is
486  *     non-standard, or enabling do_sync_supers in ext3, just for this call.
487  */
488 static void mds_fsync_super(struct super_block *sb)
489 {
490         lock_kernel();
491         lock_super(sb);
492 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
493         if (sb->s_dirt && sb->s_op && sb->s_op->write_super)
494                 sb->s_op->write_super(sb);
495 #else
496         if (sb->s_dirt && sb->s_op) {
497                 if (sb->s_op->sync_fs)
498                         sb->s_op->sync_fs(sb, 1);
499                 else if (sb->s_op->write_super)
500                         sb->s_op->write_super(sb);
501         }
502 #endif
503         unlock_super(sb);
504         unlock_kernel();
505 }
506
507 static int mds_getstatus(struct ptlrpc_request *req)
508 {
509         struct obd_device *obd = req->rq_export->exp_obd;
510         struct mds_obd *mds = mds_req2mds(req);
511         struct mds_body *body;
512         int rc, size = sizeof(*body);
513         ENTRY;
514
515         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
516         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
517                 CERROR("mds: out of memory for message: size=%d\n", size);
518                 req->rq_status = -ENOMEM;       /* superfluous? */
519                 RETURN(-ENOMEM);
520         }
521
522         /* Flush any outstanding transactions to disk so the client will
523          * get the latest last_committed value and can drop their local
524          * requests if they have any.  This would be fsync_super() if it
525          * was exported.
526          */
527         fsfilt_sync(obd, mds->mds_sb);
528
529         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
530         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
531
532         /* the last_committed and last_xid fields are filled in for all
533          * replies already - no need to do so here also.
534          */
535         RETURN(0);
536 }
537
538 static int mds_getlovinfo(struct ptlrpc_request *req)
539 {
540         struct mds_obd *mds = mds_req2mds(req);
541         struct mds_status_req *streq;
542         struct lov_desc *desc;
543         struct obd_uuid *uuid0;
544         int tgt_count;
545         int rc, size[2] = {sizeof(*desc)};
546         ENTRY;
547
548         streq = lustre_swab_reqbuf (req, 0, sizeof (*streq),
549                                     lustre_swab_mds_status_req);
550         if (streq == NULL) {
551                 CERROR ("Can't unpack mds_status_req\n");
552                 RETURN (-EFAULT);
553         }
554
555         if (streq->repbuf > LOV_MAX_UUID_BUFFER_SIZE) {
556                 CERROR ("Illegal request for uuid array > %d\n",
557                         streq->repbuf);
558                 RETURN (-EINVAL);
559         }
560         size[1] = streq->repbuf;
561
562         rc = lustre_pack_msg(2, size, NULL, &req->rq_replen, &req->rq_repmsg);
563         if (rc) {
564                 CERROR("mds: out of memory for message: size=%d\n", size[1]);
565                 RETURN(-ENOMEM);
566         }
567
568         if (!mds->mds_has_lov_desc) {
569                 req->rq_status = -ENOENT;
570                 RETURN(0);
571         }
572
573         /* XXX We're sending the lov_desc in my byte order.
574          * Receiver will swab... */
575         desc = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*desc));
576         memcpy(desc, &mds->mds_lov_desc, sizeof (*desc));
577
578         tgt_count = mds->mds_lov_desc.ld_tgt_count;
579         uuid0 = lustre_msg_buf(req->rq_repmsg, 1, tgt_count * sizeof (*uuid0));
580         if (uuid0 == NULL) {
581                 CERROR("too many targets, enlarge client buffers\n");
582                 req->rq_status = -ENOSPC;
583                 RETURN(0);
584         }
585
586         rc = mds_get_lovtgts(mds, tgt_count, uuid0);
587         if (rc) {
588                 CERROR("get_lovtgts error %d\n", rc);
589                 req->rq_status = rc;
590                 RETURN(0);
591         }
592         memcpy(&mds->mds_osc_uuid, &mds->mds_lov_desc.ld_uuid,
593                sizeof(mds->mds_osc_uuid));
594         RETURN(0);
595 }
596
597 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
598                      void *data, int flag)
599 {
600         int do_ast;
601         ENTRY;
602
603         if (flag == LDLM_CB_CANCELING) {
604                 /* Don't need to do anything here. */
605                 RETURN(0);
606         }
607
608         /* XXX layering violation!  -phil */
609         l_lock(&lock->l_resource->lr_namespace->ns_lock);
610         /* Get this: if mds_blocking_ast is racing with ldlm_intent_policy,
611          * such that mds_blocking_ast is called just before l_i_p takes the
612          * ns_lock, then by the time we get the lock, we might not be the
613          * correct blocking function anymore.  So check, and return early, if
614          * so. */
615         if (lock->l_blocking_ast != mds_blocking_ast) {
616                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
617                 RETURN(0);
618         }
619
620         lock->l_flags |= LDLM_FL_CBPENDING;
621         do_ast = (!lock->l_readers && !lock->l_writers);
622         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
623
624         if (do_ast) {
625                 struct lustre_handle lockh;
626                 int rc;
627
628                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
629                 ldlm_lock2handle(lock, &lockh);
630                 rc = ldlm_cli_cancel(&lockh);
631                 if (rc < 0)
632                         CERROR("ldlm_cli_cancel: %d\n", rc);
633         } else {
634                 LDLM_DEBUG(lock, "Lock still has references, will be "
635                            "cancelled later");
636         }
637         RETURN(0);
638 }
639
640 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg,
641                 int offset, struct mds_body *body, struct inode *inode)
642 {
643         struct mds_obd *mds = &obd->u.mds;
644         struct lov_mds_md *lmm;
645         int lmm_size;
646         int rc;
647         ENTRY;
648
649         lmm = lustre_msg_buf(msg, offset, 0);
650         if (lmm == NULL) {
651                 /* Some problem with getting eadata when I sized the reply
652                  * buffer... */
653                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
654                        inode->i_ino);
655                 RETURN(0);
656         }
657         lmm_size = msg->buflens[offset];
658
659         /* I don't really like this, but it is a sanity check on the client
660          * MD request.  However, if the client doesn't know how much space
661          * to reserve for the MD, this shouldn't be fatal either...
662          */
663         if (lmm_size > mds->mds_max_mdsize) {
664                 CERROR("Reading MD for inode %lu of %d bytes > max %d\n",
665                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
666                 // RETURN(-EINVAL);
667         }
668
669         rc = fsfilt_get_md(obd, inode, lmm, lmm_size);
670         if (rc < 0) {
671                 CERROR("Error %d reading eadata for ino %lu\n",
672                        rc, inode->i_ino);
673         } else if (rc > 0) {
674                 body->valid |= OBD_MD_FLEASIZE;
675                 body->eadatasize = rc;
676                 rc = 0;
677         }
678
679         RETURN(rc);
680 }
681
682 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
683                                 struct ptlrpc_request *req,
684                                 struct mds_body *reqbody, int reply_off)
685 {
686         struct mds_body *body;
687         struct inode *inode = dentry->d_inode;
688         int rc = 0;
689         ENTRY;
690
691         if (inode == NULL)
692                 RETURN(-ENOENT);
693
694         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
695         LASSERT(body != NULL);                 /* caller prepped reply */
696
697         mds_pack_inode2fid(&body->fid1, inode);
698         mds_pack_inode2body(body, inode);
699
700         if (S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE) != 0) {
701                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off+1, body, inode);
702
703                 /* If we have LOV EA data, the OST holds size, atime, mtime */
704                 if (!(body->valid & OBD_MD_FLEASIZE))
705                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
706                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
707         } else if (S_ISLNK(inode->i_mode) &&
708                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
709                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
710                 int len;
711
712                 LASSERT (symname != NULL);       /* caller prepped reply */
713                 len = req->rq_repmsg->buflens[reply_off + 1];
714
715                 rc = inode->i_op->readlink(dentry, symname, len);
716                 if (rc < 0) {
717                         CERROR("readlink failed: %d\n", rc);
718                 } else if (rc != len - 1) {
719                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
720                                 rc, len - 1);
721                         rc = -EINVAL;
722                 } else {
723                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
724                         body->valid |= OBD_MD_LINKNAME;
725                         body->eadatasize = rc + 1;
726                         symname[rc] = 0;        /* NULL terminate */
727                         rc = 0;
728                 }
729         }
730
731         RETURN(rc);
732 }
733
734 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
735                                 int offset)
736 {
737         struct mds_obd *mds = mds_req2mds(req);
738         struct mds_body *body;
739         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
740         ENTRY;
741
742         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
743         LASSERT(body != NULL);                 /* checked by caller */
744         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
745
746         if (S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) {
747                 int rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
748                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
749                        rc, inode->i_ino);
750                 if (rc < 0) {
751                         if (rc != -ENODATA)
752                                 CERROR("error getting inode %lu MD: rc = %d\n",
753                                        inode->i_ino, rc);
754                         size[bufcount] = 0;
755                 } else if (rc > mds->mds_max_mdsize) {
756                         size[bufcount] = 0;
757                         CERROR("MD size %d larger than maximum possible %u\n",
758                                rc, mds->mds_max_mdsize);
759                 } else {
760                         size[bufcount] = rc;
761                 }
762                 bufcount++;
763         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
764                 if (inode->i_size + 1 != body->eadatasize)
765                         CERROR("symlink size: %Lu, reply space: %d\n",
766                                inode->i_size + 1, body->eadatasize);
767                 size[bufcount] = MIN(inode->i_size + 1, body->eadatasize);
768                 bufcount++;
769                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
770                        inode->i_size + 1, body->eadatasize);
771         }
772
773         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
774                 CERROR("failed MDS_GETATTR_PACK test\n");
775                 req->rq_status = -ENOMEM;
776                 GOTO(out, rc = -ENOMEM);
777         }
778
779         rc = lustre_pack_msg(bufcount, size, NULL, &req->rq_replen,
780                              &req->rq_repmsg);
781         if (rc) {
782                 CERROR("out of memory\n");
783                 GOTO(out, req->rq_status = rc);
784         }
785
786         EXIT;
787  out:
788         return(rc);
789 }
790
791 /* This is more copy-and-paste from getattr_name than I'd like. */
792 static void reconstruct_getattr_name(int offset, struct ptlrpc_request *req,
793                                      struct lustre_handle *client_lockh)
794 {
795         struct mds_export_data *med = &req->rq_export->exp_mds_data;
796         struct mds_client_data *mcd = med->med_mcd;
797         struct obd_device *obd = req->rq_export->exp_obd;
798         struct mds_obd *mds = mds_req2mds(req);
799         struct dentry *parent, *child;
800         struct mds_body *body;
801         struct inode *dir;
802         struct obd_run_ctxt saved;
803         struct obd_ucred uc;
804         int namelen, rc = 0;
805         char *name;
806
807         req->rq_transno = mcd->mcd_last_transno;
808         req->rq_status = mcd->mcd_last_result;
809
810         LASSERT (req->rq_export->exp_outstanding_reply);
811
812         mds_steal_ack_locks(req->rq_export, req);
813
814         if (req->rq_status)
815                 return;
816
817         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
818         LASSERT (body != NULL);                 /* checked by caller */
819         LASSERT_REQSWABBED (req, offset);       /* swabbed by caller */
820
821         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
822         LASSERT (name != NULL);                 /* checked by caller */
823         LASSERT_REQSWABBED (req, offset + 1);   /* swabbed by caller */
824         namelen = req->rq_reqmsg->buflens[offset + 1];
825
826         LASSERT (offset == 2 || offset == 0);
827         /* requests were at offset 2, replies go back at 1 */
828         if (offset)
829                 offset = 1;
830
831         uc.ouc_fsuid = body->fsuid;
832         uc.ouc_fsgid = body->fsgid;
833         uc.ouc_cap = body->capability;
834         uc.ouc_suppgid1 = body->suppgid;
835         uc.ouc_suppgid2 = -1;
836
837         push_ctxt(&saved, &mds->mds_ctxt, &uc);
838         parent = mds_fid2dentry(mds, &body->fid1, NULL);
839         LASSERT(!IS_ERR(parent));
840         dir = parent->d_inode;
841         LASSERT(dir);
842         child = ll_lookup_one_len(name, parent, namelen - 1);
843         LASSERT(!IS_ERR(child));
844
845         if (req->rq_repmsg == NULL) {
846                 rc = mds_getattr_pack_msg(req, child->d_inode, offset);
847                 /* XXX need to handle error here */
848                 LASSERT (rc == 0);
849         }
850
851         rc = mds_getattr_internal(obd, child, req, body, offset);
852         /* XXX need to handle error here */
853         LASSERT(!rc);
854         l_dput(child);
855         l_dput(parent);
856 }
857
858 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
859                             struct lustre_handle *child_lockh)
860 {
861         struct mds_obd *mds = mds_req2mds(req);
862         struct obd_device *obd = req->rq_export->exp_obd;
863         struct ldlm_reply *rep = NULL;
864         struct obd_run_ctxt saved;
865         struct mds_body *body;
866         struct dentry *de = NULL, *dchild = NULL;
867         struct inode *dir;
868         struct obd_ucred uc;
869         struct ldlm_res_id child_res_id = { .name = {0} };
870         struct lustre_handle parent_lockh;
871         int namesize;
872         int flags = 0, rc = 0, cleanup_phase = 0;
873         char *name;
874         ENTRY;
875
876         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
877
878         /* Swab now, before anyone looks inside the request */
879
880         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
881                                   lustre_swab_mds_body);
882         if (body == NULL) {
883                 CERROR("Can't swab mds_body\n");
884                 GOTO(cleanup, rc = -EFAULT);
885         }
886
887         LASSERT_REQSWAB(req, offset + 1);
888         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
889         if (name == NULL) {
890                 CERROR("Can't unpack name\n");
891                 GOTO(cleanup, rc = -EFAULT);
892         }
893         namesize = req->rq_reqmsg->buflens[offset + 1];
894
895         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
896                 struct obd_export *exp = req->rq_export;
897                 if (exp->exp_outstanding_reply &&
898                     exp->exp_outstanding_reply->rq_xid == req->rq_xid) {
899                         reconstruct_getattr_name(offset, req, child_lockh);
900                         RETURN(0);
901                 }
902                 DEBUG_REQ(D_HA, req, "no reply for RESENT req (have "LPD64")",
903                           exp->exp_outstanding_reply ?
904                           exp->exp_outstanding_reply->rq_xid : (u64)0);
905         }
906
907         LASSERT (offset == 0 || offset == 2);
908         /* if requests were at offset 2, the getattr reply goes back at 1 */
909         if (offset) { 
910                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
911                 offset = 1;
912         }
913
914         uc.ouc_fsuid = body->fsuid;
915         uc.ouc_fsgid = body->fsgid;
916         uc.ouc_cap = body->capability;
917         uc.ouc_suppgid1 = body->suppgid;
918         uc.ouc_suppgid2 = -1;
919         push_ctxt(&saved, &mds->mds_ctxt, &uc);
920         /* Step 1: Lookup/lock parent */
921         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
922         de = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
923                                    &parent_lockh);
924         if (IS_ERR(de))
925                 GOTO(cleanup, rc = PTR_ERR(de));
926         dir = de->d_inode;
927         LASSERT(dir);
928
929         cleanup_phase = 1; /* parent dentry and lock */
930
931         CDEBUG(D_INODE, "parent ino %lu, name %s\n", dir->i_ino, name);
932
933         /* Step 2: Lookup child */
934         dchild = ll_lookup_one_len(name, de, namesize - 1);
935         if (IS_ERR(dchild)) {
936                 CDEBUG(D_INODE, "child lookup error %ld\n", PTR_ERR(dchild));
937                 GOTO(cleanup, rc = PTR_ERR(dchild));
938         }
939
940         cleanup_phase = 2; /* child dentry */
941
942         if (dchild->d_inode == NULL) {
943                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
944                 GOTO(cleanup, rc = -ENOENT);
945         } else {
946                 intent_set_disposition(rep, DISP_LOOKUP_POS);
947         }
948
949         /* Step 3: Lock child */
950         child_res_id.name[0] = dchild->d_inode->i_ino;
951         child_res_id.name[1] = dchild->d_inode->i_generation;
952         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, NULL,
953                               child_res_id, LDLM_PLAIN, NULL, 0, LCK_PR,
954                               &flags, ldlm_completion_ast, mds_blocking_ast,
955                               NULL, child_lockh);
956         if (rc != ELDLM_OK) {
957                 CERROR("ldlm_cli_enqueue: %d\n", rc);
958                 GOTO(cleanup, rc = -EIO);
959         }
960
961         cleanup_phase = 3; /* child lock */
962
963         if (req->rq_repmsg == NULL) {
964                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
965                 if (rc != 0) {
966                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
967                         GOTO (cleanup, rc);
968                 }
969         }
970
971         rc = mds_getattr_internal(obd, dchild, req, body, offset);
972         GOTO(cleanup, rc); /* returns the lock to the client */
973
974  cleanup:
975         switch (cleanup_phase) {
976         case 3:
977                 if (rc)
978                         ldlm_lock_decref(child_lockh, LCK_PR);
979         case 2:
980                 l_dput(dchild);
981
982         case 1:
983                 if (rc) {
984                         ldlm_lock_decref(&parent_lockh, LCK_PR);
985                 } else {
986                         memcpy(&req->rq_ack_locks[0].lock, &parent_lockh,
987                                sizeof(parent_lockh));
988                         req->rq_ack_locks[0].mode = LCK_PR;
989                 }
990                 l_dput(de);
991         default: ;
992         }
993         req->rq_status = rc;
994         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
995         return rc;
996 }
997
998 static int mds_getattr(int offset, struct ptlrpc_request *req)
999 {
1000         struct mds_obd *mds = mds_req2mds(req);
1001         struct obd_device *obd = req->rq_export->exp_obd;
1002         struct obd_run_ctxt saved;
1003         struct dentry *de;
1004         struct mds_body *body;
1005         struct obd_ucred uc;
1006         int rc = 0;
1007         ENTRY;
1008
1009         body = lustre_swab_reqbuf (req, offset, sizeof (*body),
1010                                    lustre_swab_mds_body);
1011         if (body == NULL) {
1012                 CERROR ("Can't unpack body\n");
1013                 RETURN (-EFAULT);
1014         }
1015
1016         uc.ouc_fsuid = body->fsuid;
1017         uc.ouc_fsgid = body->fsgid;
1018         uc.ouc_cap = body->capability;
1019         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1020         de = mds_fid2dentry(mds, &body->fid1, NULL);
1021         if (IS_ERR(de)) {
1022                 rc = req->rq_status = -ENOENT;
1023                 GOTO(out_pop, PTR_ERR(de));
1024         }
1025
1026         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1027         if (rc != 0) {
1028                 CERROR ("mds_getattr_pack_msg: %d\n", rc);
1029                 GOTO (out_pop, rc);
1030         }
1031
1032         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
1033
1034         l_dput(de);
1035         GOTO(out_pop, rc);
1036 out_pop:
1037         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1038         return rc;
1039 }
1040
1041
1042 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1043                           unsigned long max_age)
1044 {
1045         return fsfilt_statfs(obd, obd->u.mds.mds_sb, osfs);
1046 }
1047
1048 static int mds_statfs(struct ptlrpc_request *req)
1049 {
1050         struct obd_device *obd = req->rq_export->exp_obd;
1051         int rc, size = sizeof(struct obd_statfs);
1052         ENTRY;
1053
1054         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1055         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1056                 CERROR("mds: statfs lustre_pack_msg failed: rc = %d\n", rc);
1057                 GOTO(out, rc);
1058         }
1059
1060         /* We call this so that we can cache a bit - 1 jiffie worth */
1061         rc = obd_statfs(obd, lustre_msg_buf(req->rq_repmsg,0,size),jiffies-HZ);
1062         if (rc) {
1063                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1064                 GOTO(out, rc);
1065         }
1066
1067         EXIT;
1068 out:
1069         req->rq_status = rc;
1070         return 0;
1071 }
1072
1073 static void reconstruct_close(struct ptlrpc_request *req)
1074 {
1075         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1076         struct mds_client_data *mcd = med->med_mcd;
1077
1078         req->rq_transno = mcd->mcd_last_transno;
1079         req->rq_status = mcd->mcd_last_result;
1080
1081         /* XXX When open-unlink is working, we'll need to steal ack locks as
1082          * XXX well, and make sure that we do the right unlinking after we
1083          * XXX get the ack back.
1084          */
1085 }
1086
1087 static int mds_close(struct ptlrpc_request *req)
1088 {
1089         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1090         struct obd_device *obd = req->rq_export->exp_obd;
1091         struct mds_body *body;
1092         struct mds_file_data *mfd;
1093         struct obd_run_ctxt saved;
1094         int rc;
1095         ENTRY;
1096
1097         MDS_CHECK_RESENT(req, reconstruct_close(req));
1098
1099         body = lustre_swab_reqbuf(req, 0, sizeof (*body),
1100                                   lustre_swab_mds_body);
1101         if (body == NULL) {
1102                 CERROR ("Can't unpack body\n");
1103                 RETURN (-EFAULT);
1104         }
1105
1106         mfd = mds_handle2mfd(&body->handle);
1107         if (mfd == NULL) {
1108                 DEBUG_REQ(D_ERROR, req, "no handle for file close "LPD64
1109                           ": cookie "LPX64"\n", body->fid1.id,
1110                           body->handle.cookie);
1111                 RETURN(-ESTALE);
1112         }
1113
1114         rc = lustre_pack_msg(0, NULL, NULL, &req->rq_replen, &req->rq_repmsg);
1115         if (rc) {
1116                 CERROR("lustre_pack_msg: rc = %d\n", rc);
1117                 req->rq_status = rc;
1118         }
1119
1120         spin_lock(&med->med_open_lock);
1121         list_del(&mfd->mfd_list);
1122         spin_unlock(&med->med_open_lock);
1123
1124         push_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
1125         req->rq_status = mds_mfd_close(rc ? NULL : req, obd, mfd);
1126         pop_ctxt(&saved, &obd->u.mds.mds_ctxt, NULL);
1127
1128         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_PACK)) {
1129                 CERROR("test case OBD_FAIL_MDS_CLOSE_PACK\n");
1130                 req->rq_status = -ENOMEM;
1131                 mds_mfd_put(mfd);
1132                 RETURN(-ENOMEM);
1133         }
1134
1135         mds_mfd_put(mfd);
1136         RETURN(0);
1137 }
1138
1139 static int mds_readpage(struct ptlrpc_request *req)
1140 {
1141         struct mds_obd *mds = mds_req2mds(req);
1142         struct vfsmount *mnt;
1143         struct dentry *de;
1144         struct file *file;
1145         struct mds_body *body, *repbody;
1146         struct obd_run_ctxt saved;
1147         int rc, size = sizeof(*repbody);
1148         struct obd_ucred uc;
1149         ENTRY;
1150
1151         rc = lustre_pack_msg(1, &size, NULL, &req->rq_replen, &req->rq_repmsg);
1152         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1153                 CERROR("mds: out of memory\n");
1154                 GOTO(out, rc = -ENOMEM);
1155         }
1156
1157         body = lustre_swab_reqbuf (req, 0, sizeof (*body),
1158                                    lustre_swab_mds_body);
1159         if (body == NULL)
1160                 GOTO (out, rc = -EFAULT);
1161
1162         /* body->size is actually the offset -eeb */
1163         if ((body->size & ~PAGE_MASK) != 0) {
1164                 CERROR ("offset "LPU64"not on a page boundary\n", body->size);
1165                 GOTO (out, rc = -EFAULT);
1166         }
1167
1168         /* body->nlink is actually the #bytes to read -eeb */
1169         if (body->nlink != PAGE_SIZE) {
1170                 CERROR ("size %d is not PAGE_SIZE\n", body->nlink);
1171                 GOTO (out, rc = -EFAULT);
1172         }
1173
1174         uc.ouc_fsuid = body->fsuid;
1175         uc.ouc_fsgid = body->fsgid;
1176         uc.ouc_cap = body->capability;
1177         push_ctxt(&saved, &mds->mds_ctxt, &uc);
1178         de = mds_fid2dentry(mds, &body->fid1, &mnt);
1179         if (IS_ERR(de))
1180                 GOTO(out_pop, rc = PTR_ERR(de));
1181
1182         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1183
1184         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1185         /* note: in case of an error, dentry_open puts dentry */
1186         if (IS_ERR(file))
1187                 GOTO(out_pop, rc = PTR_ERR(file));
1188
1189         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1190         repbody->size = file->f_dentry->d_inode->i_size;
1191         repbody->valid = OBD_MD_FLSIZE;
1192
1193         /* to make this asynchronous make sure that the handling function
1194            doesn't send a reply when this function completes. Instead a
1195            callback function would send the reply */
1196         /* body->blocks is actually the xid -phil */
1197         /* body->size is actually the offset -eeb */
1198         rc = mds_sendpage(req, file, body->size, body->blocks);
1199
1200         filp_close(file, 0);
1201 out_pop:
1202         pop_ctxt(&saved, &mds->mds_ctxt, &uc);
1203 out:
1204         req->rq_status = rc;
1205         RETURN(0);
1206 }
1207
1208 int mds_reint(struct ptlrpc_request *req, int offset,
1209               struct lustre_handle *lockh)
1210 {
1211         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1212         int rc;
1213
1214         OBD_ALLOC(rec, sizeof(*rec));
1215         if (rec == NULL)
1216                 RETURN(-ENOMEM);
1217
1218         rc = mds_update_unpack(req, offset, rec);
1219         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1220                 CERROR("invalid record\n");
1221                 GOTO(out, req->rq_status = -EINVAL);
1222         }
1223         /* rc will be used to interrupt a for loop over multiple records */
1224         rc = mds_reint_rec(rec, offset, req, lockh);
1225  out:
1226         OBD_FREE(rec, sizeof(*rec));
1227         return rc;
1228 }
1229
1230 static int filter_recovery_request(struct ptlrpc_request *req,
1231                                    struct obd_device *obd, int *process)
1232 {
1233         switch (req->rq_reqmsg->opc) {
1234         case MDS_CONNECT: /* This will never get here, but for completeness. */
1235         case OST_CONNECT: /* This will never get here, but for completeness. */
1236         case MDS_DISCONNECT:
1237         case OST_DISCONNECT:
1238                *process = 1;
1239                RETURN(0);
1240
1241         case MDS_CLOSE:
1242         case MDS_GETSTATUS: /* used in unmounting */
1243         case OBD_PING:
1244         case MDS_REINT:
1245         case LDLM_ENQUEUE:
1246                 *process = target_queue_recovery_request(req, obd);
1247                 RETURN(0);
1248
1249         default:
1250                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1251                 *process = 0;
1252                 /* XXX what should we set rq_status to here? */
1253                 req->rq_status = -EAGAIN;
1254                 RETURN(ptlrpc_error(req));
1255         }
1256 }
1257
1258 static char *reint_names[] = {
1259         [REINT_SETATTR] "setattr",
1260         [REINT_CREATE]  "create",
1261         [REINT_LINK]    "link",
1262         [REINT_UNLINK]  "unlink",
1263         [REINT_RENAME]  "rename",
1264         [REINT_OPEN]    "open",
1265 };
1266
1267 void mds_steal_ack_locks(struct obd_export *exp,
1268                          struct ptlrpc_request *req)
1269 {
1270         unsigned long  flags;
1271
1272         struct ptlrpc_request *oldrep = exp->exp_outstanding_reply;
1273         memcpy(req->rq_ack_locks, oldrep->rq_ack_locks,
1274                sizeof req->rq_ack_locks);
1275         spin_lock_irqsave (&req->rq_lock, flags);
1276         oldrep->rq_resent = 1;
1277         wake_up(&oldrep->rq_wait_for_rep);
1278         spin_unlock_irqrestore (&req->rq_lock, flags);
1279         DEBUG_REQ(D_HA, oldrep, "stole locks from");
1280         DEBUG_REQ(D_HA, req, "stole locks for");
1281 }
1282
1283 int mds_handle(struct ptlrpc_request *req)
1284 {
1285         int should_process;
1286         int rc = 0;
1287         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1288         struct obd_device *obd = NULL;
1289         ENTRY;
1290
1291         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1292
1293         LASSERT(!strcmp(req->rq_obd->obd_type->typ_name, LUSTRE_MDT_NAME));
1294
1295         /* XXX identical to OST */
1296         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1297                 struct mds_export_data *med;
1298                 int recovering, abort_recovery;
1299
1300                 if (req->rq_export == NULL) {
1301                         CERROR("lustre_mds: operation %d on unconnected MDS\n",
1302                                req->rq_reqmsg->opc);
1303                         req->rq_status = -ENOTCONN;
1304                         GOTO(out, rc = -ENOTCONN);
1305                 }
1306
1307                 med = &req->rq_export->exp_mds_data;
1308                 obd = req->rq_export->exp_obd;
1309                 mds = &obd->u.mds;
1310
1311                 /* Check for aborted recovery. */
1312                 spin_lock_bh(&obd->obd_processing_task_lock);
1313                 abort_recovery = obd->obd_abort_recovery;
1314                 recovering = obd->obd_recovering;
1315                 spin_unlock_bh(&obd->obd_processing_task_lock);
1316                 if (abort_recovery) {
1317                         target_abort_recovery(obd);
1318                 } else if (recovering) {
1319                         rc = filter_recovery_request(req, obd, &should_process);
1320                         if (rc || !should_process)
1321                                 RETURN(rc);
1322                 }
1323         }
1324
1325         switch (req->rq_reqmsg->opc) {
1326         case MDS_CONNECT:
1327                 DEBUG_REQ(D_INODE, req, "connect");
1328                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1329                 rc = target_handle_connect(req, mds_handle);
1330                 /* Make sure that last_rcvd is correct. */
1331                 if (!rc) {
1332                         /* Now that we have an export, set mds. */
1333                         mds = mds_req2mds(req);
1334                         mds_fsync_super(mds->mds_sb);
1335                 }
1336                 break;
1337
1338         case MDS_DISCONNECT:
1339                 DEBUG_REQ(D_INODE, req, "disconnect");
1340                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1341                 rc = target_handle_disconnect(req);
1342                 /* Make sure that last_rcvd is correct. */
1343                 if (!rc)
1344                         mds_fsync_super(mds->mds_sb);
1345                 req->rq_status = rc;            /* superfluous? */
1346                 break;
1347
1348         case MDS_GETSTATUS:
1349                 DEBUG_REQ(D_INODE, req, "getstatus");
1350                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1351                 rc = mds_getstatus(req);
1352                 break;
1353
1354         case MDS_GETLOVINFO:
1355                 DEBUG_REQ(D_INODE, req, "getlovinfo");
1356                 rc = mds_getlovinfo(req);
1357                 break;
1358
1359         case MDS_GETATTR:
1360                 DEBUG_REQ(D_INODE, req, "getattr");
1361                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1362                 rc = mds_getattr(0, req);
1363                 break;
1364
1365         case MDS_GETATTR_NAME: {
1366                 struct lustre_handle lockh;
1367                 DEBUG_REQ(D_INODE, req, "getattr_name");
1368                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1369
1370                 /* If this request gets a reconstructed reply, we won't be
1371                  * acquiring any new locks in mds_getattr_name, so we don't
1372                  * want to cancel.
1373                  */
1374                 lockh.cookie = 0;
1375                 rc = mds_getattr_name(0, req, &lockh);
1376                 if (rc == 0 && lockh.cookie)
1377                         ldlm_lock_decref(&lockh, LCK_PR);
1378                 break;
1379         }
1380         case MDS_STATFS:
1381                 DEBUG_REQ(D_INODE, req, "statfs");
1382                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1383                 rc = mds_statfs(req);
1384                 break;
1385
1386         case MDS_READPAGE:
1387                 DEBUG_REQ(D_INODE, req, "readpage");
1388                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1389                 rc = mds_readpage(req);
1390
1391                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1392                         return 0;
1393                 break;
1394
1395         case MDS_REINT: {
1396                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
1397                 __u32  opc;
1398                 int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
1399                                mds->mds_max_cookiesize};
1400                 int bufcount;
1401
1402                 /* NB only peek inside req now; mds_reint() will swab it */
1403                 if (opcp == NULL) {
1404                         CERROR ("Can't inspect opcode\n");
1405                         rc = -EINVAL;
1406                         break;
1407                 }
1408                 opc = *opcp;
1409                 if (lustre_msg_swabbed (req->rq_reqmsg))
1410                         __swab32s(&opc);
1411
1412                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1413                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1414                            reint_names[opc] == NULL) ? reint_names[opc] :
1415                                                        "unknown opcode");
1416
1417                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1418
1419                 if (opc == REINT_UNLINK)
1420                         bufcount = 3;
1421                 else if (opc == REINT_OPEN)
1422                         bufcount = 2;
1423                 else
1424                         bufcount = 1;
1425
1426                 rc = lustre_pack_msg(bufcount, size, NULL,
1427                                      &req->rq_replen, &req->rq_repmsg);
1428                 if (rc)
1429                         break;
1430
1431                 rc = mds_reint(req, 0, NULL);
1432                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET_REP, 0);
1433                 break;
1434         }
1435
1436         case MDS_CLOSE:
1437                 DEBUG_REQ(D_INODE, req, "close");
1438                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1439                 rc = mds_close(req);
1440                 break;
1441
1442         case MDS_PIN:
1443                 DEBUG_REQ(D_INODE, req, "pin");
1444                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1445                 rc = mds_pin(req);
1446                 break;
1447
1448         case OBD_PING:
1449                 DEBUG_REQ(D_INODE, req, "ping");
1450                 rc = target_handle_ping(req);
1451                 break;
1452
1453         case OBD_LOG_CANCEL:
1454                 CDEBUG(D_INODE, "log cancel\n");
1455                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1456                 rc = -ENOTSUPP; /* la la la */
1457                 break;
1458
1459         case LDLM_ENQUEUE:
1460                 DEBUG_REQ(D_INODE, req, "enqueue");
1461                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1462                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1463                                          ldlm_server_blocking_ast);
1464                 break;
1465         case LDLM_CONVERT:
1466                 DEBUG_REQ(D_INODE, req, "convert");
1467                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1468                 rc = ldlm_handle_convert(req);
1469                 break;
1470         case LDLM_BL_CALLBACK:
1471         case LDLM_CP_CALLBACK:
1472                 DEBUG_REQ(D_INODE, req, "callback");
1473                 CERROR("callbacks should not happen on MDS\n");
1474                 LBUG();
1475                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1476                 break;
1477         default:
1478                 req->rq_status = -ENOTSUPP;
1479                 rc = ptlrpc_error(req);
1480                 RETURN(rc);
1481         }
1482
1483         EXIT;
1484
1485         /* If we're DISCONNECTing, the mds_export_data is already freed */
1486         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1487                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1488                 struct obd_device *obd = list_entry(mds, struct obd_device,
1489                                                     u.mds);
1490                 req->rq_repmsg->last_xid =
1491                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1492
1493                 if (!obd->obd_no_transno) {
1494                         req->rq_repmsg->last_committed =
1495                                 obd->obd_last_committed;
1496                 } else {
1497                         DEBUG_REQ(D_IOCTL, req,
1498                                   "not sending last_committed update");
1499                 }
1500                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
1501                        ", xid "LPU64"\n",
1502                        mds->mds_last_transno, obd->obd_last_committed,
1503                        req->rq_xid);
1504         }
1505  out:
1506
1507         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1508                 if (obd && obd->obd_recovering) {
1509                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1510                         return target_queue_final_reply(req, rc);
1511                 }
1512                 /* Lost a race with recovery; let the error path DTRT. */
1513                 rc = req->rq_status = -ENOTCONN;
1514         }
1515
1516         target_send_reply(req, rc, OBD_FAIL_MDS_ALL_REPLY_NET);
1517         return 0;
1518 }
1519
1520 /* Update the server data on disk.  This stores the new mount_count and
1521  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1522  * then the server last_rcvd value may be less than that of the clients.
1523  * This will alert us that we may need to do client recovery.
1524  *
1525  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1526  */
1527 int mds_update_server_data(struct obd_device *obd)
1528 {
1529         struct mds_obd *mds = &obd->u.mds;
1530         struct mds_server_data *msd = mds->mds_server_data;
1531         struct file *filp = mds->mds_rcvd_filp;
1532         struct obd_run_ctxt saved;
1533         loff_t off = 0;
1534         int rc;
1535
1536         push_ctxt(&saved, &mds->mds_ctxt, NULL);
1537         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
1538         msd->msd_mount_count = cpu_to_le64(mds->mds_mount_count);
1539
1540         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1541                mds->mds_mount_count, mds->mds_last_transno);
1542         rc = fsfilt_write_record(obd, filp, (char *)msd, sizeof(*msd), &off);
1543         if (rc != sizeof(*msd)) {
1544                 CERROR("error writing MDS server data: rc = %d\n", rc);
1545                 if (rc > 0)
1546                         rc = -EIO;
1547                 GOTO(out, rc);
1548         }
1549         rc = file_fsync(filp, filp->f_dentry, 1);
1550         if (rc)
1551                 CERROR("error flushing MDS server data: rc = %d\n", rc);
1552
1553 out:
1554         pop_ctxt(&saved, &mds->mds_ctxt, NULL);
1555         RETURN(rc);
1556 }
1557
1558 /* mount the file system (secretly) */
1559 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1560 {
1561         struct obd_ioctl_data* data = buf;
1562         struct mds_obd *mds = &obd->u.mds;
1563         struct vfsmount *mnt;
1564         int rc = 0;
1565         unsigned long page;
1566         ENTRY;
1567
1568
1569 #ifdef CONFIG_DEV_RDONLY
1570         dev_clear_rdonly(2);
1571 #endif
1572         if (!data->ioc_inlbuf1 || !data->ioc_inlbuf2)
1573                 RETURN(rc = -EINVAL);
1574
1575         if (data->ioc_inlbuf4)
1576                 obd_str2uuid(&mds->mds_osc_uuid, data->ioc_inlbuf4);
1577
1578         obd->obd_fsops = fsfilt_get_ops(data->ioc_inlbuf2);
1579         if (IS_ERR(obd->obd_fsops))
1580                 RETURN(rc = PTR_ERR(obd->obd_fsops));
1581
1582
1583         if (data->ioc_inllen3 > 0 && data->ioc_inlbuf3) {
1584                 if (*data->ioc_inlbuf3 == '/') {
1585                         CERROR("mds namespace mount: %s\n", 
1586                                data->ioc_inlbuf3);
1587 //                        mds->mds_nspath = strdup(ioc->inlbuf4);
1588                 } else {
1589                         CERROR("namespace mount must be absolute path: '%s'\n",
1590                                data->ioc_inlbuf3);
1591                 }
1592         }
1593
1594         if (!(page = __get_free_page(GFP_KERNEL)))
1595                 return -ENOMEM;
1596
1597         memset((void *)page, 0, PAGE_SIZE);
1598         sprintf((char *)page, "iopen_nopriv");
1599
1600         mnt = do_kern_mount(data->ioc_inlbuf2, 0,
1601                             data->ioc_inlbuf1, (void *)page);
1602         free_page(page);
1603         if (IS_ERR(mnt)) {
1604                 rc = PTR_ERR(mnt);
1605                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1606                 GOTO(err_ops, rc);
1607         }
1608
1609         CDEBUG(D_SUPER, "%s: mnt = %p\n", data->ioc_inlbuf1, mnt);
1610         mds->mds_sb = mnt->mnt_root->d_inode->i_sb;
1611         if (!mds->mds_sb)
1612                 GOTO(err_put, rc = -ENODEV);
1613
1614         spin_lock_init(&mds->mds_transno_lock);
1615         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1616         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1617         rc = mds_fs_setup(obd, mnt);
1618         if (rc) {
1619                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1620                 GOTO(err_put, rc);
1621         }
1622
1623 #ifdef ENABLE_ORPHANS
1624         rc = llog_start_commit_thread();
1625         if (rc < 0)
1626                 GOTO(err_fs, rc);
1627 #endif
1628
1629 #ifdef ENABLE_ORPHANS
1630         mds->mds_catalog = mds_get_catalog(obd);
1631         if (IS_ERR(mds->mds_catalog))
1632                 GOTO(err_fs, rc = PTR_ERR(mds->mds_catalog));
1633 #endif
1634
1635         obd->obd_namespace = ldlm_namespace_new("mds_server",
1636                                                 LDLM_NAMESPACE_SERVER);
1637         if (obd->obd_namespace == NULL) {
1638                 mds_cleanup(obd, 0);
1639                 GOTO(err_log, rc = -ENOMEM);
1640         }
1641
1642         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1643                            "mds_ldlm_client", &obd->obd_ldlm_client);
1644
1645         mds->mds_has_lov_desc = 0;
1646         obd->obd_replayable = 1;
1647
1648         RETURN(0);
1649
1650 err_log:
1651 #ifdef ENABLE_ORPHANS
1652         mds_put_catalog(mds->mds_catalog);
1653         /* No extra cleanup needed for llog_init_commit_thread() */
1654 err_fs:
1655 #endif
1656         mds_fs_cleanup(obd, 0);
1657 err_put:
1658         unlock_kernel();
1659         mntput(mds->mds_vfsmnt);
1660         mds->mds_sb = 0;
1661         lock_kernel();
1662 err_ops:
1663         fsfilt_put_ops(obd->obd_fsops);
1664         return rc;
1665 }
1666
1667 static int mds_cleanup(struct obd_device *obd, int flags)
1668 {
1669         struct mds_obd *mds = &obd->u.mds;
1670         ENTRY;
1671
1672         if (mds->mds_sb == NULL)
1673                 RETURN(0);
1674
1675 #ifdef ENABLE_ORPHANS
1676         mds_put_catalog(mds->mds_catalog);
1677 #endif
1678         if (mds->mds_osc_obd)
1679                 obd_disconnect(&mds->mds_osc_conn, flags);
1680         mds_update_server_data(obd);
1681         mds_fs_cleanup(obd, flags);
1682
1683         unlock_kernel();
1684
1685         /* 2 seems normal on mds, (may_umount() also expects 2
1686           fwiw), but we only see 1 at this point in obdfilter. */
1687         if (atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count) > 2)
1688                 CERROR("%s: mount point busy, mnt_count: %d\n", obd->obd_name,
1689                        atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count));
1690
1691         mntput(mds->mds_vfsmnt);
1692         mds->mds_sb = 0;
1693
1694         ldlm_namespace_free(obd->obd_namespace);
1695
1696         if (obd->obd_recovering)
1697                 target_cancel_recovery_timer(obd);
1698         lock_kernel();
1699 #ifdef CONFIG_DEV_RDONLY
1700         dev_clear_rdonly(2);
1701 #endif
1702         fsfilt_put_ops(obd->obd_fsops);
1703
1704         RETURN(0);
1705 }
1706
1707 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
1708                                         struct ldlm_lock *new_lock,
1709                                         struct lustre_handle *lockh)
1710 {
1711         struct obd_export *exp = req->rq_export;
1712         struct obd_device *obd = exp->exp_obd;
1713         struct ldlm_request *dlmreq =
1714                 lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
1715         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
1716         struct list_head *iter;
1717
1718         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1719                 return;
1720
1721         l_lock(&obd->obd_namespace->ns_lock);
1722         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1723                 struct ldlm_lock *lock;
1724                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1725                 if (lock == new_lock)
1726                         continue;
1727                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1728                         lockh->cookie = lock->l_handle.h_cookie;
1729                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1730                                   lockh->cookie);
1731                         l_unlock(&obd->obd_namespace->ns_lock);
1732                         return;
1733                 }
1734
1735         }
1736         l_unlock(&obd->obd_namespace->ns_lock);
1737         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1738                   remote_hdl.cookie);
1739 }
1740
1741 int intent_disposition(struct ldlm_reply *rep, int flag)
1742 {
1743         if (!rep)
1744                 return 0;
1745         return (rep->lock_policy_res1 & flag);
1746 }
1747
1748 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1749 {
1750         if (!rep)
1751                 return;
1752         rep->lock_policy_res1 |= flag;
1753 }
1754
1755 static int ldlm_intent_policy(struct ldlm_namespace *ns,
1756                               struct ldlm_lock **lockp, void *req_cookie,
1757                               ldlm_mode_t mode, int flags, void *data)
1758 {
1759         struct ptlrpc_request *req = req_cookie;
1760         struct ldlm_lock *lock = *lockp;
1761         ENTRY;
1762
1763         if (!req_cookie)
1764                 RETURN(0);
1765
1766         if (req->rq_reqmsg->bufcount > 1) {
1767                 /* an intent needs to be considered */
1768                 struct ldlm_intent *it;
1769                 struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
1770                 struct ldlm_reply *rep;
1771                 struct lustre_handle lockh;
1772                 struct ldlm_lock *new_lock;
1773                 int offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
1774                                               sizeof(struct mds_body),
1775                                               mds->mds_max_mdsize,
1776                                               mds->mds_max_cookiesize};
1777
1778                 it = lustre_swab_reqbuf(req, 1, sizeof (*it),
1779                                         lustre_swab_ldlm_intent);
1780                 if (it == NULL) {
1781                         CERROR ("Intent missing\n");
1782                         req->rq_status = -EFAULT;
1783                         RETURN(req->rq_status);
1784                 }
1785
1786                 LDLM_DEBUG(lock, "intent policy, opc: %s",
1787                            ldlm_it2str(it->opc));
1788
1789                 req->rq_status = lustre_pack_msg(it->opc == IT_UNLINK ? 4 : 3,
1790                                                  repsize, NULL, &req->rq_replen,
1791                                                  &req->rq_repmsg);
1792                 if (req->rq_status)
1793                         RETURN(req->rq_status);
1794
1795                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1796                 intent_set_disposition(rep, DISP_IT_EXECD);
1797
1798                 fixup_handle_for_resent_req(req, lock, &lockh);
1799
1800                 /* execute policy */
1801                 switch ((long)it->opc) {
1802                 case IT_OPEN:
1803                 case IT_CREAT|IT_OPEN:
1804                         /* XXX swab here to assert that an mds_open reint
1805                          * packet is following */
1806                         rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
1807                         /* We abort the lock if the lookup was negative and
1808                          * we did not make it to the OPEN portion */
1809                         if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
1810                                 RETURN(ELDLM_LOCK_ABORTED);
1811                         if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
1812                             !intent_disposition(rep, DISP_OPEN_OPEN))
1813                                 RETURN(ELDLM_LOCK_ABORTED);
1814                         break;
1815                 case IT_GETATTR:
1816                 case IT_LOOKUP:
1817                 case IT_READDIR:
1818                         rep->lock_policy_res2 = mds_getattr_name(offset, req,
1819                                                                  &lockh);
1820                         /* FIXME: we need to sit down and decide on who should
1821                          * set req->rq_status, who should return negative and
1822                          * positive return values, and what they all mean. 
1823                          * - replay: returns 0 & req->status is old status
1824                          * - otherwise: returns req->status */
1825                         if (!intent_disposition(rep, DISP_LOOKUP_POS) || 
1826                             rep->lock_policy_res2)
1827                                 RETURN(ELDLM_LOCK_ABORTED);
1828                         if (req->rq_status != 0) {
1829                                 rep->lock_policy_res2 = req->rq_status;
1830                                 RETURN(ELDLM_LOCK_ABORTED);
1831                         }
1832                         break;
1833                 default:
1834                         CERROR("Unhandled intent "LPD64"\n", it->opc);
1835                         LBUG();
1836                 }
1837
1838                 /* By this point, whatever function we called above must have
1839                  * either filled in 'lockh', been an intent replay, or returned
1840                  * an error.  We want to allow replayed RPCs to not get a lock,
1841                  * since we would just drop it below anyways because lock replay
1842                  * is done separately by the client afterwards.  For regular
1843                  * RPCs we want to give the new lock to the client instead of
1844                  * whatever lock it was about to get.
1845                  */
1846                 new_lock = ldlm_handle2lock(&lockh);
1847                 if (flags & LDLM_FL_INTENT_ONLY && !new_lock)
1848                         RETURN(ELDLM_LOCK_ABORTED);
1849
1850                 LASSERT(new_lock != NULL);
1851
1852                 /* If we've already given this lock to a client once, then we
1853                  * should have no readers or writers.  Otherwise, we should
1854                  * have one reader _or_ writer ref (which will be zeroed below
1855                  * before returning the lock to a client.
1856                  */
1857                 if (new_lock->l_export == req->rq_export) {
1858                         LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1859                 } else {
1860                         LASSERT(new_lock->l_export == NULL);
1861                         LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1862                 }
1863
1864                 /* If we're running an intent only, we want to abort the new
1865                  * lock, and let the client abort the original lock. */
1866                 if (flags & LDLM_FL_INTENT_ONLY) {
1867                         LDLM_DEBUG(lock, "INTENT_ONLY, aborting locks");
1868                         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1869                         if (new_lock->l_readers)
1870                                 ldlm_lock_decref(&lockh, LCK_PR);
1871                         else
1872                                 ldlm_lock_decref(&lockh, LCK_PW);
1873                         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1874                         LDLM_LOCK_PUT(new_lock);
1875                         RETURN(ELDLM_LOCK_ABORTED);
1876                 }
1877
1878                 *lockp = new_lock;
1879
1880                 rep->lock_policy_res2 = req->rq_status;
1881
1882                 if (new_lock->l_export == req->rq_export) {
1883                         /* Already gave this to the client, which means that we
1884                          * reconstructed a reply. */
1885                         LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1886                                 MSG_RESENT);
1887                         RETURN(ELDLM_LOCK_REPLACED);
1888                 }
1889
1890                 /* Fixup the lock to be given to the client */
1891                 l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1892                 new_lock->l_readers = 0;
1893                 new_lock->l_writers = 0;
1894
1895                 new_lock->l_export = class_export_get(req->rq_export);
1896                 list_add(&new_lock->l_export_chain,
1897                          &new_lock->l_export->exp_ldlm_data.led_held_locks);
1898
1899                 /* We don't need to worry about completion_ast (which isn't set
1900                  * in 'lock' yet anyways), because this lock is already
1901                  * granted. */
1902                 new_lock->l_blocking_ast = lock->l_blocking_ast;
1903
1904                 memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1905                        sizeof(lock->l_remote_handle));
1906
1907                 new_lock->l_flags &= ~(LDLM_FL_LOCAL | LDLM_FL_AST_SENT |
1908                                        LDLM_FL_CBPENDING);
1909
1910                 LDLM_LOCK_PUT(new_lock);
1911                 l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1912
1913                 RETURN(ELDLM_LOCK_REPLACED);
1914         } else {
1915                 int size = sizeof(struct ldlm_reply);
1916                 if (lustre_pack_msg(1, &size, NULL, &req->rq_replen,
1917                                     &req->rq_repmsg)) {
1918                         LBUG();
1919                         RETURN(-ENOMEM);
1920                 }
1921         }
1922         RETURN(0);
1923 }
1924
1925 int mds_attach(struct obd_device *dev, obd_count len, void *data)
1926 {
1927         struct lprocfs_static_vars lvars;
1928
1929         lprocfs_init_multi_vars(0, &lvars);
1930         return lprocfs_obd_attach(dev, lvars.obd_vars);
1931 }
1932
/*
 * Detach hook for the MDS device type.
 *
 * \return the result of lprocfs_obd_detach()
 */
int mds_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1937
1938 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
1939 {
1940         struct lprocfs_static_vars lvars;
1941
1942         lprocfs_init_multi_vars(1, &lvars);
1943         return lprocfs_obd_attach(dev, lvars.obd_vars);
1944 }
1945
/*
 * Detach hook for the MDT device type.
 *
 * \return the result of lprocfs_obd_detach()
 */
int mdt_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
1950
1951 static int mdt_setup(struct obd_device *obddev, obd_count len, void *buf)
1952 {
1953         struct mds_obd *mds = &obddev->u.mds;
1954         int i, rc = 0;
1955         ENTRY;
1956
1957         mds->mds_service = ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1958                                            MDS_BUFSIZE, MDS_MAXREQSIZE,
1959                                            MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
1960                                            mds_handle, "mds", obddev);
1961
1962         if (!mds->mds_service) {
1963                 CERROR("failed to start service\n");
1964                 RETURN(rc = -ENOMEM);
1965         }
1966
1967         for (i = 0; i < MDT_NUM_THREADS; i++) {
1968                 char name[32];
1969                 sprintf(name, "ll_mdt_%02d", i);
1970                 rc = ptlrpc_start_thread(obddev, mds->mds_service, name);
1971                 if (rc) {
1972                         CERROR("cannot start MDT thread #%d: rc %d\n", i, rc);
1973                         GOTO(err_thread, rc);
1974                 }
1975         }
1976
1977         mds->mds_setattr_service =
1978                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
1979                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
1980                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
1981                                 mds_handle, "mds_setattr", obddev);
1982         if (!mds->mds_setattr_service) {
1983                 CERROR("failed to start getattr service\n");
1984                 GOTO(err_thread, rc = -ENOMEM);
1985         }
1986
1987         for (i = 0; i < MDT_NUM_THREADS; i++) {
1988                 char name[32];
1989                 sprintf(name, "ll_mdt_attr_%02d", i);
1990                 rc = ptlrpc_start_thread(obddev, mds->mds_setattr_service,
1991                                          name);
1992                 if (rc) {
1993                         CERROR("cannot start MDT setattr thread #%d: rc %d\n",
1994                                i, rc);
1995                         GOTO(err_thread2, rc);
1996                 }
1997         }
1998
1999         mds->mds_readpage_service =
2000                 ptlrpc_init_svc(MDS_NEVENTS, MDS_NBUFS,
2001                                 MDS_BUFSIZE, MDS_MAXREQSIZE,
2002                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
2003                                 mds_handle, "mds_readpage", obddev);
2004         if (!mds->mds_readpage_service) {
2005                 CERROR("failed to start readpage service\n");
2006                 GOTO(err_thread2, rc = -ENOMEM);
2007         }
2008
2009         for (i = 0; i < MDT_NUM_THREADS; i++) {
2010                 char name[32];
2011                 sprintf(name, "ll_mdt_rdpg_%02d", i);
2012                 rc = ptlrpc_start_thread(obddev, mds->mds_readpage_service,
2013                                          name);
2014                 if (rc) {
2015                         CERROR("cannot start MDT readpage thread #%d: rc %d\n",
2016                                i, rc);
2017                         GOTO(err_thread3, rc);
2018                 }
2019         }
2020
2021         RETURN(0);
2022
2023 err_thread3:
2024         ptlrpc_stop_all_threads(mds->mds_readpage_service);
2025         ptlrpc_unregister_service(mds->mds_readpage_service);
2026 err_thread2:
2027         ptlrpc_stop_all_threads(mds->mds_setattr_service);
2028         ptlrpc_unregister_service(mds->mds_setattr_service);
2029 err_thread:
2030         ptlrpc_stop_all_threads(mds->mds_service);
2031         ptlrpc_unregister_service(mds->mds_service);
2032         return rc;
2033 }
2034
2035
2036 static int mdt_cleanup(struct obd_device *obddev, int flags)
2037 {
2038         struct mds_obd *mds = &obddev->u.mds;
2039         ENTRY;
2040
2041         ptlrpc_stop_all_threads(mds->mds_readpage_service);
2042         ptlrpc_unregister_service(mds->mds_readpage_service);
2043
2044         ptlrpc_stop_all_threads(mds->mds_setattr_service);
2045         ptlrpc_unregister_service(mds->mds_setattr_service);
2046
2047         ptlrpc_stop_all_threads(mds->mds_service);
2048         ptlrpc_unregister_service(mds->mds_service);
2049
2050         RETURN(0);
2051 }
2052
2053 extern int mds_iocontrol(unsigned int cmd, struct lustre_handle *conn,
2054                          int len, void *karg, void *uarg);
2055
2056 /* use obd ops to offer management infrastructure */
2057 static struct obd_ops mds_obd_ops = {
2058         o_owner:       THIS_MODULE,
2059         o_attach:      mds_attach,
2060         o_detach:      mds_detach,
2061         o_connect:     mds_connect,
2062         o_disconnect:  mds_disconnect,
2063         o_setup:       mds_setup,
2064         o_cleanup:     mds_cleanup,
2065         o_statfs:      mds_obd_statfs,
2066         o_iocontrol:   mds_iocontrol
2067 };
2068
2069 static struct obd_ops mdt_obd_ops = {
2070         o_owner:       THIS_MODULE,
2071         o_attach:      mdt_attach,
2072         o_detach:      mdt_detach,
2073         o_setup:       mdt_setup,
2074         o_cleanup:     mdt_cleanup,
2075 };
2076
2077
2078 static int __init mds_init(void)
2079 {
2080         struct lprocfs_static_vars lvars;
2081
2082         lprocfs_init_multi_vars(0, &lvars);
2083         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
2084         lprocfs_init_multi_vars(1, &lvars);
2085         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
2086         ldlm_register_intent(ldlm_intent_policy);
2087
2088         return 0;
2089 }
2090
2091 static void /*__exit*/ mds_exit(void)
2092 {
2093         ldlm_unregister_intent();
2094         class_unregister_type(LUSTRE_MDS_NAME);
2095         class_unregister_type(LUSTRE_MDT_NAME);
2096 }
2097
2098 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2099 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2100 MODULE_LICENSE("GPL");
2101
2102 module_init(mds_init);
2103 module_exit(mds_exit);