Whamcloud - gitweb
8cdf88756b8d7b27e50f5b6fa99dbd9e82e75d9d
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2005 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of the Lustre file system, http://www.lustre.org
14  *   Lustre is a trademark of Cluster File Systems, Inc.
15  *
16  *   You may have signed or agreed to another license before downloading
17  *   this software.  If so, you are bound by the terms and conditions
18  *   of that agreement, and the following does not apply to you.  See the
19  *   LICENSE file included with this distribution for more information.
20  *
21  *   If you did not agree to a different license, then this copy of Lustre
22  *   is open source software; you can redistribute it and/or modify it
23  *   under the terms of version 2 of the GNU General Public License as
24  *   published by the Free Software Foundation.
25  *
26  *   In either case, Lustre is distributed in the hope that it will be
27  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
28  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
29  *   license text for more details.
30  */
31
32 #ifndef EXPORT_SYMTAB
33 # define EXPORT_SYMTAB
34 #endif
35 #define DEBUG_SUBSYSTEM S_MDS
36
37 #include <lustre_mds.h>
38 #include <linux/module.h>
39 #include <linux/init.h>
40 #include <linux/random.h>
41 #include <linux/fs.h>
42 #include <linux/jbd.h>
43 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
44 # include <linux/smp_lock.h>
45 # include <linux/buffer_head.h>
46 # include <linux/workqueue.h>
47 # include <linux/mount.h>
48 #else
49 # include <linux/locks.h>
50 #endif
51
52 #include <obd_class.h>
53 #include <lustre_dlm.h>
54 #include <obd_lov.h>
55 #include <lustre_fsfilt.h>
56 #include <lprocfs_status.h>
57 #include <lustre_commit_confd.h>
58 #include <lustre_quota.h>
59 #include <lustre_disk.h>
60 #include <lustre_param.h>
61
62 #include "mds_internal.h"
63
64 int mds_num_threads;
65 CFS_MODULE_PARM(mds_num_threads, "i", int, 0444,
66                 "number of MDS service threads to start");
67
68 static int mds_intent_policy(struct ldlm_namespace *ns,
69                              struct ldlm_lock **lockp, void *req_cookie,
70                              ldlm_mode_t mode, int flags, void *data);
71 static int mds_postsetup(struct obd_device *obd);
72 static int mds_cleanup(struct obd_device *obd);
73
74 /* Assumes caller has already pushed into the kernel filesystem context */
75 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
76                         loff_t offset, int count)
77 {
78         struct ptlrpc_bulk_desc *desc;
79         struct l_wait_info lwi;
80         struct page **pages;
81         int timeout;
82         int rc = 0, npages, i, tmpcount, tmpsize = 0;
83         ENTRY;
84
85         LASSERT((offset & ~CFS_PAGE_MASK) == 0); /* I'm dubious about this */
86
87         npages = (count + CFS_PAGE_SIZE - 1) >> CFS_PAGE_SHIFT;
88         OBD_ALLOC(pages, sizeof(*pages) * npages);
89         if (!pages)
90                 GOTO(out, rc = -ENOMEM);
91
92         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
93                                     MDS_BULK_PORTAL);
94         if (desc == NULL)
95                 GOTO(out_free, rc = -ENOMEM);
96
97         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
98                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
99
100                 OBD_PAGE_ALLOC(pages[i], CFS_ALLOC_STD);
101                 if (pages[i] == NULL)
102                         GOTO(cleanup_buf, rc = -ENOMEM);
103
104                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
105         }
106
107         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
108                 tmpsize = tmpcount > CFS_PAGE_SIZE ? CFS_PAGE_SIZE : tmpcount;
109                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
110                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
111                        i_size_read(file->f_dentry->d_inode));
112
113                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
114                                      kmap(pages[i]), tmpsize, &offset);
115                 kunmap(pages[i]);
116
117                 if (rc != tmpsize)
118                         GOTO(cleanup_buf, rc = -EIO);
119         }
120
121         LASSERT(desc->bd_nob == count);
122
123         rc = ptlrpc_start_bulk_transfer(desc);
124         if (rc)
125                 GOTO(cleanup_buf, rc);
126
127         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
128                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
129                        OBD_FAIL_MDS_SENDPAGE, rc);
130                 GOTO(abort_bulk, rc);
131         }
132
133         timeout = (int)req->rq_deadline - (int)cfs_time_current_sec();
134         if (timeout < 0) {
135                 CERROR("Req deadline already passed %lu (now: %lu)\n",
136                        req->rq_deadline, cfs_time_current_sec());
137         }
138         lwi = LWI_TIMEOUT(max(timeout, 1) * HZ, NULL, NULL);
139         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
140         LASSERT (rc == 0 || rc == -ETIMEDOUT);
141
142         if (rc == 0) {
143                 if (desc->bd_success &&
144                     desc->bd_nob_transferred == count)
145                         GOTO(cleanup_buf, rc);
146
147                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
148         }
149
150         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
151                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
152                   desc->bd_nob_transferred, count,
153                   req->rq_export->exp_client_uuid.uuid,
154                   req->rq_export->exp_connection->c_remote_uuid.uuid);
155
156         class_fail_export(req->rq_export);
157
158         EXIT;
159  abort_bulk:
160         ptlrpc_abort_bulk (desc);
161  cleanup_buf:
162         for (i = 0; i < npages; i++)
163                 if (pages[i])
164                         OBD_PAGE_FREE(pages[i]);
165
166         ptlrpc_free_bulk(desc);
167  out_free:
168         OBD_FREE(pages, sizeof(*pages) * npages);
169  out:
170         return rc;
171 }
172
173 /* only valid locked dentries or errors should be returned */
174 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
175                                      struct vfsmount **mnt, int lock_mode,
176                                      struct lustre_handle *lockh,
177                                      char *name, int namelen, __u64 lockpart)
178 {
179         struct mds_obd *mds = &obd->u.mds;
180         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
181         struct ldlm_res_id res_id = { .name = {0} };
182         int flags = LDLM_FL_ATOMIC_CB, rc;
183         ldlm_policy_data_t policy = { .l_inodebits = { lockpart} }; 
184         ENTRY;
185
186         if (IS_ERR(de))
187                 RETURN(de);
188
189         res_id.name[0] = de->d_inode->i_ino;
190         res_id.name[1] = de->d_inode->i_generation;
191         rc = ldlm_cli_enqueue_local(obd->obd_namespace, res_id, 
192                                     LDLM_IBITS, &policy, lock_mode, &flags, 
193                                     ldlm_blocking_ast, ldlm_completion_ast,
194                                     NULL, NULL, 0, NULL, lockh);
195         if (rc != ELDLM_OK) {
196                 l_dput(de);
197                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
198         }
199
200         RETURN(retval);
201 }
202
203 /* Look up an entry by inode number. */
204 /* this function ONLY returns valid dget'd dentries with an initialized inode
205    or errors */
206 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
207                               struct vfsmount **mnt)
208 {
209         char fid_name[32];
210         unsigned long ino = fid->id;
211         __u32 generation = fid->generation;
212         struct inode *inode;
213         struct dentry *result;
214
215         if (ino == 0)
216                 RETURN(ERR_PTR(-ESTALE));
217
218         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
219
220         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
221                ino, generation, mds->mds_obt.obt_sb);
222
223         /* under ext3 this is neither supposed to return bad inodes
224            nor NULL inodes. */
225         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
226         if (IS_ERR(result))
227                 RETURN(result);
228
229         inode = result->d_inode;
230         if (!inode)
231                 RETURN(ERR_PTR(-ENOENT));
232
233        if (inode->i_nlink == 0) {
234                 if (inode->i_mode == 0 &&
235                     LTIME_S(inode->i_ctime) == 0 ) {
236                         struct obd_device *obd = container_of(mds, struct
237                                                               obd_device, u.mds);
238                         LCONSOLE_WARN("Found inode with zero nlink, mode and "
239                                       "ctime -- this may indicate disk"
240                                       "corruption (device %s, inode %lu, link:"
241                                       " %lu, count: %d)\n", obd->obd_name, inode->i_ino,
242                                       (unsigned long)inode->i_nlink,
243                                       atomic_read(&inode->i_count));
244                 }
245                 dput(result);
246                 RETURN(ERR_PTR(-ENOENT));
247         }
248
249         if (generation && inode->i_generation != generation) {
250                 /* we didn't find the right inode.. */
251                 CDEBUG(D_INODE, "found wrong generation: inode %lu, link: %lu, "
252                        "count: %d, generation %u/%u\n", inode->i_ino,
253                        (unsigned long)inode->i_nlink,
254                        atomic_read(&inode->i_count), inode->i_generation,
255                        generation);
256                 dput(result);
257                 RETURN(ERR_PTR(-ENOENT));
258         }
259
260         if (mnt) {
261                 *mnt = mds->mds_vfsmnt;
262                 mntget(*mnt);
263         }
264
265         RETURN(result);
266 }
267
268 static int mds_connect_internal(struct obd_export *exp, 
269                                 struct obd_connect_data *data)
270 {
271         struct obd_device *obd = exp->exp_obd;
272         if (data != NULL) {
273                 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
274                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
275
276                 /* If no known bits (which should not happen, probably,
277                    as everybody should support LOOKUP and UPDATE bits at least)
278                    revert to compat mode with plain locks. */
279                 if (!data->ocd_ibits_known &&
280                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
281                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
282
283                 if (!obd->u.mds.mds_fl_acl)
284                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
285
286                 if (!obd->u.mds.mds_fl_user_xattr)
287                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
288
289                 exp->exp_connect_flags = data->ocd_connect_flags;
290                 data->ocd_version = LUSTRE_VERSION_CODE;
291                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
292         }
293
294         if (obd->u.mds.mds_fl_acl &&
295             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
296                 CWARN("%s: MDS requires ACL support but client does not\n",
297                       obd->obd_name);
298                 return -EBADE;
299         }
300         return 0;
301 }
302
303 static int mds_reconnect(struct obd_export *exp, struct obd_device *obd,
304                          struct obd_uuid *cluuid,
305                          struct obd_connect_data *data)
306 {
307         int rc;
308         ENTRY;
309
310         if (exp == NULL || obd == NULL || cluuid == NULL)
311                 RETURN(-EINVAL);
312
313         rc = mds_connect_internal(exp, data);
314
315         RETURN(rc);
316 }
317
318 /* Establish a connection to the MDS.
319  *
320  * This will set up an export structure for the client to hold state data
321  * about that client, like open files, the last operation number it did
322  * on the server, etc.
323  */
324 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
325                        struct obd_uuid *cluuid, struct obd_connect_data *data,
326                        void *localdata)
327 {
328         struct obd_export *exp;
329         struct mds_export_data *med;
330         struct mds_client_data *mcd = NULL;
331         lnet_nid_t *client_nid = (lnet_nid_t *)localdata;
332         int rc, abort_recovery;
333         ENTRY;
334
335         if (!conn || !obd || !cluuid)
336                 RETURN(-EINVAL);
337
338         /* Check for aborted recovery. */
339         spin_lock_bh(&obd->obd_processing_task_lock);
340         abort_recovery = obd->obd_abort_recovery;
341         spin_unlock_bh(&obd->obd_processing_task_lock);
342         if (abort_recovery)
343                 target_abort_recovery(obd);
344
345         /* XXX There is a small race between checking the list and adding a
346          * new connection for the same UUID, but the real threat (list
347          * corruption when multiple different clients connect) is solved.
348          *
349          * There is a second race between adding the export to the list,
350          * and filling in the client data below.  Hence skipping the case
351          * of NULL mcd above.  We should already be controlling multiple
352          * connects at the client, and we can't hold the spinlock over
353          * memory allocations without risk of deadlocking.
354          */
355         rc = class_connect(conn, obd, cluuid);
356         if (rc)
357                 RETURN(rc);
358         exp = class_conn2export(conn);
359         LASSERT(exp);
360         med = &exp->exp_mds_data;
361
362         rc = mds_connect_internal(exp, data);
363         if (rc)
364                 GOTO(out, rc);
365
366         OBD_ALLOC(mcd, sizeof(*mcd));
367         if (!mcd)
368                 GOTO(out, rc = -ENOMEM);
369
370         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
371         med->med_mcd = mcd;
372
373         rc = mds_client_add(obd, exp, -1, *client_nid);
374         GOTO(out, rc);
375
376 out:
377         if (rc) {
378                 if (mcd) {
379                         OBD_FREE(mcd, sizeof(*mcd));
380                         med->med_mcd = NULL;
381                 }
382                 class_disconnect(exp);
383         } else {
384                 class_export_put(exp);
385         }
386
387         RETURN(rc);
388 }
389
390 int mds_init_export(struct obd_export *exp)
391 {
392         struct mds_export_data *med = &exp->exp_mds_data;
393
394         INIT_LIST_HEAD(&med->med_open_head);
395         spin_lock_init(&med->med_open_lock);
396         
397         spin_lock(&exp->exp_lock);
398         exp->exp_connecting = 1;
399         spin_unlock(&exp->exp_lock);
400
401         RETURN(0);
402 }
403
404 static int mds_destroy_export(struct obd_export *export)
405 {
406         struct mds_export_data *med;
407         struct obd_device *obd = export->exp_obd;
408         struct mds_obd *mds = &obd->u.mds;
409         struct lvfs_run_ctxt saved;
410         struct lov_mds_md *lmm;
411         struct llog_cookie *logcookies;
412         int rc = 0;
413         ENTRY;
414
415         med = &export->exp_mds_data;
416         target_destroy_export(export);
417
418         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
419                 RETURN(0);
420
421         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
422         /* Close any open files (which may also cause orphan unlinking). */
423
424         OBD_ALLOC(lmm, mds->mds_max_mdsize);
425         if (lmm == NULL) {
426                 CWARN("%s: allocation failure during cleanup; can not force "
427                       "close file handles on this service.\n", obd->obd_name);
428                 GOTO(out, rc = -ENOMEM);
429         }
430
431         OBD_ALLOC(logcookies, mds->mds_max_cookiesize);
432         if (logcookies == NULL) {
433                 CWARN("%s: allocation failure during cleanup; can not force "
434                       "close file handles on this service.\n", obd->obd_name);
435                 OBD_FREE(lmm, mds->mds_max_mdsize);
436                 GOTO(out, rc = -ENOMEM);
437         }
438
439         spin_lock(&med->med_open_lock);
440         while (!list_empty(&med->med_open_head)) {
441                 struct list_head *tmp = med->med_open_head.next;
442                 struct mds_file_data *mfd =
443                         list_entry(tmp, struct mds_file_data, mfd_list);
444                 int lmm_size = mds->mds_max_mdsize;
445                 umode_t mode = mfd->mfd_dentry->d_inode->i_mode;
446                 __u64 valid = 0;
447
448                 /* Remove mfd handle so it can't be found again.
449                  * We are consuming the mfd_list reference here. */
450                 mds_mfd_unlink(mfd, 0);
451                 spin_unlock(&med->med_open_lock);
452
453                 /* If you change this message, be sure to update
454                  * replay_single:test_46 */
455                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
456                        "%.*s (ino %lu)\n", obd->obd_name,
457                        mfd->mfd_dentry->d_name.len,mfd->mfd_dentry->d_name.name,
458                        mfd->mfd_dentry->d_inode->i_ino);
459
460                 rc = mds_get_md(obd, mfd->mfd_dentry->d_inode, lmm,&lmm_size,1);
461                 if (rc < 0)
462                         CWARN("mds_get_md failure, rc=%d\n", rc);
463                 else
464                         valid |= OBD_MD_FLEASIZE;
465
466                 /* child orphan sem protects orphan_dec_test and
467                  * is_orphan race, mds_mfd_close drops it */
468                 MDS_DOWN_WRITE_ORPHAN_SEM(mfd->mfd_dentry->d_inode);
469
470                 rc = mds_mfd_close(NULL, REQ_REC_OFF, obd, mfd,
471                                    !(export->exp_flags & OBD_OPT_FAILOVER),
472                                    lmm, lmm_size, logcookies,
473                                    mds->mds_max_cookiesize,
474                                    &valid);
475
476                 if (rc)
477                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
478
479                 if (valid & OBD_MD_FLCOOKIE) {
480                         rc = mds_osc_destroy_orphan(obd, mode, lmm,
481                                                     lmm_size, logcookies, 1);
482                         if (rc < 0) {
483                                 CDEBUG(D_INODE, "%s: destroy of orphan failed,"
484                                        " rc = %d\n", obd->obd_name, rc);
485                                 rc = 0;
486                         }
487                         valid &= ~OBD_MD_FLCOOKIE;
488                 }
489
490                 spin_lock(&med->med_open_lock);
491         }
492
493         OBD_FREE(logcookies, mds->mds_max_cookiesize);
494         OBD_FREE(lmm, mds->mds_max_mdsize);
495
496         spin_unlock(&med->med_open_lock);
497
498         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
499         mds_client_free(export);
500
501  out:
502         RETURN(rc);
503 }
504
505 static int mds_disconnect(struct obd_export *exp)
506 {
507         int rc;
508         ENTRY;
509
510         LASSERT(exp);
511         class_export_get(exp);
512
513         /* Disconnect early so that clients can't keep using export */
514         rc = class_disconnect(exp);
515         if (exp->exp_obd->obd_namespace != NULL)
516                 ldlm_cancel_locks_for_export(exp);
517
518         /* complete all outstanding replies */
519         spin_lock(&exp->exp_lock);
520         while (!list_empty(&exp->exp_outstanding_replies)) {
521                 struct ptlrpc_reply_state *rs =
522                         list_entry(exp->exp_outstanding_replies.next,
523                                    struct ptlrpc_reply_state, rs_exp_list);
524                 struct ptlrpc_service *svc = rs->rs_service;
525
526                 spin_lock(&svc->srv_lock);
527                 list_del_init(&rs->rs_exp_list);
528                 ptlrpc_schedule_difficult_reply(rs);
529                 spin_unlock(&svc->srv_lock);
530         }
531         spin_unlock(&exp->exp_lock);
532
533         class_export_put(exp);
534         RETURN(rc);
535 }
536
537 static int mds_getstatus(struct ptlrpc_request *req)
538 {
539         struct mds_obd *mds = mds_req2mds(req);
540         struct mds_body *body;
541         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
542         ENTRY;
543
544         rc = lustre_pack_reply(req, 2, size, NULL);
545         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
546                 CERROR("mds: out of memory for message\n");
547                 req->rq_status = -ENOMEM;       /* superfluous? */
548                 RETURN(-ENOMEM);
549         }
550
551         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*body));
552         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
553
554         /* the last_committed and last_xid fields are filled in for all
555          * replies already - no need to do so here also.
556          */
557         RETURN(0);
558 }
559
560 /* get the LOV EA from @inode and store it into @md.  It can be at most
561  * @size bytes, and @size is updated with the actual EA size.
562  * The EA size is also returned on success, and -ve errno on failure. 
563  * If there is no EA then 0 is returned. */
564 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
565                int *size, int lock)
566 {
567         int rc = 0;
568         int lmm_size;
569
570         if (lock)
571                 LOCK_INODE_MUTEX(inode);
572         rc = fsfilt_get_md(obd, inode, md, *size, "lov");
573
574         if (rc < 0) {
575                 CERROR("Error %d reading eadata for ino %lu\n",
576                        rc, inode->i_ino);
577         } else if (rc > 0) {
578                 lmm_size = rc;
579                 rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
580
581                 if (rc == 0) {
582                         *size = lmm_size;
583                         rc = lmm_size;
584                 } else if (rc > 0) {
585                         *size = rc;
586                 }
587         } else {
588                 *size = 0;
589         }
590         if (lock)
591                 UNLOCK_INODE_MUTEX(inode);
592
593         RETURN (rc);
594 }
595
596
597 /* Call with lock=1 if you want mds_pack_md to take the i_mutex.
598  * Call with lock=0 if the caller has already taken the i_mutex. */
599 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
600                 struct mds_body *body, struct inode *inode, int lock)
601 {
602         struct mds_obd *mds = &obd->u.mds;
603         void *lmm;
604         int lmm_size;
605         int rc;
606         ENTRY;
607
608         lmm = lustre_msg_buf(msg, offset, 0);
609         if (lmm == NULL) {
610                 /* Some problem with getting eadata when I sized the reply
611                  * buffer... */
612                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
613                        inode->i_ino);
614                 RETURN(0);
615         }
616         lmm_size = lustre_msg_buflen(msg, offset);
617
618         /* I don't really like this, but it is a sanity check on the client
619          * MD request.  However, if the client doesn't know how much space
620          * to reserve for the MD, it shouldn't be bad to have too much space.
621          */
622         if (lmm_size > mds->mds_max_mdsize) {
623                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
624                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
625                 // RETURN(-EINVAL);
626         }
627
628         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
629         if (rc > 0) {
630                 if (S_ISDIR(inode->i_mode))
631                         body->valid |= OBD_MD_FLDIREA;
632                 else
633                         body->valid |= OBD_MD_FLEASIZE;
634                 body->eadatasize = lmm_size;
635                 rc = 0;
636         }
637
638         RETURN(rc);
639 }
640
#ifdef CONFIG_FS_POSIX_ACL
/*
 * Read the access ACL xattr of @inode into reply buffer @repoff of @repmsg.
 *
 * On success @repbody->aclsize is set to the ACL size and OBD_MD_FLACL is
 * set in @repbody->valid.  The flag is also set (with aclsize left at 0)
 * when the buffer has zero length, the inode has no getxattr method, or the
 * inode has no ACL (-ENODATA).
 *
 * Returns 0 in all those cases, or the negative errno from getxattr.
 */
