Whamcloud - gitweb
Branch b1_4_newconfig2
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/init.h>
38 #include <linux/obd_class.h>
39 #include <linux/random.h>
40 #include <linux/fs.h>
41 #include <linux/jbd.h>
42 #include <linux/ext3_fs.h>
43 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
44 # include <linux/smp_lock.h>
45 # include <linux/buffer_head.h>
46 # include <linux/workqueue.h>
47 # include <linux/mount.h>
48 #else
49 # include <linux/locks.h>
50 #endif
51 #include <linux/obd_lov.h>
52 #include <linux/lustre_mds.h>
53 #include <linux/lustre_fsfilt.h>
54 #include <linux/lprocfs_status.h>
55 #include <linux/lustre_commit_confd.h>
56 #include <linux/lustre_quota.h>
57 #include <linux/lustre_disk.h>
58
59 #include "mds_internal.h"
60
61 static int mds_intent_policy(struct ldlm_namespace *ns,
62                              struct ldlm_lock **lockp, void *req_cookie,
63                              ldlm_mode_t mode, int flags, void *data);
64 static int mds_postsetup(struct obd_device *obd);
65 static int mds_cleanup(struct obd_device *obd);
66
67 /* Assumes caller has already pushed into the kernel filesystem context */
68 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
69                         loff_t offset, int count)
70 {
71         struct ptlrpc_bulk_desc *desc;
72         struct l_wait_info lwi;
73         struct page **pages;
74         int rc = 0, npages, i, tmpcount, tmpsize = 0;
75         ENTRY;
76
77         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
78
79         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
80         OBD_ALLOC(pages, sizeof(*pages) * npages);
81         if (!pages)
82                 GOTO(out, rc = -ENOMEM);
83
84         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
85                                     MDS_BULK_PORTAL);
86         if (desc == NULL)
87                 GOTO(out_free, rc = -ENOMEM);
88
89         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
90                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
91
92                 pages[i] = alloc_pages(GFP_KERNEL, 0);
93                 if (pages[i] == NULL)
94                         GOTO(cleanup_buf, rc = -ENOMEM);
95
96                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
97         }
98
99         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
100                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
101                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
102                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
103                        file->f_dentry->d_inode->i_size);
104
105                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
106                                      kmap(pages[i]), tmpsize, &offset);
107                 kunmap(pages[i]);
108
109                 if (rc != tmpsize)
110                         GOTO(cleanup_buf, rc = -EIO);
111         }
112
113         LASSERT(desc->bd_nob == count);
114
115         rc = ptlrpc_start_bulk_transfer(desc);
116         if (rc)
117                 GOTO(cleanup_buf, rc);
118
119         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
120                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
121                        OBD_FAIL_MDS_SENDPAGE, rc);
122                 GOTO(abort_bulk, rc);
123         }
124
125         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
126         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
127         LASSERT (rc == 0 || rc == -ETIMEDOUT);
128
129         if (rc == 0) {
130                 if (desc->bd_success &&
131                     desc->bd_nob_transferred == count)
132                         GOTO(cleanup_buf, rc);
133
134                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
135         }
136
137         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
138                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
139                   desc->bd_nob_transferred, count,
140                   req->rq_export->exp_client_uuid.uuid,
141                   req->rq_export->exp_connection->c_remote_uuid.uuid);
142
143         class_fail_export(req->rq_export);
144
145         EXIT;
146  abort_bulk:
147         ptlrpc_abort_bulk (desc);
148  cleanup_buf:
149         for (i = 0; i < npages; i++)
150                 if (pages[i])
151                         __free_pages(pages[i], 0);
152
153         ptlrpc_free_bulk(desc);
154  out_free:
155         OBD_FREE(pages, sizeof(*pages) * npages);
156  out:
157         return rc;
158 }
159
160 /* only valid locked dentries or errors should be returned */
161 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
162                                      struct vfsmount **mnt, int lock_mode,
163                                      struct lustre_handle *lockh,
164                                      char *name, int namelen)
165 {
166         struct mds_obd *mds = &obd->u.mds;
167         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
168         struct ldlm_res_id res_id = { .name = {0} };
169         int flags = 0, rc;
170         ENTRY;
171
172         if (IS_ERR(de))
173                 RETURN(de);
174
175         res_id.name[0] = de->d_inode->i_ino;
176         res_id.name[1] = de->d_inode->i_generation;
177         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
178                               LDLM_PLAIN, NULL, lock_mode, &flags,
179                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
180                               NULL, 0, NULL, lockh);
181         if (rc != ELDLM_OK) {
182                 l_dput(de);
183                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
184         }
185
186         RETURN(retval);
187 }
188
189 /* Look up an entry by inode number. */
190 /* this function ONLY returns valid dget'd dentries with an initialized inode
191    or errors */
192 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
193                               struct vfsmount **mnt)
194 {
195         char fid_name[32];
196         unsigned long ino = fid->id;
197         __u32 generation = fid->generation;
198         struct inode *inode;
199         struct dentry *result;
200
201         if (ino == 0)
202                 RETURN(ERR_PTR(-ESTALE));
203
204         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
205
206         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
207                ino, generation, mds->mds_sb);
208
209         /* under ext3 this is neither supposed to return bad inodes
210            nor NULL inodes. */
211         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
212         if (IS_ERR(result))
213                 RETURN(result);
214
215         inode = result->d_inode;
216         if (!inode)
217                 RETURN(ERR_PTR(-ENOENT));
218
219         if (generation && inode->i_generation != generation) {
220                 /* we didn't find the right inode.. */
221                 CERROR("bad inode %lu, link: %lu ct: %d or generation %u/%u\n",
222                        inode->i_ino, (unsigned long)inode->i_nlink,
223                        atomic_read(&inode->i_count), inode->i_generation,
224                        generation);
225                 dput(result);
226                 RETURN(ERR_PTR(-ENOENT));
227         }
228
229         if (mnt) {
230                 *mnt = mds->mds_vfsmnt;
231                 mntget(*mnt);
232         }
233
234         RETURN(result);
235 }
236
237
238 /* Establish a connection to the MDS.
239  *
240  * This will set up an export structure for the client to hold state data
241  * about that client, like open files, the last operation number it did
242  * on the server, etc.
243  */
244 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
245                        struct obd_uuid *cluuid, struct obd_connect_data *data)
246 {
247         struct obd_export *exp;
248         struct mds_export_data *med;
249         struct mds_client_data *mcd;
250         int rc, abort_recovery;
251         ENTRY;
252
253         if (!conn || !obd || !cluuid)
254                 RETURN(-EINVAL);
255
256         /* Check for aborted recovery. */
257         spin_lock_bh(&obd->obd_processing_task_lock);
258         abort_recovery = obd->obd_abort_recovery;
259         spin_unlock_bh(&obd->obd_processing_task_lock);
260         if (abort_recovery)
261                 target_abort_recovery(obd);
262
263         /* XXX There is a small race between checking the list and adding a
264          * new connection for the same UUID, but the real threat (list
265          * corruption when multiple different clients connect) is solved.
266          *
267          * There is a second race between adding the export to the list,
268          * and filling in the client data below.  Hence skipping the case
269          * of NULL mcd above.  We should already be controlling multiple
270          * connects at the client, and we can't hold the spinlock over
271          * memory allocations without risk of deadlocking.
272          */
273         rc = class_connect(conn, obd, cluuid);
274         if (rc)
275                 RETURN(rc);
276         exp = class_conn2export(conn);
277         LASSERT(exp);
278         med = &exp->exp_mds_data;
279
280         if (data != NULL) {
281                 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
282                 exp->exp_connect_flags = data->ocd_connect_flags;
283         }
284
285         OBD_ALLOC(mcd, sizeof(*mcd));
286         if (!mcd) {
287                 CERROR("mds: out of memory for client data\n");
288                 GOTO(out, rc = -ENOMEM);
289         }
290
291         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
292         med->med_mcd = mcd;
293
294         rc = mds_client_add(obd, &obd->u.mds, med, -1);
295         GOTO(out, rc);
296
297 out:
298         if (rc) {
299                 if (mcd) {
300                         OBD_FREE(mcd, sizeof(*mcd));
301                         med->med_mcd = NULL;
302                 }
303                 class_disconnect(exp);
304         } else {
305                 class_export_put(exp);
306         }
307
308         RETURN(rc);
309 }
310
311 static int mds_init_export(struct obd_export *exp) 
312 {
313         struct mds_export_data *med = &exp->exp_mds_data;
314
315         INIT_LIST_HEAD(&med->med_open_head);
316         spin_lock_init(&med->med_open_lock);
317         RETURN(0);
318 }
319
320 static int mds_destroy_export(struct obd_export *export)
321 {
322         struct mds_export_data *med;
323         struct obd_device *obd = export->exp_obd;
324         struct lvfs_run_ctxt saved;
325         int rc = 0;
326         ENTRY;
327
328         med = &export->exp_mds_data;
329         target_destroy_export(export);
330
331         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
332                 GOTO(out, 0);
333
334         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
335         /* Close any open files (which may also cause orphan unlinking). */
336         spin_lock(&med->med_open_lock);
337         while (!list_empty(&med->med_open_head)) {
338                 struct list_head *tmp = med->med_open_head.next;
339                 struct mds_file_data *mfd =
340                         list_entry(tmp, struct mds_file_data, mfd_list);
341                 struct dentry *dentry = mfd->mfd_dentry;
342
343                 /* Remove mfd handle so it can't be found again.
344                  * We are consuming the mfd_list reference here. */
345                 mds_mfd_unlink(mfd, 0);
346                 spin_unlock(&med->med_open_lock);
347
348                 /* If you change this message, be sure to update
349                  * replay_single:test_46 */
350                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
351                        "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
352                        dentry->d_name.name, dentry->d_inode->i_ino);
353                 /* child orphan sem protects orphan_dec_test and
354                  * is_orphan race, mds_mfd_close drops it */
355                 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
356                 rc = mds_mfd_close(NULL, obd, mfd,
357                                    !(export->exp_flags & OBD_OPT_FAILOVER));
358
359                 if (rc)
360                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
361                 spin_lock(&med->med_open_lock);
362         }
363         spin_unlock(&med->med_open_lock);
364         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
365 out:
366         mds_client_free(export);
367
368         RETURN(rc);
369 }
370
371 static int mds_disconnect(struct obd_export *exp)
372 {
373         unsigned long irqflags;
374         int rc;
375         ENTRY;
376
377         LASSERT(exp);
378         class_export_get(exp);
379
380         /* Disconnect early so that clients can't keep using export */
381         rc = class_disconnect(exp);
382         ldlm_cancel_locks_for_export(exp);
383
384         /* complete all outstanding replies */
385         spin_lock_irqsave(&exp->exp_lock, irqflags);
386         while (!list_empty(&exp->exp_outstanding_replies)) {
387                 struct ptlrpc_reply_state *rs =
388                         list_entry(exp->exp_outstanding_replies.next,
389                                    struct ptlrpc_reply_state, rs_exp_list);
390                 struct ptlrpc_service *svc = rs->rs_service;
391
392                 spin_lock(&svc->srv_lock);
393                 list_del_init(&rs->rs_exp_list);
394                 ptlrpc_schedule_difficult_reply(rs);
395                 spin_unlock(&svc->srv_lock);
396         }
397         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
398
399         class_export_put(exp);
400         RETURN(rc);
401 }
402
403 static int mds_getstatus(struct ptlrpc_request *req)
404 {
405         struct mds_obd *mds = mds_req2mds(req);
406         struct mds_body *body;
407         int rc, size = sizeof(*body);
408         ENTRY;
409
410         rc = lustre_pack_reply(req, 1, &size, NULL);
411         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
412                 CERROR("mds: out of memory for message: size=%d\n", size);
413                 req->rq_status = -ENOMEM;       /* superfluous? */
414                 RETURN(-ENOMEM);
415         }
416
417         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
418         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
419
420         /* the last_committed and last_xid fields are filled in for all
421          * replies already - no need to do so here also.
422          */
423         RETURN(0);
424 }
425
426 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
427                      void *data, int flag)
428 {
429         int do_ast;
430         ENTRY;
431
432         if (flag == LDLM_CB_CANCELING) {
433                 /* Don't need to do anything here. */
434                 RETURN(0);
435         }
436
437         /* XXX layering violation!  -phil */
438         l_lock(&lock->l_resource->lr_namespace->ns_lock);
439         /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
440          * such that mds_blocking_ast is called just before l_i_p takes the
441          * ns_lock, then by the time we get the lock, we might not be the
442          * correct blocking function anymore.  So check, and return early, if
443          * so. */
444         if (lock->l_blocking_ast != mds_blocking_ast) {
445                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
446                 RETURN(0);
447         }
448
449         lock->l_flags |= LDLM_FL_CBPENDING;
450         do_ast = (!lock->l_readers && !lock->l_writers);
451         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
452
453         if (do_ast) {
454                 struct lustre_handle lockh;
455                 int rc;
456
457                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
458                 ldlm_lock2handle(lock, &lockh);
459                 rc = ldlm_cli_cancel(&lockh);
460                 if (rc < 0)
461                         CERROR("ldlm_cli_cancel: %d\n", rc);
462         } else {
463                 LDLM_DEBUG(lock, "Lock still has references, will be "
464                            "cancelled later");
465         }
466         RETURN(0);
467 }
468
469 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
470                int *size, int lock)
471 {
472         int rc = 0;
473         int lmm_size;
474
475         if (lock)
476                 down(&inode->i_sem);
477         rc = fsfilt_get_md(obd, inode, md, *size);
478
479         if (rc < 0) {
480                 CERROR("Error %d reading eadata for ino %lu\n",
481                        rc, inode->i_ino);
482         } else if (rc > 0) {
483                 lmm_size = rc;
484                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
485
486                 if (rc == 0) {
487                         *size = lmm_size;
488                         rc = lmm_size;
489                 } else if (rc > 0) {
490                         *size = rc;
491                 }
492         }
493         if (lock)
494                 up(&inode->i_sem);
495
496         RETURN (rc);
497 }
498
499
500 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
501  * Call with lock=0 if the caller has already taken the i_sem. */
502 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
503                 struct mds_body *body, struct inode *inode, int lock)
504 {
505         struct mds_obd *mds = &obd->u.mds;
506         void *lmm;
507         int lmm_size;
508         int rc;
509         ENTRY;
510
511         lmm = lustre_msg_buf(msg, offset, 0);
512         if (lmm == NULL) {
513                 /* Some problem with getting eadata when I sized the reply
514                  * buffer... */
515                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
516                        inode->i_ino);
517                 RETURN(0);
518         }
519         lmm_size = msg->buflens[offset];
520
521         /* I don't really like this, but it is a sanity check on the client
522          * MD request.  However, if the client doesn't know how much space
523          * to reserve for the MD, it shouldn't be bad to have too much space.
524          */
525         if (lmm_size > mds->mds_max_mdsize) {
526                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
527                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
528                 // RETURN(-EINVAL);
529         }
530         
531         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
532         if (rc > 0) {
533                 if (S_ISDIR(inode->i_mode))
534                         body->valid |= OBD_MD_FLDIREA;
535                 else
536                         body->valid |= OBD_MD_FLEASIZE;
537                 body->eadatasize = lmm_size;
538                 rc = 0;
539         }
540
541         RETURN(rc);
542 }
543
544 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
545                                 struct ptlrpc_request *req,
546                                 struct mds_body *reqbody, int reply_off)
547 {
548         struct mds_body *body;
549         struct inode *inode = dentry->d_inode;
550         int rc = 0;
551         ENTRY;
552
553         if (inode == NULL)
554                 RETURN(-ENOENT);
555
556         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
557         LASSERT(body != NULL);                 /* caller prepped reply */
558
559         mds_pack_inode2fid(&body->fid1, inode);
560         mds_pack_inode2body(body, inode);
561
562         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
563             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
564                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1, body,
565                                  inode, 1);
566
567                 /* If we have LOV EA data, the OST holds size, atime, mtime */
568                 if (!(body->valid & OBD_MD_FLEASIZE) && 
569                     !(body->valid & OBD_MD_FLDIREA))
570                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
571                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
572         } else if (S_ISLNK(inode->i_mode) &&
573                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
574                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
575                 int len;
576
577                 LASSERT (symname != NULL);       /* caller prepped reply */
578                 len = req->rq_repmsg->buflens[reply_off + 1];
579
580                 rc = inode->i_op->readlink(dentry, symname, len);
581                 if (rc < 0) {
582                         CERROR("readlink failed: %d\n", rc);
583                 } else if (rc != len - 1) {
584                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
585                                 rc, len - 1);
586                         rc = -EINVAL;
587                 } else {
588                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
589                         body->valid |= OBD_MD_LINKNAME;
590                         body->eadatasize = rc + 1;
591                         symname[rc] = 0;        /* NULL terminate */
592                         rc = 0;
593                 }
594         }
595
596         RETURN(rc);
597 }
598
599 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
600                                 int offset)
601 {
602         struct mds_obd *mds = mds_req2mds(req);
603         struct mds_body *body;
604         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
605         ENTRY;
606
607         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
608         LASSERT(body != NULL);                 /* checked by caller */
609         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
610
611         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
612             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
613                 int ret;
614                 down(&inode->i_sem);
615                 ret = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
616                 up(&inode->i_sem);
617                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
618                        rc, inode->i_ino);
619                 if (ret < 0) {
620                         if (ret != -ENODATA) {
621                                 CERROR("error getting inode %lu MD: rc = %d\n",
622                                        inode->i_ino, ret);
623                                 /* should we return ret in req->rq_status? */
624                         }
625                         size[bufcount] = 0;
626                 } else if (ret > mds->mds_max_mdsize) {
627                         size[bufcount] = 0;
628                         CERROR("MD size %d larger than maximum possible %u\n",
629                                ret, mds->mds_max_mdsize);
630                 } else {
631                         size[bufcount] = ret;
632                 }
633                 bufcount++;
634         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
635                 if (inode->i_size + 1 != body->eadatasize)
636                         CERROR("symlink size: %Lu, reply space: %d\n",
637                                inode->i_size + 1, body->eadatasize);
638                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
639                 bufcount++;
640                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
641                        inode->i_size + 1, body->eadatasize);
642         }
643
644         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
645                 CERROR("failed MDS_GETATTR_PACK test\n");
646                 req->rq_status = -ENOMEM;
647                 GOTO(out, rc = -ENOMEM);
648         }
649
650         rc = lustre_pack_reply(req, bufcount, size, NULL);
651         if (rc) {
652                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
653                 GOTO(out, req->rq_status = rc);
654         }
655
656         EXIT;
657  out:
658         return(rc);
659 }
660
661 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
662                             struct lustre_handle *child_lockh)
663 {
664         struct obd_device *obd = req->rq_export->exp_obd;
665         struct ldlm_reply *rep = NULL;
666         struct lvfs_run_ctxt saved;
667         struct mds_body *body;
668         struct dentry *dparent = NULL, *dchild = NULL;
669         struct lvfs_ucred uc;
670         struct lustre_handle parent_lockh;
671         int namesize;
672         int rc = 0, cleanup_phase = 0, resent_req = 0;
673         char *name;
674         ENTRY;
675
676         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
677
678         /* Swab now, before anyone looks inside the request */
679
680         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
681                                   lustre_swab_mds_body);
682         if (body == NULL) {
683                 CERROR("Can't swab mds_body\n");
684                 GOTO(cleanup, rc = -EFAULT);
685         }
686
687         LASSERT_REQSWAB(req, offset + 1);
688         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
689         if (name == NULL) {
690                 CERROR("Can't unpack name\n");
691                 GOTO(cleanup, rc = -EFAULT);
692         }
693         namesize = req->rq_reqmsg->buflens[offset + 1];
694
695         LASSERT (offset == 0 || offset == 2);
696         /* if requests were at offset 2, the getattr reply goes back at 1 */
697         if (offset) {
698                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
699                 offset = 1;
700         }
701
702 #if CRAY_PORTALS
703         uc.luc_fsuid = req->rq_uid;
704 #else
705         uc.luc_fsuid = body->fsuid;
706 #endif
707         uc.luc_fsgid = body->fsgid;
708         uc.luc_cap = body->capability;
709         uc.luc_suppgid1 = body->suppgid;
710         uc.luc_suppgid2 = -1;
711         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
712         cleanup_phase = 1; /* kernel context */
713         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
714
715         /* FIXME: handle raw lookup */
716 #if 0
717         if (body->valid == OBD_MD_FLID) {
718                 struct mds_body *mds_reply;
719                 int size = sizeof(*mds_reply);
720                 ino_t inum;
721                 // The user requested ONLY the inode number, so do a raw lookup
722                 rc = lustre_pack_reply(req, 1, &size, NULL);
723                 if (rc) {
724                         CERROR("out of memory\n");
725                         GOTO(cleanup, rc);
726                 }
727
728                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
729
730                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
731                                            sizeof(*mds_reply));
732                 mds_reply->fid1.id = inum;
733                 mds_reply->valid = OBD_MD_FLID;
734                 GOTO(cleanup, rc);
735         }
736 #endif
737
738         if (child_lockh->cookie != 0) {
739                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
740                 resent_req = 1;
741         }
742
743         if (resent_req == 0) {
744                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
745                                                  &parent_lockh, &dparent,
746                                                  LCK_PR, name, namesize,
747                                                  child_lockh, &dchild, LCK_PR);
748                 if (rc)
749                         GOTO(cleanup, rc);
750         } else {
751                 struct ldlm_lock *granted_lock;
752                 struct ll_fid child_fid;
753                 struct ldlm_resource *res;
754                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
755                 granted_lock = ldlm_handle2lock(child_lockh);
756                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
757                          body->fid1.id, body->fid1.generation,
758                          child_lockh->cookie);
759
760
761                 res = granted_lock->l_resource;
762                 child_fid.id = res->lr_name.name[0];
763                 child_fid.generation = res->lr_name.name[1];
764                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
765                 LASSERT(!IS_ERR(dchild));
766                 LDLM_LOCK_PUT(granted_lock);
767         }
768
769         cleanup_phase = 2; /* dchild, dparent, locks */
770
771         if (dchild->d_inode == NULL) {
772                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
773                 /* in the intent case, the policy clears this error:
774                    the disposition is enough */
775                 GOTO(cleanup, rc = -ENOENT);
776         } else {
777                 intent_set_disposition(rep, DISP_LOOKUP_POS);
778         }
779
780         if (req->rq_repmsg == NULL) {
781                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
782                 if (rc != 0) {
783                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
784                         GOTO (cleanup, rc);
785                 }
786         }
787
788         rc = mds_getattr_internal(obd, dchild, req, body, offset);
789         GOTO(cleanup, rc); /* returns the lock to the client */
790
791  cleanup:
792         switch (cleanup_phase) {
793         case 2:
794                 if (resent_req == 0) {
795                         if (rc && dchild->d_inode)
796                                 ldlm_lock_decref(child_lockh, LCK_PR);
797                         ldlm_lock_decref(&parent_lockh, LCK_PR);
798                         l_dput(dparent);
799                 }
800                 l_dput(dchild);
801         case 1:
802                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
803         default: ;
804         }
805         return rc;
806 }
807
808 static int mds_getattr(int offset, struct ptlrpc_request *req)
809 {
810         struct mds_obd *mds = mds_req2mds(req);
811         struct obd_device *obd = req->rq_export->exp_obd;
812         struct lvfs_run_ctxt saved;
813         struct dentry *de;
814         struct mds_body *body;
815         struct lvfs_ucred uc;
816         int rc = 0;
817         ENTRY;
818
819         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
820                                   lustre_swab_mds_body);
821         if (body == NULL) {
822                 CERROR("Can't unpack body\n");
823                 RETURN(-EFAULT);
824         }
825
826 #if CRAY_PORTALS
827         uc.luc_fsuid = req->rq_uid;
828 #else
829         uc.luc_fsuid = body->fsuid;
830 #endif
831         uc.luc_fsgid = body->fsgid;
832         uc.luc_cap = body->capability;
833         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
834         de = mds_fid2dentry(mds, &body->fid1, NULL);
835         if (IS_ERR(de)) {
836                 rc = req->rq_status = PTR_ERR(de);
837                 GOTO(out_pop, rc);
838         }
839
840         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
841         if (rc != 0) {
842                 CERROR("mds_getattr_pack_msg: %d\n", rc);
843                 GOTO(out_pop, rc);
844         }
845
846         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
847
848         l_dput(de);
849         GOTO(out_pop, rc);
850 out_pop:
851         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
852         return rc;
853 }
854
855
856 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
857                           unsigned long max_age)
858 {
859         int rc;
860
861         spin_lock(&obd->obd_osfs_lock);
862         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age);
863         if (rc == 0)
864                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
865         spin_unlock(&obd->obd_osfs_lock);
866
867         return rc;
868 }
869
870 static int mds_statfs(struct ptlrpc_request *req)
871 {
872         struct obd_device *obd = req->rq_export->exp_obd;
873         int rc, size = sizeof(struct obd_statfs);
874         ENTRY;
875
876         /* This will trigger a watchdog timeout */
877         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
878                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
879
880         rc = lustre_pack_reply(req, 1, &size, NULL);
881         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
882                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
883                 GOTO(out, rc);
884         }
885
886         /* We call this so that we can cache a bit - 1 jiffie worth */
887         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
888                             jiffies - HZ);
889         if (rc) {
890                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
891                 GOTO(out, rc);
892         }
893
894         EXIT;
895 out:
896         req->rq_status = rc;
897         return 0;
898 }
899
900 static int mds_sync(struct ptlrpc_request *req)
901 {
902         struct obd_device *obd = req->rq_export->exp_obd;
903         struct mds_obd *mds = &obd->u.mds;
904         struct mds_body *body;
905         int rc, size = sizeof(*body);
906         ENTRY;
907
908         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
909         if (body == NULL)
910                 GOTO(out, rc = -EFAULT);
911
912         rc = lustre_pack_reply(req, 1, &size, NULL);
913         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
914                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
915                 GOTO(out, rc);
916         }
917
918         if (body->fid1.id == 0) {
919                 /* a fid of zero is taken to mean "sync whole filesystem" */
920                 rc = fsfilt_sync(obd, mds->mds_sb);
921                 GOTO(out, rc);
922         } else {
923                 struct dentry *de;
924
925                 de = mds_fid2dentry(mds, &body->fid1, NULL);
926                 if (IS_ERR(de))
927                         GOTO(out, rc = PTR_ERR(de));
928
929                 /* The file parameter isn't used for anything */
930                 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
931                         rc = de->d_inode->i_fop->fsync(NULL, de, 1);
932                 if (rc == 0) {
933                         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
934                         mds_pack_inode2fid(&body->fid1, de->d_inode);
935                         mds_pack_inode2body(body, de->d_inode);
936                 }
937
938                 l_dput(de);
939                 GOTO(out, rc);
940         }
941 out:
942         req->rq_status = rc;
943         return 0;
944 }
945
946 /* mds_readpage does not take a DLM lock on the inode, because the client must
947  * already have a PR lock.
948  *
949  * If we were to take another one here, a deadlock will result, if another
950  * thread is already waiting for a PW lock. */
951 static int mds_readpage(struct ptlrpc_request *req)
952 {
953         struct obd_device *obd = req->rq_export->exp_obd;
954         struct vfsmount *mnt;
955         struct dentry *de;
956         struct file *file;
957         struct mds_body *body, *repbody;
958         struct lvfs_run_ctxt saved;
959         int rc, size = sizeof(*repbody);
960         struct lvfs_ucred uc;
961         ENTRY;
962
963         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
964                 RETURN(-ENOMEM);
965
966         rc = lustre_pack_reply(req, 1, &size, NULL);
967         if (rc) {
968                 CERROR("mds: out of memory while packing readpage reply\n");
969                 RETURN(-ENOMEM);
970         }
971
972         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
973         if (body == NULL)
974                 GOTO (out, rc = -EFAULT);
975
976 #if CRAY_PORTALS
977         uc.luc_fsuid = req->rq_uid;
978 #else
979         uc.luc_fsuid = body->fsuid;
980 #endif
981         uc.luc_fsgid = body->fsgid;
982         uc.luc_cap = body->capability;
983         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
984         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
985         if (IS_ERR(de))
986                 GOTO(out_pop, rc = PTR_ERR(de));
987
988         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
989
990         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
991         /* note: in case of an error, dentry_open puts dentry */
992         if (IS_ERR(file))
993                 GOTO(out_pop, rc = PTR_ERR(file));
994
995         /* body->size is actually the offset -eeb */
996         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
997                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
998                        body->size, de->d_inode->i_blksize);
999                 GOTO(out_file, rc = -EFAULT);
1000         }
1001
1002         /* body->nlink is actually the #bytes to read -eeb */
1003         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1004                 CERROR("size %u is not multiple of blocksize %lu\n",
1005                        body->nlink, de->d_inode->i_blksize);
1006                 GOTO(out_file, rc = -EFAULT);
1007         }
1008
1009         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1010         repbody->size = file->f_dentry->d_inode->i_size;
1011         repbody->valid = OBD_MD_FLSIZE;
1012
1013         /* to make this asynchronous make sure that the handling function
1014            doesn't send a reply when this function completes. Instead a
1015            callback function would send the reply */
1016         /* body->size is actually the offset -eeb */
1017         rc = mds_sendpage(req, file, body->size, body->nlink);
1018
1019 out_file:
1020         filp_close(file, 0);
1021 out_pop:
1022         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1023 out:
1024         req->rq_status = rc;
1025         RETURN(0);
1026 }
1027
1028 int mds_reint(struct ptlrpc_request *req, int offset,
1029               struct lustre_handle *lockh)
1030 {
1031         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1032         int rc;
1033
1034         OBD_ALLOC(rec, sizeof(*rec));
1035         if (rec == NULL)
1036                 RETURN(-ENOMEM);
1037
1038         rc = mds_update_unpack(req, offset, rec);
1039         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1040                 CERROR("invalid record\n");
1041                 GOTO(out, req->rq_status = -EINVAL);
1042         }
1043         /* rc will be used to interrupt a for loop over multiple records */
1044         rc = mds_reint_rec(rec, offset, req, lockh);
1045  out:
1046         OBD_FREE(rec, sizeof(*rec));
1047         return rc;
1048 }
1049
1050 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1051                                        struct obd_device *obd, int *process)
1052 {
1053         switch (req->rq_reqmsg->opc) {
1054         case MDS_CONNECT: /* This will never get here, but for completeness. */
1055         case OST_CONNECT: /* This will never get here, but for completeness. */
1056         case MDS_DISCONNECT:
1057         case OST_DISCONNECT:
1058                *process = 1;
1059                RETURN(0);
1060
1061         case MDS_CLOSE:
1062         case MDS_SYNC: /* used in unmounting */
1063         case OBD_PING:
1064         case MDS_REINT:
1065         case LDLM_ENQUEUE:
1066                 *process = target_queue_recovery_request(req, obd);
1067                 RETURN(0);
1068
1069         default:
1070                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1071                 *process = 0;
1072                 /* XXX what should we set rq_status to here? */
1073                 req->rq_status = -EAGAIN;
1074                 RETURN(ptlrpc_error(req));
1075         }
1076 }
1077
1078 static char *reint_names[] = {
1079         [REINT_SETATTR] "setattr",
1080         [REINT_CREATE]  "create",
1081         [REINT_LINK]    "link",
1082         [REINT_UNLINK]  "unlink",
1083         [REINT_RENAME]  "rename",
1084         [REINT_OPEN]    "open",
1085 };
1086
1087 static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1088 {
1089         char *key;
1090         __u32 *val;
1091         int keylen, rc = 0;
1092         ENTRY;
1093
1094         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1095         if (key == NULL) {
1096                 DEBUG_REQ(D_HA, req, "no set_info key");
1097                 RETURN(-EFAULT);
1098         }
1099         keylen = req->rq_reqmsg->buflens[0];
1100
1101         val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
1102         if (val == NULL) {
1103                 DEBUG_REQ(D_HA, req, "no set_info val");
1104                 RETURN(-EFAULT);
1105         }
1106
1107         rc = lustre_pack_reply(req, 0, NULL, NULL);
1108         if (rc)
1109                 RETURN(rc);
1110         req->rq_repmsg->status = 0;
1111
1112         if (keylen < strlen("read-only") ||
1113             memcmp(key, "read-only", keylen) != 0)
1114                 RETURN(-EINVAL);
1115
1116         if (*val)
1117                 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1118         else
1119                 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1120
1121         RETURN(0);
1122 }
1123
1124
1125 int mds_handle(struct ptlrpc_request *req)
1126 {
1127         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1128         int rc = 0;
1129         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1130         struct obd_device *obd = NULL;
1131         ENTRY;
1132
1133         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1134
1135         LASSERT(current->journal_info == NULL);
1136         /* XXX identical to OST */
1137         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1138                 struct mds_export_data *med;
1139                 int recovering, abort_recovery;
1140
1141                 if (req->rq_export == NULL) {
1142                         CERROR("lustre_mds: operation %d on unconnected MDS\n",
1143                                req->rq_reqmsg->opc);
1144                         req->rq_status = -ENOTCONN;
1145                         GOTO(out, rc = -ENOTCONN);
1146                 }
1147
1148                 med = &req->rq_export->exp_mds_data;
1149                 obd = req->rq_export->exp_obd;
1150                 mds = &obd->u.mds;
1151
1152                 /* sanity check: if the xid matches, the request must
1153                  * be marked as a resent or replayed */
1154                 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1155                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1156                                  (MSG_RESENT | MSG_REPLAY),
1157                                  "rq_xid "LPU64" matches last_xid, "
1158                                  "expected RESENT flag\n",
1159                                  req->rq_xid);
1160                 /* else: note the opposite is not always true; a
1161                  * RESENT req after a failover will usually not match
1162                  * the last_xid, since it was likely never
1163                  * committed. A REPLAYed request will almost never
1164                  * match the last xid, however it could for a
1165                  * committed, but still retained, open. */
1166
1167                 /* Check for aborted recovery. */
1168                 spin_lock_bh(&obd->obd_processing_task_lock);
1169                 abort_recovery = obd->obd_abort_recovery;
1170                 recovering = obd->obd_recovering;
1171                 spin_unlock_bh(&obd->obd_processing_task_lock);
1172                 if (abort_recovery) {
1173                         target_abort_recovery(obd);
1174                 } else if (recovering) {
1175                         rc = mds_filter_recovery_request(req, obd,
1176                                                          &should_process);
1177                         if (rc || !should_process)
1178                                 RETURN(rc);
1179                 }
1180         }
1181
1182         switch (req->rq_reqmsg->opc) {
1183         case MDS_CONNECT:
1184                 DEBUG_REQ(D_INODE, req, "connect");
1185                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1186                 rc = target_handle_connect(req, mds_handle);
1187                 if (!rc) {
1188                         /* Now that we have an export, set mds. */
1189                         obd = req->rq_export->exp_obd;
1190                         mds = mds_req2mds(req);
1191                 }
1192                 break;
1193
1194         case MDS_DISCONNECT:
1195                 DEBUG_REQ(D_INODE, req, "disconnect");
1196                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1197                 rc = target_handle_disconnect(req);
1198                 req->rq_status = rc;            /* superfluous? */
1199                 break;
1200
1201         case MDS_GETSTATUS:
1202                 DEBUG_REQ(D_INODE, req, "getstatus");
1203                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1204                 rc = mds_getstatus(req);
1205                 break;
1206
1207         case MDS_GETATTR:
1208                 DEBUG_REQ(D_INODE, req, "getattr");
1209                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1210                 rc = mds_getattr(0, req);
1211                 break;
1212
1213         case MDS_GETATTR_NAME: {
1214                 struct lustre_handle lockh;
1215                 DEBUG_REQ(D_INODE, req, "getattr_name");
1216                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1217
1218                 /* If this request gets a reconstructed reply, we won't be
1219                  * acquiring any new locks in mds_getattr_name, so we don't
1220                  * want to cancel.
1221                  */
1222                 lockh.cookie = 0;
1223                 rc = mds_getattr_name(0, req, &lockh);
1224                 /* this non-intent call (from an ioctl) is special */
1225                 req->rq_status = rc;
1226                 if (rc == 0 && lockh.cookie)
1227                         ldlm_lock_decref(&lockh, LCK_PR);
1228                 break;
1229         }
1230         case MDS_STATFS:
1231                 DEBUG_REQ(D_INODE, req, "statfs");
1232                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1233                 rc = mds_statfs(req);
1234                 break;
1235
1236         case MDS_READPAGE:
1237                 DEBUG_REQ(D_INODE, req, "readpage");
1238                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1239                 rc = mds_readpage(req);
1240
1241                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1242                         RETURN(0);
1243                 }
1244
1245                 break;
1246
1247         case MDS_REINT: {
1248                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
1249                 __u32  opc;
1250                 int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
1251                                mds->mds_max_cookiesize};
1252                 int bufcount;
1253
1254                 /* NB only peek inside req now; mds_reint() will swab it */
1255                 if (opcp == NULL) {
1256                         CERROR ("Can't inspect opcode\n");
1257                         rc = -EINVAL;
1258                         break;
1259                 }
1260                 opc = *opcp;
1261                 if (lustre_msg_swabbed (req->rq_reqmsg))
1262                         __swab32s(&opc);
1263
1264                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1265                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1266                            reint_names[opc] == NULL) ? reint_names[opc] :
1267                                                        "unknown opcode");
1268
1269                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1270
1271                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1272                         bufcount = 3;
1273                 else if (opc == REINT_OPEN)
1274                         bufcount = 2;
1275                 else
1276                         bufcount = 1;
1277
1278                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1279                 if (rc)
1280                         break;
1281
1282                 rc = mds_reint(req, 0, NULL);
1283                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1284                 break;
1285         }
1286
1287         case MDS_CLOSE:
1288                 DEBUG_REQ(D_INODE, req, "close");
1289                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1290                 rc = mds_close(req);
1291                 break;
1292
1293         case MDS_DONE_WRITING:
1294                 DEBUG_REQ(D_INODE, req, "done_writing");
1295                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1296                 rc = mds_done_writing(req);
1297                 break;
1298
1299         case MDS_PIN:
1300                 DEBUG_REQ(D_INODE, req, "pin");
1301                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1302                 rc = mds_pin(req);
1303                 break;
1304
1305         case MDS_SYNC:
1306                 DEBUG_REQ(D_INODE, req, "sync");
1307                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1308                 rc = mds_sync(req);
1309                 break;
1310
1311         case MDS_SET_INFO:
1312                 DEBUG_REQ(D_INODE, req, "set_info");
1313                 rc = mds_set_info(req->rq_export, req);
1314                 break;
1315
1316         case MDS_QUOTACHECK:
1317                 DEBUG_REQ(D_INODE, req, "quotacheck");
1318                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
1319                 rc = mds_quotacheck(req);
1320                 break;
1321
1322         case MDS_QUOTACTL:
1323                 DEBUG_REQ(D_INODE, req, "quotactl");
1324                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
1325                 rc = mds_quotactl(req);
1326                 break;
1327
1328         case OBD_PING:
1329                 DEBUG_REQ(D_INODE, req, "ping");
1330                 rc = target_handle_ping(req);
1331                 break;
1332
1333         case OBD_LOG_CANCEL:
1334                 CDEBUG(D_INODE, "log cancel\n");
1335                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1336                 rc = -ENOTSUPP; /* la la la */
1337                 break;
1338
1339         case LDLM_ENQUEUE:
1340                 DEBUG_REQ(D_INODE, req, "enqueue");
1341                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1342                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1343                                          ldlm_server_blocking_ast, NULL);
1344                 fail = OBD_FAIL_LDLM_REPLY;
1345                 break;
1346         case LDLM_CONVERT:
1347                 DEBUG_REQ(D_INODE, req, "convert");
1348                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1349                 rc = ldlm_handle_convert(req);
1350                 break;
1351         case LDLM_BL_CALLBACK:
1352         case LDLM_CP_CALLBACK:
1353                 DEBUG_REQ(D_INODE, req, "callback");
1354                 CERROR("callbacks should not happen on MDS\n");
1355                 LBUG();
1356                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1357                 break;
1358         case LLOG_ORIGIN_HANDLE_CREATE:
1359                 DEBUG_REQ(D_INODE, req, "llog_init");
1360                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1361                 rc = llog_origin_handle_create(req);
1362                 break;
1363         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1364                 DEBUG_REQ(D_INODE, req, "llog next block");
1365                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1366                 rc = llog_origin_handle_next_block(req);
1367                 break;
1368         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1369                 DEBUG_REQ(D_INODE, req, "llog read header");
1370                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1371                 rc = llog_origin_handle_read_header(req);
1372                 break;
1373         case LLOG_ORIGIN_HANDLE_CLOSE:
1374                 DEBUG_REQ(D_INODE, req, "llog close");
1375                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1376                 rc = llog_origin_handle_close(req);
1377                 break;
1378         case LLOG_CATINFO:
1379                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1380                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1381                 rc = llog_catinfo(req);
1382                 break;
1383         default:
1384                 req->rq_status = -ENOTSUPP;
1385                 rc = ptlrpc_error(req);
1386                 RETURN(rc);
1387         }
1388
1389         LASSERT(current->journal_info == NULL);
1390
1391         /* If we're DISCONNECTing, the mds_export_data is already freed */
1392         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1393                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1394                 req->rq_repmsg->last_xid =
1395                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1396
1397                 target_committed_to_req(req);
1398         }
1399
1400         EXIT;
1401  out:
1402
1403         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1404                 if (obd && obd->obd_recovering) {
1405                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1406                         return target_queue_final_reply(req, rc);
1407                 }
1408                 /* Lost a race with recovery; let the error path DTRT. */
1409                 rc = req->rq_status = -ENOTCONN;
1410         }
1411
1412         target_send_reply(req, rc, fail);
1413         return 0;
1414 }
1415
1416 /* Update the server data on disk.  This stores the new mount_count and
1417  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1418  * then the server last_rcvd value may be less than that of the clients.
1419  * This will alert us that we may need to do client recovery.
1420  *
1421  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1422  */
1423 int mds_update_server_data(struct obd_device *obd, int force_sync)
1424 {
1425         struct mds_obd *mds = &obd->u.mds;
1426         struct lr_server_data *lsd = mds->mds_server_data;
1427         struct file *filp = mds->mds_rcvd_filp;
1428         struct lvfs_run_ctxt saved;
1429         loff_t off = 0;
1430         int rc;
1431         ENTRY;
1432
1433         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1434         lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1435
1436         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1437                mds->mds_mount_count, mds->mds_last_transno);
1438         rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1439         if (rc)
1440                 CERROR("error writing MDS server data: rc = %d\n", rc);
1441         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1442
1443         RETURN(rc);
1444 }
1445
1446
1447 /* mount the file system (secretly) */
1448 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1449 {
1450         struct lprocfs_static_vars lvars;
1451         struct lustre_cfg* lcfg = buf;
1452         char *options = NULL;
1453         struct mds_obd *mds = &obd->u.mds;
1454         struct lustre_mount_info *lmi;
1455         struct vfsmount *mnt;
1456         char ns_name[48];
1457         unsigned long page;
1458         int rc = 0;
1459         ENTRY;
1460
1461         /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv*/
1462
1463         if (lcfg->lcfg_bufcount < 3)
1464                 RETURN(rc = -EINVAL);
1465
1466         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1467                 RETURN(rc = -EINVAL);
1468
1469         obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
1470         if (IS_ERR(obd->obd_fsops))
1471                 RETURN(rc = PTR_ERR(obd->obd_fsops));
1472
1473         lmi = lustre_get_mount(obd->obd_name);
1474         if (lmi) {
1475                 /* We already mounted in lustre_fill_super */
1476                 mnt = lmi->lmi_mnt;
1477         } else {
1478                 /* old path - used for llog writing from mkfs.lustre */
1479                 CERROR("Using old MDS mount method\n");
1480                 page = __get_free_page(GFP_KERNEL);
1481                 if (!page)
1482                         RETURN(-ENOMEM);
1483
1484                 options = (char *)page;
1485                 memset(options, 0, PAGE_SIZE);
1486
1487                 /* here we use "iopen_nopriv" hardcoded, because it affects MDS utility
1488                  * and the rest of options are passed by mount options. Probably this
1489                  * should be moved to somewhere else like startup scripts or lconf. */
1490                 sprintf(options, "iopen_nopriv");
1491
1492                 if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4))
1493                         sprintf(options + strlen(options), ",%s",
1494                                 lustre_cfg_string(lcfg, 4));
1495                 
1496                 mnt = do_kern_mount(lustre_cfg_string(lcfg, 2), 0,
1497                                     lustre_cfg_string(lcfg, 1), (void *)options);
1498                 free_page(page);
1499                 if (IS_ERR(mnt)) {
1500                         rc = PTR_ERR(mnt);
1501                         CERROR("do_kern_mount failed: rc = %d\n", rc);
1502                         GOTO(err_ops, rc);
1503                 }
1504         }
1505
1506         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1507
1508         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1509
1510         sema_init(&mds->mds_orphan_recovery_sem, 1);
1511         sema_init(&mds->mds_epoch_sem, 1);
1512         spin_lock_init(&mds->mds_transno_lock);
1513         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1514         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1515
1516         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
1517         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1518         if (obd->obd_namespace == NULL) {
1519                 mds_cleanup(obd);
1520                 GOTO(err_put, rc = -ENOMEM);
1521         }
1522         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1523
1524         rc = mds_fs_setup(obd, mnt);
1525         if (rc) {
1526                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
1527                        obd->obd_name, rc);
1528                 GOTO(err_ns, rc);
1529         }
1530
1531         rc = llog_start_commit_thread();
1532         if (rc < 0)
1533                 GOTO(err_fs, rc);
1534
1535         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1536                 class_uuid_t uuid;
1537
1538                 generate_random_uuid(uuid);
1539                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
1540
1541                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
1542                 if (mds->mds_profile == NULL)
1543                         GOTO(err_fs, rc = -ENOMEM);
1544
1545                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
1546                         LUSTRE_CFG_BUFLEN(lcfg, 3));
1547
1548         }
1549
1550         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1551                            "mds_ldlm_client", &obd->obd_ldlm_client);
1552         obd->obd_replayable = 1;
1553
1554         mds_quota_setup(mds);
1555
1556         rc = mds_postsetup(obd);
1557         if (rc)
1558                 GOTO(err_fs, rc);
1559
1560         lprocfs_init_vars(mds, &lvars);
1561         lprocfs_obd_setup(obd, lvars.obd_vars);
1562
1563         if (obd->obd_recovering) {
1564                 LCONSOLE_WARN("MDT %s now serving %s, but will be in recovery "
1565                               "until %d %s reconnect, or if no clients "
1566                               "reconnect for %d:%.02d; during that time new "
1567                               "clients will not be allowed to connect. "
1568                               "Recovery progress can be monitored by watching "
1569                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
1570                               obd->obd_name,
1571                               lustre_cfg_string(lcfg, 1),
1572                               obd->obd_recoverable_clients,
1573                               (obd->obd_recoverable_clients == 1) 
1574                               ? "client" : "clients",
1575                               (int)(OBD_RECOVERY_TIMEOUT / HZ) / 60,
1576                               (int)(OBD_RECOVERY_TIMEOUT / HZ) % 60,
1577                               obd->obd_name);
1578         } else {
1579                 LCONSOLE_INFO("MDT %s now serving %s with recovery %s.\n",
1580                               obd->obd_name,
1581                               lustre_cfg_string(lcfg, 1),
1582                               obd->obd_replayable ? "enabled" : "disabled");
1583         }
1584
1585         ldlm_timeout = 6;
1586         ping_evictor_start();
1587
1588         RETURN(0);
1589
1590 err_fs:
1591         /* No extra cleanup needed for llog_init_commit_thread() */
1592         mds_fs_cleanup(obd);
1593 err_ns:
1594         ldlm_namespace_free(obd->obd_namespace, 0);
1595         obd->obd_namespace = NULL;
1596 err_put:
1597         if (lmi) {
1598                 lustre_put_mount(obd->obd_name);
1599         } else {
1600                 /* old method */
1601                 unlock_kernel();
1602                 mntput(mds->mds_vfsmnt);
1603                 lock_kernel();
1604         }               
1605         mds->mds_sb = 0;
1606 err_ops:
1607         fsfilt_put_ops(obd->obd_fsops);
1608         return rc;
1609 }
1610
1611 static int mds_postsetup(struct obd_device *obd)
1612 {
1613         struct mds_obd *mds = &obd->u.mds;
1614         int rc = 0;
1615         ENTRY;
1616
1617         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
1618                         &llog_lvfs_ops);
1619         if (rc)
1620                 RETURN(rc);
1621
1622         if (mds->mds_profile) {
1623                 struct lvfs_run_ctxt saved;
1624                 struct lustre_profile *lprof;
1625                 struct config_llog_instance cfg;
1626
1627                 cfg.cfg_instance = NULL;
1628                 cfg.cfg_uuid = mds->mds_lov_uuid;
1629                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1630                 rc = class_config_parse_llog(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT), 
1631                                              mds->mds_profile, &cfg);
1632                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1633                 switch (rc) {
1634                 case 0:
1635                         break;
1636                 case -EINVAL:
1637                         LCONSOLE_ERROR("%s: the profile %s could not be read. "
1638                                        "If you recently installed a new "
1639                                        "version of Lustre, you may need to "
1640                                        "re-run 'lconf --write_conf "
1641                                        "<yourconfig>.xml' command line before "
1642                                        "restarting the MDS.\n",
1643                                        obd->obd_name, mds->mds_profile);
1644                         /* fall through */
1645                 default:
1646                         GOTO(err_llog, rc);
1647                         break;
1648                 }
1649
1650                 lprof = class_get_profile(mds->mds_profile);
1651                 if (lprof == NULL) {
1652                         CERROR("No profile found: %s\n", mds->mds_profile);
1653                         GOTO(err_cleanup, rc = -ENOENT);
1654                 }
1655                 rc = mds_lov_connect(obd, lprof->lp_osc);
1656                 if (rc)
1657                         GOTO(err_cleanup, rc);
1658         }
1659
1660         RETURN(rc);
1661
1662 err_cleanup:
1663         mds_lov_clean(obd);
1664 err_llog:
1665         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
1666         RETURN(rc);
1667 }
1668
1669 int mds_postrecov(struct obd_device *obd)
1670 {
1671         struct mds_obd *mds = &obd->u.mds;
1672         int rc, item = 0;
1673         ENTRY;
1674
1675         if (obd->obd_fail) 
1676                 RETURN(0);
1677
1678         LASSERT(!obd->obd_recovering);
1679         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
1680
1681         /* set nextid first, so we are sure it happens */
1682         rc = mds_lov_set_nextid(obd);
1683         if (rc) {
1684                 CERROR ("%s: mds_lov_set_nextid failed\n",
1685                         obd->obd_name);
1686                 GOTO(out, rc);
1687         }
1688
1689         /* clean PENDING dir */
1690         rc = mds_cleanup_orphans(obd);
1691         if (rc < 0) {
1692                 GOTO(out, rc);
1693         } else {
1694                 item = rc;
1695         }
1696
1697         rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
1698                           0, NULL);
1699         if (rc)
1700                 GOTO(out, rc);
1701
1702         rc = llog_connect(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT),
1703                           obd->u.mds.mds_lov_desc.ld_tgt_count,
1704                           NULL, NULL, NULL);
1705         if (rc) {
1706                 CERROR("%s: failed at llog_origin_connect: %d\n", 
1707                        obd->obd_name, rc);
1708                 GOTO(out, rc);
1709         }
1710
1711         /* remove the orphaned precreated objects */
1712         rc = mds_lov_clearorphans(mds, NULL /* all OSTs */);
1713         if (rc) {
1714                 GOTO(err_llog, rc);
1715         }
1716
1717 out:
1718         RETURN(rc < 0 ? rc : item);
1719
1720 err_llog:
1721         /* cleanup all llogging subsystems */
1722         rc = obd_llog_finish(obd, mds->mds_lov_desc.ld_tgt_count);
1723         if (rc)
1724                 CERROR("%s: failed to cleanup llogging subsystems\n",
1725                         obd->obd_name);
1726         goto out;
1727 }
1728
1729 int mds_lov_clean(struct obd_device *obd)
1730 {
1731         struct mds_obd *mds = &obd->u.mds;
1732
1733         if (mds->mds_profile) {
1734                 char * cln_prof;
1735                 struct config_llog_instance cfg;
1736                 struct lvfs_run_ctxt saved;
1737                 int len = strlen(mds->mds_profile) + sizeof("-clean") + 1;
1738
1739                 OBD_ALLOC(cln_prof, len);
1740                 sprintf(cln_prof, "%s-clean", mds->mds_profile);
1741
1742                 cfg.cfg_instance = NULL;
1743                 cfg.cfg_uuid = mds->mds_lov_uuid;
1744
1745                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1746                 class_config_parse_llog(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT), 
1747                                         cln_prof, &cfg);
1748                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1749
1750                 OBD_FREE(cln_prof, len);
1751                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
1752                 mds->mds_profile = NULL;
1753         }
1754         RETURN(0);
1755 }
1756
1757 static int mds_precleanup(struct obd_device *obd, int stage)
1758 {
1759         int rc = 0;
1760         ENTRY;
1761
1762         switch (stage) {
1763         case 1:
1764                 mds_lov_set_cleanup_flags(obd);
1765                 target_cleanup_recovery(obd);
1766                 break;
1767         case 2:
1768                 mds_lov_disconnect(obd);
1769                 mds_lov_clean(obd);
1770                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
1771                 rc = obd_llog_finish(obd, 0);
1772         }
1773         RETURN(rc);
1774 }
1775
1776 static int mds_cleanup(struct obd_device *obd)
1777 {
1778         struct mds_obd *mds = &obd->u.mds;
1779         lvfs_sbdev_type save_dev;
1780         int must_put = 0;
1781         int must_relock = 0;
1782         ENTRY;
1783
1784         ping_evictor_stop();
1785
1786         if (mds->mds_sb == NULL)
1787                 RETURN(0);
1788         save_dev = lvfs_sbdev(mds->mds_sb);
1789
1790         if (mds->mds_osc_exp)
1791                 /* lov export was disconnected by mds_lov_clean;
1792                    we just need to drop our ref */
1793                 class_export_put(mds->mds_osc_exp);
1794
1795         lprocfs_obd_cleanup(obd);
1796
1797         mds_quota_cleanup(mds);
1798
1799         mds_update_server_data(obd, 1);
1800         if (mds->mds_lov_objids != NULL) {
1801                 OBD_FREE(mds->mds_lov_objids,
1802                          mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
1803         }
1804         mds_fs_cleanup(obd);
1805
1806         /* 2 seems normal on mds, (may_umount() also expects 2
1807           fwiw), but we only see 1 at this point in obdfilter. */
1808         if (atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count) > 2)
1809                 CERROR("%s: mount busy, mnt_count %d != 2\n", obd->obd_name,
1810                        atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count));
1811
1812         must_put = lustre_put_mount(obd->obd_name);
1813
1814         /* We can only unlock kernel if we are in the context of sys_ioctl,
1815            otherwise we never called lock_kernel */
1816         if (kernel_locked()) {
1817                 unlock_kernel();
1818                 must_relock++;
1819         }
1820         
1821         if (must_put) 
1822                 /* In case we didn't mount with lustre_get_mount -- old method*/
1823                 mntput(mds->mds_vfsmnt);
1824         mds->mds_sb = NULL;
1825
1826         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
1827
1828         spin_lock_bh(&obd->obd_processing_task_lock);
1829         if (obd->obd_recovering) {
1830                 target_cancel_recovery_timer(obd);
1831                 obd->obd_recovering = 0;
1832         }
1833         spin_unlock_bh(&obd->obd_processing_task_lock);
1834
1835         lvfs_clear_rdonly(save_dev);
1836
1837         if (must_relock)
1838                 lock_kernel();
1839
1840         fsfilt_put_ops(obd->obd_fsops);
1841
1842         LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
1843
1844         RETURN(0);
1845 }
1846
1847 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
1848                                         struct ldlm_lock *new_lock,
1849                                         struct ldlm_lock **old_lock,
1850                                         struct lustre_handle *lockh)
1851 {
1852         struct obd_export *exp = req->rq_export;
1853         struct obd_device *obd = exp->exp_obd;
1854         struct ldlm_request *dlmreq =
1855                 lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
1856         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
1857         struct list_head *iter;
1858
1859         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1860                 return;
1861
1862         l_lock(&obd->obd_namespace->ns_lock);
1863         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1864                 struct ldlm_lock *lock;
1865                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1866                 if (lock == new_lock)
1867                         continue;
1868                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1869                         lockh->cookie = lock->l_handle.h_cookie;
1870                         LDLM_DEBUG(lock, "restoring lock cookie");
1871                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1872                                   lockh->cookie);
1873                         if (old_lock)
1874                                 *old_lock = LDLM_LOCK_GET(lock);
1875                         l_unlock(&obd->obd_namespace->ns_lock);
1876                         return;
1877                 }
1878         }
1879         l_unlock(&obd->obd_namespace->ns_lock);
1880
1881         /* If the xid matches, then we know this is a resent request,
1882          * and allow it. (It's probably an OPEN, for which we don't
1883          * send a lock */
1884         if (req->rq_xid == 
1885             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
1886                 return;
1887
1888         /* This remote handle isn't enqueued, so we never received or
1889          * processed this request.  Clear MSG_RESENT, because it can
1890          * be handled like any normal request now. */
1891
1892         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1893
1894         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1895                   remote_hdl.cookie);
1896 }
1897
1898 int intent_disposition(struct ldlm_reply *rep, int flag)
1899 {
1900         if (!rep)
1901                 return 0;
1902         return (rep->lock_policy_res1 & flag);
1903 }
1904
1905 void intent_set_disposition(struct ldlm_reply *rep, int flag)
1906 {
1907         if (!rep)
1908                 return;
1909         rep->lock_policy_res1 |= flag;
1910 }
1911
1912 static int mds_intent_policy(struct ldlm_namespace *ns,
1913                              struct ldlm_lock **lockp, void *req_cookie,
1914                              ldlm_mode_t mode, int flags, void *data)
1915 {
1916         struct ptlrpc_request *req = req_cookie;
1917         struct ldlm_lock *lock = *lockp;
1918         struct ldlm_intent *it;
1919         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
1920         struct ldlm_reply *rep;
1921         struct lustre_handle lockh = { 0 };
1922         struct ldlm_lock *new_lock = NULL;
1923         int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
1924                                           sizeof(struct mds_body),
1925                                           mds->mds_max_mdsize,
1926                                           mds->mds_max_cookiesize};
1927         ENTRY;
1928
1929         LASSERT(req != NULL);
1930
1931         if (req->rq_reqmsg->bufcount <= 1) {
1932                 /* No intent was provided */
1933                 int size = sizeof(struct ldlm_reply);
1934                 rc = lustre_pack_reply(req, 1, &size, NULL);
1935                 LASSERT(rc == 0);
1936                 RETURN(0);
1937         }
1938
1939         it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent);
1940         if (it == NULL) {
1941                 CERROR("Intent missing\n");
1942                 RETURN(req->rq_status = -EFAULT);
1943         }
1944
1945         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
1946
1947         rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize,
1948                                NULL);
1949         if (rc)
1950                 RETURN(req->rq_status = rc);
1951
1952         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1953         intent_set_disposition(rep, DISP_IT_EXECD);
1954
1955
1956         /* execute policy */
1957         switch ((long)it->opc) {
1958         case IT_OPEN:
1959         case IT_CREAT|IT_OPEN:
1960                 fixup_handle_for_resent_req(req, lock, NULL, &lockh);
1961                 /* XXX swab here to assert that an mds_open reint
1962                  * packet is following */
1963                 rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
1964 #if 0
1965                 /* We abort the lock if the lookup was negative and
1966                  * we did not make it to the OPEN portion */
1967                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
1968                         RETURN(ELDLM_LOCK_ABORTED);
1969                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
1970                     !intent_disposition(rep, DISP_OPEN_OPEN))
1971 #endif 
1972                         RETURN(ELDLM_LOCK_ABORTED);
1973                 break;
1974         case IT_GETATTR:
1975         case IT_LOOKUP:
1976         case IT_READDIR:
1977                 fixup_handle_for_resent_req(req, lock, &new_lock, &lockh);
1978                 rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh);
1979                 /* FIXME: LDLM can set req->rq_status. MDS sets
1980                    policy_res{1,2} with disposition and status.
1981                    - replay: returns 0 & req->status is old status
1982                    - otherwise: returns req->status */
1983                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
1984                         rep->lock_policy_res2 = 0;
1985                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
1986                     rep->lock_policy_res2)
1987                         RETURN(ELDLM_LOCK_ABORTED);
1988                 if (req->rq_status != 0) {
1989                         LBUG();
1990                         rep->lock_policy_res2 = req->rq_status;
1991                         RETURN(ELDLM_LOCK_ABORTED);
1992                 }
1993                 break;
1994         default:
1995                 CERROR("Unhandled intent "LPD64"\n", it->opc);
1996                 LBUG();
1997         }
1998
1999         /* By this point, whatever function we called above must have either
2000          * filled in 'lockh', been an intent replay, or returned an error.  We
2001          * want to allow replayed RPCs to not get a lock, since we would just
2002          * drop it below anyways because lock replay is done separately by the
2003          * client afterwards.  For regular RPCs we want to give the new lock to
2004          * the client instead of whatever lock it was about to get. */
2005         if (new_lock == NULL)
2006                 new_lock = ldlm_handle2lock(&lockh);
2007         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2008                 RETURN(0);
2009
2010         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
2011                  it->opc, lockh.cookie);
2012
2013         /* If we've already given this lock to a client once, then we should
2014          * have no readers or writers.  Otherwise, we should have one reader
2015          * _or_ writer ref (which will be zeroed below) before returning the
2016          * lock to a client. */
2017         if (new_lock->l_export == req->rq_export) {
2018                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2019         } else {
2020                 LASSERT(new_lock->l_export == NULL);
2021                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2022         }
2023
2024         *lockp = new_lock;
2025
2026         if (new_lock->l_export == req->rq_export) {
2027                 /* Already gave this to the client, which means that we
2028                  * reconstructed a reply. */
2029                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2030                         MSG_RESENT);
2031                 RETURN(ELDLM_LOCK_REPLACED);
2032         }
2033
2034         /* Fixup the lock to be given to the client */
2035         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
2036         new_lock->l_readers = 0;
2037         new_lock->l_writers = 0;
2038
2039         new_lock->l_export = class_export_get(req->rq_export);
2040         list_add(&new_lock->l_export_chain,
2041                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2042
2043         new_lock->l_blocking_ast = lock->l_blocking_ast;
2044         new_lock->l_completion_ast = lock->l_completion_ast;
2045
2046         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2047                sizeof(lock->l_remote_handle));
2048
2049         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2050
2051         LDLM_LOCK_PUT(new_lock);
2052         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
2053
2054         RETURN(ELDLM_LOCK_REPLACED);
2055 }
2056
2057 static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
2058 {
2059         struct mds_obd *mds = &obd->u.mds;
2060         struct lprocfs_static_vars lvars;
2061         int rc = 0;
2062         ENTRY;
2063
2064         lprocfs_init_vars(mdt, &lvars);
2065         lprocfs_obd_setup(obd, lvars.obd_vars);
2066
2067         mds->mds_service =
2068                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2069                                 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
2070                                 MDS_SERVICE_WATCHDOG_TIMEOUT,
2071                                 mds_handle, "mds", obd->obd_proc_entry, NULL);
2072
2073         if (!mds->mds_service) {
2074                 CERROR("failed to start service\n");
2075                 GOTO(err_lprocfs, rc = -ENOMEM);
2076         }
2077
2078         rc = ptlrpc_start_n_threads(obd, mds->mds_service, MDT_NUM_THREADS,
2079                                     "ll_mdt");
2080         if (rc)
2081                 GOTO(err_thread, rc);
2082
2083         mds->mds_setattr_service =
2084                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2085                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
2086                                 MDS_SERVICE_WATCHDOG_TIMEOUT,
2087                                 mds_handle, "mds_setattr",
2088                                 obd->obd_proc_entry, NULL);
2089         if (!mds->mds_setattr_service) {
2090                 CERROR("failed to start getattr service\n");
2091                 GOTO(err_thread, rc = -ENOMEM);
2092         }
2093
2094         rc = ptlrpc_start_n_threads(obd, mds->mds_setattr_service,
2095                                     MDT_NUM_THREADS, "ll_mdt_attr");
2096         if (rc)
2097                 GOTO(err_thread2, rc);
2098
2099         mds->mds_readpage_service =
2100                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2101                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
2102                                 MDS_SERVICE_WATCHDOG_TIMEOUT,
2103                                 mds_handle, "mds_readpage",
2104                                 obd->obd_proc_entry, NULL);
2105         if (!mds->mds_readpage_service) {
2106                 CERROR("failed to start readpage service\n");
2107                 GOTO(err_thread2, rc = -ENOMEM);
2108         }
2109
2110         rc = ptlrpc_start_n_threads(obd, mds->mds_readpage_service,
2111                                     MDT_NUM_THREADS, "ll_mdt_rdpg");
2112
2113         if (rc)
2114                 GOTO(err_thread3, rc);
2115
2116         RETURN(0);
2117
2118 err_thread3:
2119         ptlrpc_unregister_service(mds->mds_readpage_service);
2120 err_thread2:
2121         ptlrpc_unregister_service(mds->mds_setattr_service);
2122 err_thread:
2123         ptlrpc_unregister_service(mds->mds_service);
2124 err_lprocfs:
2125         lprocfs_obd_cleanup(obd);
2126         return rc;
2127 }
2128
2129 static int mdt_cleanup(struct obd_device *obd)
2130 {
2131         struct mds_obd *mds = &obd->u.mds;
2132         ENTRY;
2133
2134         ptlrpc_unregister_service(mds->mds_readpage_service);
2135         ptlrpc_unregister_service(mds->mds_setattr_service);
2136         ptlrpc_unregister_service(mds->mds_service);
2137
2138         lprocfs_obd_cleanup(obd);
2139
2140         RETURN(0);
2141 }
2142
2143 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2144                                           void *data)
2145 {
2146         struct obd_device *obd = data;
2147         struct ll_fid fid;
2148         fid.id = id;
2149         fid.generation = gen;
2150         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2151 }
2152
2153 struct lvfs_callback_ops mds_lvfs_ops = {
2154         l_fid2dentry:     mds_lvfs_fid2dentry,
2155 };
2156
2157 /* use obd ops to offer management infrastructure */
2158 static struct obd_ops mds_obd_ops = {
2159         .o_owner           = THIS_MODULE,
2160         .o_connect         = mds_connect,
2161         .o_init_export     = mds_init_export,
2162         .o_destroy_export  = mds_destroy_export,
2163         .o_disconnect      = mds_disconnect,
2164         .o_setup           = mds_setup,
2165         .o_precleanup      = mds_precleanup,
2166         .o_cleanup         = mds_cleanup,
2167         .o_postrecov       = mds_postrecov,
2168         .o_statfs          = mds_obd_statfs,
2169         .o_iocontrol       = mds_iocontrol,
2170         .o_create          = mds_obd_create,
2171         .o_destroy         = mds_obd_destroy,
2172         .o_llog_init       = mds_llog_init,
2173         .o_llog_finish     = mds_llog_finish,
2174         .o_notify          = mds_notify,
2175 };
2176
2177 static struct obd_ops mdt_obd_ops = {
2178         .o_owner           = THIS_MODULE,
2179         .o_setup           = mdt_setup,
2180         .o_cleanup         = mdt_cleanup,
2181 };
2182
2183 static int __init mds_init(void)
2184 {
2185         int rc;
2186         struct lprocfs_static_vars lvars;
2187
2188         rc = lustre_dquot_init();
2189         if (rc)
2190                 return rc;
2191         
2192         lprocfs_init_vars(mds, &lvars);
2193         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
2194         lprocfs_init_vars(mdt, &lvars);
2195         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
2196
2197         return 0;
2198 }
2199
2200 static void /*__exit*/ mds_exit(void)
2201 {
2202         lustre_dquot_exit();
2203
2204         class_unregister_type(LUSTRE_MDS_NAME);
2205         class_unregister_type(LUSTRE_MDT_NAME);
2206 }
2207
2208 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2209 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2210 MODULE_LICENSE("GPL");
2211
2212 module_init(mds_init);
2213 module_exit(mds_exit);