Whamcloud - gitweb
b=11089
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include <lustre_mds.h>
38 #include <linux/module.h>
39 #include <linux/init.h>
40 #include <linux/random.h>
41 #include <linux/fs.h>
42 #include <linux/jbd.h>
43 #include <linux/smp_lock.h>
44 #include <linux/buffer_head.h>
45 #include <linux/workqueue.h>
46 #include <linux/mount.h>
47
48 #include <linux/lustre_acl.h>
49 #include <obd_class.h>
50 #include <lustre_dlm.h>
51 #include <obd_lov.h>
52 #include <lustre_fsfilt.h>
53 #include <lprocfs_status.h>
54 #include <lustre_commit_confd.h>
55 #include <lustre_quota.h>
56 #include <lustre_disk.h>
57 #include <lustre_param.h>
58
59 #include "mds_internal.h"
60
61 int mds_num_threads;
62 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
63                 "number of MDS service threads to start");
64
65 __u32 mds_max_ost_index=0xFFFF;
66 CFS_MODULE_PARM(mds_max_ost_index, "i", int, 0444,
67                 "maximal OST index");
68
69 static int mds_intent_policy(struct ldlm_namespace *ns,
70                              struct ldlm_lock **lockp, void *req_cookie,
71                              ldlm_mode_t mode, int flags, void *data);
72 static int mds_postsetup(struct obd_device *obd);
73 static int mds_cleanup(struct obd_device *obd);
74
75 /* Assumes caller has already pushed into the kernel filesystem context */
76 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
77                         loff_t offset, int count)
78 {
79         struct ptlrpc_bulk_desc *desc;
80         struct l_wait_info lwi;
81         struct page **pages;
82         int rc = 0, npages, i, tmpcount, tmpsize = 0;
83         ENTRY;
84
85         LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */
86
87         npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
88         OBD_ALLOC(pages, sizeof(*pages) * npages);
89         if (!pages)
90                 GOTO(out, rc = -ENOMEM);
91
92         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
93                                     MDS_BULK_PORTAL);
94         if (desc == NULL)
95                 GOTO(out_free, rc = -ENOMEM);
96
97         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
98                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
99
100                 OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);
101                 if (pages[i] == NULL)
102                         GOTO(cleanup_buf, rc = -ENOMEM);
103
104                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
105         }
106
107         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
108                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
109                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
110                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
111                        i_size_read(file->f_dentry->d_inode));
112
113                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
114                                      kmap(pages[i]), tmpsize, &offset);
115                 kunmap(pages[i]);
116
117                 if (rc != tmpsize)
118                         GOTO(cleanup_buf, rc = -EIO);
119         }
120
121         LASSERT(desc->bd_nob == count);
122
123         rc = ptlrpc_start_bulk_transfer(desc);
124         if (rc)
125                 GOTO(cleanup_buf, rc);
126
127         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
128                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
129                        OBD_FAIL_MDS_SENDPAGE, rc);
130                 GOTO(abort_bulk, rc);
131         }
132
133         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
134         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
135         LASSERT (rc == 0 || rc == -ETIMEDOUT);
136
137         if (rc == 0) {
138                 if (desc->bd_success &&
139                     desc->bd_nob_transferred == count)
140                         GOTO(cleanup_buf, rc);
141
142                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
143         }
144
145         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s",
146                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
147                   desc->bd_nob_transferred, count,
148                   req->rq_export->exp_client_uuid.uuid,
149                   req->rq_export->exp_connection->c_remote_uuid.uuid);
150
151         class_fail_export(req->rq_export);
152
153         EXIT;
154  abort_bulk:
155         ptlrpc_abort_bulk (desc);
156  cleanup_buf:
157         for (i = 0; i < npages; i++)
158                 if (pages[i])
159                         OBD_PAGE_FREE(pages[i]);
160
161         ptlrpc_free_bulk(desc);
162  out_free:
163         OBD_FREE(pages, sizeof(*pages) * npages);
164  out:
165         return rc;
166 }
167
168 /* only valid locked dentries or errors should be returned */
169 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
170                                      struct vfsmount **mnt, int lock_mode,
171                                      struct lustre_handle *lockh,
172                                      __u64 lockpart)
173 {
174         struct mds_obd *mds = &obd->u.mds;
175         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
176         struct ldlm_res_id res_id = { .name = {0} };
177         int flags = LDLM_FL_ATOMIC_CB, rc;
178         ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
179         ENTRY;
180
181         if (IS_ERR(de))
182                 RETURN(de);
183
184         res_id.name[0] = de->d_inode->i_ino;
185         res_id.name[1] = de->d_inode->i_generation;
186         rc = ldlm_cli_enqueue_local(obd->obd_namespace, &res_id,
187                                     LDLM_IBITS, &policy, lock_mode, &flags,
188                                     ldlm_blocking_ast, ldlm_completion_ast,
189                                     NULL, NULL, 0, NULL, lockh);
190         if (rc != ELDLM_OK) {
191                 l_dput(de);
192                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
193         }
194
195         RETURN(retval);
196 }
197
198 /* Look up an entry by inode number. */
199 /* this function ONLY returns valid dget'd dentries with an initialized inode
200    or errors */
201 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
202                               struct vfsmount **mnt)
203 {
204         char fid_name[32];
205         unsigned long ino = fid->id;
206         __u32 generation = fid->generation;
207         struct inode *inode;
208         struct dentry *result;
209
210         if (ino == 0)
211                 RETURN(ERR_PTR(-ESTALE));
212
213         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
214
215         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
216                ino, generation, mds->mds_obt.obt_sb);
217
218         /* under ext3 this is neither supposed to return bad inodes
219            nor NULL inodes. */
220         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
221         if (IS_ERR(result))
222                 RETURN(result);
223
224         inode = result->d_inode;
225         if (!inode)
226                 RETURN(ERR_PTR(-ENOENT));
227
228         if (inode->i_generation == 0 || inode->i_nlink == 0) {
229                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
230                               " may indicate disk corruption (inode: %lu/%u, "
231                               "link %lu, count %d)\n", inode->i_ino,
232                               inode->i_generation,(unsigned long)inode->i_nlink,
233                               atomic_read(&inode->i_count));
234                 dput(result);
235                 RETURN(ERR_PTR(-ENOENT));
236         }
237
238         if (generation && inode->i_generation != generation) {
239                 /* we didn't find the right inode.. */
240                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
241                        "count: %d, generation %u/%u\n", inode->i_ino,
242                        (unsigned long)inode->i_nlink,
243                        atomic_read(&inode->i_count), inode->i_generation,
244                        generation);
245                 dput(result);
246                 RETURN(ERR_PTR(-ENOENT));
247         }
248
249         if (mnt) {
250                 *mnt = mds->mds_vfsmnt;
251                 mntget(*mnt);
252         }
253
254         RETURN(result);
255 }
256
257 static int mds_connect_internal(struct obd_export *exp,
258                                 struct obd_connect_data *data)
259 {
260         struct obd_device *obd = exp->exp_obd;
261         if (data != NULL) {
262                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
263                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
264
265                 /* If no known bits (which should not happen, probably,
266                    as everybody should support LOOKUP and UPDATE bits at least)
267                    revert to compat mode with plain locks. */
268                 if (!data->ocd_ibits_known &&
269                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
270                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
271
272                 if (!obd->u.mds.mds_fl_acl)
273                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
274
275                 if (!obd->u.mds.mds_fl_user_xattr)
276                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
277
278                 exp->exp_connect_flags = data->ocd_connect_flags;
279                 data->ocd_version = LUSTRE_VERSION_CODE;
280                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
281         }
282
283         if (obd->u.mds.mds_fl_acl &&
284             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
285                 CWARN("%s: MDS requires ACL support but client does not\n",
286                       obd->obd_name);
287                 return -EBADE;
288         }
289         return 0;
290 }
291
292 static int mds_reconnect(const struct lu_env *env,
293                          struct obd_export *exp, struct obd_device *obd,
294                          struct obd_uuid *cluuid,
295                          struct obd_connect_data *data)
296 {
297         int rc;
298         ENTRY;
299
300         if (exp == NULL || obd == NULL || cluuid == NULL)
301                 RETURN(-EINVAL);
302
303         rc = mds_connect_internal(exp, data);
304
305         RETURN(rc);
306 }
307
308 /* Establish a connection to the MDS.
309  *
310  * This will set up an export structure for the client to hold state data
311  * about that client, like open files, the last operation number it did
312  * on the server, etc.
313  */
314 static int mds_connect(const struct lu_env *env,
315                        struct lustre_handle *conn, struct obd_device *obd,
316                        struct obd_uuid *cluuid, struct obd_connect_data *data,
317                        void *localdata)
318 {
319         struct obd_export *exp;
320         struct mds_export_data *med;
321         struct mds_client_data *mcd = NULL;
322         lnet_nid_t *client_nid = (lnet_nid_t *)localdata;
323         int rc;
324         ENTRY;
325
326         if (!conn || !obd || !cluuid)
327                 RETURN(-EINVAL);
328
329         /* XXX There is a small race between checking the list and adding a
330          * new connection for the same UUID, but the real threat (list
331          * corruption when multiple different clients connect) is solved.
332          *
333          * There is a second race between adding the export to the list,
334          * and filling in the client data below.  Hence skipping the case
335          * of NULL mcd above.  We should already be controlling multiple
336          * connects at the client, and we can't hold the spinlock over
337          * memory allocations without risk of deadlocking.
338          */
339         rc = class_connect(conn, obd, cluuid);
340         if (rc)
341                 RETURN(rc);
342         exp = class_conn2export(conn);
343         LASSERT(exp);
344         med = &exp->exp_mds_data;
345
346         exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
347
348         rc = mds_connect_internal(exp, data);
349         if (rc)
350                 GOTO(out, rc);
351
352         OBD_ALLOC(mcd, sizeof(*mcd));
353         if (!mcd)
354                 GOTO(out, rc = -ENOMEM);
355
356         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
357         med->med_mcd = mcd;
358
359         rc = mds_client_add(obd, exp, -1, *client_nid);
360         GOTO(out, rc);
361
362 out:
363         if (rc) {
364                 if (mcd) {
365                         OBD_FREE(mcd, sizeof(*mcd));
366                         med->med_mcd = NULL;
367                 }
368                 class_disconnect(exp);
369         } else {
370                 class_export_put(exp);
371         }
372
373         RETURN(rc);
374 }
375
376 int mds_init_export(struct obd_export *exp)
377 {
378         struct mds_export_data *med = &exp->exp_mds_data;
379
380         INIT_LIST_HEAD(&med->med_open_head);
381         spin_lock_init(&med->med_open_lock);
382
383         spin_lock(&exp->exp_lock);
384         exp->exp_connecting = 1;
385         spin_unlock(&exp->exp_lock);
386
387         RETURN(0);
388 }
389
390 static int mds_destroy_export(struct obd_export *export)
391 {
392         struct mds_export_data *med;
393         struct obd_device *obd = export->exp_obd;
394         struct mds_obd *mds = &obd->u.mds;
395         struct lvfs_run_ctxt saved;
396         struct lov_mds_md *lmm;
397         struct llog_cookie *logcookies;
398         int rc = 0;
399         ENTRY;
400
401         med = &export->exp_mds_data;
402         target_destroy_export(export);
403
404         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
405                 RETURN(0);
406
407         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
408         /* Close any open files (which may also cause orphan unlinking). */
409
410         OBD_ALLOC(lmm, mds->mds_max_mdsize);
411         if (lmm == NULL) {
412                 CWARN("%s: allocation failure during cleanup; can not force "
413                       "close file handles on this service.\n", obd->obd_name);
414                 GOTO(out, rc = -ENOMEM);
415         }
416
417         OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
418         if (logcookies == NULL) {
419                 CWARN("%s: allocation failure during cleanup; can not force "
420                       "close file handles on this service.\n", obd->obd_name);
421                 OBD_FREE(lmm, mds->mds_max_mdsize);
422                 GOTO(out_lmm, rc = -ENOMEM);
423         }
424
425         spin_lock(&med->med_open_lock);
426         while (!list_empty(&med->med_open_head)) {
427                 struct list_head *tmp = med->med_open_head.next;
428                 struct mds_file_data *mfd =
429                         list_entry(tmp, struct mds_file_data, mfd_list);
430                 int lmm_size = mds->mds_max_mdsize;
431                 umode_t mode = mfd->mfd_dentry->d_inode->i_mode;
432                 __u64 valid = 0;
433
434                 /* Remove mfd handle so it can't be found again.
435                  * We are consuming the mfd_list reference here. */
436                 mds_mfd_unlink(mfd, 0);
437                 spin_unlock(&med->med_open_lock);
438
439                 /* If you change this message, be sure to update
440                  * replay_single:test_46 */
441                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
442                        "%.*s (ino %lu)\n", obd->obd_name,
443                        mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name,
444                        mfd->mfd_dentry->d_inode->i_ino);
445
446                 rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm, &lmm_size, 1);
447                 if (rc < 0)
448                         CWARN("mds_get_md failure, rc=%d\n", rc);
449                 else
450                         valid |= OBD_MD_FLEASIZE;
451
452                 /* child orphan sem protects orphan_dec_test and
453                  * is_orphan race, mds_mfd_close drops it */
454                 MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode);
455                 rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
456                                    !(export->exp_flags & OBD_OPT_FAILOVER),
457                                    lmm, lmm_size, logcookies,
458                                    mds->mds_max_cookiesize,
459                                    &valid);
460
461                 if (rc)
462                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
463
464                 if (valid & OBD_MD_FLCOOKIE) {
465                         rc = mds_osc_destroy_orphan(obd, mode, lmm,
466                                                     lmm_size, logcookies, 1);
467                         if (rc < 0) {
468                                 CDEBUG(D_INODE, "%s: destroy of orphan failed,"
469                                        " rc = %d\n", obd->obd_name, rc);
470                                 rc = 0;
471                         }
472                         valid &= ~OBD_MD_FLCOOKIE;
473                 }
474
475                 spin_lock(&med->med_open_lock);
476         }
477         spin_unlock(&med->med_open_lock);
478
479         OBD_FREE(logcookies, mds->mds_max_cookiesize);
480 out_lmm:
481         OBD_FREE(lmm, mds->mds_max_mdsize);
482 out:
483         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
484         mds_client_free(export);
485         RETURN(rc);
486 }
487
488 static int mds_disconnect(struct obd_export *exp)
489 {
490         int rc;
491         ENTRY;
492
493         LASSERT(exp);
494         class_export_get(exp);
495
496         /* Disconnect early so that clients can't keep using export */
497         rc = class_disconnect(exp);
498         if (exp->exp_obd->obd_namespace != NULL)
499                 ldlm_cancel_locks_for_export(exp);
500
501         /* complete all outstanding replies */
502         spin_lock(&exp->exp_lock);
503         while (!list_empty(&exp->exp_outstanding_replies)) {
504                 struct ptlrpc_reply_state *rs =
505                         list_entry(exp->exp_outstanding_replies.next,
506                                    struct ptlrpc_reply_state, rs_exp_list);
507                 struct ptlrpc_service *svc = rs->rs_service;
508
509                 spin_lock(&svc->srv_lock);
510                 list_del_init(&rs->rs_exp_list);
511                 ptlrpc_schedule_difficult_reply(rs);
512                 spin_unlock(&svc->srv_lock);
513         }
514         spin_unlock(&exp->exp_lock);
515
516         class_export_put(exp);
517         RETURN(rc);
518 }
519
520 static int mds_getstatus(struct ptlrpc_request *req)
521 {
522         struct mds_obd *mds = mds_req2mds(req);
523         struct mds_body *body;
524         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
525         ENTRY;
526
527         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
528                 RETURN(req->rq_status = -ENOMEM);
529         rc = lustre_pack_reply(req, 2, size, NULL);
530         if (rc)
531                 RETURN(req->rq_status = rc);
532
533         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
534         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
535
536         /* the last_committed and last_xid fields are filled in for all
537          * replies already - no need to do so here also.
538          */
539         RETURN(0);
540 }
541
542 /* get the LOV EA from @inode and store it into @md.  It can be at most
543  * @size bytes, and @size is updated with the actual EA size.
544  * The EA size is also returned on success, and -ve errno on failure.
545  * If there is no EA then 0 is returned. */
546 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
547                int *size, int lock)
548 {
549         int rc = 0;
550         int lmm_size;
551
552         if (lock)
553                 LOCK_INODE_MUTEX(inode);
554         rc = fsfilt_get_md(obd, inode, md, *size, "lov");
555
556         if (rc < 0) {
557                 CERROR("Error %d reading eadata for ino %lu\n",
558                        rc, inode->i_ino);
559         } else if (rc > 0) {
560                 lmm_size = rc;
561                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
562
563                 if (rc == 0) {
564                         *size = lmm_size;
565                         rc = lmm_size;
566                 } else if (rc > 0) {
567                         *size = rc;
568                 }
569         } else {
570                 *size = 0;
571         }
572         if (lock)
573                 UNLOCK_INODE_MUTEX(inode);
574
575         RETURN (rc);
576 }
577
578
579 /* Call with lock=1 if you want mds_pack_md to take the i_mutex.
580  * Call with lock=0 if the caller has already taken the i_mutex. */
581 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
582                 struct mds_body *body, struct inode *inode, int lock)
583 {
584         struct mds_obd *mds = &obd->u.mds;
585         void *lmm;
586         int lmm_size;
587         int rc;
588         ENTRY;
589
590         lmm = lustre_msg_buf(msg, offset, 0);
591         if (lmm == NULL) {
592                 /* Some problem with getting eadata when I sized the reply
593                  * buffer... */
594                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
595                        inode->i_ino);
596                 RETURN(0);
597         }
598         lmm_size = lustre_msg_buflen(msg, offset);
599
600         /* I don't really like this, but it is a sanity check on the client
601          * MD request.  However, if the client doesn't know how much space
602          * to reserve for the MD, it shouldn't be bad to have too much space.
603          */
604         if (lmm_size > mds->mds_max_mdsize) {
605                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
606                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
607                 // RETURN(-EINVAL);
608         }
609
610         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
611         if (rc > 0) {
612                 if (S_ISDIR(inode->i_mode))
613                         body->valid |= OBD_MD_FLDIREA;
614                 else
615                         body->valid |= OBD_MD_FLEASIZE;
616                 body->eadatasize = lmm_size;
617                 rc = 0;
618         }
619
620         RETURN(rc);
621 }
622
623 #ifdef CONFIG_FS_POSIX_ACL
624 static
625 int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
626                        struct mds_body *repbody, int repoff)
627 {
628         struct dentry de = { .d_inode = inode };
629         int buflen, rc;
630         ENTRY;
631
632         LASSERT(repbody->aclsize == 0);
633         LASSERT(lustre_msg_bufcount(repmsg) > repoff);
634
635         buflen = lustre_msg_buflen(repmsg, repoff);
636         if (!buflen)
637                 GOTO(out, 0);
638
639         if (!inode->i_op || !inode->i_op->getxattr)
640                 GOTO(out, 0);
641
642         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
643                                    lustre_msg_buf(repmsg, repoff, buflen),
644                                    buflen);
645
646         if (rc >= 0)
647                 repbody->aclsize = rc;
648         else if (rc != -ENODATA) {
649                 CERROR("buflen %d, get acl: %d\n", buflen, rc);
650                 RETURN(rc);
651         }
652         EXIT;
653 out:
654         repbody->valid |= OBD_MD_FLACL;
655         return 0;
656 }
657 #else
658 #define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
659 #endif
660
/*
 * Pack the ACL of @inode into reply buffer @repoff of @repmsg by delegating
 * to mds_pack_posix_acl() and returning its result.  @med is not used here.
 */
