Whamcloud - gitweb
8346d069c9acb1b5d74d83d3103f5941fb32b62d
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include <lustre_mds.h>
38 #include <linux/module.h>
39 #include <linux/init.h>
40 #include <linux/random.h>
41 #include <linux/fs.h>
42 #include <linux/jbd.h>
43 #include <linux/ext3_fs.h>
44 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
45 # include <linux/smp_lock.h>
46 # include <linux/buffer_head.h>
47 # include <linux/workqueue.h>
48 # include <linux/mount.h>
49 #else
50 # include <linux/locks.h>
51 #endif
52
53 #include <linux/lustre_acl.h>
54 #include <obd_class.h>
55 #include <lustre_dlm.h>
56 #include <obd_lov.h>
57 #include <lustre_fsfilt.h>
58 #include <lprocfs_status.h>
59 #include <lustre_commit_confd.h>
60 #include <lustre_quota.h>
61 #include <lustre_disk.h>
62 #include <lustre_ver.h>
63
64 #include "mds_internal.h"
65
/* Number of MDS service threads to start; registered below as an int module
 * parameter with mode 0444. */
66 int mds_num_threads;
67 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
68                 "number of MDS service threads to start");
69
/* Forward declarations of static handlers used before their definitions. */
70 static int mds_intent_policy(struct ldlm_namespace *ns,
71                              struct ldlm_lock **lockp, void *req_cookie,
72                              ldlm_mode_t mode, int flags, void *data);
73 static int mds_postsetup(struct obd_device *obd);
74 static int mds_cleanup(struct obd_device *obd);
75
/*
 * Send @count bytes of @file, starting at @offset, to the client that issued
 * @req as a bulk transfer.
 *
 * The data is read with fsfilt_readpage() into freshly allocated pages that
 * are attached to a BULK_PUT_SOURCE descriptor on MDS_BULK_PORTAL.  After the
 * transfer is started the function waits on the descriptor with a timeout of
 * obd_timeout * HZ / 4.  If the wait times out, or the transfer finishes
 * unsuccessfully or short, the client export is failed with
 * class_fail_export() and the bulk is aborted.
 *
 * @offset must be page aligned (asserted).
 *
 * Returns 0 on success; -ENOMEM if the page array, the bulk descriptor or a
 * page cannot be allocated; -EIO if a read does not return exactly the
 * requested size; the ptlrpc_start_bulk_transfer() error if the transfer
 * cannot be started; -ETIMEDOUT for a timed-out, failed or short transfer.
 */
76 /* Assumes caller has already pushed into the kernel filesystem context */
77 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
78                         loff_t offset, int count)
79 {
80         struct ptlrpc_bulk_desc *desc;
81         struct l_wait_info lwi;
82         struct page **pages;
83         int rc = 0, npages, i, tmpcount, tmpsize = 0;
84         ENTRY;
85
86         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
87
           /* round the byte count up to whole pages */
88         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
89         OBD_ALLOC(pages, sizeof(*pages) * npages);
90         if (!pages)
91                 GOTO(out, rc = -ENOMEM);
92
93         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
94                                     MDS_BULK_PORTAL);
95         if (desc == NULL)
96                 GOTO(out_free, rc = -ENOMEM);
97
           /* Pass 1: allocate each page and register it with the descriptor;
            * every page carries PAGE_SIZE bytes except possibly the last. */
98         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
99                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
100
101                 pages[i] = alloc_pages(GFP_KERNEL, 0);
102                 if (pages[i] == NULL)
103                         GOTO(cleanup_buf, rc = -ENOMEM);
104
105                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
106         }
107
            /* Pass 2: fill the pages from the file; &offset is handed to
             * fsfilt_readpage() for each read. */
108         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
109                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
110                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
111                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
112                        file->f_dentry->d_inode->i_size);
113
114                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
115                                      kmap(pages[i]), tmpsize, &offset);
116                 kunmap(pages[i]);
117
                    /* any short read or read error is reported as -EIO */
118                 if (rc != tmpsize)
119                         GOTO(cleanup_buf, rc = -EIO);
120         }
121
122         LASSERT(desc->bd_nob == count);
123
124         rc = ptlrpc_start_bulk_transfer(desc);
125         if (rc)
126                 GOTO(cleanup_buf, rc);
127
            /* Fault injection: rc is 0 at this point (the start succeeded), so
             * the bulk is aborted but 0 is still returned to the caller. */
128         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
129                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
130                        OBD_FAIL_MDS_SENDPAGE, rc);
131                 GOTO(abort_bulk, rc);
132         }
133
134         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
135         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
136         LASSERT (rc == 0 || rc == -ETIMEDOUT);
137
138         if (rc == 0) {
                    /* bulk finished: success only if all bytes were moved */
139                 if (desc->bd_success &&
140                     desc->bd_nob_transferred == count)
141                         GOTO(cleanup_buf, rc);
142
143                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
144         }
145
146         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
147                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
148                   desc->bd_nob_transferred, count,
149                   req->rq_export->exp_client_uuid.uuid,
150                   req->rq_export->exp_connection->c_remote_uuid.uuid);
151
152         class_fail_export(req->rq_export);
153
154         EXIT;
155  abort_bulk:
156         ptlrpc_abort_bulk (desc);
157  cleanup_buf:
            /* NOTE(review): relies on unallocated pages[] slots being NULL,
             * i.e. presumably OBD_ALLOC zero-fills the array -- confirm. */
158         for (i = 0; i < npages; i++)
159                 if (pages[i])
160                         __free_pages(pages[i], 0);
161
162         ptlrpc_free_bulk(desc);
163  out_free:
164         OBD_FREE(pages, sizeof(*pages) * npages);
165  out:
166         return rc;
167 }
168
169 /* only valid locked dentries or errors should be returned */
170 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
171                                      struct vfsmount **mnt, int lock_mode,
172                                      struct lustre_handle *lockh,
173                                      __u64 lockpart)
174 {
175         struct mds_obd *mds = &obd->u.mds;
176         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
177         struct ldlm_res_id res_id = { .name = {0} };
178         int flags = 0, rc;
179         ldlm_policy_data_t policy = { .l_inodebits = { lockpart} };
180         ENTRY;
181
182         if (IS_ERR(de))
183                 RETURN(de);
184
185         res_id.name[0] = de->d_inode->i_ino;
186         res_id.name[1] = de->d_inode->i_generation;
187         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
188                               LDLM_IBITS, &policy, lock_mode, &flags,
189                               ldlm_blocking_ast, ldlm_completion_ast,
190                               NULL, NULL, NULL, 0, NULL, lockh);
191         if (rc != ELDLM_OK) {
192                 l_dput(de);
193                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
194         }
195
196         RETURN(retval);
197 }
198
199 /* Look up an entry by inode number. */
200 /* this function ONLY returns valid dget'd dentries with an initialized inode
201    or errors */
202 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
203                               struct vfsmount **mnt)
204 {
205         char fid_name[32];
206         unsigned long ino = fid->id;
207         __u32 generation = fid->generation;
208         struct inode *inode;
209         struct dentry *result;
210
211         if (ino == 0)
212                 RETURN(ERR_PTR(-ESTALE));
213
214         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
215
216         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
217                ino, generation, mds->mds_obt.obt_sb);
218
219         /* under ext3 this is neither supposed to return bad inodes
220            nor NULL inodes. */
221         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
222         if (IS_ERR(result))
223                 RETURN(result);
224
225         inode = result->d_inode;
226         if (!inode)
227                 RETURN(ERR_PTR(-ENOENT));
228
229         if (inode->i_generation == 0 || inode->i_nlink == 0) {
230                 LCONSOLE_WARN("Found inode with zero generation or link -- this"
231                               " may indicate disk corruption (inode: %lu/%u, "
232                               "link %lu, count %d)\n", inode->i_ino,
233                               inode->i_generation,(unsigned long)inode->i_nlink,
234                               atomic_read(&inode->i_count));
235                 dput(result);
236                 RETURN(ERR_PTR(-ENOENT));
237         }
238
239         if (generation && inode->i_generation != generation) {
240                 /* we didn't find the right inode.. */
241                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
242                        "count: %d, generation %u/%u\n", inode->i_ino,
243                        (unsigned long)inode->i_nlink,
244                        atomic_read(&inode->i_count), inode->i_generation,
245                        generation);
246                 dput(result);
247                 RETURN(ERR_PTR(-ENOENT));
248         }
249
250         if (mnt) {
251                 *mnt = mds->mds_vfsmnt;
252                 mntget(*mnt);
253         }
254
255         RETURN(result);
256 }
257
258 static int mds_connect_internal(struct obd_export *exp,
259                                 struct obd_connect_data *data)
260 {
261         struct obd_device *obd = exp->exp_obd;
262         if (data != NULL) {
263                 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
264                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
265
266                 /* If no known bits (which should not happen, probably,
267                    as everybody should support LOOKUP and UPDATE bits at least)
268                    revert to compat mode with plain locks. */
269                 if (!data->ocd_ibits_known &&
270                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
271                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
272
273                 if (!obd->u.mds.mds_fl_acl)
274                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
275
276                 if (!obd->u.mds.mds_fl_user_xattr)
277                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
278
279                 exp->exp_connect_flags = data->ocd_connect_flags;
280                 data->ocd_version = LUSTRE_VERSION_CODE;
281                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
282         }
283
284         if (obd->u.mds.mds_fl_acl &&
285             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
286                 CWARN("%s: MDS requires ACL support but client does not\n",
287                       obd->obd_name);
288                 return -EBADE;
289         }
290         return 0;
291 }
292
293 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
294                          struct obd_uuid *cluuid,
295                          struct obd_connect_data *data)
296 {
297         int rc;
298         ENTRY;
299
300         if (exp == NULL || obd == NULL || cluuid == NULL)
301                 RETURN(-EINVAL);
302
303         rc = mds_connect_internal(exp, data);
304
305         RETURN(rc);
306 }
307
308 /* Establish a connection to the MDS.
309  *
310  * This will set up an export structure for the client to hold state data
311  * about that client, like open files, the last operation number it did
312  * on the server, etc.
     *
     * Returns -EINVAL if @conn, @obd or @cluuid is NULL, -ENOMEM if the client
     * data cannot be allocated, or the error from class_connect(),
     * mds_connect_internal() or mds_client_add().  On any failure after
     * class_connect() the export is disconnected again.
313  */
314 static int mds_connect(const struct lu_context *ctx,
315                        struct lustre_handle *conn, struct obd_device *obd,
316                        struct obd_uuid *cluuid, struct obd_connect_data *data)
317 {
318         struct obd_export *exp;
319         struct mds_export_data *med;
320         struct mds_client_data *mcd = NULL;
321         int rc, abort_recovery;
322         ENTRY;
323
324         if (!conn || !obd || !cluuid)
325                 RETURN(-EINVAL);
326
327         /* Check for aborted recovery. */
            /* the flag is sampled under the lock; the abort itself runs
             * after the lock has been released */
328         spin_lock_bh(&obd->obd_processing_task_lock);
329         abort_recovery = obd->obd_abort_recovery;
330         spin_unlock_bh(&obd->obd_processing_task_lock);
331         if (abort_recovery)
332                 target_abort_recovery(obd);
333
334         /* XXX There is a small race between checking the list and adding a
335          * new connection for the same UUID, but the real threat (list
336          * corruption when multiple different clients connect) is solved.
337          *
338          * There is a second race between adding the export to the list,
339          * and filling in the client data below.  Hence skipping the case
340          * of NULL mcd above.  We should already be controlling multiple
341          * connects at the client, and we can't hold the spinlock over
342          * memory allocations without risk of deadlocking.
             *
             * NOTE(review): "skipping the case of NULL mcd above" refers to
             * code that is not part of this function.
343          */
344         rc = class_connect(conn, obd, cluuid);
345         if (rc)
346                 RETURN(rc);
347         exp = class_conn2export(conn);
348         LASSERT(exp);
349         med = &exp->exp_mds_data;
350
351         rc = mds_connect_internal(exp, data);
352         if (rc)
353                 GOTO(out, rc);
354
355         OBD_ALLOC(mcd, sizeof(*mcd));
356         if (!mcd)
357                 GOTO(out, rc = -ENOMEM);
358
359         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
360         med->med_mcd = mcd;
361
362         rc = mds_client_add(obd, &obd->u.mds, med, -1);
363         GOTO(out, rc);
364
365 out:
366         if (rc) {
                    /* undo: release the client data and tear the export down */
367                 if (mcd) {
368                         OBD_FREE(mcd, sizeof(*mcd));
369                         med->med_mcd = NULL;
370                 }
371                 class_disconnect(exp);
372         } else {
                    /* presumably drops the reference class_conn2export() took
                     * -- confirm */
373                 class_export_put(exp);
374         }
375
376         RETURN(rc);
377 }
378
379 int mds_init_export(struct obd_export *exp)
380 {
381         struct mds_export_data *med = &exp->exp_mds_data;
382
383         INIT_LIST_HEAD(&med->med_open_head);
384         spin_lock_init(&med->med_open_lock);
385         exp->exp_connecting = 1;
386         RETURN(0);
387 }
388
/*
 * Release the MDS state attached to @export.
 *
 * After the generic target_destroy_export(), every file still on the
 * export's open list is force-closed with mds_mfd_close() and the client
 * data is freed with mds_client_free().  An export whose client UUID equals
 * the device's own UUID is skipped after the generic part.
 *
 * Returns 0 for that skipped case, otherwise the result of the last
 * mds_mfd_close() call (0 if no file was open); earlier close errors are
 * only logged.
 */
