Whamcloud - gitweb
Introduction of lu_env.
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include <lustre_mds.h>
38 #include <linux/module.h>
39 #include <linux/init.h>
40 #include <linux/random.h>
41 #include <linux/fs.h>
42 #include <linux/jbd.h>
43 #include <linux/ext3_fs.h>
44 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
45 # include <linux/smp_lock.h>
46 # include <linux/buffer_head.h>
47 # include <linux/workqueue.h>
48 # include <linux/mount.h>
49 #else
50 # include <linux/locks.h>
51 #endif
52
53 #include <linux/lustre_acl.h>
54 #include <obd_class.h>
55 #include <lustre_dlm.h>
56 #include <obd_lov.h>
57 #include <lustre_fsfilt.h>
58 #include <lprocfs_status.h>
59 #include <lustre_commit_confd.h>
60 #include <lustre_quota.h>
61 #include <lustre_disk.h>
62 #include <lustre_param.h>
63 #include <lustre_ver.h>
64
65 #include "mds_internal.h"
66
67 int mds_num_threads;
68 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
69                 "number of MDS service threads to start");
70
71 static int mds_intent_policy(struct ldlm_namespace *ns,
72                              struct ldlm_lock **lockp, void *req_cookie,
73                              ldlm_mode_t mode, int flags, void *data);
74 static int mds_postsetup(struct obd_device *obd);
75 static int mds_cleanup(struct obd_device *obd);
76
77 /* Assumes caller has already pushed into the kernel filesystem context */
78 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
79                         loff_t offset, int count)
80 {
81         struct ptlrpc_bulk_desc *desc;
82         struct l_wait_info lwi;
83         struct page **pages;
84         int rc = 0, npages, i, tmpcount, tmpsize = 0;
85         ENTRY;
86
87         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
88
89         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
90         OBD_ALLOC(pages, sizeof(*pages) * npages);
91         if (!pages)
92                 GOTO(out, rc = -ENOMEM);
93
94         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
95                                     MDS_BULK_PORTAL);
96         if (desc == NULL)
97                 GOTO(out_free, rc = -ENOMEM);
98
99         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
100                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
101
102                 pages[i] = alloc_pages(GFP_KERNEL, 0);
103                 if (pages[i] == NULL)
104                         GOTO(cleanup_buf, rc = -ENOMEM);
105
106                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
107         }
108
109         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
110                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
111                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
112                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
113                        file->f_dentry->d_inode->i_size);
114
115                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
116                                      kmap(pages[i]), tmpsize, &offset);
117                 kunmap(pages[i]);
118
119                 if (rc != tmpsize)
120                         GOTO(cleanup_buf, rc = -EIO);
121         }
122
123         LASSERT(desc->bd_nob == count);
124
125         rc = ptlrpc_start_bulk_transfer(desc);
126         if (rc)
127                 GOTO(cleanup_buf, rc);
128
129         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
130                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
131                        OBD_FAIL_MDS_SENDPAGE, rc);
132                 GOTO(abort_bulk, rc);
133         }
134
135         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
136         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
137         LASSERT (rc == 0 || rc == -ETIMEDOUT);
138
139         if (rc == 0) {
140                 if (desc->bd_success &&
141                     desc->bd_nob_transferred == count)
142                         GOTO(cleanup_buf, rc);
143
144                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
145         }
146
147         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
148                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
149                   desc->bd_nob_transferred, count,
150                   req->rq_export->exp_client_uuid.uuid,
151                   req->rq_export->exp_connection->c_remote_uuid.uuid);
152
153         class_fail_export(req->rq_export);
154
155         EXIT;
156  abort_bulk:
157         ptlrpc_abort_bulk (desc);
158  cleanup_buf:
159         for (i = 0; i < npages; i++)
160                 if (pages[i])
161                         __free_pages(pages[i], 0);
162
163         ptlrpc_free_bulk(desc);
164  out_free:
165         OBD_FREE(pages, sizeof(*pages) * npages);
166  out:
167         return rc;
168 }
169
170 /* only valid locked dentries or errors should be returned */
171 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
172                                      struct vfsmount **mnt, int lock_mode,
173                                      struct lustre_handle *lockh,
174                                      __u64 lockpart)
175 {
176         struct mds_obd *mds = &obd->u.mds;
177         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
178         struct ldlm_res_id res_id = { .name = {0} };
179         int flags = LDLM_FL_ATOMIC_CB, rc;
180         ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
181         ENTRY;
182
183         if (IS_ERR(de))
184                 RETURN(de);
185
186         res_id.name[0] = de->d_inode->i_ino;
187         res_id.name[1] = de->d_inode->i_generation;
188         rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id,
189                                     LDLM_IBITS, &policy, lock_mode, &flags,
190                                     ldlm_blocking_ast, ldlm_completion_ast,
191                                     NULL, NULL, 0, NULL, lockh);
192         if (rc != ELDLM_OK) {
193                 l_dput(de);
194                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
195         }
196
197         RETURN(retval);
198 }
199
200 /* Look up an entry by inode number. */
201 /* this function ONLY returns valid dget'd dentries with an initialized inode
202    or errors */
203 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
204                               struct vfsmount **mnt)
205 {
206         char fid_name[32];
207         unsigned long ino = fid->id;
208         __u32 generation = fid->generation;
209         struct inode *inode;
210         struct dentry *result;
211
212         if (ino == 0)
213                 RETURN(ERR_PTR(-ESTALE));
214
215         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
216
217         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
218                ino, generation, mds->mds_obt.obt_sb);
219
220         /* under ext3 this is neither supposed to return bad inodes
221            nor NULL inodes. */
222         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
223         if (IS_ERR(result))
224                 RETURN(result);
225
226         inode = result->d_inode;
227         if (!inode)
228                 RETURN(ERR_PTR(-ENOENT));
229
230         if (inode->i_generation == 0 || inode->i_nlink == 0) {
231                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
232                               " may indicate disk corruption (inode: %lu/%u, "
233                               "link %lu, count %d)\n", inode->i_ino,
234                               inode->i_generation,(unsigned long)inode->i_nlink,
235                               atomic_read(&inode->i_count));
236                 dput(result);
237                 RETURN(ERR_PTR(-ENOENT));
238         }
239
240         if (generation && inode->i_generation != generation) {
241                 /* we didn't find the right inode.. */
242                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
243                        "count: %d, generation %u/%u\n", inode->i_ino,
244                        (unsigned long)inode->i_nlink,
245                        atomic_read(&inode->i_count), inode->i_generation,
246                        generation);
247                 dput(result);
248                 RETURN(ERR_PTR(-ENOENT));
249         }
250
251         if (mnt) {
252                 *mnt = mds->mds_vfsmnt;
253                 mntget(*mnt);
254         }
255
256         RETURN(result);
257 }
258
259 static int mds_connect_internal(struct obd_export *exp,
260                                 struct obd_connect_data *data)
261 {
262         struct obd_device *obd = exp->exp_obd;
263         if (data != NULL) {
264                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
265                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
266
267                 /* If no known bits (which should not happen, probably,
268                    as everybody should support LOOKUP and UPDATE bits at least)
269                    revert to compat mode with plain locks. */
270                 if (!data->ocd_ibits_known &&
271                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
272                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
273
274                 if (!obd->u.mds.mds_fl_acl)
275                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
276
277                 if (!obd->u.mds.mds_fl_user_xattr)
278                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
279
280                 exp->exp_connect_flags = data->ocd_connect_flags;
281                 data->ocd_version = LUSTRE_VERSION_CODE;
282                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
283         }
284
285         if (obd->u.mds.mds_fl_acl &&
286             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
287                 CWARN("%s: MDS requires ACL support but client does not\n",
288                       obd->obd_name);
289                 return -EBADE;
290         }
291         return 0;
292 }
293
294 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
295                          struct obd_uuid *cluuid,
296                          struct obd_connect_data *data)
297 {
298         int rc;
299         ENTRY;
300
301         if (exp == NULL || obd == NULL || cluuid == NULL)
302                 RETURN(-EINVAL);
303
304         rc = mds_connect_internal(exp, data);
305
306         RETURN(rc);
307 }
308
309 /* Establish a connection to the MDS.
310  *
311  * This will set up an export structure for the client to hold state data
312  * about that client, like open files, the last operation number it did
313  * on the server, etc.
314  */
315 static int mds_connect(const struct lu_env *env,
316                        struct lustre_handle *conn, struct obd_device *obd,
317                        struct obd_uuid *cluuid, struct obd_connect_data *data)
318 {
319         struct obd_export *exp;
320         struct mds_export_data *med;
321         struct mds_client_data *mcd = NULL;
322         int rc, abort_recovery;
323         ENTRY;
324
325         if (!conn || !obd || !cluuid)
326                 RETURN(-EINVAL);
327
328         /* Check for aborted recovery. */
329         spin_lock_bh(&obd->obd_processing_task_lock);
330         abort_recovery = obd->obd_abort_recovery;
331         spin_unlock_bh(&obd->obd_processing_task_lock);
332         if (abort_recovery)
333                 target_abort_recovery(obd);
334
335         /* XXX There is a small race between checking the list and adding a
336          * new connection for the same UUID, but the real threat (list
337          * corruption when multiple different clients connect) is solved.
338          *
339          * There is a second race between adding the export to the list,
340          * and filling in the client data below.  Hence skipping the case
341          * of NULL mcd above.  We should already be controlling multiple
342          * connects at the client, and we can't hold the spinlock over
343          * memory allocations without risk of deadlocking.
344          */
345         rc = class_connect(conn, obd, cluuid);
346         if (rc)
347                 RETURN(rc);
348         exp = class_conn2export(conn);
349         LASSERT(exp);
350         med = &exp->exp_mds_data;
351
352         rc = mds_connect_internal(exp, data);
353         if (rc)
354                 GOTO(out, rc);
355
356         OBD_ALLOC(mcd, sizeof(*mcd));
357         if (!mcd)
358                 GOTO(out, rc = -ENOMEM);
359
360         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
361         med->med_mcd = mcd;
362
363         rc = mds_client_add(obd, &obd->u.mds, med, -1);
364         GOTO(out, rc);
365
366 out:
367         if (rc) {
368                 if (mcd) {
369                         OBD_FREE(mcd, sizeof(*mcd));
370                         med->med_mcd = NULL;
371                 }
372                 class_disconnect(exp);
373         } else {
374                 class_export_put(exp);
375         }
376
377         RETURN(rc);
378 }
379
380 int mds_init_export(struct obd_export *exp)
381 {
382         struct mds_export_data *med = &exp->exp_mds_data;
383
384         INIT_LIST_HEAD(&med->med_open_head);
385         spin_lock_init(&med->med_open_lock);
386         exp->exp_connecting = 1;
387         RETURN(0);
388 }
389
390 static int mds_destroy_export(struct obd_export *export)
391 {
392         struct mds_export_data *med;
393         struct obd_device *obd = export->exp_obd;
394         struct lvfs_run_ctxt saved;
395         int rc = 0;
396         ENTRY;
397
398         med = &export->exp_mds_data;
399         target_destroy_export(export);
400
401         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
402                 RETURN(0);
403
404         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
405         /* Close any open files (which may also cause orphan unlinking). */
406         spin_lock(&med->med_open_lock);
407         while (!list_empty(&med->med_open_head)) {
408                 struct list_head *tmp = med->med_open_head.next;
409                 struct mds_file_data *mfd =
410                         list_entry(tmp, struct mds_file_data, mfd_list);
411                 struct dentry *dentry = mfd->mfd_dentry;
412
413                 /* Remove mfd handle so it can't be found again.
414                  * We are consuming the mfd_list reference here. */
415                 mds_mfd_unlink(mfd, 0);
416                 spin_unlock(&med->med_open_lock);
417
418                 /* If you change this message, be sure to update
419                  * replay_single:test_46 */
420                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
421                        "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
422                        dentry->d_name.name, dentry->d_inode->i_ino);
423                 /* child orphan sem protects orphan_dec_test and
424                  * is_orphan race, mds_mfd_close drops it */
425                 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
426                 rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
427                                    !(export->exp_flags & OBD_OPT_FAILOVER));
428
429                 if (rc)
430                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
431                 spin_lock(&med->med_open_lock);
432         }
433         spin_unlock(&med->med_open_lock);
434         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
435         mds_client_free(export);
436
437         RETURN(rc);
438 }
439
440 static int mds_disconnect(struct obd_export *exp)
441 {
442         int rc;
443         ENTRY;
444
445         LASSERT(exp);
446         class_export_get(exp);
447
448         /* Disconnect early so that clients can't keep using export */
449         rc = class_disconnect(exp);
450         if (exp->exp_obd->obd_namespace != NULL)
451                 ldlm_cancel_locks_for_export(exp);
452
453         /* complete all outstanding replies */
454         spin_lock(&exp->exp_lock);
455         while (!list_empty(&exp->exp_outstanding_replies)) {
456                 struct ptlrpc_reply_state *rs =
457                         list_entry(exp->exp_outstanding_replies.next,
458                                    struct ptlrpc_reply_state, rs_exp_list);
459                 struct ptlrpc_service *svc = rs->rs_service;
460
461                 spin_lock(&svc->srv_lock);
462                 list_del_init(&rs->rs_exp_list);
463                 ptlrpc_schedule_difficult_reply(rs);
464                 spin_unlock(&svc->srv_lock);
465         }
466         spin_unlock(&exp->exp_lock);
467
468         class_export_put(exp);
469         RETURN(rc);
470 }
471
472 static int mds_getstatus(struct ptlrpc_request *req)
473 {
474         struct mds_obd *mds = mds_req2mds(req);
475         struct mds_body *body;
476         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
477         ENTRY;
478
479         rc = lustre_pack_reply(req, 2, size, NULL);
480         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
481                 CERROR("mds: out of memory for message\n");
482                 req->rq_status = -ENOMEM;       /* superfluous? */
483                 RETURN(-ENOMEM);
484         }
485
486         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
487         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
488
489         /* the last_committed and last_xid fields are filled in for all
490          * replies already - no need to do so here also.
491          */
492         RETURN(0);
493 }
494
495 /* get the LOV EA from @inode and store it into @md.  It can be at most
496  * @size bytes, and @size is updated with the actual EA size.
497  * The EA size is also returned on success, and -ve errno on failure.
498  * If there is no EA then 0 is returned. */
499 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
500                int *size, int lock)
501 {
502         int rc = 0;
503         int lmm_size;
504
505         if (lock)
506                 LOCK_INODE_MUTEX(inode);
507         rc = fsfilt_get_md(obd, inode, md, *size, "lov");
508
509         if (rc < 0) {
510                 CERROR("Error %d reading eadata for ino %lu\n",
511                        rc, inode->i_ino);
512         } else if (rc > 0) {
513                 lmm_size = rc;
514                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
515
516                 if (rc == 0) {
517                         *size = lmm_size;
518                         rc = lmm_size;
519                 } else if (rc > 0) {
520                         *size = rc;
521                 }
522         } else {
523                 *size = 0;
524         }
525         if (lock)
526                 UNLOCK_INODE_MUTEX(inode);
527
528         RETURN (rc);
529 }
530
531
532 /* Call with lock=1 if you want mds_pack_md to take the i_mutex.
533  * Call with lock=0 if the caller has already taken the i_mutex. */
534 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
535                 struct mds_body *body, struct inode *inode, int lock)
536 {
537         struct mds_obd *mds = &obd->u.mds;
538         void *lmm;
539         int lmm_size;
540         int rc;
541         ENTRY;
542
543         lmm = lustre_msg_buf(msg, offset, 0);
544         if (lmm == NULL) {
545                 /* Some problem with getting eadata when I sized the reply
546                  * buffer... */
547                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
548                        inode->i_ino);
549                 RETURN(0);
550         }
551         lmm_size = lustre_msg_buflen(msg, offset);
552
553         /* I don't really like this, but it is a sanity check on the client
554          * MD request.  However, if the client doesn't know how much space
555          * to reserve for the MD, it shouldn't be bad to have too much space.
556          */
557         if (lmm_size > mds->mds_max_mdsize) {
558                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
559                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
560                 // RETURN(-EINVAL);
561         }
562
563         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
564         if (rc > 0) {
565                 if (S_ISDIR(inode->i_mode))
566                         body->valid |= OBD_MD_FLDIREA;
567                 else
568                         body->valid |= OBD_MD_FLEASIZE;
569                 body->eadatasize = lmm_size;
570                 rc = 0;
571         }
572
573         RETURN(rc);
574 }
575
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Read the access ACL xattr of @inode into reply buffer @repoff of @repmsg
 * and record its size in repbody->aclsize.
 *
 * OBD_MD_FLACL is set in repbody->valid and 0 is returned when the ACL was
 * read, when there is none (-ENODATA), when the reply buffer has zero length,
 * or when the inode has no getxattr operation.  Any other getxattr error is
 * returned without setting the flag.
 */