int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
                 struct lustre_msg *repmsg, struct mds_body *repbody,
                 int repoff)
{
        int rc;

        rc = mds_pack_posix_acl(inode, repmsg, repbody, repoff);
        return rc;
}
667
668 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
669                                 struct ptlrpc_request *req,
670                                 struct mds_body *reqbody, int reply_off)
671 {
672         struct mds_body *body;
673         struct inode *inode = dentry->d_inode;
674         int rc = 0;
675         ENTRY;
676
677         if (inode == NULL)
678                 RETURN(-ENOENT);
679
680         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
681         LASSERT(body != NULL);                 /* caller prepped reply */
682
683         mds_pack_inode2fid(&body->fid1, inode);
684         body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */
685         mds_pack_inode2body(body, inode);
686         reply_off++;
687
688         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
689             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
690                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
691                                  inode, 1);
692
693                 /* If we have LOV EA data, the OST holds size, atime, mtime */
694                 if (!(body->valid & OBD_MD_FLEASIZE) &&
695                     !(body->valid & OBD_MD_FLDIREA))
696                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
697                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
698
699                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
700                 if (body->eadatasize)
701                         reply_off++;
702         } else if (S_ISLNK(inode->i_mode) &&
703                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
704                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
705                 int len;
706
707                 LASSERT (symname != NULL);       /* caller prepped reply */
708                 len = lustre_msg_buflen(req->rq_repmsg, reply_off);
709
710                 rc = inode->i_op->readlink(dentry, symname, len);
711                 if (rc < 0) {
712                         CERROR("readlink failed: %d\n", rc);
713                 } else if (rc != len - 1) {
714                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
715                                 rc, len - 1);
716                         rc = -EINVAL;
717                 } else {
718                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
719                         body->valid |= OBD_MD_LINKNAME;
720                         body->eadatasize = rc + 1;
721                         symname[rc] = 0;        /* NULL terminate */
722                         rc = 0;
723                 }
724                 reply_off++;
725         } else if (reqbody->valid == OBD_MD_FLFLAGS &&
726                    reqbody->flags & MDS_BFLAG_EXT_FLAGS) {
727                 int flags;
728
729                 /* We only return the full set of flags on ioctl, otherwise we
730                  * get enough flags from the inode in mds_pack_inode2body(). */
731                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS,
732                                       (long)&flags);
733                 if (rc == 0)
734                         body->flags = flags | MDS_BFLAG_EXT_FLAGS;
735         }
736
737         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
738                 struct mds_obd *mds = mds_req2mds(req);
739                 body->max_cookiesize = mds->mds_max_cookiesize;
740                 body->max_mdsize = mds->mds_max_mdsize;
741                 body->valid |= OBD_MD_FLMODEASIZE;
742         }
743
744         if (rc)
745                 RETURN(rc);
746
747 #ifdef CONFIG_FS_POSIX_ACL
748         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
749             (reqbody->valid & OBD_MD_FLACL)) {
750                 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
751                                   inode, req->rq_repmsg,
752                                   body, reply_off);
753
754                 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
755                 if (body->aclsize)
756                         reply_off++;
757         }
758 #endif
759
760         RETURN(rc);
761 }
762
763 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
764                                 int offset)
765 {
766         struct mds_obd *mds = mds_req2mds(req);
767         struct mds_body *body;
768         int rc, bufcount = 2;
769         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
770         ENTRY;
771
772         LASSERT(offset == REQ_REC_OFF); /* non-intent */
773
774         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
775         LASSERT(body != NULL);                    /* checked by caller */
776         LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */
777
778         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
779             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
780                 LOCK_INODE_MUTEX(inode);
781                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
782                                    "lov");
783                 UNLOCK_INODE_MUTEX(inode);
784                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
785                        rc, inode->i_ino);
786                 if (rc < 0) {
787                         if (rc != -ENODATA) {
788                                 CERROR("error getting inode %lu MD: rc = %d\n",
789                                        inode->i_ino, rc);
790                                 RETURN(rc);
791                         }
792                         size[bufcount] = 0;
793                 } else if (rc > mds->mds_max_mdsize) {
794                         size[bufcount] = 0;
795                         CERROR("MD size %d larger than maximum possible %u\n",
796                                rc, mds->mds_max_mdsize);
797                 } else {
798                         size[bufcount] = rc;
799                 }
800                 bufcount++;
801         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
802                 if (i_size_read(inode) + 1 != body->eadatasize)
803                         CERROR("symlink size: %Lu, reply space: %d\n",
804                                i_size_read(inode) + 1, body->eadatasize);
805                 size[bufcount] = min_t(int, i_size_read(inode) + 1,
806                                        body->eadatasize);
807                 bufcount++;
808                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
809                        i_size_read(inode) + 1, body->eadatasize);
810         }
811
812 #ifdef CONFIG_FS_POSIX_ACL
813         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
814             (body->valid & OBD_MD_FLACL)) {
815                 struct dentry de = { .d_inode = inode };
816
817                 size[bufcount] = 0;
818                 if (inode->i_op && inode->i_op->getxattr) {
819                         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
820                                                    NULL, 0);
821
822                         if (rc < 0) {
823                                 if (rc != -ENODATA) {
824                                         CERROR("got acl size: %d\n", rc);
825                                         RETURN(rc);
826                                 }
827                         } else
828                                 size[bufcount] = rc;
829                 }
830                 bufcount++;
831         }
832 #endif
833
834         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
835                 CERROR("failed MDS_GETATTR_PACK test\n");
836                 req->rq_status = -ENOMEM;
837                 RETURN(-ENOMEM);
838         }
839
840         rc = lustre_pack_reply(req, bufcount, size, NULL);
841         if (rc) {
842                 req->rq_status = rc;
843                 RETURN(rc);
844         }
845
846         RETURN(0);
847 }
848
849 static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
850                             int child_part, struct lustre_handle *child_lockh)
851 {
852         struct obd_device *obd = req->rq_export->exp_obd;
853         struct mds_obd *mds = &obd->u.mds;
854         struct ldlm_reply *rep = NULL;
855         struct lvfs_run_ctxt saved;
856         struct mds_body *body;
857         struct dentry *dparent = NULL, *dchild = NULL;
858         struct lvfs_ucred uc = {0,};
859         struct lustre_handle parent_lockh;
860         int namesize;
861         int rc = 0, cleanup_phase = 0, resent_req = 0;
862         char *name;
863         ENTRY;
864
865         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
866
867         /* Swab now, before anyone looks inside the request */
868         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
869                                   lustre_swab_mds_body);
870         if (body == NULL) {
871                 CERROR("Can't swab mds_body\n");
872                 RETURN(-EFAULT);
873         }
874
875         lustre_set_req_swabbed(req, offset + 1);
876         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
877         if (name == NULL) {
878                 CERROR("Can't unpack name\n");
879                 RETURN(-EFAULT);
880         }
881         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
882         /* namesize less than 2 means we have empty name, probably came from
883            revalidate by cfid, so no point in having name to be set */
884         if (namesize <= 1)
885                 name = NULL;
886
887         rc = mds_init_ucred(&uc, req, offset);
888         if (rc)
889                 GOTO(cleanup, rc);
890
891         LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF);
892         /* if requests were at offset 2, the getattr reply goes back at 1 */
893         if (offset == DLM_INTENT_REC_OFF) {
894                 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
895                                      sizeof(*rep));
896                 offset = DLM_REPLY_REC_OFF;
897         }
898
899         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
900         cleanup_phase = 1; /* kernel context */
901         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
902
903         /* FIXME: handle raw lookup */
904 #if 0
905         if (body->valid == OBD_MD_FLID) {
906                 struct mds_body *mds_reply;
907                 int size = sizeof(*mds_reply);
908                 ino_t inum;
909                 // The user requested ONLY the inode number, so do a raw lookup
910                 rc = lustre_pack_reply(req, 1, &size, NULL);
911                 if (rc) {
912                         CERROR("out of memory\n");
913                         GOTO(cleanup, rc);
914                 }
915
916                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
917
918                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
919                                            sizeof(*mds_reply));
920                 mds_reply->fid1.id = inum;
921                 mds_reply->valid = OBD_MD_FLID;
922                 GOTO(cleanup, rc);
923         }
924 #endif
925
926         if (lustre_handle_is_used(child_lockh)) {
927                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
928                 resent_req = 1;
929         }
930
931         if (resent_req == 0) {
932                 if (name) {
933                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
934                         rc = mds_get_parent_child_locked(obd, &obd->u.mds,
935                                                          &body->fid1,
936                                                          &parent_lockh,
937                                                          &dparent, LCK_CR,
938                                                          MDS_INODELOCK_UPDATE,
939                                                          name, namesize,
940                                                          child_lockh, &dchild,
941                                                          LCK_CR, child_part);
942                 } else {
943                         /* For revalidate by fid we always take UPDATE lock */
944                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
945                                                        LCK_CR, child_lockh,
946                                                        child_part);
947                         LASSERT(dchild);
948                         if (IS_ERR(dchild))
949                                 rc = PTR_ERR(dchild);
950                 }
951                 if (rc)
952                         GOTO(cleanup, rc);
953         } else {
954                 struct ldlm_lock *granted_lock;
955                 struct ll_fid child_fid;
956                 struct ldlm_resource *res;
957                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
958                 granted_lock = ldlm_handle2lock(child_lockh);
959                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
960                          body->fid1.id, body->fid1.generation,
961                          child_lockh->cookie);
962
963
964                 res = granted_lock->l_resource;
965                 child_fid.id = res->lr_name.name[0];
966                 child_fid.generation = res->lr_name.name[1];
967                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
968                 LASSERT(!IS_ERR(dchild));
969                 LDLM_LOCK_PUT(granted_lock);
970         }
971
972         cleanup_phase = 2; /* dchild, dparent, locks */
973
974         if (dchild->d_inode == NULL) {
975                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
976                 /* in the intent case, the policy clears this error:
977                    the disposition is enough */
978                 GOTO(cleanup, rc = -ENOENT);
979         } else {
980                 intent_set_disposition(rep, DISP_LOOKUP_POS);
981         }
982
983         if (req->rq_repmsg == NULL) {
984                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
985                 if (rc != 0) {
986                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
987                         GOTO (cleanup, rc);
988                 }
989         }
990
991         rc = mds_getattr_internal(obd, dchild, req, body, offset);
992         GOTO(cleanup, rc); /* returns the lock to the client */
993
994  cleanup:
995         switch (cleanup_phase) {
996         case 2:
997                 if (resent_req == 0) {
998                         if (rc && dchild->d_inode)
999                                 ldlm_lock_decref(child_lockh, LCK_CR);
1000                         if (name) {
1001                                 ldlm_lock_decref(&parent_lockh, LCK_CR);
1002                                 l_dput(dparent);
1003                         }
1004                 }
1005                 l_dput(dchild);
1006         case 1:
1007                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1008         default:
1009                 mds_exit_ucred(&uc, mds);
1010                 if (req->rq_reply_state == NULL) {
1011                         int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
1012                         if (rc == 0)
1013                                 rc = rc2;
1014                         req->rq_status = rc;
1015                 }
1016         }
1017         return rc;
1018 }
1019
1020 static int mds_getattr(struct ptlrpc_request *req, int offset)
1021 {
1022         struct mds_obd *mds = mds_req2mds(req);
1023         struct obd_device *obd = req->rq_export->exp_obd;
1024         struct lvfs_run_ctxt saved;
1025         struct dentry *de;
1026         struct mds_body *body;
1027         struct lvfs_ucred uc = {0,};
1028         int rc = 0;
1029         ENTRY;
1030
1031         OBD_COUNTER_INCREMENT(obd, getattr);
1032
1033         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1034                                   lustre_swab_mds_body);
1035         if (body == NULL)
1036                 RETURN(-EFAULT);
1037
1038         rc = mds_init_ucred(&uc, req, offset);
1039         if (rc)
1040                 GOTO(out_ucred, rc);
1041
1042         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1043         de = mds_fid2dentry(mds, &body->fid1, NULL);
1044         if (IS_ERR(de)) {
1045                 rc = req->rq_status = PTR_ERR(de);
1046                 GOTO(out_pop, rc);
1047         }
1048
1049         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1050         if (rc != 0) {
1051                 CERROR("mds_getattr_pack_msg: %d\n", rc);
1052                 GOTO(out_pop, rc);
1053         }
1054
1055         req->rq_status = mds_getattr_internal(obd, de, req, body,
1056                                               REPLY_REC_OFF);
1057
1058         l_dput(de);
1059         GOTO(out_pop, rc);
1060 out_pop:
1061         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1062 out_ucred:
1063         if (req->rq_reply_state == NULL) {
1064                 int rc2 = lustre_pack_reply(req, 1, NULL, NULL);
1065                 if (rc == 0)
1066                         rc = rc2;
1067                 req->rq_status = rc;
1068         }
1069         mds_exit_ucred(&uc, mds);
1070         return rc;
1071 }
1072
1073 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1074                           __u64 max_age, __u32 flags)
1075 {
1076         int rc;
1077
1078         spin_lock(&obd->obd_osfs_lock);
1079         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1080         if (rc == 0)
1081                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1082         spin_unlock(&obd->obd_osfs_lock);
1083
1084         return rc;
1085 }
1086
1087 static int mds_statfs(struct ptlrpc_request *req)
1088 {
1089         struct obd_device *obd = req->rq_export->exp_obd;
1090         int rc, size[2] = { sizeof(struct ptlrpc_body),
1091                             sizeof(struct obd_statfs) };
1092         ENTRY;
1093
1094         /* This will trigger a watchdog timeout */
1095         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1096                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1097         OBD_COUNTER_INCREMENT(obd, statfs);
1098
1099         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
1100                 GOTO(out, rc = -ENOMEM);
1101         rc = lustre_pack_reply(req, 2, size, NULL);
1102         if (rc)
1103                 GOTO(out, rc);
1104
1105         /* We call this so that we can cache a bit - 1 jiffie worth */
1106         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1107                                                 size[REPLY_REC_OFF]),
1108                             cfs_time_current_64() - HZ, 0);
1109         if (rc) {
1110                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1111                 GOTO(out, rc);
1112         }
1113
1114         EXIT;
1115 out:
1116         req->rq_status = rc;
1117         return 0;
1118 }
1119
1120 static int mds_sync(struct ptlrpc_request *req, int offset)
1121 {
1122         struct obd_device *obd = req->rq_export->exp_obd;
1123         struct mds_obd *mds = &obd->u.mds;
1124         struct mds_body *body;
1125         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1126         ENTRY;
1127
1128         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1129                                   lustre_swab_mds_body);
1130         if (body == NULL)
1131                 GOTO(out, rc = -EFAULT);
1132
1133         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1134                 GOTO(out, rc = -ENOMEM);
1135         rc = lustre_pack_reply(req, 2, size, NULL);
1136         if (rc)
1137                 GOTO(out, rc);
1138
1139         rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1140         if (rc == 0 && body->fid1.id != 0) {
1141                 struct dentry *de;
1142
1143                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1144                 if (IS_ERR(de))
1145                         GOTO(out, rc = PTR_ERR(de));
1146
1147                 body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1148                                       sizeof(*body));
1149                 mds_pack_inode2fid(&body->fid1, de->d_inode);
1150                 mds_pack_inode2body(body, de->d_inode);
1151
1152                 l_dput(de);
1153         }
1154         GOTO(out, rc);
1155 out:
1156         req->rq_status = rc;
1157         return 0;
1158 }
1159
1160 /* mds_readpage does not take a DLM lock on the inode, because the client must
1161  * already have a PR lock.
1162  *
1163  * If we were to take another one here, a deadlock will result, if another
1164  * thread is already waiting for a PW lock. */
1165 static int mds_readpage(struct ptlrpc_request *req, int offset)
1166 {
1167         struct obd_device *obd = req->rq_export->exp_obd;
1168         struct mds_obd *mds = &obd->u.mds;
1169         struct vfsmount *mnt;
1170         struct dentry *de;
1171         struct file *file;
1172         struct mds_body *body, *repbody;
1173         struct lvfs_run_ctxt saved;
1174         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
1175         struct lvfs_ucred uc = {0,};
1176         ENTRY;
1177
1178         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1179                 RETURN(-ENOMEM);
1180         rc = lustre_pack_reply(req, 2, size, NULL);
1181         if (rc)
1182                 GOTO(out, rc);
1183
1184         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1185                                   lustre_swab_mds_body);
1186         if (body == NULL)
1187                 GOTO (out, rc = -EFAULT);
1188
1189         rc = mds_init_ucred(&uc, req, offset);
1190         if (rc)
1191                 GOTO(out, rc);
1192
1193         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1194         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1195         if (IS_ERR(de))
1196                 GOTO(out_pop, rc = PTR_ERR(de));
1197
1198         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1199
1200         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1201         /* note: in case of an error, dentry_open puts dentry */
1202         if (IS_ERR(file))
1203                 GOTO(out_pop, rc = PTR_ERR(file));
1204
1205         /* body->size is actually the offset -eeb */
1206         if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) {
1207                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1208                        body->size, de->d_inode->i_sb->s_blocksize);
1209                 GOTO(out_file, rc = -EFAULT);
1210         }
1211
1212         /* body->nlink is actually the #bytes to read -eeb */
1213         if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) {
1214                 CERROR("size %u is not multiple of blocksize %lu\n",
1215                        body->nlink, de->d_inode->i_sb->s_blocksize);
1216                 GOTO(out_file, rc = -EFAULT);
1217         }
1218
1219         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1220                                  sizeof(*repbody));
1221         repbody->size = i_size_read(file->f_dentry->d_inode);
1222         repbody->valid = OBD_MD_FLSIZE;
1223
1224         /* to make this asynchronous make sure that the handling function
1225            doesn't send a reply when this function completes. Instead a
1226            callback function would send the reply */
1227         /* body->size is actually the offset -eeb */
1228         rc = mds_sendpage(req, file, body->size, body->nlink);
1229
1230 out_file:
1231         filp_close(file, 0);
1232 out_pop:
1233         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1234 out:
1235         mds_exit_ucred(&uc, mds);
1236         req->rq_status = rc;
1237         RETURN(0);
1238 }
1239
1240 int mds_reint(struct ptlrpc_request *req, int offset,
1241               struct lustre_handle *lockh)
1242 {
1243         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1244         int rc;
1245
1246         OBD_ALLOC(rec, sizeof(*rec));
1247         if (rec == NULL)
1248                 RETURN(-ENOMEM);
1249
1250         rc = mds_update_unpack(req, offset, rec);
1251         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1252                 CERROR("invalid record\n");
1253                 GOTO(out, req->rq_status = -EINVAL);
1254         }
1255
1256         /* rc will be used to interrupt a for loop over multiple records */
1257         rc = mds_reint_rec(rec, offset, req, lockh);
1258  out:
1259         OBD_FREE(rec, sizeof(*rec));
1260         return rc;
1261 }
1262
1263 int mds_filter_recovery_request(struct ptlrpc_request *req,
1264                                 struct obd_device *obd, int *process)
1265 {
1266         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1267         case MDS_CONNECT: /* This will never get here, but for completeness. */
1268         case OST_CONNECT: /* This will never get here, but for completeness. */
1269         case MDS_DISCONNECT:
1270         case OST_DISCONNECT:
1271                *process = 1;
1272                RETURN(0);
1273
1274         case MDS_CLOSE:
1275         case MDS_DONE_WRITING:
1276         case MDS_SYNC: /* used in unmounting */
1277         case OBD_PING:
1278         case MDS_REINT:
1279         case SEQ_QUERY:
1280         case FLD_QUERY:
1281         case LDLM_ENQUEUE:
1282                 *process = target_queue_recovery_request(req, obd);
1283                 RETURN(0);
1284
1285         default:
1286                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1287                 *process = -EAGAIN;
1288                 RETURN(0);
1289         }
1290 }
1291 EXPORT_SYMBOL(mds_filter_recovery_request);
1292
1293 static char *reint_names[] = {
1294         [REINT_SETATTR] "setattr",
1295         [REINT_CREATE]  "create",
1296         [REINT_LINK]    "link",
1297         [REINT_UNLINK]  "unlink",
1298         [REINT_RENAME]  "rename",
1299         [REINT_OPEN]    "open",
1300 };
1301
1302 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
1303 {
1304         void *key, *val;
1305         int keylen, vallen, rc = 0;
1306         ENTRY;
1307
1308         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1309         if (key == NULL) {
1310                 DEBUG_REQ(D_HA, req, "no set_info key");
1311                 RETURN(-EFAULT);
1312         }
1313         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1314
1315         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
1316         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1317
1318         rc = lustre_pack_reply(req, 1, NULL, NULL);
1319         if (rc)
1320                 RETURN(rc);
1321
1322         lustre_msg_set_status(req->rq_repmsg, 0);
1323
1324         if (KEY_IS("read-only")) {
1325                 if (val == NULL || vallen < sizeof(__u32)) {
1326                         DEBUG_REQ(D_HA, req, "no set_info val");
1327                         RETURN(-EFAULT);
1328                 }
1329
1330                 if (*(__u32 *)val)
1331                         exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1332                 else
1333                         exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1334         } else {
1335                 RETURN(-EINVAL);
1336         }
1337
1338         RETURN(0);
1339 }
1340
1341 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1342 {
1343         struct obd_quotactl *oqctl;
1344         int rc;
1345         ENTRY;
1346
1347         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1348                                    lustre_swab_obd_quotactl);
1349         if (oqctl == NULL)
1350                 RETURN(-EPROTO);
1351
1352         rc = lustre_pack_reply(req, 1, NULL, NULL);
1353         if (rc)
1354                 RETURN(rc);
1355
1356         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1357         RETURN(0);
1358 }
1359
1360 static int mds_handle_quotactl(struct ptlrpc_request *req)
1361 {
1362         struct obd_quotactl *oqctl, *repoqc;
1363         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1364         ENTRY;
1365
1366         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1367                                    lustre_swab_obd_quotactl);
1368         if (oqctl == NULL)
1369                 RETURN(-EPROTO);
1370
1371         rc = lustre_pack_reply(req, 2, size, NULL);
1372         if (rc)
1373                 RETURN(rc);
1374
1375         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1376
1377         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1378         *repoqc = *oqctl;
1379         RETURN(0);
1380 }
1381
1382 int mds_msg_check_version(struct lustre_msg *msg)
1383 {
1384         int rc;
1385
1386         switch (lustre_msg_get_opc(msg)) {
1387         case MDS_CONNECT:
1388         case MDS_DISCONNECT:
1389         case OBD_PING:
1390         case SEC_CTX_INIT:
1391         case SEC_CTX_INIT_CONT:
1392         case SEC_CTX_FINI:
1393                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1394                 if (rc)
1395                         CERROR("bad opc %u version %08x, expecting %08x\n",
1396                                lustre_msg_get_opc(msg),
1397                                lustre_msg_get_version(msg),
1398                                LUSTRE_OBD_VERSION);
1399                 break;
1400         case MDS_GETSTATUS:
1401         case MDS_GETATTR:
1402         case MDS_GETATTR_NAME:
1403         case MDS_STATFS:
1404         case MDS_READPAGE:
1405         case MDS_WRITEPAGE:
1406         case MDS_IS_SUBDIR:
1407         case MDS_REINT:
1408         case MDS_CLOSE:
1409         case MDS_DONE_WRITING:
1410         case MDS_PIN:
1411         case MDS_SYNC:
1412         case MDS_GETXATTR:
1413         case MDS_SETXATTR:
1414         case MDS_SET_INFO:
1415         case MDS_QUOTACHECK:
1416         case MDS_QUOTACTL:
1417         case QUOTA_DQACQ:
1418         case QUOTA_DQREL:
1419         case SEQ_QUERY:
1420         case FLD_QUERY:
1421                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1422                 if (rc)
1423                         CERROR("bad opc %u version %08x, expecting %08x\n",
1424                                lustre_msg_get_opc(msg),
1425                                lustre_msg_get_version(msg),
1426                                LUSTRE_MDS_VERSION);
1427                 break;
1428         case LDLM_ENQUEUE:
1429         case LDLM_CONVERT:
1430         case LDLM_BL_CALLBACK:
1431         case LDLM_CP_CALLBACK:
1432                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1433                 if (rc)
1434                         CERROR("bad opc %u version %08x, expecting %08x\n",
1435                                lustre_msg_get_opc(msg),
1436                                lustre_msg_get_version(msg),
1437                                LUSTRE_DLM_VERSION);
1438                 break;
1439         case OBD_LOG_CANCEL:
1440         case LLOG_ORIGIN_HANDLE_CREATE:
1441         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1442         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1443         case LLOG_ORIGIN_HANDLE_CLOSE:
1444         case LLOG_ORIGIN_HANDLE_DESTROY:
1445         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1446         case LLOG_CATINFO:
1447                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1448                 if (rc)
1449                         CERROR("bad opc %u version %08x, expecting %08x\n",
1450                                lustre_msg_get_opc(msg),
1451                                lustre_msg_get_version(msg),
1452                                LUSTRE_LOG_VERSION);
1453                 break;
1454         default:
1455                 CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg));
1456                 rc = -ENOTSUPP;
1457         }
1458         return rc;
1459 }
1460 EXPORT_SYMBOL(mds_msg_check_version);
1461
1462 int mds_handle(struct ptlrpc_request *req)
1463 {
1464         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1465         int rc;
1466         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1467         struct obd_device *obd = NULL;
1468         ENTRY;
1469
1470         if (OBD_FAIL_CHECK_ORSET(OBD_FAIL_MDS_ALL_REQUEST_NET, OBD_FAIL_ONCE))
1471                 RETURN(0);
1472
1473         LASSERT(current->journal_info == NULL);
1474
1475         rc = mds_msg_check_version(req->rq_reqmsg);
1476         if (rc) {
1477                 CERROR("MDS drop mal-formed request\n");
1478                 RETURN(rc);
1479         }
1480
1481         /* XXX identical to OST */
1482         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
1483                 struct mds_export_data *med;
1484                 int recovering;
1485
1486                 if (req->rq_export == NULL) {
1487                         CERROR("operation %d on unconnected MDS from %s\n",
1488                                lustre_msg_get_opc(req->rq_reqmsg),
1489                                libcfs_id2str(req->rq_peer));
1490                         req->rq_status = -ENOTCONN;
1491                         GOTO(out, rc = -ENOTCONN);
1492                 }
1493
1494                 med = &req->rq_export->exp_mds_data;
1495                 obd = req->rq_export->exp_obd;
1496                 mds = mds_req2mds(req);
1497
1498                 /* sanity check: if the xid matches, the request must
1499                  * be marked as a resent or replayed */
1500                 if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) ||
1501                    req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid))
1502                         if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1503                                  (MSG_RESENT | MSG_REPLAY))) {
1504                                 CERROR("rq_xid "LPU64" matches last_xid, "
1505                                        "expected RESENT flag\n",
1506                                         req->rq_xid);
1507                                 req->rq_status = -ENOTCONN;
1508                                 GOTO(out, rc = -EFAULT);
1509                         }
1510                 /* else: note the opposite is not always true; a
1511                  * RESENT req after a failover will usually not match
1512                  * the last_xid, since it was likely never
1513                  * committed. A REPLAYed request will almost never
1514                  * match the last xid, however it could for a
1515                  * committed, but still retained, open. */
1516
1517                 /* Check for aborted recovery. */
1518                 spin_lock_bh(&obd->obd_processing_task_lock);
1519                 recovering = obd->obd_recovering;
1520                 spin_unlock_bh(&obd->obd_processing_task_lock);
1521                 if (recovering) {
1522                         rc = mds_filter_recovery_request(req, obd,
1523                                                          &should_process);
1524                         if (rc || !should_process)
1525                                 RETURN(rc);
1526                         else if (should_process < 0) {
1527                                 req->rq_status = should_process;
1528                                 rc = ptlrpc_error(req);
1529                                 RETURN(rc);
1530                         }
1531                 }
1532         }
1533
1534         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1535         case MDS_CONNECT:
1536                 DEBUG_REQ(D_INODE, req, "connect");
1537                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CONNECT_NET))
1538                         RETURN(0);
1539                 rc = target_handle_connect(req);
1540                 if (!rc) {
1541                         /* Now that we have an export, set mds. */
1542                         /*
1543                          * XXX nikita: these assignments are useless: mds is
1544                          * never used below, and obd is only used for
1545                          * MSG_LAST_REPLAY case, which never happens for
1546                          * MDS_CONNECT.
1547                          */
1548                         obd = req->rq_export->exp_obd;
1549                         mds = mds_req2mds(req);
1550                 }
1551                 break;
1552
1553         case MDS_DISCONNECT:
1554                 DEBUG_REQ(D_INODE, req, "disconnect");
1555                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DISCONNECT_NET))
1556                         RETURN(0);
1557                 rc = target_handle_disconnect(req);
1558                 req->rq_status = rc;            /* superfluous? */
1559                 break;
1560
1561         case MDS_GETSTATUS:
1562                 DEBUG_REQ(D_INODE, req, "getstatus");
1563                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_NET))
1564                         RETURN(0);
1565                 rc = mds_getstatus(req);
1566                 break;
1567
1568         case MDS_GETATTR:
1569                 DEBUG_REQ(D_INODE, req, "getattr");
1570                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NET))
1571                         RETURN(0);
1572                 rc = mds_getattr(req, REQ_REC_OFF);
1573                 break;
1574
1575         case MDS_SETXATTR:
1576                 DEBUG_REQ(D_INODE, req, "setxattr");
1577                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SETXATTR_NET))
1578                         RETURN(0);
1579                 rc = mds_setxattr(req);
1580                 break;
1581
1582         case MDS_GETXATTR:
1583                 DEBUG_REQ(D_INODE, req, "getxattr");
1584                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_NET))
1585                         RETURN(0);
1586                 rc = mds_getxattr(req);
1587                 break;
1588
1589         case MDS_GETATTR_NAME: {
1590                 struct lustre_handle lockh = { 0 };
1591                 DEBUG_REQ(D_INODE, req, "getattr_name");
1592                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_NAME_NET))
1593                         RETURN(0);
1594
1595                 /* If this request gets a reconstructed reply, we won't be
1596                  * acquiring any new locks in mds_getattr_lock, so we don't
1597                  * want to cancel.
1598                  */
1599                 rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE,
1600                                       &lockh);
1601                 /* this non-intent call (from an ioctl) is special */
1602                 req->rq_status = rc;
1603                 if (rc == 0 && lustre_handle_is_used(&lockh))
1604                         ldlm_lock_decref(&lockh, LCK_CR);
1605                 break;
1606         }
1607         case MDS_STATFS:
1608                 DEBUG_REQ(D_INODE, req, "statfs");
1609                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_NET))
1610                         RETURN(0);
1611                 rc = mds_statfs(req);
1612                 break;
1613
1614         case MDS_READPAGE:
1615                 DEBUG_REQ(D_INODE, req, "readpage");
1616                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_NET))
1617                         RETURN(0);
1618                 rc = mds_readpage(req, REQ_REC_OFF);
1619
1620                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
1621                         RETURN(0);
1622                 }
1623
1624                 break;
1625
1626         case MDS_REINT: {
1627                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
1628                                              sizeof(*opcp));
1629                 __u32  opc;
1630                 int op = 0;
1631                 int size[4] = { sizeof(struct ptlrpc_body),
1632                                 sizeof(struct mds_body),
1633                                 mds->mds_max_mdsize,
1634                                 mds->mds_max_cookiesize };
1635                 int bufcount;
1636
1637                 /* NB only peek inside req now; mds_reint() will swab it */
1638                 if (opcp == NULL) {
1639                         CERROR ("Can't inspect opcode\n");
1640                         rc = -EINVAL;
1641                         break;
1642                 }
1643                 opc = *opcp;
1644                 if (lustre_msg_swabbed(req->rq_reqmsg))
1645                         __swab32s(&opc);
1646
1647                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1648                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1649                            reint_names[opc] == NULL) ? reint_names[opc] :
1650                                                        "unknown opcode");
1651                 switch (opc) {
1652                 case REINT_CREATE:
1653                         op = PTLRPC_LAST_CNTR + MDS_REINT_CREATE;
1654                         break;
1655                 case REINT_LINK:
1656                         op = PTLRPC_LAST_CNTR + MDS_REINT_LINK;
1657                         break;
1658                 case REINT_OPEN:
1659                         op = PTLRPC_LAST_CNTR + MDS_REINT_OPEN;
1660                         break;
1661                 case REINT_SETATTR:
1662                         op = PTLRPC_LAST_CNTR + MDS_REINT_SETATTR;
1663                         break;
1664                 case REINT_RENAME:
1665                         op = PTLRPC_LAST_CNTR + MDS_REINT_RENAME;
1666                         break;
1667                 case REINT_UNLINK:
1668                         op = PTLRPC_LAST_CNTR + MDS_REINT_UNLINK;
1669                         break;
1670                 default:
1671                         op = 0;
1672                         break;
1673                 }
1674
1675                 if (op && req->rq_rqbd->rqbd_service->srv_stats)
1676                         lprocfs_counter_incr(
1677                                 req->rq_rqbd->rqbd_service->srv_stats, op);
1678
1679                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_NET))
1680                         RETURN(0);
1681
1682                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1683                         bufcount = 4;
1684                 else if (opc == REINT_OPEN)
1685                         bufcount = 3;
1686                 else
1687                         bufcount = 2;
1688
1689                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1690                 if (rc)
1691                         break;
1692
1693                 rc = mds_reint(req, REQ_REC_OFF, NULL);
1694                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1695                 break;
1696         }
1697
1698         case MDS_CLOSE:
1699                 DEBUG_REQ(D_INODE, req, "close");
1700                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_CLOSE_NET))
1701                         RETURN(0);
1702                 rc = mds_close(req, REQ_REC_OFF);
1703                 fail = OBD_FAIL_MDS_CLOSE_NET_REP;
1704                 break;
1705
1706         case MDS_DONE_WRITING:
1707                 DEBUG_REQ(D_INODE, req, "done_writing");
1708                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_DONE_WRITING_NET))
1709                         RETURN(0);
1710                 rc = mds_done_writing(req, REQ_REC_OFF);
1711                 break;
1712
1713         case MDS_PIN:
1714                 DEBUG_REQ(D_INODE, req, "pin");
1715                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_PIN_NET))
1716                         RETURN(0);
1717                 rc = mds_pin(req, REQ_REC_OFF);
1718                 break;
1719
1720         case MDS_SYNC:
1721                 DEBUG_REQ(D_INODE, req, "sync");
1722                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_NET))
1723                         RETURN(0);
1724                 rc = mds_sync(req, REQ_REC_OFF);
1725                 break;
1726
1727         case MDS_SET_INFO:
1728                 DEBUG_REQ(D_INODE, req, "set_info");
1729                 rc = mds_set_info_rpc(req->rq_export, req);
1730                 break;
1731
1732         case MDS_QUOTACHECK:
1733                 DEBUG_REQ(D_INODE, req, "quotacheck");
1734                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACHECK_NET))
1735                         RETURN(0);
1736                 rc = mds_handle_quotacheck(req);
1737                 break;
1738
1739         case MDS_QUOTACTL:
1740                 DEBUG_REQ(D_INODE, req, "quotactl");
1741                 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_QUOTACTL_NET))
1742                         RETURN(0);
1743                 rc = mds_handle_quotactl(req);
1744                 break;
1745
1746         case OBD_PING:
1747                 DEBUG_REQ(D_INODE, req, "ping");
1748                 rc = target_handle_ping(req);
1749                 break;
1750
1751         case OBD_LOG_CANCEL:
1752                 CDEBUG(D_INODE, "log cancel\n");
1753                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
1754                         RETURN(0);
1755                 rc = -ENOTSUPP; /* la la la */
1756                 break;
1757
1758         case LDLM_ENQUEUE:
1759                 DEBUG_REQ(D_INODE, req, "enqueue");
1760                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
1761                         RETURN(0);
1762                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1763                                          ldlm_server_blocking_ast, NULL);
1764                 fail = OBD_FAIL_LDLM_REPLY;
1765                 break;
1766         case LDLM_CONVERT:
1767                 DEBUG_REQ(D_INODE, req, "convert");
1768                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
1769                         RETURN(0);
1770                 rc = ldlm_handle_convert(req);
1771                 break;
1772         case LDLM_BL_CALLBACK:
1773         case LDLM_CP_CALLBACK:
1774                 DEBUG_REQ(D_INODE, req, "callback");
1775                 CERROR("callbacks should not happen on MDS\n");
1776                 LBUG();
1777                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_BL_CALLBACK))
1778                         RETURN(0);
1779                 break;
1780         case LLOG_ORIGIN_HANDLE_CREATE:
1781                 DEBUG_REQ(D_INODE, req, "llog_init");
1782                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1783                         RETURN(0);
1784                 rc = llog_origin_handle_create(req);
1785                 break;
1786         case LLOG_ORIGIN_HANDLE_DESTROY:
1787                 DEBUG_REQ(D_INODE, req, "llog_init");
1788                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1789                         RETURN(0);
1790                 rc = llog_origin_handle_destroy(req);
1791                 break;
1792         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1793                 DEBUG_REQ(D_INODE, req, "llog next block");
1794                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1795                         RETURN(0);
1796                 rc = llog_origin_handle_next_block(req);
1797                 break;
1798         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1799                 DEBUG_REQ(D_INODE, req, "llog prev block");
1800                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1801                         RETURN(0);
1802                 rc = llog_origin_handle_prev_block(req);
1803                 break;
1804         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1805                 DEBUG_REQ(D_INODE, req, "llog read header");
1806                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1807                         RETURN(0);
1808                 rc = llog_origin_handle_read_header(req);
1809                 break;
1810         case LLOG_ORIGIN_HANDLE_CLOSE:
1811                 DEBUG_REQ(D_INODE, req, "llog close");
1812                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1813                         RETURN(0);
1814                 rc = llog_origin_handle_close(req);
1815                 break;
1816         case LLOG_CATINFO:
1817                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1818                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOGD_NET))
1819                         RETURN(0);
1820                 rc = llog_catinfo(req);
1821                 break;
1822         default:
1823                 req->rq_status = -ENOTSUPP;
1824                 rc = ptlrpc_error(req);
1825                 RETURN(rc);
1826         }
1827
1828         LASSERT(current->journal_info == NULL);
1829
1830         /* If we're DISCONNECTing, the mds_export_data is already freed */
1831         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
1832                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1833
1834                 /* I don't think last_xid is used for anyway, so I'm not sure
1835                    if we need to care about last_close_xid here.*/
1836                 lustre_msg_set_last_xid(req->rq_repmsg,
1837                                        le64_to_cpu(med->med_mcd->mcd_last_xid));
1838
1839                 target_committed_to_req(req);
1840         }
1841
1842         EXIT;
1843  out:
1844
1845         target_send_reply(req, rc, fail);
1846         return 0;
1847 }
1848
1849 /* Update the server data on disk.  This stores the new mount_count and
1850  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1851  * then the server last_rcvd value may be less than that of the clients.
1852  * This will alert us that we may need to do client recovery.
1853  *
1854  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1855  */
1856 int mds_update_server_data(struct obd_device *obd, int force_sync)
1857 {
1858         struct mds_obd *mds = &obd->u.mds;
1859         struct lr_server_data *lsd = mds->mds_server_data;
1860         struct file *filp = mds->mds_rcvd_filp;
1861         struct lvfs_run_ctxt saved;
1862         loff_t off = 0;
1863         int rc;
1864         ENTRY;
1865
1866         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1867                mds->mds_mount_count, mds->mds_last_transno);
1868
1869         spin_lock(&mds->mds_transno_lock);
1870         lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1871         spin_unlock(&mds->mds_transno_lock);
1872
1873         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1874         rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1875         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1876         if (rc)
1877                 CERROR("error writing MDS server data: rc = %d\n", rc);
1878         RETURN(rc);
1879 }
1880
1881 static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
1882 {
1883         char *p = options;
1884
1885         if (!options)
1886                 return;
1887
1888         while (*options) {
1889                 int len;
1890
1891                 while (*p && *p != ',')
1892                         p++;
1893
1894                 len = p - options;
1895                 if (len == sizeof("user_xattr") - 1 &&
1896                     memcmp(options, "user_xattr", len) == 0) {
1897                         mds->mds_fl_user_xattr = 1;
1898                         LCONSOLE_INFO("Enabling user_xattr\n");
1899                 } else if (len == sizeof("nouser_xattr") - 1 &&
1900                            memcmp(options, "nouser_xattr", len) == 0) {
1901                         mds->mds_fl_user_xattr = 0;
1902                         LCONSOLE_INFO("Disabling user_xattr\n");
1903                 } else if (len == sizeof("acl") - 1 &&
1904                            memcmp(options, "acl", len) == 0) {
1905 #ifdef CONFIG_FS_POSIX_ACL
1906                         mds->mds_fl_acl = 1;
1907                         LCONSOLE_INFO("Enabling ACL\n");
1908 #else
1909                         CWARN("ignoring unsupported acl mount option\n");
1910 #endif
1911                 } else if (len == sizeof("noacl") - 1 &&
1912                            memcmp(options, "noacl", len) == 0) {
1913 #ifdef CONFIG_FS_POSIX_ACL
1914                         mds->mds_fl_acl = 0;
1915                         LCONSOLE_INFO("Disabling ACL\n");
1916 #endif
1917                 }
1918
1919                 options = ++p;
1920         }
1921 }
1922 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
1923 {
1924         int rc = 0;
1925         ENTRY;
1926
1927         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1928                 class_uuid_t uuid;
1929
1930                 ll_generate_random_uuid(uuid);
1931                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
1932
1933                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
1934                 if (mds->mds_profile == NULL)
1935                         RETURN(-ENOMEM);
1936
1937                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
1938                         LUSTRE_CFG_BUFLEN(lcfg, 3));
1939         }
1940         RETURN(rc);
1941 }
1942
1943 /* mount the file system (secretly).  lustre_cfg parameters are:
1944  * 1 = device
1945  * 2 = fstype
1946  * 3 = config name
1947  * 4 = mount options
1948  */
1949 static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1950 {
1951         struct lprocfs_static_vars lvars;
1952         struct mds_obd *mds = &obd->u.mds;
1953         struct lustre_mount_info *lmi;
1954         struct vfsmount *mnt;
1955         struct lustre_sb_info *lsi;
1956         struct obd_uuid uuid;
1957         __u8 *uuid_ptr;
1958         char *str, *label;
1959         char ns_name[48];
1960         int rc = 0;
1961         ENTRY;
1962
1963         /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */
1964
1965         CLASSERT(offsetof(struct obd_device, u.obt) ==
1966                  offsetof(struct obd_device, u.mds.mds_obt));
1967
1968         if (lcfg->lcfg_bufcount < 3)
1969                 RETURN(-EINVAL);
1970
1971         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1972                 RETURN(-EINVAL);
1973
1974         lmi = server_get_mount(obd->obd_name);
1975         if (!lmi) {
1976                 CERROR("Not mounted in lustre_fill_super?\n");
1977                 RETURN(-EINVAL);
1978         }
1979
1980         /* We mounted in lustre_fill_super.
1981            lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1982
1983         lsi = s2lsi(lmi->lmi_sb);
1984         fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
1985         fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
1986         mnt = lmi->lmi_mnt;
1987         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1988         if (IS_ERR(obd->obd_fsops))
1989                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
1990
1991         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1992
1993         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1994
1995         sema_init(&mds->mds_epoch_sem, 1);
1996         spin_lock_init(&mds->mds_transno_lock);
1997         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1998         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1999         mds->mds_atime_diff = MAX_ATIME_DIFF;
2000         mds->mds_evict_ost_nids = 1;
2001
2002         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
2003         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
2004                                                 LDLM_NAMESPACE_GREEDY);
2005         if (obd->obd_namespace == NULL) {
2006                 mds_cleanup(obd);
2007                 GOTO(err_ops, rc = -ENOMEM);
2008         }
2009         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
2010
2011         lprocfs_mds_init_vars(&lvars);
2012         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
2013             lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
2014                 /* Init private stats here */
2015                 mds_stats_counter_init(obd->obd_stats);
2016                 obd->obd_proc_exports_entry = proc_mkdir("exports",
2017                                                          obd->obd_proc_entry);
2018         }
2019
2020         rc = mds_fs_setup(obd, mnt);
2021         if (rc) {
2022                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
2023                        obd->obd_name, rc);
2024                 GOTO(err_ns, rc);
2025         }
2026
2027         if (obd->obd_proc_exports_entry)
2028                 lprocfs_add_simple(obd->obd_proc_exports_entry,
2029                                    "clear", lprocfs_nid_stats_clear_read,
2030                                    lprocfs_nid_stats_clear_write, obd);
2031
2032         rc = mds_lov_presetup(mds, lcfg);
2033         if (rc < 0)
2034                 GOTO(err_fs, rc);
2035
2036         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2037                            "mds_ldlm_client", &obd->obd_ldlm_client);
2038         obd->obd_replayable = 1;
2039
2040         rc = lquota_setup(mds_quota_interface_ref, obd);
2041         if (rc)
2042                 GOTO(err_fs, rc);
2043
2044 #if 0
2045         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
2046         if (IS_ERR(mds->mds_group_hash)) {
2047                 rc = PTR_ERR(mds->mds_group_hash);
2048                 mds->mds_group_hash = NULL;
2049                 GOTO(err_qctxt, rc);
2050         }
2051 #endif
2052
2053         /* Don't wait for mds_postrecov trying to clear orphans */
2054         obd->obd_async_recov = 1;
2055         rc = mds_postsetup(obd);
2056         /* Bug 11557 - allow async abort_recov start
2057            FIXME can remove most of this obd_async_recov plumbing
2058         obd->obd_async_recov = 0;
2059         */
2060         if (rc)
2061                 GOTO(err_qctxt, rc);
2062
2063         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
2064         if (uuid_ptr != NULL) {
2065                 class_uuid_unparse(uuid_ptr, &uuid);
2066                 str = uuid.uuid;
2067         } else {
2068                 str = "no UUID";
2069         }
2070
2071         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
2072         if (obd->obd_recovering) {
2073                 LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in "
2074                               "recovery until %d %s reconnect, or if no clients"
2075                               " reconnect for %d:%.02d; during that time new "
2076                               "clients will not be allowed to connect. "
2077                               "Recovery progress can be monitored by watching "
2078                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
2079                               obd->obd_name, lustre_cfg_string(lcfg, 1),
2080                               label ?: "", label ? "/" : "", str,
2081                               obd->obd_max_recoverable_clients,
2082                               (obd->obd_max_recoverable_clients == 1) ?
2083                               "client" : "clients",
2084                               (int)(OBD_RECOVERY_TIMEOUT) / 60,
2085                               (int)(OBD_RECOVERY_TIMEOUT) % 60,
2086                               obd->obd_name);
2087         } else {
2088                 LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery "
2089                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
2090                               label ?: "", label ? "/" : "", str,
2091                               obd->obd_replayable ? "enabled" : "disabled");
2092         }
2093
2094         if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
2095                 ldlm_timeout = 6;
2096
2097         RETURN(0);
2098
2099 err_qctxt:
2100         lquota_cleanup(mds_quota_interface_ref, obd);
2101 err_fs:
2102         /* No extra cleanup needed for llog_init_commit_thread() */
2103         mds_fs_cleanup(obd);
2104 #if 0
2105         upcall_cache_cleanup(mds->mds_group_hash);
2106         mds->mds_group_hash = NULL;
2107 #endif
2108 err_ns:
2109         lprocfs_free_obd_stats(obd);
2110         lprocfs_obd_cleanup(obd);
2111         ldlm_namespace_free(obd->obd_namespace, 0);
2112         obd->obd_namespace = NULL;
2113 err_ops:
2114         fsfilt_put_ops(obd->obd_fsops);
2115 err_put:
2116         server_put_mount(obd->obd_name, mnt);
2117         obd->u.obt.obt_sb = NULL;
2118         return rc;
2119 }
2120
2121 static int mds_lov_clean(struct obd_device *obd)
2122 {
2123         struct mds_obd *mds = &obd->u.mds;
2124         struct obd_device *osc = mds->mds_osc_obd;
2125         ENTRY;
2126
2127         if (mds->mds_profile) {
2128                 class_del_profile(mds->mds_profile);
2129                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2130                 mds->mds_profile = NULL;
2131         }
2132
2133         /* There better be a lov */
2134         if (!osc)
2135                 RETURN(0);
2136         if (IS_ERR(osc))
2137                 RETURN(PTR_ERR(osc));
2138
2139         obd_register_observer(osc, NULL);
2140
2141         /* Give lov our same shutdown flags */
2142         osc->obd_force = obd->obd_force;
2143         osc->obd_fail = obd->obd_fail;
2144
2145         /* Cleanup the lov */
2146         obd_disconnect(mds->mds_osc_exp);
2147         class_manual_cleanup(osc);
2148         mds->mds_osc_exp = NULL;
2149
2150         RETURN(0);
2151 }
2152
2153 static int mds_postsetup(struct obd_device *obd)
2154 {
2155         struct mds_obd *mds = &obd->u.mds;
2156         int rc = 0;
2157         ENTRY;
2158
2159         rc = llog_setup(obd, &obd->obd_olg, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
2160                         &llog_lvfs_ops);
2161         if (rc)
2162                 RETURN(rc);
2163
2164         rc = llog_setup(obd, &obd->obd_olg, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
2165                         &llog_lvfs_ops);
2166         if (rc)
2167                 RETURN(rc);
2168
2169         if (mds->mds_profile) {
2170                 struct lustre_profile *lprof;
2171                 /* The profile defines which osc and mdc to connect to, for a
2172                    client.  We reuse that here to figure out the name of the
2173                    lov to use (and ignore lprof->lp_md).
2174                    The profile was set in the config log with
2175                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
2176                 lprof = class_get_profile(mds->mds_profile);
2177                 if (lprof == NULL) {
2178                         CERROR("No profile found: %s\n", mds->mds_profile);
2179                         GOTO(err_cleanup, rc = -ENOENT);
2180                 }
2181                 rc = mds_lov_connect(obd, lprof->lp_dt);
2182                 if (rc)
2183                         GOTO(err_cleanup, rc);
2184         }
2185
2186         RETURN(rc);
2187
2188 err_cleanup:
2189         mds_lov_clean(obd);
2190         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2191         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2192         RETURN(rc);
2193 }
2194
2195 int mds_postrecov(struct obd_device *obd)
2196 {
2197         int rc = 0;
2198         ENTRY;
2199
2200         if (obd->obd_fail)
2201                 RETURN(0);
2202
2203         LASSERT(!obd->obd_recovering);
2204         LASSERT(!llog_ctxt_null(obd, LLOG_MDS_OST_ORIG_CTXT));
2205
2206         /* clean PENDING dir */
2207         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
2208                 rc = mds_cleanup_pending(obd);
2209                 if (rc < 0)
2210                         GOTO(out, rc);
2211
2212         /* FIXME Does target_finish_recovery really need this to block? */
2213         /* Notify the LOV, which will in turn call mds_notify for each tgt */
2214         /* This means that we have to hack obd_notify to think we're obd_set_up
2215            during mds_lov_connect. */
2216         obd_notify(obd->u.mds.mds_osc_obd, NULL,
2217                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
2218                    OBD_NOTIFY_SYNC, NULL);
2219
2220         /* quota recovery */
2221         lquota_recovery(mds_quota_interface_ref, obd);
2222
2223 out:
2224         RETURN(rc);
2225 }
2226
2227 /* We need to be able to stop an mds_lov_synchronize */
2228 static int mds_lov_early_clean(struct obd_device *obd)
2229 {
2230         struct mds_obd *mds = &obd->u.mds;
2231         struct obd_device *osc = mds->mds_osc_obd;
2232
2233         if (!osc || (!obd->obd_force && !obd->obd_fail))
2234                 return(0);
2235
2236         CDEBUG(D_HA, "abort inflight\n");
2237         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
2238 }
2239
2240 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2241 {
2242         int rc = 0;
2243         ENTRY;
2244
2245         switch (stage) {
2246         case OBD_CLEANUP_EARLY:
2247                 break;
2248         case OBD_CLEANUP_EXPORTS:
2249                 /*XXX Use this for mdd mds cleanup, so comment out
2250                  *this target_cleanup_recovery for this tmp MDD MDS
2251                  *Wangdi*/
2252                 if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
2253                         target_cleanup_recovery(obd);
2254                 mds_lov_early_clean(obd);
2255                 break;
2256         case OBD_CLEANUP_SELF_EXP:
2257                 mds_lov_disconnect(obd);
2258                 mds_lov_clean(obd);
2259                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2260                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2261                 rc = obd_llog_finish(obd, 0);
2262                 break;
2263         case OBD_CLEANUP_OBD:
2264                 break;
2265         }
2266         RETURN(rc);
2267 }
2268
2269 static int mds_cleanup(struct obd_device *obd)
2270 {
2271         struct mds_obd *mds = &obd->u.mds;
2272         lvfs_sbdev_type save_dev;
2273         ENTRY;
2274
2275         if (obd->u.obt.obt_sb == NULL)
2276                 RETURN(0);
2277         save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
2278
2279         if (mds->mds_osc_exp)
2280                 /* lov export was disconnected by mds_lov_clean;
2281                    we just need to drop our ref */
2282                 class_export_put(mds->mds_osc_exp);
2283
2284         remove_proc_entry("clear", obd->obd_proc_exports_entry);
2285         lprocfs_free_per_client_stats(obd);
2286         lprocfs_free_obd_stats(obd);
2287         lprocfs_obd_cleanup(obd);
2288
2289         lquota_cleanup(mds_quota_interface_ref, obd);
2290
2291         mds_update_server_data(obd, 1);
2292         /* XXX
2293         mds_lov_destroy_objids(obd);
2294         */
2295         mds_fs_cleanup(obd);
2296
2297 #if 0
2298         upcall_cache_cleanup(mds->mds_group_hash);
2299         mds->mds_group_hash = NULL;
2300 #endif
2301
2302         server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2303         obd->u.obt.obt_sb = NULL;
2304
2305         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
2306
2307         spin_lock_bh(&obd->obd_processing_task_lock);
2308         if (obd->obd_recovering) {
2309                 target_cancel_recovery_timer(obd);
2310                 obd->obd_recovering = 0;
2311         }
2312         spin_unlock_bh(&obd->obd_processing_task_lock);
2313
2314         fsfilt_put_ops(obd->obd_fsops);
2315
2316         LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
2317
2318         RETURN(0);
2319 }
2320
2321 static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
2322                                         struct ldlm_lock *new_lock,
2323                                         struct ldlm_lock **old_lock,
2324                                         struct lustre_handle *lockh)
2325 {
2326         struct obd_export *exp = req->rq_export;
2327         struct ldlm_request *dlmreq =
2328                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
2329         struct lustre_handle remote_hdl = dlmreq->lock_handle[0];
2330         struct list_head *iter;
2331
2332         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2333                 return;
2334
2335         spin_lock(&exp->exp_ldlm_data.led_lock);
2336         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2337                 struct ldlm_lock *lock;
2338                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2339                 if (lock == new_lock)
2340                         continue;
2341                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2342                         lockh->cookie = lock->l_handle.h_cookie;
2343                         LDLM_DEBUG(lock, "restoring lock cookie");
2344                         DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64,
2345                                   lockh->cookie);
2346                         if (old_lock)
2347                                 *old_lock = LDLM_LOCK_GET(lock);
2348                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2349                         return;
2350                 }
2351         }
2352         spin_unlock(&exp->exp_ldlm_data.led_lock);
2353
2354         /* If the xid matches, then we know this is a resent request,
2355          * and allow it. (It's probably an OPEN, for which we don't
2356          * send a lock */
2357         if (req->rq_xid ==
2358             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
2359                 return;
2360
2361         if (req->rq_xid ==
2362             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid))
2363                 return;
2364
2365         /* This remote handle isn't enqueued, so we never received or
2366          * processed this request.  Clear MSG_RESENT, because it can
2367          * be handled like any normal request now. */
2368
2369         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2370
2371         DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
2372                   remote_hdl.cookie);
2373 }
2374
2375 int intent_disposition(struct ldlm_reply *rep, int flag)
2376 {
2377         if (!rep)
2378                 return 0;
2379         return (rep->lock_policy_res1 & flag);
2380 }
2381
2382 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2383 {
2384         if (!rep)
2385                 return;
2386         rep->lock_policy_res1 |= flag;
2387 }
2388
/*
 * Intent policy handler: runs the metadata operation carried inside a lock
 * enqueue request and decides which lock, if any, is handed to the client.
 *
 * 'req_cookie' is the incoming ptlrpc request and '*lockp' the lock being
 * enqueued.  'ns', 'mode' and 'data' are not referenced in this function.
 *
 * Return values:
 *   0                    the request has no intent buffer (a two-buffer reply
 *                        is packed), or no replacement lock exists and
 *                        LDLM_FL_INTENT_ONLY is set in 'flags'
 *   ELDLM_LOCK_ABORTED   the operation failed or produced no lock to return
 *   ELDLM_LOCK_REPLACED  *lockp now points at the lock to give the client
 *   negative errno       the intent could not be read (-EFAULT), reply
 *                        packing failed, or the intent opcode is unhandled
 *                        (-EFAULT)
 */