static
int mds_pack_posix_acl(struct inode *inode, struct lustre_msg *repmsg,
                       struct mds_body *repbody, int repoff)
{
        /* Stack dentry carrying only d_inode, as needed for the getxattr
         * call below. */
        struct dentry de = { .d_inode = inode };
        int buflen, rc;
        ENTRY;

        LASSERT(repbody->aclsize == 0);
        LASSERT(lustre_msg_bufcount(repmsg) > repoff);

        buflen = lustre_msg_buflen(repmsg, repoff);
        if (buflen == 0)
                GOTO(out, 0);

        if (inode->i_op == NULL || inode->i_op->getxattr == NULL)
                GOTO(out, 0);

        lock_24kernel();
        rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
                                   lustre_msg_buf(repmsg, repoff, buflen),
                                   buflen);
        unlock_24kernel();

        /* -ENODATA just means "no ACL"; anything else is a real error. */
        if (rc < 0 && rc != -ENODATA) {
                CERROR("buflen %d, get acl: %d\n", buflen, rc);
                RETURN(rc);
        }
        if (rc >= 0)
                repbody->aclsize = rc;

        EXIT;
out:
        repbody->valid |= OBD_MD_FLACL;
        return 0;
}
#else
#define mds_pack_posix_acl(inode, repmsg, repbody, repoff) 0
#endif
680
/*
 * Pack the ACL of @inode into reply buffer @repoff of @repmsg.
 *
 * Thin wrapper around mds_pack_posix_acl(); @med is not used here.
 * Returns whatever mds_pack_posix_acl() yields.
 */
