Whamcloud - gitweb
b=3462
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include <lustre_mds.h>
38 #include <linux/module.h>
39 #include <linux/init.h>
40 #include <linux/random.h>
41 #include <linux/fs.h>
42 #include <linux/jbd.h>
43 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
44 # include <linux/smp_lock.h>
45 # include <linux/buffer_head.h>
46 # include <linux/workqueue.h>
47 # include <linux/mount.h>
48 #else
49 # include <linux/locks.h>
50 #endif
51
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 #include <obd_lov.h>
55 #include <lustre_fsfilt.h>
56 #include <lprocfs_status.h>
57 #include <lustre_commit_confd.h>
58 #include <lustre_quota.h>
59 #include <lustre_disk.h>
60 #include <lustre_param.h>
61
62 #include "mds_internal.h"
63
/* Number of MDS service threads to start.  Exposed through the
 * CFS_MODULE_PARM() declaration below as an "i" (int) parameter with mode
 * 0444. */
int mds_num_threads;
CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
                "number of MDS service threads to start");

/* Forward declarations of static functions that are not defined in the
 * part of the file preceding their first use (presumably defined further
 * down -- confirm). */
static int mds_intent_policy(struct ldlm_namespace *ns,
                             struct ldlm_lock **lockp, void *req_cookie,
                             ldlm_mode_t mode, int flags, void *data);
static int mds_postsetup(struct obd_device *obd);
static int mds_cleanup(struct obd_device *obd);
73
74 /* Assumes caller has already pushed into the kernel filesystem context */
75 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
76                         loff_t offset, int count)
77 {
78         struct ptlrpc_bulk_desc *desc;
79         struct l_wait_info lwi;
80         struct page **pages;
81         int timeout;
82         int rc = 0, npages, i, tmpcount, tmpsize = 0;
83         ENTRY;
84
85         LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */
86
87         npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
88         OBD_ALLOC(pages, sizeof(*pages) * npages);
89         if (!pages)
90                 GOTO(out, rc = -ENOMEM);
91
92         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
93                                     MDS_BULK_PORTAL);
94         if (desc == NULL)
95                 GOTO(out_free, rc = -ENOMEM);
96
97         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
98                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
99
100                 OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);
101                 if (pages[i] == NULL)
102                         GOTO(cleanup_buf, rc = -ENOMEM);
103
104                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
105         }
106
107         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
108                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
109                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
110                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
111                        i_size_read(file->f_dentry->d_inode));
112
113                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
114                                      kmap(pages[i]), tmpsize, &offset);
115                 kunmap(pages[i]);
116
117                 if (rc != tmpsize)
118                         GOTO(cleanup_buf, rc = -EIO);
119         }
120
121         LASSERT(desc->bd_nob == count);
122
123         rc = ptlrpc_start_bulk_transfer(desc);
124         if (rc)
125                 GOTO(cleanup_buf, rc);
126
127         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
128                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
129                        OBD_FAIL_MDS_SENDPAGE, rc);
130                 GOTO(abort_bulk, rc);
131         }
132
133         timeout = (int)req->rq_deadline - (int)cfs_time_current_sec();
134         if (timeout < 0) {
135                 CERROR("Req deadline already passed %lu (now: %lu)\n",
136                        req->rq_deadline, cfs_time_current_sec());
137         }
138         lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
139         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
140         LASSERT (rc == 0 || rc == -ETIMEDOUT);
141
142         if (rc == 0) {
143                 if (desc->bd_success &&
144                     desc->bd_nob_transferred == count)
145                         GOTO(cleanup_buf, rc);
146
147                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
148         }
149
150         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
151                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
152                   desc->bd_nob_transferred, count,
153                   req->rq_export->exp_client_uuid.uuid,
154                   req->rq_export->exp_connection->c_remote_uuid.uuid);
155
156         class_fail_export(req->rq_export);
157
158         EXIT;
159  abort_bulk:
160         ptlrpc_abort_bulk (desc);
161  cleanup_buf:
162         for (i = 0; i < npages; i++)
163                 if (pages[i])
164                         OBD_PAGE_FREE(pages[i]);
165
166         ptlrpc_free_bulk(desc);
167  out_free:
168         OBD_FREE(pages, sizeof(*pages) * npages);
169  out:
170         return rc;
171 }
172
173 /* only valid locked dentries or errors should be returned */
174 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
175                                      struct vfsmount **mnt, int lock_mode,
176                                      struct lustre_handle *lockh,
177                                      char *name, int namelen, __u64 lockpart)
178 {
179         struct mds_obd *mds = &obd->u.mds;
180         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
181         struct ldlm_res_id res_id = { .name = {0} };
182         int flags = LDLM_FL_ATOMIC_CB, rc;
183         ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; 
184         ENTRY;
185
186         if (IS_ERR(de))
187                 RETURN(de);
188
189         res_id.name[0] = de->d_inode->i_ino;
190         res_id.name[1] = de->d_inode->i_generation;
191         rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, 
192                                     LDLM_IBITS, &policy, lock_mode, &flags, 
193                                     ldlm_blocking_ast, ldlm_completion_ast,
194                                     NULL, NULL, 0, NULL, lockh);
195         if (rc != ELDLM_OK) {
196                 l_dput(de);
197                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
198         }
199
200         RETURN(retval);
201 }
202
203 /* Look up an entry by inode number. */
204 /* this function ONLY returns valid dget'd dentries with an initialized inode
205    or errors */
206 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
207                               struct vfsmount **mnt)
208 {
209         char fid_name[32];
210         unsigned long ino = fid->id;
211         __u32 generation = fid->generation;
212         struct inode *inode;
213         struct dentry *result;
214
215         if (ino == 0)
216                 RETURN(ERR_PTR(-ESTALE));
217
218         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
219
220         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
221                ino, generation, mds->mds_obt.obt_sb);
222
223         /* under ext3 this is neither supposed to return bad inodes
224            nor NULL inodes. */
225         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
226         if (IS_ERR(result))
227                 RETURN(result);
228
229         inode = result->d_inode;
230         if (!inode)
231                 RETURN(ERR_PTR(-ENOENT));
232
233        if (inode->i_nlink == 0) {
234                 if (inode->i_mode == 0 &&
235                     LTIME_S(inode->i_ctime) == 0 ) {
236                         struct obd_device *obd = container_of(mds, struct
237                                                               obd_device, u.mds);
238                         LCONSOLE_WARN("Found inode with zero nlink, mode and "
239                                       "ctime -- this may indicate disk"
240                                       "corruption (device %s, inode %lu, link:"
241                                       " %lu, count: %d)\n", obd->obd_name, inode->i_ino,
242                                       (unsigned long)inode->i_nlink,
243                                       atomic_read(&inode->i_count));
244                 }
245                 dput(result);
246                 RETURN(ERR_PTR(-ENOENT));
247         }
248
249         if (generation && inode->i_generation != generation) {
250                 /* we didn't find the right inode.. */
251                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
252                        "count: %d, generation %u/%u\n", inode->i_ino,
253                        (unsigned long)inode->i_nlink,
254                        atomic_read(&inode->i_count), inode->i_generation,
255                        generation);
256                 dput(result);
257                 RETURN(ERR_PTR(-ENOENT));
258         }
259
260         if (mnt) {
261                 *mnt = mds->mds_vfsmnt;
262                 mntget(*mnt);
263         }
264
265         RETURN(result);
266 }
267
268 static int mds_connect_internal(struct obd_export *exp, 
269                                 struct obd_connect_data *data)
270 {
271         struct obd_device *obd = exp->exp_obd;
272         if (data != NULL) {
273                 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
274                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
275
276                 /* If no known bits (which should not happen, probably,
277                    as everybody should support LOOKUP and UPDATE bits at least)
278                    revert to compat mode with plain locks. */
279                 if (!data->ocd_ibits_known &&
280                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
281                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
282
283                 if (!obd->u.mds.mds_fl_acl)
284                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
285
286                 if (!obd->u.mds.mds_fl_user_xattr)
287                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
288
289                 exp->exp_connect_flags = data->ocd_connect_flags;
290                 data->ocd_version = LUSTRE_VERSION_CODE;
291                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
292         }
293
294         if (obd->u.mds.mds_fl_acl &&
295             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
296                 CWARN("%s: MDS requires ACL support but client does not\n",
297                       obd->obd_name);
298                 return -EBADE;
299         }
300         return 0;
301 }
302
303 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
304                          struct obd_uuid *cluuid,
305                          struct obd_connect_data *data)
306 {
307         int rc;
308         ENTRY;
309
310         if (exp == NULL || obd == NULL || cluuid == NULL)
311                 RETURN(-EINVAL);
312
313         rc = mds_connect_internal(exp, data);
314
315         RETURN(rc);
316 }
317
318 /* Establish a connection to the MDS.
319  *
320  * This will set up an export structure for the client to hold state data
321  * about that client, like open files, the last operation number it did
322  * on the server, etc.
323  */
324 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
325                        struct obd_uuid *cluuid, struct obd_connect_data *data,
326                        void *localdata)
327 {
328         struct obd_export *exp;
329         struct mds_export_data *med;
330         struct mds_client_data *mcd = NULL;
331         lnet_nid_t *client_nid = (lnet_nid_t *)localdata;
332         int rc, abort_recovery;
333         ENTRY;
334
335         if (!conn || !obd || !cluuid)
336                 RETURN(-EINVAL);
337
338         /* Check for aborted recovery. */
339         spin_lock_bh(&obd->obd_processing_task_lock);
340         abort_recovery = obd->obd_abort_recovery;
341         spin_unlock_bh(&obd->obd_processing_task_lock);
342         if (abort_recovery)
343                 target_abort_recovery(obd);
344
345         /* XXX There is a small race between checking the list and adding a
346          * new connection for the same UUID, but the real threat (list
347          * corruption when multiple different clients connect) is solved.
348          *
349          * There is a second race between adding the export to the list,
350          * and filling in the client data below.  Hence skipping the case
351          * of NULL mcd above.  We should already be controlling multiple
352          * connects at the client, and we can't hold the spinlock over
353          * memory allocations without risk of deadlocking.
354          */
355         rc = class_connect(conn, obd, cluuid);
356         if (rc)
357                 RETURN(rc);
358         exp = class_conn2export(conn);
359         LASSERT(exp);
360         med = &exp->exp_mds_data;
361
362         rc = mds_connect_internal(exp, data);
363         if (rc)
364                 GOTO(out, rc);
365
366         OBD_ALLOC(mcd, sizeof(*mcd));
367         if (!mcd)
368                 GOTO(out, rc = -ENOMEM);
369
370         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
371         med->med_mcd = mcd;
372
373         rc = mds_client_add(obd, exp, -1, *client_nid);
374         GOTO(out, rc);
375
376 out:
377         if (rc) {
378                 if (mcd) {
379                         OBD_FREE(mcd, sizeof(*mcd));
380                         med->med_mcd = NULL;
381                 }
382                 class_disconnect(exp);
383         } else {
384                 class_export_put(exp);
385         }
386
387         RETURN(rc);
388 }
389
390 int mds_init_export(struct obd_export *exp)
391 {
392         struct mds_export_data *med = &exp->exp_mds_data;
393
394         INIT_LIST_HEAD(&med->med_open_head);
395         spin_lock_init(&med->med_open_lock);
396         
397         spin_lock(&exp->exp_lock);
398         exp->exp_connecting = 1;
399         spin_unlock(&exp->exp_lock);
400
401         RETURN(0);
402 }
403
404 static int mds_destroy_export(struct obd_export *export)
405 {
406         struct mds_export_data *med;
407         struct obd_device *obd = export->exp_obd;
408         struct mds_obd *mds = &obd->u.mds;
409         struct lvfs_run_ctxt saved;
410         struct lov_mds_md *lmm;
411         struct llog_cookie *logcookies;
412         int rc = 0;
413         ENTRY;
414
415         med = &export->exp_mds_data;
416         target_destroy_export(export);
417
418         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
419                 RETURN(0);
420
421         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
422         /* Close any open files (which may also cause orphan unlinking). */
423
424         OBD_ALLOC(lmm, mds->mds_max_mdsize);
425         if (lmm == NULL) {
426                 CWARN("%s: allocation failure during cleanup; can not force "
427                       "close file handles on this service.\n", obd->obd_name);
428                 GOTO(out, rc = -ENOMEM);
429         }
430
431         OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
432         if (logcookies == NULL) {
433                 CWARN("%s: allocation failure during cleanup; can not force "
434                       "close file handles on this service.\n", obd->obd_name);
435                 OBD_FREE(lmm, mds->mds_max_mdsize);
436                 GOTO(out, rc = -ENOMEM);
437         }
438
439         spin_lock(&med->med_open_lock);
440         while (!list_empty(&med->med_open_head)) {
441                 struct list_head *tmp = med->med_open_head.next;
442                 struct mds_file_data *mfd =
443                         list_entry(tmp, struct mds_file_data, mfd_list);
444                 int lmm_size = mds->mds_max_mdsize;
445                 umode_t mode = mfd->mfd_dentry->d_inode->i_mode;
446                 __u64 valid = 0;
447
448                 /* Remove mfd handle so it can't be found again.
449                  * We are consuming the mfd_list reference here. */
450                 mds_mfd_unlink(mfd, 0);
451                 spin_unlock(&med->med_open_lock);
452
453                 /* If you change this message, be sure to update
454                  * replay_single:test_46 */
455                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
456                        "%.*s (ino %lu)\n", obd->obd_name,
457                        mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name,
458                        mfd->mfd_dentry->d_inode->i_ino);
459
460                 rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1);
461                 if (rc < 0)
462                         CWARN("mds_get_md failure, rc=%d\n", rc);
463                 else
464                         valid |= OBD_MD_FLEASIZE;
465
466                 /* child orphan sem protects orphan_dec_test and
467                  * is_orphan race, mds_mfd_close drops it */
468                 MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode);
469
470                 rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
471                                    !(export->exp_flags & OBD_OPT_FAILOVER),
472                                    lmm, lmm_size, logcookies,
473                                    mds->mds_max_cookiesize,
474                                    &valid);
475
476                 if (rc)
477                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
478
479                 if (valid & OBD_MD_FLCOOKIE) {
480                         rc = mds_osc_destroy_orphan(obd, mode, lmm,
481                                                     lmm_size, logcookies, 1);
482                         if (rc < 0) {
483                                 CDEBUG(D_INODE, "%s: destroy of orphan failed,"
484                                        " rc = %d\n", obd->obd_name, rc);
485                                 rc = 0;
486                         }
487                         valid &= ~OBD_MD_FLCOOKIE;
488                 }
489
490                 spin_lock(&med->med_open_lock);
491         }
492
493         OBD_FREE(logcookies, mds->mds_max_cookiesize);
494         OBD_FREE(lmm, mds->mds_max_mdsize);
495
496         spin_unlock(&med->med_open_lock);
497
498         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
499         mds_client_free(export);
500
501  out:
502         RETURN(rc);
503 }
504
505 static int mds_disconnect(struct obd_export *exp)
506 {
507         int rc;
508         ENTRY;
509
510         LASSERT(exp);
511         class_export_get(exp);
512
513         /* Disconnect early so that clients can't keep using export */
514         rc = class_disconnect(exp);
515         if (exp->exp_obd->obd_namespace != NULL)
516                 ldlm_cancel_locks_for_export(exp);
517
518         /* complete all outstanding replies */
519         spin_lock(&exp->exp_lock);
520         while (!list_empty(&exp->exp_outstanding_replies)) {
521                 struct ptlrpc_reply_state *rs =
522                         list_entry(exp->exp_outstanding_replies.next,
523                                    struct ptlrpc_reply_state, rs_exp_list);
524                 struct ptlrpc_service *svc = rs->rs_service;
525
526                 spin_lock(&svc->srv_lock);
527                 list_del_init(&rs->rs_exp_list);
528                 ptlrpc_schedule_difficult_reply(rs);
529                 spin_unlock(&svc->srv_lock);
530         }
531         spin_unlock(&exp->exp_lock);
532
533         class_export_put(exp);
534         RETURN(rc);
535 }
536
537 static int mds_getstatus(struct ptlrpc_request *req)
538 {
539         struct mds_obd *mds = mds_req2mds(req);
540         struct mds_body *body;
541         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
542         ENTRY;
543
544         rc = lustre_pack_reply(req, 2, size, NULL);
545         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
546                 CERROR("mds: out of memory for message\n");
547                 req->rq_status = -ENOMEM;       /* superfluous? */
548                 RETURN(-ENOMEM);
549         }
550
551         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
552         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
553
554         /* the last_committed and last_xid fields are filled in for all
555          * replies already - no need to do so here also.
556          */
557         RETURN(0);
558 }
559
560 /* get the LOV EA from @inode and store it into @md.  It can be at most
561  * @size bytes, and @size is updated with the actual EA size.
562  * The EA size is also returned on success, and -ve errno on failure. 
563  * If there is no EA then 0 is returned. */
564 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
565                int *size, int lock)
566 {
567         int rc = 0;
568         int lmm_size;
569
570         if (lock)
571                 LOCK_INODE_MUTEX(inode);
572         rc = fsfilt_get_md(obd, inode, md, *size, "lov");
573
574         if (rc < 0) {
575                 CERROR("Error %d reading eadata for ino %lu\n",
576                        rc, inode->i_ino);
577         } else if (rc > 0) {
578                 lmm_size = rc;
579                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
580
581                 if (rc == 0) {
582                         *size = lmm_size;
583                         rc = lmm_size;
584                 } else if (rc > 0) {
585                         *size = rc;
586                 }
587         } else {
588                 *size = 0;
589         }
590         if (lock)
591                 UNLOCK_INODE_MUTEX(inode);
592
593         RETURN (rc);
594 }
595
596
597 /* Call with lock=1 if you want mds_pack_md to take the i_mutex.
598  * Call with lock=0 if the caller has already taken the i_mutex. */
599 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
600                 struct mds_body *body, struct inode *inode, int lock)
601 {
602         struct mds_obd *mds = &obd->u.mds;
603         void *lmm;
604         int lmm_size;
605         int rc;
606         ENTRY;
607
608         lmm = lustre_msg_buf(msg, offset, 0);
609         if (lmm == NULL) {
610                 /* Some problem with getting eadata when I sized the reply
611                  * buffer... */
612                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
613                        inode->i_ino);
614                 RETURN(0);
615         }
616         lmm_size = lustre_msg_buflen(msg, offset);
617
618         /* I don't really like this, but it is a sanity check on the client
619          * MD request.  However, if the client doesn't know how much space
620          * to reserve for the MD, it shouldn't be bad to have too much space.
621          */
622         if (lmm_size > mds->mds_max_mdsize) {
623                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
624                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
625                 // RETURN(-EINVAL);
626         }
627
628         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
629         if (rc > 0) {
630                 if (S_ISDIR(inode->i_mode))
631                         body->valid |= OBD_MD_FLDIREA;
632                 else
633                         body->valid |= OBD_MD_FLEASIZE;
634                 body->eadatasize = lmm_size;
635                 rc = 0;
636         }
637
638         RETURN(rc);
639 }
640
#ifdef CONFIG_FS_POSIX_ACL
/* Copy the access ACL xattr of @inode into reply buffer @repoff of @repmsg.
 *
 * On success @repbody->aclsize is set to the ACL size and OBD_MD_FLACL is
 * flagged in @repbody->valid.  The flag is also set, with aclsize left at
 * 0, when the buffer is empty, the inode has no getxattr method, or there is
 * no ACL (-ENODATA).  Any other getxattr error is returned without setting
 * the flag.  Without CONFIG_FS_POSIX_ACL this is a no-op evaluating to 0. */