static int mds_intent_policy(struct ldlm_namespace *ns,
                             struct ldlm_lock **lockp, void *req_cookie,
                             ldlm_mode_t mode, int flags, void *data)
{
        struct ptlrpc_request *req = req_cookie;
        struct ldlm_lock *lock = *lockp;
        struct ldlm_intent *it;
        struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
        struct ldlm_reply *rep;
        struct lustre_handle lockh = { 0 };
        struct ldlm_lock *new_lock = NULL;
        int getattr_part = MDS_INODELOCK_UPDATE;
        /* Four reply buffers are always sized here; the fifth slot is filled
         * in below only for ACL-capable exports or unlink intents. */
        int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
                           [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
                           [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
                           [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize };
        int repbufcnt = 4, rc;
        ENTRY;

        LASSERT(req != NULL);

        if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
                /* No intent was provided */
                rc = lustre_pack_reply(req, 2, repsize, NULL);
                if (rc)
                        RETURN(rc);
                RETURN(0);
        }

        it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it),
                                lustre_swab_ldlm_intent);
        if (it == NULL) {
                CERROR("Intent missing\n");
                RETURN(req->rq_status = -EFAULT);
        }

        LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));

        if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
            (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
                /* we should never allow OBD_CONNECT_ACL if not configured */
                repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
        else if (it->opc & IT_UNLINK)
                /* NOTE(review): the switch below has no IT_UNLINK case, so
                 * such an intent ends in the default branch - confirm
                 * whether this sizing is still needed. */
                repsize[repbufcnt++] = mds->mds_max_cookiesize;

        rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
        if (rc)
                RETURN(req->rq_status = rc);

        rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
        intent_set_disposition(rep, DISP_IT_EXECD);


        /* execute policy */
        switch ((long)it->opc) {
        case IT_OPEN:
        case IT_CREAT|IT_OPEN:
                mds_counter_incr(req->rq_export, LPROC_MDS_OPEN);
                fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL,
                                            &lockh);
                /* XXX swab here to assert that an mds_open reint
                 * packet is following */
                rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF,
                                                  &lockh);