int mds_pack_acl(struct mds_export_data *med, struct inode *inode,
                 struct lustre_msg *repmsg, struct mds_body *repbody,
                 int repoff)
{
        int rc = mds_pack_posix_acl(inode, repmsg, repbody, repoff);

        return rc;
}
687
688 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
689                                 struct ptlrpc_request *req,
690                                 struct mds_body *reqbody, int reply_off)
691 {
692         struct mds_body *body;
693         struct inode *inode = dentry->d_inode;
694         int rc = 0;
695         ENTRY;
696
697         if (inode == NULL)
698                 RETURN(-ENOENT);
699
700         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
701         LASSERT(body != NULL);                 /* caller prepped reply */
702
703         mds_pack_inode2fid(&body->fid1, inode);
704         body->flags = reqbody->flags; /* copy MDS_BFLAG_EXT_FLAGS if present */
705         mds_pack_inode2body(body, inode);
706         reply_off++;
707
708         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
709             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
710                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off, body,
711                                  inode, 1);
712
713                 /* If we have LOV EA data, the OST holds size, atime, mtime */
714                 if (!(body->valid & OBD_MD_FLEASIZE) &&
715                     !(body->valid & OBD_MD_FLDIREA))
716                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
717                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
718
719                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
720                 if (body->eadatasize)
721                         reply_off++;
722         } else if (S_ISLNK(inode->i_mode) &&
723                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
724                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
725                 int len;
726
727                 LASSERT (symname != NULL);       /* caller prepped reply */
728                 len = lustre_msg_buflen(req->rq_repmsg, reply_off);
729
730                 rc = inode->i_op->readlink(dentry, symname, len);
731                 if (rc < 0) {
732                         CERROR("readlink failed: %d\n", rc);
733                 } else if (rc != len - 1) {
734                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
735                                 rc, len - 1);
736                         rc = -EINVAL;
737                 } else {
738                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
739                         body->valid |= OBD_MD_LINKNAME;
740                         body->eadatasize = rc + 1;
741                         symname[rc] = 0;        /* NULL terminate */
742                         rc = 0;
743                 }
744                 reply_off++;
745         } else if (reqbody->valid == OBD_MD_FLFLAGS &&
746                    reqbody->flags & MDS_BFLAG_EXT_FLAGS) {
747                 int flags;
748
749                 /* We only return the full set of flags on ioctl, otherwise we
750                  * get enough flags from the inode in mds_pack_inode2body(). */
751                 rc = fsfilt_iocontrol(obd, inode, NULL, EXT3_IOC_GETFLAGS,
752                                       (long)&flags);
753                 if (rc == 0)
754                         body->flags = flags | MDS_BFLAG_EXT_FLAGS;
755         }
756
757         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
758                 struct mds_obd *mds = mds_req2mds(req);
759                 body->max_cookiesize = mds->mds_max_cookiesize;
760                 body->max_mdsize = mds->mds_max_mdsize;
761                 body->valid |= OBD_MD_FLMODEASIZE;
762         }
763
764         if (rc)
765                 RETURN(rc);
766
767 #ifdef CONFIG_FS_POSIX_ACL
768         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
769             (reqbody->valid & OBD_MD_FLACL)) {
770                 rc = mds_pack_acl(&req->rq_export->exp_mds_data,
771                                   inode, req->rq_repmsg,
772                                   body, reply_off);
773
774                 lustre_shrink_reply(req, reply_off, body->aclsize, 0);
775                 if (body->aclsize)
776                         reply_off++;
777         }
778 #endif
779
780         RETURN(rc);
781 }
782
783 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
784                                 int offset)
785 {
786         struct mds_obd *mds = mds_req2mds(req);
787         struct mds_body *body;
788         int rc, bufcount = 2;
789         int size[4] = { sizeof(struct ptlrpc_body), sizeof(*body) };
790         ENTRY;
791
792         LASSERT(offset == REQ_REC_OFF); /* non-intent */
793
794         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*body));
795         LASSERT(body != NULL);                    /* checked by caller */
796         LASSERT(lustre_req_swabbed(req, offset)); /* swabbed by caller */
797
798         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
799             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
800                 LOCK_INODE_MUTEX(inode);
801                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
802                                    "lov");
803                 UNLOCK_INODE_MUTEX(inode);
804                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
805                        rc, inode->i_ino);
806                 if (rc < 0) {
807                         if (rc != -ENODATA) {
808                                 CERROR("error getting inode %lu MD: rc = %d\n",
809                                        inode->i_ino, rc);
810                                 RETURN(rc);
811                         }
812                         size[bufcount] = 0;
813                 } else if (rc > mds->mds_max_mdsize) {
814                         size[bufcount] = 0;
815                         CERROR("MD size %d larger than maximum possible %u\n",
816                                rc, mds->mds_max_mdsize);
817                 } else {
818                         size[bufcount] = rc;
819                 }
820                 bufcount++;
821         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
822                 if (i_size_read(inode) + 1 != body->eadatasize)
823                         CERROR("symlink size: %Lu, reply space: %d\n",
824                                i_size_read(inode) + 1, body->eadatasize);
825                 size[bufcount] = min_t(int, i_size_read(inode) + 1,
826                                        body->eadatasize);
827                 bufcount++;
828                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
829                        i_size_read(inode) + 1, body->eadatasize);
830         }
831
832 #ifdef CONFIG_FS_POSIX_ACL
833         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
834             (body->valid & OBD_MD_FLACL)) {
835                 struct dentry de = { .d_inode = inode };
836
837                 size[bufcount] = 0;
838                 if (inode->i_op && inode->i_op->getxattr) {
839                         lock_24kernel();
840                         rc = inode->i_op->getxattr(&de, MDS_XATTR_NAME_ACL_ACCESS,
841                                                    NULL, 0);
842                         unlock_24kernel();
843
844                         if (rc < 0) {
845                                 if (rc != -ENODATA) {
846                                         CERROR("got acl size: %d\n", rc);
847                                         RETURN(rc);
848                                 }
849                         } else
850                                 size[bufcount] = rc;
851                 }
852                 bufcount++;
853         }
854 #endif
855
856         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
857                 CERROR("failed MDS_GETATTR_PACK test\n");
858                 req->rq_status = -ENOMEM;
859                 RETURN(-ENOMEM);
860         }
861
862         rc = lustre_pack_reply(req, bufcount, size, NULL);
863         if (rc) {
864                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
865                 req->rq_status = rc;
866                 RETURN(rc);
867         }
868
869         RETURN(0);
870 }
871
872 static int mds_getattr_lock(struct ptlrpc_request *req, int offset,
873                             int child_part, struct lustre_handle *child_lockh)
874 {
875         struct obd_device *obd = req->rq_export->exp_obd;
876         struct mds_obd *mds = &obd->u.mds;
877         struct ldlm_reply *rep = NULL;
878         struct lvfs_run_ctxt saved;
879         struct mds_body *body;
880         struct dentry *dparent = NULL, *dchild = NULL;
881         struct lvfs_ucred uc = {NULL,};
882         struct lustre_handle parent_lockh;
883         int namesize;
884         int rc = 0, cleanup_phase = 0, resent_req = 0;
885         char *name;
886         ENTRY;
887
888         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME));
889
890         /* Swab now, before anyone looks inside the request */
891         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
892                                   lustre_swab_mds_body);
893         if (body == NULL) {
894                 CERROR("Can't swab mds_body\n");
895                 RETURN(-EFAULT);
896         }
897
898         lustre_set_req_swabbed(req, offset + 1);
899         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
900         if (name == NULL) {
901                 CERROR("Can't unpack name\n");
902                 RETURN(-EFAULT);
903         }
904         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
905         /* namesize less than 2 means we have empty name, probably came from
906            revalidate by cfid, so no point in having name to be set */
907         if (namesize <= 1)
908                 name = NULL;
909
910         rc = mds_init_ucred(&uc, req, offset);
911         if (rc)
912                 GOTO(cleanup, rc);
913
914         LASSERT(offset == REQ_REC_OFF || offset == DLM_INTENT_REC_OFF);
915         /* if requests were at offset 2, the getattr reply goes back at 1 */
916         if (offset == DLM_INTENT_REC_OFF) {
917                 rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF,
918                                      sizeof(*rep));
919                 offset = DLM_REPLY_REC_OFF;
920         }
921
922         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
923         cleanup_phase = 1; /* kernel context */
924         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
925
926         /* FIXME: handle raw lookup */
927 #if 0
928         if (body->valid == OBD_MD_FLID) {
929                 struct mds_body *mds_reply;
930                 int size = sizeof(*mds_reply);
931                 ino_t inum;
932                 // The user requested ONLY the inode number, so do a raw lookup
933                 rc = lustre_pack_reply(req, 1, &size, NULL);
934                 if (rc) {
935                         CERROR("out of memory\n");
936                         GOTO(cleanup, rc);
937                 }
938
939                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
940
941                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
942                                            sizeof(*mds_reply));
943                 mds_reply->fid1.id = inum;
944                 mds_reply->valid = OBD_MD_FLID;
945                 GOTO(cleanup, rc);
946         }
947 #endif
948
949         if (lustre_handle_is_used(child_lockh)) {
950                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
951                 resent_req = 1;
952         }
953
954         if (resent_req == 0) {
955                 if (name) {
956                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
957                         rc = mds_get_parent_child_locked(obd, &obd->u.mds, 
958                                                          &body->fid1,
959                                                          &parent_lockh, 
960                                                          &dparent, LCK_CR,
961                                                          MDS_INODELOCK_UPDATE,
962                                                          name, namesize,
963                                                          child_lockh, &dchild,
964                                                          LCK_CR, child_part);
965                 } else {
966                         /* For revalidate by fid we always take UPDATE lock */
967                         dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL,
968                                                        LCK_CR, child_lockh,
969                                                        NULL, 0, child_part);
970                         LASSERT(dchild);
971                         if (IS_ERR(dchild))
972                                 rc = PTR_ERR(dchild);
973                 } 
974                 if (rc)
975                         GOTO(cleanup, rc);
976         } else {
977                 struct ldlm_lock *granted_lock;
978                 struct ll_fid child_fid;
979                 struct ldlm_resource *res;
980                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
981                 granted_lock = ldlm_handle2lock(child_lockh);
982                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
983                          body->fid1.id, body->fid1.generation,
984                          child_lockh->cookie);
985
986
987                 res = granted_lock->l_resource;
988                 child_fid.id = res->lr_name.name[0];
989                 child_fid.generation = res->lr_name.name[1];
990                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
991                 LASSERT(!IS_ERR(dchild));
992                 LDLM_LOCK_PUT(granted_lock);
993         }
994
995         cleanup_phase = 2; /* dchild, dparent, locks */
996
997         if (dchild->d_inode == NULL) {
998                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
999                 /* in the intent case, the policy clears this error:
1000                    the disposition is enough */
1001                 GOTO(cleanup, rc = -ENOENT);
1002         } else {
1003                 intent_set_disposition(rep, DISP_LOOKUP_POS);
1004         }
1005
1006         if (req->rq_repmsg == NULL) {
1007                 rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
1008                 if (rc != 0) {
1009                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
1010                         GOTO (cleanup, rc);
1011                 }
1012         }
1013
1014         rc = mds_getattr_internal(obd, dchild, req, body, offset);
1015         GOTO(cleanup, rc); /* returns the lock to the client */
1016
1017  cleanup:
1018         switch (cleanup_phase) {
1019         case 2:
1020                 if (resent_req == 0) {
1021                         if (rc && dchild->d_inode)
1022                                 ldlm_lock_decref(child_lockh, LCK_CR);
1023                         if (name) {
1024                                 ldlm_lock_decref(&parent_lockh, LCK_CR);
1025                                 l_dput(dparent);
1026                         }
1027                 }
1028                 l_dput(dchild);
1029         case 1:
1030                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1031         default:
1032                 mds_exit_ucred(&uc, mds);
1033                 if (!req->rq_packed_final) {
1034                         req->rq_status = rc;
1035                         lustre_pack_reply(req, 1, NULL, NULL);
1036                 }
1037         }
1038         return rc;
1039 }
1040
1041 static int mds_getattr(struct ptlrpc_request *req, int offset)
1042 {
1043         struct mds_obd *mds = mds_req2mds(req);
1044         struct obd_device *obd = req->rq_export->exp_obd;
1045         struct lvfs_run_ctxt saved;
1046         struct dentry *de;
1047         struct mds_body *body;
1048         struct lvfs_ucred uc = { NULL, };
1049         int rc = 0;
1050         ENTRY;
1051
1052         OBD_COUNTER_INCREMENT(obd, getattr);
1053
1054         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1055                                   lustre_swab_mds_body);
1056         if (body == NULL)
1057                 RETURN(-EFAULT);
1058
1059         rc = mds_init_ucred(&uc, req, offset);
1060         if (rc)
1061                 GOTO(out_ucred, rc);
1062
1063         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1064         de = mds_fid2dentry(mds, &body->fid1, NULL);
1065         if (IS_ERR(de)) {
1066                 rc = req->rq_status = PTR_ERR(de);
1067                 GOTO(out_pop, rc);
1068         }
1069
1070         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
1071         if (rc != 0) {
1072                 CERROR("mds_getattr_pack_msg: %d\n", rc);
1073                 GOTO(out_pop, rc);
1074         }
1075
1076         req->rq_status = mds_getattr_internal(obd, de, req, body,REPLY_REC_OFF);
1077
1078         l_dput(de);
1079         GOTO(out_pop, rc);
1080 out_pop:
1081         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1082 out_ucred:
1083         if (!req->rq_packed_final) {
1084                 req->rq_status = rc;
1085                 lustre_pack_reply(req, 1, NULL, NULL);
1086         }
1087         mds_exit_ucred(&uc, mds);
1088         return rc;
1089 }
1090
1091 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
1092                           __u64 max_age)
1093 {
1094         int rc;
1095
1096         spin_lock(&obd->obd_osfs_lock);
1097         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
1098         if (rc == 0)
1099                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
1100         spin_unlock(&obd->obd_osfs_lock);
1101
1102         return rc;
1103 }
1104
1105 static int mds_statfs(struct ptlrpc_request *req)
1106 {
1107         struct obd_device *obd = req->rq_export->exp_obd;
1108         struct ptlrpc_service *svc = req->rq_rqbd->rqbd_service;
1109         int rc, size[2] = { sizeof(struct ptlrpc_body),
1110                             sizeof(struct obd_statfs) };
1111         ENTRY;
1112
1113         /* This will trigger a watchdog timeout */
1114         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
1115                          (MDS_SERVICE_WATCHDOG_FACTOR * 
1116                           at_get(&svc->srv_at_estimate) / 1000) + 1);
1117         OBD_COUNTER_INCREMENT(obd, statfs);
1118
1119         rc = lustre_pack_reply(req, 2, size, NULL);
1120         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
1121                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
1122                 GOTO(out, rc);
1123         }
1124
1125         /* We call this so that we can cache a bit - 1 jiffie worth */
1126         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1127                                                 size[REPLY_REC_OFF]),
1128                             cfs_time_current_64() - HZ);
1129         if (rc) {
1130                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1131                 GOTO(out, rc);
1132         }
1133
1134         EXIT;
1135 out:
1136         req->rq_status = rc;
1137         return 0;
1138 }
1139
1140 static int mds_sync(struct ptlrpc_request *req, int offset)
1141 {
1142         struct obd_device *obd = req->rq_export->exp_obd;
1143         struct mds_obd *mds = &obd->u.mds;
1144         struct mds_body *body;
1145         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*body) };
1146         ENTRY;
1147
1148         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1149                                   lustre_swab_mds_body);
1150         if (body == NULL)
1151                 GOTO(out, rc = -EFAULT);
1152
1153         rc = lustre_pack_reply(req, 2, size, NULL);
1154         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1155                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1156                 GOTO(out, rc);
1157         }
1158
1159         if (body->fid1.id == 0) {
1160                 /* a fid of zero is taken to mean "sync whole filesystem" */
1161                 rc = fsfilt_sync(obd, obd->u.obt.obt_sb);
1162                 GOTO(out, rc);
1163         } else {
1164                 struct dentry *de;
1165
1166                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1167                 if (IS_ERR(de))
1168                         GOTO(out, rc = PTR_ERR(de));
1169
1170                 /* The file parameter isn't used for anything */
1171                 if (de->d_inode->i_fop && de->d_inode->i_fop->fsync)
1172                         rc = de->d_inode->i_fop->fsync(NULL, de, 1);
1173                 if (rc == 0) {
1174                         body = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1175                                               sizeof(*body));
1176                         mds_pack_inode2fid(&body->fid1, de->d_inode);
1177                         mds_pack_inode2body(body, de->d_inode);
1178                 }
1179
1180                 l_dput(de);
1181                 GOTO(out, rc);
1182         }
1183 out:
1184         req->rq_status = rc;
1185         return 0;
1186 }
1187
1188 /* mds_readpage does not take a DLM lock on the inode, because the client must
1189  * already have a PR lock.
1190  *
1191  * If we were to take another one here, a deadlock will result, if another
1192  * thread is already waiting for a PW lock. */
1193 static int mds_readpage(struct ptlrpc_request *req, int offset)
1194 {
1195         struct obd_device *obd = req->rq_export->exp_obd;
1196         struct mds_obd *mds = &obd->u.mds;
1197         struct vfsmount *mnt;
1198         struct dentry *de;
1199         struct file *file;
1200         struct mds_body *body, *repbody;
1201         struct lvfs_run_ctxt saved;
1202         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repbody) };
1203         struct lvfs_ucred uc = {NULL,};
1204         ENTRY;
1205
1206         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1207                 RETURN(-ENOMEM);
1208
1209         rc = lustre_pack_reply(req, 2, size, NULL);
1210         if (rc) {
1211                 CERROR("error packing readpage reply: rc %d\n", rc);
1212                 GOTO(out, rc);
1213         }
1214
1215         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
1216                                   lustre_swab_mds_body);
1217         if (body == NULL)
1218                 GOTO (out, rc = -EFAULT);
1219
1220         rc = mds_init_ucred(&uc, req, offset);
1221         if (rc)
1222                 GOTO(out, rc);
1223
1224         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1225         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1226         if (IS_ERR(de))
1227                 GOTO(out_pop, rc = PTR_ERR(de));
1228
1229         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1230
1231         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1232         /* note: in case of an error, dentry_open puts dentry */
1233         if (IS_ERR(file))
1234                 GOTO(out_pop, rc = PTR_ERR(file));
1235
1236         /* body->size is actually the offset -eeb */
1237         if ((body->size & (de->d_inode->i_sb->s_blocksize - 1)) != 0) {
1238                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1239                        body->size, de->d_inode->i_sb->s_blocksize);
1240                 GOTO(out_file, rc = -EFAULT);
1241         }
1242
1243         /* body->nlink is actually the #bytes to read -eeb */
1244         if (body->nlink & (de->d_inode->i_sb->s_blocksize - 1)) {
1245                 CERROR("size %u is not multiple of blocksize %lu\n",
1246                        body->nlink, de->d_inode->i_sb->s_blocksize);
1247                 GOTO(out_file, rc = -EFAULT);
1248         }
1249
1250         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
1251                                  sizeof(*repbody));
1252         repbody->size = i_size_read(file->f_dentry->d_inode);
1253         repbody->valid = OBD_MD_FLSIZE;
1254
1255         /* to make this asynchronous make sure that the handling function
1256            doesn't send a reply when this function completes. Instead a
1257            callback function would send the reply */
1258         /* body->size is actually the offset -eeb */
1259         rc = mds_sendpage(req, file, body->size, body->nlink);
1260
1261 out_file:
1262         filp_close(file, 0);
1263 out_pop:
1264         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1265 out:
1266         mds_exit_ucred(&uc, mds);
1267         req->rq_status = rc;
1268         RETURN(0);
1269 }
1270
1271 int mds_reint(struct ptlrpc_request *req, int offset,
1272               struct lustre_handle *lockh)
1273 {
1274         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1275         int rc;
1276
1277         OBD_ALLOC(rec, sizeof(*rec));
1278         if (rec == NULL)
1279                 RETURN(-ENOMEM);
1280
1281         rc = mds_update_unpack(req, offset, rec);
1282         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1283                 CERROR("invalid record\n");
1284                 GOTO(out, req->rq_status = -EINVAL);
1285         }
1286
1287         /* rc will be used to interrupt a for loop over multiple records */
1288         rc = mds_reint_rec(rec, offset, req, lockh);
1289  out:
1290         OBD_FREE(rec, sizeof(*rec));
1291         return rc;
1292 }
1293
1294 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1295                                        struct obd_device *obd, int *process)
1296 {
1297         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1298         case MDS_CONNECT: /* This will never get here, but for completeness. */
1299         case OST_CONNECT: /* This will never get here, but for completeness. */
1300         case MDS_DISCONNECT:
1301         case OST_DISCONNECT:
1302                *process = 1;
1303                RETURN(0);
1304
1305         case MDS_CLOSE:
1306         case MDS_SYNC: /* used in unmounting */
1307         case OBD_PING:
1308         case MDS_REINT:
1309         case LDLM_ENQUEUE:
1310                 *process = target_queue_recovery_request(req, obd);
1311                 RETURN(0);
1312
1313         default:
1314                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1315                 *process = 0;
1316                 /* XXX what should we set rq_status to here? */
1317                 req->rq_status = -EAGAIN;
1318                 RETURN(ptlrpc_error(req));
1319         }
1320 }
1321
1322 static char *reint_names[] = {
1323         [REINT_SETATTR] "setattr",
1324         [REINT_CREATE]  "create",
1325         [REINT_LINK]    "link",
1326         [REINT_UNLINK]  "unlink",
1327         [REINT_RENAME]  "rename",
1328         [REINT_OPEN]    "open",
1329 };
1330
1331 static int mds_set_info_rpc(struct obd_export *exp, struct ptlrpc_request *req)
1332 {
1333         void *key, *val;
1334         int keylen, vallen, rc = 0;
1335         ENTRY;
1336
1337         key = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF, 1);
1338         if (key == NULL) {
1339                 DEBUG_REQ(D_HA, req, "no set_info key");
1340                 RETURN(-EFAULT);
1341         }
1342         keylen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF);
1343
1344         val = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF + 1, 0);
1345         vallen = lustre_msg_buflen(req->rq_reqmsg, REQ_REC_OFF + 1);
1346
1347         rc = lustre_pack_reply(req, 1, NULL, NULL);
1348         if (rc)
1349                 RETURN(rc);
1350         lustre_msg_set_status(req->rq_repmsg, 0);
1351
1352         if (KEY_IS("read-only")) {
1353                 if (val == NULL || vallen < sizeof(__u32)) {
1354                         DEBUG_REQ(D_HA, req, "no set_info val");
1355                         RETURN(-EFAULT);
1356                 }
1357
1358                 if (*(__u32 *)val)
1359                         exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
1360                 else
1361                         exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
1362         } else {
1363                 RETURN(-EINVAL);
1364         }
1365
1366         RETURN(0);
1367 }
1368
1369 static int mds_handle_quotacheck(struct ptlrpc_request *req)
1370 {
1371         struct obd_quotactl *oqctl;
1372         int rc;
1373         ENTRY;
1374
1375         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1376                                    lustre_swab_obd_quotactl);
1377         if (oqctl == NULL)
1378                 RETURN(-EPROTO);
1379
1380         rc = lustre_pack_reply(req, 1, NULL, NULL);
1381         if (rc) {
1382                 CERROR("mds: out of memory while packing quotacheck reply\n");
1383                 RETURN(rc);
1384         }
1385
1386         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1387         RETURN(0);
1388 }
1389
1390 static int mds_handle_quotactl(struct ptlrpc_request *req)
1391 {
1392         struct obd_quotactl *oqctl, *repoqc;
1393         int rc, size[2] = { sizeof(struct ptlrpc_body), sizeof(*repoqc) };
1394         ENTRY;
1395
1396         oqctl = lustre_swab_reqbuf(req, REQ_REC_OFF, sizeof(*oqctl),
1397                                    lustre_swab_obd_quotactl);
1398         if (oqctl == NULL)
1399                 RETURN(-EPROTO);
1400
1401         rc = lustre_pack_reply(req, 2, size, NULL);
1402         if (rc)
1403                 RETURN(rc);
1404
1405         repoqc = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF, sizeof(*repoqc));
1406
1407         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1408         *repoqc = *oqctl;
1409         RETURN(0);
1410 }
1411
1412 static int mds_msg_check_version(struct lustre_msg *msg)
1413 {
1414         int rc;
1415
1416         switch (lustre_msg_get_opc(msg)) {
1417         case MDS_CONNECT:
1418         case MDS_DISCONNECT:
1419         case OBD_PING:
1420                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1421                 if (rc)
1422                         CERROR("bad opc %u version %08x, expecting %08x\n",
1423                                lustre_msg_get_opc(msg),
1424                                lustre_msg_get_version(msg),
1425                                LUSTRE_OBD_VERSION);
1426                 break;
1427         case MDS_GETSTATUS:
1428         case MDS_GETATTR:
1429         case MDS_GETATTR_NAME:
1430         case MDS_STATFS:
1431         case MDS_READPAGE:
1432         case MDS_REINT:
1433         case MDS_CLOSE:
1434         case MDS_DONE_WRITING:
1435         case MDS_PIN:
1436         case MDS_SYNC:
1437         case MDS_GETXATTR:
1438         case MDS_SETXATTR:
1439         case MDS_SET_INFO:
1440         case MDS_QUOTACHECK:
1441         case MDS_QUOTACTL:
1442         case QUOTA_DQACQ:
1443         case QUOTA_DQREL:
1444                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
1445                 if (rc)
1446                         CERROR("bad opc %u version %08x, expecting %08x\n",
1447                                lustre_msg_get_opc(msg),
1448                                lustre_msg_get_version(msg),
1449                                LUSTRE_MDS_VERSION);
1450                 break;
1451         case LDLM_ENQUEUE:
1452         case LDLM_CONVERT:
1453         case LDLM_BL_CALLBACK:
1454         case LDLM_CP_CALLBACK:
1455                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1456                 if (rc)
1457                         CERROR("bad opc %u version %08x, expecting %08x\n",
1458                                lustre_msg_get_opc(msg),
1459                                lustre_msg_get_version(msg),
1460                                LUSTRE_DLM_VERSION);
1461                 break;
1462         case OBD_LOG_CANCEL:
1463         case LLOG_ORIGIN_HANDLE_CREATE:
1464         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1465         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1466         case LLOG_ORIGIN_HANDLE_CLOSE:
1467         case LLOG_ORIGIN_HANDLE_DESTROY:
1468         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1469         case LLOG_CATINFO:
1470                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1471                 if (rc)
1472                         CERROR("bad opc %u version %08x, expecting %08x\n",
1473                                lustre_msg_get_opc(msg),
1474                                lustre_msg_get_version(msg),
1475                                LUSTRE_LOG_VERSION);
1476                 break;
1477         default:
1478                 CERROR("MDS unknown opcode %d\n", lustre_msg_get_opc(msg));
1479                 rc = -ENOTSUPP;
1480         }
1481         return rc;
1482 }
1483
1484 int mds_handle(struct ptlrpc_request *req)
1485 {
1486         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1487         int rc = 0;
1488         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1489         struct obd_device *obd = NULL;
1490         ENTRY;
1491
1492         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1493
1494         LASSERT(current->journal_info == NULL);
1495
1496         rc = mds_msg_check_version(req->rq_reqmsg);
1497         if (rc) {
1498                 CERROR("MDS drop mal-formed request\n");
1499                 RETURN(rc);
1500         }
1501
1502         /* XXX identical to OST */
1503         if (lustre_msg_get_opc(req->rq_reqmsg) != MDS_CONNECT) {
1504                 struct mds_export_data *med;
1505                 int recovering, abort_recovery;
1506
1507                 if (req->rq_export == NULL) {
1508                         CERROR("operation %d on unconnected MDS from %s\n",
1509                                lustre_msg_get_opc(req->rq_reqmsg),
1510                                libcfs_id2str(req->rq_peer));
1511                         req->rq_status = -ENOTCONN;
1512                         GOTO(out, rc = -ENOTCONN);
1513                 }
1514
1515                 med = &req->rq_export->exp_mds_data;
1516                 obd = req->rq_export->exp_obd;
1517                 mds = &obd->u.mds;
1518
1519                 /* sanity check: if the xid matches, the request must
1520                  * be marked as a resent or replayed */
1521                 if (req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_xid) ||
1522                    req->rq_xid == le64_to_cpu(med->med_mcd->mcd_last_close_xid))
1523                         if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1524                                  (MSG_RESENT | MSG_REPLAY))) {
1525                                 CERROR("rq_xid "LPU64" matches last_xid, "
1526                                        "expected RESENT flag\n",
1527                                         req->rq_xid);
1528                                 req->rq_status = -ENOTCONN;
1529                                 GOTO(out, rc = -EFAULT);
1530                         }
1531                 /* else: note the opposite is not always true; a
1532                  * RESENT req after a failover will usually not match
1533                  * the last_xid, since it was likely never
1534                  * committed. A REPLAYed request will almost never
1535                  * match the last xid, however it could for a
1536                  * committed, but still retained, open. */
1537
1538                 /* Check for aborted recovery. */
1539                 spin_lock_bh(&obd->obd_processing_task_lock);
1540                 abort_recovery = obd->obd_abort_recovery;
1541                 recovering = obd->obd_recovering;
1542                 spin_unlock_bh(&obd->obd_processing_task_lock);
1543                 if (abort_recovery) {
1544                         target_abort_recovery(obd);
1545                 } else if (recovering) {
1546                         rc = mds_filter_recovery_request(req, obd,
1547                                                          &should_process);
1548                         if (rc || !should_process)
1549                                 RETURN(rc);
1550                 }
1551         }
1552
1553         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1554         case MDS_CONNECT:
1555                 DEBUG_REQ(D_INODE, req, "connect");
1556                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1557                 rc = target_handle_connect(req, mds_handle);
1558                 if (!rc) {
1559                         /* Now that we have an export, set mds. */
1560                         obd = req->rq_export->exp_obd;
1561                         mds = mds_req2mds(req);
1562                 }
1563                 break;
1564
1565         case MDS_DISCONNECT:
1566                 DEBUG_REQ(D_INODE, req, "disconnect");
1567                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1568                 rc = target_handle_disconnect(req);
1569                 req->rq_status = rc;            /* superfluous? */
1570                 break;
1571
1572         case MDS_GETSTATUS:
1573                 DEBUG_REQ(D_INODE, req, "getstatus");
1574                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1575                 rc = mds_getstatus(req);
1576                 break;
1577
1578         case MDS_GETATTR:
1579                 DEBUG_REQ(D_INODE, req, "getattr");
1580                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1581                 rc = mds_getattr(req, REQ_REC_OFF);
1582                 break;
1583
1584         case MDS_SETXATTR:
1585                 DEBUG_REQ(D_INODE, req, "setxattr");
1586                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SETXATTR_NET, 0);
1587                 rc = mds_setxattr(req);
1588                 break;
1589
1590         case MDS_GETXATTR:
1591                 DEBUG_REQ(D_INODE, req, "getxattr");
1592                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETXATTR_NET, 0);
1593                 rc = mds_getxattr(req);
1594                 break;
1595
1596         case MDS_GETATTR_NAME: {
1597                 struct lustre_handle lockh = { 0 };
1598                 DEBUG_REQ(D_INODE, req, "getattr_name");
1599                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1600
1601                 /* If this request gets a reconstructed reply, we won't be
1602                  * acquiring any new locks in mds_getattr_lock, so we don't
1603                  * want to cancel.
1604                  */
1605                 rc = mds_getattr_lock(req, REQ_REC_OFF, MDS_INODELOCK_UPDATE,
1606                                       &lockh);
1607                 /* this non-intent call (from an ioctl) is special */
1608                 req->rq_status = rc;
1609                 if (rc == 0 && lustre_handle_is_used(&lockh))
1610                         ldlm_lock_decref(&lockh, LCK_CR);
1611                 break;
1612         }
1613         case MDS_STATFS:
1614                 DEBUG_REQ(D_INODE, req, "statfs");
1615                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1616                 rc = mds_statfs(req);
1617                 break;
1618
1619         case MDS_READPAGE:
1620                 DEBUG_REQ(D_INODE, req, "readpage");
1621                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1622                 rc = mds_readpage(req, REQ_REC_OFF);
1623
1624                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1625                         RETURN(0);
1626                 }
1627
1628                 break;
1629
1630         case MDS_REINT: {
1631                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, REQ_REC_OFF,
1632                                              sizeof(*opcp));
1633                 __u32  opc;
1634                 int size[4] = { sizeof(struct ptlrpc_body),
1635                                 sizeof(struct mds_body),
1636                                 mds->mds_max_mdsize,
1637                                 mds->mds_max_cookiesize };
1638                 int bufcount;
1639
1640                 /* NB only peek inside req now; mds_reint() will swab it */
1641                 if (opcp == NULL) {
1642                         CERROR ("Can't inspect opcode\n");
1643                         rc = -EINVAL;
1644                         break;
1645                 }
1646                 opc = *opcp;
1647                 if (lustre_msg_swabbed(req->rq_reqmsg))
1648                         __swab32s(&opc);
1649
1650                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1651                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1652                            reint_names[opc] == NULL) ? reint_names[opc] :
1653                                                        "unknown opcode");
1654
1655                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1656
1657                 if (opc == REINT_UNLINK || opc == REINT_RENAME)
1658                         bufcount = 4;
1659                 else if (opc == REINT_OPEN)
1660                         bufcount = 3;
1661                 else
1662                         bufcount = 2;
1663
1664                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1665                 if (rc)
1666                         break;
1667
1668                 rc = mds_reint(req, REQ_REC_OFF, NULL);
1669                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1670                 break;
1671         }
1672
1673         case MDS_CLOSE:
1674                 DEBUG_REQ(D_INODE, req, "close");
1675                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1676                 rc = mds_close(req, REQ_REC_OFF);
1677                 break;
1678
1679         case MDS_DONE_WRITING:
1680                 DEBUG_REQ(D_INODE, req, "done_writing");
1681                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1682                 rc = mds_done_writing(req, REQ_REC_OFF);
1683                 break;
1684
1685         case MDS_PIN:
1686                 DEBUG_REQ(D_INODE, req, "pin");
1687                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1688                 rc = mds_pin(req, REQ_REC_OFF);
1689                 break;
1690
1691         case MDS_SYNC:
1692                 DEBUG_REQ(D_INODE, req, "sync");
1693                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1694                 rc = mds_sync(req, REQ_REC_OFF);
1695                 break;
1696
1697         case MDS_SET_INFO:
1698                 DEBUG_REQ(D_INODE, req, "set_info");
1699                 rc = mds_set_info_rpc(req->rq_export, req);
1700                 break;
1701
1702         case MDS_QUOTACHECK:
1703                 DEBUG_REQ(D_INODE, req, "quotacheck");
1704                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACHECK_NET, 0);
1705                 rc = mds_handle_quotacheck(req);
1706                 break;
1707
1708         case MDS_QUOTACTL:
1709                 DEBUG_REQ(D_INODE, req, "quotactl");
1710                 OBD_FAIL_RETURN(OBD_FAIL_MDS_QUOTACTL_NET, 0);
1711                 rc = mds_handle_quotactl(req);
1712                 break;
1713
1714         case OBD_PING:
1715                 DEBUG_REQ(D_INODE, req, "ping");
1716                 rc = target_handle_ping(req);
1717                 break;
1718
1719         case OBD_LOG_CANCEL:
1720                 CDEBUG(D_INODE, "log cancel\n");
1721                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1722                 rc = -ENOTSUPP; /* la la la */
1723                 break;
1724
1725         case LDLM_ENQUEUE:
1726                 DEBUG_REQ(D_INODE, req, "enqueue");
1727                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1728                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1729                                          ldlm_server_blocking_ast, NULL);
1730                 fail = OBD_FAIL_LDLM_REPLY;
1731                 break;
1732         case LDLM_CONVERT:
1733                 DEBUG_REQ(D_INODE, req, "convert");
1734                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1735                 rc = ldlm_handle_convert(req);
1736                 break;
1737         case LDLM_BL_CALLBACK:
1738         case LDLM_CP_CALLBACK:
1739                 DEBUG_REQ(D_INODE, req, "callback");
1740                 CERROR("callbacks should not happen on MDS\n");
1741                 LBUG();
1742                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1743                 break;
1744         case LLOG_ORIGIN_HANDLE_CREATE:
1745                 DEBUG_REQ(D_INODE, req, "llog_init");
1746                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1747                 rc = llog_origin_handle_create(req);
1748                 break;
1749         case LLOG_ORIGIN_HANDLE_DESTROY:
1750                 DEBUG_REQ(D_INODE, req, "llog_init");
1751                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1752                 rc = llog_origin_handle_destroy(req);
1753                 break;
1754         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1755                 DEBUG_REQ(D_INODE, req, "llog next block");
1756                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1757                 rc = llog_origin_handle_next_block(req);
1758                 break;
1759         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1760                 DEBUG_REQ(D_INODE, req, "llog prev block");
1761                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1762                 rc = llog_origin_handle_prev_block(req);
1763                 break;
1764         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1765                 DEBUG_REQ(D_INODE, req, "llog read header");
1766                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1767                 rc = llog_origin_handle_read_header(req);
1768                 break;
1769         case LLOG_ORIGIN_HANDLE_CLOSE:
1770                 DEBUG_REQ(D_INODE, req, "llog close");
1771                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1772                 rc = llog_origin_handle_close(req);
1773                 break;
1774         case LLOG_CATINFO:
1775                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1776                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1777                 rc = llog_catinfo(req);
1778                 break;
1779         default:
1780                 req->rq_status = -ENOTSUPP;
1781                 rc = ptlrpc_error(req);
1782                 RETURN(rc);
1783         }
1784
1785         LASSERT(current->journal_info == NULL);
1786
1787         /* If we're DISCONNECTing, the mds_export_data is already freed */
1788         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != MDS_DISCONNECT) {
1789                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1790                 
1791                 /* I don't think last_xid is used for anyway, so I'm not sure
1792                    if we need to care about last_close_xid here.*/
1793                 lustre_msg_set_last_xid(req->rq_repmsg,
1794                                        le64_to_cpu(med->med_mcd->mcd_last_xid));
1795
1796                 target_committed_to_req(req);
1797         }
1798
1799         EXIT;
1800  out:
1801
1802         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1803                 if (obd && obd->obd_recovering) {
1804                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1805                         return target_queue_last_replay_reply(req, rc);
1806                 }
1807                 /* Lost a race with recovery; let the error path DTRT. */
1808                 rc = req->rq_status = -ENOTCONN;
1809         }
1810
1811         target_send_reply(req, rc, fail);
1812         return 0;
1813 }
1814
1815 /* Update the server data on disk.  This stores the new mount_count and
1816  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1817  * then the server last_rcvd value may be less than that of the clients.
1818  * This will alert us that we may need to do client recovery.
1819  *
1820  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1821  */
1822 int mds_update_server_data(struct obd_device *obd, int force_sync)
1823 {
1824         struct mds_obd *mds = &obd->u.mds;
1825         struct lr_server_data *lsd = mds->mds_server_data;
1826         struct file *filp = mds->mds_rcvd_filp;
1827         struct lvfs_run_ctxt saved;
1828         loff_t off = 0;
1829         int rc;
1830         ENTRY;
1831
1832         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1833                mds->mds_mount_count, mds->mds_last_transno);
1834
1835         spin_lock(&mds->mds_transno_lock);
1836         lsd->lsd_last_transno = cpu_to_le64(mds->mds_last_transno);
1837         spin_unlock(&mds->mds_transno_lock);
1838
1839         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1840         rc = fsfilt_write_record(obd, filp, lsd, sizeof(*lsd), &off,force_sync);
1841         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1842         if (rc)
1843                 CERROR("error writing MDS server data: rc = %d\n", rc);
1844
1845         RETURN(rc);
1846 }
1847
1848 static void fsoptions_to_mds_flags(struct mds_obd *mds, char *options)
1849 {
1850         char *p = options;
1851
1852         if (!options)
1853                 return;
1854
1855         while (*options) {
1856                 int len;
1857
1858                 while (*p && *p != ',')
1859                         p++;
1860
1861                 len = p - options;
1862                 if (len == sizeof("user_xattr") - 1 &&
1863                     memcmp(options, "user_xattr", len) == 0) {
1864                         mds->mds_fl_user_xattr = 1;
1865                         LCONSOLE_INFO("Enabling user_xattr\n");
1866                 } else if (len == sizeof("nouser_xattr") - 1 &&
1867                            memcmp(options, "nouser_xattr", len) == 0) {
1868                         mds->mds_fl_user_xattr = 0;
1869                         LCONSOLE_INFO("Disabling user_xattr\n");
1870                 } else if (len == sizeof("acl") - 1 &&
1871                            memcmp(options, "acl", len) == 0) {
1872 #ifdef CONFIG_FS_POSIX_ACL
1873                         mds->mds_fl_acl = 1;
1874                         LCONSOLE_INFO("Enabling ACL\n");
1875 #else
1876                         CWARN("ignoring unsupported acl mount option\n");
1877 #endif
1878                 } else if (len == sizeof("noacl") - 1 &&
1879                            memcmp(options, "noacl", len) == 0) {
1880 #ifdef CONFIG_FS_POSIX_ACL
1881                         mds->mds_fl_acl = 0;
1882                         LCONSOLE_INFO("Disabling ACL\n");
1883 #endif
1884                 }
1885
1886                 options = ++p;
1887         }
1888 }
1889
/* Read handler for the "clear" proc entry: emits a one-line usage hint.
 *
 * \param page   output buffer
 * \param start  unused
 * \param off    unused
 * \param count  size of \a page in bytes
 * \param eof    always set to 1
 * \param data   unused
 *
 * \retval number of characters actually stored in \a page (excluding the
 *         terminating NUL); never more than count - 1, and 0 when \a count
 *         is not positive
 */