389 static int mds_destroy_export(struct obd_export *export)
390 {
391         struct mds_export_data *med;
392         struct obd_device *obd = export->exp_obd;
393         struct lvfs_run_ctxt saved;
394         int rc = 0;
395         ENTRY;
396
397         med = &export->exp_mds_data;
398         target_destroy_export(export);
399
400         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
401                 RETURN(0);
402
403         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
404         /* Close any open files (which may also cause orphan unlinking). */
            /* The list is drained from the head; med_open_lock is dropped
             * around each close and retaken before the list is tested again. */
405         spin_lock(&med->med_open_lock);
406         while (!list_empty(&med->med_open_head)) {
407                 struct list_head *tmp = med->med_open_head.next;
408                 struct mds_file_data *mfd =
409                         list_entry(tmp, struct mds_file_data, mfd_list);
410                 struct dentry *dentry = mfd->mfd_dentry;
411
412                 /* Remove mfd handle so it can't be found again.
413                  * We are consuming the mfd_list reference here. */
414                 mds_mfd_unlink(mfd, 0);
415                 spin_unlock(&med->med_open_lock);
416
417                 /* If you change this message, be sure to update
418                  * replay_single:test_46 */
419                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
420                        "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
421                        dentry->d_name.name, dentry->d_inode->i_ino);
422                 /* child orphan sem protects orphan_dec_test and
423                  * is_orphan race, mds_mfd_close drops it */
424                 MDS_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
                    /* last argument is non-zero unless the export is flagged
                     * OBD_OPT_FAILOVER */
425                 rc = mds_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
426                                    !(export->exp_flags & OBD_OPT_FAILOVER));
427
428                 if (rc)
429                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
430                 spin_lock(&med->med_open_lock);
431         }
432         spin_unlock(&med->med_open_lock);
433         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
434         mds_client_free(export);
435
436         RETURN(rc);
437 }
438
/*
 * Disconnect the client behind @exp.
 *
 * An extra export reference is held for the duration of the call.  The
 * export is disconnected first, its DLM locks are cancelled, and every reply
 * still on exp_outstanding_replies is unlinked and handed to
 * ptlrpc_schedule_difficult_reply().
 *
 * Returns the result of class_disconnect().
 */
439 static int mds_disconnect(struct obd_export *exp)
440 {
441         unsigned long irqflags;
442         int rc;
443         ENTRY;
444
445         LASSERT(exp);
446         class_export_get(exp);
447
448         /* Disconnect early so that clients can't keep using export */
449         rc = class_disconnect(exp);
450         ldlm_cancel_locks_for_export(exp);
451
452         /* complete all outstanding replies */
            /* lock order here: exp_lock (irqs off) first, then the owning
             * service's srv_lock for each reply */
453         spin_lock_irqsave(&exp->exp_lock, irqflags);
454         while (!list_empty(&exp->exp_outstanding_replies)) {
455                 struct ptlrpc_reply_state *rs =
456                         list_entry(exp->exp_outstanding_replies.next,
457                                    struct ptlrpc_reply_state, rs_exp_list);
458                 struct ptlrpc_service *svc = rs->rs_service;
459
460                 spin_lock(&svc->srv_lock);
461                 list_del_init(&rs->rs_exp_list);
462                 ptlrpc_schedule_difficult_reply(rs);
463                 spin_unlock(&svc->srv_lock);
464         }
465         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
466
467         class_export_put(exp);
468         RETURN(rc);
469 }
470
471 static int mds_getstatus(struct ptlrpc_request *req)
472 {
473         struct mds_obd *mds = mds_req2mds(req);
474         struct mds_body *body;
475         int rc, size = sizeof(*body);
476         ENTRY;
477
478         rc = lustre_pack_reply(req, 1, &size, NULL);
479         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
480                 CERROR("mds: out of memory for message: size=%d\n", size);
481                 req->rq_status = -ENOMEM;       /* superfluous? */
482                 RETURN(-ENOMEM);
483         }
484
485         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
486         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
487
488         /* the last_committed and last_xid fields are filled in for all
489          * replies already - no need to do so here also.
490          */
491         RETURN(0);
492 }
493
494 /* get the LOV EA from @inode and store it into @md.  It can be at most
495  * @size bytes, and @size is updated with the actual EA size.
496  * The EA size is also returned on success, and -ve errno on failure.
497  * If there is no EA then 0 is returned. */
498 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
499                int *size, int lock)
500 {
501         int rc = 0;
502         int lmm_size;
503
504         if (lock)
505                 LOCK_INODE_MUTEX(inode);
506         rc = fsfilt_get_md(obd, inode, md, *size, "lov");
507
508         if (rc < 0) {
509                 CERROR("Error %d reading eadata for ino %lu\n",
510                        rc, inode->i_ino);
511         } else if (rc > 0) {
512                 lmm_size = rc;
513                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
514
515                 if (rc == 0) {
516                         *size = lmm_size;
517                         rc = lmm_size;
518                 } else if (rc > 0) {
519                         *size = rc;
520                 }
521         } else {
522                 *size = 0;
523         }
524         if (lock)
525                 UNLOCK_INODE_MUTEX(inode);
526
527         RETURN (rc);
528 }
529
530
531 /* Call with lock=1 if you want mds_pack_md to take the i_mutex.
532  * Call with lock=0 if the caller has already taken the i_mutex. */
533 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
534                 struct mds_body *body, struct inode *inode, int lock)
535 {
536         struct mds_obd *mds = &obd->u.mds;
537         void *lmm;
538         int lmm_size;
539         int rc;
540         ENTRY;
541
542         lmm = lustre_msg_buf(msg, offset, 0);
543         if (lmm == NULL) {
544                 /* Some problem with getting eadata when I sized the reply
545                  * buffer... */
546                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
547                        inode->i_ino);
548                 RETURN(0);
549         }
550         lmm_size = msg->buflens[offset];
551
552         /* I don't really like this, but it is a sanity check on the client
553          * MD request.  However, if the client doesn't know how much space
554          * to reserve for the MD, it shouldn't be bad to have too much space.
555          */
556         if (lmm_size > mds->mds_max_mdsize) {
557                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
558                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
559                 // RETURN(-EINVAL);
560         }
561
562         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
563         if (rc > 0) {
564                 if (S_ISDIR(inode->i_mode))
565                         body->valid |= OBD_MD_FLDIREA;
566                 else
567                         body->valid |= OBD_MD_FLEASIZE;
568                 body->eadatasize = lmm_size;
569                 rc = 0;
570         }
571
572         RETURN(rc);
573 }
574
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Read the access ACL xattr of @inode into reply buffer @repoff of @repmsg.
 *
 * On success repbody->aclsize holds the number of bytes read (it stays 0
 * when the buffer is empty, the inode has no getxattr method, or the xattr
 * does not exist) and OBD_MD_FLACL is set in repbody->valid.
 *
 * Returns 0 on success, or the getxattr error (other than -ENODATA), in which
 * case repbody->valid is not touched.
 */
static
int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
                       struct mds_body *repbody, int repoff)
{
        /* getxattr wants a dentry; only d_inode is filled in */
        struct dentry fake_de = { .d_inode = inode };
        int acl_buflen, rc;
        ENTRY;

        LASSERT(repbody->aclsize == 0);
        LASSERT(repmsg->bufcount > repoff);

        acl_buflen = lustre_msg_buflen(repmsg, repoff);
        if (acl_buflen == 0 ||
            inode->i_op == NULL || inode->i_op->getxattr == NULL)
                GOTO(out, 0);

        lock_24kernel();
        rc = inode->i_op->getxattr(&fake_de, XATTR_NAME_ACL_ACCESS,
                                   lustre_msg_buf(repmsg, repoff, acl_buflen),
                                   acl_buflen);
        unlock_24kernel();

        if (rc < 0 && rc != -ENODATA) {
                CERROR("buflen %d, get acl: %d\n", acl_buflen, rc);
                RETURN(rc);
        }
        if (rc >= 0)
                repbody->aclsize = rc;
        EXIT;
out:
        repbody->valid |= OBD_MD_FLACL;
        return 0;
}
#else
/* without POSIX ACL support packing is a no-op that reports success */
#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
#endif
614
/*
 * Pack the ACL of @inode into reply buffer @repoff of @repmsg.
 *
 * Thin wrapper around mds_pack_posix_acl(); @med is not used here.
 * Returns whatever mds_pack_posix_acl() yields.
 */
int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
                 struct lustre_msg *repmsg, struct mds_body *repbody,
                 int repoff)
{
        int rc;

        rc = mds_pack_posix_acl(inode, repmsg, repbody, repoff);
        return rc;
}
621
/*
 * Fill an already packed getattr reply for @dentry.
 *
 * Reply buffer @reply_off receives the mds_body (fid and inode attributes).
 * The buffers that follow are filled depending on the inode type and on the
 * bits the client set in reqbody->valid:
 *   - regular file with OBD_MD_FLEASIZE, or directory with OBD_MD_FLDIREA:
 *     the LOV EA, via mds_pack_md();
 *   - symlink with OBD_MD_LINKNAME: the NUL-terminated link target;
 *   - with CONFIG_FS_POSIX_ACL, when both the export and the request have
 *     the ACL flag: the ACL, via mds_pack_acl().
 * OBD_MD_FLMODEASIZE additionally returns the MDS max cookie/MD sizes.
 *
 * Returns 0 on success, -ENOENT for a dentry without inode, -EINVAL if
 * readlink does not return exactly the expected length, or a negative errno
 * from mds_pack_md(), readlink or mds_pack_acl().
 */
622 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
623                                 struct ptlrpc_request *req,
624                                 struct mds_body *reqbody, int reply_off)
625 {
626         struct mds_body *body;
627         struct inode *inode = dentry->d_inode;
628         int rc = 0;
629         ENTRY;
630
631         if (inode == NULL)
632                 RETURN(-ENOENT);
633
634         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
635         LASSERT(body != NULL);                 /* caller prepped reply */
636
637         mds_pack_inode2fid(&body->fid1, inode);
638         mds_pack_inode2body(body, inode);
            /* from here on reply_off indexes the next optional buffer */
639         reply_off++;
640
641         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
642             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
643                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
644                                  inode, 1);
645
646                 /* If we have LOV EA data, the OST holds size, atime, mtime */
                    /* ... so these attributes are only marked valid here when
                     * mds_pack_md() set neither EA flag */
647                 if (!(body->valid & OBD_MD_FLEASIZE) &&
648                     !(body->valid & OBD_MD_FLDIREA))
649                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
650                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
651
                    /* presumably trims the EA buffer to the size actually
                     * used -- confirm against lustre_shrink_reply() */
652                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
653                 if (body->eadatasize)
654                         reply_off++;
655         } else if (S_ISLNK(inode->i_mode) &&
656                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
657                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
658                 int len;
659
660                 LASSERT (symname != NULL);       /* caller prepped reply */
661                 len = req->rq_repmsg->buflens[reply_off];
662
                    /* the buffer must hold the target plus one byte for the
                     * terminator, so readlink has to return exactly len - 1 */
663                 rc = inode->i_op->readlink(dentry, symname, len);
664                 if (rc < 0) {
665                         CERROR("readlink failed: %d\n", rc);
666                 } else if (rc != len - 1) {
667                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
668                                 rc, len - 1);
669                         rc = -EINVAL;
670                 } else {
671                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
672                         body->valid |= OBD_MD_LINKNAME;
673                         body->eadatasize = rc + 1;
674                         symname[rc] = 0;        /* NULL terminate */
675                         rc = 0;
676                 }
677                 reply_off++;
678         }
679
680         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
681                 struct mds_obd *mds = mds_req2mds(req);
682                 body->max_cookiesize = mds->mds_max_cookiesize;
683                 body->max_mdsize = mds->mds_max_mdsize;
684                 body->valid |= OBD_MD_FLMODEASIZE;
685         }
686
            /* an EA or symlink error from above is reported only after the
             * max-size fields have been filled in */
