Whamcloud - gitweb
landing b_cmobd_merge on HEAD
[fs/lustre-release.git] / lustre / mds / handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Server (mds) request handler
6  *
7  *  Copyright (c) 2001-2003 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *
13  *   This file is part of Lustre, http://www.lustre.org.
14  *
15  *   Lustre is free software; you can redistribute it and/or
16  *   modify it under the terms of version 2 of the GNU General Public
17  *   License as published by the Free Software Foundation.
18  *
19  *   Lustre is distributed in the hope that it will be useful,
20  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
21  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22  *   GNU General Public License for more details.
23  *
24  *   You should have received a copy of the GNU General Public License
25  *   along with Lustre; if not, write to the Free Software
26  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27  */
28
29 #ifndef EXPORT_SYMTAB
30 # define EXPORT_SYMTAB
31 #endif
32 #define DEBUG_SUBSYSTEM S_MDS
33
34 #include <linux/module.h>
35 #include <linux/lustre_mds.h>
36 #include <linux/lustre_dlm.h>
37 #include <linux/init.h>
38 #include <linux/obd_class.h>
39 #include <linux/random.h>
40 #include <linux/fs.h>
41 #include <linux/jbd.h>
42 #include <linux/ext3_fs.h>
43 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
44 # include <linux/smp_lock.h>
45 # include <linux/buffer_head.h>
46 # include <linux/workqueue.h>
47 # include <linux/mount.h>
48 #else
49 # include <linux/locks.h>
50 #endif
51 #include <linux/obd_lov.h>
52 #include <linux/lustre_mds.h>
53 #include <linux/lustre_fsfilt.h>
54 #include <linux/lprocfs_status.h>
55 #include <linux/lustre_commit_confd.h>
56
57 #include "mds_internal.h"
58
59 static int mds_intent_policy(struct ldlm_namespace *ns,
60                              struct ldlm_lock **lockp, void *req_cookie,
61                              ldlm_mode_t mode, int flags, void *data);
62 static int mds_postsetup(struct obd_device *obd);
63 static int mds_cleanup(struct obd_device *obd, int flags);
64
65 /* Assumes caller has already pushed into the kernel filesystem context */
66 static int mds_sendpage(struct ptlrpc_request *req, struct file *file,
67                         loff_t offset, int count)
68 {
69         struct ptlrpc_bulk_desc *desc;
70         struct l_wait_info lwi;
71         struct page **pages;
72         int rc = 0, npages, i, tmpcount, tmpsize = 0;
73         ENTRY;
74
75         LASSERT((offset & (PAGE_SIZE - 1)) == 0); /* I'm dubious about this */
76
77         npages = (count + PAGE_SIZE - 1) >> PAGE_SHIFT;
78         OBD_ALLOC(pages, sizeof(*pages) * npages);
79         if (!pages)
80                 GOTO(out, rc = -ENOMEM);
81
82         desc = ptlrpc_prep_bulk_exp(req, npages, BULK_PUT_SOURCE,
83                                     MDS_BULK_PORTAL);
84         if (desc == NULL)
85                 GOTO(out_free, rc = -ENOMEM);
86
87         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
88                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
89
90                 pages[i] = alloc_pages(GFP_KERNEL, 0);
91                 if (pages[i] == NULL)
92                         GOTO(cleanup_buf, rc = -ENOMEM);
93
94                 ptlrpc_prep_bulk_page(desc, pages[i], 0, tmpsize);
95         }
96
97         for (i = 0, tmpcount = count; i < npages; i++, tmpcount -= tmpsize) {
98                 tmpsize = tmpcount > PAGE_SIZE ? PAGE_SIZE : tmpcount;
99                 CDEBUG(D_EXT2, "reading %u@%llu from dir %lu (size %llu)\n",
100                        tmpsize, offset, file->f_dentry->d_inode->i_ino,
101                        file->f_dentry->d_inode->i_size);
102
103                 rc = fsfilt_readpage(req->rq_export->exp_obd, file,
104                                      kmap(pages[i]), tmpsize, &offset);
105                 kunmap(pages[i]);
106
107                 if (rc != tmpsize)
108                         GOTO(cleanup_buf, rc = -EIO);
109         }
110
111         LASSERT(desc->bd_nob == count);
112
113         rc = ptlrpc_start_bulk_transfer(desc);
114         if (rc)
115                 GOTO(cleanup_buf, rc);
116
117         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE)) {
118                 CERROR("obd_fail_loc=%x, fail operation rc=%d\n",
119                        OBD_FAIL_MDS_SENDPAGE, rc);
120                 GOTO(abort_bulk, rc);
121         }
122
123         lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
124         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), &lwi);
125         LASSERT (rc == 0 || rc == -ETIMEDOUT);
126
127         if (rc == 0) {
128                 if (desc->bd_success &&
129                     desc->bd_nob_transferred == count)
130                         GOTO(cleanup_buf, rc);
131
132                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
133         }
134
135         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
136                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
137                   desc->bd_nob_transferred, count,
138                   req->rq_export->exp_client_uuid.uuid,
139                   req->rq_export->exp_connection->c_remote_uuid.uuid);
140
141         ptlrpc_fail_export(req->rq_export);
142
143         EXIT;
144  abort_bulk:
145         ptlrpc_abort_bulk (desc);
146  cleanup_buf:
147         for (i = 0; i < npages; i++)
148                 if (pages[i])
149                         __free_pages(pages[i], 0);
150
151         ptlrpc_free_bulk(desc);
152  out_free:
153         OBD_FREE(pages, sizeof(*pages) * npages);
154  out:
155         return rc;
156 }
157
158 /* only valid locked dentries or errors should be returned */
159 struct dentry *mds_fid2locked_dentry(struct obd_device *obd, struct ll_fid *fid,
160                                      struct vfsmount **mnt, int lock_mode,
161                                      struct lustre_handle *lockh,
162                                      char *name, int namelen, __u64 lockpart)
163 {
164         struct mds_obd *mds = &obd->u.mds;
165         struct dentry *de = mds_fid2dentry(mds, fid, mnt), *retval = de;
166         struct ldlm_res_id res_id = { .name = {0} };
167         int flags = 0, rc;
168         ldlm_policy_data_t policy = { .l_inodebits = { lockpart } };
169
170         ENTRY;
171
172         if (IS_ERR(de))
173                 RETURN(de);
174
175         res_id.name[0] = de->d_inode->i_ino;
176         res_id.name[1] = de->d_inode->i_generation;
177 #ifdef S_PDIROPS
178         lockh[1].cookie = 0;
179         if (name && IS_PDIROPS(de->d_inode)) {
180                 ldlm_policy_data_t cpolicy =
181                         { .l_inodebits = { MDS_INODELOCK_UPDATE } };
182                 /* lock just dir { ino, generation } to flush client cache */
183                 if (lock_mode == LCK_PW) {
184                         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
185                                               res_id, LDLM_IBITS,
186                                               &cpolicy, LCK_CW, &flags,
187                                               mds_blocking_ast,
188                                               ldlm_completion_ast, NULL, NULL,
189                                               NULL, 0, NULL, lockh + 1);
190                         if (rc != ELDLM_OK) {
191                                 l_dput(de);
192                                 RETURN(ERR_PTR(-ENOLCK));
193                         }
194                        flags = 0;
195                 }
196
197                 res_id.name[2] = full_name_hash(name, namelen);
198                 CDEBUG(D_INFO, "take lock on %lu:%u:"LPX64"\n",
199                        de->d_inode->i_ino, de->d_inode->i_generation,
200                        res_id.name[2]);
201         }
202 #endif
203         rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace, res_id,
204                               LDLM_IBITS, &policy, lock_mode, &flags,
205                               mds_blocking_ast, ldlm_completion_ast, NULL, NULL,
206                               NULL, 0, NULL, lockh);
207         if (rc != ELDLM_OK) {
208                 l_dput(de);
209                 retval = ERR_PTR(-EIO); /* XXX translate ldlm code */
210 #ifdef S_PDIROPS
211                 if (lockh[1].cookie)
212                         ldlm_lock_decref(lockh + 1, LCK_CW);
213 #endif
214         }
215
216         RETURN(retval);
217 }
218
219 #ifndef DCACHE_DISCONNECTED
220 #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED
221 #endif
222
223
224 /* Look up an entry by inode number. */
225 /* this function ONLY returns valid dget'd dentries with an initialized inode
226    or errors */
227 struct dentry *mds_fid2dentry(struct mds_obd *mds, struct ll_fid *fid,
228                               struct vfsmount **mnt)
229 {
230         char fid_name[32];
231         unsigned long ino = fid->id;
232         __u32 generation = fid->generation;
233         struct inode *inode;
234         struct dentry *result;
235
236         if (ino == 0)
237                 RETURN(ERR_PTR(-ESTALE));
238
239         snprintf(fid_name, sizeof(fid_name), "0x%lx", ino);
240
241         CDEBUG(D_DENTRY, "--> mds_fid2dentry: ino/gen %lu/%u, sb %p\n",
242                ino, generation, mds->mds_sb);
243
244         /* under ext3 this is neither supposed to return bad inodes
245            nor NULL inodes. */
246         result = ll_lookup_one_len(fid_name, mds->mds_fid_de, strlen(fid_name));
247         if (IS_ERR(result))
248                 RETURN(result);
249
250         inode = result->d_inode;
251         if (!inode)
252                 RETURN(ERR_PTR(-ENOENT));
253
254 #if 0
255         /* here we disabled generation check, as root inode i_generation
256          * of cache mds and real mds are different. */
257         if (generation && inode->i_generation != generation) {
258                 /* we didn't find the right inode.. */
259                 CERROR("bad inode %lu, link: %lu ct: %d or generation %u/%u\n",
260                        inode->i_ino, (unsigned long)inode->i_nlink,
261                        atomic_read(&inode->i_count), inode->i_generation,
262                        generation);
263                 dput(result);
264                 RETURN(ERR_PTR(-ENOENT));
265         }
266 #endif
267
268         if (mnt) {
269                 *mnt = mds->mds_vfsmnt;
270                 mntget(*mnt);
271         }
272
273         RETURN(result);
274 }
275
276
277 /* Establish a connection to the MDS.
278  *
279  * This will set up an export structure for the client to hold state data
280  * about that client, like open files, the last operation number it did
281  * on the server, etc.
282  */
283 static int mds_connect(struct lustre_handle *conn, struct obd_device *obd,
284                        struct obd_uuid *cluuid)
285 {
286         struct obd_export *exp;
287         struct mds_export_data *med; /*  */
288         struct mds_client_data *mcd;
289         int rc, abort_recovery;
290         ENTRY;
291
292         if (!conn || !obd || !cluuid)
293                 RETURN(-EINVAL);
294
295         /* Check for aborted recovery. */
296         spin_lock_bh(&obd->obd_processing_task_lock);
297         abort_recovery = obd->obd_abort_recovery;
298         spin_unlock_bh(&obd->obd_processing_task_lock);
299         if (abort_recovery)
300                 target_abort_recovery(obd);
301
302         /* XXX There is a small race between checking the list and adding a
303          * new connection for the same UUID, but the real threat (list
304          * corruption when multiple different clients connect) is solved.
305          *
306          * There is a second race between adding the export to the list,
307          * and filling in the client data below.  Hence skipping the case
308          * of NULL mcd above.  We should already be controlling multiple
309          * connects at the client, and we can't hold the spinlock over
310          * memory allocations without risk of deadlocking.
311          */
312         rc = class_connect(conn, obd, cluuid);
313         if (rc)
314                 RETURN(rc);
315         exp = class_conn2export(conn);
316         LASSERT(exp);
317         med = &exp->exp_mds_data;
318
319         OBD_ALLOC(mcd, sizeof(*mcd));
320         if (!mcd) {
321                 CERROR("mds: out of memory for client data\n");
322                 GOTO(out, rc = -ENOMEM);
323         }
324
325         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
326         med->med_mcd = mcd;
327
328         rc = mds_client_add(obd, &obd->u.mds, med, -1);
329         if (rc == 0)
330                 EXIT;
331 out:
332         if (rc) {
333                 OBD_FREE(mcd, sizeof(*mcd));
334                 class_disconnect(exp, 0);
335         }
336         class_export_put(exp);
337
338         return rc;
339 }
340
341 static int mds_init_export(struct obd_export *exp)
342 {
343         struct mds_export_data *med = &exp->exp_mds_data;
344
345         INIT_LIST_HEAD(&med->med_open_head);
346         spin_lock_init(&med->med_open_lock);
347         RETURN(0);
348 }
349
350 static int mds_destroy_export(struct obd_export *export)
351 {
352         struct mds_export_data *med;
353         struct obd_device *obd = export->exp_obd;
354         struct lvfs_run_ctxt saved;
355         int rc = 0;
356         ENTRY;
357
358         med = &export->exp_mds_data;
359         target_destroy_export(export);
360
361         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
362                 GOTO(out, 0);
363
364         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
365
366         /* Close any open files (which may also cause orphan unlinking). */
367         spin_lock(&med->med_open_lock);
368         while (!list_empty(&med->med_open_head)) {
369                 struct list_head *tmp = med->med_open_head.next;
370                 struct mds_file_data *mfd =
371                         list_entry(tmp, struct mds_file_data, mfd_list);
372                 BDEVNAME_DECLARE_STORAGE(btmp);
373
374                 /* bug 1579: fix force-closing for 2.5 */
375                 struct dentry *dentry = mfd->mfd_dentry;
376
377                 list_del(&mfd->mfd_list);
378                 spin_unlock(&med->med_open_lock);
379
380                 /* If you change this message, be sure to update
381                  * replay_single:test_46 */
382                 CERROR("force closing client file handle for %*s (%s:%lu)\n",
383                        dentry->d_name.len, dentry->d_name.name,
384                        ll_bdevname(dentry->d_inode->i_sb, btmp),
385                        dentry->d_inode->i_ino);
386                 rc = mds_mfd_close(NULL, obd, mfd,
387                                    !(export->exp_flags & OBD_OPT_FAILOVER));
388
389                 if (rc)
390                         CDEBUG(D_INODE, "Error closing file: %d\n", rc);
391                 spin_lock(&med->med_open_lock);
392         }
393         spin_unlock(&med->med_open_lock);
394         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
395
396 out:
397         mds_client_free(export, !(export->exp_flags & OBD_OPT_FAILOVER));
398
399         RETURN(rc);
400 }
401
402 static int mds_disconnect(struct obd_export *exp, int flags)
403 {
404         struct obd_device *obd;
405         struct mds_obd *mds;
406         unsigned long irqflags;
407         int rc;
408         ENTRY;
409
410         LASSERT(exp);
411         class_export_get(exp);
412
413         obd = class_exp2obd(exp);
414         if (obd == NULL) {
415                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
416                        exp->exp_handle.h_cookie);
417                 RETURN(-EINVAL);
418         }
419         mds = &obd->u.mds;
420
421         if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)
422                         && !atomic_read(&mds->mds_real_clients)) {
423                 /* there was no client at all */
424                 mds_lmv_disconnect(obd, flags);
425         }
426
427         if ((exp->exp_flags & OBD_OPT_REAL_CLIENT)
428                         && atomic_dec_and_test(&mds->mds_real_clients)) {
429                 /* time to drop LMV connections */
430                 CDEBUG(D_OTHER, "%s: last real client %s disconnected.  "
431                        "Disconnnect from LMV now\n",
432                        obd->obd_name, exp->exp_client_uuid.uuid);
433                 mds_lmv_disconnect(obd, flags);
434         }
435
436         spin_lock_irqsave(&exp->exp_lock, irqflags);
437         exp->exp_flags = flags;
438         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
439
440         /* Disconnect early so that clients can't keep using export */
441         rc = class_disconnect(exp, flags);
442         ldlm_cancel_locks_for_export(exp);
443
444         /* complete all outstanding replies */
445         spin_lock_irqsave(&exp->exp_lock, irqflags);
446         while (!list_empty(&exp->exp_outstanding_replies)) {
447                 struct ptlrpc_reply_state *rs =
448                         list_entry(exp->exp_outstanding_replies.next,
449                                    struct ptlrpc_reply_state, rs_exp_list);
450                 struct ptlrpc_service *svc = rs->rs_srv_ni->sni_service;
451
452                 spin_lock(&svc->srv_lock);
453                 list_del_init(&rs->rs_exp_list);
454                 ptlrpc_schedule_difficult_reply(rs);
455                 spin_unlock(&svc->srv_lock);
456         }
457         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
458
459         class_export_put(exp);
460         RETURN(rc);
461 }
462
463 static int mds_getstatus(struct ptlrpc_request *req)
464 {
465         struct mds_obd *mds = mds_req2mds(req);
466         struct mds_body *body;
467         int rc, size = sizeof(*body);
468         ENTRY;
469
470         rc = lustre_pack_reply(req, 1, &size, NULL);
471         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK)) {
472                 CERROR("mds: out of memory for message: size=%d\n", size);
473                 req->rq_status = -ENOMEM;       /* superfluous? */
474                 RETURN(-ENOMEM);
475         }
476
477         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*body));
478         memcpy(&body->fid1, &mds->mds_rootfid, sizeof(body->fid1));
479
480         /* the last_committed and last_xid fields are filled in for all
481          * replies already - no need to do so here also.
482          */
483         RETURN(0);
484 }
485
486 int mds_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
487                      void *data, int flag)
488 {
489         int do_ast;
490         ENTRY;
491
492         if (flag == LDLM_CB_CANCELING) {
493                 /* Don't need to do anything here. */
494                 RETURN(0);
495         }
496
497         /* XXX layering violation!  -phil */
498         l_lock(&lock->l_resource->lr_namespace->ns_lock);
499         /* Get this: if mds_blocking_ast is racing with mds_intent_policy,
500          * such that mds_blocking_ast is called just before l_i_p takes the
501          * ns_lock, then by the time we get the lock, we might not be the
502          * correct blocking function anymore.  So check, and return early, if
503          * so. */
504         if (lock->l_blocking_ast != mds_blocking_ast) {
505                 l_unlock(&lock->l_resource->lr_namespace->ns_lock);
506                 RETURN(0);
507         }
508
509         lock->l_flags |= LDLM_FL_CBPENDING;
510         do_ast = (!lock->l_readers && !lock->l_writers);
511         l_unlock(&lock->l_resource->lr_namespace->ns_lock);
512
513         if (do_ast) {
514                 struct lustre_handle lockh;
515                 int rc;
516
517                 LDLM_DEBUG(lock, "already unused, calling ldlm_cli_cancel");
518                 ldlm_lock2handle(lock, &lockh);
519                 rc = ldlm_cli_cancel(&lockh);
520                 if (rc < 0)
521                         CERROR("ldlm_cli_cancel: %d\n", rc);
522         } else {
523                 LDLM_DEBUG(lock, "Lock still has references, will be "
524                            "cancelled later");
525         }
526         RETURN(0);
527 }
528
529 int mds_get_md(struct obd_device *obd, struct inode *inode, void *md,
530                int *size, int lock)
531 {
532         int rc = 0;
533         int lmm_size;
534
535         if (lock)
536                 down(&inode->i_sem);
537         rc = fsfilt_get_md(obd, inode, md, *size);
538         if (lock)
539                 up(&inode->i_sem);
540
541         if (rc < 0) {
542                 CERROR("Error %d reading eadata for ino %lu\n",
543                        rc, inode->i_ino);
544         } else if (rc > 0) {
545                 lmm_size = rc;
546                 
547                 if (S_ISREG(inode->i_mode))
548                         rc = mds_convert_lov_ea(obd, inode, md, lmm_size);
549
550                 if (rc == 0) {
551                         *size = lmm_size;
552                         rc = lmm_size;
553                 } else if (rc > 0) {
554                         *size = rc;
555                 }
556         }
557
558         RETURN (rc);
559 }
560
561
562 /* Call with lock=1 if you want mds_pack_md to take the i_sem.
563  * Call with lock=0 if the caller has already taken the i_sem. */
564 int mds_pack_md(struct obd_device *obd, struct lustre_msg *msg, int offset,
565                 struct mds_body *body, struct inode *inode, int lock)
566 {
567         struct mds_obd *mds = &obd->u.mds;
568         void *lmm;
569         int lmm_size;
570         int rc;
571         ENTRY;
572
573         lmm = lustre_msg_buf(msg, offset, 0);
574         if (lmm == NULL) {
575                 /* Some problem with getting eadata when I sized the reply
576                  * buffer... */
577                 CDEBUG(D_INFO, "no space reserved for inode %lu MD\n",
578                        inode->i_ino);
579                 RETURN(0);
580         }
581         lmm_size = msg->buflens[offset];
582
583         /* I don't really like this, but it is a sanity check on the client
584          * MD request.  However, if the client doesn't know how much space
585          * to reserve for the MD, it shouldn't be bad to have too much space.
586          */
587         if (lmm_size > mds->mds_max_mdsize) {
588                 CWARN("Reading MD for inode %lu of %d bytes > max %d\n",
589                        inode->i_ino, lmm_size, mds->mds_max_mdsize);
590                 // RETURN(-EINVAL);
591         }
592
593         rc = mds_get_md(obd, inode, lmm, &lmm_size, lock);
594         if (rc > 0) {
595                 if (S_ISDIR(inode->i_mode))
596                         body->valid |= OBD_MD_FLDIREA;
597                 else
598                         body->valid |= OBD_MD_FLEASIZE;
599                 body->eadatasize = lmm_size;
600                 rc = 0;
601         }
602
603         RETURN(rc);
604 }
605
606 static int mds_getattr_internal(struct obd_device *obd, struct dentry *dentry,
607                                 struct ptlrpc_request *req,
608                                 struct mds_body *reqbody, int reply_off)
609 {
610         struct mds_body *body;
611         struct inode *inode = dentry->d_inode;
612         int rc = 0;
613         ENTRY;
614
615         if (inode == NULL && !(dentry->d_flags & DCACHE_CROSS_REF))
616                 RETURN(-ENOENT);
617
618         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
619         LASSERT(body != NULL);                 /* caller prepped reply */
620
621         if (dentry->d_flags & DCACHE_CROSS_REF) {
622                 CDEBUG(D_OTHER, "cross reference: %lu/%lu/%lu\n",
623                        (unsigned long) dentry->d_mdsnum,
624                        (unsigned long) dentry->d_inum,
625                        (unsigned long) dentry->d_generation);
626                 body->valid |= OBD_MD_FLID | OBD_MD_MDS;
627                 body->fid1.id = dentry->d_inum;
628                 body->fid1.mds = dentry->d_mdsnum;
629                 body->fid1.generation = dentry->d_generation;
630                 RETURN(0);
631         }
632         mds_pack_inode2fid(obd, &body->fid1, inode);
633         mds_pack_inode2body(obd, body, inode);
634
635         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
636             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
637                 rc = mds_pack_md(obd, req->rq_repmsg, reply_off + 1, body,
638                                  inode, 1);
639
640                 /* If we have LOV EA data, the OST holds size, atime, mtime */
641                 if (!(body->valid & OBD_MD_FLEASIZE) &&
642                     !(body->valid & OBD_MD_FLDIREA))
643                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
644                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
645         } else if (S_ISLNK(inode->i_mode) &&
646                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
647                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off + 1,0);
648                 int len;
649
650                 LASSERT (symname != NULL);       /* caller prepped reply */
651                 len = req->rq_repmsg->buflens[reply_off + 1];
652
653                 rc = inode->i_op->readlink(dentry, symname, len);
654                 if (rc < 0) {
655                         CERROR("readlink failed: %d\n", rc);
656                 } else if (rc != len - 1) {
657                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
658                                 rc, len - 1);
659                         rc = -EINVAL;
660                 } else {
661                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
662                         body->valid |= OBD_MD_LINKNAME;
663                         body->eadatasize = rc + 1;
664                         symname[rc] = 0;        /* NULL terminate */
665                         rc = 0;
666                 }
667         }
668
669         RETURN(rc);
670 }
671
672 static int mds_getattr_pack_msg_cf(struct ptlrpc_request *req,
673                                         struct dentry *dentry,
674                                         int offset)
675 {
676         int rc = 0, size[1] = {sizeof(struct mds_body)};
677         ENTRY;
678
679         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
680                 CERROR("failed MDS_GETATTR_PACK test\n");
681                 req->rq_status = -ENOMEM;
682                 GOTO(out, rc = -ENOMEM);
683         }
684
685         rc = lustre_pack_reply(req, 1, size, NULL);
686         if (rc) {
687                 CERROR("out of memory\n");
688                 GOTO(out, req->rq_status = rc);
689         }
690
691         EXIT;
692  out:
693         return(rc);
694 }
695
696 static int mds_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
697                                 int offset)
698 {
699         struct mds_obd *mds = mds_req2mds(req);
700         struct mds_body *body;
701         int rc = 0, size[2] = {sizeof(*body)}, bufcount = 1;
702         ENTRY;
703
704         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
705         LASSERT(body != NULL);                 /* checked by caller */
706         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
707
708         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
709             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
710                 int rc;
711                 down(&inode->i_sem);
712                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0);
713                 up(&inode->i_sem);
714                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
715                        rc, inode->i_ino);
716                 if (rc < 0) {
717                         if (rc != -ENODATA)
718                                 CERROR("error getting inode %lu MD: rc = %d\n",
719                                        inode->i_ino, rc);
720                         size[bufcount] = 0;
721                 } else if (rc > mds->mds_max_mdsize) {
722                         size[bufcount] = 0;
723                         CERROR("MD size %d larger than maximum possible %u\n",
724                                rc, mds->mds_max_mdsize);
725                 } else {
726                         size[bufcount] = rc;
727                 }
728                 bufcount++;
729         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
730                 if (inode->i_size + 1 != body->eadatasize)
731                         CERROR("symlink size: %Lu, reply space: %d\n",
732                                inode->i_size + 1, body->eadatasize);
733                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
734                 bufcount++;
735                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
736                        inode->i_size + 1, body->eadatasize);
737         }
738
739         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
740                 CERROR("failed MDS_GETATTR_PACK test\n");
741                 req->rq_status = -ENOMEM;
742                 GOTO(out, rc = -ENOMEM);
743         }
744
745         rc = lustre_pack_reply(req, bufcount, size, NULL);
746         if (rc) {
747                 CERROR("out of memory\n");
748                 GOTO(out, req->rq_status = rc);
749         }
750
751         EXIT;
752  out:
753         return(rc);
754 }
755
756 static int mds_getattr_name(int offset, struct ptlrpc_request *req,
757                             struct lustre_handle *child_lockh, int child_part)
758 {
759         struct obd_device *obd = req->rq_export->exp_obd;
760         struct ldlm_reply *rep = NULL;
761         struct lvfs_run_ctxt saved;
762         struct mds_body *body;
763         struct dentry *dparent = NULL, *dchild = NULL;
764         struct lvfs_ucred uc;
765         struct lustre_handle parent_lockh[2];
766         int namesize;
767         int rc = 0, cleanup_phase = 0, resent_req = 0;
768         char *name;
769         ENTRY;
770
771         LASSERT(!strcmp(obd->obd_type->typ_name, "mds"));
772
773         /* Swab now, before anyone looks inside the request */
774
775         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
776                                   lustre_swab_mds_body);
777         if (body == NULL) {
778                 CERROR("Can't swab mds_body\n");
779                 GOTO(cleanup, rc = -EFAULT);
780         }
781
782         LASSERT_REQSWAB(req, offset + 1);
783         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
784         if (name == NULL) {
785                 CERROR("Can't unpack name\n");
786                 GOTO(cleanup, rc = -EFAULT);
787         }
788         namesize = req->rq_reqmsg->buflens[offset + 1];
789
790         LASSERT (offset == 0 || offset == 2);
791         /* if requests were at offset 2, the getattr reply goes back at 1 */
792         if (offset) {
793                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
794                 offset = 1;
795         }
796
797         uc.luc_fsuid = body->fsuid;
798         uc.luc_fsgid = body->fsgid;
799         uc.luc_cap = body->capability;
800         uc.luc_suppgid1 = body->suppgid;
801         uc.luc_suppgid2 = -1;
802         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
803         cleanup_phase = 1; /* kernel context */
804         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
805
806         LASSERT(namesize > 0);
807         if (namesize == 1) {
808                 /* we have no dentry here, drop LOOKUP bit */
809                 child_part &= ~MDS_INODELOCK_LOOKUP;
810                 CDEBUG(D_OTHER, "%s: request to retrieve attrs for %lu/%lu\n",
811                        obd->obd_name, (unsigned long) body->fid1.id,
812                        (unsigned long) body->fid1.generation);
813                 dchild = mds_fid2locked_dentry(obd, &body->fid1, NULL, LCK_PR,
814                                                parent_lockh, NULL, 0, child_part);
815                 if (IS_ERR(dchild)) {
816                         CERROR("can't find inode: %d\n", (int) PTR_ERR(dchild));
817                         GOTO(cleanup, rc = PTR_ERR(dchild));
818                 }
819                 memcpy(child_lockh, parent_lockh, sizeof(parent_lockh[0]));
820 #ifdef S_PDIROPS
821                 if (parent_lockh[1].cookie)
822                         ldlm_lock_decref(parent_lockh + 1, LCK_CW);
823 #endif
824                 cleanup_phase = 2;
825                 goto fill_inode;
826         }
827         
828         /* FIXME: handle raw lookup */
829 #if 0
830         if (body->valid == OBD_MD_FLID) {
831                 struct mds_body *mds_reply;
832                 int size = sizeof(*mds_reply);
833                 ino_t inum;
834                 // The user requested ONLY the inode number, so do a raw lookup
835                 rc = lustre_pack_reply(req, 1, &size, NULL);
836                 if (rc) {
837                         CERROR("out of memory\n");
838                         GOTO(cleanup, rc);
839                 }
840
841                 rc = dir->i_op->lookup_raw(dir, name, namesize - 1, &inum);
842
843                 mds_reply = lustre_msg_buf(req->rq_repmsg, offset,
844                                            sizeof(*mds_reply));
845                 mds_reply->fid1.id = inum;
846                 mds_reply->valid = OBD_MD_FLID;
847                 GOTO(cleanup, rc);
848         }
849 #endif
850
851         if (child_lockh->cookie != 0) {
852                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
853                 resent_req = 1;
854         }
855
856         if (resent_req == 0) {
857                 rc = mds_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
858                                                  parent_lockh, &dparent,
859                                                  LCK_PR, MDS_INODELOCK_LOOKUP,
860                                                  name, namesize,
861                                                  child_lockh, &dchild, LCK_PR,
862                                                  child_part);
863                 if (rc)
864                         GOTO(cleanup, rc);
865         } else {
866                 struct ldlm_lock *granted_lock;
867                 struct ll_fid child_fid;
868                 struct ldlm_resource *res;
869                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
870                 granted_lock = ldlm_handle2lock(child_lockh);
871                 LASSERT(granted_lock);
872
873                 res = granted_lock->l_resource;
874                 child_fid.id = res->lr_name.name[0];
875                 child_fid.generation = res->lr_name.name[1];
876                 dchild = mds_fid2dentry(&obd->u.mds, &child_fid, NULL);
877                 LASSERT(dchild);
878                 LDLM_LOCK_PUT(granted_lock);
879         }
880
881         cleanup_phase = 2; /* dchild, dparent, locks */
882
883 fill_inode:
884         if (!DENTRY_VALID(dchild)) {
885                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
886                 /* in the intent case, the policy clears this error:
887                    the disposition is enough */
888                 GOTO(cleanup, rc = -ENOENT);
889         } else {
890                 intent_set_disposition(rep, DISP_LOOKUP_POS);
891         }
892
893         if (req->rq_repmsg == NULL) {
894                 if (dchild->d_flags & DCACHE_CROSS_REF)
895                         rc = mds_getattr_pack_msg_cf(req, dchild, offset);
896                 else
897                         rc = mds_getattr_pack_msg(req, dchild->d_inode, offset);
898                 if (rc != 0) {
899                         CERROR ("mds_getattr_pack_msg: %d\n", rc);
900                         GOTO (cleanup, rc);
901                 }
902         }
903
904         rc = mds_getattr_internal(obd, dchild, req, body, offset);
905         GOTO(cleanup, rc); /* returns the lock to the client */
906
907  cleanup:
908         switch (cleanup_phase) {
909         case 2:
910                 if (resent_req == 0) {
911                         if (rc && DENTRY_VALID(dchild))
912                                 ldlm_lock_decref(child_lockh, LCK_PR);
913                         if (dparent) {
914                                 ldlm_lock_decref(parent_lockh, LCK_PR);
915 #ifdef S_PDIROPS
916                                 if (parent_lockh[1].cookie != 0)
917                                         ldlm_lock_decref(parent_lockh + 1,
918                                                         LCK_CW);
919 #endif
920                         }
921                         if (dparent)
922                                 l_dput(dparent);
923                 }
924                 l_dput(dchild);
925         case 1:
926                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
927         default: ;
928         }
929         return rc;
930 }
931
932 static int mds_getattr(int offset, struct ptlrpc_request *req)
933 {
934         struct mds_obd *mds = mds_req2mds(req);
935         struct obd_device *obd = req->rq_export->exp_obd;
936         struct lvfs_run_ctxt saved;
937         struct dentry *de;
938         struct mds_body *body;
939         struct lvfs_ucred uc;
940         int rc = 0;
941         ENTRY;
942
943         body = lustre_swab_reqbuf (req, offset, sizeof (*body),
944                                    lustre_swab_mds_body);
945         if (body == NULL) {
946                 CERROR ("Can't unpack body\n");
947                 RETURN (-EFAULT);
948         }
949
950         uc.luc_fsuid = body->fsuid;
951         uc.luc_fsgid = body->fsgid;
952         uc.luc_cap = body->capability;
953         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
954         de = mds_fid2dentry(mds, &body->fid1, NULL);
955         if (IS_ERR(de)) {
956                 rc = req->rq_status = -ENOENT;
957                 GOTO(out_pop, PTR_ERR(de));
958         }
959
960         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
961         if (rc != 0) {
962                 CERROR ("mds_getattr_pack_msg: %d\n", rc);
963                 GOTO (out_pop, rc);
964         }
965
966         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
967
968         l_dput(de);
969         GOTO(out_pop, rc);
970 out_pop:
971         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
972         return rc;
973 }
974
975
976 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
977                           unsigned long max_age)
978 {
979         int rc;
980
981         spin_lock(&obd->obd_osfs_lock);
982         rc = fsfilt_statfs(obd, obd->u.mds.mds_sb, max_age);
983         if (rc == 0)
984                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
985         spin_unlock(&obd->obd_osfs_lock);
986
987         return rc;
988 }
989
990 static int mds_statfs(struct ptlrpc_request *req)
991 {
992         struct obd_device *obd = req->rq_export->exp_obd;
993         int rc, size = sizeof(struct obd_statfs);
994         ENTRY;
995
996         rc = lustre_pack_reply(req, 1, &size, NULL);
997         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
998                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
999                 GOTO(out, rc);
1000         }
1001
1002         /* We call this so that we can cache a bit - 1 jiffie worth */
1003         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
1004                             jiffies - HZ);
1005         if (rc) {
1006                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
1007                 GOTO(out, rc);
1008         }
1009
1010         EXIT;
1011 out:
1012         req->rq_status = rc;
1013         return 0;
1014 }
1015
1016 static int mds_sync(struct ptlrpc_request *req)
1017 {
1018         struct obd_device *obd = req->rq_export->exp_obd;
1019         struct mds_obd *mds = &obd->u.mds;
1020         struct mds_body *body;
1021         int rc, size = sizeof(*body);
1022         ENTRY;
1023
1024         body = lustre_msg_buf(req->rq_reqmsg, 0, sizeof(*body));
1025         if (body == NULL)
1026                 GOTO(out, rc = -EPROTO);
1027
1028         rc = lustre_pack_reply(req, 1, &size, NULL);
1029         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK)) {
1030                 CERROR("fsync lustre_pack_reply failed: rc = %d\n", rc);
1031                 GOTO(out, rc);
1032         }
1033
1034         if (body->fid1.id == 0) {
1035                 /* a fid of zero is taken to mean "sync whole filesystem" */
1036                 rc = fsfilt_sync(obd, mds->mds_sb);
1037                 if (rc)
1038                         GOTO(out, rc);
1039         } else {
1040                 /* just any file to grab fsync method - "file" arg unused */
1041                 struct file *file = mds->mds_rcvd_filp;
1042                 struct dentry *de;
1043
1044                 de = mds_fid2dentry(mds, &body->fid1, NULL);
1045                 if (IS_ERR(de))
1046                         GOTO(out, rc = PTR_ERR(de));
1047
1048                 rc = file->f_op->fsync(NULL, de, 1);
1049                 l_dput(de);
1050                 if (rc)
1051                         GOTO(out, rc);
1052
1053                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1054                 mds_pack_inode2fid(obd, &body->fid1, de->d_inode);
1055                 mds_pack_inode2body(obd, body, de->d_inode);
1056         }
1057 out:
1058         req->rq_status = rc;
1059         return 0;
1060 }
1061
1062 /* mds_readpage does not take a DLM lock on the inode, because the client must
1063  * already have a PR lock.
1064  *
1065  * If we were to take another one here, a deadlock will result, if another
1066  * thread is already waiting for a PW lock. */
1067 static int mds_readpage(struct ptlrpc_request *req)
1068 {
1069         struct obd_device *obd = req->rq_export->exp_obd;
1070         struct vfsmount *mnt;
1071         struct dentry *de;
1072         struct file *file;
1073         struct mds_body *body, *repbody;
1074         struct lvfs_run_ctxt saved;
1075         int rc, size = sizeof(*repbody);
1076         struct lvfs_ucred uc;
1077         ENTRY;
1078
1079         rc = lustre_pack_reply(req, 1, &size, NULL);
1080         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK)) {
1081                 CERROR("mds: out of memory\n");
1082                 GOTO(out, rc = -ENOMEM);
1083         }
1084
1085         body = lustre_swab_reqbuf(req, 0, sizeof(*body), lustre_swab_mds_body);
1086         if (body == NULL)
1087                 GOTO (out, rc = -EFAULT);
1088
1089         uc.luc_fsuid = body->fsuid;
1090         uc.luc_fsgid = body->fsgid;
1091         uc.luc_cap = body->capability;
1092         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1093         de = mds_fid2dentry(&obd->u.mds, &body->fid1, &mnt);
1094         if (IS_ERR(de))
1095                 GOTO(out_pop, rc = PTR_ERR(de));
1096
1097         CDEBUG(D_INODE, "ino %lu\n", de->d_inode->i_ino);
1098
1099         file = dentry_open(de, mnt, O_RDONLY | O_LARGEFILE);
1100         /* note: in case of an error, dentry_open puts dentry */
1101         if (IS_ERR(file))
1102                 GOTO(out_pop, rc = PTR_ERR(file));
1103
1104         /* body->size is actually the offset -eeb */
1105         if ((body->size & (de->d_inode->i_blksize - 1)) != 0) {
1106                 CERROR("offset "LPU64" not on a block boundary of %lu\n",
1107                        body->size, de->d_inode->i_blksize);
1108                 GOTO(out_file, rc = -EFAULT);
1109         }
1110
1111         /* body->nlink is actually the #bytes to read -eeb */
1112         if (body->nlink & (de->d_inode->i_blksize - 1)) {
1113                 CERROR("size %u is not multiple of blocksize %lu\n",
1114                        body->nlink, de->d_inode->i_blksize);
1115                 GOTO(out_file, rc = -EFAULT);
1116         }
1117
1118         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*repbody));
1119         repbody->size = file->f_dentry->d_inode->i_size;
1120         repbody->valid = OBD_MD_FLSIZE;
1121
1122         /* to make this asynchronous make sure that the handling function
1123            doesn't send a reply when this function completes. Instead a
1124            callback function would send the reply */
1125         /* body->size is actually the offset -eeb */
1126         rc = mds_sendpage(req, file, body->size, body->nlink);
1127
1128 out_file:
1129         filp_close(file, 0);
1130 out_pop:
1131         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1132 out:
1133         req->rq_status = rc;
1134         RETURN(0);
1135 }
1136
1137 int mds_reint(struct ptlrpc_request *req, int offset,
1138               struct lustre_handle *lockh)
1139 {
1140         struct mds_update_record *rec; /* 116 bytes on the stack?  no sir! */
1141         int rc;
1142         ENTRY;
1143
1144         OBD_ALLOC(rec, sizeof(*rec));
1145         if (rec == NULL)
1146                 RETURN(-ENOMEM);
1147
1148         rc = mds_update_unpack(req, offset, rec);
1149         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
1150                 CERROR("invalid record\n");
1151                 GOTO(out, req->rq_status = -EINVAL);
1152         }
1153         /* rc will be used to interrupt a for loop over multiple records */
1154         rc = mds_reint_rec(rec, offset, req, lockh);
1155  out:
1156         OBD_FREE(rec, sizeof(*rec));
1157         RETURN(rc);
1158 }
1159
1160 static int mds_filter_recovery_request(struct ptlrpc_request *req,
1161                                        struct obd_device *obd, int *process)
1162 {
1163         switch (req->rq_reqmsg->opc) {
1164         case MDS_CONNECT: /* This will never get here, but for completeness. */
1165         case OST_CONNECT: /* This will never get here, but for completeness. */
1166         case MDS_DISCONNECT:
1167         case OST_DISCONNECT:
1168                *process = 1;
1169                RETURN(0);
1170
1171         case MDS_CLOSE:
1172         case MDS_SYNC: /* used in unmounting */
1173         case OBD_PING:
1174         case MDS_REINT:
1175         case LDLM_ENQUEUE:
1176         case OST_CREATE:
1177                 *process = target_queue_recovery_request(req, obd);
1178                 RETURN(0);
1179
1180         default:
1181                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
1182                 *process = 0;
1183                 /* XXX what should we set rq_status to here? */
1184                 req->rq_status = -EAGAIN;
1185                 RETURN(ptlrpc_error(req));
1186         }
1187 }
1188
1189 static char *reint_names[] = {
1190         [REINT_SETATTR] "setattr",
1191         [REINT_CREATE]  "create",
1192         [REINT_LINK]    "link",
1193         [REINT_UNLINK]  "unlink",
1194         [REINT_RENAME]  "rename",
1195         [REINT_OPEN]    "open",
1196 };
1197
/* obdo fields copied from the inode into create replies by
 * obdo_from_inode() */