static
int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
                       struct mds_body *repbody, int repoff)
{
        /* getxattr wants a dentry: fake one on the stack for @inode */
        struct dentry de = { .d_inode = inode };
        int buflen, rc;
        ENTRY;

        LASSERT(repbody->aclsize == 0);
        LASSERT(lustre_msg_bufcount(repmsg) > repoff);

        buflen = lustre_msg_buflen(repmsg, repoff);
        if (buflen == 0 || inode->i_op == NULL ||
            inode->i_op->getxattr == NULL)
                GOTO(out, 0);

        lock_24kernel();
        rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
                                   lustre_msg_buf(repmsg, repoff, buflen),
                                   buflen);
        unlock_24kernel();

        if (rc < 0 && rc != -ENODATA) {
                CERROR("buflen %d, get acl: %d\n", buflen, rc);
                RETURN(rc);
        }

        if (rc >= 0)
                repbody->aclsize = rc;

        EXIT;
out:
        repbody->valid |= OBD_MD_FLACL;
        return 0;
}
#else
#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
#endif
680
/* Pack the ACL of @inode into reply buffer @repoff of @repmsg by delegating
 * to mds_pack_posix_acl(); @med is not used here. */
int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
                 struct lustre_msg *repmsg, struct mds_body *repbody,
                 int repoff)
{
        int rc = mds_pack_posix_acl(inode, repmsg, repbody, repoff);

        return rc;
}
687
688 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
689                                 struct ptlrpc_request *req,
690                                 struct mds_body *reqbody, int reply_off)
691 {
692         struct mds_body *body;
693         struct inode *inode = dentry->d_inode;
694         int rc = 0;
695         ENTRY;
696
697         if (inode == NULL)
698                 RETURN(-ENOENT);
699
700         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
701         LASSERT(body != NULL);                 /* caller prepped reply */
702
703         mds_pack_inode2fid(&body->fid1, inode);
704         body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */
705         mds_pack_inode2body(body, inode);
706         reply_off++;
707
708         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
709             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
710                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
711                                  inode, 1);
712
713                 /* If we have LOV EA data, the OST holds size, atime, mtime */
714                 if (!(body->valid & OBD_MD_FLEASIZE) &&
715                     !(body->valid & OBD_MD_FLDIREA))
716                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
717                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
718
719                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
720                 if (body->eadatasize)
721                         reply_off++;
722         } else if (S_ISLNK(inode->i_mode) &&
723                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
724                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
725                 int len;
726
727                 LASSERT (symname != NULL);       /* caller prepped reply */
728                 len = lustre_msg_buflen(req->rq_repmsg, reply_off);
729
730                 rc = inode->i_op->readlink(dentry, symname, len);
731                 if (rc < 0) {
732                         CERROR("readlink failed: %d\n", rc);
733                 } else if (rc != len - 1) {
734                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
735                                 rc, len - 1);
736                         rc = -EINVAL;
737                 } else {
738                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
739                         body->valid |= OBD_MD_LINKNAME;
740                         body->eadatasize = rc + 1;
741                         symname[rc] = 0;        /* NULL terminate */
742                         rc = 0;
743                 }
744                 reply_off++;
745         } else if (reqbody->valid == OBD_MD_FLFLAGS &&
746                    reqbody->flags & MDS_BFLAG_EXT_FLAGS) {
747                 int flags;
748
749                 /* We only return the full set of flags on ioctl, otherwise we
750                  * get enough flags from the inode in mds_pack_inode2body(). */
751                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS,
752                                       (long)&flags);
753                 if (rc == 0)
754                         body->flags = flags | MDS_BFLAG_EXT_FLAGS;
755         }
756
757         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
758                 struct mds_obd *mds = mds_req2mds(req);
759                 body->max_cookiesize = mds->mds_max_cookiesize;
760                 body->max_mdsize = mds->mds_max_mdsize;
761                 body->valid |= OBD_MD_FLMODEASIZE;
762         }
763
764         if (rc)
765                 RETURN(rc);
766
767 #ifdef CONFIG_FS_POSIX_ACL
768         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
769             (reqbody->valid & OBD_MD_FLACL)) {
770                 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
771                                   inode, req->rq_repmsg,
772                                   body, reply_off);
773
774                 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
775                 if (body->aclsize)
776                         reply_off++;
777         }
778 #endif
779
780         RETURN(rc);
781 }
782
783 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
784                                 int offset)
785 {
786         struct mds_obd *mds = mds_req2mds(req);
787         struct mds_body *body;
788         int rc, bufcount = 2;
789         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
790         ENTRY;
791
792         LASSERT(offset == REQ_REC_OFF); /* non-intent */
793
794         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
795         LASSERT(body != NULL);                    /* checked by caller */
796         LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */
797
798         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
799             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
800                 LOCK_INODE_MUTEX(inode);
801                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
802                                    "lov");
803                 UNLOCK_INODE_MUTEX(inode);
804                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
805                        rc, inode->i_ino);
806                 if (rc < 0) {
807                         if (rc != -ENODATA) {
808                                 CERROR("error getting inode %lu MD: rc = %d\n",
809                                        inode->i_ino, rc);
810                                 RETURN(rc);
811                         }
812                         size[bufcount] = 0;
813                 } else if (rc > mds->mds_max_mdsize) {
814                         size[bufcount] = 0;
815                         CERROR("MD size %d larger than maximum possible %u\n",
816                                rc, mds->mds_max_mdsize);
817                 } else {
818                         size[bufcount] = rc;
819                 }
820                 bufcount++;
821         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
822                 if (i_size_read(inode) + 1 != body->eadatasize)
823                         CERROR("symlink size: %Lu, reply space: %d\n",
824                                i_size_read(inode) + 1, body->eadatasize);
825                 size[bufcount] = min_t(int, i_size_read(inode) + 1,
826                                        body->eadatasize);
827                 bufcount++;
828                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
829                        i_size_read(inode) + 1, body->eadatasize);
830         }
831
832 #ifdef CONFIG_FS_POSIX_ACL
833         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
834             (body->valid & OBD_MD_FLACL)) {
835                 struct dentry de = { .d_inode = inode };
836
837                 size[bufcount] = 0;
838                 if (inode->i_op && inode->i_op->getxattr) {
839                         lock_24kernel();
840                         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
841                                                    NULL, 0);
842                         unlock_24kernel();
843
844                         if (rc < 0) {
845                                 if (rc != -ENODATA) {
846                                         CERROR("got acl size: %d\n", rc);
847                                         RETURN(rc);
848                                 }
849                         } else
850                                 size[bufcount] = rc;
851                 }
852                 bufcount++;
853         }
854 #endif
855
856         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
857                 CERROR("failed MDS_GETATTR_PACK test\n");
858                 req->rq_status = -ENOMEM;
859                 RETURN(-ENOMEM);
860         }
861
862         rc = lustre_pack_reply(req, bufcount, size, NULL);
863         if (rc) {
864                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
865                 req->rq_status = rc;
866                 RETURN(rc);
867         }
868
869         RETURN(0);
870 }
871
872 static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
873                             int child_part, struct lustre_handle *child_lockh)
874 {
875         struct obd_device *obd = req->rq_export->exp_obd;
876         struct mds_obd *mds = &obd->u.mds;
877         struct ldlm_reply *rep = NULL;
878         struct lvfs_run_ctxt saved;
879         struct mds_body *body;
880         struct dentry *dparent = NULL, *dchild = NULL;
881         struct lvfs_ucred uc = {NULL,};
882         struct lustre_handle parent_lockh;
883         int namesize;
884         int rc = 0, cleanup_phase = 0, resent_req = 0;
885         char *name;
886         ENTRY;
887
888         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
889
890         /* Swab now, before anyone looks inside the request */
891         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
892                                   lustre_swab_mds_body);
893         if (body == NULL) {
894                 CERROR("Can't swab mds_body\n");
895                 RETURN(-EFAULT);
896         }
897
898         lustre_set_req_swabbed(req, offset + 1);
899         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
900         if (name == NULL) {
901                 CERROR("Can't unpack name\n");
902                 RETURN(-EFAULT);
903         }
904         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
905         /* namesize less than 2 means we have empty name, probably came from
906            revalidate by cfid, so no point in having name to be set */
907         if (namesize <= 1)
908                 name = NULL;
909
910         rc = mds_init_ucred(&uc, req, offset);
911         if (rc)
912                 GOTO(cleanup, rc);
913
914         LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF);
915         /* if requests were at offset 2, the getattr reply goes back at 1 */
916         if (offset == DLM_INTENT_REC_OFF) {
917                 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
918                                      sizeof(*rep));
919                 offset = DLM_REPLY_REC_OFF;
920         }
921
922         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
923         cleanup_phase = 1; /* kernel context */
924         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
925
926         /* FIXME: handle raw lookup */
927 #if 0
928         if (body->valid == OBD_MD_FLID) {
929                 struct mds_body *mds_reply;
930                 int size = sizeof(*mds_reply);
931                 ino_t inum;
932                 // The user requested ONLY the inode number, so do a raw lookup
933                 rc = lustre_pack_reply(req, 1, &size, NULL);
934                 if (rc) {
935                         CERROR("out of memory\n");
936                         GOTO(cleanup, rc);
937                 }
938
939                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
940
941                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
942                                            sizeof(*mds_reply));
943                 mds_reply->fid1.id = inum;
944                 mds_reply->valid = OBD_MD_FLID;
945                 GOTO(cleanup, rc);
946         }
947 #endif
948
949         if (lustre_handle_is_used(child_lockh)) {
950                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
951                 resent_req = 1;
952         }
953
954         if (resent_req == 0) {
955                 if (name) {
956                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
957                         rc = mds_get_parent_child_locked(obd, &obd->u.mds, 
958                                                          &body->fid1,
959                                                          &parent_lockh, 
960                                                          &dparent, LCK_CR,
961                                                          MDS_INODELOCK_UPDATE,
962                                                          name, namesize,
963                                                          child_lockh, &dchild,
964                                                          LCK_CR, child_part);
965                 } else {
966                         /* For revalidate by fid we always take UPDATE lock */
967                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
968                                                        LCK_CR, child_lockh,
969                                                        NULL, 0, child_part);
970                         LASSERT(dchild);
971                         if (IS_ERR(dchild))
972                                 rc = PTR_ERR(dchild);
973                 } 
974                 if (rc)
975                         GOTO(cleanup, rc);
976         } else {
977                 struct ldlm_lock *granted_lock;
978                 struct ll_fid child_fid;
979                 struct ldlm_resource *res;
980                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
981                 granted_lock = ldlm_handle2lock(child_lockh);
982                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
983                          body->fid1.id, body->fid1.generation,
984                          child_lockh->cookie);
985
986
987                 res = granted_lock->l_resource;
988                 child_fid.id = res->lr_name.name[0];
989                 child_fid.generation = res->lr_name.name[1];
990                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
991                 LASSERT(!IS_ERR(dchild));
992                 LDLM_LOCK_PUT(granted_lock);
993         }
994
995         cleanup_phase = 2; /* dchild, dparent, locks */
996
997         if (dchild->d_inode == NULL) {
998                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
999                 /* in the intent case, the policy clears this error:
1000                    the disposition is enough */
1001                 GOTO(cleanup, rc = -ENOENT);
1002         } else {
1003                 intent_set_disposition(rep, DISP_LOOKUP_POS);
1004         }
1005
1006         if (req->rq_repmsg == NULL) {
1007                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
1008                 if (rc != 0) {
1009                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
1010                         GOTO (cleanup, rc);
1011                 }
1012         }
1013
1014         rc = mds_getattr_internal(obd, dchild, req, body, offset);
1015         GOTO(cleanup, rc); /* returns the lock to the client */
1016
1017  cleanup:
1018         switch (cleanup_phase) {
1019         case 2:
1020                 if (resent_req == 0) {
1021                         if (rc && dchild->d_inode)
1022                                 ldlm_lock_decref(child_lockh, LCK_CR);
1023                         if (name) {
1024                                 ldlm_lock_decref(&parent_lockh, LCK_CR);
1025                                 l_dput(dparent);
1026                         }
1027                 }
1028                 l_dput(dchild);
1029         case 1:
1030                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1031         default:
1032                 mds_exit_ucred(&uc, mds);
1033                 if (!req->rq_packed_final) {
1034                         req->rq_status = rc;
1035                         lustre_pack_reply(req, 1, NULL, NULL);
1036                 }
1037         }
1038         return rc;
1039 }
1040
1041 static int mds_getattr(struct ptlrpc_request *req, int offset)
1042 {
1043         struct mds_obd *mds = mds_req2mds(req);
1044         struct obd_device *obd = req->rq_export->exp_obd;
1045         struct lvfs_run_ctxt saved;
1046         struct dentry *de;
1047         struct mds_body *body;
1048         struct lvfs_ucred uc = { NULL, };
1049         int rc = 0;
1050         ENTRY;
1051
1052         OBD_COUNTER_INCREMENT(obd, getattr);
1053
1054         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1055                                   lustre_swab_mds_body);
1056         if (body == NULL)
1057                 RETURN(-EFAULT);
1058
1059         rc = mds_init_ucred(&uc, req, offset);
1060         if (rc)
1061                 GOTO(out_ucred, rc);
1062
1063         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1064         de = mds_fid2dentry(mds, &body->fid1, NULL);
1065         if (IS_ERR(de)) {
1066                 rc = req->rq_status = PTR_ERR(de);
1067                 GOTO(out_pop, rc);
1068         }
1069
1070         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1071         if (rc != 0) {
1072                 CERROR("mds_getattr_pack_msg: %d\n", rc);
1073                 GOTO(out_pop, rc);
1074         }
1075
1076         req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF);
1077
1078         l_dput(de);
1079         GOTO(out_pop, rc);
1080 out_pop:
1081         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1082 out_ucred:
1083         if (!req->rq_packed_final) {
1084                 req->rq_status = rc;
1085                 lustre_pack_reply(req, 1, NULL, NULL);
1086         }
1087         mds_exit_ucred(&uc, mds);
1088         return rc;
1089 }
1090
1091 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1092                           __u64 max_age)
1093 {
1094         int rc;
1095
1096         spin_lock(&obd->obd_osfs_lock);
1097         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1098         if (rc == 0)
1099                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1100         spin_unlock(&obd->obd_osfs_lock);
1101
1102         return rc;
1103 }
1104
1105 static int mds_statfs(struct ptlrpc_request *req)
1106 {
1107         struct obd_device *obd = req->rq_export->exp_obd;
1108         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
1109         int rc, size[2] = { sizeof(struct ptlrpc_body),
1110                             sizeof(struct obd_statfs) };
1111         ENTRY;
1112
1113         /* This will trigger a watchdog timeout */
1114         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1115                          (MDS_SERVICE_WATCHDOG_FACTOR * 
1116                           at_get(&svc->srv_at_estimate) / 1000) + 1);
1117         OBD_COUNTER_INCREMENT(obd, statfs);
1118
1119         rc = lustre_pack_reply(req, 2, size, NULL);
1120         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1121                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1122                 GOTO(out, rc);
1123         }
1124
1125         /* We call this so that we can cache a bit - 1 jiffie worth */
1126         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1127                                                 size[REPLY_REC_OFF]),
1128                             cfs_time_current_64() - HZ);
1129         if (rc) {
1130                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1131                 GOTO(out, rc);
1132         }
1133
1134         EXIT;
1135 out:
1136         req->rq_status = rc;
1137         return 0;
1138 }
1139
1140 static int mds_sync(struct ptlrpc_request *req, int offset)
1141 {
1142         struct obd_device *obd = req->rq_export->exp_obd;
1143         struct mds_obd *mds = &obd->u.mds;
1144         struct mds_body *body;
1145         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1146         ENTRY;
1147
1148         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1149                                   lustre_swab_mds_body);
1150         if (body == NULL)
1151                 GOTO(out, rc = -EFAULT);
1152
1153         rc = lustre_pack_reply(req, 2, size, NULL);
1154         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1155                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1156                 GOTO(out, rc);
1157         }
1158
1159         if (body->fid1.id == 0) {
1160                 /* a fid of zero is taken to mean "sync whole filesystem" */
1161                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1162                 GOTO(out, rc);
1163         } else {
1164                 struct dentry *de;
1165
1166                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1167                 if (IS_ERR(de))
1168                         GOTO(out, rc = PTR_ERR(de));
1169
1170                 /* The file parameter isn't used for anything */
1171                 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1172                         rc = de->d_inode->i_fop->fsync(NULL, de, 1);
1173                 if (rc == 0) {
1174                         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1175                                               sizeof(*body));
1176                         mds_pack_inode2fid(&body->fid1, de->d_inode);
1177                         mds_pack_inode2body(body, de->d_inode);
1178                 }
1179
1180                 l_dput(de);
1181                 GOTO(out, rc);
1182         }
1183 out:
1184         req->rq_status = rc;
1185         return 0;
1186 }
1187
1188 /* mds_readpage does not take a DLM lock on the inode, because the client must
1189  * already have a PR lock.
1190  *
1191  * If we were to take another one here, a deadlock will result, if another
1192  * thread is already waiting for a PW lock. */
1193 static int mds_readpage(struct ptlrpc_request *req, int offset)
1194 {
1195         struct obd_device *obd = req->rq_export->exp_obd;
1196         struct mds_obd *mds = &obd->u.mds;
1197         struct vfsmount *mnt;
1198         struct dentry *de;
1199         struct file *file;
1200         struct mds_body *body, *repbody;
1201         struct lvfs_run_ctxt saved;
1202         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
1203         struct lvfs_ucred uc = {NULL,};
1204         ENTRY;
1205
1206         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1207                 RETURN(-ENOMEM);
1208
1209         rc = lustre_pack_reply(req, 2, size, NULL);
1210         if (rc) {
1211                 CERROR("error packing readpage reply: rc %d\n", rc);
1212                 GOTO(out, rc);
1213         }
1214
1215         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1216                                   lustre_swab_mds_body);
1217         if (body == NULL)
1218                 GOTO (out, rc = -EFAULT);
1219
1220         rc = mds_init_ucred(&uc, req, offset);
1221         if (rc)
1222                 GOTO(out, rc);
1223
1224         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1225         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1226         if (IS_ERR(de))
1227                 GOTO(out_pop, rc = PTR_ERR(de));
1228
1229         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1230
1231         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1232         /* note: in case of an error, dentry_open puts dentry */
1233         if (IS_ERR(file))
1234                 GOTO(out_pop, rc = PTR_ERR(file));
1235
1236         /* body->size is actually the offset -eeb */
1237         if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) {
1238                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1239                        body->size, de->d_inode->i_sb->s_blocksize);
1240                 GOTO(out_file, rc = -EFAULT);
1241         }
1242
1243         /* body->nlink is actually the #bytes to read -eeb */
1244         if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) {
1245                 CERROR("size %u is not multiple of blocksize %lu\n",
1246                        body->nlink, de->d_inode->i_sb->s_blocksize);
1247                 GOTO(out_file, rc = -EFAULT);
1248         }
1249
1250         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1251                                  sizeof(*repbody));
1252         repbody->size = i_size_read(file->f_dentry->d_inode);
1253         repbody->valid = OBD_MD_FLSIZE;
1254
1255         /* to make this asynchronous make sure that the handling function
1256            doesn't send a reply when this function completes. Instead a
1257            callback function would send the reply */
1258         /* body->size is actually the offset -eeb */
1259         rc = mds_sendpage(req, file, body->size, body->nlink);
1260
1261 out_file:
1262         filp_close(file, 0);
1263 out_pop:
1264         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1265 out:
1266         mds_exit_ucred(&uc, mds);
1267         req->rq_status = rc;
1268         RETURN(0);
1269 }
1270
1271 int mds_reint(struct ptlrpc_request *req, int offset,
1272               struct lustre_handle *lockh)
1273 {
1274         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1275         int rc;
1276
1277         OBD_ALLOC(rec, sizeof(*rec));
1278         if (rec == NULL)
1279                 RETURN(-ENOMEM);
1280
1281         rc = mds_update_unpack(req, offset, rec);
1282         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1283                 CERROR("invalid record\n");
1284                 GOTO(out, req->rq_status = -EINVAL);
1285         }
1286
1287         /* rc will be used to interrupt a for loop over multiple records */
1288         rc = mds_reint_rec(rec, offset, req, lockh);
1289  out:
1290         OBD_FREE(rec, sizeof(*rec));
1291         return rc;
1292 }
1293
1294 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1295                                        struct obd_device *obd, int *process)
1296 {
1297         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1298         case MDS_CONNECT: /* This will never get here, but for completeness. */
1299         case OST_CONNECT: /* This will never get here, but for completeness. */
1300         case MDS_DISCONNECT:
1301         case OST_DISCONNECT:
1302                *process = 1;
1303                RETURN(0);
1304
1305         case MDS_CLOSE:
1306         case MDS_SYNC: /* used in unmounting */
1307         case OBD_PING:
1308         case MDS_REINT:
1309         case LDLM_ENQUEUE:
1310                 *process = target_queue_recovery_request(req, obd);
1311                 RETURN(0);
1312
1313         default:
1314                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1315                 *process = 0;
1316                 /* XXX what should we set rq_status to here? */
1317                 req->rq_status = -EAGAIN;
1318                 RETURN(ptlrpc_error(req));
1319         }
1320 }
1321
1322 static char *reint_names[] = {
1323         [REINT_SETATTR] "setattr",
1324         [REINT_CREATE]  "create",
1325         [REINT_LINK]    "link",
1326         [REINT_UNLINK]  "unlink",
1327         [REINT_RENAME]  "rename",
1328         [REINT_OPEN]    "open",
1329 };
1330
1331 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
1332 {
1333         void *key, *val;
1334         int keylen, vallen, rc = 0;
1335         ENTRY;
1336
1337         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1338         if (key == NULL) {
1339                 DEBUG_REQ(D_HA, req, "no set_info key");
1340                 RETURN(-EFAULT);
1341         }
1342         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1343
1344         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
1345         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1346
1347         rc = lustre_pack_reply(req, 1, NULL, NULL);
1348         if (rc)
1349                 RETURN(rc);
1350         lustre_msg_set_status(req->rq_repmsg, 0);
1351
1352         if (KEY_IS("read-only")) {
1353                 if (val == NULL || vallen < sizeof(__u32)) {
1354                         DEBUG_REQ(D_HA, req, "no set_info val");
1355                         RETURN(-EFAULT);
1356                 }
1357
1358                 if (*(__u32 *)val)
1359                         exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1360                 else
1361                         exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1362         } else {
1363                 RETURN(-EINVAL);
1364         }
1365
1366         RETURN(0);
1367 }
1368
1369 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1370 {
1371         struct obd_quotactl *oqctl;
1372         int rc;
1373         ENTRY;
1374
1375         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1376                                    lustre_swab_obd_quotactl);
1377         if (oqctl == NULL)
1378                 RETURN(-EPROTO);
1379
1380         rc = lustre_pack_reply(req, 1, NULL, NULL);
1381         if (rc) {
1382                 CERROR("mds: out of memory while packing quotacheck reply\n");
1383                 RETURN(rc);
1384         }
1385
1386         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1387         RETURN(0);
1388 }
1389
1390 static int mds_handle_quotactl(struct ptlrpc_request *req)
1391 {
1392         struct obd_quotactl *oqctl, *repoqc;
1393         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1394         ENTRY;
1395
1396         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1397                                    lustre_swab_obd_quotactl);
1398         if (oqctl == NULL)
1399                 RETURN(-EPROTO);
1400
1401         rc = lustre_pack_reply(req, 2, size, NULL);
1402         if (rc)
1403                 RETURN(rc);
1404
1405         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1406
1407         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1408         *repoqc = *oqctl;
1409         RETURN(0);
1410 }
1411
1412 static int mds_msg_check_version(struct lustre_msg *msg)
1413 {
1414         int rc;
1415
1416         switch (lustre_msg_get_opc(msg)) {
1417         case MDS_CONNECT:
1418         case MDS_DISCONNECT:
1419         case OBD_PING:
1420                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1421                 if (rc)
1422                         CERROR("bad opc %u version %08x, expecting %08x\n",
1423                                lustre_msg_get_opc(msg),
1424                                lustre_msg_get_version(msg),
1425                                LUSTRE_OBD_VERSION);
1426                 break;
1427         case MDS_GETSTATUS:
1428         case MDS_GETATTR:
1429         case MDS_GETATTR_NAME:
1430         case MDS_STATFS:
1431         case MDS_READPAGE:
1432         case MDS_REINT:
1433         case MDS_CLOSE:
1434         case MDS_DONE_WRITING:
1435         case MDS_PIN:
1436         case MDS_SYNC:
1437         case MDS_GETXATTR:
1438         case MDS_SETXATTR:
1439         case MDS_SET_INFO:
1440         case MDS_QUOTACHECK:
1441         case MDS_QUOTACTL:
1442         case QUOTA_DQACQ:
1443         case QUOTA_DQREL:
1444                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1445                 if (rc)
1446                         CERROR("bad opc %u version %08x, expecting %08x\n",
1447                                lustre_msg_get_opc(msg),
1448                                lustre_msg_get_version(msg),
1449                                LUSTRE_MDS_VERSION);
1450                 break;
1451         case LDLM_ENQUEUE:
1452         case LDLM_CONVERT:
1453         case LDLM_BL_CALLBACK:
1454         case LDLM_CP_CALLBACK:
1455                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1456                 if (rc)
1457                         CERROR("bad opc %u version %08x, expecting %08x\n",
1458                                lustre_msg_get_opc(msg),
1459                                lustre_msg_get_version(msg),
1460                                LUSTRE_DLM_VERSION);
1461                 break;
1462         case OBD_LOG_CANCEL:
1463         case LLOG_ORIGIN_HANDLE_CREATE:
1464         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1465         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1466         case LLOG_ORIGIN_HANDLE_CLOSE:
1467         case LLOG_ORIGIN_HANDLE_DESTROY:
1468         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1469         case LLOG_CATINFO:
1470                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1471                 if (rc)
1472                         CERROR("bad opc %u version %08x, expecting %08x\n",
1473                                lustre_msg_get_opc(msg),
1474                                lustre_msg_get_version(msg),
1475                                LUSTRE_LOG_VERSION);
1476                 break;
1477         default:
1478                 CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg));
1479                 rc = -ENOTSUPP;
1480         }
1481         return rc;
1482 }
1483
1484 int mds_handle(struct ptlrpc_request *req)
1485 {
1486         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1487         int rc = 0;
1488         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1489         struct obd_device *obd = NULL;
1490         ENTRY;
1491
1492         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1493
1494         LASSERT(current->journal_info == NULL);
1495
1496         rc = mds_msg_check_version(req->rq_reqmsg);
1497         if (rc) {
1498                 CERROR("MDS drop mal-formed request\n");
1499                 RETURN(rc);
1500         }
1501
1502         /* XXX identical to OST */
1503         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
1504                 struct mds_export_data *med;
1505                 int recovering, abort_recovery;
1506
1507                 if (req->rq_export == NULL) {
1508                         CERROR("operation %d on unconnected MDS from %s\n",
1509                                lustre_msg_get_opc(req->rq_reqmsg),
1510                                libcfs_id2str(req->rq_peer));
1511                         req->rq_status = -ENOTCONN;
1512                         GOTO(out, rc = -ENOTCONN);
1513                 }
1514
1515                 med = &req->rq_export->exp_mds_data;
1516                 obd = req->rq_export->exp_obd;
1517                 mds = &obd->u.mds;
1518
1519                 /* sanity check: if the xid matches, the request must
1520                  * be marked as a resent or replayed */
1521                 if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) ||
1522                    req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid))
1523                         if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1524                                  (MSG_RESENT | MSG_REPLAY))) {
1525                                 CERROR("rq_xid "LPU64" matches last_xid, "
1526                                        "expected RESENT flag\n",
1527                                         req->rq_xid);
1528                                 req->rq_status = -ENOTCONN;
1529                                 GOTO(out, rc = -EFAULT);
1530                         }
1531                 /* else: note the opposite is not always true; a
1532                  * RESENT req after a failover will usually not match
1533                  * the last_xid, since it was likely never
1534                  * committed. A REPLAYed request will almost never
1535                  * match the last xid, however it could for a
1536                  * committed, but still retained, open. */
1537
1538                 /* Check for aborted recovery. */
1539                 spin_lock_bh(&obd->obd_processing_task_lock);
1540                 abort_recovery = obd->obd_abort_recovery;
1541                 recovering = obd->obd_recovering;
1542                 spin_unlock_bh(&obd->obd_processing_task_lock);
1543                 if (abort_recovery) {
1544                         target_abort_recovery(obd);
1545                 } else if (recovering) {
1546                         rc = mds_filter_recovery_request(req, obd,
1547                                                          &should_process);
1548                         if (rc || !should_process)
1549                                 RETURN(rc);
1550                 }
1551         }
1552
1553         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1554         case MDS_CONNECT:
1555                 DEBUG_REQ(D_INODE, req, "connect");
1556                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1557                 rc = target_handle_connect(req, mds_handle);
1558                 if (!rc) {
1559                         /* Now that we have an export, set mds. */
1560                         obd = req->rq_export->exp_obd;
1561                         mds = mds_req2mds(req);
1562                 }
1563                 break;
1564
1565         case MDS_DISCONNECT:
1566                 DEBUG_REQ(D_INODE, req, "disconnect");
1567                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1568                 rc = target_handle_disconnect(req);
1569                 req->rq_status = rc;            /* superfluous? */
1570                 break;
1571
1572         case MDS_GETSTATUS:
1573                 DEBUG_REQ(D_INODE, req, "getstatus");
1574                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1575                 rc = mds_getstatus(req);
1576                 break;
1577
1578         case MDS_GETATTR:
1579                 DEBUG_REQ(D_INODE, req, "getattr");
1580                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1581                 rc = mds_getattr(req, REQ_REC_OFF);
1582                 break;
1583
1584         case MDS_SETXATTR:
1585                 DEBUG_REQ(D_INODE, req, "setxattr");
1586                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
1587                 rc = mds_setxattr(req);
1588                 break;
1589
1590         case MDS_GETXATTR:
1591                 DEBUG_REQ(D_INODE, req, "getxattr");
1592                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
1593                 rc = mds_getxattr(req);
1594                 break;
1595
1596         case MDS_GETATTR_NAME: {
1597                 struct lustre_handle lockh = { 0 };
1598                 DEBUG_REQ(D_INODE, req, "getattr_name");
1599                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1600
1601                 /* If this request gets a reconstructed reply, we won't be
1602                  * acquiring any new locks in mds_getattr_lock, so we don't
1603                  * want to cancel.
1604                  */
1605                 rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE,
1606                                       &lockh);
1607                 /* this non-intent call (from an ioctl) is special */
1608                 req->rq_status = rc;
1609                 if (rc == 0 && lustre_handle_is_used(&lockh))
1610                         ldlm_lock_decref(&lockh, LCK_CR);
1611                 break;
1612         }
1613         case MDS_STATFS:
1614                 DEBUG_REQ(D_INODE, req, "statfs");
1615                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1616                 rc = mds_statfs(req);
1617                 break;
1618
1619         case MDS_READPAGE:
1620                 DEBUG_REQ(D_INODE, req, "readpage");
1621                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1622                 rc = mds_readpage(req, REQ_REC_OFF);
1623
1624                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1625                         RETURN(0);
1626                 }
1627
1628                 break;
1629
1630         case MDS_REINT: {
1631                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
1632                                              sizeof(*opcp));
1633                 __u32  opc;
1634                 int size[4] = { sizeof(struct ptlrpc_body),
1635                                 sizeof(struct mds_body),
1636                                 mds->mds_max_mdsize,
1637                                 mds->mds_max_cookiesize };
1638                 int bufcount;
1639
1640                 /* NB only peek inside req now; mds_reint() will swab it */
1641                 if (opcp == NULL) {
1642                         CERROR ("Can't inspect opcode\n");
1643                         rc = -EINVAL;
1644                         break;
1645                 }
1646                 opc = *opcp;
1647                 if (lustre_msg_swabbed(req->rq_reqmsg))
1648                         __swab32s(&opc);
1649
1650                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1651                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1652                            reint_names[opc] == NULL) ? reint_names[opc] :
1653                                                        "unknown opcode");
1654
1655                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1656
1657                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1658                         bufcount = 4;
1659                 else if (opc == REINT_OPEN)
1660                         bufcount = 3;
1661                 else
1662                         bufcount = 2;
1663
1664                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1665                 if (rc)
1666                         break;
1667
1668                 rc = mds_reint(req, REQ_REC_OFF, NULL);
1669                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1670                 break;
1671         }
1672
1673         case MDS_CLOSE:
1674                 DEBUG_REQ(D_INODE, req, "close");
1675                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1676                 rc = mds_close(req, REQ_REC_OFF);
1677                 fail = OBD_FAIL_MDS_CLOSE_NET_REP;
1678                 break;
1679
1680         case MDS_DONE_WRITING:
1681                 DEBUG_REQ(D_INODE, req, "done_writing");
1682                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1683                 rc = mds_done_writing(req, REQ_REC_OFF);
1684                 break;
1685
1686         case MDS_PIN:
1687                 DEBUG_REQ(D_INODE, req, "pin");
1688                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1689                 rc = mds_pin(req, REQ_REC_OFF);
1690                 break;
1691
1692         case MDS_SYNC:
1693                 DEBUG_REQ(D_INODE, req, "sync");
1694                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1695                 rc = mds_sync(req, REQ_REC_OFF);
1696                 break;
1697
1698         case MDS_SET_INFO:
1699                 DEBUG_REQ(D_INODE, req, "set_info");
1700                 rc = mds_set_info_rpc(req->rq_export, req);
1701                 break;
1702
1703         case MDS_QUOTACHECK:
1704                 DEBUG_REQ(D_INODE, req, "quotacheck");
1705                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
1706                 rc = mds_handle_quotacheck(req);
1707                 break;
1708
1709         case MDS_QUOTACTL:
1710                 DEBUG_REQ(D_INODE, req, "quotactl");
1711                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
1712                 rc = mds_handle_quotactl(req);
1713                 break;
1714
1715         case OBD_PING:
1716                 DEBUG_REQ(D_INODE, req, "ping");
1717                 rc = target_handle_ping(req);
1718                 break;
1719
1720         case OBD_LOG_CANCEL:
1721                 CDEBUG(D_INODE, "log cancel\n");
1722                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1723                 rc = -ENOTSUPP; /* la la la */
1724                 break;
1725
1726         case LDLM_ENQUEUE:
1727                 DEBUG_REQ(D_INODE, req, "enqueue");
1728                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1729                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1730                                          ldlm_server_blocking_ast, NULL);
1731                 fail = OBD_FAIL_LDLM_REPLY;
1732                 break;
1733         case LDLM_CONVERT:
1734                 DEBUG_REQ(D_INODE, req, "convert");
1735                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1736                 rc = ldlm_handle_convert(req);
1737                 break;
1738         case LDLM_BL_CALLBACK:
1739         case LDLM_CP_CALLBACK:
1740                 DEBUG_REQ(D_INODE, req, "callback");
1741                 CERROR("callbacks should not happen on MDS\n");
1742                 LBUG();
1743                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1744                 break;
1745         case LLOG_ORIGIN_HANDLE_CREATE:
1746                 DEBUG_REQ(D_INODE, req, "llog_init");
1747                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1748                 rc = llog_origin_handle_create(req);
1749                 break;
1750         case LLOG_ORIGIN_HANDLE_DESTROY:
1751                 DEBUG_REQ(D_INODE, req, "llog_init");
1752                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1753                 rc = llog_origin_handle_destroy(req);
1754                 break;
1755         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1756                 DEBUG_REQ(D_INODE, req, "llog next block");
1757                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1758                 rc = llog_origin_handle_next_block(req);
1759                 break;
1760         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1761                 DEBUG_REQ(D_INODE, req, "llog prev block");
1762                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1763                 rc = llog_origin_handle_prev_block(req);
1764                 break;
1765         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1766                 DEBUG_REQ(D_INODE, req, "llog read header");
1767                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1768                 rc = llog_origin_handle_read_header(req);
1769                 break;
1770         case LLOG_ORIGIN_HANDLE_CLOSE:
1771                 DEBUG_REQ(D_INODE, req, "llog close");
1772                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1773                 rc = llog_origin_handle_close(req);
1774                 break;
1775         case LLOG_CATINFO:
1776                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1777                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1778                 rc = llog_catinfo(req);
1779                 break;
1780         default:
1781                 req->rq_status = -ENOTSUPP;
1782                 rc = ptlrpc_error(req);
1783                 RETURN(rc);
1784         }
1785
1786         LASSERT(current->journal_info == NULL);
1787
1788         /* If we're DISCONNECTing, the mds_export_data is already freed */
1789         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
1790                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1791                 
1792                 /* I don't think last_xid is used for anyway, so I'm not sure
1793                    if we need to care about last_close_xid here.*/
1794                 lustre_msg_set_last_xid(req->rq_repmsg,
1795                                        le64_to_cpu(med->med_mcd->mcd_last_xid));
1796
1797                 target_committed_to_req(req);
1798         }
1799
1800         EXIT;
1801  out:
1802
1803         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1804                 if (obd && obd->obd_recovering) {
1805                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1806                         return target_queue_last_replay_reply(req, rc);
1807                 }
1808                 /* Lost a race with recovery; let the error path DTRT. */
1809                 rc = req->rq_status = -ENOTCONN;
1810         }
1811
1812         target_send_reply(req, rc, fail);
1813         return 0;
1814 }
1815
1816 /* Update the server data on disk.  This stores the new mount_count and
1817  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1818  * then the server last_rcvd value may be less than that of the clients.
1819  * This will alert us that we may need to do client recovery.
1820  *
1821  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1822  */
1823 int mds_update_server_data(struct obd_device *obd, int force_sync)
1824 {
1825         struct mds_obd *mds = &obd->u.mds;
1826         struct lr_server_data *lsd = mds->mds_server_data;
1827         struct file *filp = mds->mds_rcvd_filp;
1828         struct lvfs_run_ctxt saved;
1829         loff_t off = 0;
1830         int rc;
1831         ENTRY;
1832
1833         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1834                mds->mds_mount_count, mds->mds_last_transno);
1835
1836         spin_lock(&mds->mds_transno_lock);
1837         lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1838         spin_unlock(&mds->mds_transno_lock);
1839
1840         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1841         rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1842         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1843         if (rc)
1844                 CERROR("error writing MDS server data: rc = %d\n", rc);
1845
1846         RETURN(rc);
1847 }
1848
1849 static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
1850 {
1851         char *p = options;
1852
1853         if (!options)
1854                 return;
1855
1856         while (*options) {
1857                 int len;
1858
1859                 while (*p && *p != ',')
1860                         p++;
1861
1862                 len = p - options;
1863                 if (len == sizeof("user_xattr") - 1 &&
1864                     memcmp(options, "user_xattr", len) == 0) {
1865                         mds->mds_fl_user_xattr = 1;
1866                         LCONSOLE_INFO("Enabling user_xattr\n");
1867                 } else if (len == sizeof("nouser_xattr") - 1 &&
1868                            memcmp(options, "nouser_xattr", len) == 0) {
1869                         mds->mds_fl_user_xattr = 0;
1870                         LCONSOLE_INFO("Disabling user_xattr\n");
1871                 } else if (len == sizeof("acl") - 1 &&
1872                            memcmp(options, "acl", len) == 0) {
1873 #ifdef CONFIG_FS_POSIX_ACL
1874                         mds->mds_fl_acl = 1;
1875                         LCONSOLE_INFO("Enabling ACL\n");
1876 #else
1877                         CWARN("ignoring unsupported acl mount option\n");
1878 #endif
1879                 } else if (len == sizeof("noacl") - 1 &&
1880                            memcmp(options, "noacl", len) == 0) {
1881 #ifdef CONFIG_FS_POSIX_ACL
1882                         mds->mds_fl_acl = 0;
1883                         LCONSOLE_INFO("Disabling ACL\n");
1884 #endif
1885                 }
1886
1887                 options = ++p;
1888         }
1889 }
1890
/* proc read handler for the per-export "clear" file: prints a one-line hint
 * explaining that writing to the file clears the nid stats.
 *
 * Only \a page, \a count and \a eof are used; *eof is always set to 1.
 *
 * \retval the value returned by snprintf() for the hint line
 */