687         if (rc)
688                 RETURN(rc);
689
690 #ifdef CONFIG_FS_POSIX_ACL
691         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
692             (reqbody->valid & OBD_MD_FLACL)) {
693                 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
694                                   inode, req->rq_repmsg,
695                                   body, reply_off);
696
697                 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
698                 if (body->aclsize)
699                         reply_off++;
700         }
701 #endif
702
703         RETURN(rc);
704 }
705
/*
 * Size and pack the reply message for a getattr on @inode.
 *
 * The request body at @offset (already swabbed by the caller) selects which
 * buffers follow the mds_body in the reply:
 *   - regular file with OBD_MD_FLEASIZE, or directory with OBD_MD_FLDIREA:
 *     a buffer sized to the inode's "lov" EA (0 if there is none or if it
 *     exceeds mds_max_mdsize);
 *   - symlink with OBD_MD_LINKNAME: the smaller of i_size + 1 and the space
 *     the client announced in eadatasize;
 *   - with CONFIG_FS_POSIX_ACL, when both the export and the request have
 *     the ACL flag: a buffer sized to the access ACL xattr (0 if absent).
 *
 * Returns 0 on success, -ENOMEM when the OBD_FAIL_MDS_GETATTR_PACK fail point
 * fires, or the negative errno from fsfilt_get_md(), getxattr or
 * lustre_pack_reply().
 */
706 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
707                                 int offset)
708 {
709         struct mds_obd *mds = mds_req2mds(req);
710         struct mds_body *body;
            /* at most three reply buffers: body, EA or symlink, ACL */
711         int rc, size[3] = {sizeof(*body)}, bufcount = 1;
712         ENTRY;
713
714         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
715         LASSERT(body != NULL);                 /* checked by caller */
716         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
717
718         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
719             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
                    /* a NULL buffer of size 0 is passed to obtain the EA size
                     * (see the "got %d bytes" message below) */
720                 LOCK_INODE_MUTEX(inode);
721                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
722                                    "lov");
723                 UNLOCK_INODE_MUTEX(inode);
724                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
725                        rc, inode->i_ino);
726                 if (rc < 0) {
                            /* -ENODATA just yields an empty EA buffer */
727                         if (rc != -ENODATA) {
728                                 CERROR("error getting inode %lu MD: rc = %d\n",
729                                        inode->i_ino, rc);
730                                 RETURN(rc);
731                         }
732                         size[bufcount] = 0;
733                 } else if (rc > mds->mds_max_mdsize) {
734                         size[bufcount] = 0;
735                         CERROR("MD size %d larger than maximum possible %u\n",
736                                rc, mds->mds_max_mdsize);
737                 } else {
738                         size[bufcount] = rc;
739                 }
740                 bufcount++;
741         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
                    /* a size mismatch is logged but not fatal; the smaller of
                     * the two sizes is used */