#if 0
                /* We abort the lock if the lookup was negative and
                 * we did not make it to the OPEN portion */
                if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
                        RETURN(ELDLM_LOCK_ABORTED);
                if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
                    !intent_disposition(rep, DISP_OPEN_OPEN))
#endif

                /* If there was an error of some sort or if we are not
                 * returning any locks */
                if (rep->lock_policy_res2 ||
                    !intent_disposition(rep, DISP_OPEN_LOCK))
                        RETURN(ELDLM_LOCK_ABORTED);
                break;
        case IT_LOOKUP:
                        /* lookup only needs the LOOKUP bit */
                        getattr_part = MDS_INODELOCK_LOOKUP;
                        /* fall through */
        case IT_GETATTR:
                        /* getattr ends up with UPDATE | LOOKUP */
                        getattr_part |= MDS_INODELOCK_LOOKUP;
                        OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr);
                        /* fall through */
        case IT_READDIR:
                fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock,
                                            &new_lock, &lockh);

                /* INODEBITS_INTEROP: if this lock was converted from a
                 * plain lock (client does not support inodebits), then
                 * child lock must be taken with both lookup and update
                 * bits set for all operations.
                 */
                if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
                        getattr_part = MDS_INODELOCK_LOOKUP |
                                       MDS_INODELOCK_UPDATE;

                rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF,
                                                         getattr_part, &lockh);
                /* FIXME: LDLM can set req->rq_status. MDS sets
                   policy_res{1,2} with disposition and status.
                   - replay: returns 0 & req->status is old status
                   - otherwise: returns req->status */
                /* A negative lookup is reported through the disposition
                 * bits, so the status is cleared here. */
                if (intent_disposition(rep, DISP_LOOKUP_NEG))
                        rep->lock_policy_res2 = 0;
                if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
                    rep->lock_policy_res2)
                        RETURN(ELDLM_LOCK_ABORTED);
                if (req->rq_status != 0) {
                        /* NOTE(review): the two statements after LBUG() only
                         * run if LBUG() returns - confirm. */
                        LBUG();
                        rep->lock_policy_res2 = req->rq_status;
                        RETURN(ELDLM_LOCK_ABORTED);
                }
                break;
        default:
                CERROR("Unhandled intent "LPD64"\n", it->opc);
                RETURN(-EFAULT);
        }

        /* By this point, whatever function we called above must have either
         * filled in 'lockh', been an intent replay, or returned an error.  We
         * want to allow replayed RPCs to not get a lock, since we would just
         * drop it below anyways because lock replay is done separately by the
         * client afterwards.  For regular RPCs we want to give the new lock to
         * the client instead of whatever lock it was about to get. */
        if (new_lock == NULL)
                new_lock = ldlm_handle2lock(&lockh);
        if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
                RETURN(0);

        LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
                 it->opc, lockh.cookie);

        /* If we've already given this lock to a client once, then we should
         * have no readers or writers.  Otherwise, we should have one reader
         * _or_ writer ref (which will be zeroed below) before returning the
         * lock to a client. */
        if (new_lock->l_export == req->rq_export) {
                LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
        } else {
                LASSERT(new_lock->l_export == NULL);
                LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
        }

        *lockp = new_lock;

        if (new_lock->l_export == req->rq_export) {
                /* Already gave this to the client, which means that we
                 * reconstructed a reply. */
                LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
                        MSG_RESENT);
                RETURN(ELDLM_LOCK_REPLACED);
        }

        /* Fixup the lock to be given to the client */
        lock_res_and_lock(new_lock);
        new_lock->l_readers = 0;
        new_lock->l_writers = 0;

        /* Attach the lock to the requesting export and its held-locks list */
        new_lock->l_export = class_export_get(req->rq_export);
        spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
        list_add(&new_lock->l_export_chain,
                 &new_lock->l_export->exp_ldlm_data.led_held_locks);
        spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);

        /* Inherit the ASTs and the client's handle from the lock that was
         * originally being enqueued. */
        new_lock->l_blocking_ast = lock->l_blocking_ast;
        new_lock->l_completion_ast = lock->l_completion_ast;

        memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
               sizeof(lock->l_remote_handle));

        new_lock->l_flags &= ~LDLM_FL_LOCAL;

        unlock_res_and_lock(new_lock);
        LDLM_LOCK_PUT(new_lock);

        RETURN(ELDLM_LOCK_REPLACED);
}
2567
2568 static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2569 {
2570         struct mds_obd *mds = &obd->u.mds;
2571         struct lprocfs_static_vars lvars;
2572         int mds_min_threads;
2573         int mds_max_threads;
2574         int rc = 0;
2575         ENTRY;
2576
2577         lprocfs_mdt_init_vars(&lvars);
2578         lprocfs_obd_setup(obd, lvars.obd_vars);
2579
2580         sema_init(&mds->mds_health_sem, 1);
2581
2582         if (mds_num_threads) {
2583                 /* If mds_num_threads is set, it is the min and the max. */
2584                 if (mds_num_threads > MDS_THREADS_MAX)
2585                         mds_num_threads = MDS_THREADS_MAX;
2586                 if (mds_num_threads < MDS_THREADS_MIN)
2587                         mds_num_threads = MDS_THREADS_MIN;
2588                 mds_max_threads = mds_min_threads = mds_num_threads;
2589         } else {
2590                 /* Base min threads on memory and cpus */
2591                 mds_min_threads = num_possible_cpus() * num_physpages >>
2592                         (27 - CFS_PAGE_SHIFT);
2593                 if (mds_min_threads < MDS_THREADS_MIN)
2594                         mds_min_threads = MDS_THREADS_MIN;
2595                 /* Largest auto threads start value */
2596                 if (mds_min_threads > 32)
2597                         mds_min_threads = 32;
2598                 mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4);
2599         }
2600
2601         mds->mds_service =
2602                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2603                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
2604                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2605                                 mds_handle, LUSTRE_MDS_NAME,
2606                                 obd->obd_proc_entry, NULL,
2607                                 mds_min_threads, mds_max_threads, "ll_mdt", 0);
2608
2609         if (!mds->mds_service) {
2610                 CERROR("failed to start service\n");
2611                 GOTO(err_lprocfs, rc = -ENOMEM);
2612         }
2613
2614         rc = ptlrpc_start_threads(obd, mds->mds_service);
2615         if (rc)
2616                 GOTO(err_thread, rc);
2617
2618         mds->mds_setattr_service =
2619                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2620                                 MDS_MAXREPSIZE, MDS_SETATTR_PORTAL,
2621                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2622                                 mds_handle, "mds_setattr",
2623                                 obd->obd_proc_entry, NULL,
2624                                 mds_min_threads, mds_max_threads,
2625                                 "ll_mdt_attr", 0);
2626         if (!mds->mds_setattr_service) {
2627                 CERROR("failed to start getattr service\n");
2628                 GOTO(err_thread, rc = -ENOMEM);
2629         }
2630
2631         rc = ptlrpc_start_threads(obd, mds->mds_setattr_service);
2632         if (rc)
2633                 GOTO(err_thread2, rc);
2634
2635         mds->mds_readpage_service =
2636                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2637                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
2638                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2639                                 mds_handle, "mds_readpage",
2640                                 obd->obd_proc_entry, NULL,
2641                                 MDS_THREADS_MIN_READPAGE, mds_max_threads,
2642                                 "ll_mdt_rdpg", 0);
2643         if (!mds->mds_readpage_service) {
2644                 CERROR("failed to start readpage service\n");
2645                 GOTO(err_thread2, rc = -ENOMEM);
2646         }
2647
2648         rc = ptlrpc_start_threads(obd, mds->mds_readpage_service);
2649
2650         if (rc)
2651                 GOTO(err_thread3, rc);
2652
2653         ping_evictor_start();
2654
2655         RETURN(0);
2656
2657 err_thread3:
2658         ptlrpc_unregister_service(mds->mds_readpage_service);
2659         mds->mds_readpage_service = NULL;
2660 err_thread2:
2661         ptlrpc_unregister_service(mds->mds_setattr_service);
2662         mds->mds_setattr_service = NULL;
2663 err_thread:
2664         ptlrpc_unregister_service(mds->mds_service);
2665         mds->mds_service = NULL;
2666 err_lprocfs:
2667         lprocfs_obd_cleanup(obd);
2668         return rc;
2669 }
2670
/*
 * Tear down what mdt_setup() started: stop the ping evictor, unregister the
 * three ptlrpc services and remove the lprocfs entries.  Always returns 0.
 */