static int mds_nid_stats_clear_read(char *page, char **start, off_t off,
                                    int count, int *eof,  void *data)
{
        static const char hint[] =
                "Write into this file to clear all nid stats and "
                "stale nid entries";
        int len;

        *eof = 1;
        len = snprintf(page, count, "%s\n", hint);

        return len;
}
1899
1900 static int mds_nid_stats_clear_write(struct file *file, const char *buffer,
1901                                      unsigned long count, void *data)
1902 {
1903         struct obd_device *obd = (struct obd_device *)data;
1904         struct list_head *nids= &obd->obd_proc_nid_list;
1905         nid_stat_t *client_stat = NULL, *nxt;
1906
1907         spin_lock(&obd->nid_lock);
1908
1909         list_for_each_entry_safe (client_stat, nxt, nids, nid_chain) {
1910                 if (!client_stat->nid_exp_ref_count)
1911                         lprocfs_free_client_stats(client_stat);
1912                 else if (client_stat->nid_stats) {
1913                         lprocfs_clear_stats(client_stat->nid_stats);
1914                 }
1915         }
1916
1917         spin_unlock(&obd->nid_lock);
1918
1919         return count;
1920 }
1921
1922
1923 /* mount the file system (secretly).  lustre_cfg parameters are:
1924  * 1 = device
1925  * 2 = fstype
1926  * 3 = config name
1927  * 4 = mount options
1928  */
1929 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1930 {
1931         struct lprocfs_static_vars lvars;
1932         struct lustre_cfg* lcfg = buf;
1933         struct mds_obd *mds = &obd->u.mds;
1934         struct lustre_sb_info *lsi;
1935         struct lustre_mount_info *lmi;
1936         struct vfsmount *mnt;
1937         struct obd_uuid uuid;
1938         __u8 *uuid_ptr;
1939         char *str, *label;
1940         char ns_name[48];
1941         int rc = 0;
1942         ENTRY;
1943
1944         /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */
1945
1946         CLASSERT(offsetof(struct obd_device, u.obt) ==
1947                  offsetof(struct obd_device, u.mds.mds_obt));
1948
1949         if (lcfg->lcfg_bufcount < 3)
1950                 RETURN(-EINVAL);
1951
1952         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1953                 RETURN(-EINVAL);
1954
1955         lmi = server_get_mount(obd->obd_name);
1956         if (!lmi) {
1957                 CERROR("Not mounted in lustre_fill_super?\n");
1958                 RETURN(-EINVAL);
1959         }
1960
1961         /* We mounted in lustre_fill_super.
1962            lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1963         lsi = s2lsi(lmi->lmi_sb);
1964         fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
1965         fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
1966         mnt = lmi->lmi_mnt;
1967         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1968         if (IS_ERR(obd->obd_fsops))
1969                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
1970
1971         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1972
1973         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1974
1975         sema_init(&mds->mds_epoch_sem, 1);
1976         spin_lock_init(&mds->mds_transno_lock);
1977         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1978         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1979         mds->mds_atime_diff = MAX_ATIME_DIFF;
1980         mds->mds_evict_ost_nids = 1;
1981
1982         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
1983         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
1984                                                 LDLM_NAMESPACE_GREEDY);
1985         if (obd->obd_namespace == NULL) {
1986                 mds_cleanup(obd);
1987                 GOTO(err_ops, rc = -ENOMEM);
1988         }
1989         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1990
1991         lprocfs_init_vars(mds, &lvars);
1992         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
1993             lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
1994                 /* Init private stats here */
1995                 mds_stats_counter_init(obd->obd_stats);
1996                 obd->obd_proc_exports_entry = proc_mkdir("exports",
1997                                                          obd->obd_proc_entry);
1998         }
1999
2000         rc = mds_fs_setup(obd, mnt);
2001         if (rc) {
2002                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
2003                        obd->obd_name, rc);
2004                 GOTO(err_ns, rc);
2005         }
2006
2007         if (obd->obd_proc_exports_entry)
2008                 lprocfs_add_simple(obd->obd_proc_exports_entry,
2009                                    "clear", mds_nid_stats_clear_read,
2010                                    mds_nid_stats_clear_write, obd);
2011
2012         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
2013                 class_uuid_t uuid;
2014
2015                 ll_generate_random_uuid(uuid);
2016                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
2017
2018                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
2019                 if (mds->mds_profile == NULL)
2020                         GOTO(err_fs, rc = -ENOMEM);
2021
2022                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
2023                         LUSTRE_CFG_BUFLEN(lcfg, 3));
2024         }
2025
2026         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2027                            "mds_ldlm_client", &obd->obd_ldlm_client);
2028         obd->obd_replayable = 1;
2029
2030         rc = lquota_setup(mds_quota_interface_ref, obd);
2031         if (rc)
2032                 GOTO(err_fs, rc);
2033
2034         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
2035         if (IS_ERR(mds->mds_group_hash)) {
2036                 rc = PTR_ERR(mds->mds_group_hash);
2037                 mds->mds_group_hash = NULL;
2038                 GOTO(err_qctxt, rc);
2039         }
2040
2041         /* Don't wait for mds_postrecov trying to clear orphans */
2042         obd->obd_async_recov = 1;
2043         rc = mds_postsetup(obd);
2044         /* Bug 11557 - allow async abort_recov start
2045            FIXME can remove most of this obd_async_recov plumbing
2046         obd->obd_async_recov = 0;
2047         */
2048         if (rc)
2049                 GOTO(err_qctxt, rc);
2050
2051         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
2052         if (uuid_ptr != NULL) {
2053                 class_uuid_unparse(uuid_ptr, &uuid);
2054                 str = uuid.uuid;
2055         } else {
2056                 str = "no UUID";
2057         }
2058
2059         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
2060         if (obd->obd_recovering) {
2061                 LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in "
2062                               "recovery until %d %s reconnect, or if no clients"
2063                               " reconnect for %d:%.02d; during that time new "
2064                               "clients will not be allowed to connect. "
2065                               "Recovery progress can be monitored by watching "
2066                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
2067                               obd->obd_name, lustre_cfg_string(lcfg, 1),
2068                               label ?: "", label ? "/" : "", str,
2069                               obd->obd_recoverable_clients,
2070                               (obd->obd_recoverable_clients == 1) ?
2071                               "client" : "clients",
2072                               obd->obd_recovery_timeout / 60,
2073                               obd->obd_recovery_timeout % 60,
2074                               obd->obd_name);
2075         } else {
2076                 LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery "
2077                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
2078                               label ?: "", label ? "/" : "", str,
2079                               obd->obd_replayable ? "enabled" : "disabled");
2080         }
2081
2082         if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
2083                 ldlm_timeout = 6;
2084
2085         RETURN(0);
2086
2087 err_qctxt:
2088         lquota_cleanup(mds_quota_interface_ref, obd);
2089 err_fs:
2090         /* No extra cleanup needed for llog_init_commit_thread() */
2091         mds_fs_cleanup(obd);
2092         upcall_cache_cleanup(mds->mds_group_hash);
2093         mds->mds_group_hash = NULL;
2094 err_ns:
2095         lprocfs_obd_cleanup(obd);
2096         lprocfs_free_obd_stats(obd);
2097         ldlm_namespace_free(obd->obd_namespace, 0);
2098         obd->obd_namespace = NULL;
2099 err_ops:
2100         fsfilt_put_ops(obd->obd_fsops);
2101 err_put:
2102         server_put_mount(obd->obd_name, mnt);
2103         obd->u.obt.obt_sb = NULL;
2104         return rc;
2105 }
2106
2107 static int mds_lov_clean(struct obd_device *obd)
2108 {
2109         struct mds_obd *mds = &obd->u.mds;
2110         struct obd_device *osc = mds->mds_osc_obd;
2111         ENTRY;
2112
2113         if (mds->mds_profile) {
2114                 class_del_profile(mds->mds_profile);
2115                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2116                 mds->mds_profile = NULL;
2117         }
2118
2119         /* There better be a lov */
2120         if (!osc)
2121                 RETURN(0);
2122         if (IS_ERR(osc))
2123                 RETURN(PTR_ERR(osc));
2124
2125         obd_register_observer(osc, NULL);
2126
2127         /* Give lov our same shutdown flags */
2128         osc->obd_force = obd->obd_force;
2129         osc->obd_fail = obd->obd_fail;
2130
2131         /* Cleanup the lov */
2132         obd_disconnect(mds->mds_osc_exp);
2133         class_manual_cleanup(osc);
2134         mds->mds_osc_exp = NULL;
2135
2136         RETURN(0);
2137 }
2138
2139 static int mds_postsetup(struct obd_device *obd)
2140 {
2141         struct mds_obd *mds = &obd->u.mds;
2142         int rc = 0;
2143         ENTRY;
2144
2145         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
2146                         &llog_lvfs_ops);
2147         if (rc)
2148                 RETURN(rc);
2149
2150         rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
2151                         &llog_lvfs_ops);
2152         if (rc)
2153                 RETURN(rc);
2154
2155         if (mds->mds_profile) {
2156                 struct lustre_profile *lprof;
2157                 /* The profile defines which osc and mdc to connect to, for a 
2158                    client.  We reuse that here to figure out the name of the
2159                    lov to use (and ignore lprof->lp_mdc).
2160                    The profile was set in the config log with 
2161                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
2162                 lprof = class_get_profile(mds->mds_profile);
2163                 if (lprof == NULL) {
2164                         CERROR("No profile found: %s\n", mds->mds_profile);
2165                         GOTO(err_cleanup, rc = -ENOENT);
2166                 }
2167                 rc = mds_lov_connect(obd, lprof->lp_osc);
2168                 if (rc)
2169                         GOTO(err_cleanup, rc);
2170         }
2171
2172         RETURN(rc);
2173
2174 err_cleanup:
2175         mds_lov_clean(obd);
2176         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2177         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2178         RETURN(rc);
2179 }
2180
2181 int mds_postrecov(struct obd_device *obd)
2182 {
2183         struct llog_ctxt *ctxt;
2184         int rc;
2185         ENTRY;
2186
2187         if (obd->obd_fail)
2188                 RETURN(0);
2189
2190         LASSERT(!obd->obd_recovering);
2191         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); 
2192         LASSERT(ctxt != NULL);
2193         llog_ctxt_put(ctxt);
2194
2195         /* set nextid first, so we are sure it happens */
2196         mutex_down(&obd->obd_dev_sem);
2197         rc = mds_lov_set_nextid(obd);
2198         mutex_up(&obd->obd_dev_sem);
2199         if (rc) {
2200                 CERROR("%s: mds_lov_set_nextid failed %d\n",
2201                        obd->obd_name, rc);
2202                 GOTO(out, rc);
2203         }
2204
2205         /* clean PENDING dir */
2206         rc = mds_cleanup_pending(obd);
2207         if (rc < 0)
2208                 GOTO(out, rc);
2209
2210         /* FIXME Does target_finish_recovery really need this to block? */
2211         /* Notify the LOV, which will in turn call mds_notify for each tgt */
2212         /* This means that we have to hack obd_notify to think we're obd_set_up
2213            during mds_lov_connect. */
2214         obd_notify(obd->u.mds.mds_osc_obd, NULL, 
2215                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
2216                    OBD_NOTIFY_SYNC, NULL);
2217
2218         /* quota recovery */
2219         lquota_recovery(mds_quota_interface_ref, obd);
2220
2221 out:
2222         RETURN(rc);
2223 }
2224
2225 /* We need to be able to stop an mds_lov_synchronize */
2226 static int mds_lov_early_clean(struct obd_device *obd)
2227 {
2228         struct mds_obd *mds = &obd->u.mds;
2229         struct obd_device *osc = mds->mds_osc_obd;
2230
2231         if (!osc || (!obd->obd_force && !obd->obd_fail))
2232                 return(0);
2233
2234         CDEBUG(D_HA, "abort inflight\n");
2235         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
2236 }
2237
2238 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2239 {
2240         int rc = 0;
2241         ENTRY;
2242
2243         switch (stage) {
2244         case OBD_CLEANUP_EARLY:
2245                 break;
2246         case OBD_CLEANUP_EXPORTS:
2247                 target_cleanup_recovery(obd);
2248                 mds_lov_early_clean(obd);
2249                 break;
2250         case OBD_CLEANUP_SELF_EXP:
2251                 mds_lov_disconnect(obd);
2252                 mds_lov_clean(obd);
2253                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2254                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2255                 rc = obd_llog_finish(obd, 0);
2256                 break;
2257         case OBD_CLEANUP_OBD:
2258                 break;
2259         }
2260         RETURN(rc);
2261 }
2262
2263 static int mds_cleanup(struct obd_device *obd)
2264 {
2265         struct mds_obd *mds = &obd->u.mds;
2266         lvfs_sbdev_type save_dev;
2267         ENTRY;
2268
2269         if (obd->u.obt.obt_sb == NULL)
2270                 RETURN(0);
2271         save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
2272
2273         if (mds->mds_osc_exp)
2274                 /* lov export was disconnected by mds_lov_clean;
2275                    we just need to drop our ref */
2276                 class_export_put(mds->mds_osc_exp);
2277
2278         lprocfs_free_per_client_stats(obd);
2279         remove_proc_entry("clear", obd->obd_proc_exports_entry);
2280         lprocfs_obd_cleanup(obd);
2281         lprocfs_free_obd_stats(obd);
2282
2283         lquota_cleanup(mds_quota_interface_ref, obd);
2284
2285         mds_update_server_data(obd, 1);
2286         if (mds->mds_lov_objids != NULL) 
2287                 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
2288         mds_fs_cleanup(obd);
2289
2290         upcall_cache_cleanup(mds->mds_group_hash);
2291         mds->mds_group_hash = NULL;
2292
2293         server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2294         obd->u.obt.obt_sb = NULL;
2295
2296         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
2297
2298         spin_lock_bh(&obd->obd_processing_task_lock);
2299         if (obd->obd_recovering) {
2300                 target_cancel_recovery_timer(obd);
2301                 obd->obd_recovering = 0;
2302         }
2303         spin_unlock_bh(&obd->obd_processing_task_lock);
2304
2305         fsfilt_put_ops(obd->obd_fsops);
2306
2307         LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
2308
2309         RETURN(0);
2310 }
2311
2312 static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
2313                                         struct ldlm_lock *new_lock,
2314                                         struct ldlm_lock **old_lock,
2315                                         struct lustre_handle *lockh)
2316 {
2317         struct obd_export *exp = req->rq_export;
2318         struct ldlm_request *dlmreq =
2319                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
2320         struct lustre_handle remote_hdl = dlmreq->lock_handle[0];
2321         struct list_head *iter;
2322
2323         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2324                 return;
2325
2326         spin_lock(&exp->exp_ldlm_data.led_lock);
2327         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2328                 struct ldlm_lock *lock;
2329                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2330                 if (lock == new_lock)
2331                         continue;
2332                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2333                         lockh->cookie = lock->l_handle.h_cookie;
2334                         LDLM_DEBUG(lock, "restoring lock cookie");
2335                         DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64,
2336                                   lockh->cookie);
2337                         if (old_lock)
2338                                 *old_lock = LDLM_LOCK_GET(lock);
2339                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2340                         return;
2341                 }
2342         }
2343         spin_unlock(&exp->exp_ldlm_data.led_lock);
2344
2345         /* If the xid matches, then we know this is a resent request,
2346          * and allow it. (It's probably an OPEN, for which we don't
2347          * send a lock */
2348         if (req->rq_xid ==
2349             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
2350                 return;
2351
2352         if (req->rq_xid ==
2353             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid))
2354                 return;
2355
2356         /* This remote handle isn't enqueued, so we never received or
2357          * processed this request.  Clear MSG_RESENT, because it can
2358          * be handled like any normal request now. */
2359
2360         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2361
2362         DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
2363                   remote_hdl.cookie);
2364 }
2365
2366 int intent_disposition(struct ldlm_reply *rep, int flag)
2367 {
2368         if (!rep)
2369                 return 0;
2370         return (rep->lock_policy_res1 & flag);
2371 }
2372
2373 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2374 {
2375         if (!rep)
2376                 return;
2377         rep->lock_policy_res1 |= flag;
2378 }
2379
/*
 * True when @error is one of the errors that mean the client is no longer
 * connected (-ENOTCONN or -ENODEV).  The argument is parenthesized so that
 * compound expressions compare as a whole; note that it is evaluated twice.
 */