742                 if (inode->i_size + 1 != body->eadatasize)
743                         CERROR("symlink size: %Lu, reply space: %d\n",
744                                inode->i_size + 1, body->eadatasize);
745                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
746                 bufcount++;
747                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
748                        inode->i_size + 1, body->eadatasize);
749         }
750
751 #ifdef CONFIG_FS_POSIX_ACL
752         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
753             (body->valid & OBD_MD_FLACL)) {
                    /* getxattr wants a dentry; only d_inode is filled in */
754                 struct dentry de = { .d_inode = inode };
755
756                 size[bufcount] = 0;
757                 if (inode->i_op && inode->i_op->getxattr) {
758                         lock_24kernel();
759                         rc = inode->i_op->getxattr(&de, XATTR_NAME_ACL_ACCESS,
760                                                    NULL, 0);
761                         unlock_24kernel();
762
763                         if (rc < 0) {
764                                 if (rc != -ENODATA) {
765                                         CERROR("got acl size: %d\n", rc);
766                                         RETURN(rc);
767                                 }
768                         } else
769                                 size[bufcount] = rc;
770                 }
                    /* the ACL buffer is reserved even when its size is 0 */
771                 bufcount++;
772         }
773 #endif
774
775         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
776                 CERROR("failed MDS_GETATTR_PACK test\n");
777                 req->rq_status = -ENOMEM;
778                 RETURN(-ENOMEM);
779         }
780
781         rc = lustre_pack_reply(req, bufcount, size, NULL);
782         if (rc) {
783                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
784                 req->rq_status = rc;
785                 RETURN(rc);
786         }
787
788         RETURN(0);
789 }
790
791 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
792                             int child_part, struct lustre_handle *child_lockh)
793 {
794         struct obd_device *obd = req->rq_export->exp_obd;
795         struct mds_obd *mds = &obd->u.mds;
796         struct ldlm_reply *rep = NULL;
797         struct lvfs_run_ctxt saved;
798         struct mds_body *body;
799         struct dentry *dparent = NULL, *dchild = NULL;
800         struct lvfs_ucred uc = {NULL,};
801         struct lustre_handle parent_lockh;
802         int namesize;
803         int rc = 0, cleanup_phase = 0, resent_req = 0;
804         char *name;
805         ENTRY;
806
807         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
808
809         /* Swab now, before anyone looks inside the request */
810
811         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
812                                   lustre_swab_mds_body);
813         if (body == NULL) {
814                 CERROR("Can't swab mds_body\n");
815                 RETURN(-EFAULT);
816         }
817
818         LASSERT_REQSWAB(req, offset + 1);
819         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
820         if (name == NULL) {
821                 CERROR("Can't unpack name\n");
822                 RETURN(-EFAULT);
823         }
824         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
825
826         rc = mds_init_ucred(&uc, req, offset);
827         if (rc)
828                 GOTO(cleanup, rc);
829
830         LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
831         /* if requests were at offset 2, the getattr reply goes back at 1 */
832         if (offset == MDS_REQ_INTENT_REC_OFF) {
833                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
834                 offset = 1;
835         }
836
837         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
838         cleanup_phase = 1; /* kernel context */
839         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
840
841         /* FIXME: handle raw lookup */
842 #if 0
843         if (body->valid == OBD_MD_FLID) {
844                 struct mds_body *mds_reply;
845                 int size = sizeof(*mds_reply);
846                 ino_t inum;
847                 // The user requested ONLY the inode number, so do a raw lookup
848                 rc = lustre_pack_reply(req, 1, &size, NULL);
849                 if (rc) {
850                         CERROR("out of memory\n");
851                         GOTO(cleanup, rc);
852                 }
853
854                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
855
856                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
857                                            sizeof(*mds_reply));
858                 mds_reply->fid1.id = inum;
859                 mds_reply->valid = OBD_MD_FLID;
860                 GOTO(cleanup, rc);
861         }
862 #endif
863
864         if (lustre_handle_is_used(child_lockh)) {
865                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
866                 resent_req = 1;
867         }
868
869         if (resent_req == 0) {
870             if (name) {
871                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
872                                                  &parent_lockh, &dparent,
873                                                  LCK_CR,
874                                                  MDS_INODELOCK_UPDATE,
875                                                  name, namesize,
876                                                  child_lockh, &dchild, LCK_CR,
877                                                  child_part);
878             } else {
879                         /* For revalidate by fid we always take UPDATE lock */
880                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
881                                                        LCK_CR, child_lockh,
882                                                        MDS_INODELOCK_UPDATE);
883                         LASSERT(dchild);
884                         if (IS_ERR(dchild))
885                                 rc = PTR_ERR(dchild);
886             }
887             if (rc)
888                     GOTO(cleanup, rc);
889         } else {
890                 struct ldlm_lock *granted_lock;
891                 struct ll_fid child_fid;
892                 struct ldlm_resource *res;
893                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
894                 granted_lock = ldlm_handle2lock(child_lockh);
895                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
896                          body->fid1.id, body->fid1.generation,
897                          child_lockh->cookie);
898
899
900                 res = granted_lock->l_resource;
901                 child_fid.id = res->lr_name.name[0];
902                 child_fid.generation = res->lr_name.name[1];
903                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
904                 LASSERT(!IS_ERR(dchild));
905                 LDLM_LOCK_PUT(granted_lock);
906         }
907
908         cleanup_phase = 2; /* dchild, dparent, locks */
909
910         if (dchild->d_inode == NULL) {
911                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
912                 /* in the intent case, the policy clears this error:
913                    the disposition is enough */
914                 GOTO(cleanup, rc = -ENOENT);
915         } else {
916                 intent_set_disposition(rep, DISP_LOOKUP_POS);
917         }
918
919         if (req->rq_repmsg == NULL) {
920                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
921                 if (rc != 0) {
922                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
923                         GOTO (cleanup, rc);
924                 }
925         }
926
927         rc = mds_getattr_internal(obd, dchild, req, body, offset);
928         GOTO(cleanup, rc); /* returns the lock to the client */
929
930  cleanup:
931         switch (cleanup_phase) {
932         case 2:
933                 if (resent_req == 0) {
934                         if (rc && dchild->d_inode)
935                                 ldlm_lock_decref(child_lockh, LCK_CR);
936                         ldlm_lock_decref(&parent_lockh, LCK_CR);
937                         l_dput(dparent);
938                 }
939                 l_dput(dchild);
940         case 1:
941                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
942         default:
943                 mds_exit_ucred(&uc, mds);
944                 if (req->rq_reply_state == NULL) {
945                         req->rq_status = rc;
946                         lustre_pack_reply(req, 0, NULL, NULL);
947                 }
948         }
949         return rc;
950 }
951
952 static int mds_getattr(struct ptlrpc_request *req, int offset)
953 {
954         struct mds_obd *mds = mds_req2mds(req);
955         struct obd_device *obd = req->rq_export->exp_obd;
956         struct lvfs_run_ctxt saved;
957         struct dentry *de;
958         struct mds_body *body;
959         struct lvfs_ucred uc = {NULL,};
960         int rc = 0;
961         ENTRY;
962
963         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
964                                   lustre_swab_mds_body);
965         if (body == NULL)
966                 RETURN(-EFAULT);
967
968         rc = mds_init_ucred(&uc, req, offset);
969         if (rc)
970                 GOTO(out_ucred, rc);
971
972         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
973         de = mds_fid2dentry(mds, &body->fid1, NULL);
974         if (IS_ERR(de)) {
975                 rc = req->rq_status = PTR_ERR(de);
976                 GOTO(out_pop, rc);
977         }
978
979         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
980         if (rc != 0) {
981                 CERROR("mds_getattr_pack_msg: %d\n", rc);
982                 GOTO(out_pop, rc);
983         }
984
985         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
986
987         l_dput(de);
988         GOTO(out_pop, rc);
989 out_pop:
990         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
991 out_ucred:
992         if (req->rq_reply_state == NULL) {
993                 req->rq_status = rc;
994                 lustre_pack_reply(req, 0, NULL, NULL);
995         }
996         mds_exit_ucred(&uc, mds);
997         return rc;
998 }
999
1000 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1001                           unsigned long max_age)
1002 {
1003         int rc;
1004
1005         spin_lock(&obd->obd_osfs_lock);
1006         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1007         if (rc == 0)
1008                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1009         spin_unlock(&obd->obd_osfs_lock);
1010
1011         return rc;
1012 }
1013
1014 static int mds_statfs(struct ptlrpc_request *req)
1015 {
1016         struct obd_device *obd = req->rq_export->exp_obd;
1017         int rc, size = sizeof(struct obd_statfs);
1018         ENTRY;
1019
1020         /* This will trigger a watchdog timeout */
1021         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1022                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
1023
1024         rc = lustre_pack_reply(req, 1, &size, NULL);
1025         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1026                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1027                 GOTO(out, rc);
1028         }
1029
1030         /* We call this so that we can cache a bit - 1 jiffie worth */
1031         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1032                             jiffies - HZ);
1033         if (rc) {
1034                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1035                 GOTO(out, rc);
1036         }
1037
1038         EXIT;
1039 out:
1040         req->rq_status = rc;
1041         return 0;
1042 }
1043
1044 static int mds_sync(struct ptlrpc_request *req, int offset)
1045 {
1046         struct obd_device *obd = req->rq_export->exp_obd;
1047         struct mds_obd *mds = &obd->u.mds;
1048         struct mds_body *body;
1049         int rc, size = sizeof(*body);
1050         ENTRY;
1051
1052         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
1053         if (body == NULL)
1054                 GOTO(out, rc = -EFAULT);
1055
1056         rc = lustre_pack_reply(req, 1, &size, NULL);
1057         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1058                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1059                 GOTO(out, rc);
1060         }
1061
1062         if (body->fid1.id == 0) {
1063                 /* a fid of zero is taken to mean "sync whole filesystem" */
1064                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1065                 GOTO(out, rc);
1066         } else {
1067                 struct dentry *de;
1068
1069                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1070                 if (IS_ERR(de))
1071                         GOTO(out, rc = PTR_ERR(de));
1072
1073                 /* The file parameter isn't used for anything */
1074                 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1075                         rc = de->d_inode->i_fop->fsync(NULL, de, 1);
1076                 if (rc == 0) {
1077                         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1078                         mds_pack_inode2fid(&body->fid1, de->d_inode);
1079                         mds_pack_inode2body(body, de->d_inode);
1080                 }
1081
1082                 l_dput(de);
1083                 GOTO(out, rc);
1084         }
1085 out:
1086         req->rq_status = rc;
1087         return 0;
1088 }
1089
1090 /* mds_readpage does not take a DLM lock on the inode, because the client must
1091  * already have a PR lock.
1092  *
1093  * If we were to take another one here, a deadlock will result, if another
1094  * thread is already waiting for a PW lock. */
1095 static int mds_readpage(struct ptlrpc_request *req, int offset)
1096 {
1097         struct obd_device *obd = req->rq_export->exp_obd;
1098         struct mds_obd *mds = &obd->u.mds;
1099         struct vfsmount *mnt;
1100         struct dentry *de;
1101         struct file *file;
1102         struct mds_body *body, *repbody;
1103         struct lvfs_run_ctxt saved;
1104         int rc, size = sizeof(*repbody);
1105         struct lvfs_ucred uc = {NULL,};
1106         ENTRY;
1107
1108         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1109                 RETURN(-ENOMEM);
1110
1111         rc = lustre_pack_reply(req, 1, &size, NULL);
1112         if (rc) {
1113                 CERROR("error packing readpage reply: rc %d\n", rc);
1114                 GOTO(out, rc);
1115         }
1116
1117         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1118                                   lustre_swab_mds_body);
1119         if (body == NULL)
1120                 GOTO (out, rc = -EFAULT);
1121
1122         rc = mds_init_ucred(&uc, req, 0);
1123         if (rc)
1124                 GOTO(out, rc);
1125
1126         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1127         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1128         if (IS_ERR(de))
1129                 GOTO(out_pop, rc = PTR_ERR(de));
1130
1131         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1132
1133         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1134         /* note: in case of an error, dentry_open puts dentry */
1135         if (IS_ERR(file))
1136                 GOTO(out_pop, rc = PTR_ERR(file));
1137
1138         /* body->size is actually the offset -eeb */
1139         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1140                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1141                        body->size, de->d_inode->i_blksize);
1142                 GOTO(out_file, rc = -EFAULT);
1143         }
1144
1145         /* body->nlink is actually the #bytes to read -eeb */
1146         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1147                 CERROR("size %u is not multiple of blocksize %lu\n",
1148                        body->nlink, de->d_inode->i_blksize);
1149                 GOTO(out_file, rc = -EFAULT);
1150         }
1151
1152         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1153         repbody->size = file->f_dentry->d_inode->i_size;
1154         repbody->valid = OBD_MD_FLSIZE;
1155
1156         /* to make this asynchronous make sure that the handling function
1157            doesn't send a reply when this function completes. Instead a
1158            callback function would send the reply */
1159         /* body->size is actually the offset -eeb */
1160         rc = mds_sendpage(req, file, body->size, body->nlink);
1161
1162 out_file:
1163         filp_close(file, 0);
1164 out_pop:
1165         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1166 out:
1167         mds_exit_ucred(&uc, mds);
1168         req->rq_status = rc;
1169         RETURN(0);
1170 }
1171
1172 int mds_reint(struct ptlrpc_request *req, int offset,
1173               struct lustre_handle *lockh)
1174 {
1175         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1176         int rc;
1177
1178         OBD_ALLOC(rec, sizeof(*rec));
1179         if (rec == NULL)
1180                 RETURN(-ENOMEM);
1181
1182         rc = mds_update_unpack(req, offset, rec);
1183         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1184                 CERROR("invalid record\n");
1185                 GOTO(out, req->rq_status = -EINVAL);
1186         }
1187
1188         /* rc will be used to interrupt a for loop over multiple records */
1189         rc = mds_reint_rec(rec, offset, req, lockh);
1190  out:
1191         OBD_FREE(rec, sizeof(*rec));
1192         return rc;
1193 }
1194
1195 int mds_filter_recovery_request(struct ptlrpc_request *req,
1196                                 struct obd_device *obd, int *process)
1197 {
1198         switch (req->rq_reqmsg->opc) {
1199         case MDS_CONNECT: /* This will never get here, but for completeness. */
1200         case OST_CONNECT: /* This will never get here, but for completeness. */
1201         case MDS_DISCONNECT:
1202         case OST_DISCONNECT:
1203                *process = 1;
1204                RETURN(0);
1205
1206         case MDS_CLOSE:
1207         case MDS_SYNC: /* used in unmounting */
1208         case OBD_PING:
1209         case MDS_REINT:
1210         case LDLM_ENQUEUE:
1211                 *process = target_queue_recovery_request(req, obd);
1212                 RETURN(0);
1213
1214         default:
1215                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1216                 *process = 0;
1217                 /* XXX what should we set rq_status to here? */
1218                 req->rq_status = -EAGAIN;
1219                 RETURN(ptlrpc_error(req));
1220         }
1221 }
1222 EXPORT_SYMBOL(mds_filter_recovery_request);
1223
1224 static char *reint_names[] = {
1225         [REINT_SETATTR] "setattr",
1226         [REINT_CREATE]  "create",
1227         [REINT_LINK]    "link",
1228         [REINT_UNLINK]  "unlink",
1229         [REINT_RENAME]  "rename",
1230         [REINT_OPEN]    "open",
1231 };
1232
1233 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
1234 {
1235         char *key;
1236         __u32 *val;
1237         int keylen, rc = 0;
1238         ENTRY;
1239
1240         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1241         if (key == NULL) {
1242                 DEBUG_REQ(D_HA, req, "no set_info key");
1243                 RETURN(-EFAULT);
1244         }
1245         keylen = req->rq_reqmsg->buflens[0];
1246
1247         val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
1248         if (val == NULL) {
1249                 DEBUG_REQ(D_HA, req, "no set_info val");
1250                 RETURN(-EFAULT);
1251         }
1252
1253         rc = lustre_pack_reply(req, 0, NULL, NULL);
1254         if (rc)
1255                 RETURN(rc);
1256         req->rq_repmsg->status = 0;
1257
1258         if (keylen < strlen("read-only") ||
1259             memcmp(key, "read-only", keylen) != 0)
1260                 RETURN(-EINVAL);
1261
1262         if (*val)
1263                 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1264         else
1265                 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1266
1267         RETURN(0);
1268 }
1269
1270 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1271 {
1272         struct obd_quotactl *oqctl;
1273         int rc;
1274         ENTRY;
1275
1276         oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1277                                    lustre_swab_obd_quotactl);
1278         if (oqctl == NULL)
1279                 RETURN(-EPROTO);
1280
1281         rc = lustre_pack_reply(req, 0, NULL, NULL);
1282         if (rc) {
1283                 CERROR("mds: out of memory while packing quotacheck reply\n");
1284                 RETURN(rc);
1285         }
1286
1287         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1288         RETURN(0);
1289 }
1290
1291 static int mds_handle_quotactl(struct ptlrpc_request *req)
1292 {
1293         struct obd_quotactl *oqctl, *repoqc;
1294         int rc, size = sizeof(*repoqc);
1295         ENTRY;
1296
1297         oqctl = lustre_swab_reqbuf(req, 0, sizeof(*oqctl),
1298                                    lustre_swab_obd_quotactl);
1299         if (oqctl == NULL)
1300                 RETURN(-EPROTO);
1301
1302         rc = lustre_pack_reply(req, 1, &size, NULL);
1303         if (rc)
1304                 RETURN(rc);
1305
1306         repoqc = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repoqc));
1307
1308         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1309         *repoqc = *oqctl;
1310         RETURN(0);
1311 }
1312
1313 int mds_msg_check_version(struct lustre_msg *msg)
1314 {
1315         int rc;
1316
1317         /* TODO: enable the below check while really introducing msg version.
1318          * it's disabled because it will break compatibility with b1_4.
1319          */
1320         return (0);
1321
1322         switch (msg->opc) {
1323         case MDS_CONNECT:
1324         case MDS_DISCONNECT:
1325         case OBD_PING:
1326                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1327                 if (rc)
1328                         CERROR("bad opc %u version %08x, expecting %08x\n",
1329                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
1330                 break;
1331         case MDS_GETSTATUS:
1332         case MDS_GETATTR:
1333         case MDS_GETATTR_NAME:
1334         case MDS_STATFS:
1335         case MDS_READPAGE:
1336         case MDS_REINT:
1337         case MDS_CLOSE:
1338         case MDS_DONE_WRITING:
1339         case MDS_PIN:
1340         case MDS_SYNC:
1341         case MDS_GETXATTR:
1342         case MDS_SETXATTR:
1343         case MDS_SET_INFO:
1344         case MDS_QUOTACHECK:
1345         case MDS_QUOTACTL:
1346         case QUOTA_DQACQ:
1347         case QUOTA_DQREL:
1348                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1349                 if (rc)
1350                         CERROR("bad opc %u version %08x, expecting %08x\n",
1351                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
1352                 break;
1353         case LDLM_ENQUEUE:
1354         case LDLM_CONVERT:
1355         case LDLM_BL_CALLBACK:
1356         case LDLM_CP_CALLBACK:
1357                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1358                 if (rc)
1359                         CERROR("bad opc %u version %08x, expecting %08x\n",
1360                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
1361                 break;
1362         case OBD_LOG_CANCEL:
1363         case LLOG_ORIGIN_HANDLE_CREATE:
1364         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1365         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1366         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1367         case LLOG_ORIGIN_HANDLE_CLOSE:
1368         case LLOG_CATINFO:
1369                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1370                 if (rc)
1371                         CERROR("bad opc %u version %08x, expecting %08x\n",
1372                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
1373                 break;
1374         default:
1375                 CERROR("MDS unknown opcode %d\n", msg->opc);
1376                 rc = -ENOTSUPP;
1377         }
1378         return rc;
1379 }
1380 EXPORT_SYMBOL(mds_msg_check_version);
1381
1382 int mds_handle(struct ptlrpc_request *req)
1383 {
1384         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1385         int rc;
1386         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1387         struct obd_device *obd = NULL;
1388         ENTRY;
1389
1390         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1391
1392         LASSERT(current->journal_info == NULL);
1393
1394         rc = mds_msg_check_version(req->rq_reqmsg);
1395         if (rc) {
1396                 CERROR("MDS drop mal-formed request\n");
1397                 RETURN(rc);
1398         }
1399
1400         /* XXX identical to OST */
1401         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1402                 struct mds_export_data *med;
1403                 int recovering, abort_recovery;
1404
1405                 if (req->rq_export == NULL) {
1406                         CERROR("operation %d on unconnected MDS from %s\n",
1407                                req->rq_reqmsg->opc,
1408                                libcfs_id2str(req->rq_peer));
1409                         req->rq_status = -ENOTCONN;
1410                         GOTO(out, rc = -ENOTCONN);
1411                 }
1412
1413                 med = &req->rq_export->exp_mds_data;
1414                 obd = req->rq_export->exp_obd;
1415                 mds = mds_req2mds(req);
1416
1417                 /* sanity check: if the xid matches, the request must
1418                  * be marked as a resent or replayed */
1419                 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1420                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1421                                  (MSG_RESENT | MSG_REPLAY),
1422                                  "rq_xid "LPU64" matches last_xid, "
1423                                  "expected RESENT flag\n",
1424                                  req->rq_xid);
1425                 /* else: note the opposite is not always true; a
1426                  * RESENT req after a failover will usually not match
1427                  * the last_xid, since it was likely never
1428                  * committed. A REPLAYed request will almost never
1429                  * match the last xid, however it could for a
1430                  * committed, but still retained, open. */
1431
1432                 /* Check for aborted recovery. */
1433                 spin_lock_bh(&obd->obd_processing_task_lock);
1434                 abort_recovery = obd->obd_abort_recovery;
1435                 recovering = obd->obd_recovering;
1436                 spin_unlock_bh(&obd->obd_processing_task_lock);
1437                 if (abort_recovery) {
1438                         target_abort_recovery(obd);
1439                 } else if (recovering) {
1440                         rc = mds_filter_recovery_request(req, obd,
1441                                                          &should_process);
1442                         if (rc || !should_process)
1443                                 RETURN(rc);
1444                 }
1445         }
1446
1447         switch (req->rq_reqmsg->opc) {
1448         case MDS_CONNECT:
1449                 DEBUG_REQ(D_INODE, req, "connect");
1450                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1451                 rc = target_handle_connect(req, mds_handle);
1452                 if (!rc) {
1453                         /* Now that we have an export, set mds. */
1454                         /*
1455                          * XXX nikita: these assignments are useless: mds is
1456                          * never used below, and obd is only used for
1457                          * MSG_LAST_REPLAY case, which never happens for
1458                          * MDS_CONNECT.
1459                          */
1460                         obd = req->rq_export->exp_obd;
1461                         mds = mds_req2mds(req);
1462                 }
1463                 break;
1464
1465         case MDS_DISCONNECT:
1466                 DEBUG_REQ(D_INODE, req, "disconnect");
1467                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1468                 rc = target_handle_disconnect(req);
1469                 req->rq_status = rc;            /* superfluous? */
1470                 break;
1471
1472         case MDS_GETSTATUS:
1473                 DEBUG_REQ(D_INODE, req, "getstatus");
1474                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1475                 rc = mds_getstatus(req);
1476                 break;
1477
1478         case MDS_GETATTR:
1479                 DEBUG_REQ(D_INODE, req, "getattr");
1480                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1481                 rc = mds_getattr(req, MDS_REQ_REC_OFF);
1482                 break;
1483
1484         case MDS_SETXATTR:
1485                 DEBUG_REQ(D_INODE, req, "setxattr");
1486                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
1487                 rc = mds_setxattr(req);
1488                 break;
1489
1490         case MDS_GETXATTR:
1491                 DEBUG_REQ(D_INODE, req, "getxattr");
1492                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
1493                 rc = mds_getxattr(req);
1494                 break;
1495
1496         case MDS_GETATTR_NAME: {
1497                 struct lustre_handle lockh = { 0 };
1498                 DEBUG_REQ(D_INODE, req, "getattr_name");
1499                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1500
1501                 /* If this request gets a reconstructed reply, we won't be
1502                  * acquiring any new locks in mds_getattr_name, so we don't
1503                  * want to cancel.
1504                  */
1505                 rc = mds_getattr_name(MDS_REQ_REC_OFF, req,
1506                                       MDS_INODELOCK_UPDATE, &lockh);
1507                 /* this non-intent call (from an ioctl) is special */
1508                 req->rq_status = rc;
1509                 if (rc == 0 && lustre_handle_is_used(&lockh))
1510                         ldlm_lock_decref(&lockh, LCK_CR);
1511                 break;
1512         }
1513         case MDS_STATFS:
1514                 DEBUG_REQ(D_INODE, req, "statfs");
1515                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1516                 rc = mds_statfs(req);
1517                 break;
1518
1519         case MDS_READPAGE:
1520                 DEBUG_REQ(D_INODE, req, "readpage");
1521                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1522                 rc = mds_readpage(req, MDS_REQ_REC_OFF);
1523
1524                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1525                         RETURN(0);
1526                 }
1527
1528                 break;
1529
1530         case MDS_REINT: {
1531                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
1532                                              sizeof (*opcp));
1533                 __u32  opc;
1534                 int size[] = { sizeof(struct mds_body), mds->mds_max_mdsize,
1535                                mds->mds_max_cookiesize};
1536                 int bufcount;
1537
1538                 /* NB only peek inside req now; mds_reint() will swab it */
1539                 if (opcp == NULL) {
1540                         CERROR ("Can't inspect opcode\n");
1541                         rc = -EINVAL;
1542                         break;
1543                 }
1544                 opc = *opcp;
1545                 if (lustre_msg_swabbed (req->rq_reqmsg))
1546                         __swab32s(&opc);
1547
1548                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1549                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1550                            reint_names[opc] == NULL) ? reint_names[opc] :
1551                                                        "unknown opcode");
1552
1553                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1554
1555                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1556                         bufcount = 3;
1557                 else if (opc == REINT_OPEN)
1558                         bufcount = 2;
1559                 else
1560                         bufcount = 1;
1561
1562                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1563                 if (rc)
1564                         break;
1565
1566                 rc = mds_reint(req, MDS_REQ_REC_OFF, NULL);
1567                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1568                 break;
1569         }
1570
1571         case MDS_CLOSE:
1572                 DEBUG_REQ(D_INODE, req, "close");
1573                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1574                 rc = mds_close(req, MDS_REQ_REC_OFF);
1575                 break;
1576
1577         case MDS_DONE_WRITING:
1578                 DEBUG_REQ(D_INODE, req, "done_writing");
1579                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1580                 rc = mds_done_writing(req, MDS_REQ_REC_OFF);
1581                 break;
1582
1583         case MDS_PIN:
1584                 DEBUG_REQ(D_INODE, req, "pin");
1585                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1586                 rc = mds_pin(req, MDS_REQ_REC_OFF);
1587                 break;
1588
1589         case MDS_SYNC:
1590                 DEBUG_REQ(D_INODE, req, "sync");
1591                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1592                 rc = mds_sync(req, MDS_REQ_REC_OFF);
1593                 break;
1594
1595         case MDS_SET_INFO:
1596                 DEBUG_REQ(D_INODE, req, "set_info");
1597                 rc = mds_set_info_rpc(req->rq_export, req);
1598                 break;
1599
1600         case MDS_QUOTACHECK:
1601                 DEBUG_REQ(D_INODE, req, "quotacheck");
1602                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
1603                 rc = mds_handle_quotacheck(req);
1604                 break;
1605
1606         case MDS_QUOTACTL:
1607                 DEBUG_REQ(D_INODE, req, "quotactl");
1608                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
1609                 rc = mds_handle_quotactl(req);
1610                 break;
1611
1612         case OBD_PING:
1613                 DEBUG_REQ(D_INODE, req, "ping");
1614                 rc = target_handle_ping(req);
1615                 break;
1616
1617         case OBD_LOG_CANCEL:
1618                 CDEBUG(D_INODE, "log cancel\n");
1619                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1620                 rc = -ENOTSUPP; /* la la la */
1621                 break;
1622
1623         case LDLM_ENQUEUE:
1624                 DEBUG_REQ(D_INODE, req, "enqueue");
1625                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1626                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1627                                          ldlm_server_blocking_ast, NULL);
1628                 fail = OBD_FAIL_LDLM_REPLY;
1629                 break;
1630         case LDLM_CONVERT:
1631                 DEBUG_REQ(D_INODE, req, "convert");
1632                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1633                 rc = ldlm_handle_convert(req);
1634                 break;
1635         case LDLM_BL_CALLBACK:
1636         case LDLM_CP_CALLBACK:
1637                 DEBUG_REQ(D_INODE, req, "callback");
1638                 CERROR("callbacks should not happen on MDS\n");
1639                 LBUG();
1640                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1641                 break;
1642         case LLOG_ORIGIN_HANDLE_CREATE:
1643                 DEBUG_REQ(D_INODE, req, "llog_init");
1644                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1645                 rc = llog_origin_handle_create(req);
1646                 break;
1647         case LLOG_ORIGIN_HANDLE_DESTROY:
1648                 DEBUG_REQ(D_INODE, req, "llog_init");
1649                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1650                 rc = llog_origin_handle_destroy(req);
1651                 break;
1652         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1653                 DEBUG_REQ(D_INODE, req, "llog next block");
1654                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1655                 rc = llog_origin_handle_next_block(req);
1656                 break;
1657         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1658                 DEBUG_REQ(D_INODE, req, "llog prev block");
1659                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1660                 rc = llog_origin_handle_prev_block(req);
1661                 break;
1662         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1663                 DEBUG_REQ(D_INODE, req, "llog read header");
1664                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1665                 rc = llog_origin_handle_read_header(req);
1666                 break;
1667         case LLOG_ORIGIN_HANDLE_CLOSE:
1668                 DEBUG_REQ(D_INODE, req, "llog close");
1669                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1670                 rc = llog_origin_handle_close(req);
1671                 break;
1672         case LLOG_CATINFO:
1673                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1674                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1675                 rc = llog_catinfo(req);
1676                 break;
1677         default:
1678                 req->rq_status = -ENOTSUPP;
1679                 rc = ptlrpc_error(req);
1680                 RETURN(rc);
1681         }
1682
1683         LASSERT(current->journal_info == NULL);
1684
1685         /* If we're DISCONNECTing, the mds_export_data is already freed */
1686         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1687                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1688                 req->rq_repmsg->last_xid =
1689                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1690
1691                 target_committed_to_req(req);
1692         }
1693
1694         EXIT;
1695  out:
1696
1697         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1698                 if (obd && obd->obd_recovering) {
1699                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1700                         return target_queue_final_reply(req, rc);
1701                 }
1702                 /* Lost a race with recovery; let the error path DTRT. */
1703                 rc = req->rq_status = -ENOTCONN;
1704         }
1705
1706         target_send_reply(req, rc, fail);
1707         return 0;
1708 }
1709
1710 /* Update the server data on disk.  This stores the new mount_count and
1711  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1712  * then the server last_rcvd value may be less than that of the clients.
1713  * This will alert us that we may need to do client recovery.
1714  *
1715  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1716  */
1717 int mds_update_server_data(struct obd_device *obd, int force_sync)
1718 {
1719         struct mds_obd *mds = &obd->u.mds;
1720         struct lr_server_data *lsd = mds->mds_server_data;
1721         struct lr_server_data *lsd_copy = NULL;
1722         struct file *filp = mds->mds_rcvd_filp;
1723         struct lvfs_run_ctxt saved;
1724         loff_t off = 0;
1725         int rc;
1726         ENTRY;
1727
1728         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1729                mds->mds_mount_count, mds->mds_last_transno);
1730
1731         lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1732
1733         if (!(lsd->lsd_feature_incompat & cpu_to_le32(OBD_INCOMPAT_COMMON_LR))){
1734                 /* Swap to the old mds_server_data format, in case
1735                    someone wants to revert to a pre-1.6 lustre */
1736                 CDEBUG(D_CONFIG, "writing old last_rcvd format\n");
1737                 /* malloc new struct instead of swap in-place because
1738                    we don't have a lock on the last_trasno or mount count -
1739                    someone may modify it while we're here, and we don't want
1740                    them to inc the wrong thing. */
1741                 OBD_ALLOC(lsd_copy, sizeof(*lsd_copy));
1742                 if (!lsd_copy)
1743                         RETURN(-ENOMEM);
1744                 *lsd_copy = *lsd;
1745                 lsd_copy->lsd_unused = lsd->lsd_last_transno;
1746                 lsd_copy->lsd_last_transno = lsd->lsd_mount_count;
1747                 lsd = lsd_copy;
1748         }
1749
1750         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1751         rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1752         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1753         if (rc)
1754                 CERROR("error writing MDS server data: rc = %d\n", rc);
1755
1756         if (lsd_copy)
1757                 OBD_FREE(lsd_copy, sizeof(*lsd_copy));
1758
1759         RETURN(rc);
1760 }
1761
1762 static
1763 void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
1764 {
1765         char *p = options;
1766
1767         while (*options) {
1768                 int len;
1769
1770                 while (*p && *p != ',')
1771                         p++;
1772
1773                 len = p - options;
1774                 if (len == sizeof("user_xattr") - 1 &&
1775                     memcmp(options, "user_xattr", len) == 0) {
1776                         mds->mds_fl_user_xattr = 1;
1777                 } else if (len == sizeof("acl") - 1 &&
1778                          memcmp(options, "acl", len) == 0) {
1779 #ifdef CONFIG_FS_POSIX_ACL
1780                         mds->mds_fl_acl = 1;
1781 #else
1782                         CWARN("ignoring unsupported acl mount option\n");
1783                         memmove(options, p, strlen(p) + 1);
1784 #endif
1785                 }
1786
1787                 options = ++p;
1788         }
1789 }
1790 static int mds_lov_presetup (struct mds_obd *mds, struct lustre_cfg *lcfg)
1791 {
1792         int rc;
1793         ENTRY;
1794
1795         rc = llog_start_commit_thread();
1796         if (rc < 0)
1797                 RETURN(rc);
1798
1799         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
1800                 class_uuid_t uuid;
1801
1802                 generate_random_uuid(uuid);
1803                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
1804
1805                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
1806                 if (mds->mds_profile == NULL)
1807                         RETURN(-ENOMEM);
1808
1809                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
1810                         LUSTRE_CFG_BUFLEN(lcfg, 3));
1811         }
1812         RETURN(rc);
1813 }
1814
1815 /* mount the file system (secretly).  lustre_cfg parameters are:
1816  * 1 = device
1817  * 2 = fstype
1818  * 3 = config name
1819  * 4 = mount options
1820  */
1821 static int mds_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
1822 {
1823         struct lprocfs_static_vars lvars;
1824         struct mds_obd *mds = &obd->u.mds;
1825         struct lustre_mount_info *lmi;
1826         struct vfsmount *mnt;
1827         struct obd_uuid uuid;
1828         __u8 *uuid_ptr;
1829         char *options, *str, *label;
1830         char ns_name[48];
1831         unsigned long page;
1832         int rc = 0;
1833         ENTRY;
1834
1835         /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */
1836         
1837         CLASSERT(offsetof(struct obd_device, u.obt) ==
1838                  offsetof(struct obd_device, u.mds.mds_obt));
1839
1840         if (lcfg->lcfg_bufcount < 3)
1841                 RETURN(rc = -EINVAL);
1842
1843         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1844                 RETURN(rc = -EINVAL);
1845
1846         lmi = server_get_mount(obd->obd_name);
1847         if (lmi) {
1848                 /* We already mounted in lustre_fill_super.
1849                    lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1850                 struct lustre_sb_info *lsi = s2lsi(lmi->lmi_sb);
1851                 mnt = lmi->lmi_mnt;
1852                 obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1853         } else {
1854                 /* old path - used by lctl */
1855                 CERROR("Using old MDS mount method\n");
1856                 page = __get_free_page(GFP_KERNEL);
1857                 if (!page)
1858                         RETURN(-ENOMEM);
1859
1860                 options = (char *)page;
1861                 memset(options, 0, PAGE_SIZE);
1862
1863                 /* here we use "iopen_nopriv" hardcoded, because it affects
1864                  * MDS utility and the rest of options are passed by mount
1865                  * options. Probably this should be moved to somewhere else
1866                  * like startup scripts or lconf. */
1867                 strcpy(options, "iopen_nopriv");
1868
1869                 if (LUSTRE_CFG_BUFLEN(lcfg, 4) > 0 && lustre_cfg_buf(lcfg, 4)) {
1870                         sprintf(options + strlen(options), ",%s",
1871                                 lustre_cfg_string(lcfg, 4));
1872                         fsoptions_to_mds_flags(mds, options);
1873                 }
1874
1875                 mnt = do_kern_mount(lustre_cfg_string(lcfg, 2), 0,
1876                                     lustre_cfg_string(lcfg, 1),
1877                                     (void *)options);
1878                 free_page(page);
1879                 if (IS_ERR(mnt)) {
1880                         rc = PTR_ERR(mnt);
1881                         LCONSOLE_ERROR("Can't mount disk %s (%d)\n",
1882                                        lustre_cfg_string(lcfg, 1), rc);
1883                         RETURN(rc);
1884                 }
1885
1886                 obd->obd_fsops = fsfilt_get_ops(lustre_cfg_string(lcfg, 2));
1887         }
1888         if (IS_ERR(obd->obd_fsops))
1889                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
1890
1891         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1892
1893         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1894
1895         //sema_init(&mds->mds_orphan_recovery_sem, 1);
1896         sema_init(&mds->mds_epoch_sem, 1);
1897         spin_lock_init(&mds->mds_transno_lock);
1898         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1899         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1900         mds->mds_atime_diff = MAX_ATIME_DIFF;
1901
1902         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
1903         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1904         if (obd->obd_namespace == NULL) {
1905                 mds_cleanup(obd);
1906                 GOTO(err_ops, rc = -ENOMEM);
1907         }
1908         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1909
1910         rc = mds_fs_setup(obd, mnt);
1911         if (rc) {
1912                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
1913                        obd->obd_name, rc);
1914                 GOTO(err_ns, rc);
1915         }
1916
1917         rc = mds_lov_presetup(mds, lcfg);
1918         if (rc < 0)
1919                 GOTO(err_fs, rc);
1920
1921         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1922                            "mds_ldlm_client", &obd->obd_ldlm_client);
1923         obd->obd_replayable = 1;
1924
1925         rc = lquota_setup(quota_interface, obd, lcfg);
1926         if (rc)
1927                 GOTO(err_fs, rc);
1928
1929         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
1930         if (IS_ERR(mds->mds_group_hash)) {
1931                 rc = PTR_ERR(mds->mds_group_hash);
1932                 mds->mds_group_hash = NULL;
1933                 GOTO(err_qctxt, rc);
1934         }
1935
1936         /* Don't wait for mds_postrecov trying to clear orphans */
1937         obd->obd_async_recov = 1;
1938         rc = mds_postsetup(obd);
1939         obd->obd_async_recov = 0;
1940         if (rc)
1941                 GOTO(err_qctxt, rc);
1942
1943         lprocfs_init_vars(mds, &lvars);
1944         lprocfs_obd_setup(obd, lvars.obd_vars);
1945
1946         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
1947         if (uuid_ptr != NULL) {
1948                 class_uuid_unparse(uuid_ptr, &uuid);
1949                 str = uuid.uuid;
1950         } else {
1951                 str = "no UUID";
1952         }
1953
1954         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
1955         if (obd->obd_recovering) {
1956                 LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in "
1957                               "recovery until %d %s reconnect, or if no clients"
1958                               " reconnect for %d:%.02d; during that time new "
1959                               "clients will not be allowed to connect. "
1960                               "Recovery progress can be monitored by watching "
1961                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
1962                               obd->obd_name, lustre_cfg_string(lcfg, 1),
1963                               label ?: "", label ? "/" : "", str,
1964                               obd->obd_recoverable_clients,
1965                               (obd->obd_recoverable_clients == 1) ?
1966                               "client" : "clients",
1967                               (int)(OBD_RECOVERY_TIMEOUT) / 60,
1968                               (int)(OBD_RECOVERY_TIMEOUT) % 60,
1969                               obd->obd_name);
1970         } else {
1971                 LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery "
1972                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
1973                               label ?: "", label ? "/" : "", str,
1974                               obd->obd_replayable ? "enabled" : "disabled");
1975         }
1976
1977         ldlm_timeout = 2;
1978
1979         RETURN(0);
1980
1981 err_qctxt:
1982         lquota_cleanup(quota_interface, obd);
1983 err_fs:
1984         /* No extra cleanup needed for llog_init_commit_thread() */
1985         mds_fs_cleanup(obd);
1986         upcall_cache_cleanup(mds->mds_group_hash);
1987         mds->mds_group_hash = NULL;
1988 err_ns:
1989         ldlm_namespace_free(obd->obd_namespace, 0);
1990         obd->obd_namespace = NULL;
1991 err_ops:
1992         fsfilt_put_ops(obd->obd_fsops);
1993 err_put:
1994         if (lmi) {
1995                 server_put_mount(obd->obd_name, mds->mds_vfsmnt);
1996         } else {
1997                 /* old method */
1998                 unlock_kernel();
1999                 mntput(mds->mds_vfsmnt);
2000                 lock_kernel();
2001         }
2002         obd->u.obt.obt_sb = NULL;
2003         return rc;
2004 }
2005
2006 static int mds_lov_clean(struct obd_device *obd)
2007 {
2008         struct mds_obd *mds = &obd->u.mds;
2009         struct obd_device *osc = mds->mds_osc_obd;
2010         ENTRY;
2011
2012         if (mds->mds_profile) {
2013                 class_del_profile(mds->mds_profile);
2014                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2015                 mds->mds_profile = NULL;
2016         }
2017
2018         /* There better be a lov */
2019         if (!osc)
2020                 RETURN(0);
2021         if (IS_ERR(osc))
2022                 RETURN(PTR_ERR(osc));
2023
2024         obd_register_observer(osc, NULL);
2025
2026         /* Give lov our same shutdown flags */
2027         osc->obd_force = obd->obd_force;
2028         osc->obd_fail = obd->obd_fail;
2029
2030         /* Cleanup the lov */
2031         obd_disconnect(mds->mds_osc_exp);
2032         class_manual_cleanup(osc);
2033         mds->mds_osc_exp = NULL;
2034
2035         RETURN(0);
2036 }
2037
2038 static int mds_postsetup(struct obd_device *obd)
2039 {
2040         struct mds_obd *mds = &obd->u.mds;
2041         int rc = 0;
2042         ENTRY;
2043
2044         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
2045                         &llog_lvfs_ops);
2046         if (rc)
2047                 RETURN(rc);
2048
2049         rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
2050                         &llog_lvfs_ops);
2051         if (rc)
2052                 RETURN(rc);
2053
2054         if (mds->mds_profile) {
2055                 struct lustre_profile *lprof;
2056                 /* The profile defines which osc and mdc to connect to, for a
2057                    client.  We reuse that here to figure out the name of the
2058                    lov to use (and ignore lprof->lp_mdc).
2059                    The profile was set in the config log with
2060                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
2061                 lprof = class_get_profile(mds->mds_profile);
2062                 if (lprof == NULL) {
2063                         CERROR("No profile found: %s\n", mds->mds_profile);
2064                         GOTO(err_cleanup, rc = -ENOENT);
2065                 }
2066                 rc = mds_lov_connect(obd, lprof->lp_osc);
2067                 if (rc)
2068                         GOTO(err_cleanup, rc);
2069         }
2070
2071         RETURN(rc);
2072
2073 err_cleanup:
2074         mds_lov_clean(obd);
2075         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2076         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2077         RETURN(rc);
2078 }
2079
2080 int mds_postrecov(struct obd_device *obd)
2081 {
2082         int rc;
2083         ENTRY;
2084
2085         if (obd->obd_fail)
2086                 RETURN(0);
2087
2088         LASSERT(!obd->obd_recovering);
2089         LASSERT(llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT) != NULL);
2090
2091         /* FIXME why not put this in the synchronize? */
2092         /* set nextid first, so we are sure it happens */
2093         rc = mds_lov_set_nextid(obd);
2094         if (rc) {
2095                 CERROR("%s: mds_lov_set_nextid failed %d\n",
2096                        obd->obd_name, rc);
2097                 GOTO(out, rc);
2098         }
2099
2100         /* clean PENDING dir */
2101         if (strcmp(obd->obd_name, MDD_OBD_NAME))
2102                 rc = mds_cleanup_pending(obd);
2103                 if (rc < 0)
2104                         GOTO(out, rc);
2105
2106         /* FIXME Does target_finish_recovery really need this to block? */
2107         /* Notify the LOV, which will in turn call mds_notify for each tgt */
2108         /* This means that we have to hack obd_notify to think we're obd_set_up
2109            during mds_lov_connect. */
2110         obd_notify(obd->u.mds.mds_osc_obd, NULL,
2111                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
2112                    OBD_NOTIFY_SYNC, NULL);
2113
2114         /* quota recovery */
2115         lquota_recovery(quota_interface, obd);
2116
2117 out:
2118         RETURN(rc);
2119 }
2120
2121 /* We need to be able to stop an mds_lov_synchronize */
2122 static int mds_lov_early_clean(struct obd_device *obd)
2123 {
2124         struct mds_obd *mds = &obd->u.mds;
2125         struct obd_device *osc = mds->mds_osc_obd;
2126
2127         if (!osc || (!obd->obd_force && !obd->obd_fail))
2128                 return(0);
2129
2130         CDEBUG(D_HA, "abort inflight\n");
2131         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
2132 }
2133
2134 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2135 {
2136         int rc = 0;
2137         ENTRY;
2138
2139         switch (stage) {
2140         case OBD_CLEANUP_EARLY:
2141                 break;
2142         case OBD_CLEANUP_EXPORTS:
2143                 /*XXX Use this for mdd mds cleanup, so comment out 
2144                  *this target_cleanup_recovery for this tmp MDD MDS
2145                  *Wangdi*/
2146                 if (strcmp(obd->obd_name, MDD_OBD_NAME))
2147                         target_cleanup_recovery(obd); 
2148                 mds_lov_early_clean(obd);
2149                 break;
2150         case OBD_CLEANUP_SELF_EXP:
2151                 mds_lov_disconnect(obd);
2152                 mds_lov_clean(obd);
2153                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2154                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2155                 rc = obd_llog_finish(obd, 0);
2156                 break;
2157         case OBD_CLEANUP_OBD:
2158                 break;
2159         }
2160         RETURN(rc);
2161 }
2162
2163 static int mds_cleanup(struct obd_device *obd)
2164 {
2165         struct mds_obd *mds = &obd->u.mds;
2166         lvfs_sbdev_type save_dev;
2167         int must_put = 0;
2168         int must_relock = 0;
2169         ENTRY;
2170
2171         if (obd->u.obt.obt_sb == NULL)
2172                 RETURN(0);
2173         save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
2174
2175         if (mds->mds_osc_exp)
2176                 /* lov export was disconnected by mds_lov_clean;
2177                    we just need to drop our ref */
2178                 class_export_put(mds->mds_osc_exp);
2179
2180         lprocfs_obd_cleanup(obd);
2181
2182         lquota_cleanup(quota_interface, obd);
2183
2184         mds_update_server_data(obd, 1);
2185         if (mds->mds_lov_objids != NULL)
2186                 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
2187         mds_fs_cleanup(obd);
2188
2189         upcall_cache_cleanup(mds->mds_group_hash);
2190         mds->mds_group_hash = NULL;
2191
2192         must_put = server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2193         /* must_put is for old method (l_p_m returns non-0 on err) */
2194
2195         /* We can only unlock kernel if we are in the context of sys_ioctl,
2196            otherwise we never called lock_kernel */
2197         if (ll_kernel_locked()) {
2198                 unlock_kernel();
2199                 must_relock++;
2200         }
2201
2202         if (must_put)
2203                 /* In case we didn't mount with lustre_get_mount -- old method*/
2204                 mntput(mds->mds_vfsmnt);
2205         obd->u.obt.obt_sb = NULL;
2206
2207         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
2208
2209         spin_lock_bh(&obd->obd_processing_task_lock);
2210         if (obd->obd_recovering) {
2211                 target_cancel_recovery_timer(obd);
2212                 obd->obd_recovering = 0;
2213         }
2214         spin_unlock_bh(&obd->obd_processing_task_lock);
2215
2216         lvfs_clear_rdonly(save_dev);
2217
2218         if (must_relock)
2219                 lock_kernel();
2220
2221         fsfilt_put_ops(obd->obd_fsops);
2222
2223         LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
2224
2225         RETURN(0);
2226 }
2227
2228 static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
2229                                         struct ldlm_lock *new_lock,
2230                                         struct ldlm_lock **old_lock,
2231                                         struct lustre_handle *lockh)
2232 {
2233         struct obd_export *exp = req->rq_export;
2234         struct obd_device *obd = exp->exp_obd;
2235         struct ldlm_request *dlmreq =
2236                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
2237         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
2238         struct list_head *iter;
2239
2240         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2241                 return;
2242
2243         l_lock(&obd->obd_namespace->ns_lock);
2244         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2245                 struct ldlm_lock *lock;
2246                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2247                 if (lock == new_lock)
2248                         continue;
2249                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2250                         lockh->cookie = lock->l_handle.h_cookie;
2251                         LDLM_DEBUG(lock, "restoring lock cookie");
2252                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2253                                   lockh->cookie);
2254                         if (old_lock)
2255                                 *old_lock = LDLM_LOCK_GET(lock);
2256                         l_unlock(&obd->obd_namespace->ns_lock);
2257                         return;
2258                 }
2259         }
2260         l_unlock(&obd->obd_namespace->ns_lock);
2261
2262         /* If the xid matches, then we know this is a resent request,
2263          * and allow it. (It's probably an OPEN, for which we don't
2264          * send a lock */
2265         if (req->rq_xid ==
2266             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
2267                 return;
2268
2269         /* This remote handle isn't enqueued, so we never received or
2270          * processed this request.  Clear MSG_RESENT, because it can
2271          * be handled like any normal request now. */
2272
2273         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2274
2275         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2276                   remote_hdl.cookie);
2277 }
2278
2279 int intent_disposition(struct ldlm_reply *rep, int flag)
2280 {
2281         if (!rep)
2282                 return 0;
2283         return (rep->lock_policy_res1 & flag);
2284 }
2285
2286 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2287 {
2288         if (!rep)
2289                 return;
2290         rep->lock_policy_res1 |= flag;
2291 }
2292
2293 static int mds_intent_policy(struct ldlm_namespace *ns,
2294                              struct ldlm_lock **lockp, void *req_cookie,
2295                              ldlm_mode_t mode, int flags, void *data)
2296 {
2297         struct ptlrpc_request *req = req_cookie;
2298         struct ldlm_lock *lock = *lockp;
2299         struct ldlm_intent *it;
2300         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2301         struct ldlm_reply *rep;
2302         struct lustre_handle lockh = { 0 };
2303         struct ldlm_lock *new_lock = NULL;
2304         int getattr_part = MDS_INODELOCK_UPDATE;
2305         int repsize[4] = {sizeof(*rep),
2306                           sizeof(struct mds_body),
2307                           mds->mds_max_mdsize};
2308         int repbufcnt = 3, offset = MDS_REQ_INTENT_REC_OFF;
2309         int rc;
2310         ENTRY;
2311
2312         LASSERT(req != NULL);
2313
2314         if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
2315                 /* No intent was provided */
2316                 int size = sizeof(struct ldlm_reply);
2317                 rc = lustre_pack_reply(req, 1, &size, NULL);
2318                 LASSERT(rc == 0);
2319                 RETURN(0);
2320         }
2321
2322         it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
2323                                 lustre_swab_ldlm_intent);
2324         if (it == NULL) {
2325                 CERROR("Intent missing\n");
2326                 RETURN(req->rq_status = -EFAULT);
2327         }
2328
2329         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2330
2331         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
2332             (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
2333                 /* we should never allow OBD_CONNECT_ACL if not configured */
2334                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
2335         else if (it->opc & IT_UNLINK)
2336                 repsize[repbufcnt++] = mds->mds_max_cookiesize;
2337
2338         rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
2339         if (rc)
2340                 RETURN(req->rq_status = rc);
2341
2342         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
2343         intent_set_disposition(rep, DISP_IT_EXECD);
2344
2345
2346         /* execute policy */
2347         switch ((long)it->opc) {
2348         case IT_OPEN:
2349         case IT_CREAT|IT_OPEN:
2350                 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
2351                                             lock, NULL, &lockh);
2352                 /* XXX swab here to assert that an mds_open reint
2353                  * packet is following */
2354                 rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
2355 #if 0
2356                 /* We abort the lock if the lookup was negative and
2357                  * we did not make it to the OPEN portion */
2358                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2359                         RETURN(ELDLM_LOCK_ABORTED);
2360                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2361                     !intent_disposition(rep, DISP_OPEN_OPEN))
2362 #endif
2363                         RETURN(ELDLM_LOCK_ABORTED);
2364                 break;
2365         case IT_LOOKUP:
2366                         getattr_part = MDS_INODELOCK_LOOKUP;
2367         case IT_GETATTR:
2368                         getattr_part |= MDS_INODELOCK_LOOKUP;
2369         case IT_READDIR:
2370                 fixup_handle_for_resent_req(req, MDS_REQ_INTENT_LOCKREQ_OFF,
2371                                             lock, &new_lock, &lockh);
2372
2373                 /* INODEBITS_INTEROP: if this lock was converted from a
2374                  * plain lock (client does not support inodebits), then
2375                  * child lock must be taken with both lookup and update
2376                  * bits set for all operations.
2377                  */
2378                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
2379                         getattr_part = MDS_INODELOCK_LOOKUP |
2380                                        MDS_INODELOCK_UPDATE;
2381
2382                 rep->lock_policy_res2 = mds_getattr_name(offset, req,
2383                                                          getattr_part, &lockh);
2384                 /* FIXME: LDLM can set req->rq_status. MDS sets
2385                    policy_res{1,2} with disposition and status.
2386                    - replay: returns 0 & req->status is old status
2387                    - otherwise: returns req->status */
2388                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2389                         rep->lock_policy_res2 = 0;
2390                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2391                     rep->lock_policy_res2)
2392                         RETURN(ELDLM_LOCK_ABORTED);
2393                 if (req->rq_status != 0) {
2394                         LBUG();
2395                         rep->lock_policy_res2 = req->rq_status;
2396                         RETURN(ELDLM_LOCK_ABORTED);
2397                 }
2398                 break;
2399         default:
2400                 CERROR("Unhandled intent "LPD64"\n", it->opc);
2401                 RETURN(-EFAULT);
2402         }
2403
2404         /* By this point, whatever function we called above must have either
2405          * filled in 'lockh', been an intent replay, or returned an error.  We
2406          * want to allow replayed RPCs to not get a lock, since we would just
2407          * drop it below anyways because lock replay is done separately by the
2408          * client afterwards.  For regular RPCs we want to give the new lock to
2409          * the client instead of whatever lock it was about to get. */
2410         if (new_lock == NULL)
2411                 new_lock = ldlm_handle2lock(&lockh);
2412         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2413                 RETURN(0);
2414
2415         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
2416                  it->opc, lockh.cookie);
2417
2418         /* If we've already given this lock to a client once, then we should
2419          * have no readers or writers.  Otherwise, we should have one reader
2420          * _or_ writer ref (which will be zeroed below) before returning the
2421          * lock to a client. */
2422         if (new_lock->l_export == req->rq_export) {
2423                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2424         } else {
2425                 LASSERT(new_lock->l_export == NULL);
2426                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2427         }
2428
2429         *lockp = new_lock;
2430
2431         if (new_lock->l_export == req->rq_export) {
2432                 /* Already gave this to the client, which means that we
2433                  * reconstructed a reply. */
2434                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2435                         MSG_RESENT);
2436                 RETURN(ELDLM_LOCK_REPLACED);
2437         }
2438
2439         /* Fixup the lock to be given to the client */
2440         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
2441         new_lock->l_readers = 0;
2442         new_lock->l_writers = 0;
2443
2444         new_lock->l_export = class_export_get(req->rq_export);
2445         list_add(&new_lock->l_export_chain,
2446                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2447
2448         new_lock->l_blocking_ast = lock->l_blocking_ast;
2449         new_lock->l_completion_ast = lock->l_completion_ast;
2450
2451         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2452                sizeof(lock->l_remote_handle));
2453
2454         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2455
2456         LDLM_LOCK_PUT(new_lock);
2457         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
2458
2459         RETURN(ELDLM_LOCK_REPLACED);
2460 }
2461
2462 static int mdt_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2463 {
2464         struct mds_obd *mds = &obd->u.mds;
2465         struct lprocfs_static_vars lvars;
2466         int rc = 0;
2467         ENTRY;
2468
2469         lprocfs_init_vars(mdt, &lvars);
2470         lprocfs_obd_setup(obd, lvars.obd_vars);
2471
2472         sema_init(&mds->mds_health_sem, 1);
2473
2474         if (mds_num_threads < 2)
2475                 mds_num_threads = MDT_NUM_THREADS;
2476         if (mds_num_threads > MDT_MAX_THREADS)
2477                 mds_num_threads = MDT_MAX_THREADS;
2478
2479         mds->mds_service =
2480                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2481                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
2482                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2483                                 mds_handle, LUSTRE_MDS_NAME,
2484                                 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2485
2486         if (!mds->mds_service) {
2487                 CERROR("failed to start service\n");
2488                 GOTO(err_lprocfs, rc = -ENOMEM);
2489         }
2490
2491         rc = ptlrpc_start_threads(obd, mds->mds_service, "ll_mdt");
2492         if (rc)
2493                 GOTO(err_thread, rc);
2494
2495         mds->mds_setattr_service =
2496                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2497                                 MDS_MAXREPSIZE, MDS_SETATTR_PORTAL,
2498                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2499                                 mds_handle, "mds_setattr",
2500                                 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2501         if (!mds->mds_setattr_service) {
2502                 CERROR("failed to start getattr service\n");
2503                 GOTO(err_thread, rc = -ENOMEM);
2504         }
2505
2506         rc = ptlrpc_start_threads(obd, mds->mds_setattr_service,
2507                                   "ll_mdt_attr");
2508         if (rc)
2509                 GOTO(err_thread2, rc);
2510
2511         mds->mds_readpage_service =
2512                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2513                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
2514                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_TIMEOUT,
2515                                 mds_handle, "mds_readpage",
2516                                 obd->obd_proc_entry, NULL, mds_num_threads, 0);
2517         if (!mds->mds_readpage_service) {
2518                 CERROR("failed to start readpage service\n");
2519                 GOTO(err_thread2, rc = -ENOMEM);
2520         }
2521
2522         rc = ptlrpc_start_threads(obd, mds->mds_readpage_service,
2523                                   "ll_mdt_rdpg");
2524
2525         if (rc)
2526                 GOTO(err_thread3, rc);
2527
2528         ping_evictor_start();
2529
2530         RETURN(0);
2531
2532 err_thread3:
2533         ptlrpc_unregister_service(mds->mds_readpage_service);
2534         mds->mds_readpage_service = NULL;
2535 err_thread2:
2536         ptlrpc_unregister_service(mds->mds_setattr_service);
2537         mds->mds_setattr_service = NULL;
2538 err_thread:
2539         ptlrpc_unregister_service(mds->mds_service);
2540         mds->mds_service = NULL;
2541 err_lprocfs:
2542         lprocfs_obd_cleanup(obd);
2543         return rc;
2544 }
2545
2546 static int mdt_cleanup(struct obd_device *obd)
2547 {
2548         struct mds_obd *mds = &obd->u.mds;
2549         ENTRY;
2550
2551         ping_evictor_stop();
2552
2553         down(&mds->mds_health_sem);
2554         ptlrpc_unregister_service(mds->mds_readpage_service);
2555         ptlrpc_unregister_service(mds->mds_setattr_service);
2556         ptlrpc_unregister_service(mds->mds_service);
2557         mds->mds_readpage_service = NULL;
2558         mds->mds_setattr_service = NULL;
2559         mds->mds_service = NULL;
2560         up(&mds->mds_health_sem);
2561
2562         lprocfs_obd_cleanup(obd);
2563
2564         RETURN(0);
2565 }
2566
2567 static int mdt_health_check(struct obd_device *obd)
2568 {
2569         struct mds_obd *mds = &obd->u.mds;
2570         int rc = 0;
2571
2572         down(&mds->mds_health_sem);
2573         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
2574         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
2575         rc |= ptlrpc_service_health_check(mds->mds_service);
2576         up(&mds->mds_health_sem);
2577
2578         /*
2579          * health_check to return 0 on healthy
2580          * and 1 on unhealthy.
2581          */
2582         if(rc != 0)
2583                 rc = 1;
2584
2585         return rc;
2586 }
2587
2588 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2589                                           void *data)
2590 {
2591         struct obd_device *obd = data;
2592         struct ll_fid fid;
2593         fid.id = id;
2594         fid.generation = gen;
2595         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2596 }
2597
2598 static int mds_health_check(struct obd_device *obd)
2599 {
2600         struct obd_device_target *odt = &obd->u.obt;
2601         struct mds_obd *mds = &obd->u.mds;
2602         int rc = 0;
2603
2604         if (odt->obt_sb->s_flags & MS_RDONLY)
2605                 rc = 1;
2606
2607         LASSERT(mds->mds_health_check_filp != NULL);
2608         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
2609
2610         return rc;
2611 }
2612
2613 struct lvfs_callback_ops mds_lvfs_ops = {
2614         l_fid2dentry:     mds_lvfs_fid2dentry,
2615 };
2616
2617 /* use obd ops to offer management infrastructure */
2618 static struct obd_ops mds_obd_ops = {
2619         .o_owner           = THIS_MODULE,
2620         .o_connect         = mds_connect,
2621         .o_reconnect       = mds_reconnect,
2622         .o_init_export     = mds_init_export,
2623         .o_destroy_export  = mds_destroy_export,
2624         .o_disconnect      = mds_disconnect,
2625         .o_setup           = mds_setup,
2626         .o_precleanup      = mds_precleanup,
2627         .o_cleanup         = mds_cleanup,
2628         .o_postrecov       = mds_postrecov,
2629         .o_statfs          = mds_obd_statfs,
2630         .o_iocontrol       = mds_iocontrol,
2631         .o_create          = mds_obd_create,
2632         .o_destroy         = mds_obd_destroy,
2633         .o_llog_init       = mds_llog_init,
2634         .o_llog_finish     = mds_llog_finish,
2635         .o_notify          = mds_notify,
2636         .o_health_check    = mds_health_check,
2637 };
2638
2639 static struct obd_ops mdt_obd_ops = {
2640         .o_owner           = THIS_MODULE,
2641         .o_setup           = mdt_setup,
2642         .o_cleanup         = mdt_cleanup,
2643         .o_health_check    = mdt_health_check,
2644 };
2645
2646 quota_interface_t *quota_interface;
2647 quota_interface_t mds_quota_interface;
2648
2649 static __attribute__((unused)) int __init mds_init(void)
2650 {
2651         int rc;
2652         struct lprocfs_static_vars lvars;
2653
2654         quota_interface = PORTAL_SYMBOL_GET(mds_quota_interface);
2655         rc = lquota_init(quota_interface);
2656         if (rc) {
2657                 if (quota_interface)
2658                         PORTAL_SYMBOL_PUT(mds_quota_interface);
2659                 return rc;
2660         }
2661         init_obd_quota_ops(quota_interface, &mds_obd_ops);
2662
2663         lprocfs_init_vars(mds, &lvars);
2664         class_register_type(&mds_obd_ops, NULL,
2665                             lvars.module_vars, LUSTRE_MDS_NAME, NULL);
2666         lprocfs_init_vars(mdt, &lvars);
2667         class_register_type(&mdt_obd_ops, NULL,
2668                             lvars.module_vars, LUSTRE_MDT_NAME, NULL);
2669
2670         return 0;
2671 }
2672
2673 static __attribute__((unused)) void /*__exit*/ mds_exit(void)
2674 {
2675         lquota_exit(quota_interface);
2676         if (quota_interface)
2677                 PORTAL_SYMBOL_PUT(mds_quota_interface);
2678
2679         class_unregister_type(LUSTRE_MDS_NAME);
2680         class_unregister_type(LUSTRE_MDT_NAME);
2681 }
2682 /*mds still need lov setup here*/
2683 static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
2684 {
2685         struct mds_obd *mds = &obd->u.mds;
2686         struct lvfs_run_ctxt saved;
2687         const char     *dev;
2688         struct vfsmount *mnt;
2689         struct lustre_sb_info *lsi;
2690         struct lustre_mount_info *lmi;
2691         struct dentry  *dentry;
2692         struct file *file;
2693         int rc = 0;
2694         ENTRY;
2695
2696         CDEBUG(D_INFO, "obd %s setup \n", obd->obd_name);
2697         if (strcmp(obd->obd_name, MDD_OBD_NAME))
2698                 RETURN(0);
2699      
2700         if (lcfg->lcfg_bufcount < 5) {
2701                 CERROR("invalid arg for setup %s\n", MDD_OBD_NAME);
2702                 RETURN(-EINVAL);
2703         }
2704         dev = lustre_cfg_string(lcfg, 4);
2705         lmi = server_get_mount(dev);
2706         LASSERT(lmi != NULL); 
2707         
2708         lsi = s2lsi(lmi->lmi_sb);
2709         mnt = lmi->lmi_mnt;
2710         
2711         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
2712         mds_init_ctxt(obd, mnt);
2713
2714         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2715         dentry = simple_mkdir(current->fs->pwd, "OBJECTS", 0777, 1);
2716         if (IS_ERR(dentry)) {
2717                 rc = PTR_ERR(dentry);
2718                 CERROR("cannot create OBJECTS directory: rc = %d\n", rc);
2719                 GOTO(err_putfs, rc);
2720         }
2721         mds->mds_objects_dir = dentry;
2722
2723         dentry = lookup_one_len("__iopen__", current->fs->pwd,
2724                                 strlen("__iopen__"));
2725         if (IS_ERR(dentry)) {
2726                 rc = PTR_ERR(dentry);
2727                 CERROR("cannot lookup __iopen__ directory: rc = %d\n", rc);
2728                 GOTO(err_objects, rc);
2729         }
2730
2731         mds->mds_fid_de = dentry;
2732         if (!dentry->d_inode || is_bad_inode(dentry->d_inode)) {
2733                 rc = -ENOENT;
2734                 CERROR("__iopen__ directory has no inode? rc = %d\n", rc);
2735                 GOTO(err_fid, rc);
2736         }
2737         
2738         /* open and test the lov objd file */
2739         file = filp_open(LOV_OBJID, O_RDWR | O_CREAT, 0644);
2740         if (IS_ERR(file)) {
2741                 rc = PTR_ERR(file);
2742                 CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc);
2743                 GOTO(err_fid, rc = PTR_ERR(file));
2744         }
2745         mds->mds_lov_objid_filp = file;
2746         if (!S_ISREG(file->f_dentry->d_inode->i_mode)) {
2747                 CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID,
2748                        file->f_dentry->d_inode->i_mode);
2749                 GOTO(err_lov_objid, rc = -ENOENT);
2750         }
2751
2752         rc = mds_lov_presetup(mds, lcfg);
2753         if (rc < 0)
2754                 GOTO(err_objects, rc);
2755
2756         /* Don't wait for mds_postrecov trying to clear orphans */
2757         obd->obd_async_recov = 1;
2758         rc = mds_postsetup(obd);
2759         obd->obd_async_recov = 0;
2760         
2761         if (rc)
2762                 GOTO(err_objects, rc);
2763         
2764         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
2765         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
2766
2767 err_pop:
2768         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2769         RETURN(rc);
2770 err_lov_objid:
2771         if (mds->mds_lov_objid_filp && 
2772                 filp_close((struct file *)mds->mds_lov_objid_filp, 0))
2773                 CERROR("can't close %s after error\n", LOV_OBJID);
2774 err_fid:
2775         dput(mds->mds_fid_de);
2776 err_objects:
2777         dput(mds->mds_objects_dir);
2778 err_putfs:
2779         fsfilt_put_ops(obd->obd_fsops);
2780         goto err_pop;
2781 }
2782
2783 static int mds_cmd_cleanup(struct obd_device *obd)
2784 {
2785         struct mds_obd *mds = &obd->u.mds;
2786         struct lvfs_run_ctxt saved;
2787         int rc = 0;
2788         ENTRY;
2789
2790         if (obd->obd_fail)
2791                 LCONSOLE_WARN("%s: shutting down for failover; client state "
2792                               "will be preserved.\n", obd->obd_name);
2793
2794         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2795         if (mds->mds_lov_objid_filp) {
2796                 rc = filp_close((struct file *)mds->mds_lov_objid_filp, 0);
2797                 mds->mds_lov_objid_filp = NULL;
2798                 if (rc)
2799                         CERROR("%s file won't close, rc=%d\n", LOV_OBJID, rc);
2800         }
2801         if (mds->mds_objects_dir != NULL) {
2802                 l_dput(mds->mds_objects_dir);
2803                 mds->mds_objects_dir = NULL;
2804         }
2805
2806         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2807         shrink_dcache_parent(mds->mds_fid_de);
2808         dput(mds->mds_fid_de);
2809         LL_DQUOT_OFF(obd->u.obt.obt_sb);
2810         fsfilt_put_ops(obd->obd_fsops);
2811         
2812         RETURN(rc);
2813 }
2814
#if 0
/*
 * Compiled out: placeholder health check that always reports healthy (0).
 * Its only hook-up, the o_health_check entry of mds_cmd_obd_ops, is
 * commented out as well.
 */