static int mdt_cleanup(struct obd_device *obd)
{
        struct mds_obd *mds = &obd->u.mds;
        ENTRY;

        ping_evictor_stop();

        /* mdt_health_check() takes the same semaphore before it reads the
         * service pointers, so the unregister + reset below is done while
         * holding it. */
        down(&mds->mds_health_sem);
        ptlrpc_unregister_service(mds->mds_readpage_service);
        ptlrpc_unregister_service(mds->mds_setattr_service);
        ptlrpc_unregister_service(mds->mds_service);
        mds->mds_readpage_service = NULL;
        mds->mds_setattr_service = NULL;
        mds->mds_service = NULL;
        up(&mds->mds_health_sem);

        lprocfs_obd_cleanup(obd);

        RETURN(0);
}
2691
2692 static int mdt_health_check(struct obd_device *obd)
2693 {
2694         struct mds_obd *mds = &obd->u.mds;
2695         int rc = 0;
2696
2697         down(&mds->mds_health_sem);
2698         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
2699         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
2700         rc |= ptlrpc_service_health_check(mds->mds_service);
2701         up(&mds->mds_health_sem);
2702
2703         /*
2704          * health_check to return 0 on healthy
2705          * and 1 on unhealthy.
2706          */
2707         if(rc != 0)
2708                 rc = 1;
2709
2710         return rc;
2711 }
2712
2713 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2714                                           void *data)
2715 {
2716         struct obd_device *obd = data;
2717         struct ll_fid fid;
2718         fid.id = id;
2719         fid.generation = gen;
2720         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2721 }
2722
2723 static int mds_health_check(struct obd_device *obd)
2724 {
2725         struct obd_device_target *odt = &obd->u.obt;
2726 #ifdef USE_HEALTH_CHECK_WRITE
2727         struct mds_obd *mds = &obd->u.mds;
2728 #endif
2729         int rc = 0;
2730
2731         if (odt->obt_sb->s_flags & MS_RDONLY)
2732                 rc = 1;
2733
2734 #ifdef USE_HEALTH_CHECK_WRITE
2735         LASSERT(mds->mds_health_check_filp != NULL);
2736         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
2737 #endif
2738         return rc;
2739 }
2740
2741 static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
2742 {
2743         struct lustre_cfg *lcfg = buf;
2744         struct lprocfs_static_vars lvars;
2745         int rc;
2746
2747         lprocfs_mds_init_vars(&lvars);
2748
2749         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
2750         return(rc);
2751 }
2752
2753 struct lvfs_callback_ops mds_lvfs_ops = {
2754         l_fid2dentry:     mds_lvfs_fid2dentry,
2755 };
2756
/* use obd ops to offer management infrastructure */
/*
 * Method table for the MDS obd type.  mds_init() passes it to
 * init_obd_quota_ops() and registers it under LUSTRE_MDS_NAME.
 */