static int mds_nid_stats_clear_read(char *page, char **start, off_t off,
                                    int count, int *eof,  void *data)
{
        int written;

        *eof = 1;

        /* A non-positive size must not reach snprintf() as a huge size_t. */
        if (count <= 0)
                return 0;

        written = snprintf(page, count, "%s\n",
                           "Write into this file to clear all nid stats and "
                           "stale nid entries");
        if (written < 0)
                return 0;

        /* snprintf() reports the untruncated length; callers need the
         * number of bytes that really landed in the buffer. */
        if (written >= count)
                written = count - 1;

        return written;
}
1898
1899 static int mds_nid_stats_clear_write(struct file *file, const char *buffer,
1900                                      unsigned long count, void *data)
1901 {
1902         struct obd_device *obd = (struct obd_device *)data;
1903         struct list_head *nids= &obd->obd_proc_nid_list;
1904         nid_stat_t *client_stat = NULL, *nxt;
1905
1906         spin_lock(&obd->nid_lock);
1907
1908         list_for_each_entry_safe (client_stat, nxt, nids, nid_chain) {
1909                 if (!client_stat->nid_exp_ref_count)
1910                         lprocfs_free_client_stats(client_stat);
1911                 else if (client_stat->nid_stats) {
1912                         lprocfs_clear_stats(client_stat->nid_stats);
1913                 }
1914         }
1915
1916         spin_unlock(&obd->nid_lock);
1917
1918         return count;
1919 }
1920
1921
1922 /* mount the file system (secretly).  lustre_cfg parameters are:
1923  * 1 = device
1924  * 2 = fstype
1925  * 3 = config name
1926  * 4 = mount options
1927  */
1928 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1929 {
1930         struct lprocfs_static_vars lvars;
1931         struct lustre_cfg* lcfg = buf;
1932         struct mds_obd *mds = &obd->u.mds;
1933         struct lustre_sb_info *lsi;
1934         struct lustre_mount_info *lmi;
1935         struct vfsmount *mnt;
1936         struct obd_uuid uuid;
1937         __u8 *uuid_ptr;
1938         char *str, *label;
1939         char ns_name[48];
1940         int rc = 0;
1941         ENTRY;
1942
1943         /* setup 1:/dev/loop/0 2:ext3 3:mdsA 4:errors=remount-ro,iopen_nopriv */
1944
1945         CLASSERT(offsetof(struct obd_device, u.obt) ==
1946                  offsetof(struct obd_device, u.mds.mds_obt));
1947
1948         if (lcfg->lcfg_bufcount < 3)
1949                 RETURN(-EINVAL);
1950
1951         if (LUSTRE_CFG_BUFLEN(lcfg, 1) == 0 || LUSTRE_CFG_BUFLEN(lcfg, 2) == 0)
1952                 RETURN(-EINVAL);
1953
1954         lmi = server_get_mount(obd->obd_name);
1955         if (!lmi) {
1956                 CERROR("Not mounted in lustre_fill_super?\n");
1957                 RETURN(-EINVAL);
1958         }
1959
1960         /* We mounted in lustre_fill_super.
1961            lcfg bufs 1, 2, 4 (device, fstype, mount opts) are ignored.*/
1962         lsi = s2lsi(lmi->lmi_sb);
1963         fsoptions_to_mds_flags(mds, lsi->lsi_ldd->ldd_mount_opts);
1964         fsoptions_to_mds_flags(mds, lsi->lsi_lmd->lmd_opts);
1965         mnt = lmi->lmi_mnt;
1966         obd->obd_fsops = fsfilt_get_ops(MT_STR(lsi->lsi_ldd));
1967         if (IS_ERR(obd->obd_fsops))
1968                 GOTO(err_put, rc = PTR_ERR(obd->obd_fsops));
1969
1970         CDEBUG(D_SUPER, "%s: mnt = %p\n", lustre_cfg_string(lcfg, 1), mnt);
1971
1972         LASSERT(!lvfs_check_rdonly(lvfs_sbdev(mnt->mnt_sb)));
1973
1974         sema_init(&mds->mds_epoch_sem, 1);
1975         spin_lock_init(&mds->mds_transno_lock);
1976         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1977         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1978         mds->mds_atime_diff = MAX_ATIME_DIFF;
1979         mds->mds_evict_ost_nids = 1;
1980
1981         sprintf(ns_name, "mds-%s", obd->obd_uuid.uuid);
1982         obd->obd_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER,
1983                                                 LDLM_NAMESPACE_GREEDY);
1984         if (obd->obd_namespace == NULL) {
1985                 mds_cleanup(obd);
1986                 GOTO(err_ops, rc = -ENOMEM);
1987         }
1988         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1989
1990         lprocfs_init_vars(mds, &lvars);
1991         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0 &&
1992             lprocfs_alloc_obd_stats(obd, LPROC_MDS_LAST) == 0) {
1993                 /* Init private stats here */
1994                 mds_stats_counter_init(obd->obd_stats);
1995                 obd->obd_proc_exports_entry = proc_mkdir("exports",
1996                                                          obd->obd_proc_entry);
1997         }
1998
1999         rc = mds_fs_setup(obd, mnt);
2000         if (rc) {
2001                 CERROR("%s: MDS filesystem method init failed: rc = %d\n",
2002                        obd->obd_name, rc);
2003                 GOTO(err_ns, rc);
2004         }
2005
2006         if (obd->obd_proc_exports_entry)
2007                 lprocfs_add_simple(obd->obd_proc_exports_entry,
2008                                    "clear", mds_nid_stats_clear_read,
2009                                    mds_nid_stats_clear_write, obd);
2010
2011         if (lcfg->lcfg_bufcount >= 4 && LUSTRE_CFG_BUFLEN(lcfg, 3) > 0) {
2012                 class_uuid_t uuid;
2013
2014                 ll_generate_random_uuid(uuid);
2015                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
2016
2017                 OBD_ALLOC(mds->mds_profile, LUSTRE_CFG_BUFLEN(lcfg, 3));
2018                 if (mds->mds_profile == NULL)
2019                         GOTO(err_fs, rc = -ENOMEM);
2020
2021                 strncpy(mds->mds_profile, lustre_cfg_string(lcfg, 3),
2022                         LUSTRE_CFG_BUFLEN(lcfg, 3));
2023         }
2024
2025         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2026                            "mds_ldlm_client", &obd->obd_ldlm_client);
2027         obd->obd_replayable = 1;
2028
2029         rc = lquota_setup(mds_quota_interface_ref, obd);
2030         if (rc)
2031                 GOTO(err_fs, rc);
2032
2033         mds->mds_group_hash = upcall_cache_init(obd->obd_name);
2034         if (IS_ERR(mds->mds_group_hash)) {
2035                 rc = PTR_ERR(mds->mds_group_hash);
2036                 mds->mds_group_hash = NULL;
2037                 GOTO(err_qctxt, rc);
2038         }
2039
2040         /* Don't wait for mds_postrecov trying to clear orphans */
2041         obd->obd_async_recov = 1;
2042         rc = mds_postsetup(obd);
2043         /* Bug 11557 - allow async abort_recov start
2044            FIXME can remove most of this obd_async_recov plumbing
2045         obd->obd_async_recov = 0;
2046         */
2047         if (rc)
2048                 GOTO(err_qctxt, rc);
2049
2050         uuid_ptr = fsfilt_uuid(obd, obd->u.obt.obt_sb);
2051         if (uuid_ptr != NULL) {
2052                 class_uuid_unparse(uuid_ptr, &uuid);
2053                 str = uuid.uuid;
2054         } else {
2055                 str = "no UUID";
2056         }
2057
2058         label = fsfilt_get_label(obd, obd->u.obt.obt_sb);
2059         if (obd->obd_recovering) {
2060                 LCONSOLE_WARN("MDT %s now serving %s (%s%s%s), but will be in "
2061                               "recovery until %d %s reconnect, or if no clients"
2062                               " reconnect for %d:%.02d; during that time new "
2063                               "clients will not be allowed to connect. "
2064                               "Recovery progress can be monitored by watching "
2065                               "/proc/fs/lustre/mds/%s/recovery_status.\n",
2066                               obd->obd_name, lustre_cfg_string(lcfg, 1),
2067                               label ?: "", label ? "/" : "", str,
2068                               obd->obd_recoverable_clients,
2069                               (obd->obd_recoverable_clients == 1) ?
2070                               "client" : "clients",
2071                               obd->obd_recovery_timeout / 60,
2072                               obd->obd_recovery_timeout % 60,
2073                               obd->obd_name);
2074         } else {
2075                 LCONSOLE_INFO("MDT %s now serving %s (%s%s%s) with recovery "
2076                               "%s\n", obd->obd_name, lustre_cfg_string(lcfg, 1),
2077                               label ?: "", label ? "/" : "", str,
2078                               obd->obd_replayable ? "enabled" : "disabled");
2079         }
2080
2081         if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
2082                 ldlm_timeout = 6;
2083
2084         RETURN(0);
2085
2086 err_qctxt:
2087         lquota_cleanup(mds_quota_interface_ref, obd);
2088 err_fs:
2089         /* No extra cleanup needed for llog_init_commit_thread() */
2090         mds_fs_cleanup(obd);
2091         upcall_cache_cleanup(mds->mds_group_hash);
2092         mds->mds_group_hash = NULL;
2093 err_ns:
2094         lprocfs_obd_cleanup(obd);
2095         lprocfs_free_obd_stats(obd);
2096         ldlm_namespace_free(obd->obd_namespace, 0);
2097         obd->obd_namespace = NULL;
2098 err_ops:
2099         fsfilt_put_ops(obd->obd_fsops);
2100 err_put:
2101         server_put_mount(obd->obd_name, mnt);
2102         obd->u.obt.obt_sb = NULL;
2103         return rc;
2104 }
2105
2106 static int mds_lov_clean(struct obd_device *obd)
2107 {
2108         struct mds_obd *mds = &obd->u.mds;
2109         struct obd_device *osc = mds->mds_osc_obd;
2110         ENTRY;
2111
2112         if (mds->mds_profile) {
2113                 class_del_profile(mds->mds_profile);
2114                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2115                 mds->mds_profile = NULL;
2116         }
2117
2118         /* There better be a lov */
2119         if (!osc)
2120                 RETURN(0);
2121         if (IS_ERR(osc))
2122                 RETURN(PTR_ERR(osc));
2123
2124         obd_register_observer(osc, NULL);
2125
2126         /* Give lov our same shutdown flags */
2127         osc->obd_force = obd->obd_force;
2128         osc->obd_fail = obd->obd_fail;
2129
2130         /* Cleanup the lov */
2131         obd_disconnect(mds->mds_osc_exp);
2132         class_manual_cleanup(osc);
2133         mds->mds_osc_exp = NULL;
2134
2135         RETURN(0);
2136 }
2137
2138 static int mds_postsetup(struct obd_device *obd)
2139 {
2140         struct mds_obd *mds = &obd->u.mds;
2141         int rc = 0;
2142         ENTRY;
2143
2144         rc = llog_setup(obd, LLOG_CONFIG_ORIG_CTXT, obd, 0, NULL,
2145                         &llog_lvfs_ops);
2146         if (rc)
2147                 RETURN(rc);
2148
2149         rc = llog_setup(obd, LLOG_LOVEA_ORIG_CTXT, obd, 0, NULL,
2150                         &llog_lvfs_ops);
2151         if (rc)
2152                 RETURN(rc);
2153
2154         if (mds->mds_profile) {
2155                 struct lustre_profile *lprof;
2156                 /* The profile defines which osc and mdc to connect to, for a 
2157                    client.  We reuse that here to figure out the name of the
2158                    lov to use (and ignore lprof->lp_mdc).
2159                    The profile was set in the config log with 
2160                    LCFG_MOUNTOPT profilenm oscnm mdcnm */
2161                 lprof = class_get_profile(mds->mds_profile);
2162                 if (lprof == NULL) {
2163                         CERROR("No profile found: %s\n", mds->mds_profile);
2164                         GOTO(err_cleanup, rc = -ENOENT);
2165                 }
2166                 rc = mds_lov_connect(obd, lprof->lp_osc);
2167                 if (rc)
2168                         GOTO(err_cleanup, rc);
2169         }
2170
2171         RETURN(rc);
2172
2173 err_cleanup:
2174         mds_lov_clean(obd);
2175         llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2176         llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2177         RETURN(rc);
2178 }
2179
2180 int mds_postrecov(struct obd_device *obd)
2181 {
2182         struct llog_ctxt *ctxt;
2183         int rc;
2184         ENTRY;
2185
2186         if (obd->obd_fail)
2187                 RETURN(0);
2188
2189         LASSERT(!obd->obd_recovering);
2190         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT); 
2191         LASSERT(ctxt != NULL);
2192         llog_ctxt_put(ctxt);
2193
2194         /* set nextid first, so we are sure it happens */
2195         mutex_down(&obd->obd_dev_sem);
2196         rc = mds_lov_set_nextid(obd);
2197         mutex_up(&obd->obd_dev_sem);
2198         if (rc) {
2199                 CERROR("%s: mds_lov_set_nextid failed %d\n",
2200                        obd->obd_name, rc);
2201                 GOTO(out, rc);
2202         }
2203
2204         /* clean PENDING dir */
2205         rc = mds_cleanup_pending(obd);
2206         if (rc < 0)
2207                 GOTO(out, rc);
2208
2209         /* FIXME Does target_finish_recovery really need this to block? */
2210         /* Notify the LOV, which will in turn call mds_notify for each tgt */
2211         /* This means that we have to hack obd_notify to think we're obd_set_up
2212            during mds_lov_connect. */
2213         obd_notify(obd->u.mds.mds_osc_obd, NULL, 
2214                    obd->obd_async_recov ? OBD_NOTIFY_SYNC_NONBLOCK :
2215                    OBD_NOTIFY_SYNC, NULL);
2216
2217         /* quota recovery */
2218         lquota_recovery(mds_quota_interface_ref, obd);
2219
2220 out:
2221         RETURN(rc);
2222 }
2223
2224 /* We need to be able to stop an mds_lov_synchronize */
2225 static int mds_lov_early_clean(struct obd_device *obd)
2226 {
2227         struct mds_obd *mds = &obd->u.mds;
2228         struct obd_device *osc = mds->mds_osc_obd;
2229
2230         if (!osc || (!obd->obd_force && !obd->obd_fail))
2231                 return(0);
2232
2233         CDEBUG(D_HA, "abort inflight\n");
2234         return (obd_precleanup(osc, OBD_CLEANUP_EARLY));
2235 }
2236
2237 static int mds_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
2238 {
2239         int rc = 0;
2240         ENTRY;
2241
2242         switch (stage) {
2243         case OBD_CLEANUP_EARLY:
2244                 break;
2245         case OBD_CLEANUP_EXPORTS:
2246                 target_cleanup_recovery(obd);
2247                 mds_lov_early_clean(obd);
2248                 break;
2249         case OBD_CLEANUP_SELF_EXP:
2250                 mds_lov_disconnect(obd);
2251                 mds_lov_clean(obd);
2252                 llog_cleanup(llog_get_context(obd, LLOG_CONFIG_ORIG_CTXT));
2253                 llog_cleanup(llog_get_context(obd, LLOG_LOVEA_ORIG_CTXT));
2254                 rc = obd_llog_finish(obd, 0);
2255                 break;
2256         case OBD_CLEANUP_OBD:
2257                 break;
2258         }
2259         RETURN(rc);
2260 }
2261
2262 static int mds_cleanup(struct obd_device *obd)
2263 {
2264         struct mds_obd *mds = &obd->u.mds;
2265         lvfs_sbdev_type save_dev;
2266         ENTRY;
2267
2268         if (obd->u.obt.obt_sb == NULL)
2269                 RETURN(0);
2270         save_dev = lvfs_sbdev(obd->u.obt.obt_sb);
2271
2272         if (mds->mds_osc_exp)
2273                 /* lov export was disconnected by mds_lov_clean;
2274                    we just need to drop our ref */
2275                 class_export_put(mds->mds_osc_exp);
2276
2277         lprocfs_free_per_client_stats(obd);
2278         remove_proc_entry("clear", obd->obd_proc_exports_entry);
2279         lprocfs_obd_cleanup(obd);
2280         lprocfs_free_obd_stats(obd);
2281
2282         lquota_cleanup(mds_quota_interface_ref, obd);
2283
2284         mds_update_server_data(obd, 1);
2285         if (mds->mds_lov_objids != NULL) 
2286                 OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size);
2287         mds_fs_cleanup(obd);
2288
2289         upcall_cache_cleanup(mds->mds_group_hash);
2290         mds->mds_group_hash = NULL;
2291
2292         server_put_mount(obd->obd_name, mds->mds_vfsmnt);
2293         obd->u.obt.obt_sb = NULL;
2294
2295         ldlm_namespace_free(obd->obd_namespace, obd->obd_force);
2296
2297         spin_lock_bh(&obd->obd_processing_task_lock);
2298         if (obd->obd_recovering) {
2299                 target_cancel_recovery_timer(obd);
2300                 obd->obd_recovering = 0;
2301         }
2302         spin_unlock_bh(&obd->obd_processing_task_lock);
2303
2304         fsfilt_put_ops(obd->obd_fsops);
2305
2306         LCONSOLE_INFO("MDT %s has stopped.\n", obd->obd_name);
2307
2308         RETURN(0);
2309 }
2310
2311 static void fixup_handle_for_resent_req(struct ptlrpc_request *req, int offset,
2312                                         struct ldlm_lock *new_lock,
2313                                         struct ldlm_lock **old_lock,
2314                                         struct lustre_handle *lockh)
2315 {
2316         struct obd_export *exp = req->rq_export;
2317         struct ldlm_request *dlmreq =
2318                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof(*dlmreq));
2319         struct lustre_handle remote_hdl = dlmreq->lock_handle[0];
2320         struct list_head *iter;
2321
2322         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2323                 return;
2324
2325         spin_lock(&exp->exp_ldlm_data.led_lock);
2326         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2327                 struct ldlm_lock *lock;
2328                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2329                 if (lock == new_lock)
2330                         continue;
2331                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2332                         lockh->cookie = lock->l_handle.h_cookie;
2333                         LDLM_DEBUG(lock, "restoring lock cookie");
2334                         DEBUG_REQ(D_DLMTRACE, req,"restoring lock cookie "LPX64,
2335                                   lockh->cookie);
2336                         if (old_lock)
2337                                 *old_lock = LDLM_LOCK_GET(lock);
2338                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2339                         return;
2340                 }
2341         }
2342         spin_unlock(&exp->exp_ldlm_data.led_lock);
2343
2344         /* If the xid matches, then we know this is a resent request,
2345          * and allow it. (It's probably an OPEN, for which we don't
2346          * send a lock */
2347         if (req->rq_xid ==
2348             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
2349                 return;
2350
2351         if (req->rq_xid ==
2352             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_close_xid))
2353                 return;
2354
2355         /* This remote handle isn't enqueued, so we never received or
2356          * processed this request.  Clear MSG_RESENT, because it can
2357          * be handled like any normal request now. */
2358
2359         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2360
2361         DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
2362                   remote_hdl.cookie);
2363 }
2364
2365 int intent_disposition(struct ldlm_reply *rep, int flag)
2366 {
2367         if (!rep)
2368                 return 0;
2369         return (rep->lock_policy_res1 & flag);
2370 }
2371
2372 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2373 {
2374         if (!rep)
2375                 return;
2376         rep->lock_policy_res1 |= flag;
2377 }
2378
/*
 * True when @error is one of the errno values treated as "client is
 * disconnected" (-ENOTCONN or -ENODEV).  The argument is parenthesized so
 * that expression arguments keep their meaning; note that it is evaluated
 * twice, so it must not have side effects.
 */