#define FILTER_VALID_FLAGS (OBD_MD_FLTYPE   | \
                            OBD_MD_FLMODE   | \
                            OBD_MD_FLGENER  | \
                            OBD_MD_FLSIZE   | \
                            OBD_MD_FLBLOCKS | \
                            OBD_MD_FLBLKSZ  | \
                            OBD_MD_FLATIME  | \
                            OBD_MD_FLMTIME  | \
                            OBD_MD_FLCTIME  | \
                            OBD_MD_FLID)
1202
1203 static void reconstruct_create(struct ptlrpc_request *req)
1204 {
1205         struct mds_export_data *med = &req->rq_export->exp_mds_data;
1206         struct mds_client_data *mcd = med->med_mcd;
1207         struct ost_body *body;
1208
1209         body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*body));
1210
1211         /* copy rc, transno and disp; steal locks */
1212         mds_req_from_mcd(req, mcd);
1213         CERROR("reconstruct reply for x"LPU64"\n", req->rq_xid);
1214 }
1215
1216 static int mdt_obj_create(struct ptlrpc_request *req)
1217 {
1218         struct obd_device *obd = req->rq_export->exp_obd;
1219         struct ldlm_res_id res_id = { .name = {0} };
1220         struct mds_obd *mds = &obd->u.mds;
1221         struct ost_body *body, *repbody;
1222         int rc, size = sizeof(*repbody);
1223         char fidname[LL_FID_NAMELEN];
1224         struct inode *parent_inode;
1225         struct lustre_handle lockh;
1226         struct lvfs_run_ctxt saved;
1227         ldlm_policy_data_t policy;
1228         struct dentry *new = NULL;
1229         struct dentry_params dp;
1230         int mealen, flags = 0;
1231         unsigned int tmpname;
1232         struct lvfs_ucred uc;
1233         struct mea *mea;
1234         void *handle;
1235         ENTRY;
1236        
1237         DEBUG_REQ(D_HA, req, "create remote object");
1238
1239         parent_inode = mds->mds_objects_dir->d_inode;
1240
1241         body = lustre_swab_reqbuf(req, 0, sizeof(*body),
1242                                   lustre_swab_ost_body);
1243         if (body == NULL)
1244                 RETURN(-EFAULT);
1245
1246         MDS_CHECK_RESENT(req, reconstruct_create(req));
1247
1248         uc.luc_fsuid = body->oa.o_uid;
1249         uc.luc_fsgid = body->oa.o_gid;
1250
1251         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1252         
1253         rc = lustre_pack_reply(req, 1, &size, NULL);
1254         if (rc)
1255                 RETURN(rc);
1256
1257         repbody = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*repbody));
1258
1259         if (body->oa.o_flags & OBD_FL_RECREATE_OBJS) {
1260                 /* this is re-create request from MDS holding directory name.
1261                  * we have to lookup given ino/generation first. if it exists
1262                  * (good case) then there is nothing to do. if it does not
1263                  * then we have to recreate it */
1264                 struct ll_fid fid;
1265                 fid.id = body->oa.o_id;
1266                 fid.generation = body->oa.o_generation;
1267                 new = mds_fid2dentry(mds, &fid, NULL);
1268                 if (!IS_ERR(new) && new->d_inode) {
1269                         CWARN("mkdir() repairing is on its way: %lu/%lu\n",
1270                               (unsigned long) fid.id,
1271                               (unsigned long) fid.generation);
1272                         obdo_from_inode(&repbody->oa, new->d_inode,
1273                                         FILTER_VALID_FLAGS);
1274                         repbody->oa.o_id = new->d_inode->i_ino;
1275                         repbody->oa.o_generation = new->d_inode->i_generation;
1276                         repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1277                         GOTO(cleanup2, rc = 0);
1278                 }
1279                 CWARN("hmm. for some reason dir %lu/%lu (or reply) got lost\n",
1280                       (unsigned long) fid.id, (unsigned long) fid.generation);
1281                 LASSERT(new->d_inode == NULL ||
1282                         new->d_inode->i_generation != fid.generation);
1283                 l_dput(new); 
1284         }
1285         
1286         down(&parent_inode->i_sem);
1287         handle = fsfilt_start(obd, parent_inode, FSFILT_OP_MKDIR, NULL);
1288         LASSERT(!IS_ERR(handle));
1289
1290 repeat:
1291         tmpname = ll_insecure_random_int();
1292         rc = sprintf(fidname, "%u", tmpname);
1293         new = lookup_one_len(fidname, mds->mds_objects_dir, rc);
1294         if (IS_ERR(new)) {
1295                 CERROR("%s: can't lookup new inode (%s) for mkdir: %d\n",
1296                        obd->obd_name, fidname, (int) PTR_ERR(new));
1297                 fsfilt_commit(obd, new->d_inode, handle, 0);
1298                 up(&parent_inode->i_sem);
1299                 RETURN(PTR_ERR(new));
1300         } else if (new->d_inode) {
1301                 CERROR("%s: name exists. repeat\n", obd->obd_name);
1302                 goto repeat;
1303         }
1304
1305         new->d_fsdata = (void *) &dp;
1306         dp.p_inum = 0;
1307         dp.p_ptr = req;
1308
1309         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
1310                 DEBUG_REQ(D_HA, req, "replay create obj %lu/%lu",
1311                           (unsigned long) body->oa.o_id,
1312                           (unsigned long) body->oa.o_generation);
1313                 dp.p_inum = body->oa.o_id;
1314                 dp.p_generation = body->oa.o_generation;
1315         }
1316         rc = vfs_mkdir(parent_inode, new, body->oa.o_mode);
1317         if (rc == 0) {
1318                 obdo_from_inode(&repbody->oa, new->d_inode, FILTER_VALID_FLAGS);
1319                 repbody->oa.o_id = new->d_inode->i_ino;
1320                 repbody->oa.o_generation = new->d_inode->i_generation;
1321                 repbody->oa.o_valid |= OBD_MD_FLID | OBD_MD_FLGENER;
1322
1323                 rc = fsfilt_del_dir_entry(obd, new);
1324                 up(&parent_inode->i_sem);
1325
1326                 if (rc) {
1327                         CERROR("can't remove name for object: %d\n", rc);
1328                         GOTO(cleanup, rc);
1329                 }
1330                         
1331                 /* this lock should be taken to serialize MDS modifications
1332                  * in failure case */
1333                 res_id.name[0] = new->d_inode->i_ino;
1334                 res_id.name[1] = new->d_inode->i_generation;
1335                 policy.l_inodebits.bits = MDS_INODELOCK_UPDATE;
1336                 rc = ldlm_cli_enqueue(NULL, NULL, obd->obd_namespace,
1337                                 res_id, LDLM_IBITS, &policy,
1338                                 LCK_EX, &flags, mds_blocking_ast,
1339                                 ldlm_completion_ast, NULL, NULL,
1340                                 NULL, 0, NULL, &lockh);
1341                 if (rc != ELDLM_OK)
1342                         GOTO(cleanup, rc);
1343
1344                 CDEBUG(D_OTHER, "created dirobj: %lu/%lu mode %o\n",
1345                                 (unsigned long) new->d_inode->i_ino,
1346                                 (unsigned long) new->d_inode->i_generation,
1347                                 (unsigned) new->d_inode->i_mode);
1348         } else {
1349                 up(&parent_inode->i_sem);
1350                 CERROR("%s: can't create dirobj: %d\n", obd->obd_name, rc);
1351         }
1352
1353         if (rc == 0 && body->oa.o_valid & OBD_MD_FLID) {
1354                 /* this is new object for splitted dir. we have to
1355                  * prevent recursive splitting on it -bzzz */
1356                 mealen = obd_size_diskmd(mds->mds_lmv_exp, NULL);
1357                 OBD_ALLOC(mea, mealen);
1358                 if (mea == NULL)
1359                         GOTO(cleanup, rc = -ENOMEM);
1360                 mea->mea_count = 0;
1361                 down(&new->d_inode->i_sem);
1362                 rc = fsfilt_set_md(obd, new->d_inode, handle, mea, mealen);
1363                 up(&new->d_inode->i_sem);
1364                 OBD_FREE(mea, mealen);
1365         }
1366
1367 cleanup:
1368         rc = mds_finish_transno(mds, parent_inode, handle, req, rc, 0);
1369         if (rc == 0)
1370                 ptlrpc_save_lock(req, &lockh, LCK_EX);
1371         else
1372                 ldlm_lock_decref(&lockh, LCK_EX);
1373 cleanup2:
1374         l_dput(new);
1375         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
1376         RETURN(rc);
1377 }
1378
1379 static int mdt_get_info(struct ptlrpc_request *req)
1380 {
1381         char *key;
1382         struct obd_export *exp = req->rq_export;
1383         int keylen, rc = 0, size = sizeof(obd_id);
1384         obd_id *reply;
1385         ENTRY;
1386
1387         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1388         if (key == NULL) {
1389                 DEBUG_REQ(D_HA, req, "no get_info key");
1390                 RETURN(-EFAULT);
1391         }
1392         keylen = req->rq_reqmsg->buflens[0];
1393
1394         if (keylen < strlen("mdsize") || memcmp(key, "mdsize", 6) != 0)
1395                 RETURN(-EPROTO);
1396
1397         rc = lustre_pack_reply(req, 1, &size, NULL);
1398         if (rc)
1399                 RETURN(rc);
1400
1401         reply = lustre_msg_buf(req->rq_repmsg, 0, sizeof(*reply));
1402         rc = obd_get_info(exp, keylen, key, &size, reply);
1403         req->rq_repmsg->status = 0;
1404         RETURN(rc);
1405 }
1406
1407 static int mds_set_info(struct obd_export *exp, __u32 keylen,
1408                         void *key, __u32 vallen, void *val)
1409 {
1410         struct obd_device *obd;
1411         struct mds_obd *mds;
1412         int    rc = 0;
1413         ENTRY;
1414
1415         obd = class_exp2obd(exp);
1416         if (obd == NULL) {
1417                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
1418                        exp->exp_handle.h_cookie);
1419                 RETURN(-EINVAL);
1420         }
1421
1422         mds = &obd->u.mds;
1423         if (keylen == strlen("mds_num") &&
1424             memcmp(key, "mds_num", keylen) == 0) {
1425                 int valsize;
1426                 __u32 group;
1427                 CDEBUG(D_IOCTL, "set mds num %d\n", *(int*)val);
1428                 mds->mds_num = *(int*)val;
1429                 group = FILTER_GROUP_FIRST_MDS + mds->mds_num;
1430                 valsize = sizeof(group);
1431                 /*mds number has been changed, so the corresponding obdfilter exp
1432                  *need to be changed too*/
1433                 rc = obd_set_info(mds->mds_osc_exp, strlen("mds_conn"), "mds_conn",
1434                           valsize, &group);
1435                 RETURN(rc);
1436         } else if (keylen == strlen("client") &&
1437                    memcmp(key, "client", keylen) == 0) {
1438                 if (!(exp->exp_flags & OBD_OPT_REAL_CLIENT)) {
1439                         atomic_inc(&mds->mds_real_clients);
1440                         CDEBUG(D_OTHER,"%s: peer from %s is real client (%d)\n",
1441                                obd->obd_name, exp->exp_client_uuid.uuid,
1442                                atomic_read(&mds->mds_real_clients));
1443                         exp->exp_flags |= OBD_OPT_REAL_CLIENT;
1444                 }
1445                 if (mds->mds_lmv_name) {
1446                         rc = mds_lmv_connect(obd, mds->mds_lmv_name);
1447                         LASSERT(rc == 0);
1448                 }
1449                 RETURN(0);
1450         }
1451         CDEBUG(D_IOCTL, "invalid key\n");
1452         RETURN(-EINVAL);
1453 }
1454
1455 static int mdt_set_info(struct ptlrpc_request *req)
1456 {
1457         char *key, *val;
1458         struct obd_export *exp = req->rq_export;
1459         int keylen, rc = 0, vallen;
1460         ENTRY;
1461
1462         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
1463         if (key == NULL) {
1464                 DEBUG_REQ(D_HA, req, "no set_info key");
1465                 RETURN(-EFAULT);
1466         }
1467         keylen = req->rq_reqmsg->buflens[0];
1468
1469         if (keylen == strlen("mds_num") &&
1470             memcmp(key, "mds_num", keylen) == 0) {
1471                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1472                 if (rc)
1473                         RETURN(rc);
1474                 val = lustre_msg_buf(req->rq_reqmsg, 1, 0);
1475
1476                 vallen = req->rq_reqmsg->buflens[1];
1477
1478                 rc = obd_set_info(exp, keylen, key, vallen, val);
1479                 req->rq_repmsg->status = 0;
1480                 RETURN(rc);
1481         } else if (keylen == strlen("client") &&
1482                    memcmp(key, "client", keylen) == 0) {
1483                 rc = lustre_pack_reply(req, 0, NULL, NULL);
1484                 if (rc)
1485                         RETURN(rc);
1486                 rc = obd_set_info(exp, keylen, key, sizeof(obd_id), NULL);
1487                 req->rq_repmsg->status = 0;
1488                 RETURN(rc);
1489         } 
1490         CDEBUG(D_IOCTL, "invalid key\n");
1491         RETURN(-EINVAL);
1492 }
1493
1494 extern int ost_brw_write(struct ptlrpc_request *, struct obd_trans_info *);
1495 int mds_handle(struct ptlrpc_request *req)
1496 {
1497         int should_process, fail = OBD_FAIL_MDS_ALL_REPLY_NET;
1498         int rc = 0;
1499         struct mds_obd *mds = NULL; /* quell gcc overwarning */
1500         struct obd_device *obd = NULL;
1501         ENTRY;
1502
1503         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1504
1505         LASSERT(current->journal_info == NULL);
1506         /* XXX identical to OST */
1507         if (req->rq_reqmsg->opc != MDS_CONNECT) {
1508                 struct mds_export_data *med;
1509                 int recovering, abort_recovery;
1510
1511                 if (req->rq_export == NULL) {
1512                         CERROR("lustre_mds: operation %d on unconnected MDS\n",
1513                                req->rq_reqmsg->opc);
1514                         req->rq_status = -ENOTCONN;
1515                         GOTO(out, rc = -ENOTCONN);
1516                 }
1517
1518                 med = &req->rq_export->exp_mds_data;
1519                 obd = req->rq_export->exp_obd;
1520                 mds = &obd->u.mds;
1521
1522                 /* sanity check: if the xid matches, the request must
1523                  * be marked as a resent or replayed */
1524                 if (req->rq_xid == med->med_mcd->mcd_last_xid)
1525                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
1526                                  (MSG_RESENT | MSG_REPLAY),
1527                                  "rq_xid "LPU64" matches last_xid, "
1528                                  "expected RESENT flag\n",
1529                                  req->rq_xid);
1530                 /* else: note the opposite is not always true; a
1531                  * RESENT req after a failover will usually not match
1532                  * the last_xid, since it was likely never
1533                  * committed. A REPLAYed request will almost never
1534                  * match the last xid, however it could for a
1535                  * committed, but still retained, open. */
1536
1537                 /* Check for aborted recovery. */
1538                 spin_lock_bh(&obd->obd_processing_task_lock);
1539                 abort_recovery = obd->obd_abort_recovery;
1540                 recovering = obd->obd_recovering;
1541                 spin_unlock_bh(&obd->obd_processing_task_lock);
1542                 if (abort_recovery) {
1543                         target_abort_recovery(obd);
1544                 } else if (recovering) {
1545                         rc = mds_filter_recovery_request(req, obd,
1546                                                          &should_process);
1547                         if (rc || !should_process)
1548                                 RETURN(rc);
1549                 }
1550         }
1551
1552         switch (req->rq_reqmsg->opc) {
1553         case MDS_CONNECT:
1554                 DEBUG_REQ(D_INODE, req, "connect");
1555                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CONNECT_NET, 0);
1556                 rc = target_handle_connect(req, mds_handle);
1557                 if (!rc)
1558                         /* Now that we have an export, set mds. */
1559                         mds = mds_req2mds(req);
1560                 break;
1561
1562         case MDS_DISCONNECT:
1563                 DEBUG_REQ(D_INODE, req, "disconnect");
1564                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DISCONNECT_NET, 0);
1565                 rc = target_handle_disconnect(req);
1566                 req->rq_status = rc;            /* superfluous? */
1567                 break;
1568
1569         case MDS_GETSTATUS:
1570                 DEBUG_REQ(D_INODE, req, "getstatus");
1571                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETSTATUS_NET, 0);
1572                 rc = mds_getstatus(req);
1573                 break;
1574
1575         case MDS_GETATTR:
1576                 DEBUG_REQ(D_INODE, req, "getattr");
1577                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NET, 0);
1578                 rc = mds_getattr(0, req);
1579                 break;
1580
1581         case MDS_GETATTR_NAME: {
1582                 struct lustre_handle lockh;
1583                 DEBUG_REQ(D_INODE, req, "getattr_name");
1584                 OBD_FAIL_RETURN(OBD_FAIL_MDS_GETATTR_NAME_NET, 0);
1585
1586                 /* If this request gets a reconstructed reply, we won't be
1587                  * acquiring any new locks in mds_getattr_name, so we don't
1588                  * want to cancel.
1589                  */
1590                 lockh.cookie = 0;
1591                 rc = mds_getattr_name(0, req, &lockh, MDS_INODELOCK_UPDATE);
1592                 /* this non-intent call (from an ioctl) is special */
1593                 req->rq_status = rc;
1594                 if (rc == 0 && lockh.cookie)
1595                         ldlm_lock_decref(&lockh, LCK_PR);
1596                 break;
1597         }
1598         case MDS_STATFS:
1599                 DEBUG_REQ(D_INODE, req, "statfs");
1600                 OBD_FAIL_RETURN(OBD_FAIL_MDS_STATFS_NET, 0);
1601                 rc = mds_statfs(req);
1602                 break;
1603
1604         case MDS_READPAGE:
1605                 DEBUG_REQ(D_INODE, req, "readpage");
1606                 OBD_FAIL_RETURN(OBD_FAIL_MDS_READPAGE_NET, 0);
1607                 rc = mds_readpage(req);
1608
1609                 if (OBD_FAIL_CHECK_ONCE(OBD_FAIL_MDS_SENDPAGE)) {
1610                         if (req->rq_reply_state) {
1611                                 lustre_free_reply_state (req->rq_reply_state);
1612                                 req->rq_reply_state = NULL;
1613                         }
1614                         RETURN(0);
1615                 }
1616
1617                 break;
1618
1619         case MDS_REINT: {
1620                 __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*opcp));
1621                 __u32  opc;
1622                 int size[3] = {sizeof(struct mds_body), mds->mds_max_mdsize,
1623                                mds->mds_max_cookiesize};
1624                 int bufcount;
1625
1626                 /* NB only peek inside req now; mds_reint() will swab it */
1627                 if (opcp == NULL) {
1628                         CERROR ("Can't inspect opcode\n");
1629                         rc = -EINVAL;
1630                         break;
1631                 }
1632                 opc = *opcp;
1633                 if (lustre_msg_swabbed (req->rq_reqmsg))
1634                         __swab32s(&opc);
1635
1636                 DEBUG_REQ(D_INODE, req, "reint %d (%s)", opc,
1637                           (opc < sizeof(reint_names) / sizeof(reint_names[0]) ||
1638                            reint_names[opc] == NULL) ? reint_names[opc] :
1639                                                        "unknown opcode");
1640
1641                 OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
1642
1643                 if (opc == REINT_UNLINK)
1644                         bufcount = 3;
1645                 else if (opc == REINT_OPEN || opc == REINT_RENAME)
1646                         bufcount = 2;
1647                 else
1648                         bufcount = 1;
1649
1650                 rc = lustre_pack_reply(req, bufcount, size, NULL);
1651                 if (rc)
1652                         break;
1653
1654                 rc = mds_reint(req, 0, NULL);
1655                 fail = OBD_FAIL_MDS_REINT_NET_REP;
1656                 break;
1657         }
1658
1659         case MDS_CLOSE:
1660                 DEBUG_REQ(D_INODE, req, "close");
1661                 OBD_FAIL_RETURN(OBD_FAIL_MDS_CLOSE_NET, 0);
1662                 rc = mds_close(req);
1663                 break;
1664
1665         case MDS_DONE_WRITING:
1666                 DEBUG_REQ(D_INODE, req, "done_writing");
1667                 OBD_FAIL_RETURN(OBD_FAIL_MDS_DONE_WRITING_NET, 0);
1668                 rc = mds_done_writing(req);
1669                 break;
1670
1671         case MDS_PIN:
1672                 DEBUG_REQ(D_INODE, req, "pin");
1673                 OBD_FAIL_RETURN(OBD_FAIL_MDS_PIN_NET, 0);
1674                 rc = mds_pin(req);
1675                 break;
1676
1677         case MDS_SYNC:
1678                 DEBUG_REQ(D_INODE, req, "sync");
1679                 OBD_FAIL_RETURN(OBD_FAIL_MDS_SYNC_NET, 0);
1680                 rc = mds_sync(req);
1681                 break;
1682
1683         case OBD_PING:
1684                 DEBUG_REQ(D_INODE, req, "ping");
1685                 rc = target_handle_ping(req);
1686                 break;
1687
1688         case OBD_LOG_CANCEL:
1689                 CDEBUG(D_INODE, "log cancel\n");
1690                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOG_CANCEL_NET, 0);
1691                 rc = -ENOTSUPP; /* la la la */
1692                 break;
1693
1694         case LDLM_ENQUEUE:
1695                 DEBUG_REQ(D_INODE, req, "enqueue");
1696                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_ENQUEUE, 0);
1697                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
1698                                          ldlm_server_blocking_ast, NULL);
1699                 break;
1700         case LDLM_CONVERT:
1701                 DEBUG_REQ(D_INODE, req, "convert");
1702                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_CONVERT, 0);
1703                 rc = ldlm_handle_convert(req);
1704                 break;
1705         case LDLM_BL_CALLBACK:
1706         case LDLM_CP_CALLBACK:
1707                 DEBUG_REQ(D_INODE, req, "callback");
1708                 CERROR("callbacks should not happen on MDS\n");
1709                 LBUG();
1710                 OBD_FAIL_RETURN(OBD_FAIL_LDLM_BL_CALLBACK, 0);
1711                 break;
1712         case LLOG_ORIGIN_HANDLE_CREATE:
1713                 DEBUG_REQ(D_INODE, req, "llog_init");
1714                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1715                 rc = llog_origin_handle_create(req);
1716                 break;
1717         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
1718                 DEBUG_REQ(D_INODE, req, "llog next block");
1719                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1720                 rc = llog_origin_handle_next_block(req);
1721                 break;
1722         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
1723                 DEBUG_REQ(D_INODE, req, "llog prev block");
1724                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1725                 rc = llog_origin_handle_prev_block(req);
1726                 break;
1727         case LLOG_ORIGIN_HANDLE_READ_HEADER:
1728                 DEBUG_REQ(D_INODE, req, "llog read header");
1729                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1730                 rc = llog_origin_handle_read_header(req);
1731                 break;
1732         case LLOG_ORIGIN_HANDLE_CLOSE:
1733                 DEBUG_REQ(D_INODE, req, "llog close");
1734                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1735                 rc = llog_origin_handle_close(req);
1736                 break;
1737         case OST_CREATE:
1738                 DEBUG_REQ(D_INODE, req, "ost_create");
1739                 rc = mdt_obj_create(req);
1740                 break;
1741         case OST_GET_INFO:
1742                 DEBUG_REQ(D_INODE, req, "get_info");
1743                 rc = mdt_get_info(req);
1744                 break;
1745         case OST_SET_INFO:
1746                 DEBUG_REQ(D_INODE, req, "set_info");
1747                 rc = mdt_set_info(req);
1748                 break;
1749         case OST_WRITE:
1750                 CDEBUG(D_INODE, "write\n");
1751                 OBD_FAIL_RETURN(OBD_FAIL_OST_BRW_NET, 0);
1752                 rc = ost_brw_write(req, NULL);
1753                 LASSERT(current->journal_info == NULL);
1754                 /* mdt_brw sends its own replies */
1755                 RETURN(rc);
1756                 break;
1757         case LLOG_CATINFO:
1758                 DEBUG_REQ(D_INODE, req, "llog catinfo");
1759                 OBD_FAIL_RETURN(OBD_FAIL_OBD_LOGD_NET, 0);
1760                 rc = llog_catinfo(req);
1761                 break;
1762         default:
1763                 req->rq_status = -ENOTSUPP;
1764                 rc = ptlrpc_error(req);
1765                 RETURN(rc);
1766         }
1767
1768         LASSERT(current->journal_info == NULL);
1769
1770         EXIT;
1771
1772         /* If we're DISCONNECTing, the mds_export_data is already freed */
1773         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
1774                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
1775                 struct obd_device *obd = list_entry(mds, struct obd_device,
1776                                                     u.mds);
1777                 req->rq_repmsg->last_xid =
1778                         le64_to_cpu(med->med_mcd->mcd_last_xid);
1779
1780                 if (!obd->obd_no_transno) {
1781                         req->rq_repmsg->last_committed =
1782                                 obd->obd_last_committed;
1783                 } else {
1784                         DEBUG_REQ(D_IOCTL, req,
1785                                   "not sending last_committed update");
1786                 }
1787                 CDEBUG(D_INFO, "last_transno "LPU64", last_committed "LPU64
1788                        ", xid "LPU64"\n",
1789                        mds->mds_last_transno, obd->obd_last_committed,
1790                        req->rq_xid);
1791         }
1792  out:
1793
1794         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
1795                 if (obd && obd->obd_recovering) {
1796                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
1797                         return target_queue_final_reply(req, rc);
1798                 }
1799                 /* Lost a race with recovery; let the error path DTRT. */
1800                 rc = req->rq_status = -ENOTCONN;
1801         }
1802
1803         target_send_reply(req, rc, fail);
1804         return 0;
1805 }
1806
1807 /* Update the server data on disk.  This stores the new mount_count and
1808  * also the last_rcvd value to disk.  If we don't have a clean shutdown,
1809  * then the server last_rcvd value may be less than that of the clients.
1810  * This will alert us that we may need to do client recovery.
1811  *
1812  * Also assumes for mds_last_transno that we are not modifying it (no locking).
1813  */
1814 int mds_update_server_data(struct obd_device *obd, int force_sync)
1815 {
1816         struct mds_obd *mds = &obd->u.mds;
1817         struct mds_server_data *msd = mds->mds_server_data;
1818         struct file *filp = mds->mds_rcvd_filp;
1819         struct lvfs_run_ctxt saved;
1820         loff_t off = 0;
1821         int rc;
1822         ENTRY;
1823
1824         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1825         msd->msd_last_transno = cpu_to_le64(mds->mds_last_transno);
1826
1827         CDEBUG(D_SUPER, "MDS mount_count is "LPU64", last_transno is "LPU64"\n",
1828                mds->mds_mount_count, mds->mds_last_transno);
1829         rc = fsfilt_write_record(obd, filp, msd, sizeof(*msd), &off,force_sync);
1830         if (rc)
1831                 CERROR("error writing MDS server data: rc = %d\n", rc);
1832         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
1833
1834         RETURN(rc);
1835 }
1836
1837 /* mount the file system (secretly) */
1838 static int mds_setup(struct obd_device *obd, obd_count len, void *buf)
1839 {
1840         struct lustre_cfg* lcfg = buf;
1841         struct mds_obd *mds = &obd->u.mds;
1842         char *options = NULL;
1843         struct vfsmount *mnt;
1844         unsigned long page;
1845         int rc = 0;
1846         ENTRY;
1847
1848         dev_clear_rdonly(2);
1849
1850         if (!lcfg->lcfg_inlbuf1 || !lcfg->lcfg_inlbuf2)
1851                 RETURN(rc = -EINVAL);
1852
1853         obd->obd_fsops = fsfilt_get_ops(lcfg->lcfg_inlbuf2);
1854         if (IS_ERR(obd->obd_fsops))
1855                 RETURN(rc = PTR_ERR(obd->obd_fsops));
1856
1857         mds->mds_max_mdsize = sizeof(struct lov_mds_md);
1858
1859         page = __get_free_page(GFP_KERNEL);
1860         if (!page)
1861                 RETURN(-ENOMEM);
1862
1863         options = (char *)page;
1864         memset(options, 0, PAGE_SIZE);
1865
1866         /* here we use "iopen_nopriv" hardcoded, because it affects MDS utility
1867          * and the rest of options are passed by mount options. Probably this
1868          * should be moved to somewhere else like startup scripts or lconf. */
1869         sprintf(options, "iopen_nopriv");
1870
1871         if (lcfg->lcfg_inllen4 > 0 && lcfg->lcfg_inlbuf4)
1872                 sprintf(options + strlen(options), ",%s",
1873                         lcfg->lcfg_inlbuf4);
1874
1875         /* we have to know mdsnum before touching underlying fs -bzzz */
1876         if (lcfg->lcfg_inllen5 > 0 && lcfg->lcfg_inlbuf5 && 
1877             strcmp(lcfg->lcfg_inlbuf5, "dumb")) {
1878                 class_uuid_t uuid;
1879
1880                 CDEBUG(D_OTHER, "MDS: %s is master for %s\n",
1881                        obd->obd_name, lcfg->lcfg_inlbuf5);
1882
1883                 generate_random_uuid(uuid);
1884                 class_uuid_unparse(uuid, &mds->mds_lmv_uuid);
1885
1886                 OBD_ALLOC(mds->mds_lmv_name, lcfg->lcfg_inllen5);
1887                 if (mds->mds_lmv_name == NULL) 
1888                         RETURN(rc = -ENOMEM);
1889
1890                 memcpy(mds->mds_lmv_name, lcfg->lcfg_inlbuf5,
1891                        lcfg->lcfg_inllen5);
1892                 
1893                 rc = mds_lmv_connect(obd, mds->mds_lmv_name);
1894                 if (rc) {
1895                         OBD_FREE(mds->mds_lmv_name, lcfg->lcfg_inllen5);
1896                         GOTO(err_ops, rc);
1897                 }
1898         }
1899         
1900         /* FIXME-WANGDI: this should be reworked when we will use lmv along 
1901          * with cobd, because correct mdsnum is set in mds_lmv_connect(). */
1902         if (lcfg->lcfg_inllen6 > 0 && lcfg->lcfg_inlbuf6 && !mds->mds_lmv_obd &&
1903             strcmp(lcfg->lcfg_inlbuf6, "dumb")) {
1904                 if (!memcmp(lcfg->lcfg_inlbuf6, "master", strlen("master")) &&
1905                     mds->mds_num == 0) {
1906                         mds->mds_num = REAL_MDS_NUMBER;
1907                 } else if (!memcmp(lcfg->lcfg_inlbuf6, "cache", strlen("cache")) &&
1908                            mds->mds_num == 0) {
1909                         mds->mds_num = CACHE_MDS_NUMBER;
1910                 }     
1911         }
1912
1913         mnt = do_kern_mount(lcfg->lcfg_inlbuf2, 0, 
1914                             lcfg->lcfg_inlbuf1, options);
1915
1916         free_page(page);
1917
1918         if (IS_ERR(mnt)) {
1919                 rc = PTR_ERR(mnt);
1920                 CERROR("do_kern_mount failed: rc = %d\n", rc);
1921                 GOTO(err_ops, rc);
1922         }
1923
1924         CDEBUG(D_SUPER, "%s: mnt = %p\n", lcfg->lcfg_inlbuf1, mnt);
1925
1926         sema_init(&mds->mds_orphan_recovery_sem, 1);
1927         sema_init(&mds->mds_epoch_sem, 1);
1928         spin_lock_init(&mds->mds_transno_lock);
1929         mds->mds_max_cookiesize = sizeof(struct llog_cookie);
1930         atomic_set(&mds->mds_open_count, 0);
1931         atomic_set(&mds->mds_real_clients, 0);
1932
1933         obd->obd_namespace = ldlm_namespace_new(obd->obd_name,
1934                                                 LDLM_NAMESPACE_SERVER);
1935         if (obd->obd_namespace == NULL) {
1936                 mds_cleanup(obd, 0);
1937                 GOTO(err_put, rc = -ENOMEM);
1938         }
1939         ldlm_register_intent(obd->obd_namespace, mds_intent_policy);
1940
1941         rc = mds_fs_setup(obd, mnt);
1942         if (rc) {
1943                 CERROR("MDS filesystem method init failed: rc = %d\n", rc);
1944                 GOTO(err_ns, rc);
1945         }
1946
1947         rc = llog_start_commit_thread();
1948         if (rc < 0)
1949                 GOTO(err_fs, rc);
1950
1951         if (lcfg->lcfg_inllen3 > 0 && lcfg->lcfg_inlbuf3) {
1952                 class_uuid_t uuid;
1953
1954                 generate_random_uuid(uuid);
1955                 class_uuid_unparse(uuid, &mds->mds_lov_uuid);
1956
1957                 OBD_ALLOC(mds->mds_profile, lcfg->lcfg_inllen3);
1958                 if (mds->mds_profile == NULL)
1959                         GOTO(err_fs, rc = -ENOMEM);
1960
1961                 memcpy(mds->mds_profile, lcfg->lcfg_inlbuf3,
1962                        lcfg->lcfg_inllen3);
1963         }
1964
1965         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1966                            "mds_ldlm_client", &obd->obd_ldlm_client);
1967         obd->obd_replayable = 1;
1968
1969         rc = mds_postsetup(obd);
1970         if (rc)
1971                 GOTO(err_fs, rc);
1972
1973         RETURN(0);
1974
1975 err_fs:
1976         /* No extra cleanup needed for llog_init_commit_thread() */
1977         mds_fs_cleanup(obd, 0);
1978 err_ns:
1979         ldlm_namespace_free(obd->obd_namespace, 0);
1980         obd->obd_namespace = NULL;
1981 err_put:
1982         unlock_kernel();
1983         mntput(mds->mds_vfsmnt);
1984         mds->mds_sb = 0;
1985         lock_kernel();
1986 err_ops:
1987         fsfilt_put_ops(obd->obd_fsops);
1988         return rc;
1989 }
1990
1991 static int mds_postsetup(struct obd_device *obd)
1992 {
1993         struct mds_obd *mds = &obd->u.mds;
1994         int rc = 0;
1995         ENTRY;
1996
1997         rc = obd_llog_setup(obd, &obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT, 
1998                             obd, 0, NULL, &llog_lvfs_ops);
1999         if (rc)
2000                 RETURN(rc);
2001
2002         /* This check for @dumb string is needed to handle mounting MDS 
2003            with smfs. Read lconf:MDSDEV.write_conf() for more detail 
2004            explanation. */
2005         if (mds->mds_profile && strcmp(mds->mds_profile, "dumb")) {
2006                 struct lvfs_run_ctxt saved;
2007                 struct lustre_profile *lprof;
2008                 struct config_llog_instance cfg;
2009
2010                 cfg.cfg_instance = NULL;
2011                 cfg.cfg_uuid = mds->mds_lov_uuid;
2012                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2013                 rc = class_config_parse_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
2014                                              mds->mds_profile, &cfg);
2015                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2016                 if (rc)
2017                         GOTO(err_llog, rc);
2018
2019                 lprof = class_get_profile(mds->mds_profile);
2020                 if (lprof == NULL) {
2021                         CERROR("No profile found: %s\n", mds->mds_profile);
2022                         GOTO(err_cleanup, rc = -ENOENT);
2023                 }
2024                 rc = mds_lov_connect(obd, lprof->lp_osc);
2025                 if (rc)
2026                         GOTO(err_cleanup, rc);
2027
2028                 rc = mds_lmv_postsetup(obd);
2029                 if (rc)
2030                         GOTO(err_cleanup, rc);
2031         }
2032
2033         RETURN(rc);
2034
2035 err_cleanup:
2036         mds_lov_clean(obd);
2037 err_llog:
2038         obd_llog_cleanup(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT));
2039         RETURN(rc);
2040 }
2041
2042 static int mds_postrecov(struct obd_device *obd)
2043
2044 {
2045         struct llog_ctxt *ctxt;
2046         int rc, rc2;
2047         ENTRY;
2048
2049         LASSERT(!obd->obd_recovering);
2050         ctxt = llog_get_context(&obd->obd_llogs, LLOG_UNLINK_ORIG_CTXT);
2051         LASSERT(ctxt != NULL);
2052
2053         rc = llog_connect(ctxt, obd->u.mds.mds_lov_desc.ld_tgt_count,
2054                           NULL, NULL, NULL);
2055         if (rc != 0) {
2056                 CERROR("faild at llog_origin_connect: %d\n", rc);
2057         }
2058
2059         rc = mds_cleanup_orphans(obd);
2060
2061         rc2 = mds_lov_set_nextid(obd);
2062         if (rc2 == 0)
2063                 rc2 = rc;
2064         RETURN(rc2);
2065 }
2066
2067 int mds_lov_clean(struct obd_device *obd)
2068 {
2069         struct mds_obd *mds = &obd->u.mds;
2070
2071         if (mds->mds_profile) {
2072                 char * cln_prof;
2073                 struct config_llog_instance cfg;
2074                 struct lvfs_run_ctxt saved;
2075                 int len = strlen(mds->mds_profile) + sizeof("-clean") + 1;
2076
2077                 OBD_ALLOC(cln_prof, len);
2078                 sprintf(cln_prof, "%s-clean", mds->mds_profile);
2079
2080                 cfg.cfg_instance = NULL;
2081                 cfg.cfg_uuid = mds->mds_lov_uuid;
2082
2083                 push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2084                 class_config_parse_llog(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT),
2085                                         cln_prof, &cfg);
2086                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
2087
2088                 OBD_FREE(cln_prof, len);
2089                 OBD_FREE(mds->mds_profile, strlen(mds->mds_profile) + 1);
2090                 mds->mds_profile = NULL;
2091         }
2092         RETURN(0);
2093 }
2094
2095 int mds_lmv_clean(struct obd_device *obd)
2096 {
2097         struct mds_obd *mds = &obd->u.mds;
2098
2099         if (mds->mds_lmv_name) {
2100                 OBD_FREE(mds->mds_lmv_name, strlen(mds->mds_lmv_name) + 1);
2101                 mds->mds_lmv_name = NULL;
2102         }
2103         RETURN(0);
2104 }
2105
2106 static int mds_precleanup(struct obd_device *obd, int flags)
2107 {
2108         int rc = 0;
2109         ENTRY;
2110
2111         mds_lmv_clean(obd);
2112         mds_lov_disconnect(obd, flags);
2113         mds_lov_clean(obd);
2114         obd_llog_cleanup(llog_get_context(&obd->obd_llogs, LLOG_CONFIG_ORIG_CTXT));
2115         RETURN(rc);
2116 }
2117
2118 static int mds_cleanup(struct obd_device *obd, int flags)
2119 {
2120         struct mds_obd *mds = &obd->u.mds;
2121         ENTRY;
2122
2123         if (mds->mds_sb == NULL)
2124                 RETURN(0);
2125
2126         mds_update_server_data(obd, 1);
2127         if (mds->mds_lov_objids != NULL) {
2128                 OBD_FREE(mds->mds_lov_objids,
2129                          mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id));
2130         }
2131         mds_fs_cleanup(obd, flags);
2132
2133         unlock_kernel();
2134
2135         /* 2 seems normal on mds, (may_umount() also expects 2
2136           fwiw), but we only see 1 at this point in obdfilter. */
2137         if (atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count) > 2)
2138                 CERROR("%s: mount busy, mnt_count %d != 2\n", obd->obd_name,
2139                        atomic_read(&obd->u.mds.mds_vfsmnt->mnt_count));
2140
2141         mntput(mds->mds_vfsmnt);
2142
2143         mds->mds_sb = 0;
2144
2145         ldlm_namespace_free(obd->obd_namespace, flags & OBD_OPT_FORCE);
2146
2147         spin_lock_bh(&obd->obd_processing_task_lock);
2148         if (obd->obd_recovering) {
2149                 target_cancel_recovery_timer(obd);
2150                 obd->obd_recovering = 0;
2151         }
2152         spin_unlock_bh(&obd->obd_processing_task_lock);
2153
2154         lock_kernel();
2155         dev_clear_rdonly(2);
2156         fsfilt_put_ops(obd->obd_fsops);
2157
2158         RETURN(0);
2159 }
2160
2161 static void fixup_handle_for_resent_req(struct ptlrpc_request *req,
2162                                         struct ldlm_lock *new_lock,
2163                                         struct lustre_handle *lockh)
2164 {
2165         struct obd_export *exp = req->rq_export;
2166         struct obd_device *obd = exp->exp_obd;
2167         struct ldlm_request *dlmreq =
2168                 lustre_msg_buf(req->rq_reqmsg, 0, sizeof (*dlmreq));
2169         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
2170         struct list_head *iter;
2171
2172         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2173                 return;
2174
2175         l_lock(&obd->obd_namespace->ns_lock);
2176         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2177                 struct ldlm_lock *lock;
2178                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2179                 if (lock == new_lock)
2180                         continue;
2181                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2182                         lockh->cookie = lock->l_handle.h_cookie;
2183                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2184                                   lockh->cookie);
2185                         l_unlock(&obd->obd_namespace->ns_lock);
2186                         return;
2187                 }
2188         }
2189         l_unlock(&obd->obd_namespace->ns_lock);
2190
2191         /* If the xid matches, then we know this is a resent request,
2192          * and allow it. (It's probably an OPEN, for which we don't
2193          * send a lock */
2194         if (req->rq_xid == exp->exp_mds_data.med_mcd->mcd_last_xid)
2195                 return;
2196
2197         /* This remote handle isn't enqueued, so we never received or
2198          * processed this request.  Clear MSG_RESENT, because it can
2199          * be handled like any normal request now. */
2200
2201         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2202
2203         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2204                   remote_hdl.cookie);
2205 }
2206
2207 int intent_disposition(struct ldlm_reply *rep, int flag)
2208 {
2209         if (!rep)
2210                 return 0;
2211         return (rep->lock_policy_res1 & flag);
2212 }
2213
2214 void intent_set_disposition(struct ldlm_reply *rep, int flag)
2215 {
2216         if (!rep)
2217                 return;
2218         rep->lock_policy_res1 |= flag;
2219 }
2220
2221 static int mds_intent_policy(struct ldlm_namespace *ns,
2222                              struct ldlm_lock **lockp, void *req_cookie,
2223                              ldlm_mode_t mode, int flags, void *data)
2224 {
2225         struct ptlrpc_request *req = req_cookie;
2226         struct ldlm_lock *lock = *lockp;
2227         struct ldlm_intent *it;
2228         struct mds_obd *mds = &req->rq_export->exp_obd->u.mds;
2229         struct ldlm_reply *rep;
2230         struct lustre_handle lockh = { 0 };
2231         struct ldlm_lock *new_lock;
2232         int getattr_part = MDS_INODELOCK_UPDATE;
2233         int rc, offset = 2, repsize[4] = {sizeof(struct ldlm_reply),
2234                                           sizeof(struct mds_body),
2235                                           mds->mds_max_mdsize,
2236                                           mds->mds_max_cookiesize};
2237         ENTRY;
2238
2239         LASSERT(req != NULL);
2240
2241         if (req->rq_reqmsg->bufcount <= 1) {
2242                 /* No intent was provided */
2243                 int size = sizeof(struct ldlm_reply);
2244                 rc = lustre_pack_reply(req, 1, &size, NULL);
2245                 LASSERT(rc == 0);
2246                 RETURN(0);
2247         }
2248
2249         it = lustre_swab_reqbuf(req, 1, sizeof(*it), lustre_swab_ldlm_intent);
2250         if (it == NULL) {
2251                 CERROR("Intent missing\n");
2252                 RETURN(req->rq_status = -EFAULT);
2253         }
2254
2255         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
2256
2257         rc = lustre_pack_reply(req, it->opc == IT_UNLINK ? 4 : 3, repsize,
2258                                NULL);
2259         if (rc)
2260                 RETURN(req->rq_status = rc);
2261
2262         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
2263         intent_set_disposition(rep, DISP_IT_EXECD);
2264
2265         fixup_handle_for_resent_req(req, lock, &lockh);
2266
2267         /* execute policy */
2268         switch ((long)it->opc) {
2269         case IT_OPEN:
2270         case IT_CREAT|IT_OPEN:
2271                 /* XXX swab here to assert that an mds_open reint
2272                  * packet is following */
2273                 rep->lock_policy_res2 = mds_reint(req, offset, &lockh);
2274 #if 0
2275                 /* We abort the lock if the lookup was negative and
2276                  * we did not make it to the OPEN portion */
2277                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
2278                         RETURN(ELDLM_LOCK_ABORTED);
2279                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
2280                     !intent_disposition(rep, DISP_OPEN_OPEN))
2281 #endif 
2282                         if (rep->lock_policy_res2) {
2283                                 /* mds_open returns ENOLCK where it
2284                                  * should return zero, but it has no
2285                                  * lock to return */
2286                                 if (rep->lock_policy_res2 == ENOLCK)
2287                                         rep->lock_policy_res2 = 0;
2288                                 RETURN(ELDLM_LOCK_ABORTED);
2289                         }
2290                 break;
2291         case IT_LOOKUP:
2292                 getattr_part = MDS_INODELOCK_LOOKUP;
2293         case IT_GETATTR:
2294                 getattr_part |= MDS_INODELOCK_LOOKUP;
2295         case IT_READDIR:
2296                 rep->lock_policy_res2 = mds_getattr_name(offset, req, &lockh,
2297                                                          getattr_part);
2298                 /* FIXME: LDLM can set req->rq_status. MDS sets
2299                    policy_res{1,2} with disposition and status.
2300                    - replay: returns 0 & req->status is old status
2301                    - otherwise: returns req->status */
2302                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
2303                         rep->lock_policy_res2 = 0;
2304                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
2305                     rep->lock_policy_res2)
2306                         RETURN(ELDLM_LOCK_ABORTED);
2307                 if (req->rq_status != 0) {
2308                         LBUG();
2309                         rep->lock_policy_res2 = req->rq_status;
2310                         RETURN(ELDLM_LOCK_ABORTED);
2311                 }
2312                 break;
2313         default:
2314                 CERROR("Unhandled intent "LPD64"\n", it->opc);
2315                 LBUG();
2316         }
2317
2318         /* By this point, whatever function we called above must have either
2319          * filled in 'lockh', been an intent replay, or returned an error.  We
2320          * want to allow replayed RPCs to not get a lock, since we would just
2321          * drop it below anyways because lock replay is done separately by the
2322          * client afterwards.  For regular RPCs we want to give the new lock to
2323          * the client instead of whatever lock it was about to get. */
2324         new_lock = ldlm_handle2lock(&lockh);
2325         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
2326                 RETURN(0);
2327
2328         LASSERT(new_lock != NULL);
2329
2330         /* If we've already given this lock to a client once, then we should
2331          * have no readers or writers.  Otherwise, we should have one reader
2332          * _or_ writer ref (which will be zeroed below) before returning the
2333          * lock to a client. */
2334         if (new_lock->l_export == req->rq_export) {
2335                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2336         } else {
2337                 LASSERT(new_lock->l_export == NULL);
2338                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2339         }
2340
2341         *lockp = new_lock;
2342
2343         if (new_lock->l_export == req->rq_export) {
2344                 /* Already gave this to the client, which means that we
2345                  * reconstructed a reply. */
2346                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2347                         MSG_RESENT);
2348                 RETURN(ELDLM_LOCK_REPLACED);
2349         }
2350
2351         /* Fixup the lock to be given to the client */
2352         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
2353         new_lock->l_readers = 0;
2354         new_lock->l_writers = 0;
2355
2356         new_lock->l_export = class_export_get(req->rq_export);
2357         list_add(&new_lock->l_export_chain,
2358                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2359
2360         new_lock->l_blocking_ast = lock->l_blocking_ast;
2361         new_lock->l_completion_ast = lock->l_completion_ast;
2362
2363         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
2364                sizeof(lock->l_remote_handle));
2365
2366         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2367
2368         LDLM_LOCK_PUT(new_lock);
2369         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
2370
2371         RETURN(ELDLM_LOCK_REPLACED);
2372 }
2373
2374 int mds_attach(struct obd_device *dev, obd_count len, void *data)
2375 {
2376         struct lprocfs_static_vars lvars;
2377
2378         lprocfs_init_multi_vars(0, &lvars);
2379         return lprocfs_obd_attach(dev, lvars.obd_vars);
2380 }
2381
/*
 * o_detach method of mds_obd_ops: forwards the device to
 * lprocfs_obd_detach() and returns its result unchanged.
 */