static struct obd_ops mds_obd_ops = {
        .o_owner           = THIS_MODULE,
        .o_connect         = mds_connect,
        .o_reconnect       = mds_reconnect,
        .o_init_export     = mds_init_export,
        .o_destroy_export  = mds_destroy_export,
        .o_disconnect      = mds_disconnect,
        .o_setup           = mds_setup,
        .o_precleanup      = mds_precleanup,
        .o_cleanup         = mds_cleanup,
        .o_postrecov       = mds_postrecov,
        .o_statfs          = mds_obd_statfs,
        .o_iocontrol       = mds_iocontrol,
        .o_create          = mds_obd_create,
        .o_destroy         = mds_obd_destroy,
        .o_llog_init       = mds_llog_init,
        .o_llog_finish     = mds_llog_finish,
        .o_notify          = mds_notify,
        .o_health_check    = mds_health_check,
        .o_process_config  = mds_process_config,
};
2779
/*
 * Method table for the MDT obd type (service setup/cleanup and health
 * check only).  Its registration in mds_init() is currently commented out;
 * the table is only kept referenced there to avoid a compiler warning.
 */
static struct obd_ops mdt_obd_ops = {
        .o_owner           = THIS_MODULE,
        .o_setup           = mdt_setup,
        .o_cleanup         = mdt_cleanup,
        .o_health_check    = mdt_health_check,
};
2786
/* Reference to the quota interface; obtained with PORTAL_SYMBOL_GET() in
 * mds_init() and dropped with PORTAL_SYMBOL_PUT() in mds_exit(). */