static int mds_cmd_health_check(struct obd_device *obd)
{
        return 0;
}
#endif
2821 static struct obd_ops mds_cmd_obd_ops = {
2822         .o_owner           = THIS_MODULE,
2823         .o_setup           = mds_cmd_setup,
2824         .o_cleanup         = mds_cmd_cleanup,
2825         .o_precleanup      = mds_precleanup,
2826         .o_create          = mds_obd_create,
2827         .o_destroy         = mds_obd_destroy,
2828         .o_llog_init       = mds_llog_init,
2829         .o_llog_finish     = mds_llog_finish,
2830         .o_notify          = mds_notify,
2831      //   .o_health_check    = mds_cmd_health_check,
2832 };
2833
2834 static int __init mds_cmd_init(void)
2835 {
2836         struct lprocfs_static_vars lvars;
2837
2838         lprocfs_init_vars(mds, &lvars);
2839         class_register_type(&mds_cmd_obd_ops, NULL, lvars.module_vars,
2840                             LUSTRE_MDS_NAME, NULL);
2841
2842         return 0;
2843 }
2844
2845 static void /*__exit*/ mds_cmd_exit(void)
2846 {
2847         class_unregister_type(LUSTRE_MDS_NAME);
2848 }
2849
2850 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2851 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2852 MODULE_LICENSE("GPL");
2853
2854 module_init(mds_cmd_init);
2855 module_exit(mds_cmd_exit);