int mds_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
2386
2387 int mdt_attach(struct obd_device *dev, obd_count len, void *data)
2388 {
2389         struct lprocfs_static_vars lvars;
2390
2391         lprocfs_init_multi_vars(1, &lvars);
2392         return lprocfs_obd_attach(dev, lvars.obd_vars);
2393 }
2394
/*
 * o_detach method of mdt_obd_ops: forwards the device to
 * lprocfs_obd_detach() and returns its result unchanged.
 */
int mdt_detach(struct obd_device *dev)
{
        int rc = lprocfs_obd_detach(dev);

        return rc;
}
2399
2400 static int mdt_setup(struct obd_device *obd, obd_count len, void *buf)
2401 {
2402         struct mds_obd *mds = &obd->u.mds;
2403         int rc = 0;
2404         ENTRY;
2405
2406         mds->mds_service =
2407                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2408                                 MDS_REQUEST_PORTAL, MDC_REPLY_PORTAL,
2409                                 mds_handle, "mds", obd->obd_proc_entry);
2410
2411         if (!mds->mds_service) {
2412                 CERROR("failed to start service\n");
2413                 RETURN(-ENOMEM);
2414         }
2415
2416         rc = ptlrpc_start_n_threads(obd, mds->mds_service, MDT_NUM_THREADS,
2417                                     "ll_mdt");
2418         if (rc)
2419                 GOTO(err_thread, rc);
2420
2421         mds->mds_setattr_service =
2422                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2423                                 MDS_SETATTR_PORTAL, MDC_REPLY_PORTAL,
2424                                 mds_handle, "mds_setattr",
2425                                 obd->obd_proc_entry);
2426         if (!mds->mds_setattr_service) {
2427                 CERROR("failed to start getattr service\n");
2428                 GOTO(err_thread, rc = -ENOMEM);
2429         }
2430
2431         rc = ptlrpc_start_n_threads(obd, mds->mds_setattr_service,
2432                                     MDT_NUM_THREADS, "ll_mdt_attr");
2433         if (rc)
2434                 GOTO(err_thread2, rc);
2435
2436         mds->mds_readpage_service =
2437                 ptlrpc_init_svc(MDS_NBUFS, MDS_BUFSIZE, MDS_MAXREQSIZE,
2438                                 MDS_READPAGE_PORTAL, MDC_REPLY_PORTAL,
2439                                 mds_handle, "mds_readpage",
2440                                 obd->obd_proc_entry);
2441         if (!mds->mds_readpage_service) {
2442                 CERROR("failed to start readpage service\n");
2443                 GOTO(err_thread2, rc = -ENOMEM);
2444         }
2445
2446         rc = ptlrpc_start_n_threads(obd, mds->mds_readpage_service,
2447                                     MDT_NUM_THREADS, "ll_mdt_rdpg");
2448
2449         if (rc)
2450                 GOTO(err_thread3, rc);
2451
2452         RETURN(0);
2453
2454 err_thread3:
2455         ptlrpc_unregister_service(mds->mds_readpage_service);
2456 err_thread2:
2457         ptlrpc_unregister_service(mds->mds_setattr_service);
2458 err_thread:
2459         ptlrpc_unregister_service(mds->mds_service);
2460         return rc;
2461 }
2462
2463 static int mdt_cleanup(struct obd_device *obd, int flags)
2464 {
2465         struct mds_obd *mds = &obd->u.mds;
2466         ENTRY;
2467
2468         ptlrpc_stop_all_threads(mds->mds_readpage_service);
2469         ptlrpc_unregister_service(mds->mds_readpage_service);
2470
2471         ptlrpc_stop_all_threads(mds->mds_setattr_service);
2472         ptlrpc_unregister_service(mds->mds_setattr_service);
2473
2474         ptlrpc_stop_all_threads(mds->mds_service);
2475         ptlrpc_unregister_service(mds->mds_service);
2476
2477         RETURN(0);
2478 }
2479
2480 static struct dentry *mds_lvfs_fid2dentry(__u64 id, __u32 gen, __u64 gr,
2481                                           void *data)
2482 {
2483         struct obd_device *obd = data;
2484         struct ll_fid fid;
2485         fid.id = id;
2486         fid.generation = gen;
2487         return mds_fid2dentry(&obd->u.mds, &fid, NULL);
2488 }
2489
2490 static int mds_get_info(struct obd_export *exp, __u32 keylen,
2491                         void *key, __u32 *vallen, void *val)
2492 {
2493         struct obd_device *obd;
2494         struct mds_obd *mds;
2495         ENTRY;
2496
2497         obd = class_exp2obd(exp);
2498         if (obd == NULL) {
2499                 CDEBUG(D_IOCTL, "invalid client cookie "LPX64"\n",
2500                        exp->exp_handle.h_cookie);
2501                 RETURN(-EINVAL);
2502         }
2503
2504         if (keylen >= strlen("reint_log") && memcmp(key, "reint_log", 9) == 0) {
2505                 /*Get log_context handle*/
2506                 unsigned long *llh_handle = val;
2507                 *vallen = sizeof(unsigned long);
2508                 *llh_handle = (unsigned long)obd->obd_llog_ctxt[LLOG_REINT_ORIG_CTXT];
2509                 RETURN(0);
2510         }
2511         if (keylen >= strlen("cache_sb") && memcmp(key, "cache_sb", 8) == 0) {
2512                 /*Get log_context handle*/
2513                 unsigned long *sb = val;
2514                 *vallen = sizeof(unsigned long);
2515                 *sb = (unsigned long)obd->u.mds.mds_sb;
2516                 RETURN(0);
2517         }
2518
2519         mds = &obd->u.mds;
2520         keylen == strlen("mdsize");
2521         if (keylen && memcmp(key, "mdsize", keylen) == 0) {
2522                 __u32 *mdsize = val;
2523                 *vallen = sizeof(*mdsize);
2524                 *mdsize = mds->mds_max_mdsize;
2525                 RETURN(0);
2526         }
2527
2528         CDEBUG(D_IOCTL, "invalid key\n");
2529         RETURN(-EINVAL);
2530
2531 }
2532 struct lvfs_callback_ops mds_lvfs_ops = {
2533         l_fid2dentry:     mds_lvfs_fid2dentry,
2534 };
2535
/*
 * Prototypes for the bulk I/O methods referenced by mds_obd_ops
 * (o_preprw / o_commitrw).  Only the declarations appear here.
 */