quota_interface_t *mds_quota_interface_ref;
/* Defined outside this file (presumably by the "lquota" module that
 * mds_init() requests - confirm). */
extern quota_interface_t mds_quota_interface;
2789
2790 static __attribute__((unused)) int __init mds_init(void)
2791 {
2792         int rc;
2793         struct lprocfs_static_vars lvars;
2794
2795         request_module("lquota");
2796         mds_quota_interface_ref = PORTAL_SYMBOL_GET(mds_quota_interface);
2797         rc = lquota_init(mds_quota_interface_ref);
2798         if (rc) {
2799                 if (mds_quota_interface_ref)
2800                         PORTAL_SYMBOL_PUT(mds_quota_interface);
2801                 return rc;
2802         }
2803         init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
2804
2805         lprocfs_mds_init_vars(&lvars);
2806         class_register_type(&mds_obd_ops, NULL,
2807                             lvars.module_vars, LUSTRE_MDS_NAME, NULL);
2808         lprocfs_mds_init_vars(&lvars);
2809         mdt_obd_ops = mdt_obd_ops; //make compiler happy
2810 //        class_register_type(&mdt_obd_ops, NULL,
2811 //                            lvars.module_vars, LUSTRE_MDT_NAME, NULL);
2812
2813         return 0;
2814 }
2815
2816 static __attribute__((unused)) void /*__exit*/ mds_exit(void)
2817 {
2818         lquota_exit(mds_quota_interface_ref);
2819         if (mds_quota_interface_ref)
2820                 PORTAL_SYMBOL_PUT(mds_quota_interface);
2821
2822         class_unregister_type(LUSTRE_MDS_NAME);
2823 //        class_unregister_type(LUSTRE_MDT_NAME);
2824 }
2825 /*mds still need lov setup here*/
2826 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2827 {
2828         struct mds_obd *mds = &obd->u.mds;
2829         struct lvfs_run_ctxt saved;
2830         const char     *dev;
2831         struct vfsmount *mnt;
2832         struct lustre_sb_info *lsi;
2833         struct lustre_mount_info *lmi;
2834         struct dentry  *dentry;
2835         int rc = 0;
2836         ENTRY;
2837
2838         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
2839         if (strncmp(obd->obd_name, MDD_OBD_NAME, strlen(MDD_OBD_NAME)))
2840                 RETURN(0);
2841
2842         if (lcfg->lcfg_bufcount < 5) {
2843                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
2844                 RETURN(-EINVAL);
2845         }
2846         dev = lustre_cfg_string(lcfg, 4);
2847         lmi = server_get_mount(dev);
2848         LASSERT(lmi != NULL);
2849
2850         lsi = s2lsi(lmi->lmi_sb);
2851         mnt = lmi->lmi_mnt;
2852         /* FIXME: MDD LOV initialize objects.
2853          * we need only lmi here but not get mount
2854          * OSD did mount already, so put mount back
2855          */
2856         atomic_dec(&lsi->lsi_mounts);
2857         mntput(mnt);
2858
2859         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
2860         mds_init_ctxt(obd, mnt);
2861
2862         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2863         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
2864         if (IS_ERR(dentry)) {
2865                 rc = PTR_ERR(dentry);
2866                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
2867                 GOTO(err_putfs, rc);
2868         }
2869         mds->mds_objects_dir = dentry;
2870
2871         dentry = lookup_one_len("__iopen__", current->fs->pwd,
2872                                 strlen("__iopen__"));
2873         if (IS_ERR(dentry)) {
2874                 rc = PTR_ERR(dentry);
2875                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
2876                 GOTO(err_objects, rc);
2877         }
2878
2879         mds->mds_fid_de = dentry;
2880         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
2881                 rc = -ENOENT;
2882                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
2883                 GOTO(err_fid, rc);
2884         }
2885         rc = mds_lov_init_objids(obd);
2886         if (rc != 0) {
2887                CERROR("cannot init lov objid rc = %d\n", rc);
2888                GOTO(err_fid, rc );
2889         }
2890
2891         rc = mds_lov_presetup(mds, lcfg);
2892         if (rc < 0)
2893                 GOTO(err_objects, rc);
2894
2895         /* Don't wait for mds_postrecov trying to clear orphans */
2896         obd->obd_async_recov = 1;
2897         rc = mds_postsetup(obd);
2898         /* Bug 11557 - allow async abort_recov start
2899            FIXME can remove most of this obd_async_recov plumbing
2900         obd->obd_async_recov = 0;
2901         */
2902
2903         if (rc)
2904                 GOTO(err_objects, rc);
2905
2906         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
2907         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
2908
2909 err_pop:
2910         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2911         RETURN(rc);
2912 err_fid:
2913         dput(mds->mds_fid_de);
2914 err_objects:
2915         dput(mds->mds_objects_dir);
2916 err_putfs:
2917         fsfilt_put_ops(obd->obd_fsops);
2918         goto err_pop;
2919 }
2920
2921 static int mds_cmd_cleanup(struct obd_device *obd)
2922 {
2923         struct mds_obd *mds = &obd->u.mds;
2924         struct lvfs_run_ctxt saved;
2925         int rc = 0;
2926         ENTRY;
2927
2928         if (obd->obd_fail)
2929                 LCONSOLE_WARN("%s: shutting down for failover; client state "
2930                               "will be preserved.\n", obd->obd_name);
2931
2932         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2933
2934         mds_lov_destroy_objids(obd);
2935
2936         if (mds->mds_objects_dir != NULL) {
2937                 l_dput(mds->mds_objects_dir);
2938                 mds->mds_objects_dir = NULL;
2939         }
2940
2941         shrink_dcache_parent(mds->mds_fid_de);
2942         dput(mds->mds_fid_de);
2943         LL_DQUOT_OFF(obd->u.obt.obt_sb);
2944         fsfilt_put_ops(obd->obd_fsops);
2945
2946         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2947         RETURN(rc);
2948 }
2949
/* Placeholder health check, compiled out; the matching o_health_check entry
 * in the table below is commented out as well. */