#define IS_CLIENT_DISCONNECT_ERROR(error) \
                ((error) == -ENOTCONN || (error) == -ENODEV)
2382
2383 static int mds_intent_policy(struct ldlm_namespace *ns,
2384                              struct ldlm_lock **lockp, void *req_cookie,
2385                              ldlm_mode_t mode, int flags, void *data)
2386 {
2387         struct ptlrpc_request *req = req_cookie;
2388         struct ldlm_lock *lock = *lockp;
2389         struct ldlm_intent *it;
2390         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2391         struct ldlm_reply *rep;
2392         struct lustre_handle lockh = { 0 };
2393         struct ldlm_lock *new_lock = NULL;
2394         int getattr_part = MDS_INODELOCK_UPDATE;
2395         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2396                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
2397                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
2398                            [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize };
2399         int repbufcnt = 4, rc;
2400         ENTRY;
2401
2402         LASSERT(req != NULL);
2403
2404         if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
2405                 /* No intent was provided */
2406                 rc = lustre_pack_reply(req, 2, repsize, NULL);
2407                 LASSERT(rc == 0);
2408                 RETURN(0);
2409         }
2410
2411         it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it),
2412                                 lustre_swab_ldlm_intent);
2413         if (it == NULL) {
2414                 CERROR("Intent missing\n");
2415                 RETURN(req->rq_status = -EFAULT);
2416         }
2417
2418         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2419
2420         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
2421             (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
2422                 /* we should never allow OBD_CONNECT_ACL if not configured */
2423                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
2424         else if (it->opc & IT_UNLINK)
2425                 repsize[repbufcnt++] = mds->mds_max_cookiesize;
2426
2427         rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
2428         if (rc)
2429                 RETURN(req->rq_status = rc);
2430
2431         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
2432         intent_set_disposition(rep, DISP_IT_EXECD);
2433
2434
2435         /* execute policy */
2436         switch ((long)it->opc) {
2437         case IT_OPEN:
2438         case IT_CREAT|IT_OPEN:
2439                 mds_counter_incr(req->rq_export, LPROC_MDS_OPEN);
2440                 fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL,
2441                                             &lockh);
2442                 /* XXX swab here to assert that an mds_open reint
2443                  * packet is following */
2444                 rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF,
2445                                                   &lockh);
2446 #if 0
2447                 /* We abort the lock if the lookup was negative and
2448                  * we did not make it to the OPEN portion */
2449                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2450                         RETURN(ELDLM_LOCK_ABORTED);
2451                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2452                     !intent_disposition(rep, DISP_OPEN_OPEN))
2453 #endif
2454
2455                 /* If there was an error of some sort or if we are not
2456                  * returning any locks */
2457                  if (rep->lock_policy_res2 ||
2458                      !intent_disposition(rep, DISP_OPEN_LOCK)) {
2459                         /* If it is the disconnect error (ENODEV & ENOCONN)
2460                          * ptlrpc layer should know this imediately, it should
2461                          * be replied by rq_stats, otherwise, return it by 
2462                          * intent here
2463                          */
2464                         if (IS_CLIENT_DISCONNECT_ERROR(rep->lock_policy_res2))
2465                                 RETURN(rep->lock_policy_res2);
2466                         else
2467                                 RETURN(ELDLM_LOCK_ABORTED);
2468                  }
2469                 break;
2470         case IT_LOOKUP:
2471                         getattr_part = MDS_INODELOCK_LOOKUP;
2472         case IT_GETATTR:
2473                         getattr_part |= MDS_INODELOCK_LOOKUP;
2474                         OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr);
2475         case IT_READDIR:
2476                 fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock,
2477                                             &new_lock, &lockh);
2478
2479                 /* INODEBITS_INTEROP: if this lock was converted from a
2480                  * plain lock (client does not support inodebits), then
2481                  * child lock must be taken with both lookup and update
2482                  * bits set for all operations.
2483                  */
2484                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
2485                         getattr_part = MDS_INODELOCK_LOOKUP |
2486                                        MDS_INODELOCK_UPDATE;
2487
2488                 rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF,
2489                                                          getattr_part, &lockh);
2490                 /* FIXME: LDLM can set req->rq_status. MDS sets
2491                    policy_res{1,2} with disposition and status.
2492                    - replay: returns 0 & req->status is old status
2493                    - otherwise: returns req->status */
2494                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2495                         rep->lock_policy_res2 = 0;
2496                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2497                     rep->lock_policy_res2)
2498                         RETURN(ELDLM_LOCK_ABORTED);
2499                 if (req->rq_status != 0) {
2500                         LBUG();
2501                         rep->lock_policy_res2 = req->rq_status;
2502                         RETURN(ELDLM_LOCK_ABORTED);
2503                 }
2504                 break;
2505         default:
2506                 CERROR("Unhandled intent "LPD64"\n", it->opc);
2507                 RETURN(-EFAULT);
2508         }
2509
2510         /* By this point, whatever function we called above must have either
2511          * filled in 'lockh', been an intent replay, or returned an error.  We
2512          * want to allow replayed RPCs to not get a lock, since we would just
2513          * drop it below anyways because lock replay is done separately by the
2514          * client afterwards.  For regular RPCs we want to give the new lock to
2515          * the client instead of whatever lock it was about to get. */
2516         if (new_lock == NULL)
2517                 new_lock = ldlm_handle2lock(&lockh);
2518         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2519                 RETURN(0);
2520
2521         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
2522                  it->opc, lockh.cookie);
2523
2524         /* If we've already given this lock to a client once, then we should
2525          * have no readers or writers.  Otherwise, we should have one reader
2526          * _or_ writer ref (which will be zeroed below) before returning the
2527          * lock to a client. */
2528         if (new_lock->l_export == req->rq_export) {
2529                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2530         } else {
2531                 LASSERT(new_lock->l_export == NULL);
2532                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2533         }
2534
2535         *lockp = new_lock;
2536
2537         if (new_lock->l_export == req->rq_export) {
2538                 /* Already gave this to the client, which means that we
2539                  * reconstructed a reply. */
2540                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2541                         MSG_RESENT);
2542                 RETURN(ELDLM_LOCK_REPLACED);
2543         }
2544
2545         /* Fixup the lock to be given to the client */
2546         lock_res_and_lock(new_lock);
2547         new_lock->l_readers = 0;
2548         new_lock->l_writers = 0;
2549
2550         new_lock->l_export = class_export_get(req->rq_export);
2551         spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
2552         list_add(&new_lock->l_export_chain,
2553                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2554         spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
2555
2556         new_lock->l_blocking_ast = lock->l_blocking_ast;
2557         new_lock->l_completion_ast = lock->l_completion_ast;
2558
2559         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2560                sizeof(lock->l_remote_handle));
2561
2562         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2563
2564         unlock_res_and_lock(new_lock);
2565         LDLM_LOCK_PUT(new_lock);
2566
2567         RETURN(ELDLM_LOCK_REPLACED);
2568 }
2569
2570 static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
2571 {
2572         struct mds_obd *mds = &obd->u.mds;
2573         struct lprocfs_static_vars lvars;
2574         int mds_min_threads;
2575         int mds_max_threads;
2576         int rc = 0;
2577         ENTRY;
2578
2579         lprocfs_init_vars(mdt, &lvars);
2580         lprocfs_obd_setup(obd, lvars.obd_vars);
2581
2582         sema_init(&mds->mds_health_sem, 1);
2583
2584         if (mds_num_threads) {
2585                 /* If mds_num_threads is set, it is the min and the max. */
2586                 if (mds_num_threads > MDS_THREADS_MAX)
2587                         mds_num_threads = MDS_THREADS_MAX;
2588                 if (mds_num_threads < MDS_THREADS_MIN)
2589                         mds_num_threads = MDS_THREADS_MIN;
2590                 mds_max_threads = mds_min_threads = mds_num_threads;
2591         } else {
2592                 /* Base min threads on memory and cpus */
2593                 mds_min_threads = num_possible_cpus() * num_physpages >> 
2594                         (27 - CFS_PAGE_SHIFT);
2595                 if (mds_min_threads < MDS_THREADS_MIN)
2596                         mds_min_threads = MDS_THREADS_MIN;
2597                 /* Largest auto threads start value */
2598                 if (mds_min_threads > 32) 
2599                         mds_min_threads = 32;
2600                 mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4);
2601         }
2602
2603         mds->mds_service =
2604                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2605                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
2606                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR,
2607                                 mds_handle, LUSTRE_MDS_NAME,
2608                                 obd->obd_proc_entry, target_print_req,
2609                                 mds_min_threads, mds_max_threads, "ll_mdt");
2610
2611         if (!mds->mds_service) {
2612                 CERROR("failed to start service\n");
2613                 GOTO(err_lprocfs, rc = -ENOMEM);
2614         }
2615
2616         rc = ptlrpc_start_threads(obd, mds->mds_service);
2617         if (rc)
2618                 GOTO(err_thread, rc);
2619
2620         mds->mds_setattr_service =
2621                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2622                                 MDS_MAXREPSIZE, MDS_SETATTR_PORTAL,
2623                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR,
2624                                 mds_handle, "mds_setattr",
2625                                 obd->obd_proc_entry, target_print_req,
2626                                 mds_min_threads, mds_max_threads,
2627                                 "ll_mdt_attr");
2628         if (!mds->mds_setattr_service) {
2629                 CERROR("failed to start getattr service\n");
2630                 GOTO(err_thread, rc = -ENOMEM);
2631         }
2632
2633         rc = ptlrpc_start_threads(obd, mds->mds_setattr_service);
2634         if (rc)
2635                 GOTO(err_thread2, rc);
2636
2637         mds->mds_readpage_service =
2638                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2639                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
2640                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR,
2641                                 mds_handle, "mds_readpage",
2642                                 obd->obd_proc_entry, target_print_req,
2643                                 MDS_THREADS_MIN_READPAGE, mds_max_threads,
2644                                 "ll_mdt_rdpg");
2645         if (!mds->mds_readpage_service) {
2646                 CERROR("failed to start readpage service\n");
2647                 GOTO(err_thread2, rc = -ENOMEM);
2648         }
2649
2650         rc = ptlrpc_start_threads(obd, mds->mds_readpage_service);
2651
2652         if (rc)
2653                 GOTO(err_thread3, rc);
2654
2655         ping_evictor_start();
2656
2657         RETURN(0);
2658
2659 err_thread3:
2660         ptlrpc_unregister_service(mds->mds_readpage_service);
2661         mds->mds_readpage_service = NULL;
2662 err_thread2:
2663         ptlrpc_unregister_service(mds->mds_setattr_service);
2664         mds->mds_setattr_service = NULL;
2665 err_thread:
2666         ptlrpc_unregister_service(mds->mds_service);
2667         mds->mds_service = NULL;
2668 err_lprocfs:
2669         lprocfs_obd_cleanup(obd);
2670         return rc;
2671 }
2672
2673 static int mdt_cleanup(struct obd_device *obd)
2674 {
2675         struct mds_obd *mds = &obd->u.mds;
2676         ENTRY;
2677
2678         ping_evictor_stop();
2679
2680         down(&mds->mds_health_sem);
2681         ptlrpc_unregister_service(mds->mds_readpage_service);
2682         ptlrpc_unregister_service(mds->mds_setattr_service);
2683         ptlrpc_unregister_service(mds->mds_service);
2684         mds->mds_readpage_service = NULL;
2685         mds->mds_setattr_service = NULL;
2686         mds->mds_service = NULL;
2687         up(&mds->mds_health_sem);
2688
2689         lprocfs_obd_cleanup(obd);
2690
2691         RETURN(0);
2692 }
2693
2694 static int mdt_health_check(struct obd_device *obd)
2695 {
2696         struct mds_obd *mds = &obd->u.mds;
2697         int rc = 0;
2698
2699         down(&mds->mds_health_sem);
2700         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
2701         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
2702         rc |= ptlrpc_service_health_check(mds->mds_service);
2703         up(&mds->mds_health_sem);
2704
2705         /*
2706          * health_check to return 0 on healthy
2707          * and 1 on unhealthy.
2708          */
2709         if(rc != 0)
2710                 rc = 1;
2711
2712         return rc;
2713 }
2714
2715 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2716                                           void *data)
2717 {
2718         struct obd_device *obd = data;
2719         struct ll_fid fid;
2720         fid.id = id;
2721         fid.generation = gen;
2722         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2723 }
2724
2725 static int mds_health_check(struct obd_device *obd)
2726 {
2727         struct obd_device_target *odt = &obd->u.obt;
2728 #ifdef USE_HEALTH_CHECK_WRITE
2729         struct mds_obd *mds = &obd->u.mds;
2730 #endif
2731         int rc = 0;
2732
2733         if (odt->obt_sb->s_flags & MS_RDONLY)
2734                 rc = 1;
2735
2736 #ifdef USE_HEALTH_CHECK_WRITE
2737         LASSERT(mds->mds_health_check_filp != NULL);
2738         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
2739 #endif
2740
2741         return rc;
2742 }
2743
2744 static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
2745 {
2746         struct lustre_cfg *lcfg = buf;
2747         struct lprocfs_static_vars lvars;
2748         int rc;
2749
2750         lprocfs_init_vars(mds, &lvars);
2751         
2752         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
2753         
2754         return(rc);
2755 }
2756
2757 struct lvfs_callback_ops mds_lvfs_ops = {
2758         l_fid2dentry:     mds_lvfs_fid2dentry,
2759 };
2760
2761 /* use obd ops to offer management infrastructure */
2762 static struct obd_ops mds_obd_ops = {
2763         .o_owner           = THIS_MODULE,
2764         .o_connect         = mds_connect,
2765         .o_reconnect       = mds_reconnect,
2766         .o_init_export     = mds_init_export,
2767         .o_destroy_export  = mds_destroy_export,
2768         .o_disconnect      = mds_disconnect,
2769         .o_setup           = mds_setup,
2770         .o_precleanup      = mds_precleanup,
2771         .o_cleanup         = mds_cleanup,
2772         .o_postrecov       = mds_postrecov,
2773         .o_statfs          = mds_obd_statfs,
2774         .o_iocontrol       = mds_iocontrol,
2775         .o_create          = mds_obd_create,
2776         .o_destroy         = mds_obd_destroy,
2777         .o_llog_init       = mds_llog_init,
2778         .o_llog_finish     = mds_llog_finish,
2779         .o_notify          = mds_notify,
2780         .o_health_check    = mds_health_check,
2781         .o_process_config  = mds_process_config,
2782 };
2783
2784 static struct obd_ops mdt_obd_ops = {
2785         .o_owner           = THIS_MODULE,
2786         .o_setup           = mdt_setup,
2787         .o_cleanup         = mdt_cleanup,
2788         .o_health_check    = mdt_health_check,
2789 };
2790
2791 quota_interface_t *mds_quota_interface_ref;
2792 extern quota_interface_t mds_quota_interface;
2793
2794 static int __init mds_init(void)
2795 {
2796         int rc;
2797         struct lprocfs_static_vars lvars;
2798
2799         request_module("lquota");
2800         mds_quota_interface_ref = PORTAL_SYMBOL_GET(mds_quota_interface);
2801         rc = lquota_init(mds_quota_interface_ref);
2802         if (rc) {
2803                 if (mds_quota_interface_ref)
2804                         PORTAL_SYMBOL_PUT(mds_quota_interface);
2805                 return rc;
2806         }
2807         init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
2808         
2809         lprocfs_init_vars(mds, &lvars);
2810         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
2811         lprocfs_init_vars(mdt, &lvars);
2812         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
2813
2814         return 0;
2815 }
2816
2817 static void /*__exit*/ mds_exit(void)
2818 {
2819         lquota_exit(mds_quota_interface_ref);
2820         if (mds_quota_interface_ref)
2821                 PORTAL_SYMBOL_PUT(mds_quota_interface);
2822
2823         class_unregister_type(LUSTRE_MDS_NAME);
2824         class_unregister_type(LUSTRE_MDT_NAME);
2825 }
2826
2827 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2828 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2829 MODULE_LICENSE("GPL");
2830
2831 module_init(mds_init);
2832 module_exit(mds_exit);