#define IS_CLIENT_DISCONNECT_ERROR(error) \
                ((error) == -ENOTCONN || (error) == -ENODEV)
2381
2382 static int mds_intent_policy(struct ldlm_namespace *ns,
2383                              struct ldlm_lock **lockp, void *req_cookie,
2384                              ldlm_mode_t mode, int flags, void *data)
2385 {
2386         struct ptlrpc_request *req = req_cookie;
2387         struct ldlm_lock *lock = *lockp;
2388         struct ldlm_intent *it;
2389         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2390         struct ldlm_reply *rep;
2391         struct lustre_handle lockh = { 0 };
2392         struct ldlm_lock *new_lock = NULL;
2393         int getattr_part = MDS_INODELOCK_UPDATE;
2394         int repsize[5] = { [MSG_PTLRPC_BODY_OFF] = sizeof(struct ptlrpc_body),
2395                            [DLM_LOCKREPLY_OFF]   = sizeof(struct ldlm_reply),
2396                            [DLM_REPLY_REC_OFF]   = sizeof(struct mds_body),
2397                            [DLM_REPLY_REC_OFF+1] = mds->mds_max_mdsize };
2398         int repbufcnt = 4, rc;
2399         ENTRY;
2400
2401         LASSERT(req != NULL);
2402
2403         if (lustre_msg_bufcount(req->rq_reqmsg) <= DLM_INTENT_IT_OFF) {
2404                 /* No intent was provided */
2405                 rc = lustre_pack_reply(req, 2, repsize, NULL);
2406                 LASSERT(rc == 0);
2407                 RETURN(0);
2408         }
2409
2410         it = lustre_swab_reqbuf(req, DLM_INTENT_IT_OFF, sizeof(*it),
2411                                 lustre_swab_ldlm_intent);
2412         if (it == NULL) {
2413                 CERROR("Intent missing\n");
2414                 RETURN(req->rq_status = -EFAULT);
2415         }
2416
2417         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2418
2419         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
2420             (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP)))
2421                 /* we should never allow OBD_CONNECT_ACL if not configured */
2422                 repsize[repbufcnt++] = LUSTRE_POSIX_ACL_MAX_SIZE;
2423         else if (it->opc & IT_UNLINK)
2424                 repsize[repbufcnt++] = mds->mds_max_cookiesize;
2425
2426         rc = lustre_pack_reply(req, repbufcnt, repsize, NULL);
2427         if (rc)
2428                 RETURN(req->rq_status = rc);
2429
2430         rep = lustre_msg_buf(req->rq_repmsg, DLM_LOCKREPLY_OFF, sizeof(*rep));
2431         intent_set_disposition(rep, DISP_IT_EXECD);
2432
2433
2434         /* execute policy */
2435         switch ((long)it->opc) {
2436         case IT_OPEN:
2437         case IT_CREAT|IT_OPEN:
2438                 mds_counter_incr(req->rq_export, LPROC_MDS_OPEN);
2439                 fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock, NULL,
2440                                             &lockh);
2441                 /* XXX swab here to assert that an mds_open reint
2442                  * packet is following */
2443                 rep->lock_policy_res2 = mds_reint(req, DLM_INTENT_REC_OFF,
2444                                                   &lockh);
2445 #if 0
2446                 /* We abort the lock if the lookup was negative and
2447                  * we did not make it to the OPEN portion */
2448                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2449                         RETURN(ELDLM_LOCK_ABORTED);
2450                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2451                     !intent_disposition(rep, DISP_OPEN_OPEN))
2452 #endif
2453
2454                 /* If there was an error of some sort or if we are not
2455                  * returning any locks */
2456                  if (rep->lock_policy_res2 ||
2457                      !intent_disposition(rep, DISP_OPEN_LOCK)) {
2458                         /* If it is the disconnect error (ENODEV & ENOCONN)
2459                          * ptlrpc layer should know this imediately, it should
2460                          * be replied by rq_stats, otherwise, return it by 
2461                          * intent here
2462                          */
2463                         if (IS_CLIENT_DISCONNECT_ERROR(rep->lock_policy_res2))
2464                                 RETURN(rep->lock_policy_res2);
2465                         else
2466                                 RETURN(ELDLM_LOCK_ABORTED);
2467                  }
2468                 break;
2469         case IT_LOOKUP:
2470                         getattr_part = MDS_INODELOCK_LOOKUP;
2471         case IT_GETATTR:
2472                         getattr_part |= MDS_INODELOCK_LOOKUP;
2473                         OBD_COUNTER_INCREMENT(req->rq_export->exp_obd, getattr);
2474         case IT_READDIR:
2475                 fixup_handle_for_resent_req(req, DLM_LOCKREQ_OFF, lock,
2476                                             &new_lock, &lockh);
2477
2478                 /* INODEBITS_INTEROP: if this lock was converted from a
2479                  * plain lock (client does not support inodebits), then
2480                  * child lock must be taken with both lookup and update
2481                  * bits set for all operations.
2482                  */
2483                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
2484                         getattr_part = MDS_INODELOCK_LOOKUP |
2485                                        MDS_INODELOCK_UPDATE;
2486
2487                 rep->lock_policy_res2 = mds_getattr_lock(req,DLM_INTENT_REC_OFF,
2488                                                          getattr_part, &lockh);
2489                 /* FIXME: LDLM can set req->rq_status. MDS sets
2490                    policy_res{1,2} with disposition and status.
2491                    - replay: returns 0 & req->status is old status
2492                    - otherwise: returns req->status */
2493                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2494                         rep->lock_policy_res2 = 0;
2495                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2496                     rep->lock_policy_res2)
2497                         RETURN(ELDLM_LOCK_ABORTED);
2498                 if (req->rq_status != 0) {
2499                         LBUG();
2500                         rep->lock_policy_res2 = req->rq_status;
2501                         RETURN(ELDLM_LOCK_ABORTED);
2502                 }
2503                 break;
2504         default:
2505                 CERROR("Unhandled intent "LPD64"\n", it->opc);
2506                 RETURN(-EFAULT);
2507         }
2508
2509         /* By this point, whatever function we called above must have either
2510          * filled in 'lockh', been an intent replay, or returned an error.  We
2511          * want to allow replayed RPCs to not get a lock, since we would just
2512          * drop it below anyways because lock replay is done separately by the
2513          * client afterwards.  For regular RPCs we want to give the new lock to
2514          * the client instead of whatever lock it was about to get. */
2515         if (new_lock == NULL)
2516                 new_lock = ldlm_handle2lock(&lockh);
2517         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2518                 RETURN(0);
2519
2520         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
2521                  it->opc, lockh.cookie);
2522
2523         /* If we've already given this lock to a client once, then we should
2524          * have no readers or writers.  Otherwise, we should have one reader
2525          * _or_ writer ref (which will be zeroed below) before returning the
2526          * lock to a client. */
2527         if (new_lock->l_export == req->rq_export) {
2528                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2529         } else {
2530                 LASSERT(new_lock->l_export == NULL);
2531                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2532         }
2533
2534         *lockp = new_lock;
2535
2536         if (new_lock->l_export == req->rq_export) {
2537                 /* Already gave this to the client, which means that we
2538                  * reconstructed a reply. */
2539                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2540                         MSG_RESENT);
2541                 RETURN(ELDLM_LOCK_REPLACED);
2542         }
2543
2544         /* Fixup the lock to be given to the client */
2545         lock_res_and_lock(new_lock);
2546         new_lock->l_readers = 0;
2547         new_lock->l_writers = 0;
2548
2549         new_lock->l_export = class_export_get(req->rq_export);
2550         spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
2551         list_add(&new_lock->l_export_chain,
2552                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2553         spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
2554
2555         new_lock->l_blocking_ast = lock->l_blocking_ast;
2556         new_lock->l_completion_ast = lock->l_completion_ast;
2557
2558         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2559                sizeof(lock->l_remote_handle));
2560
2561         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2562
2563         unlock_res_and_lock(new_lock);
2564         LDLM_LOCK_PUT(new_lock);
2565
2566         RETURN(ELDLM_LOCK_REPLACED);
2567 }
2568
2569 static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
2570 {
2571         struct mds_obd *mds = &obd->u.mds;
2572         struct lprocfs_static_vars lvars;
2573         int mds_min_threads;
2574         int mds_max_threads;
2575         int rc = 0;
2576         ENTRY;
2577
2578         lprocfs_init_vars(mdt, &lvars);
2579         lprocfs_obd_setup(obd, lvars.obd_vars);
2580
2581         sema_init(&mds->mds_health_sem, 1);
2582
2583         if (mds_num_threads) {
2584                 /* If mds_num_threads is set, it is the min and the max. */
2585                 if (mds_num_threads > MDS_THREADS_MAX)
2586                         mds_num_threads = MDS_THREADS_MAX;
2587                 if (mds_num_threads < MDS_THREADS_MIN)
2588                         mds_num_threads = MDS_THREADS_MIN;
2589                 mds_max_threads = mds_min_threads = mds_num_threads;
2590         } else {
2591                 /* Base min threads on memory and cpus */
2592                 mds_min_threads = num_possible_cpus() * num_physpages >> 
2593                         (27 - CFS_PAGE_SHIFT);
2594                 if (mds_min_threads < MDS_THREADS_MIN)
2595                         mds_min_threads = MDS_THREADS_MIN;
2596                 /* Largest auto threads start value */
2597                 if (mds_min_threads > 32) 
2598                         mds_min_threads = 32;
2599                 mds_max_threads = min(MDS_THREADS_MAX, mds_min_threads * 4);
2600         }
2601
2602         mds->mds_service =
2603                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2604                                 MDS_MAXREPSIZE, MDS_REQUEST_PORTAL,
2605                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR,
2606                                 mds_handle, LUSTRE_MDS_NAME,
2607                                 obd->obd_proc_entry, target_print_req,
2608                                 mds_min_threads, mds_max_threads, "ll_mdt");
2609
2610         if (!mds->mds_service) {
2611                 CERROR("failed to start service\n");
2612                 GOTO(err_lprocfs, rc = -ENOMEM);
2613         }
2614
2615         rc = ptlrpc_start_threads(obd, mds->mds_service);
2616         if (rc)
2617                 GOTO(err_thread, rc);
2618
2619         mds->mds_setattr_service =
2620                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2621                                 MDS_MAXREPSIZE, MDS_SETATTR_PORTAL,
2622                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR,
2623                                 mds_handle, "mds_setattr",
2624                                 obd->obd_proc_entry, target_print_req,
2625                                 mds_min_threads, mds_max_threads,
2626                                 "ll_mdt_attr");
2627         if (!mds->mds_setattr_service) {
2628                 CERROR("failed to start getattr service\n");
2629                 GOTO(err_thread, rc = -ENOMEM);
2630         }
2631
2632         rc = ptlrpc_start_threads(obd, mds->mds_setattr_service);
2633         if (rc)
2634                 GOTO(err_thread2, rc);
2635
2636         mds->mds_readpage_service =
2637                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2638                                 MDS_MAXREPSIZE, MDS_READPAGE_PORTAL,
2639                                 MDC_REPLY_PORTAL, MDS_SERVICE_WATCHDOG_FACTOR,
2640                                 mds_handle, "mds_readpage",
2641                                 obd->obd_proc_entry, target_print_req,
2642                                 MDS_THREADS_MIN_READPAGE, mds_max_threads,
2643                                 "ll_mdt_rdpg");
2644         if (!mds->mds_readpage_service) {
2645                 CERROR("failed to start readpage service\n");
2646                 GOTO(err_thread2, rc = -ENOMEM);
2647         }
2648
2649         rc = ptlrpc_start_threads(obd, mds->mds_readpage_service);
2650
2651         if (rc)
2652                 GOTO(err_thread3, rc);
2653
2654         ping_evictor_start();
2655
2656         RETURN(0);
2657
2658 err_thread3:
2659         ptlrpc_unregister_service(mds->mds_readpage_service);
2660         mds->mds_readpage_service = NULL;
2661 err_thread2:
2662         ptlrpc_unregister_service(mds->mds_setattr_service);
2663         mds->mds_setattr_service = NULL;
2664 err_thread:
2665         ptlrpc_unregister_service(mds->mds_service);
2666         mds->mds_service = NULL;
2667 err_lprocfs:
2668         lprocfs_obd_cleanup(obd);
2669         return rc;
2670 }
2671
2672 static int mdt_cleanup(struct obd_device *obd)
2673 {
2674         struct mds_obd *mds = &obd->u.mds;
2675         ENTRY;
2676
2677         ping_evictor_stop();
2678
2679         down(&mds->mds_health_sem);
2680         ptlrpc_unregister_service(mds->mds_readpage_service);
2681         ptlrpc_unregister_service(mds->mds_setattr_service);
2682         ptlrpc_unregister_service(mds->mds_service);
2683         mds->mds_readpage_service = NULL;
2684         mds->mds_setattr_service = NULL;
2685         mds->mds_service = NULL;
2686         up(&mds->mds_health_sem);
2687
2688         lprocfs_obd_cleanup(obd);
2689
2690         RETURN(0);
2691 }
2692
2693 static int mdt_health_check(struct obd_device *obd)
2694 {
2695         struct mds_obd *mds = &obd->u.mds;
2696         int rc = 0;
2697
2698         down(&mds->mds_health_sem);
2699         rc |= ptlrpc_service_health_check(mds->mds_readpage_service);
2700         rc |= ptlrpc_service_health_check(mds->mds_setattr_service);
2701         rc |= ptlrpc_service_health_check(mds->mds_service);
2702         up(&mds->mds_health_sem);
2703
2704         /*
2705          * health_check to return 0 on healthy
2706          * and 1 on unhealthy.
2707          */
2708         if(rc != 0)
2709                 rc = 1;
2710
2711         return rc;
2712 }
2713
2714 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2715                                           void *data)
2716 {
2717         struct obd_device *obd = data;
2718         struct ll_fid fid;
2719         fid.id = id;
2720         fid.generation = gen;
2721         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2722 }
2723
2724 static int mds_health_check(struct obd_device *obd)
2725 {
2726         struct obd_device_target *odt = &obd->u.obt;
2727 #ifdef USE_HEALTH_CHECK_WRITE
2728         struct mds_obd *mds = &obd->u.mds;
2729 #endif
2730         int rc = 0;
2731
2732         if (odt->obt_sb->s_flags & MS_RDONLY)
2733                 rc = 1;
2734
2735 #ifdef USE_HEALTH_CHECK_WRITE
2736         LASSERT(mds->mds_health_check_filp != NULL);
2737         rc |= !!lvfs_check_io_health(obd, mds->mds_health_check_filp);
2738 #endif
2739
2740         return rc;
2741 }
2742
2743 static int mds_process_config(struct obd_device *obd, obd_count len, void *buf)
2744 {
2745         struct lustre_cfg *lcfg = buf;
2746         struct lprocfs_static_vars lvars;
2747         int rc;
2748
2749         lprocfs_init_vars(mds, &lvars);
2750         
2751         rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, lcfg, obd);
2752         
2753         return(rc);
2754 }
2755
2756 struct lvfs_callback_ops mds_lvfs_ops = {
2757         l_fid2dentry:     mds_lvfs_fid2dentry,
2758 };
2759
2760 /* use obd ops to offer management infrastructure */
2761 static struct obd_ops mds_obd_ops = {
2762         .o_owner           = THIS_MODULE,
2763         .o_connect         = mds_connect,
2764         .o_reconnect       = mds_reconnect,
2765         .o_init_export     = mds_init_export,
2766         .o_destroy_export  = mds_destroy_export,
2767         .o_disconnect      = mds_disconnect,
2768         .o_setup           = mds_setup,
2769         .o_precleanup      = mds_precleanup,
2770         .o_cleanup         = mds_cleanup,
2771         .o_postrecov       = mds_postrecov,
2772         .o_statfs          = mds_obd_statfs,
2773         .o_iocontrol       = mds_iocontrol,
2774         .o_create          = mds_obd_create,
2775         .o_destroy         = mds_obd_destroy,
2776         .o_llog_init       = mds_llog_init,
2777         .o_llog_finish     = mds_llog_finish,
2778         .o_notify          = mds_notify,
2779         .o_health_check    = mds_health_check,
2780         .o_process_config  = mds_process_config,
2781 };
2782
2783 static struct obd_ops mdt_obd_ops = {
2784         .o_owner           = THIS_MODULE,
2785         .o_setup           = mdt_setup,
2786         .o_cleanup         = mdt_cleanup,
2787         .o_health_check    = mdt_health_check,
2788 };
2789
2790 quota_interface_t *mds_quota_interface_ref;
2791 extern quota_interface_t mds_quota_interface;
2792
2793 static int __init mds_init(void)
2794 {
2795         int rc;
2796         struct lprocfs_static_vars lvars;
2797
2798         request_module("lquota");
2799         mds_quota_interface_ref = PORTAL_SYMBOL_GET(mds_quota_interface);
2800         rc = lquota_init(mds_quota_interface_ref);
2801         if (rc) {
2802                 if (mds_quota_interface_ref)
2803                         PORTAL_SYMBOL_PUT(mds_quota_interface);
2804                 return rc;
2805         }
2806         init_obd_quota_ops(mds_quota_interface_ref, &mds_obd_ops);
2807         
2808         lprocfs_init_vars(mds, &lvars);
2809         class_register_type(&mds_obd_ops, lvars.module_vars, LUSTRE_MDS_NAME);
2810         lprocfs_init_vars(mdt, &lvars);
2811         class_register_type(&mdt_obd_ops, lvars.module_vars, LUSTRE_MDT_NAME);
2812
2813         return 0;
2814 }
2815
2816 static void /*__exit*/ mds_exit(void)
2817 {
2818         lquota_exit(mds_quota_interface_ref);
2819         if (mds_quota_interface_ref)
2820                 PORTAL_SYMBOL_PUT(mds_quota_interface);
2821
2822         class_unregister_type(LUSTRE_MDS_NAME);
2823         class_unregister_type(LUSTRE_MDT_NAME);
2824 }
2825
2826 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2827 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2828 MODULE_LICENSE("GPL");
2829
2830 module_init(mds_init);
2831 module_exit(mds_exit);