#if 0
static int mds_cmd_health_check(struct obd_device *obd)
{
        return 0;
}
#endif
/*
 * Method table registered by mds_cmd_init() under LUSTRE_MDS_NAME.  Setup
 * and cleanup are the "cmd" variants defined above; the remaining methods
 * are the same functions used in mds_obd_ops.
 */
static struct obd_ops mds_cmd_obd_ops = {
        .o_owner           = THIS_MODULE,
        .o_setup           = mds_cmd_setup,
        .o_cleanup         = mds_cmd_cleanup,
        .o_precleanup      = mds_precleanup,
        .o_create          = mds_obd_create,
        .o_destroy         = mds_obd_destroy,
        .o_llog_init       = mds_llog_init,
        .o_llog_finish     = mds_llog_finish,
        .o_notify          = mds_notify,
        .o_postrecov       = mds_postrecov,
        //   .o_health_check    = mds_cmd_health_check,
};
2969
2970 static int __init mds_cmd_init(void)
2971 {
2972         struct lprocfs_static_vars lvars;
2973
2974         lprocfs_mds_init_vars(&lvars);
2975         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
2976                             LUSTRE_MDS_NAME, NULL);
2977
2978         return 0;
2979 }
2980
/* Module exit hook: unregisters the obd type that mds_cmd_init() registered
 * under LUSTRE_MDS_NAME. */
static void /*__exit*/ mds_cmd_exit(void)
{
        class_unregister_type(LUSTRE_MDS_NAME);
}
2985
/* Kernel module metadata */
MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
MODULE_LICENSE("GPL");

/* The "cmd" init/exit pair is what the module actually uses; mds_init() and
 * mds_exit() above are marked unused. */
module_init(mds_cmd_init);
module_exit(mds_cmd_exit);