static
int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
                       struct mds_body *repbody, int repoff)
{
        /* On-stack dentry carrying only the inode, as getxattr takes a dentry. */
        struct dentry de = { .d_inode = inode };
        void *aclbuf;
        int buflen, rc;
        ENTRY;

        LASSERT(repbody->aclsize == 0);
        LASSERT(lustre_msg_bufcount(repmsg) > repoff);

        buflen = lustre_msg_buflen(repmsg, repoff);
        if (!buflen || !inode->i_op || !inode->i_op->getxattr)
                GOTO(out, 0);

        lock_24kernel();
        aclbuf = lustre_msg_buf(repmsg, repoff, buflen);
        rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
                                   aclbuf, buflen);
        unlock_24kernel();

        if (rc < 0 && rc != -ENODATA) {
                CERROR("buflen %d, get acl: %d\n", buflen, rc);
                RETURN(rc);
        }
        if (rc >= 0)
                repbody->aclsize = rc;

        EXIT;
out:
        repbody->valid |= OBD_MD_FLACL;
        return 0;
}
#else
/* Without POSIX ACL support in the kernel, packing is a no-op yielding 0. */
#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
#endif
615
/*
 * Pack the ACL of @inode into reply buffer @repoff of @repmsg by delegating
 * to mds_pack_posix_acl().  @med is not used by this implementation.
 */