int mds_preprw(int cmd, struct obd_export *exp, struct obdo *oa,
               int objcount, struct obd_ioobj *obj,
               int niocount, struct niobuf_remote *nb,
               struct niobuf_local *res, struct obd_trans_info *oti);

int mds_commitrw(int cmd, struct obd_export *exp, struct obdo *oa,
                 int objcount, struct obd_ioobj *obj,
                 int niocount, struct niobuf_local *res,
                 struct obd_trans_info *oti, int rc);
2545
2546 /* use obd ops to offer management infrastructure */
2547 static struct obd_ops mds_obd_ops = {
2548         .o_owner           = THIS_MODULE,
2549         .o_attach          = mds_attach,
2550         .o_detach          = mds_detach,
2551         .o_connect         = mds_connect,
2552         .o_init_export     = mds_init_export,
2553         .o_destroy_export  = mds_destroy_export,
2554         .o_disconnect      = mds_disconnect,
2555         .o_setup           = mds_setup,
2556         .o_precleanup      = mds_precleanup,
2557         .o_cleanup         = mds_cleanup,
2558         .o_postrecov       = mds_postrecov,
2559         .o_statfs          = mds_obd_statfs,
2560         .o_iocontrol       = mds_iocontrol,
2561         .o_create          = mds_obd_create,
2562         .o_destroy         = mds_obd_destroy,
2563         .o_llog_init       = mds_llog_init,
2564         .o_llog_finish     = mds_llog_finish,
2565         .o_notify          = mds_notify,
2566         .o_get_info        = mds_get_info,
2567         .o_set_info        = mds_set_info,
2568         .o_preprw          = mds_preprw, 
2569         .o_commitrw        = mds_commitrw,
2570 };
2571
2572 static struct obd_ops mdt_obd_ops = {
2573         .o_owner           = THIS_MODULE,
2574         .o_attach          = mdt_attach,
2575         .o_detach          = mdt_detach,
2576         .o_setup           = mdt_setup,
2577         .o_cleanup         = mdt_cleanup,
2578         .o_attach          = mdt_attach,
2579         .o_detach          = mdt_detach,
2580 };
2581
2582 static int __init mds_init(void)
2583 {
2584         struct lprocfs_static_vars lvars;
2585
2586         lprocfs_init_multi_vars(0, &lvars);
2587         class_register_type(&mds_obd_ops, NULL, lvars.module_vars,
2588                             LUSTRE_MDS_NAME);
2589         lprocfs_init_multi_vars(1, &lvars);
2590         class_register_type(&mdt_obd_ops, NULL, lvars.module_vars,
2591                             LUSTRE_MDT_NAME);
2592
2593         return 0;
2594 }
2595
2596 static void /*__exit*/ mds_exit(void)
2597 {
2598         class_unregister_type(LUSTRE_MDS_NAME);
2599         class_unregister_type(LUSTRE_MDT_NAME);
2600 }
2601
2602 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
2603 MODULE_DESCRIPTION("Lustre Metadata Server (MDS)");
2604 MODULE_LICENSE("GPL");
2605
2606 module_init(mds_init);
2607 module_exit(mds_exit);