int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
                 struct lustre_msg *repmsg, struct mds_body *repbody,
                 int repoff)
{
        int rc;

        rc = mds_pack_posix_acl(inode, repmsg, repbody, repoff);
        return rc;
}
622
623 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
624                                 struct ptlrpc_request *req,
625                                 struct mds_body *reqbody, int reply_off)
626 {
627         struct mds_body *body;
628         struct inode *inode = dentry->d_inode;
629         int rc = 0;
630         ENTRY;
631
632         if (inode == NULL)
633                 RETURN(-ENOENT);
634
635         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
636         LASSERT(body != NULL);                 /* caller prepped reply */
637
638         mds_pack_inode2fid(&body->fid1, inode);
639         body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */
640         mds_pack_inode2body(body, inode);
641         reply_off++;
642
643         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
644             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
645                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
646                                  inode, 1);
647
648                 /* If we have LOV EA data, the OST holds size, atime, mtime */
649                 if (!(body->valid & OBD_MD_FLEASIZE) &&
650                     !(body->valid & OBD_MD_FLDIREA))
651                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
652                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
653
654                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
655                 if (body->eadatasize)
656                         reply_off++;
657         } else if (S_ISLNK(inode->i_mode) &&
658                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
659                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
660                 int len;
661
662                 LASSERT (symname != NULL);       /* caller prepped reply */
663                 len = lustre_msg_buflen(req->rq_repmsg, reply_off);
664
665                 rc = inode->i_op->readlink(dentry, symname, len);
666                 if (rc < 0) {
667                         CERROR("readlink failed: %d\n", rc);
668                 } else if (rc != len - 1) {
669                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
670                                 rc, len - 1);
671                         rc = -EINVAL;
672                 } else {
673                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
674                         body->valid |= OBD_MD_LINKNAME;
675                         body->eadatasize = rc + 1;
676                         symname[rc] = 0;        /* NULL terminate */
677                         rc = 0;
678                 }
679                 reply_off++;
680         } else if (reqbody->valid == OBD_MD_FLFLAGS &&
681                    reqbody->flags & MDS_BFLAG_EXT_FLAGS) {
682                 int flags;
683
684                 /* We only return the full set of flags on ioctl, otherwise we
685                  * get enough flags from the inode in mds_pack_inode2body(). */
686                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS,
687                                       (long)&flags);
688                 if (rc == 0)
689                         body->flags = flags | MDS_BFLAG_EXT_FLAGS;
690         }
691
692         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
693                 struct mds_obd *mds = mds_req2mds(req);
694                 body->max_cookiesize = mds->mds_max_cookiesize;
695                 body->max_mdsize = mds->mds_max_mdsize;
696                 body->valid |= OBD_MD_FLMODEASIZE;
697         }
698
699         if (rc)
700                 RETURN(rc);
701
702 #ifdef CONFIG_FS_POSIX_ACL
703         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
704             (reqbody->valid & OBD_MD_FLACL)) {
705                 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
706                                   inode, req->rq_repmsg,
707                                   body, reply_off);
708
709                 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
710                 if (body->aclsize)
711                         reply_off++;
712         }
713 #endif
714
715         RETURN(rc);
716 }
717
718 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
719                                 int offset)
720 {
721         struct mds_obd *mds = mds_req2mds(req);
722         struct mds_body *body;
723         int rc, bufcount = 2;
724         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
725         ENTRY;
726
727         LASSERT(offset == REQ_REC_OFF); /* non-intent */
728
729         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
730         LASSERT(body != NULL);                 /* checked by caller */
731         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
732
733         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
734             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
735                 LOCK_INODE_MUTEX(inode);
736                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
737                                    "lov");
738                 UNLOCK_INODE_MUTEX(inode);
739                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
740                        rc, inode->i_ino);
741                 if (rc < 0) {
742                         if (rc != -ENODATA) {
743                                 CERROR("error getting inode %lu MD: rc = %d\n",
744                                        inode->i_ino, rc);
745                                 RETURN(rc);
746                         }
747                         size[bufcount] = 0;
748                 } else if (rc > mds->mds_max_mdsize) {
749                         size[bufcount] = 0;
750                         CERROR("MD size %d larger than maximum possible %u\n",
751                                rc, mds->mds_max_mdsize);
752                 } else {
753                         size[bufcount] = rc;
754                 }
755                 bufcount++;
756         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
757                 if (inode->i_size + 1 != body->eadatasize)
758                         CERROR("symlink size: %Lu, reply space: %d\n",
759                                inode->i_size + 1, body->eadatasize);
760                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
761                 bufcount++;
762                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
763                        inode->i_size + 1, body->eadatasize);
764         }
765
766 #ifdef CONFIG_FS_POSIX_ACL
767         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
768             (body->valid & OBD_MD_FLACL)) {
769                 struct dentry de = { .d_inode = inode };
770
771                 size[bufcount] = 0;
772                 if (inode->i_op && inode->i_op->getxattr) {
773                         lock_24kernel();
774                         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
775                                                    NULL, 0);
776                         unlock_24kernel();
777
778                         if (rc < 0) {
779                                 if (rc != -ENODATA) {
780                                         CERROR("got acl size: %d\n", rc);
781                                         RETURN(rc);
782                                 }
783                         } else
784                                 size[bufcount] = rc;
785                 }
786                 bufcount++;
787         }
788 #endif
789
790         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
791                 CERROR("failed MDS_GETATTR_PACK test\n");
792                 req->rq_status = -ENOMEM;
793                 RETURN(-ENOMEM);
794         }
795
796         rc = lustre_pack_reply(req, bufcount, size, NULL);
797         if (rc) {
798                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
799                 req->rq_status = rc;
800                 RETURN(rc);
801         }
802
803         RETURN(0);
804 }
805
806 static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
807                             int child_part, struct lustre_handle *child_lockh)
808 {
809         struct obd_device *obd = req->rq_export->exp_obd;
810         struct mds_obd *mds = &obd->u.mds;
811         struct ldlm_reply *rep = NULL;
812         struct lvfs_run_ctxt saved;
813         struct mds_body *body;
814         struct dentry *dparent = NULL, *dchild = NULL;
815         struct lvfs_ucred uc = {0,};
816         struct lustre_handle parent_lockh;
817         int namesize;
818         int rc = 0, cleanup_phase = 0, resent_req = 0;
819         char *name;
820         ENTRY;
821
822         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
823
824         /* Swab now, before anyone looks inside the request */
825         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
826                                   lustre_swab_mds_body);
827         if (body == NULL) {
828                 CERROR("Can't swab mds_body\n");
829                 RETURN(-EFAULT);
830         }
831
832         LASSERT_REQSWAB(req, offset + 1);
833         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
834         if (name == NULL) {
835                 CERROR("Can't unpack name\n");
836                 RETURN(-EFAULT);
837         }
838         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
839         /* namesize less than 2 means we have empty name, probably came from
840            revalidate by cfid, so no point in having name to be set */
841         if (namesize <= 1)
842                 name = NULL;
843
844         rc = mds_init_ucred(&uc, req, offset);
845         if (rc)
846                 GOTO(cleanup, rc);
847
848         LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF);
849         /* if requests were at offset 2, the getattr reply goes back at 1 */
850         if (offset == DLM_INTENT_REC_OFF) {
851                 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
852                                      sizeof(*rep));
853                 offset = DLM_REPLY_REC_OFF;
854         }
855
856         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
857         cleanup_phase = 1; /* kernel context */
858         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
859
860         /* FIXME: handle raw lookup */
861 #if 0
862         if (body->valid == OBD_MD_FLID) {
863                 struct mds_body *mds_reply;
864                 int size = sizeof(*mds_reply);
865                 ino_t inum;
866                 // The user requested ONLY the inode number, so do a raw lookup
867                 rc = lustre_pack_reply(req, 1, &size, NULL);
868                 if (rc) {
869                         CERROR("out of memory\n");
870                         GOTO(cleanup, rc);
871                 }
872
873                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
874
875                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
876                                            sizeof(*mds_reply));
877                 mds_reply->fid1.id = inum;
878                 mds_reply->valid = OBD_MD_FLID;
879                 GOTO(cleanup, rc);
880         }
881 #endif
882
883         if (lustre_handle_is_used(child_lockh)) {
884                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
885                 resent_req = 1;
886         }
887
888         if (resent_req == 0) {
889                 if (name) {
890                         rc = mds_get_parent_child_locked(obd, &obd->u.mds,
891                                                          &body->fid1,
892                                                          &parent_lockh,
893                                                          &dparent, LCK_CR,
894                                                          MDS_INODELOCK_UPDATE,
895                                                          name, namesize,
896                                                          child_lockh, &dchild,
897                                                          LCK_CR, child_part);
898                 } else {
899                         /* For revalidate by fid we always take UPDATE lock */
900                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
901                                                        LCK_CR, child_lockh,
902                                                        child_part);
903                         LASSERT(dchild);
904                         if (IS_ERR(dchild))
905                                 rc = PTR_ERR(dchild);
906                 }
907                 if (rc)
908                         GOTO(cleanup, rc);
909         } else {
910                 struct ldlm_lock *granted_lock;
911                 struct ll_fid child_fid;
912                 struct ldlm_resource *res;
913                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
914                 granted_lock = ldlm_handle2lock(child_lockh);
915                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
916                          body->fid1.id, body->fid1.generation,
917                          child_lockh->cookie);
918
919
920                 res = granted_lock->l_resource;
921                 child_fid.id = res->lr_name.name[0];
922                 child_fid.generation = res->lr_name.name[1];
923                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
924                 LASSERT(!IS_ERR(dchild));
925                 LDLM_LOCK_PUT(granted_lock);
926         }
927
928         cleanup_phase = 2; /* dchild, dparent, locks */
929
930         if (dchild->d_inode == NULL) {
931                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
932                 /* in the intent case, the policy clears this error:
933                    the disposition is enough */
934                 GOTO(cleanup, rc = -ENOENT);
935         } else {
936                 intent_set_disposition(rep, DISP_LOOKUP_POS);
937         }
938
939         if (req->rq_repmsg == NULL) {
940                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
941                 if (rc != 0) {
942                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
943                         GOTO (cleanup, rc);
944                 }
945         }
946
947         rc = mds_getattr_internal(obd, dchild, req, body, offset);
948         GOTO(cleanup, rc); /* returns the lock to the client */
949
950  cleanup:
951         switch (cleanup_phase) {
952         case 2:
953                 if (resent_req == 0) {
954                         if (rc && dchild->d_inode)
955                                 ldlm_lock_decref(child_lockh, LCK_CR);
956                         if (name) {
957                                 ldlm_lock_decref(&parent_lockh, LCK_CR);
958                                 l_dput(dparent);
959                         }
960                 }
961                 l_dput(dchild);
962         case 1:
963                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
964         default:
965                 mds_exit_ucred(&uc, mds);
966                 if (req->rq_reply_state == NULL) {
967                         req->rq_status = rc;
968                         lustre_pack_reply(req, 1, NULL, NULL);
969                 }
970         }
971         return rc;
972 }
973
974 static int mds_getattr(struct ptlrpc_request *req, int offset)
975 {
976         struct mds_obd *mds = mds_req2mds(req);
977         struct obd_device *obd = req->rq_export->exp_obd;
978         struct lvfs_run_ctxt saved;
979         struct dentry *de;
980         struct mds_body *body;
981         struct lvfs_ucred uc = {0,};
982         int rc = 0;
983         ENTRY;
984
985         OBD_COUNTER_INCREMENT(obd, getattr);
986
987         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
988                                   lustre_swab_mds_body);
989         if (body == NULL)
990                 RETURN(-EFAULT);
991
992         rc = mds_init_ucred(&uc, req, offset);
993         if (rc)
994                 GOTO(out_ucred, rc);
995
996         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
997         de = mds_fid2dentry(mds, &body->fid1, NULL);
998         if (IS_ERR(de)) {
999                 rc = req->rq_status = PTR_ERR(de);
1000                 GOTO(out_pop, rc);
1001         }
1002
1003         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1004         if (rc != 0) {
1005                 CERROR("mds_getattr_pack_msg: %d\n", rc);
1006                 GOTO(out_pop, rc);
1007         }
1008
1009         req->rq_status = mds_getattr_internal(obd, de, req, body,
1010                                               REPLY_REC_OFF);
1011
1012         l_dput(de);
1013         GOTO(out_pop, rc);
1014 out_pop:
1015         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1016 out_ucred:
1017         if (req->rq_reply_state == NULL) {
1018                 req->rq_status = rc;
1019                 lustre_pack_reply(req, 1, NULL, NULL);
1020         }
1021         mds_exit_ucred(&uc, mds);
1022         return rc;
1023 }
1024
1025 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1026                           __u64 max_age)
1027 {
1028         int rc;
1029
1030         spin_lock(&obd->obd_osfs_lock);
1031         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1032         if (rc == 0)
1033                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1034         spin_unlock(&obd->obd_osfs_lock);
1035
1036         return rc;
1037 }
1038
1039 static int mds_statfs(struct ptlrpc_request *req)
1040 {
1041         struct obd_device *obd = req->rq_export->exp_obd;
1042         int rc, size[2] = { sizeof(struct ptlrpc_body),
1043                             sizeof(struct obd_statfs) };
1044         ENTRY;
1045
1046         /* This will trigger a watchdog timeout */
1047         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1048                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1049         OBD_COUNTER_INCREMENT(obd, statfs);
1050
1051         rc = lustre_pack_reply(req, 2, size, NULL);
1052         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1053                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1054                 GOTO(out, rc);
1055         }
1056
1057         /* We call this so that we can cache a bit - 1 jiffie worth */
1058         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1059                                                 size[REPLY_REC_OFF]),
1060                             cfs_time_current_64() - HZ);
1061         if (rc) {
1062                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1063                 GOTO(out, rc);
1064         }
1065
1066         EXIT;
1067 out:
1068         req->rq_status = rc;
1069         return 0;
1070 }
1071
1072 static int mds_sync(struct ptlrpc_request *req, int offset)
1073 {
1074         struct obd_device *obd = req->rq_export->exp_obd;
1075         struct mds_obd *mds = &obd->u.mds;
1076         struct mds_body *body;
1077         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1078         ENTRY;
1079
1080         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1081                                   lustre_swab_mds_body);
1082         if (body == NULL)
1083                 GOTO(out, rc = -EFAULT);
1084
1085         rc = lustre_pack_reply(req, 2, size, NULL);
1086         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1087                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1088                 GOTO(out, rc);
1089         }
1090
1091         if (body->fid1.id == 0) {
1092                 /* a fid of zero is taken to mean "sync whole filesystem" */
1093                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1094                 GOTO(out, rc);
1095         } else {
1096                 struct dentry *de;
1097
1098                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1099                 if (IS_ERR(de))
1100                         GOTO(out, rc = PTR_ERR(de));
1101
1102                 /* The file parameter isn't used for anything */
1103                 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1104                         rc = de->d_inode->i_fop->fsync(NULL, de, 1);
1105                 if (rc == 0) {
1106                         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1107                                               sizeof(*body));
1108                         mds_pack_inode2fid(&body->fid1, de->d_inode);
1109                         mds_pack_inode2body(body, de->d_inode);
1110                 }
1111
1112                 l_dput(de);
1113                 GOTO(out, rc);
1114         }
1115 out:
1116         req->rq_status = rc;
1117         return 0;
1118 }
1119
1120 /* mds_readpage does not take a DLM lock on the inode, because the client must
1121  * already have a PR lock.
1122  *
1123  * If we were to take another one here, a deadlock will result, if another
1124  * thread is already waiting for a PW lock. */
1125 static int mds_readpage(struct ptlrpc_request *req, int offset)
1126 {
1127         struct obd_device *obd = req->rq_export->exp_obd;
1128         struct mds_obd *mds = &obd->u.mds;
1129         struct vfsmount *mnt;
1130         struct dentry *de;
1131         struct file *file;
1132         struct mds_body *body, *repbody;
1133         struct lvfs_run_ctxt saved;
1134         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
1135         struct lvfs_ucred uc = {0,};
1136         ENTRY;
1137
1138         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1139                 RETURN(-ENOMEM);
1140
1141         rc = lustre_pack_reply(req, 2, size, NULL);
1142         if (rc) {
1143                 CERROR("error packing readpage reply: rc %d\n", rc);
1144                 GOTO(out, rc);
1145         }
1146
1147         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1148                                   lustre_swab_mds_body);
1149         if (body == NULL)
1150                 GOTO (out, rc = -EFAULT);
1151
1152         rc = mds_init_ucred(&uc, req, offset);
1153         if (rc)
1154                 GOTO(out, rc);
1155
1156         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1157         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1158         if (IS_ERR(de))
1159                 GOTO(out_pop, rc = PTR_ERR(de));
1160
1161         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1162
1163         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1164         /* note: in case of an error, dentry_open puts dentry */
1165         if (IS_ERR(file))
1166                 GOTO(out_pop, rc = PTR_ERR(file));
1167
1168         /* body->size is actually the offset -eeb */
1169         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1170                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1171                        body->size, de->d_inode->i_blksize);
1172                 GOTO(out_file, rc = -EFAULT);
1173         }
1174
1175         /* body->nlink is actually the #bytes to read -eeb */
1176         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1177                 CERROR("size %u is not multiple of blocksize %lu\n",
1178                        body->nlink, de->d_inode->i_blksize);
1179                 GOTO(out_file, rc = -EFAULT);
1180         }
1181
1182         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1183                                  sizeof(*repbody));
1184         repbody->size = file->f_dentry->d_inode->i_size;
1185         repbody->valid = OBD_MD_FLSIZE;
1186
1187         /* to make this asynchronous make sure that the handling function
1188            doesn't send a reply when this function completes. Instead a
1189            callback function would send the reply */
1190         /* body->size is actually the offset -eeb */
1191         rc = mds_sendpage(req, file, body->size, body->nlink);
1192
1193 out_file:
1194         filp_close(file, 0);
1195 out_pop:
1196         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1197 out:
1198         mds_exit_ucred(&uc, mds);
1199         req->rq_status = rc;
1200         RETURN(0);
1201 }
1202
1203 int mds_reint(struct ptlrpc_request *req, int offset,
1204               struct lustre_handle *lockh)
1205 {
1206         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1207         int rc;
1208
1209         OBD_ALLOC(rec, sizeof(*rec));
1210         if (rec == NULL)
1211                 RETURN(-ENOMEM);
1212
1213         rc = mds_update_unpack(req, offset, rec);
1214         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1215                 CERROR("invalid record\n");
1216                 GOTO(out, req->rq_status = -EINVAL);
1217         }
1218
1219         /* rc will be used to interrupt a for loop over multiple records */
1220         rc = mds_reint_rec(rec, offset, req, lockh);
1221  out:
1222         OBD_FREE(rec, sizeof(*rec));
1223         return rc;
1224 }
1225
1226 int mds_filter_recovery_request(struct ptlrpc_request *req,
1227                                 struct obd_device *obd, int *process)
1228 {
1229         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1230         case MDS_CONNECT: /* This will never get here, but for completeness. */
1231         case OST_CONNECT: /* This will never get here, but for completeness. */
1232         case MDS_DISCONNECT:
1233         case OST_DISCONNECT:
1234                *process = 1;
1235                RETURN(0);
1236
1237         case MDS_CLOSE:
1238         case MDS_SYNC: /* used in unmounting */
1239         case OBD_PING:
1240         case MDS_REINT:
1241         case SEQ_QUERY:
1242         case FLD_QUERY:
1243         case LDLM_ENQUEUE:
1244                 *process = target_queue_recovery_request(req, obd);
1245                 RETURN(0);
1246
1247         default:
1248                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1249                 *process = 0;
1250                 /* XXX what should we set rq_status to here? */
1251                 req->rq_status = -EAGAIN;
1252                 RETURN(ptlrpc_error(req));
1253         }
1254 }
1255 EXPORT_SYMBOL(mds_filter_recovery_request);
1256
1257 static char *reint_names[] = {
1258         [REINT_SETATTR] "setattr",
1259         [REINT_CREATE]  "create",
1260         [REINT_LINK]    "link",
1261         [REINT_UNLINK]  "unlink",
1262         [REINT_RENAME]  "rename",
1263         [REINT_OPEN]    "open",
1264 };
1265
1266 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
1267 {
1268         char *key;
1269         __u32 *val;
1270         int keylen, rc = 0;
1271         ENTRY;
1272
1273         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1274         if (key == NULL) {
1275                 DEBUG_REQ(D_HA, req, "no set_info key");
1276                 RETURN(-EFAULT);
1277         }
1278         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1279
1280         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, sizeof(*val));
1281         if (val == NULL) {
1282                 DEBUG_REQ(D_HA, req, "no set_info val");
1283                 RETURN(-EFAULT);
1284         }
1285
1286         rc = lustre_pack_reply(req, 1, NULL, NULL);
1287         if (rc)
1288                 RETURN(rc);
1289         lustre_msg_set_status(req->rq_repmsg, 0);
1290
1291         if (keylen < strlen("read-only") ||
1292             memcmp(key, "read-only", keylen) != 0)
1293                 RETURN(-EINVAL);
1294
1295         if (*val)
1296                 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1297         else
1298                 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1299
1300         RETURN(0);
1301 }
1302
1303 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1304 {
1305         struct obd_quotactl *oqctl;
1306         int rc;
1307         ENTRY;
1308
1309         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1310                                    lustre_swab_obd_quotactl);
1311         if (oqctl == NULL)
1312                 RETURN(-EPROTO);
1313
1314         rc = lustre_pack_reply(req, 1, NULL, NULL);
1315         if (rc) {
1316                 CERROR("mds: out of memory while packing quotacheck reply\n");
1317                 RETURN(rc);
1318         }
1319
1320         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1321         RETURN(0);
1322 }
1323
1324 static int mds_handle_quotactl(struct ptlrpc_request *req)
1325 {
1326         struct obd_quotactl *oqctl, *repoqc;
1327         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1328         ENTRY;
1329
1330         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1331                                    lustre_swab_obd_quotactl);
1332         if (oqctl == NULL)
1333                 RETURN(-EPROTO);
1334
1335         rc = lustre_pack_reply(req, 2, size, NULL);
1336         if (rc)
1337                 RETURN(rc);
1338
1339         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1340
1341         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1342         *repoqc = *oqctl;
1343         RETURN(0);
1344 }
1345
1346 int mds_msg_check_version(struct lustre_msg *msg)
1347 {
1348         int rc;
1349
1350         switch (lustre_msg_get_opc(msg)) {
1351         case MDS_CONNECT:
1352         case MDS_DISCONNECT:
1353         case OBD_PING:
1354         case SEC_CTX_INIT:
1355         case SEC_CTX_INIT_CONT:
1356         case SEC_CTX_FINI:
1357                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1358                 if (rc)
1359                         CERROR("bad opc %u version %08x, expecting %08x\n",
1360                                lustre_msg_get_opc(msg),
1361                                lustre_msg_get_version(msg),
1362                                LUSTRE_OBD_VERSION);
1363                 break;
1364         case MDS_GETSTATUS:
1365         case MDS_GETATTR:
1366         case MDS_GETATTR_NAME:
1367         case MDS_STATFS:
1368         case MDS_READPAGE:
1369         case MDS_WRITEPAGE:
1370         case MDS_IS_SUBDIR:
1371         case MDS_REINT:
1372         case MDS_CLOSE:
1373         case MDS_DONE_WRITING:
1374         case MDS_PIN:
1375         case MDS_SYNC:
1376         case MDS_GETXATTR:
1377         case MDS_SETXATTR:
1378         case MDS_SET_INFO:
1379         case MDS_QUOTACHECK:
1380         case MDS_QUOTACTL:
1381         case QUOTA_DQACQ:
1382         case QUOTA_DQREL:
1383         case SEQ_QUERY:
1384         case FLD_QUERY:
1385                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1386                 if (rc)
1387                         CERROR("bad opc %u version %08x, expecting %08x\n",
1388                                lustre_msg_get_opc(msg),
1389                                lustre_msg_get_version(msg),
1390                                LUSTRE_MDS_VERSION);
1391                 break;
1392         case LDLM_ENQUEUE:
1393         case LDLM_CONVERT:
1394         case LDLM_BL_CALLBACK:
1395         case LDLM_CP_CALLBACK:
1396                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1397                 if (rc)
1398                         CERROR("bad opc %u version %08x, expecting %08x\n",
1399                                lustre_msg_get_opc(msg),
1400                                lustre_msg_get_version(msg),
1401                                LUSTRE_DLM_VERSION);
1402                 break;
1403         case OBD_LOG_CANCEL:
1404         case LLOG_ORIGIN_HANDLE_CREATE:
1405         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1406         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1407         case LLOG_ORIGIN_HANDLE_CLOSE:
1408         case LLOG_ORIGIN_HANDLE_DESTROY:
1409         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1410         case LLOG_CATINFO:
1411                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1412                 if (rc)
1413                         CERROR("bad opc %u version %08x, expecting %08x\n",
1414                                lustre_msg_get_opc(msg),
1415                                lustre_msg_get_version(msg),
1416                                LUSTRE_LOG_VERSION);
1417                 break;
1418         default:
1419                 CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg));
1420                 rc = -ENOTSUPP;
1421         }
1422         return rc;
1423 }
1424 EXPORT_SYMBOL(mds_msg_check_version);
1425
1426 int mds_handle(struct ptlrpc_request *req)
1427 {
1428         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1429         int rc;
1430         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1431         struct obd_device *obd = NULL;
1432         ENTRY;
1433
1434         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1435
1436         LASSERT(current->journal_info == NULL);
1437
1438         rc = mds_msg_check_version(req->rq_reqmsg);
1439         if (rc) {
1440                 CERROR("MDS drop mal-formed request\n");
1441                 RETURN(rc);
1442         }
1443
1444         /* XXX identical to OST */
1445         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
1446                 struct mds_export_data *med;
1447                 int recovering, abort_recovery;
1448
1449                 if (req->rq_export == NULL) {
1450                         CERROR("operation %d on unconnected MDS from %s\n",
1451                                lustre_msg_get_opc(req->rq_reqmsg),
1452                                libcfs_id2str(req->rq_peer));
1453                         req->rq_status = -ENOTCONN;
1454                         GOTO(out, rc = -ENOTCONN);
1455                 }
1456
1457                 med = &req->rq_export->exp_mds_data;
1458                 obd = req->rq_export->exp_obd;
1459                 mds = mds_req2mds(req);
1460
1461                 /* sanity check: if the xid matches, the request must
1462                  * be marked as a resent or replayed */
1463                 if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) ||
1464                    req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid))
1465                         if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1466                                  (MSG_RESENT | MSG_REPLAY))) {
1467                                 CERROR("rq_xid "LPU64" matches last_xid, "
1468                                        "expected RESENT flag\n",
1469                                         req->rq_xid);
1470                                 req->rq_status = -ENOTCONN;
1471                                 GOTO(out, rc = -EFAULT);
1472                         }
1473                 /* else: note the opposite is not always true; a
1474                  * RESENT req after a failover will usually not match
1475                  * the last_xid, since it was likely never
1476                  * committed. A REPLAYed request will almost never
1477                  * match the last xid, however it could for a
1478                  * committed, but still retained, open. */
1479
1480                 /* Check for aborted recovery. */
1481                 spin_lock_bh(&obd->obd_processing_task_lock);
1482                 abort_recovery = obd->obd_abort_recovery;
1483                 recovering = obd->obd_recovering;
1484                 spin_unlock_bh(&obd->obd_processing_task_lock);
1485                 if (abort_recovery) {
1486                         target_abort_recovery(obd);
1487                 } else if (recovering) {
1488                         rc = mds_filter_recovery_request(req, obd,
1489                                                          &should_process);
1490                         if (rc || !should_process)
1491                                 RETURN(rc);
1492                 }
1493         }
1494
1495         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1496         case MDS_CONNECT:
1497                 DEBUG_REQ(D_INODE, req, "connect");
1498                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1499                 rc = target_handle_connect(req, mds_handle);
1500                 if (!rc) {
1501                         /* Now that we have an export, set mds. */
1502                         /*
1503                          * XXX nikita: these assignments are useless: mds is
1504                          * never used below, and obd is only used for
1505                          * MSG_LAST_REPLAY case, which never happens for
1506                          * MDS_CONNECT.
1507                          */
1508                         obd = req->rq_export->exp_obd;
1509                         mds = mds_req2mds(req);
1510                 }
1511                 break;
1512
1513         case MDS_DISCONNECT:
1514                 DEBUG_REQ(D_INODE, req, "disconnect");
1515                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1516                 rc = target_handle_disconnect(req);
1517                 req->rq_status = rc;            /* superfluous? */
1518                 break;
1519
1520         case MDS_GETSTATUS:
1521                 DEBUG_REQ(D_INODE, req, "getstatus");
1522                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1523                 rc = mds_getstatus(req);
1524                 break;
1525
1526         case MDS_GETATTR:
1527                 DEBUG_REQ(D_INODE, req, "getattr");
1528                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1529                 rc = mds_getattr(req, REQ_REC_OFF);
1530                 break;
1531
1532         case MDS_SETXATTR:
1533                 DEBUG_REQ(D_INODE, req, "setxattr");
1534                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
1535                 rc = mds_setxattr(req);
1536                 break;
1537
1538         case MDS_GETXATTR:
1539                 DEBUG_REQ(D_INODE, req, "getxattr");
1540                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
1541                 rc = mds_getxattr(req);
1542                 break;
1543
1544         case MDS_GETATTR_NAME: {
1545                 struct lustre_handle lockh = { 0 };
1546                 DEBUG_REQ(D_INODE, req, "getattr_name");
1547                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1548
1549                 /* If this request gets a reconstructed reply, we won't be
1550                  * acquiring any new locks in mds_getattr_lock, so we don't
1551                  * want to cancel.
1552                  */
1553                 rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE,
1554                                       &lockh);
1555                 /* this non-intent call (from an ioctl) is special */
1556                 req->rq_status = rc;
1557                 if (rc == 0 && lustre_handle_is_used(&lockh))
1558                         ldlm_lock_decref(&lockh, LCK_CR);
1559                 break;
1560         }
1561         case MDS_STATFS:
1562                 DEBUG_REQ(D_INODE, req, "statfs");
1563                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1564                 rc = mds_statfs(req);
1565                 break;
1566
1567         case MDS_READPAGE:
1568                 DEBUG_REQ(D_INODE, req, "readpage");
1569                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1570                 rc = mds_readpage(req, REQ_REC_OFF);
1571
1572                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1573                         RETURN(0);
1574                 }
1575
1576                 break;
1577
1578         case MDS_REINT: {
1579                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
1580                                              sizeof(*opcp));
1581                 __u32  opc;
1582                 int size[4] = { sizeof(struct ptlrpc_body),
1583                                 sizeof(struct mds_body),
1584                                 mds->mds_max_mdsize,
1585                                 mds->mds_max_cookiesize };
1586                 int bufcount;
1587
1588                 /* NB only peek inside req now; mds_reint() will swab it */
1589                 if (opcp == NULL) {
1590                         CERROR ("Can't inspect opcode\n");
1591                         rc = -EINVAL;
1592                         break;
1593                 }
1594                 opc = *opcp;
1595                 if (lustre_msg_swabbed(req->rq_reqmsg))
1596                         __swab32s(&opc);
1597
1598                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1599                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1600                            reint_names[opc] == NULL) ? reint_names[opc] :
1601                                                        "unknown opcode");
1602
1603                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1604
1605                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1606                         bufcount = 4;
1607                 else if (opc == REINT_OPEN)
1608                         bufcount = 3;
1609                 else
1610                         bufcount = 2;
1611
1612                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1613                 if (rc)
1614                         break;
1615
1616                 rc = mds_reint(req, REQ_REC_OFF, NULL);
1617                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1618                 break;
1619         }
1620
1621         case MDS_CLOSE:
1622                 DEBUG_REQ(D_INODE, req, "close");
1623                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1624                 rc = mds_close(req, REQ_REC_OFF);
1625                 break;
1626
1627         case MDS_DONE_WRITING:
1628                 DEBUG_REQ(D_INODE, req, "done_writing");
1629                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1630                 rc = mds_done_writing(req, REQ_REC_OFF);
1631                 break;
1632
1633         case MDS_PIN:
1634                 DEBUG_REQ(D_INODE, req, "pin");
1635                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1636                 rc = mds_pin(req, REQ_REC_OFF);
1637                 break;
1638
1639         case MDS_SYNC:
1640                 DEBUG_REQ(D_INODE, req, "sync");
1641                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1642                 rc = mds_sync(req, REQ_REC_OFF);
1643                 break;
1644
1645         case MDS_SET_INFO:
1646                 DEBUG_REQ(D_INODE, req, "set_info");
1647                 rc = mds_set_info_rpc(req->rq_export, req);
1648                 break;
1649
1650         case MDS_QUOTACHECK:
1651                 DEBUG_REQ(D_INODE, req, "quotacheck");
1652                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
1653                 rc = mds_handle_quotacheck(req);
1654                 break;
1655
1656         case MDS_QUOTACTL:
1657                 DEBUG_REQ(D_INODE, req, "quotactl");
1658                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
1659                 rc = mds_handle_quotactl(req);
1660                 break;
1661
1662         case OBD_PING:
1663                 DEBUG_REQ(D_INODE, req, "ping");
1664                 rc = target_handle_ping(req);
1665                 break;
1666
1667         case OBD_LOG_CANCEL:
1668                 CDEBUG(D_INODE, "log cancel\n");
1669                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1670                 rc = -ENOTSUPP; /* la la la */
1671                 break;
1672
1673         case LDLM_ENQUEUE:
1674                 DEBUG_REQ(D_INODE, req, "enqueue");
1675                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1676                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1677                                          ldlm_server_blocking_ast, NULL);
1678                 fail = OBD_FAIL_LDLM_REPLY;
1679                 break;
1680         case LDLM_CONVERT:
1681                 DEBUG_REQ(D_INODE, req, "convert");
1682                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1683                 rc = ldlm_handle_convert(req);
1684                 break;
1685         case LDLM_BL_CALLBACK:
1686         case LDLM_CP_CALLBACK:
1687                 DEBUG_REQ(D_INODE, req, "callback");
1688                 CERROR("callbacks should not happen on MDS\n");
1689                 LBUG();
1690                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1691                 break;
1692         case LLOG_ORIGIN_HANDLE_CREATE:
1693                 DEBUG_REQ(D_INODE, req, "llog_init");
1694                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1695                 rc = llog_origin_handle_create(req);
1696                 break;
1697         case LLOG_ORIGIN_HANDLE_DESTROY:
1698                 DEBUG_REQ(D_INODE, req, "llog_init");
1699                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1700                 rc = llog_origin_handle_destroy(req);
1701                 break;
1702         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1703                 DEBUG_REQ(D_INODE, req, "llog next block");
1704                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1705                 rc = llog_origin_handle_next_block(req);
1706                 break;
1707         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1708                 DEBUG_REQ(D_INODE, req, "llog prev block");
1709                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1710                 rc = llog_origin_handle_prev_block(req);
1711                 break;
1712         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1713                 DEBUG_REQ(D_INODE, req, "llog read header");
1714                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1715                 rc = llog_origin_handle_read_header(req);
1716                 break;
1717         case LLOG_ORIGIN_HANDLE_CLOSE:
1718                 DEBUG_REQ(D_INODE, req, "llog close");
1719                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1720                 rc = llog_origin_handle_close(req);
1721                 break;
1722         case LLOG_CATINFO:
1723                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1724                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1725                 rc = llog_catinfo(req);
1726                 break;
1727         default:
1728                 req->rq_status = -ENOTSUPP;
1729                 rc = ptlrpc_error(req);
1730                 RETURN(rc);
1731         }
1732
1733         LASSERT(current->journal_info == NULL);
1734
1735         /* If we're DISCONNECTing, the mds_export_data is already freed */
1736         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
1737                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1738
1739                 /* I don't think last_xid is used for anyway, so I'm not sure
1740                    if we need to care about last_close_xid here.*/
1741                 lustre_msg_set_last_xid(req->rq_repmsg,
1742                                        le64_to_cpu(med->med_mcd->mcd_last_xid));
1743
1744                 target_committed_to_req(req);
1745         }
1746
1747         EXIT;
1748  out:
1749
1750         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1751                 if (obd && obd->obd_recovering) {
1752                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1753                         return target_queue_final_reply(req, rc);
1754                 }
1755                 /* Lost a race with recovery; let the error path DTRT. */
1756                 rc = req->rq_status = -ENOTCONN;
1757         }
1758
1759         target_send_reply(req, rc, fail);
1760         return 0;
1761 }
1762
1763 /* Update the server data on disk.  This stores the new mount_count and
1764  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1765  * then the server last_rcvd value may be less than that of the clients.
1766  * This will alert us that we may need to do client recovery.
1767  *
1768  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1769  */
1770 int mds_update_server_data(struct obd_device *obd, int force_sync)
1771 {
1772         struct mds_obd *mds = &obd->u.mds;
1773         struct lr_server_data *lsd = mds->mds_server_data;
1774         struct file *filp = mds->mds_rcvd_filp;
1775         struct lvfs_run_ctxt saved;
1776         loff_t off = 0;
1777         int rc;
1778         ENTRY;
1779
1780         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1781                mds->mds_mount_count, mds->mds_last_transno);
1782
1783         lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1784
1785         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1786         rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1787         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1788         if (rc)
1789                 CERROR("error writing MDS server data: rc = %d\n", rc);
1790         RETURN(rc);
1791 }
1792
1793 static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
1794 {
1795         char *p = options;
1796
1797         if (!options)
1798                 return;
1799
1800         while (*options) {
1801                 int len;
1802
1803                 while (*p && *p != ',')
1804                         p++;
1805
1806                 len = p - options;
1807                 if (len == sizeof("user_xattr") - 1 &&
1808                     memcmp(options, "user_xattr", len) == 0) {
1809                         mds->mds_fl_user_xattr = 1;
1810                         LCONSOLE_INFO("Enabling user_xattr\n");
1811                 } else if (len == sizeof("nouser_xattr") - 1 &&
1812                            memcmp(options, "nouser_xattr", len) == 0) {
1813                         mds->mds_fl_user_xattr = 0;
1814                         LCONSOLE_INFO("Disabling user_xattr\n");
1815                 } else if (len == sizeof("acl") - 1 &&
1816                            memcmp(options, "acl", len) == 0) {
1817 #ifdef CONFIG_FS_POSIX_ACL
1818                         mds->mds_fl_acl = 1;
1819                         LCONSOLE_INFO("Enabling ACL\n");
1820 #else
1821                         CWARN("ignoring unsupported acl mount option\n");
1822 #endif
1823                 } else if (len == sizeof("noacl") - 1 &&
1824                            memcmp(options, "noacl", len) == 0) {
1825 #ifdef CONFIG_FS_POSIX_ACL
1826                         mds->mds_fl_acl = 0;
1827                         LCONSOLE_INFO("Disabling ACL\n");
1828 #endif
1829                 }
1830
1831                 options = ++p;
1832         }
1833 }
1834 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
1835 {
1836         int rc;
1837         ENTRY;
1838
1839         rc = llog_start_commit_thread();
1840         if (rc < 0)
1841                 RETURN(rc);
1842
1843         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1844                 class_uuid_t uuid;
1845
1846                 generate_random_uuid(uuid);
1847                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
1848
1849                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
1850                 if (mds->mds_profile == NULL)
1851                         RETURN(-ENOMEM);
1852
1853                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
1854                         LUSTRE_CFG_BUFLEN(lcfg, 3));
1855         }
1856         RETURN(rc);
1857 }
1858
1859 /* mount the file system (secretly).  lustre_cfg parameters are:
1860  * 1 = device
1861  * 2 = fstype
1862  * 3 = config name
1863  * 4 = mount options
1864  */
1865 static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1866 {
1867         struct lprocfs_static_vars lvars;
1868         struct mds_obd *mds = &obd->u.mds;
1869         struct lustre_mount_info *lmi;
1870         struct vfsmount *mnt;
1871         struct obd_uuid uuid;
1872         __u8 *uuid_ptr;
1873         char *options, *str, *label;
1874         char ns_name[48];
1875         unsigned long page;
1876         int rc = 0;
1877         ENTRY;
1878
1879         /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */
1880
1881         CLASSERT(offsetof(struct obd_device, u.obt) ==
1882                  offsetof(struct obd_device, u.mds.mds_obt));
1883
1884         if (lcfg->lcfg_bufcount < 3)
1885                 RETURN(rc = -EINVAL);
1886
1887         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1888                 RETURN(rc = -EINVAL);
1889
1890         lmi = server_get_mount(obd->obd_name);
1891         if (lmi) {
1892                 /* We already mounted in lustre_fill_super.
1893                    lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1894                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
1895                 fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
1896                 fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
1897                 mnt = lmi->lmi_mnt;
1898                 obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1899         } else {
1900                 /* old path - used by lctl */
1901                 CERROR("Using old MDS mount method\n");
1902                 page = __get_free_page(GFP_KERNEL);
1903                 if (!page)
1904                         RETURN(-ENOMEM);
1905
1906                 options = (char *)page;
1907                 memset(options, 0, PAGE_SIZE);
1908
1909                 /* here we use "iopen_nopriv" hardcoded, because it affects
1910                  * MDS utility and the rest of options are passed by mount
1911                  * options. Probably this should be moved to somewhere else
1912                  * like startup scripts or lconf. */
1913                 strcpy(options, "iopen_nopriv");
1914
1915                 if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4)) {
1916                         sprintf(options + strlen(options), ",%s",
1917                                 lustre_cfg_string(lcfg, 4));
1918                         fsoptions_to_mds_flags(mds, options);
1919                 }
1920
1921                 mnt = do_kern_mount(lustre_cfg_string(lcfg, 2), 0,
1922                                     lustre_cfg_string(lcfg, 1),
1923                                     (void *)options);
1924                 free_page(page);
1925                 if (IS_ERR(mnt)) {
1926                         rc = PTR_ERR(mnt);
1927                         LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
1928                                        lustre_cfg_string(lcfg, 1), rc);
1929                         RETURN(rc);
1930                 }
1931
1932                 obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
1933         }
1934         if (IS_ERR(obd->obd_fsops))
1935                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
1936
1937         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1938
1939         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1940
1941         sema_init(&mds->mds_epoch_sem, 1);
1942         spin_lock_init(&mds->mds_transno_lock);
1943         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1944         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1945         mds->mds_atime_diff = MAX_ATIME_DIFF;
1946
1947         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
1948         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1949         if (obd->obd_namespace == NULL) {
1950                 mds_cleanup(obd);
1951                 GOTO(err_ops, rc = -ENOMEM);
1952         }
1953         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1954
1955         rc = mds_fs_setup(obd, mnt);
1956         if (rc) {
1957                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
1958                        obd->obd_name, rc);
1959                 GOTO(err_ns, rc);
1960         }
1961
1962         rc = mds_lov_presetup(mds, lcfg);
1963         if (rc < 0)
1964                 GOTO(err_fs, rc);
1965
1966         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1967                            "mds_ldlm_client", &obd->obd_ldlm_client);
1968         obd->obd_replayable = 1;
1969
1970         rc = lquota_setup(quota_interface, obd, lcfg);
1971         if (rc)
1972                 GOTO(err_fs, rc);
1973
1974 #if 0
1975         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
1976         if (IS_ERR(mds->mds_group_hash)) {
1977                 rc = PTR_ERR(mds->mds_group_hash);
1978                 mds->mds_group_hash = NULL;
1979                 GOTO(err_qctxt, rc);
1980         }
1981 #endif
1982
1983         /* Don't wait for mds_postrecov trying to clear orphans */
1984         obd->obd_async_recov = 1;
1985         rc = mds_postsetup(obd);
1986         obd->obd_async_recov = 0;
1987         if (rc)
1988                 GOTO(err_qctxt, rc);
1989
1990         lprocfs_init_vars(mds, &lvars);
1991         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
1992             lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
1993                 /* Init private stats here */
1994                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_OPEN,
1995                                      /*LPROCFS_CNTR_AVGMINMAX*/0,
1996                                      "open", "reqs");
1997                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_CLOSE,
1998                                      0, "close", "reqs");
1999                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_MKNOD,
2000                                      0, "mknod", "reqs");
2001                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_LINK,
2002                                      0, "link", "reqs");
2003                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_UNLINK,
2004                                      0, "unlink", "reqs");
2005                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_MKDIR,
2006                                      0, "mkdir", "reqs");
2007                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_RMDIR,
2008                                      0, "rmdir", "reqs");
2009                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_RENAME,
2010                                      0, "rename", "reqs");
2011                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_GETXATTR,
2012                                      0, "getxattr", "reqs");
2013                 lprocfs_counter_init(obd->obd_stats, LPROC_MDS_SETXATTR,
2014                                      0, "setxattr", "reqs");
2015         }
2016
2017         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
2018         if (uuid_ptr != NULL) {
2019                 class_uuid_unparse(uuid_ptr, &uuid);
2020                 str = uuid.uuid;
2021         } else {
2022                 str = "no UUID";
2023         }
2024
2025         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
2026         if (obd->obd_recovering) {
2027                 LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in "
2028                               "recovery until %d %s reconnect, or if no clients"
2029                               " reconnect for %d:%.02d; during that time new "
2030                               "clients will not be allowed to connect. "
2031                               "Recovery progress can be monitored by watching "
2032                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
2033                               obd->obd_name, lustre_cfg_string(lcfg, 1),
2034                               label ?: "", label ? "/" : "", str,
2035                               obd->obd_recoverable_clients,
2036                               (obd->obd_recoverable_clients == 1) ?
2037                               "client" : "clients",
2038                               (int)(OBD_RECOVERY_TIMEOUT) / 60,
2039                               (int)(OBD_RECOVERY_TIMEOUT) % 60,
2040                               obd->obd_name);
2041         } else {
2042                 LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery "
2043                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
2044                               label ?: "", label ? "/" : "", str,
2045                               obd->obd_replayable ? "enabled" : "disabled");
2046         }
2047
2048         ldlm_timeout = 6;
2049
2050         RETURN(0);
2051
2052 err_qctxt:
2053         lquota_cleanup(quota_interface, obd);
2054 err_fs:
2055         /* No extra cleanup needed for llog_init_commit_thread() */
2056         mds_fs_cleanup(obd);
2057 #if 0
2058         upcall_cache_cleanup(mds->mds_group_hash);
2059         mds->mds_group_hash = NULL;
2060 #endif
2061 err_ns:
2062         ldlm_namespace_free(obd->obd_namespace, 0);
2063         obd->obd_namespace = NULL;
2064 err_ops:
2065         fsfilt_put_ops(obd->obd_fsops);
2066 err_put:
2067         if (lmi) {
2068                 server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2069         } else {
2070                 /* old method */
2071                 unlock_kernel();
2072                 mntput(mds->mds_vfsmnt);
2073                 lock_kernel();
2074         }
2075         obd->u.obt.obt_sb = NULL;
2076         return rc;
2077 }
2078
2079 static int mds_lov_clean(struct obd_device *obd)
2080 {
2081         struct mds_obd *mds = &obd->u.mds;
2082         struct obd_device *osc = mds->mds_osc_obd;
2083         ENTRY;
2084
2085         if (mds->mds_profile) {
2086                 class_del_profile(mds->mds_profile);
2087                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2088                 mds->mds_profile = NULL;
2089         }
2090
2091         /* There better be a lov */
2092         if (!osc)
2093                 RETURN(0);
2094         if (IS_ERR(osc))
2095                 RETURN(PTR_ERR(osc));
2096
2097         obd_register_observer(osc, NULL);
2098
2099         /* Give lov our same shutdown flags */
2100         osc->obd_force = obd->obd_force;
2101         osc->obd_fail = obd->obd_fail;
2102
2103         /* Cleanup the lov */
2104         obd_disconnect(mds->mds_osc_exp);
2105         class_manual_cleanup(osc);
2106         mds->mds_osc_exp = NULL;
2107
2108         RETURN(0);
2109 }
2110
2111 static int mds_postsetup(struct obd_device *obd)
2112 {
2113         struct mds_obd *mds = &obd->u.mds;
2114         int rc = 0;
2115         ENTRY;
2116
2117         rc = llog_setup(obd, NULL, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
2118                         &llog_lvfs_ops);
2119         if (rc)
2120                 RETURN(rc);
2121
2122         rc = llog_setup(obd, NULL, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
2123                         &llog_lvfs_ops);
2124         if (rc)
2125                 RETURN(rc);
2126
2127         if (mds->mds_profile) {
2128                 struct lustre_profile *lprof;
2129                 /* The profile defines which osc and mdc to connect to, for a
2130                    client.  We reuse that here to figure out the name of the
2131                    lov to use (and ignore lprof->lp_md).
2132                    The profile was set in the config log with
2133                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
2134                 lprof = class_get_profile(mds->mds_profile);
2135                 if (lprof == NULL) {
2136                         CERROR("No profile found: %s\n", mds->mds_profile);
2137                         GOTO(err_cleanup, rc = -ENOENT);
2138                 }
2139                 rc = mds_lov_connect(obd, lprof->lp_dt);
2140                 if (rc)
2141                         GOTO(err_cleanup, rc);
2142         }
2143
2144         RETURN(rc);
2145
2146 err_cleanup:
2147         mds_lov_clean(obd);
2148         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2149         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2150         RETURN(rc);
2151 }
2152
2153 int mds_postrecov(struct obd_device *obd)
2154 {
2155         int rc;
2156         ENTRY;
2157
2158         if (obd->obd_fail)
2159                 RETURN(0);
2160
2161         LASSERT(!obd->obd_recovering);
2162         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
2163
2164         /* FIXME why not put this in the synchronize? */
2165         /* set nextid first, so we are sure it happens */
2166         rc = mds_lov_set_nextid(obd);
2167         if (rc) {
2168                 CERROR("%s: mds_lov_set_nextid failed %d\n",
2169                        obd->obd_name, rc);
2170                 GOTO(out, rc);
2171         }
2172
2173         /* clean PENDING dir */
2174         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
2175                 rc = mds_cleanup_pending(obd);
2176                 if (rc < 0)
2177                         GOTO(out, rc);
2178
2179         /* FIXME Does target_finish_recovery really need this to block? */
2180         /* Notify the LOV, which will in turn call mds_notify for each tgt */
2181         /* This means that we have to hack obd_notify to think we're obd_set_up
2182            during mds_lov_connect. */
2183         obd_notify(obd->u.mds.mds_osc_obd, NULL,
2184                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
2185                    OBD_NOTIFY_SYNC, NULL);
2186
2187         /* quota recovery */
2188         lquota_recovery(quota_interface, obd);
2189
2190 out:
2191         RETURN(rc);
2192 }
2193
2194 /* We need to be able to stop an mds_lov_synchronize */
2195 static int mds_lov_early_clean(struct obd_device *obd)
2196 {
2197         struct mds_obd *mds = &obd->u.mds;
2198         struct obd_device *osc = mds->mds_osc_obd;
2199
2200         if (!osc || (!obd->obd_force && !obd->obd_fail))
2201                 return(0);
2202
2203         CDEBUG(D_HA, "abort inflight\n");
2204         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
2205 }
2206
2207 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2208 {
2209         int rc = 0;
2210         ENTRY;
2211
2212         switch (stage) {
2213         case OBD_CLEANUP_EARLY:
2214                 break;
2215         case OBD_CLEANUP_EXPORTS:
2216                 /*XXX Use this for mdd mds cleanup, so comment out
2217                  *this target_cleanup_recovery for this tmp MDD MDS
2218                  *Wangdi*/
2219                 if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
2220                         target_cleanup_recovery(obd);
2221                 mds_lov_early_clean(obd);
2222                 break;
2223         case OBD_CLEANUP_SELF_EXP:
2224                 mds_lov_disconnect(obd);
2225                 mds_lov_clean(obd);
2226                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2227                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2228                 rc = obd_llog_finish(obd, 0);
2229                 break;
2230         case OBD_CLEANUP_OBD:
2231                 break;
2232         }
2233         RETURN(rc);
2234 }
2235
2236 static int mds_cleanup(struct obd_device *obd)
2237 {
2238         struct mds_obd *mds = &obd->u.mds;
2239         lvfs_sbdev_type save_dev;
2240         int must_put = 0;
2241         int must_relock = 0;
2242         ENTRY;
2243
2244         if (obd->u.obt.obt_sb == NULL)
2245                 RETURN(0);
2246         save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
2247
2248         if (mds->mds_osc_exp)
2249                 /* lov export was disconnected by mds_lov_clean;
2250                    we just need to drop our ref */
2251                 class_export_put(mds->mds_osc_exp);
2252
2253         lprocfs_free_obd_stats(obd);
2254         lprocfs_obd_cleanup(obd);
2255
2256         lquota_cleanup(quota_interface, obd);
2257
2258         mds_update_server_data(obd, 1);
2259         if (mds->mds_lov_objids != NULL)
2260                 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
2261         mds_fs_cleanup(obd);
2262
2263 #if 0
2264         upcall_cache_cleanup(mds->mds_group_hash);
2265         mds->mds_group_hash = NULL;
2266 #endif
2267
2268         must_put = server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2269         /* must_put is for old method (l_p_m returns non-0 on err) */
2270
2271         /* We can only unlock kernel if we are in the context of sys_ioctl,
2272            otherwise we never called lock_kernel */
2273         if (ll_kernel_locked()) {
2274                 unlock_kernel();
2275                 must_relock++;
2276         }
2277
2278         if (must_put) {
2279                 /* In case we didn't mount with lustre_get_mount -- old method*/
2280                 mntput(mds->mds_vfsmnt);
2281                 lvfs_clear_rdonly(save_dev);
2282         }
2283         obd->u.obt.obt_sb = NULL;
2284
2285         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
2286
2287         spin_lock_bh(&obd->obd_processing_task_lock);
2288         if (obd->obd_recovering) {
2289                 target_cancel_recovery_timer(obd);
2290                 obd->obd_recovering = 0;
2291         }
2292         spin_unlock_bh(&obd->obd_processing_task_lock);
2293
2294         if (must_relock)
2295                 lock_kernel();
2296
2297         fsfilt_put_ops(obd->obd_fsops);
2298
2299         LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
2300
2301         RETURN(0);
2302 }
2303
2304 static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
2305                                         struct ldlm_lock *new_lock,
2306                                         struct ldlm_lock **old_lock,
2307                                         struct lustre_handle *lockh)
2308 {
2309         struct obd_export *exp = req->rq_export;
2310         struct obd_device *obd = exp->exp_obd;
2311         struct ldlm_request *dlmreq =
2312                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
2313         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
2314         struct list_head *iter;
2315
2316         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2317                 return;
2318
2319         spin_lock(&obd->obd_namespace->ns_hash_lock);
2320         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2321                 struct ldlm_lock *lock;
2322                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2323                 if (lock == new_lock)
2324                         continue;
2325                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2326                         lockh->cookie = lock->l_handle.h_cookie;
2327                         LDLM_DEBUG(lock, "restoring lock cookie");
2328                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2329                                   lockh->cookie);
2330                         if (old_lock)
2331                                 *old_lock = LDLM_LOCK_GET(lock);
2332                         spin_unlock(&obd->obd_namespace->ns_hash_lock);
2333                         return;
2334                 }
2335         }
2336         spin_unlock(&obd->obd_namespace->ns_hash_lock);
2337
2338         /* If the xid matches, then we know this is a resent request,
2339          * and allow it. (It's probably an OPEN, for which we don't
2340          * send a lock */
2341         if (req->rq_xid ==
2342             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
2343                 return;
2344
2345         if (req->rq_xid ==
2346             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid))
2347                 return;
2348
2349         /* This remote handle isn't enqueued, so we never received or
2350          * processed this request.  Clear MSG_RESENT, because it can
2351          * be handled like any normal request now. */
2352
2353         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2354
2355         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2356                   remote_hdl.cookie);
2357 }
2358
2359 int intent_disposition(struct ldlm_reply *rep, int flag)
2360 {
2361         if (!rep)
2362                 return 0;
2363         return (rep->lock_policy_res1 & flag);
2364 }
2365
2366 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2367 {
2368         if (!rep)
2369                 return;
2370         rep->lock_policy_res1 |= flag;
2371 }
2372
2373 static int mds_intent_policy(struct ldlm_namespace *ns,
2374                              struct ldlm_lock **lockp, void *req_cookie,
2375                              ldlm_mode_t mode, int flags, void *data)
2376 {
2377         struct ptlrpc_request *req = req_cookie;
2378         struct ldlm_lock *lock = *lockp;
2379         struct ldlm_intent *it;
2380         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2381         struct ldlm_reply *rep;
2382         struct lustre_handle lockh = { 0 };
2383         struct ldlm_lock *new_lock = NULL;
2384         int getattr_part = MDS_INODELOCK_UPDATE;
2385         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2386                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
2387                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
2388                            [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize };
2389         int repbufcnt = 4, rc;
2390         ENTRY;
2391
2392         LASSERT(req != NULL);
2393
2394         if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
2395                 /* No intent was provided */
2396                 rc = lustre_pack_reply(req, 2, repsize, NULL);
2397                 LASSERT(rc == 0);
2398                 RETURN(0);
2399         }
2400
2401         it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it),
2402                                 lustre_swab_ldlm_intent);
2403         if (it == NULL) {
2404                 CERROR("Intent missing\n");
2405                 RETURN(req->rq_status = -EFAULT);
2406         }
2407
2408         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2409
2410         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
2411             (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
2412                 /* we should never allow OBD_CONNECT_ACL if not configured */
2413                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
2414         else if (it->opc & IT_UNLINK)
2415                 repsize[repbufcnt++] = mds->mds_max_cookiesize;
2416
2417         rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
2418         if (rc)
2419                 RETURN(req->rq_status = rc);
2420
2421         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
2422         intent_set_disposition(rep, DISP_IT_EXECD);
2423
2424
2425         /* execute policy */
2426         switch ((long)it->opc) {
2427         case IT_OPEN:
2428         case IT_CREAT|IT_OPEN:
2429                 lprocfs_counter_incr(req->rq_export->exp_obd->obd_stats,
2430                                      LPROC_MDS_OPEN);
2431                 fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL,
2432                                             &lockh);
2433                 /* XXX swab here to assert that an mds_open reint
2434                  * packet is following */
2435                 rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF,
2436                                                   &lockh);
2437 #if 0
2438                 /* We abort the lock if the lookup was negative and
2439                  * we did not make it to the OPEN portion */
2440                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2441                         RETURN(ELDLM_LOCK_ABORTED);
2442                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2443                     !intent_disposition(rep, DISP_OPEN_OPEN))
2444 #endif
2445                 if (rep->lock_policy_res2) {
2446                         /* mds_open returns ENOLCK where it should return zero,
2447                            but it has no lock to return */
2448                         if (rep->lock_policy_res2 == ENOLCK)
2449                                 rep->lock_policy_res2 = 0;
2450                         RETURN(ELDLM_LOCK_ABORTED);
2451                 }
2452                 break;
2453         case IT_LOOKUP:
2454                         getattr_part = MDS_INODELOCK_LOOKUP;
2455         case IT_GETATTR:
2456                         getattr_part |= MDS_INODELOCK_LOOKUP;
2457                         OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr);
2458         case IT_READDIR:
2459                 fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock,
2460                                             &new_lock, &lockh);
2461
2462                 /* INODEBITS_INTEROP: if this lock was converted from a
2463                  * plain lock (client does not support inodebits), then
2464                  * child lock must be taken with both lookup and update
2465                  * bits set for all operations.
2466                  */
2467                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
2468                         getattr_part = MDS_INODELOCK_LOOKUP |
2469                                        MDS_INODELOCK_UPDATE;
2470
2471                 rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF,
2472                                                          getattr_part, &lockh);
2473                 /* FIXME: LDLM can set req->rq_status. MDS sets
2474                    policy_res{1,2} with disposition and status.
2475                    - replay: returns 0 & req->status is old status
2476                    - otherwise: returns req->status */
2477                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2478                         rep->lock_policy_res2 = 0;
2479                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2480                     rep->lock_policy_res2)
2481                         RETURN(ELDLM_LOCK_ABORTED);
2482                 if (req->rq_status != 0) {
2483                         LBUG();
2484                         rep->lock_policy_res2 = req->rq_status;
2485                         RETURN(ELDLM_LOCK_ABORTED);
2486                 }
2487                 break;
2488         default:
2489                 CERROR("Unhandled intent "LPD64"\n", it->opc);
2490                 RETURN(-EFAULT);
2491         }
2492
2493         /* By this point, whatever function we called above must have either
2494          * filled in 'lockh', been an intent replay, or returned an error.  We
2495          * want to allow replayed RPCs to not get a lock, since we would just
2496          * drop it below anyways because lock replay is done separately by the
2497          * client afterwards.  For regular RPCs we want to give the new lock to
2498          * the client instead of whatever lock it was about to get. */
2499         if (new_lock == NULL)
2500                 new_lock = ldlm_handle2lock(&lockh);
2501         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2502                 RETURN(0);
2503
2504         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
2505                  it->opc, lockh.cookie);
2506
2507         /* If we've already given this lock to a client once, then we should
2508          * have no readers or writers.  Otherwise, we should have one reader
2509          * _or_ writer ref (which will be zeroed below) before returning the
2510          * lock to a client. */
2511         if (new_lock->l_export == req->rq_export) {
2512                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2513         } else {
2514                 LASSERT(new_lock->l_export == NULL);
2515                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2516         }
2517
2518         *lockp = new_lock;
2519
2520         if (new_lock->l_export == req->rq_export) {
2521                 /* Already gave this to the client, which means that we
2522                  * reconstructed a reply. */
2523                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2524                         MSG_RESENT);
2525                 RETURN(ELDLM_LOCK_REPLACED);
2526         }
2527
2528         /* Fixup the lock to be given to the client */
2529         lock_res_and_lock(new_lock);
2530         new_lock->l_readers = 0;
2531         new_lock->l_writers = 0;
2532
2533         new_lock->l_export = class_export_get(req->rq_export);
2534         list_add(&new_lock->l_export_chain,
2535                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2536
2537         new_lock->l_blocking_ast = lock->l_blocking_ast;
2538         new_lock->l_completion_ast = lock->l_completion_ast;
2539
2540         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2541                sizeof(lock->l_remote_handle));
2542
2543         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2544
2545         unlock_res_and_lock(new_lock);
2546         LDLM_LOCK_PUT(new_lock);
2547
2548         RETURN(ELDLM_LOCK_REPLACED);
2549 }
2550
2551 static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2552 {
2553         struct mds_obd *mds = &obd->u.mds;
2554         struct lprocfs_static_vars lvars;
2555         int rc = 0;
2556         ENTRY;
2557
2558         lprocfs_init_vars(mdt, &lvars);
2559         lprocfs_obd_setup(obd, lvars.obd_vars);
2560
2561         sema_init(&mds->mds_health_sem, 1);
2562
2563         if (mds_num_threads < 2)
2564                 mds_num_threads = MDT_NUM_THREADS;
2565         if (mds_num_threads > MDT_MAX_THREADS)
2566                 mds_num_threads = MDT_MAX_THREADS;
2567
2568         mds->mds_service =
2569                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2570                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
2571                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2572                                 mds_handle, LUSTRE_MDS_NAME,
2573                                 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2574
2575         if (!mds->mds_service) {
2576                 CERROR("failed to start service\n");
2577                 GOTO(err_lprocfs, rc = -ENOMEM);
2578         }
2579
2580         rc = ptlrpc_start_threads(obd, mds->mds_service, "ll_mdt");
2581         if (rc)
2582                 GOTO(err_thread, rc);
2583
2584         mds->mds_setattr_service =
2585                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2586                                 MDS_MAXREPSIZE, MDS_SETATTR_PORTAL,
2587                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2588                                 mds_handle, "mds_setattr",
2589                                 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2590         if (!mds->mds_setattr_service) {
2591                 CERROR("failed to start getattr service\n");
2592                 GOTO(err_thread, rc = -ENOMEM);
2593         }
2594
2595         rc = ptlrpc_start_threads(obd, mds->mds_setattr_service,
2596                                   "ll_mdt_attr");
2597         if (rc)
2598                 GOTO(err_thread2, rc);
2599
2600         mds->mds_readpage_service =
2601                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2602                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
2603                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2604                                 mds_handle, "mds_readpage",
2605                                 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2606         if (!mds->mds_readpage_service) {
2607                 CERROR("failed to start readpage service\n");
2608                 GOTO(err_thread2, rc = -ENOMEM);
2609         }
2610
2611         rc = ptlrpc_start_threads(obd, mds->mds_readpage_service,
2612                                   "ll_mdt_rdpg");
2613
2614         if (rc)
2615                 GOTO(err_thread3, rc);
2616
2617         ping_evictor_start();
2618         RETURN(0);
2619
2620 err_thread3:
2621         ptlrpc_unregister_service(mds->mds_readpage_service);
2622         mds->mds_readpage_service = NULL;
2623 err_thread2:
2624         ptlrpc_unregister_service(mds->mds_setattr_service);
2625         mds->mds_setattr_service = NULL;
2626 err_thread:
2627         ptlrpc_unregister_service(mds->mds_service);
2628         mds->mds_service = NULL;
2629 err_lprocfs:
2630         lprocfs_obd_cleanup(obd);
2631         return rc;
2632 }
2633
2634 static int mdt_cleanup(struct obd_device *obd)
2635 {
2636         struct mds_obd *mds = &obd->u.mds;
2637         ENTRY;
2638
2639         ping_evictor_stop();
2640
2641         down(&mds->mds_health_sem);
2642         ptlrpc_unregister_service(mds->mds_readpage_service);
2643         ptlrpc_unregister_service(mds->mds_setattr_service);
2644         ptlrpc_unregister_service(mds->mds_service);
2645         mds->mds_readpage_service = NULL;
2646         mds->mds_setattr_service = NULL;
2647         mds->mds_service = NULL;
2648         up(&mds->mds_health_sem);
2649
2650         lprocfs_obd_cleanup(obd);
2651
2652         RETURN(0);
2653 }
2654
2655 static int mdt_health_check(struct obd_device *obd)
2656 {
2657         struct mds_obd *mds = &obd->u.mds;
2658         int rc = 0;
2659
2660         down(&mds->mds_health_sem);
2661         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
2662         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
2663         rc |= ptlrpc_service_health_check(mds->mds_service);
2664         up(&mds->mds_health_sem);
2665
2666         /*
2667          * health_check to return 0 on healthy
2668          * and 1 on unhealthy.
2669          */
2670         if(rc != 0)
2671                 rc = 1;
2672
2673         return rc;
2674 }
2675
2676 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2677                                           void *data)
2678 {
2679         struct obd_device *obd = data;
2680         struct ll_fid fid;
2681         fid.id = id;
2682         fid.generation = gen;
2683         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2684 }
2685
2686 static int mds_health_check(struct obd_device *obd)
2687 {
2688         struct obd_device_target *odt = &obd->u.obt;
2689         struct mds_obd *mds = &obd->u.mds;
2690         int rc = 0;
2691
2692         if (odt->obt_sb->s_flags & MS_RDONLY)
2693                 rc = 1;
2694
2695         LASSERT(mds->mds_health_check_filp != NULL);
2696         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
2697
2698         return rc;
2699 }
2700
2701 static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
2702 {
2703         struct lustre_cfg *lcfg = buf;
2704         struct lprocfs_static_vars lvars;
2705         int rc;
2706
2707         lprocfs_init_vars(mds, &lvars);
2708
2709         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
2710         return(rc);
2711 }
2712
2713 struct lvfs_callback_ops mds_lvfs_ops = {
2714         l_fid2dentry:     mds_lvfs_fid2dentry,
2715 };
2716
2717 /* use obd ops to offer management infrastructure */
2718 static struct obd_ops mds_obd_ops = {
2719         .o_owner           = THIS_MODULE,
2720         .o_connect         = mds_connect,
2721         .o_reconnect       = mds_reconnect,
2722         .o_init_export     = mds_init_export,
2723         .o_destroy_export  = mds_destroy_export,
2724         .o_disconnect      = mds_disconnect,
2725         .o_setup           = mds_setup,
2726         .o_precleanup      = mds_precleanup,
2727         .o_cleanup         = mds_cleanup,
2728         .o_postrecov       = mds_postrecov,
2729         .o_statfs          = mds_obd_statfs,
2730         .o_iocontrol       = mds_iocontrol,
2731         .o_create          = mds_obd_create,
2732         .o_destroy         = mds_obd_destroy,
2733         .o_llog_init       = mds_llog_init,
2734         .o_llog_finish     = mds_llog_finish,
2735         .o_notify          = mds_notify,
2736         .o_health_check    = mds_health_check,
2737         .o_process_config  = mds_process_config,
2738 };
2739
2740 static struct obd_ops mdt_obd_ops = {
2741         .o_owner           = THIS_MODULE,
2742         .o_setup           = mdt_setup,
2743         .o_cleanup         = mdt_cleanup,
2744         .o_health_check    = mdt_health_check,
2745 };
2746
2747 quota_interface_t *quota_interface;
2748 quota_interface_t mds_quota_interface;
2749
2750 static __attribute__((unused)) int __init mds_init(void)
2751 {
2752         int rc;
2753         struct lprocfs_static_vars lvars;
2754
2755         request_module("lquota");
2756         quota_interface = PORTAL_SYMBOL_GET(mds_quota_interface);
2757         rc = lquota_init(quota_interface);
2758         if (rc) {
2759                 if (quota_interface)
2760                         PORTAL_SYMBOL_PUT(mds_quota_interface);
2761                 return rc;
2762         }
2763         init_obd_quota_ops(quota_interface, &mds_obd_ops);
2764
2765         lprocfs_init_vars(mds, &lvars);
2766         class_register_type(&mds_obd_ops, NULL,
2767                             lvars.module_vars, LUSTRE_MDS_NAME, NULL);
2768         lprocfs_init_vars(mdt, &lvars);
2769         mdt_obd_ops = mdt_obd_ops; //make compiler happy
2770 //        class_register_type(&mdt_obd_ops, NULL,
2771 //                            lvars.module_vars, LUSTRE_MDT_NAME, NULL);
2772
2773         return 0;
2774 }
2775
2776 static __attribute__((unused)) void /*__exit*/ mds_exit(void)
2777 {
2778         lquota_exit(quota_interface);
2779         if (quota_interface)
2780                 PORTAL_SYMBOL_PUT(mds_quota_interface);
2781
2782         class_unregister_type(LUSTRE_MDS_NAME);
2783 //        class_unregister_type(LUSTRE_MDT_NAME);
2784 }
2785 /*mds still need lov setup here*/
2786 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2787 {
2788         struct mds_obd *mds = &obd->u.mds;
2789         struct lvfs_run_ctxt saved;
2790         const char     *dev;
2791         struct vfsmount *mnt;
2792         struct lustre_sb_info *lsi;
2793         struct lustre_mount_info *lmi;
2794         struct dentry  *dentry;
2795         struct file *file;
2796         int rc = 0;
2797         ENTRY;
2798
2799         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
2800         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
2801                 RETURN(0);
2802
2803         if (lcfg->lcfg_bufcount < 5) {
2804                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
2805                 RETURN(-EINVAL);
2806         }
2807         dev = lustre_cfg_string(lcfg, 4);
2808         lmi = server_get_mount(dev);
2809         LASSERT(lmi != NULL);
2810
2811         lsi = s2lsi(lmi->lmi_sb);
2812         mnt = lmi->lmi_mnt;
2813         /* FIXME: MDD LOV initialize objects.
2814          * we need only lmi here but not get mount
2815          * OSD did mount already, so put mount back
2816          */
2817         atomic_dec(&lsi->lsi_mounts);
2818         mntput(mnt);
2819
2820         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
2821         mds_init_ctxt(obd, mnt);
2822
2823         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2824         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
2825         if (IS_ERR(dentry)) {
2826                 rc = PTR_ERR(dentry);
2827                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
2828                 GOTO(err_putfs, rc);
2829         }
2830         mds->mds_objects_dir = dentry;
2831
2832         dentry = lookup_one_len("__iopen__", current->fs->pwd,
2833                                 strlen("__iopen__"));
2834         if (IS_ERR(dentry)) {
2835                 rc = PTR_ERR(dentry);
2836                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
2837                 GOTO(err_objects, rc);
2838         }
2839
2840         mds->mds_fid_de = dentry;
2841         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
2842                 rc = -ENOENT;
2843                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
2844                 GOTO(err_fid, rc);
2845         }
2846
2847         /* open and test the lov objd file */
2848         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
2849         if (IS_ERR(file)) {
2850                 rc = PTR_ERR(file);
2851                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
2852                 GOTO(err_fid, rc = PTR_ERR(file));
2853         }
2854         mds->mds_lov_objid_filp = file;
2855         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
2856                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
2857                        file->f_dentry->d_inode->i_mode);
2858                 GOTO(err_lov_objid, rc = -ENOENT);
2859         }
2860
2861         rc = mds_lov_presetup(mds, lcfg);
2862         if (rc < 0)
2863                 GOTO(err_objects, rc);
2864
2865         /* Don't wait for mds_postrecov trying to clear orphans */
2866         obd->obd_async_recov = 1;
2867         rc = mds_postsetup(obd);
2868         obd->obd_async_recov = 0;
2869
2870         if (rc)
2871                 GOTO(err_objects, rc);
2872
2873         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
2874         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
2875
2876 err_pop:
2877         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2878         RETURN(rc);
2879 err_lov_objid:
2880         if (mds->mds_lov_objid_filp &&
2881                 filp_close((struct file *)mds->mds_lov_objid_filp, 0))
2882                 CERROR("can't close %s after error\n", LOV_OBJID);
2883 err_fid:
2884         dput(mds->mds_fid_de);
2885 err_objects:
2886         dput(mds->mds_objects_dir);
2887 err_putfs:
2888         fsfilt_put_ops(obd->obd_fsops);
2889         goto err_pop;
2890 }
2891
2892 static int mds_cmd_cleanup(struct obd_device *obd)
2893 {
2894         struct mds_obd *mds = &obd->u.mds;
2895         struct lvfs_run_ctxt saved;
2896         int rc = 0;
2897         ENTRY;
2898
2899         if (obd->obd_fail)
2900                 LCONSOLE_WARN("%s: shutting down for failover; client state "
2901                               "will be preserved.\n", obd->obd_name);
2902
2903         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2904         if (mds->mds_lov_objid_filp) {
2905                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
2906                 mds->mds_lov_objid_filp = NULL;
2907                 if (rc)
2908                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
2909         }
2910         if (mds->mds_objects_dir != NULL) {
2911                 l_dput(mds->mds_objects_dir);
2912                 mds->mds_objects_dir = NULL;
2913         }
2914
2915         if (mds->mds_lov_objids != NULL)
2916                 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
2917
2918         shrink_dcache_parent(mds->mds_fid_de);
2919         dput(mds->mds_fid_de);
2920         LL_DQUOT_OFF(obd->u.obt.obt_sb);
2921         fsfilt_put_ops(obd->obd_fsops);
2922
2923         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2924         RETURN(rc);
2925 }
2926
2927 #if 0
2928 static int mds_cmd_health_check(struct obd_device *obd)
2929 {
2930         return 0;
2931 }
2932 #endif
2933 static struct obd_ops mds_cmd_obd_ops = {
2934         .o_owner           = THIS_MODULE,
2935         .o_setup           = mds_cmd_setup,
2936         .o_cleanup         = mds_cmd_cleanup,
2937         .o_precleanup      = mds_precleanup,
2938         .o_create          = mds_obd_create,
2939         .o_destroy         = mds_obd_destroy,
2940         .o_llog_init       = mds_llog_init,
2941         .o_llog_finish     = mds_llog_finish,
2942         .o_notify          = mds_notify,
2943         .o_postrecov       = mds_postrecov,
2944         //   .o_health_check    = mds_cmd_health_check,
2945 };
2946
2947 static int __init mds_cmd_init(void)
2948 {
2949         struct lprocfs_static_vars lvars;
2950
2951         lprocfs_init_vars(mds, &lvars);
2952         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
2953                             LUSTRE_MDS_NAME, NULL);
2954
2955         return 0;
2956 }
2957
2958 static void /*__exit*/ mds_cmd_exit(void)
2959 {
2960         class_unregister_type(LUSTRE_MDS_NAME);
2961 }
2962
2963 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2964 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2965 MODULE_LICENSE("GPL");
2966
2967 module_init(mds_cmd_init);
2968 module_exit(mds_cmd_exit);